"""UF sensor data acquisition script (minute-level aggregation).

Queries sensor histories for two UF units from two MySQL databases
(test DB for data before BOUNDARY, production DB after), aggregates
continuous variables to 1-minute means, forward-fills discrete
(STEP/POWER) variables onto the same minute grid, removes blacklisted
time windows, interpolates remaining gaps, and writes the result as
chunked CSV files.
"""

import os
from datetime import datetime

import pandas as pd
from sqlalchemy import create_engine, text

# ---------- Configuration ----------
# NOTE(review): credentials were hard-coded in source; they are now read
# from the environment, falling back to the original values so default
# behavior is unchanged. Rotate these credentials if this file was ever
# committed to version control.
DB_USER = os.environ.get("UF_DB_USER", "whu")
DB_PASS = os.environ.get("UF_DB_PASS", "09093f4e6b33ddd")
DB_HOST = os.environ.get("UF_DB_HOST", "222.130.26.206")
DB_PORT = int(os.environ.get("UF_DB_PORT", "4000"))

# Time window to export.
START_TIME = datetime(2026, 1, 22, 0, 0, 0)
END_TIME = datetime(2026, 3, 15, 6, 0, 0)
# Rows before BOUNDARY live in the test DB, rows after it in production
# (see the fetch_* helpers below).
BOUNDARY = datetime(2025, 3, 25, 0, 0, 0)

# Blacklisted intervals (provided by Zhang Hao) that must be removed
# from the final output.
DELETE_PERIODS = [
    ("2024-04-24 13:42:00", "2024-04-24 18:26:00"),
    ("2024-11-09 12:34:00", "2024-11-11 10:46:00"),
    ("2024-12-12 08:52:00", "2024-12-12 17:22:00"),
    ("2024-12-15 16:00:00", "2024-12-16 09:34:00"),
    ("2025-01-28 10:58:00", "2025-02-05 11:24:00"),
]
DELETE_PERIODS = [(pd.to_datetime(s), pd.to_datetime(e)) for s, e in DELETE_PERIODS]

# ---------- Sensor configuration ----------
# Unit numbers; per-unit variable templates are formatted with each unit id.
UNITS = [1, 2]
BASE_VARIABLES = [
    "ns=3;s={}#UF_JSFLOW_O",     # feed flow
    "ns=3;s={}#UF_JSPRESS_O",    # feed pressure
    "ns=3;s=UF{}_SSD_KMYC",      # transmembrane pressure
    "ns=3;s=UF{}_STEP",          # step sequence / control word
    "ns=3;s=ZZ_{}#UFBWB_POWER",  # backwash pump power
]
SYSTEM_VARIABLES = [
    "ns=3;s=ZJS_TEMP_O",         # feed water temperature
    "ns=3;s=RO_JSORP_O",         # total product water ORP
    "ns=3;s=RO_JSPH_O",          # total product water pH
    "ns=3;s=RO_JSDD_O",          # total product water conductivity
    "ns=3;s=CN_LEVEL_O",         # sodium hypochlorite level
    "ns=3;s=S_LEVEL_O",          # acid level
    "ns=3;s=J_LEVEL_O",          # alkali level
    "ns=3;s=ZZ_UFGSB_POWER",     # UF feed pump power
]

# Output directories.
BASE_OUTPUT_DIR = "../datasets/UF_yancheng_data"
PROCESSED_OUTPUT_DIR = os.path.join(BASE_OUTPUT_DIR, "raw")
os.makedirs(PROCESSED_OUTPUT_DIR, exist_ok=True)

# Expand per-unit templates into the full list of variable names to query.
SENSOR_NAMES = [tmpl.format(unit) for unit in UNITS for tmpl in BASE_VARIABLES]
SENSOR_NAMES.extend(SYSTEM_VARIABLES)

print(f"总共查询 {len(SENSOR_NAMES)} 个变量")
for i, var in enumerate(SENSOR_NAMES, 1):
    print(f"{i:2d}. {var}")


# ---------- Database engines ----------
def create_db_engines():
    """Create one SQLAlchemy engine per database (test and production).

    Returns:
        Tuple ``(engine_test, engine_prod)`` on success, ``(None, None)``
        if either connection probe fails.
    """
    try:
        engine_test = create_engine(
            f"mysql+pymysql://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/ws_data_test?charset=utf8mb4"
        )
        engine_prod = create_engine(
            f"mysql+pymysql://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/ws_data?charset=utf8mb4"
        )
        # Probe both connections up-front so failures surface here,
        # not in the middle of a long fetch.
        with engine_test.connect():
            print("✅ 测试数据库连接成功!")
        with engine_prod.connect():
            print("✅ 生产数据库连接成功!")
        return engine_test, engine_prod
    except Exception as e:
        print(f"❌ 数据库连接失败: {e}")
        return None, None


# ---------- Minute-aggregated query (subquery bucketing) ----------
def fetch_valve_aggregated(name, start, end, engine, interval_minutes=1):
    """Fetch one sensor's history aggregated into fixed time buckets.

    Readings are grouped by ``FLOOR(unix_time / interval)`` in SQL and
    averaged per bucket; returned timestamps are floored to the minute
    so they align to the start of each bucket (00 seconds).

    Args:
        name: ``item_name`` to query.
        start, end: inclusive time range.
        engine: SQLAlchemy engine to run the query on.
        interval_minutes: bucket width in minutes (default 1).

    Returns:
        DataFrame with columns ``['time', 'val']``; empty on error.
    """
    interval_seconds = interval_minutes * 60
    sql = text(f"""
        SELECT
            FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(MIN(h_time)) / {interval_seconds}) * {interval_seconds}) AS time,
            AVG(val) AS val
        FROM (
            SELECT h_time, val,
                   FLOOR(UNIX_TIMESTAMP(h_time) / {interval_seconds}) AS time_group
            FROM dc_item_history_data_1497
            WHERE item_name = :name
              AND h_time BETWEEN :st AND :et
              AND val IS NOT NULL
        ) t
        GROUP BY t.time_group
        ORDER BY time
    """)
    try:
        df = pd.read_sql(sql, engine, params={"name": name, "st": start, "et": end})
        if not df.empty:
            df['time'] = pd.to_datetime(df['time'])
            # Strip seconds/microseconds so timestamps are exact minutes.
            df['time'] = df['time'].dt.floor('1min')
        print(f" ✓ {name}: {len(df)} 条记录")
        return df
    except Exception as e:
        print(f" ✗ {name} 查询失败: {str(e)}")
        return pd.DataFrame()


def fetch_special_data(sensor, start, end, boundary, engine_test, engine_prod):
    """Fetch raw (un-aggregated) readings for a discrete sensor.

    Keeps the original change points: every stored row with its original
    timestamp. Depending on where ``[start, end]`` falls relative to
    *boundary*, the test DB, the production DB, or both are queried.

    Returns:
        DataFrame with columns ``['time', 'val']``; empty on error.
    """
    sql = text("""
        SELECT h_time AS time, val
        FROM dc_item_history_data_1497
        WHERE item_name = :name
          AND h_time BETWEEN :st AND :et
          AND val IS NOT NULL
        ORDER BY h_time
    """)
    try:
        if end <= boundary:
            df = pd.read_sql(sql, engine_test, params={"name": sensor, "st": start, "et": end})
        elif start >= boundary:
            df = pd.read_sql(sql, engine_prod, params={"name": sensor, "st": start, "et": end})
        else:
            # Range straddles the DB boundary: query both and stitch.
            df1 = pd.read_sql(sql, engine_test, params={"name": sensor, "st": start, "et": boundary})
            df2 = pd.read_sql(sql, engine_prod, params={"name": sensor, "st": boundary, "et": end})
            df = pd.concat([df1, df2], ignore_index=True)
            # BETWEEN is inclusive on both sides, so a reading stored
            # exactly at the boundary would appear twice — drop it.
            df = df.drop_duplicates(subset=['time'], keep='first')
        if not df.empty:
            df['time'] = pd.to_datetime(df['time'])
        return df
    except Exception as e:
        print(f" ✗ {sensor} 查询失败: {str(e)}")
        return pd.DataFrame()


# ---------- Wide-table assembly helpers ----------
def _fill_continuous(merged_df, continuous_vars, start_time, end_time,
                     boundary, engine_test, engine_prod):
    """Add minute-averaged continuous variables as columns of *merged_df* (in place)."""
    print("\n获取连续变量数据(分钟平均):")
    for sensor in continuous_vars:
        try:
            # Pick the engine(s) according to the DB boundary.
            if end_time <= boundary:
                df = fetch_valve_aggregated(sensor, start_time, end_time, engine_test)
            elif start_time >= boundary:
                df = fetch_valve_aggregated(sensor, start_time, end_time, engine_prod)
            else:
                df1 = fetch_valve_aggregated(sensor, start_time, boundary, engine_test)
                df2 = fetch_valve_aggregated(sensor, boundary, end_time, engine_prod)
                df = pd.concat([df1, df2], ignore_index=True)
            if not df.empty:
                # Align to exact minutes, dedupe, and index by time so the
                # assignment below joins on the minute grid.
                df['time'] = pd.to_datetime(df['time']).dt.floor('1min')
                df = df.drop_duplicates(subset=['time'], keep='first')
                df = df.set_index('time')
                merged_df[sensor] = df['val']
                print(f" ✓ {sensor}: {len(df)} 条记录")
            else:
                print(f" ⚠ {sensor}: 无数据")
        except Exception as e:
            print(f" ⚠ {sensor}: 处理失败 - {str(e)}")


def _fill_special(merged_df, special_vars, start_time, end_time,
                  boundary, engine_test, engine_prod):
    """Add discrete variables as columns of *merged_df*, forward-filled per minute (in place)."""
    print("\n获取离散变量数据(原始变化点):")
    for sensor in special_vars:
        try:
            df = fetch_special_data(sensor, start_time, end_time, boundary,
                                    engine_test, engine_prod)
            if not df.empty:
                df['time'] = pd.to_datetime(df['time'])
                df = df.set_index('time')
                # A discrete value holds until the next change point, so
                # forward-fill onto the minute grid.
                df_resampled = df.resample('1min').ffill()
                merged_df[sensor] = df_resampled['val']
                print(f" ✓ {sensor}: {len(df)} 个原始点 -> {len(df_resampled)} 个分钟点")
            else:
                print(f" ⚠ {sensor}: 无数据")
        except Exception as e:
            print(f" ⚠ {sensor}: 处理失败 - {str(e)}")


def _drop_blacklisted(merged_df):
    """Return *merged_df* without rows inside any DELETE_PERIODS window (inclusive)."""
    print("\n删除黑名单时段...")
    original_len = len(merged_df)
    for s, e in DELETE_PERIODS:
        # Floor to the minute so the comparison matches the grid.
        s = pd.to_datetime(s).floor('1min')
        e = pd.to_datetime(e).floor('1min')
        merged_df = merged_df[(merged_df['time'] < s) | (merged_df['time'] > e)]
    deleted_count = original_len - len(merged_df)
    if deleted_count > 0:
        print(f"已删除 {deleted_count} 条黑名单时段数据")
    return merged_df


def fetch_sensor_data(sensor_names, start_time, end_time, boundary,
                      engine_test, engine_prod):
    """Build a wide minute-level table for all requested sensors.

    Continuous variables are averaged per minute; STEP/POWER variables
    keep their raw change points and are forward-filled onto the minute
    grid. Blacklisted periods are removed at the end.

    Returns:
        DataFrame with a ``time`` column plus one column per sensor, or
        an empty DataFrame when nothing could be fetched.
    """
    # STEP and POWER variables are discrete and need change-point handling.
    step_vars = [v for v in sensor_names if 'STEP' in v]
    power_vars = [v for v in sensor_names if 'POWER' in v]
    special_vars = step_vars + power_vars
    continuous_vars = [v for v in sensor_names if v not in special_vars]
    print(f"\n识别到 {len(special_vars)} 个离散变量, {len(continuous_vars)} 个连续变量")

    # Build the complete minute grid first so every sensor aligns to it.
    print(f"\n创建完整时间网格...")
    time_grid = pd.date_range(
        start=start_time.replace(second=0, microsecond=0),
        end=end_time.replace(second=0, microsecond=0),
        freq='1min',
    )
    merged_df = pd.DataFrame(index=time_grid)
    print(f"时间网格: {len(time_grid)} 个时间点")

    if continuous_vars:
        _fill_continuous(merged_df, continuous_vars, start_time, end_time,
                         boundary, engine_test, engine_prod)
    if special_vars:
        _fill_special(merged_df, special_vars, start_time, end_time,
                      boundary, engine_test, engine_prod)

    if merged_df.empty or len(merged_df.columns) == 0:
        print("\n❌ 未获取到任何传感器数据")
        return pd.DataFrame()

    # Turn the DatetimeIndex back into an explicit 'time' column.
    merged_df = merged_df.reset_index().rename(columns={'index': 'time'})
    print(f"\n合并完成,共 {len(merged_df)} 条时间记录 × {len(merged_df.columns) - 1} 个传感器")
    print(f"数据框形状: {merged_df.shape}")

    return _drop_blacklisted(merged_df)


# ---------- Post-processing (gap filling) ----------
def post_process_data(df, continuous_vars, step_vars):
    """Fill remaining gaps in the aggregated wide table.

    Continuous variables are linearly interpolated (extended to both
    edges); step variables were already forward-filled during fetching,
    so only a possible leading gap is back-filled.

    Returns:
        A copy of *df* with gaps filled and ``time`` restored as a column.
    """
    if df.empty:
        print("警告:输入数据为空")
        return df

    df_processed = df.copy()
    df_processed['time'] = pd.to_datetime(df_processed['time'])
    df_processed = df_processed.set_index('time')

    print(f"\n开始填充空值...")
    print(f"数据时间范围: {df_processed.index.min()} 到 {df_processed.index.max()}")
    print(f"数据行数: {len(df_processed)}")
    missing_before = df_processed.isnull().sum().sum()
    print(f"填充前空值数量: {missing_before}")

    # Linear interpolation suits continuous physical quantities.
    for var in continuous_vars:
        if var in df_processed.columns:
            df_processed[var] = df_processed[var].interpolate(
                method='linear', limit_direction='both')

    # Step variables: back-fill the leading NaN (if any) with the first
    # valid value.
    for var in step_vars:
        if var in df_processed.columns:
            df_processed[var] = df_processed[var].bfill()

    missing_after = df_processed.isnull().sum().sum()
    print(f"填充后空值数量: {missing_after}")
    print(f"填充了 {missing_before - missing_after} 个空值")

    return df_processed.reset_index()


# ---------- Chunked saving ----------
def save_data_chunks(df, output_dir, prefix="sensor_data", chunk_size=200000):
    """Save *df* as CSV chunks named ``{prefix}_partX_of_Y.csv``.

    Returns:
        Number of chunks planned (0 for an empty DataFrame).
    """
    total_rows = len(df)
    num_chunks = (total_rows + chunk_size - 1) // chunk_size  # ceiling division
    print(f"\n开始分块保存数据(共 {num_chunks} 个文件)...")
    for i in range(num_chunks):
        # iloc slicing clamps the upper bound automatically.
        chunk_df = df.iloc[i * chunk_size:(i + 1) * chunk_size]
        if not chunk_df.empty:
            file_path = os.path.join(
                output_dir, f"{prefix}_part{i + 1}_of_{num_chunks}.csv"
            )
            # utf_8_sig writes a BOM so Excel renders the headers correctly.
            chunk_df.to_csv(file_path, index=False, encoding='utf_8_sig')
            print(f" ✓ 已保存第 {i + 1}/{num_chunks} 个文件: {os.path.basename(file_path)} ({len(chunk_df)} 行)")
    return num_chunks


# ---------- Main program ----------
if __name__ == "__main__":
    print("=" * 70)
    print("传感器数据采集程序 - 分钟级聚合版本")
    print("=" * 70)

    # 1. Connect to both databases.
    print("\n[1/4] 创建数据库连接...")
    engine_test, engine_prod = create_db_engines()
    if engine_test is None or engine_prod is None:
        print("❌ 数据库连接失败,程序退出")
        exit(1)

    # 2. Fetch the minute-aggregated wide table.
    print(f"\n[2/4] 获取一分钟聚合数据...")
    print(f"时间范围: {START_TIME} 到 {END_TIME}")
    agg_df = fetch_sensor_data(
        SENSOR_NAMES, START_TIME, END_TIME, BOUNDARY, engine_test, engine_prod
    )
    if agg_df.empty:
        print("\n❌ 未获取到任何聚合数据,程序退出")
        exit(1)

    print(f"\n✅ 聚合数据获取完成!")
    print(f"总数据量: {len(agg_df)} 条记录")
    print(f"时间范围: {agg_df['time'].min()} 到 {agg_df['time'].max()}")
    print(f"时间间隔: 1分钟(整点)")
    print(f"传感器数量: {len(agg_df.columns) - 1} 个")

    # Split columns for post-processing: STEP columns are discrete.
    step_vars = [col for col in agg_df.columns if 'STEP' in col]
    continuous_vars = [col for col in agg_df.columns
                       if col != 'time' and col not in step_vars]

    # 3. Fill gaps.
    print(f"\n[3/4] 后处理聚合数据...")
    processed_df = post_process_data(agg_df, continuous_vars, step_vars)
    print(f"\n✅ 后处理完成!")
    print(f"处理后数据行数: {len(processed_df)}")

    # 4. Save in chunks only (no single monolithic file).
    print(f"\n[4/4] 保存数据...")
    chunk_size = 200000  # rows per chunk
    num_chunks = save_data_chunks(
        processed_df, PROCESSED_OUTPUT_DIR, "uf_all_units_processed_1min", chunk_size
    )

    print("\n" + "=" * 70)
    print("✅ 所有任务完成!")
    print("=" * 70)

    # Report the written files and their sizes.
    print(f"\n文件信息:")
    print(f"输出目录: {PROCESSED_OUTPUT_DIR}")
    print(f"文件数量: {num_chunks} 个")
    total_size = 0
    for i in range(num_chunks):
        file_path = os.path.join(
            PROCESSED_OUTPUT_DIR,
            f"uf_all_units_processed_1min_part{i + 1}_of_{num_chunks}.csv",
        )
        if os.path.exists(file_path):
            file_size = os.path.getsize(file_path) / (1024 * 1024)
            total_size += file_size
            print(f" {os.path.basename(file_path)}: {file_size:.2f} MB")
    print(f"总文件大小: {total_size:.2f} MB")