| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248 |
- # -*- coding: utf-8 -*-
- """
- energy_baseline.py
- ------------------
- 能量基线自动校准模块 - 基于音频能量判断泵启停状态
- 功能:
- 1. 冷启动时自动采集能量数据,计算运行/停机阈值
- 2. 根据实时 RMS 能量判断泵状态(开机/停机)
- 3. 通过滑动窗口检测能量趋势变化
- 与 pump_state_monitor 的区别:
- - pump_state_monitor: 基于 PLC 查询,准确但需 SCADA 系统
- - energy_baseline: 基于音频能量,无需外部系统,适合无 PLC 场景
- 调用流程:
- baseline = EnergyBaseline(pump_id="ch8", warmup_samples=30)
-
- # 每次处理音频时:
- rms = np.sqrt(np.mean(y ** 2)) # 计算 RMS 能量
- status = baseline.update(rms) # 返回 "开机" / "停机"
-
- # 判断是否运行中(决定是否做异常检测)
- if baseline.is_running():
- # 进行异常检测
- pass
- 阈值计算逻辑:
- - energy_high = 运行时能量下界(高能量分布的第5百分位)
- - energy_low = energy_high × 30%
- - 能量 > energy_high -> 运行中
- - 能量 < energy_low -> 停机中
- 注意:
- - 冷启动约4分钟(30样本×8秒),期间默认返回"开机"
- - 每500个样本自动重新校准阈值
- """
- import logging
- import numpy as np
- from typing import Dict, List
- logger = logging.getLogger(__name__)
- class EnergyBaseline:
- """
- 能量基线自动校准类
-
- 功能:
- 1. 自动采集音频能量数据
- 2. 自动计算运行/停机阈值
- 3. 判断泵状态(运行/停机/开机/停机中)
-
- 使用方法:
- baseline = EnergyBaseline(pump_id="ch8")
- baseline.update(rms_value) # 每次处理音频时更新
- status = baseline.get_status() # 获取当前状态
- """
-
- # 状态常量
- # 校准期间默认为开机状态
- STATUS_CALIBRATING = "开机"
- STATUS_RUNNING = "开机"
- STATUS_STOPPED = "停机"
- STATUS_STARTING = "开机"
- STATUS_STOPPING = "停机"
-
- def __init__(self, pump_id: str, warmup_samples: int = 30):
- """
- 初始化能量基线
-
- 参数:
- pump_id: 泵标识
- warmup_samples: 冷启动需要的样本数(约30×8s=4分钟)
- """
- self.pump_id = pump_id
- self.warmup_samples = warmup_samples
-
- # 能量历史记录(用于校准)
- self.energy_history: List[float] = []
-
- # 最近窗口记录(用于趋势判断)
- self.recent_window: List[float] = []
- self.window_size = 5
-
- # 校准状态
- self.is_calibrated = False
- self.energy_high = None # 运行阈值
- self.energy_low = None # 停机阈值
-
- # 当前状态
- self.current_status = self.STATUS_CALIBRATING
-
- def update(self, rms: float) -> str:
- """
- 更新能量并返回当前状态
-
- 参数:
- rms: 当前音频的RMS能量值
-
- 返回:
- status: 当前泵状态
- """
- # 记录到历史(用于校准)
- self.energy_history.append(rms)
-
- # 记录到最近窗口(用于趋势判断)
- self.recent_window.append(rms)
- if len(self.recent_window) > self.window_size:
- self.recent_window.pop(0)
-
- # 冷启动阶段:等待积累足够数据
- if not self.is_calibrated:
- if len(self.energy_history) >= self.warmup_samples:
- self._calibrate()
- else:
- # 显示校准进度
- progress = len(self.energy_history)
- if progress % 5 == 0: # 每5个样本输出一次
- logger.info(f"[{self.pump_id}] 能量校准中: {progress}/{self.warmup_samples} | RMS={rms:.4f}")
- self.current_status = self.STATUS_CALIBRATING
- return self.current_status
- else:
- # 定期重新校准(每500个样本)
- if len(self.energy_history) > 500:
- self.energy_history = self.energy_history[-500:]
- self._calibrate()
-
- # 判断状态
- self.current_status = self._determine_status()
- return self.current_status
-
- def _calibrate(self):
- """
- 根据历史数据计算阈值
-
- 策略:
- 1. 假设大部分时间泵在运行
- 2. 取能量分布的高区间作为"运行能量"
- 3. 计算阈值
- """
- energies = np.array(self.energy_history)
-
- # 过滤掉极低能量(可能是静音或损坏文件)
- valid_energies = energies[energies > 0.001]
-
- if len(valid_energies) < 10:
- # 数据不足,使用默认值
- self.energy_high = 0.05
- self.energy_low = 0.02
- self.is_calibrated = True
- logger.info(f"[{self.pump_id}] 数据不足,使用默认阈值: HIGH={self.energy_high:.4f}, LOW={self.energy_low:.4f}")
- return
-
- # 取高能量部分(假设运行时能量较高)
- # 使用第30百分位以上的数据
- threshold = np.percentile(valid_energies, 30)
- high_energies = valid_energies[valid_energies > threshold]
-
- if len(high_energies) > 5:
- # 运行阈值 = 运行能量分布的第5百分位(下界)
- self.energy_high = float(np.percentile(high_energies, 5))
- # 停机阈值 = 运行阈值的30%
- self.energy_low = self.energy_high * 0.3
- else:
- # 使用所有数据的中位数
- self.energy_high = float(np.median(valid_energies) * 0.7)
- self.energy_low = self.energy_high * 0.3
-
- self.is_calibrated = True
- logger.info(f"[{self.pump_id}] 阈值自动校准: HIGH={self.energy_high:.4f}, LOW={self.energy_low:.4f}")
-
- def _determine_status(self) -> str:
- """
- 根据能量趋势判断泵状态
-
- 返回:
- status: 泵状态字符串
- """
- if len(self.recent_window) < 3:
- return self.STATUS_CALIBRATING
-
- # 最近能量和较早能量
- recent_avg = np.mean(self.recent_window[-2:]) # 最近2个窗口
- older_avg = np.mean(self.recent_window[:2]) # 较早2个窗口
- current = self.recent_window[-1]
-
- # 判断逻辑
- # 1. 高→低:停机过程
- if older_avg > self.energy_high and recent_avg < self.energy_low:
- return self.STATUS_STOPPING
-
- # 2. 低→高:启动过程
- if older_avg < self.energy_low and recent_avg > self.energy_high:
- return self.STATUS_STARTING
-
- # 3. 稳定高:运行中
- if current > self.energy_high:
- return self.STATUS_RUNNING
-
- # 4. 稳定低:停机中
- if current < self.energy_low:
- return self.STATUS_STOPPED
-
- # 5. 中间状态:在阈值之间
- # 根据与哪个阈值更接近来判断
- mid_point = (self.energy_high + self.energy_low) / 2
- if current >= mid_point:
- return self.STATUS_RUNNING
- else:
- return self.STATUS_STOPPED
-
- def get_status(self) -> str:
- """
- 获取当前泵状态
-
- 返回:
- status: 当前状态
- """
- return self.current_status
-
- def is_running(self) -> bool:
- """
- 判断泵是否在运行
-
- 只有"运行中"状态返回True
- 用于决定是否进行AE异常检测
-
- 返回:
- bool: 是否运行中
- """
- return self.current_status == self.STATUS_RUNNING
-
- def get_calibration_info(self) -> Dict:
- """
- 获取校准信息
-
- 返回:
- dict: 校准状态和阈值信息
- """
- return {
- 'is_calibrated': self.is_calibrated,
- 'energy_high': self.energy_high,
- 'energy_low': self.energy_low,
- 'samples_collected': len(self.energy_history),
- 'current_status': self.current_status
- }
|