smart_monitor.py 15 KB


  1. # -*- coding: utf-8 -*-
  2. """
  3. CIP监控系统 - 配置文件监控模式
  4. 功能:
  5. 1. 监控config.json中actual_time字段的变化
  6. 2. 检测到变化后,基于actual_time执行CIP预测
  7. 3. 将预测结果保存到predicted_time(仅用于记录)
  8. 4. 通过回调接口发送预测结果到决策系统
  9. 工作流程:
  10. 修改actual_time → 检测变化 → 执行预测 → 保存predicted_time → 发送回调 → 继续监控
  11. """
  12. import os
  13. import json
  14. import time
  15. import threading
  16. import pandas as pd
  17. from datetime import datetime
  18. from main_simple import main as run_cip_analysis
  19. from main_simple import send_decision_to_callback
  20. class SmartCIPMonitor:
  21. """
  22. CIP监控器
  23. 核心功能:
  24. 1. 监控config.json中各机组的actual_time字段
  25. 2. 检测到时间变化后,自动触发CIP预测
  26. 3. 保存predicted_time到配置文件(仅用于记录)
  27. 4. 发送预测结果到回调接口
  28. 属性:
  29. config_path: 配置文件路径
  30. config: 配置文件内容
  31. running: 监控运行状态
  32. monitor_thread: 监控线程
  33. unit_names: 机组名称列表
  34. last_seen_actual_times: 记录上次看到的各机组actual_time,用于检测变化
  35. """
  36. def __init__(self):
  37. """
  38. 初始化监控系统
  39. 初始化流程:
  40. 1. 设置配置文件路径
  41. 2. 加载配置文件
  42. 3. 初始化监控基准时间
  43. """
  44. self.config_path = os.path.join(os.path.dirname(__file__), 'config.json')
  45. self.config = None
  46. self.running = False
  47. self.monitor_thread = None
  48. self.unit_names = ['RO1', 'RO2', 'RO3', 'RO4']
  49. # 记录上次看到的actual_time,用于检测变化
  50. # 格式:{'RO1': '2025-12-01 12:00:00', 'RO2': '...'}
  51. self.last_seen_actual_times = {}
  52. print("初始化CIP监控系统...")
  53. if not self.load_config():
  54. print("配置加载失败")
  55. else:
  56. print("配置加载成功")
  57. self.initialize_last_seen_times()
  58. def load_config(self):
  59. """
  60. 加载配置文件
  61. Returns:
  62. bool: 加载成功返回True
  63. """
  64. try:
  65. with open(self.config_path, 'r', encoding='utf-8') as f:
  66. self.config = json.load(f)
  67. return True
  68. except Exception as e:
  69. print(f"配置文件加载失败: {e}")
  70. return False
  71. def save_config(self):
  72. """
  73. 保存配置文件
  74. Returns:
  75. bool: 保存成功返回True
  76. """
  77. try:
  78. with open(self.config_path, 'w', encoding='utf-8') as f:
  79. json.dump(self.config, f, ensure_ascii=False, indent=2)
  80. return True
  81. except Exception as e:
  82. print(f"配置文件保存失败: {e}")
  83. return False
  84. def initialize_last_seen_times(self):
  85. """
  86. 初始化监控基准时间
  87. 功能:
  88. 读取config.json中各机组当前的actual_time作为监控基准
  89. 后续只有当actual_time与此基准不同时才触发预测
  90. 说明:
  91. - last_seen_actual_times用于防止重复处理相同的时间
  92. - 支持新旧配置格式兼容
  93. """
  94. if not self.config or 'cip_times' not in self.config:
  95. return
  96. for unit_name in self.unit_names:
  97. if unit_name in self.config['cip_times']:
  98. unit_data = self.config['cip_times'][unit_name]
  99. if isinstance(unit_data, dict) and 'actual_time' in unit_data:
  100. # 新格式:{'actual_time': '...', 'predicted_time': '...'}
  101. self.last_seen_actual_times[unit_name] = unit_data['actual_time']
  102. elif isinstance(unit_data, str):
  103. # 兼容旧格式:直接是时间字符串
  104. self.last_seen_actual_times[unit_name] = unit_data
  105. print(f"已初始化监控基准时间: {self.last_seen_actual_times}")
  106. def check_for_changes(self):
  107. """
  108. 检查actual_time是否有变化
  109. 功能:
  110. 比对当前配置文件中的actual_time和上次记录的值
  111. 发现不同则判定为有变化,需要触发预测
  112. Returns:
  113. list: 有变化的机组列表,格式 [(unit_name, new_actual_time), ...]
  114. 例如:[('RO1', '2025-10-12 12:00:00'), ('RO3', '2025-10-15 08:00:00')]
  115. 说明:
  116. - 只有actual_time变化才会触发
  117. - predicted_time的变化不会触发
  118. """
  119. changes = []
  120. if not self.config or 'cip_times' not in self.config:
  121. return changes
  122. for unit_name in self.unit_names:
  123. if unit_name in self.config['cip_times']:
  124. unit_data = self.config['cip_times'][unit_name]
  125. current_actual_time = None
  126. if isinstance(unit_data, dict) and 'actual_time' in unit_data:
  127. # 新格式:读取actual_time字段
  128. current_actual_time = unit_data['actual_time']
  129. elif isinstance(unit_data, str):
  130. # 兼容旧格式:整个值就是时间
  131. current_actual_time = unit_data
  132. # 检查是否有变化
  133. if current_actual_time:
  134. last_actual_time = self.last_seen_actual_times.get(unit_name)
  135. if last_actual_time != current_actual_time:
  136. changes.append((unit_name, current_actual_time))
  137. print(f"\n检测到{unit_name}实际CIP时间变化:")
  138. print(f" 旧值: {last_actual_time}")
  139. print(f" 新值: {current_actual_time}")
  140. return changes
  141. def predict_and_update_unit(self, unit_name, start_date_str):
  142. """
  143. 预测单个机组并保存结果到配置
  144. 功能:
  145. 1. 调用main_simple.py中的预测函数
  146. 2. 提取预测结果中的CIP时机
  147. 3. 保存predicted_time到配置文件(仅用于记录,不参与任何计算)
  148. 4. 发送预测结果到回调接口
  149. Args:
  150. unit_name: str,机组名称,如'RO1', 'RO2', 'RO3', 'RO4'
  151. start_date_str: str,预测起始时间(即actual_time),格式'YYYY-MM-DD HH:MM:SS'
  152. Returns:
  153. bool: 预测成功返回True,失败返回False
  154. 说明:
  155. - predicted_time仅用于记录,不影响下次预测的触发
  156. - 下次预测只由actual_time的变化触发
  157. """
  158. try:
  159. print(f"\n{'='*50}")
  160. print(f"开始预测 {unit_name}")
  161. print(f"基于实际CIP时间: {start_date_str}")
  162. print(f"{'='*50}")
  163. # 步骤1:调用预测逻辑
  164. result_df = run_cip_analysis(strategy=3, start_date=start_date_str, unit_filter=unit_name)
  165. if result_df.empty:
  166. print(f"{unit_name} 预测失败: 无结果")
  167. return False
  168. # 步骤2:提取该机组的结果
  169. unit_result = result_df[result_df['机组类型'] == unit_name]
  170. if unit_result.empty:
  171. print(f"{unit_name} 预测失败: 结果中无此机组")
  172. return False
  173. cip_time = unit_result.iloc[0]['CIP时机']
  174. if pd.isna(cip_time):
  175. print(f"{unit_name} 预测失败: CIP时机为空")
  176. return False
  177. # 步骤3:保存预测时间到配置文件(仅用于记录)
  178. predicted_time_str = cip_time.strftime('%Y-%m-%d %H:%M:%S')
  179. if unit_name not in self.config['cip_times']:
  180. # 配置中不存在该机组,创建新记录
  181. self.config['cip_times'][unit_name] = {
  182. 'actual_time': start_date_str,
  183. 'predicted_time': predicted_time_str
  184. }
  185. else:
  186. # 更新预测时间,保持actual_time不变
  187. if not isinstance(self.config['cip_times'][unit_name], dict):
  188. # 兼容旧格式:转换为新格式
  189. self.config['cip_times'][unit_name] = {
  190. 'actual_time': self.config['cip_times'][unit_name],
  191. 'predicted_time': predicted_time_str
  192. }
  193. else:
  194. # 只更新predicted_time,不修改actual_time
  195. self.config['cip_times'][unit_name]['predicted_time'] = predicted_time_str
  196. if not self.save_config():
  197. print(f"✗ {unit_name} 配置保存失败")
  198. return False
  199. print(f"✓ {unit_name} 预测成功")
  200. print(f" 实际CIP时间: {start_date_str}")
  201. print(f" 预测CIP时间: {predicted_time_str} (仅用于记录)")
  202. # 步骤4:发送预测结果到回调接口
  203. print(f"\n发送预测结果到回调接口...")
  204. try:
  205. if send_decision_to_callback(result_df):
  206. print(f"✓ {unit_name} 回调发送成功")
  207. else:
  208. print(f"✗ {unit_name} 回调发送失败")
  209. except Exception as callback_error:
  210. print(f"✗ {unit_name} 回调发送异常: {callback_error}")
  211. return True
  212. except Exception as e:
  213. print(f"✗ {unit_name} 预测失败: {e}")
  214. import traceback
  215. traceback.print_exc()
  216. return False
  217. def process_changes(self, changes):
  218. """
  219. 处理检测到的actual_time变化
  220. 功能:
  221. 遍历所有检测到的变化,逐个执行预测并更新监控基准
  222. Args:
  223. changes: list,变化列表,格式 [(unit_name, new_actual_time), ...]
  224. 例如:[('RO1', '2025-10-12 12:00:00')]
  225. 处理流程:
  226. 1. 执行预测:调用predict_and_update_unit
  227. 2. 更新监控基准:将新的actual_time记录到last_seen_actual_times
  228. 3. 重新加载配置:确保获取最新数据
  229. 4. 短暂延时:避免频繁操作
  230. """
  231. for unit_name, new_actual_time in changes:
  232. print(f"\n{'='*60}")
  233. print(f"处理 {unit_name} 的实际CIP时间变化")
  234. print(f"{'='*60}")
  235. # 执行预测
  236. if self.predict_and_update_unit(unit_name, new_actual_time):
  237. # 更新监控基准,防止重复处理
  238. self.last_seen_actual_times[unit_name] = new_actual_time
  239. print(f"\n✓ {unit_name} 处理完成,已更新监控基准")
  240. print(f" 新的监控基准: {new_actual_time}")
  241. else:
  242. print(f"\n✗ {unit_name} 处理失败")
  243. # 重新加载配置,确保有最新数据
  244. self.load_config()
  245. time.sleep(2) # 短暂延时,避免频繁操作
  246. def monitor_loop(self):
  247. """
  248. 监控主循环
  249. 功能:
  250. 1. 定期重新加载config.json(每5秒)
  251. 2. 检查actual_time是否有变化
  252. 3. 发现变化后自动触发预测
  253. 4. 发送预测结果到回调接口
  254. 监控频率:
  255. - 每5秒检查一次配置文件
  256. - 发现变化立即处理
  257. - 处理失败不影响后续监控
  258. 说明:
  259. - 只监控actual_time字段的变化
  260. - predicted_time的变化不会触发任何操作
  261. - 相同的actual_time不会重复触发
  262. """
  263. print("\n" + "="*60)
  264. print("开始监控config.json的变化")
  265. print("="*60)
  266. print("监控说明:")
  267. print(" - 监控字段: cip_times.*.actual_time")
  268. print(" - 检查频率: 每5秒")
  269. print(" - 触发条件: actual_time发生变化")
  270. print(" - 自动操作: 执行预测 → 保存predicted_time → 发送回调")
  271. print("="*60)
  272. while self.running:
  273. try:
  274. # 步骤1:重新加载配置
  275. if not self.load_config():
  276. print("✗ 配置加载失败,10秒后重试...")
  277. time.sleep(10)
  278. continue
  279. # 步骤2:检查是否有变化
  280. changes = self.check_for_changes()
  281. if changes:
  282. # 发现变化,立即处理
  283. print(f"\n检测到 {len(changes)} 个机组的actual_time变化")
  284. self.process_changes(changes)
  285. # 步骤3:等待一段时间后再次检查
  286. time.sleep(5) # 每5秒检查一次配置文件
  287. except Exception as e:
  288. print(f"\n✗ 监控循环出错: {e}")
  289. import traceback
  290. traceback.print_exc()
  291. time.sleep(10) # 出错后等待10秒再继续
  292. def start(self):
  293. """
  294. 启动监控系统
  295. """
  296. if self.running:
  297. print("监控系统已在运行")
  298. return
  299. print("\n启动CIP监控系统")
  300. print("="*60)
  301. print("监控模式: 检测config.json中actual_time的变化")
  302. print("工作流程:")
  303. print(" 1. 手动修改config.json中的actual_time")
  304. print(" 2. 系统自动检测到变化")
  305. print(" 3. 基于actual_time执行预测")
  306. print(" 4. 保存predicted_time(仅用于记录)")
  307. print(" 5. 继续监控下一次变化")
  308. print("="*60)
  309. # 启动监控线程
  310. self.running = True
  311. self.monitor_thread = threading.Thread(target=self.monitor_loop, daemon=True)
  312. self.monitor_thread.start()
  313. print("\n监控系统已启动,按Ctrl+C停止")
  314. try:
  315. while self.running:
  316. time.sleep(1)
  317. except KeyboardInterrupt:
  318. self.stop()
  319. def stop(self):
  320. """
  321. 停止监控系统
  322. """
  323. print("\n停止监控系统...")
  324. self.running = False
  325. if self.monitor_thread and self.monitor_thread.is_alive():
  326. self.monitor_thread.join(timeout=5)
  327. print("监控系统已停止")
  328. def main():
  329. """
  330. 主函数
  331. 功能:启动监控系统
  332. """
  333. print("CIP监控系统 - 配置文件监控模式")
  334. print("="*60)
  335. monitor = SmartCIPMonitor()
  336. try:
  337. monitor.start()
  338. except Exception as e:
  339. print(f"系统运行出错: {e}")
  340. import traceback
  341. traceback.print_exc()
  342. finally:
  343. if monitor.running:
  344. monitor.stop()
  345. if __name__ == '__main__':
  346. main()