| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120 |
- """
- 异常诊断模型适配器。定时运行诊断并将结果写入共享状态。
- """
- import logging
- import os
- import sys
- import time
- from pathlib import Path
- from typing import Optional
- import pandas as pd
- from core.config import PipelineConfig
- from core.data_provider import DataProvider
- from core.shared_state import SharedState
- logger = logging.getLogger(__name__)
class DiagnosisAdapter:
    """Adapter around the external anomaly-diagnosis model.

    Fetches recent PLC point history from the data provider, reshapes it into
    the frame layout passed to the diagnoser's ``api_predict`` and publishes
    the returned result to shared state under ``"diagnosis_results"``.
    """

    # Spacing of the regular time grid the history is resampled onto before
    # it is handed to the diagnoser.
    RESAMPLE_FREQ = "4s"

    def __init__(
        self,
        plant: str,
        data_provider: "DataProvider",
        shared_state: "SharedState",
        config: "PipelineConfig",
    ):
        """Load the diagnosis config and the PLC point list for ``plant``.

        Args:
            plant: Plant identifier; passed to the diagnosis config loader and
                used as the sub-directory holding ``input_format.txt``.
            data_provider: Source of point history (``query_points_history``).
            shared_state: Sink for results (``publish``).
            config: Pipeline configuration; ``diagnosis_root`` and
                ``diagnosis_interval_minutes`` are read from it.

        Raises:
            OSError: If the diagnosis root or the point-list file is missing.
            ImportError: If the diagnosis package's ``config`` module cannot
                be imported.
        """
        self.plant = plant
        self.data_provider = data_provider
        self.shared_state = shared_state
        self.config = config

        diag_root = str(config.diagnosis_root.resolve())

        # Make the diagnosis package importable by its top-level module names.
        if diag_root not in sys.path:
            sys.path.insert(0, diag_root)

        # The diagnosis config is loaded with the diagnosis root as the
        # working directory (presumably it resolves files relative to it --
        # confirm). Restore whatever directory was current on entry, the same
        # way _ensure_diagnoser does, instead of assuming where the process
        # was started from.
        previous_cwd = os.getcwd()
        os.chdir(diag_root)
        try:
            from config import config as diag_config

            diag_config.load(plant)
        finally:
            os.chdir(previous_cwd)

        self._diag_config = diag_config
        self._diag_root = diag_root

        # Load the list of PLC points to query.
        points_file = Path(diag_root) / plant / "input_format.txt"
        self._plc_points = self._load_plc_points(str(points_file))

        # The diagnoser is created lazily (original author's note: the model
        # weights are heavy).
        self._diagnoser = None

    def _ensure_diagnoser(self):
        """Create the diagnoser on first use; do nothing on later calls."""
        if self._diagnoser is not None:
            return
        previous_cwd = os.getcwd()
        os.chdir(self._diag_root)
        try:
            from test import WaterPlantDiagnoser

            self._diagnoser = WaterPlantDiagnoser()
        finally:
            os.chdir(previous_cwd)

    @staticmethod
    def _load_plc_points(filepath: str) -> list:
        """Read PLC point names from ``filepath``, one per line.

        Blank lines and lines equal to ``index`` (case-insensitive) are
        skipped; surrounding whitespace is stripped. The file is decoded as
        ``utf-8-sig``, so a leading BOM is tolerated.
        """
        with open(filepath, "r", encoding="utf-8-sig") as f:
            stripped = (line.strip() for line in f)
            points = [pt for pt in stripped if pt and pt.lower() != "index"]
        logger.info(f"加载了 {len(points)} 个 PLC 点位")
        return points

    @classmethod
    def _to_model_frame(cls, df):
        """Reshape provider history into the frame passed to ``api_predict``.

        The provider's index becomes a leading ``time`` column. Rows whose
        time cannot be parsed are dropped. When at least two rows remain, the
        data is resampled onto a regular ``RESAMPLE_FREQ`` grid spanning the
        first to the last timestamp, with gaps filled by time-weighted
        interpolation.
        """
        df = df.reset_index().rename(columns={"index": "time"})
        df["time"] = pd.to_datetime(df["time"], errors="coerce")
        # Stable sort so that "last" below means last in provider order.
        df = df.dropna(subset=["time"]).sort_values("time", kind="mergesort")
        if len(df) < 2:
            return df

        # reindex() rejects duplicate labels; keep the latest sample per time.
        df = df.drop_duplicates(subset="time", keep="last").set_index("time")
        grid = pd.date_range(
            start=df.index.min(), end=df.index.max(), freq=cls.RESAMPLE_FREQ
        )
        # Interpolate on the union of the real timestamps and the grid.
        # Reindexing straight onto the grid would discard every sample that
        # does not land exactly on a grid point before interpolation runs.
        df = df.reindex(df.index.union(grid))
        df = df.interpolate(method="time", limit_direction="both")
        df = df.ffill().bfill()
        return df.reindex(grid).rename_axis("time").reset_index()

    def run_once(self) -> dict:
        """Run one diagnosis and publish the result.

        Returns:
            The object returned by the diagnoser's ``api_predict``.
        """
        self._ensure_diagnoser()

        # The history window length is config.diagnosis_interval_minutes.
        history = self.data_provider.query_points_history(
            self._plc_points,
            duration_minutes=self.config.diagnosis_interval_minutes,
        )
        frame = self._to_model_frame(history)

        result = self._diagnoser.api_predict(frame, mode="auto")

        self.shared_state.publish("diagnosis_results", result)
        logger.info(f"诊断完成: status={result.get('status')}")
        return result

    def run_loop(self, interval_minutes: Optional[int] = None):
        """Run ``run_once`` forever, sleeping between runs.

        Args:
            interval_minutes: Minutes to sleep after each run. A falsy value
                (``None`` or ``0``) falls back to
                ``config.diagnosis_interval_minutes``.

        Exceptions raised by a run are logged with a traceback and do not
        stop the loop.
        """
        interval = interval_minutes or self.config.diagnosis_interval_minutes
        while True:
            try:
                self.run_once()
            except Exception as e:
                logger.error(f"诊断异常: {e}", exc_info=True)
            time.sleep(interval * 60)
|