alert_aggregator.py 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
"""
alert_aggregator.py
-------------------
报警聚合器 - 跨设备聚合抑制 + 分类型冷却
核心功能:
1. 跨设备聚合抑制:同一水厂在一个聚合窗口(默认 300 秒)内有 >=M 个(默认 2 个)不同设备报警 -> 全部抑制(判定为环境噪声)
2. 分类型冷却时间:同一设备同类型异常 24 小时冷却;同一设备首次出现的新类型异常,需距该设备上次报警满 1 小时(均为默认值,可配置)
3. 调试模式感知:mode_id=4 时使用短冷却并跳过聚合窗口,方便现场测试
"""
  12. import logging
  13. from datetime import datetime, timedelta
  14. from collections import defaultdict
  15. logger = logging.getLogger(__name__)
  16. class AlertAggregator:
  17. def __init__(self,
  18. push_callback,
  19. aggregate_enabled=True,
  20. window_seconds=300,
  21. min_devices=2,
  22. cooldown_same_type_hours=24,
  23. cooldown_diff_type_hours=1,
  24. mode_provider=None,
  25. debug_cooldown_minutes=5):
  26. # 聚合窗口到期后,符合条件的报警通过此回调实际推送
  27. self.push_callback = push_callback
  28. # 跨设备聚合配置
  29. self.aggregate_enabled = aggregate_enabled
  30. # 聚合窗口时长(秒),该时间段内收集同一水厂的所有报警
  31. self.window_seconds = window_seconds
  32. # 触发抑制的最小设备数,>=此数量则判定为环境噪声
  33. self.min_devices = min_devices
  34. # 分类型冷却配置
  35. # 同一设备同类型异常的冷却时间(小时),防止重复报警
  36. self.cooldown_same_type_hours = cooldown_same_type_hours
  37. # 同一设备不同类型异常的冷却时间(小时),允许较快报警新类型
  38. self.cooldown_diff_type_hours = cooldown_diff_type_hours
  39. # 项目模式感知:回调返回当前 mode_id(1=日常, 2=参观, 3=检修, 4=调试)
  40. # mode_id=4 时使用短冷却并跳过聚合窗口
  41. self._mode_provider = mode_provider
  42. # 调试模式下同类型异常的冷却时间(分钟),远小于正常的24h
  43. self._debug_cooldown_minutes = debug_cooldown_minutes
  44. # 聚合窗口状态
  45. # key: plant_name
  46. # value: {"start_time": datetime, "alerts": [alert_info_dict, ...]}
  47. self.pending_windows = {}
  48. # 冷却记录
  49. # key: device_code
  50. # value: {anomaly_type_code: last_alert_datetime}
  51. self.cooldown_records = defaultdict(dict)
  52. logger.info(
  53. f"报警聚合器已初始化 | "
  54. f"聚合={'启用' if aggregate_enabled else '禁用'} "
  55. f"窗口={window_seconds}秒 最小设备数={min_devices} | "
  56. f"冷却: 同类型={cooldown_same_type_hours}h 不同类型={cooldown_diff_type_hours}h | "
  57. f"调试冷却={debug_cooldown_minutes}分钟"
  58. )
  59. def _is_debug_mode(self):
  60. # 判断当前是否为调试模式(mode_id=4)
  61. if self._mode_provider is None:
  62. return False
  63. try:
  64. return self._mode_provider() == 4
  65. except Exception:
  66. return False
  67. def check_cooldown(self, device_code, anomaly_type_code):
  68. # 检查该设备是否在冷却期内
  69. # 返回 True 表示冷却中(应抑制),False 表示可以报警
  70. records = self.cooldown_records.get(device_code, {})
  71. now = datetime.now()
  72. if not records:
  73. # 该设备从未报过警,不在冷却期
  74. return False
  75. # 调试模式使用分钟级短冷却,加速现场测试迭代
  76. is_debug = self._is_debug_mode()
  77. if anomaly_type_code in records:
  78. last_time = records[anomaly_type_code]
  79. elapsed_seconds = (now - last_time).total_seconds()
  80. if is_debug:
  81. # 调试模式:同类型异常使用分钟级冷却
  82. cooldown_seconds = self._debug_cooldown_minutes * 60
  83. if elapsed_seconds < cooldown_seconds:
  84. remaining_min = (cooldown_seconds - elapsed_seconds) / 60
  85. logger.info(
  86. f"[调试模式] 同类型冷却中: {device_code} | "
  87. f"类型={anomaly_type_code} | "
  88. f"剩余 {remaining_min:.1f} 分钟"
  89. )
  90. return True
  91. else:
  92. # 正常模式:同类型异常 24 小时冷却
  93. elapsed_hours = elapsed_seconds / 3600
  94. if elapsed_hours < self.cooldown_same_type_hours:
  95. remaining = self.cooldown_same_type_hours - elapsed_hours
  96. logger.info(
  97. f"同类型冷却中: {device_code} | "
  98. f"类型={anomaly_type_code} | "
  99. f"剩余 {remaining:.1f} 小时"
  100. )
  101. return True
  102. else:
  103. if is_debug:
  104. # 调试模式:不同类型异常零冷却,切换场景时立即可报
  105. pass
  106. else:
  107. # 正常模式:不同类型异常 1 小时冷却
  108. last_any = max(records.values()) if records else None
  109. if last_any:
  110. elapsed_hours = (now - last_any).total_seconds() / 3600
  111. if elapsed_hours < self.cooldown_diff_type_hours:
  112. remaining = self.cooldown_diff_type_hours - elapsed_hours
  113. logger.info(
  114. f"不同类型冷却中: {device_code} | "
  115. f"新类型={anomaly_type_code} | "
  116. f"剩余 {remaining:.1f} 小时"
  117. )
  118. return True
  119. return False
  120. def record_cooldown(self, device_code, anomaly_type_code):
  121. # 记录报警时间,用于后续冷却判断
  122. self.cooldown_records[device_code][anomaly_type_code] = datetime.now()
  123. def submit_alert(self, plant_name, device_code, anomaly_type_code, push_kwargs):
  124. # 提交一条报警到聚合队列
  125. #
  126. # 参数:
  127. # plant_name: 水厂名称(聚合维度)
  128. # device_code: 设备编号
  129. # anomaly_type_code: 异常类型编码(level_two)
  130. # push_kwargs: 原 _push_detection_result 的全部关键字参数
  131. now = datetime.now()
  132. # 第一步:检查分类型冷却
  133. if self.check_cooldown(device_code, anomaly_type_code):
  134. logger.info(f"报警被冷却抑制: {device_code} | 类型={anomaly_type_code}")
  135. return
  136. # 调试模式(mode_id=4):跳过聚合窗口,单次异常立即推送
  137. # 调试目的是验证报警链路,不需要等待多设备比对
  138. if self._is_debug_mode():
  139. logger.info(f"[调试模式] 直接推送: {device_code} | 类型={anomaly_type_code}")
  140. self._do_push(device_code, anomaly_type_code, push_kwargs)
  141. return
  142. if not self.aggregate_enabled:
  143. # 聚合禁用时直接推送
  144. self._do_push(device_code, anomaly_type_code, push_kwargs)
  145. return
  146. # 第二步:加入聚合窗口
  147. if plant_name not in self.pending_windows:
  148. # 该水厂没有活跃窗口,创建新窗口
  149. self.pending_windows[plant_name] = {
  150. "start_time": now,
  151. "alerts": []
  152. }
  153. logger.info(f"聚合窗口已开启: {plant_name} | 窗口={self.window_seconds}秒")
  154. # 加入队列(去重:同一设备在同一窗口内只保留一次)
  155. window = self.pending_windows[plant_name]
  156. existing_devices = {a["device_code"] for a in window["alerts"]}
  157. if device_code not in existing_devices:
  158. window["alerts"].append({
  159. "device_code": device_code,
  160. "anomaly_type_code": anomaly_type_code,
  161. "push_kwargs": push_kwargs,
  162. "submit_time": now
  163. })
  164. logger.info(
  165. f"报警已加入聚合窗口: {plant_name}/{device_code} | "
  166. f"当前窗口内设备数={len(window['alerts'])}"
  167. )
  168. def check_and_flush(self):
  169. # 定期调用,检查所有聚合窗口是否到期,到期后执行聚合判定
  170. now = datetime.now()
  171. expired_plants = []
  172. for plant_name, window in self.pending_windows.items():
  173. elapsed = (now - window["start_time"]).total_seconds()
  174. if elapsed >= self.window_seconds:
  175. expired_plants.append(plant_name)
  176. for plant_name in expired_plants:
  177. window = self.pending_windows.pop(plant_name)
  178. alerts = window["alerts"]
  179. device_count = len(alerts)
  180. if device_count >= self.min_devices:
  181. # 多设备同时报警 -> 判定为环境噪声,全部抑制
  182. device_names = [a["device_code"] for a in alerts]
  183. logger.warning(
  184. f"聚合抑制: {plant_name} | "
  185. f"{device_count}个设备同时报警,判定为环境噪声 | "
  186. f"设备: {', '.join(device_names)}"
  187. )
  188. else:
  189. # 设备数不足 min_devices -> 逐个推送(正常报警)
  190. for alert in alerts:
  191. self._do_push(
  192. alert["device_code"],
  193. alert["anomaly_type_code"],
  194. alert["push_kwargs"]
  195. )
  196. def _do_push(self, device_code, anomaly_type_code, push_kwargs):
  197. # 实际执行推送,并记录冷却时间
  198. try:
  199. self.push_callback(**push_kwargs)
  200. # 推送成功后记录冷却
  201. self.record_cooldown(device_code, anomaly_type_code)
  202. logger.info(f"报警已推送: {device_code} | 类型={anomaly_type_code}")
  203. except Exception as e:
  204. logger.error(f"报警推送失败: {device_code} | {e}")