| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112 |
- """
- RO CIP 预测模型适配器。支持每日定时 + 异常触发两种运行模式。
- """
- import logging
- import time
- from datetime import datetime
- from typing import List, Optional
- import pandas as pd
- from core.config import PipelineConfig
- from core.data_provider import DataProvider
- from core.shared_state import SharedState
logger = logging.getLogger(__name__)


class ROCIPAdapter:
    """Adapter around the RO CIP prediction model.

    Runs the model's ``main`` entry point, publishes its results through the
    shared state, and offers a blocking scheduler that triggers a prediction
    once per day and whenever the latest diagnosis reports an RO anomaly.
    """

    # Hour of day (local time) from which the daily run may fire.
    _DAILY_RUN_HOUR = 8
    # Diagnosis results older than this are ignored by the scheduler.
    _DIAG_MAX_AGE_SECONDS = 7200
    # Pause between two scheduler polls.
    _POLL_INTERVAL_SECONDS = 60 * 60
    # Substrings of a trace's "trigger" that mark it as RO-related.
    _RO_TRIGGER_MARKERS = ("RO", "DPT")

    def __init__(
        self,
        plant: str,
        data_provider: "DataProvider",
        shared_state: "SharedState",
        config: "PipelineConfig",
    ):
        """Store collaborators and bind the RO model entry points.

        Args:
            plant: Plant identifier attached to published predictions.
            data_provider: Data access object (stored, not used here).
            shared_state: Channel used to publish results and read diagnoses.
            config: Pipeline configuration; ``config.ro_root`` locates the
                model code and ``ro_units`` / ``ro_tmp_limit`` /
                ``ro_dpt_type`` supply prediction defaults.

        Raises:
            ImportError: If the ``run_this`` module cannot be imported.
        """
        self.plant = plant
        self.data_provider = data_provider
        self.shared_state = shared_state
        self.config = config

        # Import the RO model functions as a module (this does not run the
        # script's ``__main__`` section).
        import importlib
        import sys

        ro_root = str(config.ro_root / "cip_predict")
        # NOTE: prepending to sys.path is process-wide; "run_this" is
        # resolved against this directory first.
        if ro_root not in sys.path:
            sys.path.insert(0, ro_root)
        ro_module = importlib.import_module("run_this")
        self._process_unit = ro_module.process_unit
        self._main = ro_module.main
        self._load_history = ro_module.load_history

    @staticmethod
    def _format_days(value) -> str:
        """Render a remaining-days value with one decimal, or ``?`` if unusable."""
        try:
            return f"{float(value):.1f}"
        except (TypeError, ValueError):
            return "?"

    def run_prediction(
        self,
        units: Optional[List[str]] = None,
        tmp_limit: Optional[float] = None,
        dpt_type: Optional[str] = None,
        df: Optional[pd.DataFrame] = None,
    ) -> pd.DataFrame:
        """Run the CIP prediction and publish the results.

        Rows whose ``status`` is ``"ERROR"`` are published individually to
        ``ro_anomalies``; the full result set is then published once to
        ``ro_predictions``.

        Args:
            units: Units to predict; falls back to ``config.ro_units`` when
                omitted or empty.
            tmp_limit: TMP limit; falls back to ``config.ro_tmp_limit`` only
                when ``None`` so an explicit ``0`` is honoured.
            dpt_type: DPT type; falls back to ``config.ro_dpt_type`` when
                omitted or empty.
            df: Optional pre-loaded data forwarded to the model as ``df=``.

        Returns:
            The DataFrame returned by the model's ``main`` function.
        """
        units = units or self.config.ro_units
        if tmp_limit is None:
            tmp_limit = self.config.ro_tmp_limit
        dpt_type = dpt_type or self.config.ro_dpt_type

        result_df = self._main(units, tmp_limit, dpt_type, df=df)
        # Convert once and reuse for both the per-row handling and the
        # aggregated publication.
        records = result_df.to_dict("records")

        for record in records:
            if record.get("status") == "ERROR":
                # Keys of the record win over the generated timestamp, as the
                # record is unpacked last.
                self.shared_state.publish(
                    "ro_anomalies",
                    {"timestamp": datetime.now().isoformat(), **record},
                )
                logger.warning("RO 异常: %s", record)
            else:
                # A missing or non-numeric remaining_days must not abort the
                # run, so it is rendered as "?" instead of being formatted
                # directly with ".1f".
                logger.info(
                    "RO %s 预计下次 CIP: %s 天后",
                    record.get("unit"),
                    self._format_days(record.get("remaining_days")),
                )

        self.shared_state.publish(
            "ro_predictions",
            {
                "timestamp": datetime.now().isoformat(),
                "plant": self.plant,
                "results": records,
            },
        )
        return result_df

    def _has_ro_anomaly(self, diag_result: dict) -> bool:
        """Return True if an abnormal diagnosis contains an RO-related trace.

        A trace counts as RO-related when its ``trigger`` text contains
        ``"RO"`` or ``"DPT"``. Missing or ``None`` ``results`` / ``trigger``
        values are treated as empty.
        """
        if diag_result.get("status") != "abnormal":
            return False
        for trace in diag_result.get("results") or []:
            trigger = trace.get("trigger") or ""
            if any(marker in trigger for marker in self._RO_TRIGGER_MARKERS):
                return True
        return False

    def run_scheduler(self):
        """Block forever, triggering predictions daily and on RO anomalies.

        Each poll reads ``diagnosis_results`` from the shared state and runs
        a prediction when it reports an RO anomaly, or when no run has
        succeeded yet today and the daily run hour has been reached. Any
        successful run marks the current day as done; a failed run is logged
        and retried on a later poll.
        """
        last_run_date = None
        while True:
            now = datetime.now()

            # Check whether the recent diagnosis reports an RO anomaly.
            diag = self.shared_state.read(
                "diagnosis_results",
                max_age_seconds=self._DIAG_MAX_AGE_SECONDS,
            )
            ro_anomaly = diag is not None and self._has_ro_anomaly(diag)

            # Daily trigger: first poll at or after the run hour on a day
            # without a successful run.
            is_new_day = (
                now.date() != last_run_date
                and now.hour >= self._DAILY_RUN_HOUR
            )

            if ro_anomaly or is_new_day:
                try:
                    logger.info(
                        "RO CIP 预测触发 (anomaly=%s, daily=%s)",
                        ro_anomaly,
                        is_new_day,
                    )
                    self.run_prediction()
                    last_run_date = now.date()
                except Exception as e:
                    # Top-level boundary: keep the scheduler alive and log
                    # the traceback.
                    logger.exception("RO CIP 预测异常: %s", e)

            time.sleep(self._POLL_INTERVAL_SECONDS)  # poll once per hour
|