| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202 |
- """
- RO CIP 预测模型适配器。支持每日定时 + 异常触发两种运行模式。
- 拉取最近 cycle_days 天的实时在线数据,通过 minute 间隔 API 查询。
- """
import logging
import re
import time
from datetime import datetime, timedelta
from typing import List, Optional

import numpy as np
import pandas as pd

from core.config import PipelineConfig
from core.data_provider import DataProvider
from core.shared_state import SharedState
logger = logging.getLogger(__name__)

# PLC sensor points queried for the RO CIP prediction, in query order.
_RO_PLC_POINTS = [
    # Per-unit "<unit>_FluxF" points.
    "RO1_FluxF",
    "RO2_FluxF",
    "RO3_FluxF",
    "RO4_FluxF",
    # Per-unit DPT channel 1.
    "C.M.RO1_DB@DPT_1",
    "C.M.RO2_DB@DPT_1",
    "C.M.RO3_DB@DPT_1",
    "C.M.RO4_DB@DPT_1",
    # Per-unit DPT channel 2.
    "C.M.RO1_DB@DPT_2",
    "C.M.RO2_DB@DPT_2",
    "C.M.RO3_DB@DPT_2",
    "C.M.RO4_DB@DPT_2",
    # Shared "ZJS@out" points.
    "C.M.RO_TT_ZJS@out",
    "C.M.RO_Cond_ZJS@out",
    "C.M.RO_PH_ZJS@out",
    "C.M.RO_ORP_ZJS@out",
]
class ROCIPAdapter:
    """Adapter around the RO CIP prediction model.

    Fetches recent online PLC data, runs the model's ``process_unit`` for each
    RO unit and publishes the results to the shared state. Two run modes are
    supported (see :meth:`run_scheduler`): a daily scheduled run and a run
    triggered by RO-related anomalies in the diagnosis results.
    """

    # Rows whose flux is at or below this value are treated as non-production
    # and dropped per unit; near-zero flux would otherwise make R1 infinite.
    _MIN_PRODUCTION_FLUX = 0.1
    # Fewest rows required to attempt a prediction (whole input and per unit).
    _MIN_ROWS = 10
    # Resampling / time-grid interval in seconds (5 minutes).
    _RESAMPLE_SECONDS = 300
    # A diagnosis trigger is RO-related when it contains "DPT", or "RO" not
    # directly preceded by an ASCII letter. This matches "RO1_FluxF" and
    # "C.M.RO2_DB@DPT_1" but not the "RO" inside words such as "ERROR".
    _RO_TRIGGER_PATTERN = re.compile(r"(?<![A-Za-z])RO|DPT")

    def __init__(
        self,
        plant: str,
        data_provider: DataProvider,
        shared_state: SharedState,
        config: PipelineConfig,
    ):
        """Store collaborators and load the RO model entry points.

        Args:
            plant: Plant identifier, included in published predictions.
            data_provider: Source of historical point data.
            shared_state: Store used to publish predictions/anomalies and to
                read diagnosis results.
            config: Pipeline configuration (``ro_root``, ``ro_units``,
                ``ro_tmp_limit``, ``ro_dpt_type`` and ``ro.*`` keys).
        """
        self.plant = plant
        self.data_provider = data_provider
        self.shared_state = shared_state
        self.config = config

        # Import the RO model functions from <ro_root>/cip_predict/run_this.py.
        # Importing (instead of running the script) does not trigger __main__.
        # NOTE(review): the module is imported under the generic name
        # "run_this"; if another run_this.py was imported earlier from a
        # different sys.path entry, import_module returns that cached module.
        # Confirm no such collision exists.
        import importlib
        import sys

        ro_root = str(config.ro_root / "cip_predict")
        if ro_root not in sys.path:
            sys.path.insert(0, ro_root)
        ro_module = importlib.import_module("run_this")
        self._process_unit = ro_module.process_unit
        self._main = ro_module.main  # not used within this class

    # ==================== Data fetching ====================
    def _fetch_cycle_data(self) -> pd.DataFrame:
        """Fetch the last ``ro.cycle_days`` days (default 30) of RO data.

        Each point in ``_RO_PLC_POINTS`` is queried at minute interval,
        averaged into 5-minute bins, snapped onto a common 5-minute time grid
        ending now, and gap-filled forward then backward. Points with no data
        become all-NaN columns.

        Returns:
            DataFrame with a ``time`` column plus one column per PLC point.
            Rows where every sensor column is NaN are dropped, so the result
            is empty when no point returned data.
        """
        cycle_days = self.config.get("ro.cycle_days", 30)
        end_time = datetime.now()
        start_time = end_time - timedelta(days=cycle_days)
        freq = f"{self._RESAMPLE_SECONDS}s"
        time_index = pd.date_range(start=start_time, end=end_time, freq=freq)

        result = {"time": time_index}
        empty_points = []
        for pt in _RO_PLC_POINTS:
            raw = self.data_provider.query_single_point_history(
                pt, start_time, end_time, interval="minute"
            )
            if raw.empty:
                result[pt] = np.nan
                empty_points.append(pt)
                continue
            # Assumes ``raw`` is a timestamp-indexed Series (resample needs a
            # datetime index). TODO: confirm against DataProvider.
            resampled = raw.resample(freq).mean()
            # Resample bins start on 5-minute boundaries, while time_index
            # starts at an arbitrary "now - cycle_days". Snap each grid slot to
            # the nearest bin within one interval.
            resampled = resampled.reindex(
                time_index,
                method="nearest",
                tolerance=pd.Timedelta(seconds=self._RESAMPLE_SECONDS),
            )
            # NOTE: gaps of any length are filled, so a long outage is carried
            # forward as a constant value.
            result[pt] = resampled.ffill().bfill()

        success = len(_RO_PLC_POINTS) - len(empty_points)
        logger.info(f"RO 数据拉取: {success}/{len(_RO_PLC_POINTS)} 成功, 范围 {cycle_days} 天")
        if empty_points:
            logger.warning(f"RO 以下点位无数据: {empty_points}")

        df = pd.DataFrame(result)
        df["time"] = pd.to_datetime(df["time"])
        sensor_cols = [c for c in _RO_PLC_POINTS if c in df.columns]
        return df.dropna(subset=sensor_cols, how="all").reset_index(drop=True)

    # ==================== Prediction ====================
    def run_prediction(
        self,
        units: Optional[List[str]] = None,
        tmp_limit: Optional[float] = None,
        dpt_type: Optional[str] = None,
        df: Optional[pd.DataFrame] = None,
    ) -> pd.DataFrame:
        """Run the CIP prediction for each RO unit and publish the results.

        Args:
            units: Unit prefixes such as "RO1" (used to build the
                ``<unit>_FluxF`` column name); defaults to ``config.ro_units``.
            tmp_limit: Passed through to the model; defaults to
                ``config.ro_tmp_limit``.
            dpt_type: Passed through to the model; defaults to
                ``config.ro_dpt_type``.
            df: Pre-fetched input data. When None, data is fetched with
                :meth:`_fetch_cycle_data`.

        Returns:
            One row per unit with ``unit`` and ``status`` ("OK" or "ERROR")
            plus whatever fields the model returned. Returns an empty
            DataFrame, without publishing anything, when the input has fewer
            than ``_MIN_ROWS`` rows.

        Side effects:
            Publishes each ERROR result to ``ro_anomalies`` and the full result
            set to ``ro_predictions``.
        """
        # Use ``is None`` rather than ``or`` so explicit falsy values (for
        # example tmp_limit=0.0) are honoured instead of replaced by config.
        if units is None:
            units = self.config.ro_units
        if tmp_limit is None:
            tmp_limit = self.config.ro_tmp_limit
        if dpt_type is None:
            dpt_type = self.config.ro_dpt_type
        if df is None:
            df = self._fetch_cycle_data()

        if len(df) < self._MIN_ROWS:
            logger.warning(f"RO 数据不足 ({len(df)} 行), 跳过本轮预测")
            return pd.DataFrame()

        results = [self._predict_unit(u, df, tmp_limit, dpt_type) for u in units]
        result_df = pd.DataFrame(results)

        # Report from the original dicts, not DataFrame rows, so ERROR payloads
        # do not pick up NaN-filled columns from other units' results.
        for res in results:
            self._report_unit_result(res)

        self.shared_state.publish("ro_predictions", {
            "timestamp": datetime.now().isoformat(),
            "plant": self.plant,
            "results": result_df.to_dict("records"),
        })
        return result_df

    def _predict_unit(
        self,
        unit: str,
        df: pd.DataFrame,
        tmp_limit: Optional[float],
        dpt_type: Optional[str],
    ) -> dict:
        """Run the model for one unit on its production-only rows.

        Returns the model's result dict with ``status`` set to "OK", the
        model's own ERROR dict, or an ERROR dict with ``error_type``
        INSUFFICIENT_PRODUCTION_DATA / MODEL_FIT_FAILED.
        """
        flux_col = f"{unit}_FluxF"
        if flux_col in df.columns:
            # Drop non-production rows so flux ≈ 0 cannot propagate R1 = ∞.
            unit_df = df.loc[df[flux_col] > self._MIN_PRODUCTION_FLUX].copy()
        else:
            logger.warning(f"RO {unit} 缺少 {flux_col} 列, 未过滤非生产数据")
            unit_df = df.copy()

        if len(unit_df) < self._MIN_ROWS:
            return {
                "unit": unit, "status": "ERROR",
                "error_type": "INSUFFICIENT_PRODUCTION_DATA",
            }

        res = self._process_unit(unit, unit_df, tmp_limit, dpt_type)
        if res is None:
            return {
                "unit": unit, "status": "ERROR",
                "error_type": "MODEL_FIT_FAILED",
            }
        if res.get("status") == "ERROR":
            return res
        # Build a new dict instead of mutating the model's return value.
        return {**res, "status": "OK"}

    def _report_unit_result(self, res: dict) -> None:
        """Log one unit result; ERROR results are also published to ``ro_anomalies``."""
        if res.get("status") == "ERROR":
            self.shared_state.publish("ro_anomalies", {
                "timestamp": datetime.now().isoformat(),
                **res,
            })
            logger.warning(f"RO 异常: {res}")
            return

        # The original ``f"{value:.1f}"`` crashed on a missing/None value
        # ('?' cannot take ``.1f``), which aborted the run before publishing.
        remaining = res.get("remaining_days")
        remaining_txt = "?"
        if isinstance(remaining, (int, float, np.number)) and not pd.isna(remaining):
            remaining_txt = f"{float(remaining):.1f}"
        logger.info(f"RO {res.get('unit')} 预计下次 CIP: {remaining_txt} 天后")

    # ==================== Anomaly linkage ====================
    def _has_ro_anomaly(self, diag_result: dict) -> bool:
        """Return True if an abnormal diagnosis result has an RO-related trigger.

        Only results with ``status == "abnormal"`` are considered. A trace is
        RO-related when its string ``trigger`` matches ``_RO_TRIGGER_PATTERN``.
        Traces without a string trigger are ignored.
        """
        if diag_result.get("status") != "abnormal":
            return False
        for trace in diag_result.get("results") or []:
            trigger = trace.get("trigger") if hasattr(trace, "get") else None
            if isinstance(trigger, str) and self._RO_TRIGGER_PATTERN.search(trigger):
                return True
        return False

    def _poll_ro_anomaly(self) -> bool:
        """Check the latest diagnosis result for an RO anomaly.

        Returns False, after logging, if reading or inspecting the result
        fails, so a malformed payload cannot stop the scheduler.
        """
        try:
            diag = self.shared_state.read("diagnosis_results", max_age_seconds=7200)
            return diag is not None and self._has_ro_anomaly(diag)
        except Exception:
            logger.exception("RO 读取诊断结果失败, 本轮忽略异常触发")
            return False

    # ==================== Scheduling ====================
    def run_scheduler(self):
        """Loop forever, running the prediction daily and on RO anomalies.

        Every ``ro.check_interval_seconds`` (default 3600) the loop triggers a
        run when either condition holds:

        * the latest diagnosis result (max age 7200 s) reports an RO anomaly;
        * no successful run has happened today and the hour is at least
          ``ro.daily_hour`` (default 8).

        A successful run of either kind marks the day as done. A failed run is
        logged and retried at the next check. Never returns.
        """
        check_interval = self.config.get("ro.check_interval_seconds", 3600)
        daily_hour = self.config.get("ro.daily_hour", 8)
        last_run_date = None
        while True:
            now = datetime.now()
            ro_anomaly = self._poll_ro_anomaly()
            is_new_day = now.date() != last_run_date and now.hour >= daily_hour
            if ro_anomaly or is_new_day:
                try:
                    logger.info(f"RO CIP 预测触发 (anomaly={ro_anomaly}, daily={is_new_day})")
                    self.run_prediction()
                    last_run_date = now.date()
                except Exception as e:
                    logger.exception(f"RO CIP 预测异常: {e}")
            time.sleep(check_interval)
|