#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ alert_aggregator.py ------------------- 报警聚合器 - 跨设备聚合抑制 + 分类型冷却 核心功能: 1. 跨设备聚合抑制:同一水厂 N 分钟内 >=M 个设备同时报警 -> 全部抑制(环境噪声) 2. 分类型冷却时间:同类型异常 24 小时冷却,不同类型异常 1 小时冷却 3. 调试模式感知:mode_id=4 时使用短冷却并跳过聚合窗口,方便现场测试 """ import logging from datetime import datetime, timedelta from collections import defaultdict logger = logging.getLogger(__name__) class AlertAggregator: def __init__(self, push_callback, aggregate_enabled=True, window_seconds=300, min_devices=2, cooldown_same_type_hours=24, cooldown_diff_type_hours=1, mode_provider=None, debug_cooldown_minutes=5): # 聚合窗口到期后,符合条件的报警通过此回调实际推送 self.push_callback = push_callback # 跨设备聚合配置 self.aggregate_enabled = aggregate_enabled # 聚合窗口时长(秒),该时间段内收集同一水厂的所有报警 self.window_seconds = window_seconds # 触发抑制的最小设备数,>=此数量则判定为环境噪声 self.min_devices = min_devices # 分类型冷却配置 # 同一设备同类型异常的冷却时间(小时),防止重复报警 self.cooldown_same_type_hours = cooldown_same_type_hours # 同一设备不同类型异常的冷却时间(小时),允许较快报警新类型 self.cooldown_diff_type_hours = cooldown_diff_type_hours # 项目模式感知:回调返回当前 mode_id(1=日常, 2=参观, 3=检修, 4=调试) # mode_id=4 时使用短冷却并跳过聚合窗口 self._mode_provider = mode_provider # 调试模式下同类型异常的冷却时间(分钟),远小于正常的24h self._debug_cooldown_minutes = debug_cooldown_minutes # 聚合窗口状态 # key: plant_name # value: {"start_time": datetime, "alerts": [alert_info_dict, ...]} self.pending_windows = {} # 冷却记录 # key: device_code # value: {anomaly_type_code: last_alert_datetime} self.cooldown_records = defaultdict(dict) logger.info( f"报警聚合器已初始化 | " f"聚合={'启用' if aggregate_enabled else '禁用'} " f"窗口={window_seconds}秒 最小设备数={min_devices} | " f"冷却: 同类型={cooldown_same_type_hours}h 不同类型={cooldown_diff_type_hours}h | " f"调试冷却={debug_cooldown_minutes}分钟" ) def _is_debug_mode(self): # 判断当前是否为调试模式(mode_id=4) if self._mode_provider is None: return False try: return self._mode_provider() == 4 except Exception: return False def check_cooldown(self, device_code, anomaly_type_code): # 检查该设备是否在冷却期内 # 返回 True 表示冷却中(应抑制),False 表示可以报警 records = self.cooldown_records.get(device_code, {}) now = datetime.now() if not records: # 该设备从未报过警,不在冷却期 return False # 调试模式使用分钟级短冷却,加速现场测试迭代 is_debug = self._is_debug_mode() if anomaly_type_code in records: last_time = records[anomaly_type_code] elapsed_seconds = (now - last_time).total_seconds() if is_debug: # 调试模式:同类型异常使用分钟级冷却 cooldown_seconds = self._debug_cooldown_minutes * 60 if elapsed_seconds < cooldown_seconds: remaining_min = (cooldown_seconds - elapsed_seconds) / 60 logger.info( f"[调试模式] 同类型冷却中: {device_code} | " f"类型={anomaly_type_code} | " f"剩余 {remaining_min:.1f} 分钟" ) return True else: # 正常模式:同类型异常 24 小时冷却 elapsed_hours = elapsed_seconds / 3600 if elapsed_hours < self.cooldown_same_type_hours: remaining = self.cooldown_same_type_hours - elapsed_hours logger.info( f"同类型冷却中: {device_code} | " f"类型={anomaly_type_code} | " f"剩余 {remaining:.1f} 小时" ) return True else: if is_debug: # 调试模式:不同类型异常零冷却,切换场景时立即可报 pass else: # 正常模式:不同类型异常 1 小时冷却 last_any = max(records.values()) if records else None if last_any: elapsed_hours = (now - last_any).total_seconds() / 3600 if elapsed_hours < self.cooldown_diff_type_hours: remaining = self.cooldown_diff_type_hours - elapsed_hours logger.info( f"不同类型冷却中: {device_code} | " f"新类型={anomaly_type_code} | " f"剩余 {remaining:.1f} 小时" ) return True return False def record_cooldown(self, device_code, anomaly_type_code): # 记录报警时间,用于后续冷却判断 self.cooldown_records[device_code][anomaly_type_code] = datetime.now() def submit_alert(self, plant_name, device_code, anomaly_type_code, push_kwargs): # 提交一条报警到聚合队列 # # 参数: # plant_name: 水厂名称(聚合维度) # device_code: 设备编号 # anomaly_type_code: 异常类型编码(level_two) # push_kwargs: 原 _push_detection_result 的全部关键字参数 now = datetime.now() # 第一步:检查分类型冷却 if self.check_cooldown(device_code, anomaly_type_code): logger.info(f"报警被冷却抑制: {device_code} | 类型={anomaly_type_code}") return # 调试模式(mode_id=4):跳过聚合窗口,单次异常立即推送 # 调试目的是验证报警链路,不需要等待多设备比对 if self._is_debug_mode(): logger.info(f"[调试模式] 直接推送: {device_code} | 类型={anomaly_type_code}") self._do_push(device_code, anomaly_type_code, push_kwargs) return if not self.aggregate_enabled: # 聚合禁用时直接推送 self._do_push(device_code, anomaly_type_code, push_kwargs) return # 第二步:加入聚合窗口 if plant_name not in self.pending_windows: # 该水厂没有活跃窗口,创建新窗口 self.pending_windows[plant_name] = { "start_time": now, "alerts": [] } logger.info(f"聚合窗口已开启: {plant_name} | 窗口={self.window_seconds}秒") # 加入队列(去重:同一设备在同一窗口内只保留一次) window = self.pending_windows[plant_name] existing_devices = {a["device_code"] for a in window["alerts"]} if device_code not in existing_devices: window["alerts"].append({ "device_code": device_code, "anomaly_type_code": anomaly_type_code, "push_kwargs": push_kwargs, "submit_time": now }) logger.info( f"报警已加入聚合窗口: {plant_name}/{device_code} | " f"当前窗口内设备数={len(window['alerts'])}" ) def check_and_flush(self): # 定期调用,检查所有聚合窗口是否到期,到期后执行聚合判定 now = datetime.now() expired_plants = [] for plant_name, window in self.pending_windows.items(): elapsed = (now - window["start_time"]).total_seconds() if elapsed >= self.window_seconds: expired_plants.append(plant_name) for plant_name in expired_plants: window = self.pending_windows.pop(plant_name) alerts = window["alerts"] device_count = len(alerts) if device_count >= self.min_devices: # 多设备同时报警 -> 判定为环境噪声,全部抑制 device_names = [a["device_code"] for a in alerts] logger.warning( f"聚合抑制: {plant_name} | " f"{device_count}个设备同时报警,判定为环境噪声 | " f"设备: {', '.join(device_names)}" ) else: # 设备数不足 min_devices -> 逐个推送(正常报警) for alert in alerts: self._do_push( alert["device_code"], alert["anomaly_type_code"], alert["push_kwargs"] ) def _do_push(self, device_code, anomaly_type_code, push_kwargs): # 实际执行推送,并记录冷却时间 try: self.push_callback(**push_kwargs) # 推送成功后记录冷却 self.record_cooldown(device_code, anomaly_type_code) logger.info(f"报警已推送: {device_code} | 类型={anomaly_type_code}") except Exception as e: logger.error(f"报警推送失败: {device_code} | {e}")