|
|
@@ -4,12 +4,17 @@ CIP监控系统 - 配置文件监控模式
|
|
|
|
|
|
功能:
|
|
|
1. 监控config.json中actual_time字段的变化
|
|
|
-2. 检测到变化后,基于actual_time执行CIP预测
|
|
|
+2. 检测到变化且actual_time < 当前时间时,基于actual_time执行CIP预测
|
|
|
3. 将预测结果保存到predicted_time(仅用于记录)
|
|
|
4. 通过回调接口发送预测结果到决策系统
|
|
|
+5. 继续等待actual_time的下次更新
|
|
|
+
|
|
|
+触发条件(同时满足):
|
|
|
+- actual_time 发生了变化
|
|
|
+- actual_time < 当前时间(已过期)
|
|
|
|
|
|
工作流程:
|
|
|
-修改actual_time → 检测变化 → 执行预测 → 保存predicted_time → 发送回调 → 继续监控
|
|
|
+修改actual_time(且 < 当前时间)→ 检测变化 → 执行预测 → 保存predicted_time → 发送回调 → 继续监控actual_time
|
|
|
"""
|
|
|
|
|
|
import os
|
|
|
@@ -17,10 +22,34 @@ import json
|
|
|
import time
|
|
|
import threading
|
|
|
import pandas as pd
|
|
|
+import logging
|
|
|
+from logging.handlers import RotatingFileHandler
|
|
|
from datetime import datetime
|
|
|
from main_simple import main as run_cip_analysis
|
|
|
from main_simple import send_decision_to_callback
|
|
|
|
|
|
+# 日志系统配置
|
|
|
+logger = logging.getLogger(__name__)
|
|
|
+logger.setLevel(logging.INFO)
|
|
|
+
|
|
|
+# 日志输出格式
|
|
|
+formatter = logging.Formatter(
|
|
|
+ '%(asctime)s - %(threadName)s - %(levelname)s - %(message)s',
|
|
|
+ datefmt='%Y-%m-%d %H:%M:%S'
|
|
|
+)
|
|
|
+
|
|
|
+# 文件日志处理器,单个文件最大5MB,保留3个备份
|
|
|
+file_handler = RotatingFileHandler('smart_monitor.log', maxBytes=5 * 1024 * 1024, backupCount=3, encoding='utf-8')
|
|
|
+file_handler.setFormatter(formatter)
|
|
|
+
|
|
|
+# 控制台日志处理器
|
|
|
+console_handler = logging.StreamHandler()
|
|
|
+console_handler.setFormatter(formatter)
|
|
|
+
|
|
|
+# 添加处理器
|
|
|
+logger.addHandler(file_handler)
|
|
|
+logger.addHandler(console_handler)
|
|
|
+
|
|
|
class SmartCIPMonitor:
|
|
|
"""
|
|
|
CIP监控器
|
|
|
@@ -59,11 +88,9 @@ class SmartCIPMonitor:
|
|
|
# 格式:{'RO1': '2025-12-01 12:00:00', 'RO2': '...'}
|
|
|
self.last_seen_actual_times = {}
|
|
|
|
|
|
- print("初始化CIP监控系统...")
|
|
|
if not self.load_config():
|
|
|
- print("配置加载失败")
|
|
|
+ logger.error("配置文件加载失败")
|
|
|
else:
|
|
|
- print("配置加载成功")
|
|
|
self.initialize_last_seen_times()
|
|
|
|
|
|
def load_config(self):
|
|
|
@@ -78,7 +105,7 @@ class SmartCIPMonitor:
|
|
|
self.config = json.load(f)
|
|
|
return True
|
|
|
except Exception as e:
|
|
|
- print(f"配置文件加载失败: {e}")
|
|
|
+ logger.error(f"配置文件加载失败: {e}")
|
|
|
return False
|
|
|
|
|
|
def save_config(self):
|
|
|
@@ -93,7 +120,7 @@ class SmartCIPMonitor:
|
|
|
json.dump(self.config, f, ensure_ascii=False, indent=2)
|
|
|
return True
|
|
|
except Exception as e:
|
|
|
- print(f"配置文件保存失败: {e}")
|
|
|
+ logger.error(f"配置文件保存失败: {e}")
|
|
|
return False
|
|
|
|
|
|
def initialize_last_seen_times(self):
|
|
|
@@ -102,34 +129,68 @@ class SmartCIPMonitor:
|
|
|
|
|
|
功能:
|
|
|
读取config.json中各机组当前的actual_time作为监控基准
|
|
|
- 后续只有当actual_time与此基准不同时才触发预测
|
|
|
+ 但是:只记录未来的时间,已过期的时间不记录,以便首次检查时能触发预测
|
|
|
|
|
|
说明:
|
|
|
- last_seen_actual_times用于防止重复处理相同的时间
|
|
|
- 支持新旧配置格式兼容
|
|
|
+ - 已过期的时间(< 当前时间)不记录,确保首次检查时能被检测为"变化"
|
|
|
"""
|
|
|
if not self.config or 'cip_times' not in self.config:
|
|
|
return
|
|
|
|
|
|
+ current_time = datetime.now()
|
|
|
+
|
|
|
for unit_name in self.unit_names:
|
|
|
if unit_name in self.config['cip_times']:
|
|
|
unit_data = self.config['cip_times'][unit_name]
|
|
|
+ actual_time_str = None
|
|
|
+
|
|
|
if isinstance(unit_data, dict) and 'actual_time' in unit_data:
|
|
|
# 新格式:{'actual_time': '...', 'predicted_time': '...'}
|
|
|
- self.last_seen_actual_times[unit_name] = unit_data['actual_time']
|
|
|
+ actual_time_str = unit_data['actual_time']
|
|
|
elif isinstance(unit_data, str):
|
|
|
# 兼容旧格式:直接是时间字符串
|
|
|
- self.last_seen_actual_times[unit_name] = unit_data
|
|
|
+ actual_time_str = unit_data
|
|
|
+
|
|
|
+ # 只记录未来的时间,已过期的时间不记录
|
|
|
+ if actual_time_str:
|
|
|
+ actual_time_dt = self.parse_time_string(actual_time_str)
|
|
|
+ if actual_time_dt and actual_time_dt >= current_time:
|
|
|
+ # 未来时间:记录基准,避免触发预测
|
|
|
+ self.last_seen_actual_times[unit_name] = actual_time_str
|
|
|
+ logger.info(f"{unit_name}: 未来时间,不触发预测")
|
|
|
+ elif actual_time_dt:
|
|
|
+ # 已过期时间:不记录基准,让首次检查时能检测到变化
|
|
|
+ logger.info(f"{unit_name}: 已过期,待触发预测")
|
|
|
+ # 如果为None则不记录
|
|
|
+
|
|
|
+ def parse_time_string(self, time_str):
|
|
|
+ """
|
|
|
+ 解析时间字符串为datetime对象
|
|
|
+
|
|
|
+ Args:
|
|
|
+ time_str: str,时间字符串,格式'YYYY-MM-DD HH:MM:SS'
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ datetime: 解析后的datetime对象,解析失败返回None
|
|
|
+ """
|
|
|
+ if not time_str or not isinstance(time_str, str):
|
|
|
+ return None
|
|
|
|
|
|
- print(f"已初始化监控基准时间: {self.last_seen_actual_times}")
|
|
|
+ try:
|
|
|
+ return datetime.strptime(time_str, '%Y-%m-%d %H:%M:%S')
|
|
|
+ except Exception as e:
|
|
|
+ logger.warning(f"时间字符串解析失败: {time_str}, 错误: {e}")
|
|
|
+ return None
|
|
|
|
|
|
def check_for_changes(self):
|
|
|
"""
|
|
|
- 检查actual_time是否有变化
|
|
|
+ 检查actual_time是否有变化且小于当前时间
|
|
|
|
|
|
功能:
|
|
|
比对当前配置文件中的actual_time和上次记录的值
|
|
|
- 发现不同则判定为有变化,需要触发预测
|
|
|
+ 发现不同且小于当前时间则判定为有变化,需要触发预测
|
|
|
|
|
|
Returns:
|
|
|
list: 有变化的机组列表,格式 [(unit_name, new_actual_time), ...]
|
|
|
@@ -137,9 +198,11 @@ class SmartCIPMonitor:
|
|
|
|
|
|
说明:
|
|
|
- 只有actual_time变化才会触发
|
|
|
+ - 必须满足 actual_time < 当前时间
|
|
|
- predicted_time的变化不会触发
|
|
|
"""
|
|
|
changes = []
|
|
|
+ current_time = datetime.now()
|
|
|
|
|
|
if not self.config or 'cip_times' not in self.config:
|
|
|
return changes
|
|
|
@@ -159,11 +222,23 @@ class SmartCIPMonitor:
|
|
|
# 检查是否有变化
|
|
|
if current_actual_time:
|
|
|
last_actual_time = self.last_seen_actual_times.get(unit_name)
|
|
|
+
|
|
|
+ # 条件1:actual_time 发生了变化
|
|
|
if last_actual_time != current_actual_time:
|
|
|
- changes.append((unit_name, current_actual_time))
|
|
|
- print(f"\n检测到{unit_name}实际CIP时间变化:")
|
|
|
- print(f" 旧值: {last_actual_time}")
|
|
|
- print(f" 新值: {current_actual_time}")
|
|
|
+ # 条件2:actual_time 小于当前时间
|
|
|
+ actual_time_dt = self.parse_time_string(current_actual_time)
|
|
|
+
|
|
|
+ if actual_time_dt is None:
|
|
|
+ logger.warning(f"{unit_name} actual_time 格式无效: {current_actual_time}")
|
|
|
+ continue
|
|
|
+
|
|
|
+ if actual_time_dt < current_time:
|
|
|
+ changes.append((unit_name, current_actual_time))
|
|
|
+ logger.info(f"{unit_name}: 检测到变化,触发预测 ({current_actual_time})")
|
|
|
+ else:
|
|
|
+ logger.info(f"{unit_name}: 未来时间,不触发 ({current_actual_time})")
|
|
|
+ # 更新监控基准,避免重复提示
|
|
|
+ self.last_seen_actual_times[unit_name] = current_actual_time
|
|
|
|
|
|
return changes
|
|
|
|
|
|
@@ -189,27 +264,24 @@ class SmartCIPMonitor:
|
|
|
- 下次预测只由actual_time的变化触发
|
|
|
"""
|
|
|
try:
|
|
|
- print(f"\n{'='*50}")
|
|
|
- print(f"开始预测 {unit_name}")
|
|
|
- print(f"基于实际CIP时间: {start_date_str}")
|
|
|
- print(f"{'='*50}")
|
|
|
+ logger.info(f"[{unit_name}] 开始预测 (基于 {start_date_str})")
|
|
|
|
|
|
# 步骤1:调用预测逻辑
|
|
|
result_df = run_cip_analysis(strategy=3, start_date=start_date_str, unit_filter=unit_name)
|
|
|
|
|
|
if result_df.empty:
|
|
|
- print(f"{unit_name} 预测失败: 无结果")
|
|
|
+ logger.warning(f"[{unit_name}] 预测失败: 无结果")
|
|
|
return False
|
|
|
|
|
|
# 步骤2:提取该机组的结果
|
|
|
unit_result = result_df[result_df['机组类型'] == unit_name]
|
|
|
if unit_result.empty:
|
|
|
- print(f"{unit_name} 预测失败: 结果中无此机组")
|
|
|
+ logger.warning(f"[{unit_name}] 预测失败: 结果中无此机组")
|
|
|
return False
|
|
|
|
|
|
cip_time = unit_result.iloc[0]['CIP时机']
|
|
|
if pd.isna(cip_time):
|
|
|
- print(f"{unit_name} 预测失败: CIP时机为空")
|
|
|
+ logger.warning(f"[{unit_name}] 预测失败: CIP时机为空")
|
|
|
return False
|
|
|
|
|
|
# 步骤3:保存预测时间到配置文件(仅用于记录)
|
|
|
@@ -234,29 +306,18 @@ class SmartCIPMonitor:
|
|
|
self.config['cip_times'][unit_name]['predicted_time'] = predicted_time_str
|
|
|
|
|
|
if not self.save_config():
|
|
|
- print(f"✗ {unit_name} 配置保存失败")
|
|
|
+ logger.error(f"[{unit_name}] 配置保存失败")
|
|
|
return False
|
|
|
|
|
|
- print(f"✓ {unit_name} 预测成功")
|
|
|
- print(f" 实际CIP时间: {start_date_str}")
|
|
|
- print(f" 预测CIP时间: {predicted_time_str} (仅用于记录)")
|
|
|
+ logger.info(f"[{unit_name}] 预测成功 → {predicted_time_str}")
|
|
|
|
|
|
# 步骤4:发送预测结果到回调接口
|
|
|
- print(f"\n发送预测结果到回调接口...")
|
|
|
- try:
|
|
|
- if send_decision_to_callback(result_df):
|
|
|
- print(f"✓ {unit_name} 回调发送成功")
|
|
|
- else:
|
|
|
- print(f"✗ {unit_name} 回调发送失败")
|
|
|
- except Exception as callback_error:
|
|
|
- print(f"✗ {unit_name} 回调发送异常: {callback_error}")
|
|
|
+ send_decision_to_callback(result_df)
|
|
|
|
|
|
return True
|
|
|
|
|
|
except Exception as e:
|
|
|
- print(f"✗ {unit_name} 预测失败: {e}")
|
|
|
- import traceback
|
|
|
- traceback.print_exc()
|
|
|
+ logger.error(f"[{unit_name}] 预测失败: {e}")
|
|
|
return False
|
|
|
|
|
|
def process_changes(self, changes):
|
|
|
@@ -272,23 +333,19 @@ class SmartCIPMonitor:
|
|
|
|
|
|
处理流程:
|
|
|
1. 执行预测:调用predict_and_update_unit
|
|
|
- 2. 更新监控基准:将新的actual_time记录到last_seen_actual_times
|
|
|
+ 2. 更新监控基准:将新的actual_time记录到last_seen_actual_times(无论成功失败)
|
|
|
3. 重新加载配置:确保获取最新数据
|
|
|
4. 短暂延时:避免频繁操作
|
|
|
"""
|
|
|
for unit_name, new_actual_time in changes:
|
|
|
- print(f"\n{'='*60}")
|
|
|
- print(f"处理 {unit_name} 的实际CIP时间变化")
|
|
|
- print(f"{'='*60}")
|
|
|
-
|
|
|
# 执行预测
|
|
|
- if self.predict_and_update_unit(unit_name, new_actual_time):
|
|
|
- # 更新监控基准,防止重复处理
|
|
|
- self.last_seen_actual_times[unit_name] = new_actual_time
|
|
|
- print(f"\n✓ {unit_name} 处理完成,已更新监控基准")
|
|
|
- print(f" 新的监控基准: {new_actual_time}")
|
|
|
- else:
|
|
|
- print(f"\n✗ {unit_name} 处理失败")
|
|
|
+ success = self.predict_and_update_unit(unit_name, new_actual_time)
|
|
|
+
|
|
|
+ # ⚠️ 关键修复:无论预测成功还是失败,都要更新监控基准,避免无限循环
|
|
|
+ self.last_seen_actual_times[unit_name] = new_actual_time
|
|
|
+
|
|
|
+ if not success:
|
|
|
+ logger.error(f"[{unit_name}] 处理失败,已记录时间避免重复")
|
|
|
|
|
|
# 重新加载配置,确保有最新数据
|
|
|
self.load_config()
|
|
|
@@ -300,8 +357,8 @@ class SmartCIPMonitor:
|
|
|
|
|
|
功能:
|
|
|
1. 定期重新加载config.json(每5秒)
|
|
|
- 2. 检查actual_time是否有变化
|
|
|
- 3. 发现变化后自动触发预测
|
|
|
+ 2. 检查actual_time是否有变化且小于当前时间
|
|
|
+ 3. 发现满足条件的变化后自动触发预测
|
|
|
4. 发送预测结果到回调接口
|
|
|
|
|
|
监控频率:
|
|
|
@@ -309,26 +366,23 @@ class SmartCIPMonitor:
|
|
|
- 发现变化立即处理
|
|
|
- 处理失败不影响后续监控
|
|
|
|
|
|
+ 触发条件(同时满足):
|
|
|
+ - actual_time 发生了变化
|
|
|
+ - actual_time < 当前时间(已过期)
|
|
|
+
|
|
|
说明:
|
|
|
- 只监控actual_time字段的变化
|
|
|
- predicted_time的变化不会触发任何操作
|
|
|
- 相同的actual_time不会重复触发
|
|
|
+ - 未来时间的actual_time不会触发预测
|
|
|
"""
|
|
|
- print("\n" + "="*60)
|
|
|
- print("开始监控config.json的变化")
|
|
|
- print("="*60)
|
|
|
- print("监控说明:")
|
|
|
- print(" - 监控字段: cip_times.*.actual_time")
|
|
|
- print(" - 检查频率: 每5秒")
|
|
|
- print(" - 触发条件: actual_time发生变化")
|
|
|
- print(" - 自动操作: 执行预测 → 保存predicted_time → 发送回调")
|
|
|
- print("="*60)
|
|
|
+ logger.info("监控启动 (每5秒检查一次,触发条件: actual_time变化且<当前时间)")
|
|
|
|
|
|
while self.running:
|
|
|
try:
|
|
|
# 步骤1:重新加载配置
|
|
|
if not self.load_config():
|
|
|
- print("✗ 配置加载失败,10秒后重试...")
|
|
|
+ logger.error("配置加载失败,10秒后重试...")
|
|
|
time.sleep(10)
|
|
|
continue
|
|
|
|
|
|
@@ -337,16 +391,13 @@ class SmartCIPMonitor:
|
|
|
|
|
|
if changes:
|
|
|
# 发现变化,立即处理
|
|
|
- print(f"\n检测到 {len(changes)} 个机组的actual_time变化")
|
|
|
self.process_changes(changes)
|
|
|
|
|
|
# 步骤3:等待一段时间后再次检查
|
|
|
time.sleep(5) # 每5秒检查一次配置文件
|
|
|
|
|
|
except Exception as e:
|
|
|
- print(f"\n✗ 监控循环出错: {e}")
|
|
|
- import traceback
|
|
|
- traceback.print_exc()
|
|
|
+ logger.error(f"监控循环出错: {e}", exc_info=True)
|
|
|
time.sleep(10) # 出错后等待10秒再继续
|
|
|
|
|
|
def start(self):
|
|
|
@@ -354,26 +405,15 @@ class SmartCIPMonitor:
|
|
|
启动监控系统
|
|
|
"""
|
|
|
if self.running:
|
|
|
- print("监控系统已在运行")
|
|
|
+ logger.warning("监控系统已在运行")
|
|
|
return
|
|
|
|
|
|
- print("\n启动CIP监控系统")
|
|
|
- print("="*60)
|
|
|
- print("监控模式: 检测config.json中actual_time的变化")
|
|
|
- print("工作流程:")
|
|
|
- print(" 1. 手动修改config.json中的actual_time")
|
|
|
- print(" 2. 系统自动检测到变化")
|
|
|
- print(" 3. 基于actual_time执行预测")
|
|
|
- print(" 4. 保存predicted_time(仅用于记录)")
|
|
|
- print(" 5. 继续监控下一次变化")
|
|
|
- print("="*60)
|
|
|
+ logger.info("CIP监控系统启动(监控 config.json 中的 actual_time 变化)")
|
|
|
|
|
|
# 启动监控线程
|
|
|
self.running = True
|
|
|
self.monitor_thread = threading.Thread(target=self.monitor_loop, daemon=True)
|
|
|
self.monitor_thread.start()
|
|
|
-
|
|
|
- print("\n监控系统已启动,按Ctrl+C停止")
|
|
|
|
|
|
try:
|
|
|
while self.running:
|
|
|
@@ -385,13 +425,13 @@ class SmartCIPMonitor:
|
|
|
"""
|
|
|
停止监控系统
|
|
|
"""
|
|
|
- print("\n停止监控系统...")
|
|
|
+ logger.info("停止监控系统...")
|
|
|
self.running = False
|
|
|
|
|
|
if self.monitor_thread and self.monitor_thread.is_alive():
|
|
|
self.monitor_thread.join(timeout=5)
|
|
|
|
|
|
- print("监控系统已停止")
|
|
|
+ logger.info("监控系统已停止")
|
|
|
|
|
|
def main():
|
|
|
"""
|
|
|
@@ -399,17 +439,12 @@ def main():
|
|
|
|
|
|
功能:启动监控系统
|
|
|
"""
|
|
|
- print("CIP监控系统 - 配置文件监控模式")
|
|
|
- print("="*60)
|
|
|
-
|
|
|
monitor = SmartCIPMonitor()
|
|
|
|
|
|
try:
|
|
|
monitor.start()
|
|
|
except Exception as e:
|
|
|
- print(f"系统运行出错: {e}")
|
|
|
- import traceback
|
|
|
- traceback.print_exc()
|
|
|
+ logger.error(f"系统运行出错: {e}", exc_info=True)
|
|
|
finally:
|
|
|
if monitor.running:
|
|
|
monitor.stop()
|