#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
alert_aggregator.py
-------------------
Alert aggregator: cross-device aggregation suppression + per-type cooldown.

Two core features:
1. Cross-device aggregation suppression: if at least M distinct devices of the
   same plant raise alerts within one N-second window, all of those alerts are
   suppressed (treated as environmental noise).
2. Per-type cooldown: a repeat of the same anomaly type on a device is
   suppressed for 24 hours; a different anomaly type on the same device is
   suppressed for 1 hour after that device's most recent pushed alert.
"""
import logging
from datetime import datetime, timedelta
from collections import defaultdict

logger = logging.getLogger(__name__)


class AlertAggregator:
    """Buffer alerts per plant, suppress plant-wide bursts, apply cooldowns.

    Alerts enter through ``submit_alert``. When aggregation is enabled they are
    held in a per-plant window; ``check_and_flush`` must be called periodically
    to close expired windows and deliver (via ``push_callback``) the alerts
    that survive aggregation. Cooldown timestamps are recorded only after a
    successful push.

    NOTE(review): no locking is done here. If ``submit_alert`` and
    ``check_and_flush`` run on different threads, the caller must serialize
    them -- confirm against the caller.
    """

    def __init__(self, push_callback, aggregate_enabled=True, window_seconds=300,
                 min_devices=2, cooldown_same_type_hours=24,
                 cooldown_diff_type_hours=1):
        """
        Args:
            push_callback: Called as ``push_callback(**push_kwargs)`` for each
                alert that is actually delivered.
            aggregate_enabled: If False, alerts that pass the cooldown check
                are pushed immediately, bypassing the aggregation window.
            window_seconds: Length of a plant's aggregation window, measured
                from the alert that opened it.
            min_devices: A window holding at least this many distinct devices
                is treated as environmental noise and fully suppressed.
            cooldown_same_type_hours: Cooldown for a repeat of the same anomaly
                type on one device.
            cooldown_diff_type_hours: Cooldown for a new anomaly type on a
                device, measured from that device's latest pushed alert of
                any type.
        """
        # Alerts that survive aggregation are delivered through this callback.
        self.push_callback = push_callback

        # Cross-device aggregation settings.
        self.aggregate_enabled = aggregate_enabled
        self.window_seconds = window_seconds
        self.min_devices = min_devices

        # Per-type cooldown settings, in hours.
        self.cooldown_same_type_hours = cooldown_same_type_hours
        self.cooldown_diff_type_hours = cooldown_diff_type_hours

        # Open aggregation windows.
        # key: plant_name
        # value: {"start_time": datetime, "alerts": [alert_info_dict, ...]}
        self.pending_windows = {}

        # Cooldown bookkeeping.
        # key: device_code
        # value: {anomaly_type_code: datetime of the last successful push}
        self.cooldown_records = defaultdict(dict)

        logger.info(
            f"报警聚合器已初始化 | "
            f"聚合={'启用' if aggregate_enabled else '禁用'} "
            f"窗口={window_seconds}秒 最小设备数={min_devices} | "
            f"冷却: 同类型={cooldown_same_type_hours}h 不同类型={cooldown_diff_type_hours}h"
        )

    def check_cooldown(self, device_code, anomaly_type_code):
        """Return True if an alert for this device/type is still cooling down.

        - Same type pushed before: suppressed while fewer than
          ``cooldown_same_type_hours`` have elapsed since that push. Once that
          has elapsed the alert is allowed, regardless of other types.
        - New type for this device: suppressed while fewer than
          ``cooldown_diff_type_hours`` have elapsed since the device's most
          recent pushed alert of any type.
        - Device never pushed before: never in cooldown.

        Returns:
            True if the alert should be suppressed, False if it may be sent.
        """
        # .get() instead of [] so lookups do not grow the defaultdict.
        records = self.cooldown_records.get(device_code)
        if not records:
            return False

        now = datetime.now()

        if anomaly_type_code in records:
            elapsed_hours = (now - records[anomaly_type_code]).total_seconds() / 3600
            if elapsed_hours < self.cooldown_same_type_hours:
                remaining = self.cooldown_same_type_hours - elapsed_hours
                logger.info(
                    f"同类型冷却中: {device_code} | "
                    f"类型={anomaly_type_code} | "
                    f"剩余 {remaining:.1f} 小时"
                )
                return True
            return False

        # New type for this device: measure from its latest alert of any type.
        elapsed_hours = (now - max(records.values())).total_seconds() / 3600
        if elapsed_hours < self.cooldown_diff_type_hours:
            remaining = self.cooldown_diff_type_hours - elapsed_hours
            logger.info(
                f"不同类型冷却中: {device_code} | "
                f"新类型={anomaly_type_code} | "
                f"剩余 {remaining:.1f} 小时"
            )
            return True
        return False

    def record_cooldown(self, device_code, anomaly_type_code):
        """Record "now" as the last push time for this device/type pair."""
        self.cooldown_records[device_code][anomaly_type_code] = datetime.now()

    def submit_alert(self, plant_name, device_code, anomaly_type_code, push_kwargs):
        """Submit one alert for cooldown filtering and aggregation.

        Args:
            plant_name: Plant name; alerts are aggregated per plant.
            device_code: Device identifier.
            anomaly_type_code: Anomaly type code (level_two).
            push_kwargs: Keyword arguments forwarded verbatim to
                ``push_callback`` (the arguments of ``_push_detection_result``).
        """
        now = datetime.now()

        # Step 1: per-type cooldown.
        if self.check_cooldown(device_code, anomaly_type_code):
            logger.info(f"报警被冷却抑制: {device_code} | 类型={anomaly_type_code}")
            return

        if not self.aggregate_enabled:
            # Aggregation disabled: deliver immediately.
            self._do_push(device_code, anomaly_type_code, push_kwargs)
            return

        # Step 2: add to the plant's aggregation window, opening one if needed.
        window = self.pending_windows.get(plant_name)
        if window is None:
            window = {"start_time": now, "alerts": []}
            self.pending_windows[plant_name] = window
            logger.info(f"聚合窗口已开启: {plant_name} | 窗口={self.window_seconds}秒")

        # Keep at most one alert per device per window (the first one wins) so
        # the window's length equals its number of distinct devices.
        if any(a["device_code"] == device_code for a in window["alerts"]):
            logger.debug(
                f"窗口内重复设备报警已忽略: {plant_name}/{device_code} | "
                f"类型={anomaly_type_code}"
            )
            return

        window["alerts"].append({
            "device_code": device_code,
            "anomaly_type_code": anomaly_type_code,
            "push_kwargs": push_kwargs,
            "submit_time": now,
        })
        logger.info(
            f"报警已加入聚合窗口: {plant_name}/{device_code} | "
            f"当前窗口内设备数={len(window['alerts'])}"
        )

    def check_and_flush(self):
        """Close expired aggregation windows and deliver the surviving alerts.

        Intended to be called periodically. For each window that has been open
        for at least ``window_seconds``:
          - at least ``min_devices`` devices: every alert is suppressed as
            environmental noise;
          - fewer devices: every alert in the window is pushed.
        """
        now = datetime.now()
        expired_plants = [
            plant_name
            for plant_name, window in self.pending_windows.items()
            if (now - window["start_time"]).total_seconds() >= self.window_seconds
        ]

        for plant_name in expired_plants:
            alerts = self.pending_windows.pop(plant_name)["alerts"]
            device_count = len(alerts)

            if device_count >= self.min_devices:
                # Many devices alerting together -> environmental noise.
                device_names = [a["device_code"] for a in alerts]
                logger.warning(
                    f"聚合抑制: {plant_name} | "
                    f"{device_count}个设备同时报警,判定为环境噪声 | "
                    f"设备: {', '.join(device_names)}"
                )
                continue

            # Below the noise threshold: push every alert, not only the
            # single-device case, so windows with 2..min_devices-1 devices
            # (possible when min_devices > 2) are not silently dropped.
            for alert in alerts:
                self._do_push(
                    alert["device_code"],
                    alert["anomaly_type_code"],
                    alert["push_kwargs"],
                )

    def _do_push(self, device_code, anomaly_type_code, push_kwargs):
        """Invoke ``push_callback`` and start the cooldown only on success.

        Exceptions raised by the callback are logged with traceback and
        swallowed so one failing push does not abort a flush. A failed push
        records no cooldown, so a later submission of the same alert is not
        suppressed.
        """
        try:
            self.push_callback(**push_kwargs)
        except Exception as e:
            logger.exception(f"报警推送失败: {device_code} | {e}")
            return
        self.record_cooldown(device_code, anomaly_type_code)
        logger.info(f"报警已推送: {device_code} | 类型={anomaly_type_code}")