# -*- coding: utf-8 -*- """ CIP监控系统 - 配置文件监控模式(支持分段监控) 功能: 1. 监控config.json中各机组段(一段、二段)的actual_time字段变化 2. 检测到变化且actual_time < 当前时间时,基于actual_time执行CIP预测 3. 将预测结果保存到对应段的predicted_time(仅用于记录) 4. 通过回调接口发送预测结果到决策系统 5. 继续等待actual_time的下次更新 触发条件(同时满足): - actual_time 发生了变化 - actual_time < 当前时间(已过期) 工作流程: 修改actual_time(且 < 当前时间)→ 检测变化 → 执行预测 → 保存predicted_time → 发送回调 → 继续监控actual_time 分段监控说明: - 支持独立监控每个段(RO1-一段、RO1-二段等) - 每个段有独立的actual_time和predicted_time - 可以为不同段设置不同的CIP时间 - 预测时会分别输出每个段的CIP时机 """ import os import json import time import threading import pandas as pd import logging from logging.handlers import RotatingFileHandler from datetime import datetime from main_simple import main as run_cip_analysis from main_simple import send_decision_to_callback # 日志系统配置 logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) # 禁止传播到root logger,避免重复输出 logger.propagate = False # 日志输出格式 formatter = logging.Formatter( '%(asctime)s - %(threadName)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) # 清除已有的处理器(防止重复添加) if logger.handlers: for handler in logger.handlers[:]: handler.close() logger.removeHandler(handler) # 文件日志处理器,单个文件最大5MB,保留3个备份 file_handler = RotatingFileHandler('smart_monitor.log', maxBytes=5 * 1024 * 1024, backupCount=3, encoding='utf-8') file_handler.setFormatter(formatter) # 控制台日志处理器 console_handler = logging.StreamHandler() console_handler.setFormatter(formatter) # 添加处理器 logger.addHandler(file_handler) logger.addHandler(console_handler) class SmartCIPMonitor: """ CIP监控器(支持分段监控) 核心功能: 1. 监控config.json中各机组段的actual_time字段 2. 检测到时间变化后,自动触发CIP预测 3. 保存predicted_time到配置文件(仅用于记录) 4. 发送预测结果到回调接口 属性: config_path: 配置文件路径 config: 配置文件内容 running: 监控运行状态 monitor_thread: 监控线程 unit_names: 机组段名称列表(如['RO1-一段', 'RO1-二段', ...]) last_seen_actual_times: 记录上次看到的各机组段actual_time,用于检测变化 """ def __init__(self): """ 初始化监控系统 初始化流程: 1. 设置配置文件路径 2. 加载配置文件 3. 初始化监控基准时间 """ self.config_path = os.path.join(os.path.dirname(__file__), 'config.json') self.config = None self.running = False self.monitor_thread = None # 支持分段监控:每个机组包含一段和二段 self.unit_names = [ 'RO1-一段', 'RO1-二段', 'RO2-一段', 'RO2-二段', 'RO3-一段', 'RO3-二段', 'RO4-一段', 'RO4-二段' ] # 记录上次看到的actual_time,用于检测变化 # 格式:{'RO1-一段': '2025-12-01 12:00:00', 'RO1-二段': '...', ...} self.last_seen_actual_times = {} if not self.load_config(): logger.error("配置文件加载失败") else: self.initialize_last_seen_times() def load_config(self): """ 加载配置文件 Returns: bool: 加载成功返回True """ try: with open(self.config_path, 'r', encoding='utf-8') as f: self.config = json.load(f) return True except Exception as e: logger.error(f"配置文件加载失败: {e}") return False def save_config(self): """ 保存配置文件 Returns: bool: 保存成功返回True """ try: with open(self.config_path, 'w', encoding='utf-8') as f: json.dump(self.config, f, ensure_ascii=False, indent=2) return True except Exception as e: logger.error(f"配置文件保存失败: {e}") return False def initialize_last_seen_times(self): """ 初始化监控基准时间(支持分段) 功能: 读取config.json中各机组段当前的actual_time作为监控基准 但是:只记录未来的时间,已过期的时间不记录,以便首次检查时能触发预测 说明: - last_seen_actual_times用于防止重复处理相同的时间 - 支持分段配置(如'RO1-一段', 'RO1-二段') - 支持新旧配置格式兼容 - 已过期的时间(< 当前时间)不记录,确保首次检查时能被检测为"变化" """ if not self.config or 'cip_times' not in self.config: return current_time = datetime.now() for unit_name in self.unit_names: if unit_name in self.config['cip_times']: unit_data = self.config['cip_times'][unit_name] actual_time_str = None if isinstance(unit_data, dict) and 'actual_time' in unit_data: # 新格式:{'actual_time': '...', 'predicted_time': '...'} actual_time_str = unit_data['actual_time'] elif isinstance(unit_data, str): # 兼容旧格式:直接是时间字符串 actual_time_str = unit_data # 只记录未来的时间,已过期的时间不记录 if actual_time_str: actual_time_dt = self.parse_time_string(actual_time_str) if actual_time_dt and actual_time_dt >= current_time: # 未来时间:记录基准,避免触发预测 self.last_seen_actual_times[unit_name] = actual_time_str logger.info(f"{unit_name}: 未来时间,不触发预测") elif actual_time_dt: # 已过期时间:不记录基准,让首次检查时能检测到变化 logger.info(f"{unit_name}: 已过期,待触发预测") # 如果为None则不记录 def parse_time_string(self, time_str): """ 解析时间字符串为datetime对象 Args: time_str: str,时间字符串,格式'YYYY-MM-DD HH:MM:SS' Returns: datetime: 解析后的datetime对象,解析失败返回None """ if not time_str or not isinstance(time_str, str): return None try: return datetime.strptime(time_str, '%Y-%m-%d %H:%M:%S') except Exception as e: logger.warning(f"时间字符串解析失败: {time_str}, 错误: {e}") return None def check_for_changes(self): """ 检查actual_time是否有变化且小于当前时间(支持分段) 功能: 比对当前配置文件中的actual_time和上次记录的值 发现不同且小于当前时间则判定为有变化,需要触发预测 Returns: list: 有变化的机组段列表,格式 [(unit_name, new_actual_time), ...] 例如:[('RO1-一段', '2025-10-12 12:00:00'), ('RO1-二段', '2025-10-15 08:00:00')] 说明: - 只有actual_time变化才会触发 - 必须满足 actual_time < 当前时间 - predicted_time的变化不会触发 - 支持分段独立监控(每个段可以有不同的actual_time) """ changes = [] current_time = datetime.now() if not self.config or 'cip_times' not in self.config: return changes for unit_name in self.unit_names: if unit_name in self.config['cip_times']: unit_data = self.config['cip_times'][unit_name] current_actual_time = None if isinstance(unit_data, dict) and 'actual_time' in unit_data: # 新格式:读取actual_time字段 current_actual_time = unit_data['actual_time'] elif isinstance(unit_data, str): # 兼容旧格式:整个值就是时间 current_actual_time = unit_data # 检查是否有变化 if current_actual_time: last_actual_time = self.last_seen_actual_times.get(unit_name) # 条件1:actual_time 发生了变化 if last_actual_time != current_actual_time: # 条件2:actual_time 小于当前时间 actual_time_dt = self.parse_time_string(current_actual_time) if actual_time_dt is None: logger.warning(f"{unit_name} actual_time 格式无效: {current_actual_time}") continue if actual_time_dt < current_time: changes.append((unit_name, current_actual_time)) logger.info(f"{unit_name}: 检测到变化,触发预测 ({current_actual_time})") else: logger.info(f"{unit_name}: 未来时间,不触发 ({current_actual_time})") # 更新监控基准,避免重复提示 self.last_seen_actual_times[unit_name] = current_actual_time return changes def extract_unit_base_name(self, unit_name): """ 从分段机组名称中提取基础机组号 Args: unit_name: str,分段机组名称,如'RO1-一段' Returns: str: 基础机组号,如'RO1' 示例: 'RO1-一段' -> 'RO1' 'RO2-二段' -> 'RO2' """ if '-' in unit_name: return unit_name.split('-')[0] return unit_name def predict_and_update_unit(self, unit_name, start_date_str): """ 预测并只处理触发段的结果(严格控制:谁触发只处理谁) 功能: 1. 调用预测函数(会预测整个机组的所有段,这是模型限制) 2. 只提取触发段的结果(其他段的结果丢弃) 3. 只保存触发段的predicted_time到配置文件 4. 只发送触发段的回调 5. 其他未触发的段等它们自己触发后再处理 Args: unit_name: str,触发的机组段名称,如'RO1-一段', 'RO2-二段' start_date_str: str,预测起始时间(即actual_time),格式'YYYY-MM-DD HH:MM:SS' Returns: bool: 预测成功返回True,失败返回False 说明: - 严格的分段控制:只处理触发段,其他段不保存、不发送 - predicted_time仅用于记录,不影响下次预测的触发 - 下次预测只由actual_time的变化触发 示例流程: 1. RO1-一段触发 → 预测RO1 → 只处理RO1-一段 → RO1-二段丢弃 2. 稍后RO1-二段触发 → 预测RO1 → 只处理RO1-二段 → RO1-一段丢弃(已处理过) """ try: logger.info(f"[{unit_name}] 开始预测 (基于 {start_date_str})") # 步骤1:提取基础机组号(用于unit_filter参数) base_unit_name = self.extract_unit_base_name(unit_name) # 步骤2:调用预测逻辑(会预测整个机组的所有段,这是模型限制) # 禁用 main() 内部的回调发送,由 smart_monitor 统一管理 logger.info(f"[{unit_name}] 执行预测 (unit_filter={base_unit_name},会预测整个机组)") result_df = run_cip_analysis( strategy=3, start_date=start_date_str, unit_filter=base_unit_name, separate_stages=True, # 明确指定分段输出 send_callback=False # 禁用内部回调,避免重复发送 ) if result_df.empty: logger.warning(f"[{unit_name}] 预测失败: 无结果") return False # 显示预测得到的所有段 all_predicted_stages = result_df['机组类型'].tolist() logger.info(f"[{unit_name}] 预测完成,得到 {len(all_predicted_stages)} 个段: {all_predicted_stages}") # 步骤3:只提取触发段的结果(关键:只处理触发的段!) logger.info(f"[{unit_name}] 只提取触发段 '{unit_name}' 的结果,其他段丢弃") unit_result = result_df[result_df['机组类型'] == unit_name] if unit_result.empty: logger.warning(f"[{unit_name}] 预测失败: 结果中无此机组段") logger.info(f"[{unit_name}] 可用的机组类型: {result_df['机组类型'].tolist()}") return False cip_time = unit_result.iloc[0]['CIP时机'] if pd.isna(cip_time): logger.warning(f"[{unit_name}] 预测失败: CIP时机为空") return False # 步骤4:只保存触发段的预测时间到配置文件(其他段不保存) predicted_time_str = cip_time.strftime('%Y-%m-%d %H:%M:%S') logger.info(f"[{unit_name}] 只保存触发段的 predicted_time: {predicted_time_str}") if unit_name not in self.config['cip_times']: # 配置中不存在该机组段,创建新记录 self.config['cip_times'][unit_name] = { 'actual_time': start_date_str, 'predicted_time': predicted_time_str } else: # 更新预测时间,保持actual_time不变 if not isinstance(self.config['cip_times'][unit_name], dict): # 兼容旧格式:转换为新格式 self.config['cip_times'][unit_name] = { 'actual_time': self.config['cip_times'][unit_name], 'predicted_time': predicted_time_str } else: # 只更新predicted_time,不修改actual_time self.config['cip_times'][unit_name]['predicted_time'] = predicted_time_str if not self.save_config(): logger.error(f"[{unit_name}] 配置保存失败") return False logger.info(f"[{unit_name}] 预测成功 → {predicted_time_str}") # 步骤5:只发送触发段的回调(其他段不发送) logger.info(f"[{unit_name}] 只发送触发段的回调") send_decision_to_callback(unit_result) # 明确说明哪些段被丢弃了 other_stages = [s for s in all_predicted_stages if s != unit_name] if other_stages: logger.info(f"[{unit_name}] 其他段的结果已丢弃,等它们自己触发后再处理: {other_stages}") return True except Exception as e: logger.error(f"[{unit_name}] 预测失败: {e}", exc_info=True) return False def process_changes(self, changes): """ 处理检测到的actual_time变化 功能: 遍历所有检测到的变化,逐个执行预测并更新监控基准 Args: changes: list,变化列表,格式 [(unit_name, new_actual_time), ...] 例如:[('RO1', '2025-10-12 12:00:00')] 处理流程: 1. 执行预测:调用predict_and_update_unit 2. 更新监控基准:将新的actual_time记录到last_seen_actual_times(无论成功失败) 3. 重新加载配置:确保获取最新数据 4. 短暂延时:避免频繁操作 """ for unit_name, new_actual_time in changes: # 执行预测 success = self.predict_and_update_unit(unit_name, new_actual_time) # 无论预测成功还是失败,都要更新监控基准,避免无限循环 self.last_seen_actual_times[unit_name] = new_actual_time if not success: logger.error(f"[{unit_name}] 处理失败,已记录时间避免重复") # 重新加载配置,确保有最新数据 self.load_config() time.sleep(2) # 短暂延时,避免频繁操作 def monitor_loop(self): """ 监控主循环 功能: 1. 定期重新加载config.json(每5秒) 2. 检查actual_time是否有变化且小于当前时间 3. 发现满足条件的变化后自动触发预测 4. 发送预测结果到回调接口 监控频率: - 每5秒检查一次配置文件 - 发现变化立即处理 - 处理失败不影响后续监控 触发条件(同时满足): - actual_time 发生了变化 - actual_time < 当前时间(已过期) 说明: - 只监控actual_time字段的变化 - predicted_time的变化不会触发任何操作 - 相同的actual_time不会重复触发 - 未来时间的actual_time不会触发预测 """ logger.info("监控启动 (每5秒检查一次,触发条件: actual_time变化且<当前时间)") while self.running: try: # 步骤1:重新加载配置 if not self.load_config(): logger.error("配置加载失败,10秒后重试...") time.sleep(10) continue # 步骤2:检查是否有变化 changes = self.check_for_changes() if changes: # 发现变化,立即处理 self.process_changes(changes) # 步骤3:等待一段时间后再次检查 time.sleep(5) # 每5秒检查一次配置文件 except Exception as e: logger.error(f"监控循环出错: {e}", exc_info=True) time.sleep(10) # 出错后等待10秒再继续 def start(self): """ 启动监控系统 """ if self.running: logger.warning("监控系统已在运行") return logger.info("CIP监控系统启动(监控 config.json 中的 actual_time 变化)") # 启动监控线程 self.running = True self.monitor_thread = threading.Thread(target=self.monitor_loop, daemon=True) self.monitor_thread.start() try: while self.running: time.sleep(1) except KeyboardInterrupt: self.stop() def stop(self): """ 停止监控系统 """ logger.info("停止监控系统...") self.running = False if self.monitor_thread and self.monitor_thread.is_alive(): self.monitor_thread.join(timeout=5) logger.info("监控系统已停止") def main(): """ 主函数 功能:启动监控系统 """ monitor = SmartCIPMonitor() try: monitor.start() except Exception as e: logger.error(f"系统运行出错: {e}", exc_info=True) finally: if monitor.running: monitor.stop() if __name__ == '__main__': main()