# data_export.py
  1. import os, pandas as pd
  2. from datetime import datetime
  3. from sqlalchemy import create_engine, text
  4. from functools import reduce
  5. # ---------- 配置 ----------
  6. DB_USER = "whu"
  7. DB_PASS = "09093f4e6b33ddd"
  8. DB_HOST = "222.130.26.206"
  9. DB_PORT = 4000
  10. # 时间配置
  11. START_TIME = datetime(2026, 1, 22, 0, 0, 0)
  12. END_TIME = datetime(2026, 3, 15, 6, 0, 0)
  13. BOUNDARY = datetime(2025, 3, 25, 0, 0, 0)
  14. DELETE_PERIODS = [
  15. ("2024-04-24 13:42:00", "2024-04-24 18:26:00"),
  16. ("2024-11-09 12:34:00", "2024-11-11 10:46:00"),
  17. ("2024-12-12 08:52:00", "2024-12-12 17:22:00"),
  18. ("2024-12-15 16:00:00", "2024-12-16 09:34:00"),
  19. ("2025-01-28 10:58:00", "2025-02-05 11:24:00"),
  20. ] # 来自张昊师兄的,需要被去除的黑名单区间
  21. DELETE_PERIODS = [(pd.to_datetime(s), pd.to_datetime(e)) for s, e in DELETE_PERIODS]
  22. # ---------- 传感器配置----------
  23. # 机组编号
  24. UNITS = [1, 2]
  25. BASE_VARIABLES = [
  26. "ns=3;s={}#UF_JSFLOW_O", # 进水流量
  27. "ns=3;s={}#UF_JSPRESS_O", # 进水压力
  28. "ns=3;s=UF{}_SSD_KMYC", # 跨膜压差
  29. "ns=3;s=UF{}_STEP", # 步序/控制字
  30. "ns=3;s=ZZ_{}#UFBWB_POWER" # 反洗泵功率
  31. ]
  32. SYSTEM_VARIABLES = [
  33. "ns=3;s=ZJS_TEMP_O", # 进水温度
  34. "ns=3;s=RO_JSORP_O", # 总产水ORP
  35. "ns=3;s=RO_JSPH_O", # 总产水PH
  36. "ns=3;s=RO_JSDD_O", # 总产水电导
  37. "ns=3;s=CN_LEVEL_O", # 次钠液位
  38. "ns=3;s=S_LEVEL_O", # 酸液位
  39. "ns=3;s=J_LEVEL_O", # 碱液位
  40. "ns=3;s=ZZ_UFGSB_POWER", # 超滤供水泵功率
  41. ]
  42. # 输出目录
  43. BASE_OUTPUT_DIR = "../datasets/UF_yancheng_data"
  44. PROCESSED_OUTPUT_DIR = os.path.join(BASE_OUTPUT_DIR, "raw")
  45. # 创建目录
  46. os.makedirs(PROCESSED_OUTPUT_DIR, exist_ok=True)
  47. # 生成所有变量名称
  48. SENSOR_NAMES = []
  49. for unit in UNITS:
  50. for var_template in BASE_VARIABLES:
  51. SENSOR_NAMES.append(var_template.format(unit))
  52. SENSOR_NAMES.extend(SYSTEM_VARIABLES)
  53. print(f"总共查询 {len(SENSOR_NAMES)} 个变量")
  54. for i, var in enumerate(SENSOR_NAMES, 1):
  55. print(f"{i:2d}. {var}")
  56. # ---------- 创建数据库引擎 ----------
  57. def create_db_engines():
  58. """
  59. 创建数据库引擎,每个数据库只创建一次
  60. """
  61. try:
  62. # 为两个数据库分别创建引擎
  63. engine_test = create_engine(
  64. f"mysql+pymysql://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/ws_data_test?charset=utf8mb4"
  65. )
  66. engine_prod = create_engine(
  67. f"mysql+pymysql://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/ws_data?charset=utf8mb4"
  68. )
  69. # 测试连接
  70. with engine_test.connect() as conn:
  71. print("✅ 测试数据库连接成功!")
  72. with engine_prod.connect() as conn:
  73. print("✅ 生产数据库连接成功!")
  74. return engine_test, engine_prod
  75. except Exception as e:
  76. print(f"❌ 数据库连接失败: {e}")
  77. return None, None
  78. # ---------- 一分钟聚合查询函数(子查询方式)----------
  79. def fetch_valve_aggregated(name, start, end, engine, interval_minutes=1):
  80. """
  81. 从数据库获取传感器数据并按分钟聚合,时间戳对齐到分钟开始(00秒)
  82. """
  83. interval_seconds = interval_minutes * 60
  84. sql = text(f"""
  85. SELECT
  86. FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(MIN(h_time)) / {interval_seconds}) * {interval_seconds}) AS time,
  87. AVG(val) AS val
  88. FROM (
  89. SELECT
  90. h_time,
  91. val,
  92. FLOOR(UNIX_TIMESTAMP(h_time) / {interval_seconds}) AS time_group
  93. FROM dc_item_history_data_1497
  94. WHERE item_name = :name
  95. AND h_time BETWEEN :st AND :et
  96. AND val IS NOT NULL
  97. ) t
  98. GROUP BY t.time_group
  99. ORDER BY time
  100. """)
  101. try:
  102. df = pd.read_sql(sql, engine, params={"name": name, "st": start, "et": end})
  103. if not df.empty:
  104. df['time'] = pd.to_datetime(df['time'])
  105. # 确保时间戳是整分钟(去除秒和微秒)
  106. df['time'] = df['time'].dt.floor('1min')
  107. print(f" ✓ {name}: {len(df)} 条记录")
  108. return df
  109. except Exception as e:
  110. print(f" ✗ {name} 查询失败: {str(e)}")
  111. return pd.DataFrame()
  112. def fetch_special_data(sensor, start, end, boundary, engine_test, engine_prod):
  113. """
  114. 专门获取步序数据,保持原始变化点
  115. 返回原始时间戳和值
  116. """
  117. sql = text("""
  118. SELECT h_time AS time, val
  119. FROM dc_item_history_data_1497
  120. WHERE item_name = :name
  121. AND h_time BETWEEN :st
  122. AND :et
  123. AND val IS NOT NULL
  124. ORDER BY h_time
  125. """)
  126. try:
  127. # 根据边界时间选择合适的数据库引擎
  128. if end <= boundary:
  129. df = pd.read_sql(sql, engine_test, params={"name": sensor, "st": start, "et": end})
  130. elif start >= boundary:
  131. df = pd.read_sql(sql, engine_prod, params={"name": sensor, "st": start, "et": end})
  132. else:
  133. df1 = pd.read_sql(sql, engine_test, params={"name": sensor, "st": start, "et": boundary})
  134. df2 = pd.read_sql(sql, engine_prod, params={"name": sensor, "st": boundary, "et": end})
  135. df = pd.concat([df1, df2], ignore_index=True)
  136. if not df.empty:
  137. df['time'] = pd.to_datetime(df['time'])
  138. return df
  139. except Exception as e:
  140. print(f" ✗ {sensor} 查询失败: {str(e)}")
  141. return pd.DataFrame()
  142. # ---------- 传感器数据查询函数(只获取聚合数据)----------
  143. # ---------- 传感器数据查询函数(只获取聚合数据)----------
  144. def fetch_sensor_data(sensor_names, start_time, end_time, boundary, engine_test, engine_prod):
  145. """
  146. 获取多个传感器的分钟级聚合数据并合并为宽表
  147. 步序变量单独处理
  148. """
  149. # 识别步序变量和功率变量
  150. step_vars = [v for v in sensor_names if 'STEP' in v]
  151. power_vars = [v for v in sensor_names if 'POWER' in v]
  152. # 需要特殊处理的变量
  153. special_vars = step_vars + power_vars
  154. # 其他连续变量
  155. continuous_vars = [v for v in sensor_names if v not in special_vars]
  156. print(f"\n识别到 {len(special_vars)} 个离散变量, {len(continuous_vars)} 个连续变量")
  157. # 3. 创建完整的时间网格(整分钟)- 先创建
  158. print(f"\n创建完整时间网格...")
  159. time_grid = pd.date_range(
  160. start=start_time.replace(second=0, microsecond=0),
  161. end=end_time.replace(second=0, microsecond=0),
  162. freq='1min'
  163. )
  164. # 创建以时间为索引的DataFrame
  165. merged_df = pd.DataFrame(index=time_grid)
  166. print(f"时间网格: {len(time_grid)} 个时间点")
  167. # 1. 处理连续变量(按分钟聚合,时间对齐到整分钟)
  168. if continuous_vars:
  169. print("\n获取连续变量数据(分钟平均):")
  170. for sensor in continuous_vars:
  171. try:
  172. # 根据边界时间选择合适的数据库引擎
  173. if end_time <= boundary:
  174. df = fetch_valve_aggregated(sensor, start_time, end_time, engine_test)
  175. elif start_time >= boundary:
  176. df = fetch_valve_aggregated(sensor, start_time, end_time, engine_prod)
  177. else:
  178. df1 = fetch_valve_aggregated(sensor, start_time, boundary, engine_test)
  179. df2 = fetch_valve_aggregated(sensor, boundary, end_time, engine_prod)
  180. df = pd.concat([df1, df2], ignore_index=True)
  181. if not df.empty:
  182. # 确保时间戳是整分钟并设为索引
  183. df['time'] = pd.to_datetime(df['time']).dt.floor('1min')
  184. df = df.drop_duplicates(subset=['time'], keep='first')
  185. df = df.set_index('time')
  186. # 添加到合并DataFrame
  187. merged_df[sensor] = df['val']
  188. print(f" ✓ {sensor}: {len(df)} 条记录")
  189. else:
  190. print(f" ⚠ {sensor}: 无数据")
  191. except Exception as e:
  192. print(f" ⚠ {sensor}: 处理失败 - {str(e)}")
  193. continue
  194. # 2. 处理离散变量(保持原始变化点)
  195. if special_vars:
  196. print("\n获取离散变量数据(原始变化点):")
  197. for sensor in special_vars:
  198. try:
  199. # 获取原始数据(不聚合)
  200. df = fetch_special_data(sensor, start_time, end_time, boundary, engine_test, engine_prod)
  201. if not df.empty:
  202. # 创建分钟级的重采样
  203. df['time'] = pd.to_datetime(df['time'])
  204. df = df.set_index('time')
  205. # 重采样到分钟,对于离散变量使用前向填充
  206. df_resampled = df.resample('1min').ffill()
  207. # 添加到合并DataFrame
  208. merged_df[sensor] = df_resampled['val']
  209. print(f" ✓ {sensor}: {len(df)} 个原始点 -> {len(df_resampled)} 个分钟点")
  210. else:
  211. print(f" ⚠ {sensor}: 无数据")
  212. except Exception as e:
  213. print(f" ⚠ {sensor}: 处理失败 - {str(e)}")
  214. continue
  215. if merged_df.empty or len(merged_df.columns) == 0:
  216. print("\n❌ 未获取到任何传感器数据")
  217. return pd.DataFrame()
  218. # 重置索引,将时间变为列
  219. merged_df = merged_df.reset_index()
  220. merged_df = merged_df.rename(columns={'index': 'time'})
  221. print(f"\n合并完成,共 {len(merged_df)} 条时间记录 × {len(merged_df.columns) - 1} 个传感器")
  222. print(f"数据框形状: {merged_df.shape}")
  223. # 5. 删除黑名单时段
  224. print("\n删除黑名单时段...")
  225. original_len = len(merged_df)
  226. for s, e in DELETE_PERIODS:
  227. s = pd.to_datetime(s).floor('1min')
  228. e = pd.to_datetime(e).floor('1min')
  229. merged_df = merged_df[(merged_df['time'] < s) | (merged_df['time'] > e)]
  230. deleted_count = original_len - len(merged_df)
  231. if deleted_count > 0:
  232. print(f"已删除 {deleted_count} 条黑名单时段数据")
  233. return merged_df
  234. # ---------- 数据后处理函数(填充空值)----------
  235. def post_process_data(df, continuous_vars, step_vars):
  236. """
  237. 对聚合后的数据进行后处理
  238. 连续变量:线性插值或前后填充
  239. 步序变量:已经前向填充,只需处理开头可能存在的空值
  240. """
  241. if df.empty:
  242. print("警告:输入数据为空")
  243. return df
  244. df_processed = df.copy()
  245. df_processed['time'] = pd.to_datetime(df_processed['time'])
  246. df_processed = df_processed.set_index('time')
  247. print(f"\n开始填充空值...")
  248. print(f"数据时间范围: {df_processed.index.min()} 到 {df_processed.index.max()}")
  249. print(f"数据行数: {len(df_processed)}")
  250. missing_before = df_processed.isnull().sum().sum()
  251. print(f"填充前空值数量: {missing_before}")
  252. # 处理连续变量(使用线性插值更适合连续物理量)
  253. for var in continuous_vars:
  254. if var in df_processed.columns:
  255. # 先线性插值,再前后填充边界
  256. df_processed[var] = df_processed[var].interpolate(method='linear', limit_direction='both')
  257. # 处理步序变量(可能开头有NaN,用后向填充)
  258. for var in step_vars:
  259. if var in df_processed.columns:
  260. df_processed[var] = df_processed[var].bfill() # 开头空值用第一个有效值填充
  261. missing_after = df_processed.isnull().sum().sum()
  262. print(f"填充后空值数量: {missing_after}")
  263. print(f"填充了 {missing_before - missing_after} 个空值")
  264. return df_processed.reset_index()
  265. # ---------- 数据分块保存函数 ----------
  266. def save_data_chunks(df, output_dir, prefix="sensor_data", chunk_size=200000):
  267. """
  268. 将数据集分块保存,命名格式为 {prefix}_partX_of_Y.csv
  269. """
  270. total_rows = len(df)
  271. num_chunks = (total_rows + chunk_size - 1) // chunk_size # 向上取整
  272. print(f"\n开始分块保存数据(共 {num_chunks} 个文件)...")
  273. for i in range(num_chunks):
  274. start_idx = i * chunk_size
  275. end_idx = min((i + 1) * chunk_size, total_rows)
  276. chunk_df = df.iloc[start_idx:end_idx]
  277. if not chunk_df.empty:
  278. file_path = os.path.join(
  279. output_dir,
  280. f"{prefix}_part{i + 1}_of_{num_chunks}.csv"
  281. )
  282. chunk_df.to_csv(file_path, index=False, encoding='utf_8_sig')
  283. print(f" ✓ 已保存第 {i + 1}/{num_chunks} 个文件: {os.path.basename(file_path)} ({len(chunk_df)} 行)")
  284. return num_chunks
  285. # ---------- 主程序 ----------
  286. if __name__ == "__main__":
  287. print("=" * 70)
  288. print("传感器数据采集程序 - 分钟级聚合版本")
  289. print("=" * 70)
  290. # 1. 创建数据库引擎
  291. print("\n[1/4] 创建数据库连接...")
  292. engine_test, engine_prod = create_db_engines()
  293. if engine_test is None or engine_prod is None:
  294. print("❌ 数据库连接失败,程序退出")
  295. exit(1)
  296. # 2. 获取一分钟聚合数据
  297. print(f"\n[2/4] 获取一分钟聚合数据...")
  298. print(f"时间范围: {START_TIME} 到 {END_TIME}")
  299. agg_df = fetch_sensor_data(
  300. SENSOR_NAMES,
  301. START_TIME,
  302. END_TIME,
  303. BOUNDARY,
  304. engine_test,
  305. engine_prod
  306. )
  307. if agg_df.empty:
  308. print("\n❌ 未获取到任何聚合数据,程序退出")
  309. exit(1)
  310. print(f"\n✅ 聚合数据获取完成!")
  311. print(f"总数据量: {len(agg_df)} 条记录")
  312. print(f"时间范围: {agg_df['time'].min()} 到 {agg_df['time'].max()}")
  313. print(f"时间间隔: 1分钟(整点)")
  314. print(f"传感器数量: {len(agg_df.columns) - 1} 个")
  315. # 识别步序变量
  316. step_vars = [col for col in agg_df.columns if 'STEP' in col]
  317. continuous_vars = [col for col in agg_df.columns if col != 'time' and col not in step_vars]
  318. # 3. 后处理数据
  319. print(f"\n[3/4] 后处理聚合数据...")
  320. processed_df = post_process_data(agg_df, continuous_vars, step_vars)
  321. print(f"\n✅ 后处理完成!")
  322. print(f"处理后数据行数: {len(processed_df)}")
  323. # 4. 保存数据(分块保存)
  324. print(f"\n[4/4] 保存数据...")
  325. # 直接使用分块保存,不再保存单个大文件
  326. chunk_size = 200000 # 每块20万行
  327. num_chunks = save_data_chunks(
  328. processed_df,
  329. PROCESSED_OUTPUT_DIR,
  330. "uf_all_units_processed_1min",
  331. chunk_size
  332. )
  333. print("\n" + "=" * 70)
  334. print("✅ 所有任务完成!")
  335. print("=" * 70)
  336. # 显示文件信息
  337. print(f"\n文件信息:")
  338. print(f"输出目录: {PROCESSED_OUTPUT_DIR}")
  339. print(f"文件数量: {num_chunks} 个")
  340. # 计算总大小
  341. total_size = 0
  342. for i in range(num_chunks):
  343. file_path = os.path.join(
  344. PROCESSED_OUTPUT_DIR,
  345. f"uf_all_units_processed_1min_part{i + 1}_of_{num_chunks}.csv"
  346. )
  347. if os.path.exists(file_path):
  348. file_size = os.path.getsize(file_path) / (1024 * 1024)
  349. total_size += file_size
  350. print(f" {os.path.basename(file_path)}: {file_size:.2f} MB")
  351. print(f"总文件大小: {total_size:.2f} MB")