Przeglądaj źródła

1:新增获取历史数据逻辑
2:优化调用代码逻辑

wmy 2 tygodni temu
rodzic
commit
9db55b60bd
4 zmienionych plików z 189 dodań i 13 usunięć
  1. 1 0
      config.json
  2. 2 2
      fouling_model_0922/predict.py
  3. 170 0
      get_api_data.py
  4. 16 11
      main_simple.py

+ 1 - 0
config.json

@@ -3,6 +3,7 @@
   "api": {
     "base_url": "http://120.55.44.4:8900",
     "callback_endpoint": "/api/dtgateway/v1/decision/data",
+    "API_History_URL" : "http://120.55.44.4:8900/api/v1/jinke-cloud/db/device/history-data",
     "jwt_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJJRCI6NywiVXNlcm5hbWUiOiJhZG1pbiIsIkRlcCI6IjEzNSIsImV4cCI6MTc3NjExOTExNCwiaXNzIjoiZ2luLWJsb2cifQ.0HTtzHZjyd2mHo8VCy8icYROxmntRMuQhyoZsAYRL_M"
   },
   "_comment_database": "MySQL数据库连接配置",

+ 2 - 2
fouling_model_0922/predict.py

@@ -163,9 +163,9 @@ class Predictor:
             smoothed = self.savitzky_golay_smooth(smoothed)
         return smoothed
 
-    def predict(self, start_date):
+    def predict(self, df):
         # 获取当前或者传入时间的历史数据(180天)
-        df = get_sensor_data(start_date=start_date)
+        # df = get_sensor_data(start_date=start_date)
         """核心预测接口:输入原始数据,返回处理后的预测结果"""
         self.test_start_date = (pd.to_datetime(df['index']).max() + timedelta(hours=3)).strftime("%Y-%m-%d %H:%M:%S")
         self.load_data(df)

+ 170 - 0
get_api_data.py

@@ -0,0 +1,170 @@
+import requests
+import logging
+import pandas as pd
+from datetime import datetime, timedelta
+
+# --- 1. 基础配置 ---
+# 配置日志记录器,方便调试和追踪
+logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
+logger = logging.getLogger()
+
+
+# --- 2. 辅助函数:填充缺失的小时数据 ---
+def fill_missing_hourly_data(df, start_date, end_date):
+    """
+    确保DataFrame在给定的日期范围内拥有完整的小时索引。
+    缺失的小时数据点将通过前向填充(forward-fill)的方式补全。
+
+    Args:
+        df (pd.DataFrame): 经过数据透视处理的DataFrame,必须有名为 'timestamp' 的时间列。
+        start_date (datetime): 期望时间范围的开始时间。
+        end_date (datetime): 期望时间范围的结束时间。
+
+    Returns:
+        pd.DataFrame: 填充了缺失小时数据后的DataFrame。
+    """
+    if df.empty:
+        logger.warning("原始数据为空,无法进行数据补充。")
+        return df
+
+    # 将 'index' 列设为索引,这是进行时间序列分析的标准操作
+    df.set_index('index', inplace=True)
+
+    # 创建一个完整的小时级别的时间范围
+    # inclusive='left' 表示包含开始时间点,但不包含结束时间点,符合常规查询逻辑
+    complete_time_range = pd.date_range(
+        start=start_date.replace(minute=0, second=0, microsecond=0),
+        periods=180 * 24,  # 指定需要 4320 个小时
+        freq='H'
+    )
+
+    logger.info(
+        f"期望生成的时间范围: 从 {start_date.strftime('%Y-%m-%d %H:00')} 到 {end_date.strftime('%Y-%m-%d %H:00')}")
+    logger.info(f"期望的小时总数: {len(complete_time_range)} 小时")
+    logger.info(f"数据填充前的行数: {len(df)} 行")
+
+    # 使用 reindex 和 ffill 高效地填充缺失值
+    # 1. reindex: 将DataFrame的索引与完整时间范围对齐,缺失的时间点会产生NaN值
+    # 2. ffill: 使用前一个有效观测值向前填充NaN值
+    filled_df = df.reindex(complete_time_range).ffill()
+    filled_df.reset_index(inplace=True)
+    filled_df.rename(columns={'index': 'index'}, inplace=True)  # 通常 reset_index 默认列名就是 'index',但这行可以确保万无一失
+    logger.info(f"数据填充后的最终行数: {len(filled_df)} 行")
+    if not filled_df.empty:
+        logger.info(f"最终时间范围: 从 {filled_df.index.min()} 到 {filled_df.index.max()}")
+
+    return filled_df
+
+
+# --- 3. 主函数:通过API获取并处理传感器数据 ---
+def get_sensor_data(end_date_str=None, API_BASE_URL=None, HEADERS=None):
+    """
+    通过循环调用API获取多个传感器的数据,合并、处理并填充成一个干净、完整的DataFrame。
+
+    Args:
+        end_date_str (str, optional): 查询的结束日期,格式为 'YYYY-MM-DD HH:MM:SS'。
+                                     如果为None,则默认为当前时间。
+
+    Returns:
+        pd.DataFrame: 一个处理完成的DataFrame,索引是时间,每列是一个传感器。
+    """
+    # 步骤 1: 计算时间范围和时间戳
+    if end_date_str is None:
+        end_date = datetime.now()
+    else:
+        try:
+            end_date = datetime.strptime(end_date_str, '%Y-%m-%d %H:%M:%S')
+        except ValueError:
+            logger.error(f"日期格式错误: '{end_date_str}'。请使用 'YYYY-MM-DD HH:MM:SS' 格式。")
+            return pd.DataFrame()
+
+    start_date = end_date - timedelta(days=180)
+    start_timestamp = int(start_date.timestamp() * 1000)
+    end_timestamp = int(end_date.timestamp() * 1000)
+    logger.info(
+        f"开始查询数据,时间范围: {start_date.strftime('%Y-%m-%d %H:%M:%S')} 到 {end_date.strftime('%Y-%m-%d %H:%M:%S')}")
+
+    # 步骤 2: 定义要查询的14个传感器列表
+    item_names = [
+        'C.M.RO1_FT_JS@out', 'C.M.RO2_FT_JS@out', 'C.M.RO3_FT_JS@out', 'C.M.RO4_FT_JS@out',
+        'C.M.RO_TT_ZJS@out', 'C.M.RO_Cond_ZJS@out',
+        'C.M.RO1_DB@DPT_1', 'C.M.RO1_DB@DPT_2',
+        'C.M.RO2_DB@DPT_1', 'C.M.RO2_DB@DPT_2',
+        'C.M.RO3_DB@DPT_1', 'C.M.RO3_DB@DPT_2',
+        'C.M.RO4_DB@DPT_1', 'C.M.RO4_DB@DPT_2'
+    ]
+
+    # 步骤 3: 循环调用API获取所有传感器的数据
+    all_records = []
+    for item_name in item_names:
+        params = {
+            "deviceid": "1", "dataitemid": item_name, "project_id": "92",
+            "stime": start_timestamp, "etime": end_timestamp,
+            "size": "1", "interval": "h", "aggregator": "new"
+        }
+        try:
+            response = requests.get(API_BASE_URL, params=params, headers=HEADERS, timeout=60)
+            response.raise_for_status()  # 如果请求失败 (如 404, 500),则会抛出异常
+            api_response = response.json()
+
+            if api_response.get('code') == 200 and api_response.get('data'):
+                records = api_response['data']
+                # 将 item_name 添加到每条记录中,为后续数据透视做准备
+                for record in records:
+                    record['item_name'] = item_name  # 使用查询时的 itemid 作为列名
+                all_records.extend(records)
+                logger.info(f"成功获取 '{item_name}' 的 {len(records)} 条数据。")
+            else:
+                logger.warning(f"'{item_name}' 未返回有效数据。API消息: {api_response.get('msg', '无')}")
+        except requests.exceptions.RequestException as e:
+            logger.error(f"查询 '{item_name}' 时发生网络错误: {e}")
+            continue  # 跳过当前失败的传感器,继续下一个
+
+    # 步骤 4: 将原始数据转换为DataFrame并进行清洗
+    if not all_records:
+        logger.error("未能从API获取任何有效数据,处理终止。")
+        return pd.DataFrame()
+
+    logger.info(f"API数据获取完成,总共获取了 {len(all_records)} 条原始记录。")
+    data_origin = pd.DataFrame(all_records)
+
+    # 数据清洗:转换数据类型,并处理可能存在的错误
+    # 使用 'coerce' 会将无法转换的值变为 NaT (时间) 或 NaN (数值),更稳健
+    data_origin['index'] = pd.to_datetime(data_origin['htime_at'], errors='coerce')
+    data_origin['val'] = pd.to_numeric(data_origin['val'], errors='coerce')
+
+    # 删除时间或数值转换失败的无效行
+    data_origin.dropna(subset=['index', 'val'], inplace=True)
+
+    # 步骤 5: 数据透视,将长表转换为宽表
+    logger.info("正在进行数据透视,将数据整理为每行一个时间点,每列一个传感器...")
+    pivot_df = data_origin.pivot_table(index='index', columns='item_name', values='val', aggfunc='first')
+    pivot_df.reset_index(inplace=True)  # 将索引 'index' 变回普通列,方便传入填充函数
+
+    # 步骤 6: 填充缺失的小时数据并设置最终的索引
+    logger.info("正在填充缺失的小时数据以确保时间序列的完整性...")
+    # pivot_df.columns.name = None
+    final_df = fill_missing_hourly_data(pivot_df, start_date, end_date)
+    # final_df = final_df.sort_values('index').reset_index(drop=True)
+    return final_df
+
+
+# --- 4. 脚本执行入口 ---
+if __name__ == "__main__":
+
+    # 示例: 不传入日期,默认使用当前时间作为结束时间,查询过去180天的数据
+    print("--- 开始执行数据获取任务 ---")
+    sensor_df = get_sensor_data()
+    if not sensor_df.empty:
+        print("\n[成功] 数据获取与处理完成!")
+        print("\n[结果] DataFrame (前5行):")
+        print(sensor_df.head())
+
+        print("\n[结果] DataFrame (后5行):")
+        print(sensor_df.tail())
+
+        print("\n[结果] DataFrame 信息:")
+        # .info() 会打印出维度、列名、非空值数量和数据类型等关键信息
+        sensor_df.info()
+    else:
+        print("\n[失败] 未能生成最终的DataFrame,请检查上面的日志输出获取详细错误信息。")

+ 16 - 11
main_simple.py

@@ -19,6 +19,7 @@ import pandas as pd
 import numpy as np
 from sklearn.linear_model import LinearRegression
 from fouling_model_0922.predict import Predictor
+from get_api_data import get_sensor_data
 import warnings
 from datetime import datetime, timedelta
 from logging_system import CIPAnalysisLogger
@@ -29,6 +30,7 @@ import os
 
 warnings.filterwarnings('ignore', category=FutureWarning)
 
+
 # 加载配置文件
 def load_config():
     """
@@ -50,6 +52,15 @@ def load_config():
 # 加载配置
 config = load_config()
 
+# 设置请求头
+headers = {
+    "Content-Type": "application/json",
+    "JWT-TOKEN": config['api']['jwt_token']
+}
+# 构建回调URL
+callback_url = config['api']['base_url'] + config['api']['callback_endpoint']
+history_url = config['api']['API_History_URL']
+
 def update_cip_history_in_config(result_df):
     """
     保存CIP预测结果到配置文件
@@ -449,7 +460,10 @@ def analyze_ro_unit_cip_timing(strategy=1, start_date=None, unit_filter=None):
     try:
         # 获取预测数据
         try:
-            all_data = Predictor().predict(start_date=start_date)
+            # 获取 前180天的数据
+            df = get_sensor_data(start_date, history_url, headers)
+            #预测
+            all_data = Predictor().predict(df =df)
             if all_data.empty: 
                 logger.logger.error("预测数据为空")
                 return pd.DataFrame()
@@ -620,7 +634,7 @@ def main(strategy=3, start_date=None, unit_filter=None):
     if config and not result_df.empty:
         callback_success = send_decision_to_callback(result_df)
         if not callback_success:
-            print("⚠️ 回调发送失败")
+            print(" 回调发送失败")
 
     return result_df
 
@@ -641,15 +655,6 @@ def send_decision_to_callback(decision_data):
         return False
     
     try:
-        # 构建回调URL
-        callback_url = config['api']['base_url'] + config['api']['callback_endpoint']
-        
-        # 设置请求头
-        headers = {
-            "Content-Type": "application/json",
-            "JWT-TOKEN": config['api']['jwt_token']
-        }
-        
         # 获取项目ID
         project_id = config['scada']['project_id']