#!/usr/bin/env python # -*- coding: utf-8 -*- """ run_pickup_monitor.py --------------------- 拾音器异响检测主程序 功能说明: 该程序用于音频采集设备(拾音器)的实时异常检测。 与摄像头版本不同,本版本: 1. 没有视频/图片采集和上报 2. 上报时包含音频频谱图分析数据 3. 支持进水流量PLC数据读取 4. 每1分钟计算一次平均重建误差并上报 上报数据结构: { "message": { "channelInfo": {"name": "设备信息"}, "requestTime": 时间戳, "classification": { "level_one": 2, // 音频检测大类 "level_two": 6 // 异常类型小类: 6=未分类, 7=轴承, 8=气蚀, 9=松动/共振, 10=叶轮, 11=阀件 }, "skillInfo": {"name": "异响检测"}, "sound_detection": { "abnormalwav": "", // base64编码的音频 "status": 0/1, // 0=异常, 1=正常 "condition": { "running_status": "运行中/停机中", // 启停状态 "inlet_flow": 0.0 // 进水流量 }, "score": { "abnormal_score": 0.0, // 当前1分钟内8s平均重构误差 "score_threshold": 0.0 // 该设备的异常阀值 }, "frequency": { "this_frequency": [], // 当前1分钟的频谱图 "normal_frequency_middle": [], // 过去10分钟的频谱图平均 "normal_frequency_upper": [], // 上限(暂为空) "normal_frequency_lower": [] // 下限(暂为空) } } } } 使用方法: python run_pickup_monitor.py 配置文件: config/rtsp_config.yaml """ import subprocess import time import re import signal import sys import threading import logging import base64 from concurrent.futures import ThreadPoolExecutor from pathlib import Path from datetime import datetime, timedelta from collections import defaultdict # 用于计算FFT import numpy as np try: import librosa except ImportError: print("错误:缺少librosa库,请安装: pip install librosa") sys.exit(1) try: import requests except ImportError: print("错误:缺少requests库,请安装: pip install requests") sys.exit(1) # 导入预测器模块 from predictor import MultiModelPredictor, CFG # 导入配置管理模块(SQLite) from config.config_manager import ConfigManager from config.config_api import app as config_app, init_config_api from config.db_models import get_db_path # 导入泵状态监控模块(用于检测启停过渡期) try: from core.pump_state_monitor import PumpStateMonitor PUMP_STATE_MONITOR_AVAILABLE = True except ImportError: PUMP_STATE_MONITOR_AVAILABLE = False # 导入人体检测读取模块(用于抑制有人时的误报) try: from core.human_detection_reader import HumanDetectionReader HUMAN_DETECTION_AVAILABLE = True except ImportError: 
HUMAN_DETECTION_AVAILABLE = False # 导入报警聚合器(跨设备聚合抑制 + 分类型冷却) try: from core.alert_aggregator import AlertAggregator ALERT_AGGREGATOR_AVAILABLE = True except ImportError: ALERT_AGGREGATOR_AVAILABLE = False # ======================================== # 配置日志系统 # ======================================== def setup_logging(): # 配置日志系统(RotatingFileHandler -> system.log) from logging.handlers import RotatingFileHandler # 如果根 logger 已经被上层调用者配置过,则直接复用 root = logging.getLogger() if root.handlers: return logging.getLogger('PickupMonitor') # 日志配置 log_dir = Path(__file__).parent / "logs" log_dir.mkdir(parents=True, exist_ok=True) log_file = log_dir / "system.log" formatter = logging.Formatter( '%(asctime)s | %(levelname)-8s | %(name)-20s | %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) # 按文件大小轮转,最多保留 2 个备份(共 30MB) file_handler = RotatingFileHandler( log_file, maxBytes=10 * 1024 * 1024, backupCount=2, encoding='utf-8' ) file_handler.setFormatter(formatter) # 控制台输出(前台运行时可见,后台运行时 stdout 已被丢弃不影响) console_handler = logging.StreamHandler(sys.stdout) console_handler.setFormatter(formatter) logging.basicConfig( level=logging.INFO, handlers=[file_handler, console_handler] ) return logging.getLogger('PickupMonitor') # 初始化日志系统 logger = setup_logging() # 导入能量基线模块 try: from core.energy_baseline import EnergyBaseline ENERGY_BASELINE_AVAILABLE = True except ImportError: ENERGY_BASELINE_AVAILABLE = False logger.warning("能量基线模块未找到,泵状态检测功能禁用") class RTSPStreamConfig: """ RTSP流配置类 封装单个RTSP流的配置信息,包含拾音器特有字段 """ def __init__(self, plant_name, rtsp_url, channel, camera_name, device_code, pump_name, flow_plc, project_id): """ 初始化RTSP流配置 参数: plant_name: 区域名称(泵房-反渗透高压泵等) rtsp_url: RTSP流URL channel: 通道号 camera_name: 设备名称 device_code: 设备编号(如1#-1) pump_name: 泵名称(A/B/C/D),用于匹配流量PLC flow_plc: 流量PLC地址映射 """ self.plant_name = plant_name self.rtsp_url = rtsp_url self.channel = channel self.pump_id = f"ch{channel}" self.camera_name = camera_name or f"ch{channel}" self.device_code = device_code self.pump_name = 
pump_name
        self.flow_plc = flow_plc or {}
        self.project_id = project_id

    def get_flow_plc_address(self) -> str:
        """
        Return the inlet-flow PLC address mapped to this device's pump.

        Returns:
            The PLC address string, or "" when no mapping exists.
        """
        if self.pump_name and self.flow_plc:
            return self.flow_plc.get(self.pump_name, "")
        return ""

    def __repr__(self) -> str:
        return f"RTSPStreamConfig(plant='{self.plant_name}', camera='{self.camera_name}', pump='{self.pump_id}')"


class FFmpegProcess:
    """
    FFmpeg process manager.

    Starts and manages one FFmpeg process per RTSP stream. The process
    converts the RTSP stream into fixed-duration WAV audio segments.
    Output file name format: {project_id}_{device_code}_{timestamp}.wav
    """

    def __init__(self, stream_config, output_dir, config=None):
        """
        Initialize the FFmpeg process manager.

        Args:
            stream_config: RTSPStreamConfig for the stream to capture
            output_dir: root directory (Path) for audio output
            config: global configuration dict (optional)
        """
        self.config_dict = config or {}
        self.stream_config = stream_config
        self.output_dir = output_dir
        self.process = None
        # Segment length read from config; defaults to 8 seconds
        audio_cfg = self.config_dict.get('audio', {})
        self.file_duration = audio_cfg.get('file_duration', 8)
        # project_id comes from the stream configuration
        self.project_id = stream_config.project_id

    def start(self) -> bool:
        """
        Start the FFmpeg capture process.

        Returns:
            True on successful launch, False on failure.
        """
        # Device code, falling back to the channel-derived pump id
        device_code = self.stream_config.device_code or self.stream_config.pump_id
        # Per-device output directory: data/{device_code}/current/
        current_dir = self.output_dir / device_code / "current"
        current_dir.mkdir(parents=True, exist_ok=True)
        # Output file name template, e.g. 92_1#-1_20251218142000.wav
        output_pattern = str(current_dir / f"{self.project_id}_{device_code}_%Y%m%d%H%M%S.wav")
        # Build the FFmpeg command.
        # Memory-limiting input options keep the RTSP buffer from growing
        # without bound (OOM guard).
        cmd = [
            "ffmpeg",
            # RTSP input options (memory limits)
            "-rtsp_transport", "tcp",           # use TCP transport
            "-probesize", "1000000",            # cap probe size at 1MB (default 5MB)
            "-analyzeduration", "1000000",      # cap analysis duration at 1s (default 5s)
            "-max_delay", "500000",             # max delay 500ms
            "-fflags", "nobuffer",              # disable input buffering
            "-flags", "low_delay",              # low-latency mode
            "-i", self.stream_config.rtsp_url,  # input RTSP stream
            # Audio output options
            "-vn",                              # drop video
            "-ac", "1",                         # mono
            "-ar", str(CFG.SR),                 # sample rate (16000Hz)
            "-f", "segment",                    # segmented output
            "-segment_time", str(int(self.file_duration)),  # seconds per segment
            "-strftime", "1",                   # enable strftime expansion in the pattern
            "-loglevel", 
"error", # 只输出错误日志 "-y", # 覆盖已存在的文件 output_pattern, ] try: # 启动FFmpeg进程 self.process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) logger.info(f"FFmpeg已启动: {device_code} | {self.stream_config.camera_name} | " f"文件时长: {self.file_duration}秒 | PID={self.process.pid}") return True except FileNotFoundError: logger.error("FFmpeg错误: 未找到ffmpeg命令,请确保已安装FFmpeg") return False except Exception as e: logger.error(f"FFmpeg启动失败: {device_code} | 错误: {e}") return False def is_running(self): """ 检查FFmpeg进程是否在运行 返回: bool: 进程运行中返回True,否则返回False """ if self.process is None: return False return self.process.poll() is None def stop(self): """ 停止FFmpeg进程 """ if self.process is not None and self.is_running(): logger.info(f"FFmpeg停止: {self.stream_config.plant_name} | {self.stream_config.camera_name} | PID={self.process.pid}") self.process.terminate() try: self.process.wait(timeout=5) except subprocess.TimeoutExpired: logger.warning(f"FFmpeg强制终止: PID={self.process.pid}") self.process.kill() class PickupMonitor: """ 拾音器监控线程类 监控音频目录,调用预测器检测异常,推送告警。 主要功能: 1. 不截取视频帧(纯音频) 2. 计算并上报频谱图数据(this_frequency + normal_frequency_middle) 3. 每分钟汇总上报一次 4. 
使用SCADA API获取进水流量 """ def __init__(self, audio_dir, multi_predictor, stream_configs, check_interval=1.0, config=None, config_manager=None): """ 初始化监控器 Args: audio_dir: 音频根目录 multi_predictor: 多模型预测器实例 stream_configs: 所有RTSP流配置 check_interval: 检查间隔(秒) config: 配置字典 config_manager: ConfigManager 实例(用于热更新,为 None 时禁用热更新) """ # 音频根目录(各设备目录在其下) self.audio_dir = audio_dir self.multi_predictor = multi_predictor self.predictor = None # 兼容性保留,已废弃 self.stream_configs = stream_configs self.check_interval = check_interval self.config = config or {} # 热更新:持有 ConfigManager 引用,定期从 DB 刷新配置 self._config_manager = config_manager self._last_config_reload = 0 # 上次配置刷新时间戳 self._config_reload_interval = 30 # 配置刷新间隔(秒) # project_id self.project_id = self.config.get('platform', {}).get('project_id', 92) # 构建device_code到stream_config的映射 self.device_map = {} for cfg in stream_configs: if cfg.device_code: self.device_map[cfg.device_code] = cfg # 已处理文件集合 self.seen_files = set() # 每个设备的检测结果缓存(用于1分钟汇总) # key: device_code(如 "1#-1") self.device_cache = defaultdict(lambda: { "errors": [], # 每8秒的重建误差列表 "last_upload": None, # 上次上报时间 "audio_data": [], # 用于计算频谱图的音频数据 "status": None # 最近的运行状态 }) # 频谱图历史缓存(用于计算normal_frequency_middle) # key: device_code, value: list of (timestamp, freq_db) freq_cfg = self.config.get('prediction', {}).get('frequency_history', {}) self.freq_history_enabled = freq_cfg.get('enabled', True) self.freq_history_minutes = freq_cfg.get('history_minutes', 10) self.freq_history = defaultdict(list) # 上一次上报状态(True=异常,False=正常,None=初始) # 用于状态变更时去重,防止持续报警 self.last_report_status = {} # 上报周期(秒) audio_cfg = self.config.get('audio', {}) self.segment_duration = audio_cfg.get('segment_duration', 60) # 异常音频保存配置 save_cfg = self.config.get('prediction', {}).get('save_anomaly_audio', {}) self.save_anomaly_enabled = save_cfg.get('enabled', True) self.save_anomaly_dir = Path(__file__).parent / save_cfg.get('save_dir', 'data/anomaly_detected') if self.save_anomaly_enabled: 
self.save_anomaly_dir.mkdir(parents=True, exist_ok=True) # 异常推送配置 push_cfg = self.config.get('push_notification', {}) self.push_enabled = push_cfg.get('enabled', False) self.alert_enabled = push_cfg.get('alert_enabled', True) # 是否启用异常告警(false=暂时禁用异常上报) self.push_timeout = push_cfg.get('timeout', 30) self.push_retry_count = push_cfg.get('retry_count', 2) # 推送基地址列表(统一用 base_url/{project_id} 拼接,可无限扩展) raw_urls = push_cfg.get('push_base_urls', []) self.push_base_urls = [ {"label": item.get("label", f"target-{i}"), "url": item.get("url", "").rstrip("/")} for i, item in enumerate(raw_urls) if item.get("url") # 跳过空URL ] # 推送失败记录文件路径 failed_log_path = push_cfg.get('failed_push_log', 'data/push_failures.jsonl') self.failed_push_log = Path(__file__).parent / failed_log_path self.failed_push_log.parent.mkdir(parents=True, exist_ok=True) # 如果 alert_enabled 为 False,记录日志提醒 if not self.alert_enabled: logger.warning("异常告警已禁用(alert_enabled=false),仅上报心跳状态") # ======================================== # 报警聚合器(替代原有固定 cooldown_minutes) # ---------------------------------------- # 功能1:跨设备聚合抑制 - 同一水厂多设备同时报警 -> 环境噪声,全部抑制 # 功能2:分类型冷却 - 同类型24h,不同类型1h # ======================================== agg_cfg = push_cfg.get('alert_aggregate', {}) self.alert_aggregator = None if ALERT_AGGREGATOR_AVAILABLE: self.alert_aggregator = AlertAggregator( push_callback=self._push_detection_result, aggregate_enabled=agg_cfg.get('enabled', True), window_seconds=agg_cfg.get('window_seconds', 300), min_devices=agg_cfg.get('min_devices', 2), cooldown_same_type_hours=push_cfg.get('cooldown_same_type_hours', 24), cooldown_diff_type_hours=push_cfg.get('cooldown_diff_type_hours', 1) ) else: logger.warning("报警聚合器模块未找到,使用默认报警逻辑") # 上次异常音频保存时间(用于保存冷却时间计算) self.last_anomaly_save_time = {} # 异常保存冷却时间(分钟),同一设备连续异常时,每N分钟只保存一次 self.anomaly_save_cooldown_minutes = save_cfg.get('cooldown_minutes', 10) # 当前异常分类结果锁定(持续异常期间保持分类结果不变) # key: device_code, value: (anomaly_type_code, type_name) self.locked_anomaly_type = {} # 
滑动窗口投票配置(5次中有3次异常才判定为异常) voting_cfg = self.config.get('prediction', {}).get('voting', {}) self.voting_enabled = voting_cfg.get('enabled', True) self.voting_window_size = voting_cfg.get('window_size', 5) # 窗口大小 self.voting_threshold = voting_cfg.get('threshold', 3) # 异常阈值(>=3次则判定异常) self.detection_history = {} # 每个设备的检测历史(True=异常) # 阈值容差区间配置(避免边界值反复跳变) # 误差在 threshold*(1-tolerance) ~ threshold*(1+tolerance) 范围内为灰区,维持上一状态 self.tolerance_ratio = voting_cfg.get('tolerance_ratio', 0.05) # 默认5%容差 self.last_single_anomaly = {} # 每个设备上一次的单周期判定结果 # 阈值现在由 multi_predictor 管理,每个设备从其对应模型目录加载 # 能量检测配置 energy_cfg = self.config.get('prediction', {}).get('energy_detection', {}) self.energy_detection_enabled = ENERGY_BASELINE_AVAILABLE and energy_cfg.get('enabled', True) self.skip_detection_when_stopped = energy_cfg.get('skip_when_stopped', True) # 能量基线检测器(每个设备一个) if self.energy_detection_enabled: self.energy_baselines = {} else: self.energy_baselines = None # SCADA API配置(用于获取泵状态和进水流量) scada_cfg = self.config.get('scada_api', {}) self.scada_enabled = scada_cfg.get('enabled', False) self.scada_url = scada_cfg.get('base_url', '') # 历史数据接口(备用) self.scada_realtime_url = scada_cfg.get('realtime_url', '') # 实时数据接口(主用) self.scada_jwt = scada_cfg.get('jwt_token', '') self.scada_timeout = scada_cfg.get('timeout', 10) # 泵状态监控器(用于检测启停过渡期) self.pump_state_monitor = None self.pump_status_plc_configs = {} # {pump_name: [{point, name}, ...]} if PUMP_STATE_MONITOR_AVAILABLE and self.scada_enabled: # 初始化泵状态监控器 # 获取第一个启用水厂的project_id project_id = 0 for plant in self.config.get('plants', []): if plant.get('enabled', False): project_id = plant.get('project_id', 0) # 加载泵状态点位配置 pump_status_plc = plant.get('pump_status_plc', {}) self.pump_status_plc_configs = pump_status_plc break if project_id > 0: # 使用实时接口 URL 进行泵状态查询 self.pump_state_monitor = PumpStateMonitor( scada_url=self.scada_realtime_url, # 使用实时数据接口 scada_jwt=self.scada_jwt, project_id=project_id, timeout=self.scada_timeout, 
transition_window_minutes=15 # 启停后15分钟内视为过渡期 ) logger.info(f"泵状态监控器已启用 (project_id={project_id}, 过渡期窗口=15分钟, 使用实时接口)") # 线程控制 self.running = False self.thread = None # 推送线程池(避免推送超时阻塞主监控循环) self._push_executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="push") # 启动时间(用于跳过启动期间的状态变化日志) self.startup_time = None self.startup_warmup_seconds = 120 # 启动后120秒内不记录状态变化 # ======================================== # 异常上下文捕获配置 # ======================================== context_cfg = save_cfg.get('context_capture', {}) self.context_capture_enabled = context_cfg.get('enabled', False) self.context_pre_minutes = context_cfg.get('pre_minutes', 2) self.context_post_minutes = context_cfg.get('post_minutes', 2) # 音频文件历史缓存(用于捕获异常前的音频) # key: device_code, value: deque of (timestamp, file_path) from collections import deque # 计算需要保留的历史文件数量(按分钟计算,每分钟约1个文件) history_size = self.context_pre_minutes + 2 # 多保留2个作为缓冲 self.audio_file_history = defaultdict(lambda: deque(maxlen=history_size)) # 异常上下文捕获状态 # key: device_code, value: dict self.anomaly_capture_state = {} if self.context_capture_enabled: logger.info(f"异常上下文捕获已启用 (前{self.context_pre_minutes}分钟 + 后{self.context_post_minutes}分钟)") # ======================================== # 人体检测报警抑制配置 # ======================================== # 只要任意摄像头在冷却时间内检测到人,所有设备的异常报警都会被抑制 human_cfg = self.config.get('human_detection', {}) self.human_detection_enabled = human_cfg.get('enabled', False) and HUMAN_DETECTION_AVAILABLE self.human_reader = None if self.human_detection_enabled: db_path = human_cfg.get('db_path', '') cooldown_minutes = human_cfg.get('cooldown_minutes', 5) self.human_reader = HumanDetectionReader( db_path=db_path, cooldown_minutes=cooldown_minutes ) logger.info(f"人体检测报警抑制已启用 (冷却时间={cooldown_minutes}分钟, 数据库={db_path})") def start(self): """ 启动监控线程 """ if self.running: return # 启动前清理current目录中的遗留文件 self._cleanup_current_on_startup() self.running = True self.startup_time = datetime.now() # 记录启动时间 self.thread = 
threading.Thread(target=self._monitor_loop, daemon=True) self.thread.start() # 打印监控的设备列表 device_codes = list(self.device_map.keys()) logger.info(f"监控线程已启动 | 目录: {self.audio_dir} | 监控设备: {device_codes}") def _cleanup_current_on_startup(self): """ 启动时清理current目录中的遗留文件 删除上次运行遗留的文件,避免混入新采集数据影响检测准确性 """ cleaned_count = 0 for device_code in self.device_map.keys(): current_dir = self.audio_dir / device_code / "current" if not current_dir.exists(): continue for wav_file in current_dir.glob("*.wav"): try: wav_file.unlink() cleaned_count += 1 except Exception as e: logger.warning(f"清理遗留文件失败: {wav_file.name} | {e}") if cleaned_count > 0: logger.info(f"启动清理: 已删除current目录中 {cleaned_count} 个遗留文件") def stop(self): """ 停止监控线程 """ if not self.running: return self.running = False if self.thread is not None: self.thread.join(timeout=5) # 关闭推送线程池,等待已提交的推送任务完成 self._push_executor.shutdown(wait=True, cancel_futures=False) logger.info(f"监控线程已停止") def _reload_hot_config(self): # 从 DB 热加载可变配置项,无需重启服务 # 只刷新运行时可安全变更的参数,不影响 FFmpeg 进程和流映射 if self._config_manager is None: return now = time.time() # 未达到刷新间隔则跳过 if now - self._last_config_reload < self._config_reload_interval: return self._last_config_reload = now try: # 从 DB 读取最新配置 fresh = self._config_manager.get_full_config() self.config = fresh # 刷新推送配置 push_cfg = fresh.get('push_notification', {}) self.push_enabled = push_cfg.get('enabled', False) self.alert_enabled = push_cfg.get('alert_enabled', True) self.push_timeout = push_cfg.get('timeout', 30) self.push_retry_count = push_cfg.get('retry_count', 2) raw_urls = push_cfg.get('push_base_urls', []) self.push_base_urls = [ {"label": item.get("label", f"target-{i}"), "url": item.get("url", "").rstrip("/")} for i, item in enumerate(raw_urls) if item.get("url") ] # 刷新投票配置 voting_cfg = fresh.get('prediction', {}).get('voting', {}) self.voting_enabled = voting_cfg.get('enabled', True) self.voting_window_size = voting_cfg.get('window_size', 5) self.voting_threshold = voting_cfg.get('threshold', 3) 
self.tolerance_ratio = voting_cfg.get('tolerance_ratio', 0.05) # 刷新能量检测配置 energy_cfg = fresh.get('prediction', {}).get('energy_detection', {}) self.skip_detection_when_stopped = energy_cfg.get('skip_when_stopped', True) # 刷新人体检测配置 human_cfg = fresh.get('human_detection', {}) new_human_enabled = human_cfg.get('enabled', False) if new_human_enabled != self.human_detection_enabled: logger.info(f"人体检测抑制配置变更: {self.human_detection_enabled} -> {new_human_enabled}") self.human_detection_enabled = new_human_enabled logger.debug("配置热刷新完成") except Exception as e: logger.error(f"配置热刷新失败: {e}") def _monitor_loop(self): """ 监控循环(线程主函数) 持续检查各设备的音频目录,处理新生成的WAV文件。 每分钟汇总一次结果进行上报。 """ # 确保根目录存在 self.audio_dir.mkdir(parents=True, exist_ok=True) while self.running: try: # 热更新:定期从 DB 刷新可变配置 self._reload_hot_config() # 扫描所有设备目录下的current文件夹 for device_code in self.device_map.keys(): device_current_dir = self.audio_dir / device_code / "current" # 目录不存在则跳过 if not device_current_dir.exists(): continue for wav_file in device_current_dir.glob("*.wav"): # 跳过已处理的文件 if wav_file in self.seen_files: continue # 检查文件是否完整 try: stat_info = wav_file.stat() file_age = time.time() - stat_info.st_mtime file_size = stat_info.st_size # 文件修改时间检查(确保文件写入完成) # FFmpeg生成文件需要时间,太新的文件可能还没写完 if file_age < 12.0: continue # 文件大小检查(60秒 × 16000Hz × 2字节 ≈ 1.9MB) # 范围:500KB - 3MB if file_size < 500_000 or file_size > 3_000_000: # 只有当文件生成已经有一段时间(>20秒)且依然很小,才判定为异常 # 这样可以避免刚开始生成、正在写入的文件被误报 if file_age > 20.0: logger.warning(f"文件大小异常: {wav_file.name} ({file_size / 1000:.1f}KB)") # 自动删除过小的文件(通常是0KB或损坏文件) if file_size < 500_000: try: wav_file.unlink() logger.debug(f"删除异常小文件: {wav_file.name}") except Exception as e: logger.error(f"删除文件失败: {wav_file.name} | {e}") self.seen_files.add(wav_file) continue # 继续下一个文件循环 # 正常大小的文件继续处理 if file_size < 500_000: # 双重检查,防止漏网 continue except Exception as e: logger.error(f"文件状态检查失败: {wav_file.name} | {e}") continue # 处理新文件 self._process_new_file(wav_file) # 标记为已处理 self.seen_files.add(wav_file) 
# 检查是否需要进行周期性上报 self._check_periodic_upload() # 检查聚合窗口是否到期,到期则执行聚合判定并推送 if self.alert_aggregator: self.alert_aggregator.check_and_flush() # 清理过大的已处理文件集合 if len(self.seen_files) > 10000: recent_files = sorted(self.seen_files, key=lambda f: f.stat().st_mtime if f.exists() else 0)[-5000:] self.seen_files = set(recent_files) except Exception as e: logger.error(f"监控循环错误: {e}") # 等待下一次检查 time.sleep(self.check_interval) def _process_new_file(self, wav_file): """ 处理新的音频文件 文件名格式: {project_id}_{device_code}_{时间戳}.wav 例如: 92_1#-1_20251218142000.wav 流程: 1. 加载音频 2. 计算能量判断设备状态 3. 进行AE异常检测,记录重建误差 4. 保存音频数据用于计算频谱图 """ try: # 从文件名解析device_code # 格式: {project_id}_{device_code}_{时间戳}.wav try: parts = wav_file.stem.split('_') if len(parts) >= 3: device_code = parts[1] # 第二部分是device_code else: device_code = "unknown" except: device_code = "unknown" # ======================================== # 泵启停状态检测(基于 PLC 查询)- 优先于模型检查 # ---------------------------------------- # 逻辑:通过 SCADA API 查询 PLC 点位判断泵是否运行 # 作用: # 1. 冷启动模式下也能过滤停机时的音频(保证训练数据质量) # 2. 泵停机时跳过异常检测(避免无意义的检测) # 3. 
记录设备状态用于后续上报 # 依赖:pump_state_monitor + rtsp_config 中的 pump_status_plc 配置 # ======================================== device_status = "未知" pump_is_running = True # 默认认为运行中(PLC 查询失败时的保守策略) # 获取该设备对应的流配置 stream_config = self.device_map.get(device_code) if self.pump_state_monitor and stream_config: # 根据设备的 pump_name 找到关联的 PLC 点位配置 pump_name = stream_config.pump_name pump_configs = self.pump_status_plc_configs.get(pump_name, []) if pump_configs: # 遍历所有关联泵,只要有一个运行就认为设备在工作 any_pump_running = False for pump_cfg in pump_configs: point = pump_cfg.get("point", "") name = pump_cfg.get("name", point) pump_id = point # 查询泵状态(带 60 秒缓存,不会频繁请求 SCADA) is_running, _ = self.pump_state_monitor.update_pump_state(pump_id, point, name) if is_running: any_pump_running = True pump_is_running = any_pump_running device_status = "开机" if pump_is_running else "停机" # 泵全部停机时跳过(可通过配置禁用此行为) # 冷启动和正常模式都适用,确保训练数据质量 if self.skip_detection_when_stopped and not pump_is_running: logger.info(f"泵停机(PLC): {device_code} | 归档到过渡期目录(不用于训练)") self._move_audio_to_transition_dir(wav_file, "stopped") return # ======================================== # 过渡期检测(泵启停后15分钟内) # ---------------------------------------- # 目的:过滤启停过程中的不稳定音频 # 确保训练数据只包含稳定运行期的音频 # ======================================== if self.skip_detection_when_stopped: pump_in_transition, transition_pump_names = \ self.pump_state_monitor.check_pumps_transition(pump_configs) if pump_in_transition: logger.info(f"泵过渡期: {device_code} | 归档到过渡期目录(不用于训练) | " f"过渡期泵: {', '.join(transition_pump_names)}") self._move_audio_to_transition_dir(wav_file, "transition") return # 获取该设备的预测器(懒加载模型) device_predictor = self.multi_predictor.get_predictor(device_code) if device_predictor is None: logger.info(f"冷启动模式(设备 {device_code} 无模型): 归档 {wav_file.name}") self._move_audio_to_date_dir(wav_file) return # 加载音频 try: y, sr = librosa.load(str(wav_file), sr=CFG.SR, mono=True) except Exception as e: logger.error(f"音频加载失败: {wav_file.name} | {e}") return # 记录状态到缓存(用于周期上报) # 
注意:泵状态检测已在前面完成,这里只记录状态 self.device_cache[device_code]["status"] = device_status # ======================================== # 计算重建误差 # ======================================== error = self._compute_reconstruction_error(wav_file, device_predictor) if error is not None: self.device_cache[device_code]["errors"].append(error) # ======================================== # 保存音频数据用于计算频谱图 # ======================================== self.device_cache[device_code]["audio_data"].append(y) # ======================================== # 暂存文件路径,等待周期聚合判定后再归档 # ======================================== if "pending_files" not in self.device_cache[device_code]: self.device_cache[device_code]["pending_files"] = [] self.device_cache[device_code]["pending_files"].append(wav_file) # ======================================== # 记录到音频历史缓存(用于异常上下文捕获) # ======================================== if self.context_capture_enabled: self.audio_file_history[device_code].append((datetime.now(), wav_file)) # 初始化上次上报时间 if self.device_cache[device_code]["last_upload"] is None: self.device_cache[device_code]["last_upload"] = datetime.now() # 获取阈值并判断结果 threshold = self._get_threshold(device_code) # ======================================== # 快速通道:连续多个文件误差极高时快速预警(暂时关闭) # 不走投票窗口,用于捕获突发性严重故障 # ======================================== # if error is not None and threshold is not None and threshold > 0: # # 维护快速通道缓冲区 # if "fast_alert_buffer" not in self.device_cache[device_code]: # self.device_cache[device_code]["fast_alert_buffer"] = [] # # fast_buf = self.device_cache[device_code]["fast_alert_buffer"] # # 连续性检查:误差超过 2x 阈值时记录,否则清空缓冲区 # if error > threshold * 2.0: # fast_buf.append(error) # else: # fast_buf.clear() # # # 连续 3 个文件(~24秒)都超过 2x 阈值 → 触发快速预警 # FAST_ALERT_CONSECUTIVE = 3 # if len(fast_buf) >= FAST_ALERT_CONSECUTIVE: # # 检查是否已触发过(避免重复告警) # last_fast = self.device_cache[device_code].get("last_fast_alert_time") # now = datetime.now() # can_fast_alert = (last_fast is None or # (now - last_fast).total_seconds() > 
300) # 5分钟冷却 # # if can_fast_alert: # # 快速通道同样受抑制逻辑约束 # suppress = False # # 泵过渡期抑制 # if self.pump_state_monitor and stream_config: # pump_name = stream_config.pump_name # pump_configs = self.pump_status_plc_configs.get(pump_name, []) # if pump_configs: # in_transition, _ = self.pump_state_monitor.check_pumps_transition(pump_configs) # if in_transition: # suppress = True # # 人体检测抑制 # if self.human_detection_enabled and self.human_reader: # if self.human_reader.is_in_cooldown(): # suppress = True # # alert_enabled 开关 # if not self.alert_enabled: # suppress = True # # if not suppress: # avg_fast = float(np.mean(fast_buf)) # logger.warning( # f"[!!] 快速通道触发: {device_code} | " # f"连续{len(fast_buf)}个文件异常 | " # f"平均误差={avg_fast:.6f} 阈值={threshold:.6f}") # self.device_cache[device_code]["last_fast_alert_time"] = now # fast_buf.clear() # # 标记快速预警,在下次周期上报时一并处理 # self.device_cache[device_code]["fast_alert_pending"] = True if error is not None and threshold is not None: is_anomaly = error > threshold result_tag = "!!" 
if is_anomaly else "OK" logger.info(f"[{result_tag}] {device_code} | {wav_file.name} | " f"误差={error:.6f} 阈值={threshold:.6f}") elif error is not None: logger.debug(f"文件预测: {wav_file.name} | 误差={error:.6f} | 阀值=未设置") else: logger.warning(f"预测跳过: {wav_file.name} | 误差计算失败") except Exception as e: logger.error(f"处理文件失败: {wav_file.name} | 错误: {e}") def _compute_reconstruction_error(self, wav_file, device_predictor): """ 计算单个音频文件的重建误差(Min-Max 标准化) 使用8秒窗口、4秒步长切割音频,提取多个patches分别计算误差后取平均值。 参数: wav_file: 音频文件路径 device_predictor: 设备预测器实例 返回: 重建误差值(所有patches的平均MSE),失败返回None """ try: import torch from predictor.utils import align_to_target # Min-Max 标准化参数 global_min = device_predictor.global_min global_max = device_predictor.global_max # 加载音频 y, _ = librosa.load(str(wav_file), sr=CFG.SR, mono=True) win_samples = int(CFG.WIN_SEC * CFG.SR) hop_samples = int(CFG.HOP_SEC * CFG.SR) if len(y) < win_samples: logger.warning(f"音频太短,无法提取patches: {wav_file.name}") return None patches = [] for start in range(0, len(y) - win_samples + 1, hop_samples): window = y[start:start + win_samples] S = librosa.feature.melspectrogram( y=window, sr=CFG.SR, n_fft=CFG.N_FFT, hop_length=CFG.HOP_LENGTH, n_mels=CFG.N_MELS, power=2.0 ) S_db = librosa.power_to_db(S, ref=np.max) if S_db.shape[1] < CFG.TARGET_FRAMES: S_db = np.pad(S_db, ((0, 0), (0, CFG.TARGET_FRAMES - S_db.shape[1]))) else: S_db = S_db[:, :CFG.TARGET_FRAMES] # Min-Max 标准化 S_norm = (S_db - global_min) / (global_max - global_min + 1e-6) patches.append(S_norm.astype(np.float32)) if not patches: logger.warning(f"未能提取任何patches: {wav_file.name}") return None arr = np.stack(patches, 0) arr = np.expand_dims(arr, 1) tensor = torch.from_numpy(arr) torch_device = device_predictor.torch_device tensor = tensor.to(torch_device) with torch.no_grad(): recon = device_predictor.model(tensor) recon = align_to_target(recon, tensor) mse_per_patch = torch.mean((recon - tensor) ** 2, dim=[1, 2, 3]) mean_mse = torch.mean(mse_per_patch).item() logger.debug(f"重建误差: 
{wav_file.name} | patches={len(patches)} | MSE={mean_mse:.6f}") return mean_mse except Exception as e: logger.error(f"计算重建误差失败: {wav_file.name} | {e}") return None def _check_periodic_upload(self): """ 检查是否需要进行周期性上报 每分钟汇总一次各设备的检测结果并上报 """ now = datetime.now() for device_code, cache in self.device_cache.items(): # 检查上报时间间隔 last_upload = cache.get("last_upload") if last_upload is None: continue elapsed = (now - last_upload).total_seconds() # 达到上报周期 if elapsed >= self.segment_duration: # 获取该设备的流配置 stream_config = self.device_map.get(device_code) # 计算平均重建误差 errors = cache.get("errors", []) avg_error = float(np.mean(errors)) if errors else 0.0 # 获取阈值 threshold = self._get_threshold(device_code) # 判断当前周期是否异常(带容差区间) # 容差区间:避免边界值反复跳变,但灰区内仍与阈值比较 if threshold: upper_bound = threshold * (1 + self.tolerance_ratio) # 确定异常边界 lower_bound = threshold * (1 - self.tolerance_ratio) # 确定正常边界 if avg_error > upper_bound: # 超过上边界 -> 确定异常 is_current_anomaly = True elif avg_error < lower_bound: # 低于下边界 -> 确定正常 is_current_anomaly = False else: # 灰区 -> 与阈值比较(避免异常状态延长) is_current_anomaly = avg_error > threshold # 记录本次判定结果 self.last_single_anomaly[device_code] = is_current_anomaly else: is_current_anomaly = False # ======================================== # 滑动窗口投票:5次中有3次异常才判定为异常 # ======================================== if device_code not in self.detection_history: self.detection_history[device_code] = [] # 记录本次检测结果 self.detection_history[device_code].append(is_current_anomaly) # 保持窗口大小 if len(self.detection_history[device_code]) > self.voting_window_size: self.detection_history[device_code].pop(0) # 投票判定最终异常状态 if self.voting_enabled and len(self.detection_history[device_code]) >= self.voting_window_size: anomaly_count = sum(self.detection_history[device_code]) is_anomaly = anomaly_count >= self.voting_threshold window_info = f"窗口[{anomaly_count}/{self.voting_window_size}]" else: is_anomaly = is_current_anomaly window_info = "窗口未满" # ======================================== # 状态变更检测 # 
trigger_alert: 仅在 正常(或初始) -> 异常 时为True # 冷却逻辑已移至 AlertAggregator 内部处理 # ======================================== last_is_anomaly = self.last_report_status.get(device_code) trigger_alert = False if is_anomaly: # 只有状态变化(正常->异常) 才触发报警流程 if last_is_anomaly is None or not last_is_anomaly: # ======================================== # 泵启停过渡期检查(抑制误报) # ---------------------------------------- # 逻辑:检测到异常时,检查关联泵是否刚启停 # 作用:泵启停过程中音频特征剧烈变化,易被误判为异常 # 过渡期内抑制告警,避免误报 # 过渡期窗口:配置中的 transition_window_minutes(默认15分钟) # ======================================== pump_in_transition = False transition_pump_names = [] if self.pump_state_monitor and stream_config: pump_name = stream_config.pump_name pump_configs = self.pump_status_plc_configs.get(pump_name, []) if pump_configs: # 批量检查所有关联泵是否有处于过渡期的 pump_in_transition, transition_pump_names = \ self.pump_state_monitor.check_pumps_transition(pump_configs) if pump_in_transition: # 有泵处于过渡期 -> 抑制本次告警 trigger_alert = False logger.info(f"泵启停过渡期,抑制告警: {device_code} | " f"过渡期泵: {', '.join(transition_pump_names)}") else: # 检查 alert_enabled 开关:如果禁用则不触发告警 if self.alert_enabled: # ======================================== # 人体检测抑制:任意摄像头检测到人则不报警 # ======================================== if self.human_detection_enabled and self.human_reader: if self.human_reader.is_in_cooldown(): trigger_alert = False status_info = self.human_reader.get_status_info() logger.info(f"人体检测抑制: {device_code} | {status_info},跳过报警") else: trigger_alert = True else: trigger_alert = True else: trigger_alert = False logger.debug(f"异常告警已禁用,跳过告警: {device_code}") # 获取运行状态 running_status = cache.get("status", "未知") # 获取进水流量 inlet_flow = self._get_inlet_flow(stream_config) if stream_config else 0.0 # 计算本次频谱图 audio_data = cache.get("audio_data", []) freq_db = self._compute_frequency_spectrum(audio_data) # 保存到频谱图历史(只保存dB值) if self.freq_history_enabled and freq_db: self.freq_history[device_code].append((now, freq_db)) # 清理过期历史 cutoff = now - timedelta(minutes=self.freq_history_minutes) 
self.freq_history[device_code] = [ (t, d) for t, d in self.freq_history[device_code] if t > cutoff ] # 计算历史频谱图平均值(normal_frequency_middle) freq_middle_db = self._compute_frequency_middle(device_code) # ======================================== # 异常分类:只在状态从正常变为异常时进行分类 # 持续异常期间沿用上次分类结果,保持一致性 # ======================================== anomaly_type_code = 6 # 默认:未分类异常 type_name = "未分类异常" if is_anomaly and audio_data: # 检查是否是新的异常(从正常变为异常) is_new_anomaly = (last_is_anomaly is None or not last_is_anomaly) if is_new_anomaly: # 新异常:进行分类并锁定结果 try: from core.anomaly_classifier import classify_anomaly if len(audio_data) > 0: y = audio_data[-1] if isinstance(audio_data[-1], np.ndarray) else np.array(audio_data[-1]) anomaly_type_code, type_name, confidence = classify_anomaly(y, sr=16000) # 锁定分类结果 self.locked_anomaly_type[device_code] = (anomaly_type_code, type_name) logger.info(f"异常分类(新异常): {type_name} (code={anomaly_type_code}, 置信度={confidence:.2f})") except Exception as e: logger.warning(f"异常分类失败: {e}") else: # 持续异常:沿用锁定的分类结果 if device_code in self.locked_anomaly_type: anomaly_type_code, type_name = self.locked_anomaly_type[device_code] logger.debug(f"异常分类(沿用): {type_name} (code={anomaly_type_code})") else: # 状态正常时清除锁定的分类结果 if device_code in self.locked_anomaly_type: del self.locked_anomaly_type[device_code] # 上报逻辑 if self.push_enabled and stream_config: # 预读异常音频base64:在文件被归档/清空之前立即读取 # 解决竞态问题:异步推送或聚合器延迟推送时文件可能已被移走 pre_read_wav_b64 = "" if trigger_alert: try: current_pending = cache.get("pending_files", []) if current_pending and current_pending[0].exists(): with open(current_pending[0], "rb") as f: pre_read_wav_b64 = base64.b64encode(f.read()).decode('utf-8') logger.debug(f"预读异常音频成功: {current_pending[0].name} | size={len(pre_read_wav_b64)}") except Exception as e: logger.warning(f"预读异常音频失败: {e}") if trigger_alert and self.alert_aggregator: # 报警走聚合器:跨设备聚合判定 + 分类型冷却 # 聚合器会在窗口到期后决定是否真正推送 self.alert_aggregator.submit_alert( plant_name=stream_config.plant_name, 
device_code=device_code, anomaly_type_code=anomaly_type_code, push_kwargs=dict( stream_config=stream_config, device_code=device_code, is_anomaly=is_anomaly, trigger_alert=True, abnormal_score=avg_error, score_threshold=threshold, running_status=running_status, inlet_flow=inlet_flow, freq_db=freq_db, freq_middle_db=freq_middle_db, anomaly_type_code=anomaly_type_code, abnormal_wav_b64=pre_read_wav_b64 ) ) else: # 非报警(心跳)或聚合器不可用 -> 提交到线程池异步推送 self._push_executor.submit( self._push_detection_result, stream_config=stream_config, device_code=device_code, is_anomaly=is_anomaly, trigger_alert=trigger_alert, abnormal_score=avg_error, score_threshold=threshold, running_status=running_status, inlet_flow=inlet_flow, freq_db=freq_db, freq_middle_db=freq_middle_db, anomaly_type_code=anomaly_type_code, abnormal_wav_b64=pre_read_wav_b64 ) # 更新上一次状态 self.last_report_status[device_code] = is_anomaly # 日志记录 thr_str = f"{threshold:.6f}" if threshold else "未设置" # 报警去向说明 if trigger_alert and self.alert_aggregator: alert_dest = "-> 聚合器" elif trigger_alert: alert_dest = "-> 直接推送" else: alert_dest = "" # 使用设备名作为标识,增加视觉分隔 cam_label = "" if stream_config: cam_label = f"({stream_config.camera_name})" result_emoji = "!!" 
if is_anomaly else "OK" alert_str = f"报警=是 {alert_dest}" if trigger_alert else "报警=否" logger.info( f"[{result_emoji}] {device_code}{cam_label} | " f"误差={avg_error:.6f} 阈值={thr_str} | " f"{window_info} | {running_status} | " f"{'异常' if is_anomaly else '正常'} | {alert_str}" ) # ======================================== # 根据单次检测结果归档文件 # 归档基于 is_current_anomaly(单次检测),而非投票后的 is_anomaly # 投票机制只影响是否推送报警,不影响文件分类 # ======================================== pending_files = cache.get("pending_files", []) # 检查是否是新的异常(从正常变为异常) is_new_anomaly = is_anomaly and (last_is_anomaly is None or not last_is_anomaly) # ======================================== # 异常上下文捕获逻辑 # ======================================== if self.context_capture_enabled: # 检查并更新捕获状态 self._update_anomaly_capture_state(device_code, is_anomaly, is_new_anomaly, avg_error, threshold, now, pending_files) if pending_files: if is_current_anomaly: # 单次检测为异常 -> 归档到异常目录 # 检查异常保存冷却时间 last_save = self.last_anomaly_save_time.get(device_code) should_save = True if last_save: elapsed_minutes = (now - last_save).total_seconds() / 60 should_save = elapsed_minutes >= self.anomaly_save_cooldown_minutes if should_save and not self.context_capture_enabled: # 上下文捕获禁用时,使用原有逻辑:移动到异常待排查目录 for f in pending_files: self._move_audio_to_anomaly_pending(f) self.last_anomaly_save_time[device_code] = now logger.warning(f"已隔离 {len(pending_files)} 个异常文件到待排查目录") elif self.context_capture_enabled: # 上下文捕获启用时: # - 异常文件已在 _update_anomaly_capture_state 中记录路径 # - 文件会在 _save_anomaly_context 中被移动到异常目录 # - 这里暂时保留文件,不做处理 pass # 文件由捕获逻辑处理 else: # 冷却时间内,删除文件不保存 for f in pending_files: try: f.unlink() except Exception as e: logger.debug(f"删除冷却期内文件失败: {f.name} | {e}") remaining_minutes = self.anomaly_save_cooldown_minutes - elapsed_minutes logger.debug(f"异常保存冷却中: {device_code} | 剩余 {remaining_minutes:.1f} 分钟") else: # 单次检测为正常 -> 移到日期目录归档 for f in pending_files: self._move_audio_to_date_dir(f) # 状态恢复正常时,清除保存冷却时间(下次异常时可立即保存) if device_code in 
self.last_anomaly_save_time: del self.last_anomaly_save_time[device_code] # 重置缓存 cache["errors"] = [] cache["audio_data"] = [] cache["pending_files"] = [] cache["last_upload"] = now logger.info("─" * 60) def _update_anomaly_capture_state(self, device_code, is_anomaly, is_new_anomaly, avg_error, threshold, now, pending_files): """ 更新异常上下文捕获状态 状态机: 1. 未触发 -> 检测到新异常 -> 触发捕获,从 audio_file_history 回溯获取文件 2. 已触发,等待后续 -> 持续收集后续文件 3. 已触发,时间到 -> 保存所有文件和元数据 修复说明: - anomaly_files 不再依赖 pending_files(可能为空) - 改为从 audio_file_history 回溯获取触发时刻前 1 分钟内的文件 - pre_files 改为获取触发时刻前 1~N+1 分钟的文件(排除 anomaly 时间段) 参数: device_code: 设备编号 is_anomaly: 当前周期是否异常 is_new_anomaly: 是否是新异常(从正常变为异常) avg_error: 平均重建误差 threshold: 阈值 now: 当前时间 pending_files: 当前周期的待处理文件 """ import shutil import json state = self.anomaly_capture_state.get(device_code) if is_new_anomaly and state is None: # 新异常触发:开始捕获 # ======================================== # 从 audio_file_history 回溯获取文件 # ---------------------------------------- # anomaly_files: 触发时刻前 1 分钟内的文件(最接近异常的时间段) # pre_files: 触发时刻前 1~(N+1) 分钟的文件(更早的正常时间段) # ======================================== anomaly_cutoff = now - timedelta(minutes=1) # 前1分钟 pre_cutoff = now - timedelta(minutes=self.context_pre_minutes + 1) # 前N+1分钟 pre_files = [] anomaly_files = [] for ts, fpath in self.audio_file_history[device_code]: if not fpath.exists(): continue if ts >= anomaly_cutoff: # 触发时刻前1分钟内 -> anomaly_files anomaly_files.append(fpath) elif ts >= pre_cutoff: # 触发时刻前1~(N+1)分钟 -> pre_files pre_files.append(fpath) # 如果 anomaly_files 仍为空,把当前 pending_files 加入(兜底) if not anomaly_files and pending_files: anomaly_files = [f for f in pending_files if f.exists()] # 初始化捕获状态 self.anomaly_capture_state[device_code] = { "trigger_time": now, "avg_error": avg_error, "threshold": threshold, "pre_files": pre_files, "anomaly_files": anomaly_files, "post_files": [], "post_start_time": now } logger.info(f"异常上下文捕获已触发: {device_code} | " f"前置文件={len(pre_files)}个 | 异常文件={len(anomaly_files)}个 | " 
f"等待后续{self.context_post_minutes}分钟") elif state is not None: # 已触发状态:收集后续文件 elapsed_post = (now - state["post_start_time"]).total_seconds() / 60 if elapsed_post < self.context_post_minutes: # 还在收集后续文件 for f in pending_files: if f.exists(): state["post_files"].append(f) else: # 时间到,保存所有文件 self._save_anomaly_context(device_code, state) # 清除状态 del self.anomaly_capture_state[device_code] # 更新保存冷却时间 self.last_anomaly_save_time[device_code] = now def _save_anomaly_context(self, device_code: str, state: dict): """ 保存异常上下文文件到独立目录 目录结构: data/anomaly_detected/{device_code}/{异常文件名(不含扩展名)}/ ├── 92_1#-1_20260130140313.wav (保持原文件名) ├── ... └── metadata.json metadata.json 字段说明: - before_trigger: 触发前 1~(N+1) 分钟的文件(正常时期,用于对比) - at_trigger: 触发时刻前 1 分钟内的文件(异常开始出现的时期) - after_trigger: 触发后 N 分钟的文件(不管异常是否恢复) 参数: device_code: 设备编号 state: 捕获状态字典 """ import shutil import json try: # 获取第一个异常文件名作为文件夹名 anomaly_files = state.get("anomaly_files", []) if not anomaly_files: logger.warning(f"无异常文件,跳过保存: {device_code}") return # 用第一个异常文件名(不含扩展名)作为文件夹名 first_anomaly = anomaly_files[0] folder_name = first_anomaly.stem # 如 92_1#-1_20260130140313 save_dir = self.save_anomaly_dir / device_code / folder_name save_dir.mkdir(parents=True, exist_ok=True) # 收集所有文件名(使用新命名) all_files = {"before_trigger": [], "at_trigger": [], "after_trigger": []} # 移动前置文件(来自日期目录,移动后原位置不再保留) for fpath in state.get("pre_files", []): if fpath.exists(): dest = save_dir / fpath.name shutil.move(str(fpath), str(dest)) # 移动而非复制,避免重复 all_files["before_trigger"].append(fpath.name) # 移动触发时刻文件(保持原名) for fpath in anomaly_files: if fpath.exists(): dest = save_dir / fpath.name shutil.move(str(fpath), str(dest)) all_files["at_trigger"].append(fpath.name) # 移动后续文件(保持原名) for fpath in state.get("post_files", []): if fpath.exists(): dest = save_dir / fpath.name shutil.move(str(fpath), str(dest)) all_files["after_trigger"].append(fpath.name) # 生成精简的元数据文件 trigger_time = state.get("trigger_time") metadata = { "device_code": device_code, 
                "trigger_time": trigger_time.strftime("%Y-%m-%d %H:%M:%S") if trigger_time else None,
                "avg_error": round(state.get("avg_error", 0.0), 6),
                "threshold": round(state.get("threshold", 0.0), 6),
                "files": all_files
            }

            metadata_path = save_dir / "metadata.json"
            with open(metadata_path, 'w', encoding='utf-8') as f:
                json.dump(metadata, f, ensure_ascii=False, indent=2)

            total = sum(len(v) for v in all_files.values())
            logger.warning(f"异常上下文已保存: {device_code}/{folder_name} | "
                           f"共{total}个文件 (前{len(all_files['before_trigger'])}+异常{len(all_files['at_trigger'])}+后{len(all_files['after_trigger'])})")

        except Exception as e:
            logger.error(f"保存异常上下文失败: {device_code} | {e}")

    def _compute_frequency_middle(self, device_code):
        """
        Compute the mean of the historical spectra (normal_frequency_middle).

        Args:
            device_code: device identifier

        Returns:
            Element-wise mean of the stored dB lists; [] when there are fewer
            than 2 history entries or no length-consistent entries.
        """
        history = self.freq_history.get(device_code, [])
        if not history or len(history) < 2:
            return []

        try:
            # Keep only entries whose length matches the first record so the
            # element-wise mean is well defined.
            all_db = []
            ref_len = len(history[0][1])  # length of the first record is the reference
            for _, db in history:
                if len(db) == ref_len:
                    all_db.append(db)

            if not all_db:
                return []

            # Per-frequency-point mean dB.
            avg_db = np.mean(all_db, axis=0).tolist()
            return avg_db

        except Exception as e:
            logger.error(f"计算频谱图历史平均失败: {e}")
            return []

    def _get_threshold(self, device_code):
        """
        获取指定设备的阈值

        优先级:
        1. 从 multi_predictor 获取设备对应模型的阈值
        2. 使用配置文件中的默认阈值
        3.
        返回0.0(不进行异常判定)

        参数:
            device_code: 设备编号(如 "LT-1")

        返回:
            阈值,未找到返回默认值或0.0
        """
        # Source 1: per-device model threshold from the multi-model predictor.
        thr = self.multi_predictor.get_threshold(device_code)
        if thr is not None:
            logger.debug(f"阈值来源: {device_code} -> 设备模型阈值 = {thr:.6f}")
            return thr

        # Source 2: default threshold from the configuration.
        default_threshold = self.config.get('prediction', {}).get('default_threshold', 0.0)
        if default_threshold > 0:
            logger.debug(f"阈值来源: {device_code} -> 配置默认值 = {default_threshold:.6f}")
            return default_threshold

        # Warn only once per device when no threshold could be found.
        if device_code not in getattr(self, '_threshold_warned', set()):
            if not hasattr(self, '_threshold_warned'):
                self._threshold_warned = set()
            self._threshold_warned.add(device_code)
            logger.warning(f"未找到阈值: {device_code},将跳过异常判定")

        return 0.0

    def _get_inlet_flow(self, stream_config: RTSPStreamConfig) -> float:
        """
        Read the inlet flow from the SCADA real-time data API.

        Uses the current-data endpoint, which returns only the latest sample.

        Args:
            stream_config: stream configuration for the device

        Returns:
            Inlet flow value; 0.0 when disabled, unconfigured, or on failure.
        """
        if not self.scada_enabled or not stream_config:
            logger.debug(f"流量跳过: scada_enabled={self.scada_enabled}, stream_config={stream_config}")
            return 0.0

        # Resolve the PLC data point for this device's pump.
        plc_address = stream_config.get_flow_plc_address()
        if not plc_address:
            logger.debug(f"流量跳过(无PLC地址): {stream_config.device_code} | pump_name='{stream_config.pump_name}'")
            return 0.0

        # Use the project_id configured for this plant.
        project_id = stream_config.project_id

        try:
            # Current timestamp in milliseconds.
            now_ms = int(datetime.now().timestamp() * 1000)

            # Query-string parameters.
            params = {"time": now_ms}

            # Request body in the real-time data API format.
            request_body = [
                {
                    "deviceId": "1",
                    "deviceItems": plc_address,
                    "deviceName": f"流量_{stream_config.pump_name}",
                    "project_id": project_id
                }
            ]

            # Request headers.
            headers = {
                "Content-Type": "application/json",
                "JWT-TOKEN": self.scada_jwt
            }

            logger.debug(f"流量请求: {stream_config.device_code} | project_id={project_id} | plc={plc_address}")

            # POST to the real-time endpoint.
            response = requests.post(
                self.scada_realtime_url,
                params=params,
                json=request_body,
                headers=headers,
                timeout=self.scada_timeout
            )

            if response.status_code == 200:
                data = response.json()
                if data.get("code") == 200:
                    if data.get("data"):
                        # Take the first record (the endpoint returns only the latest one).
latest = data["data"][0] if "val" in latest: flow = float(latest["val"]) logger.debug(f"流量获取成功: {stream_config.device_code} | 流量={flow}") return flow else: logger.debug(f"流量数据无val字段: {stream_config.device_code}") else: # API正常返回但无数据 logger.debug(f"流量查询无数据: {stream_config.device_code}") else: logger.warning(f"流量API返回异常: {stream_config.device_code} | code={data.get('code')} | msg={data.get('msg')}") else: logger.warning(f"流量HTTP错误: {stream_config.device_code} | status={response.status_code}") except Exception as e: logger.warning(f"流量获取异常: {stream_config.device_code} | {e}") return 0.0 def _compute_frequency_spectrum(self, audio_data): """ 计算频谱图数据 将1分钟内的多个8秒音频片段合并,计算整体FFT 参数: audio_data: 音频数据列表 返回: dB值列表(0-8000Hz均匀分布,共400个点) """ if not audio_data: return [] try: # 合并所有音频片段 combined = np.concatenate(audio_data) # 计算FFT n = len(combined) fft_result = np.fft.rfft(combined) freqs = np.fft.rfftfreq(n, 1.0 / CFG.SR) # 计算幅度(转换为dB) magnitude = np.abs(fft_result) # 避免log(0) magnitude = np.maximum(magnitude, 1e-10) db = 20 * np.log10(magnitude) # 降采样到400个点(0-8000Hz均匀分布) max_freq = 8000 num_points = 400 db_list = [] for i in range(num_points): # 计算目标频率(0到8000Hz均匀分布) target_freq = (i / (num_points - 1)) * max_freq # 找到最接近的频率索引 idx = np.argmin(np.abs(freqs - target_freq)) if idx < len(freqs): db_list.append(float(db[idx])) return db_list except Exception as e: logger.error(f"计算频谱图失败: {e}") return [] def _push_detection_result(self, stream_config, device_code, is_anomaly, trigger_alert, abnormal_score, score_threshold, running_status, inlet_flow, freq_db, freq_middle_db=None, anomaly_type_code=6, abnormal_wav_b64=""): """ 推送检测结果到远程服务器 上报格式符合用户要求: - 包含频谱图数据(this_frequency + normal_frequency_middle) - 包含启停状态和进水流量 - sound_detection.status 和 abnormalwav 仅在 trigger_alert=True 时上报异常(0)和base64,否则为正常(1)和空 参数: stream_config: 流配置 device_code: 设备编号 is_anomaly: 实际检测是否异常 trigger_alert: 是否触发报警(仅在新异常产生时为True) abnormal_score: 平均重建误差 score_threshold: 阈值 running_status: 启停状态 inlet_flow: 进水流量 
            freq_db: 本次频谱图dB值列表
            freq_middle_db: 历史平均频谱图dB值列表
            abnormal_wav_b64: 预读的异常音频base64编码(在文件归档前读取,避免竞态)
        """
        try:
            import time as time_module

            # Device display name.
            camera_name = stream_config.camera_name

            # Push targets come from the push_base_urls list (each base URL
            # later gets /{project_id} appended).
            if not self.push_base_urls:
                logger.warning(f"未配置push_base_urls: {device_code}")
                return

            # project_id of this device, used to build the final push URL.
            project_id = stream_config.project_id

            # Message timestamp (milliseconds).
            request_time = int(time_module.time() * 1000)

            # The anomalous audio arrives via abnormal_wav_b64 (pre-read by
            # the caller before the file was archived). Warn if it is missing
            # on an alert push so the race can be investigated.
            if trigger_alert and not abnormal_wav_b64:
                logger.warning(f"报警推送但无异常音频数据: {device_code}")

            # sound_detection.status: report 0 (anomaly) only when
            # trigger_alert=True; everything else (normal, ongoing anomaly)
            # reports 1 per the consumer's requirement.
            report_status = 0 if trigger_alert else 1

            payload = {
                "message": {
                    # Channel info
                    "channelInfo": {"name": camera_name},
                    # Request timestamp
                    "requestTime": request_time,
                    # Classification
                    "classification": {
                        "level_one": 2,  # audio-detection major class
                        "level_two": anomaly_type_code  # minor class (6=unclassified, 7=bearing, 8=cavitation, 9=looseness, 10=impeller, 11=valve)
                    },
                    # Skill info
                    "skillInfo": {"name": "异响检测"},
                    # Sound-detection payload
                    "sound_detection": {
                        # Anomalous audio (base64)
                        "abnormalwav": abnormal_wav_b64,
                        # Status: 0=anomaly (new alert only), 1=normal (or ongoing anomaly)
                        "status": report_status,
                        # Device condition
                        "condition": {
                            "running_status": "运行中" if running_status == "开机" else "停机中",
                            "inlet_flow": inlet_flow
                        },
                        # Scores
                        "score": {
                            "abnormal_score": abnormal_score,  # mean reconstruction error of the last minute
                            "score_threshold": score_threshold  # anomaly threshold for this device
                        },
                        # Spectrum data
                        "frequency": {
                            "this_frequency": freq_db,  # spectrum of the current minute
                            "normal_frequency_middle": freq_middle_db or [],  # mean spectrum of the past 10 minutes
                            "normal_frequency_upper": [],  # upper bound (empty for now)
                            "normal_frequency_lower": []  # lower bound (empty for now)
                        }
                    }
                }
            }

            # Build (url, label) pairs: each base URL gets project_id appended.
            push_targets = [
                (f"{item['url']}/{project_id}", item['label'])
                for item in self.push_base_urls
            ]

            # Push to every target independently (one failure does not affect
            # the others).
            for target_url, target_label in push_targets:
                self._send_to_target(
                    target_url, target_label, payload, device_code,
                    camera_name, trigger_alert, abnormal_score
                )

        except Exception as e:
            logger.error(f"推送通知异常: {e}")

    def _send_to_target(self, target_url, target_label, payload, device_code, camera_name, trigger_alert, abnormal_score):
        # Send the payload to a single push target, with retry logic.
        # Each target is contacted independently; a failure here does not
        # affect the other targets.
        import time as time_module

        push_success = False
        for attempt in range(self.push_retry_count + 1):
            try:
                response = requests.post(
                    target_url,
                    json=payload,
                    timeout=self.push_timeout,
                    headers={"Content-Type": "application/json"}
                )
                if response.status_code == 200:
                    alert_tag = "报警" if trigger_alert else "心跳"
                    logger.info(
                        f" [{alert_tag}][{target_label}] {device_code}({camera_name}) | "
                        f"误差={abnormal_score:.6f}"
                    )
                    push_success = True
                    break
                else:
                    logger.warning(
                        f"推送失败[{target_label}]: {device_code} | URL={target_url} | "
                        f"状态码={response.status_code} | 内容={response.text[:100]}"
                    )
            except requests.exceptions.Timeout:
                logger.warning(f"推送超时[{target_label}]: {device_code} | URL={target_url} | 尝试 {attempt + 1}/{self.push_retry_count + 1}")
            except requests.exceptions.RequestException as e:
                logger.warning(f"推送异常[{target_label}]: {device_code} | URL={target_url} | {e}")

            # Pause before the next retry.
            if attempt < self.push_retry_count:
                time_module.sleep(1)

        if not push_success:
            logger.error(f"推送失败[{target_label}]: {device_code} | URL={target_url} | 已达最大重试次数")

    def _move_audio_to_date_dir(self, wav_file):
        """
        Archive an audio file into its per-day directory.

        Layout: deploy_pickup/data/{device_code}/{YYYYMMDD}/{filename}

        Args:
            wav_file: path of the audio file to move
        """
        try:
            import shutil

            # Extract device_code and date from the filename,
            # e.g. 92_1#-1_20251218142000.wav
            match = re.match(r'\d+_(.+)_(\d{8})\d{6}\.wav', wav_file.name)
            if not match:
                logger.warning(f"无法从文件名提取信息: {wav_file.name}")
                return

            device_code = match.group(1)  # e.g. 1#-1
            date_str = match.group(2)  # YYYYMMDD

            # Target directory: data/{device_code}/{date}/
            date_dir = self.audio_dir / device_code / date_str
            date_dir.mkdir(parents=True, exist_ok=True)

            # Move the file into place.
            dest_file = date_dir / wav_file.name
            shutil.move(str(wav_file), str(dest_file))

            logger.debug(f"音频已归档: {device_code}/{date_str}/{wav_file.name}")

        except Exception as e:
            logger.error(f"移动音频失败: {wav_file.name} | 错误: {e}")

    def _move_audio_to_transition_dir(self, wav_file, reason):
        """
        Move audio recorded while the pump was stopped or in a start/stop
        transition into the transition directory.

        Layout: deploy_pickup/data/{device_code}/pump_transition/{filename}

        These clips are excluded from model training but kept for analysis
        and debugging.

        Args:
            wav_file: path of the audio file to move
            reason: reason tag (stopped=pump off, transition=start/stop window)
        """
        try:
            import shutil

            # Extract device_code from the filename,
            # e.g. 92_1#-1_20251218142000.wav
            match = re.match(r'\d+_(.+)_\d{14}\.wav', wav_file.name)
            if not match:
                logger.warning(f"无法从文件名提取信息: {wav_file.name}")
                return

            device_code = match.group(1)  # e.g. 1#-1

            # Target directory: data/{device_code}/pump_transition/
            transition_dir = self.audio_dir / device_code / "pump_transition"
            transition_dir.mkdir(parents=True, exist_ok=True)

            # Move the file into place.
            dest_file = transition_dir / wav_file.name
            shutil.move(str(wav_file), str(dest_file))

            logger.debug(f"过渡期音频已归档: {device_code}/pump_transition/{wav_file.name} ({reason})")

        except Exception as e:
            logger.error(f"移动过渡期音频失败: {wav_file.name} | 错误: {e}")

    def _move_audio_to_anomaly_pending(self, wav_file):
        """
        Quarantine an anomalous audio file into the pending-review directory.

        Layout: deploy_pickup/data/{device_code}/anomaly_pending/{filename}

        Once a human confirms a false positive, the file can be moved back
        into the dated directory so it takes part in incremental training.

        Args:
            wav_file: path of the audio file to move
        """
        try:
            import shutil

            # Extract device_code from the filename,
            # e.g. 92_1#-1_20251218142000.wav
            match = re.match(r'\d+_(.+)_\d{14}\.wav', wav_file.name)
            if not match:
                logger.warning(f"无法从文件名提取信息: {wav_file.name}")
                return

            device_code = match.group(1)  # e.g. 1#-1

            # Target directory: data/{device_code}/anomaly_pending/
            pending_dir = self.audio_dir / device_code / "anomaly_pending"
            pending_dir.mkdir(parents=True, exist_ok=True)

            # Move the file into place.
            dest_file = pending_dir / wav_file.name
            shutil.move(str(wav_file), str(dest_file))

            logger.debug(f"异常音频已隔离: {device_code}/anomaly_pending/{wav_file.name}")

        except Exception as e:
            logger.error(f"移动异常音频失败: {wav_file.name} | 错误: {e}")

    def _cleanup_old_files(self, days: int = 7):
        """
        Delete normal audio older than the given number of days.

        Rules:
        - only dated archive directories (data/{device_code}/{YYYYMMDD}/) are cleaned
        - the current and anomaly_detected directories are kept
        - directories older than ``days`` days are removed wholesale

        Args:
            days: retention period in days (default 7)
        """
        try:
            import shutil
            from datetime import datetime, timedelta

            # Cut-off date for retention.
            cutoff_date = datetime.now() - timedelta(days=days)
            cutoff_str = cutoff_date.strftime("%Y%m%d")

            deleted_count = 0

            # Walk each device directory.
            for device_dir in self.audio_dir.iterdir():
                if not device_dir.is_dir():
                    continue

                # Inspect sub-directories, skipping current and other special ones.
                for subdir in device_dir.iterdir():
                    if not subdir.is_dir():
                        continue

                    # Skip the live-capture directory.
                    if subdir.name == "current":
                        continue

                    # Only date-named directories (YYYYMMDD) are eligible.
                    if not re.match(r'^\d{8}$', subdir.name):
                        continue

                    # Remove the whole directory when older than the cut-off.
                    if subdir.name < cutoff_str:
                        try:
                            shutil.rmtree(subdir)
                            deleted_count += 1
                            logger.debug(f"清理过期目录: {device_dir.name}/{subdir.name}")
                        except Exception as e:
                            logger.error(f"删除目录失败: {subdir} | {e}")

            if deleted_count > 0:
                logger.info(f"清理完成: 共删除{deleted_count}个过期目录")

        except Exception as e:
            logger.error(f"清理过期文件失败: {e}")


class PickupMonitoringSystem:
    """
    Pickup (sound-sensor) monitoring system.

    Owns the FFmpeg capture processes and the monitoring thread.
    """

    def __init__(self, db_path=None):
        """
        Initialise the monitoring system.

        Args:
            db_path: SQLite database path (None -> default config/pickup_config.db)

        Raises:
            FileNotFoundError: when the configuration database is missing.
        """
        self.db_path = db_path

        # Initialise the SQLite-backed ConfigManager.
        actual_db = Path(db_path) if db_path else get_db_path()
        if not actual_db.exists():
            raise FileNotFoundError(
                f"\u914d\u7f6e\u6570\u636e\u5e93\u4e0d\u5b58\u5728: {actual_db}\n"
                f"\u8bf7\u5148\u8fd0\u884c\u8fc1\u79fb\u811a\u672c: python tool/migrate_yaml_to_db.py"
            )
        self.config_manager = ConfigManager(str(actual_db))
        print(f"\u914d\u7f6e\u6e90: SQLite ({actual_db})")

        self.config = self._load_config()

        # Cold-start mode flag.
        self.cold_start_mode = False

        # Multi-model predictor (an independent model per device).
        self.multi_predictor = MultiModelPredictor()
        self.predictor = None  # kept for compatibility; deprecated

        # Register each enabled device's model-directory mapping from config.
        print("\n正在初始化多模型预测器...")
        for plant in self.config.get('plants', []):
            if not plant.get('enabled', False):
                continue
            for stream in plant.get('rtsp_streams', []):
                device_code = stream.get('device_code', '')
                model_subdir = stream.get('model_subdir', device_code)
                if device_code and model_subdir:
                    self.multi_predictor.register_device(device_code, model_subdir)
                    print(f" 注册设备: {device_code} -> models/{model_subdir}/")

        print(f"已注册 {len(self.multi_predictor.registered_devices)} 个设备模型映射")

        # Process and monitor bookkeeping.
        self.ffmpeg_processes = []
        self.monitors = []

        # Install signal handlers for clean shutdown.
        signal.signal(signal.SIGINT, self._signal_handler)
        signal.signal(signal.SIGTERM, self._signal_handler)

    def _load_config(self):
        """
        Load the configuration from the SQLite DB.

        Returns:
            Dict: configuration dictionary
        """
        config = self.config_manager.get_full_config()
        logger.info("配置已从 SQLite 加载")
        return config

    def _parse_rtsp_streams(self):
        """
        Parse the RTSP stream definitions from the configuration.

        Returns:
            List[RTSPStreamConfig]: RTSP stream configuration list

        Raises:
            ValueError: when no plant configuration is present.
        """
        streams = []
        plants = self.config.get('plants', [])

        if not plants:
            raise ValueError("配置文件中未找到水厂配置")

        for plant in plants:
            plant_name = plant.get('name')
            if not plant_name:
                print("警告: 跳过未命名的区域配置")
                continue

            # Skip disabled plants (enabled defaults to True for old configs).
            if not plant.get('enabled', True):
                logger.debug(f"跳过禁用的水厂: {plant_name}")
                continue

            # Flow-PLC mapping for this plant.
            flow_plc = plant.get('flow_plc', {})

            # Each plant carries its own project_id.
            project_id = plant.get('project_id', 92)
            logger.info(f"加载区域配置: {plant_name} | project_id={project_id}")

            rtsp_streams = plant.get('rtsp_streams', [])
            for stream in rtsp_streams:
                url = stream.get('url')
                channel = stream.get('channel')
                camera_name = stream.get('name', '')
                device_code = stream.get('device_code', '')
                pump_name = stream.get('pump_name', '')

                if not url or channel is None:
                    print(f"警告: 跳过不完整的RTSP流配置 (区域: {plant_name})")
                    continue

                streams.append(RTSPStreamConfig(
                    plant_name=plant_name,
                    rtsp_url=url,
                    channel=channel,
                    camera_name=camera_name,
                    device_code=device_code,
                    pump_name=pump_name,
                    flow_plc=flow_plc,
                    project_id=project_id
                ))

        return streams

    def start(self):
        """
        Start the monitoring system: spawn FFmpeg capture processes, start
        the shared monitoring thread, then supervise the processes.
        """
        print("=" * 70)
        print("拾音器异响检测系统")
        print("=" * 70)

        # Parse stream configurations.
        streams = self._parse_rtsp_streams()
        print(f"\n共配置 {len(streams)} 个拾音设备:")
        for stream in streams:
            print(f" - {stream.device_code} | {stream.camera_name}")

        # Launch one FFmpeg process per stream.
        print("\n启动FFmpeg进程...")
        for stream in streams:
            ffmpeg = FFmpegProcess(stream, CFG.AUDIO_DIR, self.config)
            if ffmpeg.start():
                self.ffmpeg_processes.append(ffmpeg)
            else:
                print(f"警告: FFmpeg启动失败,跳过该流: {stream}")

        if not self.ffmpeg_processes:
            print("\n错误: 所有FFmpeg进程均启动失败")
            sys.exit(1)

        print(f"\n成功启动 {len(self.ffmpeg_processes)}/{len(streams)} 个FFmpeg进程")

        # Collect every stream config for the PickupMonitor.
        all_stream_configs = [p.stream_config for p in self.ffmpeg_processes]

        # Start the single, shared monitoring thread.
        print("\n启动监控线程...")
        check_interval = self.config.get('prediction', {}).get('check_interval', 1.0)
        monitor = PickupMonitor(
            audio_dir=CFG.AUDIO_DIR,
            multi_predictor=self.multi_predictor,
            stream_configs=all_stream_configs,
            check_interval=check_interval,
            config=self.config,
            config_manager=self.config_manager
        )
        monitor.start()
        self.monitors.append(monitor)
        print(f"\n成功启动监控线程")

        print("\n" + "=" * 70)
        print("系统已启动,开始监控...")
        print("按 Ctrl+C 停止系统")
        print("=" * 70 + "\n")

        # FFmpeg restart policy.
        max_restart_attempts = 5  # max restarts per process
        restart_interval_base = 30  # base restart delay in seconds (lowered to 30s)
        restart_counts = {id(p): 0 for p in self.ffmpeg_processes}  # per-process restart counters

        # Main supervision loop (with automatic restart).
        try:
            while True:
                # Check the state of every FFmpeg process.
                for ffmpeg in self.ffmpeg_processes:
                    if not ffmpeg.is_running():
                        pid = id(ffmpeg)
                        device_code = ffmpeg.stream_config.device_code

                        # Respect the per-process restart budget.
                        if restart_counts.get(pid, 0) < max_restart_attempts:
                            # Exponential back-off before restarting.
                            # NOTE(review): this sleep blocks supervision of
                            # the other processes for the whole back-off.
                            wait_time = restart_interval_base * (2 ** restart_counts.get(pid, 0))
                            logger.warning(f"FFmpeg进程停止: {device_code} | 将在{wait_time}秒后重启 | "
                                           f"重试次数: {restart_counts.get(pid, 0) + 1}/{max_restart_attempts}")
                            time.sleep(wait_time)

                            # Attempt the restart.
                            if ffmpeg.start():
                                logger.debug(f"FFmpeg重启成功: {device_code}")
                                restart_counts[pid] = 0  # reset the counter
                            else:
                                restart_counts[pid] = restart_counts.get(pid, 0) + 1
                                logger.error(f"FFmpeg重启失败: {device_code}")
                        else:
                            logger.error(f"FFmpeg达到最大重启次数: {device_code} | 已放弃重启")

                # Stop supervising when every process is dead and abandoned.
                running_count = sum(1 for p in self.ffmpeg_processes if p.is_running())
                all_abandoned = all(restart_counts.get(id(p), 0) >= max_restart_attempts
                                    for p in self.ffmpeg_processes if not p.is_running())
                if running_count == 0 and all_abandoned:
                    print("\n错误: 所有FFmpeg进程均已停止且无法重启")
                    break

                # Run the 7-day cleanup once per day at midnight.
                current_hour = datetime.now().hour
                current_date = datetime.now().strftime("%Y%m%d")
                if not hasattr(self,
'_last_cleanup_date'):
                    self._last_cleanup_date = ""

                # At midnight, and only once per calendar day.
                if current_hour == 0 and self._last_cleanup_date != current_date:
                    logger.info("执行7天过期文件清理...")
                    for monitor in self.monitors:
                        monitor._cleanup_old_files(days=7)
                    self._last_cleanup_date = current_date

                # Log the RTSP status once a minute.
                if not hasattr(self, '_last_status_log'):
                    self._last_status_log = datetime.now()
                if (datetime.now() - self._last_status_log).total_seconds() >= 60:
                    running = sum(1 for p in self.ffmpeg_processes if p.is_running())
                    total = len(self.ffmpeg_processes)
                    logger.info(f"RTSP状态: {running}/{total} 个FFmpeg进程运行中")
                    logger.info("─" * 60)
                    self._last_status_log = datetime.now()

                time.sleep(10)

        except KeyboardInterrupt:
            print("\n\n收到停止信号,正在关闭系统...")
        finally:
            self.stop()

    def stop(self):
        """
        Stop the monitoring system: monitor threads first, then FFmpeg processes.
        """
        print("\n正在停止监控系统...")

        print("停止监控线程...")
        for monitor in self.monitors:
            monitor.stop()

        print("停止FFmpeg进程...")
        for ffmpeg in self.ffmpeg_processes:
            ffmpeg.stop()

        print("系统已完全停止")

    def _signal_handler(self, signum, frame):
        """
        Signal handler (SIGINT/SIGTERM): shut everything down and exit.
        """
        print(f"\n\n收到信号 {signum},正在关闭系统...")
        self.stop()
        sys.exit(0)


def _start_config_api_server(config_manager, multi_predictor=None, port=18080):
    # Run the FastAPI configuration-management API. Blocking call; intended
    # to be launched from a background (daemon) thread.
    try:
        import uvicorn
        init_config_api(config_manager, multi_predictor)
        logger.info(f"启动配置管理 API: http://0.0.0.0:{port}")
        uvicorn.run(config_app, host="0.0.0.0", port=port, log_level="warning")
    except ImportError:
        logger.warning("uvicorn 未安装,配置管理 API 无法启动。请安装: pip install uvicorn")
    except Exception as e:
        logger.error(f"配置管理 API 启动失败: {e}")


def main():
    """
    Entry point: verify the config DB exists, start the configuration API in
    a daemon thread, then run the monitoring system until interrupted.
    """
    # Check that the configuration DB exists before anything else.
    db_path = get_db_path(Path(__file__).parent / "config")
    if not db_path.exists():
        print(f"错误: 配置数据库不存在: {db_path}")
        print(f"\n请先运行迁移脚本: python tool/migrate_yaml_to_db.py")
        sys.exit(1)

    try:
        system = PickupMonitoringSystem()

        # Start the configuration-management API in a daemon thread.
        api_port = 18080
        api_thread = threading.Thread(
            target=_start_config_api_server,
            args=(system.config_manager, system.multi_predictor, api_port),
            daemon=True,
            name="config-api"
        )
        api_thread.start()
        system.start()

    except FileNotFoundError as e:
        # Missing models / thresholds / config files.
        print(f"\n错误: {e}")
        print("\n请确保:")
        print("1. 已完成训练并计算阈值")
        print("2. 已复制必要的模型文件到 models/ 目录")
        sys.exit(1)
    except Exception as e:
        # Any other fatal startup error: print the traceback and exit non-zero.
        print(f"\n严重错误: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)


if __name__ == "__main__":
    main()