| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524 |
- # -*- coding: utf-8 -*-
- """
- CIP监控系统 - 配置文件监控模式(支持分段监控)
- 功能:
- 1. 监控config.json中各机组段(一段、二段)的actual_time字段变化
- 2. 检测到变化且actual_time < 当前时间时,基于actual_time执行CIP预测
- 3. 将预测结果保存到对应段的predicted_time(仅用于记录)
- 4. 通过回调接口发送预测结果到决策系统
- 5. 继续等待actual_time的下次更新
- 触发条件(同时满足):
- - actual_time 发生了变化
- - actual_time < 当前时间(已过期)
- 工作流程:
- 修改actual_time(且 < 当前时间)→ 检测变化 → 执行预测 → 保存predicted_time → 发送回调 → 继续监控actual_time
- 分段监控说明:
- - 支持独立监控每个段(RO1-一段、RO1-二段等)
- - 每个段有独立的actual_time和predicted_time
- - 可以为不同段设置不同的CIP时间
- - 预测时会分别输出每个段的CIP时机
- """
- import os
- import json
- import time
- import threading
- import pandas as pd
- import logging
- from logging.handlers import RotatingFileHandler
- from datetime import datetime
- from main_simple import main as run_cip_analysis
- from main_simple import send_decision_to_callback
- # 日志系统配置
- logger = logging.getLogger(__name__)
- logger.setLevel(logging.INFO)
- # 禁止传播到root logger,避免重复输出
- logger.propagate = False
- # 日志输出格式
- formatter = logging.Formatter(
- '%(asctime)s - %(threadName)s - %(levelname)s - %(message)s',
- datefmt='%Y-%m-%d %H:%M:%S'
- )
- # 清除已有的处理器(防止重复添加)
- if logger.handlers:
- for handler in logger.handlers[:]:
- handler.close()
- logger.removeHandler(handler)
- # 文件日志处理器,单个文件最大5MB,保留3个备份
- file_handler = RotatingFileHandler('smart_monitor.log', maxBytes=5 * 1024 * 1024, backupCount=3, encoding='utf-8')
- file_handler.setFormatter(formatter)
- # 控制台日志处理器
- console_handler = logging.StreamHandler()
- console_handler.setFormatter(formatter)
- # 添加处理器
- logger.addHandler(file_handler)
- logger.addHandler(console_handler)
- class SmartCIPMonitor:
- """
- CIP监控器(支持分段监控)
-
- 核心功能:
- 1. 监控config.json中各机组段的actual_time字段
- 2. 检测到时间变化后,自动触发CIP预测
- 3. 保存predicted_time到配置文件(仅用于记录)
- 4. 发送预测结果到回调接口
-
- 属性:
- config_path: 配置文件路径
- config: 配置文件内容
- running: 监控运行状态
- monitor_thread: 监控线程
- unit_names: 机组段名称列表(如['RO1-一段', 'RO1-二段', ...])
- last_seen_actual_times: 记录上次看到的各机组段actual_time,用于检测变化
- """
-
- def __init__(self):
- """
- 初始化监控系统
-
- 初始化流程:
- 1. 设置配置文件路径
- 2. 加载配置文件
- 3. 初始化监控基准时间
- """
- self.config_path = os.path.join(os.path.dirname(__file__), 'config.json')
- self.config = None
- self.running = False
- self.monitor_thread = None
- # 支持分段监控:每个机组包含一段和二段
- self.unit_names = [
- 'RO1-一段', 'RO1-二段',
- 'RO2-一段', 'RO2-二段',
- 'RO3-一段', 'RO3-二段',
- 'RO4-一段', 'RO4-二段'
- ]
-
- # 记录上次看到的actual_time,用于检测变化
- # 格式:{'RO1-一段': '2025-12-01 12:00:00', 'RO1-二段': '...', ...}
- self.last_seen_actual_times = {}
-
- if not self.load_config():
- logger.error("配置文件加载失败")
- else:
- self.initialize_last_seen_times()
-
- def load_config(self):
- """
- 加载配置文件
-
- Returns:
- bool: 加载成功返回True
- """
- try:
- with open(self.config_path, 'r', encoding='utf-8') as f:
- self.config = json.load(f)
- return True
- except Exception as e:
- logger.error(f"配置文件加载失败: {e}")
- return False
-
- def save_config(self):
- """
- 保存配置文件
-
- Returns:
- bool: 保存成功返回True
- """
- try:
- with open(self.config_path, 'w', encoding='utf-8') as f:
- json.dump(self.config, f, ensure_ascii=False, indent=2)
- return True
- except Exception as e:
- logger.error(f"配置文件保存失败: {e}")
- return False
-
- def initialize_last_seen_times(self):
- """
- 初始化监控基准时间(支持分段)
-
- 功能:
- 读取config.json中各机组段当前的actual_time作为监控基准
- 但是:只记录未来的时间,已过期的时间不记录,以便首次检查时能触发预测
-
- 说明:
- - last_seen_actual_times用于防止重复处理相同的时间
- - 支持分段配置(如'RO1-一段', 'RO1-二段')
- - 支持新旧配置格式兼容
- - 已过期的时间(< 当前时间)不记录,确保首次检查时能被检测为"变化"
- """
- if not self.config or 'cip_times' not in self.config:
- return
-
- current_time = datetime.now()
-
- for unit_name in self.unit_names:
- if unit_name in self.config['cip_times']:
- unit_data = self.config['cip_times'][unit_name]
- actual_time_str = None
-
- if isinstance(unit_data, dict) and 'actual_time' in unit_data:
- # 新格式:{'actual_time': '...', 'predicted_time': '...'}
- actual_time_str = unit_data['actual_time']
- elif isinstance(unit_data, str):
- # 兼容旧格式:直接是时间字符串
- actual_time_str = unit_data
-
- # 只记录未来的时间,已过期的时间不记录
- if actual_time_str:
- actual_time_dt = self.parse_time_string(actual_time_str)
- if actual_time_dt and actual_time_dt >= current_time:
- # 未来时间:记录基准,避免触发预测
- self.last_seen_actual_times[unit_name] = actual_time_str
- logger.info(f"{unit_name}: 未来时间,不触发预测")
- elif actual_time_dt:
- # 已过期时间:不记录基准,让首次检查时能检测到变化
- logger.info(f"{unit_name}: 已过期,待触发预测")
- # 如果为None则不记录
-
- def parse_time_string(self, time_str):
- """
- 解析时间字符串为datetime对象
-
- Args:
- time_str: str,时间字符串,格式'YYYY-MM-DD HH:MM:SS'
-
- Returns:
- datetime: 解析后的datetime对象,解析失败返回None
- """
- if not time_str or not isinstance(time_str, str):
- return None
-
- try:
- return datetime.strptime(time_str, '%Y-%m-%d %H:%M:%S')
- except Exception as e:
- logger.warning(f"时间字符串解析失败: {time_str}, 错误: {e}")
- return None
-
- def check_for_changes(self):
- """
- 检查actual_time是否有变化且小于当前时间(支持分段)
-
- 功能:
- 比对当前配置文件中的actual_time和上次记录的值
- 发现不同且小于当前时间则判定为有变化,需要触发预测
-
- Returns:
- list: 有变化的机组段列表,格式 [(unit_name, new_actual_time), ...]
- 例如:[('RO1-一段', '2025-10-12 12:00:00'), ('RO1-二段', '2025-10-15 08:00:00')]
-
- 说明:
- - 只有actual_time变化才会触发
- - 必须满足 actual_time < 当前时间
- - predicted_time的变化不会触发
- - 支持分段独立监控(每个段可以有不同的actual_time)
- """
- changes = []
- current_time = datetime.now()
-
- if not self.config or 'cip_times' not in self.config:
- return changes
-
- for unit_name in self.unit_names:
- if unit_name in self.config['cip_times']:
- unit_data = self.config['cip_times'][unit_name]
- current_actual_time = None
-
- if isinstance(unit_data, dict) and 'actual_time' in unit_data:
- # 新格式:读取actual_time字段
- current_actual_time = unit_data['actual_time']
- elif isinstance(unit_data, str):
- # 兼容旧格式:整个值就是时间
- current_actual_time = unit_data
-
- # 检查是否有变化
- if current_actual_time:
- last_actual_time = self.last_seen_actual_times.get(unit_name)
-
- # 条件1:actual_time 发生了变化
- if last_actual_time != current_actual_time:
- # 条件2:actual_time 小于当前时间
- actual_time_dt = self.parse_time_string(current_actual_time)
-
- if actual_time_dt is None:
- logger.warning(f"{unit_name} actual_time 格式无效: {current_actual_time}")
- continue
-
- if actual_time_dt < current_time:
- changes.append((unit_name, current_actual_time))
- logger.info(f"{unit_name}: 检测到变化,触发预测 ({current_actual_time})")
- else:
- logger.info(f"{unit_name}: 未来时间,不触发 ({current_actual_time})")
- # 更新监控基准,避免重复提示
- self.last_seen_actual_times[unit_name] = current_actual_time
-
- return changes
-
- def extract_unit_base_name(self, unit_name):
- """
- 从分段机组名称中提取基础机组号
-
- Args:
- unit_name: str,分段机组名称,如'RO1-一段'
-
- Returns:
- str: 基础机组号,如'RO1'
-
- 示例:
- 'RO1-一段' -> 'RO1'
- 'RO2-二段' -> 'RO2'
- """
- if '-' in unit_name:
- return unit_name.split('-')[0]
- return unit_name
-
- def predict_and_update_unit(self, unit_name, start_date_str):
- """
- 预测并只处理触发段的结果(严格控制:谁触发只处理谁)
-
- 功能:
- 1. 调用预测函数(会预测整个机组的所有段,这是模型限制)
- 2. 只提取触发段的结果(其他段的结果丢弃)
- 3. 只保存触发段的predicted_time到配置文件
- 4. 只发送触发段的回调
- 5. 其他未触发的段等它们自己触发后再处理
-
- Args:
- unit_name: str,触发的机组段名称,如'RO1-一段', 'RO2-二段'
- start_date_str: str,预测起始时间(即actual_time),格式'YYYY-MM-DD HH:MM:SS'
-
- Returns:
- bool: 预测成功返回True,失败返回False
-
- 说明:
- - 严格的分段控制:只处理触发段,其他段不保存、不发送
- - predicted_time仅用于记录,不影响下次预测的触发
- - 下次预测只由actual_time的变化触发
-
- 示例流程:
- 1. RO1-一段触发 → 预测RO1 → 只处理RO1-一段 → RO1-二段丢弃
- 2. 稍后RO1-二段触发 → 预测RO1 → 只处理RO1-二段 → RO1-一段丢弃(已处理过)
- """
- try:
- logger.info(f"[{unit_name}] 开始预测 (基于 {start_date_str})")
-
- # 步骤1:提取基础机组号(用于unit_filter参数)
- base_unit_name = self.extract_unit_base_name(unit_name)
-
- # 步骤2:调用预测逻辑(会预测整个机组的所有段,这是模型限制)
- # 禁用 main() 内部的回调发送,由 smart_monitor 统一管理
- logger.info(f"[{unit_name}] 执行预测 (unit_filter={base_unit_name},会预测整个机组)")
- result_df = run_cip_analysis(
- strategy=3,
- start_date=start_date_str,
- unit_filter=base_unit_name,
- separate_stages=True, # 明确指定分段输出
- send_callback=False # 禁用内部回调,避免重复发送
- )
-
- if result_df.empty:
- logger.warning(f"[{unit_name}] 预测失败: 无结果")
- return False
-
- # 显示预测得到的所有段
- all_predicted_stages = result_df['机组类型'].tolist()
- logger.info(f"[{unit_name}] 预测完成,得到 {len(all_predicted_stages)} 个段: {all_predicted_stages}")
-
- # 步骤3:只提取触发段的结果(关键:只处理触发的段!)
- logger.info(f"[{unit_name}] 只提取触发段 '{unit_name}' 的结果,其他段丢弃")
- unit_result = result_df[result_df['机组类型'] == unit_name]
- if unit_result.empty:
- logger.warning(f"[{unit_name}] 预测失败: 结果中无此机组段")
- logger.info(f"[{unit_name}] 可用的机组类型: {result_df['机组类型'].tolist()}")
- return False
-
- cip_time = unit_result.iloc[0]['CIP时机']
- if pd.isna(cip_time):
- logger.warning(f"[{unit_name}] 预测失败: CIP时机为空")
- return False
-
- # 步骤4:只保存触发段的预测时间到配置文件(其他段不保存)
- predicted_time_str = cip_time.strftime('%Y-%m-%d %H:%M:%S')
- logger.info(f"[{unit_name}] 只保存触发段的 predicted_time: {predicted_time_str}")
-
- if unit_name not in self.config['cip_times']:
- # 配置中不存在该机组段,创建新记录
- self.config['cip_times'][unit_name] = {
- 'actual_time': start_date_str,
- 'predicted_time': predicted_time_str
- }
- else:
- # 更新预测时间,保持actual_time不变
- if not isinstance(self.config['cip_times'][unit_name], dict):
- # 兼容旧格式:转换为新格式
- self.config['cip_times'][unit_name] = {
- 'actual_time': self.config['cip_times'][unit_name],
- 'predicted_time': predicted_time_str
- }
- else:
- # 只更新predicted_time,不修改actual_time
- self.config['cip_times'][unit_name]['predicted_time'] = predicted_time_str
-
- if not self.save_config():
- logger.error(f"[{unit_name}] 配置保存失败")
- return False
-
- logger.info(f"[{unit_name}] 预测成功 → {predicted_time_str}")
-
- # 步骤5:只发送触发段的回调(其他段不发送)
- logger.info(f"[{unit_name}] 只发送触发段的回调")
- send_decision_to_callback(unit_result)
-
- # 明确说明哪些段被丢弃了
- other_stages = [s for s in all_predicted_stages if s != unit_name]
- if other_stages:
- logger.info(f"[{unit_name}] 其他段的结果已丢弃,等它们自己触发后再处理: {other_stages}")
-
- return True
-
- except Exception as e:
- logger.error(f"[{unit_name}] 预测失败: {e}", exc_info=True)
- return False
-
- def process_changes(self, changes):
- """
- 处理检测到的actual_time变化
-
- 功能:
- 遍历所有检测到的变化,逐个执行预测并更新监控基准
-
- Args:
- changes: list,变化列表,格式 [(unit_name, new_actual_time), ...]
- 例如:[('RO1', '2025-10-12 12:00:00')]
-
- 处理流程:
- 1. 执行预测:调用predict_and_update_unit
- 2. 更新监控基准:将新的actual_time记录到last_seen_actual_times(无论成功失败)
- 3. 重新加载配置:确保获取最新数据
- 4. 短暂延时:避免频繁操作
- """
- for unit_name, new_actual_time in changes:
- # 执行预测
- success = self.predict_and_update_unit(unit_name, new_actual_time)
-
- # 无论预测成功还是失败,都要更新监控基准,避免无限循环
- self.last_seen_actual_times[unit_name] = new_actual_time
-
- if not success:
- logger.error(f"[{unit_name}] 处理失败,已记录时间避免重复")
-
- # 重新加载配置,确保有最新数据
- self.load_config()
- time.sleep(2) # 短暂延时,避免频繁操作
-
- def monitor_loop(self):
- """
- 监控主循环
-
- 功能:
- 1. 定期重新加载config.json(每5秒)
- 2. 检查actual_time是否有变化且小于当前时间
- 3. 发现满足条件的变化后自动触发预测
- 4. 发送预测结果到回调接口
-
- 监控频率:
- - 每5秒检查一次配置文件
- - 发现变化立即处理
- - 处理失败不影响后续监控
-
- 触发条件(同时满足):
- - actual_time 发生了变化
- - actual_time < 当前时间(已过期)
-
- 说明:
- - 只监控actual_time字段的变化
- - predicted_time的变化不会触发任何操作
- - 相同的actual_time不会重复触发
- - 未来时间的actual_time不会触发预测
- """
- logger.info("监控启动 (每5秒检查一次,触发条件: actual_time变化且<当前时间)")
-
- while self.running:
- try:
- # 步骤1:重新加载配置
- if not self.load_config():
- logger.error("配置加载失败,10秒后重试...")
- time.sleep(10)
- continue
-
- # 步骤2:检查是否有变化
- changes = self.check_for_changes()
-
- if changes:
- # 发现变化,立即处理
- self.process_changes(changes)
-
- # 步骤3:等待一段时间后再次检查
- time.sleep(5) # 每5秒检查一次配置文件
-
- except Exception as e:
- logger.error(f"监控循环出错: {e}", exc_info=True)
- time.sleep(10) # 出错后等待10秒再继续
-
- def start(self):
- """
- 启动监控系统
- """
- if self.running:
- logger.warning("监控系统已在运行")
- return
-
- logger.info("CIP监控系统启动(监控 config.json 中的 actual_time 变化)")
-
- # 启动监控线程
- self.running = True
- self.monitor_thread = threading.Thread(target=self.monitor_loop, daemon=True)
- self.monitor_thread.start()
- try:
- while self.running:
- time.sleep(1)
- except KeyboardInterrupt:
- self.stop()
-
- def stop(self):
- """
- 停止监控系统
- """
- logger.info("停止监控系统...")
- self.running = False
-
- if self.monitor_thread and self.monitor_thread.is_alive():
- self.monitor_thread.join(timeout=5)
-
- logger.info("监控系统已停止")
- def main():
- """
- 主函数
-
- 功能:启动监控系统
- """
- monitor = SmartCIPMonitor()
-
- try:
- monitor.start()
- except Exception as e:
- logger.error(f"系统运行出错: {e}", exc_info=True)
- finally:
- if monitor.running:
- monitor.stop()
- if __name__ == '__main__':
- main()
|