pipeline.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. """
  2. DualFlow 三模型统一管线主入口。
  3. 启动 UF-RL(事件驱动)、异常诊断(定时)、RO CIP(每日/触发)三个服务线程。
  4. 用法:
  5. python -m core.pipeline # 默认 xishan
  6. python -m core.pipeline --plant xishan # 指定水厂
  7. """
  8. import argparse
  9. import logging
  10. import signal
  11. import sys
  12. import threading
  13. import time
  14. from pathlib import Path
  15. # 确保 core/ 目录在 sys.path,使 from core.xxx 可用
  16. CORE_ROOT = Path(__file__).resolve().parent
  17. PROJECT_ROOT = CORE_ROOT.parent
  18. for p in (str(PROJECT_ROOT), str(CORE_ROOT)):
  19. if p not in sys.path:
  20. sys.path.insert(0, p)
  21. from core.config import PipelineConfig
  22. from core.data_provider import DataProvider
  23. from core.shared_state import SharedState
  24. from core.adapters.diagnosis_adapter import DiagnosisAdapter
  25. from core.adapters.ro_adapter import ROCIPAdapter
  26. from core.adapters.uf_adapter import UFAdapter
  27. logging.basicConfig(
  28. level=logging.INFO,
  29. format="%(asctime)s [%(threadName)s] %(levelname)s %(name)s: %(message)s",
  30. datefmt="%Y-%m-%d %H:%M:%S",
  31. )
  32. logger = logging.getLogger("pipeline")
  33. class DualFlowPipeline:
  34. def __init__(self, plant: str = "longting", config_path: str = None):
  35. self.plant = plant
  36. self.config = PipelineConfig(config_path=config_path, plant=plant)
  37. self.shared_state = SharedState()
  38. self.data_provider = DataProvider(self.config)
  39. # 初始化适配器
  40. self.uf = UFAdapter(plant, self.data_provider, self.shared_state, self.config)
  41. self.ro = ROCIPAdapter(plant, self.data_provider, self.shared_state, self.config)
  42. # 诊断模型可能不存在所有 plant 的配置
  43. self._diag_available = (self.config.diagnosis_root / plant / "config.yaml").exists()
  44. self.diag = None
  45. if self._diag_available:
  46. self.diag = DiagnosisAdapter(
  47. plant, self.data_provider, self.shared_state, self.config
  48. )
  49. self._threads: list = []
  50. self._running = False
  51. def start(self):
  52. self._running = True
  53. logger.info(f"=== DualFlow Pipeline 启动 (plant={self.plant}, dry_run={self.config.dry_run}) ===")
  54. # UF: 每个机组一个线程
  55. for unit in self.config.uf_units:
  56. t = threading.Thread(
  57. target=self.uf.run_unit_forever,
  58. args=(unit,),
  59. name=f"UF-{unit}",
  60. daemon=True,
  61. )
  62. self._threads.append(t)
  63. # 诊断: 定时线程
  64. if self.diag:
  65. t = threading.Thread(
  66. target=self.diag.run_loop,
  67. name="Diagnosis",
  68. daemon=True,
  69. )
  70. self._threads.append(t)
  71. else:
  72. logger.warning(f"plant={self.plant} 无诊断模型配置,跳过诊断线程")
  73. # RO: 调度线程
  74. t = threading.Thread(
  75. target=self.ro.run_scheduler,
  76. name="RO-CIP",
  77. daemon=True,
  78. )
  79. self._threads.append(t)
  80. # 启动所有线程
  81. for t in self._threads:
  82. t.start()
  83. logger.info(f"线程已启动: {t.name}")
  84. # 主线程:健康监控
  85. self._monitor()
  86. def _monitor(self):
  87. while self._running:
  88. for t in self._threads:
  89. if not t.is_alive():
  90. logger.error(f"线程已死亡: {t.name}")
  91. time.sleep(60)
  92. def stop(self):
  93. self._running = False
  94. logger.info("Pipeline 停止")
  95. def main():
  96. parser = argparse.ArgumentParser(description="DualFlow 三模型统一管线")
  97. parser.add_argument(
  98. "--plant",
  99. default="xishan",
  100. choices=["xishan", "longting", "anzhen", "lankao"],
  101. help="水厂名称 (默认 xishan)",
  102. )
  103. parser.add_argument(
  104. "--config",
  105. default=None,
  106. help="配置文件路径 (默认 core/config/pipeline_config.yaml)",
  107. )
  108. args = parser.parse_args()
  109. pipeline = DualFlowPipeline(args.plant, args.config)
  110. def _signal_handler(sig, frame):
  111. logger.info("收到停止信号,正在退出...")
  112. pipeline.stop()
  113. sys.exit(0)
  114. signal.signal(signal.SIGINT, _signal_handler)
  115. signal.signal(signal.SIGTERM, _signal_handler)
  116. pipeline.start()
# Script entry point: run the CLI only when this module is executed directly
# (e.g. `python -m core.pipeline`), not when it is imported.
if __name__ == "__main__":
    main()