"""Fetch hourly RO sensor history from the database and return it as a gap-free hourly table."""
import json
import os
import traceback
from datetime import datetime, timedelta
from urllib.parse import quote

import pandas as pd
from sqlalchemy import bindparam, create_engine, text
from sqlalchemy.exc import OperationalError

# Table queried when the configuration does not name one.
DEFAULT_TABLE_NAME = 'dc_item_history_data_hour'
# Length of the query window that ends at get_sensor_data()'s ``start_date``.
LOOKBACK_DAYS = 180
# project_id filter applied to every query.
PROJECT_ID = 92
# Sensor tags fetched from the history table; each becomes one column of the result.
SENSOR_ITEM_NAMES = (
    'C.M.RO1_FT_JS@out', 'C.M.RO2_FT_JS@out', 'C.M.RO3_FT_JS@out', 'C.M.RO4_FT_JS@out',
    'C.M.RO_TT_ZJS@out', 'C.M.RO_Cond_ZJS@out',
    'C.M.RO1_DB@DPT_1', 'C.M.RO1_DB@DPT_2',
    'C.M.RO2_DB@DPT_1', 'C.M.RO2_DB@DPT_2',
    'C.M.RO3_DB@DPT_1', 'C.M.RO3_DB@DPT_2',
    'C.M.RO4_DB@DPT_1', 'C.M.RO4_DB@DPT_2',
)


def load_database_config():
    """Load the ``database`` section of ``../config.json`` (relative to this file).

    If the file is missing or unreadable, is not valid JSON, or has no usable
    ``database`` section, fall back to the ``DB_*`` environment variables and
    built-in defaults.

    Returns:
        dict with the connection settings (``host``, ``port``, ``user``,
        ``password``, ``database``) and ``table_name``. ``table_name`` is always
        present: it defaults to ``DEFAULT_TABLE_NAME`` when the config omits it.
    """
    config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'config.json')
    try:
        with open(config_path, 'r', encoding='utf-8') as f:
            config = json.load(f)
        db_settings = dict(config['database'])
    except (OSError, ValueError, KeyError, TypeError) as e:
        # OSError: file missing/unreadable; ValueError: bad JSON or encoding;
        # KeyError/TypeError: JSON present but without a usable 'database' mapping.
        print(f"加载数据库配置失败: {e}")
        # Fall back to environment variables, then to built-in defaults.
        # NOTE(review): the defaults embed a real host and password in source
        # control; they should be removed and the variables made mandatory.
        return {
            'host': os.getenv('DB_HOST', '222.130.26.206'),
            'port': int(os.getenv('DB_PORT', '4000')),
            'user': os.getenv('DB_USERNAME', 'whu'),
            'password': os.getenv('DB_PASSWORD', '09093f4e6b33ddd'),
            'database': os.getenv('DB_DATABASE', 'ws_data'),
            'table_name': DEFAULT_TABLE_NAME,
        }
    db_settings.setdefault('table_name', DEFAULT_TABLE_NAME)
    return db_settings


def _build_database_url(cfg):
    """Return a mysql+pymysql URL for *cfg*, percent-encoding the credentials.

    Without encoding, a password containing ``@``, ``:``, ``/`` or ``%`` would
    corrupt the URL.
    """
    user = quote(str(cfg['user']), safe='')
    password = quote(str(cfg['password']), safe='')
    return (f"mysql+pymysql://{user}:{password}@{cfg['host']}:{cfg['port']}"
            f"/{cfg['database']}?charset=utf8mb4")


# Module-level engine shared by get_sensor_data(); it is None when creation fails.
db_config = load_database_config()
try:
    database_url = _build_database_url(db_config)
    engine = create_engine(database_url)
    print(f"数据库引擎创建成功!连接到: {db_config['host']}:{db_config['port']}/{db_config['database']}")
except Exception as e:  # top-level boundary: report and continue without an engine
    print(f"创建数据库引擎失败: {e}")
    engine = None


def get_sensor_data(start_date=None):
    """Query ``LOOKBACK_DAYS`` days of hourly sensor values and pivot them into a wide table.

    Despite its name, ``start_date`` marks the END of the window. The window
    runs from ``LOOKBACK_DAYS`` days before that moment up to it (exclusive),
    with both ends floored to the whole hour.

    Args:
        start_date: a ``'%Y-%m-%d %H:%M:%S'`` string, or None to use the current
            local time (``datetime.today()``, naive).

    Returns:
        A DataFrame with an ``'index'`` timestamp column and one column per item
        name, holding one row per hour with gaps filled by
        fill_missing_hourly_data(). An empty DataFrame is returned when the
        engine is unavailable, the query fails, or no rows come back.

    Raises:
        ValueError: if ``start_date`` does not match the expected format.
    """
    if engine is None:
        print("数据库引擎未初始化,无法查询数据。")
        return pd.DataFrame()

    if start_date is None:
        start_date = datetime.today()
    else:
        start_date = datetime.strptime(start_date, '%Y-%m-%d %H:%M:%S')
    print(start_date)

    # Align the window to whole hours. The SQL range [start, end) then covers exactly
    # the hourly grid that fill_missing_hourly_data() builds. Otherwise the first hour
    # bucket is only partly queried, and rows from the current, incomplete hour are
    # fetched only to be discarded later.
    end_date = start_date.replace(minute=0, second=0, microsecond=0)
    start_date = end_date - timedelta(days=LOOKBACK_DAYS)
    start_date_str = start_date.strftime('%Y-%m-%d %H:%M:%S')
    end_date_str = end_date.strftime('%Y-%m-%d %H:%M:%S')

    # The table name comes from local configuration (trusted); every value goes
    # through a bound parameter.
    table_name = db_config.get('table_name', DEFAULT_TABLE_NAME)
    # expanding=True makes SQLAlchemy render one placeholder per list element for
    # the IN clause, rather than handing a Python list to the driver as one value.
    sql_query = text(f"""
        SELECT *
        FROM {table_name}
        WHERE project_id = :proj_id
          AND item_name IN :item_names_list
          AND h_time >= :start_time
          AND h_time < :end_time
    """).bindparams(bindparam('item_names_list', expanding=True))

    params = {
        "proj_id": PROJECT_ID,
        "item_names_list": list(SENSOR_ITEM_NAMES),
        "start_time": start_date_str,
        "end_time": end_date_str,
    }

    try:
        with engine.connect() as connection:
            print("数据库连接成功!准备使用 text() 结构执行查询...")
            data_origin = pd.read_sql(sql_query, connection, params=params)
            print(f"查询成功!获取到 {len(data_origin)} 条原始数据。")
    except OperationalError as e:
        print(f"数据库连接或查询失败: 用户名、密码或地址错误?\n 错误详情: {e}")
        return pd.DataFrame()
    except Exception as e:  # report anything unexpected with a full traceback
        print(f"执行数据库查询时发生未知错误: {e}")
        traceback.print_exc()
        return pd.DataFrame()

    if data_origin.empty:
        print("⚠️ 数据库未返回任何数据,无法进行后续处理。")
        return pd.DataFrame()

    # Long -> wide: one row per timestamp, one column per item_name.
    data_origin['index'] = pd.to_datetime(data_origin['h_time'])
    pivot_df = data_origin.pivot_table(index='index', columns='item_name', values='val', aggfunc='first')
    pivot_df.reset_index(inplace=True)

    # Diagnostics: if these two counts differ, some timestamps are not on the hour.
    print("-" * 20)
    print(f"不重复的原始时间戳数量: {pivot_df['index'].nunique()}")
    print(f"“取整到小时后”的不重复小时数量: {pivot_df['index'].dt.floor('h').nunique()}")
    print("-" * 20)

    return fill_missing_hourly_data(pivot_df, start_date, end_date)


def fill_missing_hourly_data(df, start_date, end_date):
    """Return *df* reshaped to exactly one row per hour in ``[floor(start), floor(end))``.

    Timestamps in the ``'index'`` column are floored to the hour. Readings that
    fall in the same hour are merged, taking each column's first non-null value
    in time order. Hours with no reading get, column by column, the most recent
    earlier value; hours before the first reading take the earliest later value.
    The input DataFrame is not modified.

    Args:
        df: a wide DataFrame with an ``'index'`` timestamp column.
        start_date, end_date: window bounds (datetime or anything ``pd.Timestamp``
            accepts). Both are floored to the hour; the end is exclusive.

    Returns:
        A new DataFrame with an ``'index'`` column plus the original value
        columns. If *df* is empty, *df* itself is returned unchanged.
    """
    if df.empty:
        print("原始数据为空,无法进行数据补充")
        return df

    start_ts = pd.Timestamp(start_date)
    end_ts = pd.Timestamp(end_date)
    complete_time_range = pd.date_range(start=start_ts.floor('h'), end=end_ts.floor('h'),
                                        freq='h', inclusive='left')

    print(f"期望时间范围: {start_ts.strftime('%Y-%m-%d %H:%M')} 到 {end_ts.strftime('%Y-%m-%d %H:%M')}")
    print(f"期望小时数: {len(complete_time_range)} 小时")
    print(f"实际数据行数: {len(df)} 行")

    # Sort by the raw timestamp first so that "first" inside an hour means the earliest reading.
    hourly = df.copy()
    hourly['index'] = pd.to_datetime(hourly['index'])
    hourly = hourly.sort_values('index', kind='stable')
    hourly['index'] = hourly['index'].dt.floor('h')
    hourly = hourly.groupby('index').first()

    observed_hours = hourly.index
    missing_hours = complete_time_range.difference(observed_hours)

    if len(missing_hours):
        print(f"发现 {len(missing_hours)} 个缺失的小时数据")
        # Fill on the union of observed and expected hours, so that readings just
        # outside the window can still seed gaps at its edges. Only the missing
        # rows are overwritten; NaNs inside observed rows are left untouched.
        hourly = hourly.reindex(observed_hours.union(complete_time_range))
        filled = hourly.ffill().bfill()
        hourly.loc[missing_hours] = filled.loc[missing_hours]
        print(f"已补充 {len(missing_hours)} 行数据")
    else:
        print("数据完整,无需补充")

    # Keep exactly the expected grid (end exclusive, matching the SQL query).
    result = hourly.reindex(complete_time_range).rename_axis('index').reset_index()

    print(f"最终数据行数: {len(result)} 行")
    if not result.empty:
        print(f"时间范围: {result['index'].min()} 到 {result['index'].max()}")
    return result


if __name__ == '__main__':
    final_data = get_sensor_data(start_date=None)
    if not final_data.empty:
        print("\n--- 最终获取的数据 ---")
        print(final_data.head())
        print(f"\n数据维度: {final_data.shape}")