| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396 |
- # -*- coding: utf-8 -*-
- """
- CIP监控系统 - 配置文件监控模式
- 功能:
- 1. 监控config.json中actual_time字段的变化
- 2. 检测到变化后,基于actual_time执行CIP预测
- 3. 将预测结果保存到predicted_time(仅用于记录)
- 4. 通过回调接口发送预测结果到决策系统
- 工作流程:
- 修改actual_time → 检测变化 → 执行预测 → 保存predicted_time → 发送回调 → 继续监控
- """
- import os
- import json
- import time
- import threading
- import pandas as pd
- import logging
- from logging.handlers import RotatingFileHandler
- from datetime import datetime
- from main_simple import main as run_cip_analysis
- from main_simple import send_decision_to_callback
- # 日志系统配置
- logger = logging.getLogger(__name__)
- logger.setLevel(logging.INFO)
- # 日志输出格式
- formatter = logging.Formatter(
- '%(asctime)s - %(threadName)s - %(levelname)s - %(message)s',
- datefmt='%Y-%m-%d %H:%M:%S'
- )
- # 文件日志处理器,单个文件最大5MB,保留3个备份
- file_handler = RotatingFileHandler('smart_monitor.log', maxBytes=5 * 1024 * 1024, backupCount=3, encoding='utf-8')
- file_handler.setFormatter(formatter)
- # 控制台日志处理器
- console_handler = logging.StreamHandler()
- console_handler.setFormatter(formatter)
- # 添加处理器
- logger.addHandler(file_handler)
- logger.addHandler(console_handler)
- class SmartCIPMonitor:
- """
- CIP监控器
-
- 核心功能:
- 1. 监控config.json中各机组的actual_time字段
- 2. 检测到时间变化后,自动触发CIP预测
- 3. 保存predicted_time到配置文件(仅用于记录)
- 4. 发送预测结果到回调接口
-
- 属性:
- config_path: 配置文件路径
- config: 配置文件内容
- running: 监控运行状态
- monitor_thread: 监控线程
- unit_names: 机组名称列表
- last_seen_actual_times: 记录上次看到的各机组actual_time,用于检测变化
- """
-
- def __init__(self):
- """
- 初始化监控系统
-
- 初始化流程:
- 1. 设置配置文件路径
- 2. 加载配置文件
- 3. 初始化监控基准时间
- """
- self.config_path = os.path.join(os.path.dirname(__file__), 'config.json')
- self.config = None
- self.running = False
- self.monitor_thread = None
- self.unit_names = ['RO1', 'RO2', 'RO3', 'RO4']
-
- # 记录上次看到的actual_time,用于检测变化
- # 格式:{'RO1': '2025-12-01 12:00:00', 'RO2': '...'}
- self.last_seen_actual_times = {}
-
- if not self.load_config():
- logger.error("配置文件加载失败")
- else:
- self.initialize_last_seen_times()
-
- def load_config(self):
- """
- 加载配置文件
-
- Returns:
- bool: 加载成功返回True
- """
- try:
- with open(self.config_path, 'r', encoding='utf-8') as f:
- self.config = json.load(f)
- return True
- except Exception as e:
- logger.error(f"配置文件加载失败: {e}")
- return False
-
- def save_config(self):
- """
- 保存配置文件
-
- Returns:
- bool: 保存成功返回True
- """
- try:
- with open(self.config_path, 'w', encoding='utf-8') as f:
- json.dump(self.config, f, ensure_ascii=False, indent=2)
- return True
- except Exception as e:
- logger.error(f"配置文件保存失败: {e}")
- return False
-
- def initialize_last_seen_times(self):
- """
- 初始化监控基准时间
-
- 功能:
- 读取config.json中各机组当前的actual_time作为监控基准
- 后续只有当actual_time与此基准不同时才触发预测
-
- 说明:
- - last_seen_actual_times用于防止重复处理相同的时间
- - 支持新旧配置格式兼容
- """
- if not self.config or 'cip_times' not in self.config:
- return
-
- for unit_name in self.unit_names:
- if unit_name in self.config['cip_times']:
- unit_data = self.config['cip_times'][unit_name]
- if isinstance(unit_data, dict) and 'actual_time' in unit_data:
- # 新格式:{'actual_time': '...', 'predicted_time': '...'}
- self.last_seen_actual_times[unit_name] = unit_data['actual_time']
- elif isinstance(unit_data, str):
- # 兼容旧格式:直接是时间字符串
- self.last_seen_actual_times[unit_name] = unit_data
-
- def check_for_changes(self):
- """
- 检查actual_time是否有变化
-
- 功能:
- 比对当前配置文件中的actual_time和上次记录的值
- 发现不同则判定为有变化,需要触发预测
-
- Returns:
- list: 有变化的机组列表,格式 [(unit_name, new_actual_time), ...]
- 例如:[('RO1', '2025-10-12 12:00:00'), ('RO3', '2025-10-15 08:00:00')]
-
- 说明:
- - 只有actual_time变化才会触发
- - predicted_time的变化不会触发
- """
- changes = []
-
- if not self.config or 'cip_times' not in self.config:
- return changes
-
- for unit_name in self.unit_names:
- if unit_name in self.config['cip_times']:
- unit_data = self.config['cip_times'][unit_name]
- current_actual_time = None
-
- if isinstance(unit_data, dict) and 'actual_time' in unit_data:
- # 新格式:读取actual_time字段
- current_actual_time = unit_data['actual_time']
- elif isinstance(unit_data, str):
- # 兼容旧格式:整个值就是时间
- current_actual_time = unit_data
-
- # 检查是否有变化
- if current_actual_time:
- last_actual_time = self.last_seen_actual_times.get(unit_name)
- if last_actual_time != current_actual_time:
- changes.append((unit_name, current_actual_time))
- logger.info(f"检测到{unit_name}实际CIP时间变化: 旧值={last_actual_time}, 新值={current_actual_time}")
-
- return changes
-
- def predict_and_update_unit(self, unit_name, start_date_str):
- """
- 预测单个机组并保存结果到配置
-
- 功能:
- 1. 调用main_simple.py中的预测函数
- 2. 提取预测结果中的CIP时机
- 3. 保存predicted_time到配置文件(仅用于记录,不参与任何计算)
- 4. 发送预测结果到回调接口
-
- Args:
- unit_name: str,机组名称,如'RO1', 'RO2', 'RO3', 'RO4'
- start_date_str: str,预测起始时间(即actual_time),格式'YYYY-MM-DD HH:MM:SS'
-
- Returns:
- bool: 预测成功返回True,失败返回False
-
- 说明:
- - predicted_time仅用于记录,不影响下次预测的触发
- - 下次预测只由actual_time的变化触发
- """
- try:
- logger.info(f"[{unit_name}] 开始预测,基于实际CIP时间: {start_date_str}")
-
- # 步骤1:调用预测逻辑
- result_df = run_cip_analysis(strategy=3, start_date=start_date_str, unit_filter=unit_name)
-
- if result_df.empty:
- logger.warning(f"{unit_name} 预测失败: 无结果")
- return False
-
- # 步骤2:提取该机组的结果
- unit_result = result_df[result_df['机组类型'] == unit_name]
- if unit_result.empty:
- logger.warning(f"{unit_name} 预测失败: 结果中无此机组")
- return False
-
- cip_time = unit_result.iloc[0]['CIP时机']
- if pd.isna(cip_time):
- logger.warning(f"{unit_name} 预测失败: CIP时机为空")
- return False
-
- # 步骤3:保存预测时间到配置文件(仅用于记录)
- predicted_time_str = cip_time.strftime('%Y-%m-%d %H:%M:%S')
-
- if unit_name not in self.config['cip_times']:
- # 配置中不存在该机组,创建新记录
- self.config['cip_times'][unit_name] = {
- 'actual_time': start_date_str,
- 'predicted_time': predicted_time_str
- }
- else:
- # 更新预测时间,保持actual_time不变
- if not isinstance(self.config['cip_times'][unit_name], dict):
- # 兼容旧格式:转换为新格式
- self.config['cip_times'][unit_name] = {
- 'actual_time': self.config['cip_times'][unit_name],
- 'predicted_time': predicted_time_str
- }
- else:
- # 只更新predicted_time,不修改actual_time
- self.config['cip_times'][unit_name]['predicted_time'] = predicted_time_str
-
- if not self.save_config():
- logger.error(f"[{unit_name}] 配置保存失败")
- return False
-
- logger.info(f"[{unit_name}] 预测成功 → {predicted_time_str}")
-
- # 步骤4:发送预测结果到回调接口
- try:
- if send_decision_to_callback(result_df):
- logger.info(f"[{unit_name}] 回调发送成功")
- else:
- logger.error(f"[{unit_name}] 回调发送失败")
- except Exception as callback_error:
- logger.error(f"[{unit_name}] 回调发送异常: {callback_error}")
-
- return True
-
- except Exception as e:
- logger.error(f"[{unit_name}] 预测失败: {e}", exc_info=True)
- return False
-
- def process_changes(self, changes):
- """
- 处理检测到的actual_time变化
-
- 功能:
- 遍历所有检测到的变化,逐个执行预测并更新监控基准
-
- Args:
- changes: list,变化列表,格式 [(unit_name, new_actual_time), ...]
- 例如:[('RO1', '2025-10-12 12:00:00')]
-
- 处理流程:
- 1. 执行预测:调用predict_and_update_unit
- 2. 更新监控基准:将新的actual_time记录到last_seen_actual_times
- 3. 重新加载配置:确保获取最新数据
- 4. 短暂延时:避免频繁操作
- """
- for unit_name, new_actual_time in changes:
- # 执行预测
- if self.predict_and_update_unit(unit_name, new_actual_time):
- # 更新监控基准,防止重复处理
- self.last_seen_actual_times[unit_name] = new_actual_time
- logger.info(f"[{unit_name}] 处理完成")
- else:
- logger.error(f"[{unit_name}] 处理失败")
-
- # 重新加载配置,确保有最新数据
- self.load_config()
- time.sleep(2) # 短暂延时,避免频繁操作
-
- def monitor_loop(self):
- """
- 监控主循环
-
- 功能:
- 1. 定期重新加载config.json(每5秒)
- 2. 检查actual_time是否有变化
- 3. 发现变化后自动触发预测
- 4. 发送预测结果到回调接口
-
- 监控频率:
- - 每5秒检查一次配置文件
- - 发现变化立即处理
- - 处理失败不影响后续监控
-
- 说明:
- - 只监控actual_time字段的变化
- - predicted_time的变化不会触发任何操作
- - 相同的actual_time不会重复触发
- """
- logger.info("开始监控 config.json 变化(每5秒检查一次)")
-
- while self.running:
- try:
- # 步骤1:重新加载配置
- if not self.load_config():
- logger.error("配置加载失败,10秒后重试...")
- time.sleep(10)
- continue
-
- # 步骤2:检查是否有变化
- changes = self.check_for_changes()
-
- if changes:
- # 发现变化,立即处理
- self.process_changes(changes)
-
- # 步骤3:等待一段时间后再次检查
- time.sleep(5) # 每5秒检查一次配置文件
-
- except Exception as e:
- logger.error(f"监控循环出错: {e}", exc_info=True)
- time.sleep(10) # 出错后等待10秒再继续
-
- def start(self):
- """
- 启动监控系统
- """
- if self.running:
- logger.warning("监控系统已在运行")
- return
-
- logger.info("CIP监控系统启动(监控 config.json 中的 actual_time 变化)")
-
- # 启动监控线程
- self.running = True
- self.monitor_thread = threading.Thread(target=self.monitor_loop, daemon=True)
- self.monitor_thread.start()
- try:
- while self.running:
- time.sleep(1)
- except KeyboardInterrupt:
- self.stop()
-
- def stop(self):
- """
- 停止监控系统
- """
- logger.info("停止监控系统...")
- self.running = False
-
- if self.monitor_thread and self.monitor_thread.is_alive():
- self.monitor_thread.join(timeout=5)
-
- logger.info("监控系统已停止")
- def main():
- """
- 主函数
-
- 功能:启动监控系统
- """
- monitor = SmartCIPMonitor()
-
- try:
- monitor.start()
- except Exception as e:
- logger.error(f"系统运行出错: {e}", exc_info=True)
- finally:
- if monitor.running:
- monitor.stop()
- if __name__ == '__main__':
- main()
|