get_api_data.py 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220
  1. import requests
  2. import logging
  3. import pandas as pd
  4. from datetime import datetime, timedelta
  5. # --- 1. 基础配置 ---
  6. # 配置日志记录器,方便调试和追踪
  7. logger = logging.getLogger(__name__)
  8. logger.setLevel(logging.INFO)
  9. # 禁止传播到root logger,避免重复输出
  10. logger.propagate = False
  11. # 只有在logger没有handler时才添加(防止重复)
  12. if not logger.handlers:
  13. console_handler = logging.StreamHandler()
  14. console_handler.setLevel(logging.INFO)
  15. formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
  16. console_handler.setFormatter(formatter)
  17. logger.addHandler(console_handler)
  18. # --- 2. 辅助函数:填充缺失的小时数据 ---
  19. def fill_missing_hourly_data(df, start_date, end_date):
  20. """
  21. 确保DataFrame在给定的日期范围内拥有完整的小时索引。
  22. 缺失的小时数据点将通过前向填充(forward-fill)的方式补全。
  23. Args:
  24. df (pd.DataFrame): 经过数据透视处理的DataFrame,必须有名为 'timestamp' 的时间列。
  25. start_date (datetime): 期望时间范围的开始时间。
  26. end_date (datetime): 期望时间范围的结束时间。
  27. Returns:
  28. pd.DataFrame: 填充了缺失小时数据后的DataFrame。
  29. """
  30. if df.empty:
  31. logger.warning("原始数据为空,无法进行数据补充。")
  32. return df
  33. # 将 'index' 列设为索引,这是进行时间序列分析的标准操作
  34. df.set_index('index', inplace=True)
  35. # 创建一个完整的小时级别的时间范围
  36. # inclusive='left' 表示包含开始时间点,但不包含结束时间点,符合常规查询逻辑
  37. complete_time_range = pd.date_range(
  38. start=start_date.replace(minute=0, second=0, microsecond=0),
  39. periods=180 * 24, # 指定需要 4320 个小时
  40. freq='H'
  41. )
  42. logger.info(
  43. f"期望生成的时间范围: 从 {start_date.strftime('%Y-%m-%d %H:00')} 到 {end_date.strftime('%Y-%m-%d %H:00')}")
  44. logger.info(f"期望的小时总数: {len(complete_time_range)} 小时")
  45. logger.info(f"数据填充前的行数: {len(df)} 行")
  46. # 使用 reindex 和 ffill 高效地填充缺失值
  47. # 1. reindex: 将DataFrame的索引与完整时间范围对齐,缺失的时间点会产生NaN值
  48. # 2. ffill: 使用前一个有效观测值向前填充NaN值
  49. filled_df = df.reindex(complete_time_range).ffill()
  50. filled_df.reset_index(inplace=True)
  51. filled_df.rename(columns={'index': 'index'}, inplace=True) # 通常 reset_index 默认列名就是 'index',但这行可以确保万无一失
  52. logger.info(f"数据填充后的最终行数: {len(filled_df)} 行")
  53. if not filled_df.empty:
  54. logger.info(f"最终时间范围: 从 {filled_df.index.min()} 到 {filled_df.index.max()}")
  55. return filled_df
  56. # --- 3. 主函数:通过API获取并处理传感器数据 ---
  57. def get_sensor_data(end_date_str=None, API_BASE_URL=None, HEADERS=None):
  58. """
  59. 通过循环调用API获取多个传感器的数据,合并、处理并填充成一个干净、完整的DataFrame。
  60. Args:
  61. end_date_str (str, optional): 查询的结束日期,格式为 'YYYY-MM-DD HH:MM:SS'。
  62. 如果为None,则默认为当前时间。
  63. Returns:
  64. pd.DataFrame: 一个处理完成的DataFrame,索引是时间,每列是一个传感器。
  65. """
  66. # 步骤 1: 计算时间范围和时间戳
  67. if end_date_str is None:
  68. end_date = datetime.now()
  69. else:
  70. try:
  71. end_date = datetime.strptime(end_date_str, '%Y-%m-%d %H:%M:%S')
  72. except ValueError:
  73. logger.error(f"日期格式错误: '{end_date_str}'。请使用 'YYYY-MM-DD HH:MM:SS' 格式。")
  74. return pd.DataFrame()
  75. start_date = end_date - timedelta(days=180)
  76. start_timestamp = int(start_date.timestamp() * 1000)
  77. end_timestamp = int(end_date.timestamp() * 1000)
  78. logger.info(
  79. f"开始查询数据,时间范围: {start_date.strftime('%Y-%m-%d %H:%M:%S')} 到 {end_date.strftime('%Y-%m-%d %H:%M:%S')}")
  80. # 步骤 2: 定义要查询的14个传感器列表
  81. item_names = [
  82. 'C.M.RO1_FT_JS@out', 'C.M.RO2_FT_JS@out', 'C.M.RO3_FT_JS@out', 'C.M.RO4_FT_JS@out',
  83. 'C.M.RO_TT_ZJS@out', 'C.M.RO_Cond_ZJS@out',
  84. 'C.M.RO1_DB@DPT_1', 'C.M.RO1_DB@DPT_2',
  85. 'C.M.RO2_DB@DPT_1', 'C.M.RO2_DB@DPT_2',
  86. 'C.M.RO3_DB@DPT_1', 'C.M.RO3_DB@DPT_2',
  87. 'C.M.RO4_DB@DPT_1', 'C.M.RO4_DB@DPT_2'
  88. ]
  89. # 步骤 3: 循环调用API获取所有传感器的数据
  90. all_records = []
  91. for item_name in item_names:
  92. params = {
  93. "deviceid": "1", "dataitemid": item_name, "project_id": "92",
  94. "stime": start_timestamp, "etime": end_timestamp,
  95. "size": "1", "interval": "h", "aggregator": "new"
  96. }
  97. try:
  98. response = requests.get(API_BASE_URL, params=params, headers=HEADERS, timeout=60)
  99. response.raise_for_status() # 如果请求失败 (如 404, 500),则会抛出异常
  100. api_response = response.json()
  101. if api_response.get('code') == 200 and api_response.get('data'):
  102. records = api_response['data']
  103. # 将 item_name 添加到每条记录中,为后续数据透视做准备
  104. for record in records:
  105. record['item_name'] = item_name # 使用查询时的 itemid 作为列名
  106. all_records.extend(records)
  107. logger.info(f"成功获取 '{item_name}' 的 {len(records)} 条数据。")
  108. else:
  109. logger.warning(f"'{item_name}' 未返回有效数据。API消息: {api_response.get('msg', '无')}")
  110. except requests.exceptions.RequestException as e:
  111. logger.error(f"查询 '{item_name}' 时发生网络错误: {e}")
  112. continue # 跳过当前失败的传感器,继续下一个
  113. # 步骤 4: 将原始数据转换为DataFrame并进行清洗
  114. if not all_records:
  115. logger.error("未能从API获取任何有效数据,处理终止。")
  116. return pd.DataFrame()
  117. logger.info(f"API数据获取完成,总共获取了 {len(all_records)} 条原始记录。")
  118. data_origin = pd.DataFrame(all_records)
  119. # 数据清洗:转换数据类型,并处理可能存在的错误
  120. # 使用 'coerce' 会将无法转换的值变为 NaT (时间) 或 NaN (数值),更稳健
  121. data_origin['index'] = pd.to_datetime(data_origin['htime_at'], errors='coerce')
  122. data_origin['val'] = pd.to_numeric(data_origin['val'], errors='coerce')
  123. # 删除时间或数值转换失败的无效行
  124. data_origin.dropna(subset=['index', 'val'], inplace=True)
  125. # 步骤 5: 数据透视,将长表转换为宽表
  126. logger.info("正在进行数据透视,将数据整理为每行一个时间点,每列一个传感器...")
  127. pivot_df = data_origin.pivot_table(index='index', columns='item_name', values='val', aggfunc='first')
  128. pivot_df.reset_index(inplace=True) # 将索引 'index' 变回普通列,方便传入填充函数
  129. # 步骤 6: 填充缺失的小时数据并设置最终的索引
  130. logger.info("正在填充缺失的小时数据以确保时间序列的完整性...")
  131. # pivot_df.columns.name = None
  132. final_df = fill_missing_hourly_data(pivot_df, start_date, end_date)
  133. # 步骤 7: 对 final_df 进行插值处理,消除所有 NaN 值
  134. if not final_df.empty:
  135. # 检查是否存在 NaN 值
  136. nan_count_before = final_df.isna().sum().sum()
  137. if nan_count_before > 0:
  138. logger.info(f"检测到 {nan_count_before} 个 NaN 值,开始进行插值处理...")
  139. # 保存 'index' 列(时间列)
  140. time_column = final_df['index'].copy() if 'index' in final_df.columns else None
  141. # 对数值列进行插值处理
  142. numeric_columns = final_df.select_dtypes(include=['float64', 'int64']).columns
  143. if len(numeric_columns) > 0:
  144. # 1. 线性插值(适合时间序列数据)
  145. final_df[numeric_columns] = final_df[numeric_columns].interpolate(method='linear', limit_direction='both')
  146. # 2. 前向填充(处理开头的 NaN)
  147. final_df[numeric_columns] = final_df[numeric_columns].ffill()
  148. # 3. 后向填充(处理末尾的 NaN)
  149. final_df[numeric_columns] = final_df[numeric_columns].bfill()
  150. # 4. 如果仍有 NaN(整列为空的情况),用0填充
  151. final_df[numeric_columns] = final_df[numeric_columns].fillna(0)
  152. # 恢复 'index' 列(确保时间列不被修改)
  153. if time_column is not None:
  154. final_df['index'] = time_column
  155. nan_count_after = final_df.isna().sum().sum()
  156. logger.info(f"插值处理完成,剩余 NaN 值: {nan_count_after} 个")
  157. if nan_count_after > 0:
  158. logger.warning(f"警告: 仍有 {nan_count_after} 个 NaN 值未能填充")
  159. else:
  160. logger.info("数据中没有 NaN 值,无需插值处理")
  161. # final_df = final_df.sort_values('index').reset_index(drop=True)
  162. return final_df
  163. # --- 4. 脚本执行入口 ---
  164. if __name__ == "__main__":
  165. # 示例: 不传入日期,默认使用当前时间作为结束时间,查询过去180天的数据
  166. print("--- 开始执行数据获取任务 ---")
  167. sensor_df = get_sensor_data()
  168. if not sensor_df.empty:
  169. print("\n[成功] 数据获取与处理完成!")
  170. print("\n[结果] DataFrame (前5行):")
  171. print(sensor_df.head())
  172. print("\n[结果] DataFrame (后5行):")
  173. print(sensor_df.tail())
  174. print("\n[结果] DataFrame 信息:")
  175. # .info() 会打印出维度、列名、非空值数量和数据类型等关键信息
  176. sensor_df.info()
  177. else:
  178. print("\n[失败] 未能生成最终的DataFrame,请检查上面的日志输出获取详细错误信息。")