|
|
@@ -0,0 +1,349 @@
|
|
|
+import os, pandas as pd
|
|
|
+from datetime import datetime
|
|
|
+from sqlalchemy import create_engine, text
|
|
|
+from functools import reduce
|
|
|
+
|
|
|
+# ---------- 配置 ----------
|
|
|
+DB_USER = "whu"
|
|
|
+DB_PASS = "09093f4e6b33ddd"
|
|
|
+DB_HOST = "222.130.26.206"
|
|
|
+DB_PORT = 4000
|
|
|
+
|
|
|
+# 时间配置
|
|
|
+START_TIME = datetime(2025, 11, 28, 0, 0, 0)
|
|
|
+END_TIME = datetime(2026, 2, 20, 0, 0, 0)
|
|
|
+BOUNDARY = datetime(2025, 3, 25, 0, 0, 0)
|
|
|
+
|
|
|
+DELETE_PERIODS = [
|
|
|
+ ("2024-04-24 13:42:00", "2024-04-24 18:26:00"),
|
|
|
+ ("2024-11-09 12:34:00", "2024-11-11 10:46:00"),
|
|
|
+ ("2024-12-12 08:52:00", "2024-12-12 17:22:00"),
|
|
|
+ ("2024-12-15 16:00:00", "2024-12-16 09:34:00"),
|
|
|
+ ("2025-01-28 10:58:00", "2025-02-05 11:24:00"),
|
|
|
+] # 来自张昊师兄的,需要被去除的黑名单区间
|
|
|
+DELETE_PERIODS = [(pd.to_datetime(s), pd.to_datetime(e)) for s, e in DELETE_PERIODS]
|
|
|
+
|
|
|
+# 新水岛
|
|
|
+# ---------- 传感器配置(新水厂系统) ----------
|
|
|
+# 机组编号
|
|
|
+UNITS = [1, 2]
|
|
|
+
|
|
|
+# 新系统变量模板
|
|
|
+BASE_VARIABLES = [
|
|
|
+ "ns=3;s={}#UF_JSFLOW_O", # 进水流量
|
|
|
+ "ns=3;s={}#UF_JSPRESS_O", # 进水压力(如有)
|
|
|
+ "ns=3;s=UF{}_SSD_KMYC", # 跨膜压差(如有)
|
|
|
+ "ns=3;s=UF{}_STEP" # 步序
|
|
|
+]
|
|
|
+
|
|
|
+# 系统总变量(如果存在)
|
|
|
+SYSTEM_VARIABLES = [
|
|
|
+ "ns=3;s=ZJS_TEMP_O", # 进水温度
|
|
|
+ "ns=3;s=RO_JSORP_O", # 总产水ORP(示例)
|
|
|
+ "ns=3;s=RO_JSPH_O", # 总产水PH(示例)
|
|
|
+]
|
|
|
+
|
|
|
+# 生成所有变量名称
|
|
|
+SENSOR_NAMES = []
|
|
|
+
|
|
|
+for unit in UNITS:
|
|
|
+ for var_template in BASE_VARIABLES:
|
|
|
+ SENSOR_NAMES.append(var_template.format(unit))
|
|
|
+
|
|
|
+SENSOR_NAMES.extend(SYSTEM_VARIABLES)
|
|
|
+
|
|
|
+print(f"总共查询 {len(SENSOR_NAMES)} 个变量")
|
|
|
+for i, var in enumerate(SENSOR_NAMES, 1):
|
|
|
+ print(f"{i:2d}. {var}")
|
|
|
+
|
|
|
+# 输出目录 - 只需要processed文件夹
|
|
|
+BASE_OUTPUT_DIR = "../datasets/UF_longting_data"
|
|
|
+PROCESSED_OUTPUT_DIR = os.path.join(BASE_OUTPUT_DIR, "raw")
|
|
|
+
|
|
|
+# 创建目录
|
|
|
+os.makedirs(PROCESSED_OUTPUT_DIR, exist_ok=True)
|
|
|
+
|
|
|
+
|
|
|
+# ---------- 创建数据库引擎 ----------
|
|
|
+def create_db_engines():
|
|
|
+ """
|
|
|
+ 创建数据库引擎,每个数据库只创建一次
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ # 为两个数据库分别创建引擎
|
|
|
+ engine_test = create_engine(
|
|
|
+ f"mysql+pymysql://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/ws_data_test?charset=utf8mb4"
|
|
|
+ )
|
|
|
+ engine_prod = create_engine(
|
|
|
+ f"mysql+pymysql://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/ws_data?charset=utf8mb4"
|
|
|
+ )
|
|
|
+
|
|
|
+ # 测试连接
|
|
|
+ with engine_test.connect() as conn:
|
|
|
+ print("✅ 测试数据库连接成功!")
|
|
|
+ with engine_prod.connect() as conn:
|
|
|
+ print("✅ 生产数据库连接成功!")
|
|
|
+
|
|
|
+ return engine_test, engine_prod
|
|
|
+ except Exception as e:
|
|
|
+ print(f"❌ 数据库连接失败: {e}")
|
|
|
+ return None, None
|
|
|
+
|
|
|
+
|
|
|
+# ---------- 一分钟聚合查询函数(子查询方式)----------
|
|
|
+def fetch_valve_aggregated(name, start, end, engine, interval_minutes=1):
|
|
|
+ """
|
|
|
+ 从数据库获取传感器数据并直接进行时间聚合
|
|
|
+ 使用子查询方式避免 ONLY_FULL_GROUP_BY 问题
|
|
|
+ """
|
|
|
+ interval_seconds = interval_minutes * 60
|
|
|
+
|
|
|
+ sql = text(f"""
|
|
|
+ SELECT
|
|
|
+ MIN(h_time) AS time,
|
|
|
+ AVG(val) AS val
|
|
|
+ FROM (
|
|
|
+ SELECT
|
|
|
+ h_time,
|
|
|
+ val,
|
|
|
+ FLOOR(UNIX_TIMESTAMP(h_time) / {interval_seconds}) AS time_group
|
|
|
+ FROM dc_item_history_data_1450
|
|
|
+ WHERE item_name = :name
|
|
|
+ AND h_time BETWEEN :st AND :et
|
|
|
+ AND val IS NOT NULL
|
|
|
+ ) t
|
|
|
+ GROUP BY t.time_group
|
|
|
+ ORDER BY time
|
|
|
+ """)
|
|
|
+
|
|
|
+ try:
|
|
|
+ df = pd.read_sql(sql, engine, params={"name": name, "st": start, "et": end})
|
|
|
+ if not df.empty:
|
|
|
+ df['time'] = pd.to_datetime(df['time'])
|
|
|
+ print(f" ✓ {name}: {len(df)} 条记录")
|
|
|
+ return df
|
|
|
+ except Exception as e:
|
|
|
+ print(f" ✗ {name} 查询失败: {str(e)}")
|
|
|
+ return pd.DataFrame()
|
|
|
+
|
|
|
+
|
|
|
+# ---------- 传感器数据查询函数(只获取聚合数据)----------
|
|
|
+def fetch_sensor_data(sensor_names, start_time, end_time, boundary, engine_test, engine_prod):
|
|
|
+ """
|
|
|
+ 获取多个传感器的分钟级聚合数据并合并为宽表
|
|
|
+
|
|
|
+ 参数:
|
|
|
+ sensor_names: 传感器名称列表
|
|
|
+ start_time: 开始时间
|
|
|
+ end_time: 结束时间
|
|
|
+ boundary: 数据库切换边界
|
|
|
+ engine_test: 测试数据库引擎
|
|
|
+ engine_prod: 生产数据库引擎
|
|
|
+ """
|
|
|
+ all_data = [] # 只存储聚合数据
|
|
|
+
|
|
|
+ print("\n开始获取各传感器数据:")
|
|
|
+
|
|
|
+ for sensor in sensor_names:
|
|
|
+ try:
|
|
|
+ # 根据边界时间选择合适的数据库引擎
|
|
|
+ if end_time <= boundary:
|
|
|
+ # 全部在测试数据库
|
|
|
+ df = fetch_valve_aggregated(sensor, start_time, end_time, engine_test)
|
|
|
+ elif start_time >= boundary:
|
|
|
+ # 全部在生产数据库
|
|
|
+ df = fetch_valve_aggregated(sensor, start_time, end_time, engine_prod)
|
|
|
+ else:
|
|
|
+ # 跨越两个数据库
|
|
|
+ df1 = fetch_valve_aggregated(sensor, start_time, boundary, engine_test)
|
|
|
+ df2 = fetch_valve_aggregated(sensor, boundary, end_time, engine_prod)
|
|
|
+ df = pd.concat([df1, df2], ignore_index=True)
|
|
|
+
|
|
|
+ if not df.empty:
|
|
|
+ # 重命名列以区分不同传感器
|
|
|
+ df_renamed = df.rename(columns={'val': sensor})
|
|
|
+ all_data.append(df_renamed[['time', sensor]])
|
|
|
+ else:
|
|
|
+ print(f" ⚠ {sensor}: 无数据")
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ print(f" ⚠ {sensor}: 处理失败 - {str(e)}")
|
|
|
+ continue
|
|
|
+
|
|
|
+ if not all_data:
|
|
|
+ print("\n❌ 未获取到任何传感器数据")
|
|
|
+ return pd.DataFrame()
|
|
|
+
|
|
|
+ print(f"\n开始合并 {len(all_data)} 个传感器的数据...")
|
|
|
+
|
|
|
+ # 使用reduce逐步合并DataFrame
|
|
|
+ merged_df = reduce(
|
|
|
+ lambda left, right: pd.merge(left, right, on='time', how='outer'),
|
|
|
+ all_data
|
|
|
+ )
|
|
|
+
|
|
|
+ # 按时间排序
|
|
|
+ merged_df = merged_df.sort_values('time').reset_index(drop=True)
|
|
|
+
|
|
|
+ print(f"合并完成,共 {len(merged_df)} 条时间记录")
|
|
|
+
|
|
|
+ # 删除黑名单时段
|
|
|
+ print("删除黑名单时段...")
|
|
|
+ original_len = len(merged_df)
|
|
|
+ for s, e in DELETE_PERIODS:
|
|
|
+ merged_df = merged_df[(merged_df['time'] < s) | (merged_df['time'] > e)]
|
|
|
+
|
|
|
+ deleted_count = original_len - len(merged_df)
|
|
|
+ if deleted_count > 0:
|
|
|
+ print(f"已删除 {deleted_count} 条黑名单时段数据")
|
|
|
+
|
|
|
+ return merged_df
|
|
|
+
|
|
|
+
|
|
|
+# ---------- 数据后处理函数(填充空值)----------
|
|
|
+def post_process_data(df, method='both'):
|
|
|
+ """
|
|
|
+ 对聚合后的数据进行后处理:填充空值
|
|
|
+
|
|
|
+ 参数:
|
|
|
+ df: 输入的宽表DataFrame
|
|
|
+ method: 填充方法
|
|
|
+
|
|
|
+ 返回:
|
|
|
+ pd.DataFrame: 处理后的DataFrame
|
|
|
+ """
|
|
|
+ if df.empty:
|
|
|
+ print("警告:输入数据为空")
|
|
|
+ return df
|
|
|
+
|
|
|
+ df_processed = df.copy()
|
|
|
+ df_processed['time'] = pd.to_datetime(df_processed['time'])
|
|
|
+ df_processed = df_processed.set_index('time')
|
|
|
+
|
|
|
+ print(f"\n开始填充空值...")
|
|
|
+ print(f"数据时间范围: {df_processed.index.min()} 到 {df_processed.index.max()}")
|
|
|
+ print(f"数据行数: {len(df_processed)}")
|
|
|
+
|
|
|
+ missing_before = df_processed.isnull().sum().sum()
|
|
|
+ print(f"填充前空值数量: {missing_before}")
|
|
|
+
|
|
|
+ if method == 'bfill':
|
|
|
+ df_processed = df_processed.bfill()
|
|
|
+ elif method == 'ffill':
|
|
|
+ df_processed = df_processed.ffill()
|
|
|
+ else: # both
|
|
|
+ df_processed = df_processed.ffill().bfill()
|
|
|
+
|
|
|
+ missing_after = df_processed.isnull().sum().sum()
|
|
|
+ print(f"填充后空值数量: {missing_after}")
|
|
|
+ print(f"填充了 {missing_before - missing_after} 个空值")
|
|
|
+
|
|
|
+ # 重置索引
|
|
|
+ df_processed = df_processed.reset_index().rename(columns={'index': 'time'})
|
|
|
+
|
|
|
+ return df_processed
|
|
|
+
|
|
|
+
|
|
|
+# ---------- 数据分块保存函数 ----------
|
|
|
+def save_data_chunks(df, output_dir, prefix="sensor_data", chunk_size=200000):
|
|
|
+ """
|
|
|
+ 将数据集分块保存,命名格式为 {prefix}_partX_of_Y.csv
|
|
|
+ """
|
|
|
+ total_rows = len(df)
|
|
|
+ num_chunks = (total_rows + chunk_size - 1) // chunk_size # 向上取整
|
|
|
+
|
|
|
+ print(f"\n开始分块保存数据(共 {num_chunks} 个文件)...")
|
|
|
+
|
|
|
+ for i in range(num_chunks):
|
|
|
+ start_idx = i * chunk_size
|
|
|
+ end_idx = min((i + 1) * chunk_size, total_rows)
|
|
|
+ chunk_df = df.iloc[start_idx:end_idx]
|
|
|
+
|
|
|
+ if not chunk_df.empty:
|
|
|
+ file_path = os.path.join(
|
|
|
+ output_dir,
|
|
|
+ f"{prefix}_part{i + 1}_of_{num_chunks}.csv"
|
|
|
+ )
|
|
|
+ chunk_df.to_csv(file_path, index=False, encoding='utf_8_sig')
|
|
|
+ print(f" ✓ 已保存第 {i + 1}/{num_chunks} 个文件: {os.path.basename(file_path)} ({len(chunk_df)} 行)")
|
|
|
+
|
|
|
+ return num_chunks
|
|
|
+
|
|
|
+
|
|
|
+# ---------- 主程序 ----------
|
|
|
+if __name__ == "__main__":
|
|
|
+ print("=" * 70)
|
|
|
+ print("传感器数据采集程序 - 分钟级聚合版本")
|
|
|
+ print("=" * 70)
|
|
|
+
|
|
|
+ # 1. 创建数据库引擎
|
|
|
+ print("\n[1/4] 创建数据库连接...")
|
|
|
+ engine_test, engine_prod = create_db_engines()
|
|
|
+ if engine_test is None or engine_prod is None:
|
|
|
+ print("❌ 数据库连接失败,程序退出")
|
|
|
+ exit(1)
|
|
|
+
|
|
|
+ # 2. 获取一分钟聚合数据
|
|
|
+ print(f"\n[2/4] 获取一分钟聚合数据...")
|
|
|
+ print(f"时间范围: {START_TIME} 到 {END_TIME}")
|
|
|
+
|
|
|
+ agg_df = fetch_sensor_data(
|
|
|
+ SENSOR_NAMES,
|
|
|
+ START_TIME,
|
|
|
+ END_TIME,
|
|
|
+ BOUNDARY,
|
|
|
+ engine_test,
|
|
|
+ engine_prod
|
|
|
+ )
|
|
|
+
|
|
|
+ if agg_df.empty:
|
|
|
+ print("\n❌ 未获取到任何聚合数据,程序退出")
|
|
|
+ exit(1)
|
|
|
+
|
|
|
+ print(f"\n✅ 聚合数据获取完成!")
|
|
|
+ print(f"总数据量: {len(agg_df)} 条记录")
|
|
|
+ print(f"时间范围: {agg_df['time'].min()} 到 {agg_df['time'].max()}")
|
|
|
+ print(f"时间间隔: 1分钟")
|
|
|
+ print(f"传感器数量: {len(agg_df.columns) - 1} 个") # 减1是因为time列
|
|
|
+
|
|
|
+ # 3. 后处理聚合数据(填充空值)
|
|
|
+ print(f"\n[3/4] 后处理聚合数据(填充空值)...")
|
|
|
+ processed_df = post_process_data(agg_df, method='both')
|
|
|
+
|
|
|
+ print(f"\n✅ 后处理完成!")
|
|
|
+ print(f"处理后数据行数: {len(processed_df)}")
|
|
|
+
|
|
|
+ # 4. 保存数据(分块保存)
|
|
|
+ print(f"\n[4/4] 保存数据...")
|
|
|
+
|
|
|
+ # 直接使用分块保存,不再保存单个大文件
|
|
|
+ chunk_size = 200000 # 每块20万行
|
|
|
+ num_chunks = save_data_chunks(
|
|
|
+ processed_df,
|
|
|
+ PROCESSED_OUTPUT_DIR,
|
|
|
+ "uf_all_units_processed_1min",
|
|
|
+ chunk_size
|
|
|
+ )
|
|
|
+
|
|
|
+ print("\n" + "=" * 70)
|
|
|
+ print("✅ 所有任务完成!")
|
|
|
+ print("=" * 70)
|
|
|
+
|
|
|
+ # 显示文件信息
|
|
|
+ print(f"\n文件信息:")
|
|
|
+ print(f"输出目录: {PROCESSED_OUTPUT_DIR}")
|
|
|
+ print(f"文件数量: {num_chunks} 个")
|
|
|
+
|
|
|
+ # 计算总大小
|
|
|
+ total_size = 0
|
|
|
+ for i in range(num_chunks):
|
|
|
+ file_path = os.path.join(
|
|
|
+ PROCESSED_OUTPUT_DIR,
|
|
|
+ f"uf_all_units_processed_1min_part{i + 1}_of_{num_chunks}.csv"
|
|
|
+ )
|
|
|
+ if os.path.exists(file_path):
|
|
|
+ file_size = os.path.getsize(file_path) / (1024 * 1024)
|
|
|
+ total_size += file_size
|
|
|
+ print(f" {os.path.basename(file_path)}: {file_size:.2f} MB")
|
|
|
+
|
|
|
+ print(f"总文件大小: {total_size:.2f} MB")
|