import requests import logging import pandas as pd from datetime import datetime, timedelta # --- 1. 基础配置 --- # 配置日志记录器,方便调试和追踪 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger() # --- 2. 辅助函数:填充缺失的小时数据 --- def fill_missing_hourly_data(df, start_date, end_date): """ 确保DataFrame在给定的日期范围内拥有完整的小时索引。 缺失的小时数据点将通过前向填充(forward-fill)的方式补全。 Args: df (pd.DataFrame): 经过数据透视处理的DataFrame,必须有名为 'timestamp' 的时间列。 start_date (datetime): 期望时间范围的开始时间。 end_date (datetime): 期望时间范围的结束时间。 Returns: pd.DataFrame: 填充了缺失小时数据后的DataFrame。 """ if df.empty: logger.warning("原始数据为空,无法进行数据补充。") return df # 将 'index' 列设为索引,这是进行时间序列分析的标准操作 df.set_index('index', inplace=True) # 创建一个完整的小时级别的时间范围 # inclusive='left' 表示包含开始时间点,但不包含结束时间点,符合常规查询逻辑 complete_time_range = pd.date_range( start=start_date.replace(minute=0, second=0, microsecond=0), periods=180 * 24, # 指定需要 4320 个小时 freq='H' ) logger.info( f"期望生成的时间范围: 从 {start_date.strftime('%Y-%m-%d %H:00')} 到 {end_date.strftime('%Y-%m-%d %H:00')}") logger.info(f"期望的小时总数: {len(complete_time_range)} 小时") logger.info(f"数据填充前的行数: {len(df)} 行") # 使用 reindex 和 ffill 高效地填充缺失值 # 1. reindex: 将DataFrame的索引与完整时间范围对齐,缺失的时间点会产生NaN值 # 2. ffill: 使用前一个有效观测值向前填充NaN值 filled_df = df.reindex(complete_time_range).ffill() filled_df.reset_index(inplace=True) filled_df.rename(columns={'index': 'index'}, inplace=True) # 通常 reset_index 默认列名就是 'index',但这行可以确保万无一失 logger.info(f"数据填充后的最终行数: {len(filled_df)} 行") if not filled_df.empty: logger.info(f"最终时间范围: 从 {filled_df.index.min()} 到 {filled_df.index.max()}") return filled_df # --- 3. 主函数:通过API获取并处理传感器数据 --- def get_sensor_data(end_date_str=None, API_BASE_URL=None, HEADERS=None): """ 通过循环调用API获取多个传感器的数据,合并、处理并填充成一个干净、完整的DataFrame。 Args: end_date_str (str, optional): 查询的结束日期,格式为 'YYYY-MM-DD HH:MM:SS'。 如果为None,则默认为当前时间。 Returns: pd.DataFrame: 一个处理完成的DataFrame,索引是时间,每列是一个传感器。 """ # 步骤 1: 计算时间范围和时间戳 if end_date_str is None: end_date = datetime.now() else: try: end_date = datetime.strptime(end_date_str, '%Y-%m-%d %H:%M:%S') except ValueError: logger.error(f"日期格式错误: '{end_date_str}'。请使用 'YYYY-MM-DD HH:MM:SS' 格式。") return pd.DataFrame() start_date = end_date - timedelta(days=180) start_timestamp = int(start_date.timestamp() * 1000) end_timestamp = int(end_date.timestamp() * 1000) logger.info( f"开始查询数据,时间范围: {start_date.strftime('%Y-%m-%d %H:%M:%S')} 到 {end_date.strftime('%Y-%m-%d %H:%M:%S')}") # 步骤 2: 定义要查询的14个传感器列表 item_names = [ 'C.M.RO1_FT_JS@out', 'C.M.RO2_FT_JS@out', 'C.M.RO3_FT_JS@out', 'C.M.RO4_FT_JS@out', 'C.M.RO_TT_ZJS@out', 'C.M.RO_Cond_ZJS@out', 'C.M.RO1_DB@DPT_1', 'C.M.RO1_DB@DPT_2', 'C.M.RO2_DB@DPT_1', 'C.M.RO2_DB@DPT_2', 'C.M.RO3_DB@DPT_1', 'C.M.RO3_DB@DPT_2', 'C.M.RO4_DB@DPT_1', 'C.M.RO4_DB@DPT_2' ] # 步骤 3: 循环调用API获取所有传感器的数据 all_records = [] for item_name in item_names: params = { "deviceid": "1", "dataitemid": item_name, "project_id": "92", "stime": start_timestamp, "etime": end_timestamp, "size": "1", "interval": "h", "aggregator": "new" } try: response = requests.get(API_BASE_URL, params=params, headers=HEADERS, timeout=60) response.raise_for_status() # 如果请求失败 (如 404, 500),则会抛出异常 api_response = response.json() if api_response.get('code') == 200 and api_response.get('data'): records = api_response['data'] # 将 item_name 添加到每条记录中,为后续数据透视做准备 for record in records: record['item_name'] = item_name # 使用查询时的 itemid 作为列名 all_records.extend(records) logger.info(f"成功获取 '{item_name}' 的 {len(records)} 条数据。") else: logger.warning(f"'{item_name}' 未返回有效数据。API消息: {api_response.get('msg', '无')}") except requests.exceptions.RequestException as e: logger.error(f"查询 '{item_name}' 时发生网络错误: {e}") continue # 跳过当前失败的传感器,继续下一个 # 步骤 4: 将原始数据转换为DataFrame并进行清洗 if not all_records: logger.error("未能从API获取任何有效数据,处理终止。") return pd.DataFrame() logger.info(f"API数据获取完成,总共获取了 {len(all_records)} 条原始记录。") data_origin = pd.DataFrame(all_records) # 数据清洗:转换数据类型,并处理可能存在的错误 # 使用 'coerce' 会将无法转换的值变为 NaT (时间) 或 NaN (数值),更稳健 data_origin['index'] = pd.to_datetime(data_origin['htime_at'], errors='coerce') data_origin['val'] = pd.to_numeric(data_origin['val'], errors='coerce') # 删除时间或数值转换失败的无效行 data_origin.dropna(subset=['index', 'val'], inplace=True) # 步骤 5: 数据透视,将长表转换为宽表 logger.info("正在进行数据透视,将数据整理为每行一个时间点,每列一个传感器...") pivot_df = data_origin.pivot_table(index='index', columns='item_name', values='val', aggfunc='first') pivot_df.reset_index(inplace=True) # 将索引 'index' 变回普通列,方便传入填充函数 # 步骤 6: 填充缺失的小时数据并设置最终的索引 logger.info("正在填充缺失的小时数据以确保时间序列的完整性...") # pivot_df.columns.name = None final_df = fill_missing_hourly_data(pivot_df, start_date, end_date) # final_df = final_df.sort_values('index').reset_index(drop=True) return final_df # --- 4. 脚本执行入口 --- if __name__ == "__main__": # 示例: 不传入日期,默认使用当前时间作为结束时间,查询过去180天的数据 print("--- 开始执行数据获取任务 ---") sensor_df = get_sensor_data() if not sensor_df.empty: print("\n[成功] 数据获取与处理完成!") print("\n[结果] DataFrame (前5行):") print(sensor_df.head()) print("\n[结果] DataFrame (后5行):") print(sensor_df.tail()) print("\n[结果] DataFrame 信息:") # .info() 会打印出维度、列名、非空值数量和数据类型等关键信息 sensor_df.info() else: print("\n[失败] 未能生成最终的DataFrame,请检查上面的日志输出获取详细错误信息。")