ro_adapter.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. """
  2. RO CIP 预测模型适配器。支持每日定时 + 异常触发两种运行模式。
  3. """
  4. import logging
  5. import time
  6. from datetime import datetime
  7. from typing import List, Optional
  8. import pandas as pd
  9. from core.config import PipelineConfig
  10. from core.data_provider import DataProvider
  11. from core.shared_state import SharedState
  12. logger = logging.getLogger(__name__)
  13. class ROCIPAdapter:
  14. def __init__(
  15. self,
  16. plant: str,
  17. data_provider: DataProvider,
  18. shared_state: SharedState,
  19. config: PipelineConfig,
  20. ):
  21. self.plant = plant
  22. self.data_provider = data_provider
  23. self.shared_state = shared_state
  24. self.config = config
  25. # 导入 RO 模型函数(不触发 __main__)
  26. import importlib
  27. import sys
  28. from pathlib import Path
  29. ro_root = str(config.ro_root / "cip_predict")
  30. if ro_root not in sys.path:
  31. sys.path.insert(0, ro_root)
  32. ro_module = importlib.import_module("run_this")
  33. self._process_unit = ro_module.process_unit
  34. self._main = ro_module.main
  35. self._load_history = ro_module.load_history
  36. def run_prediction(
  37. self,
  38. units: Optional[List[str]] = None,
  39. tmp_limit: Optional[float] = None,
  40. dpt_type: Optional[str] = None,
  41. df: Optional[pd.DataFrame] = None,
  42. ) -> pd.DataFrame:
  43. """运行 CIP 预测并发布结果。"""
  44. units = units or self.config.ro_units
  45. tmp_limit = tmp_limit or self.config.ro_tmp_limit
  46. dpt_type = dpt_type or self.config.ro_dpt_type
  47. result_df = self._main(units, tmp_limit, dpt_type, df=df)
  48. # 发布正常/异常结果
  49. for _, row in result_df.iterrows():
  50. row_dict = row.to_dict()
  51. if row_dict.get("status") == "ERROR":
  52. self.shared_state.publish("ro_anomalies", {
  53. "timestamp": datetime.now().isoformat(),
  54. **row_dict,
  55. })
  56. logger.warning(f"RO 异常: {row_dict}")
  57. else:
  58. logger.info(
  59. f"RO {row_dict.get('unit')} 预计下次 CIP: "
  60. f"{row_dict.get('remaining_days', '?'):.1f} 天后"
  61. )
  62. self.shared_state.publish("ro_predictions", {
  63. "timestamp": datetime.now().isoformat(),
  64. "plant": self.plant,
  65. "results": result_df.to_dict("records"),
  66. })
  67. return result_df
  68. def _has_ro_anomaly(self, diag_result: dict) -> bool:
  69. """检查诊断结果中是否包含 RO 段异常。"""
  70. if diag_result.get("status") != "abnormal":
  71. return False
  72. for trace in diag_result.get("results", []):
  73. trigger = trace.get("trigger", "")
  74. if "RO" in trigger or "DPT" in trigger:
  75. return True
  76. return False
  77. def run_scheduler(self):
  78. """每日定时 + RO 异常触发调度。"""
  79. last_run_date = None
  80. while True:
  81. now = datetime.now()
  82. # 检查诊断结果是否有 RO 异常
  83. diag = self.shared_state.read("diagnosis_results", max_age_seconds=7200)
  84. ro_anomaly = diag is not None and self._has_ro_anomaly(diag)
  85. # 每日定时(凌晨 8 点附近自然触发)
  86. is_new_day = now.date() != last_run_date and now.hour >= 8
  87. if ro_anomaly or is_new_day:
  88. try:
  89. logger.info(f"RO CIP 预测触发 (anomaly={ro_anomaly}, daily={is_new_day})")
  90. self.run_prediction()
  91. last_run_date = now.date()
  92. except Exception as e:
  93. logger.error(f"RO CIP 预测异常: {e}", exc_info=True)
  94. time.sleep(60 * 60) # 每小时检查一次