| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- alert_aggregator.py
- -------------------
- 报警聚合器 - 跨设备聚合抑制 + 分类型冷却
- 两项核心功能:
- 1. 跨设备聚合抑制:同一水厂 N 分钟内 >=M 个设备同时报警 -> 全部抑制(环境噪声)
- 2. 分类型冷却时间:同类型异常 24 小时冷却,不同类型异常 1 小时冷却
- """
- import logging
- from datetime import datetime, timedelta
- from collections import defaultdict
logger = logging.getLogger(__name__)


class AlertAggregator:
    """Alert aggregator: cross-device aggregation suppression plus per-type cooldown.

    Two mechanisms decide whether a submitted alert is actually pushed:

    1. Per-type cooldown (checked when an alert is submitted), per device:
       - a repeat of the same anomaly type is suppressed while less than
         ``cooldown_same_type_hours`` have passed since that type was last
         pushed;
       - any alert is suppressed while less than ``cooldown_diff_type_hours``
         have passed since a *different* anomaly type was last pushed.
    2. Cross-device aggregation (applied when a window is flushed): alerts
       from the same plant are collected for ``window_seconds``. If at least
       ``min_devices`` distinct devices alerted in that window, the whole
       window is treated as environmental noise and dropped; otherwise every
       collected alert is pushed.

    Windows are only evaluated inside :meth:`check_and_flush`, so the owner
    must call it periodically.
    """

    def __init__(self,
                 push_callback,
                 aggregate_enabled=True,
                 window_seconds=300,
                 min_devices=2,
                 cooldown_same_type_hours=24,
                 cooldown_diff_type_hours=1):
        """Configure the aggregator.

        Args:
            push_callback: callable invoked as ``push_callback(**push_kwargs)``
                to actually deliver an alert.
            aggregate_enabled: when False, alerts that pass the cooldown check
                are pushed immediately, without any aggregation window.
            window_seconds: length (seconds) of a per-plant aggregation window.
            min_devices: number of distinct devices in one window at or above
                which all of that window's alerts are suppressed as noise.
            cooldown_same_type_hours: cooldown (hours) for repeats of the same
                anomaly type on one device.
            cooldown_diff_type_hours: cooldown (hours) applied after a
                different anomaly type was pushed for the same device.
        """
        self.push_callback = push_callback
        self.aggregate_enabled = aggregate_enabled
        self.window_seconds = window_seconds
        self.min_devices = min_devices
        self.cooldown_same_type_hours = cooldown_same_type_hours
        self.cooldown_diff_type_hours = cooldown_diff_type_hours
        # Open aggregation windows.
        # key: plant_name
        # value: {"start_time": datetime, "alerts": [alert_info_dict, ...]}
        self.pending_windows = {}
        # Cooldown bookkeeping; written only after a successful push.
        # key: device_code
        # value: {anomaly_type_code: last_push_datetime}
        self.cooldown_records = defaultdict(dict)
        logger.info(
            f"报警聚合器已初始化 | "
            f"聚合={'启用' if aggregate_enabled else '禁用'} "
            f"窗口={window_seconds}秒 最小设备数={min_devices} | "
            f"冷却: 同类型={cooldown_same_type_hours}h 不同类型={cooldown_diff_type_hours}h"
        )

    def check_cooldown(self, device_code, anomaly_type_code):
        """Return True if an alert of this type for this device is in cooldown.

        The same-type cooldown is checked first. Then the different-type
        cooldown is checked against the most recent push of any *other* type.
        The second check applies even if this type has fired before. Without
        that, a type last seen long ago could bypass the short cooldown that
        a never-seen type would be subject to.

        Returns:
            True if the alert should be suppressed, False if it may proceed.
        """
        records = self.cooldown_records.get(device_code)
        if not records:
            # This device has never had a successful push.
            return False

        now = datetime.now()

        last_same = records.get(anomaly_type_code)
        if last_same is not None:
            same_window = timedelta(hours=self.cooldown_same_type_hours)
            elapsed = now - last_same
            if elapsed < same_window:
                remaining = (same_window - elapsed).total_seconds() / 3600
                logger.info(
                    f"同类型冷却中: {device_code} | "
                    f"类型={anomaly_type_code} | "
                    f"剩余 {remaining:.1f} 小时"
                )
                return True

        other_times = [ts for code, ts in records.items() if code != anomaly_type_code]
        if other_times:
            diff_window = timedelta(hours=self.cooldown_diff_type_hours)
            elapsed = now - max(other_times)
            if elapsed < diff_window:
                remaining = (diff_window - elapsed).total_seconds() / 3600
                logger.info(
                    f"不同类型冷却中: {device_code} | "
                    f"新类型={anomaly_type_code} | "
                    f"剩余 {remaining:.1f} 小时"
                )
                return True

        return False

    def record_cooldown(self, device_code, anomaly_type_code):
        """Record 'now' as the last push time of this type for this device."""
        self.cooldown_records[device_code][anomaly_type_code] = datetime.now()

    def submit_alert(self, plant_name, device_code, anomaly_type_code, push_kwargs):
        """Submit one alert. It is dropped (cooldown), pushed now, or queued.

        Args:
            plant_name: plant the device belongs to (aggregation key).
            device_code: device identifier (cooldown key).
            anomaly_type_code: anomaly type code (``level_two``).
            push_kwargs: keyword arguments forwarded to ``push_callback``
                (all keyword arguments of the original
                ``_push_detection_result`` call).
        """
        # Step 1: per-type cooldown.
        if self.check_cooldown(device_code, anomaly_type_code):
            logger.info(f"报警被冷却抑制: {device_code} | 类型={anomaly_type_code}")
            return

        if not self.aggregate_enabled:
            # Aggregation disabled: push immediately.
            self._do_push(device_code, anomaly_type_code, push_kwargs)
            return

        # Step 2: join (or open) this plant's aggregation window.
        now = datetime.now()
        window = self.pending_windows.get(plant_name)
        if window is None:
            window = {"start_time": now, "alerts": []}
            self.pending_windows[plant_name] = window
            logger.info(f"聚合窗口已开启: {plant_name} | 窗口={self.window_seconds}秒")

        # Keep one entry per device per window. The first alert wins, so the
        # window's device count equals its number of entries.
        if any(a["device_code"] == device_code for a in window["alerts"]):
            logger.debug(
                f"设备已在聚合窗口中,忽略重复报警: {plant_name}/{device_code} | "
                f"类型={anomaly_type_code}"
            )
            return

        window["alerts"].append({
            "device_code": device_code,
            "anomaly_type_code": anomaly_type_code,
            "push_kwargs": push_kwargs,
            "submit_time": now,
        })
        logger.info(
            f"报警已加入聚合窗口: {plant_name}/{device_code} | "
            f"当前窗口内设备数={len(window['alerts'])}"
        )

    def check_and_flush(self):
        """Close every expired window and either suppress or push its alerts.

        Must be called periodically. A window expires once ``window_seconds``
        have elapsed since it was opened. If an expired window holds at least
        ``min_devices`` devices, all of its alerts are suppressed as
        environmental noise. Otherwise each of its alerts is pushed.
        """
        now = datetime.now()
        window_length = timedelta(seconds=self.window_seconds)
        # Collect first; windows are popped (mutating the dict) below.
        expired_plants = [
            plant_name
            for plant_name, window in self.pending_windows.items()
            if now - window["start_time"] >= window_length
        ]

        for plant_name in expired_plants:
            window = self.pending_windows.pop(plant_name)
            alerts = window["alerts"]
            device_count = len(alerts)
            if device_count >= self.min_devices:
                # Many devices alerting together -> environmental noise.
                device_names = [a["device_code"] for a in alerts]
                logger.warning(
                    f"聚合抑制: {plant_name} | "
                    f"{device_count}个设备同时报警,判定为环境噪声 | "
                    f"设备: {', '.join(device_names)}"
                )
                continue
            # Below the noise threshold: every collected alert is genuine.
            # (Previously only the single-device case was pushed, silently
            # dropping windows with 2..min_devices-1 devices.)
            for alert in alerts:
                self._do_push(
                    alert["device_code"],
                    alert["anomaly_type_code"],
                    alert["push_kwargs"],
                )

    def _do_push(self, device_code, anomaly_type_code, push_kwargs):
        """Invoke the push callback and, only on success, start the cooldown."""
        try:
            self.push_callback(**push_kwargs)
        except Exception as e:
            # Best-effort delivery: a failed push must not abort the caller
            # (e.g. flushing the remaining windows). No cooldown is recorded
            # for this failed alert.
            logger.exception(f"报警推送失败: {device_code} | {e}")
            return
        self.record_cooldown(device_code, anomaly_type_code)
        logger.info(f"报警已推送: {device_code} | 类型={anomaly_type_code}")
|