smart_monitor.py 20 KB


  1. # -*- coding: utf-8 -*-
  2. """
  3. CIP监控系统 - 配置文件监控模式(支持分段监控)
  4. 功能:
  5. 1. 监控config.json中各机组段(一段、二段)的actual_time字段变化
  6. 2. 检测到变化且actual_time < 当前时间时,基于actual_time执行CIP预测
  7. 3. 将预测结果保存到对应段的predicted_time(仅用于记录)
  8. 4. 通过回调接口发送预测结果到决策系统
  9. 5. 继续等待actual_time的下次更新
  10. 触发条件(同时满足):
  11. - actual_time 发生了变化
  12. - actual_time < 当前时间(已过期)
  13. 工作流程:
  14. 修改actual_time(且 < 当前时间)→ 检测变化 → 执行预测 → 保存predicted_time → 发送回调 → 继续监控actual_time
  15. 分段监控说明:
  16. - 支持独立监控每个段(RO1-一段、RO1-二段等)
  17. - 每个段有独立的actual_time和predicted_time
  18. - 可以为不同段设置不同的CIP时间
  19. - 预测时会分别输出每个段的CIP时机
  20. """
  21. import os
  22. import json
  23. import time
  24. import threading
  25. import pandas as pd
  26. import logging
  27. from logging.handlers import RotatingFileHandler
  28. from datetime import datetime
  29. from main_simple import main as run_cip_analysis
  30. from main_simple import send_decision_to_callback
  31. # 日志系统配置
  32. logger = logging.getLogger(__name__)
  33. logger.setLevel(logging.INFO)
  34. # 禁止传播到root logger,避免重复输出
  35. logger.propagate = False
  36. # 日志输出格式
  37. formatter = logging.Formatter(
  38. '%(asctime)s - %(threadName)s - %(levelname)s - %(message)s',
  39. datefmt='%Y-%m-%d %H:%M:%S'
  40. )
  41. # 清除已有的处理器(防止重复添加)
  42. if logger.handlers:
  43. for handler in logger.handlers[:]:
  44. handler.close()
  45. logger.removeHandler(handler)
  46. # 文件日志处理器,单个文件最大5MB,保留3个备份
  47. file_handler = RotatingFileHandler('smart_monitor.log', maxBytes=5 * 1024 * 1024, backupCount=3, encoding='utf-8')
  48. file_handler.setFormatter(formatter)
  49. # 控制台日志处理器
  50. console_handler = logging.StreamHandler()
  51. console_handler.setFormatter(formatter)
  52. # 添加处理器
  53. logger.addHandler(file_handler)
  54. logger.addHandler(console_handler)
  55. class SmartCIPMonitor:
  56. """
  57. CIP监控器(支持分段监控)
  58. 核心功能:
  59. 1. 监控config.json中各机组段的actual_time字段
  60. 2. 检测到时间变化后,自动触发CIP预测
  61. 3. 保存predicted_time到配置文件(仅用于记录)
  62. 4. 发送预测结果到回调接口
  63. 属性:
  64. config_path: 配置文件路径
  65. config: 配置文件内容
  66. running: 监控运行状态
  67. monitor_thread: 监控线程
  68. unit_names: 机组段名称列表(如['RO1-一段', 'RO1-二段', ...])
  69. last_seen_actual_times: 记录上次看到的各机组段actual_time,用于检测变化
  70. """
  71. def __init__(self):
  72. """
  73. 初始化监控系统
  74. 初始化流程:
  75. 1. 设置配置文件路径
  76. 2. 加载配置文件
  77. 3. 初始化监控基准时间
  78. """
  79. self.config_path = os.path.join(os.path.dirname(__file__), 'config.json')
  80. self.config = None
  81. self.running = False
  82. self.monitor_thread = None
  83. # 支持分段监控:每个机组包含一段和二段
  84. self.unit_names = [
  85. 'RO1-一段', 'RO1-二段',
  86. 'RO2-一段', 'RO2-二段',
  87. 'RO3-一段', 'RO3-二段',
  88. 'RO4-一段', 'RO4-二段'
  89. ]
  90. # 记录上次看到的actual_time,用于检测变化
  91. # 格式:{'RO1-一段': '2025-12-01 12:00:00', 'RO1-二段': '...', ...}
  92. self.last_seen_actual_times = {}
  93. if not self.load_config():
  94. logger.error("配置文件加载失败")
  95. else:
  96. self.initialize_last_seen_times()
  97. def load_config(self):
  98. """
  99. 加载配置文件
  100. Returns:
  101. bool: 加载成功返回True
  102. """
  103. try:
  104. with open(self.config_path, 'r', encoding='utf-8') as f:
  105. self.config = json.load(f)
  106. return True
  107. except Exception as e:
  108. logger.error(f"配置文件加载失败: {e}")
  109. return False
  110. def save_config(self):
  111. """
  112. 保存配置文件
  113. Returns:
  114. bool: 保存成功返回True
  115. """
  116. try:
  117. with open(self.config_path, 'w', encoding='utf-8') as f:
  118. json.dump(self.config, f, ensure_ascii=False, indent=2)
  119. return True
  120. except Exception as e:
  121. logger.error(f"配置文件保存失败: {e}")
  122. return False
  123. def initialize_last_seen_times(self):
  124. """
  125. 初始化监控基准时间(支持分段)
  126. 功能:
  127. 读取config.json中各机组段当前的actual_time作为监控基准
  128. 但是:只记录未来的时间,已过期的时间不记录,以便首次检查时能触发预测
  129. 说明:
  130. - last_seen_actual_times用于防止重复处理相同的时间
  131. - 支持分段配置(如'RO1-一段', 'RO1-二段')
  132. - 支持新旧配置格式兼容
  133. - 已过期的时间(< 当前时间)不记录,确保首次检查时能被检测为"变化"
  134. """
  135. if not self.config or 'cip_times' not in self.config:
  136. return
  137. current_time = datetime.now()
  138. for unit_name in self.unit_names:
  139. if unit_name in self.config['cip_times']:
  140. unit_data = self.config['cip_times'][unit_name]
  141. actual_time_str = None
  142. if isinstance(unit_data, dict) and 'actual_time' in unit_data:
  143. # 新格式:{'actual_time': '...', 'predicted_time': '...'}
  144. actual_time_str = unit_data['actual_time']
  145. elif isinstance(unit_data, str):
  146. # 兼容旧格式:直接是时间字符串
  147. actual_time_str = unit_data
  148. # 只记录未来的时间,已过期的时间不记录
  149. if actual_time_str:
  150. actual_time_dt = self.parse_time_string(actual_time_str)
  151. if actual_time_dt and actual_time_dt >= current_time:
  152. # 未来时间:记录基准,避免触发预测
  153. self.last_seen_actual_times[unit_name] = actual_time_str
  154. logger.info(f"{unit_name}: 未来时间,不触发预测")
  155. elif actual_time_dt:
  156. # 已过期时间:不记录基准,让首次检查时能检测到变化
  157. logger.info(f"{unit_name}: 已过期,待触发预测")
  158. # 如果为None则不记录
  159. def parse_time_string(self, time_str):
  160. """
  161. 解析时间字符串为datetime对象
  162. Args:
  163. time_str: str,时间字符串,格式'YYYY-MM-DD HH:MM:SS'
  164. Returns:
  165. datetime: 解析后的datetime对象,解析失败返回None
  166. """
  167. if not time_str or not isinstance(time_str, str):
  168. return None
  169. try:
  170. return datetime.strptime(time_str, '%Y-%m-%d %H:%M:%S')
  171. except Exception as e:
  172. logger.warning(f"时间字符串解析失败: {time_str}, 错误: {e}")
  173. return None
  174. def check_for_changes(self):
  175. """
  176. 检查actual_time是否有变化且小于当前时间(支持分段)
  177. 功能:
  178. 比对当前配置文件中的actual_time和上次记录的值
  179. 发现不同且小于当前时间则判定为有变化,需要触发预测
  180. Returns:
  181. list: 有变化的机组段列表,格式 [(unit_name, new_actual_time), ...]
  182. 例如:[('RO1-一段', '2025-10-12 12:00:00'), ('RO1-二段', '2025-10-15 08:00:00')]
  183. 说明:
  184. - 只有actual_time变化才会触发
  185. - 必须满足 actual_time < 当前时间
  186. - predicted_time的变化不会触发
  187. - 支持分段独立监控(每个段可以有不同的actual_time)
  188. """
  189. changes = []
  190. current_time = datetime.now()
  191. if not self.config or 'cip_times' not in self.config:
  192. return changes
  193. for unit_name in self.unit_names:
  194. if unit_name in self.config['cip_times']:
  195. unit_data = self.config['cip_times'][unit_name]
  196. current_actual_time = None
  197. if isinstance(unit_data, dict) and 'actual_time' in unit_data:
  198. # 新格式:读取actual_time字段
  199. current_actual_time = unit_data['actual_time']
  200. elif isinstance(unit_data, str):
  201. # 兼容旧格式:整个值就是时间
  202. current_actual_time = unit_data
  203. # 检查是否有变化
  204. if current_actual_time:
  205. last_actual_time = self.last_seen_actual_times.get(unit_name)
  206. # 条件1:actual_time 发生了变化
  207. if last_actual_time != current_actual_time:
  208. # 条件2:actual_time 小于当前时间
  209. actual_time_dt = self.parse_time_string(current_actual_time)
  210. if actual_time_dt is None:
  211. logger.warning(f"{unit_name} actual_time 格式无效: {current_actual_time}")
  212. continue
  213. if actual_time_dt < current_time:
  214. changes.append((unit_name, current_actual_time))
  215. logger.info(f"{unit_name}: 检测到变化,触发预测 ({current_actual_time})")
  216. else:
  217. logger.info(f"{unit_name}: 未来时间,不触发 ({current_actual_time})")
  218. # 更新监控基准,避免重复提示
  219. self.last_seen_actual_times[unit_name] = current_actual_time
  220. return changes
  221. def extract_unit_base_name(self, unit_name):
  222. """
  223. 从分段机组名称中提取基础机组号
  224. Args:
  225. unit_name: str,分段机组名称,如'RO1-一段'
  226. Returns:
  227. str: 基础机组号,如'RO1'
  228. 示例:
  229. 'RO1-一段' -> 'RO1'
  230. 'RO2-二段' -> 'RO2'
  231. """
  232. if '-' in unit_name:
  233. return unit_name.split('-')[0]
  234. return unit_name
  235. def predict_and_update_unit(self, unit_name, start_date_str):
  236. """
  237. 预测并只处理触发段的结果(严格控制:谁触发只处理谁)
  238. 功能:
  239. 1. 调用预测函数(会预测整个机组的所有段,这是模型限制)
  240. 2. 只提取触发段的结果(其他段的结果丢弃)
  241. 3. 只保存触发段的predicted_time到配置文件
  242. 4. 只发送触发段的回调
  243. 5. 其他未触发的段等它们自己触发后再处理
  244. Args:
  245. unit_name: str,触发的机组段名称,如'RO1-一段', 'RO2-二段'
  246. start_date_str: str,预测起始时间(即actual_time),格式'YYYY-MM-DD HH:MM:SS'
  247. Returns:
  248. bool: 预测成功返回True,失败返回False
  249. 说明:
  250. - 严格的分段控制:只处理触发段,其他段不保存、不发送
  251. - predicted_time仅用于记录,不影响下次预测的触发
  252. - 下次预测只由actual_time的变化触发
  253. 示例流程:
  254. 1. RO1-一段触发 → 预测RO1 → 只处理RO1-一段 → RO1-二段丢弃
  255. 2. 稍后RO1-二段触发 → 预测RO1 → 只处理RO1-二段 → RO1-一段丢弃(已处理过)
  256. """
  257. try:
  258. logger.info(f"[{unit_name}] 开始预测 (基于 {start_date_str})")
  259. # 步骤1:提取基础机组号(用于unit_filter参数)
  260. base_unit_name = self.extract_unit_base_name(unit_name)
  261. # 步骤2:调用预测逻辑(会预测整个机组的所有段,这是模型限制)
  262. # 禁用 main() 内部的回调发送,由 smart_monitor 统一管理
  263. logger.info(f"[{unit_name}] 执行预测 (unit_filter={base_unit_name},会预测整个机组)")
  264. result_df = run_cip_analysis(
  265. strategy=3,
  266. start_date=start_date_str,
  267. unit_filter=base_unit_name,
  268. separate_stages=True, # 明确指定分段输出
  269. send_callback=False # 禁用内部回调,避免重复发送
  270. )
  271. if result_df.empty:
  272. logger.warning(f"[{unit_name}] 预测失败: 无结果")
  273. return False
  274. # 显示预测得到的所有段
  275. all_predicted_stages = result_df['机组类型'].tolist()
  276. logger.info(f"[{unit_name}] 预测完成,得到 {len(all_predicted_stages)} 个段: {all_predicted_stages}")
  277. # 步骤3:只提取触发段的结果(关键:只处理触发的段!)
  278. logger.info(f"[{unit_name}] 只提取触发段 '{unit_name}' 的结果,其他段丢弃")
  279. unit_result = result_df[result_df['机组类型'] == unit_name]
  280. if unit_result.empty:
  281. logger.warning(f"[{unit_name}] 预测失败: 结果中无此机组段")
  282. logger.info(f"[{unit_name}] 可用的机组类型: {result_df['机组类型'].tolist()}")
  283. return False
  284. cip_time = unit_result.iloc[0]['CIP时机']
  285. if pd.isna(cip_time):
  286. logger.warning(f"[{unit_name}] 预测失败: CIP时机为空")
  287. return False
  288. # 步骤4:只保存触发段的预测时间到配置文件(其他段不保存)
  289. predicted_time_str = cip_time.strftime('%Y-%m-%d %H:%M:%S')
  290. logger.info(f"[{unit_name}] 只保存触发段的 predicted_time: {predicted_time_str}")
  291. if unit_name not in self.config['cip_times']:
  292. # 配置中不存在该机组段,创建新记录
  293. self.config['cip_times'][unit_name] = {
  294. 'actual_time': start_date_str,
  295. 'predicted_time': predicted_time_str
  296. }
  297. else:
  298. # 更新预测时间,保持actual_time不变
  299. if not isinstance(self.config['cip_times'][unit_name], dict):
  300. # 兼容旧格式:转换为新格式
  301. self.config['cip_times'][unit_name] = {
  302. 'actual_time': self.config['cip_times'][unit_name],
  303. 'predicted_time': predicted_time_str
  304. }
  305. else:
  306. # 只更新predicted_time,不修改actual_time
  307. self.config['cip_times'][unit_name]['predicted_time'] = predicted_time_str
  308. if not self.save_config():
  309. logger.error(f"[{unit_name}] 配置保存失败")
  310. return False
  311. logger.info(f"[{unit_name}] 预测成功 → {predicted_time_str}")
  312. # 步骤5:只发送触发段的回调(其他段不发送)
  313. logger.info(f"[{unit_name}] 只发送触发段的回调")
  314. send_decision_to_callback(unit_result)
  315. # 明确说明哪些段被丢弃了
  316. other_stages = [s for s in all_predicted_stages if s != unit_name]
  317. if other_stages:
  318. logger.info(f"[{unit_name}] 其他段的结果已丢弃,等它们自己触发后再处理: {other_stages}")
  319. return True
  320. except Exception as e:
  321. logger.error(f"[{unit_name}] 预测失败: {e}", exc_info=True)
  322. return False
  323. def process_changes(self, changes):
  324. """
  325. 处理检测到的actual_time变化
  326. 功能:
  327. 遍历所有检测到的变化,逐个执行预测并更新监控基准
  328. Args:
  329. changes: list,变化列表,格式 [(unit_name, new_actual_time), ...]
  330. 例如:[('RO1', '2025-10-12 12:00:00')]
  331. 处理流程:
  332. 1. 执行预测:调用predict_and_update_unit
  333. 2. 更新监控基准:将新的actual_time记录到last_seen_actual_times(无论成功失败)
  334. 3. 重新加载配置:确保获取最新数据
  335. 4. 短暂延时:避免频繁操作
  336. """
  337. for unit_name, new_actual_time in changes:
  338. # 执行预测
  339. success = self.predict_and_update_unit(unit_name, new_actual_time)
  340. # 无论预测成功还是失败,都要更新监控基准,避免无限循环
  341. self.last_seen_actual_times[unit_name] = new_actual_time
  342. if not success:
  343. logger.error(f"[{unit_name}] 处理失败,已记录时间避免重复")
  344. # 重新加载配置,确保有最新数据
  345. self.load_config()
  346. time.sleep(2) # 短暂延时,避免频繁操作
  347. def monitor_loop(self):
  348. """
  349. 监控主循环
  350. 功能:
  351. 1. 定期重新加载config.json(每5秒)
  352. 2. 检查actual_time是否有变化且小于当前时间
  353. 3. 发现满足条件的变化后自动触发预测
  354. 4. 发送预测结果到回调接口
  355. 监控频率:
  356. - 每5秒检查一次配置文件
  357. - 发现变化立即处理
  358. - 处理失败不影响后续监控
  359. 触发条件(同时满足):
  360. - actual_time 发生了变化
  361. - actual_time < 当前时间(已过期)
  362. 说明:
  363. - 只监控actual_time字段的变化
  364. - predicted_time的变化不会触发任何操作
  365. - 相同的actual_time不会重复触发
  366. - 未来时间的actual_time不会触发预测
  367. """
  368. logger.info("监控启动 (每5秒检查一次,触发条件: actual_time变化且<当前时间)")
  369. while self.running:
  370. try:
  371. # 步骤1:重新加载配置
  372. if not self.load_config():
  373. logger.error("配置加载失败,10秒后重试...")
  374. time.sleep(10)
  375. continue
  376. # 步骤2:检查是否有变化
  377. changes = self.check_for_changes()
  378. if changes:
  379. # 发现变化,立即处理
  380. self.process_changes(changes)
  381. # 步骤3:等待一段时间后再次检查
  382. time.sleep(5) # 每5秒检查一次配置文件
  383. except Exception as e:
  384. logger.error(f"监控循环出错: {e}", exc_info=True)
  385. time.sleep(10) # 出错后等待10秒再继续
  386. def start(self):
  387. """
  388. 启动监控系统
  389. """
  390. if self.running:
  391. logger.warning("监控系统已在运行")
  392. return
  393. logger.info("CIP监控系统启动(监控 config.json 中的 actual_time 变化)")
  394. # 启动监控线程
  395. self.running = True
  396. self.monitor_thread = threading.Thread(target=self.monitor_loop, daemon=True)
  397. self.monitor_thread.start()
  398. try:
  399. while self.running:
  400. time.sleep(1)
  401. except KeyboardInterrupt:
  402. self.stop()
  403. def stop(self):
  404. """
  405. 停止监控系统
  406. """
  407. logger.info("停止监控系统...")
  408. self.running = False
  409. if self.monitor_thread and self.monitor_thread.is_alive():
  410. self.monitor_thread.join(timeout=5)
  411. logger.info("监控系统已停止")
  412. def main():
  413. """
  414. 主函数
  415. 功能:启动监控系统
  416. """
  417. monitor = SmartCIPMonitor()
  418. try:
  419. monitor.start()
  420. except Exception as e:
  421. logger.error(f"系统运行出错: {e}", exc_info=True)
  422. finally:
  423. if monitor.running:
  424. monitor.stop()
  425. if __name__ == '__main__':
  426. main()