ro_adapter.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. """
  2. RO CIP 预测模型适配器。支持每日定时 + 异常触发两种运行模式。
  3. 拉取最近 cycle_days 天的实时在线数据,通过 minute 间隔 API 查询。
  4. """
  5. import logging
  6. import time
  7. from datetime import datetime, timedelta
  8. from typing import List, Optional
  9. import numpy as np
  10. import pandas as pd
  11. from core.config import PipelineConfig
  12. from core.data_provider import DataProvider
  13. from core.shared_state import SharedState
  14. logger = logging.getLogger(__name__)
  15. # RO CIP 预测所需的 PLC 传感器点位
  16. _RO_PLC_POINTS = [
  17. "RO1_FluxF", "RO2_FluxF", "RO3_FluxF", "RO4_FluxF",
  18. "C.M.RO1_DB@DPT_1", "C.M.RO2_DB@DPT_1", "C.M.RO3_DB@DPT_1", "C.M.RO4_DB@DPT_1",
  19. "C.M.RO1_DB@DPT_2", "C.M.RO2_DB@DPT_2", "C.M.RO3_DB@DPT_2", "C.M.RO4_DB@DPT_2",
  20. "C.M.RO_TT_ZJS@out",
  21. "C.M.RO_Cond_ZJS@out",
  22. "C.M.RO_PH_ZJS@out",
  23. "C.M.RO_ORP_ZJS@out",
  24. ]
  25. class ROCIPAdapter:
  26. def __init__(
  27. self,
  28. plant: str,
  29. data_provider: DataProvider,
  30. shared_state: SharedState,
  31. config: PipelineConfig,
  32. ):
  33. self.plant = plant
  34. self.data_provider = data_provider
  35. self.shared_state = shared_state
  36. self.config = config
  37. # 导入 RO 模型函数(不触发 __main__)
  38. import importlib
  39. import sys
  40. ro_root = str(config.ro_root / "cip_predict")
  41. if ro_root not in sys.path:
  42. sys.path.insert(0, ro_root)
  43. ro_module = importlib.import_module("run_this")
  44. self._process_unit = ro_module.process_unit
  45. self._main = ro_module.main
  46. # ==================== 数据拉取 ====================
  47. def _fetch_cycle_data(self) -> pd.DataFrame:
  48. """从 API 拉取最近 cycle_days 天的 RO 数据(minute 间隔,5 分钟降采样)。"""
  49. cycle_days = self.config.get("ro.cycle_days", 30)
  50. end_time = datetime.now()
  51. start_time = end_time - timedelta(days=cycle_days)
  52. resample_interval = 300 # 5 分钟
  53. time_index = pd.date_range(start=start_time, end=end_time, freq=f"{resample_interval}s")
  54. result = {"time": time_index}
  55. success, empty = 0, 0
  56. for pt in _RO_PLC_POINTS:
  57. raw = self.data_provider.query_single_point_history(
  58. pt, start_time, end_time, interval="minute"
  59. )
  60. if raw.empty:
  61. result[pt] = np.nan
  62. empty += 1
  63. else:
  64. resampled = raw.resample(f"{resample_interval}s").mean()
  65. resampled = resampled.reindex(
  66. time_index,
  67. method="nearest",
  68. tolerance=pd.Timedelta(seconds=resample_interval),
  69. )
  70. result[pt] = resampled.ffill().bfill()
  71. success += 1
  72. logger.info(f"RO 数据拉取: {success}/{len(_RO_PLC_POINTS)} 成功, 范围 {cycle_days} 天")
  73. df = pd.DataFrame(result)
  74. df["time"] = pd.to_datetime(df["time"])
  75. sensor_cols = [c for c in _RO_PLC_POINTS if c in df.columns]
  76. df = df.dropna(subset=sensor_cols, how="all").reset_index(drop=True)
  77. return df
  78. # ==================== 预测 ====================
  79. def run_prediction(
  80. self,
  81. units: Optional[List[str]] = None,
  82. tmp_limit: Optional[float] = None,
  83. dpt_type: Optional[str] = None,
  84. df: Optional[pd.DataFrame] = None,
  85. ) -> pd.DataFrame:
  86. """运行 CIP 预测并发布结果。"""
  87. units = units or self.config.ro_units
  88. tmp_limit = tmp_limit or self.config.ro_tmp_limit
  89. dpt_type = dpt_type or self.config.ro_dpt_type
  90. if df is None:
  91. df = self._fetch_cycle_data()
  92. if df.empty or len(df) < 10:
  93. logger.warning(f"RO 数据不足 ({len(df)} 行), 跳过本轮预测")
  94. return pd.DataFrame()
  95. # 按机组过滤非生产数据后逐个预测(避免 flux≈0 导致 R1=∞ 传播)
  96. results = []
  97. for u in units:
  98. unit_df = df.copy()
  99. flux_col = f"{u}_FluxF"
  100. if flux_col in unit_df.columns:
  101. unit_df = unit_df[unit_df[flux_col] > 0.1]
  102. if len(unit_df) < 10:
  103. results.append({
  104. "unit": u, "status": "ERROR",
  105. "error_type": "INSUFFICIENT_PRODUCTION_DATA",
  106. })
  107. continue
  108. res = self._process_unit(u, unit_df, tmp_limit, dpt_type)
  109. if res is None:
  110. results.append({
  111. "unit": u, "status": "ERROR",
  112. "error_type": "MODEL_FIT_FAILED",
  113. })
  114. elif res.get("status") == "ERROR":
  115. results.append(res)
  116. else:
  117. res["status"] = "OK"
  118. results.append(res)
  119. result_df = pd.DataFrame(results)
  120. for _, row in result_df.iterrows():
  121. row_dict = row.to_dict()
  122. if row_dict.get("status") == "ERROR":
  123. self.shared_state.publish("ro_anomalies", {
  124. "timestamp": datetime.now().isoformat(),
  125. **row_dict,
  126. })
  127. logger.warning(f"RO 异常: {row_dict}")
  128. else:
  129. logger.info(
  130. f"RO {row_dict.get('unit')} 预计下次 CIP: "
  131. f"{row_dict.get('remaining_days', '?'):.1f} 天后"
  132. )
  133. self.shared_state.publish("ro_predictions", {
  134. "timestamp": datetime.now().isoformat(),
  135. "plant": self.plant,
  136. "results": result_df.to_dict("records"),
  137. })
  138. return result_df
  139. # ==================== 异常联动 ====================
  140. def _has_ro_anomaly(self, diag_result: dict) -> bool:
  141. """检查诊断结果中是否包含 RO 段异常。"""
  142. if diag_result.get("status") != "abnormal":
  143. return False
  144. for trace in diag_result.get("results", []):
  145. trigger = trace.get("trigger", "")
  146. if "RO" in trigger or "DPT" in trigger:
  147. return True
  148. return False
  149. # ==================== 调度 ====================
  150. def run_scheduler(self):
  151. """每日定时 + RO 异常触发调度。"""
  152. last_run_date = None
  153. while True:
  154. now = datetime.now()
  155. # 检查诊断结果是否有 RO 异常
  156. diag = self.shared_state.read("diagnosis_results", max_age_seconds=7200)
  157. ro_anomaly = diag is not None and self._has_ro_anomaly(diag)
  158. # 每日定时(凌晨 8 点附近自然触发)
  159. is_new_day = now.date() != last_run_date and now.hour >= 8
  160. if ro_anomaly or is_new_day:
  161. try:
  162. logger.info(f"RO CIP 预测触发 (anomaly={ro_anomaly}, daily={is_new_day})")
  163. self.run_prediction()
  164. last_run_date = now.date()
  165. except Exception as e:
  166. logger.error(f"RO CIP 预测异常: {e}", exc_info=True)
  167. time.sleep(60 * 60) # 每小时检查一次