# -*- coding: utf-8 -*- """ energy_baseline.py ------------------ 能量基线自动校准模块 - 基于音频能量判断泵启停状态 功能: 1. 冷启动时自动采集能量数据,计算运行/停机阈值 2. 根据实时 RMS 能量判断泵状态(开机/停机) 3. 通过滑动窗口检测能量趋势变化 与 pump_state_monitor 的区别: - pump_state_monitor: 基于 PLC 查询,准确但需 SCADA 系统 - energy_baseline: 基于音频能量,无需外部系统,适合无 PLC 场景 调用流程: baseline = EnergyBaseline(pump_id="ch8", warmup_samples=30) # 每次处理音频时: rms = np.sqrt(np.mean(y ** 2)) # 计算 RMS 能量 status = baseline.update(rms) # 返回 "开机" / "停机" # 判断是否运行中(决定是否做异常检测) if baseline.is_running(): # 进行异常检测 pass 阈值计算逻辑: - energy_high = 运行时能量下界(高能量分布的第5百分位) - energy_low = energy_high × 30% - 能量 > energy_high -> 运行中 - 能量 < energy_low -> 停机中 注意: - 冷启动约4分钟(30样本×8秒),期间默认返回"开机" - 每500个样本自动重新校准阈值 """ import logging import numpy as np from typing import Dict, List logger = logging.getLogger(__name__) class EnergyBaseline: """ 能量基线自动校准类 功能: 1. 自动采集音频能量数据 2. 自动计算运行/停机阈值 3. 判断泵状态(运行/停机/开机/停机中) 使用方法: baseline = EnergyBaseline(pump_id="ch8") baseline.update(rms_value) # 每次处理音频时更新 status = baseline.get_status() # 获取当前状态 """ # 状态常量 # 校准期间默认为开机状态 STATUS_CALIBRATING = "开机" STATUS_RUNNING = "开机" STATUS_STOPPED = "停机" STATUS_STARTING = "开机" STATUS_STOPPING = "停机" def __init__(self, pump_id: str, warmup_samples: int = 30): """ 初始化能量基线 参数: pump_id: 泵标识 warmup_samples: 冷启动需要的样本数(约30×8s=4分钟) """ self.pump_id = pump_id self.warmup_samples = warmup_samples # 能量历史记录(用于校准) self.energy_history: List[float] = [] # 最近窗口记录(用于趋势判断) self.recent_window: List[float] = [] self.window_size = 5 # 校准状态 self.is_calibrated = False self.energy_high = None # 运行阈值 self.energy_low = None # 停机阈值 # 当前状态 self.current_status = self.STATUS_CALIBRATING def update(self, rms: float) -> str: """ 更新能量并返回当前状态 参数: rms: 当前音频的RMS能量值 返回: status: 当前泵状态 """ # 记录到历史(用于校准) self.energy_history.append(rms) # 记录到最近窗口(用于趋势判断) self.recent_window.append(rms) if len(self.recent_window) > self.window_size: self.recent_window.pop(0) # 冷启动阶段:等待积累足够数据 if not self.is_calibrated: if len(self.energy_history) >= self.warmup_samples: self._calibrate() else: # 显示校准进度 progress = len(self.energy_history) if progress % 5 == 0: # 每5个样本输出一次 logger.info(f"[{self.pump_id}] 能量校准中: {progress}/{self.warmup_samples} | RMS={rms:.4f}") self.current_status = self.STATUS_CALIBRATING return self.current_status else: # 定期重新校准(每500个样本) if len(self.energy_history) > 500: self.energy_history = self.energy_history[-500:] self._calibrate() # 判断状态 self.current_status = self._determine_status() return self.current_status def _calibrate(self): """ 根据历史数据计算阈值 策略: 1. 假设大部分时间泵在运行 2. 取能量分布的高区间作为"运行能量" 3. 计算阈值 """ energies = np.array(self.energy_history) # 过滤掉极低能量(可能是静音或损坏文件) valid_energies = energies[energies > 0.001] if len(valid_energies) < 10: # 数据不足,使用默认值 self.energy_high = 0.05 self.energy_low = 0.02 self.is_calibrated = True logger.info(f"[{self.pump_id}] 数据不足,使用默认阈值: HIGH={self.energy_high:.4f}, LOW={self.energy_low:.4f}") return # 取高能量部分(假设运行时能量较高) # 使用第30百分位以上的数据 threshold = np.percentile(valid_energies, 30) high_energies = valid_energies[valid_energies > threshold] if len(high_energies) > 5: # 运行阈值 = 运行能量分布的第5百分位(下界) self.energy_high = float(np.percentile(high_energies, 5)) # 停机阈值 = 运行阈值的30% self.energy_low = self.energy_high * 0.3 else: # 使用所有数据的中位数 self.energy_high = float(np.median(valid_energies) * 0.7) self.energy_low = self.energy_high * 0.3 self.is_calibrated = True logger.info(f"[{self.pump_id}] 阈值自动校准: HIGH={self.energy_high:.4f}, LOW={self.energy_low:.4f}") def _determine_status(self) -> str: """ 根据能量趋势判断泵状态 返回: status: 泵状态字符串 """ if len(self.recent_window) < 3: return self.STATUS_CALIBRATING # 最近能量和较早能量 recent_avg = np.mean(self.recent_window[-2:]) # 最近2个窗口 older_avg = np.mean(self.recent_window[:2]) # 较早2个窗口 current = self.recent_window[-1] # 判断逻辑 # 1. 高→低:停机过程 if older_avg > self.energy_high and recent_avg < self.energy_low: return self.STATUS_STOPPING # 2. 低→高:启动过程 if older_avg < self.energy_low and recent_avg > self.energy_high: return self.STATUS_STARTING # 3. 稳定高:运行中 if current > self.energy_high: return self.STATUS_RUNNING # 4. 稳定低:停机中 if current < self.energy_low: return self.STATUS_STOPPED # 5. 中间状态:在阈值之间 # 根据与哪个阈值更接近来判断 mid_point = (self.energy_high + self.energy_low) / 2 if current >= mid_point: return self.STATUS_RUNNING else: return self.STATUS_STOPPED def get_status(self) -> str: """ 获取当前泵状态 返回: status: 当前状态 """ return self.current_status def is_running(self) -> bool: """ 判断泵是否在运行 只有"运行中"状态返回True 用于决定是否进行AE异常检测 返回: bool: 是否运行中 """ return self.current_status == self.STATUS_RUNNING def get_calibration_info(self) -> Dict: """ 获取校准信息 返回: dict: 校准状态和阈值信息 """ return { 'is_calibrated': self.is_calibrated, 'energy_high': self.energy_high, 'energy_low': self.energy_low, 'samples_collected': len(self.energy_history), 'current_status': self.current_status }