"""RO CIP prediction model adapter.

Supports two run modes: a daily scheduled run and an anomaly-triggered run.
"""

import importlib
import logging
import sys
import time
from datetime import datetime
from typing import List, Optional

import pandas as pd

from core.config import PipelineConfig
from core.data_provider import DataProvider
from core.shared_state import SharedState

logger = logging.getLogger(__name__)


def _format_days(value: object) -> str:
    """Render *value* with one decimal place, or ``"?"`` if it is not numeric.

    Used for log output only, so a missing or malformed ``remaining_days``
    value must never raise.
    """
    try:
        return f"{float(value):.1f}"
    except (TypeError, ValueError):
        return "?"


class ROCIPAdapter:
    """Run the RO CIP prediction model and publish its results to shared state."""

    def __init__(
        self,
        plant: str,
        data_provider: DataProvider,
        shared_state: SharedState,
        config: PipelineConfig,
    ):
        """Store collaborators and bind the RO model entry points.

        Args:
            plant: Plant identifier included in published predictions.
            data_provider: Data access object (stored, not used in this class's
                visible methods).
            shared_state: Object used to publish results and read diagnosis
                results.
            config: Pipeline configuration; ``ro_root``, ``ro_units``,
                ``ro_tmp_limit`` and ``ro_dpt_type`` are read from it.

        Raises:
            ImportError: If the ``run_this`` module cannot be imported.
            AttributeError: If ``run_this`` lacks one of the expected functions.
        """
        self.plant = plant
        self.data_provider = data_provider
        self.shared_state = shared_state
        self.config = config

        # Import the RO model functions as a module (does not trigger its
        # __main__ section). The model directory is put on sys.path first so
        # that "run_this" resolves to it.
        ro_root = str(config.ro_root / "cip_predict")
        if ro_root not in sys.path:
            sys.path.insert(0, ro_root)
        ro_module = importlib.import_module("run_this")
        self._process_unit = ro_module.process_unit
        self._main = ro_module.main
        self._load_history = ro_module.load_history

    def run_prediction(
        self,
        units: Optional[List[str]] = None,
        tmp_limit: Optional[float] = None,
        dpt_type: Optional[str] = None,
        df: Optional[pd.DataFrame] = None,
    ) -> pd.DataFrame:
        """Run the CIP prediction and publish the results.

        Args:
            units: Units to predict; falls back to ``config.ro_units`` when
                falsy.
            tmp_limit: TMP limit; falls back to ``config.ro_tmp_limit`` when
                falsy.
            dpt_type: DPT type; falls back to ``config.ro_dpt_type`` when falsy.
            df: Optional input frame forwarded unchanged to the model's
                ``main`` function.

        Returns:
            The frame returned by the model's ``main`` function.
        """
        units = units or self.config.ro_units
        tmp_limit = tmp_limit or self.config.ro_tmp_limit
        dpt_type = dpt_type or self.config.ro_dpt_type

        result_df = self._main(units, tmp_limit, dpt_type, df=df)

        # Convert once and reuse: the same records feed both the per-row
        # handling below and the aggregated "ro_predictions" payload.
        records = result_df.to_dict("records")

        # Publish anomalies individually; log normal results.
        for row_dict in records:
            if row_dict.get("status") == "ERROR":
                self.shared_state.publish("ro_anomalies", {
                    "timestamp": datetime.now().isoformat(),
                    **row_dict,
                })
                logger.warning("RO 异常: %s", row_dict)
            else:
                # remaining_days may be absent or non-numeric; the formatter
                # falls back to "?" instead of raising mid-loop, which would
                # otherwise prevent the publish below from happening.
                logger.info(
                    "RO %s 预计下次 CIP: %s 天后",
                    row_dict.get("unit"),
                    _format_days(row_dict.get("remaining_days")),
                )

        self.shared_state.publish("ro_predictions", {
            "timestamp": datetime.now().isoformat(),
            "plant": self.plant,
            "results": records,
        })
        return result_df

    def _has_ro_anomaly(self, diag_result: dict) -> bool:
        """Return True if the diagnosis result contains an RO-section anomaly.

        A result counts as an RO anomaly when its ``status`` is ``"abnormal"``
        and at least one entry in ``results`` has a ``trigger`` containing
        ``"RO"`` or ``"DPT"``.
        """
        if diag_result.get("status") != "abnormal":
            return False
        # "or" guards against keys that are present but explicitly None.
        for trace in diag_result.get("results") or []:
            trigger = trace.get("trigger") or ""
            if "RO" in trigger or "DPT" in trigger:
                return True
        return False

    def run_scheduler(self):
        """Run the daily + RO-anomaly-triggered scheduling loop (never returns).

        Once per hour the loop checks the diagnosis results and the clock, and
        runs a prediction when either an RO anomaly is present or no run has
        succeeded yet today and the local hour is at least 8.
        """
        last_run_date = None
        while True:
            now = datetime.now()
            try:
                # Check whether recent diagnosis results report an RO anomaly.
                # Kept inside the try so a transient shared-state failure does
                # not terminate the scheduler loop.
                diag = self.shared_state.read(
                    "diagnosis_results", max_age_seconds=7200
                )
                ro_anomaly = diag is not None and self._has_ro_anomaly(diag)

                # Daily run: fires on the first hourly check at or after 08:00.
                is_new_day = now.date() != last_run_date and now.hour >= 8

                if ro_anomaly or is_new_day:
                    logger.info(
                        "RO CIP 预测触发 (anomaly=%s, daily=%s)",
                        ro_anomaly,
                        is_new_day,
                    )
                    self.run_prediction()
                    # NOTE(review): an anomaly-triggered run before 08:00 also
                    # sets this, which suppresses the scheduled run for that
                    # day — confirm this is intended.
                    last_run_date = now.date()
            except Exception as e:
                # Top-level boundary of the loop: log and retry next cycle.
                logger.error(f"RO CIP 预测异常: {e}", exc_info=True)

            time.sleep(60 * 60)  # check once per hour