energy_baseline.py 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. # -*- coding: utf-8 -*-
  2. """
  3. energy_baseline.py
  4. ------------------
  5. 能量基线自动校准模块 - 基于音频能量判断泵启停状态
  6. 功能:
  7. 1. 冷启动时自动采集能量数据,计算运行/停机阈值
  8. 2. 根据实时 RMS 能量判断泵状态(开机/停机)
  9. 3. 通过滑动窗口检测能量趋势变化
  10. 与 pump_state_monitor 的区别:
  11. - pump_state_monitor: 基于 PLC 查询,准确但需 SCADA 系统
  12. - energy_baseline: 基于音频能量,无需外部系统,适合无 PLC 场景
  13. 调用流程:
  14. baseline = EnergyBaseline(pump_id="ch8", warmup_samples=30)
  15. # 每次处理音频时:
  16. rms = np.sqrt(np.mean(y ** 2)) # 计算 RMS 能量
  17. status = baseline.update(rms) # 返回 "开机" / "停机"
  18. # 判断是否运行中(决定是否做异常检测)
  19. if baseline.is_running():
  20. # 进行异常检测
  21. pass
  22. 阈值计算逻辑:
  23. - energy_high = 运行时能量下界(高能量分布的第5百分位)
  24. - energy_low = energy_high × 30%
  25. - 能量 > energy_high -> 运行中
  26. - 能量 < energy_low -> 停机中
  27. 注意:
  28. - 冷启动约4分钟(30样本×8秒),期间默认返回"开机"
  29. - 每500个样本自动重新校准阈值
  30. """
  31. import logging
  32. import numpy as np
  33. from typing import Dict, List
  34. logger = logging.getLogger(__name__)
  35. class EnergyBaseline:
  36. """
  37. 能量基线自动校准类
  38. 功能:
  39. 1. 自动采集音频能量数据
  40. 2. 自动计算运行/停机阈值
  41. 3. 判断泵状态(运行/停机/开机/停机中)
  42. 使用方法:
  43. baseline = EnergyBaseline(pump_id="ch8")
  44. baseline.update(rms_value) # 每次处理音频时更新
  45. status = baseline.get_status() # 获取当前状态
  46. """
  47. # 状态常量
  48. # 校准期间默认为开机状态
  49. STATUS_CALIBRATING = "开机"
  50. STATUS_RUNNING = "开机"
  51. STATUS_STOPPED = "停机"
  52. STATUS_STARTING = "开机"
  53. STATUS_STOPPING = "停机"
  54. def __init__(self, pump_id: str, warmup_samples: int = 30):
  55. """
  56. 初始化能量基线
  57. 参数:
  58. pump_id: 泵标识
  59. warmup_samples: 冷启动需要的样本数(约30×8s=4分钟)
  60. """
  61. self.pump_id = pump_id
  62. self.warmup_samples = warmup_samples
  63. # 能量历史记录(用于校准)
  64. self.energy_history: List[float] = []
  65. # 最近窗口记录(用于趋势判断)
  66. self.recent_window: List[float] = []
  67. self.window_size = 5
  68. # 校准状态
  69. self.is_calibrated = False
  70. self.energy_high = None # 运行阈值
  71. self.energy_low = None # 停机阈值
  72. # 当前状态
  73. self.current_status = self.STATUS_CALIBRATING
  74. def update(self, rms: float) -> str:
  75. """
  76. 更新能量并返回当前状态
  77. 参数:
  78. rms: 当前音频的RMS能量值
  79. 返回:
  80. status: 当前泵状态
  81. """
  82. # 记录到历史(用于校准)
  83. self.energy_history.append(rms)
  84. # 记录到最近窗口(用于趋势判断)
  85. self.recent_window.append(rms)
  86. if len(self.recent_window) > self.window_size:
  87. self.recent_window.pop(0)
  88. # 冷启动阶段:等待积累足够数据
  89. if not self.is_calibrated:
  90. if len(self.energy_history) >= self.warmup_samples:
  91. self._calibrate()
  92. else:
  93. # 显示校准进度
  94. progress = len(self.energy_history)
  95. if progress % 5 == 0: # 每5个样本输出一次
  96. logger.info(f"[{self.pump_id}] 能量校准中: {progress}/{self.warmup_samples} | RMS={rms:.4f}")
  97. self.current_status = self.STATUS_CALIBRATING
  98. return self.current_status
  99. else:
  100. # 定期重新校准(每500个样本)
  101. if len(self.energy_history) > 500:
  102. self.energy_history = self.energy_history[-500:]
  103. self._calibrate()
  104. # 判断状态
  105. self.current_status = self._determine_status()
  106. return self.current_status
  107. def _calibrate(self):
  108. """
  109. 根据历史数据计算阈值
  110. 策略:
  111. 1. 假设大部分时间泵在运行
  112. 2. 取能量分布的高区间作为"运行能量"
  113. 3. 计算阈值
  114. """
  115. energies = np.array(self.energy_history)
  116. # 过滤掉极低能量(可能是静音或损坏文件)
  117. valid_energies = energies[energies > 0.001]
  118. if len(valid_energies) < 10:
  119. # 数据不足,使用默认值
  120. self.energy_high = 0.05
  121. self.energy_low = 0.02
  122. self.is_calibrated = True
  123. logger.info(f"[{self.pump_id}] 数据不足,使用默认阈值: HIGH={self.energy_high:.4f}, LOW={self.energy_low:.4f}")
  124. return
  125. # 取高能量部分(假设运行时能量较高)
  126. # 使用第30百分位以上的数据
  127. threshold = np.percentile(valid_energies, 30)
  128. high_energies = valid_energies[valid_energies > threshold]
  129. if len(high_energies) > 5:
  130. # 运行阈值 = 运行能量分布的第5百分位(下界)
  131. self.energy_high = float(np.percentile(high_energies, 5))
  132. # 停机阈值 = 运行阈值的30%
  133. self.energy_low = self.energy_high * 0.3
  134. else:
  135. # 使用所有数据的中位数
  136. self.energy_high = float(np.median(valid_energies) * 0.7)
  137. self.energy_low = self.energy_high * 0.3
  138. self.is_calibrated = True
  139. logger.info(f"[{self.pump_id}] 阈值自动校准: HIGH={self.energy_high:.4f}, LOW={self.energy_low:.4f}")
  140. def _determine_status(self) -> str:
  141. """
  142. 根据能量趋势判断泵状态
  143. 返回:
  144. status: 泵状态字符串
  145. """
  146. if len(self.recent_window) < 3:
  147. return self.STATUS_CALIBRATING
  148. # 最近能量和较早能量
  149. recent_avg = np.mean(self.recent_window[-2:]) # 最近2个窗口
  150. older_avg = np.mean(self.recent_window[:2]) # 较早2个窗口
  151. current = self.recent_window[-1]
  152. # 判断逻辑
  153. # 1. 高→低:停机过程
  154. if older_avg > self.energy_high and recent_avg < self.energy_low:
  155. return self.STATUS_STOPPING
  156. # 2. 低→高:启动过程
  157. if older_avg < self.energy_low and recent_avg > self.energy_high:
  158. return self.STATUS_STARTING
  159. # 3. 稳定高:运行中
  160. if current > self.energy_high:
  161. return self.STATUS_RUNNING
  162. # 4. 稳定低:停机中
  163. if current < self.energy_low:
  164. return self.STATUS_STOPPED
  165. # 5. 中间状态:在阈值之间
  166. # 根据与哪个阈值更接近来判断
  167. mid_point = (self.energy_high + self.energy_low) / 2
  168. if current >= mid_point:
  169. return self.STATUS_RUNNING
  170. else:
  171. return self.STATUS_STOPPED
  172. def get_status(self) -> str:
  173. """
  174. 获取当前泵状态
  175. 返回:
  176. status: 当前状态
  177. """
  178. return self.current_status
  179. def is_running(self) -> bool:
  180. """
  181. 判断泵是否在运行
  182. 只有"运行中"状态返回True
  183. 用于决定是否进行AE异常检测
  184. 返回:
  185. bool: 是否运行中
  186. """
  187. return self.current_status == self.STATUS_RUNNING
  188. def get_calibration_info(self) -> Dict:
  189. """
  190. 获取校准信息
  191. 返回:
  192. dict: 校准状态和阈值信息
  193. """
  194. return {
  195. 'is_calibrated': self.is_calibrated,
  196. 'energy_high': self.energy_high,
  197. 'energy_low': self.energy_low,
  198. 'samples_collected': len(self.energy_history),
  199. 'current_status': self.current_status
  200. }