# -*- coding: utf-8 -*- """ CIP监控系统 - 配置文件监控模式 功能: 1. 监控config.json中actual_time字段的变化 2. 检测到变化后,基于actual_time执行CIP预测 3. 将预测结果保存到predicted_time(仅用于记录) 4. 通过回调接口发送预测结果到决策系统 工作流程: 修改actual_time → 检测变化 → 执行预测 → 保存predicted_time → 发送回调 → 继续监控 """ import os import json import time import threading import pandas as pd import logging from logging.handlers import RotatingFileHandler from datetime import datetime from main_simple import main as run_cip_analysis from main_simple import send_decision_to_callback # 日志系统配置 logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) # 日志输出格式 formatter = logging.Formatter( '%(asctime)s - %(threadName)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) # 文件日志处理器,单个文件最大5MB,保留3个备份 file_handler = RotatingFileHandler('smart_monitor.log', maxBytes=5 * 1024 * 1024, backupCount=3, encoding='utf-8') file_handler.setFormatter(formatter) # 控制台日志处理器 console_handler = logging.StreamHandler() console_handler.setFormatter(formatter) # 添加处理器 logger.addHandler(file_handler) logger.addHandler(console_handler) class SmartCIPMonitor: """ CIP监控器 核心功能: 1. 监控config.json中各机组的actual_time字段 2. 检测到时间变化后,自动触发CIP预测 3. 保存predicted_time到配置文件(仅用于记录) 4. 发送预测结果到回调接口 属性: config_path: 配置文件路径 config: 配置文件内容 running: 监控运行状态 monitor_thread: 监控线程 unit_names: 机组名称列表 last_seen_actual_times: 记录上次看到的各机组actual_time,用于检测变化 """ def __init__(self): """ 初始化监控系统 初始化流程: 1. 设置配置文件路径 2. 加载配置文件 3. 初始化监控基准时间 """ self.config_path = os.path.join(os.path.dirname(__file__), 'config.json') self.config = None self.running = False self.monitor_thread = None self.unit_names = ['RO1', 'RO2', 'RO3', 'RO4'] # 记录上次看到的actual_time,用于检测变化 # 格式:{'RO1': '2025-12-01 12:00:00', 'RO2': '...'} self.last_seen_actual_times = {} if not self.load_config(): logger.error("配置文件加载失败") else: self.initialize_last_seen_times() def load_config(self): """ 加载配置文件 Returns: bool: 加载成功返回True """ try: with open(self.config_path, 'r', encoding='utf-8') as f: self.config = json.load(f) return True except Exception as e: logger.error(f"配置文件加载失败: {e}") return False def save_config(self): """ 保存配置文件 Returns: bool: 保存成功返回True """ try: with open(self.config_path, 'w', encoding='utf-8') as f: json.dump(self.config, f, ensure_ascii=False, indent=2) return True except Exception as e: logger.error(f"配置文件保存失败: {e}") return False def initialize_last_seen_times(self): """ 初始化监控基准时间 功能: 读取config.json中各机组当前的actual_time作为监控基准 后续只有当actual_time与此基准不同时才触发预测 说明: - last_seen_actual_times用于防止重复处理相同的时间 - 支持新旧配置格式兼容 """ if not self.config or 'cip_times' not in self.config: return for unit_name in self.unit_names: if unit_name in self.config['cip_times']: unit_data = self.config['cip_times'][unit_name] if isinstance(unit_data, dict) and 'actual_time' in unit_data: # 新格式:{'actual_time': '...', 'predicted_time': '...'} self.last_seen_actual_times[unit_name] = unit_data['actual_time'] elif isinstance(unit_data, str): # 兼容旧格式:直接是时间字符串 self.last_seen_actual_times[unit_name] = unit_data def check_for_changes(self): """ 检查actual_time是否有变化 功能: 比对当前配置文件中的actual_time和上次记录的值 发现不同则判定为有变化,需要触发预测 Returns: list: 有变化的机组列表,格式 [(unit_name, new_actual_time), ...] 例如:[('RO1', '2025-10-12 12:00:00'), ('RO3', '2025-10-15 08:00:00')] 说明: - 只有actual_time变化才会触发 - predicted_time的变化不会触发 """ changes = [] if not self.config or 'cip_times' not in self.config: return changes for unit_name in self.unit_names: if unit_name in self.config['cip_times']: unit_data = self.config['cip_times'][unit_name] current_actual_time = None if isinstance(unit_data, dict) and 'actual_time' in unit_data: # 新格式:读取actual_time字段 current_actual_time = unit_data['actual_time'] elif isinstance(unit_data, str): # 兼容旧格式:整个值就是时间 current_actual_time = unit_data # 检查是否有变化 if current_actual_time: last_actual_time = self.last_seen_actual_times.get(unit_name) if last_actual_time != current_actual_time: changes.append((unit_name, current_actual_time)) logger.info(f"检测到{unit_name}实际CIP时间变化: 旧值={last_actual_time}, 新值={current_actual_time}") return changes def predict_and_update_unit(self, unit_name, start_date_str): """ 预测单个机组并保存结果到配置 功能: 1. 调用main_simple.py中的预测函数 2. 提取预测结果中的CIP时机 3. 保存predicted_time到配置文件(仅用于记录,不参与任何计算) 4. 发送预测结果到回调接口 Args: unit_name: str,机组名称,如'RO1', 'RO2', 'RO3', 'RO4' start_date_str: str,预测起始时间(即actual_time),格式'YYYY-MM-DD HH:MM:SS' Returns: bool: 预测成功返回True,失败返回False 说明: - predicted_time仅用于记录,不影响下次预测的触发 - 下次预测只由actual_time的变化触发 """ try: logger.info(f"[{unit_name}] 开始预测,基于实际CIP时间: {start_date_str}") # 步骤1:调用预测逻辑 result_df = run_cip_analysis(strategy=3, start_date=start_date_str, unit_filter=unit_name) if result_df.empty: logger.warning(f"{unit_name} 预测失败: 无结果") return False # 步骤2:提取该机组的结果 unit_result = result_df[result_df['机组类型'] == unit_name] if unit_result.empty: logger.warning(f"{unit_name} 预测失败: 结果中无此机组") return False cip_time = unit_result.iloc[0]['CIP时机'] if pd.isna(cip_time): logger.warning(f"{unit_name} 预测失败: CIP时机为空") return False # 步骤3:保存预测时间到配置文件(仅用于记录) predicted_time_str = cip_time.strftime('%Y-%m-%d %H:%M:%S') if unit_name not in self.config['cip_times']: # 配置中不存在该机组,创建新记录 self.config['cip_times'][unit_name] = { 'actual_time': start_date_str, 'predicted_time': predicted_time_str } else: # 更新预测时间,保持actual_time不变 if not isinstance(self.config['cip_times'][unit_name], dict): # 兼容旧格式:转换为新格式 self.config['cip_times'][unit_name] = { 'actual_time': self.config['cip_times'][unit_name], 'predicted_time': predicted_time_str } else: # 只更新predicted_time,不修改actual_time self.config['cip_times'][unit_name]['predicted_time'] = predicted_time_str if not self.save_config(): logger.error(f"[{unit_name}] 配置保存失败") return False logger.info(f"[{unit_name}] 预测成功 → {predicted_time_str}") # 步骤4:发送预测结果到回调接口 try: if send_decision_to_callback(result_df): logger.info(f"[{unit_name}] 回调发送成功") else: logger.error(f"[{unit_name}] 回调发送失败") except Exception as callback_error: logger.error(f"[{unit_name}] 回调发送异常: {callback_error}") return True except Exception as e: logger.error(f"[{unit_name}] 预测失败: {e}", exc_info=True) return False def process_changes(self, changes): """ 处理检测到的actual_time变化 功能: 遍历所有检测到的变化,逐个执行预测并更新监控基准 Args: changes: list,变化列表,格式 [(unit_name, new_actual_time), ...] 例如:[('RO1', '2025-10-12 12:00:00')] 处理流程: 1. 执行预测:调用predict_and_update_unit 2. 更新监控基准:将新的actual_time记录到last_seen_actual_times 3. 重新加载配置:确保获取最新数据 4. 短暂延时:避免频繁操作 """ for unit_name, new_actual_time in changes: # 执行预测 if self.predict_and_update_unit(unit_name, new_actual_time): # 更新监控基准,防止重复处理 self.last_seen_actual_times[unit_name] = new_actual_time logger.info(f"[{unit_name}] 处理完成") else: logger.error(f"[{unit_name}] 处理失败") # 重新加载配置,确保有最新数据 self.load_config() time.sleep(2) # 短暂延时,避免频繁操作 def monitor_loop(self): """ 监控主循环 功能: 1. 定期重新加载config.json(每5秒) 2. 检查actual_time是否有变化 3. 发现变化后自动触发预测 4. 发送预测结果到回调接口 监控频率: - 每5秒检查一次配置文件 - 发现变化立即处理 - 处理失败不影响后续监控 说明: - 只监控actual_time字段的变化 - predicted_time的变化不会触发任何操作 - 相同的actual_time不会重复触发 """ logger.info("开始监控 config.json 变化(每5秒检查一次)") while self.running: try: # 步骤1:重新加载配置 if not self.load_config(): logger.error("配置加载失败,10秒后重试...") time.sleep(10) continue # 步骤2:检查是否有变化 changes = self.check_for_changes() if changes: # 发现变化,立即处理 self.process_changes(changes) # 步骤3:等待一段时间后再次检查 time.sleep(5) # 每5秒检查一次配置文件 except Exception as e: logger.error(f"监控循环出错: {e}", exc_info=True) time.sleep(10) # 出错后等待10秒再继续 def start(self): """ 启动监控系统 """ if self.running: logger.warning("监控系统已在运行") return logger.info("CIP监控系统启动(监控 config.json 中的 actual_time 变化)") # 启动监控线程 self.running = True self.monitor_thread = threading.Thread(target=self.monitor_loop, daemon=True) self.monitor_thread.start() try: while self.running: time.sleep(1) except KeyboardInterrupt: self.stop() def stop(self): """ 停止监控系统 """ logger.info("停止监控系统...") self.running = False if self.monitor_thread and self.monitor_thread.is_alive(): self.monitor_thread.join(timeout=5) logger.info("监控系统已停止") def main(): """ 主函数 功能:启动监控系统 """ monitor = SmartCIPMonitor() try: monitor.start() except Exception as e: logger.error(f"系统运行出错: {e}", exc_info=True) finally: if monitor.running: monitor.stop() if __name__ == '__main__': main()