# diagnosis_adapter.py
"""
Adapter for the anomaly-diagnosis model.

Runs the diagnosis on a schedule and writes the results to shared state.
"""
import contextlib
import logging
import os
import sys
import time
from pathlib import Path
from typing import Optional

import pandas as pd

from core.config import PipelineConfig
from core.data_provider import DataProvider
from core.shared_state import SharedState
  14. logger = logging.getLogger(__name__)
  15. class DiagnosisAdapter:
  16. def __init__(
  17. self,
  18. plant: str,
  19. data_provider: DataProvider,
  20. shared_state: SharedState,
  21. config: PipelineConfig,
  22. ):
  23. self.plant = plant
  24. self.data_provider = data_provider
  25. self.shared_state = shared_state
  26. self.config = config
  27. diag_root = str(config.diagnosis_root.resolve())
  28. # 加载诊断模型配置
  29. if diag_root not in sys.path:
  30. sys.path.insert(0, diag_root)
  31. os.chdir(diag_root)
  32. try:
  33. from config import config as diag_config
  34. diag_config.load(plant)
  35. finally:
  36. os.chdir(str(config.diagnosis_root.parent.parent))
  37. self._diag_config = diag_config
  38. self._diag_root = diag_root
  39. # 加载 PLC 点位列表
  40. points_file = Path(diag_root) / plant / "input_format.txt"
  41. self._plc_points = self._load_plc_points(str(points_file))
  42. # 延迟初始化 diagnoser(模型权重较重)
  43. self._diagnoser = None
  44. def _ensure_diagnoser(self):
  45. if self._diagnoser is not None:
  46. return
  47. old_cwd = os.getcwd()
  48. os.chdir(self._diag_root)
  49. try:
  50. from test import WaterPlantDiagnoser
  51. self._diagnoser = WaterPlantDiagnoser()
  52. finally:
  53. os.chdir(old_cwd)
  54. @staticmethod
  55. def _load_plc_points(filepath: str) -> list:
  56. points = []
  57. with open(filepath, "r", encoding="utf-8-sig") as f:
  58. for line in f:
  59. pt = line.strip()
  60. if pt and pt.lower() != "index":
  61. points.append(pt)
  62. logger.info(f"加载了 {len(points)} 个 PLC 点位")
  63. return points
  64. def run_once(self) -> dict:
  65. """运行一次诊断并发布结果。"""
  66. self._ensure_diagnoser()
  67. # 获取 40 分钟历史数据
  68. df = self.data_provider.query_points_history(
  69. self._plc_points,
  70. duration_minutes=self.config.diagnosis_interval_minutes,
  71. )
  72. # 转为 api_predict 期望的格式:第一列为 time,后续为传感器值
  73. # 并用线性插值补齐缺失,确保降采样后有足够的数据行
  74. df = df.reset_index().rename(columns={"index": "time"})
  75. df["time"] = pd.to_datetime(df["time"], errors="coerce")
  76. df = df.dropna(subset=["time"]).sort_values("time")
  77. # 以 4s 间隔重建完整时间轴,插值填充间隙
  78. if len(df) >= 2:
  79. full_idx = pd.date_range(start=df["time"].min(), end=df["time"].max(), freq="4s")
  80. df = df.set_index("time")
  81. df = df.reindex(full_idx)
  82. df = df.interpolate(method="time", limit_direction="both")
  83. df = df.ffill().bfill()
  84. df = df.reset_index().rename(columns={"index": "time"})
  85. # 执行诊断
  86. result = self._diagnoser.api_predict(df, mode="auto")
  87. # 发布到共享状态
  88. self.shared_state.publish("diagnosis_results", result)
  89. logger.info(f"诊断完成: status={result.get('status')}")
  90. return result
  91. def run_loop(self, interval_minutes: Optional[int] = None):
  92. """定时循环运行诊断。"""
  93. interval = interval_minutes or self.config.diagnosis_interval_minutes
  94. while True:
  95. try:
  96. self.run_once()
  97. except Exception as e:
  98. logger.error(f"诊断异常: {e}", exc_info=True)
  99. time.sleep(interval * 60)