| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166 |
import json
import os
from datetime import datetime, timedelta
from urllib.parse import quote_plus

import pandas as pd
from sqlalchemy import bindparam, create_engine, text
from sqlalchemy.exc import OperationalError
# Load the configuration file.
def load_database_config(config_path=None):
    """Load the ``database`` section of ``config.json``.

    Args:
        config_path: Path to the JSON config file. ``None`` (the default) means
            ``../config.json`` relative to this module's directory.

    Returns:
        A dict with the database settings. When the file cannot be opened,
        is not valid JSON, or has no usable ``database`` section, a fallback
        dict built from the ``DB_*`` environment variables (with hard-coded
        defaults) is returned instead. In every case the result contains a
        ``table_name`` key, defaulting to ``'dc_item_history_data_hour'``.
    """
    default_table_name = 'dc_item_history_data_hour'
    if config_path is None:
        config_path = os.path.join(os.path.dirname(__file__), '..', 'config.json')
    try:
        with open(config_path, 'r', encoding='utf-8') as f:
            config = json.load(f)
        # Copy so the setdefault below never touches the parsed document.
        db = dict(config['database'])
    except (OSError, ValueError, KeyError, TypeError) as e:
        # OSError: missing/unreadable file; ValueError: bad JSON or encoding;
        # KeyError/TypeError: no usable 'database' mapping in the document.
        print(f"加载数据库配置失败: {e}")
        # Fall back to environment variables or defaults.
        # NOTE(review): real host/user/password are hard-coded as fallbacks and
        # therefore live in version control; consider requiring the DB_*
        # environment variables instead.
        return {
            'host': os.getenv('DB_HOST', '222.130.26.206'),
            'port': int(os.getenv('DB_PORT', '4000')),
            'user': os.getenv('DB_USERNAME', 'whu'),
            'password': os.getenv('DB_PASSWORD', '09093f4e6b33ddd'),
            'database': os.getenv('DB_DATABASE', 'ws_data'),
            'table_name': default_table_name,
        }
    # Callers index 'table_name' directly, so guarantee it is present even
    # when config.json omits it.
    db.setdefault('table_name', default_table_name)
    return db
# Database engine creation: runs once at import time. On failure `engine` is
# set to None, which get_sensor_data() checks before querying.
db_config = load_database_config()
try:
    # Escape the credentials: characters such as '@', ':' or '/' in a password
    # would otherwise be parsed as URL delimiters and corrupt the URL.
    _db_user = quote_plus(str(db_config['user']))
    _db_password = quote_plus(str(db_config['password']))
    database_url = (
        f"mysql+pymysql://{_db_user}:{_db_password}"
        f"@{db_config['host']}:{db_config['port']}/{db_config['database']}?charset=utf8mb4"
    )
    # create_engine() does not open a connection; connection problems surface
    # later, when a query is executed.
    engine = create_engine(database_url)
    print(f"数据库引擎创建成功!连接到: {db_config['host']}:{db_config['port']}/{db_config['database']}")
except Exception as e:
    print(f"创建数据库引擎失败: {e}")
    engine = None
def get_sensor_data(start_date=None, *, days=180, project_id=92, item_names=None):
    """Fetch hourly history for a set of items and pivot it to one column per item.

    Despite its name, ``start_date`` is the END of the query window: rows with
    ``h_time`` in ``[start_date - days, start_date)`` are selected.

    Args:
        start_date: Window end, either a ``'%Y-%m-%d %H:%M:%S'`` string or a
            ``datetime``. ``None`` means ``datetime.today()``.
        days: Length of the look-back window in days (default 180).
        project_id: Value matched against the ``project_id`` column (default 92).
        item_names: Item names to fetch; ``None`` uses the built-in RO list.

    Returns:
        A DataFrame with an ``index`` timestamp column plus one column per item,
        passed through ``fill_missing_hourly_data``. An empty DataFrame is
        returned when the engine is not initialised, the query fails, or the
        query returns no rows.
    """
    if engine is None:
        print("数据库引擎未初始化,无法查询数据。")
        return pd.DataFrame()
    if start_date is None:
        start_date = datetime.today()
    elif not isinstance(start_date, datetime):
        start_date = datetime.strptime(start_date, '%Y-%m-%d %H:%M:%S')
    print(start_date)
    end_date = start_date
    start_date = start_date - timedelta(days=days)
    start_date_str = start_date.strftime('%Y-%m-%d %H:%M:%S')
    end_date_str = end_date.strftime('%Y-%m-%d %H:%M:%S')
    if item_names is None:
        item_names = [
            'C.M.RO1_FT_JS@out', 'C.M.RO2_FT_JS@out', 'C.M.RO3_FT_JS@out', 'C.M.RO4_FT_JS@out',
            'C.M.RO_TT_ZJS@out', 'C.M.RO_Cond_ZJS@out',
            'C.M.RO1_DB@DPT_1', 'C.M.RO1_DB@DPT_2',
            'C.M.RO2_DB@DPT_1', 'C.M.RO2_DB@DPT_2',
            'C.M.RO3_DB@DPT_1', 'C.M.RO3_DB@DPT_2',
            'C.M.RO4_DB@DPT_1', 'C.M.RO4_DB@DPT_2',
        ]
    # With the defaults: 180 days x 24 hours = 4320 rows after gap filling.
    # The table name comes from the config, not from user input, so it is
    # interpolated; every value is passed as a bound parameter.
    table_name = db_config['table_name']
    # expanding=True makes SQLAlchemy render one placeholder per list element
    # for the IN clause, instead of relying on the DBAPI driver to serialise a
    # Python list.
    sql_query = text(f"""
        SELECT *
        FROM {table_name}
        WHERE project_id = :proj_id
          AND item_name IN :item_names_list
          AND h_time >= :start_time AND h_time < :end_time
    """).bindparams(bindparam("item_names_list", expanding=True))
    params = {
        "proj_id": project_id,
        "item_names_list": list(item_names),
        "start_time": start_date_str,
        "end_time": end_date_str,
    }
    try:
        with engine.connect() as connection:
            print("数据库连接成功!准备使用 text() 结构执行查询...")
            data_origin = pd.read_sql(sql_query, connection, params=params)
            print(f"查询成功!获取到 {len(data_origin)} 条原始数据。")
    except OperationalError as e:
        print(f"数据库连接或查询失败: 用户名、密码或地址错误?\n 错误详情: {e}")
        return pd.DataFrame()
    except Exception as e:
        print(f"执行数据库查询时发生未知错误: {e}")
        # Print the full traceback; it is essential for debugging.
        import traceback
        traceback.print_exc()
        return pd.DataFrame()
    if data_origin.empty:
        print("警告:数据库未返回任何数据,无法进行后续处理。")
        return pd.DataFrame()
    # Long -> wide: one row per timestamp, one column per item_name; if a
    # timestamp/item pair repeats, the first value wins.
    data_origin['index'] = pd.to_datetime(data_origin['h_time'])
    pivot_df = data_origin.pivot_table(index='index', columns='item_name', values='val', aggfunc='first')
    pivot_df.reset_index(inplace=True)
    print("-" * 20)
    print(f"不重复的原始时间戳数量: {pivot_df['index'].nunique()}")
    print(f"“取整到小时后”的不重复小时数量: {pivot_df['index'].dt.floor('h').nunique()}")
    print("-" * 20)
    return fill_missing_hourly_data(pivot_df, start_date, end_date)
def fill_missing_hourly_data(df, start_date, end_date):
    """Return a copy of ``df`` with one row for every hour in ``[start_date, end_date)``.

    Both bounds are truncated to the hour. An expected hour counts as present
    when some row's ``index`` floors to it. Each missing hour gets a copy of
    the closest earlier row, with ``index`` set to that hour. Hours before the
    first row get a copy of the first row. The result is sorted by ``index``
    and clipped to the window.

    Args:
        df: DataFrame with an ``index`` column of timestamps (anything
            ``pd.to_datetime`` accepts). It is not modified.
        start_date: Window start (``datetime``), inclusive.
        end_date: Window end (``datetime``), exclusive.

    Returns:
        A new DataFrame with a fresh RangeIndex, or ``df`` itself when it is
        empty.
    """
    if df.empty:
        print("原始数据为空,无法进行数据补充")
        return df
    window_start = start_date.replace(minute=0, second=0, microsecond=0)
    window_end = end_date.replace(minute=0, second=0, microsecond=0)
    complete_time_range = pd.date_range(start=window_start, end=window_end, freq='h', inclusive='left')
    print(f"期望时间范围: {start_date.strftime('%Y-%m-%d %H:%M')} 到 {end_date.strftime('%Y-%m-%d %H:%M')}")
    print(f"期望小时数: {len(complete_time_range)} 小时")
    print(f"实际数据行数: {len(df)} 行")
    # Work on a copy so the caller's frame is left untouched.
    df = df.copy()
    df['index'] = pd.to_datetime(df['index'])
    df = df.drop_duplicates(subset=['index']).sort_values('index').reset_index(drop=True)
    existing_hours = pd.DatetimeIndex(df['index'].dt.floor('h').unique())
    missing_times = complete_time_range.difference(existing_hours)
    if len(missing_times):
        print(f"发现 {len(missing_times)} 个缺失的小时数据")
        # For each missing hour, take the position of the last row at or before
        # it (forward fill). -1 means the gap precedes all data, so clip to
        # row 0 (back fill) rather than copying values from later in the series.
        source_pos = df['index'].searchsorted(missing_times, side='right') - 1
        fill_rows = df.iloc[source_pos.clip(0)].copy()
        fill_rows['index'] = missing_times.to_numpy()
        df = pd.concat([df, fill_rows], ignore_index=True)
        print(f"已补充 {len(fill_rows)} 行数据")
    else:
        print("数据完整,无需补充")
    df = df.sort_values('index').reset_index(drop=True)
    # Keep consistent with the SQL query and the date_range above: start is
    # included, end is excluded.
    in_window = (df['index'] >= window_start) & (df['index'] < window_end)
    df = df[in_window].reset_index(drop=True)
    print(f"最终数据行数: {len(df)} 行")
    if not df.empty:
        print(f"时间范围: {df['index'].min()} 到 {df['index'].max()}")
    return df
if __name__ == '__main__':
    # Script entry point: query the window that ends now and preview the result.
    final_data = get_sensor_data(start_date=None)
    has_rows = not final_data.empty
    if has_rows:
        preview = (
            "\n--- 最终获取的数据 ---",
            final_data.head(),
            f"\n数据维度: {final_data.shape}",
        )
        for item in preview:
            print(item)
|