| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418 |
- # -*- coding: utf-8 -*-
- """
- CIP监控系统 - 配置文件监控模式
- 功能:
- 1. 监控config.json中actual_time字段的变化
- 2. 检测到变化后,基于actual_time执行CIP预测
- 3. 将预测结果保存到predicted_time(仅用于记录)
- 4. 通过回调接口发送预测结果到决策系统
- 工作流程:
- 修改actual_time → 检测变化 → 执行预测 → 保存predicted_time → 发送回调 → 继续监控
- """
- import os
- import json
- import time
- import threading
- import pandas as pd
- from datetime import datetime
- from main_simple import main as run_cip_analysis
- from main_simple import send_decision_to_callback
- class SmartCIPMonitor:
- """
- CIP监控器
-
- 核心功能:
- 1. 监控config.json中各机组的actual_time字段
- 2. 检测到时间变化后,自动触发CIP预测
- 3. 保存predicted_time到配置文件(仅用于记录)
- 4. 发送预测结果到回调接口
-
- 属性:
- config_path: 配置文件路径
- config: 配置文件内容
- running: 监控运行状态
- monitor_thread: 监控线程
- unit_names: 机组名称列表
- last_seen_actual_times: 记录上次看到的各机组actual_time,用于检测变化
- """
-
- def __init__(self):
- """
- 初始化监控系统
-
- 初始化流程:
- 1. 设置配置文件路径
- 2. 加载配置文件
- 3. 初始化监控基准时间
- """
- self.config_path = os.path.join(os.path.dirname(__file__), 'config.json')
- self.config = None
- self.running = False
- self.monitor_thread = None
- self.unit_names = ['RO1', 'RO2', 'RO3', 'RO4']
-
- # 记录上次看到的actual_time,用于检测变化
- # 格式:{'RO1': '2025-12-01 12:00:00', 'RO2': '...'}
- self.last_seen_actual_times = {}
-
- print("初始化CIP监控系统...")
- if not self.load_config():
- print("配置加载失败")
- else:
- print("配置加载成功")
- self.initialize_last_seen_times()
-
- def load_config(self):
- """
- 加载配置文件
-
- Returns:
- bool: 加载成功返回True
- """
- try:
- with open(self.config_path, 'r', encoding='utf-8') as f:
- self.config = json.load(f)
- return True
- except Exception as e:
- print(f"配置文件加载失败: {e}")
- return False
-
- def save_config(self):
- """
- 保存配置文件
-
- Returns:
- bool: 保存成功返回True
- """
- try:
- with open(self.config_path, 'w', encoding='utf-8') as f:
- json.dump(self.config, f, ensure_ascii=False, indent=2)
- return True
- except Exception as e:
- print(f"配置文件保存失败: {e}")
- return False
-
- def initialize_last_seen_times(self):
- """
- 初始化监控基准时间
-
- 功能:
- 读取config.json中各机组当前的actual_time作为监控基准
- 后续只有当actual_time与此基准不同时才触发预测
-
- 说明:
- - last_seen_actual_times用于防止重复处理相同的时间
- - 支持新旧配置格式兼容
- """
- if not self.config or 'cip_times' not in self.config:
- return
-
- for unit_name in self.unit_names:
- if unit_name in self.config['cip_times']:
- unit_data = self.config['cip_times'][unit_name]
- if isinstance(unit_data, dict) and 'actual_time' in unit_data:
- # 新格式:{'actual_time': '...', 'predicted_time': '...'}
- self.last_seen_actual_times[unit_name] = unit_data['actual_time']
- elif isinstance(unit_data, str):
- # 兼容旧格式:直接是时间字符串
- self.last_seen_actual_times[unit_name] = unit_data
-
- print(f"已初始化监控基准时间: {self.last_seen_actual_times}")
-
- def check_for_changes(self):
- """
- 检查actual_time是否有变化
-
- 功能:
- 比对当前配置文件中的actual_time和上次记录的值
- 发现不同则判定为有变化,需要触发预测
-
- Returns:
- list: 有变化的机组列表,格式 [(unit_name, new_actual_time), ...]
- 例如:[('RO1', '2025-10-12 12:00:00'), ('RO3', '2025-10-15 08:00:00')]
-
- 说明:
- - 只有actual_time变化才会触发
- - predicted_time的变化不会触发
- """
- changes = []
-
- if not self.config or 'cip_times' not in self.config:
- return changes
-
- for unit_name in self.unit_names:
- if unit_name in self.config['cip_times']:
- unit_data = self.config['cip_times'][unit_name]
- current_actual_time = None
-
- if isinstance(unit_data, dict) and 'actual_time' in unit_data:
- # 新格式:读取actual_time字段
- current_actual_time = unit_data['actual_time']
- elif isinstance(unit_data, str):
- # 兼容旧格式:整个值就是时间
- current_actual_time = unit_data
-
- # 检查是否有变化
- if current_actual_time:
- last_actual_time = self.last_seen_actual_times.get(unit_name)
- if last_actual_time != current_actual_time:
- changes.append((unit_name, current_actual_time))
- print(f"\n检测到{unit_name}实际CIP时间变化:")
- print(f" 旧值: {last_actual_time}")
- print(f" 新值: {current_actual_time}")
-
- return changes
-
- def predict_and_update_unit(self, unit_name, start_date_str):
- """
- 预测单个机组并保存结果到配置
-
- 功能:
- 1. 调用main_simple.py中的预测函数
- 2. 提取预测结果中的CIP时机
- 3. 保存predicted_time到配置文件(仅用于记录,不参与任何计算)
- 4. 发送预测结果到回调接口
-
- Args:
- unit_name: str,机组名称,如'RO1', 'RO2', 'RO3', 'RO4'
- start_date_str: str,预测起始时间(即actual_time),格式'YYYY-MM-DD HH:MM:SS'
-
- Returns:
- bool: 预测成功返回True,失败返回False
-
- 说明:
- - predicted_time仅用于记录,不影响下次预测的触发
- - 下次预测只由actual_time的变化触发
- """
- try:
- print(f"\n{'='*50}")
- print(f"开始预测 {unit_name}")
- print(f"基于实际CIP时间: {start_date_str}")
- print(f"{'='*50}")
-
- # 步骤1:调用预测逻辑
- result_df = run_cip_analysis(strategy=3, start_date=start_date_str, unit_filter=unit_name)
-
- if result_df.empty:
- print(f"{unit_name} 预测失败: 无结果")
- return False
-
- # 步骤2:提取该机组的结果
- unit_result = result_df[result_df['机组类型'] == unit_name]
- if unit_result.empty:
- print(f"{unit_name} 预测失败: 结果中无此机组")
- return False
-
- cip_time = unit_result.iloc[0]['CIP时机']
- if pd.isna(cip_time):
- print(f"{unit_name} 预测失败: CIP时机为空")
- return False
-
- # 步骤3:保存预测时间到配置文件(仅用于记录)
- predicted_time_str = cip_time.strftime('%Y-%m-%d %H:%M:%S')
-
- if unit_name not in self.config['cip_times']:
- # 配置中不存在该机组,创建新记录
- self.config['cip_times'][unit_name] = {
- 'actual_time': start_date_str,
- 'predicted_time': predicted_time_str
- }
- else:
- # 更新预测时间,保持actual_time不变
- if not isinstance(self.config['cip_times'][unit_name], dict):
- # 兼容旧格式:转换为新格式
- self.config['cip_times'][unit_name] = {
- 'actual_time': self.config['cip_times'][unit_name],
- 'predicted_time': predicted_time_str
- }
- else:
- # 只更新predicted_time,不修改actual_time
- self.config['cip_times'][unit_name]['predicted_time'] = predicted_time_str
-
- if not self.save_config():
- print(f"✗ {unit_name} 配置保存失败")
- return False
-
- print(f"✓ {unit_name} 预测成功")
- print(f" 实际CIP时间: {start_date_str}")
- print(f" 预测CIP时间: {predicted_time_str} (仅用于记录)")
-
- # 步骤4:发送预测结果到回调接口
- print(f"\n发送预测结果到回调接口...")
- try:
- if send_decision_to_callback(result_df):
- print(f"✓ {unit_name} 回调发送成功")
- else:
- print(f"✗ {unit_name} 回调发送失败")
- except Exception as callback_error:
- print(f"✗ {unit_name} 回调发送异常: {callback_error}")
-
- return True
-
- except Exception as e:
- print(f"✗ {unit_name} 预测失败: {e}")
- import traceback
- traceback.print_exc()
- return False
-
- def process_changes(self, changes):
- """
- 处理检测到的actual_time变化
-
- 功能:
- 遍历所有检测到的变化,逐个执行预测并更新监控基准
-
- Args:
- changes: list,变化列表,格式 [(unit_name, new_actual_time), ...]
- 例如:[('RO1', '2025-10-12 12:00:00')]
-
- 处理流程:
- 1. 执行预测:调用predict_and_update_unit
- 2. 更新监控基准:将新的actual_time记录到last_seen_actual_times
- 3. 重新加载配置:确保获取最新数据
- 4. 短暂延时:避免频繁操作
- """
- for unit_name, new_actual_time in changes:
- print(f"\n{'='*60}")
- print(f"处理 {unit_name} 的实际CIP时间变化")
- print(f"{'='*60}")
-
- # 执行预测
- if self.predict_and_update_unit(unit_name, new_actual_time):
- # 更新监控基准,防止重复处理
- self.last_seen_actual_times[unit_name] = new_actual_time
- print(f"\n✓ {unit_name} 处理完成,已更新监控基准")
- print(f" 新的监控基准: {new_actual_time}")
- else:
- print(f"\n✗ {unit_name} 处理失败")
-
- # 重新加载配置,确保有最新数据
- self.load_config()
- time.sleep(2) # 短暂延时,避免频繁操作
-
- def monitor_loop(self):
- """
- 监控主循环
-
- 功能:
- 1. 定期重新加载config.json(每5秒)
- 2. 检查actual_time是否有变化
- 3. 发现变化后自动触发预测
- 4. 发送预测结果到回调接口
-
- 监控频率:
- - 每5秒检查一次配置文件
- - 发现变化立即处理
- - 处理失败不影响后续监控
-
- 说明:
- - 只监控actual_time字段的变化
- - predicted_time的变化不会触发任何操作
- - 相同的actual_time不会重复触发
- """
- print("\n" + "="*60)
- print("开始监控config.json的变化")
- print("="*60)
- print("监控说明:")
- print(" - 监控字段: cip_times.*.actual_time")
- print(" - 检查频率: 每5秒")
- print(" - 触发条件: actual_time发生变化")
- print(" - 自动操作: 执行预测 → 保存predicted_time → 发送回调")
- print("="*60)
-
- while self.running:
- try:
- # 步骤1:重新加载配置
- if not self.load_config():
- print("✗ 配置加载失败,10秒后重试...")
- time.sleep(10)
- continue
-
- # 步骤2:检查是否有变化
- changes = self.check_for_changes()
-
- if changes:
- # 发现变化,立即处理
- print(f"\n检测到 {len(changes)} 个机组的actual_time变化")
- self.process_changes(changes)
-
- # 步骤3:等待一段时间后再次检查
- time.sleep(5) # 每5秒检查一次配置文件
-
- except Exception as e:
- print(f"\n✗ 监控循环出错: {e}")
- import traceback
- traceback.print_exc()
- time.sleep(10) # 出错后等待10秒再继续
-
- def start(self):
- """
- 启动监控系统
- """
- if self.running:
- print("监控系统已在运行")
- return
-
- print("\n启动CIP监控系统")
- print("="*60)
- print("监控模式: 检测config.json中actual_time的变化")
- print("工作流程:")
- print(" 1. 手动修改config.json中的actual_time")
- print(" 2. 系统自动检测到变化")
- print(" 3. 基于actual_time执行预测")
- print(" 4. 保存predicted_time(仅用于记录)")
- print(" 5. 继续监控下一次变化")
- print("="*60)
-
- # 启动监控线程
- self.running = True
- self.monitor_thread = threading.Thread(target=self.monitor_loop, daemon=True)
- self.monitor_thread.start()
-
- print("\n监控系统已启动,按Ctrl+C停止")
- try:
- while self.running:
- time.sleep(1)
- except KeyboardInterrupt:
- self.stop()
-
- def stop(self):
- """
- 停止监控系统
- """
- print("\n停止监控系统...")
- self.running = False
-
- if self.monitor_thread and self.monitor_thread.is_alive():
- self.monitor_thread.join(timeout=5)
-
- print("监控系统已停止")
- def main():
- """
- 主函数
-
- 功能:启动监控系统
- """
- print("CIP监控系统 - 配置文件监控模式")
- print("="*60)
-
- monitor = SmartCIPMonitor()
-
- try:
- monitor.start()
- except Exception as e:
- print(f"系统运行出错: {e}")
- import traceback
- traceback.print_exc()
- finally:
- if monitor.running:
- monitor.stop()
- if __name__ == '__main__':
- main()
|