Bladeren bron

移除上报格式中预留的 upper/lower 频谱字段

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
wmy 3 dagen geleden
bovenliggende
commit
f1af45dc1d
1 gewijzigde bestanden met toevoegingen van 2400 en 0 verwijderingen
  1. 2400 0
      run_pickup_monitor_old.py

+ 2400 - 0
run_pickup_monitor_old.py

@@ -0,0 +1,2400 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+run_pickup_monitor.py
+---------------------
+拾音器异响检测主程序
+
+功能说明:
+该程序用于音频采集设备(拾音器)的实时异常检测。
+与摄像头版本不同,本版本:
+1. 没有视频/图片采集和上报
+2. 上报时包含音频频谱图分析数据
+3. 支持进水流量PLC数据读取
+4. 每1分钟计算一次平均重建误差并上报
+
+上报数据结构:
+{
+    "message": {
+        "channelInfo": {"name": "设备信息"},
+        "requestTime": 时间戳,
+        "classification": {
+            "level_one": 2,  // 音频检测大类
+            "level_two": 6   // 异常类型小类: 6=未分类, 7=轴承, 8=气蚀, 9=松动/共振, 10=叶轮, 11=阀件
+        },
+        "skillInfo": {"name": "异响检测"},
+        "sound_detection": {
+            "abnormalwav": "",           // base64编码的音频
+            "status": 0/1,               // 0=异常, 1=正常
+            "condition": {
+                "running_status": "运行中/停机中",  // 启停状态
+                "inlet_flow": 0.0            // 进水流量
+            },
+            "score": {
+                "abnormal_score": 0.0,   // 当前1分钟内8s平均重构误差
+                "score_threshold": 0.0   // 该设备的异常阀值
+            },
+            "frequency": {
+                "this_frequency": [],              // 当前1分钟的频谱图
+                "normal_frequency_middle": [],    // 过去10分钟的频谱图平均
+                "normal_frequency_upper": [],     // 上限(暂为空)
+                "normal_frequency_lower": []      // 下限(暂为空)
+            }
+        }
+    }
+}
+
+使用方法:
+    python run_pickup_monitor.py
+
+配置文件:
+    config/rtsp_config.yaml
+"""
+
+import subprocess
+import time
+import re
+import signal
+import sys
+import threading
+import logging
+import base64
+from concurrent.futures import ThreadPoolExecutor
+from pathlib import Path
+
+from datetime import datetime, timedelta
+from collections import defaultdict
+
+# 用于计算FFT
+import numpy as np
+
+
+try:
+    import librosa
+except ImportError:
+    print("错误:缺少librosa库,请安装: pip install librosa")
+    sys.exit(1)
+
+try:
+    import requests
+except ImportError:
+    print("错误:缺少requests库,请安装: pip install requests")
+    sys.exit(1)
+
+# 导入预测器模块
+from predictor import MultiModelPredictor, CFG
+
+# 导入配置管理模块(SQLite)
+from config.config_manager import ConfigManager
+from config.config_api import app as config_app, init_config_api
+from config.db_models import get_db_path
+
+# 导入泵状态监控模块(用于检测启停过渡期)
+try:
+    from core.pump_state_monitor import PumpStateMonitor
+    PUMP_STATE_MONITOR_AVAILABLE = True
+except ImportError:
+    PUMP_STATE_MONITOR_AVAILABLE = False
+
+# 导入人体检测读取模块(用于抑制有人时的误报)
+try:
+    from core.human_detection_reader import HumanDetectionReader
+    HUMAN_DETECTION_AVAILABLE = True
+except ImportError:
+    HUMAN_DETECTION_AVAILABLE = False
+
+# 导入报警聚合器(跨设备聚合抑制 + 分类型冷却)
+try:
+    from core.alert_aggregator import AlertAggregator
+    ALERT_AGGREGATOR_AVAILABLE = True
+except ImportError:
+    ALERT_AGGREGATOR_AVAILABLE = False
+
+
+# ========================================
+# 配置日志系统
+# ========================================
+def setup_logging():
+    # 配置日志系统(RotatingFileHandler -> system.log)
+    from logging.handlers import RotatingFileHandler
+
+    # 如果根 logger 已经被上层调用者配置过,则直接复用
+    root = logging.getLogger()
+    if root.handlers:
+        return logging.getLogger('PickupMonitor')
+
+    # 日志配置
+    log_dir = Path(__file__).parent / "logs"
+    log_dir.mkdir(parents=True, exist_ok=True)
+    log_file = log_dir / "system.log"
+
+    formatter = logging.Formatter(
+        '%(asctime)s | %(levelname)-8s | %(name)-20s | %(message)s',
+        datefmt='%Y-%m-%d %H:%M:%S'
+    )
+
+    # 按文件大小轮转,最多保留 2 个备份(共 30MB)
+    file_handler = RotatingFileHandler(
+        log_file,
+        maxBytes=10 * 1024 * 1024,
+        backupCount=2,
+        encoding='utf-8'
+    )
+    file_handler.setFormatter(formatter)
+
+    # 控制台输出(前台运行时可见,后台运行时 stdout 已被丢弃不影响)
+    console_handler = logging.StreamHandler(sys.stdout)
+    console_handler.setFormatter(formatter)
+
+    logging.basicConfig(
+        level=logging.INFO,
+        handlers=[file_handler, console_handler]
+    )
+
+    return logging.getLogger('PickupMonitor')
+
+
+# 初始化日志系统
+logger = setup_logging()
+
+# 导入能量基线模块
+try:
+    from core.energy_baseline import EnergyBaseline
+    ENERGY_BASELINE_AVAILABLE = True
+except ImportError:
+    ENERGY_BASELINE_AVAILABLE = False
+    logger.warning("能量基线模块未找到,泵状态检测功能禁用")
+
+
+class RTSPStreamConfig:
+    """
+    RTSP流配置类
+    
+    封装单个RTSP流的配置信息,包含拾音器特有字段
+    """
+    
+    def __init__(self, plant_name, rtsp_url, channel, 
+                 camera_name, device_code, pump_name,
+                 flow_plc, project_id):
+        """
+        初始化RTSP流配置
+        
+        参数:
+            plant_name: 区域名称(泵房-反渗透高压泵等)
+            rtsp_url: RTSP流URL
+            channel: 通道号
+            camera_name: 设备名称
+            device_code: 设备编号(如1#-1)
+            pump_name: 泵名称(A/B/C/D),用于匹配流量PLC
+            flow_plc: 流量PLC地址映射
+        """
+        self.plant_name = plant_name
+        self.rtsp_url = rtsp_url
+        self.channel = channel
+        self.pump_id = f"ch{channel}"
+        self.camera_name = camera_name or f"ch{channel}"
+        self.device_code = device_code
+        self.pump_name = pump_name
+        self.flow_plc = flow_plc or {}
+        self.project_id = project_id
+    
+    def get_flow_plc_address(self):
+        """
+        获取该设备对应的进水流量PLC地址
+        
+        返回:
+            PLC地址字符串,不存在则返回空字符串
+        """
+        if self.pump_name and self.flow_plc:
+            return self.flow_plc.get(self.pump_name, "")
+        return ""
+    
+    def __repr__(self):
+        return f"RTSPStreamConfig(plant='{self.plant_name}', camera='{self.camera_name}', pump='{self.pump_id}')"
+
+
+class FFmpegProcess:
+    """
+    FFmpeg进程管理类
+    
+    负责启动和管理单个RTSP流的FFmpeg进程。
+    进程将RTSP流转换为固定时长的WAV音频文件。
+    
+    文件名格式: {project_id}_{device_code}_{时间戳}.wav
+    """
+    
+    def __init__(self, stream_config, output_dir, config=None):
+        """
+        初始化FFmpeg进程管理器
+        
+        参数:
+            stream_config: RTSP流配置
+            output_dir: 音频输出目录
+            config: 全局配置字典
+        """
+        self.config_dict = config or {}
+        self.stream_config = stream_config
+        self.output_dir = output_dir
+        self.process = None
+        
+        # 从配置中读取文件时长,默认8秒
+        audio_cfg = self.config_dict.get('audio', {})
+        self.file_duration = audio_cfg.get('file_duration', 8)
+        
+        # 获取project_id(从 stream_config 中读取)
+        self.project_id = stream_config.project_id
+    
+    def start(self):
+        """
+        启动FFmpeg进程
+        
+        返回:
+            bool: 启动成功返回True,失败返回False
+        """
+        # 获取设备编号
+        device_code = self.stream_config.device_code or self.stream_config.pump_id
+        
+        # 创建输出目录(每个设备独立目录)
+        # 结构: data/{device_code}/current/
+        current_dir = self.output_dir / device_code / "current"
+        current_dir.mkdir(parents=True, exist_ok=True)
+        # 存放人工核查确认为正常的异常音频,增训时全量参与训练
+        verified_dir = self.output_dir / device_code / "verified_normal"
+        verified_dir.mkdir(parents=True, exist_ok=True)
+        
+        # 构建输出文件名模板
+        # 格式: {project_id}_{device_code}_{时间戳}.wav
+        # 例如: 92_1#-1_20251218142000.wav
+        output_pattern = str(current_dir / f"{self.project_id}_{device_code}_%Y%m%d%H%M%S.wav")
+        
+        # 构建FFmpeg命令
+        # 添加内存限制参数,防止 RTSP 缓冲区无限增长导致 OOM
+        cmd = [
+            "ffmpeg",
+            # RTSP 输入参数(内存限制)
+            "-rtsp_transport", "tcp",           # 使用TCP传输
+            "-probesize", "1000000",            # 限制探测大小为1MB(默认5MB)
+            "-analyzeduration", "1000000",      # 限制分析时长为1秒(默认5秒)
+            "-max_delay", "500000",             # 最大延迟500ms
+            "-fflags", "nobuffer",              # 禁用输入缓冲
+            "-flags", "low_delay",              # 低延迟模式
+            "-i", self.stream_config.rtsp_url,  # 输入RTSP流
+            # 音频输出参数
+            "-vn",                              # 不处理视频
+            "-ac", "1",                         # 单声道
+            "-ar", str(CFG.SR),                 # 采样率(16000Hz)
+            "-f", "segment",                    # 分段模式
+            "-segment_time", str(int(self.file_duration)),  # 每段时长
+            "-strftime", "1",                   # 启用时间格式化
+            "-loglevel", "error",               # 只输出错误日志
+            "-y",                               # 覆盖已存在的文件
+            output_pattern,
+        ]
+        
+        try:
+            # 启动FFmpeg进程
+            self.process = subprocess.Popen(
+                cmd,
+                stdout=subprocess.PIPE,
+                stderr=subprocess.PIPE
+            )
+            
+            logger.info(f"FFmpeg已启动: {device_code} | {self.stream_config.camera_name} | "
+                  f"文件时长: {self.file_duration}秒 | PID={self.process.pid}")
+            return True
+            
+        except FileNotFoundError:
+            logger.error("FFmpeg错误: 未找到ffmpeg命令,请确保已安装FFmpeg")
+            return False
+        except Exception as e:
+            logger.error(f"FFmpeg启动失败: {device_code} | 错误: {e}")
+            return False
+    
+    def is_running(self):
+        """
+        检查FFmpeg进程是否在运行
+        
+        返回:
+            bool: 进程运行中返回True,否则返回False
+        """
+        if self.process is None:
+            return False
+        return self.process.poll() is None
+    
+    def stop(self):
+        """
+        停止FFmpeg进程
+        """
+        if self.process is not None and self.is_running():
+            logger.info(f"FFmpeg停止: {self.stream_config.plant_name} | {self.stream_config.camera_name} | PID={self.process.pid}")
+            self.process.terminate()
+            try:
+                self.process.wait(timeout=5)
+            except subprocess.TimeoutExpired:
+                logger.warning(f"FFmpeg强制终止: PID={self.process.pid}")
+                self.process.kill()
+
+
+class PickupMonitor:
+    """
+    拾音器监控线程类
+    
+    监控音频目录,调用预测器检测异常,推送告警。
+    
+    主要功能:
+    1. 不截取视频帧(纯音频)
+    2. 计算并上报频谱图数据(this_frequency + normal_frequency_middle)
+    3. 每分钟汇总上报一次
+    4. 使用SCADA API获取进水流量
+    """
+    
+    def __init__(self, audio_dir, multi_predictor, 
+                 stream_configs,
+                 check_interval=1.0, config=None,
+                 config_manager=None):
+        """
+        初始化监控器
+        
+        Args:
+            audio_dir: 音频根目录
+            multi_predictor: 多模型预测器实例
+            stream_configs: 所有RTSP流配置
+            check_interval: 检查间隔(秒)
+            config: 配置字典
+            config_manager: ConfigManager 实例(用于热更新,为 None 时禁用热更新)
+        """
+        # 音频根目录(各设备目录在其下)
+        self.audio_dir = audio_dir
+        self.multi_predictor = multi_predictor
+        self.predictor = None  # 兼容性保留,已废弃
+        self.stream_configs = stream_configs
+        self.check_interval = check_interval
+        self.config = config or {}
+        
+        # 热更新:持有 ConfigManager 引用,定期从 DB 刷新配置
+        self._config_manager = config_manager
+        self._last_config_reload = 0       # 上次配置刷新时间戳
+        self._config_reload_interval = 30  # 配置刷新间隔(秒)
+        
+        # project_id
+        self.project_id = self.config.get('platform', {}).get('project_id', 92)
+        
+        # 构建device_code到stream_config的映射
+        self.device_map = {}
+        for cfg in stream_configs:
+            if cfg.device_code:
+                self.device_map[cfg.device_code] = cfg
+        
+        # 已处理文件集合
+        self.seen_files = set()
+        
+        # 每个设备的检测结果缓存(用于1分钟汇总)
+        # key: device_code(如 "1#-1")
+        self.device_cache = defaultdict(lambda: {
+            "errors": [],          # 每8秒的重建误差列表
+            "last_upload": None,   # 上次上报时间
+            "audio_data": [],      # 用于计算频谱图的音频数据
+            "status": None         # 最近的运行状态
+        })
+        
+        # 频谱图历史缓存(用于计算normal_frequency_middle)
+        # key: device_code, value: list of (timestamp, freq_db)
+        freq_cfg = self.config.get('prediction', {}).get('frequency_history', {})
+        self.freq_history_enabled = freq_cfg.get('enabled', True)
+        self.freq_history_minutes = freq_cfg.get('history_minutes', 10)
+        self.freq_history = defaultdict(list)
+        
+        # 上一次上报状态(True=异常,False=正常,None=初始)
+        # 用于状态变更时去重,防止持续报警
+        self.last_report_status = {}
+        
+        # 上报周期(秒)
+        audio_cfg = self.config.get('audio', {})
+        self.segment_duration = audio_cfg.get('segment_duration', 60)
+        
+        # 异常音频保存配置
+        save_cfg = self.config.get('prediction', {}).get('save_anomaly_audio', {})
+        self.save_anomaly_enabled = save_cfg.get('enabled', True)
+        self.save_anomaly_dir = Path(__file__).parent / save_cfg.get('save_dir', 'data/anomaly_detected')
+        if self.save_anomaly_enabled:
+            self.save_anomaly_dir.mkdir(parents=True, exist_ok=True)
+        
+        # 异常推送配置
+        push_cfg = self.config.get('push_notification', {})
+        self.push_enabled = push_cfg.get('enabled', False)
+        self.alert_enabled = push_cfg.get('alert_enabled', True)  # 是否启用异常告警(false=暂时禁用异常上报)
+        self.push_timeout = push_cfg.get('timeout', 30)
+        self.push_retry_count = push_cfg.get('retry_count', 2)
+        # 推送基地址列表(统一用 base_url/{project_id} 拼接,可无限扩展)
+        raw_urls = push_cfg.get('push_base_urls', [])
+        self.push_base_urls = [
+            {"label": item.get("label", f"target-{i}"), "url": item.get("url", "").rstrip("/")}
+            for i, item in enumerate(raw_urls)
+            if item.get("url")  # 跳过空URL
+        ]
+        # 推送失败记录文件路径
+        failed_log_path = push_cfg.get('failed_push_log', 'data/push_failures.jsonl')
+        self.failed_push_log = Path(__file__).parent / failed_log_path
+        self.failed_push_log.parent.mkdir(parents=True, exist_ok=True)
+        
+        # 如果 alert_enabled 为 False,记录日志提醒
+        if not self.alert_enabled:
+            logger.warning("异常告警已禁用(alert_enabled=false),仅上报心跳状态")
+        
+        # ========================================
+        # 报警聚合器(替代原有固定 cooldown_minutes)
+        # ----------------------------------------
+        # 功能1:跨设备聚合抑制 - 同一水厂多设备同时报警 -> 环境噪声,全部抑制
+        # 功能2:分类型冷却 - 同类型24h,不同类型1h
+        # ========================================
+        agg_cfg = push_cfg.get('alert_aggregate', {})
+        self.alert_aggregator = None
+        if ALERT_AGGREGATOR_AVAILABLE:
+            self.alert_aggregator = AlertAggregator(
+                push_callback=self._push_detection_result,
+                aggregate_enabled=agg_cfg.get('enabled', True),
+                window_seconds=agg_cfg.get('window_seconds', 300),
+                min_devices=agg_cfg.get('min_devices', 2),
+                cooldown_same_type_hours=push_cfg.get('cooldown_same_type_hours', 24),
+                cooldown_diff_type_hours=push_cfg.get('cooldown_diff_type_hours', 1)
+            )
+        else:
+            logger.warning("报警聚合器模块未找到,使用默认报警逻辑")
+        
+        # 上次异常音频保存时间(用于保存冷却时间计算)
+        self.last_anomaly_save_time = {}
+        # 异常保存冷却时间(分钟),同一设备连续异常时,每N分钟只保存一次
+        self.anomaly_save_cooldown_minutes = save_cfg.get('cooldown_minutes', 10)
+        
+        # 当前异常分类结果锁定(持续异常期间保持分类结果不变)
+        # key: device_code, value: (anomaly_type_code, type_name)
+        self.locked_anomaly_type = {}
+        
+        # 滑动窗口投票配置(5次中有3次异常才判定为异常)
+        voting_cfg = self.config.get('prediction', {}).get('voting', {})
+        self.voting_enabled = voting_cfg.get('enabled', True)
+        self.voting_window_size = voting_cfg.get('window_size', 5)   # 窗口大小
+        self.voting_threshold = voting_cfg.get('threshold', 3)       # 异常阈值(>=3次则判定异常)
+        self.detection_history = {}          # 每个设备的检测历史(True=异常)
+        
+        # 阈值容差区间配置(避免边界值反复跳变)
+        # 误差在 threshold*(1-tolerance) ~ threshold*(1+tolerance) 范围内为灰区,维持上一状态
+        self.tolerance_ratio = voting_cfg.get('tolerance_ratio', 0.05)  # 默认5%容差
+        self.last_single_anomaly = {}  # 每个设备上一次的单周期判定结果
+        
+        # 阈值现在由 multi_predictor 管理,每个设备从其对应模型目录加载
+        
+        # 能量检测配置
+        energy_cfg = self.config.get('prediction', {}).get('energy_detection', {})
+        self.energy_detection_enabled = ENERGY_BASELINE_AVAILABLE and energy_cfg.get('enabled', True)
+        self.skip_detection_when_stopped = energy_cfg.get('skip_when_stopped', True)
+        
+        # 能量基线检测器(每个设备一个)
+        if self.energy_detection_enabled:
+            self.energy_baselines = {}
+        else:
+            self.energy_baselines = None
+        
+        # SCADA API配置(用于获取泵状态和进水流量)
+        scada_cfg = self.config.get('scada_api', {})
+        self.scada_enabled = scada_cfg.get('enabled', False)
+        self.scada_url = scada_cfg.get('base_url', '')  # 历史数据接口(备用)
+        self.scada_realtime_url = scada_cfg.get('realtime_url', '')  # 实时数据接口(主用)
+        self.scada_jwt = scada_cfg.get('jwt_token', '')
+        self.scada_timeout = scada_cfg.get('timeout', 10)
+        
+        # 泵状态监控器(用于检测启停过渡期)
+        self.pump_state_monitor = None
+        self.pump_status_plc_configs = {}  # {pump_name: [{point, name}, ...]}
+        if PUMP_STATE_MONITOR_AVAILABLE and self.scada_enabled:
+            # 初始化泵状态监控器
+            # 获取第一个启用水厂的project_id
+            project_id = 0
+            for plant in self.config.get('plants', []):
+                if plant.get('enabled', False):
+                    project_id = plant.get('project_id', 0)
+                    # 加载泵状态点位配置
+                    pump_status_plc = plant.get('pump_status_plc', {})
+                    self.pump_status_plc_configs = pump_status_plc
+                    break
+            
+            if project_id > 0:
+                # 使用实时接口 URL 进行泵状态查询
+                self.pump_state_monitor = PumpStateMonitor(
+                    scada_url=self.scada_realtime_url,  # 使用实时数据接口
+                    scada_jwt=self.scada_jwt,
+                    project_id=project_id,
+                    timeout=self.scada_timeout,
+                    transition_window_minutes=15  # 启停后15分钟内视为过渡期
+                )
+                logger.info(f"泵状态监控器已启用 (project_id={project_id}, 过渡期窗口=15分钟, 使用实时接口)")
+        
+        # 线程控制
+        self.running = False
+        self.thread = None
+        # 推送线程池(避免推送超时阻塞主监控循环)
+        self._push_executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="push")
+        
+        # 启动时间(用于跳过启动期间的状态变化日志)
+        self.startup_time = None
+        self.startup_warmup_seconds = 120  # 启动后120秒内不记录状态变化
+        
+        # ========================================
+        # 异常上下文捕获配置
+        # ========================================
+        context_cfg = save_cfg.get('context_capture', {})
+        self.context_pre_minutes = context_cfg.get('pre_minutes', 2)
+        self.context_post_minutes = context_cfg.get('post_minutes', 2)
+
+        # 音频文件历史缓存(用于捕获异常前的音频)
+        # key: device_code, value: deque of (timestamp, file_path)
+        from collections import deque
+        # 计算需要保留的历史文件数量(按分钟计算,每分钟约1个文件)
+        history_size = self.context_pre_minutes + 2  # 多保留2个作为缓冲
+        self.audio_file_history = defaultdict(lambda: deque(maxlen=history_size))
+
+        # 异常上下文捕获状态
+        # key: device_code, value: dict
+        self.anomaly_capture_state = {}
+
+        logger.info(f"异常上下文捕获: 前{self.context_pre_minutes}分钟 + 后{self.context_post_minutes}分钟")
+        
+        # ========================================
+        # 人体检测报警抑制配置
+        # ========================================
+        # 只要任意摄像头在冷却时间内检测到人,所有设备的异常报警都会被抑制
+        human_cfg = self.config.get('human_detection', {})
+        self.human_detection_enabled = human_cfg.get('enabled', False) and HUMAN_DETECTION_AVAILABLE
+        self.human_reader = None
+        
+        if self.human_detection_enabled:
+            db_path = human_cfg.get('db_path', '')
+            cooldown_minutes = human_cfg.get('cooldown_minutes', 5)
+            self.human_reader = HumanDetectionReader(
+                db_path=db_path,
+                cooldown_minutes=cooldown_minutes
+            )
+            logger.info(f"人体检测报警抑制已启用 (冷却时间={cooldown_minutes}分钟, 数据库={db_path})")
+        
+        # ========================================
+        # 远程异响检测调度配置
+        # ========================================
+        # 通过定时 GET 请求远程接口,根据返回的 mode_type、model_list、时间窗口
+        # 动态控制 alert_enabled 开关
+        schedule_cfg = self.config.get('detection_schedule', {})
+        self._detection_schedule_url = schedule_cfg.get('url', '')
+        self._detection_schedule_interval = schedule_cfg.get('poll_interval', 60)
+        self._detection_schedule_timeout = schedule_cfg.get('request_timeout', 10)
+        # 独立计时器,与配置热刷新的 30 秒间隔解耦
+        self._last_detection_schedule_check = 0
+        # 远程调度控制的暂停标志(优先级最高,True 时跳过整个检测流程,包括模型推理)
+        self.detection_paused = False
+        
+        if self._detection_schedule_url:
+            logger.info(f"远程异响调度已启用 | URL={self._detection_schedule_url} | 间隔={self._detection_schedule_interval}秒")
+    
+    def start(self):
+        """
+        启动监控线程
+        """
+        if self.running:
+            return
+        
+        # 启动前清理current目录中的遗留文件
+        self._cleanup_current_on_startup()
+        
+        self.running = True
+        self.startup_time = datetime.now()  # 记录启动时间
+        self.thread = threading.Thread(target=self._monitor_loop, daemon=True)
+        self.thread.start()
+        # 打印监控的设备列表
+        device_codes = list(self.device_map.keys())
+        logger.info(f"监控线程已启动 | 目录: {self.audio_dir} | 监控设备: {device_codes}")
+    
+    def _cleanup_current_on_startup(self):
+        """
+        启动时清理current目录中的遗留文件
+        
+        删除上次运行遗留的文件,避免混入新采集数据影响检测准确性
+        """
+        cleaned_count = 0
+        
+        for device_code in self.device_map.keys():
+            current_dir = self.audio_dir / device_code / "current"
+            if not current_dir.exists():
+                continue
+            
+            for wav_file in current_dir.glob("*.wav"):
+                try:
+                    wav_file.unlink()
+                    cleaned_count += 1
+                except Exception as e:
+                    logger.warning(f"清理遗留文件失败: {wav_file.name} | {e}")
+        
+        if cleaned_count > 0:
+            logger.info(f"启动清理: 已删除current目录中 {cleaned_count} 个遗留文件")
+    
+    def stop(self):
+        """
+        停止监控线程
+        """
+        if not self.running:
+            return
+        
+        self.running = False
+        if self.thread is not None:
+            self.thread.join(timeout=5)
+        # 关闭推送线程池,等待已提交的推送任务完成
+        self._push_executor.shutdown(wait=True, cancel_futures=False)
+        logger.info(f"监控线程已停止")
+    
+    def _reload_hot_config(self):
+        # 从 DB 热加载可变配置项,无需重启服务
+        # 只刷新运行时可安全变更的参数,不影响 FFmpeg 进程和流映射
+        if self._config_manager is None:
+            return
+        
+        now = time.time()
+        # 未达到刷新间隔则跳过
+        if now - self._last_config_reload < self._config_reload_interval:
+            return
+        self._last_config_reload = now
+        
+        try:
+            # 从 DB 读取最新配置
+            fresh = self._config_manager.get_full_config()
+            self.config = fresh
+            
+            # 刷新推送配置
+            push_cfg = fresh.get('push_notification', {})
+            self.push_enabled = push_cfg.get('enabled', False)
+            self.alert_enabled = push_cfg.get('alert_enabled', True)
+            self.push_timeout = push_cfg.get('timeout', 30)
+            self.push_retry_count = push_cfg.get('retry_count', 2)
+            raw_urls = push_cfg.get('push_base_urls', [])
+            self.push_base_urls = [
+                {"label": item.get("label", f"target-{i}"), "url": item.get("url", "").rstrip("/")}
+                for i, item in enumerate(raw_urls)
+                if item.get("url")
+            ]
+            
+            # 刷新投票配置
+            voting_cfg = fresh.get('prediction', {}).get('voting', {})
+            self.voting_enabled = voting_cfg.get('enabled', True)
+            self.voting_window_size = voting_cfg.get('window_size', 5)
+            self.voting_threshold = voting_cfg.get('threshold', 3)
+            self.tolerance_ratio = voting_cfg.get('tolerance_ratio', 0.05)
+            
+            # 刷新能量检测配置
+            energy_cfg = fresh.get('prediction', {}).get('energy_detection', {})
+            self.skip_detection_when_stopped = energy_cfg.get('skip_when_stopped', True)
+            
+            # 刷新人体检测配置
+            human_cfg = fresh.get('human_detection', {})
+            new_human_enabled = human_cfg.get('enabled', False)
+            if new_human_enabled != self.human_detection_enabled:
+                logger.info(f"人体检测抑制配置变更: {self.human_detection_enabled} -> {new_human_enabled}")
+                self.human_detection_enabled = new_human_enabled
+            
+            # 刷新远程异响调度配置
+            schedule_cfg = fresh.get('detection_schedule', {})
+            self._detection_schedule_url = schedule_cfg.get('url', '')
+            self._detection_schedule_interval = schedule_cfg.get('poll_interval', 60)
+            self._detection_schedule_timeout = schedule_cfg.get('request_timeout', 10)
+            
+            logger.debug("配置热刷新完成")
+        except Exception as e:
+            logger.error(f"配置热刷新失败: {e}")
+    
+    def _check_detection_schedule(self):
+        # 远程异响检测调度检查
+        # 独立于 DB 配置热刷新,有自己的 60 秒计时器
+        # 条件全部满足才开启 alert_enabled:
+        #   1. model_list 中包含 "异响"
+        #   2. mode_type == 1
+        #   3. start_time <= 当前时间 <= end_time
+        if not self._detection_schedule_url:
+            return
+        
+        now_ts = time.time()
+        if now_ts - self._last_detection_schedule_check < self._detection_schedule_interval:
+            return
+        self._last_detection_schedule_check = now_ts
+        
+        try:
+            resp = requests.get(
+                self._detection_schedule_url,
+                params={'project_id': self.project_id},
+                timeout=self._detection_schedule_timeout
+            )
+            resp.raise_for_status()
+            data = resp.json()
+            
+            model_list = data.get('model_list', [])
+            mode_type = int(data.get('mode_type', 0))
+            start_str = data.get('start_time', '')
+            end_str = data.get('end_time', '')
+            
+            # model_list 中没有"异响",说明本项目不涉及异响检测,保持现状不做干预
+            if '异响' not in model_list:
+                return
+            
+            if not start_str or not end_str:
+                logger.warning("远程调度时间窗口不完整,保持当前状态")
+                return
+            
+            # 解析时间,支持多种常见格式
+            now = datetime.now()
+            start_dt = self._parse_schedule_time(start_str)
+            end_dt = self._parse_schedule_time(end_str)
+            
+            if start_dt is None or end_dt is None:
+                logger.warning(f"时间解析失败: start='{start_str}', end='{end_str}'")
+                return
+            
+            # 核心判定:mode_type==1 且当前时间在窗口内 -> 检测启用
+            # 反之 -> 暂停整个检测流程(不做模型推理,不上报)
+            should_detect = (mode_type == 1) and (start_dt <= now <= end_dt)
+            new_paused = not should_detect
+            
+            if new_paused != self.detection_paused:
+                logger.info(
+                    f"异响检测调度变更: {'暂停' if new_paused else '恢复'} | "
+                    f"mode_type={mode_type} | "
+                    f"窗口=[{start_str}, {end_str}] | 当前={now.strftime('%Y-%m-%d %H:%M:%S')}"
+                )
+                self.detection_paused = new_paused
+        except Exception as e:
+            # 请求失败保持当前状态不变
+            logger.warning(f"远程异响调度请求失败,保持当前状态: {e}")
+    
+    @staticmethod
+    def _parse_schedule_time(time_str):
+        # 尝试多种格式解析时间字符串,全部失败返回 None
+        for fmt in ('%Y-%m-%d %H:%M:%S', '%Y-%m-%d %H:%M',
+                    '%Y-%m-%dT%H:%M:%S', '%Y-%m-%dT%H:%M'):
+            try:
+                return datetime.strptime(time_str, fmt)
+            except ValueError:
+                continue
+        return None
+    
+    def _monitor_loop(self):
+        """
+        监控循环(线程主函数)
+        
+        持续检查各设备的音频目录,处理新生成的WAV文件。
+        每分钟汇总一次结果进行上报。
+        """
+        # 确保根目录存在
+        self.audio_dir.mkdir(parents=True, exist_ok=True)
+        
+        while self.running:
+            try:
+                # 热更新:定期从 DB 刷新可变配置
+                self._reload_hot_config()
+                # 远程异响调度:每60秒检查一次远程接口,控制 detection_paused
+                self._check_detection_schedule()
+
+                # 远程调度暂停时,从源头跳过整个扫描+推理+上报流程
+                # 只保留热更新和调度检查,连目录遍历都不做
+                if self.detection_paused:
+                    time.sleep(self.check_interval)
+                    continue
+
+                # 扫描所有设备目录下的current文件夹
+                for device_code in self.device_map.keys():
+                    device_current_dir = self.audio_dir / device_code / "current"
+                    
+                    # 目录不存在则跳过
+                    if not device_current_dir.exists():
+                        continue
+                    
+                    for wav_file in device_current_dir.glob("*.wav"):
+                        # 跳过已处理的文件
+                        if wav_file in self.seen_files:
+                            continue
+
+                        # 检查文件是否完整
+                        try:
+                            stat_info = wav_file.stat()
+                            file_age = time.time() - stat_info.st_mtime
+                            file_size = stat_info.st_size
+
+                            # 文件还在写入中,下次再检查(不加入 seen_files)
+                            if file_age < 12.0:
+                                continue
+
+                            # 文件大小异常处理(60秒 × 16000Hz × 2字节 ≈ 1.9MB)
+                            if file_size < 500_000:
+                                if file_age > 20.0:
+                                    # 确认是损坏文件,删除并标记
+                                    try:
+                                        wav_file.unlink()
+                                        logger.debug(f"删除异常小文件: {wav_file.name} ({file_size / 1000:.1f}KB)")
+                                    except Exception as e:
+                                        logger.error(f"删除文件失败: {wav_file.name} | {e}")
+                                    self.seen_files.add(wav_file)
+                                continue
+
+                            if file_size > 3_000_000:
+                                if file_age > 20.0:
+                                    # 文件过大,直接归档到日期目录(不丢弃,不加 seen)
+                                    logger.warning(f"文件过大,直接归档: {wav_file.name} ({file_size / 1000:.1f}KB)")
+                                    self._move_audio_to_date_dir(wav_file)
+                                    self.seen_files.add(wav_file)
+                                continue
+
+                        except Exception as e:
+                            logger.error(f"文件状态检查失败: {wav_file.name} | {e}")
+                            continue
+
+                        # 处理新文件
+                        self._process_new_file(wav_file)
+
+                        # 标记为已处理
+                        self.seen_files.add(wav_file)
+                
+                # 检查是否需要进行周期性上报
+                self._check_periodic_upload()
+                
+                # 检查聚合窗口是否到期,到期则执行聚合判定并推送
+                if self.alert_aggregator:
+                    self.alert_aggregator.check_and_flush()
+                
+                # 清理过大的已处理文件集合
+                if len(self.seen_files) > 10000:
+                    recent_files = sorted(self.seen_files, 
+                                        key=lambda f: f.stat().st_mtime if f.exists() else 0)[-5000:]
+                    self.seen_files = set(recent_files)
+                
+            except Exception as e:
+                logger.error(f"监控循环错误: {e}")
+            
+            # 等待下一次检查
+            time.sleep(self.check_interval)
+    
+    def _process_new_file(self, wav_file):
+        """
+        处理新的音频文件
+        
+        文件名格式: {project_id}_{device_code}_{时间戳}.wav
+        例如: 92_1#-1_20251218142000.wav
+        
+        流程:
+        1. 加载音频
+        2. 计算能量判断设备状态
+        3. 进行AE异常检测,记录重建误差
+        4. 保存音频数据用于计算频谱图
+        """
+        try:
+            # 从文件名解析device_code
+            # 格式: {project_id}_{device_code}_{时间戳}.wav
+            try:
+                parts = wav_file.stem.split('_')
+                if len(parts) >= 3:
+                    device_code = parts[1]  # 第二部分是device_code
+                else:
+                    device_code = "unknown"
+            except:
+                device_code = "unknown"
+            
+            # ========================================
+            # 泵启停状态检测(基于 PLC 查询)- 优先于模型检查
+            # ----------------------------------------
+            # 逻辑:通过 SCADA API 查询 PLC 点位判断泵是否运行
+            # 作用:
+            #   1. 冷启动模式下也能过滤停机时的音频(保证训练数据质量)
+            #   2. 泵停机时跳过异常检测(避免无意义的检测)
+            #   3. 记录设备状态用于后续上报
+            # 依赖:pump_state_monitor + rtsp_config 中的 pump_status_plc 配置
+            # ========================================
+            device_status = "未知"
+            pump_is_running = True  # 默认认为运行中(PLC 查询失败时的保守策略)
+            
+            # 获取该设备对应的流配置
+            stream_config = self.device_map.get(device_code)
+            
+            if self.pump_state_monitor and stream_config:
+                # 根据设备的 pump_name 找到关联的 PLC 点位配置
+                pump_name = stream_config.pump_name
+                pump_configs = self.pump_status_plc_configs.get(pump_name, [])
+                
+                if pump_configs:
+                    # 遍历所有关联泵,只要有一个运行就认为设备在工作
+                    any_pump_running = False
+                    for pump_cfg in pump_configs:
+                        point = pump_cfg.get("point", "")
+                        name = pump_cfg.get("name", point)
+                        pump_id = point
+                        
+                        # 查询泵状态(带 60 秒缓存,不会频繁请求 SCADA)
+                        is_running, _ = self.pump_state_monitor.update_pump_state(pump_id, point, name)
+                        if is_running:
+                            any_pump_running = True
+                    
+                    pump_is_running = any_pump_running
+                    device_status = "开机" if pump_is_running else "停机"
+                    
+                    # 泵全部停机时跳过(可通过配置禁用此行为)
+                    # 冷启动和正常模式都适用,确保训练数据质量
+                    if self.skip_detection_when_stopped and not pump_is_running:
+                        logger.info(f"泵停机(PLC): {device_code} | 归档到过渡期目录(不用于训练)")
+                        self._move_audio_to_transition_dir(wav_file, "stopped")
+                        return
+                    
+                    # ========================================
+                    # 过渡期检测(泵启停后15分钟内)
+                    # ----------------------------------------
+                    # 目的:过滤启停过程中的不稳定音频
+                    # 确保训练数据只包含稳定运行期的音频
+                    # ========================================
+                    if self.skip_detection_when_stopped:
+                        pump_in_transition, transition_pump_names = \
+                            self.pump_state_monitor.check_pumps_transition(pump_configs)
+                        
+                        if pump_in_transition:
+                            logger.info(f"泵过渡期: {device_code} | 归档到过渡期目录(不用于训练) | "
+                                       f"过渡期泵: {', '.join(transition_pump_names)}")
+                            self._move_audio_to_transition_dir(wav_file, "transition")
+                            return
+            
+            # 获取该设备的预测器(懒加载模型)
+            device_predictor = self.multi_predictor.get_predictor(device_code)
+            if device_predictor is None:
+                logger.info(f"冷启动模式(设备 {device_code} 无模型): 归档 {wav_file.name}")
+                self._move_audio_to_date_dir(wav_file)
+                return
+            # 加载音频
+            try:
+                y, sr = librosa.load(str(wav_file), sr=CFG.SR, mono=True)
+            except Exception as e:
+                logger.error(f"音频加载失败: {wav_file.name} | {e}")
+                return
+            
+            # 记录状态到缓存(用于周期上报)
+            # 注意:泵状态检测已在前面完成,这里只记录状态
+            self.device_cache[device_code]["status"] = device_status
+            
+            # ========================================
+            # 计算重建误差
+            # ========================================
+            error = self._compute_reconstruction_error(wav_file, device_predictor)
+            
+            if error is not None:
+                self.device_cache[device_code]["errors"].append(error)
+            
+            # ========================================
+            # 保存音频数据用于计算频谱图
+            # ========================================
+            self.device_cache[device_code]["audio_data"].append(y)
+            
+            # ========================================
+            # 暂存文件路径,等待周期聚合判定后再归档
+            # ========================================
+            if "pending_files" not in self.device_cache[device_code]:
+                self.device_cache[device_code]["pending_files"] = []
+            self.device_cache[device_code]["pending_files"].append(wav_file)
+            
+            # ========================================
+            # 记录到音频历史缓存(用于异常上下文捕获)
+            # ========================================
+            self.audio_file_history[device_code].append((datetime.now(), wav_file))
+            
+            # 初始化上次上报时间
+            if self.device_cache[device_code]["last_upload"] is None:
+                self.device_cache[device_code]["last_upload"] = datetime.now()
+            
+            # 获取阈值并判断结果
+            threshold = self._get_threshold(device_code)
+            
+            # ========================================
+            # 快速通道:连续多个文件误差极高时快速预警(暂时关闭)
+            # 不走投票窗口,用于捕获突发性严重故障
+            # ========================================
+            # if error is not None and threshold is not None and threshold > 0:
+            #     # 维护快速通道缓冲区
+            #     if "fast_alert_buffer" not in self.device_cache[device_code]:
+            #         self.device_cache[device_code]["fast_alert_buffer"] = []
+            #     
+            #     fast_buf = self.device_cache[device_code]["fast_alert_buffer"]
+            #     # 连续性检查:误差超过 2x 阈值时记录,否则清空缓冲区
+            #     if error > threshold * 2.0:
+            #         fast_buf.append(error)
+            #     else:
+            #         fast_buf.clear()
+            #     
+            #     # 连续 3 个文件(~24秒)都超过 2x 阈值 → 触发快速预警
+            #     FAST_ALERT_CONSECUTIVE = 3
+            #     if len(fast_buf) >= FAST_ALERT_CONSECUTIVE:
+            #         # 检查是否已触发过(避免重复告警)
+            #         last_fast = self.device_cache[device_code].get("last_fast_alert_time")
+            #         now = datetime.now()
+            #         can_fast_alert = (last_fast is None or
+            #                          (now - last_fast).total_seconds() > 300)  # 5分钟冷却
+            #         
+            #         if can_fast_alert:
+            #             # 快速通道同样受抑制逻辑约束
+            #             suppress = False
+            #             # 泵过渡期抑制
+            #             if self.pump_state_monitor and stream_config:
+            #                 pump_name = stream_config.pump_name
+            #                 pump_configs = self.pump_status_plc_configs.get(pump_name, [])
+            #                 if pump_configs:
+            #                     in_transition, _ = self.pump_state_monitor.check_pumps_transition(pump_configs)
+            #                     if in_transition:
+            #                         suppress = True
+            #             # 人体检测抑制
+            #             if self.human_detection_enabled and self.human_reader:
+            #                 if self.human_reader.is_in_cooldown():
+            #                     suppress = True
+            #             # alert_enabled 开关
+            #             if not self.alert_enabled:
+            #                 suppress = True
+            #             
+            #             if not suppress:
+            #                 avg_fast = float(np.mean(fast_buf))
+            #                 logger.warning(
+            #                     f"[!!] 快速通道触发: {device_code} | "
+            #                     f"连续{len(fast_buf)}个文件异常 | "
+            #                     f"平均误差={avg_fast:.6f} 阈值={threshold:.6f}")
+            #                 self.device_cache[device_code]["last_fast_alert_time"] = now
+            #                 fast_buf.clear()
+            #                 # 标记快速预警,在下次周期上报时一并处理
+            #                 self.device_cache[device_code]["fast_alert_pending"] = True
+            
+            if error is not None and threshold is not None:
+                is_anomaly = error > threshold
+                result_tag = "!!" if is_anomaly else "OK"
+                logger.info(f"[{result_tag}] {device_code} | {wav_file.name} | "
+                           f"误差={error:.6f} 阈值={threshold:.6f}")
+            elif error is not None:
+                logger.debug(f"文件预测: {wav_file.name} | 误差={error:.6f} | 阀值=未设置")
+            else:
+                logger.warning(f"预测跳过: {wav_file.name} | 误差计算失败")
+            
+        except Exception as e:
+            logger.error(f"处理文件失败: {wav_file.name} | 错误: {e}")
+    
+    def _compute_reconstruction_error(self, wav_file, device_predictor):
+        """
+        计算单个音频文件的重建误差(Min-Max 标准化)
+        
+        使用8秒窗口、4秒步长切割音频,提取多个patches分别计算误差后取平均值。
+        
+        参数:
+            wav_file: 音频文件路径
+            device_predictor: 设备预测器实例
+        
+        返回:
+            重建误差值(所有patches的平均MSE),失败返回None
+        """
+        try:
+            import torch
+            from predictor.utils import align_to_target
+            
+            # Min-Max 标准化参数
+            global_min = device_predictor.global_min
+            global_max = device_predictor.global_max
+            
+            # 加载音频
+            y, _ = librosa.load(str(wav_file), sr=CFG.SR, mono=True)
+            
+            win_samples = int(CFG.WIN_SEC * CFG.SR)
+            hop_samples = int(CFG.HOP_SEC * CFG.SR)
+            
+            if len(y) < win_samples:
+                logger.warning(f"音频太短,无法提取patches: {wav_file.name}")
+                return None
+            
+            patches = []
+            for start in range(0, len(y) - win_samples + 1, hop_samples):
+                window = y[start:start + win_samples]
+                
+                S = librosa.feature.melspectrogram(
+                    y=window, sr=CFG.SR, n_fft=CFG.N_FFT,
+                    hop_length=CFG.HOP_LENGTH, n_mels=CFG.N_MELS, power=2.0
+                )
+                S_db = librosa.power_to_db(S, ref=np.max)
+                
+                if S_db.shape[1] < CFG.TARGET_FRAMES:
+                    S_db = np.pad(S_db, ((0, 0), (0, CFG.TARGET_FRAMES - S_db.shape[1])))
+                else:
+                    S_db = S_db[:, :CFG.TARGET_FRAMES]
+                
+                # Min-Max 标准化
+                S_norm = (S_db - global_min) / (global_max - global_min + 1e-6)
+                patches.append(S_norm.astype(np.float32))
+            
+            if not patches:
+                logger.warning(f"未能提取任何patches: {wav_file.name}")
+                return None
+            
+            arr = np.stack(patches, 0)
+            arr = np.expand_dims(arr, 1)
+            tensor = torch.from_numpy(arr)
+            
+            torch_device = device_predictor.torch_device
+            tensor = tensor.to(torch_device)
+            
+            with torch.no_grad():
+                recon = device_predictor.model(tensor)
+                recon = align_to_target(recon, tensor)
+                mse_per_patch = torch.mean((recon - tensor) ** 2, dim=[1, 2, 3])
+                mean_mse = torch.mean(mse_per_patch).item()
+            
+            logger.debug(f"重建误差: {wav_file.name} | patches={len(patches)} | MSE={mean_mse:.6f}")
+            return mean_mse
+            
+        except Exception as e:
+            logger.error(f"计算重建误差失败: {wav_file.name} | {e}")
+            return None
+    
+    def _check_periodic_upload(self):
+        """
+        检查是否需要进行周期性上报
+        
+        每分钟汇总一次各设备的检测结果并上报
+        """
+        # 远程调度暂停时跳过上报,与 _monitor_loop 中的推理拦截保持一致
+        if self.detection_paused:
+            return
+        
+        now = datetime.now()
+        
+        for device_code, cache in self.device_cache.items():
+            # 检查上报时间间隔
+            last_upload = cache.get("last_upload")
+            if last_upload is None:
+                continue
+            
+            elapsed = (now - last_upload).total_seconds()
+            
+            # 达到上报周期
+            if elapsed >= self.segment_duration:
+                # 获取该设备的流配置
+                stream_config = self.device_map.get(device_code)
+                
+                # 计算平均重建误差
+                errors = cache.get("errors", [])
+                avg_error = float(np.mean(errors)) if errors else 0.0
+                
+                # 获取阈值
+                threshold = self._get_threshold(device_code)
+                
+                # 判断当前周期是否异常(带容差区间)
+                # 容差区间:避免边界值反复跳变,但灰区内仍与阈值比较
+                if threshold:
+                    upper_bound = threshold * (1 + self.tolerance_ratio)  # 确定异常边界
+                    lower_bound = threshold * (1 - self.tolerance_ratio)  # 确定正常边界
+                    
+                    if avg_error > upper_bound:
+                        # 超过上边界 -> 确定异常
+                        is_current_anomaly = True
+                    elif avg_error < lower_bound:
+                        # 低于下边界 -> 确定正常
+                        is_current_anomaly = False
+                    else:
+                        # 灰区 -> 与阈值比较(避免异常状态延长)
+                        is_current_anomaly = avg_error > threshold
+                    
+                    # 记录本次判定结果
+                    self.last_single_anomaly[device_code] = is_current_anomaly
+                else:
+                    is_current_anomaly = False
+                
+                # ========================================
+                # 滑动窗口投票:5次中有3次异常才判定为异常
+                # ========================================
+                if device_code not in self.detection_history:
+                    self.detection_history[device_code] = []
+                
+                # 记录本次检测结果
+                self.detection_history[device_code].append(is_current_anomaly)
+                
+                # 保持窗口大小
+                if len(self.detection_history[device_code]) > self.voting_window_size:
+                    self.detection_history[device_code].pop(0)
+                
+                # 投票判定最终异常状态
+                if self.voting_enabled and len(self.detection_history[device_code]) >= self.voting_window_size:
+                    anomaly_count = sum(self.detection_history[device_code])
+                    is_anomaly = anomaly_count >= self.voting_threshold
+                    window_info = f"窗口[{anomaly_count}/{self.voting_window_size}]"
+                else:
+                    is_anomaly = is_current_anomaly
+                    window_info = "窗口未满"
+                
+                # ========================================
+                # 状态变更检测
+                # trigger_alert: 仅在 正常(或初始) -> 异常 时为True
+                # 冷却逻辑已移至 AlertAggregator 内部处理
+                # ========================================
+                last_is_anomaly = self.last_report_status.get(device_code)
+                trigger_alert = False
+                
+                if is_anomaly:
+                    # 只有状态变化(正常->异常) 才触发报警流程
+                    # 泵过渡期检查已在 _process_new_file 中完成,过渡期内的文件不会进入 pending
+                    if last_is_anomaly is None or not last_is_anomaly:
+                        if self.alert_enabled:
+                            # 人体检测抑制:任意摄像头检测到人则不报警
+                            if self.human_detection_enabled and self.human_reader:
+                                if self.human_reader.is_in_cooldown():
+                                    trigger_alert = False
+                                    status_info = self.human_reader.get_status_info()
+                                    logger.info(f"人体检测抑制: {device_code} | {status_info},跳过报警")
+                                else:
+                                    trigger_alert = True
+                            else:
+                                trigger_alert = True
+                        else:
+                            trigger_alert = False
+                            logger.debug(f"异常告警已禁用,跳过告警: {device_code}")
+                
+                # 获取运行状态
+                running_status = cache.get("status", "未知")
+                
+                # 获取进水流量
+                inlet_flow = self._get_inlet_flow(stream_config) if stream_config else 0.0
+                
+                # 计算本次频谱图
+                audio_data = cache.get("audio_data", [])
+                freq_db = self._compute_frequency_spectrum(audio_data)
+                
+                # 保存到频谱图历史(只保存dB值)
+                if self.freq_history_enabled and freq_db:
+                    self.freq_history[device_code].append((now, freq_db))
+                    # 清理过期历史
+                    cutoff = now - timedelta(minutes=self.freq_history_minutes)
+                    self.freq_history[device_code] = [
+                        (t, d) for t, d in self.freq_history[device_code]
+                        if t > cutoff
+                    ]
+                
+                # 计算历史频谱图平均值(normal_frequency_middle)
+                freq_middle_db = self._compute_frequency_middle(device_code)
+                
+                # ========================================
+                # 异常分类:只在状态从正常变为异常时进行分类
+                # 持续异常期间沿用上次分类结果,保持一致性
+                # ========================================
+                anomaly_type_code = 6  # 默认:未分类异常
+                type_name = "未分类异常"
+                
+                if is_anomaly and audio_data:
+                    # 检查是否是新的异常(从正常变为异常)
+                    is_new_anomaly = (last_is_anomaly is None or not last_is_anomaly)
+                    
+                    if is_new_anomaly:
+                        # 新异常:进行分类并锁定结果
+                        try:
+                            from core.anomaly_classifier import classify_anomaly
+                            if len(audio_data) > 0:
+                                y = audio_data[-1] if isinstance(audio_data[-1], np.ndarray) else np.array(audio_data[-1])
+                                anomaly_type_code, type_name, confidence = classify_anomaly(y, sr=16000)
+                                # 锁定分类结果
+                                self.locked_anomaly_type[device_code] = (anomaly_type_code, type_name)
+                                logger.info(f"异常分类(新异常): {type_name} (code={anomaly_type_code}, 置信度={confidence:.2f})")
+                        except Exception as e:
+                            logger.warning(f"异常分类失败: {e}")
+                    else:
+                        # 持续异常:沿用锁定的分类结果
+                        if device_code in self.locked_anomaly_type:
+                            anomaly_type_code, type_name = self.locked_anomaly_type[device_code]
+                            logger.debug(f"异常分类(沿用): {type_name} (code={anomaly_type_code})")
+                else:
+                    # 状态正常时清除锁定的分类结果
+                    if device_code in self.locked_anomaly_type:
+                        del self.locked_anomaly_type[device_code]
+                
+                # 上报逻辑
+                if self.push_enabled and stream_config:
+                    # 预读异常音频base64:在文件被归档/清空之前立即读取
+                    # 解决竞态问题:异步推送或聚合器延迟推送时文件可能已被移走
+                    pre_read_wav_b64 = ""
+                    if trigger_alert:
+                        try:
+                            current_pending = cache.get("pending_files", [])
+                            if current_pending and current_pending[0].exists():
+                                with open(current_pending[0], "rb") as f:
+                                    pre_read_wav_b64 = base64.b64encode(f.read()).decode('utf-8')
+                                logger.debug(f"预读异常音频成功: {current_pending[0].name} | size={len(pre_read_wav_b64)}")
+                        except Exception as e:
+                            logger.warning(f"预读异常音频失败: {e}")
+
+                    if trigger_alert and self.alert_aggregator:
+                        # 报警走聚合器:跨设备聚合判定 + 分类型冷却
+                        # 聚合器会在窗口到期后决定是否真正推送
+                        self.alert_aggregator.submit_alert(
+                            plant_name=stream_config.plant_name,
+                            device_code=device_code,
+                            anomaly_type_code=anomaly_type_code,
+                            push_kwargs=dict(
+                                stream_config=stream_config,
+                                device_code=device_code,
+                                is_anomaly=is_anomaly,
+                                trigger_alert=True,
+                                abnormal_score=avg_error,
+                                score_threshold=threshold,
+                                running_status=running_status,
+                                inlet_flow=inlet_flow,
+                                freq_db=freq_db,
+                                freq_middle_db=freq_middle_db,
+                                anomaly_type_code=anomaly_type_code,
+                                abnormal_wav_b64=pre_read_wav_b64
+                            )
+                        )
+                    else:
+                        # 非报警(心跳)或聚合器不可用 -> 提交到线程池异步推送
+                        self._push_executor.submit(
+                            self._push_detection_result,
+                            stream_config=stream_config,
+                            device_code=device_code,
+                            is_anomaly=is_anomaly,
+                            trigger_alert=trigger_alert,
+                            abnormal_score=avg_error,
+                            score_threshold=threshold,
+                            running_status=running_status,
+                            inlet_flow=inlet_flow,
+                            freq_db=freq_db,
+                            freq_middle_db=freq_middle_db,
+                            anomaly_type_code=anomaly_type_code,
+                            abnormal_wav_b64=pre_read_wav_b64
+                        )
+                    
+                    # 更新上一次状态
+                    self.last_report_status[device_code] = is_anomaly
+                
+                # 日志记录
+                thr_str = f"{threshold:.6f}" if threshold else "未设置"
+                # 报警去向说明
+                if trigger_alert and self.alert_aggregator:
+                    alert_dest = "-> 聚合器"
+                elif trigger_alert:
+                    alert_dest = "-> 直接推送"
+                else:
+                    alert_dest = ""
+                
+                # 使用设备名作为标识,增加视觉分隔
+                cam_label = ""
+                if stream_config:
+                    cam_label = f"({stream_config.camera_name})"
+                
+                result_emoji = "!!" if is_anomaly else "OK"
+                alert_str = f"报警=是 {alert_dest}" if trigger_alert else "报警=否"
+                
+                logger.info(
+                    f"[{result_emoji}] {device_code}{cam_label} | "
+                    f"误差={avg_error:.6f} 阈值={thr_str} | "
+                    f"{window_info} | {running_status} | "
+                    f"{'异常' if is_anomaly else '正常'} | {alert_str}"
+                )
+                
+                # ========================================
+                # 根据投票结果归档文件
+                # 用投票后的 is_anomaly 决定归档,避免单次波动误归
+                # ========================================
+                pending_files = cache.get("pending_files", [])
+
+                # 检查是否是新的异常(从正常变为异常)
+                is_new_anomaly = is_anomaly and (last_is_anomaly is None or not last_is_anomaly)
+
+                # 更新异常上下文捕获状态机
+                self._update_anomaly_capture_state(device_code, is_anomaly, is_new_anomaly,
+                                                   avg_error, threshold, now, pending_files)
+
+                if pending_files:
+                    if is_anomaly:
+                        # 异常文件由上下文捕获状态机统一管理
+                        # 状态机会在收集完前+后文件后一次性保存到 anomaly_detected
+                        pass
+                    else:
+                        # 正常 -> 移到日期目录归档
+                        for f in pending_files:
+                            self._move_audio_to_date_dir(f)
+                        # 状态恢复正常时,清除保存冷却时间
+                        if device_code in self.last_anomaly_save_time:
+                            del self.last_anomaly_save_time[device_code]
+                
+                # 重置缓存
+                cache["errors"] = []
+                cache["audio_data"] = []
+                cache["pending_files"] = []
+                cache["last_upload"] = now
+                logger.info("─" * 60)
+    
+    def _update_anomaly_capture_state(self, device_code, is_anomaly,
+                                       is_new_anomaly, avg_error,
+                                       threshold, now, pending_files):
+        """
+        异常上下文捕获状态机
+
+        状态流转:
+        1. 无状态 + 新异常 -> 触发捕获,回溯 audio_file_history 获取前置文件
+        2. 已触发 + 异常持续 -> 持续收集异常文件
+        3. 已触发 + 异常结束 -> 开始收集后续文件(post阶段)
+        4. post阶段 + 时间到 -> 保存所有文件到 anomaly_detected
+
+        所有异常文件(从触发到结束)统一保存到 anomaly_detected 目录,
+        包含 metadata.json 记录误差/阈值等信息。
+        """
+        state = self.anomaly_capture_state.get(device_code)
+
+        if is_new_anomaly and state is None:
+            # 新异常触发:回溯获取前置文件
+            anomaly_cutoff = now - timedelta(minutes=1)
+            pre_cutoff = now - timedelta(minutes=self.context_pre_minutes + 1)
+
+            pre_files = []
+            anomaly_files = []
+
+            for ts, fpath in self.audio_file_history[device_code]:
+                if not fpath.exists():
+                    continue
+                if ts >= anomaly_cutoff:
+                    anomaly_files.append(fpath)
+                elif ts >= pre_cutoff:
+                    pre_files.append(fpath)
+
+            # 兜底:如果回溯没找到,把当前 pending 加入
+            if not anomaly_files and pending_files:
+                anomaly_files = [f for f in pending_files if f.exists()]
+
+            self.anomaly_capture_state[device_code] = {
+                "trigger_time": now,
+                "avg_error": avg_error,
+                "threshold": threshold,
+                "pre_files": pre_files,
+                "anomaly_files": anomaly_files,
+                "post_files": [],
+                "anomaly_ended": False,
+                "post_start_time": None
+            }
+
+            logger.info(f"异常上下文捕获已触发: {device_code} | "
+                       f"前置={len(pre_files)}个 | 异常={len(anomaly_files)}个")
+
+        elif state is not None:
+            if is_anomaly and not state["anomaly_ended"]:
+                # 异常持续中:继续收集异常文件
+                for f in pending_files:
+                    if f.exists():
+                        state["anomaly_files"].append(f)
+            else:
+                # 异常结束(或之前已结束,在收集后续文件)
+                if not state["anomaly_ended"]:
+                    state["anomaly_ended"] = True
+                    state["post_start_time"] = now
+                    logger.info(f"异常结束,开始收集后续文件: {device_code} | "
+                               f"异常文件共{len(state['anomaly_files'])}个 | "
+                               f"等待{self.context_post_minutes}分钟")
+
+                # 收集后续文件
+                for f in pending_files:
+                    if f.exists():
+                        state["post_files"].append(f)
+
+                # 检查 post 阶段是否到时间
+                elapsed_post = (now - state["post_start_time"]).total_seconds() / 60
+                if elapsed_post >= self.context_post_minutes:
+                    self._save_anomaly_context(device_code, state)
+                    del self.anomaly_capture_state[device_code]
+                    self.last_anomaly_save_time[device_code] = now
+    
+    def _save_anomaly_context(self, device_code: str, state: dict):
+        """
+        保存异常上下文文件到独立目录
+        
+        目录结构:
+        data/anomaly_detected/{device_code}/{异常文件名(不含扩展名)}/
+        ├── 92_1#-1_20260130140313.wav  (保持原文件名)
+        ├── ...
+        └── metadata.json
+        
+        metadata.json 字段说明:
+        - before_trigger: 触发前 1~(N+1) 分钟的文件(正常时期,用于对比)
+        - at_trigger: 触发时刻前 1 分钟内的文件(异常开始出现的时期)
+        - after_trigger: 触发后 N 分钟的文件(不管异常是否恢复)
+        
+        参数:
+            device_code: 设备编号
+            state: 捕获状态字典
+        """
+        import shutil
+        import json
+        
+        try:
+            # 获取第一个异常文件名作为文件夹名
+            anomaly_files = state.get("anomaly_files", [])
+            if not anomaly_files:
+                logger.warning(f"无异常文件,跳过保存: {device_code}")
+                return
+            
+            # 用第一个异常文件名(不含扩展名)作为文件夹名
+            first_anomaly = anomaly_files[0]
+            folder_name = first_anomaly.stem  # 如 92_1#-1_20260130140313
+            save_dir = self.save_anomaly_dir / device_code / folder_name
+            save_dir.mkdir(parents=True, exist_ok=True)
+            
+            # 收集所有文件名(使用新命名)
+            all_files = {"before_trigger": [], "at_trigger": [], "after_trigger": []}
+            
+            # 移动前置文件(来自日期目录,移动后原位置不再保留)
+            for fpath in state.get("pre_files", []):
+                if fpath.exists():
+                    dest = save_dir / fpath.name
+                    shutil.move(str(fpath), str(dest))  # 移动而非复制,避免重复
+                    all_files["before_trigger"].append(fpath.name)
+            
+            # 移动触发时刻文件(保持原名)
+            for fpath in anomaly_files:
+                if fpath.exists():
+                    dest = save_dir / fpath.name
+                    shutil.move(str(fpath), str(dest))
+                    all_files["at_trigger"].append(fpath.name)
+            
+            # 移动后续文件(保持原名)
+            for fpath in state.get("post_files", []):
+                if fpath.exists():
+                    dest = save_dir / fpath.name
+                    shutil.move(str(fpath), str(dest))
+                    all_files["after_trigger"].append(fpath.name)
+            
+            # 生成精简的元数据文件
+            trigger_time = state.get("trigger_time")
+            metadata = {
+                "device_code": device_code,
+                "trigger_time": trigger_time.strftime("%Y-%m-%d %H:%M:%S") if trigger_time else None,
+                "avg_error": round(state.get("avg_error", 0.0), 6),
+                "threshold": round(state.get("threshold", 0.0), 6),
+                "files": all_files
+            }
+            
+            metadata_path = save_dir / "metadata.json"
+            with open(metadata_path, 'w', encoding='utf-8') as f:
+                json.dump(metadata, f, ensure_ascii=False, indent=2)
+            
+            total = sum(len(v) for v in all_files.values())
+            logger.warning(f"异常上下文已保存: {device_code}/{folder_name} | "
+                          f"共{total}个文件 (前{len(all_files['before_trigger'])}+异常{len(all_files['at_trigger'])}+后{len(all_files['after_trigger'])})")
+            
+        except Exception as e:
+            logger.error(f"保存异常上下文失败: {device_code} | {e}")
+    
+    def _compute_frequency_middle(self, device_code):
+        """
+        计算历史频谱图平均值(normal_frequency_middle)
+        
+        参数:
+            device_code: 设备编号
+        
+        返回:
+            dB值列表的平均值
+        """
+        history = self.freq_history.get(device_code, [])
+        if not history or len(history) < 2:
+            return []
+        
+        try:
+            # 收集所有历史dB值(只保留长度一致的)
+            all_db = []
+            ref_len = len(history[0][1])  # 使用第一条记录的长度作为参考
+            
+            for _, db in history:
+                if len(db) == ref_len:
+                    all_db.append(db)
+            
+            if not all_db:
+                return []
+            
+            # 计算各频率点的平均dB
+            avg_db = np.mean(all_db, axis=0).tolist()
+            return avg_db
+            
+        except Exception as e:
+            logger.error(f"计算频谱图历史平均失败: {e}")
+            return []
+    
+    def _get_threshold(self, device_code):
+        """
+        获取指定设备的阈值
+        
+        优先级:
+        1. 从 multi_predictor 获取设备对应模型的阈值
+        2. 使用配置文件中的默认阈值
+        3. 返回0.0(不进行异常判定)
+        
+        参数:
+            device_code: 设备编号(如 "LT-1")
+        
+        返回:
+            阈值,未找到返回默认值或0.0
+        """
+        # 方式1:从 multi_predictor 获取设备阈值
+        thr = self.multi_predictor.get_threshold(device_code)
+        if thr is not None:
+            logger.debug(f"阈值来源: {device_code} -> 设备模型阈值 = {thr:.6f}")
+            return thr
+        
+        # 方式2:使用配置中的默认阈值
+        default_threshold = self.config.get('prediction', {}).get('default_threshold', 0.0)
+        
+        if default_threshold > 0:
+            logger.debug(f"阈值来源: {device_code} -> 配置默认值 = {default_threshold:.6f}")
+            return default_threshold
+        
+        # 首次找不到阈值时记录警告
+        if device_code not in getattr(self, '_threshold_warned', set()):
+            if not hasattr(self, '_threshold_warned'):
+                self._threshold_warned = set()
+            self._threshold_warned.add(device_code)
+            logger.warning(f"未找到阈值: {device_code},将跳过异常判定")
+        
+        return 0.0
+    
+    def _get_inlet_flow(self, stream_config: RTSPStreamConfig) -> float:
+        """
+        获取进水流量(使用实时数据接口)
+        
+        使用 current-data 接口直接获取最新一条数据
+        
+        参数:
+            stream_config: 流配置
+        
+        返回:
+            进水流量值,失败返回0.0
+        """
+        if not self.scada_enabled or not stream_config:
+            logger.debug(f"流量跳过: scada_enabled={self.scada_enabled}, stream_config={stream_config}")
+            return 0.0
+        
+        # 获取PLC数据点位
+        plc_address = stream_config.get_flow_plc_address()
+        if not plc_address:
+            logger.debug(f"流量跳过(无PLC地址): {stream_config.device_code} | pump_name='{stream_config.pump_name}'")
+            return 0.0
+        
+        # 使用水厂配置的 project_id
+        project_id = stream_config.project_id
+        
+        try:
+            # 当前时间戳(毫秒)
+            now_ms = int(datetime.now().timestamp() * 1000)
+            
+            # 请求参数
+            params = {"time": now_ms}
+            
+            # 请求体:使用实时数据接口格式
+            request_body = [
+                {
+                    "deviceId": "1",
+                    "deviceItems": plc_address,
+                    "deviceName": f"流量_{stream_config.pump_name}",
+                    "project_id": project_id
+                }
+            ]
+            
+            # 请求头
+            headers = {
+                "Content-Type": "application/json",
+                "JWT-TOKEN": self.scada_jwt
+            }
+            
+            logger.debug(f"流量请求: {stream_config.device_code} | project_id={project_id} | plc={plc_address}")
+            
+            # 发送 POST 请求到实时接口
+            response = requests.post(
+                self.scada_realtime_url,
+                params=params,
+                json=request_body,
+                headers=headers,
+                timeout=self.scada_timeout
+            )
+            
+            if response.status_code == 200:
+                data = response.json()
+                if data.get("code") == 200:
+                    if data.get("data"):
+                        # 获取第一条数据(实时接口只返回最新一条)
+                        latest = data["data"][0]
+                        if "val" in latest:
+                            flow = float(latest["val"])
+                            logger.debug(f"流量获取成功: {stream_config.device_code} | 流量={flow}")
+                            return flow
+                        else:
+                            logger.debug(f"流量数据无val字段: {stream_config.device_code}")
+                    else:
+                        # API正常返回但无数据
+                        logger.debug(f"流量查询无数据: {stream_config.device_code}")
+                else:
+                    logger.warning(f"流量API返回异常: {stream_config.device_code} | code={data.get('code')} | msg={data.get('msg')}")
+            else:
+                logger.warning(f"流量HTTP错误: {stream_config.device_code} | status={response.status_code}")
+            
+        except Exception as e:
+            logger.warning(f"流量获取异常: {stream_config.device_code} | {e}")
+        
+        return 0.0
+    
+    def _compute_frequency_spectrum(self, audio_data):
+        """
+        计算频谱图数据
+        
+        将1分钟内的多个8秒音频片段合并,计算整体FFT
+        
+        参数:
+            audio_data: 音频数据列表
+        
+        返回:
+            dB值列表(0-8000Hz均匀分布,共400个点)
+        """
+        if not audio_data:
+            return []
+        
+        try:
+            # 合并所有音频片段
+            combined = np.concatenate(audio_data)
+            
+            # 计算FFT
+            n = len(combined)
+            fft_result = np.fft.rfft(combined)
+            freqs = np.fft.rfftfreq(n, 1.0 / CFG.SR)
+            
+            # 计算幅度(转换为dB)
+            magnitude = np.abs(fft_result)
+            # 避免log(0)
+            magnitude = np.maximum(magnitude, 1e-10)
+            db = 20 * np.log10(magnitude)
+            
+            # 降采样到400个点(0-8000Hz均匀分布)
+            max_freq = 8000
+            num_points = 400
+            
+            db_list = []
+            
+            for i in range(num_points):
+                # 计算目标频率(0到8000Hz均匀分布)
+                target_freq = (i / (num_points - 1)) * max_freq
+                # 找到最接近的频率索引
+                idx = np.argmin(np.abs(freqs - target_freq))
+                if idx < len(freqs):
+                    db_list.append(float(db[idx]))
+            
+            return db_list
+            
+        except Exception as e:
+            logger.error(f"计算频谱图失败: {e}")
+            return []
+    
+    def _push_detection_result(self, stream_config, device_code,
+                               is_anomaly, trigger_alert, abnormal_score, score_threshold,
+                               running_status, inlet_flow,
+                               freq_db, freq_middle_db=None,
+                               anomaly_type_code=6,
+                               abnormal_wav_b64=""):
+        """
+        推送检测结果到远程服务器
+        
+        上报格式符合用户要求:
+        - 包含频谱图数据(this_frequency + normal_frequency_middle)
+        - 包含启停状态和进水流量
+        - sound_detection.status 和 abnormalwav 仅在 trigger_alert=True 时上报异常(0)和base64,否则为正常(1)和空
+        
+        参数:
+            stream_config: 流配置
+            device_code: 设备编号
+            is_anomaly: 实际检测是否异常
+            trigger_alert: 是否触发报警(仅在新异常产生时为True)
+            abnormal_score: 平均重建误差
+            score_threshold: 阈值
+            running_status: 启停状态
+            inlet_flow: 进水流量
+            freq_db: 本次频谱图dB值列表
+            freq_middle_db: 历史平均频谱图dB值列表
+            abnormal_wav_b64: 预读的异常音频base64编码(在文件归档前读取,避免竞态)
+        """
+        try:
+            import time as time_module
+            
+            # 获取设备名称
+            camera_name = stream_config.camera_name
+            
+            # 从 push_base_urls 列表构建推送目标(每个 base_url + /{project_id})
+            if not self.push_base_urls:
+                logger.warning(f"未配置push_base_urls: {device_code}")
+                return
+            
+            # 获取该设备对应的 project_id,用于拼接最终推送URL
+            project_id = stream_config.project_id
+            
+            # 构建推送消息
+            request_time = int(time_module.time() * 1000)
+            
+            # 异常音频已通过参数 abnormal_wav_b64 传入(调用方在文件归档前预读)
+            # 如果调用方未提供,记录警告以便排查
+            if trigger_alert and not abnormal_wav_b64:
+                logger.warning(f"报警推送但无异常音频数据: {device_code}")
+
+            # 决定 sound_detection.status
+            # 只有在 trigger_alert=True 时才报 0 (异常)
+            # 其他情况(正常、持续异常)都报 1 (正常/空) -- 根据用户要求: "正常情况下 ... 是空 就行"
+            report_status = 0 if trigger_alert else 1
+            
+            payload = {
+                "message": {
+                    # 通道信息
+                    "channelInfo": {"name": camera_name},
+                    # 请求时间戳
+                    "requestTime": request_time,
+                    # 分类信息
+                    "classification": {
+                        "level_one": 2,                   # 音频检测大类
+                        "level_two": anomaly_type_code    # 异常类型小类(6=未分类, 7=轴承, 8=气蚀, 9=松动, 10=叶轮, 11=阀件)
+                    },
+                    # 技能信息
+                    "skillInfo": {"name": "异响检测"},
+                    # 声音检测数据
+                    "sound_detection": {
+                        # 异常音频
+                        "abnormalwav": abnormal_wav_b64,
+                        # 状态:0=异常(仅新异常), 1=正常(或持续异常)
+                        "status": report_status,
+                        # 设备状态信息
+                        "condition": {
+                            "running_status": "运行中" if running_status == "开机" else "停机中",
+                            "inlet_flow": inlet_flow
+                        },
+                        # 得分信息
+                        "score": {
+                            "abnormal_score": abnormal_score,    # 当前1分钟平均重构误差
+                            "score_threshold": score_threshold   # 该设备异常阀值
+                        },
+                        # 频谱图数据
+                        "frequency": {
+                            "this_frequency": freq_db,                    # 当前1分钟的频谱图
+                            "normal_frequency_middle": freq_middle_db or [],  # 过去10分钟的频谱图平均
+                        }
+                    }
+                }
+            }
+            
+            # 遍历所有推送基地址,逐个拼接 project_id 发送(各目标互不影响)
+            push_targets = [
+                (f"{item['url']}/{project_id}", item['label'])
+                for item in self.push_base_urls
+            ]
+            
+            # 逐个目标推送(各目标互不影响)
+            for target_url, target_label in push_targets:
+                self._send_to_target(
+                    target_url, target_label, payload,
+                    device_code, camera_name, trigger_alert, abnormal_score
+                )
+            
+        except Exception as e:
+            logger.error(f"推送通知异常: {e}")
+    
+    def _send_to_target(self, target_url, target_label, payload,
+                        device_code, camera_name, trigger_alert, abnormal_score):
+        # 向单个推送目标发送数据(含重试逻辑)
+        # 各目标独立调用,某个目标失败不影响其他目标
+        import time as time_module
+        
+        push_success = False
+        for attempt in range(self.push_retry_count + 1):
+            try:
+                response = requests.post(
+                    target_url,
+                    json=payload,
+                    timeout=self.push_timeout,
+                    headers={"Content-Type": "application/json"}
+                )
+                
+                if response.status_code == 200:
+                    alert_tag = "报警" if trigger_alert else "心跳"
+                    logger.info(
+                        f"    [{alert_tag}][{target_label}] {device_code}({camera_name}) | "
+                        f"误差={abnormal_score:.6f}"
+                    )
+                    push_success = True
+                    break
+                else:
+                    logger.warning(
+                        f"推送失败[{target_label}]: {device_code} | URL={target_url} | "
+                        f"状态码={response.status_code} | 内容={response.text[:100]}"
+                    )
+                    
+            except requests.exceptions.Timeout:
+                logger.warning(f"推送超时[{target_label}]: {device_code} | URL={target_url} | 尝试 {attempt + 1}/{self.push_retry_count + 1}")
+            except requests.exceptions.RequestException as e:
+                logger.warning(f"推送异常[{target_label}]: {device_code} | URL={target_url} | {e}")
+            
+            # 重试间隔
+            if attempt < self.push_retry_count:
+                time_module.sleep(1)
+        
+        if not push_success:
+            logger.error(f"推送失败[{target_label}]: {device_code} | URL={target_url} | 已达最大重试次数")
+    
+    def _move_audio_to_date_dir(self, wav_file):
+        """
+        将音频移动到日期目录归档
+        
+        目录结构:
+        deploy_pickup/data/{device_code}/{日期}/{文件名}
+        
+        参数:
+            wav_file: 音频文件路径
+        """
+        try:
+            import shutil
+            
+            # 从文件名提取device_code和日期
+            # 格式: 92_1#-1_20251218142000.wav
+            match = re.match(r'\d+_(.+)_(\d{8})\d{6}\.wav', wav_file.name)
+            if not match:
+                logger.warning(f"无法从文件名提取信息: {wav_file.name}")
+                return
+            
+            device_code = match.group(1)  # 如 1#-1
+            date_str = match.group(2)     # YYYYMMDD
+            
+            # 构建目标目录: data/{device_code}/{日期}/
+            date_dir = self.audio_dir / device_code / date_str
+            date_dir.mkdir(parents=True, exist_ok=True)
+            
+            # 移动文件
+            dest_file = date_dir / wav_file.name
+            shutil.move(str(wav_file), str(dest_file))
+            logger.debug(f"音频已归档: {device_code}/{date_str}/{wav_file.name}")
+            
+        except Exception as e:
+            logger.error(f"移动音频失败: {wav_file.name} | 错误: {e}")
+    
+    def _move_audio_to_transition_dir(self, wav_file, reason):
+        """
+        将泵停机/过渡期的音频移动到过渡期目录
+        
+        目录结构:
+        deploy_pickup/data/{device_code}/pump_transition/{文件名}
+        
+        这些音频不会被用于模型训练,但保留用于分析调试
+        
+        参数:
+            wav_file: 音频文件路径
+            reason: 原因标识(stopped=停机, transition=过渡期)
+        """
+        try:
+            import shutil
+            
+            # 从文件名提取device_code
+            # 格式: 92_1#-1_20251218142000.wav
+            match = re.match(r'\d+_(.+)_\d{14}\.wav', wav_file.name)
+            if not match:
+                logger.warning(f"无法从文件名提取信息: {wav_file.name}")
+                return
+            
+            device_code = match.group(1)  # 如 1#-1
+            
+            # 构建目标目录: data/{device_code}/pump_transition/
+            transition_dir = self.audio_dir / device_code / "pump_transition"
+            transition_dir.mkdir(parents=True, exist_ok=True)
+            
+            # 移动文件
+            dest_file = transition_dir / wav_file.name
+            shutil.move(str(wav_file), str(dest_file))
+            logger.debug(f"过渡期音频已归档: {device_code}/pump_transition/{wav_file.name} ({reason})")
+            
+        except Exception as e:
+            logger.error(f"移动过渡期音频失败: {wav_file.name} | 错误: {e}")
+    
+    def _cleanup_old_files(self, days: int = 7):
+        """
+        清理超过指定天数的正常音频文件
+        
+        清理规则:
+        - 只清理日期归档目录(data/{device_code}/{日期}/)
+        - 保留current目录和anomaly_detected目录
+        - 超过days天的文件删除
+        
+        参数:
+            days: 保留天数,默认7天
+        """
+        try:
+            import shutil
+            from datetime import datetime, timedelta
+            
+            # 计算截止日期
+            cutoff_date = datetime.now() - timedelta(days=days)
+            cutoff_str = cutoff_date.strftime("%Y%m%d")
+            
+            deleted_count = 0
+            
+            # 遍历每个设备目录
+            for device_dir in self.audio_dir.iterdir():
+                if not device_dir.is_dir():
+                    continue
+                
+                # 检查是否是日期目录(跳过current和其他特殊目录)
+                for subdir in device_dir.iterdir():
+                    if not subdir.is_dir():
+                        continue
+                    
+                    # 跳过current目录
+                    if subdir.name == "current":
+                        continue
+                    
+                    # 检查是否为日期目录(YYYYMMDD格式)
+                    if not re.match(r'^\d{8}$', subdir.name):
+                        continue
+                    
+                    # 如果日期早于截止日期,删除整个目录
+                    if subdir.name < cutoff_str:
+                        try:
+                            shutil.rmtree(subdir)
+                            deleted_count += 1
+                            logger.debug(f"清理过期目录: {device_dir.name}/{subdir.name}")
+                        except Exception as e:
+                            logger.error(f"删除目录失败: {subdir} | {e}")
+            
+            if deleted_count > 0:
+                logger.info(f"清理完成: 共删除{deleted_count}个过期目录")
+                
+        except Exception as e:
+            logger.error(f"清理过期文件失败: {e}")
+
+
+class PickupMonitoringSystem:
+    """
+    拾音器监控系统
+    
+    管理FFmpeg进程和监控线程
+    """
+    
+    def __init__(self, db_path=None, yaml_config=None):
+        """
+        初始化监控系统
+
+        参数:
+            db_path: SQLite 数据库路径(为 None 时使用默认路径 config/pickup_config.db)
+            yaml_config: 若不为 None,则直接使用该 dict 作为配置(跳过 DB)
+        """
+        self.db_path = db_path
+        self.config_manager = None
+
+        if yaml_config is not None:
+            # 直接使用传入的 YAML 配置字典
+            self.config = yaml_config
+            print(f"配置源: YAML (外部传入)")
+        else:
+            # 从 SQLite DB 加载
+            actual_db = Path(db_path) if db_path else get_db_path()
+            if not actual_db.exists():
+                raise FileNotFoundError(
+                    f"\u914d\u7f6e\u6570\u636e\u5e93\u4e0d\u5b58\u5728: {actual_db}\n"
+                    f"\u8bf7\u5148\u8fd0\u884c\u8fc1\u79fb\u811a\u672c: python tool/migrate_yaml_to_db.py"
+                )
+            self.config_manager = ConfigManager(str(actual_db))
+            print(f"配置源: SQLite ({actual_db})")
+            self.config = self._load_config()
+        
+        # 冷启动模式标记
+        self.cold_start_mode = False
+        
+        # 初始化多模型预测器(支持每个设备独立模型)
+        self.multi_predictor = MultiModelPredictor()
+        self.predictor = None  # 兼容性保留,已废弃
+        
+        # 从配置中注册所有设备的模型目录映射
+        print("\n正在初始化多模型预测器...")
+        for plant in self.config.get('plants', []):
+            if not plant.get('enabled', False):
+                continue
+            for stream in plant.get('rtsp_streams', []):
+                device_code = stream.get('device_code', '')
+                model_subdir = stream.get('model_subdir', device_code)
+                if device_code and model_subdir:
+                    self.multi_predictor.register_device(device_code, model_subdir)
+                    print(f"  注册设备: {device_code} -> models/{model_subdir}/")
+        
+        print(f"已注册 {len(self.multi_predictor.registered_devices)} 个设备模型映射")
+        
+        # 进程和监控器列表
+        self.ffmpeg_processes = []
+        self.monitors = []
+        
+        # 信号处理
+        signal.signal(signal.SIGINT, self._signal_handler)
+        signal.signal(signal.SIGTERM, self._signal_handler)
+    
+    def _load_config(self):
+        """
+        从 SQLite DB 加载配置
+        
+        返回:
+            Dict: 配置字典
+        """
+        config = self.config_manager.get_full_config()
+        logger.info("配置已从 SQLite 加载")
+        return config
+    
+    def _parse_rtsp_streams(self):
+        """
+        解析配置文件中的RTSP流信息
+        
+        返回:
+            List[RTSPStreamConfig]: RTSP流配置列表
+        """
+        streams = []
+        
+        plants = self.config.get('plants', [])
+        if not plants:
+            raise ValueError("配置文件中未找到水厂配置")
+        
+        for plant in plants:
+            plant_name = plant.get('name')
+            if not plant_name:
+                print("警告: 跳过未命名的区域配置")
+                continue
+            
+            # 检查是否启用该水厂(默认启用以兼容旧配置)
+            if not plant.get('enabled', True):
+                logger.debug(f"跳过禁用的水厂: {plant_name}")
+                continue
+            
+            # 获取流量PLC配置
+            flow_plc = plant.get('flow_plc', {})
+            
+            # 获取该水厂的project_id(每个plant有自己的project_id)
+            project_id = plant.get('project_id', 92)
+            logger.info(f"加载区域配置: {plant_name} | project_id={project_id}")
+            
+            rtsp_streams = plant.get('rtsp_streams', [])
+            for stream in rtsp_streams:
+                url = stream.get('url')
+                channel = stream.get('channel')
+                camera_name = stream.get('name', '')
+                device_code = stream.get('device_code', '')
+                pump_name = stream.get('pump_name', '')
+                
+                if not url or channel is None:
+                    print(f"警告: 跳过不完整的RTSP流配置 (区域: {plant_name})")
+                    continue
+                
+                streams.append(RTSPStreamConfig(
+                    plant_name=plant_name,
+                    rtsp_url=url,
+                    channel=channel,
+                    camera_name=camera_name,
+                    device_code=device_code,
+                    pump_name=pump_name,
+                    flow_plc=flow_plc,
+                    project_id=project_id
+                ))
+        
+        return streams
+    
+    def start(self):
+        """
+        启动监控系统
+        """
+        print("=" * 70)
+        print("拾音器异响检测系统")
+        print("=" * 70)
+        
+        # 解析流配置
+        streams = self._parse_rtsp_streams()
+        print(f"\n共配置 {len(streams)} 个拾音设备:")
+        for stream in streams:
+            print(f"  - {stream.device_code} | {stream.camera_name}")
+        
+        # 启动FFmpeg进程
+        print("\n启动FFmpeg进程...")
+        for stream in streams:
+            ffmpeg = FFmpegProcess(stream, CFG.AUDIO_DIR, self.config)
+            if ffmpeg.start():
+                self.ffmpeg_processes.append(ffmpeg)
+            else:
+                print(f"警告: FFmpeg启动失败,跳过该流: {stream}")
+        
+        if not self.ffmpeg_processes:
+            print("\n错误: 所有FFmpeg进程均启动失败")
+            sys.exit(1)
+        
+        print(f"\n成功启动 {len(self.ffmpeg_processes)}/{len(streams)} 个FFmpeg进程")
+        
+        # 收集所有流配置(用于PickupMonitor)
+        all_stream_configs = [p.stream_config for p in self.ffmpeg_processes]
+        
+        # 启动监控线程(统一一个监控器)
+        print("\n启动监控线程...")
+        check_interval = self.config.get('prediction', {}).get('check_interval', 1.0)
+        
+        monitor = PickupMonitor(
+            audio_dir=CFG.AUDIO_DIR,
+            multi_predictor=self.multi_predictor,
+            stream_configs=all_stream_configs,
+            check_interval=check_interval,
+            config=self.config,
+            config_manager=self.config_manager
+        )
+        monitor.start()
+        self.monitors.append(monitor)
+        
+        print(f"\n成功启动监控线程")
+        print("\n" + "=" * 70)
+        print("系统已启动,开始监控...")
+        print("按 Ctrl+C 停止系统")
+        print("=" * 70 + "\n")
+        
+        # FFmpeg重启配置
+        max_restart_attempts = 5        # 单个进程最大重启次数
+        restart_interval_base = 30    # 基础重启间隔(秒)-> 改为30s
+        restart_counts = {id(p): 0 for p in self.ffmpeg_processes}  # 重启计数
+        
+        # 主循环(带自动重启)
+        try:
+            while True:
+                # 检查每个FFmpeg进程状态
+                for ffmpeg in self.ffmpeg_processes:
+                    if not ffmpeg.is_running():
+                        pid = id(ffmpeg)
+                        device_code = ffmpeg.stream_config.device_code
+                        
+                        # 检查重启次数
+                        if restart_counts.get(pid, 0) < max_restart_attempts:
+                            # 计算等待时间(指数退避)
+                            wait_time = restart_interval_base * (2 ** restart_counts.get(pid, 0))
+                            logger.warning(f"FFmpeg进程停止: {device_code} | 将在{wait_time}秒后重启 | "
+                                         f"重试次数: {restart_counts.get(pid, 0) + 1}/{max_restart_attempts}")
+                            
+                            time.sleep(wait_time)
+                            
+                            # 尝试重启
+                            if ffmpeg.start():
+                                logger.debug(f"FFmpeg重启成功: {device_code}")
+                                restart_counts[pid] = 0  # 重置计数
+                            else:
+                                restart_counts[pid] = restart_counts.get(pid, 0) + 1
+                                logger.error(f"FFmpeg重启失败: {device_code}")
+                        else:
+                            logger.error(f"FFmpeg达到最大重启次数: {device_code} | 已放弃重启")
+                
+                # 检查是否所有进程都已放弃
+                running_count = sum(1 for p in self.ffmpeg_processes if p.is_running())
+                all_abandoned = all(restart_counts.get(id(p), 0) >= max_restart_attempts 
+                                   for p in self.ffmpeg_processes if not p.is_running())
+                
+                if running_count == 0 and all_abandoned:
+                    print("\n错误: 所有FFmpeg进程均已停止且无法重启")
+                    break
+                
+                # 每天0点执行7天清理
+                current_hour = datetime.now().hour
+                current_date = datetime.now().strftime("%Y%m%d")
+                
+                if not hasattr(self, '_last_cleanup_date'):
+                    self._last_cleanup_date = ""
+                
+                # 在0点且今天还没清理过时执行
+                if current_hour == 0 and self._last_cleanup_date != current_date:
+                    logger.info("执行7天过期文件清理...")
+                    for monitor in self.monitors:
+                        monitor._cleanup_old_files(days=7)
+                    self._last_cleanup_date = current_date
+                
+                # 定期打印RTSP状态(每分钟一次)
+                if not hasattr(self, '_last_status_log'):
+                    self._last_status_log = datetime.now()
+                
+                if (datetime.now() - self._last_status_log).total_seconds() >= 60:
+                    running = sum(1 for p in self.ffmpeg_processes if p.is_running())
+                    total = len(self.ffmpeg_processes)
+                    logger.info(f"RTSP状态: {running}/{total} 个FFmpeg进程运行中")
+                    logger.info("─" * 60)
+                    self._last_status_log = datetime.now()
+                
+                time.sleep(10)
+        except KeyboardInterrupt:
+            print("\n\n收到停止信号,正在关闭系统...")
+        finally:
+            self.stop()
+    
+    def stop(self):
+        """
+        停止监控系统
+        """
+        print("\n正在停止监控系统...")
+        
+        print("停止监控线程...")
+        for monitor in self.monitors:
+            monitor.stop()
+        
+        print("停止FFmpeg进程...")
+        for ffmpeg in self.ffmpeg_processes:
+            ffmpeg.stop()
+        
+        print("系统已完全停止")
+    
+    def _signal_handler(self, signum, frame):
+        """
+        信号处理函数
+        """
+        print(f"\n\n收到信号 {signum},正在关闭系统...")
+        self.stop()
+        sys.exit(0)
+
+
+def _start_config_api_server(config_manager, multi_predictor=None, port=18080):
+    # 在后台线程中启动 FastAPI 配置管理 API
+    try:
+        import uvicorn
+        init_config_api(config_manager, multi_predictor)
+        logger.info(f"启动配置管理 API: http://0.0.0.0:{port}")
+        uvicorn.run(config_app, host="0.0.0.0", port=port, log_level="warning")
+    except ImportError:
+        logger.warning("uvicorn 未安装,配置管理 API 无法启动。请安装: pip install uvicorn")
+    except Exception as e:
+        logger.error(f"配置管理 API 启动失败: {e}")
+
+
+def main():
+    """
+    主函数
+    """
+    # 检测 DB 是否存在
+    db_path = get_db_path(Path(__file__).parent / "config")
+    if not db_path.exists():
+        print(f"错误: 配置数据库不存在: {db_path}")
+        print(f"\n请先运行迁移脚本: python tool/migrate_yaml_to_db.py")
+        sys.exit(1)
+    
+    try:
+        system = PickupMonitoringSystem()
+        
+        # 后台线程启动配置管理 API
+        api_port = 18080
+        api_thread = threading.Thread(
+            target=_start_config_api_server,
+            args=(system.config_manager, system.multi_predictor, api_port),
+            daemon=True,
+            name="config-api"
+        )
+        api_thread.start()
+        system.start()
+    except FileNotFoundError as e:
+        print(f"\n错误: {e}")
+        print("\n请确保:")
+        print("1. 已完成训练并计算阈值")
+        print("2. 已复制必要的模型文件到 models/ 目录")
+        sys.exit(1)
+    except Exception as e:
+        print(f"\n严重错误: {e}")
+        import traceback
+        traceback.print_exc()
+        sys.exit(1)
+
+
+if __name__ == "__main__":
+    main()