| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143 |
- """
- DualFlow 三模型统一管线主入口。
- 启动 UF-RL(事件驱动)、异常诊断(定时)、RO CIP(每日/触发)三个服务线程。
- 用法:
- python -m core.pipeline # 默认 xishan
- python -m core.pipeline --plant xishan # 指定水厂
- """
- import argparse
- import logging
- import signal
- import sys
- import threading
- import time
- from pathlib import Path
# Put this file's directory and its parent on sys.path so that the
# `from core...` imports that follow can resolve. Entries already present are
# left where they are; new ones are pushed to the front, so the directory of
# this file ends up ahead of its parent when both are added.
CORE_ROOT = Path(__file__).resolve().parent
PROJECT_ROOT = CORE_ROOT.parent
for p in map(str, (PROJECT_ROOT, CORE_ROOT)):
    if p in sys.path:
        continue
    sys.path.insert(0, p)
- from core.config import PipelineConfig
- from core.data_provider import DataProvider
- from core.shared_state import SharedState
- from core.adapters.diagnosis_adapter import DiagnosisAdapter
- from core.adapters.ro_adapter import ROCIPAdapter
- from core.adapters.uf_adapter import UFAdapter
# Root logging setup performed at import time: INFO level, with the thread
# name included in every record so output from the worker threads can be told
# apart.
logging.basicConfig(
    format="%(asctime)s [%(threadName)s] %(levelname)s %(name)s: %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    level=logging.INFO,
)
# Logger shared by everything in this module.
logger = logging.getLogger("pipeline")
class DualFlowPipeline:
    """Run the UF, diagnosis and RO-CIP adapters of one plant on worker threads.

    ``start()`` creates one daemon thread per entry of ``config.uf_units``,
    one for the RO-CIP scheduler and, when
    ``<config.diagnosis_root>/<plant>/config.yaml`` exists, one for the
    diagnosis loop. It then blocks in a health monitor that periodically logs
    any worker thread that is no longer alive, until ``stop()`` is called.
    """

    # Seconds between two health checks of the worker threads.
    MONITOR_INTERVAL_S = 60
    # Longest single blocking wait inside the monitor. Waiting in short slices
    # lets the calling (normally main) thread return to the interpreter
    # regularly, so pending Python signal handlers are not held back for a
    # whole monitoring interval.
    _WAIT_SLICE_S = 1.0

    def __init__(self, plant: str = "longting", config_path: str = None):
        """Build the configuration, shared state, data provider and adapters.

        Args:
            plant: Plant name; forwarded to the configuration and to every
                adapter.
            config_path: Path of the pipeline configuration file, passed
                through to ``PipelineConfig`` unchanged (``None`` included).
        """
        self.plant = plant
        self.config = PipelineConfig(config_path=config_path, plant=plant)
        self.shared_state = SharedState()
        self.data_provider = DataProvider(self.config)

        # Adapters that are always created.
        self.uf = UFAdapter(plant, self.data_provider, self.shared_state, self.config)
        self.ro = ROCIPAdapter(plant, self.data_provider, self.shared_state, self.config)

        # Not every plant ships a diagnosis configuration; the adapter is only
        # built when the plant's config.yaml is present.
        self._diag_available = (self.config.diagnosis_root / plant / "config.yaml").exists()
        self.diag = None
        if self._diag_available:
            self.diag = DiagnosisAdapter(
                plant, self.data_provider, self.shared_state, self.config
            )

        self._threads: list[threading.Thread] = []
        self._running = False
        # Set by stop() so a monitor that is waiting wakes up immediately.
        self._stop_event = threading.Event()

    def start(self):
        """Launch all worker threads, then block in the health monitor.

        Raises:
            RuntimeError: If worker threads were already created by an earlier
                call. Thread objects cannot be started twice and the workers
                are never stopped by this class, so a second start would
                either fail midway or duplicate the running workers.
        """
        if self._threads:
            raise RuntimeError(
                "DualFlowPipeline has already been started; "
                "create a new instance to start again"
            )

        self._stop_event.clear()
        self._running = True
        logger.info(f"=== DualFlow Pipeline 启动 (plant={self.plant}, dry_run={self.config.dry_run}) ===")

        # Build the full list first so a failure while reading the
        # configuration leaves no half-populated thread list behind.
        self._threads = self._build_threads()

        for t in self._threads:
            t.start()
            logger.info(f"线程已启动: {t.name}")

        # The calling thread becomes the health monitor.
        self._monitor()

    def _build_threads(self):
        """Create (without starting) the worker threads in launch order."""
        # UF: one thread per unit.
        workers = [
            threading.Thread(
                target=self.uf.run_unit_forever,
                args=(unit,),
                name=f"UF-{unit}",
                daemon=True,
            )
            for unit in self.config.uf_units
        ]

        # Diagnosis: a single loop thread, only when the adapter exists.
        if self.diag:
            workers.append(
                threading.Thread(
                    target=self.diag.run_loop,
                    name="Diagnosis",
                    daemon=True,
                )
            )
        else:
            logger.warning(f"plant={self.plant} 无诊断模型配置,跳过诊断线程")

        # RO: scheduler thread.
        workers.append(
            threading.Thread(
                target=self.ro.run_scheduler,
                name="RO-CIP",
                daemon=True,
            )
        )
        return workers

    def _monitor(self):
        """Log every dead worker thread once per interval until stopped.

        Dead threads are only reported, never restarted, and a thread that
        stays dead is reported again on every pass.
        """
        while self._running:
            for t in self._threads:
                if not t.is_alive():
                    logger.error(f"线程已死亡: {t.name}")
            self._wait_for_next_check()

    def _wait_for_next_check(self):
        """Wait up to MONITOR_INTERVAL_S, returning early once stop() is called."""
        deadline = time.monotonic() + self.MONITOR_INTERVAL_S
        while self._running:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return
            if self._stop_event.wait(min(remaining, self._WAIT_SLICE_S)):
                return

    def stop(self):
        """Ask the health monitor to exit.

        Only the monitor loop is affected; the worker threads are not
        signalled here. They are daemon threads, so they do not keep the
        process alive once the main thread exits.
        """
        self._running = False
        self._stop_event.set()
        logger.info("Pipeline 停止")
def main():
    """Command-line entry point: parse options, build the pipeline, run it.

    SIGINT and SIGTERM are both routed to a handler that stops the pipeline
    and exits the process with status 0. ``pipeline.start()`` is called last
    because it blocks in the pipeline's monitor loop.
    """
    cli = argparse.ArgumentParser(description="DualFlow 三模型统一管线")
    cli.add_argument(
        "--plant",
        choices=["xishan", "longting", "anzhen", "lankao"],
        default="xishan",
        help="水厂名称 (默认 xishan)",
    )
    cli.add_argument(
        "--config",
        default=None,
        help="配置文件路径 (默认 core/config/pipeline_config.yaml)",
    )
    opts = cli.parse_args()

    pipeline = DualFlowPipeline(opts.plant, opts.config)

    def _shutdown(signum, frame):
        # Stop the monitor loop, then terminate the process.
        logger.info("收到停止信号,正在退出...")
        pipeline.stop()
        sys.exit(0)

    for signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(signum, _shutdown)

    pipeline.start()


if __name__ == "__main__":
    main()
|