data_export.py

import os
import sys
from datetime import datetime
from functools import reduce

import pandas as pd
from sqlalchemy import create_engine, text

# ---------- Configuration ----------
DB_USER = "whu"
DB_PASS = "09093f4e6b33ddd"
DB_HOST = "222.130.26.206"
DB_PORT = 4000

# Time window
START_TIME = datetime(2025, 11, 28, 0, 0, 0)
END_TIME = datetime(2026, 2, 20, 0, 0, 0)
BOUNDARY = datetime(2025, 3, 25, 0, 0, 0)

# Blacklisted intervals (provided by Zhang Hao) that must be removed
DELETE_PERIODS = [
    ("2024-04-24 13:42:00", "2024-04-24 18:26:00"),
    ("2024-11-09 12:34:00", "2024-11-11 10:46:00"),
    ("2024-12-12 08:52:00", "2024-12-12 17:22:00"),
    ("2024-12-15 16:00:00", "2024-12-16 09:34:00"),
    ("2025-01-28 10:58:00", "2025-02-05 11:24:00"),
]
DELETE_PERIODS = [(pd.to_datetime(s), pd.to_datetime(e)) for s, e in DELETE_PERIODS]

# New water island
# ---------- Sensor configuration (new plant system) ----------
# Unit numbers
UNITS = [1, 2]

# Per-unit variable templates in the new system
BASE_VARIABLES = [
    "ns=3;s={}#UF_JSFLOW_O",   # feed flow
    "ns=3;s={}#UF_JSPRESS_O",  # feed pressure (if available)
    "ns=3;s=UF{}_SSD_KMYC",    # transmembrane pressure (if available)
    "ns=3;s=UF{}_STEP",        # step sequence
]

# Plant-wide variables (if present)
SYSTEM_VARIABLES = [
    "ns=3;s=ZJS_TEMP_O",  # feed water temperature
    "ns=3;s=RO_JSORP_O",  # total product water ORP (example)
    "ns=3;s=RO_JSPH_O",   # total product water pH (example)
]

# Build the full list of variable names
SENSOR_NAMES = []
for unit in UNITS:
    for var_template in BASE_VARIABLES:
        SENSOR_NAMES.append(var_template.format(unit))
SENSOR_NAMES.extend(SYSTEM_VARIABLES)

print(f"Querying {len(SENSOR_NAMES)} variables in total")
for i, var in enumerate(SENSOR_NAMES, 1):
    print(f"{i:2d}. {var}")

# Output directory -- only the processed folder is needed
BASE_OUTPUT_DIR = "../datasets/UF_longting_data"
PROCESSED_OUTPUT_DIR = os.path.join(BASE_OUTPUT_DIR, "raw")
os.makedirs(PROCESSED_OUTPUT_DIR, exist_ok=True)

# ---------- Database engine creation ----------
def create_db_engines():
    """
    Create the database engines; each database gets exactly one engine.
    """
    try:
        # One engine per database
        engine_test = create_engine(
            f"mysql+pymysql://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/ws_data_test?charset=utf8mb4"
        )
        engine_prod = create_engine(
            f"mysql+pymysql://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/ws_data?charset=utf8mb4"
        )
        # Smoke-test both connections before returning them
        with engine_test.connect():
            print("✅ Test database connection OK!")
        with engine_prod.connect():
            print("✅ Production database connection OK!")
        return engine_test, engine_prod
    except Exception as e:
        print(f"❌ Database connection failed: {e}")
        return None, None

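# Opening a connection only proves the TCP/auth handshake succeeds; a stricter
# check would run a trivial query. A minimal sketch (hypothetical helper, not
# used by the pipeline) assuming the same SQLAlchemy engines:
def _demo_ping(engine) -> bool:
    """Sketch: round-trip check via SELECT 1; returns True on success."""
    try:
        with engine.connect() as conn:
            conn.execute(text("SELECT 1"))  # forces a real query round trip
        return True
    except Exception:
        return False
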
# ---------- One-minute aggregation query (subquery approach) ----------
def fetch_valve_aggregated(name, start, end, engine, interval_minutes=1):
    """
    Fetch sensor data from the database, aggregated in SQL on the fly.
    The inner subquery assigns each row a bucket id so the outer GROUP BY
    stays compatible with MySQL's ONLY_FULL_GROUP_BY mode.
    """
    interval_seconds = interval_minutes * 60
    sql = text(f"""
        SELECT
            MIN(h_time) AS time,
            AVG(val) AS val
        FROM (
            SELECT
                h_time,
                val,
                FLOOR(UNIX_TIMESTAMP(h_time) / {interval_seconds}) AS time_group
            FROM dc_item_history_data_1450
            WHERE item_name = :name
              AND h_time BETWEEN :st AND :et
              AND val IS NOT NULL
        ) t
        GROUP BY t.time_group
        ORDER BY time
    """)
    try:
        df = pd.read_sql(sql, engine, params={"name": name, "st": start, "et": end})
        if not df.empty:
            df['time'] = pd.to_datetime(df['time'])
            print(f"  ✓ {name}: {len(df)} rows")
        return df
    except Exception as e:
        print(f"  ✗ {name} query failed: {e}")
        return pd.DataFrame()

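# The FLOOR(UNIX_TIMESTAMP(...) / 60) bucketing above is the SQL analogue of a
# pandas resample. A minimal client-side cross-check (sketch, never called by
# the pipeline; assumes a frame with 'time' and 'val' columns). Note one
# difference: the SQL reports MIN(h_time) per bucket, while resample labels
# each bucket with its start, so timestamps may differ within a minute.
def _demo_minute_mean(raw_df: pd.DataFrame) -> pd.DataFrame:
    """Sketch: 1-minute mean of raw samples, comparable to the SQL output."""
    out = (
        raw_df.set_index('time')['val']
        .resample('1min').mean()   # bucket by minute, average within buckets
        .dropna()                  # drop empty minutes; SQL emits no row there
    )
    return out.reset_index()
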
# ---------- Sensor data query (aggregated data only) ----------
def fetch_sensor_data(sensor_names, start_time, end_time, boundary, engine_test, engine_prod):
    """
    Fetch minute-level aggregates for several sensors and merge them into one wide table.

    Args:
        sensor_names: list of sensor names
        start_time: start of the query window
        end_time: end of the query window
        boundary: timestamp at which the data switches databases
        engine_test: engine for the test database
        engine_prod: engine for the production database
    """
    all_data = []  # aggregated frames only
    print("\nFetching data for each sensor:")
    for sensor in sensor_names:
        try:
            # Pick the engine(s) based on where the window sits relative to the boundary
            if end_time <= boundary:
                # Entirely in the test database
                df = fetch_valve_aggregated(sensor, start_time, end_time, engine_test)
            elif start_time >= boundary:
                # Entirely in the production database
                df = fetch_valve_aggregated(sensor, start_time, end_time, engine_prod)
            else:
                # Window spans both databases
                df1 = fetch_valve_aggregated(sensor, start_time, boundary, engine_test)
                df2 = fetch_valve_aggregated(sensor, boundary, end_time, engine_prod)
                df = pd.concat([df1, df2], ignore_index=True)
            if not df.empty:
                # Rename the value column so sensors stay distinguishable after merging
                df_renamed = df.rename(columns={'val': sensor})
                all_data.append(df_renamed[['time', sensor]])
            else:
                print(f"  ⚠ {sensor}: no data")
        except Exception as e:
            print(f"  ⚠ {sensor}: processing failed - {e}")
            continue
    if not all_data:
        print("\n❌ No sensor data retrieved")
        return pd.DataFrame()
    print(f"\nMerging data from {len(all_data)} sensors...")
    # Fold the per-sensor frames into one wide table with outer joins on 'time'
    merged_df = reduce(
        lambda left, right: pd.merge(left, right, on='time', how='outer'),
        all_data
    )
    # Sort chronologically
    merged_df = merged_df.sort_values('time').reset_index(drop=True)
    print(f"Merge complete: {len(merged_df)} time rows")
    # Drop blacklisted intervals (endpoints inclusive: rows at exactly s or e are dropped)
    print("Removing blacklisted intervals...")
    original_len = len(merged_df)
    for s, e in DELETE_PERIODS:
        merged_df = merged_df[(merged_df['time'] < s) | (merged_df['time'] > e)]
    deleted_count = original_len - len(merged_df)
    if deleted_count > 0:
        print(f"Removed {deleted_count} rows inside blacklisted intervals")
    return merged_df

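# The wide table comes from folding outer joins on 'time' over the per-sensor
# frames, so a minute present for one sensor but missing for another yields
# NaN rather than dropping the row. A toy illustration (sketch with made-up
# values, not part of the pipeline):
def _demo_outer_merge() -> pd.DataFrame:
    """Sketch: two single-sensor frames merged as in fetch_sensor_data."""
    t = pd.to_datetime(["2025-12-01 00:00", "2025-12-01 00:01"])
    a = pd.DataFrame({"time": t, "sensor_a": [1.0, 2.0]})
    b = pd.DataFrame({"time": t[1:], "sensor_b": [9.0]})
    # Row 00:00 keeps sensor_a and gets NaN for sensor_b.
    return reduce(lambda l, r: pd.merge(l, r, on="time", how="outer"), [a, b])
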
# ---------- Post-processing (fill missing values) ----------
def post_process_data(df, method='both'):
    """
    Post-process the aggregated data: fill missing values.

    Args:
        df: input wide-table DataFrame
        method: fill method ('ffill', 'bfill', or 'both')

    Returns:
        pd.DataFrame: processed DataFrame
    """
    if df.empty:
        print("Warning: input data is empty")
        return df
    df_processed = df.copy()
    df_processed['time'] = pd.to_datetime(df_processed['time'])
    df_processed = df_processed.set_index('time')
    print("\nFilling missing values...")
    print(f"Time range: {df_processed.index.min()} to {df_processed.index.max()}")
    print(f"Row count: {len(df_processed)}")
    missing_before = df_processed.isnull().sum().sum()
    print(f"Missing values before fill: {missing_before}")
    if method == 'bfill':
        df_processed = df_processed.bfill()
    elif method == 'ffill':
        df_processed = df_processed.ffill()
    else:  # 'both': forward-fill first, then back-fill any leading gaps
        df_processed = df_processed.ffill().bfill()
    missing_after = df_processed.isnull().sum().sum()
    print(f"Missing values after fill: {missing_after}")
    print(f"Filled {missing_before - missing_after} values")
    # Restore 'time' as a column; the index already carries that name,
    # so no rename is needed after reset_index()
    df_processed = df_processed.reset_index()
    return df_processed

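# With method='both' the frame is forward-filled first, then back-filled:
# interior gaps take the last seen value, and only a gap at the very start
# falls back to the first later value. A toy illustration (sketch, not called):
def _demo_fill_order() -> pd.Series:
    """Sketch: [NaN, 1, NaN, 2] -> ffill -> [NaN, 1, 1, 2] -> bfill -> [1, 1, 1, 2]."""
    s = pd.Series([None, 1.0, None, 2.0])
    return s.ffill().bfill()
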
# ---------- Chunked save ----------
def save_data_chunks(df, output_dir, prefix="sensor_data", chunk_size=200000):
    """
    Save the dataset in chunks named {prefix}_partX_of_Y.csv.
    """
    total_rows = len(df)
    num_chunks = (total_rows + chunk_size - 1) // chunk_size  # ceiling division
    print(f"\nSaving data in {num_chunks} chunk(s)...")
    for i in range(num_chunks):
        start_idx = i * chunk_size
        end_idx = min((i + 1) * chunk_size, total_rows)
        chunk_df = df.iloc[start_idx:end_idx]
        if not chunk_df.empty:
            file_path = os.path.join(
                output_dir,
                f"{prefix}_part{i + 1}_of_{num_chunks}.csv"
            )
            # utf_8_sig adds a BOM so Excel opens the CSV with correct encoding
            chunk_df.to_csv(file_path, index=False, encoding='utf_8_sig')
            print(f"  ✓ Saved chunk {i + 1}/{num_chunks}: {os.path.basename(file_path)} ({len(chunk_df)} rows)")
    return num_chunks

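# The chunk count uses ceiling division, (total_rows + chunk_size - 1) //
# chunk_size: e.g. 450_000 rows at 200_000 per chunk gives 3 files of
# 200k / 200k / 50k rows. A quick worked check (sketch, not called):
def _demo_chunk_bounds(total_rows: int = 450_000, chunk_size: int = 200_000):
    """Sketch: yields the (start, end) row slice of each chunk."""
    num_chunks = (total_rows + chunk_size - 1) // chunk_size  # ceil(total/size)
    for i in range(num_chunks):
        yield i * chunk_size, min((i + 1) * chunk_size, total_rows)
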
# ---------- Main ----------
if __name__ == "__main__":
    print("=" * 70)
    print("Sensor data collection - minute-level aggregation version")
    print("=" * 70)

    # 1. Create the database engines
    print("\n[1/4] Creating database connections...")
    engine_test, engine_prod = create_db_engines()
    if engine_test is None or engine_prod is None:
        print("❌ Database connection failed, exiting")
        sys.exit(1)

    # 2. Fetch the one-minute aggregates
    print("\n[2/4] Fetching one-minute aggregated data...")
    print(f"Time range: {START_TIME} to {END_TIME}")
    agg_df = fetch_sensor_data(
        SENSOR_NAMES,
        START_TIME,
        END_TIME,
        BOUNDARY,
        engine_test,
        engine_prod,
    )
    if agg_df.empty:
        print("\n❌ No aggregated data retrieved, exiting")
        sys.exit(1)
    print("\n✅ Aggregated data fetched!")
    print(f"Total rows: {len(agg_df)}")
    print(f"Time range: {agg_df['time'].min()} to {agg_df['time'].max()}")
    print("Time step: 1 minute")
    print(f"Sensor count: {len(agg_df.columns) - 1}")  # minus 1 for the time column

    # 3. Post-process the aggregates (fill missing values)
    print("\n[3/4] Post-processing aggregated data (filling missing values)...")
    processed_df = post_process_data(agg_df, method='both')
    print("\n✅ Post-processing done!")
    print(f"Rows after processing: {len(processed_df)}")

    # 4. Save the data (in chunks)
    print("\n[4/4] Saving data...")
    # Save directly in chunks; no single large file is written anymore
    chunk_size = 200000  # 200k rows per chunk
    num_chunks = save_data_chunks(
        processed_df,
        PROCESSED_OUTPUT_DIR,
        "uf_all_units_processed_1min",
        chunk_size,
    )
    print("\n" + "=" * 70)
    print("✅ All tasks complete!")
    print("=" * 70)

    # Report the written files
    print("\nFile info:")
    print(f"Output directory: {PROCESSED_OUTPUT_DIR}")
    print(f"File count: {num_chunks}")
    # Sum up the total size on disk
    total_size = 0
    for i in range(num_chunks):
        file_path = os.path.join(
            PROCESSED_OUTPUT_DIR,
            f"uf_all_units_processed_1min_part{i + 1}_of_{num_chunks}.csv"
        )
        if os.path.exists(file_path):
            file_size = os.path.getsize(file_path) / (1024 * 1024)
            total_size += file_size
            print(f"  {os.path.basename(file_path)}: {file_size:.2f} MB")
    print(f"Total file size: {total_size:.2f} MB")