"""RO CIP prediction model adapter.

Supports two run modes: a daily scheduled run and an anomaly-triggered run.
Pulls the most recent ``cycle_days`` days of live online data through the
minute-interval history API.
"""

import logging
import re
import time
from datetime import datetime, timedelta
from typing import List, Optional

import numpy as np
import pandas as pd

from core.config import PipelineConfig
from core.data_provider import DataProvider
from core.shared_state import SharedState

logger = logging.getLogger(__name__)

# PLC sensor points required for RO CIP prediction.
_RO_PLC_POINTS = [
    "RO1_FluxF", "RO2_FluxF", "RO3_FluxF", "RO4_FluxF",
    "C.M.RO1_DB@DPT_1", "C.M.RO2_DB@DPT_1", "C.M.RO3_DB@DPT_1", "C.M.RO4_DB@DPT_1",
    "C.M.RO1_DB@DPT_2", "C.M.RO2_DB@DPT_2", "C.M.RO3_DB@DPT_2", "C.M.RO4_DB@DPT_2",
    "C.M.RO_TT_ZJS@out", "C.M.RO_Cond_ZJS@out", "C.M.RO_PH_ZJS@out", "C.M.RO_ORP_ZJS@out",
]

# Resampling step applied to the fetched history (5 minutes).
_RESAMPLE_SECONDS = 300
# Minimum number of rows required before a prediction / model fit is attempted.
_MIN_ROWS = 10
# Rows whose unit flux is not above this value are treated as non-production and
# dropped before fitting (flux ≈ 0 would otherwise produce R1 = ∞, which propagates).
_MIN_PRODUCTION_FLUX = 0.1

# Scheduler tuning.
_DAILY_RUN_HOUR = 8  # the daily run fires on the first check at or after 08:00 (local time)
_CHECK_INTERVAL_SECONDS = 60 * 60  # how often the scheduler wakes up
_DIAG_MAX_AGE_SECONDS = 7200  # ignore diagnosis results older than this

# Matches diagnosis triggers that concern the RO section: "DPT" anywhere, or "RO"
# not preceded by a letter. The letter guard keeps words such as "ERROR",
# "PRODUCTION" or "FROM" from counting as RO anomalies, while point names such as
# "RO1_FluxF" or "C.M.RO1_DB@DPT_1" still match.
_RO_TRIGGER_PATTERN = re.compile(r"(?<![A-Za-z])RO|DPT")


class ROCIPAdapter:
    """Fetch RO sensor history, run the external CIP model per unit, publish results."""

    def __init__(
        self,
        plant: str,
        data_provider: DataProvider,
        shared_state: SharedState,
        config: PipelineConfig,
    ):
        """Bind collaborators and load the RO model entry points.

        Args:
            plant: Plant identifier, included in published prediction payloads.
            data_provider: Source of per-point sensor history.
            shared_state: Store that predictions/anomalies are published to and
                diagnosis results are read from.
            config: Pipeline configuration; ``config.ro_root`` locates the model code.
        """
        self.plant = plant
        self.data_provider = data_provider
        self.shared_state = shared_state
        self.config = config

        # Import the RO model functions by module name (so its ``__main__`` block
        # does not run). The model directory is prepended to sys.path once.
        import importlib
        import sys

        ro_root = str(config.ro_root / "cip_predict")
        if ro_root not in sys.path:
            sys.path.insert(0, ro_root)
        # NOTE(review): "run_this" is a generic module name; if a different module of
        # that name is already in sys.modules, import_module returns the cached one
        # instead of the RO model — confirm no other component imports a "run_this".
        ro_module = importlib.import_module("run_this")
        self._process_unit = ro_module.process_unit
        self._main = ro_module.main

    # ==================== Data fetching ====================

    def _fetch_cycle_data(self) -> pd.DataFrame:
        """Pull the last ``ro.cycle_days`` days of RO data, resampled to 5 minutes.

        Each point in ``_RO_PLC_POINTS`` is queried at minute interval, averaged into
        5-minute bins, aligned to a common 5-minute grid (nearest bin within one
        step), then forward- and back-filled, so gaps of any length are filled.
        Points with no data become all-NaN columns.

        Returns:
            DataFrame with a ``time`` column plus one column per sensor point; rows
            in which every sensor is NaN are dropped and the index is reset.
        """
        cycle_days = self.config.get("ro.cycle_days", 30)
        end_time = datetime.now()
        start_time = end_time - timedelta(days=cycle_days)
        freq = f"{_RESAMPLE_SECONDS}s"
        time_index = pd.date_range(start=start_time, end=end_time, freq=freq)

        result = {"time": time_index}
        success, empty = 0, 0
        for pt in _RO_PLC_POINTS:
            raw = self.data_provider.query_single_point_history(
                pt, start_time, end_time, interval="minute"
            )
            # Assumes a pandas object with a DatetimeIndex (required by .resample);
            # TODO confirm the provider contract. A None return is treated as empty.
            if raw is None or raw.empty:
                result[pt] = np.nan
                empty += 1
                continue
            resampled = raw.resample(freq).mean().reindex(
                time_index,
                method="nearest",
                tolerance=pd.Timedelta(seconds=_RESAMPLE_SECONDS),
            )
            result[pt] = resampled.ffill().bfill()
            success += 1

        logger.info(
            f"RO 数据拉取: {success}/{len(_RO_PLC_POINTS)} 成功, {empty} 无数据, "
            f"范围 {cycle_days} 天"
        )

        df = pd.DataFrame(result)
        df["time"] = pd.to_datetime(df["time"])
        sensor_cols = [c for c in _RO_PLC_POINTS if c in df.columns]
        return df.dropna(subset=sensor_cols, how="all").reset_index(drop=True)

    # ==================== Prediction ====================

    @staticmethod
    def _format_days(value) -> str:
        """Format a remaining-days value with one decimal, or "?" if not numeric/NaN."""
        try:
            days = float(value)
        except (TypeError, ValueError):
            return "?"
        return "?" if np.isnan(days) else f"{days:.1f}"

    def _predict_unit(
        self, unit: str, df: pd.DataFrame, tmp_limit: float, dpt_type: str
    ) -> dict:
        """Run the CIP model for one unit on its production-only rows.

        Returns:
            The model's result dict with ``status`` set to "OK", the model's own
            ERROR dict unchanged, or an ERROR record with ``error_type``
            "INSUFFICIENT_PRODUCTION_DATA" / "MODEL_FIT_FAILED".
        """
        flux_col = f"{unit}_FluxF"
        if flux_col in df.columns:
            # Drop non-production rows (also drops rows where flux is NaN).
            unit_df = df[df[flux_col] > _MIN_PRODUCTION_FLUX]
        else:
            unit_df = df.copy()

        if len(unit_df) < _MIN_ROWS:
            return {
                "unit": unit,
                "status": "ERROR",
                "error_type": "INSUFFICIENT_PRODUCTION_DATA",
            }

        res = self._process_unit(unit, unit_df, tmp_limit, dpt_type)
        if res is None:
            return {"unit": unit, "status": "ERROR", "error_type": "MODEL_FIT_FAILED"}
        if res.get("status") == "ERROR":
            return res
        # Copy rather than mutate the dict handed back by the model.
        return {**res, "status": "OK"}

    def run_prediction(
        self,
        units: Optional[List[str]] = None,
        tmp_limit: Optional[float] = None,
        dpt_type: Optional[str] = None,
        df: Optional[pd.DataFrame] = None,
    ) -> pd.DataFrame:
        """Run CIP prediction for each unit and publish the results.

        Args:
            units: Units to predict; falsy values fall back to ``config.ro_units``.
            tmp_limit: TMP limit passed to the model; ``None`` falls back to
                ``config.ro_tmp_limit`` (an explicit 0.0 is honoured).
            dpt_type: DPT type passed to the model; falsy values fall back to
                ``config.ro_dpt_type``.
            df: Pre-fetched sensor data; fetched via ``_fetch_cycle_data`` if None.

        Returns:
            One row per unit (empty DataFrame if there was too little input data).
            ERROR records are also published to "ro_anomalies", and the full result
            set is published to "ro_predictions".
        """
        units = units or self.config.ro_units
        if tmp_limit is None:
            tmp_limit = self.config.ro_tmp_limit
        dpt_type = dpt_type or self.config.ro_dpt_type

        if df is None:
            df = self._fetch_cycle_data()
        if len(df) < _MIN_ROWS:
            logger.warning(f"RO 数据不足 ({len(df)} 行), 跳过本轮预测")
            return pd.DataFrame()

        results = [self._predict_unit(u, df, tmp_limit, dpt_type) for u in units]
        result_df = pd.DataFrame(results)

        # Report from the per-unit dicts rather than DataFrame rows, so anomaly
        # payloads are not padded with other units' columns as NaN.
        for record in results:
            if record.get("status") == "ERROR":
                self.shared_state.publish("ro_anomalies", {
                    "timestamp": datetime.now().isoformat(),
                    **record,
                })
                logger.warning(f"RO 异常: {record}")
            else:
                days = self._format_days(record.get("remaining_days"))
                logger.info(f"RO {record.get('unit')} 预计下次 CIP: {days} 天后")

        self.shared_state.publish("ro_predictions", {
            "timestamp": datetime.now().isoformat(),
            "plant": self.plant,
            "results": result_df.to_dict("records"),
        })
        return result_df

    # ==================== Anomaly linkage ====================

    def _has_ro_anomaly(self, diag_result: dict) -> bool:
        """Return True if an abnormal diagnosis has a trace triggered by the RO section.

        A trace counts when its ``trigger`` matches ``_RO_TRIGGER_PATTERN``
        ("DPT" anywhere, or "RO" not preceded by a letter). Non-dict traces and
        missing/None triggers are ignored.
        """
        if diag_result.get("status") != "abnormal":
            return False
        for trace in diag_result.get("results") or []:
            if not isinstance(trace, dict):
                continue
            trigger = trace.get("trigger")
            if trigger is not None and _RO_TRIGGER_PATTERN.search(str(trigger)):
                return True
        return False

    # ==================== Scheduling ====================

    def run_scheduler(self):
        """Loop forever: run prediction daily and whenever an RO anomaly is diagnosed.

        Wakes every ``_CHECK_INTERVAL_SECONDS``. The daily run fires on the first
        check at or after ``_DAILY_RUN_HOUR`` each day; a failed daily run is retried
        on the next check. An anomaly-triggered run does not consume the day's
        scheduled run. Errors are logged and never terminate the loop.
        """
        last_run_date = None
        while True:
            now = datetime.now()

            # Check whether recent diagnosis results contain an RO anomaly.
            try:
                diag = self.shared_state.read(
                    "diagnosis_results", max_age_seconds=_DIAG_MAX_AGE_SECONDS
                )
                ro_anomaly = diag is not None and self._has_ro_anomaly(diag)
            except Exception as e:
                logger.error(f"RO 诊断结果读取异常: {e}", exc_info=True)
                ro_anomaly = False

            is_new_day = now.date() != last_run_date and now.hour >= _DAILY_RUN_HOUR

            if ro_anomaly or is_new_day:
                try:
                    logger.info(f"RO CIP 预测触发 (anomaly={ro_anomaly}, daily={is_new_day})")
                    self.run_prediction()
                    if is_new_day:
                        last_run_date = now.date()
                except Exception as e:
                    logger.error(f"RO CIP 预测异常: {e}", exc_info=True)

            time.sleep(_CHECK_INTERVAL_SECONDS)