smart_monitor.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453
  1. # -*- coding: utf-8 -*-
  2. """
  3. CIP监控系统 - 配置文件监控模式
  4. 功能:
  5. 1. 监控config.json中actual_time字段的变化
  6. 2. 检测到变化且actual_time < 当前时间时,基于actual_time执行CIP预测
  7. 3. 将预测结果保存到predicted_time(仅用于记录)
  8. 4. 通过回调接口发送预测结果到决策系统
  9. 5. 继续等待actual_time的下次更新
  10. 触发条件(同时满足):
  11. - actual_time 发生了变化
  12. - actual_time < 当前时间(已过期)
  13. 工作流程:
  14. 修改actual_time(且 < 当前时间)→ 检测变化 → 执行预测 → 保存predicted_time → 发送回调 → 继续监控actual_time
  15. """
  16. import os
  17. import json
  18. import time
  19. import threading
  20. import pandas as pd
  21. import logging
  22. from logging.handlers import RotatingFileHandler
  23. from datetime import datetime
  24. from main_simple import main as run_cip_analysis
  25. from main_simple import send_decision_to_callback
  26. # 日志系统配置
  27. logger = logging.getLogger(__name__)
  28. logger.setLevel(logging.INFO)
  29. # 日志输出格式
  30. formatter = logging.Formatter(
  31. '%(asctime)s - %(threadName)s - %(levelname)s - %(message)s',
  32. datefmt='%Y-%m-%d %H:%M:%S'
  33. )
  34. # 文件日志处理器,单个文件最大5MB,保留3个备份
  35. file_handler = RotatingFileHandler('smart_monitor.log', maxBytes=5 * 1024 * 1024, backupCount=3, encoding='utf-8')
  36. file_handler.setFormatter(formatter)
  37. # 控制台日志处理器
  38. console_handler = logging.StreamHandler()
  39. console_handler.setFormatter(formatter)
  40. # 添加处理器
  41. logger.addHandler(file_handler)
  42. logger.addHandler(console_handler)
  43. class SmartCIPMonitor:
  44. """
  45. CIP监控器
  46. 核心功能:
  47. 1. 监控config.json中各机组的actual_time字段
  48. 2. 检测到时间变化后,自动触发CIP预测
  49. 3. 保存predicted_time到配置文件(仅用于记录)
  50. 4. 发送预测结果到回调接口
  51. 属性:
  52. config_path: 配置文件路径
  53. config: 配置文件内容
  54. running: 监控运行状态
  55. monitor_thread: 监控线程
  56. unit_names: 机组名称列表
  57. last_seen_actual_times: 记录上次看到的各机组actual_time,用于检测变化
  58. """
  59. def __init__(self):
  60. """
  61. 初始化监控系统
  62. 初始化流程:
  63. 1. 设置配置文件路径
  64. 2. 加载配置文件
  65. 3. 初始化监控基准时间
  66. """
  67. self.config_path = os.path.join(os.path.dirname(__file__), 'config.json')
  68. self.config = None
  69. self.running = False
  70. self.monitor_thread = None
  71. self.unit_names = ['RO1', 'RO2', 'RO3', 'RO4']
  72. # 记录上次看到的actual_time,用于检测变化
  73. # 格式:{'RO1': '2025-12-01 12:00:00', 'RO2': '...'}
  74. self.last_seen_actual_times = {}
  75. if not self.load_config():
  76. logger.error("配置文件加载失败")
  77. else:
  78. self.initialize_last_seen_times()
  79. def load_config(self):
  80. """
  81. 加载配置文件
  82. Returns:
  83. bool: 加载成功返回True
  84. """
  85. try:
  86. with open(self.config_path, 'r', encoding='utf-8') as f:
  87. self.config = json.load(f)
  88. return True
  89. except Exception as e:
  90. logger.error(f"配置文件加载失败: {e}")
  91. return False
  92. def save_config(self):
  93. """
  94. 保存配置文件
  95. Returns:
  96. bool: 保存成功返回True
  97. """
  98. try:
  99. with open(self.config_path, 'w', encoding='utf-8') as f:
  100. json.dump(self.config, f, ensure_ascii=False, indent=2)
  101. return True
  102. except Exception as e:
  103. logger.error(f"配置文件保存失败: {e}")
  104. return False
  105. def initialize_last_seen_times(self):
  106. """
  107. 初始化监控基准时间
  108. 功能:
  109. 读取config.json中各机组当前的actual_time作为监控基准
  110. 但是:只记录未来的时间,已过期的时间不记录,以便首次检查时能触发预测
  111. 说明:
  112. - last_seen_actual_times用于防止重复处理相同的时间
  113. - 支持新旧配置格式兼容
  114. - 已过期的时间(< 当前时间)不记录,确保首次检查时能被检测为"变化"
  115. """
  116. if not self.config or 'cip_times' not in self.config:
  117. return
  118. current_time = datetime.now()
  119. for unit_name in self.unit_names:
  120. if unit_name in self.config['cip_times']:
  121. unit_data = self.config['cip_times'][unit_name]
  122. actual_time_str = None
  123. if isinstance(unit_data, dict) and 'actual_time' in unit_data:
  124. # 新格式:{'actual_time': '...', 'predicted_time': '...'}
  125. actual_time_str = unit_data['actual_time']
  126. elif isinstance(unit_data, str):
  127. # 兼容旧格式:直接是时间字符串
  128. actual_time_str = unit_data
  129. # 只记录未来的时间,已过期的时间不记录
  130. if actual_time_str:
  131. actual_time_dt = self.parse_time_string(actual_time_str)
  132. if actual_time_dt and actual_time_dt >= current_time:
  133. # 未来时间:记录基准,避免触发预测
  134. self.last_seen_actual_times[unit_name] = actual_time_str
  135. logger.info(f"{unit_name}: 未来时间,不触发预测")
  136. elif actual_time_dt:
  137. # 已过期时间:不记录基准,让首次检查时能检测到变化
  138. logger.info(f"{unit_name}: 已过期,待触发预测")
  139. # 如果为None则不记录
  140. def parse_time_string(self, time_str):
  141. """
  142. 解析时间字符串为datetime对象
  143. Args:
  144. time_str: str,时间字符串,格式'YYYY-MM-DD HH:MM:SS'
  145. Returns:
  146. datetime: 解析后的datetime对象,解析失败返回None
  147. """
  148. if not time_str or not isinstance(time_str, str):
  149. return None
  150. try:
  151. return datetime.strptime(time_str, '%Y-%m-%d %H:%M:%S')
  152. except Exception as e:
  153. logger.warning(f"时间字符串解析失败: {time_str}, 错误: {e}")
  154. return None
  155. def check_for_changes(self):
  156. """
  157. 检查actual_time是否有变化且小于当前时间
  158. 功能:
  159. 比对当前配置文件中的actual_time和上次记录的值
  160. 发现不同且小于当前时间则判定为有变化,需要触发预测
  161. Returns:
  162. list: 有变化的机组列表,格式 [(unit_name, new_actual_time), ...]
  163. 例如:[('RO1', '2025-10-12 12:00:00'), ('RO3', '2025-10-15 08:00:00')]
  164. 说明:
  165. - 只有actual_time变化才会触发
  166. - 必须满足 actual_time < 当前时间
  167. - predicted_time的变化不会触发
  168. """
  169. changes = []
  170. current_time = datetime.now()
  171. if not self.config or 'cip_times' not in self.config:
  172. return changes
  173. for unit_name in self.unit_names:
  174. if unit_name in self.config['cip_times']:
  175. unit_data = self.config['cip_times'][unit_name]
  176. current_actual_time = None
  177. if isinstance(unit_data, dict) and 'actual_time' in unit_data:
  178. # 新格式:读取actual_time字段
  179. current_actual_time = unit_data['actual_time']
  180. elif isinstance(unit_data, str):
  181. # 兼容旧格式:整个值就是时间
  182. current_actual_time = unit_data
  183. # 检查是否有变化
  184. if current_actual_time:
  185. last_actual_time = self.last_seen_actual_times.get(unit_name)
  186. # 条件1:actual_time 发生了变化
  187. if last_actual_time != current_actual_time:
  188. # 条件2:actual_time 小于当前时间
  189. actual_time_dt = self.parse_time_string(current_actual_time)
  190. if actual_time_dt is None:
  191. logger.warning(f"{unit_name} actual_time 格式无效: {current_actual_time}")
  192. continue
  193. if actual_time_dt < current_time:
  194. changes.append((unit_name, current_actual_time))
  195. logger.info(f"{unit_name}: 检测到变化,触发预测 ({current_actual_time})")
  196. else:
  197. logger.info(f"{unit_name}: 未来时间,不触发 ({current_actual_time})")
  198. # 更新监控基准,避免重复提示
  199. self.last_seen_actual_times[unit_name] = current_actual_time
  200. return changes
  201. def predict_and_update_unit(self, unit_name, start_date_str):
  202. """
  203. 预测单个机组并保存结果到配置
  204. 功能:
  205. 1. 调用main_simple.py中的预测函数
  206. 2. 提取预测结果中的CIP时机
  207. 3. 保存predicted_time到配置文件(仅用于记录,不参与任何计算)
  208. 4. 发送预测结果到回调接口
  209. Args:
  210. unit_name: str,机组名称,如'RO1', 'RO2', 'RO3', 'RO4'
  211. start_date_str: str,预测起始时间(即actual_time),格式'YYYY-MM-DD HH:MM:SS'
  212. Returns:
  213. bool: 预测成功返回True,失败返回False
  214. 说明:
  215. - predicted_time仅用于记录,不影响下次预测的触发
  216. - 下次预测只由actual_time的变化触发
  217. """
  218. try:
  219. logger.info(f"[{unit_name}] 开始预测 (基于 {start_date_str})")
  220. # 步骤1:调用预测逻辑
  221. result_df = run_cip_analysis(strategy=3, start_date=start_date_str, unit_filter=unit_name)
  222. if result_df.empty:
  223. logger.warning(f"[{unit_name}] 预测失败: 无结果")
  224. return False
  225. # 步骤2:提取该机组的结果
  226. unit_result = result_df[result_df['机组类型'] == unit_name]
  227. if unit_result.empty:
  228. logger.warning(f"[{unit_name}] 预测失败: 结果中无此机组")
  229. return False
  230. cip_time = unit_result.iloc[0]['CIP时机']
  231. if pd.isna(cip_time):
  232. logger.warning(f"[{unit_name}] 预测失败: CIP时机为空")
  233. return False
  234. # 步骤3:保存预测时间到配置文件(仅用于记录)
  235. predicted_time_str = cip_time.strftime('%Y-%m-%d %H:%M:%S')
  236. if unit_name not in self.config['cip_times']:
  237. # 配置中不存在该机组,创建新记录
  238. self.config['cip_times'][unit_name] = {
  239. 'actual_time': start_date_str,
  240. 'predicted_time': predicted_time_str
  241. }
  242. else:
  243. # 更新预测时间,保持actual_time不变
  244. if not isinstance(self.config['cip_times'][unit_name], dict):
  245. # 兼容旧格式:转换为新格式
  246. self.config['cip_times'][unit_name] = {
  247. 'actual_time': self.config['cip_times'][unit_name],
  248. 'predicted_time': predicted_time_str
  249. }
  250. else:
  251. # 只更新predicted_time,不修改actual_time
  252. self.config['cip_times'][unit_name]['predicted_time'] = predicted_time_str
  253. if not self.save_config():
  254. logger.error(f"[{unit_name}] 配置保存失败")
  255. return False
  256. logger.info(f"[{unit_name}] 预测成功 → {predicted_time_str}")
  257. # 步骤4:发送预测结果到回调接口
  258. send_decision_to_callback(result_df)
  259. return True
  260. except Exception as e:
  261. logger.error(f"[{unit_name}] 预测失败: {e}")
  262. return False
  263. def process_changes(self, changes):
  264. """
  265. 处理检测到的actual_time变化
  266. 功能:
  267. 遍历所有检测到的变化,逐个执行预测并更新监控基准
  268. Args:
  269. changes: list,变化列表,格式 [(unit_name, new_actual_time), ...]
  270. 例如:[('RO1', '2025-10-12 12:00:00')]
  271. 处理流程:
  272. 1. 执行预测:调用predict_and_update_unit
  273. 2. 更新监控基准:将新的actual_time记录到last_seen_actual_times(无论成功失败)
  274. 3. 重新加载配置:确保获取最新数据
  275. 4. 短暂延时:避免频繁操作
  276. """
  277. for unit_name, new_actual_time in changes:
  278. # 执行预测
  279. success = self.predict_and_update_unit(unit_name, new_actual_time)
  280. # ⚠️ 关键修复:无论预测成功还是失败,都要更新监控基准,避免无限循环
  281. self.last_seen_actual_times[unit_name] = new_actual_time
  282. if not success:
  283. logger.error(f"[{unit_name}] 处理失败,已记录时间避免重复")
  284. # 重新加载配置,确保有最新数据
  285. self.load_config()
  286. time.sleep(2) # 短暂延时,避免频繁操作
  287. def monitor_loop(self):
  288. """
  289. 监控主循环
  290. 功能:
  291. 1. 定期重新加载config.json(每5秒)
  292. 2. 检查actual_time是否有变化且小于当前时间
  293. 3. 发现满足条件的变化后自动触发预测
  294. 4. 发送预测结果到回调接口
  295. 监控频率:
  296. - 每5秒检查一次配置文件
  297. - 发现变化立即处理
  298. - 处理失败不影响后续监控
  299. 触发条件(同时满足):
  300. - actual_time 发生了变化
  301. - actual_time < 当前时间(已过期)
  302. 说明:
  303. - 只监控actual_time字段的变化
  304. - predicted_time的变化不会触发任何操作
  305. - 相同的actual_time不会重复触发
  306. - 未来时间的actual_time不会触发预测
  307. """
  308. logger.info("监控启动 (每5秒检查一次,触发条件: actual_time变化且<当前时间)")
  309. while self.running:
  310. try:
  311. # 步骤1:重新加载配置
  312. if not self.load_config():
  313. logger.error("配置加载失败,10秒后重试...")
  314. time.sleep(10)
  315. continue
  316. # 步骤2:检查是否有变化
  317. changes = self.check_for_changes()
  318. if changes:
  319. # 发现变化,立即处理
  320. self.process_changes(changes)
  321. # 步骤3:等待一段时间后再次检查
  322. time.sleep(5) # 每5秒检查一次配置文件
  323. except Exception as e:
  324. logger.error(f"监控循环出错: {e}", exc_info=True)
  325. time.sleep(10) # 出错后等待10秒再继续
  326. def start(self):
  327. """
  328. 启动监控系统
  329. """
  330. if self.running:
  331. logger.warning("监控系统已在运行")
  332. return
  333. logger.info("CIP监控系统启动(监控 config.json 中的 actual_time 变化)")
  334. # 启动监控线程
  335. self.running = True
  336. self.monitor_thread = threading.Thread(target=self.monitor_loop, daemon=True)
  337. self.monitor_thread.start()
  338. try:
  339. while self.running:
  340. time.sleep(1)
  341. except KeyboardInterrupt:
  342. self.stop()
  343. def stop(self):
  344. """
  345. 停止监控系统
  346. """
  347. logger.info("停止监控系统...")
  348. self.running = False
  349. if self.monitor_thread and self.monitor_thread.is_alive():
  350. self.monitor_thread.join(timeout=5)
  351. logger.info("监控系统已停止")
  352. def main():
  353. """
  354. 主函数
  355. 功能:启动监控系统
  356. """
  357. monitor = SmartCIPMonitor()
  358. try:
  359. monitor.start()
  360. except Exception as e:
  361. logger.error(f"系统运行出错: {e}", exc_info=True)
  362. finally:
  363. if monitor.running:
  364. monitor.stop()
  365. if __name__ == '__main__':
  366. main()