| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427 |
- import os, pandas as pd
- from datetime import datetime
- from sqlalchemy import create_engine, text
- from functools import reduce
# ---------- Configuration ----------
# NOTE(review): credentials are hard-coded; consider moving them to
# environment variables or a config file kept out of version control.
DB_USER = "whu"
DB_PASS = "09093f4e6b33ddd"
DB_HOST = "222.130.26.206"
DB_PORT = 4000

# Extraction window, plus the timestamp at which history switches from the
# test database to the production database.
START_TIME = datetime(2026, 1, 22, 0, 0, 0)
END_TIME = datetime(2026, 3, 15, 6, 0, 0)
BOUNDARY = datetime(2025, 3, 25, 0, 0, 0)

# Blacklisted intervals (supplied by senior colleague Zhang Hao) whose rows
# must be removed from the final dataset.
DELETE_PERIODS = [
    ("2024-04-24 13:42:00", "2024-04-24 18:26:00"),
    ("2024-11-09 12:34:00", "2024-11-11 10:46:00"),
    ("2024-12-12 08:52:00", "2024-12-12 17:22:00"),
    ("2024-12-15 16:00:00", "2024-12-16 09:34:00"),
    ("2025-01-28 10:58:00", "2025-02-05 11:24:00"),
]
DELETE_PERIODS = [(pd.to_datetime(s), pd.to_datetime(e)) for s, e in DELETE_PERIODS]

# ---------- Sensor configuration ----------
# Unit numbers.
UNITS = [1, 2]
# Per-unit variable name templates; `{}` is replaced by the unit number.
BASE_VARIABLES = [
    "ns=3;s={}#UF_JSFLOW_O",    # feed-water flow
    "ns=3;s={}#UF_JSPRESS_O",   # feed-water pressure
    "ns=3;s=UF{}_SSD_KMYC",     # trans-membrane pressure
    "ns=3;s=UF{}_STEP",         # step sequence / control word
    "ns=3;s=ZZ_{}#UFBWB_POWER"  # backwash pump power
]
# System-wide (unit-independent) variables.
SYSTEM_VARIABLES = [
    "ns=3;s=ZJS_TEMP_O",      # feed-water temperature
    "ns=3;s=RO_JSORP_O",      # total product water ORP
    "ns=3;s=RO_JSPH_O",       # total product water pH
    "ns=3;s=RO_JSDD_O",       # total product water conductivity
    "ns=3;s=CN_LEVEL_O",      # sodium hypochlorite level
    "ns=3;s=S_LEVEL_O",       # acid level
    "ns=3;s=J_LEVEL_O",       # alkali level
    "ns=3;s=ZZ_UFGSB_POWER",  # UF supply pump power
]

# Output directories (created up front so later saves cannot fail on a
# missing path).
BASE_OUTPUT_DIR = "../datasets/UF_yancheng_data"
PROCESSED_OUTPUT_DIR = os.path.join(BASE_OUTPUT_DIR, "raw")
os.makedirs(PROCESSED_OUTPUT_DIR, exist_ok=True)

# Expand the per-unit templates for every unit, then append the
# system-wide variables (same order as the original nested loops).
SENSOR_NAMES = [tpl.format(unit) for unit in UNITS for tpl in BASE_VARIABLES] + list(SYSTEM_VARIABLES)
print(f"总共查询 {len(SENSOR_NAMES)} 个变量")
for i, var in enumerate(SENSOR_NAMES, 1):
    print(f"{i:2d}. {var}")
- # ---------- 创建数据库引擎 ----------
def create_db_engines():
    """Build one SQLAlchemy engine per database and verify connectivity.

    Creates each engine exactly once (test DB ``ws_data_test`` and
    production DB ``ws_data``) and opens a throwaway connection to each
    to confirm the credentials and host are reachable.

    Returns:
        tuple: ``(engine_test, engine_prod)`` on success, or
        ``(None, None)`` if creating or connecting to either engine fails.
    """
    # Shared URL template; only the database name differs between the two.
    url = f"mysql+pymysql://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/{{}}?charset=utf8mb4"
    try:
        engine_test = create_engine(url.format("ws_data_test"))
        engine_prod = create_engine(url.format("ws_data"))
        # Probe both databases; the context managers close the connections.
        with engine_test.connect():
            print("✅ 测试数据库连接成功!")
        with engine_prod.connect():
            print("✅ 生产数据库连接成功!")
        return engine_test, engine_prod
    except Exception as e:
        print(f"❌ 数据库连接失败: {e}")
        return None, None
- # ---------- 一分钟聚合查询函数(子查询方式)----------
def fetch_valve_aggregated(name, start, end, engine, interval_minutes=1):
    """Query one sensor's history and average it into fixed time buckets.

    Rows are grouped by flooring their Unix timestamp to
    ``interval_minutes``-wide buckets; each bucket's mean value is returned
    with a timestamp aligned to the bucket start (seconds == 00).

    Args:
        name: ``item_name`` of the sensor to query.
        start, end: inclusive time window (SQL ``BETWEEN`` semantics).
        engine: SQLAlchemy engine to run the query against.
        interval_minutes: bucket width in minutes (default 1).

    Returns:
        pandas.DataFrame with columns ``['time', 'val']``; empty on error
        or when no rows match.
    """
    bucket_seconds = interval_minutes * 60
    sql = text(f"""
        SELECT
            FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(MIN(h_time)) / {bucket_seconds}) * {bucket_seconds}) AS time,
            AVG(val) AS val
        FROM (
            SELECT
                h_time,
                val,
                FLOOR(UNIX_TIMESTAMP(h_time) / {bucket_seconds}) AS time_group
            FROM dc_item_history_data_1497
            WHERE item_name = :name
              AND h_time BETWEEN :st AND :et
              AND val IS NOT NULL
        ) t
        GROUP BY t.time_group
        ORDER BY time
    """)
    try:
        df = pd.read_sql(sql, engine, params={"name": name, "st": start, "et": end})
        if not df.empty:
            # Normalize timestamps to whole minutes (drop seconds/microseconds).
            df['time'] = pd.to_datetime(df['time']).dt.floor('1min')
            print(f" ✓ {name}: {len(df)} 条记录")
        return df
    except Exception as e:
        print(f" ✗ {name} 查询失败: {str(e)}")
        return pd.DataFrame()
def fetch_special_data(sensor, start, end, boundary, engine_test, engine_prod):
    """Fetch a discrete sensor's raw history, preserving original change points.

    Depending on where ``[start, end]`` falls relative to ``boundary``, the
    query runs against the test DB (history before the boundary), the
    production DB (after it), or both with the window split at the boundary.

    Args:
        sensor: ``item_name`` to query.
        start, end: inclusive time window.
        boundary: timestamp at which history moves from the test DB to prod.
        engine_test, engine_prod: SQLAlchemy engines for the two databases.

    Returns:
        pandas.DataFrame with columns ``['time', 'val']`` in chronological
        order (``time`` parsed to datetime); empty DataFrame on error.
    """
    sql = text("""
        SELECT h_time AS time, val
        FROM dc_item_history_data_1497
        WHERE item_name = :name
          AND h_time BETWEEN :st
          AND :et
          AND val IS NOT NULL
        ORDER BY h_time
    """)
    try:
        # Route to the correct database(s) based on the boundary timestamp.
        if end <= boundary:
            df = pd.read_sql(sql, engine_test, params={"name": sensor, "st": start, "et": end})
        elif start >= boundary:
            df = pd.read_sql(sql, engine_prod, params={"name": sensor, "st": start, "et": end})
        else:
            df1 = pd.read_sql(sql, engine_test, params={"name": sensor, "st": start, "et": boundary})
            df2 = pd.read_sql(sql, engine_prod, params={"name": sensor, "st": boundary, "et": end})
            df = pd.concat([df1, df2], ignore_index=True)
            # FIX: BETWEEN is inclusive on both ends, so a row stamped exactly
            # at `boundary` is returned by both queries; keep the test-DB copy.
            df = df.drop_duplicates(subset=['time'], keep='first')
        if not df.empty:
            df['time'] = pd.to_datetime(df['time'])
        return df
    except Exception as e:
        print(f" ✗ {sensor} 查询失败: {str(e)}")
        return pd.DataFrame()
# ---------- 传感器数据查询函数(只获取聚合数据)----------
def fetch_sensor_data(sensor_names, start_time, end_time, boundary, engine_test, engine_prod):
    """Fetch minute-level data for all sensors and merge them into one wide table.

    Continuous variables are fetched minute-averaged via
    ``fetch_valve_aggregated``; STEP/POWER variables are fetched as raw change
    points via ``fetch_special_data`` and forward-filled onto the minute grid.
    Rows falling inside ``DELETE_PERIODS`` (module-level blacklist) are removed.

    Args:
        sensor_names: all variable names to fetch.
        start_time, end_time: extraction window (datetime).
        boundary: timestamp splitting test-DB history from prod-DB history.
        engine_test, engine_prod: SQLAlchemy engines for the two databases.

    Returns:
        pandas.DataFrame with a 'time' column plus one column per sensor;
        empty DataFrame if nothing was fetched.
    """
    # Variables containing STEP (step sequence) or POWER (pump power) are
    # discrete and must not be minute-averaged; everything else is continuous.
    step_vars = [v for v in sensor_names if 'STEP' in v]
    power_vars = [v for v in sensor_names if 'POWER' in v]
    special_vars = step_vars + power_vars
    continuous_vars = [v for v in sensor_names if v not in special_vars]
    print(f"\n识别到 {len(special_vars)} 个离散变量, {len(continuous_vars)} 个连续变量")
    # Build the complete minute grid first, so every sensor column is
    # index-aligned against the same set of whole-minute timestamps.
    print(f"\n创建完整时间网格...")
    time_grid = pd.date_range(
        start=start_time.replace(second=0, microsecond=0),
        end=end_time.replace(second=0, microsecond=0),
        freq='1min'
    )
    merged_df = pd.DataFrame(index=time_grid)
    print(f"时间网格: {len(time_grid)} 个时间点")
    # 1. Continuous variables: minute averages, timestamps floored to minutes.
    if continuous_vars:
        print("\n获取连续变量数据(分钟平均):")
        for sensor in continuous_vars:
            try:
                # Route to the correct database(s) based on the boundary.
                if end_time <= boundary:
                    df = fetch_valve_aggregated(sensor, start_time, end_time, engine_test)
                elif start_time >= boundary:
                    df = fetch_valve_aggregated(sensor, start_time, end_time, engine_prod)
                else:
                    df1 = fetch_valve_aggregated(sensor, start_time, boundary, engine_test)
                    df2 = fetch_valve_aggregated(sensor, boundary, end_time, engine_prod)
                    df = pd.concat([df1, df2], ignore_index=True)
                if not df.empty:
                    # Floor to whole minutes and drop duplicate minutes (the
                    # boundary minute can appear in both halves of the concat).
                    df['time'] = pd.to_datetime(df['time']).dt.floor('1min')
                    df = df.drop_duplicates(subset=['time'], keep='first')
                    df = df.set_index('time')
                    # Column assignment aligns on the time-grid index; minutes
                    # without data stay NaN.
                    merged_df[sensor] = df['val']
                    print(f" ✓ {sensor}: {len(df)} 条记录")
                else:
                    print(f" ⚠ {sensor}: 无数据")
            except Exception as e:
                print(f" ⚠ {sensor}: 处理失败 - {str(e)}")
                continue
    # 2. Discrete variables: keep raw change points, then resample.
    if special_vars:
        print("\n获取离散变量数据(原始变化点):")
        for sensor in special_vars:
            try:
                # Raw, un-aggregated history.
                df = fetch_special_data(sensor, start_time, end_time, boundary, engine_test, engine_prod)
                if not df.empty:
                    df['time'] = pd.to_datetime(df['time'])
                    df = df.set_index('time')
                    # Forward-fill: a discrete value holds until the next change.
                    df_resampled = df.resample('1min').ffill()
                    merged_df[sensor] = df_resampled['val']
                    print(f" ✓ {sensor}: {len(df)} 个原始点 -> {len(df_resampled)} 个分钟点")
                else:
                    print(f" ⚠ {sensor}: 无数据")
            except Exception as e:
                print(f" ⚠ {sensor}: 处理失败 - {str(e)}")
                continue
    if merged_df.empty or len(merged_df.columns) == 0:
        print("\n❌ 未获取到任何传感器数据")
        return pd.DataFrame()
    # Move the DatetimeIndex back into a 'time' column.
    merged_df = merged_df.reset_index()
    merged_df = merged_df.rename(columns={'index': 'time'})
    print(f"\n合并完成,共 {len(merged_df)} 条时间记录 × {len(merged_df.columns) - 1} 个传感器")
    print(f"数据框形状: {merged_df.shape}")
    # 3. Remove blacklisted periods (inclusive of both endpoints, after
    # flooring them to whole minutes to match the grid).
    print("\n删除黑名单时段...")
    original_len = len(merged_df)
    for s, e in DELETE_PERIODS:
        s = pd.to_datetime(s).floor('1min')
        e = pd.to_datetime(e).floor('1min')
        merged_df = merged_df[(merged_df['time'] < s) | (merged_df['time'] > e)]
    deleted_count = original_len - len(merged_df)
    if deleted_count > 0:
        print(f"已删除 {deleted_count} 条黑名单时段数据")
    return merged_df
- # ---------- 数据后处理函数(填充空值)----------
def post_process_data(df, continuous_vars, step_vars):
    """Fill remaining gaps in the minute-aggregated wide table.

    Continuous variables: linear interpolation, with both edges filled.
    Step variables: forward-fill then backward-fill. (Upstream resampling
    forward-fills only up to each sensor's last raw sample, so grid minutes
    after that point — and before the first sample — are still NaN here.)

    Args:
        df: wide DataFrame with a 'time' column plus one column per sensor.
        continuous_vars: column names treated as continuous physical signals.
        step_vars: column names treated as discrete step/control values.

    Returns:
        A processed copy of ``df`` with gaps filled and 'time' restored as a
        column; ``df`` unchanged (and returned as-is) when empty.
    """
    if df.empty:
        print("警告:输入数据为空")
        return df
    df_processed = df.copy()
    df_processed['time'] = pd.to_datetime(df_processed['time'])
    df_processed = df_processed.set_index('time')
    print(f"\n开始填充空值...")
    print(f"数据时间范围: {df_processed.index.min()} 到 {df_processed.index.max()}")
    print(f"数据行数: {len(df_processed)}")
    missing_before = df_processed.isnull().sum().sum()
    print(f"填充前空值数量: {missing_before}")
    # Continuous variables: linear interpolation suits physical quantities;
    # limit_direction='both' also covers leading/trailing NaNs.
    for var in continuous_vars:
        if var in df_processed.columns:
            df_processed[var] = df_processed[var].interpolate(method='linear', limit_direction='both')
    # Step variables: FIX — the original only bfilled (covering leading NaNs),
    # leaving trailing NaNs after each sensor's last raw sample unfilled.
    # ffill first (hold the last known step), then bfill for leading NaNs.
    for var in step_vars:
        if var in df_processed.columns:
            df_processed[var] = df_processed[var].ffill().bfill()
    missing_after = df_processed.isnull().sum().sum()
    print(f"填充后空值数量: {missing_after}")
    print(f"填充了 {missing_before - missing_after} 个空值")
    return df_processed.reset_index()
- # ---------- 数据分块保存函数 ----------
def save_data_chunks(df, output_dir, prefix="sensor_data", chunk_size=200000):
    """Persist the dataset as chunked CSV files.

    Files are named ``{prefix}_partX_of_Y.csv`` and written with a UTF-8 BOM
    (``utf_8_sig``) so they open cleanly in Excel.

    Args:
        df: DataFrame to save.
        output_dir: destination directory (must already exist).
        prefix: file-name prefix for every chunk.
        chunk_size: maximum rows per file.

    Returns:
        int: total number of chunk files (Y), 0 for an empty DataFrame.
    """
    total_rows = len(df)
    # Ceiling division without math.ceil.
    num_chunks = (total_rows + chunk_size - 1) // chunk_size
    print(f"\n开始分块保存数据(共 {num_chunks} 个文件)...")
    for i in range(num_chunks):
        # iloc silently clips the upper bound, so no explicit min() is needed.
        chunk_df = df.iloc[i * chunk_size:(i + 1) * chunk_size]
        if chunk_df.empty:
            continue
        file_path = os.path.join(output_dir, f"{prefix}_part{i + 1}_of_{num_chunks}.csv")
        chunk_df.to_csv(file_path, index=False, encoding='utf_8_sig')
        print(f" ✓ 已保存第 {i + 1}/{num_chunks} 个文件: {os.path.basename(file_path)} ({len(chunk_df)} 行)")
    return num_chunks
- # ---------- 主程序 ----------
if __name__ == "__main__":
    # Driver: connect to both databases, fetch the minute-aggregated wide
    # table, fill gaps, and save the result as chunked CSV files.
    print("=" * 70)
    print("传感器数据采集程序 - 分钟级聚合版本")
    print("=" * 70)
    # 1. Create the database engines; abort if either connection fails.
    print("\n[1/4] 创建数据库连接...")
    engine_test, engine_prod = create_db_engines()
    if engine_test is None or engine_prod is None:
        print("❌ 数据库连接失败,程序退出")
        exit(1)
    # 2. Fetch the one-minute aggregated wide table.
    print(f"\n[2/4] 获取一分钟聚合数据...")
    print(f"时间范围: {START_TIME} 到 {END_TIME}")
    agg_df = fetch_sensor_data(
        SENSOR_NAMES,
        START_TIME,
        END_TIME,
        BOUNDARY,
        engine_test,
        engine_prod
    )
    if agg_df.empty:
        print("\n❌ 未获取到任何聚合数据,程序退出")
        exit(1)
    print(f"\n✅ 聚合数据获取完成!")
    print(f"总数据量: {len(agg_df)} 条记录")
    print(f"时间范围: {agg_df['time'].min()} 到 {agg_df['time'].max()}")
    print(f"时间间隔: 1分钟(整点)")
    print(f"传感器数量: {len(agg_df.columns) - 1} 个")
    # Split columns for post-processing: STEP columns are discrete, the rest
    # (everything except 'time') continuous.
    # NOTE(review): POWER variables were resampled as discrete upstream in
    # fetch_sensor_data but are interpolated as continuous here — confirm
    # this asymmetry is intended.
    step_vars = [col for col in agg_df.columns if 'STEP' in col]
    continuous_vars = [col for col in agg_df.columns if col != 'time' and col not in step_vars]
    # 3. Post-process: fill remaining NaNs.
    print(f"\n[3/4] 后处理聚合数据...")
    processed_df = post_process_data(agg_df, continuous_vars, step_vars)
    print(f"\n✅ 后处理完成!")
    print(f"处理后数据行数: {len(processed_df)}")
    # 4. Save in chunks (no single monolithic file is written).
    print(f"\n[4/4] 保存数据...")
    chunk_size = 200000  # 200k rows per chunk file
    num_chunks = save_data_chunks(
        processed_df,
        PROCESSED_OUTPUT_DIR,
        "uf_all_units_processed_1min",
        chunk_size
    )
    print("\n" + "=" * 70)
    print("✅ 所有任务完成!")
    print("=" * 70)
    # Report the written files and their sizes (MB).
    print(f"\n文件信息:")
    print(f"输出目录: {PROCESSED_OUTPUT_DIR}")
    print(f"文件数量: {num_chunks} 个")
    total_size = 0
    for i in range(num_chunks):
        # File names must mirror the pattern used by save_data_chunks.
        file_path = os.path.join(
            PROCESSED_OUTPUT_DIR,
            f"uf_all_units_processed_1min_part{i + 1}_of_{num_chunks}.csv"
        )
        if os.path.exists(file_path):
            file_size = os.path.getsize(file_path) / (1024 * 1024)
            total_size += file_size
            print(f" {os.path.basename(file_path)}: {file_size:.2f} MB")
    print(f"总文件大小: {total_size:.2f} MB")
|