import os
import time
from datetime import datetime

import numpy as np
import pandas as pd
from sqlalchemy import create_engine, text

# ---------- Database connection settings ----------
DB_USER = "whu"                 # database user
DB_PASS = "09093f4e6b33ddd"     # database password
DB_HOST = "222.130.26.206"      # database server address
DB_PORT = 4000                  # database port

# ---------- Time range settings ----------
START_TIME = datetime(2023, 2, 1, 0, 0, 0)  # query start time
END_TIME = datetime(2025, 9, 1, 0, 0, 0)    # query end time
BOUNDARY = datetime(2025, 3, 25, 0, 0, 0)   # cutover time between the two databases

# ---------- Output directory ----------
OUTPUT_DIR = "high_freq_cip"
os.makedirs(OUTPUT_DIR, exist_ok=True)

# ---------- Key sensors (used for CIP interval segmentation) ----------
CONTROL_WORD_SENSORS = [
    # Discrete control states - raw 1-second data
    {"name": "C.M.RO1_DB@control_word", "agg": "raw"},  # RO1 control word
    {"name": "C.M.RO2_DB@control_word", "agg": "raw"},  # RO2 control word
    {"name": "C.M.RO3_DB@control_word", "agg": "raw"},  # RO3 control word
    {"name": "C.M.RO4_DB@control_word", "agg": "raw"},  # RO4 control word
]

CIP_SENSORS = [
    # CIP-related sensors - 1-minute downsampled data
    {"name": "CIP_QXB_runFlow", "agg": "1min"},    # CIP pump flow
    {"name": "C.M.RO1_PT_JS@out", "agg": "1min"},  # RO1 stage-1 feed pressure
    {"name": "C.M.RO2_PT_JS@out", "agg": "1min"},  # RO2 stage-1 feed pressure
    {"name": "C.M.RO3_PT_JS@out", "agg": "1min"},  # RO3 stage-1 feed pressure
    {"name": "C.M.RO4_PT_JS@out", "agg": "1min"},  # RO4 stage-1 feed pressure
]


def make_engine(db):
    """Create a SQLAlchemy engine for the given database."""
    return create_engine(
        f"mysql+pymysql://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/{db}?charset=utf8mb4",
        pool_pre_ping=True,
        pool_recycle=3600,
        connect_args={"connect_timeout": 300, "read_timeout": 600, "write_timeout": 600},
    )


# ---------- Raw data query (1-second interval) ----------
def fetch_raw_sensor_data(name, start, end, db):
    """Query a sensor's raw 1-second-interval data."""
    engine = make_engine(db)
    sql = text("""
        SELECT h_time AS time, val
        FROM dc_item_history_data_92
        WHERE item_name = :item_name
          AND h_time BETWEEN :st AND :et
          AND val IS NOT NULL
        ORDER BY h_time
    """)
    try:
        print(f"  Querying raw data for {name}...")
        df = pd.read_sql(sql, engine, params={"item_name": name, "st": start, "et": end})
        print(f"  Fetched {len(df)} raw records")
        return df
    except Exception as e:
        print(f"  Query failed: {str(e)}")
        return pd.DataFrame()


# ---------- Downsampled data query (1-minute interval) ----------
def fetch_resampled_sensor_data(name, start, end, db):
    """Downsample to 1-minute averages directly in the database query."""
    engine = make_engine(db)
    # Truncate timestamps to the minute with MySQL date formatting, then average.
    sql = text("""
        SELECT DATE_FORMAT(h_time, '%Y-%m-%d %H:%i:00') AS time,
               AVG(val) AS val
        FROM dc_item_history_data_92
        WHERE item_name = :item_name
          AND h_time BETWEEN :st AND :et
          AND val IS NOT NULL
        GROUP BY DATE_FORMAT(h_time, '%Y-%m-%d %H:%i:00')
        ORDER BY time
    """)
    try:
        print(f"  Querying 1-minute downsampled data for {name}...")
        df = pd.read_sql(sql, engine, params={"item_name": name, "st": start, "et": end})
        print(f"  Fetched {len(df)} downsampled records")
        return df
    except Exception as e:
        print(f"  Downsampled query failed: {str(e)}")
        return pd.DataFrame()


# ---------- Chunked query (one calendar year at a time) ----------
def fetch_sensor_data_chunked(name, start, end, db, agg_type="raw"):
    """Query a sensor's data in calendar-year chunks."""
    all_chunks = []
    current_start = start

    while current_start < end:
        # End of the current calendar year (Dec 31, 23:59:59).
        year_end = datetime(current_start.year, 12, 31, 23, 59, 59)
        current_end = min(year_end, end)

        try:
            if agg_type == "raw":
                chunk_df = fetch_raw_sensor_data(name, current_start, current_end, db)
            else:  # 1-minute downsampling
                chunk_df = fetch_resampled_sensor_data(name, current_start, current_end, db)

            if not chunk_df.empty:
                all_chunks.append(chunk_df)
                print(f"  Fetched {current_start.year} data: {current_start} to {current_end}")

            # Brief pause to avoid overloading the server.
            time.sleep(0.5)
        except Exception as e:
            # Log the failure and continue with the next year.
            print(f"  Query for {current_start.year} failed: {str(e)}")

        # Advance to Jan 1, 00:00:00 of the next year.
        current_start = datetime(current_start.year + 1, 1, 1, 0, 0, 0)

    if all_chunks:
        return pd.concat(all_chunks, ignore_index=True)
    return pd.DataFrame()
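# ---------- Helper sketch: dropping boundary duplicates ----------
# BETWEEN is inclusive at both ends, so when a query range straddles BOUNDARY
# the callers below request the boundary timestamp from both ws_data_test and
# ws_data, and pd.concat can then carry that instant twice. A minimal sketch of
# a dedup step those callers could apply after concatenating;
# drop_duplicate_times is an illustrative helper, not part of the original
# pipeline.
def drop_duplicate_times(df, time_col="time"):
    """Drop rows with exactly duplicated timestamps, keeping the first per timestamp."""
    if df.empty:
        return df
    df = df.copy()
    df[time_col] = pd.to_datetime(df[time_col])
    return (df.sort_values(time_col, kind="stable")
              .drop_duplicates(subset=time_col, keep="first")
              .reset_index(drop=True))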
# ---------- Control-word sensors (saved individually) ----------
def process_control_word_sensors(sensor_configs, start_time, end_time, boundary, output_dir):
    """Process the control-word sensors, saving each one to its own CSV file."""
    saved_files = []

    for i, sensor_config in enumerate(sensor_configs):
        sensor_name = sensor_config["name"]
        print(f"\n[{i + 1}/{len(sensor_configs)}] Processing control-word sensor: {sensor_name}")

        try:
            # Pick the database(s) based on where the range falls relative to the cutover.
            if end_time <= boundary:
                df = fetch_sensor_data_chunked(sensor_name, start_time, end_time, "ws_data_test", "raw")
            elif start_time >= boundary:
                df = fetch_sensor_data_chunked(sensor_name, start_time, end_time, "ws_data", "raw")
            else:
                # Note: both halves include the boundary instant itself, so a
                # record at exactly `boundary` can appear twice (see
                # drop_duplicate_times above).
                df1 = fetch_sensor_data_chunked(sensor_name, start_time, boundary, "ws_data_test", "raw")
                df2 = fetch_sensor_data_chunked(sensor_name, boundary, end_time, "ws_data", "raw")
                df = pd.concat([df1, df2], ignore_index=True)

            if df.empty:
                print(f"  Warning: no data for {sensor_name}")
                continue

            # Rename the value column after the sensor.
            df = df.rename(columns={"val": sensor_name})

            # Save this control-word sensor's raw data.
            control_word_file = os.path.join(
                output_dir, f"{sensor_name.replace('@', '_').replace('.', '_')}_raw.csv"
            )
            df.to_csv(control_word_file, index=False, encoding="utf_8_sig")
            saved_files.append(control_word_file)
            print(f"  Saved raw data for {sensor_name}, {len(df):,} records")

            # Pause between sensors.
            if i < len(sensor_configs) - 1:
                time.sleep(2)
        except Exception as e:
            print(f"  Failed to process {sensor_name}: {str(e)}")
            continue

    return saved_files


# ---------- CIP sensors (merged into one file) ----------
def process_cip_sensors(sensor_configs, start_time, end_time, boundary, output_dir):
    """Downsample the CIP sensors to 1-minute resolution in the database, then merge and save."""
    all_sensor_dfs = []

    # Query each sensor (already downsampled at the database level).
    for i, sensor_config in enumerate(sensor_configs):
        sensor_name = sensor_config["name"]
        print(f"\n[{i + 1}/{len(sensor_configs)}] Processing CIP sensor: {sensor_name}")

        try:
            # Pick the database(s) based on where the range falls relative to the cutover.
            if end_time <= boundary:
                df = fetch_sensor_data_chunked(sensor_name, start_time, end_time, "ws_data_test", "1min")
            elif start_time >= boundary:
                df = fetch_sensor_data_chunked(sensor_name, start_time, end_time, "ws_data", "1min")
            else:
                df1 = fetch_sensor_data_chunked(sensor_name, start_time, boundary, "ws_data_test", "1min")
                df2 = fetch_sensor_data_chunked(sensor_name, boundary, end_time, "ws_data", "1min")
                df = pd.concat([df1, df2], ignore_index=True)

            if df.empty:
                print(f"  Warning: no data for {sensor_name}")
                continue

            # Rename the value column and normalize the time format.
            df = df.rename(columns={"val": sensor_name})
            df["time"] = pd.to_datetime(df["time"])

            # Index by time for the merge below; the two source databases can
            # both return the boundary minute, so drop duplicate index entries
            # before joining.
            df = df.set_index("time")
            df = df[~df.index.duplicated(keep="first")]
            all_sensor_dfs.append(df)
            print(f"  Fetched 1-minute data for {sensor_name}, {len(df):,} records")

            # Pause between sensors.
            if i < len(sensor_configs) - 1:
                time.sleep(2)
        except Exception as e:
            print(f"  Failed to process {sensor_name}: {str(e)}")
            continue

    # Merge all CIP sensor data.
    if not all_sensor_dfs:
        print("No CIP sensor data to process")
        return None

    print(f"\nMerging data from {len(all_sensor_dfs)} CIP sensors...")

    # Outer-join all frames on the time index.
    merged_df = all_sensor_dfs[0]
    for other in all_sensor_dfs[1:]:
        merged_df = merged_df.join(other, how="outer")

    # Restore `time` as a column and sort.
    merged_df = merged_df.reset_index().sort_values("time")

    # Fill missing values (forward fill, then backward fill).
    print("Filling missing values...")
    numeric_cols = merged_df.select_dtypes(include=[np.number]).columns
    merged_df[numeric_cols] = merged_df[numeric_cols].ffill().bfill()

    # Save the merged file.
    cip_output_file = os.path.join(output_dir, "cip_sensors_1min_merged.csv")
    merged_df.to_csv(cip_output_file, index=False, encoding="utf_8_sig")
    print(f"Saved merged CIP sensor data, {len(merged_df):,} records")
    return cip_output_file
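# ---------- Sketch: CIP interval segmentation from a control word ----------
# The control-word CSVs are collected for CIP interval segmentation, which this
# script itself does not perform. A minimal sketch of one possible downstream
# step, assuming a single control-word value marks "CIP running";
# CIP_STATE_VALUE is a hypothetical placeholder, not taken from the plant's
# actual PLC documentation.
CIP_STATE_VALUE = 4  # hypothetical control-word value while a CIP is running


def extract_cip_intervals(df, value_col, cip_value=CIP_STATE_VALUE):
    """Return (start, end) timestamp pairs for runs where value_col == cip_value."""
    if df.empty:
        return []
    df = df.copy()
    df["time"] = pd.to_datetime(df["time"])
    df = df.sort_values("time").reset_index(drop=True)
    in_cip = df[value_col] == cip_value
    # A new run starts wherever the in-CIP flag flips.
    run_id = (in_cip != in_cip.shift(fill_value=False)).cumsum()
    return [
        (run["time"].iloc[0], run["time"].iloc[-1])
        for _, run in df[in_cip].groupby(run_id[in_cip])
    ]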
# ---------- Main program ----------
if __name__ == "__main__":
    print("Fetching high-frequency CIP-related sensor data")
    print(f"Time range: {START_TIME} to {END_TIME}")
    print(f"Control-word sensors: {len(CONTROL_WORD_SENSORS)} (raw data)")
    print(f"CIP sensors: {len(CIP_SENSORS)} (1-minute downsampled)")
    print("=" * 60)

    try:
        # 1. Control-word sensors (one file each).
        print("\n=== Processing control-word sensors ===")
        control_word_files = process_control_word_sensors(
            CONTROL_WORD_SENSORS, START_TIME, END_TIME, BOUNDARY, OUTPUT_DIR
        )

        # 2. CIP sensors (one merged file).
        print("\n=== Processing CIP sensors ===")
        cip_file = process_cip_sensors(
            CIP_SENSORS, START_TIME, END_TIME, BOUNDARY, OUTPUT_DIR
        )

        print("\n=== Done ===")
        print(f"Control-word files: {len(control_word_files)}")
        if control_word_files:
            print("Control-word file list:")
            for file in control_word_files:
                print(f"  - {os.path.basename(file)}")
        if cip_file:
            print(f"CIP sensor file: {os.path.basename(cip_file)}")
    except KeyboardInterrupt:
        print("\nInterrupted by user")
    except Exception as e:
        print(f"Program failed: {str(e)}")
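# ---------- Sketch: reloading the outputs ----------
# A minimal sketch of how downstream analysis might reload the files written
# above and feed one control word into extract_cip_intervals. The file names
# match the save calls in this script; the function is illustrative and is not
# called by the main program.
def load_outputs_example(output_dir=OUTPUT_DIR):
    """Reload the merged CIP frame and the RO1 control word with parsed times."""
    cip = pd.read_csv(os.path.join(output_dir, "cip_sensors_1min_merged.csv"),
                      parse_dates=["time"])
    ro1 = pd.read_csv(os.path.join(output_dir, "C_M_RO1_DB_control_word_raw.csv"),
                      parse_dates=["time"])
    intervals = extract_cip_intervals(ro1, "C.M.RO1_DB@control_word")
    return cip, ro1, intervals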