data_mysql.py 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. import os
  2. import pandas as pd
  3. import json
  4. from datetime import datetime, timedelta
  5. from sqlalchemy import create_engine, text
  6. from sqlalchemy.exc import OperationalError
  7. # 加载配置文件
  8. def load_database_config():
  9. """从config.json加载数据库配置"""
  10. config_path = os.path.join(os.path.dirname(__file__), '..', 'config.json')
  11. try:
  12. with open(config_path, 'r', encoding='utf-8') as f:
  13. config = json.load(f)
  14. return config['database']
  15. except Exception as e:
  16. print(f"加载数据库配置失败: {e}")
  17. # 回退到环境变量或默认值
  18. return {
  19. 'host': os.getenv('DB_HOST', '222.130.26.206'),
  20. 'port': int(os.getenv('DB_PORT', '4000')),
  21. 'user': os.getenv('DB_USERNAME', 'whu'),
  22. 'password': os.getenv('DB_PASSWORD', '09093f4e6b33ddd'),
  23. 'database': os.getenv('DB_DATABASE', 'ws_data'),
  24. 'table_name': 'dc_item_history_data_hour'
  25. }
  26. # 数据库引擎创建部分
  27. db_config = load_database_config()
  28. try:
  29. database_url = f"mysql+pymysql://{db_config['user']}:{db_config['password']}@{db_config['host']}:{db_config['port']}/{db_config['database']}?charset=utf8mb4"
  30. engine = create_engine(database_url)
  31. print(f"数据库引擎创建成功!连接到: {db_config['host']}:{db_config['port']}/{db_config['database']}")
  32. except Exception as e:
  33. print(f"创建数据库引擎失败: {e}")
  34. engine = None
  35. def get_sensor_data(start_date=None):
  36. if engine is None:
  37. print("数据库引擎未初始化,无法查询数据。")
  38. return pd.DataFrame()
  39. if start_date is None:
  40. start_date = datetime.today()
  41. else:
  42. start_date = datetime.strptime(start_date, '%Y-%m-%d %H:%M:%S')
  43. print(start_date)
  44. end_date = start_date
  45. start_date = start_date - timedelta(days=180)
  46. start_date_str = start_date.strftime('%Y-%m-%d %H:%M:%S')
  47. end_date_str = end_date.strftime('%Y-%m-%d %H:%M:%S')
  48. item_names = [
  49. 'C.M.RO1_FT_JS@out', 'C.M.RO2_FT_JS@out', 'C.M.RO3_FT_JS@out', 'C.M.RO4_FT_JS@out',
  50. 'C.M.RO_TT_ZJS@out', 'C.M.RO_Cond_ZJS@out',
  51. 'C.M.RO1_DB@DPT_1', 'C.M.RO1_DB@DPT_2',
  52. 'C.M.RO2_DB@DPT_1', 'C.M.RO2_DB@DPT_2',
  53. 'C.M.RO3_DB@DPT_1', 'C.M.RO3_DB@DPT_2',
  54. 'C.M.RO4_DB@DPT_1', 'C.M.RO4_DB@DPT_2'
  55. ]
  56. # 输出 180天 × 24小时 = 4320 行数据
  57. table_name = db_config['table_name'] # 从配置获取表名
  58. sql_query = text(f"""
  59. SELECT *
  60. FROM {table_name}
  61. WHERE project_id = :proj_id
  62. AND item_name IN :item_names_list
  63. AND h_time >= :start_time AND h_time < :end_time
  64. """)
  65. # 将参数构建为一个字典,key 对应上面查询中的命名占位符
  66. params = {
  67. "proj_id": 92,
  68. "item_names_list": item_names, # 直接将整个列表传入,SQLAlchemy会处理
  69. "start_time": start_date_str,
  70. "end_time": end_date_str
  71. }
  72. try:
  73. with engine.connect() as connection:
  74. print("数据库连接成功!准备使用 text() 结构执行查询...")
  75. data_origin = pd.read_sql(sql_query, connection, params=params)
  76. print(f"查询成功!获取到 {len(data_origin)} 条原始数据。")
  77. except OperationalError as e:
  78. print(f"数据库连接或查询失败: 用户名、密码或地址错误?\n 错误详情: {e}")
  79. return pd.DataFrame()
  80. except Exception as e:
  81. print(f"执行数据库查询时发生未知错误: {e}")
  82. # 打印完整的错误追溯信息,这对于调试至关重要!
  83. import traceback
  84. traceback.print_exc()
  85. return pd.DataFrame()
  86. if data_origin.empty:
  87. print("警告:数据库未返回任何数据,无法进行后续处理。")
  88. return pd.DataFrame()
  89. data_origin['index'] = pd.to_datetime(data_origin['h_time'])
  90. pivot_df = data_origin.pivot_table(index='index', columns='item_name', values='val', aggfunc='first')
  91. pivot_df.reset_index(inplace=True)
  92. print("-" * 20)
  93. print(f"不重复的原始时间戳数量: {pivot_df['index'].nunique()}")
  94. print(f"“取整到小时后”的不重复小时数量: {pivot_df['index'].dt.floor('H').nunique()}")
  95. print("-" * 20)
  96. # -----------------------------
  97. pivot_df = fill_missing_hourly_data(pivot_df, start_date, end_date)
  98. return pivot_df
  99. def fill_missing_hourly_data(df, start_date, end_date):
  100. if df.empty:
  101. print("原始数据为空,无法进行数据补充")
  102. return df
  103. complete_time_range = pd.date_range(start=start_date.replace(minute=0, second=0, microsecond=0),
  104. end=end_date.replace(minute=0, second=0, microsecond=0), freq='H', inclusive='left')
  105. print(f"期望时间范围: {start_date.strftime('%Y-%m-%d %H:%M')} 到 {end_date.strftime('%Y-%m-%d %H:%M')}")
  106. print(f"期望小时数: {len(complete_time_range)} 小时")
  107. print(f"实际数据行数: {len(df)} 行")
  108. df['index'] = pd.to_datetime(df['index'])
  109. df = df.drop_duplicates(subset=['index']).sort_values('index')
  110. existing_times = set(df['index'].dt.floor('H'))
  111. expected_times = set(complete_time_range)
  112. missing_times = expected_times - existing_times
  113. if missing_times:
  114. missing_count = len(missing_times)
  115. print(f"发现 {missing_count} 个缺失的小时数据")
  116. if not df.empty:
  117. last_row = df.iloc[-1].copy()
  118. missing_rows = []
  119. for missing_time in sorted(list(missing_times)):
  120. new_row = last_row.copy()
  121. new_row['index'] = missing_time
  122. missing_rows.append(new_row)
  123. if missing_rows:
  124. missing_df = pd.DataFrame(missing_rows)
  125. df = pd.concat([df, missing_df], ignore_index=True)
  126. print(f"已补充 {len(missing_rows)} 行数据")
  127. else:
  128. print("无法补充数据:原始数据为空")
  129. else:
  130. print("数据完整,无需补充")
  131. df = df.sort_values('index').reset_index(drop=True)
  132. # 保持与数据库查询和时间范围生成的一致性(排除结束时间点)
  133. df = df[(df['index'] >= start_date.replace(minute=0, second=0, microsecond=0)) & (
  134. df['index'] < end_date.replace(minute=0, second=0, microsecond=0))].reset_index(drop=True)
  135. print(f"最终数据行数: {len(df)} 行")
  136. if not df.empty:
  137. print(f"时间范围: {df['index'].min()} 到 {df['index'].max()}")
  138. return df
  139. if __name__ == '__main__':
  140. final_data = get_sensor_data(start_date=None)
  141. if not final_data.empty:
  142. print("\n--- 最终获取的数据 ---")
  143. print(final_data.head())
  144. print(f"\n数据维度: {final_data.shape}")