""" DualFlow 三模型统一管线主入口。 启动 UF-RL(事件驱动)、异常诊断(定时)、RO CIP(每日/触发)三个服务线程。 用法: python -m core.pipeline # 默认 longting python -m core.pipeline --plant xishan # 指定水厂 """ import argparse import logging import signal import sys import threading import time from pathlib import Path # 确保 core/ 目录在 sys.path,使 from core.xxx 可用 CORE_ROOT = Path(__file__).resolve().parent PROJECT_ROOT = CORE_ROOT.parent for p in (str(PROJECT_ROOT), str(CORE_ROOT)): if p not in sys.path: sys.path.insert(0, p) from core.config import PipelineConfig from core.data_provider import DataProvider from core.shared_state import SharedState from core.adapters.diagnosis_adapter import DiagnosisAdapter from core.adapters.ro_adapter import ROCIPAdapter from core.adapters.uf_adapter import UFAdapter logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(threadName)s] %(levelname)s %(name)s: %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) logger = logging.getLogger("pipeline") class DualFlowPipeline: def __init__(self, plant: str = "longting", config_path: str = None): self.plant = plant self.config = PipelineConfig(config_path=config_path, plant=plant) self.shared_state = SharedState() self.data_provider = DataProvider(self.config) # 初始化适配器 self.uf = UFAdapter(plant, self.data_provider, self.shared_state, self.config) self.ro = ROCIPAdapter(plant, self.data_provider, self.shared_state, self.config) # 诊断模型可能不存在所有 plant 的配置 self._diag_available = (self.config.diagnosis_root / plant / "config.yaml").exists() self.diag = None if self._diag_available: self.diag = DiagnosisAdapter( plant, self.data_provider, self.shared_state, self.config ) self._threads: list = [] self._running = False def start(self): self._running = True logger.info(f"=== DualFlow Pipeline 启动 (plant={self.plant}, dry_run={self.config.dry_run}) ===") # UF: 每个机组一个线程 for unit in self.config.uf_units: t = threading.Thread( target=self.uf.run_unit_forever, args=(unit,), name=f"UF-{unit}", daemon=True, ) self._threads.append(t) # 诊断: 定时线程 if self.diag: t = threading.Thread( target=self.diag.run_loop, name="Diagnosis", daemon=True, ) self._threads.append(t) else: logger.warning(f"plant={self.plant} 无诊断模型配置,跳过诊断线程") # RO: 调度线程 t = threading.Thread( target=self.ro.run_scheduler, name="RO-CIP", daemon=True, ) self._threads.append(t) # 启动所有线程 for t in self._threads: t.start() logger.info(f"线程已启动: {t.name}") # 主线程:健康监控 self._monitor() def _monitor(self): while self._running: for t in self._threads: if not t.is_alive(): logger.error(f"线程已死亡: {t.name}") time.sleep(60) def stop(self): self._running = False logger.info("Pipeline 停止") def main(): parser = argparse.ArgumentParser(description="DualFlow 三模型统一管线") parser.add_argument( "--plant", default="xishan", choices=["xishan", "longting", "anzhen", "lankao"], help="水厂名称 (默认 xishan)", ) parser.add_argument( "--config", default=None, help="配置文件路径 (默认 core/config/pipeline_config.yaml)", ) args = parser.parse_args() pipeline = DualFlowPipeline(args.plant, args.config) def _signal_handler(sig, frame): logger.info("收到停止信号,正在退出...") pipeline.stop() sys.exit(0) signal.signal(signal.SIGINT, _signal_handler) signal.signal(signal.SIGTERM, _signal_handler) pipeline.start() if __name__ == "__main__": main()