Browse Source

feat: RO CIP 预测改用实时在线数据,支持按水厂配置机组

- data_provider: query_single_point_history 新增 interval 参数,支持 minute 级长周期查询
- ro_adapter: 从离线 CSV 改为 API 拉取 30 天实时数据,按机组过滤非生产数据后逐个预测
- config: ro.units 改为按水厂映射(xishan 4 台, longting 2 台)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
wmy 1 week ago
parent
commit
d79e95bd3b
4 changed files with 107 additions and 10 deletions
  1. 95 5
      core/adapters/ro_adapter.py
  2. 4 1
      core/config.py
  3. 4 1
      core/config/pipeline_config.yaml
  4. 4 3
      core/data_provider.py

+ 95 - 5
core/adapters/ro_adapter.py

@@ -1,11 +1,13 @@
 """
 RO CIP 预测模型适配器。支持每日定时 + 异常触发两种运行模式。
+拉取最近 cycle_days 天的实时在线数据,通过 minute 间隔 API 查询。
 """
 import logging
 import time
-from datetime import datetime
+from datetime import datetime, timedelta
 from typing import List, Optional
 
+import numpy as np
 import pandas as pd
 
 from core.config import PipelineConfig
@@ -14,6 +16,17 @@ from core.shared_state import SharedState
 
 logger = logging.getLogger(__name__)
 
+# RO CIP 预测所需的 PLC 传感器点位
+_RO_PLC_POINTS = [
+    "RO1_FluxF", "RO2_FluxF", "RO3_FluxF", "RO4_FluxF",
+    "C.M.RO1_DB@DPT_1", "C.M.RO2_DB@DPT_1", "C.M.RO3_DB@DPT_1", "C.M.RO4_DB@DPT_1",
+    "C.M.RO1_DB@DPT_2", "C.M.RO2_DB@DPT_2", "C.M.RO3_DB@DPT_2", "C.M.RO4_DB@DPT_2",
+    "C.M.RO_TT_ZJS@out",
+    "C.M.RO_Cond_ZJS@out",
+    "C.M.RO_PH_ZJS@out",
+    "C.M.RO_ORP_ZJS@out",
+]
+
 
 class ROCIPAdapter:
     def __init__(
@@ -31,7 +44,6 @@ class ROCIPAdapter:
         # 导入 RO 模型函数(不触发 __main__)
         import importlib
         import sys
-        from pathlib import Path
 
         ro_root = str(config.ro_root / "cip_predict")
         if ro_root not in sys.path:
@@ -40,7 +52,48 @@ class ROCIPAdapter:
         ro_module = importlib.import_module("run_this")
         self._process_unit = ro_module.process_unit
         self._main = ro_module.main
-        self._load_history = ro_module.load_history
+
+    # ==================== 数据拉取 ====================
+
+    def _fetch_cycle_data(self) -> pd.DataFrame:
+        """从 API 拉取最近 cycle_days 天的 RO 数据(minute 间隔,5 分钟降采样)。"""
+        cycle_days = self.config.get("ro.cycle_days", 30)
+        end_time = datetime.now()
+        start_time = end_time - timedelta(days=cycle_days)
+        resample_interval = 300  # 5 分钟
+
+        time_index = pd.date_range(start=start_time, end=end_time, freq=f"{resample_interval}s")
+        result = {"time": time_index}
+
+        success, empty = 0, 0
+        for pt in _RO_PLC_POINTS:
+            raw = self.data_provider.query_single_point_history(
+                pt, start_time, end_time, interval="minute"
+            )
+            if raw.empty:
+                result[pt] = np.nan
+                empty += 1
+            else:
+                resampled = raw.resample(f"{resample_interval}s").mean()
+                resampled = resampled.reindex(
+                    time_index,
+                    method="nearest",
+                    tolerance=pd.Timedelta(seconds=resample_interval),
+                )
+                result[pt] = resampled.ffill().bfill()
+                success += 1
+
+        logger.info(f"RO 数据拉取: {success}/{len(_RO_PLC_POINTS)} 成功, 范围 {cycle_days} 天")
+
+        df = pd.DataFrame(result)
+        df["time"] = pd.to_datetime(df["time"])
+
+        sensor_cols = [c for c in _RO_PLC_POINTS if c in df.columns]
+        df = df.dropna(subset=sensor_cols, how="all").reset_index(drop=True)
+
+        return df
+
+    # ==================== 预测 ====================
 
     def run_prediction(
         self,
@@ -54,9 +107,42 @@ class ROCIPAdapter:
         tmp_limit = tmp_limit or self.config.ro_tmp_limit
         dpt_type = dpt_type or self.config.ro_dpt_type
 
-        result_df = self._main(units, tmp_limit, dpt_type, df=df)
+        if df is None:
+            df = self._fetch_cycle_data()
+
+        if df.empty or len(df) < 10:
+            logger.warning(f"RO 数据不足 ({len(df)} 行), 跳过本轮预测")
+            return pd.DataFrame()
+
+        # 按机组过滤非生产数据后逐个预测(避免 flux≈0 导致 R1=∞ 传播)
+        results = []
+        for u in units:
+            unit_df = df.copy()
+            flux_col = f"{u}_FluxF"
+            if flux_col in unit_df.columns:
+                unit_df = unit_df[unit_df[flux_col] > 0.1]
+
+            if len(unit_df) < 10:
+                results.append({
+                    "unit": u, "status": "ERROR",
+                    "error_type": "INSUFFICIENT_PRODUCTION_DATA",
+                })
+                continue
+
+            res = self._process_unit(u, unit_df, tmp_limit, dpt_type)
+            if res is None:
+                results.append({
+                    "unit": u, "status": "ERROR",
+                    "error_type": "MODEL_FIT_FAILED",
+                })
+            elif res.get("status") == "ERROR":
+                results.append(res)
+            else:
+                res["status"] = "OK"
+                results.append(res)
+
+        result_df = pd.DataFrame(results)
 
-        # 发布正常/异常结果
         for _, row in result_df.iterrows():
             row_dict = row.to_dict()
             if row_dict.get("status") == "ERROR":
@@ -78,6 +164,8 @@ class ROCIPAdapter:
         })
         return result_df
 
+    # ==================== 异常联动 ====================
+
     def _has_ro_anomaly(self, diag_result: dict) -> bool:
         """检查诊断结果中是否包含 RO 段异常。"""
         if diag_result.get("status") != "abnormal":
@@ -88,6 +176,8 @@ class ROCIPAdapter:
                 return True
         return False
 
+    # ==================== 调度 ====================
+
     def run_scheduler(self):
         """每日定时 + RO 异常触发调度。"""
         last_run_date = None

+ 4 - 1
core/config.py

@@ -139,7 +139,10 @@ class PipelineConfig:
 
     @property
     def ro_units(self) -> List[str]:
-        return self._data["ro"].get("units", ["RO1", "RO2", "RO3", "RO4"])
+        units_map = self._data["ro"].get("units", {})
+        if isinstance(units_map, dict):
+            return units_map.get(self.plant, ["RO1", "RO2"])
+        return units_map
 
     @property
     def ro_schedule(self) -> str:

+ 4 - 1
core/config/pipeline_config.yaml

@@ -39,7 +39,10 @@ ro:
   tmp_limit: 0.21
   dpt_type: DPT_1
   schedule: daily
-  units: ["RO1", "RO2", "RO3", "RO4"]
+  cycle_days: 30
+  units:
+    xishan: ["RO1", "RO2", "RO3", "RO4"]
+    longting: ["RO1", "RO2"]
   history_paths:
     DPT_1: models/ro_mechanism_predict/use_data/cycle_rv_seg1_results.csv
     DPT_2: models/ro_mechanism_predict/use_data/cycle_rv_seg2_results.csv

+ 4 - 3
core/data_provider.py

@@ -89,9 +89,10 @@ class DataProvider:
     # ======================== 历史数据查询 ========================
 
     def query_single_point_history(
-        self, item_name: str, start_time: datetime, end_time: datetime
+        self, item_name: str, start_time: datetime, end_time: datetime,
+        interval: str = "s",
     ) -> pd.Series:
-        """查询单个 PLC 点位的历史数据(按秒采样)。"""
+        """查询单个 PLC 点位的历史数据。interval: s/minute/hour 等。"""
         params = {
             "deviceid": "1",
             "dataitemid": item_name,
@@ -99,7 +100,7 @@ class DataProvider:
             "stime": int(start_time.timestamp() * 1000),
             "etime": int(end_time.timestamp() * 1000),
             "size": "1",
-            "interval": "s",
+            "interval": interval,
             "aggregator": "new",
         }
         url = f"{self._api_base}/api/v1/jinke-cloud/db/device/history-data"