sensor_data_extract.py

import os
import pandas as pd
import numpy as np
from datetime import datetime
from sqlalchemy import create_engine, text
import time

# ---------- Database connection settings ----------
DB_USER = "whu"              # database user
DB_PASS = "09093f4e6b33ddd"  # database password
DB_HOST = "222.130.26.206"   # database host
DB_PORT = 4000               # database port

# ---------- Time range settings ----------
START_TIME = datetime(2023, 2, 1, 0, 0, 0)  # query start time
END_TIME = datetime(2025, 9, 1, 0, 0, 0)    # query end time
BOUNDARY = datetime(2025, 3, 25, 0, 0, 0)   # database switchover boundary

# ---------- Output directory ----------
OUTPUT_DIR = "high_freq_cip"
os.makedirs(OUTPUT_DIR, exist_ok=True)

# ---------- Key sensors (used for CIP interval segmentation) ----------
CONTROL_WORD_SENSORS = [
    # Discrete control states - raw 1-second data
    {"name": "C.M.RO1_DB@control_word", "agg": "raw"},  # RO1 control word
    {"name": "C.M.RO2_DB@control_word", "agg": "raw"},  # RO2 control word
    {"name": "C.M.RO3_DB@control_word", "agg": "raw"},  # RO3 control word
    {"name": "C.M.RO4_DB@control_word", "agg": "raw"},  # RO4 control word
]
CIP_SENSORS = [
    # CIP-related sensors - 1-minute downsampled data
    {"name": "CIP_QXB_runFlow", "agg": "1min"},    # CIP cleaning pump flow
    {"name": "C.M.RO1_PT_JS@out", "agg": "1min"},  # RO1 stage-1 feed pressure
    {"name": "C.M.RO2_PT_JS@out", "agg": "1min"},  # RO2 stage-1 feed pressure
    {"name": "C.M.RO3_PT_JS@out", "agg": "1min"},  # RO3 stage-1 feed pressure
    {"name": "C.M.RO4_PT_JS@out", "agg": "1min"},  # RO4 stage-1 feed pressure
]

# ---------- Raw data query (1-second interval) ----------
def fetch_raw_sensor_data(name, start, end, db):
    """
    Query a sensor's raw data at the native 1-second interval.
    """
    engine = create_engine(
        f"mysql+pymysql://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/{db}?charset=utf8mb4",
        pool_pre_ping=True,
        pool_recycle=3600,
        connect_args={'connect_timeout': 300, 'read_timeout': 600, 'write_timeout': 600}
    )
    sql = text("""
        SELECT h_time AS time, val
        FROM dc_item_history_data_92
        WHERE item_name = :item_name
          AND h_time BETWEEN :st AND :et
          AND val IS NOT NULL
        ORDER BY h_time
    """)
    try:
        print(f"  Querying raw data for {name}...")
        df = pd.read_sql(sql, engine, params={"item_name": name, "st": start, "et": end})
        print(f"  Fetched {len(df)} raw records")
        return df
    except Exception as e:
        print(f"  Query failed: {str(e)}")
        return pd.DataFrame()
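
# Each query helper in this file builds its own SQLAlchemy engine with identical
# settings. A minimal sketch of a shared factory (hypothetical helper, shown only
# to illustrate the connection setup; the existing functions are left unchanged):
def make_engine(db):
    """Build a pooled MySQL engine for the given schema, mirroring the settings above."""
    return create_engine(
        f"mysql+pymysql://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/{db}?charset=utf8mb4",
        pool_pre_ping=True,
        pool_recycle=3600,
        connect_args={'connect_timeout': 300, 'read_timeout': 600, 'write_timeout': 600},
    )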

# ---------- Downsampled data query (1-minute interval) ----------
def fetch_resampled_sensor_data(name, start, end, db, interval='1MINUTE'):
    """
    Downsample to 1-minute averages directly in the database query.
    """
    engine = create_engine(
        f"mysql+pymysql://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/{db}?charset=utf8mb4",
        pool_pre_ping=True,
        pool_recycle=3600,
        connect_args={'connect_timeout': 300, 'read_timeout': 600, 'write_timeout': 600}
    )
    # Downsample with MySQL date functions: truncate h_time to the minute and average
    sql = text("""
        SELECT
            DATE_FORMAT(h_time, '%Y-%m-%d %H:%i:00') AS time,
            AVG(val) AS val
        FROM dc_item_history_data_92
        WHERE item_name = :item_name
          AND h_time BETWEEN :st AND :et
          AND val IS NOT NULL
        GROUP BY DATE_FORMAT(h_time, '%Y-%m-%d %H:%i:00')
        ORDER BY time
    """)
    try:
        print(f"  Querying 1-minute downsampled data for {name}...")
        df = pd.read_sql(sql, engine, params={"item_name": name, "st": start, "et": end})
        print(f"  Fetched {len(df)} downsampled records")
        return df
    except Exception as e:
        print(f"  Downsampled query failed: {str(e)}")
        return pd.DataFrame()
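
# The SQL above buckets values into 1-minute averages via DATE_FORMAT/GROUP BY.
# A pandas-side equivalent for cross-checking a raw pull (minimal sketch; assumes
# a DataFrame with 'time' and 'val' columns as returned by fetch_raw_sensor_data,
# and is not called anywhere in this script):
def resample_1min_in_pandas(raw_df):
    """Downsample raw 1-second data to 1-minute means, mirroring the SQL AVG/GROUP BY."""
    df = raw_df.copy()
    df['time'] = pd.to_datetime(df['time'])
    return (df.set_index('time')['val']
              .resample('1min').mean()
              .dropna()
              .reset_index())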

# ---------- Chunked query (one calendar year at a time) ----------
def fetch_sensor_data_chunked(name, start, end, db, agg_type="raw"):
    """
    Query sensor data in yearly chunks and concatenate the results.
    """
    all_chunks = []
    current_start = start
    while current_start < end:
        # End of the current calendar year (Dec 31, 23:59:59)
        year_end = datetime(current_start.year, 12, 31, 23, 59, 59)
        current_end = min(year_end, end)
        try:
            if agg_type == "raw":
                chunk_df = fetch_raw_sensor_data(name, current_start, current_end, db)
            else:  # 1-minute downsampling
                chunk_df = fetch_resampled_sensor_data(name, current_start, current_end, db)
            if not chunk_df.empty:
                all_chunks.append(chunk_df)
                print(f"  Fetched {current_start.year} data: {current_start} to {current_end}")
            # Short pause to ease the load on the server
            time.sleep(0.5)
        except Exception as e:
            print(f"  Query for {current_start.year} failed: {str(e)}")
            # Continue with the next year
        # Move to Jan 1, 00:00:00 of the next year
        current_start = datetime(current_start.year + 1, 1, 1, 0, 0, 0)
    if all_chunks:
        return pd.concat(all_chunks, ignore_index=True)
    return pd.DataFrame()
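
# Example call (for reference only, not executed here): pulling one control
# word's raw 2024 data from the pre-switchover database would look like
#   df = fetch_sensor_data_chunked("C.M.RO1_DB@control_word",
#                                  datetime(2024, 1, 1), datetime(2025, 1, 1),
#                                  "ws_data_test", "raw")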

# ---------- Process control-word sensors (saved individually) ----------
def process_control_word_sensors(sensor_configs, start_time, end_time, boundary, output_dir):
    """
    Process the control-word sensors, saving each one to its own CSV file.
    """
    saved_files = []
    for i, sensor_config in enumerate(sensor_configs):
        sensor_name = sensor_config["name"]
        print(f"\n[{i + 1}/{len(sensor_configs)}] Processing control-word sensor: {sensor_name}")
        try:
            # Choose the database based on the time range
            if end_time <= boundary:
                df = fetch_sensor_data_chunked(sensor_name, start_time, end_time, "ws_data_test", "raw")
            elif start_time >= boundary:
                df = fetch_sensor_data_chunked(sensor_name, start_time, end_time, "ws_data", "raw")
            else:
                df1 = fetch_sensor_data_chunked(sensor_name, start_time, boundary, "ws_data_test", "raw")
                df2 = fetch_sensor_data_chunked(sensor_name, boundary, end_time, "ws_data", "raw")
                df = pd.concat([df1, df2], ignore_index=True)
            if df.empty:
                print(f"  Warning: no data returned for {sensor_name}")
                continue
            # Rename the value column to the sensor name
            df = df.rename(columns={'val': sensor_name})
            # Save this control-word sensor's raw data
            control_word_file = os.path.join(output_dir, f"{sensor_name.replace('@', '_').replace('.', '_')}_raw.csv")
            df.to_csv(control_word_file, index=False, encoding='utf_8_sig')
            saved_files.append(control_word_file)
            print(f"  Saved raw data for {sensor_name}: {len(df):,} records")
            # Pause between sensors
            if i < len(sensor_configs) - 1:
                time.sleep(2)
        except Exception as e:
            print(f"  Failed to process {sensor_name}: {str(e)}")
            continue
    return saved_files
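
# The database split around BOUNDARY above is repeated in process_cip_sensors
# below. A minimal sketch of factoring it out (hypothetical helper, not wired
# into either function):
def fetch_across_boundary(name, start, end, boundary, agg_type):
    """Route the query to ws_data_test before the boundary and ws_data after it."""
    if end <= boundary:
        return fetch_sensor_data_chunked(name, start, end, "ws_data_test", agg_type)
    if start >= boundary:
        return fetch_sensor_data_chunked(name, start, end, "ws_data", agg_type)
    df_old = fetch_sensor_data_chunked(name, start, boundary, "ws_data_test", agg_type)
    df_new = fetch_sensor_data_chunked(name, boundary, end, "ws_data", agg_type)
    return pd.concat([df_old, df_new], ignore_index=True)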

# ---------- Process CIP sensors (merged into a single file) ----------
def process_cip_sensors(sensor_configs, start_time, end_time, boundary, output_dir):
    """
    Process the CIP sensors: downsample to 1 minute in the database, then merge
    everything into a single CSV file.
    """
    all_sensor_dfs = []
    # Query sensor by sensor (already downsampled in the database)
    for i, sensor_config in enumerate(sensor_configs):
        sensor_name = sensor_config["name"]
        print(f"\n[{i + 1}/{len(sensor_configs)}] Processing CIP sensor: {sensor_name}")
        try:
            # Choose the database based on the time range
            if end_time <= boundary:
                df = fetch_sensor_data_chunked(sensor_name, start_time, end_time, "ws_data_test", "1min")
            elif start_time >= boundary:
                df = fetch_sensor_data_chunked(sensor_name, start_time, end_time, "ws_data", "1min")
            else:
                df1 = fetch_sensor_data_chunked(sensor_name, start_time, boundary, "ws_data_test", "1min")
                df2 = fetch_sensor_data_chunked(sensor_name, boundary, end_time, "ws_data", "1min")
                df = pd.concat([df1, df2], ignore_index=True)
            if df.empty:
                print(f"  Warning: no data returned for {sensor_name}")
                continue
            # Rename the value column and normalize the time format
            df = df.rename(columns={'val': sensor_name})
            df['time'] = pd.to_datetime(df['time'])
            # Index by time so the sensors can be joined below
            df = df.set_index('time')
            all_sensor_dfs.append(df)
            print(f"  Fetched 1-minute data for {sensor_name}: {len(df):,} records")
            # Pause between sensors
            if i < len(sensor_configs) - 1:
                time.sleep(2)
        except Exception as e:
            print(f"  Failed to process {sensor_name}: {str(e)}")
            continue
    # Merge all CIP sensor data
    if not all_sensor_dfs:
        print("No CIP sensor data to process")
        return None
    print(f"\nMerging {len(all_sensor_dfs)} CIP sensor datasets...")
    # Outer-join all sensors on the time index
    merged_df = all_sensor_dfs[0]
    for i in range(1, len(all_sensor_dfs)):
        merged_df = merged_df.join(all_sensor_dfs[i], how='outer')
    # Restore the time column and sort
    merged_df = merged_df.reset_index().sort_values('time')
    # Fill missing values (forward, then backward)
    print("Filling missing values...")
    numeric_cols = merged_df.select_dtypes(include=[np.number]).columns
    merged_df[numeric_cols] = merged_df[numeric_cols].ffill().bfill()
    # Save the merged file
    cip_output_file = os.path.join(output_dir, "cip_sensors_1min_merged.csv")
    merged_df.to_csv(cip_output_file, index=False, encoding='utf_8_sig')
    print(f"Saved merged CIP sensor data: {len(merged_df):,} records")
    return cip_output_file
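
# Minimal sketch of loading the merged output back for downstream work such as
# the CIP interval segmentation mentioned in the sensor configuration above
# (illustrative only; not called by this script):
def load_merged_cip_data(output_dir=OUTPUT_DIR):
    """Read cip_sensors_1min_merged.csv with the time column parsed as datetimes."""
    path = os.path.join(output_dir, "cip_sensors_1min_merged.csv")
    return pd.read_csv(path, parse_dates=['time'])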

# ---------- Main program ----------
if __name__ == "__main__":
    print("Starting extraction of high-frequency CIP-related sensor data")
    print(f"Time range: {START_TIME} to {END_TIME}")
    print(f"Control-word sensors: {len(CONTROL_WORD_SENSORS)} (raw data)")
    print(f"CIP sensors: {len(CIP_SENSORS)} (1-minute downsampled)")
    print("=" * 60)
    try:
        # 1. Process control-word sensors (each saved individually)
        print("\n=== Processing control-word sensors ===")
        control_word_files = process_control_word_sensors(
            CONTROL_WORD_SENSORS,
            START_TIME,
            END_TIME,
            BOUNDARY,
            OUTPUT_DIR
        )
        # 2. Process CIP sensors (merged into one file)
        print("\n=== Processing CIP sensors ===")
        cip_file = process_cip_sensors(
            CIP_SENSORS,
            START_TIME,
            END_TIME,
            BOUNDARY,
            OUTPUT_DIR
        )
        print("\n=== Done ===")
        print(f"Control-word files: {len(control_word_files)}")
        print(f"CIP sensor files: {1 if cip_file else 0}")
        if control_word_files:
            print("Control-word files:")
            for file in control_word_files:
                print(f"  - {os.path.basename(file)}")
        if cip_file:
            print(f"CIP sensor file: {os.path.basename(cip_file)}")
    except KeyboardInterrupt:
        print("\nInterrupted by user")
    except Exception as e:
        print(f"Program error: {str(e)}")
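
# Usage note (assumed typical invocation): running `python sensor_data_extract.py`
# writes one <sensor>_raw.csv per control-word sensor plus
# cip_sensors_1min_merged.csv into the high_freq_cip/ output directory.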