#!/usr/bin/env python # -*- coding: utf-8 -*- """ run_pickup_monitor.py --------------------- 拾音器异响检测主程序 功能说明: 该程序用于音频采集设备(拾音器)的实时异常检测。 与摄像头版本不同,本版本: 1. 没有视频/图片采集和上报 2. 上报时包含音频频谱图分析数据 3. 支持进水流量PLC数据读取 4. 每1分钟计算一次平均重建误差并上报 上报数据结构: { "message": { "channelInfo": {"name": "设备信息"}, "requestTime": 时间戳, "classification": { "level_one": 2, // 音频检测大类 "level_two": 6 // 异常类型小类: 6=未分类, 7=轴承, 8=气蚀, 9=松动/共振, 10=叶轮, 11=阀件 }, "skillInfo": {"name": "异响检测"}, "sound_detection": { "abnormalwav": "", // base64编码的音频 "status": 0/1, // 0=异常, 1=正常 "condition": { "running_status": "运行中/停机中", // 启停状态 "inlet_flow": 0.0 // 进水流量 }, "score": { "abnormal_score": 0.0, // 当前1分钟内8s平均重构误差 "score_threshold": 0.0 // 该设备的异常阀值 }, "frequency": { "this_frequency": [], // 当前1分钟的频谱图 "normal_frequency_middle": [], // 过去10分钟的频谱图平均 "normal_frequency_upper": [], // 上限(暂为空) "normal_frequency_lower": [] // 下限(暂为空) } } } } 使用方法: python run_pickup_monitor.py 配置文件: config/rtsp_config.yaml """ import subprocess import time import re import signal import sys import threading import logging import base64 from concurrent.futures import ThreadPoolExecutor from pathlib import Path from datetime import datetime, timedelta from collections import defaultdict # 用于计算FFT import numpy as np try: import librosa except ImportError: print("错误:缺少librosa库,请安装: pip install librosa") sys.exit(1) try: import requests except ImportError: print("错误:缺少requests库,请安装: pip install requests") sys.exit(1) import json as _json from urllib import request as _urllib_request, error as _urllib_error # 导入预测器模块 from predictor import MultiModelPredictor, CFG # 导入配置管理模块(SQLite) from config.config_manager import ConfigManager from config.config_api import app as config_app, init_config_api from config.db_models import get_db_path # 导入泵状态监控模块(用于检测启停过渡期) try: from core.pump_state_monitor import PumpStateMonitor PUMP_STATE_MONITOR_AVAILABLE = True except ImportError: PUMP_STATE_MONITOR_AVAILABLE = False # 导入人体检测读取模块(用于抑制有人时的误报) try: from 
def setup_logging():
    """Configure the process-wide logging system and return the module logger.

    Installs a RotatingFileHandler writing to logs/system.log (10 MB per
    file, 2 backups, ~30 MB total) plus a stdout StreamHandler. If the root
    logger was already configured by a caller, reuses it untouched so this
    module can be embedded without clobbering the host's logging setup.

    Returns:
        logging.Logger: the 'PickupMonitor' named logger.
    """
    from logging.handlers import RotatingFileHandler
    # If the root logger was already configured upstream, reuse it as-is.
    root = logging.getLogger()
    if root.handlers:
        return logging.getLogger('PickupMonitor')
    # Log destination: <this file's dir>/logs/system.log
    log_dir = Path(__file__).parent / "logs"
    log_dir.mkdir(parents=True, exist_ok=True)
    log_file = log_dir / "system.log"
    formatter = logging.Formatter(
        '%(asctime)s | %(levelname)-8s | %(name)-20s | %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )
    # Size-based rotation, at most 2 backups kept (≈30 MB total).
    file_handler = RotatingFileHandler(
        log_file, maxBytes=10 * 1024 * 1024, backupCount=2, encoding='utf-8'
    )
    file_handler.setFormatter(formatter)
    # Console output (visible in foreground runs; harmless when stdout is
    # discarded by a daemonized run).
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setFormatter(formatter)
    logging.basicConfig(
        level=logging.INFO,
        handlers=[file_handler, console_handler]
    )
    return logging.getLogger('PickupMonitor')
class RTSPStreamConfig:
    """Configuration for a single RTSP (pickup microphone) stream.

    Wraps the per-stream settings, including pickup-specific fields such as
    the pump name and the inlet-flow PLC address mapping.
    """

    def __init__(self, plant_name, rtsp_url, channel, camera_name, device_code, pump_name, flow_plc, project_id):
        """Build a stream config.

        Args:
            plant_name: area name (e.g. pump house / RO high-pressure pumps)
            rtsp_url: RTSP stream URL
            channel: channel number
            camera_name: device display name (falls back to "ch<channel>")
            device_code: device code such as "1#-1"
            pump_name: pump letter (A/B/C/D), used to match the flow PLC
            flow_plc: mapping of pump name -> flow PLC address
            project_id: owning project id
        """
        self.plant_name = plant_name
        self.rtsp_url = rtsp_url
        self.channel = channel
        default_name = f"ch{channel}"
        self.pump_id = default_name
        # Empty / None camera names fall back to the channel-derived name.
        self.camera_name = camera_name if camera_name else default_name
        self.device_code = device_code
        self.pump_name = pump_name
        # Normalize a missing mapping to an empty dict so lookups are safe.
        self.flow_plc = flow_plc if flow_plc else {}
        self.project_id = project_id

    def get_flow_plc_address(self):
        """Return the inlet-flow PLC address for this device ("" if unknown)."""
        if not self.pump_name or not self.flow_plc:
            return ""
        return self.flow_plc.get(self.pump_name, "")

    def __repr__(self):
        return (f"RTSPStreamConfig(plant='{self.plant_name}', "
                f"camera='{self.camera_name}', pump='{self.pump_id}')")
"tcp", # 使用TCP传输 "-probesize", "1000000", # 限制探测大小为1MB(默认5MB) "-analyzeduration", "1000000", # 限制分析时长为1秒(默认5秒) "-max_delay", "500000", # 最大延迟500ms "-fflags", "nobuffer", # 禁用输入缓冲 "-flags", "low_delay", # 低延迟模式 "-i", self.stream_config.rtsp_url, # 输入RTSP流 # 音频输出参数 "-vn", # 不处理视频 "-ac", "1", # 单声道 "-ar", str(CFG.SR), # 采样率(16000Hz) "-f", "segment", # 分段模式 "-segment_time", str(int(self.file_duration)), # 每段时长 "-strftime", "1", # 启用时间格式化 "-loglevel", "error", # 只输出错误日志 "-y", # 覆盖已存在的文件 output_pattern, ] try: # 启动FFmpeg进程 self.process = subprocess.Popen( cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL ) logger.info(f"FFmpeg已启动: {device_code} | {self.stream_config.camera_name} | " f"文件时长: {self.file_duration}秒 | PID={self.process.pid}") return True except FileNotFoundError: logger.error("FFmpeg错误: 未找到ffmpeg命令,请确保已安装FFmpeg") return False except Exception as e: logger.error(f"FFmpeg启动失败: {device_code} | 错误: {e}") return False def is_running(self): """ 检查FFmpeg进程是否在运行 返回: bool: 进程运行中返回True,否则返回False """ if self.process is None: return False return self.process.poll() is None def stop(self): """ 停止FFmpeg进程 """ if self.process is not None and self.is_running(): logger.info(f"FFmpeg停止: {self.stream_config.plant_name} | {self.stream_config.camera_name} | PID={self.process.pid}") self.process.terminate() try: self.process.wait(timeout=5) except subprocess.TimeoutExpired: logger.warning(f"FFmpeg强制终止: PID={self.process.pid}") self.process.kill() class PickupMonitor: """ 拾音器监控线程类 监控音频目录,调用预测器检测异常,推送告警。 主要功能: 1. 不截取视频帧(纯音频) 2. 计算并上报频谱图数据(this_frequency + normal_frequency_middle) 3. 每分钟汇总上报一次 4. 
    def __init__(self, audio_dir, multi_predictor, stream_configs, check_interval=1.0, config=None, config_manager=None, test_mode=False):
        """Initialize the monitor.

        Args:
            audio_dir: root audio directory (per-device dirs live under it)
            multi_predictor: MultiModelPredictor instance
            stream_configs: all RTSP stream configs
            check_interval: polling interval in seconds
            config: config dict
            config_manager: ConfigManager for hot reload (None disables it)
            test_mode: disables aggregate suppression and forces
                alert_enabled=True (for on-site playback testing)
        """
        # Audio root directory (per-device subdirectories underneath).
        self.audio_dir = audio_dir
        self.multi_predictor = multi_predictor
        self.predictor = None  # kept for compatibility; deprecated
        self.stream_configs = stream_configs
        self.check_interval = check_interval
        self.config = config or {}
        # Test mode: disable aggregate suppression, force alert_enabled=True.
        # Used for on-site audio playback tests where each device must alarm
        # independently without cross-device suppression.
        self.test_mode = test_mode
        if test_mode:
            logger.warning("测试模式已启用: 聚合抑制=关闭 | alert_enabled=强制开启")
        # Hot reload: keep a ConfigManager reference and refresh from DB.
        self._config_manager = config_manager
        self._last_config_reload = 0   # timestamp of last reload
        self._config_reload_interval = 30  # reload interval (seconds)
        # project_id of the first enabled plant in 'plants'; legacy fallback
        # to platform.project_id (default 92).
        self.project_id = 0
        for plant in self.config.get('plants', []):
            if plant.get('enabled', False):
                self.project_id = plant.get('project_id', 0)
                break
        if self.project_id == 0:
            self.project_id = self.config.get('platform', {}).get('project_id', 92)
        # device_code -> stream_config mapping.
        self.device_map = {}
        for cfg in stream_configs:
            if cfg.device_code:
                self.device_map[cfg.device_code] = cfg
        # Set of already-processed audio files.
        self.seen_files = set()
        # Per-device detection cache for the 1-minute aggregation.
        # key: device_code (e.g. "1#-1")
        self.device_cache = defaultdict(lambda: {
            "errors": [],          # per-file mean reconstruction errors (weak persistent signals)
            "anomaly_ratios": [],  # per-file patch anomaly ratios (burst faults)
            "last_upload": None,   # last report time
            "audio_data": [],      # raw audio used for the spectrum
            "status": None         # most recent running status
        })
        # Spectrum history (for normal_frequency_middle).
        # key: device_code, value: list of (timestamp, freq_db)
        freq_cfg = self.config.get('prediction', {}).get('frequency_history', {})
        self.freq_history_enabled = freq_cfg.get('enabled', True)
        self.freq_history_minutes = freq_cfg.get('history_minutes', 10)
        self.freq_history = defaultdict(list)
        # Last reported status per device (True=anomaly, False=normal,
        # None=initial); deduplicates on state change to avoid alarm spam.
        self.last_report_status = {}
        # Reporting period (seconds).
        audio_cfg = self.config.get('audio', {})
        self.segment_duration = audio_cfg.get('segment_duration', 60)
        # Anomaly-audio saving configuration.
        save_cfg = self.config.get('prediction', {}).get('save_anomaly_audio', {})
        self.save_anomaly_enabled = save_cfg.get('enabled', True)
        self.save_anomaly_dir = Path(__file__).parent / save_cfg.get('save_dir', 'data/anomaly_detected')
        if self.save_anomaly_enabled:
            self.save_anomaly_dir.mkdir(parents=True, exist_ok=True)
        # Push-notification configuration.
        push_cfg = self.config.get('push_notification', {})
        self.push_enabled = push_cfg.get('enabled', False)
        # Test mode forces alerts on regardless of alert_enabled in config.
        if self.test_mode:
            self.alert_enabled = True
        else:
            self.alert_enabled = push_cfg.get('alert_enabled', True)
        self.push_timeout = push_cfg.get('timeout', 30)
        self.push_retry_count = push_cfg.get('retry_count', 2)
        # Push base-URL groups (production vs test, routed by mode).
        url_groups = push_cfg.get('push_base_urls', {})
        # Accept both the legacy list format and the new dict format.
        if isinstance(url_groups, dict):
            self.push_production_urls = [u.rstrip('/') for u in url_groups.get('production', []) if u]
            self.push_test_urls = [u.rstrip('/') for u in url_groups.get('test', []) if u]
        else:
            # Legacy fallback: treat everything as production.
            self.push_production_urls = [item.get('url', '').rstrip('/') for item in url_groups if item.get('url')]
            self.push_test_urls = []
        # Diagnostic log confirming how the push URLs were parsed.
        logger.info(f"推送URL解析结果 | production={self.push_production_urls} | test={self.push_test_urls} | url_groups类型={type(url_groups).__name__}")
        # Failed-push record file.
        failed_log_path = push_cfg.get('failed_push_log', 'data/push_failures.jsonl')
        self.failed_push_log = Path(__file__).parent / failed_log_path
        self.failed_push_log.parent.mkdir(parents=True, exist_ok=True)
        # Remind operators when alerting is disabled.
        if not self.alert_enabled:
            logger.warning("异常告警已禁用(alert_enabled=false),仅上报心跳状态")
        # ========================================
        # Alert aggregator (replaces the old fixed cooldown_minutes):
        # 1. cross-device suppression: many devices alarming at once in one
        #    plant -> treated as environmental noise, all suppressed
        # 2. per-type cooldown: 24h same type, 1h different type
        # 3. debug-mode aware: mode_id=4 uses a 5-minute cooldown and skips
        #    the aggregation window
        # ========================================
        agg_cfg = push_cfg.get('alert_aggregate', {})
        self.alert_aggregator = None
        if self.test_mode:
            # Test mode: no aggregator, each device alarms independently.
            logger.info("测试模式: 聚合器已禁用")
        elif ALERT_AGGREGATOR_AVAILABLE:
            self.alert_aggregator = AlertAggregator(
                push_callback=self._push_detection_result,
                aggregate_enabled=agg_cfg.get('enabled', True),
                window_seconds=agg_cfg.get('window_seconds', 300),
                min_devices=agg_cfg.get('min_devices', 2),
                cooldown_same_type_hours=push_cfg.get('cooldown_same_type_hours', 24),
                cooldown_diff_type_hours=push_cfg.get('cooldown_diff_type_hours', 1),
                # Mode callback lets the aggregator see mode_id=4 (debug).
                mode_provider=lambda: self._remote_mode_id,
                debug_cooldown_minutes=push_cfg.get('debug_cooldown_minutes', 5)
            )
        else:
            logger.warning("报警聚合器模块未找到,使用默认报警逻辑")
        # Last anomaly-audio save time per device (save cooldown).
        self.last_anomaly_save_time = {}
        # Save cooldown (minutes): one save per N minutes while a device
        # stays anomalous.
        self.anomaly_save_cooldown_minutes = save_cfg.get('cooldown_minutes', 10)
        # Locked anomaly classification while an anomaly persists.
        # key: device_code, value: (anomaly_type_code, type_name)
        self.locked_anomaly_type = {}
        # Sliding-window voting (e.g. 3 anomalies out of 5 -> anomaly).
        voting_cfg = self.config.get('prediction', {}).get('voting', {})
        self.voting_enabled = voting_cfg.get('enabled', True)
        self.voting_window_size = voting_cfg.get('window_size', 5)   # window size
        self.voting_threshold = voting_cfg.get('threshold', 3)       # votes needed
        self.detection_history = {}  # per-device history (True=anomaly)
        # Threshold tolerance band (avoids flapping at the boundary):
        # errors within threshold*(1±tolerance) keep the previous state.
        self.tolerance_ratio = voting_cfg.get('tolerance_ratio', 0.05)  # 5% default
        self.last_single_anomaly = {}  # per-device last single-cycle verdict
        # Thresholds are now managed by multi_predictor, loaded per device
        # from its model directory.
        # Energy-detection configuration.
        energy_cfg = self.config.get('prediction', {}).get('energy_detection', {})
        self.energy_detection_enabled = ENERGY_BASELINE_AVAILABLE and energy_cfg.get('enabled', True)
        self.skip_detection_when_stopped = energy_cfg.get('skip_when_stopped', True)
        # Energy baseline detector (one per device).
        if self.energy_detection_enabled:
            self.energy_baselines = {}
        else:
            self.energy_baselines = None
        # SCADA API (pump status + inlet flow).
        scada_cfg = self.config.get('scada_api', {})
        self.scada_enabled = scada_cfg.get('enabled', False)
        self.scada_url = scada_cfg.get('base_url', '')                  # history endpoint (backup)
        self.scada_realtime_url = scada_cfg.get('realtime_url', '')     # realtime endpoint (primary)
        self.scada_jwt = scada_cfg.get('jwt_token', '')
        self.scada_timeout = scada_cfg.get('timeout', 10)
        # SCADA auto-login settings.
        self._scada_login_url = scada_cfg.get('login_url', '')
        self._scada_login_username = scada_cfg.get('login_username', '')
        self._scada_login_password = scada_cfg.get('login_password', '')
        self._scada_login_type = scada_cfg.get('login_type', 'account')
        self._scada_login_dep_id = scada_cfg.get('login_dep_id', '')
        self._scada_auth_lock = threading.Lock()
        # Pump state monitor (detects start/stop transition windows).
        self.pump_state_monitor = None
        self.pump_status_plc_configs = {}  # {pump_name: [{point, name}, ...]}
        if PUMP_STATE_MONITOR_AVAILABLE and self.scada_enabled:
            # project_id of the first enabled plant, plus its pump-status
            # PLC point configuration.
            project_id = 0
            for plant in self.config.get('plants', []):
                if plant.get('enabled', False):
                    project_id = plant.get('project_id', 0)
                    pump_status_plc = plant.get('pump_status_plc', {})
                    self.pump_status_plc_configs = pump_status_plc
                    break
            if project_id > 0:
                # Pump-state queries go through the realtime endpoint.
                self.pump_state_monitor = PumpStateMonitor(
                    scada_url=self.scada_realtime_url,  # realtime endpoint
                    scada_jwt=self.scada_jwt,
                    project_id=project_id,
                    timeout=self.scada_timeout,
                    transition_window_minutes=15,  # 15 min after start/stop = transition
                    login_url=self._scada_login_url,
                    login_username=self._scada_login_username,
                    login_password=self._scada_login_password,
                    login_type=self._scada_login_type,
                    login_dep_id=self._scada_login_dep_id,
                )
                logger.info(f"泵状态监控器已启用 (project_id={project_id}, 过渡期窗口=15分钟, 使用实时接口)")
        # Thread control.
        self.running = False
        self.thread = None
        # Push thread pool (keeps push timeouts from blocking the monitor loop).
        self._push_executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="push")
        # Startup time (suppresses state-change logs during warmup).
        self.startup_time = None
        self.startup_warmup_seconds = 120  # no state-change logs for 120s after start
        # ========================================
        # Anomaly context-capture configuration
        # ========================================
        context_cfg = save_cfg.get('context_capture', {})
        self.context_pre_minutes = context_cfg.get('pre_minutes', 2)
        self.context_post_minutes = context_cfg.get('post_minutes', 2)
        # Audio-file history (captures audio preceding an anomaly).
        # key: device_code, value: deque of (timestamp, file_path)
        from collections import deque
        # Files to keep ≈ one per minute, plus 2 as buffer.
        history_size = self.context_pre_minutes + 2
        self.audio_file_history = defaultdict(lambda: deque(maxlen=history_size))
        # Anomaly capture state. key: device_code, value: dict
        self.anomaly_capture_state = {}
        logger.info(f"异常上下文捕获: 前{self.context_pre_minutes}分钟 + 后{self.context_post_minutes}分钟")
        # ========================================
        # Human-detection alarm suppression
        # ========================================
        # If any camera saw a person within the cooldown window, anomaly
        # alarms for ALL devices are suppressed.
        human_cfg = self.config.get('human_detection', {})
        self.human_detection_enabled = human_cfg.get('enabled', False) and HUMAN_DETECTION_AVAILABLE
        self.human_reader = None
        if self.human_detection_enabled:
            db_path = human_cfg.get('db_path', '')
            cooldown_minutes = human_cfg.get('cooldown_minutes', 5)
            self.human_reader = HumanDetectionReader(
                db_path=db_path,
                cooldown_minutes=cooldown_minutes
            )
            logger.info(f"人体检测报警抑制已启用 (冷却时间={cooldown_minutes}分钟, 数据库={db_path})")
        # ========================================
        # Remote detection-schedule configuration
        # ========================================
        # Periodic GET against a remote endpoint; mode_type / model_list /
        # time windows remotely control the alert_enabled switch.
        schedule_cfg = self.config.get('detection_schedule', {})
        self._detection_schedule_url = schedule_cfg.get('url', '')
        self._detection_schedule_interval = schedule_cfg.get('poll_interval', 60)
        self._detection_schedule_timeout = schedule_cfg.get('request_timeout', 10)
        # Independent timer, decoupled from the 30s config hot-reload.
        self._last_detection_schedule_check = 0
        # Remote project mode (1=daily, 2=tour, 3=maintenance, 4=debug):
        # 1/2: infer + push to all targets; 3: discard audio (no save /
        # inference / report); 4: infer + push only to label="测试" targets.
        self._remote_mode_id = 1
        self._schedule_first_poll = True
        if self._detection_schedule_url:
            logger.info(f"远程异响调度已启用 | URL={self._detection_schedule_url} | 间隔={self._detection_schedule_interval}秒")
        # ========================================
        # Audio upload bypass (edge-cloud training-data sync)
        # ========================================
        # With cloud_sync.enabled=false every AudioUploader call is a no-op;
        # actual uploads run in the separate run_upload_worker.py process.
        self.audio_uploader = None
        if AUDIO_UPLOADER_AVAILABLE:
            self.audio_uploader = AudioUploader(
                config=self.config,
                project_id=self.project_id,
                audio_dir=self.audio_dir
            )
        # ========================================
        # Normal-audio diversity-sampling configuration
        # ========================================
        # Error-based diversity sampling: keep files whose reconstruction
        # error differs enough from retained samples, drop redundant ones.
        # Affects only normal audio; anomalies use the separate context-capture path.
        auto_training_cfg = self.config.get('auto_training', {})
        cleanup_cfg = self.config.get('audio', {}).get('auto_cleanup', {})
        # Training mode: self=edge self-training (keep data),
        # cloud=cloud training (optionally delete after upload).
        self._training_mode = auto_training_cfg.get('training_mode', 'self')
        # Master switch: with training off, normal audio is deleted after inference.
        self._auto_training_enabled = auto_training_cfg.get('enabled', False)
        # Per-device hourly retention quota (total = keep_hourly_samples × 24).
        self._keep_hourly_samples = cleanup_cfg.get('keep_hourly_samples', 21)
        # Days of dated history to keep.
        self._keep_days = cleanup_cfg.get('keep_days', 14)
        # Diversity sampling: minimum error distance (below = redundant);
        # adapted at runtime to the hour's progress.
        self._diversity_base_epsilon = cleanup_cfg.get('diversity_epsilon', 0.001)
        # Sampling state per (device_code, hour):
        # {"kept_errors": [float], "kept_count": int}
        self._sample_state = {}

    def start(self):
        """Start the monitor thread (no-op if already running)."""
        if self.running:
            return
        # Remove leftover files in current/ before starting.
        self._cleanup_current_on_startup()
        self.running = True
        self.startup_time = datetime.now()  # record start time
        self.thread = threading.Thread(target=self._monitor_loop, daemon=True)
        self.thread.start()
        # Log the monitored device list.
        device_codes = list(self.device_map.keys())
        logger.info(f"监控线程已启动 | 目录: {self.audio_dir} | 监控设备: {device_codes}")

    def _cleanup_current_on_startup(self):
        """Delete files left in current/ by a previous run.

        Leftovers would otherwise mix with freshly captured data and skew
        detection accuracy.
        """
        cleaned_count = 0
        for device_code in self.device_map.keys():
            current_dir = self.audio_dir / device_code / "current"
            if not current_dir.exists():
                continue
            for wav_file in current_dir.glob("*.wav"):
                try:
                    wav_file.unlink()
                    cleaned_count += 1
                except Exception as e:
                    logger.warning(f"清理遗留文件失败: {wav_file.name} | {e}")
        if cleaned_count > 0:
            logger.info(f"启动清理: 已删除current目录中 {cleaned_count} 个遗留文件")
cleaned_count = 0 for device_code in self.device_map.keys(): current_dir = self.audio_dir / device_code / "current" if not current_dir.exists(): continue for wav_file in current_dir.glob("*.wav"): try: wav_file.unlink() cleaned_count += 1 except Exception as e: logger.warning(f"清理遗留文件失败: {wav_file.name} | {e}") if cleaned_count > 0: logger.info(f"启动清理: 已删除current目录中 {cleaned_count} 个遗留文件") def stop(self): """ 停止监控线程 """ if not self.running: return self.running = False if self.thread is not None: self.thread.join(timeout=5) # 关闭推送线程池,取消未执行的任务,最多等待10秒 self._push_executor.shutdown(wait=False, cancel_futures=True) logger.info(f"监控线程已停止") def _reload_hot_config(self): # 从 DB 热加载可变配置项,无需重启服务 # 只刷新运行时可安全变更的参数,不影响 FFmpeg 进程和流映射 if self._config_manager is None: return now = time.time() # 未达到刷新间隔则跳过 if now - self._last_config_reload < self._config_reload_interval: return self._last_config_reload = now try: # 从 DB 读取最新配置 fresh = self._config_manager.get_full_config() self.config = fresh # 刷新推送配置 push_cfg = fresh.get('push_notification', {}) self.push_enabled = push_cfg.get('enabled', False) self.alert_enabled = push_cfg.get('alert_enabled', True) self.push_timeout = push_cfg.get('timeout', 30) self.push_retry_count = push_cfg.get('retry_count', 2) url_groups = push_cfg.get('push_base_urls', {}) if isinstance(url_groups, dict): self.push_production_urls = [u.rstrip('/') for u in url_groups.get('production', []) if u] self.push_test_urls = [u.rstrip('/') for u in url_groups.get('test', []) if u] else: self.push_production_urls = [item.get('url', '').rstrip('/') for item in url_groups if item.get('url')] self.push_test_urls = [] # 刷新投票配置 voting_cfg = fresh.get('prediction', {}).get('voting', {}) self.voting_enabled = voting_cfg.get('enabled', True) self.voting_window_size = voting_cfg.get('window_size', 5) self.voting_threshold = voting_cfg.get('threshold', 3) self.tolerance_ratio = voting_cfg.get('tolerance_ratio', 0.05) # 刷新能量检测配置 energy_cfg = fresh.get('prediction', 
    def _check_detection_schedule(self):
        """Poll the remote project-mode endpoint and apply mode changes.

        Endpoint: GET /api/v2/project_mode/current?project_id=xxx
        mode_id meaning (1=daily, 2=tour, 3=maintenance, 4=debug):
          1/2: normal operation -> inference + push to all targets
          3:   maintenance -> audio captured but not saved/inferred/reported
          4:   debug -> inference + push only to label="测试" targets
        Throttled by _detection_schedule_interval; any failure keeps the
        current mode.
        """
        if not self._detection_schedule_url:
            return
        now_ts = time.time()
        if now_ts - self._last_detection_schedule_check < self._detection_schedule_interval:
            return
        self._last_detection_schedule_check = now_ts
        try:
            # At most 2 attempts: one retry after a token refresh.
            for attempt in range(2):
                if not self._ensure_jwt():
                    logger.warning("远程调度接口: JWT 不可用")
                    return
                resp = requests.get(
                    self._detection_schedule_url,
                    params={'project_id': self.project_id},
                    headers={"JWT-TOKEN": self.scada_jwt},
                    timeout=self._detection_schedule_timeout
                )
                # On 401/403: refresh the token and retry once.
                if resp.status_code in (401, 403) and self._can_auto_login() and attempt == 0:
                    self._clear_jwt()
                    continue
                break
            resp.raise_for_status()
            result = resp.json()
            if result.get('code') != 200:
                logger.warning(f"远程调度接口返回异常: {result}")
                return
            # 'data' is an array; match the entry for our project_id.
            entries = result.get('data', [])
            entry = None
            for item in entries:
                if item.get('project_id') == self.project_id:
                    entry = item
                    break
            if entry is None:
                logger.debug(f"远程调度未返回 project_id={self.project_id} 的记录,保持当前状态")
                return
            new_mode_id = int(entry.get('mode_id', 1))
            # For mode_id=3 (maintenance) also check scheduled_end_time;
            # if expired, fall back to daily mode automatically so a
            # forgotten platform-side switch cannot stick forever.
            if new_mode_id == 3:
                end_str = entry.get('scheduled_end_time', '')
                # '0001...' is the platform's zero-date sentinel — ignore it.
                if end_str and not end_str.startswith('0001'):
                    end_dt = self._parse_schedule_time(end_str)
                    if end_dt is not None and datetime.now() > end_dt:
                        new_mode_id = 1
            mode_names = {1: '日常', 2: '参观', 3: '检修', 4: '调试'}
            if new_mode_id != self._remote_mode_id:
                old_mode = self._remote_mode_id
                logger.info(
                    f"项目模式变更: {mode_names.get(old_mode, '未知')} -> "
                    f"{mode_names.get(new_mode_id, '未知')}(mode_id={new_mode_id})"
                )
                self._remote_mode_id = new_mode_id
                # Reset all alarm state on a mode switch so accumulated
                # state from the previous mode cannot pollute the new one.
                self._reset_alert_state_on_mode_change(old_mode, new_mode_id)
            else:
                logger.info(f"远程调度轮询 | 当前模式: {mode_names.get(new_mode_id, '未知')}(mode_id={new_mode_id})")
        except Exception as e:
            logger.warning(f"远程异响调度请求失败,保持当前状态: {e}")
int(entry.get('mode_id', 1)) # mode_id=3 检修模式额外检查 scheduled_end_time 是否已过期 # 过期则自动回退到日常模式,防止平台侧忘记切回 if new_mode_id == 3: end_str = entry.get('scheduled_end_time', '') if end_str and not end_str.startswith('0001'): end_dt = self._parse_schedule_time(end_str) if end_dt is not None and datetime.now() > end_dt: new_mode_id = 1 mode_names = {1: '日常', 2: '参观', 3: '检修', 4: '调试'} if new_mode_id != self._remote_mode_id: old_mode = self._remote_mode_id logger.info( f"项目模式变更: {mode_names.get(old_mode, '未知')} -> " f"{mode_names.get(new_mode_id, '未知')}(mode_id={new_mode_id})" ) self._remote_mode_id = new_mode_id # 模式切换时重置所有报警相关状态 # 防止上一模式的积累状态污染新模式的判定逻辑 self._reset_alert_state_on_mode_change(old_mode, new_mode_id) else: logger.info(f"远程调度轮询 | 当前模式: {mode_names.get(new_mode_id, '未知')}(mode_id={new_mode_id})") except Exception as e: logger.warning(f"远程异响调度请求失败,保持当前状态: {e}") @staticmethod def _parse_schedule_time(time_str): # 尝试多种格式解析时间字符串(含 ISO 8601 带时区后缀),全部失败返回 None # 先去掉尾部 'Z' 或 '+08:00' 等时区标记,统一按本地时间解析 clean = time_str.strip() if clean.endswith('Z'): clean = clean[:-1] # 去掉类似 +08:00 的时区偏移 if '+' in clean and clean.index('+') > 10: clean = clean[:clean.rindex('+')] for fmt in ('%Y-%m-%dT%H:%M:%S', '%Y-%m-%dT%H:%M', '%Y-%m-%d %H:%M:%S', '%Y-%m-%d %H:%M'): try: return datetime.strptime(clean, fmt) except ValueError: continue return None def _reset_alert_state_on_mode_change(self, old_mode_id, new_mode_id): # 模式切换时重置所有报警相关状态 # 投票窗口、异常状态、冷却记录、聚合窗口全部清空 # 确保切换后从零开始判定,不受上一模式残留数据影响 mode_names = {1: '日常', 2: '参观', 3: '检修', 4: '调试'} logger.info( f"重置报警状态: {mode_names.get(old_mode_id, '?')} -> " f"{mode_names.get(new_mode_id, '?')}" ) # 清空投票窗口历史 self.detection_history.clear() # 清空上次上报状态(重新触发正常->异常的判定) self.last_report_status.clear() # 清空锁定的异常分类类型 self.locked_anomaly_type.clear() # 清空每个设备的累积检测结果(errors, anomaly_ratios) for device_code in list(self.device_cache.keys()): cache = self.device_cache[device_code] cache["errors"].clear() cache["anomaly_ratios"].clear() # 清空聚合器的冷却记录和聚合窗口 if 
    def _reset_alert_state_on_mode_change(self, old_mode_id, new_mode_id):
        """Reset all alarm-related state when the project mode switches.

        Clears voting windows, anomaly status, cooldown records and
        aggregation windows so the new mode starts judging from scratch,
        unaffected by residue from the previous mode.
        """
        mode_names = {1: '日常', 2: '参观', 3: '检修', 4: '调试'}
        logger.info(
            f"重置报警状态: {mode_names.get(old_mode_id, '?')} -> "
            f"{mode_names.get(new_mode_id, '?')}"
        )
        # Clear voting-window history.
        self.detection_history.clear()
        # Clear last reported status (re-arms the normal->anomaly trigger).
        self.last_report_status.clear()
        # Clear locked anomaly classifications.
        self.locked_anomaly_type.clear()
        # Clear accumulated per-device results (errors, anomaly_ratios).
        for device_code in list(self.device_cache.keys()):
            cache = self.device_cache[device_code]
            cache["errors"].clear()
            cache["anomaly_ratios"].clear()
        # Clear the aggregator's cooldown records and pending windows.
        if self.alert_aggregator:
            self.alert_aggregator.cooldown_records.clear()
            self.alert_aggregator.pending_windows.clear()
            logger.info("聚合器冷却记录和聚合窗口已清空")
        logger.info("报警状态重置完成")

    def _monitor_loop(self):
        """Monitor loop (thread main).

        Continuously scans each device's current/ directory, processes newly
        written WAV files, and triggers the periodic (per-minute) report.
        """
        # Ensure the root directory exists.
        self.audio_dir.mkdir(parents=True, exist_ok=True)
        while self.running:
            try:
                # Hot reload: periodically refresh mutable config from DB.
                self._reload_hot_config()
                # Remote schedule: poll the mode endpoint (throttled internally).
                self._check_detection_schedule()
                # Scan every device's current/ folder.
                for device_code in self.device_map.keys():
                    device_current_dir = self.audio_dir / device_code / "current"
                    # Skip devices whose directory does not exist yet.
                    if not device_current_dir.exists():
                        continue
                    for wav_file in device_current_dir.glob("*.wav"):
                        # Skip already-processed files.
                        if wav_file in self.seen_files:
                            continue
                        # Completeness check via age/size.
                        try:
                            stat_info = wav_file.stat()
                            file_age = time.time() - stat_info.st_mtime
                            file_size = stat_info.st_size
                            # Still being written — recheck next pass
                            # (deliberately NOT added to seen_files).
                            if file_age < 12.0:
                                continue
                            # Size sanity bounds (60s × 16000Hz × 2B ≈ 1.9MB).
                            # NOTE(review): this 500KB floor assumes ~60s
                            # segments; with the 8s default file_duration a
                            # normal file is ~256KB and would be deleted
                            # here — confirm production config sets
                            # audio.file_duration to 60.
                            if file_size < 500_000:
                                if file_age > 20.0:
                                    # Confirmed corrupt: delete and mark seen.
                                    try:
                                        wav_file.unlink()
                                        logger.debug(f"删除异常小文件: {wav_file.name} ({file_size / 1000:.1f}KB)")
                                    except Exception as e:
                                        logger.error(f"删除文件失败: {wav_file.name} | {e}")
                                    self.seen_files.add(wav_file)
                                continue
                            if file_size > 3_000_000:
                                if file_age > 20.0:
                                    # Oversized: archive to the dated dir
                                    # (kept, not discarded).
                                    logger.warning(f"文件过大,直接归档: {wav_file.name} ({file_size / 1000:.1f}KB)")
                                    self._move_audio_to_date_dir(wav_file)
                                    self.seen_files.add(wav_file)
                                continue
                        except Exception as e:
                            logger.error(f"文件状态检查失败: {wav_file.name} | {e}")
                            continue
                        # Process the new file, then mark it processed.
                        self._process_new_file(wav_file)
                        self.seen_files.add(wav_file)
                # Periodic (per-minute) report check.
                self._check_periodic_upload()
                # Flush expired aggregation windows (judgment + push).
                if self.alert_aggregator:
                    self.alert_aggregator.check_and_flush()
                # Safety net: clear files stuck in current/ (normally moved
                # within 1 minute; >5 minutes means something leaked).
                self._cleanup_stale_current_files()
                # Bound the seen-file set (plain truncation avoids stat
                # calls; seen_files is unordered so this is best-effort).
                if len(self.seen_files) > 10000:
                    self.seen_files = set(list(self.seen_files)[-5000:])
            except Exception as e:
                logger.error(f"监控循环错误: {e}")
                # Defensive flush: keep audio_data from growing unboundedly
                # after an error.
                for dc in list(self.device_cache.keys()):
                    self.device_cache[dc]["audio_data"] = []
                    self.device_cache[dc]["pending_files"] = []
            # Wait before the next scan.
            time.sleep(self.check_interval)
set(list(self.seen_files)[-5000:]) except Exception as e: logger.error(f"监控循环错误: {e}") # 防御性清空:避免异常导致 audio_data 持续累积内存 for dc in list(self.device_cache.keys()): self.device_cache[dc]["audio_data"] = [] self.device_cache[dc]["pending_files"] = [] # 等待下一次检查 time.sleep(self.check_interval) def _process_new_file(self, wav_file): """ 处理新的音频文件 文件名格式: {project_id}_{device_code}_{时间戳}.wav 例如: 92_1#-1_20251218142000.wav 流程: 1. 加载音频 2. 计算能量判断设备状态 3. 进行AE异常检测,记录重建误差 4. 保存音频数据用于计算频谱图 """ try: # 从文件名解析device_code # 格式: {project_id}_{device_code}_{时间戳}.wav try: parts = wav_file.stem.split('_') if len(parts) >= 3: device_code = parts[1] # 第二部分是device_code else: device_code = "unknown" except: device_code = "unknown" # ======================================== # 泵启停状态检测(基于 PLC 查询)- 优先于模型检查 # ---------------------------------------- # 逻辑:通过 SCADA API 查询 PLC 点位判断泵是否运行 # 作用: # 1. 冷启动模式下也能过滤停机时的音频(保证训练数据质量) # 2. 泵停机时跳过异常检测(避免无意义的检测) # 3. 记录设备状态用于后续上报 # 依赖:pump_state_monitor + rtsp_config 中的 pump_status_plc 配置 # ======================================== device_status = "未知" pump_is_running = True # 默认认为运行中(PLC 查询失败时的保守策略) # 获取该设备对应的流配置 stream_config = self.device_map.get(device_code) if self.pump_state_monitor and stream_config: # 根据设备的 pump_name 找到关联的 PLC 点位配置 pump_name = stream_config.pump_name pump_configs = self.pump_status_plc_configs.get(pump_name, []) if pump_configs: # 遍历所有关联泵,只要有一个运行就认为设备在工作 any_pump_running = False for pump_cfg in pump_configs: point = pump_cfg.get("point", "") name = pump_cfg.get("name", point) pump_id = point # 查询泵状态(带 60 秒缓存,不会频繁请求 SCADA) is_running, _ = self.pump_state_monitor.update_pump_state(pump_id, point, name) if is_running: any_pump_running = True pump_is_running = any_pump_running device_status = "开机" if pump_is_running else "停机" # 泵全部停机时跳过(可通过配置禁用此行为) # 冷启动和正常模式都适用,确保训练数据质量 if self.skip_detection_when_stopped and not pump_is_running: logger.info(f"泵停机(PLC): {device_code} | 归档到过渡期目录(不用于训练)") self._move_audio_to_transition_dir(wav_file, "stopped") 
            return

            # ========================================
            # Transition-period check (within 15 min after pump start/stop)
            # ----------------------------------------
            # Purpose: filter out unstable audio recorded during start/stop so
            # that training data only contains stable-operation audio.
            # ========================================
            if self.skip_detection_when_stopped:
                pump_in_transition, transition_pump_names = \
                    self.pump_state_monitor.check_pumps_transition(pump_configs)
                if pump_in_transition:
                    logger.info(f"泵过渡期: {device_code} | 归档到过渡期目录(不用于训练) | "
                                f"过渡期泵: {', '.join(transition_pump_names)}")
                    self._move_audio_to_transition_dir(wav_file, "transition")
                    return

            # Maintenance mode (mode_id=3): discard the audio, no save, no inference.
            if self._remote_mode_id == 3:
                try:
                    wav_file.unlink()
                except Exception:
                    pass
                return

            # Fetch this device's predictor (models are lazy-loaded).
            device_predictor = self.multi_predictor.get_predictor(device_code)
            if device_predictor is None:
                logger.info(f"冷启动模式(设备 {device_code} 无模型): 归档 {wav_file.name}")
                self._move_audio_to_date_dir(wav_file)
                return

            # Load the audio clip.
            try:
                y, sr = librosa.load(str(wav_file), sr=CFG.SR, mono=True)
            except Exception as e:
                logger.error(f"音频加载失败: {wav_file.name} | {e}")
                return

            # Record status into the cache (used by the periodic report).
            # NOTE: pump-state detection already ran above; this only records it.
            self.device_cache[device_code]["status"] = device_status

            # ========================================
            # Reconstruction error (dual-track: mean MSE + patch anomaly ratio)
            # ========================================
            threshold = self._get_threshold(device_code)
            error, anomaly_ratio = self._compute_reconstruction_error(
                wav_file, device_predictor, threshold=threshold
            )
            if error is not None:
                self.device_cache[device_code]["errors"].append(error)
            if anomaly_ratio is not None:
                self.device_cache[device_code]["anomaly_ratios"].append(anomaly_ratio)

            # ========================================
            # Keep the raw audio for the spectrum computation.
            # ========================================
            self.device_cache[device_code]["audio_data"].append(y)

            # ========================================
            # Stash the file path; archiving happens only after the periodic
            # aggregate verdict.
            # ========================================
            if "pending_files" not in self.device_cache[device_code]:
                self.device_cache[device_code]["pending_files"] = []
            self.device_cache[device_code]["pending_files"].append(wav_file)

            # ========================================
            # Record into the audio history (used for anomaly-context capture).
            # ========================================
            self.audio_file_history[device_code].append((datetime.now(), wav_file))

            # Initialize the last-upload timestamp on first sight.
            if self.device_cache[device_code]["last_upload"] is None:
                self.device_cache[device_code]["last_upload"] = datetime.now()

            # The threshold was already fetched above for the dual-track
            # computation; it is reused here (avoids a duplicate call).

            # ========================================
            # Fast path: alert quickly when several consecutive files show very
            # high error (currently disabled). Bypasses the voting window; meant
            # to catch sudden severe faults. Disabled code kept verbatim below.
            # ========================================
            # if error is not None and threshold is not None and threshold > 0:
            #     # 维护快速通道缓冲区
            #     if "fast_alert_buffer" not in self.device_cache[device_code]:
            #         self.device_cache[device_code]["fast_alert_buffer"] = []
            #
            #     fast_buf = self.device_cache[device_code]["fast_alert_buffer"]
            #     # 连续性检查:误差超过 2x 阈值时记录,否则清空缓冲区
            #     if error > threshold * 2.0:
            #         fast_buf.append(error)
            #     else:
            #         fast_buf.clear()
            #
            #     # 连续 3 个文件(~24秒)都超过 2x 阈值 → 触发快速预警
            #     FAST_ALERT_CONSECUTIVE = 3
            #     if len(fast_buf) >= FAST_ALERT_CONSECUTIVE:
            #         # 检查是否已触发过(避免重复告警)
            #         last_fast = self.device_cache[device_code].get("last_fast_alert_time")
            #         now = datetime.now()
            #         can_fast_alert = (last_fast is None or
            #                           (now - last_fast).total_seconds() > 300)  # 5分钟冷却
            #
            #         if can_fast_alert:
            #             # 快速通道同样受抑制逻辑约束
            #             suppress = False
            #             # 泵过渡期抑制
            #             if self.pump_state_monitor and stream_config:
            #                 pump_name = stream_config.pump_name
            #                 pump_configs = self.pump_status_plc_configs.get(pump_name, [])
            #                 if pump_configs:
            #                     in_transition, _ = self.pump_state_monitor.check_pumps_transition(pump_configs)
            #                     if in_transition:
            #                         suppress = True
            #             # 人体检测抑制
            #             if self.human_detection_enabled and self.human_reader:
            #                 if self.human_reader.is_in_cooldown():
            #                     suppress = True
            #             # alert_enabled 开关
            #             if not self.alert_enabled:
            #                 suppress = True
            #
            #             if not suppress:
            #                 avg_fast = float(np.mean(fast_buf))
            #                 logger.warning(
            #                     f"[!!] 快速通道触发: {device_code} | "
            #                     f"连续{len(fast_buf)}个文件异常 | "
            #                     f"平均误差={avg_fast:.6f} 阈值={threshold:.6f}")
            #                 self.device_cache[device_code]["last_fast_alert_time"] = now
            #                 fast_buf.clear()
            #                 # 标记快速预警,在下次周期上报时一并处理
            #                 self.device_cache[device_code]["fast_alert_pending"] = True

            if error is not None and threshold is not None:
                is_anomaly = error > threshold
                ratio_tag = f" patch比例={anomaly_ratio:.2f}" if anomaly_ratio is not None else ""
                result_tag = "!!" if is_anomaly else "OK"
                logger.info(f"[{result_tag}] {device_code} | {wav_file.name} | "
                            f"误差={error:.6f} 阈值={threshold:.6f}{ratio_tag}")
            elif error is not None:
                logger.debug(f"文件预测: {wav_file.name} | 误差={error:.6f} | 阀值=未设置")
            else:
                logger.warning(f"预测跳过: {wav_file.name} | 误差计算失败")

        except Exception as e:
            logger.error(f"处理文件失败: {wav_file.name} | 错误: {e}")
            # Archive failed files to the date directory so they don't linger
            # in current/ as zombies: the file was marked processed in
            # seen_files but would otherwise never be physically moved out,
            # and current/ would keep growing.
            try:
                if wav_file.exists():
                    self._move_audio_to_date_dir(wav_file)
                    logger.info(f"异常文件已归档: {wav_file.name}")
            except Exception as move_err:
                logger.error(f"异常文件归档失败: {wav_file.name} | {move_err}")

    def _compute_reconstruction_error(self, wav_file, device_predictor, threshold=None):
        """
        Compute the reconstruction error of one audio file (dual-track verdict).

        The audio is sliced with an 8-second window and a 4-second hop; each
        patch is reconstructed separately. Returns both:
        1. the mean MSE (detects sustained weak-signal faults, e.g. slow
           bearing wear), and
        2. the patch anomaly ratio (detects sudden faults, e.g. bearing
           scraping or water-hammer impacts).

        Args:
            wav_file: path of the audio file
            device_predictor: predictor instance for this device
            threshold: device threshold used for the patch-level anomaly
                ratio (the ratio is not computed when None)

        Returns:
            (mean_mse, anomaly_ratio) tuple; (None, None) on failure.
        """
        try:
            import torch
            from predictor.utils import align_to_target

            # Min-Max normalization parameters.
            global_min = device_predictor.global_min
            global_max = device_predictor.global_max

            # Load the audio.
            y, _ = librosa.load(str(wav_file), sr=CFG.SR, mono=True)
            win_samples = int(CFG.WIN_SEC * CFG.SR)
            hop_samples = int(CFG.HOP_SEC * CFG.SR)
            if len(y) < win_samples:
                logger.warning(f"音频太短,无法提取patches: {wav_file.name}")
                return None, None

            patches = []
            for start in range(0, len(y) - win_samples + 1, hop_samples):
                window = y[start:start + win_samples]
                S = librosa.feature.melspectrogram(
                    y=window, sr=CFG.SR, n_fft=CFG.N_FFT,
                    hop_length=CFG.HOP_LENGTH, n_mels=CFG.N_MELS, power=2.0
                )
                S_db = librosa.power_to_db(S, ref=np.max)
                if S_db.shape[1] < CFG.TARGET_FRAMES:
                    S_db = np.pad(S_db, ((0, 0), (0, CFG.TARGET_FRAMES - S_db.shape[1])))
                else:
                    S_db = S_db[:, :CFG.TARGET_FRAMES]
                # Min-Max normalization (epsilon guards a zero range).
                S_norm = (S_db - global_min) / (global_max - global_min + 1e-6)
                patches.append(S_norm.astype(np.float32))

            if not patches:
                logger.warning(f"未能提取任何patches: {wav_file.name}")
                return None, None

            arr = np.stack(patches, 0)
            arr = np.expand_dims(arr, 1)
            tensor = torch.from_numpy(arr)
            torch_device = device_predictor.torch_device
            tensor = tensor.to(torch_device)
            with torch.no_grad():
                recon = device_predictor.model(tensor)
                recon = align_to_target(recon, tensor)
                mse_per_patch = torch.mean((recon - tensor) ** 2, dim=[1, 2, 3])
                mean_mse = torch.mean(mse_per_patch).item()

                # Patch-level anomaly ratio (sudden-fault detection): a patch
                # whose own MSE exceeds the threshold counts as anomalous.
                anomaly_ratio = 0.0
                if threshold and threshold > 0:
                    patch_mses = mse_per_patch.cpu().numpy()
                    anomaly_patch_count = int(np.sum(patch_mses > threshold))
                    anomaly_ratio = anomaly_patch_count / len(patch_mses)
                    logger.debug(
                        f"patch级分析: {wav_file.name} | "
                        f"异常patch={anomaly_patch_count}/{len(patch_mses)} "
                        f"比例={anomaly_ratio:.2f}"
                    )
            logger.debug(f"重建误差: {wav_file.name} | patches={len(patches)} | MSE={mean_mse:.6f} | ratio={anomaly_ratio:.2f}")
            return mean_mse, anomaly_ratio

        except Exception as e:
            logger.error(f"计算重建误差失败: {wav_file.name} | {e}")
            return None, None

    def _check_periodic_upload(self):
        """
        Check whether a periodic report is due.

        Once per minute, aggregates each device's detection results and
        pushes the summary upstream.
        """
        # Maintenance mode (mode_id=3): skip reporting entirely.
        if self._remote_mode_id == 3:
            return

        now = datetime.now()
        for device_code, cache in self.device_cache.items():
            # Check the reporting interval.
            last_upload = cache.get("last_upload")
            if last_upload is None:
                continue
            elapsed = (now - last_upload).total_seconds()
            # Has the reporting period elapsed?
            if elapsed >= self.segment_duration:
                # Fetch this device's stream config.
                stream_config = self.device_map.get(device_code)
                # Mean reconstruction error over the window.
                errors = cache.get("errors", [])
                avg_error = float(np.mean(errors)) if errors else 0.0
                # Device threshold.
                threshold = self._get_threshold(device_code)

                # ========================================
                # Dual-track verdict: mean MSE (sustained weak signal) OR
                # patch anomaly ratio (sudden faults).
                # Track 1: mean MSE with a tolerance band (persistent degradation).
                # Track 2: patch anomaly ratio (instantaneous bursts).
                # Either track firing marks this cycle anomalous.
                # ========================================
                anomaly_ratios = cache.get("anomaly_ratios", [])
                avg_anomaly_ratio = float(np.mean(anomaly_ratios)) if anomaly_ratios else 0.0
                if threshold:
                    # Track 1: mean-MSE tolerance-band verdict.
                    upper_bound = threshold * (1 + self.tolerance_ratio)
                    lower_bound = threshold * (1 - self.tolerance_ratio)
                    if avg_error > upper_bound:
                        # Above the upper bound -> definitely anomalous.
                        mse_anomaly = True
                    elif avg_error < lower_bound:
                        # Below the lower bound -> definitely normal.
                        mse_anomaly = False
                    else:
                        # Grey zone -> compare against the raw threshold.
                        mse_anomaly = avg_error > threshold

                    # Track 2: patch anomaly ratio.
                    # ANOMALY_RATIO_THRESHOLD is defined in predictor/config.py
                    # (default 0.1 = 10% of patches above threshold).
                    ratio_anomaly = avg_anomaly_ratio >= CFG.ANOMALY_RATIO_THRESHOLD

                    # OR logic: either track firing marks the cycle anomalous.
                    is_current_anomaly = mse_anomaly or ratio_anomaly

                    # Log the trigger source (anomalies only; aids debugging).
                    if is_current_anomaly:
                        sources = []
                        if mse_anomaly:
                            sources.append(f"平均MSE={avg_error:.6f}")
                        if ratio_anomaly:
                            sources.append(f"patch比例={avg_anomaly_ratio:.2f}")
                        logger.debug(f"双轨判定异常: {device_code} | 触发源: {', '.join(sources)}")

                    # Record this cycle's verdict.
                    self.last_single_anomaly[device_code] = is_current_anomaly
                else:
                    is_current_anomaly = False

                # ========================================
                # Sliding-window vote: 3 anomalous cycles out of 5 -> anomaly.
                # ========================================
                if device_code not in self.detection_history:
                    self.detection_history[device_code] = []
                # Per requirement: a normal verdict clears the window, forcing
                # anomalies to be consecutive before they can accumulate.
                if not is_current_anomaly:
                    self.detection_history[device_code].clear()
                # Record this cycle's result.
                self.detection_history[device_code].append(is_current_anomaly)
                # Cap the window size.
                if len(self.detection_history[device_code]) > self.voting_window_size:
                    self.detection_history[device_code].pop(0)
                # Vote on the final anomaly state.
                if self.voting_enabled:
                    anomaly_count = sum(self.detection_history[device_code])
                    is_anomaly = anomaly_count >= self.voting_threshold
                    if len(self.detection_history[device_code]) >= self.voting_window_size:
                        window_info = f"窗口[{anomaly_count}/{self.voting_window_size}]"
                    else:
                        window_info = f"窗口未满[{anomaly_count}/{self.voting_window_size}]"
                else:
                    is_anomaly = is_current_anomaly
                    window_info = "投票未开启"

                # ========================================
                # State-change detection.
                # trigger_alert is True only on a normal(or initial)->anomalous
                # transition; cool-down handling lives inside AlertAggregator.
                # ========================================
                last_is_anomaly = self.last_report_status.get(device_code)
                trigger_alert = False
                if is_anomaly:
                    # Only a state change (normal -> anomalous) starts the alert
                    # flow. Pump-transition filtering already happened in
                    # _process_new_file, so transition-period files never reach
                    # pending.
                    if last_is_anomaly is None or not last_is_anomaly:
                        if self.alert_enabled:
                            # Debug-mode margin check: skip the alert when the
                            # error barely exceeds the threshold, preventing
                            # boundary jitter from re-alerting every few minutes.
                            # Normal mode adds no such restriction (voting +
                            # tolerance band already suffice).
                            if self._remote_mode_id == 4 and threshold:
                                margin_ratio = self.config.get('push_notification', {}).get('debug_alert_margin', 0.05)
                                if avg_error < threshold * (1 + margin_ratio):
                                    trigger_alert = False
                                    logger.info(
                                        f"[调试模式] 裕量不足,跳过报警: {device_code} | "
                                        f"误差={avg_error:.6f} 阈值={threshold:.6f} "
                                        f"(需超过 {threshold * (1 + margin_ratio):.6f})"
                                    )
                                else:
                                    trigger_alert = True
                            # Human-detection suppression: any camera seeing a
                            # person blocks the alert.
                            elif self.human_detection_enabled and self.human_reader:
                                if self.human_reader.is_in_cooldown():
                                    trigger_alert = False
                                    status_info = self.human_reader.get_status_info()
                                    logger.info(f"人体检测抑制: {device_code} | {status_info},跳过报警")
                                else:
                                    trigger_alert = True
                            else:
                                trigger_alert = True
                        else:
                            trigger_alert = False
                            logger.debug(f"异常告警已禁用,跳过告警: {device_code}")

                # Pump running status.
                running_status = cache.get("status", "未知")
                # Inlet flow reading.
                inlet_flow = self._get_inlet_flow(stream_config) if stream_config else 0.0
                # Spectrum for this cycle.
                audio_data = cache.get("audio_data", [])
                freq_db = self._compute_frequency_spectrum(audio_data)
                # Append to the spectrum history (dB values only).
                if self.freq_history_enabled and freq_db:
                    self.freq_history[device_code].append((now, freq_db))
                    # Drop expired history entries.
                    cutoff = now - timedelta(minutes=self.freq_history_minutes)
                    self.freq_history[device_code] = [
                        (t, d) for t, d in self.freq_history[device_code] if t > cutoff
                    ]
                # Historical average spectrum (normal_frequency_middle).
                freq_middle_db = self._compute_frequency_middle(device_code)

                # ========================================
                # Anomaly classification: classify only when the state flips
                # from normal to anomalous; while the anomaly persists, reuse
                # the locked result for consistency.
                # ========================================
                anomaly_type_code = 6  # default: unclassified anomaly
                type_name = "未分类异常"
                if is_anomaly and audio_data:
                    # Is this a fresh anomaly (normal -> anomalous)?
                    is_new_anomaly = (last_is_anomaly is None or not last_is_anomaly)
                    if is_new_anomaly:
                        # New anomaly: run the ML classifier and lock the result.
                        try:
                            from core.ml_classifier import classify_fault
                            if len(audio_data) > 0:
                                y = audio_data[-1] if isinstance(audio_data[-1], np.ndarray) else np.array(audio_data[-1])
                                # The ML classifier returns a 4-tuple:
                                # (code, name, confidence, fine-grained detail).
                                anomaly_type_code, type_name, confidence, detail = classify_fault(y, sr=16000)
                                # Lock the classification result.
                                self.locked_anomaly_type[device_code] = (anomaly_type_code, type_name)
                                # The fine-grained type is logged for later model
                                # iteration analysis.
                                fine_info = detail.get('fine_name', '') if detail else ''
                                logger.info(f"异常分类(ML): {type_name} (code={anomaly_type_code}, "
                                            f"细分={fine_info}, 置信度={confidence:.2f})")
                        except Exception as e:
                            logger.warning(f"异常分类失败: {e}")
                    else:
                        # Persisting anomaly: reuse the locked classification.
                        if device_code in self.locked_anomaly_type:
                            anomaly_type_code, type_name = self.locked_anomaly_type[device_code]
                            logger.debug(f"异常分类(沿用): {type_name} (code={anomaly_type_code})")
                else:
                    # Back to normal: clear any locked classification.
                    if device_code in self.locked_anomaly_type:
                        del self.locked_anomaly_type[device_code]

                # Push logic.
                if self.push_enabled and stream_config:
                    # Pre-read the anomalous audio as base64 before the file is
                    # archived/cleared. This fixes a race: with async pushes or
                    # aggregator-delayed pushes the file may already be moved.
                    pre_read_wav_b64 = ""
                    if trigger_alert:
                        try:
                            current_pending = cache.get("pending_files", [])
                            if current_pending and current_pending[0].exists():
                                with open(current_pending[0], "rb") as f:
                                    pre_read_wav_b64 = base64.b64encode(f.read()).decode('utf-8')
                                logger.debug(f"预读异常音频成功: {current_pending[0].name} | size={len(pre_read_wav_b64)}")
                        except Exception as e:
                            logger.warning(f"预读异常音频失败: {e}")
                    if trigger_alert and self.alert_aggregator and not self.test_mode:
                        # Alerts go through the aggregator: cross-device
                        # aggregation plus per-type cool-down. The aggregator
                        # decides at window expiry whether to actually push.
                        # Test mode bypasses the aggregator and pushes directly.
                        self.alert_aggregator.submit_alert(
                            plant_name=stream_config.plant_name,
                            device_code=device_code,
                            anomaly_type_code=anomaly_type_code,
                            push_kwargs=dict(
                                stream_config=stream_config,
                                device_code=device_code,
                                is_anomaly=is_anomaly,
                                trigger_alert=True,
                                abnormal_score=avg_error,
                                score_threshold=threshold,
                                running_status=running_status,
                                inlet_flow=inlet_flow,
                                freq_db=freq_db,
                                freq_middle_db=freq_middle_db,
                                anomaly_type_code=anomaly_type_code,
                                abnormal_wav_b64=pre_read_wav_b64
                            )
                        )
                    else:
                        # Heartbeat (non-alert), aggregator unavailable, or test
                        # mode -> submit to the thread pool for an async push.
                        self._push_executor.submit(
                            self._push_detection_result,
                            stream_config=stream_config,
                            device_code=device_code,
                            is_anomaly=is_anomaly,
                            trigger_alert=trigger_alert,
                            abnormal_score=avg_error,
                            score_threshold=threshold,
                            running_status=running_status,
                            inlet_flow=inlet_flow,
                            freq_db=freq_db,
                            freq_middle_db=freq_middle_db,
                            anomaly_type_code=anomaly_type_code,
                            abnormal_wav_b64=pre_read_wav_b64
                        )
                # Remember this cycle's state.
                self.last_report_status[device_code] = is_anomaly

                # Logging.
                thr_str = f"{threshold:.6f}" if threshold else "未设置"
                # Where does the alert go?
                if trigger_alert and self.alert_aggregator:
                    alert_dest = "-> 聚合器"
                elif trigger_alert:
                    alert_dest = "-> 直接推送"
                else:
                    alert_dest = ""
                # Use the device name as a label for visual separation.
                cam_label = ""
                if stream_config:
                    cam_label = f"({stream_config.camera_name})"
                result_emoji = "!!" if is_anomaly else "OK"
                alert_str = f"报警=是 {alert_dest}" if trigger_alert else "报警=否"
                # Patch anomaly-ratio info.
                ratio_str = f"patch比例={avg_anomaly_ratio:.2f}" if anomaly_ratios else "patch比例=N/A"
                logger.info(
                    f"[{result_emoji}] {device_code}{cam_label} | "
                    f"误差={avg_error:.6f} 阈值={thr_str} {ratio_str} | "
                    f"{window_info} | {running_status} | "
                    f"{'异常' if is_anomaly else '正常'} | {alert_str}"
                )

                # ========================================
                # Archive pending files according to the voted result, so a
                # single-cycle fluctuation cannot mis-archive files.
                # ========================================
                pending_files = cache.get("pending_files", [])
                # Fresh anomaly (normal -> anomalous)?
                is_new_anomaly = is_anomaly and (last_is_anomaly is None or not last_is_anomaly)
                # Advance the anomaly-context capture state machine.
                self._update_anomaly_capture_state(device_code, is_anomaly, is_new_anomaly,
                                                   avg_error, threshold, now, pending_files)
                if pending_files:
                    if is_anomaly:
                        # Anomalous files are managed by the context-capture
                        # state machine, which saves pre+post files to
                        # anomaly_detected in one batch.
                        pass
                    else:
                        # Normal files: diversity sampling on reconstruction error.
                        for f in pending_files:
                            should_keep = self._should_keep_normal_sample(
                                device_code, avg_error, now
                            )
                            if should_keep:
                                # Archive into the date directory (for retraining).
                                self._move_audio_to_date_dir(f)
                                # Edge-cloud sync: enqueue the normal sample.
                                # AudioUploader enforces the sampling interval and
                                # daily cap internally; with enabled=false,
                                # enqueue_normal_sample is a no-op.
                                if self.audio_uploader:
                                    model_group = self.multi_predictor.device_model_map.get(device_code, device_code)
                                    self.audio_uploader.enqueue_normal_sample(
                                        device_code=device_code,
                                        wav_path=f,
                                        model_group=model_group,
                                        avg_error=avg_error,
                                        threshold=threshold
                                    )
                            else:
                                # Redundant sample (or retraining off): delete to save disk.
                                try:
                                    f.unlink()
                                except Exception as e:
                                    logger.warning(f"删除冗余正常音频失败: {f.name} | {e}")
                        # State back to normal: clear the save cool-down.
                        # NOTE(review): placement reconstructed from the comment's
                        # intent ("when the state returns to normal") — confirm
                        # against the original indentation.
                        if device_code in self.last_anomaly_save_time:
                            del self.last_anomaly_save_time[device_code]
                # Reset the per-cycle cache.
                cache["errors"] = []
                cache["anomaly_ratios"] = []
                cache["audio_data"] = []
                cache["pending_files"] = []
                cache["last_upload"] = now
                logger.info("─" * 60)

    def _update_anomaly_capture_state(self, device_code, is_anomaly, is_new_anomaly, avg_error,
                                      threshold, now, pending_files):
        """
        Anomaly-context capture state machine.

        State transitions:
        1. no state + new anomaly -> trigger capture; look back through
           audio_file_history for the preceding files
        2. triggered + anomaly persists -> keep collecting anomalous files
        3. triggered + anomaly ends -> start collecting follow-up files (post phase)
        4. post phase + time up -> save everything to anomaly_detected

        All anomalous files (from trigger to end) are saved together under the
        anomaly_detected directory, with a metadata.json recording the error,
        the threshold, etc.
        """
        state = self.anomaly_capture_state.get(device_code)
        if is_new_anomaly and state is None:
            # New anomaly: look back for the preceding files.
            anomaly_cutoff = now - timedelta(minutes=1)
            pre_cutoff = now - timedelta(minutes=self.context_pre_minutes + 1)
            pre_files = []
            anomaly_files = []
            for ts, fpath in self.audio_file_history[device_code]:
                if not fpath.exists():
                    continue
                if ts >= anomaly_cutoff:
                    anomaly_files.append(fpath)
                elif ts >= pre_cutoff:
                    pre_files.append(fpath)
            # Fallback: if the look-back found nothing, take the current pending files.
            if not anomaly_files and pending_files:
                anomaly_files = [f for f in pending_files if f.exists()]
            self.anomaly_capture_state[device_code] = {
                "trigger_time": now,
                "avg_error": avg_error,
                "threshold": threshold,
                "pre_files": pre_files,
                "anomaly_files": anomaly_files,
                "post_files": [],
                "anomaly_ended": False,
                "post_start_time": None
            }
            logger.info(f"异常上下文捕获已触发: {device_code} | "
                        f"前置={len(pre_files)}个 | 异常={len(anomaly_files)}个")
        elif state is not None:
            if is_anomaly and not state["anomaly_ended"]:
                # Anomaly still ongoing: keep collecting anomalous files.
                for f in pending_files:
                    if f.exists():
                        state["anomaly_files"].append(f)
            else:
                # Anomaly ended (or had already ended; collecting post files).
                if not state["anomaly_ended"]:
                    state["anomaly_ended"] = True
                    state["post_start_time"] = now
                    logger.info(f"异常结束,开始收集后续文件: {device_code} | "
                                f"异常文件共{len(state['anomaly_files'])}个 | "
                                f"等待{self.context_post_minutes}分钟")
                # Collect follow-up files.
                for f in pending_files:
                    if f.exists():
                        state["post_files"].append(f)
                # Has the post phase run its course?
                elapsed_post = (now - state["post_start_time"]).total_seconds() / 60
                if elapsed_post >= self.context_post_minutes:
                    self._save_anomaly_context(device_code, state)
                    # Edge-cloud sync: once the context save completes, enqueue
                    # the whole event directory for upload; with enabled=false,
                    # enqueue_anomaly_event is a no-op.
                    if self.audio_uploader:
                        anomaly_files = state.get('anomaly_files', [])
                        if anomaly_files:
                            event_dir = self.save_anomaly_dir / device_code / anomaly_files[0].stem
                            model_group = self.multi_predictor.device_model_map.get(device_code, device_code)
                            self.audio_uploader.enqueue_anomaly_event(
                                device_code=device_code,
                                event_dir=event_dir,
                                metadata=state,
                                model_group=model_group
                            )
                    del self.anomaly_capture_state[device_code]
                    self.last_anomaly_save_time[device_code] = now

    def _save_anomaly_context(self, device_code: str, state: dict):
        """
        Save the anomaly-context files into a dedicated directory.

        Layout:
        data/anomaly_detected/{device_code}/{first anomaly file stem}/
        ├── 92_1#-1_20260130140313.wav   (original file names kept)
        ├── ...
        └── metadata.json

        metadata.json fields:
        - before_trigger: files from 1~(N+1) minutes before the trigger
          (normal period, kept for comparison)
        - at_trigger: files within 1 minute before the trigger (when the
          anomaly started to appear)
        - after_trigger: files from the N minutes after the trigger
          (regardless of whether the anomaly recovered)

        Args:
            device_code: device identifier
            state: capture-state dict
        """
        import shutil
        import json
        try:
            # Use the first anomalous file name as the folder name.
            anomaly_files = state.get("anomaly_files", [])
            if not anomaly_files:
                logger.warning(f"无异常文件,跳过保存: {device_code}")
                return
            # Folder name is the first anomalous file's stem,
            # e.g. 92_1#-1_20260130140313.
            first_anomaly = anomaly_files[0]
            folder_name = first_anomaly.stem
            save_dir = self.save_anomaly_dir / device_code / folder_name
            save_dir.mkdir(parents=True, exist_ok=True)
            # Collect all file names per phase.
            all_files = {"before_trigger": [], "at_trigger": [], "after_trigger": []}
            # Move the preceding files (they come from the date directory;
            # moving rather than copying avoids duplicates).
            for fpath in state.get("pre_files", []):
                if fpath.exists():
                    dest = save_dir / fpath.name
                    shutil.move(str(fpath), str(dest))
                    all_files["before_trigger"].append(fpath.name)
            # Move the trigger-time files (names kept).
            for fpath in anomaly_files:
                if fpath.exists():
                    dest = save_dir / fpath.name
                    shutil.move(str(fpath), str(dest))
                    all_files["at_trigger"].append(fpath.name)
            # Move the follow-up files (names kept).
            for fpath in state.get("post_files", []):
                if fpath.exists():
                    dest = save_dir / fpath.name
                    shutil.move(str(fpath), str(dest))
                    all_files["after_trigger"].append(fpath.name)
            # Write the compact metadata file.
            trigger_time = state.get("trigger_time")
            metadata = {
                "device_code": device_code,
                "trigger_time": trigger_time.strftime("%Y-%m-%d %H:%M:%S")
                if trigger_time else None,
                "avg_error": round(state.get("avg_error", 0.0), 6),
                "threshold": round(state.get("threshold", 0.0), 6),
                "files": all_files
            }
            metadata_path = save_dir / "metadata.json"
            with open(metadata_path, 'w', encoding='utf-8') as f:
                json.dump(metadata, f, ensure_ascii=False, indent=2)
            total = sum(len(v) for v in all_files.values())
            logger.warning(f"异常上下文已保存: {device_code}/{folder_name} | "
                           f"共{total}个文件 (前{len(all_files['before_trigger'])}+异常{len(all_files['at_trigger'])}+后{len(all_files['after_trigger'])})")
        except Exception as e:
            logger.error(f"保存异常上下文失败: {device_code} | {e}")

    def _compute_frequency_middle(self, device_code):
        """
        Compute the averaged historical spectrum (normal_frequency_middle).

        Args:
            device_code: device identifier

        Returns:
            Element-wise mean of the historical dB lists; [] when fewer than
            two history entries exist or on failure.
        """
        history = self.freq_history.get(device_code, [])
        if not history or len(history) < 2:
            return []
        try:
            # Collect the historical dB lists, keeping only those whose length
            # matches the first record (used as the reference length).
            all_db = []
            ref_len = len(history[0][1])
            for _, db in history:
                if len(db) == ref_len:
                    all_db.append(db)
            if not all_db:
                return []
            # Mean dB per frequency bin.
            avg_db = np.mean(all_db, axis=0).tolist()
            return avg_db
        except Exception as e:
            logger.error(f"计算频谱图历史平均失败: {e}")
            return []

    # ------------------------------------------------------------------ #
    # SCADA JWT auto-management                                          #
    # ------------------------------------------------------------------ #
    def _can_auto_login(self) -> bool:
        # Auto-login is possible only when URL, username and password are all set.
        return bool(self._scada_login_url and self._scada_login_username and self._scada_login_password)

    @staticmethod
    def _extract_token(payload: dict) -> str:
        # Pull a JWT out of a login response, tolerating several common field
        # names both at the top level and under "data".
        if not isinstance(payload, dict):
            return ""
        candidates = []
        data = payload.get("data")
        if isinstance(data, dict):
            candidates.extend([
                data.get("token"),
                data.get("jwt"),
                data.get("jwtToken"),
                data.get("accessToken"),
                data.get("access_token"),
            ])
        elif isinstance(data, str):
            candidates.append(data)
        candidates.extend([
            payload.get("token"),
            payload.get("jwt"),
            payload.get("jwtToken"),
            payload.get("accessToken"),
            payload.get("access_token"),
        ])
        for value in candidates:
            if isinstance(value, str) and \
                    value.strip():
                return value.strip()
        return ""

    def _login_and_get_jwt(self) -> bool:
        # POST the configured credentials to the SCADA login endpoint and cache
        # the returned JWT; returns True on success.
        if not self._can_auto_login():
            return False
        body = _json.dumps({
            "UserName": self._scada_login_username,
            "Password": self._scada_login_password,
            "type": self._scada_login_type,
            "DepId": self._scada_login_dep_id,
        }).encode("utf-8")
        req = _urllib_request.Request(
            self._scada_login_url,
            data=body,
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        try:
            with _urllib_request.urlopen(req, timeout=self.scada_timeout) as resp:
                data = _json.loads(resp.read().decode("utf-8"))
            token = self._extract_token(data)
            if token:
                self.scada_jwt = token
                logger.info("SCADA 登录成功,JWT 已刷新")
                return True
            logger.warning("SCADA 登录成功但响应内无 JWT 字段")
        except (_urllib_error.URLError, TimeoutError, _json.JSONDecodeError, Exception) as e:
            logger.warning("SCADA 登录失败: %s", e)
        return False

    def _ensure_jwt(self) -> bool:
        # Ensure a JWT is cached, logging in under the auth lock when needed
        # (double-checked so concurrent callers log in at most once).
        if self.scada_jwt:
            return True
        with self._scada_auth_lock:
            if self.scada_jwt:
                return True
            return self._login_and_get_jwt()

    def _clear_jwt(self) -> None:
        # Drop the cached JWT (e.g. after a 401/403) so the next call re-logins.
        with self._scada_auth_lock:
            self.scada_jwt = ""

    def _get_threshold(self, device_code):
        """
        Get the anomaly threshold for a device.

        Priority:
        1. the threshold of the device's model from multi_predictor
        2. the default threshold from the configuration
        3. 0.0 (no anomaly verdict is made)

        Args:
            device_code: device identifier (e.g. "LT-1")

        Returns:
            The threshold; the default value or 0.0 when none is found.
        """
        # Source 1: device threshold from multi_predictor.
        thr = self.multi_predictor.get_threshold(device_code)
        # Apply the relaxation multiplier (e.g. 1.2x) to guard against an
        # over-strict training baseline causing constant false alarms.
        multiplier = self.config.get('prediction', {}).get('threshold_multiplier', 1.0)
        if thr is not None:
            if multiplier != 1.0:
                thr = thr * multiplier
                logger.debug(f"阈值来源: {device_code} -> 原始模型={thr/multiplier if multiplier!=0 else 0:.6f} × 倍数{multiplier} = 最终判定阈值{thr:.6f}")
            return thr
        # Source 2: default threshold from the configuration.
        default_threshold = self.config.get('prediction', {}).get('default_threshold', 0.0)
        if default_threshold > 0:
            logger.debug(f"阈值来源: {device_code} -> 配置默认值 = {default_threshold:.6f}")
            return default_threshold
        # Warn only once per device when no threshold is found.
        if device_code not in getattr(self, '_threshold_warned', set()):
            if not hasattr(self, '_threshold_warned'):
                self._threshold_warned = set()
            self._threshold_warned.add(device_code)
            logger.warning(f"未找到阈值: {device_code},将跳过异常判定")
        return 0.0

    def _get_inlet_flow(self, stream_config: RTSPStreamConfig) -> float:
        """
        Get the inlet flow (via the SCADA real-time data API).

        Uses the current-data endpoint to fetch the latest reading directly.

        Args:
            stream_config: stream configuration

        Returns:
            The inlet-flow value, or 0.0 on failure.
        """
        if not self.scada_enabled or not stream_config:
            logger.debug(f"流量跳过: scada_enabled={self.scada_enabled}, stream_config={stream_config}")
            return 0.0
        # PLC data point for the flow reading.
        plc_address = stream_config.get_flow_plc_address()
        if not plc_address:
            logger.debug(f"流量跳过(无PLC地址): {stream_config.device_code} | pump_name='{stream_config.pump_name}'")
            return 0.0
        # Use the plant-configured project_id.
        project_id = stream_config.project_id
        try:
            # Current timestamp in milliseconds.
            now_ms = int(datetime.now().timestamp() * 1000)
            # Query parameters.
            params = {"time": now_ms}
            # Request body in the real-time API format.
            request_body = [
                {
                    "deviceId": "1",
                    "deviceItems": plc_address,
                    "deviceName": f"流量_{stream_config.pump_name}",
                    "project_id": project_id
                }
            ]
            logger.debug(f"流量请求: {stream_config.device_code} | project_id={project_id} | plc={plc_address}")
            for attempt in range(2):
                if not self._ensure_jwt():
                    logger.warning("流量查询: JWT 不可用")
                    break
                headers = {
                    "Content-Type": \
                        "application/json",
                    "JWT-TOKEN": self.scada_jwt
                }
                # POST to the real-time endpoint.
                response = requests.post(
                    self.scada_realtime_url,
                    params=params,
                    json=request_body,
                    headers=headers,
                    timeout=self.scada_timeout
                )
                if response.status_code == 200:
                    data = response.json()
                    code = data.get("code")
                    # A business-level 401/403 also triggers a token refresh.
                    if code in (401, 403) and self._can_auto_login() and attempt == 0:
                        self._clear_jwt()
                        continue
                    if code == 200:
                        if data.get("data"):
                            latest = data["data"][0]
                            if "val" in latest:
                                flow = float(latest["val"])
                                logger.debug(f"流量获取成功: {stream_config.device_code} | 流量={flow}")
                                return flow
                            else:
                                logger.debug(f"流量数据无val字段: {stream_config.device_code}")
                        else:
                            logger.debug(f"流量查询无数据: {stream_config.device_code}")
                    else:
                        logger.warning(f"流量API返回异常: {stream_config.device_code} | code={code} | msg={data.get('msg')}")
                elif response.status_code in (401, 403) and self._can_auto_login() and attempt == 0:
                    self._clear_jwt()
                    if self._ensure_jwt():
                        continue
                    break
                else:
                    logger.warning(f"流量HTTP错误: {stream_config.device_code} | status={response.status_code}")
                    break
        except Exception as e:
            logger.warning(f"流量获取异常: {stream_config.device_code} | {e}")
        return 0.0

    def _compute_frequency_spectrum(self, audio_data):
        """
        Compute the spectrum data for one reporting cycle.

        Concatenates the ~8s clips collected within the minute and runs one
        FFT over the whole signal.

        Args:
            audio_data: list of audio arrays

        Returns:
            List of dB values (400 points evenly spread over 0-8000 Hz).
        """
        if not audio_data:
            return []
        try:
            # Concatenate all clips.
            combined = np.concatenate(audio_data)
            # FFT.
            n = len(combined)
            fft_result = np.fft.rfft(combined)
            freqs = np.fft.rfftfreq(n, 1.0 / CFG.SR)
            # Magnitude, converted to dB.
            magnitude = np.abs(fft_result)
            # Avoid log(0).
            magnitude = np.maximum(magnitude, 1e-10)
            db = 20 * np.log10(magnitude)
            # Downsample to 400 points evenly spread over 0-8000 Hz.
            max_freq = 8000
            num_points = 400
            db_list = []
            for i in range(num_points):
                # Target frequency (evenly spaced in 0..8000 Hz).
                target_freq = (i / (num_points - 1)) * max_freq
                # Nearest frequency bin.
                idx = np.argmin(np.abs(freqs - target_freq))
                if idx < len(freqs):
                    db_list.append(float(db[idx]))
            return db_list
        except Exception as e:
            logger.error(f"计算频谱图失败: {e}")
            return []

    def _push_detection_result(self, stream_config, device_code, is_anomaly, trigger_alert,
                               abnormal_score, score_threshold, running_status, inlet_flow,
                               freq_db, freq_middle_db=None, anomaly_type_code=6,
                               abnormal_wav_b64=""):
        """
        Push a detection result to the remote servers.

        Report format (as required):
        - includes spectrum data (this_frequency + normal_frequency_middle)
        - includes running status and inlet flow
        - sound_detection.status / abnormalwav report anomalous (0) plus the
          base64 audio only when trigger_alert=True; otherwise normal (1) and
          an empty string.

        Args:
            stream_config: stream configuration
            device_code: device identifier
            is_anomaly: whether detection actually found an anomaly
            trigger_alert: whether to raise an alert (True only for new anomalies)
            abnormal_score: mean reconstruction error
            score_threshold: threshold
            running_status: run/stop status
            inlet_flow: inlet flow value
            freq_db: this cycle's spectrum dB list
            freq_middle_db: historical average spectrum dB list
            abnormal_wav_b64: pre-read base64 of the anomalous audio (read
                before the file was archived, avoiding a race)
        """
        try:
            import time as time_module
            # Device display name.
            camera_name = stream_config.camera_name
            # Build the push-target list according to the remote mode.
            if not self.push_production_urls and not self.push_test_urls:
                logger.warning(f"未配置push_base_urls: {device_code}")
                return
            # project_id of this device, used to build the final push URL.
            project_id = stream_config.project_id
            # Build the push message.
            request_time = int(time_module.time() * 1000)
            # The anomalous audio arrives via abnormal_wav_b64 (pre-read by the
            # caller before archiving); warn when an alert push lacks it so the
            # gap can be traced.
            if trigger_alert and not abnormal_wav_b64:
                logger.warning(f"报警推送但无异常音频数据: {device_code}")
            # sound_detection.status: report 0 (anomalous) only when
            # trigger_alert=True; everything else (normal or a persisting
            # anomaly) reports 1 — per requirement, "normal cycles carry an
            # empty payload".
            report_status = 0 if trigger_alert else 1
            payload = {
                "message": {
                    # Channel info.
                    "channelInfo": {"name": camera_name},
                    # Request timestamp.
                    "requestTime": request_time,
                    # Classification info.
                    "classification": {
                        "level_one": 2,  # audio-detection major class
                        "level_two": anomaly_type_code  # subtype (6=unclassified, 7=bearing, 8=cavitation, 9=looseness/resonance, 10=impeller, 11=valve)
                    },
                    # Skill info.
                    "skillInfo": {"name": "异响检测"},
                    # Sound-detection data.
                    "sound_detection": {
                        # Anomalous audio.
                        "abnormalwav": abnormal_wav_b64,
                        # Status: 0=anomalous (new anomaly only), 1=normal (or persisting anomaly).
                        "status": report_status,
                        # Device condition info.
                        "condition": {
                            "running_status": "运行中" if running_status == "开机" else "停机中",
                            "inlet_flow": inlet_flow
                        },
                        # Score info.
                        "score": {
                            "abnormal_score": abnormal_score,  # current 1-min mean reconstruction error
                            "score_threshold": score_threshold  # this device's anomaly threshold
                        },
                        # Spectrum data.
                        "frequency": {
                            "this_frequency": freq_db,  # this minute's spectrum
                            "normal_frequency_middle": freq_middle_db or [],  # 10-min average spectrum
                            "normal_frequency_upper": [],  # upper band (empty for now)
                            "normal_frequency_lower": []  # lower band (empty for now)
                        }
                    }
                }
            }
            # Select push targets by remote project mode:
            # mode 1/2 (daily/visit): production + test
            # mode 3 (maintenance): no push (already blocked in
            #   _check_periodic_upload; this is a fallback)
            # mode 4 (debug): test only
            if self._remote_mode_id == 3:
                logger.debug(f"检修模式,跳过推送: {device_code}")
                return
            if self._remote_mode_id == 4:
                # Debug mode: push to the test URLs only.
                active_urls = [(u, '测试') for u in self.push_test_urls]
            else:
                # Daily/visit mode: push to every URL.
                active_urls = [(u, '正式') for u in self.push_production_urls]
                active_urls += [(u, '测试') for u in self.push_test_urls]
            if not active_urls:
                logger.warning(f"当前模式(mode_id={self._remote_mode_id})下无可用推送目标: {device_code}")
                return
            push_targets = [
                (f"{url}/{project_id}", label) for url, label in active_urls
            ]
            # Push to each target independently (one failure doesn't affect others).
            for target_url, target_label in push_targets:
                self._send_to_target(
                    target_url, target_label, payload, device_code,
                    camera_name, trigger_alert, abnormal_score
                )
        except Exception as e:
            logger.error(f"推送通知异常: {e}")

    def _send_to_target(self, target_url, target_label, payload, device_code,
                        camera_name, trigger_alert, abnormal_score):
        # Send the payload to a single push target (with retry logic). Targets
        # are called independently, so a failing target doesn't affect others.
        import time as time_module
        push_success = False
        for attempt in \
                range(self.push_retry_count + 1):
            try:
                response = requests.post(
                    target_url,
                    json=payload,
                    timeout=self.push_timeout,
                    headers={"Content-Type": "application/json"}
                )
                if response.status_code == 200:
                    alert_tag = "报警" if trigger_alert else "心跳"
                    logger.info(
                        f" [{alert_tag}][{target_label}] {device_code}({camera_name}) | "
                        f"误差={abnormal_score:.6f}"
                    )
                    push_success = True
                    break
                else:
                    logger.warning(
                        f"推送失败[{target_label}]: {device_code} | URL={target_url} | "
                        f"状态码={response.status_code} | 内容={response.text[:100]}"
                    )
            except requests.exceptions.Timeout:
                logger.warning(f"推送超时[{target_label}]: {device_code} | URL={target_url} | 尝试 {attempt + 1}/{self.push_retry_count + 1}")
            except requests.exceptions.RequestException as e:
                logger.warning(f"推送异常[{target_label}]: {device_code} | URL={target_url} | {e}")
            # Wait before retrying.
            if attempt < self.push_retry_count:
                time_module.sleep(1)
        if not push_success:
            logger.error(f"推送失败[{target_label}]: {device_code} | URL={target_url} | 已达最大重试次数")

    def _should_keep_normal_sample(self, device_code, avg_error, now):
        # Diversity sampling on reconstruction error: decide whether a normal
        # audio clip is kept (whether or not retraining is currently enabled,
        # the data always remains the basis of training).
        #
        # Strategy: hourly quota + error-diversity test
        #   - error close to an already-kept sample -> redundant, drop
        #   - error carries new information -> keep
        #   - last 10 minutes with unfilled quota -> relax the bar to fill it
        current_hour = now.hour
        current_minute = now.minute
        state_key = (device_code, current_hour)
        # Get or create this hour's sampling state.
        if state_key not in self._sample_state:
            self._sample_state[state_key] = {
                "kept_errors": [],
                "kept_count": 0
            }
        # Drop stale hourly states (keep only the current and previous hour).
        expired_keys = [
            k for k in self._sample_state
            if k[0] == device_code and k[1] != current_hour
            and k[1] != (current_hour - 1) % 24
        ]
        for k in expired_keys:
            del self._sample_state[k]
        state = self._sample_state[state_key]
        # Quota already met: drop.
        if state["kept_count"] >= self._keep_hourly_samples:
            return False
        # Adaptive epsilon: normal value for the first 50 minutes, then decays
        # linearly to 0 in the last 10 minutes (relaxing the bar to fill quota).
        if current_minute >= 50:
            # Last 10 minutes: epsilon decays linearly to 0.
            decay_ratio = (60 - current_minute) / 10.0
            epsilon = self._diversity_base_epsilon * decay_ratio
        else:
            epsilon = self._diversity_base_epsilon
        # Minimum error distance to already-kept samples.
        kept_errors = state["kept_errors"]
        if kept_errors:
            min_distance = min(abs(avg_error - e) for e in kept_errors)
            # Too close to a kept sample -> redundant.
            if min_distance < epsilon:
                return False
        # Diversity check passed: keep this sample.
        state["kept_errors"].append(avg_error)
        state["kept_count"] += 1
        logger.debug(
            f"正常音频采样保留: {device_code} | "
            f"误差={avg_error:.6f} | "
            f"本小时已保留={state['kept_count']}/{self._keep_hourly_samples}"
        )
        return True

    def _move_audio_to_date_dir(self, wav_file):
        """
        Archive an audio file into its date directory.

        Layout: deploy_pickup/data/{device_code}/{date}/{file name}

        Args:
            wav_file: path of the audio file
        """
        try:
            import shutil
            # Extract device_code and date from the file name,
            # e.g. 92_1#-1_20251218142000.wav
            match = re.match(r'\d+_(.+)_(\d{8})\d{6}\.wav', wav_file.name)
            if not match:
                logger.warning(f"无法从文件名提取信息: {wav_file.name}")
                return
            device_code = match.group(1)  # e.g. 1#-1
            date_str = match.group(2)  # YYYYMMDD
            # Target directory: data/{device_code}/{date}/
            date_dir = self.audio_dir / device_code / date_str
            date_dir.mkdir(parents=True, exist_ok=True)
            # Move the file.
            dest_file = date_dir / wav_file.name
            shutil.move(str(wav_file), str(dest_file))
            logger.debug(f"音频已归档: {device_code}/{date_str}/{wav_file.name}")
        except Exception as e:
            logger.error(f"移动音频失败: {wav_file.name} | 错误: {e}")

    def _move_audio_to_transition_dir(self, wav_file, reason):
        """
        Move audio recorded while the pump is stopped / in transition into the
        transition directory.

        Layout: deploy_pickup/data/{device_code}/pump_transition/{file name}

        These clips are never used for model training but are kept for
        analysis and debugging.

        Args:
            wav_file: path of the audio file
            reason: reason tag (stopped=pump off, transition=transition period)
        """
        try:
            import shutil
            # Extract device_code from the file name,
            # e.g. 92_1#-1_20251218142000.wav
            match = re.match(r'\d+_(.+)_\d{14}\.wav', wav_file.name)
            if not match:
                logger.warning(f"无法从文件名提取信息: {wav_file.name}")
                return
            device_code = match.group(1)  # e.g. 1#-1
            # Target directory: data/{device_code}/pump_transition/
            transition_dir = self.audio_dir / device_code / "pump_transition"
            transition_dir.mkdir(parents=True, exist_ok=True)
            # Move the file.
            dest_file = transition_dir / wav_file.name
            shutil.move(str(wav_file), str(dest_file))
            logger.debug(f"过渡期音频已归档: {device_code}/pump_transition/{wav_file.name} ({reason})")
        except Exception as e:
            logger.error(f"移动过渡期音频失败: {wav_file.name} | 错误: {e}")

    def _cleanup_stale_current_files(self):
        # Periodic safety net: sweep files stuck in current/ for too long.
        # A file is normally processed and moved within ~60s; anything older
        # than 5 minutes is treated as abnormal. Runs at most every 5 minutes
        # (timestamp-gated so we don't walk the directories every second).
        now_ts = time.time()
        if not hasattr(self, '_last_stale_cleanup'):
            self._last_stale_cleanup = now_ts
        # Run once every 5 minutes.
        if now_ts - self._last_stale_cleanup < 300:
            return
        self._last_stale_cleanup = now_ts
        stale_threshold = 300  # 5 minutes
        cleaned = 0
        for device_code in self.device_map.keys():
            current_dir = self.audio_dir / device_code / "current"
            if not current_dir.exists():
                continue
            for wav_file in current_dir.glob("*.wav"):
                try:
                    age = now_ts - wav_file.stat().st_mtime
                    if age > stale_threshold:
                        # Archive over-aged files to the date directory (the
                        # data still has training value).
                        self._move_audio_to_date_dir(wav_file)
                        # Remove from seen_files (the Path is stale once moved).
                        self.seen_files.discard(wav_file)
                        cleaned += 1
                except Exception:
                    pass
        if cleaned > 0:
            logger.info(f"current目录兜底清理: 移走 {cleaned} 个超龄滞留文件")

    def _cleanup_old_files(self, days: int = None):
        # Periodic cleanup of the audio archive.
        #
        # Rules:
        # 1. date directories older than keep_days -> delete entirely
        # 2. unexpired but non-today date directories -> hourly-bucket sampling
        # 3. today's date directory -> never cleaned
current → 跳过(由 _cleanup_stale_current_files 负责) # 5. pump_transition → 按 mtime 清理超过 keep_days 的文件 # 6. verified_normal → 按 mtime 清理超过 keep_days 的文件 keep_days = days if days is not None else self._keep_days try: import shutil now = datetime.now() cutoff_date = now - timedelta(days=keep_days) cutoff_str = cutoff_date.strftime("%Y%m%d") cutoff_ts = (now - timedelta(days=keep_days)).timestamp() today_str = now.strftime("%Y%m%d") deleted_dirs = 0 trimmed_files = 0 transition_cleaned = 0 for device_dir in self.audio_dir.iterdir(): if not device_dir.is_dir(): continue for subdir in device_dir.iterdir(): if not subdir.is_dir(): continue # current 由 _cleanup_stale_current_files 负责 if subdir.name == "current": continue # pump_transition / verified_normal:按文件 mtime 清理 if subdir.name in ("pump_transition", "verified_normal"): transition_cleaned += self._cleanup_flat_dir_by_mtime( subdir, cutoff_ts ) continue # 只处理日期目录(YYYYMMDD 格式) if not re.match(r'^\d{8}$', subdir.name): continue # 过期目录:整个删除 if subdir.name < cutoff_str: try: shutil.rmtree(subdir) deleted_dirs += 1 logger.debug(f"清理过期目录: {device_dir.name}/{subdir.name}") except Exception as e: logger.error(f"删除目录失败: {subdir} | {e}") continue # 当天目录不清理 if subdir.name == today_str: continue # 历史未过期目录:按小时分桶抽样保留 trimmed_files += self._trim_date_dir_by_hour(subdir) if deleted_dirs > 0 or trimmed_files > 0 or transition_cleaned > 0: logger.info( f"清理完成: 删除{deleted_dirs}个过期目录, " f"裁剪{trimmed_files}个冗余文件, " f"清理{transition_cleaned}个过渡期/核查文件" ) except Exception as e: logger.error(f"清理过期文件失败: {e}") def _cleanup_flat_dir_by_mtime(self, target_dir, cutoff_ts): # 按文件修改时间清理平铺目录(pump_transition / verified_normal) # 这些目录没有日期子目录结构,直接存放 wav 文件 # 删除 mtime 早于 cutoff_ts 的文件,空目录不删除(保持结构) cleaned = 0 freed_bytes = 0 for f in target_dir.glob("*.wav"): try: st = f.stat() if st.st_mtime < cutoff_ts: freed_bytes += st.st_size f.unlink() cleaned += 1 except Exception: pass if cleaned > 0: logger.info( f"清理 {target_dir.parent.name}/{target_dir.name}: " 
f"删除 {cleaned} 个过期文件, 释放 {freed_bytes / 1e6:.1f}MB" ) return cleaned def _trim_date_dir_by_hour(self, date_dir): # 对单个日期目录按小时分桶,每桶只保留 keep_hourly_samples 个文件 # 超出的文件删除,返回删除的文件数 wav_files = list(date_dir.glob("*.wav")) if not wav_files: return 0 # 按小时分桶(从文件名提取小时:{project}_{device}_{YYYYMMDDHHMMSS}.wav) hour_buckets = defaultdict(list) for f in wav_files: match = re.match(r'\d+_.+_\d{8}(\d{2})\d{4}\.wav', f.name) if match: hour = int(match.group(1)) hour_buckets[hour].append(f) else: # 无法解析小时的文件直接保留 hour_buckets[-1].append(f) deleted = 0 for hour, files in hour_buckets.items(): if len(files) <= self._keep_hourly_samples: continue # 按修改时间排序,均匀保留(等间隔取样) files_sorted = sorted(files, key=lambda f: f.stat().st_mtime) # 计算保留间隔 keep_indices = set() step = len(files_sorted) / self._keep_hourly_samples for i in range(self._keep_hourly_samples): keep_indices.add(int(i * step)) for idx, f in enumerate(files_sorted): if idx not in keep_indices: try: f.unlink() deleted += 1 except Exception: pass return deleted class PickupMonitoringSystem: """ 拾音器监控系统 管理FFmpeg进程和监控线程 """ def __init__(self, db_path=None, yaml_config=None, test_mode=False): """ 初始化监控系统 参数: db_path: SQLite 数据库路径(为 None 时使用默认路径 config/pickup_config.db) yaml_config: 若不为 None,则直接使用该 dict 作为配置(跳过 DB) test_mode: 测试模式,禁用聚合/冷却/投票,所有URL都推送 """ from config.loader import load_project_config # 测试模式标记,传递给 PickupMonitor self.test_mode = test_mode if yaml_config is not None: # 向下兼容:如果外部强行传入了 yaml_config(例如从 run_with_auto_training),直接使用 self.config = yaml_config self.config_manager = None print("配置源: YAML (外部传入)") else: # 统一加载 config, source, cm = load_project_config() self.config = config self.config_manager = cm # 冷启动模式标记 self.cold_start_mode = False # 初始化多模型预测器(支持每个设备独立模型) self.multi_predictor = MultiModelPredictor() self.predictor = None # 兼容性保留,已废弃 # 从配置中注册所有设备的模型目录映射 print("\n正在初始化多模型预测器...") for plant in self.config.get('plants', []): if not plant.get('enabled', False): continue for stream in plant.get('rtsp_streams', []): 
device_code = stream.get('device_code', '') model_subdir = stream.get('model_subdir', device_code) if device_code and model_subdir: self.multi_predictor.register_device(device_code, model_subdir) print(f" 注册设备: {device_code} -> models/{model_subdir}/") print(f"已注册 {len(self.multi_predictor.registered_devices)} 个设备模型映射") # 进程和监控器列表 self.ffmpeg_processes = [] self.monitors = [] # 信号处理 signal.signal(signal.SIGINT, self._signal_handler) signal.signal(signal.SIGTERM, self._signal_handler) def _load_config(self): """ 从 SQLite DB 加载配置 返回: Dict: 配置字典 """ config = self.config_manager.get_full_config() logger.info("配置已从 SQLite 加载") return config def _parse_rtsp_streams(self): """ 解析配置文件中的RTSP流信息 返回: List[RTSPStreamConfig]: RTSP流配置列表 """ streams = [] plants = self.config.get('plants', []) if not plants: raise ValueError("配置文件中未找到水厂配置") for plant in plants: plant_name = plant.get('name') if not plant_name: print("警告: 跳过未命名的区域配置") continue # 检查是否启用该水厂(默认启用以兼容旧配置) if not plant.get('enabled', True): logger.debug(f"跳过禁用的水厂: {plant_name}") continue # 获取流量PLC配置 flow_plc = plant.get('flow_plc', {}) # 获取该水厂的project_id(每个plant有自己的project_id) project_id = plant.get('project_id', 92) logger.info(f"加载区域配置: {plant_name} | project_id={project_id}") rtsp_streams = plant.get('rtsp_streams', []) for stream in rtsp_streams: url = stream.get('url') channel = stream.get('channel') camera_name = stream.get('name', '') device_code = stream.get('device_code', '') pump_name = stream.get('pump_name', '') if not url or channel is None: print(f"警告: 跳过不完整的RTSP流配置 (区域: {plant_name})") continue streams.append(RTSPStreamConfig( plant_name=plant_name, rtsp_url=url, channel=channel, camera_name=camera_name, device_code=device_code, pump_name=pump_name, flow_plc=flow_plc, project_id=project_id )) return streams def start(self): """ 启动监控系统 """ print("=" * 70) print("拾音器异响检测系统") print("=" * 70) # 解析流配置 streams = self._parse_rtsp_streams() print(f"\n共配置 {len(streams)} 个拾音设备:") for stream in streams: print(f" - 
{stream.device_code} | {stream.camera_name}") # 启动FFmpeg进程 print("\n启动FFmpeg进程...") for stream in streams: ffmpeg = FFmpegProcess(stream, CFG.AUDIO_DIR, self.config) if ffmpeg.start(): self.ffmpeg_processes.append(ffmpeg) else: print(f"警告: FFmpeg启动失败,跳过该流: {stream}") if not self.ffmpeg_processes: print("\n错误: 所有FFmpeg进程均启动失败") sys.exit(1) print(f"\n成功启动 {len(self.ffmpeg_processes)}/{len(streams)} 个FFmpeg进程") # 收集所有流配置(用于PickupMonitor) all_stream_configs = [p.stream_config for p in self.ffmpeg_processes] # 启动监控线程(统一一个监控器) print("\n启动监控线程...") check_interval = self.config.get('prediction', {}).get('check_interval', 1.0) monitor = PickupMonitor( audio_dir=CFG.AUDIO_DIR, multi_predictor=self.multi_predictor, stream_configs=all_stream_configs, check_interval=check_interval, config=self.config, config_manager=self.config_manager, test_mode=self.test_mode ) monitor.start() self.monitors.append(monitor) print(f"\n成功启动监控线程") print("\n" + "=" * 70) print("系统已启动,开始监控...") print("按 Ctrl+C 停止系统") print("=" * 70 + "\n") # FFmpeg重启配置 max_restart_attempts = 5 # 单个进程最大重启次数 restart_interval_base = 30 # 基础重启间隔(秒)-> 改为30s restart_counts = {id(p): 0 for p in self.ffmpeg_processes} # 重启计数 restart_timers = {} # {pid: next_restart_time} 非阻塞重启调度 # 主循环(带自动重启) try: while True: now_ts = time.time() # 检查每个FFmpeg进程状态 for ffmpeg in self.ffmpeg_processes: if not ffmpeg.is_running(): pid = id(ffmpeg) device_code = ffmpeg.stream_config.device_code # 检查重启次数 if restart_counts.get(pid, 0) < max_restart_attempts: # 非阻塞:检查是否到达重启时间 next_time = restart_timers.get(pid, 0) if next_time == 0: # 首次检测到停止,记录重启时间 wait_time = restart_interval_base * (2 ** restart_counts.get(pid, 0)) restart_timers[pid] = now_ts + wait_time logger.warning(f"FFmpeg进程停止: {device_code} | 将在{wait_time}秒后重启 | " f"重试次数: {restart_counts.get(pid, 0) + 1}/{max_restart_attempts}") elif now_ts >= next_time: # 到达重启时间,尝试重启 if ffmpeg.start(): logger.info(f"FFmpeg重启成功: {device_code}") restart_counts[pid] = 0 restart_timers.pop(pid, None) else: 
restart_counts[pid] = restart_counts.get(pid, 0) + 1 logger.error(f"FFmpeg重启失败: {device_code}") # 安排下一次重启 wait_time = restart_interval_base * (2 ** restart_counts.get(pid, 0)) restart_timers[pid] = now_ts + wait_time else: logger.error(f"FFmpeg达到最大重启次数: {device_code} | 已放弃重启") # 检查是否所有进程都已放弃 running_count = sum(1 for p in self.ffmpeg_processes if p.is_running()) all_abandoned = all(restart_counts.get(id(p), 0) >= max_restart_attempts for p in self.ffmpeg_processes if not p.is_running()) if running_count == 0 and all_abandoned: print("\n错误: 所有FFmpeg进程均已停止且无法重启") break # 每天0点执行清理(整个0点小时内都尝试,避免sleep跨过错过) current_hour = datetime.now().hour current_date = datetime.now().strftime("%Y%m%d") if not hasattr(self, '_last_cleanup_date'): self._last_cleanup_date = "" # 在0点小时内且今天还没清理过时执行 if 0 <= current_hour < 1 and self._last_cleanup_date != current_date: logger.info("执行定期文件清理...") for monitor in self.monitors: monitor._cleanup_old_files() self._last_cleanup_date = current_date # 定期打印RTSP状态(每分钟一次) if not hasattr(self, '_last_status_log'): self._last_status_log = datetime.now() if (datetime.now() - self._last_status_log).total_seconds() >= 60: running = sum(1 for p in self.ffmpeg_processes if p.is_running()) total = len(self.ffmpeg_processes) logger.info(f"RTSP状态: {running}/{total} 个FFmpeg进程运行中") logger.info("─" * 60) self._last_status_log = datetime.now() time.sleep(10) except KeyboardInterrupt: print("\n\n收到停止信号,正在关闭系统...") finally: self.stop() def stop(self): """ 停止监控系统 """ print("\n正在停止监控系统...") print("停止监控线程...") for monitor in self.monitors: monitor.stop() print("停止FFmpeg进程...") for ffmpeg in self.ffmpeg_processes: ffmpeg.stop() print("系统已完全停止") def _signal_handler(self, signum, frame): """ 信号处理函数 """ print(f"\n\n收到信号 {signum},正在关闭系统...") self.stop() sys.exit(0) def _start_config_api_server(config_manager, multi_predictor=None, port=18080): # 在后台线程中启动 FastAPI 配置管理 API try: import uvicorn init_config_api(config_manager, multi_predictor) logger.info(f"启动配置管理 API: 
http://0.0.0.0:{port}") uvicorn.run(config_app, host="0.0.0.0", port=port, log_level="warning") except ImportError: logger.warning("uvicorn 未安装,配置管理 API 无法启动。请安装: pip install uvicorn") except Exception as e: logger.error(f"配置管理 API 启动失败: {e}") def main(): """ 主函数 """ # 检测 DB 是否存在 db_path = get_db_path(Path(__file__).parent / "config") if not db_path.exists(): print(f"错误: 配置数据库不存在: {db_path}") print(f"\n请先运行迁移脚本: python tool/migrate_yaml_to_db.py") sys.exit(1) try: system = PickupMonitoringSystem() # 后台线程启动配置管理 API api_port = 18080 api_thread = threading.Thread( target=_start_config_api_server, args=(system.config_manager, system.multi_predictor, api_port), daemon=True, name="config-api" ) api_thread.start() system.start() except FileNotFoundError as e: print(f"\n错误: {e}") print("\n请确保:") print("1. 已完成训练并计算阈值") print("2. 已复制必要的模型文件到 models/ 目录") sys.exit(1) except Exception as e: print(f"\n严重错误: {e}") import traceback traceback.print_exc() sys.exit(1) if __name__ == "__main__": main()