# NOTE: a stray line-number gutter from a copy/paste was removed here.
import os
import time
from datetime import datetime, timedelta
from functools import reduce

import numpy as np
import pandas as pd
from sqlalchemy import create_engine, text
# ---------- Database connection settings ----------
# SECURITY: credentials used to be hard-coded only. They now default to the
# original values but can be overridden via environment variables, so real
# secrets can be kept out of source control.
DB_USER = os.environ.get("CIP_DB_USER", "whu")               # database user name
DB_PASS = os.environ.get("CIP_DB_PASS", "09093f4e6b33ddd")   # database password
DB_HOST = os.environ.get("CIP_DB_HOST", "222.130.26.206")    # database server address
DB_PORT = int(os.environ.get("CIP_DB_PORT", "4000"))         # database port

# ---------- Time-range settings ----------
START_TIME = datetime(2023, 2, 1, 0, 0, 0)   # query window start
END_TIME = datetime(2025, 9, 1, 0, 0, 0)     # query window end
BOUNDARY = datetime(2025, 3, 25, 0, 0, 0)    # cut-over time between the two databases

# ---------- Output directory ----------
OUTPUT_DIR = "high_freq_cip"
os.makedirs(OUTPUT_DIR, exist_ok=True)  # created eagerly at import time

# ---------- Key sensors (used to delimit CIP intervals) ----------
CONTROL_WORD_SENSORS = [
    # Discrete control states - raw 1-second data
    {"name": "C.M.RO1_DB@control_word", "agg": "raw"},  # RO1 control word
    {"name": "C.M.RO2_DB@control_word", "agg": "raw"},  # RO2 control word
    {"name": "C.M.RO3_DB@control_word", "agg": "raw"},  # RO3 control word
    {"name": "C.M.RO4_DB@control_word", "agg": "raw"},  # RO4 control word
]
CIP_SENSORS = [
    # CIP-related sensors - 1-minute downsampled data
    {"name": "CIP_QXB_runFlow", "agg": "1min"},    # CIP pump flow
    {"name": "C.M.RO1_PT_JS@out", "agg": "1min"},  # RO1 stage-1 feed pressure
    {"name": "C.M.RO2_PT_JS@out", "agg": "1min"},  # RO2 stage-1 feed pressure
    {"name": "C.M.RO3_PT_JS@out", "agg": "1min"},  # RO3 stage-1 feed pressure
    {"name": "C.M.RO4_PT_JS@out", "agg": "1min"},  # RO4 stage-1 feed pressure
]
# ---------- Raw-data query (1-second interval) ----------
def fetch_raw_sensor_data(name, start, end, db):
    """Fetch raw 1-second samples for one sensor.

    Args:
        name: sensor item name (matched against ``item_name``).
        start, end: inclusive datetime bounds (SQL ``BETWEEN``).
        db: database/schema name to connect to.

    Returns:
        DataFrame with columns ``time`` and ``val``; empty DataFrame on
        failure (errors are printed, not raised — callers treat this as
        best-effort).
    """
    engine = create_engine(
        f"mysql+pymysql://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/{db}?charset=utf8mb4",
        pool_pre_ping=True,
        pool_recycle=3600,
        connect_args={'connect_timeout': 300, 'read_timeout': 600, 'write_timeout': 600},
    )
    sql = text("""
        SELECT h_time AS time, val
        FROM dc_item_history_data_92
        WHERE item_name = :item_name
          AND h_time BETWEEN :st AND :et
          AND val IS NOT NULL
        ORDER BY h_time
    """)
    try:
        print(f" 查询 {name} 原始数据...")
        df = pd.read_sql(sql, engine, params={"item_name": name, "st": start, "et": end})
        print(f" 获取到 {len(df)} 条原始记录")
        return df
    except Exception as e:
        print(f" 查询失败: {str(e)}")
        return pd.DataFrame()
    finally:
        # BUG FIX: a fresh engine (with its own connection pool) is created on
        # every call and was never released — dispose it before returning.
        engine.dispose()
# ---------- Downsampled-data query (1-minute interval) ----------
def fetch_resampled_sensor_data(name, start, end, db, interval='1MINUTE'):
    """Fetch one sensor downsampled to 1-minute averages directly in SQL.

    Args:
        name: sensor item name.
        start, end: inclusive datetime bounds (SQL ``BETWEEN``).
        db: database/schema name to connect to.
        interval: reserved for future use — the query below is hard-coded to
            1-minute buckets regardless of this value.

    Returns:
        DataFrame with ``time`` (minute bucket string) and ``val`` (bucket
        mean); empty DataFrame on failure (errors are printed, not raised).
    """
    engine = create_engine(
        f"mysql+pymysql://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/{db}?charset=utf8mb4",
        pool_pre_ping=True,
        pool_recycle=3600,
        connect_args={'connect_timeout': 300, 'read_timeout': 600, 'write_timeout': 600},
    )
    # Bucket rows by minute with DATE_FORMAT and average within each bucket.
    # BUG FIX: this was an f-string for no reason — the '%' format codes make
    # that fragile, so it is now a plain string (no placeholders were used).
    sql = text("""
        SELECT
            DATE_FORMAT(h_time, '%Y-%m-%d %H:%i:00') AS time,
            AVG(val) AS val
        FROM dc_item_history_data_92
        WHERE item_name = :item_name
          AND h_time BETWEEN :st AND :et
          AND val IS NOT NULL
        GROUP BY DATE_FORMAT(h_time, '%Y-%m-%d %H:%i:00')
        ORDER BY time
    """)
    try:
        print(f" 查询 {name} 1分钟降采样数据...")
        df = pd.read_sql(sql, engine, params={"item_name": name, "st": start, "et": end})
        print(f" 获取到 {len(df)} 条降采样记录")
        return df
    except Exception as e:
        print(f" 降采样查询失败: {str(e)}")
        return pd.DataFrame()
    finally:
        # BUG FIX: release the per-call engine's connection pool.
        engine.dispose()
# ---------- Chunked (per-calendar-year) query helper ----------
def fetch_sensor_data_chunked(name, start, end, db, agg_type="raw"):
    """Fetch sensor data one calendar year at a time and concatenate.

    Args:
        name: sensor item name.
        start, end: overall datetime window.
        db: database/schema name.
        agg_type: "raw" for 1-second data, anything else for the
            1-minute downsampled query.

    Returns:
        Concatenated DataFrame of all yearly chunks (empty if none).
    """
    chunks = []
    cursor = start
    while cursor < end:
        # End of the current calendar year (Dec 31 23:59:59), capped at `end`.
        chunk_end = min(datetime(cursor.year, 12, 31, 23, 59, 59), end)
        try:
            fetcher = fetch_raw_sensor_data if agg_type == "raw" else fetch_resampled_sensor_data
            piece = fetcher(name, cursor, chunk_end, db)
            if not piece.empty:
                chunks.append(piece)
                print(f" 已获取 {cursor.year}年 的数据: {cursor} 到 {chunk_end}")
            # Brief pause to avoid hammering the server.
            time.sleep(0.5)
        except Exception as e:
            # A failed year is logged and skipped; the loop continues.
            print(f" {cursor.year}年数据查询失败: {str(e)}")
        # Advance to Jan 1 00:00:00 of the following year.
        cursor = datetime(cursor.year + 1, 1, 1, 0, 0, 0)
    return pd.concat(chunks, ignore_index=True) if chunks else pd.DataFrame()
# ---------- Control-word sensors (each saved to its own file) ----------
def process_control_word_sensors(sensor_configs, start_time, end_time, boundary, output_dir):
    """Fetch each control-word sensor at raw resolution and write one CSV per sensor.

    Args:
        sensor_configs: list of ``{"name": ..., "agg": ...}`` dicts.
        start_time, end_time: overall query window.
        boundary: cut-over time between the old and new databases.
        output_dir: directory receiving the CSV files.

    Returns:
        List of file paths that were written.
    """
    saved_files = []
    total = len(sensor_configs)
    for idx, cfg in enumerate(sensor_configs):
        name = cfg["name"]
        print(f"\n[{idx + 1}/{total}] 处理控制字传感器: {name}")
        try:
            # Pick the database(s) according to where the window sits
            # relative to the cut-over boundary.
            if end_time <= boundary:
                df = fetch_sensor_data_chunked(name, start_time, end_time, "ws_data_test", "raw")
            elif start_time >= boundary:
                df = fetch_sensor_data_chunked(name, start_time, end_time, "ws_data", "raw")
            else:
                parts = [
                    fetch_sensor_data_chunked(name, start_time, boundary, "ws_data_test", "raw"),
                    fetch_sensor_data_chunked(name, boundary, end_time, "ws_data", "raw"),
                ]
                df = pd.concat(parts, ignore_index=True)
            if df.empty:
                print(f" 警告: {name} 未获取到数据")
                continue
            # Use the sensor name as the value column header.
            df = df.rename(columns={'val': name})
            # '@' and '.' are awkward in filenames; normalise them to '_'.
            safe_name = name.replace('@', '_').replace('.', '_')
            out_path = os.path.join(output_dir, f"{safe_name}_raw.csv")
            df.to_csv(out_path, index=False, encoding='utf_8_sig')
            saved_files.append(out_path)
            print(f" 已保存 {name} 原始数据,共 {len(df):,} 条记录")
            # Pause between sensors (not after the last one).
            if idx < total - 1:
                time.sleep(2)
        except Exception as e:
            print(f" 处理 {name} 失败: {str(e)}")
            continue
    return saved_files
# ---------- CIP sensors (merged into a single file) ----------
def process_cip_sensors(sensor_configs, start_time, end_time, boundary, output_dir):
    """Fetch every CIP sensor at 1-minute resolution (downsampled in SQL),
    merge them on timestamp, fill gaps, and save a single CSV.

    Args:
        sensor_configs: list of ``{"name": ..., "agg": ...}`` dicts.
        start_time, end_time: overall query window.
        boundary: cut-over time between the old and new databases.
        output_dir: directory receiving the merged CSV.

    Returns:
        Path of the merged CSV, or None when no sensor returned data.
    """
    all_sensor_dfs = []
    for i, sensor_config in enumerate(sensor_configs):
        sensor_name = sensor_config["name"]
        print(f"\n[{i + 1}/{len(sensor_configs)}] 处理CIP传感器: {sensor_name}")
        try:
            # Pick the database(s) according to the cut-over boundary.
            if end_time <= boundary:
                df = fetch_sensor_data_chunked(sensor_name, start_time, end_time, "ws_data_test", "1min")
            elif start_time >= boundary:
                df = fetch_sensor_data_chunked(sensor_name, start_time, end_time, "ws_data", "1min")
            else:
                df1 = fetch_sensor_data_chunked(sensor_name, start_time, boundary, "ws_data_test", "1min")
                df2 = fetch_sensor_data_chunked(sensor_name, boundary, end_time, "ws_data", "1min")
                df = pd.concat([df1, df2], ignore_index=True)
            if df.empty:
                print(f" 警告: {sensor_name} 未获取到数据")
                continue
            # Rename the value column and normalise the timestamp type.
            df = df.rename(columns={'val': sensor_name})
            df['time'] = pd.to_datetime(df['time'])
            df = df.set_index('time')
            # BUG FIX: BETWEEN is inclusive on both ends, so the boundary
            # minute is fetched from both databases. Duplicate index values
            # would make the outer join below emit a row cross-product for
            # that timestamp — keep only the first occurrence.
            df = df[~df.index.duplicated(keep='first')]
            all_sensor_dfs.append(df)
            print(f" 已获取 {sensor_name} 1分钟降采样数据,共 {len(df):,} 条记录")
            if i < len(sensor_configs) - 1:
                time.sleep(2)
        except Exception as e:
            print(f" 处理 {sensor_name} 失败: {str(e)}")
            continue
    if not all_sensor_dfs:
        print("没有可处理的CIP传感器数据")
        return None
    print(f"\n开始合并 {len(all_sensor_dfs)} 个CIP传感器数据...")
    # Outer-join all sensors on the shared time index; minutes missing for a
    # sensor stay NaN until the fill step below.
    merged_df = reduce(lambda left, right: left.join(right, how='outer'), all_sensor_dfs)
    # Restore `time` as a regular column and sort chronologically.
    # (reset_index already names the column 'time'; the old
    # rename(columns={'index': 'time'}) was a no-op and has been dropped.)
    merged_df = merged_df.reset_index().sort_values('time')
    print("进行缺失值填充...")
    # Forward- then backward-fill so leading gaps are covered as well.
    numeric_cols = merged_df.select_dtypes(include=[np.number]).columns
    merged_df[numeric_cols] = merged_df[numeric_cols].ffill().bfill()
    cip_output_file = os.path.join(output_dir, "cip_sensors_1min_merged.csv")
    merged_df.to_csv(cip_output_file, index=False, encoding='utf_8_sig')
    print(f"已保存合并的CIP传感器数据,共 {len(merged_df):,} 条记录")
    return cip_output_file
# ---------- Main entry point ----------
if __name__ == "__main__":
    print("开始获取高频率CIP相关传感器数据")
    print(f"时间范围: {START_TIME} 到 {END_TIME}")
    print(f"控制字传感器数量: {len(CONTROL_WORD_SENSORS)} (原始数据)")
    print(f"CIP传感器数量: {len(CIP_SENSORS)} (1分钟降采样)")
    print("=" * 60)
    try:
        # 1. Control-word sensors: each one is saved to its own CSV.
        print("\n=== 处理控制字传感器 ===")
        control_word_files = process_control_word_sensors(
            CONTROL_WORD_SENSORS,
            START_TIME,
            END_TIME,
            BOUNDARY,
            OUTPUT_DIR
        )
        # 2. CIP sensors: merged and saved as a single 1-minute CSV.
        print("\n=== 处理CIP传感器 ===")
        cip_file = process_cip_sensors(
            CIP_SENSORS,
            START_TIME,
            END_TIME,
            BOUNDARY,
            OUTPUT_DIR
        )
        print("\n=== 处理完成 ===")
        print(f"控制字文件: {len(control_word_files)} 个")
        print(f"CIP传感器文件: 1 个")
        if control_word_files:
            print("控制字文件列表:")
            for file in control_word_files:
                print(f" - {os.path.basename(file)}")
        if cip_file:
            print(f"CIP传感器文件: {os.path.basename(cip_file)}")
    except KeyboardInterrupt:
        # Ctrl-C: report the interruption instead of a traceback.
        print("\n用户中断程序执行")
    except Exception as e:
        # Top-level boundary: any other failure is reported and the
        # script exits normally.
        print(f"程序执行出错: {str(e)}")
|