#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ anomaly_classifier.py --------------------- 规则分类器 - 用于判断异常类型 异常类型编码 (level_two): - 6: 未分类/一般异常 - 7: 轴承问题 - 8: 气蚀问题 - 9: 松动/共振 - 10: 叶轮问题 - 11: 阀件/冲击 """ import numpy as np import logging logger = logging.getLogger('AnomalyClassifier') # 异常类型与编码映射 ANOMALY_TYPES = { 'unknown': {'code': 6, 'name': '未分类异常'}, 'bearing': {'code': 7, 'name': '轴承问题'}, 'cavitation': {'code': 8, 'name': '气蚀问题'}, 'loosening': {'code': 9, 'name': '松动/共振'}, 'impeller': {'code': 10, 'name': '叶轮问题'}, 'valve': {'code': 11, 'name': '阀件/冲击'}, } from pathlib import Path # 基线文件路径 BASELINE_FILE = Path(__file__).parent / "models" / "classifier_baseline.npy" # 默认基线(泵正常运行时的典型特征值) DEFAULT_BASELINE = { 'rms': 0.02, 'zcr': 0.05, 'energy_std': 0.3, 'spectral_centroid': 2000.0, 'spectral_bandwidth': 1500.0, 'spectral_flatness': 0.01, 'low_energy': 0.1, 'mid_energy': 0.05, 'high_energy': 0.02, 'has_periodic': False, } class AnomalyClassifier: """ 规则分类器 基于音频特征判断异常类型 """ def __init__(self): # 正常基线特征 self.normal_baseline = None # 自动加载基线 self._load_baseline() def _load_baseline(self): """ 从文件加载基线,如不存在则使用默认值 """ if BASELINE_FILE.exists(): try: data = np.load(BASELINE_FILE, allow_pickle=True).item() self.normal_baseline = data logger.info(f"已加载分类器基线: {BASELINE_FILE.name}") except Exception as e: logger.warning(f"加载基线失败: {e},使用默认基线") self.normal_baseline = DEFAULT_BASELINE.copy() else: # 使用默认基线 self.normal_baseline = DEFAULT_BASELINE.copy() logger.info("使用默认分类器基线") def set_baseline(self, baseline_features: dict): """ 设置正常基线 参数: baseline_features: 正常状态的平均特征 """ self.normal_baseline = baseline_features def save_baseline(self, baseline_features: dict = None): """ 保存基线到文件 参数: baseline_features: 基线特征字典,None则保存当前基线 """ if baseline_features is not None: self.normal_baseline = baseline_features if self.normal_baseline is None: logger.warning("无基线可保存") return False try: BASELINE_FILE.parent.mkdir(parents=True, exist_ok=True) np.save(BASELINE_FILE, self.normal_baseline) logger.info(f"已保存分类器基线: {BASELINE_FILE.name}") return True except Exception as e: logger.error(f"保存基线失败: {e}") return False def extract_features(self, y: np.ndarray, sr: int = 16000) -> dict: """ 提取音频特征 参数: y: 音频信号 sr: 采样率 返回: 特征字典 """ try: import librosa except ImportError: logger.warning("librosa未安装,无法提取特征") return {} # 时域特征 rms = float(np.sqrt(np.mean(y**2))) zcr = float(np.mean(librosa.feature.zero_crossing_rate(y))) # 能量波动 frame_length = int(0.025 * sr) hop = int(0.010 * sr) frames = librosa.util.frame(y, frame_length=frame_length, hop_length=hop) frame_energies = np.sum(frames**2, axis=0) energy_std = float(np.std(frame_energies) / (np.mean(frame_energies) + 1e-10)) # 频域特征 spectral_centroid = float(np.mean(librosa.feature.spectral_centroid(y=y, sr=sr))) spectral_bandwidth = float(np.mean(librosa.feature.spectral_bandwidth(y=y, sr=sr))) spectral_flatness = float(np.mean(librosa.feature.spectral_flatness(y=y))) # 频段能量 D = np.abs(librosa.stft(y, n_fft=1024, hop_length=256)) freqs = librosa.fft_frequencies(sr=sr, n_fft=1024) low_mask = freqs < 1000 mid_mask = (freqs >= 1000) & (freqs < 3000) high_mask = freqs >= 3000 low_energy = float(np.mean(D[low_mask, :])) mid_energy = float(np.mean(D[mid_mask, :])) high_energy = float(np.mean(D[high_mask, :])) # 周期性检测(简化版) autocorr = np.correlate(y[:min(len(y), sr)], y[:min(len(y), sr)], mode='full') autocorr = autocorr[len(autocorr)//2:] autocorr = autocorr / (autocorr[0] + 1e-10) min_lag = int(0.01 * sr) max_lag = int(0.5 * sr) search_range = autocorr[min_lag:max_lag] has_periodic = False if len(search_range) > 0: peak_value = np.max(search_range) has_periodic = peak_value > 0.3 return { 'rms': rms, 'zcr': zcr, 'energy_std': energy_std, 'spectral_centroid': spectral_centroid, 'spectral_bandwidth': spectral_bandwidth, 'spectral_flatness': spectral_flatness, 'low_energy': low_energy, 'mid_energy': mid_energy, 'high_energy': high_energy, 'has_periodic': has_periodic, } def classify(self, current_features: dict) -> tuple: """ 分类异常类型 参数: current_features: 当前音频特征 返回: (level_two编码, 异常类型名称, 置信度) """ if self.normal_baseline is None: # 无基线,返回未分类 return 6, '未分类异常', 0.0 # 计算变化率 def safe_change(curr, base): if base == 0: return 0.0 return (curr - base) / base changes = { 'high_freq': safe_change(current_features.get('high_energy', 0), self.normal_baseline.get('high_energy', 1)), 'low_freq': safe_change(current_features.get('low_energy', 0), self.normal_baseline.get('low_energy', 1)), 'zcr': safe_change(current_features.get('zcr', 0), self.normal_baseline.get('zcr', 1)), 'centroid': safe_change(current_features.get('spectral_centroid', 0), self.normal_baseline.get('spectral_centroid', 1)), 'bandwidth': safe_change(current_features.get('spectral_bandwidth', 0), self.normal_baseline.get('spectral_bandwidth', 1)), 'energy_std': safe_change(current_features.get('energy_std', 0), self.normal_baseline.get('energy_std', 1)), 'flatness': safe_change(current_features.get('spectral_flatness', 0), self.normal_baseline.get('spectral_flatness', 1)), 'rms': safe_change(current_features.get('rms', 0), self.normal_baseline.get('rms', 1)), } has_periodic = current_features.get('has_periodic', False) # 规则判断 scores = { 'bearing': 0.0, 'cavitation': 0.0, 'loosening': 0.0, 'impeller': 0.0, 'valve': 0.0, } # 轴承问题:高频增加、过零率变高、频谱质心上移 if changes['high_freq'] > 0.3: scores['bearing'] += 0.4 if changes['zcr'] > 0.2: scores['bearing'] += 0.3 if changes['centroid'] > 0.15: scores['bearing'] += 0.3 # 气蚀问题:噪声增加、频谱变宽、能量波动大 if changes['bandwidth'] > 0.25: scores['cavitation'] += 0.35 if changes['energy_std'] > 0.4: scores['cavitation'] += 0.35 if changes['flatness'] > 0.2: scores['cavitation'] += 0.3 # 松动/共振:低频增加、周期性冲击 if changes['low_freq'] > 0.35: scores['loosening'] += 0.5 if has_periodic: scores['loosening'] += 0.5 # 叶轮问题:能量变化、周期性 if changes['rms'] > 0.3: scores['impeller'] += 0.35 if abs(changes['centroid']) > 0.25: scores['impeller'] += 0.35 if has_periodic: scores['impeller'] += 0.3 # 阀件/冲击:能量波动大、低频有增加 if changes['energy_std'] > 0.5: scores['valve'] += 0.6 if changes['low_freq'] > 0.2: scores['valve'] += 0.4 # 选择最高分 if max(scores.values()) < 0.3: return 6, '未分类异常', 0.0 best_type = max(scores, key=scores.get) confidence = min(scores[best_type], 1.0) code = ANOMALY_TYPES[best_type]['code'] name = ANOMALY_TYPES[best_type]['name'] return code, name, confidence def classify_audio(self, y: np.ndarray, sr: int = 16000) -> tuple: """ 直接从音频分类 参数: y: 音频信号 sr: 采样率 返回: (level_two编码, 异常类型名称, 置信度) """ features = self.extract_features(y, sr) return self.classify(features) # 全局分类器实例 _classifier = None def get_classifier() -> AnomalyClassifier: """获取全局分类器实例""" global _classifier if _classifier is None: _classifier = AnomalyClassifier() return _classifier def classify_anomaly(y: np.ndarray, sr: int = 16000) -> tuple: """ 便捷函数:分类异常类型 参数: y: 音频信号 sr: 采样率 返回: (level_two编码, 异常类型名称, 置信度) """ classifier = get_classifier() return classifier.classify_audio(y, sr)