smart_monitor.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396
  1. # -*- coding: utf-8 -*-
  2. """
  3. CIP监控系统 - 配置文件监控模式
  4. 功能:
  5. 1. 监控config.json中actual_time字段的变化
  6. 2. 检测到变化后,基于actual_time执行CIP预测
  7. 3. 将预测结果保存到predicted_time(仅用于记录)
  8. 4. 通过回调接口发送预测结果到决策系统
  9. 工作流程:
  10. 修改actual_time → 检测变化 → 执行预测 → 保存predicted_time → 发送回调 → 继续监控
  11. """
  12. import os
  13. import json
  14. import time
  15. import threading
  16. import pandas as pd
  17. import logging
  18. from logging.handlers import RotatingFileHandler
  19. from datetime import datetime
  20. from main_simple import main as run_cip_analysis
  21. from main_simple import send_decision_to_callback
  22. # 日志系统配置
  23. logger = logging.getLogger(__name__)
  24. logger.setLevel(logging.INFO)
  25. # 日志输出格式
  26. formatter = logging.Formatter(
  27. '%(asctime)s - %(threadName)s - %(levelname)s - %(message)s',
  28. datefmt='%Y-%m-%d %H:%M:%S'
  29. )
  30. # 文件日志处理器,单个文件最大5MB,保留3个备份
  31. file_handler = RotatingFileHandler('smart_monitor.log', maxBytes=5 * 1024 * 1024, backupCount=3, encoding='utf-8')
  32. file_handler.setFormatter(formatter)
  33. # 控制台日志处理器
  34. console_handler = logging.StreamHandler()
  35. console_handler.setFormatter(formatter)
  36. # 添加处理器
  37. logger.addHandler(file_handler)
  38. logger.addHandler(console_handler)
  39. class SmartCIPMonitor:
  40. """
  41. CIP监控器
  42. 核心功能:
  43. 1. 监控config.json中各机组的actual_time字段
  44. 2. 检测到时间变化后,自动触发CIP预测
  45. 3. 保存predicted_time到配置文件(仅用于记录)
  46. 4. 发送预测结果到回调接口
  47. 属性:
  48. config_path: 配置文件路径
  49. config: 配置文件内容
  50. running: 监控运行状态
  51. monitor_thread: 监控线程
  52. unit_names: 机组名称列表
  53. last_seen_actual_times: 记录上次看到的各机组actual_time,用于检测变化
  54. """
  55. def __init__(self):
  56. """
  57. 初始化监控系统
  58. 初始化流程:
  59. 1. 设置配置文件路径
  60. 2. 加载配置文件
  61. 3. 初始化监控基准时间
  62. """
  63. self.config_path = os.path.join(os.path.dirname(__file__), 'config.json')
  64. self.config = None
  65. self.running = False
  66. self.monitor_thread = None
  67. self.unit_names = ['RO1', 'RO2', 'RO3', 'RO4']
  68. # 记录上次看到的actual_time,用于检测变化
  69. # 格式:{'RO1': '2025-12-01 12:00:00', 'RO2': '...'}
  70. self.last_seen_actual_times = {}
  71. if not self.load_config():
  72. logger.error("配置文件加载失败")
  73. else:
  74. self.initialize_last_seen_times()
  75. def load_config(self):
  76. """
  77. 加载配置文件
  78. Returns:
  79. bool: 加载成功返回True
  80. """
  81. try:
  82. with open(self.config_path, 'r', encoding='utf-8') as f:
  83. self.config = json.load(f)
  84. return True
  85. except Exception as e:
  86. logger.error(f"配置文件加载失败: {e}")
  87. return False
  88. def save_config(self):
  89. """
  90. 保存配置文件
  91. Returns:
  92. bool: 保存成功返回True
  93. """
  94. try:
  95. with open(self.config_path, 'w', encoding='utf-8') as f:
  96. json.dump(self.config, f, ensure_ascii=False, indent=2)
  97. return True
  98. except Exception as e:
  99. logger.error(f"配置文件保存失败: {e}")
  100. return False
  101. def initialize_last_seen_times(self):
  102. """
  103. 初始化监控基准时间
  104. 功能:
  105. 读取config.json中各机组当前的actual_time作为监控基准
  106. 后续只有当actual_time与此基准不同时才触发预测
  107. 说明:
  108. - last_seen_actual_times用于防止重复处理相同的时间
  109. - 支持新旧配置格式兼容
  110. """
  111. if not self.config or 'cip_times' not in self.config:
  112. return
  113. for unit_name in self.unit_names:
  114. if unit_name in self.config['cip_times']:
  115. unit_data = self.config['cip_times'][unit_name]
  116. if isinstance(unit_data, dict) and 'actual_time' in unit_data:
  117. # 新格式:{'actual_time': '...', 'predicted_time': '...'}
  118. self.last_seen_actual_times[unit_name] = unit_data['actual_time']
  119. elif isinstance(unit_data, str):
  120. # 兼容旧格式:直接是时间字符串
  121. self.last_seen_actual_times[unit_name] = unit_data
  122. def check_for_changes(self):
  123. """
  124. 检查actual_time是否有变化
  125. 功能:
  126. 比对当前配置文件中的actual_time和上次记录的值
  127. 发现不同则判定为有变化,需要触发预测
  128. Returns:
  129. list: 有变化的机组列表,格式 [(unit_name, new_actual_time), ...]
  130. 例如:[('RO1', '2025-10-12 12:00:00'), ('RO3', '2025-10-15 08:00:00')]
  131. 说明:
  132. - 只有actual_time变化才会触发
  133. - predicted_time的变化不会触发
  134. """
  135. changes = []
  136. if not self.config or 'cip_times' not in self.config:
  137. return changes
  138. for unit_name in self.unit_names:
  139. if unit_name in self.config['cip_times']:
  140. unit_data = self.config['cip_times'][unit_name]
  141. current_actual_time = None
  142. if isinstance(unit_data, dict) and 'actual_time' in unit_data:
  143. # 新格式:读取actual_time字段
  144. current_actual_time = unit_data['actual_time']
  145. elif isinstance(unit_data, str):
  146. # 兼容旧格式:整个值就是时间
  147. current_actual_time = unit_data
  148. # 检查是否有变化
  149. if current_actual_time:
  150. last_actual_time = self.last_seen_actual_times.get(unit_name)
  151. if last_actual_time != current_actual_time:
  152. changes.append((unit_name, current_actual_time))
  153. logger.info(f"检测到{unit_name}实际CIP时间变化: 旧值={last_actual_time}, 新值={current_actual_time}")
  154. return changes
  155. def predict_and_update_unit(self, unit_name, start_date_str):
  156. """
  157. 预测单个机组并保存结果到配置
  158. 功能:
  159. 1. 调用main_simple.py中的预测函数
  160. 2. 提取预测结果中的CIP时机
  161. 3. 保存predicted_time到配置文件(仅用于记录,不参与任何计算)
  162. 4. 发送预测结果到回调接口
  163. Args:
  164. unit_name: str,机组名称,如'RO1', 'RO2', 'RO3', 'RO4'
  165. start_date_str: str,预测起始时间(即actual_time),格式'YYYY-MM-DD HH:MM:SS'
  166. Returns:
  167. bool: 预测成功返回True,失败返回False
  168. 说明:
  169. - predicted_time仅用于记录,不影响下次预测的触发
  170. - 下次预测只由actual_time的变化触发
  171. """
  172. try:
  173. logger.info(f"[{unit_name}] 开始预测,基于实际CIP时间: {start_date_str}")
  174. # 步骤1:调用预测逻辑
  175. result_df = run_cip_analysis(strategy=3, start_date=start_date_str, unit_filter=unit_name)
  176. if result_df.empty:
  177. logger.warning(f"{unit_name} 预测失败: 无结果")
  178. return False
  179. # 步骤2:提取该机组的结果
  180. unit_result = result_df[result_df['机组类型'] == unit_name]
  181. if unit_result.empty:
  182. logger.warning(f"{unit_name} 预测失败: 结果中无此机组")
  183. return False
  184. cip_time = unit_result.iloc[0]['CIP时机']
  185. if pd.isna(cip_time):
  186. logger.warning(f"{unit_name} 预测失败: CIP时机为空")
  187. return False
  188. # 步骤3:保存预测时间到配置文件(仅用于记录)
  189. predicted_time_str = cip_time.strftime('%Y-%m-%d %H:%M:%S')
  190. if unit_name not in self.config['cip_times']:
  191. # 配置中不存在该机组,创建新记录
  192. self.config['cip_times'][unit_name] = {
  193. 'actual_time': start_date_str,
  194. 'predicted_time': predicted_time_str
  195. }
  196. else:
  197. # 更新预测时间,保持actual_time不变
  198. if not isinstance(self.config['cip_times'][unit_name], dict):
  199. # 兼容旧格式:转换为新格式
  200. self.config['cip_times'][unit_name] = {
  201. 'actual_time': self.config['cip_times'][unit_name],
  202. 'predicted_time': predicted_time_str
  203. }
  204. else:
  205. # 只更新predicted_time,不修改actual_time
  206. self.config['cip_times'][unit_name]['predicted_time'] = predicted_time_str
  207. if not self.save_config():
  208. logger.error(f"[{unit_name}] 配置保存失败")
  209. return False
  210. logger.info(f"[{unit_name}] 预测成功 → {predicted_time_str}")
  211. # 步骤4:发送预测结果到回调接口
  212. try:
  213. if send_decision_to_callback(result_df):
  214. logger.info(f"[{unit_name}] 回调发送成功")
  215. else:
  216. logger.error(f"[{unit_name}] 回调发送失败")
  217. except Exception as callback_error:
  218. logger.error(f"[{unit_name}] 回调发送异常: {callback_error}")
  219. return True
  220. except Exception as e:
  221. logger.error(f"[{unit_name}] 预测失败: {e}", exc_info=True)
  222. return False
  223. def process_changes(self, changes):
  224. """
  225. 处理检测到的actual_time变化
  226. 功能:
  227. 遍历所有检测到的变化,逐个执行预测并更新监控基准
  228. Args:
  229. changes: list,变化列表,格式 [(unit_name, new_actual_time), ...]
  230. 例如:[('RO1', '2025-10-12 12:00:00')]
  231. 处理流程:
  232. 1. 执行预测:调用predict_and_update_unit
  233. 2. 更新监控基准:将新的actual_time记录到last_seen_actual_times
  234. 3. 重新加载配置:确保获取最新数据
  235. 4. 短暂延时:避免频繁操作
  236. """
  237. for unit_name, new_actual_time in changes:
  238. # 执行预测
  239. if self.predict_and_update_unit(unit_name, new_actual_time):
  240. # 更新监控基准,防止重复处理
  241. self.last_seen_actual_times[unit_name] = new_actual_time
  242. logger.info(f"[{unit_name}] 处理完成")
  243. else:
  244. logger.error(f"[{unit_name}] 处理失败")
  245. # 重新加载配置,确保有最新数据
  246. self.load_config()
  247. time.sleep(2) # 短暂延时,避免频繁操作
  248. def monitor_loop(self):
  249. """
  250. 监控主循环
  251. 功能:
  252. 1. 定期重新加载config.json(每5秒)
  253. 2. 检查actual_time是否有变化
  254. 3. 发现变化后自动触发预测
  255. 4. 发送预测结果到回调接口
  256. 监控频率:
  257. - 每5秒检查一次配置文件
  258. - 发现变化立即处理
  259. - 处理失败不影响后续监控
  260. 说明:
  261. - 只监控actual_time字段的变化
  262. - predicted_time的变化不会触发任何操作
  263. - 相同的actual_time不会重复触发
  264. """
  265. logger.info("开始监控 config.json 变化(每5秒检查一次)")
  266. while self.running:
  267. try:
  268. # 步骤1:重新加载配置
  269. if not self.load_config():
  270. logger.error("配置加载失败,10秒后重试...")
  271. time.sleep(10)
  272. continue
  273. # 步骤2:检查是否有变化
  274. changes = self.check_for_changes()
  275. if changes:
  276. # 发现变化,立即处理
  277. self.process_changes(changes)
  278. # 步骤3:等待一段时间后再次检查
  279. time.sleep(5) # 每5秒检查一次配置文件
  280. except Exception as e:
  281. logger.error(f"监控循环出错: {e}", exc_info=True)
  282. time.sleep(10) # 出错后等待10秒再继续
  283. def start(self):
  284. """
  285. 启动监控系统
  286. """
  287. if self.running:
  288. logger.warning("监控系统已在运行")
  289. return
  290. logger.info("CIP监控系统启动(监控 config.json 中的 actual_time 变化)")
  291. # 启动监控线程
  292. self.running = True
  293. self.monitor_thread = threading.Thread(target=self.monitor_loop, daemon=True)
  294. self.monitor_thread.start()
  295. try:
  296. while self.running:
  297. time.sleep(1)
  298. except KeyboardInterrupt:
  299. self.stop()
  300. def stop(self):
  301. """
  302. 停止监控系统
  303. """
  304. logger.info("停止监控系统...")
  305. self.running = False
  306. if self.monitor_thread and self.monitor_thread.is_alive():
  307. self.monitor_thread.join(timeout=5)
  308. logger.info("监控系统已停止")
  309. def main():
  310. """
  311. 主函数
  312. 功能:启动监控系统
  313. """
  314. monitor = SmartCIPMonitor()
  315. try:
  316. monitor.start()
  317. except Exception as e:
  318. logger.error(f"系统运行出错: {e}", exc_info=True)
  319. finally:
  320. if monitor.running:
  321. monitor.stop()
  322. if __name__ == '__main__':
  323. main()