| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349 |
- import os, pandas as pd
- from datetime import datetime
- from sqlalchemy import create_engine, text
- from functools import reduce
- # ---------- 配置 ----------
- DB_USER = "whu"
- DB_PASS = "09093f4e6b33ddd"
- DB_HOST = "222.130.26.206"
- DB_PORT = 4000
- # 时间配置
- START_TIME = datetime(2025, 11, 28, 0, 0, 0)
- END_TIME = datetime(2026, 2, 20, 0, 0, 0)
- BOUNDARY = datetime(2025, 3, 25, 0, 0, 0)
- DELETE_PERIODS = [
- ("2024-04-24 13:42:00", "2024-04-24 18:26:00"),
- ("2024-11-09 12:34:00", "2024-11-11 10:46:00"),
- ("2024-12-12 08:52:00", "2024-12-12 17:22:00"),
- ("2024-12-15 16:00:00", "2024-12-16 09:34:00"),
- ("2025-01-28 10:58:00", "2025-02-05 11:24:00"),
- ] # 来自张昊师兄的,需要被去除的黑名单区间
- DELETE_PERIODS = [(pd.to_datetime(s), pd.to_datetime(e)) for s, e in DELETE_PERIODS]
- # 新水岛
- # ---------- 传感器配置(新水厂系统) ----------
- # 机组编号
- UNITS = [1, 2]
- # 新系统变量模板
- BASE_VARIABLES = [
- "ns=3;s={}#UF_JSFLOW_O", # 进水流量
- "ns=3;s={}#UF_JSPRESS_O", # 进水压力(如有)
- "ns=3;s=UF{}_SSD_KMYC", # 跨膜压差(如有)
- "ns=3;s=UF{}_STEP" # 步序
- ]
- # 系统总变量(如果存在)
- SYSTEM_VARIABLES = [
- "ns=3;s=ZJS_TEMP_O", # 进水温度
- "ns=3;s=RO_JSORP_O", # 总产水ORP(示例)
- "ns=3;s=RO_JSPH_O", # 总产水PH(示例)
- ]
- # 生成所有变量名称
- SENSOR_NAMES = []
- for unit in UNITS:
- for var_template in BASE_VARIABLES:
- SENSOR_NAMES.append(var_template.format(unit))
- SENSOR_NAMES.extend(SYSTEM_VARIABLES)
- print(f"总共查询 {len(SENSOR_NAMES)} 个变量")
- for i, var in enumerate(SENSOR_NAMES, 1):
- print(f"{i:2d}. {var}")
- # 输出目录 - 只需要processed文件夹
- BASE_OUTPUT_DIR = "../datasets/UF_longting_data"
- PROCESSED_OUTPUT_DIR = os.path.join(BASE_OUTPUT_DIR, "raw")
- # 创建目录
- os.makedirs(PROCESSED_OUTPUT_DIR, exist_ok=True)
- # ---------- 创建数据库引擎 ----------
- def create_db_engines():
- """
- 创建数据库引擎,每个数据库只创建一次
- """
- try:
- # 为两个数据库分别创建引擎
- engine_test = create_engine(
- f"mysql+pymysql://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/ws_data_test?charset=utf8mb4"
- )
- engine_prod = create_engine(
- f"mysql+pymysql://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/ws_data?charset=utf8mb4"
- )
- # 测试连接
- with engine_test.connect() as conn:
- print("✅ 测试数据库连接成功!")
- with engine_prod.connect() as conn:
- print("✅ 生产数据库连接成功!")
- return engine_test, engine_prod
- except Exception as e:
- print(f"❌ 数据库连接失败: {e}")
- return None, None
- # ---------- 一分钟聚合查询函数(子查询方式)----------
- def fetch_valve_aggregated(name, start, end, engine, interval_minutes=1):
- """
- 从数据库获取传感器数据并直接进行时间聚合
- 使用子查询方式避免 ONLY_FULL_GROUP_BY 问题
- """
- interval_seconds = interval_minutes * 60
- sql = text(f"""
- SELECT
- MIN(h_time) AS time,
- AVG(val) AS val
- FROM (
- SELECT
- h_time,
- val,
- FLOOR(UNIX_TIMESTAMP(h_time) / {interval_seconds}) AS time_group
- FROM dc_item_history_data_1450
- WHERE item_name = :name
- AND h_time BETWEEN :st AND :et
- AND val IS NOT NULL
- ) t
- GROUP BY t.time_group
- ORDER BY time
- """)
- try:
- df = pd.read_sql(sql, engine, params={"name": name, "st": start, "et": end})
- if not df.empty:
- df['time'] = pd.to_datetime(df['time'])
- print(f" ✓ {name}: {len(df)} 条记录")
- return df
- except Exception as e:
- print(f" ✗ {name} 查询失败: {str(e)}")
- return pd.DataFrame()
- # ---------- 传感器数据查询函数(只获取聚合数据)----------
- def fetch_sensor_data(sensor_names, start_time, end_time, boundary, engine_test, engine_prod):
- """
- 获取多个传感器的分钟级聚合数据并合并为宽表
- 参数:
- sensor_names: 传感器名称列表
- start_time: 开始时间
- end_time: 结束时间
- boundary: 数据库切换边界
- engine_test: 测试数据库引擎
- engine_prod: 生产数据库引擎
- """
- all_data = [] # 只存储聚合数据
- print("\n开始获取各传感器数据:")
- for sensor in sensor_names:
- try:
- # 根据边界时间选择合适的数据库引擎
- if end_time <= boundary:
- # 全部在测试数据库
- df = fetch_valve_aggregated(sensor, start_time, end_time, engine_test)
- elif start_time >= boundary:
- # 全部在生产数据库
- df = fetch_valve_aggregated(sensor, start_time, end_time, engine_prod)
- else:
- # 跨越两个数据库
- df1 = fetch_valve_aggregated(sensor, start_time, boundary, engine_test)
- df2 = fetch_valve_aggregated(sensor, boundary, end_time, engine_prod)
- df = pd.concat([df1, df2], ignore_index=True)
- if not df.empty:
- # 重命名列以区分不同传感器
- df_renamed = df.rename(columns={'val': sensor})
- all_data.append(df_renamed[['time', sensor]])
- else:
- print(f" ⚠ {sensor}: 无数据")
- except Exception as e:
- print(f" ⚠ {sensor}: 处理失败 - {str(e)}")
- continue
- if not all_data:
- print("\n❌ 未获取到任何传感器数据")
- return pd.DataFrame()
- print(f"\n开始合并 {len(all_data)} 个传感器的数据...")
- # 使用reduce逐步合并DataFrame
- merged_df = reduce(
- lambda left, right: pd.merge(left, right, on='time', how='outer'),
- all_data
- )
- # 按时间排序
- merged_df = merged_df.sort_values('time').reset_index(drop=True)
- print(f"合并完成,共 {len(merged_df)} 条时间记录")
- # 删除黑名单时段
- print("删除黑名单时段...")
- original_len = len(merged_df)
- for s, e in DELETE_PERIODS:
- merged_df = merged_df[(merged_df['time'] < s) | (merged_df['time'] > e)]
- deleted_count = original_len - len(merged_df)
- if deleted_count > 0:
- print(f"已删除 {deleted_count} 条黑名单时段数据")
- return merged_df
- # ---------- 数据后处理函数(填充空值)----------
- def post_process_data(df, method='both'):
- """
- 对聚合后的数据进行后处理:填充空值
- 参数:
- df: 输入的宽表DataFrame
- method: 填充方法
- 返回:
- pd.DataFrame: 处理后的DataFrame
- """
- if df.empty:
- print("警告:输入数据为空")
- return df
- df_processed = df.copy()
- df_processed['time'] = pd.to_datetime(df_processed['time'])
- df_processed = df_processed.set_index('time')
- print(f"\n开始填充空值...")
- print(f"数据时间范围: {df_processed.index.min()} 到 {df_processed.index.max()}")
- print(f"数据行数: {len(df_processed)}")
- missing_before = df_processed.isnull().sum().sum()
- print(f"填充前空值数量: {missing_before}")
- if method == 'bfill':
- df_processed = df_processed.bfill()
- elif method == 'ffill':
- df_processed = df_processed.ffill()
- else: # both
- df_processed = df_processed.ffill().bfill()
- missing_after = df_processed.isnull().sum().sum()
- print(f"填充后空值数量: {missing_after}")
- print(f"填充了 {missing_before - missing_after} 个空值")
- # 重置索引
- df_processed = df_processed.reset_index().rename(columns={'index': 'time'})
- return df_processed
- # ---------- 数据分块保存函数 ----------
- def save_data_chunks(df, output_dir, prefix="sensor_data", chunk_size=200000):
- """
- 将数据集分块保存,命名格式为 {prefix}_partX_of_Y.csv
- """
- total_rows = len(df)
- num_chunks = (total_rows + chunk_size - 1) // chunk_size # 向上取整
- print(f"\n开始分块保存数据(共 {num_chunks} 个文件)...")
- for i in range(num_chunks):
- start_idx = i * chunk_size
- end_idx = min((i + 1) * chunk_size, total_rows)
- chunk_df = df.iloc[start_idx:end_idx]
- if not chunk_df.empty:
- file_path = os.path.join(
- output_dir,
- f"{prefix}_part{i + 1}_of_{num_chunks}.csv"
- )
- chunk_df.to_csv(file_path, index=False, encoding='utf_8_sig')
- print(f" ✓ 已保存第 {i + 1}/{num_chunks} 个文件: {os.path.basename(file_path)} ({len(chunk_df)} 行)")
- return num_chunks
- # ---------- 主程序 ----------
- if __name__ == "__main__":
- print("=" * 70)
- print("传感器数据采集程序 - 分钟级聚合版本")
- print("=" * 70)
- # 1. 创建数据库引擎
- print("\n[1/4] 创建数据库连接...")
- engine_test, engine_prod = create_db_engines()
- if engine_test is None or engine_prod is None:
- print("❌ 数据库连接失败,程序退出")
- exit(1)
- # 2. 获取一分钟聚合数据
- print(f"\n[2/4] 获取一分钟聚合数据...")
- print(f"时间范围: {START_TIME} 到 {END_TIME}")
- agg_df = fetch_sensor_data(
- SENSOR_NAMES,
- START_TIME,
- END_TIME,
- BOUNDARY,
- engine_test,
- engine_prod
- )
- if agg_df.empty:
- print("\n❌ 未获取到任何聚合数据,程序退出")
- exit(1)
- print(f"\n✅ 聚合数据获取完成!")
- print(f"总数据量: {len(agg_df)} 条记录")
- print(f"时间范围: {agg_df['time'].min()} 到 {agg_df['time'].max()}")
- print(f"时间间隔: 1分钟")
- print(f"传感器数量: {len(agg_df.columns) - 1} 个") # 减1是因为time列
- # 3. 后处理聚合数据(填充空值)
- print(f"\n[3/4] 后处理聚合数据(填充空值)...")
- processed_df = post_process_data(agg_df, method='both')
- print(f"\n✅ 后处理完成!")
- print(f"处理后数据行数: {len(processed_df)}")
- # 4. 保存数据(分块保存)
- print(f"\n[4/4] 保存数据...")
- # 直接使用分块保存,不再保存单个大文件
- chunk_size = 200000 # 每块20万行
- num_chunks = save_data_chunks(
- processed_df,
- PROCESSED_OUTPUT_DIR,
- "uf_all_units_processed_1min",
- chunk_size
- )
- print("\n" + "=" * 70)
- print("✅ 所有任务完成!")
- print("=" * 70)
- # 显示文件信息
- print(f"\n文件信息:")
- print(f"输出目录: {PROCESSED_OUTPUT_DIR}")
- print(f"文件数量: {num_chunks} 个")
- # 计算总大小
- total_size = 0
- for i in range(num_chunks):
- file_path = os.path.join(
- PROCESSED_OUTPUT_DIR,
- f"uf_all_units_processed_1min_part{i + 1}_of_{num_chunks}.csv"
- )
- if os.path.exists(file_path):
- file_size = os.path.getsize(file_path) / (1024 * 1024)
- total_size += file_size
- print(f" {os.path.basename(file_path)}: {file_size:.2f} MB")
- print(f"总文件大小: {total_size:.2f} MB")
|