import os
from datetime import datetime
from functools import reduce

import pandas as pd
from sqlalchemy import create_engine, text

# ---------- Configuration ----------
DB_USER = "whu"
DB_PASS = "09093f4e6b33ddd"
DB_HOST = "222.130.26.206"
DB_PORT = 4000

# Time configuration
START_TIME = datetime(2025, 11, 28, 0, 0, 0)
END_TIME = datetime(2026, 2, 20, 0, 0, 0)
BOUNDARY = datetime(2025, 3, 25, 0, 0, 0)

# Blacklisted intervals (provided by Zhang Hao) that must be removed
DELETE_PERIODS = [
    ("2024-04-24 13:42:00", "2024-04-24 18:26:00"),
    ("2024-11-09 12:34:00", "2024-11-11 10:46:00"),
    ("2024-12-12 08:52:00", "2024-12-12 17:22:00"),
    ("2024-12-15 16:00:00", "2024-12-16 09:34:00"),
    ("2025-01-28 10:58:00", "2025-02-05 11:24:00"),
]
DELETE_PERIODS = [(pd.to_datetime(s), pd.to_datetime(e)) for s, e in DELETE_PERIODS]

# New water island
# ---------- Sensor configuration (new water plant system) ----------
# Unit numbers
UNITS = [1, 2]

# Per-unit variable templates (new system)
BASE_VARIABLES = [
    "ns=3;s={}#UF_JSFLOW_O",   # feed flow
    "ns=3;s={}#UF_JSPRESS_O",  # feed pressure (if available)
    "ns=3;s=UF{}_SSD_KMYC",    # transmembrane pressure (if available)
    "ns=3;s=UF{}_STEP",        # step sequence
]

# System-wide variables (if present)
SYSTEM_VARIABLES = [
    "ns=3;s=ZJS_TEMP_O",   # feed water temperature
    "ns=3;s=RO_JSORP_O",   # total product water ORP (example)
    "ns=3;s=RO_JSPH_O",    # total product water pH (example)
]

# Generate all variable names
SENSOR_NAMES = []
for unit in UNITS:
    for var_template in BASE_VARIABLES:
        SENSOR_NAMES.append(var_template.format(unit))
SENSOR_NAMES.extend(SYSTEM_VARIABLES)

print(f"Querying {len(SENSOR_NAMES)} variables in total")
for i, var in enumerate(SENSOR_NAMES, 1):
    print(f"{i:2d}. {var}")

# Output directory - only the processed folder is needed
BASE_OUTPUT_DIR = "../datasets/UF_longting_data"
PROCESSED_OUTPUT_DIR = os.path.join(BASE_OUTPUT_DIR, "raw")

# Create the directory
os.makedirs(PROCESSED_OUTPUT_DIR, exist_ok=True)


# ---------- Database engines ----------
def create_db_engines():
    """Create the database engines, one per database (each created only once)."""
    try:
        # One engine for each of the two databases
        engine_test = create_engine(
            f"mysql+pymysql://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/ws_data_test?charset=utf8mb4"
        )
        engine_prod = create_engine(
            f"mysql+pymysql://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/ws_data?charset=utf8mb4"
        )
        # Verify connectivity
        with engine_test.connect():
            print("✅ Test database connected!")
        with engine_prod.connect():
            print("✅ Production database connected!")
        return engine_test, engine_prod
    except Exception as e:
        print(f"❌ Database connection failed: {e}")
        return None, None


# ---------- One-minute aggregation query (subquery approach) ----------
def fetch_valve_aggregated(name, start, end, engine, interval_minutes=1):
    """
    Fetch sensor data from the database and aggregate it in time directly.
    Uses a subquery to avoid ONLY_FULL_GROUP_BY violations.
    """
    interval_seconds = interval_minutes * 60
    sql = text(f"""
        SELECT MIN(h_time) AS time,
               AVG(val)    AS val
        FROM (
            SELECT h_time, val,
                   FLOOR(UNIX_TIMESTAMP(h_time) / {interval_seconds}) AS time_group
            FROM dc_item_history_data_1450
            WHERE item_name = :name
              AND h_time BETWEEN :st AND :et
              AND val IS NOT NULL
        ) t
        GROUP BY t.time_group
        ORDER BY time
    """)
    try:
        df = pd.read_sql(sql, engine, params={"name": name, "st": start, "et": end})
        if not df.empty:
            df['time'] = pd.to_datetime(df['time'])
            print(f"  ✓ {name}: {len(df)} rows")
        return df
    except Exception as e:
        print(f"  ✗ {name} query failed: {str(e)}")
        return pd.DataFrame()
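# Usage sketch (illustrative only; assumes the engines returned by
# create_db_engines() and an item_name that exists in dc_item_history_data_1450):
#   df = fetch_valve_aggregated("ns=3;s=1#UF_JSFLOW_O",
#                               datetime(2025, 11, 28), datetime(2025, 11, 29),
#                               engine_prod)
#   # -> columns: 'time' (first timestamp in each bucket, datetime64)
#   #    and 'val' (1-minute average), one row per non-empty bucket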
# ---------- Sensor data query (aggregated data only) ----------
def fetch_sensor_data(sensor_names, start_time, end_time, boundary, engine_test, engine_prod):
    """
    Fetch minute-level aggregated data for multiple sensors and merge them
    into one wide table.

    Args:
        sensor_names: list of sensor names
        start_time:   query start time
        end_time:     query end time
        boundary:     database switch-over boundary
        engine_test:  test database engine
        engine_prod:  production database engine
    """
    all_data = []  # aggregated data only

    print("\nFetching data for each sensor:")
    for sensor in sensor_names:
        try:
            # Pick the right engine(s) based on the boundary time
            if end_time <= boundary:
                # Entirely in the test database
                df = fetch_valve_aggregated(sensor, start_time, end_time, engine_test)
            elif start_time >= boundary:
                # Entirely in the production database
                df = fetch_valve_aggregated(sensor, start_time, end_time, engine_prod)
            else:
                # Spans both databases
                df1 = fetch_valve_aggregated(sensor, start_time, boundary, engine_test)
                df2 = fetch_valve_aggregated(sensor, boundary, end_time, engine_prod)
                df = pd.concat([df1, df2], ignore_index=True)

            if not df.empty:
                # Rename the value column so each sensor gets its own column
                df_renamed = df.rename(columns={'val': sensor})
                all_data.append(df_renamed[['time', sensor]])
            else:
                print(f"  ⚠ {sensor}: no data")
        except Exception as e:
            print(f"  ⚠ {sensor}: processing failed - {str(e)}")
            continue

    if not all_data:
        print("\n❌ No sensor data retrieved")
        return pd.DataFrame()

    print(f"\nMerging data from {len(all_data)} sensors...")
    # Merge the DataFrames one by one with reduce
    merged_df = reduce(
        lambda left, right: pd.merge(left, right, on='time', how='outer'),
        all_data
    )
    # Sort by time
    merged_df = merged_df.sort_values('time').reset_index(drop=True)
    print(f"Merge complete: {len(merged_df)} time records")

    # Drop blacklisted intervals
    print("Dropping blacklisted intervals...")
    original_len = len(merged_df)
    for s, e in DELETE_PERIODS:
        merged_df = merged_df[(merged_df['time'] < s) | (merged_df['time'] > e)]
    deleted_count = original_len - len(merged_df)
    if deleted_count > 0:
        print(f"Dropped {deleted_count} rows inside blacklisted intervals")

    return merged_df


# ---------- Post-processing (fill missing values) ----------
def post_process_data(df, method='both'):
    """
    Post-process the aggregated data: fill missing values.

    Args:
        df:     input wide-table DataFrame
        method: fill method ('ffill', 'bfill', or 'both')

    Returns:
        pd.DataFrame: the processed DataFrame
    """
    if df.empty:
        print("Warning: input data is empty")
        return df

    df_processed = df.copy()
    df_processed['time'] = pd.to_datetime(df_processed['time'])
    df_processed = df_processed.set_index('time')

    print("\nFilling missing values...")
    print(f"Time range: {df_processed.index.min()} to {df_processed.index.max()}")
    print(f"Rows: {len(df_processed)}")

    missing_before = df_processed.isnull().sum().sum()
    print(f"Missing values before filling: {missing_before}")

    if method == 'bfill':
        df_processed = df_processed.bfill()
    elif method == 'ffill':
        df_processed = df_processed.ffill()
    else:  # both
        df_processed = df_processed.ffill().bfill()

    missing_after = df_processed.isnull().sum().sum()
    print(f"Missing values after filling: {missing_after}")
    print(f"Filled {missing_before - missing_after} missing values")

    # Restore 'time' as a regular column (the index is already named 'time',
    # so no extra rename is needed)
    df_processed = df_processed.reset_index()

    return df_processed
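# Fill-order sketch (toy data, illustrative only): with method='both',
# ffill runs first and bfill then closes any leading NaNs that ffill
# cannot reach.
#   s = pd.Series([None, 1.0, None, 3.0])
#   s.ffill()          # -> [NaN, 1.0, 1.0, 3.0]
#   s.ffill().bfill()  # -> [1.0, 1.0, 1.0, 3.0]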
# ---------- Chunked save ----------
def save_data_chunks(df, output_dir, prefix="sensor_data", chunk_size=200000):
    """Save the dataset in chunks named {prefix}_partX_of_Y.csv."""
    total_rows = len(df)
    num_chunks = (total_rows + chunk_size - 1) // chunk_size  # ceiling division

    print(f"\nSaving data in {num_chunks} chunk(s)...")
    for i in range(num_chunks):
        start_idx = i * chunk_size
        end_idx = min((i + 1) * chunk_size, total_rows)
        chunk_df = df.iloc[start_idx:end_idx]
        if not chunk_df.empty:
            file_path = os.path.join(
                output_dir,
                f"{prefix}_part{i + 1}_of_{num_chunks}.csv"
            )
            chunk_df.to_csv(file_path, index=False, encoding='utf_8_sig')
            print(f"  ✓ Saved chunk {i + 1}/{num_chunks}: {os.path.basename(file_path)} ({len(chunk_df)} rows)")

    return num_chunks


# ---------- Main ----------
if __name__ == "__main__":
    print("=" * 70)
    print("Sensor data collection - minute-level aggregation")
    print("=" * 70)

    # 1. Create the database engines
    print("\n[1/4] Connecting to the databases...")
    engine_test, engine_prod = create_db_engines()
    if engine_test is None or engine_prod is None:
        print("❌ Database connection failed, exiting")
        exit(1)

    # 2. Fetch one-minute aggregated data
    print("\n[2/4] Fetching one-minute aggregated data...")
    print(f"Time range: {START_TIME} to {END_TIME}")
    agg_df = fetch_sensor_data(
        SENSOR_NAMES, START_TIME, END_TIME, BOUNDARY, engine_test, engine_prod
    )
    if agg_df.empty:
        print("\n❌ No aggregated data retrieved, exiting")
        exit(1)

    print("\n✅ Aggregated data fetched!")
    print(f"Total rows: {len(agg_df)}")
    print(f"Time range: {agg_df['time'].min()} to {agg_df['time'].max()}")
    print("Interval: 1 minute")
    print(f"Sensors: {len(agg_df.columns) - 1}")  # minus 1 for the time column

    # 3. Post-process the aggregated data (fill missing values)
    print("\n[3/4] Post-processing aggregated data (filling missing values)...")
    processed_df = post_process_data(agg_df, method='both')
    print("\n✅ Post-processing complete!")
    print(f"Rows after processing: {len(processed_df)}")

    # 4. Save the data (in chunks)
    print("\n[4/4] Saving data...")
    # Save in chunks directly; no single large file is written
    chunk_size = 200000  # 200k rows per chunk
    num_chunks = save_data_chunks(
        processed_df, PROCESSED_OUTPUT_DIR, "uf_all_units_processed_1min", chunk_size
    )

    print("\n" + "=" * 70)
    print("✅ All tasks complete!")
    print("=" * 70)

    # Report the output files
    print("\nOutput files:")
    print(f"Directory: {PROCESSED_OUTPUT_DIR}")
    print(f"File count: {num_chunks}")

    # Compute the total size
    total_size = 0
    for i in range(num_chunks):
        file_path = os.path.join(
            PROCESSED_OUTPUT_DIR,
            f"uf_all_units_processed_1min_part{i + 1}_of_{num_chunks}.csv"
        )
        if os.path.exists(file_path):
            file_size = os.path.getsize(file_path) / (1024 * 1024)
            total_size += file_size
            print(f"  {os.path.basename(file_path)}: {file_size:.2f} MB")
    print(f"Total size: {total_size:.2f} MB")
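# Re-loading sketch (illustrative only; assumes the chunk files written above;
# note sorted() is lexicographic, which matches numeric part order only while
# the part count stays below 10):
#   import glob
#   parts = sorted(
#       glob.glob(os.path.join(PROCESSED_OUTPUT_DIR,
#                              "uf_all_units_processed_1min_part*.csv")))
#   full_df = pd.concat((pd.read_csv(p, parse_dates=['time']) for p in parts),
#                       ignore_index=True)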