alert_aggregator.py 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. alert_aggregator.py
  5. -------------------
  6. 报警聚合器 - 跨设备聚合抑制 + 分类型冷却
  7. 两项核心功能:
  8. 1. 跨设备聚合抑制:同一水厂 N 分钟内 >=M 个设备同时报警 -> 全部抑制(环境噪声)
  9. 2. 分类型冷却时间:同类型异常 24 小时冷却,不同类型异常 1 小时冷却
  10. """
  11. import logging
  12. from datetime import datetime, timedelta
  13. from collections import defaultdict
  14. logger = logging.getLogger(__name__)
  15. class AlertAggregator:
  16. def __init__(self,
  17. push_callback,
  18. aggregate_enabled=True,
  19. window_seconds=300,
  20. min_devices=2,
  21. cooldown_same_type_hours=24,
  22. cooldown_diff_type_hours=1):
  23. # 聚合窗口到期后,符合条件的报警通过此回调实际推送
  24. self.push_callback = push_callback
  25. # 跨设备聚合配置
  26. self.aggregate_enabled = aggregate_enabled
  27. # 聚合窗口时长(秒),该时间段内收集同一水厂的所有报警
  28. self.window_seconds = window_seconds
  29. # 触发抑制的最小设备数,>=此数量则判定为环境噪声
  30. self.min_devices = min_devices
  31. # 分类型冷却配置
  32. # 同一设备同类型异常的冷却时间(小时),防止重复报警
  33. self.cooldown_same_type_hours = cooldown_same_type_hours
  34. # 同一设备不同类型异常的冷却时间(小时),允许较快报警新类型
  35. self.cooldown_diff_type_hours = cooldown_diff_type_hours
  36. # 聚合窗口状态
  37. # key: plant_name
  38. # value: {"start_time": datetime, "alerts": [alert_info_dict, ...]}
  39. self.pending_windows = {}
  40. # 冷却记录
  41. # key: device_code
  42. # value: {anomaly_type_code: last_alert_datetime}
  43. self.cooldown_records = defaultdict(dict)
  44. logger.info(
  45. f"报警聚合器已初始化 | "
  46. f"聚合={'启用' if aggregate_enabled else '禁用'} "
  47. f"窗口={window_seconds}秒 最小设备数={min_devices} | "
  48. f"冷却: 同类型={cooldown_same_type_hours}h 不同类型={cooldown_diff_type_hours}h"
  49. )
  50. def check_cooldown(self, device_code, anomaly_type_code):
  51. # 检查该设备是否在冷却期内
  52. # 返回 True 表示冷却中(应抑制),False 表示可以报警
  53. records = self.cooldown_records.get(device_code, {})
  54. now = datetime.now()
  55. if not records:
  56. # 该设备从未报过警,不在冷却期
  57. return False
  58. if anomaly_type_code in records:
  59. # 同类型异常:检查 24 小时冷却
  60. last_time = records[anomaly_type_code]
  61. elapsed_hours = (now - last_time).total_seconds() / 3600
  62. if elapsed_hours < self.cooldown_same_type_hours:
  63. remaining = self.cooldown_same_type_hours - elapsed_hours
  64. logger.info(
  65. f"同类型冷却中: {device_code} | "
  66. f"类型={anomaly_type_code} | "
  67. f"剩余 {remaining:.1f} 小时"
  68. )
  69. return True
  70. else:
  71. # 不同类型异常:检查最近一次任意类型报警的 1 小时冷却
  72. # 取所有类型中最近的一次报警时间
  73. last_any = max(records.values()) if records else None
  74. if last_any:
  75. elapsed_hours = (now - last_any).total_seconds() / 3600
  76. if elapsed_hours < self.cooldown_diff_type_hours:
  77. remaining = self.cooldown_diff_type_hours - elapsed_hours
  78. logger.info(
  79. f"不同类型冷却中: {device_code} | "
  80. f"新类型={anomaly_type_code} | "
  81. f"剩余 {remaining:.1f} 小时"
  82. )
  83. return True
  84. return False
  85. def record_cooldown(self, device_code, anomaly_type_code):
  86. # 记录报警时间,用于后续冷却判断
  87. self.cooldown_records[device_code][anomaly_type_code] = datetime.now()
  88. def submit_alert(self, plant_name, device_code, anomaly_type_code, push_kwargs):
  89. # 提交一条报警到聚合队列
  90. #
  91. # 参数:
  92. # plant_name: 水厂名称(聚合维度)
  93. # device_code: 设备编号
  94. # anomaly_type_code: 异常类型编码(level_two)
  95. # push_kwargs: 原 _push_detection_result 的全部关键字参数
  96. now = datetime.now()
  97. # 第一步:检查分类型冷却
  98. if self.check_cooldown(device_code, anomaly_type_code):
  99. logger.info(f"报警被冷却抑制: {device_code} | 类型={anomaly_type_code}")
  100. return
  101. if not self.aggregate_enabled:
  102. # 聚合禁用时直接推送
  103. self._do_push(device_code, anomaly_type_code, push_kwargs)
  104. return
  105. # 第二步:加入聚合窗口
  106. if plant_name not in self.pending_windows:
  107. # 该水厂没有活跃窗口,创建新窗口
  108. self.pending_windows[plant_name] = {
  109. "start_time": now,
  110. "alerts": []
  111. }
  112. logger.info(f"聚合窗口已开启: {plant_name} | 窗口={self.window_seconds}秒")
  113. # 加入队列(去重:同一设备在同一窗口内只保留一次)
  114. window = self.pending_windows[plant_name]
  115. existing_devices = {a["device_code"] for a in window["alerts"]}
  116. if device_code not in existing_devices:
  117. window["alerts"].append({
  118. "device_code": device_code,
  119. "anomaly_type_code": anomaly_type_code,
  120. "push_kwargs": push_kwargs,
  121. "submit_time": now
  122. })
  123. logger.info(
  124. f"报警已加入聚合窗口: {plant_name}/{device_code} | "
  125. f"当前窗口内设备数={len(window['alerts'])}"
  126. )
  127. def check_and_flush(self):
  128. # 定期调用,检查所有聚合窗口是否到期,到期后执行聚合判定
  129. now = datetime.now()
  130. expired_plants = []
  131. for plant_name, window in self.pending_windows.items():
  132. elapsed = (now - window["start_time"]).total_seconds()
  133. if elapsed >= self.window_seconds:
  134. expired_plants.append(plant_name)
  135. for plant_name in expired_plants:
  136. window = self.pending_windows.pop(plant_name)
  137. alerts = window["alerts"]
  138. device_count = len(alerts)
  139. if device_count >= self.min_devices:
  140. # 多设备同时报警 -> 判定为环境噪声,全部抑制
  141. device_names = [a["device_code"] for a in alerts]
  142. logger.warning(
  143. f"聚合抑制: {plant_name} | "
  144. f"{device_count}个设备同时报警,判定为环境噪声 | "
  145. f"设备: {', '.join(device_names)}"
  146. )
  147. elif device_count == 1:
  148. # 仅单个设备报警 -> 正常推送
  149. alert = alerts[0]
  150. self._do_push(
  151. alert["device_code"],
  152. alert["anomaly_type_code"],
  153. alert["push_kwargs"]
  154. )
  155. # device_count == 0 理论上不会出现,忽略
  156. def _do_push(self, device_code, anomaly_type_code, push_kwargs):
  157. # 实际执行推送,并记录冷却时间
  158. try:
  159. self.push_callback(**push_kwargs)
  160. # 推送成功后记录冷却
  161. self.record_cooldown(device_code, anomaly_type_code)
  162. logger.info(f"报警已推送: {device_code} | 类型={anomaly_type_code}")
  163. except Exception as e:
  164. logger.error(f"报警推送失败: {device_code} | {e}")