""" 异常诊断模型适配器。定时运行诊断并将结果写入共享状态。 """ import logging import os import sys import time from pathlib import Path from typing import Optional import pandas as pd from core.config import PipelineConfig from core.data_provider import DataProvider from core.shared_state import SharedState logger = logging.getLogger(__name__) class DiagnosisAdapter: def __init__( self, plant: str, data_provider: DataProvider, shared_state: SharedState, config: PipelineConfig, ): self.plant = plant self.data_provider = data_provider self.shared_state = shared_state self.config = config diag_root = str(config.diagnosis_root.resolve()) # 加载诊断模型配置 if diag_root not in sys.path: sys.path.insert(0, diag_root) os.chdir(diag_root) try: from config import config as diag_config diag_config.load(plant) finally: os.chdir(str(config.diagnosis_root.parent.parent)) self._diag_config = diag_config self._diag_root = diag_root # 加载 PLC 点位列表 points_file = Path(diag_root) / plant / "input_format.txt" self._plc_points = self._load_plc_points(str(points_file)) # 延迟初始化 diagnoser(模型权重较重) self._diagnoser = None def _ensure_diagnoser(self): if self._diagnoser is not None: return old_cwd = os.getcwd() os.chdir(self._diag_root) try: from test import WaterPlantDiagnoser self._diagnoser = WaterPlantDiagnoser() finally: os.chdir(old_cwd) @staticmethod def _load_plc_points(filepath: str) -> list: points = [] with open(filepath, "r", encoding="utf-8-sig") as f: for line in f: pt = line.strip() if pt and pt.lower() != "index": points.append(pt) logger.info(f"加载了 {len(points)} 个 PLC 点位") return points def run_once(self) -> dict: """运行一次诊断并发布结果。""" self._ensure_diagnoser() # 获取 40 分钟历史数据 df = self.data_provider.query_points_history( self._plc_points, duration_minutes=self.config.diagnosis_interval_minutes, ) # 转为 api_predict 期望的格式:第一列为 time,后续为传感器值 # 并用线性插值补齐缺失,确保降采样后有足够的数据行 df = df.reset_index().rename(columns={"index": "time"}) df["time"] = pd.to_datetime(df["time"], errors="coerce") df = df.dropna(subset=["time"]).sort_values("time") # 以 4s 间隔重建完整时间轴,插值填充间隙 if len(df) >= 2: full_idx = pd.date_range(start=df["time"].min(), end=df["time"].max(), freq="4s") df = df.set_index("time") df = df.reindex(full_idx) df = df.interpolate(method="time", limit_direction="both") df = df.ffill().bfill() df = df.reset_index().rename(columns={"index": "time"}) # 执行诊断 result = self._diagnoser.api_predict(df, mode="auto") # 发布到共享状态 self.shared_state.publish("diagnosis_results", result) logger.info(f"诊断完成: status={result.get('status')}") return result def run_loop(self, interval_minutes: Optional[int] = None): """定时循环运行诊断。""" interval = interval_minutes or self.config.diagnosis_interval_minutes while True: try: self.run_once() except Exception as e: logger.error(f"诊断异常: {e}", exc_info=True) time.sleep(interval * 60)