| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- anomaly_classifier.py
- ---------------------
- 规则分类器 - 用于判断异常类型
- 异常类型编码 (level_two):
- - 6: 未分类/一般异常
- - 7: 轴承问题
- - 8: 气蚀问题
- - 9: 松动/共振
- - 10: 叶轮问题
- - 11: 阀件/冲击
- """
- import numpy as np
- import logging
- logger = logging.getLogger('AnomalyClassifier')
- # 异常类型与编码映射
- ANOMALY_TYPES = {
- 'unknown': {'code': 6, 'name': '未分类异常'},
- 'bearing': {'code': 7, 'name': '轴承问题'},
- 'cavitation': {'code': 8, 'name': '气蚀问题'},
- 'loosening': {'code': 9, 'name': '松动/共振'},
- 'impeller': {'code': 10, 'name': '叶轮问题'},
- 'valve': {'code': 11, 'name': '阀件/冲击'},
- }
- from pathlib import Path
- # 基线文件路径
- BASELINE_FILE = Path(__file__).parent / "models" / "classifier_baseline.npy"
- # 默认基线(泵正常运行时的典型特征值)
- DEFAULT_BASELINE = {
- 'rms': 0.02,
- 'zcr': 0.05,
- 'energy_std': 0.3,
- 'spectral_centroid': 2000.0,
- 'spectral_bandwidth': 1500.0,
- 'spectral_flatness': 0.01,
- 'low_energy': 0.1,
- 'mid_energy': 0.05,
- 'high_energy': 0.02,
- 'has_periodic': False,
- }
- class AnomalyClassifier:
- """
- 规则分类器
- 基于音频特征判断异常类型
- """
-
- def __init__(self):
- # 正常基线特征
- self.normal_baseline = None
- # 自动加载基线
- self._load_baseline()
-
- def _load_baseline(self):
- """
- 从文件加载基线,如不存在则使用默认值
- """
- if BASELINE_FILE.exists():
- try:
- data = np.load(BASELINE_FILE, allow_pickle=True).item()
- self.normal_baseline = data
- logger.info(f"已加载分类器基线: {BASELINE_FILE.name}")
- except Exception as e:
- logger.warning(f"加载基线失败: {e},使用默认基线")
- self.normal_baseline = DEFAULT_BASELINE.copy()
- else:
- # 使用默认基线
- self.normal_baseline = DEFAULT_BASELINE.copy()
- logger.info("使用默认分类器基线")
-
- def set_baseline(self, baseline_features: dict):
- """
- 设置正常基线
-
- 参数:
- baseline_features: 正常状态的平均特征
- """
- self.normal_baseline = baseline_features
-
- def save_baseline(self, baseline_features: dict = None):
- """
- 保存基线到文件
-
- 参数:
- baseline_features: 基线特征字典,None则保存当前基线
- """
- if baseline_features is not None:
- self.normal_baseline = baseline_features
-
- if self.normal_baseline is None:
- logger.warning("无基线可保存")
- return False
-
- try:
- BASELINE_FILE.parent.mkdir(parents=True, exist_ok=True)
- np.save(BASELINE_FILE, self.normal_baseline)
- logger.info(f"已保存分类器基线: {BASELINE_FILE.name}")
- return True
- except Exception as e:
- logger.error(f"保存基线失败: {e}")
- return False
-
- def extract_features(self, y: np.ndarray, sr: int = 16000) -> dict:
- """
- 提取音频特征
-
- 参数:
- y: 音频信号
- sr: 采样率
-
- 返回:
- 特征字典
- """
- try:
- import librosa
- except ImportError:
- logger.warning("librosa未安装,无法提取特征")
- return {}
-
- # 时域特征
- rms = float(np.sqrt(np.mean(y**2)))
- zcr = float(np.mean(librosa.feature.zero_crossing_rate(y)))
-
- # 能量波动
- frame_length = int(0.025 * sr)
- hop = int(0.010 * sr)
- frames = librosa.util.frame(y, frame_length=frame_length, hop_length=hop)
- frame_energies = np.sum(frames**2, axis=0)
- energy_std = float(np.std(frame_energies) / (np.mean(frame_energies) + 1e-10))
-
- # 频域特征
- spectral_centroid = float(np.mean(librosa.feature.spectral_centroid(y=y, sr=sr)))
- spectral_bandwidth = float(np.mean(librosa.feature.spectral_bandwidth(y=y, sr=sr)))
- spectral_flatness = float(np.mean(librosa.feature.spectral_flatness(y=y)))
-
- # 频段能量
- D = np.abs(librosa.stft(y, n_fft=1024, hop_length=256))
- freqs = librosa.fft_frequencies(sr=sr, n_fft=1024)
-
- low_mask = freqs < 1000
- mid_mask = (freqs >= 1000) & (freqs < 3000)
- high_mask = freqs >= 3000
-
- low_energy = float(np.mean(D[low_mask, :]))
- mid_energy = float(np.mean(D[mid_mask, :]))
- high_energy = float(np.mean(D[high_mask, :]))
-
- # 周期性检测(简化版)
- autocorr = np.correlate(y[:min(len(y), sr)], y[:min(len(y), sr)], mode='full')
- autocorr = autocorr[len(autocorr)//2:]
- autocorr = autocorr / (autocorr[0] + 1e-10)
-
- min_lag = int(0.01 * sr)
- max_lag = int(0.5 * sr)
- search_range = autocorr[min_lag:max_lag]
-
- has_periodic = False
- if len(search_range) > 0:
- peak_value = np.max(search_range)
- has_periodic = peak_value > 0.3
-
- return {
- 'rms': rms,
- 'zcr': zcr,
- 'energy_std': energy_std,
- 'spectral_centroid': spectral_centroid,
- 'spectral_bandwidth': spectral_bandwidth,
- 'spectral_flatness': spectral_flatness,
- 'low_energy': low_energy,
- 'mid_energy': mid_energy,
- 'high_energy': high_energy,
- 'has_periodic': has_periodic,
- }
-
- def classify(self, current_features: dict) -> tuple:
- """
- 分类异常类型
-
- 参数:
- current_features: 当前音频特征
-
- 返回:
- (level_two编码, 异常类型名称, 置信度)
- """
- if self.normal_baseline is None:
- # 无基线,返回未分类
- return 6, '未分类异常', 0.0
-
- # 计算变化率
- def safe_change(curr, base):
- if base == 0:
- return 0.0
- return (curr - base) / base
-
- changes = {
- 'high_freq': safe_change(current_features.get('high_energy', 0), self.normal_baseline.get('high_energy', 1)),
- 'low_freq': safe_change(current_features.get('low_energy', 0), self.normal_baseline.get('low_energy', 1)),
- 'zcr': safe_change(current_features.get('zcr', 0), self.normal_baseline.get('zcr', 1)),
- 'centroid': safe_change(current_features.get('spectral_centroid', 0), self.normal_baseline.get('spectral_centroid', 1)),
- 'bandwidth': safe_change(current_features.get('spectral_bandwidth', 0), self.normal_baseline.get('spectral_bandwidth', 1)),
- 'energy_std': safe_change(current_features.get('energy_std', 0), self.normal_baseline.get('energy_std', 1)),
- 'flatness': safe_change(current_features.get('spectral_flatness', 0), self.normal_baseline.get('spectral_flatness', 1)),
- 'rms': safe_change(current_features.get('rms', 0), self.normal_baseline.get('rms', 1)),
- }
- has_periodic = current_features.get('has_periodic', False)
-
- # 规则判断
- scores = {
- 'bearing': 0.0,
- 'cavitation': 0.0,
- 'loosening': 0.0,
- 'impeller': 0.0,
- 'valve': 0.0,
- }
-
- # 轴承问题:高频增加、过零率变高、频谱质心上移
- if changes['high_freq'] > 0.3:
- scores['bearing'] += 0.4
- if changes['zcr'] > 0.2:
- scores['bearing'] += 0.3
- if changes['centroid'] > 0.15:
- scores['bearing'] += 0.3
-
- # 气蚀问题:噪声增加、频谱变宽、能量波动大
- if changes['bandwidth'] > 0.25:
- scores['cavitation'] += 0.35
- if changes['energy_std'] > 0.4:
- scores['cavitation'] += 0.35
- if changes['flatness'] > 0.2:
- scores['cavitation'] += 0.3
-
- # 松动/共振:低频增加、周期性冲击
- if changes['low_freq'] > 0.35:
- scores['loosening'] += 0.5
- if has_periodic:
- scores['loosening'] += 0.5
-
- # 叶轮问题:能量变化、周期性
- if changes['rms'] > 0.3:
- scores['impeller'] += 0.35
- if abs(changes['centroid']) > 0.25:
- scores['impeller'] += 0.35
- if has_periodic:
- scores['impeller'] += 0.3
-
- # 阀件/冲击:能量波动大、低频有增加
- if changes['energy_std'] > 0.5:
- scores['valve'] += 0.6
- if changes['low_freq'] > 0.2:
- scores['valve'] += 0.4
-
- # 选择最高分
- if max(scores.values()) < 0.3:
- return 6, '未分类异常', 0.0
-
- best_type = max(scores, key=scores.get)
- confidence = min(scores[best_type], 1.0)
-
- code = ANOMALY_TYPES[best_type]['code']
- name = ANOMALY_TYPES[best_type]['name']
-
- return code, name, confidence
-
- def classify_audio(self, y: np.ndarray, sr: int = 16000) -> tuple:
- """
- 直接从音频分类
-
- 参数:
- y: 音频信号
- sr: 采样率
-
- 返回:
- (level_two编码, 异常类型名称, 置信度)
- """
- features = self.extract_features(y, sr)
- return self.classify(features)
- # 全局分类器实例
- _classifier = None
- def get_classifier() -> AnomalyClassifier:
- """获取全局分类器实例"""
- global _classifier
- if _classifier is None:
- _classifier = AnomalyClassifier()
- return _classifier
- def classify_anomaly(y: np.ndarray, sr: int = 16000) -> tuple:
- """
- 便捷函数:分类异常类型
-
- 参数:
- y: 音频信号
- sr: 采样率
-
- 返回:
- (level_two编码, 异常类型名称, 置信度)
- """
- classifier = get_classifier()
- return classifier.classify_audio(y, sr)
|