# sensor_data_extract.py
  1. import os
  2. import pandas as pd
  3. import numpy as np
  4. from datetime import datetime, timedelta
  5. from sqlalchemy import create_engine, text
  6. from functools import reduce
  7. import time
  8. # ---------- 数据库连接配置 ----------
  9. DB_USER = "whu" # 数据库用户名
  10. DB_PASS = "09093f4e6b33ddd" # 数据库密码
  11. DB_HOST = "222.130.26.206" # 数据库服务器地址
  12. DB_PORT = 4000 # 数据库端口号
  13. # ---------- 时间范围配置 ----------
  14. START_TIME = datetime(2024, 4, 1, 0, 0, 0) # 数据查询开始时间
  15. END_TIME = datetime(2026, 4, 1, 0, 0, 0) # 数据查询结束时间
  16. BOUNDARY = datetime(2025, 3, 25, 0, 0, 0) # 数据库切换分界时间
  17. # ---------- 输出目录配置 ----------
  18. OUTPUT_DIR = "../datasets/high_freq_cip"
  19. os.makedirs(OUTPUT_DIR, exist_ok=True)
  20. # ---------- 关键传感器配置(用于CIP区间划分) ----------
  21. CONTROL_WORD_SENSORS = [
  22. # 离散控制状态 - 原始1秒数据
  23. {"name": "C.M.RO1_DB@control_word", "agg": "raw"}, # RO1设备控制字
  24. {"name": "C.M.RO2_DB@control_word", "agg": "raw"}, # RO2设备控制字
  25. {"name": "C.M.RO3_DB@control_word", "agg": "raw"}, # RO3设备控制字
  26. {"name": "C.M.RO4_DB@control_word", "agg": "raw"}, # RO4设备控制字
  27. {"name": "C.M.RO1_DB@model_word", "agg": "raw"}, # RO1控制模式
  28. {"name": "C.M.RO2_DB@model_word", "agg": "raw"}, # RO2控制模式
  29. {"name": "C.M.RO3_DB@model_word", "agg": "raw"}, # RO3控制模式
  30. {"name": "C.M.RO4_DB@model_word", "agg": "raw"}, # RO4控制模式
  31. {"name": "C.M.CIP_QXB1@run", "agg": "raw"}, # 1#CIP清洗水泵运行
  32. {"name": "C.M.CIP_QXB2@run", "agg": "raw"}, # 2#CIP清洗水泵运行
  33. {"name": "C.M.RO1_CIP_JSF1@open_feedback", "agg": "raw"}, # RO1一段CIP进水阀开到位
  34. {"name": "C.M.RO1_CIP_JSF2@open_feedback", "agg": "raw"}, # RO1二段CIP进水阀开到位
  35. {"name": "C.M.RO2_CIP_JSF1@open_feedback", "agg": "raw"}, # RO2一段CIP进水阀开到位
  36. {"name": "C.M.RO2_CIP_JSF2@open_feedback", "agg": "raw"}, # RO2二段CIP进水阀开到位
  37. {"name": "C.M.RO3_CIP_JSF1@open_feedback", "agg": "raw"}, # RO3一段CIP进水阀开到位
  38. {"name": "C.M.RO3_CIP_JSF2@open_feedback", "agg": "raw"}, # RO3二段CIP进水阀开到位
  39. {"name": "C.M.RO4_CIP_JSF1@open_feedback", "agg": "raw"}, # RO4一段CIP进水阀开到位
  40. {"name": "C.M.RO4_CIP_JSF2@open_feedback", "agg": "raw"}, # RO4二段CIP进水阀开到位
  41. ]
  42. CIP_SENSORS = [
  43. # CIP相关传感器 - 1分钟降采样数据
  44. {"name": "CIP_QXB_runFlow", "agg": "1min"}, # 清洗水泵流量
  45. {"name": "C.M.RO1_PT_JS@out", "agg": "1min"}, # RO1一段进水压力
  46. {"name": "C.M.RO2_PT_JS@out", "agg": "1min"}, # RO2一段进水压力
  47. {"name": "C.M.RO3_PT_JS@out", "agg": "1min"}, # RO3一段进水压力
  48. {"name": "C.M.RO4_PT_JS@out", "agg": "1min"}, # RO4一段进水压力
  49. ]
  50. # ---------- 原始数据查询函数(1秒间隔) ----------
  51. def fetch_raw_sensor_data(name, start, end, db):
  52. """
  53. 查询传感器的原始1秒间隔数据
  54. """
  55. engine = create_engine(
  56. f"mysql+pymysql://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/{db}?charset=utf8mb4",
  57. pool_pre_ping=True,
  58. pool_recycle=3600,
  59. connect_args={'connect_timeout': 300, 'read_timeout': 600, 'write_timeout': 600}
  60. )
  61. sql = text("""
  62. SELECT h_time AS time, val
  63. FROM dc_item_history_data_92
  64. WHERE item_name = :item_name
  65. AND h_time BETWEEN :st
  66. AND :et
  67. AND val IS NOT NULL
  68. ORDER BY h_time
  69. """)
  70. try:
  71. print(f" 查询 {name} 原始数据...")
  72. df = pd.read_sql(sql, engine, params={"item_name": name, "st": start, "et": end})
  73. print(f" 获取到 {len(df)} 条原始记录")
  74. return df
  75. except Exception as e:
  76. print(f" 查询失败: {str(e)}")
  77. return pd.DataFrame()
  78. # ---------- 降采样数据查询函数(1分钟间隔) ----------
  79. def fetch_resampled_sensor_data(name, start, end, db, interval='1MINUTE'):
  80. """
  81. 直接在数据库查询阶段进行1分钟降采样
  82. """
  83. engine = create_engine(
  84. f"mysql+pymysql://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/{db}?charset=utf8mb4",
  85. pool_pre_ping=True,
  86. pool_recycle=3600,
  87. connect_args={'connect_timeout': 300, 'read_timeout': 600, 'write_timeout': 600}
  88. )
  89. # 使用MySQL的日期函数进行降采样
  90. sql = text(f"""
  91. SELECT
  92. DATE_FORMAT(h_time, '%Y-%m-%d %H:%i:00') AS time,
  93. AVG(val) AS val
  94. FROM dc_item_history_data_92
  95. WHERE item_name = :item_name
  96. AND h_time BETWEEN :st
  97. AND :et
  98. AND val IS NOT NULL
  99. GROUP BY DATE_FORMAT(h_time, '%Y-%m-%d %H:%i:00')
  100. ORDER BY time
  101. """)
  102. try:
  103. print(f" 查询 {name} 1分钟降采样数据...")
  104. df = pd.read_sql(sql, engine, params={"item_name": name, "st": start, "et": end})
  105. print(f" 获取到 {len(df)} 条降采样记录")
  106. return df
  107. except Exception as e:
  108. print(f" 降采样查询失败: {str(e)}")
  109. return pd.DataFrame()
  110. # ---------- 分时间段查询函数 ----------
  111. def fetch_sensor_data_chunked(name, start, end, db, agg_type="raw"):
  112. """
  113. 按年分时间段查询传感器数据
  114. """
  115. all_chunks = []
  116. current_start = start
  117. while current_start < end:
  118. # 计算当前年度的结束时间(12月31日23:59:59)
  119. year_end = datetime(current_start.year, 12, 31, 23, 59, 59)
  120. current_end = min(year_end, end)
  121. try:
  122. if agg_type == "raw":
  123. chunk_df = fetch_raw_sensor_data(name, current_start, current_end, db)
  124. else: # 1min降采样
  125. chunk_df = fetch_resampled_sensor_data(name, current_start, current_end, db)
  126. if not chunk_df.empty:
  127. all_chunks.append(chunk_df)
  128. print(f" 已获取 {current_start.year}年 的数据: {current_start} 到 {current_end}")
  129. # 添加短暂延迟避免服务器压力
  130. time.sleep(0.5)
  131. except Exception as e:
  132. print(f" {current_start.year}年数据查询失败: {str(e)}")
  133. # 继续下一年
  134. pass
  135. # 移动到下一年1月1日00:00:00
  136. current_start = datetime(current_start.year + 1, 1, 1, 0, 0, 0)
  137. if all_chunks:
  138. return pd.concat(all_chunks, ignore_index=True)
  139. else:
  140. return pd.DataFrame()
  141. # ---------- 处理控制字传感器(单独保存) ----------
  142. def process_control_word_sensors(sensor_configs, start_time, end_time, boundary, output_dir):
  143. """
  144. 处理控制字传感器,每个传感器单独保存为CSV文件
  145. """
  146. saved_files = []
  147. for i, sensor_config in enumerate(sensor_configs):
  148. sensor_name = sensor_config["name"]
  149. print(f"\n[{i + 1}/{len(sensor_configs)}] 处理控制字传感器: {sensor_name}")
  150. try:
  151. # 根据时间范围选择数据库
  152. if end_time <= boundary:
  153. df = fetch_sensor_data_chunked(sensor_name, start_time, end_time, "ws_data_test", "raw")
  154. elif start_time >= boundary:
  155. df = fetch_sensor_data_chunked(sensor_name, start_time, end_time, "ws_data", "raw")
  156. else:
  157. df1 = fetch_sensor_data_chunked(sensor_name, start_time, boundary, "ws_data_test", "raw")
  158. df2 = fetch_sensor_data_chunked(sensor_name, boundary, end_time, "ws_data", "raw")
  159. df = pd.concat([df1, df2], ignore_index=True)
  160. if df.empty:
  161. print(f" 警告: {sensor_name} 未获取到数据")
  162. continue
  163. # 重命名列
  164. df = df.rename(columns={'val': sensor_name})
  165. # 保存单个控制字传感器数据
  166. control_word_file = os.path.join(output_dir, f"{sensor_name.replace('@', '_').replace('.', '_')}_raw.csv")
  167. df.to_csv(control_word_file, index=False, encoding='utf_8_sig')
  168. saved_files.append(control_word_file)
  169. print(f" 已保存 {sensor_name} 原始数据,共 {len(df):,} 条记录")
  170. # 添加延迟
  171. if i < len(sensor_configs) - 1:
  172. time.sleep(2)
  173. except Exception as e:
  174. print(f" 处理 {sensor_name} 失败: {str(e)}")
  175. continue
  176. return saved_files
  177. # ---------- 处理CIP传感器(合并保存) ----------
  178. def process_cip_sensors(sensor_configs, start_time, end_time, boundary, output_dir):
  179. """
  180. 处理CIP传感器,直接在数据库层面进行1分钟降采样后合并保存为一个文件
  181. """
  182. all_sensor_dfs = []
  183. # 逐个传感器查询(已经在数据库层面降采样)
  184. for i, sensor_config in enumerate(sensor_configs):
  185. sensor_name = sensor_config["name"]
  186. print(f"\n[{i + 1}/{len(sensor_configs)}] 处理CIP传感器: {sensor_name}")
  187. try:
  188. # 根据时间范围选择数据库
  189. if end_time <= boundary:
  190. df = fetch_sensor_data_chunked(sensor_name, start_time, end_time, "ws_data_test", "1min")
  191. elif start_time >= boundary:
  192. df = fetch_sensor_data_chunked(sensor_name, start_time, end_time, "ws_data", "1min")
  193. else:
  194. df1 = fetch_sensor_data_chunked(sensor_name, start_time, boundary, "ws_data_test", "1min")
  195. df2 = fetch_sensor_data_chunked(sensor_name, boundary, end_time, "ws_data", "1min")
  196. df = pd.concat([df1, df2], ignore_index=True)
  197. if df.empty:
  198. print(f" 警告: {sensor_name} 未获取到数据")
  199. continue
  200. # 重命名列并确保时间格式一致
  201. df = df.rename(columns={'val': sensor_name})
  202. df['time'] = pd.to_datetime(df['time'])
  203. # 设置时间为索引以便后续合并
  204. df = df.set_index('time')
  205. all_sensor_dfs.append(df)
  206. print(f" 已获取 {sensor_name} 1分钟降采样数据,共 {len(df):,} 条记录")
  207. # 添加延迟
  208. if i < len(sensor_configs) - 1:
  209. time.sleep(2)
  210. except Exception as e:
  211. print(f" 处理 {sensor_name} 失败: {str(e)}")
  212. continue
  213. # 合并所有CIP传感器数据
  214. if not all_sensor_dfs:
  215. print("没有可处理的CIP传感器数据")
  216. return None
  217. print(f"\n开始合并 {len(all_sensor_dfs)} 个CIP传感器数据...")
  218. # 使用outer join合并所有数据
  219. merged_df = all_sensor_dfs[0]
  220. for i in range(1, len(all_sensor_dfs)):
  221. merged_df = merged_df.join(all_sensor_dfs[i], how='outer')
  222. # 重置索引并排序
  223. merged_df = merged_df.reset_index()
  224. merged_df = merged_df.rename(columns={'index': 'time'})
  225. merged_df = merged_df.sort_values('time')
  226. # 处理缺失值(前后向填充)
  227. print("进行缺失值填充...")
  228. numeric_cols = merged_df.select_dtypes(include=[np.number]).columns
  229. merged_df[numeric_cols] = merged_df[numeric_cols].ffill().bfill()
  230. # 保存最终文件
  231. cip_output_file = os.path.join(output_dir, "cip_sensors_1min_merged.csv")
  232. merged_df.to_csv(cip_output_file, index=False, encoding='utf_8_sig')
  233. print(f"已保存合并的CIP传感器数据,共 {len(merged_df):,} 条记录")
  234. return cip_output_file
  235. # ---------- 主程序 ----------
  236. if __name__ == "__main__":
  237. print("开始获取高频率CIP相关传感器数据")
  238. print(f"时间范围: {START_TIME} 到 {END_TIME}")
  239. print(f"控制字传感器数量: {len(CONTROL_WORD_SENSORS)} (原始数据)")
  240. print(f"CIP传感器数量: {len(CIP_SENSORS)} (1分钟降采样)")
  241. print("=" * 60)
  242. try:
  243. # 1. 处理控制字传感器(每个单独保存)
  244. print("\n=== 处理控制字传感器 ===")
  245. control_word_files = process_control_word_sensors(
  246. CONTROL_WORD_SENSORS,
  247. START_TIME,
  248. END_TIME,
  249. BOUNDARY,
  250. OUTPUT_DIR
  251. )
  252. # 2. 处理CIP传感器(合并保存)
  253. print("\n=== 处理CIP传感器 ===")
  254. cip_file = process_cip_sensors(
  255. CIP_SENSORS,
  256. START_TIME,
  257. END_TIME,
  258. BOUNDARY,
  259. OUTPUT_DIR
  260. )
  261. print("\n=== 处理完成 ===")
  262. print(f"控制字文件: {len(control_word_files)} 个")
  263. print(f"CIP传感器文件: 1 个")
  264. if control_word_files:
  265. print("控制字文件列表:")
  266. for file in control_word_files:
  267. print(f" - {os.path.basename(file)}")
  268. if cip_file:
  269. print(f"CIP传感器文件: {os.path.basename(cip_file)}")
  270. except KeyboardInterrupt:
  271. print("\n用户中断程序执行")
  272. except Exception as e:
  273. print(f"程序执行出错: {str(e)}")