|
|
@@ -0,0 +1,2401 @@
|
|
|
+#!/usr/bin/env python
|
|
|
+# -*- coding: utf-8 -*-
|
|
|
+"""
|
|
|
+run_pickup_monitor.py
|
|
|
+---------------------
|
|
|
+拾音器异响检测主程序
|
|
|
+
|
|
|
+功能说明:
|
|
|
+该程序用于音频采集设备(拾音器)的实时异常检测。
|
|
|
+与摄像头版本不同,本版本:
|
|
|
+1. 没有视频/图片采集和上报
|
|
|
+2. 上报时包含音频频谱图分析数据
|
|
|
+3. 支持进水流量PLC数据读取
|
|
|
+4. 每1分钟计算一次平均重建误差并上报
|
|
|
+
|
|
|
+上报数据结构:
|
|
|
+{
|
|
|
+ "message": {
|
|
|
+ "channelInfo": {"name": "设备信息"},
|
|
|
+ "requestTime": 时间戳,
|
|
|
+ "classification": {
|
|
|
+ "level_one": 2, // 音频检测大类
|
|
|
+ "level_two": 6 // 异常类型小类: 6=未分类, 7=轴承, 8=气蚀, 9=松动/共振, 10=叶轮, 11=阀件
|
|
|
+ },
|
|
|
+ "skillInfo": {"name": "异响检测"},
|
|
|
+ "sound_detection": {
|
|
|
+ "abnormalwav": "", // base64编码的音频
|
|
|
+ "status": 0/1, // 0=异常, 1=正常
|
|
|
+ "condition": {
|
|
|
+ "running_status": "运行中/停机中", // 启停状态
|
|
|
+ "inlet_flow": 0.0 // 进水流量
|
|
|
+ },
|
|
|
+ "score": {
|
|
|
+ "abnormal_score": 0.0, // 当前1分钟内8s平均重构误差
|
|
|
+ "score_threshold": 0.0 // 该设备的异常阀值
|
|
|
+ },
|
|
|
+ "frequency": {
|
|
|
+ "this_frequency": [], // 当前1分钟的频谱图
|
|
|
+ "normal_frequency_middle": [], // 过去10分钟的频谱图平均
|
|
|
+ "normal_frequency_upper": [], // 上限(暂为空)
|
|
|
+ "normal_frequency_lower": [] // 下限(暂为空)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+使用方法:
|
|
|
+ python run_pickup_monitor.py
|
|
|
+
|
|
|
+配置文件:
|
|
|
+ config/rtsp_config.yaml
|
|
|
+"""
|
|
|
+
|
|
|
+import subprocess
|
|
|
+import time
|
|
|
+import re
|
|
|
+import signal
|
|
|
+import sys
|
|
|
+import threading
|
|
|
+import logging
|
|
|
+import base64
|
|
|
+from concurrent.futures import ThreadPoolExecutor
|
|
|
+from pathlib import Path
|
|
|
+
|
|
|
+from datetime import datetime, timedelta
|
|
|
+from collections import defaultdict
|
|
|
+
|
|
|
+# 用于计算FFT
|
|
|
+import numpy as np
|
|
|
+
|
|
|
+
|
|
|
+try:
|
|
|
+ import librosa
|
|
|
+except ImportError:
|
|
|
+ print("错误:缺少librosa库,请安装: pip install librosa")
|
|
|
+ sys.exit(1)
|
|
|
+
|
|
|
+try:
|
|
|
+ import requests
|
|
|
+except ImportError:
|
|
|
+ print("错误:缺少requests库,请安装: pip install requests")
|
|
|
+ sys.exit(1)
|
|
|
+
|
|
|
+# 导入预测器模块
|
|
|
+from predictor import MultiModelPredictor, CFG
|
|
|
+
|
|
|
+# 导入配置管理模块(SQLite)
|
|
|
+from config.config_manager import ConfigManager
|
|
|
+from config.config_api import app as config_app, init_config_api
|
|
|
+from config.db_models import get_db_path
|
|
|
+
|
|
|
+# 导入泵状态监控模块(用于检测启停过渡期)
|
|
|
+try:
|
|
|
+ from core.pump_state_monitor import PumpStateMonitor
|
|
|
+ PUMP_STATE_MONITOR_AVAILABLE = True
|
|
|
+except ImportError:
|
|
|
+ PUMP_STATE_MONITOR_AVAILABLE = False
|
|
|
+
|
|
|
+# 导入人体检测读取模块(用于抑制有人时的误报)
|
|
|
+try:
|
|
|
+ from core.human_detection_reader import HumanDetectionReader
|
|
|
+ HUMAN_DETECTION_AVAILABLE = True
|
|
|
+except ImportError:
|
|
|
+ HUMAN_DETECTION_AVAILABLE = False
|
|
|
+
|
|
|
+# 导入报警聚合器(跨设备聚合抑制 + 分类型冷却)
|
|
|
+try:
|
|
|
+ from core.alert_aggregator import AlertAggregator
|
|
|
+ ALERT_AGGREGATOR_AVAILABLE = True
|
|
|
+except ImportError:
|
|
|
+ ALERT_AGGREGATOR_AVAILABLE = False
|
|
|
+
|
|
|
+
|
|
|
+# ========================================
|
|
|
+# 配置日志系统
|
|
|
+# ========================================
|
|
|
+def setup_logging():
|
|
|
+ # 配置日志系统(RotatingFileHandler -> system.log)
|
|
|
+ from logging.handlers import RotatingFileHandler
|
|
|
+
|
|
|
+ # 如果根 logger 已经被上层调用者配置过,则直接复用
|
|
|
+ root = logging.getLogger()
|
|
|
+ if root.handlers:
|
|
|
+ return logging.getLogger('PickupMonitor')
|
|
|
+
|
|
|
+ # 日志配置
|
|
|
+ log_dir = Path(__file__).parent / "logs"
|
|
|
+ log_dir.mkdir(parents=True, exist_ok=True)
|
|
|
+ log_file = log_dir / "system.log"
|
|
|
+
|
|
|
+ formatter = logging.Formatter(
|
|
|
+ '%(asctime)s | %(levelname)-8s | %(name)-20s | %(message)s',
|
|
|
+ datefmt='%Y-%m-%d %H:%M:%S'
|
|
|
+ )
|
|
|
+
|
|
|
+ # 按文件大小轮转,最多保留 2 个备份(共 30MB)
|
|
|
+ file_handler = RotatingFileHandler(
|
|
|
+ log_file,
|
|
|
+ maxBytes=10 * 1024 * 1024,
|
|
|
+ backupCount=2,
|
|
|
+ encoding='utf-8'
|
|
|
+ )
|
|
|
+ file_handler.setFormatter(formatter)
|
|
|
+
|
|
|
+ # 控制台输出(前台运行时可见,后台运行时 stdout 已被丢弃不影响)
|
|
|
+ console_handler = logging.StreamHandler(sys.stdout)
|
|
|
+ console_handler.setFormatter(formatter)
|
|
|
+
|
|
|
+ logging.basicConfig(
|
|
|
+ level=logging.INFO,
|
|
|
+ handlers=[file_handler, console_handler]
|
|
|
+ )
|
|
|
+
|
|
|
+ return logging.getLogger('PickupMonitor')
|
|
|
+
|
|
|
+
|
|
|
+# 初始化日志系统
|
|
|
+logger = setup_logging()
|
|
|
+
|
|
|
+# 导入能量基线模块
|
|
|
+try:
|
|
|
+ from core.energy_baseline import EnergyBaseline
|
|
|
+ ENERGY_BASELINE_AVAILABLE = True
|
|
|
+except ImportError:
|
|
|
+ ENERGY_BASELINE_AVAILABLE = False
|
|
|
+ logger.warning("能量基线模块未找到,泵状态检测功能禁用")
|
|
|
+
|
|
|
+
|
|
|
+class RTSPStreamConfig:
|
|
|
+ """
|
|
|
+ RTSP流配置类
|
|
|
+
|
|
|
+ 封装单个RTSP流的配置信息,包含拾音器特有字段
|
|
|
+ """
|
|
|
+
|
|
|
+ def __init__(self, plant_name, rtsp_url, channel,
|
|
|
+ camera_name, device_code, pump_name,
|
|
|
+ flow_plc, project_id):
|
|
|
+ """
|
|
|
+ 初始化RTSP流配置
|
|
|
+
|
|
|
+ 参数:
|
|
|
+ plant_name: 区域名称(泵房-反渗透高压泵等)
|
|
|
+ rtsp_url: RTSP流URL
|
|
|
+ channel: 通道号
|
|
|
+ camera_name: 设备名称
|
|
|
+ device_code: 设备编号(如1#-1)
|
|
|
+ pump_name: 泵名称(A/B/C/D),用于匹配流量PLC
|
|
|
+ flow_plc: 流量PLC地址映射
|
|
|
+ """
|
|
|
+ self.plant_name = plant_name
|
|
|
+ self.rtsp_url = rtsp_url
|
|
|
+ self.channel = channel
|
|
|
+ self.pump_id = f"ch{channel}"
|
|
|
+ self.camera_name = camera_name or f"ch{channel}"
|
|
|
+ self.device_code = device_code
|
|
|
+ self.pump_name = pump_name
|
|
|
+ self.flow_plc = flow_plc or {}
|
|
|
+ self.project_id = project_id
|
|
|
+
|
|
|
+ def get_flow_plc_address(self):
|
|
|
+ """
|
|
|
+ 获取该设备对应的进水流量PLC地址
|
|
|
+
|
|
|
+ 返回:
|
|
|
+ PLC地址字符串,不存在则返回空字符串
|
|
|
+ """
|
|
|
+ if self.pump_name and self.flow_plc:
|
|
|
+ return self.flow_plc.get(self.pump_name, "")
|
|
|
+ return ""
|
|
|
+
|
|
|
+ def __repr__(self):
|
|
|
+ return f"RTSPStreamConfig(plant='{self.plant_name}', camera='{self.camera_name}', pump='{self.pump_id}')"
|
|
|
+
|
|
|
+
|
|
|
+class FFmpegProcess:
|
|
|
+ """
|
|
|
+ FFmpeg进程管理类
|
|
|
+
|
|
|
+ 负责启动和管理单个RTSP流的FFmpeg进程。
|
|
|
+ 进程将RTSP流转换为固定时长的WAV音频文件。
|
|
|
+
|
|
|
+ 文件名格式: {project_id}_{device_code}_{时间戳}.wav
|
|
|
+ """
|
|
|
+
|
|
|
+ def __init__(self, stream_config, output_dir, config=None):
|
|
|
+ """
|
|
|
+ 初始化FFmpeg进程管理器
|
|
|
+
|
|
|
+ 参数:
|
|
|
+ stream_config: RTSP流配置
|
|
|
+ output_dir: 音频输出目录
|
|
|
+ config: 全局配置字典
|
|
|
+ """
|
|
|
+ self.config_dict = config or {}
|
|
|
+ self.stream_config = stream_config
|
|
|
+ self.output_dir = output_dir
|
|
|
+ self.process = None
|
|
|
+
|
|
|
+ # 从配置中读取文件时长,默认8秒
|
|
|
+ audio_cfg = self.config_dict.get('audio', {})
|
|
|
+ self.file_duration = audio_cfg.get('file_duration', 8)
|
|
|
+
|
|
|
+ # 获取project_id(从 stream_config 中读取)
|
|
|
+ self.project_id = stream_config.project_id
|
|
|
+
|
|
|
+ def start(self):
|
|
|
+ """
|
|
|
+ 启动FFmpeg进程
|
|
|
+
|
|
|
+ 返回:
|
|
|
+ bool: 启动成功返回True,失败返回False
|
|
|
+ """
|
|
|
+ # 获取设备编号
|
|
|
+ device_code = self.stream_config.device_code or self.stream_config.pump_id
|
|
|
+
|
|
|
+ # 创建输出目录(每个设备独立目录)
|
|
|
+ # 结构: data/{device_code}/current/
|
|
|
+ current_dir = self.output_dir / device_code / "current"
|
|
|
+ current_dir.mkdir(parents=True, exist_ok=True)
|
|
|
+
|
|
|
+ # 构建输出文件名模板
|
|
|
+ # 格式: {project_id}_{device_code}_{时间戳}.wav
|
|
|
+ # 例如: 92_1#-1_20251218142000.wav
|
|
|
+ output_pattern = str(current_dir / f"{self.project_id}_{device_code}_%Y%m%d%H%M%S.wav")
|
|
|
+
|
|
|
+ # 构建FFmpeg命令
|
|
|
+ # 添加内存限制参数,防止 RTSP 缓冲区无限增长导致 OOM
|
|
|
+ cmd = [
|
|
|
+ "ffmpeg",
|
|
|
+ # RTSP 输入参数(内存限制)
|
|
|
+ "-rtsp_transport", "tcp", # 使用TCP传输
|
|
|
+ "-probesize", "1000000", # 限制探测大小为1MB(默认5MB)
|
|
|
+ "-analyzeduration", "1000000", # 限制分析时长为1秒(默认5秒)
|
|
|
+ "-max_delay", "500000", # 最大延迟500ms
|
|
|
+ "-fflags", "nobuffer", # 禁用输入缓冲
|
|
|
+ "-flags", "low_delay", # 低延迟模式
|
|
|
+ "-i", self.stream_config.rtsp_url, # 输入RTSP流
|
|
|
+ # 音频输出参数
|
|
|
+ "-vn", # 不处理视频
|
|
|
+ "-ac", "1", # 单声道
|
|
|
+ "-ar", str(CFG.SR), # 采样率(16000Hz)
|
|
|
+ "-f", "segment", # 分段模式
|
|
|
+ "-segment_time", str(int(self.file_duration)), # 每段时长
|
|
|
+ "-strftime", "1", # 启用时间格式化
|
|
|
+ "-loglevel", "error", # 只输出错误日志
|
|
|
+ "-y", # 覆盖已存在的文件
|
|
|
+ output_pattern,
|
|
|
+ ]
|
|
|
+
|
|
|
+ try:
|
|
|
+ # 启动FFmpeg进程
|
|
|
+ self.process = subprocess.Popen(
|
|
|
+ cmd,
|
|
|
+ stdout=subprocess.PIPE,
|
|
|
+ stderr=subprocess.PIPE
|
|
|
+ )
|
|
|
+
|
|
|
+ logger.info(f"FFmpeg已启动: {device_code} | {self.stream_config.camera_name} | "
|
|
|
+ f"文件时长: {self.file_duration}秒 | PID={self.process.pid}")
|
|
|
+ return True
|
|
|
+
|
|
|
+ except FileNotFoundError:
|
|
|
+ logger.error("FFmpeg错误: 未找到ffmpeg命令,请确保已安装FFmpeg")
|
|
|
+ return False
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"FFmpeg启动失败: {device_code} | 错误: {e}")
|
|
|
+ return False
|
|
|
+
|
|
|
+ def is_running(self):
|
|
|
+ """
|
|
|
+ 检查FFmpeg进程是否在运行
|
|
|
+
|
|
|
+ 返回:
|
|
|
+ bool: 进程运行中返回True,否则返回False
|
|
|
+ """
|
|
|
+ if self.process is None:
|
|
|
+ return False
|
|
|
+ return self.process.poll() is None
|
|
|
+
|
|
|
+ def stop(self):
|
|
|
+ """
|
|
|
+ 停止FFmpeg进程
|
|
|
+ """
|
|
|
+ if self.process is not None and self.is_running():
|
|
|
+ logger.info(f"FFmpeg停止: {self.stream_config.plant_name} | {self.stream_config.camera_name} | PID={self.process.pid}")
|
|
|
+ self.process.terminate()
|
|
|
+ try:
|
|
|
+ self.process.wait(timeout=5)
|
|
|
+ except subprocess.TimeoutExpired:
|
|
|
+ logger.warning(f"FFmpeg强制终止: PID={self.process.pid}")
|
|
|
+ self.process.kill()
|
|
|
+
|
|
|
+
|
|
|
+class PickupMonitor:
|
|
|
+ """
|
|
|
+ 拾音器监控线程类
|
|
|
+
|
|
|
+ 监控音频目录,调用预测器检测异常,推送告警。
|
|
|
+
|
|
|
+ 主要功能:
|
|
|
+ 1. 不截取视频帧(纯音频)
|
|
|
+ 2. 计算并上报频谱图数据(this_frequency + normal_frequency_middle)
|
|
|
+ 3. 每分钟汇总上报一次
|
|
|
+ 4. 使用SCADA API获取进水流量
|
|
|
+ """
|
|
|
+
|
|
|
+ def __init__(self, audio_dir, multi_predictor,
|
|
|
+ stream_configs,
|
|
|
+ check_interval=1.0, config=None,
|
|
|
+ config_manager=None):
|
|
|
+ """
|
|
|
+ 初始化监控器
|
|
|
+
|
|
|
+ Args:
|
|
|
+ audio_dir: 音频根目录
|
|
|
+ multi_predictor: 多模型预测器实例
|
|
|
+ stream_configs: 所有RTSP流配置
|
|
|
+ check_interval: 检查间隔(秒)
|
|
|
+ config: 配置字典
|
|
|
+ config_manager: ConfigManager 实例(用于热更新,为 None 时禁用热更新)
|
|
|
+ """
|
|
|
+ # 音频根目录(各设备目录在其下)
|
|
|
+ self.audio_dir = audio_dir
|
|
|
+ self.multi_predictor = multi_predictor
|
|
|
+ self.predictor = None # 兼容性保留,已废弃
|
|
|
+ self.stream_configs = stream_configs
|
|
|
+ self.check_interval = check_interval
|
|
|
+ self.config = config or {}
|
|
|
+
|
|
|
+ # 热更新:持有 ConfigManager 引用,定期从 DB 刷新配置
|
|
|
+ self._config_manager = config_manager
|
|
|
+ self._last_config_reload = 0 # 上次配置刷新时间戳
|
|
|
+ self._config_reload_interval = 30 # 配置刷新间隔(秒)
|
|
|
+
|
|
|
+ # project_id
|
|
|
+ self.project_id = self.config.get('platform', {}).get('project_id', 92)
|
|
|
+
|
|
|
+ # 构建device_code到stream_config的映射
|
|
|
+ self.device_map = {}
|
|
|
+ for cfg in stream_configs:
|
|
|
+ if cfg.device_code:
|
|
|
+ self.device_map[cfg.device_code] = cfg
|
|
|
+
|
|
|
+ # 已处理文件集合
|
|
|
+ self.seen_files = set()
|
|
|
+
|
|
|
+ # 每个设备的检测结果缓存(用于1分钟汇总)
|
|
|
+ # key: device_code(如 "1#-1")
|
|
|
+ self.device_cache = defaultdict(lambda: {
|
|
|
+ "errors": [], # 每8秒的重建误差列表
|
|
|
+ "last_upload": None, # 上次上报时间
|
|
|
+ "audio_data": [], # 用于计算频谱图的音频数据
|
|
|
+ "status": None # 最近的运行状态
|
|
|
+ })
|
|
|
+
|
|
|
+ # 频谱图历史缓存(用于计算normal_frequency_middle)
|
|
|
+ # key: device_code, value: list of (timestamp, freq_db)
|
|
|
+ freq_cfg = self.config.get('prediction', {}).get('frequency_history', {})
|
|
|
+ self.freq_history_enabled = freq_cfg.get('enabled', True)
|
|
|
+ self.freq_history_minutes = freq_cfg.get('history_minutes', 10)
|
|
|
+ self.freq_history = defaultdict(list)
|
|
|
+
|
|
|
+ # 上一次上报状态(True=异常,False=正常,None=初始)
|
|
|
+ # 用于状态变更时去重,防止持续报警
|
|
|
+ self.last_report_status = {}
|
|
|
+
|
|
|
+ # 上报周期(秒)
|
|
|
+ audio_cfg = self.config.get('audio', {})
|
|
|
+ self.segment_duration = audio_cfg.get('segment_duration', 60)
|
|
|
+
|
|
|
+ # 异常音频保存配置
|
|
|
+ save_cfg = self.config.get('prediction', {}).get('save_anomaly_audio', {})
|
|
|
+ self.save_anomaly_enabled = save_cfg.get('enabled', True)
|
|
|
+ self.save_anomaly_dir = Path(__file__).parent / save_cfg.get('save_dir', 'data/anomaly_detected')
|
|
|
+ if self.save_anomaly_enabled:
|
|
|
+ self.save_anomaly_dir.mkdir(parents=True, exist_ok=True)
|
|
|
+
|
|
|
+ # 异常推送配置
|
|
|
+ push_cfg = self.config.get('push_notification', {})
|
|
|
+ self.push_enabled = push_cfg.get('enabled', False)
|
|
|
+ self.alert_enabled = push_cfg.get('alert_enabled', True) # 是否启用异常告警(false=暂时禁用异常上报)
|
|
|
+ self.push_timeout = push_cfg.get('timeout', 30)
|
|
|
+ self.push_retry_count = push_cfg.get('retry_count', 2)
|
|
|
+ # 推送基地址列表(统一用 base_url/{project_id} 拼接,可无限扩展)
|
|
|
+ raw_urls = push_cfg.get('push_base_urls', [])
|
|
|
+ self.push_base_urls = [
|
|
|
+ {"label": item.get("label", f"target-{i}"), "url": item.get("url", "").rstrip("/")}
|
|
|
+ for i, item in enumerate(raw_urls)
|
|
|
+ if item.get("url") # 跳过空URL
|
|
|
+ ]
|
|
|
+ # 推送失败记录文件路径
|
|
|
+ failed_log_path = push_cfg.get('failed_push_log', 'data/push_failures.jsonl')
|
|
|
+ self.failed_push_log = Path(__file__).parent / failed_log_path
|
|
|
+ self.failed_push_log.parent.mkdir(parents=True, exist_ok=True)
|
|
|
+
|
|
|
+ # 如果 alert_enabled 为 False,记录日志提醒
|
|
|
+ if not self.alert_enabled:
|
|
|
+ logger.warning("异常告警已禁用(alert_enabled=false),仅上报心跳状态")
|
|
|
+
|
|
|
+ # ========================================
|
|
|
+ # 报警聚合器(替代原有固定 cooldown_minutes)
|
|
|
+ # ----------------------------------------
|
|
|
+ # 功能1:跨设备聚合抑制 - 同一水厂多设备同时报警 -> 环境噪声,全部抑制
|
|
|
+ # 功能2:分类型冷却 - 同类型24h,不同类型1h
|
|
|
+ # ========================================
|
|
|
+ agg_cfg = push_cfg.get('alert_aggregate', {})
|
|
|
+ self.alert_aggregator = None
|
|
|
+ if ALERT_AGGREGATOR_AVAILABLE:
|
|
|
+ self.alert_aggregator = AlertAggregator(
|
|
|
+ push_callback=self._push_detection_result,
|
|
|
+ aggregate_enabled=agg_cfg.get('enabled', True),
|
|
|
+ window_seconds=agg_cfg.get('window_seconds', 300),
|
|
|
+ min_devices=agg_cfg.get('min_devices', 2),
|
|
|
+ cooldown_same_type_hours=push_cfg.get('cooldown_same_type_hours', 24),
|
|
|
+ cooldown_diff_type_hours=push_cfg.get('cooldown_diff_type_hours', 1)
|
|
|
+ )
|
|
|
+ else:
|
|
|
+ logger.warning("报警聚合器模块未找到,使用默认报警逻辑")
|
|
|
+
|
|
|
+ # 上次异常音频保存时间(用于保存冷却时间计算)
|
|
|
+ self.last_anomaly_save_time = {}
|
|
|
+ # 异常保存冷却时间(分钟),同一设备连续异常时,每N分钟只保存一次
|
|
|
+ self.anomaly_save_cooldown_minutes = save_cfg.get('cooldown_minutes', 10)
|
|
|
+
|
|
|
+ # 当前异常分类结果锁定(持续异常期间保持分类结果不变)
|
|
|
+ # key: device_code, value: (anomaly_type_code, type_name)
|
|
|
+ self.locked_anomaly_type = {}
|
|
|
+
|
|
|
+ # 滑动窗口投票配置(5次中有3次异常才判定为异常)
|
|
|
+ voting_cfg = self.config.get('prediction', {}).get('voting', {})
|
|
|
+ self.voting_enabled = voting_cfg.get('enabled', True)
|
|
|
+ self.voting_window_size = voting_cfg.get('window_size', 5) # 窗口大小
|
|
|
+ self.voting_threshold = voting_cfg.get('threshold', 3) # 异常阈值(>=3次则判定异常)
|
|
|
+ self.detection_history = {} # 每个设备的检测历史(True=异常)
|
|
|
+
|
|
|
+ # 阈值容差区间配置(避免边界值反复跳变)
|
|
|
+ # 误差在 threshold*(1-tolerance) ~ threshold*(1+tolerance) 范围内为灰区,维持上一状态
|
|
|
+ self.tolerance_ratio = voting_cfg.get('tolerance_ratio', 0.05) # 默认5%容差
|
|
|
+ self.last_single_anomaly = {} # 每个设备上一次的单周期判定结果
|
|
|
+
|
|
|
+ # 阈值现在由 multi_predictor 管理,每个设备从其对应模型目录加载
|
|
|
+
|
|
|
+ # 能量检测配置
|
|
|
+ energy_cfg = self.config.get('prediction', {}).get('energy_detection', {})
|
|
|
+ self.energy_detection_enabled = ENERGY_BASELINE_AVAILABLE and energy_cfg.get('enabled', True)
|
|
|
+ self.skip_detection_when_stopped = energy_cfg.get('skip_when_stopped', True)
|
|
|
+
|
|
|
+ # 能量基线检测器(每个设备一个)
|
|
|
+ if self.energy_detection_enabled:
|
|
|
+ self.energy_baselines = {}
|
|
|
+ else:
|
|
|
+ self.energy_baselines = None
|
|
|
+
|
|
|
+ # SCADA API配置(用于获取泵状态和进水流量)
|
|
|
+ scada_cfg = self.config.get('scada_api', {})
|
|
|
+ self.scada_enabled = scada_cfg.get('enabled', False)
|
|
|
+ self.scada_url = scada_cfg.get('base_url', '') # 历史数据接口(备用)
|
|
|
+ self.scada_realtime_url = scada_cfg.get('realtime_url', '') # 实时数据接口(主用)
|
|
|
+ self.scada_jwt = scada_cfg.get('jwt_token', '')
|
|
|
+ self.scada_timeout = scada_cfg.get('timeout', 10)
|
|
|
+
|
|
|
+ # 泵状态监控器(用于检测启停过渡期)
|
|
|
+ self.pump_state_monitor = None
|
|
|
+ self.pump_status_plc_configs = {} # {pump_name: [{point, name}, ...]}
|
|
|
+ if PUMP_STATE_MONITOR_AVAILABLE and self.scada_enabled:
|
|
|
+ # 初始化泵状态监控器
|
|
|
+ # 获取第一个启用水厂的project_id
|
|
|
+ project_id = 0
|
|
|
+ for plant in self.config.get('plants', []):
|
|
|
+ if plant.get('enabled', False):
|
|
|
+ project_id = plant.get('project_id', 0)
|
|
|
+ # 加载泵状态点位配置
|
|
|
+ pump_status_plc = plant.get('pump_status_plc', {})
|
|
|
+ self.pump_status_plc_configs = pump_status_plc
|
|
|
+ break
|
|
|
+
|
|
|
+ if project_id > 0:
|
|
|
+ # 使用实时接口 URL 进行泵状态查询
|
|
|
+ self.pump_state_monitor = PumpStateMonitor(
|
|
|
+ scada_url=self.scada_realtime_url, # 使用实时数据接口
|
|
|
+ scada_jwt=self.scada_jwt,
|
|
|
+ project_id=project_id,
|
|
|
+ timeout=self.scada_timeout,
|
|
|
+ transition_window_minutes=15 # 启停后15分钟内视为过渡期
|
|
|
+ )
|
|
|
+ logger.info(f"泵状态监控器已启用 (project_id={project_id}, 过渡期窗口=15分钟, 使用实时接口)")
|
|
|
+
|
|
|
+ # 线程控制
|
|
|
+ self.running = False
|
|
|
+ self.thread = None
|
|
|
+ # 推送线程池(避免推送超时阻塞主监控循环)
|
|
|
+ self._push_executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="push")
|
|
|
+
|
|
|
+ # 启动时间(用于跳过启动期间的状态变化日志)
|
|
|
+ self.startup_time = None
|
|
|
+ self.startup_warmup_seconds = 120 # 启动后120秒内不记录状态变化
|
|
|
+
|
|
|
+ # ========================================
|
|
|
+ # 异常上下文捕获配置
|
|
|
+ # ========================================
|
|
|
+ context_cfg = save_cfg.get('context_capture', {})
|
|
|
+ self.context_capture_enabled = context_cfg.get('enabled', False)
|
|
|
+ self.context_pre_minutes = context_cfg.get('pre_minutes', 2)
|
|
|
+ self.context_post_minutes = context_cfg.get('post_minutes', 2)
|
|
|
+
|
|
|
+ # 音频文件历史缓存(用于捕获异常前的音频)
|
|
|
+ # key: device_code, value: deque of (timestamp, file_path)
|
|
|
+ from collections import deque
|
|
|
+ # 计算需要保留的历史文件数量(按分钟计算,每分钟约1个文件)
|
|
|
+ history_size = self.context_pre_minutes + 2 # 多保留2个作为缓冲
|
|
|
+ self.audio_file_history = defaultdict(lambda: deque(maxlen=history_size))
|
|
|
+
|
|
|
+ # 异常上下文捕获状态
|
|
|
+ # key: device_code, value: dict
|
|
|
+ self.anomaly_capture_state = {}
|
|
|
+
|
|
|
+ if self.context_capture_enabled:
|
|
|
+ logger.info(f"异常上下文捕获已启用 (前{self.context_pre_minutes}分钟 + 后{self.context_post_minutes}分钟)")
|
|
|
+
|
|
|
+ # ========================================
|
|
|
+ # 人体检测报警抑制配置
|
|
|
+ # ========================================
|
|
|
+ # 只要任意摄像头在冷却时间内检测到人,所有设备的异常报警都会被抑制
|
|
|
+ human_cfg = self.config.get('human_detection', {})
|
|
|
+ self.human_detection_enabled = human_cfg.get('enabled', False) and HUMAN_DETECTION_AVAILABLE
|
|
|
+ self.human_reader = None
|
|
|
+
|
|
|
+ if self.human_detection_enabled:
|
|
|
+ db_path = human_cfg.get('db_path', '')
|
|
|
+ cooldown_minutes = human_cfg.get('cooldown_minutes', 5)
|
|
|
+ self.human_reader = HumanDetectionReader(
|
|
|
+ db_path=db_path,
|
|
|
+ cooldown_minutes=cooldown_minutes
|
|
|
+ )
|
|
|
+ logger.info(f"人体检测报警抑制已启用 (冷却时间={cooldown_minutes}分钟, 数据库={db_path})")
|
|
|
+
|
|
|
+ def start(self):
|
|
|
+ """
|
|
|
+ 启动监控线程
|
|
|
+ """
|
|
|
+ if self.running:
|
|
|
+ return
|
|
|
+
|
|
|
+ # 启动前清理current目录中的遗留文件
|
|
|
+ self._cleanup_current_on_startup()
|
|
|
+
|
|
|
+ self.running = True
|
|
|
+ self.startup_time = datetime.now() # 记录启动时间
|
|
|
+ self.thread = threading.Thread(target=self._monitor_loop, daemon=True)
|
|
|
+ self.thread.start()
|
|
|
+ # 打印监控的设备列表
|
|
|
+ device_codes = list(self.device_map.keys())
|
|
|
+ logger.info(f"监控线程已启动 | 目录: {self.audio_dir} | 监控设备: {device_codes}")
|
|
|
+
|
|
|
+ def _cleanup_current_on_startup(self):
|
|
|
+ """
|
|
|
+ 启动时清理current目录中的遗留文件
|
|
|
+
|
|
|
+ 删除上次运行遗留的文件,避免混入新采集数据影响检测准确性
|
|
|
+ """
|
|
|
+ cleaned_count = 0
|
|
|
+
|
|
|
+ for device_code in self.device_map.keys():
|
|
|
+ current_dir = self.audio_dir / device_code / "current"
|
|
|
+ if not current_dir.exists():
|
|
|
+ continue
|
|
|
+
|
|
|
+ for wav_file in current_dir.glob("*.wav"):
|
|
|
+ try:
|
|
|
+ wav_file.unlink()
|
|
|
+ cleaned_count += 1
|
|
|
+ except Exception as e:
|
|
|
+ logger.warning(f"清理遗留文件失败: {wav_file.name} | {e}")
|
|
|
+
|
|
|
+ if cleaned_count > 0:
|
|
|
+ logger.info(f"启动清理: 已删除current目录中 {cleaned_count} 个遗留文件")
|
|
|
+
|
|
|
+ def stop(self):
|
|
|
+ """
|
|
|
+ 停止监控线程
|
|
|
+ """
|
|
|
+ if not self.running:
|
|
|
+ return
|
|
|
+
|
|
|
+ self.running = False
|
|
|
+ if self.thread is not None:
|
|
|
+ self.thread.join(timeout=5)
|
|
|
+ # 关闭推送线程池,等待已提交的推送任务完成
|
|
|
+ self._push_executor.shutdown(wait=True, cancel_futures=False)
|
|
|
+ logger.info(f"监控线程已停止")
|
|
|
+
|
|
|
+ def _reload_hot_config(self):
|
|
|
+ # 从 DB 热加载可变配置项,无需重启服务
|
|
|
+ # 只刷新运行时可安全变更的参数,不影响 FFmpeg 进程和流映射
|
|
|
+ if self._config_manager is None:
|
|
|
+ return
|
|
|
+
|
|
|
+ now = time.time()
|
|
|
+ # 未达到刷新间隔则跳过
|
|
|
+ if now - self._last_config_reload < self._config_reload_interval:
|
|
|
+ return
|
|
|
+ self._last_config_reload = now
|
|
|
+
|
|
|
+ try:
|
|
|
+ # 从 DB 读取最新配置
|
|
|
+ fresh = self._config_manager.get_full_config()
|
|
|
+ self.config = fresh
|
|
|
+
|
|
|
+ # 刷新推送配置
|
|
|
+ push_cfg = fresh.get('push_notification', {})
|
|
|
+ self.push_enabled = push_cfg.get('enabled', False)
|
|
|
+ self.alert_enabled = push_cfg.get('alert_enabled', True)
|
|
|
+ self.push_timeout = push_cfg.get('timeout', 30)
|
|
|
+ self.push_retry_count = push_cfg.get('retry_count', 2)
|
|
|
+ raw_urls = push_cfg.get('push_base_urls', [])
|
|
|
+ self.push_base_urls = [
|
|
|
+ {"label": item.get("label", f"target-{i}"), "url": item.get("url", "").rstrip("/")}
|
|
|
+ for i, item in enumerate(raw_urls)
|
|
|
+ if item.get("url")
|
|
|
+ ]
|
|
|
+
|
|
|
+ # 刷新投票配置
|
|
|
+ voting_cfg = fresh.get('prediction', {}).get('voting', {})
|
|
|
+ self.voting_enabled = voting_cfg.get('enabled', True)
|
|
|
+ self.voting_window_size = voting_cfg.get('window_size', 5)
|
|
|
+ self.voting_threshold = voting_cfg.get('threshold', 3)
|
|
|
+ self.tolerance_ratio = voting_cfg.get('tolerance_ratio', 0.05)
|
|
|
+
|
|
|
+ # 刷新能量检测配置
|
|
|
+ energy_cfg = fresh.get('prediction', {}).get('energy_detection', {})
|
|
|
+ self.skip_detection_when_stopped = energy_cfg.get('skip_when_stopped', True)
|
|
|
+
|
|
|
+ # 刷新人体检测配置
|
|
|
+ human_cfg = fresh.get('human_detection', {})
|
|
|
+ new_human_enabled = human_cfg.get('enabled', False)
|
|
|
+ if new_human_enabled != self.human_detection_enabled:
|
|
|
+ logger.info(f"人体检测抑制配置变更: {self.human_detection_enabled} -> {new_human_enabled}")
|
|
|
+ self.human_detection_enabled = new_human_enabled
|
|
|
+
|
|
|
+ logger.debug("配置热刷新完成")
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"配置热刷新失败: {e}")
|
|
|
+
|
|
|
+ def _monitor_loop(self):
|
|
|
+ """
|
|
|
+ 监控循环(线程主函数)
|
|
|
+
|
|
|
+ 持续检查各设备的音频目录,处理新生成的WAV文件。
|
|
|
+ 每分钟汇总一次结果进行上报。
|
|
|
+ """
|
|
|
+ # 确保根目录存在
|
|
|
+ self.audio_dir.mkdir(parents=True, exist_ok=True)
|
|
|
+
|
|
|
+ while self.running:
|
|
|
+ try:
|
|
|
+ # 热更新:定期从 DB 刷新可变配置
|
|
|
+ self._reload_hot_config()
|
|
|
+ # 扫描所有设备目录下的current文件夹
|
|
|
+ for device_code in self.device_map.keys():
|
|
|
+ device_current_dir = self.audio_dir / device_code / "current"
|
|
|
+
|
|
|
+ # 目录不存在则跳过
|
|
|
+ if not device_current_dir.exists():
|
|
|
+ continue
|
|
|
+
|
|
|
+ for wav_file in device_current_dir.glob("*.wav"):
|
|
|
+ # 跳过已处理的文件
|
|
|
+ if wav_file in self.seen_files:
|
|
|
+ continue
|
|
|
+
|
|
|
+ # 检查文件是否完整
|
|
|
+ try:
|
|
|
+ stat_info = wav_file.stat()
|
|
|
+ file_age = time.time() - stat_info.st_mtime
|
|
|
+ file_size = stat_info.st_size
|
|
|
+
|
|
|
+ # 文件修改时间检查(确保文件写入完成)
|
|
|
+ # FFmpeg生成文件需要时间,太新的文件可能还没写完
|
|
|
+ if file_age < 12.0:
|
|
|
+ continue
|
|
|
+
|
|
|
+ # 文件大小检查(60秒 × 16000Hz × 2字节 ≈ 1.9MB)
|
|
|
+ # 范围:500KB - 3MB
|
|
|
+ if file_size < 500_000 or file_size > 3_000_000:
|
|
|
+ # 只有当文件生成已经有一段时间(>20秒)且依然很小,才判定为异常
|
|
|
+ # 这样可以避免刚开始生成、正在写入的文件被误报
|
|
|
+ if file_age > 20.0:
|
|
|
+ logger.warning(f"文件大小异常: {wav_file.name} ({file_size / 1000:.1f}KB)")
|
|
|
+
|
|
|
+ # 自动删除过小的文件(通常是0KB或损坏文件)
|
|
|
+ if file_size < 500_000:
|
|
|
+ try:
|
|
|
+ wav_file.unlink()
|
|
|
+ logger.debug(f"删除异常小文件: {wav_file.name}")
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"删除文件失败: {wav_file.name} | {e}")
|
|
|
+
|
|
|
+ self.seen_files.add(wav_file)
|
|
|
+ continue # 继续下一个文件循环
|
|
|
+
|
|
|
+ # 正常大小的文件继续处理
|
|
|
+ if file_size < 500_000: # 双重检查,防止漏网
|
|
|
+ continue
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"文件状态检查失败: {wav_file.name} | {e}")
|
|
|
+ continue
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ # 处理新文件
|
|
|
+ self._process_new_file(wav_file)
|
|
|
+
|
|
|
+ # 标记为已处理
|
|
|
+ self.seen_files.add(wav_file)
|
|
|
+
|
|
|
+ # 检查是否需要进行周期性上报
|
|
|
+ self._check_periodic_upload()
|
|
|
+
|
|
|
+ # 检查聚合窗口是否到期,到期则执行聚合判定并推送
|
|
|
+ if self.alert_aggregator:
|
|
|
+ self.alert_aggregator.check_and_flush()
|
|
|
+
|
|
|
+ # 清理过大的已处理文件集合
|
|
|
+ if len(self.seen_files) > 10000:
|
|
|
+ recent_files = sorted(self.seen_files,
|
|
|
+ key=lambda f: f.stat().st_mtime if f.exists() else 0)[-5000:]
|
|
|
+ self.seen_files = set(recent_files)
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"监控循环错误: {e}")
|
|
|
+
|
|
|
+ # 等待下一次检查
|
|
|
+ time.sleep(self.check_interval)
|
|
|
+
|
|
|
+ def _process_new_file(self, wav_file):
|
|
|
+ """
|
|
|
+ 处理新的音频文件
|
|
|
+
|
|
|
+ 文件名格式: {project_id}_{device_code}_{时间戳}.wav
|
|
|
+ 例如: 92_1#-1_20251218142000.wav
|
|
|
+
|
|
|
+ 流程:
|
|
|
+ 1. 加载音频
|
|
|
+ 2. 计算能量判断设备状态
|
|
|
+ 3. 进行AE异常检测,记录重建误差
|
|
|
+ 4. 保存音频数据用于计算频谱图
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ # 从文件名解析device_code
|
|
|
+ # 格式: {project_id}_{device_code}_{时间戳}.wav
|
|
|
+ try:
|
|
|
+ parts = wav_file.stem.split('_')
|
|
|
+ if len(parts) >= 3:
|
|
|
+ device_code = parts[1] # 第二部分是device_code
|
|
|
+ else:
|
|
|
+ device_code = "unknown"
|
|
|
+ except:
|
|
|
+ device_code = "unknown"
|
|
|
+
|
|
|
+ # ========================================
|
|
|
+ # 泵启停状态检测(基于 PLC 查询)- 优先于模型检查
|
|
|
+ # ----------------------------------------
|
|
|
+ # 逻辑:通过 SCADA API 查询 PLC 点位判断泵是否运行
|
|
|
+ # 作用:
|
|
|
+ # 1. 冷启动模式下也能过滤停机时的音频(保证训练数据质量)
|
|
|
+ # 2. 泵停机时跳过异常检测(避免无意义的检测)
|
|
|
+ # 3. 记录设备状态用于后续上报
|
|
|
+ # 依赖:pump_state_monitor + rtsp_config 中的 pump_status_plc 配置
|
|
|
+ # ========================================
|
|
|
+ device_status = "未知"
|
|
|
+ pump_is_running = True # 默认认为运行中(PLC 查询失败时的保守策略)
|
|
|
+
|
|
|
+ # 获取该设备对应的流配置
|
|
|
+ stream_config = self.device_map.get(device_code)
|
|
|
+
|
|
|
+ if self.pump_state_monitor and stream_config:
|
|
|
+ # 根据设备的 pump_name 找到关联的 PLC 点位配置
|
|
|
+ pump_name = stream_config.pump_name
|
|
|
+ pump_configs = self.pump_status_plc_configs.get(pump_name, [])
|
|
|
+
|
|
|
+ if pump_configs:
|
|
|
+ # 遍历所有关联泵,只要有一个运行就认为设备在工作
|
|
|
+ any_pump_running = False
|
|
|
+ for pump_cfg in pump_configs:
|
|
|
+ point = pump_cfg.get("point", "")
|
|
|
+ name = pump_cfg.get("name", point)
|
|
|
+ pump_id = point
|
|
|
+
|
|
|
+ # 查询泵状态(带 60 秒缓存,不会频繁请求 SCADA)
|
|
|
+ is_running, _ = self.pump_state_monitor.update_pump_state(pump_id, point, name)
|
|
|
+ if is_running:
|
|
|
+ any_pump_running = True
|
|
|
+
|
|
|
+ pump_is_running = any_pump_running
|
|
|
+ device_status = "开机" if pump_is_running else "停机"
|
|
|
+
|
|
|
+ # 泵全部停机时跳过(可通过配置禁用此行为)
|
|
|
+ # 冷启动和正常模式都适用,确保训练数据质量
|
|
|
+ if self.skip_detection_when_stopped and not pump_is_running:
|
|
|
+ logger.info(f"泵停机(PLC): {device_code} | 归档到过渡期目录(不用于训练)")
|
|
|
+ self._move_audio_to_transition_dir(wav_file, "stopped")
|
|
|
+ return
|
|
|
+
|
|
|
+ # ========================================
|
|
|
+ # 过渡期检测(泵启停后15分钟内)
|
|
|
+ # ----------------------------------------
|
|
|
+ # 目的:过滤启停过程中的不稳定音频
|
|
|
+ # 确保训练数据只包含稳定运行期的音频
|
|
|
+ # ========================================
|
|
|
+ if self.skip_detection_when_stopped:
|
|
|
+ pump_in_transition, transition_pump_names = \
|
|
|
+ self.pump_state_monitor.check_pumps_transition(pump_configs)
|
|
|
+
|
|
|
+ if pump_in_transition:
|
|
|
+ logger.info(f"泵过渡期: {device_code} | 归档到过渡期目录(不用于训练) | "
|
|
|
+ f"过渡期泵: {', '.join(transition_pump_names)}")
|
|
|
+ self._move_audio_to_transition_dir(wav_file, "transition")
|
|
|
+ return
|
|
|
+
|
|
|
+ # 获取该设备的预测器(懒加载模型)
|
|
|
+ device_predictor = self.multi_predictor.get_predictor(device_code)
|
|
|
+ if device_predictor is None:
|
|
|
+ logger.info(f"冷启动模式(设备 {device_code} 无模型): 归档 {wav_file.name}")
|
|
|
+ self._move_audio_to_date_dir(wav_file)
|
|
|
+ return
|
|
|
+ # 加载音频
|
|
|
+ try:
|
|
|
+ y, sr = librosa.load(str(wav_file), sr=CFG.SR, mono=True)
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"音频加载失败: {wav_file.name} | {e}")
|
|
|
+ return
|
|
|
+
|
|
|
+ # 记录状态到缓存(用于周期上报)
|
|
|
+ # 注意:泵状态检测已在前面完成,这里只记录状态
|
|
|
+ self.device_cache[device_code]["status"] = device_status
|
|
|
+
|
|
|
+ # ========================================
|
|
|
+ # 计算重建误差
|
|
|
+ # ========================================
|
|
|
+ error = self._compute_reconstruction_error(wav_file, device_predictor)
|
|
|
+
|
|
|
+ if error is not None:
|
|
|
+ self.device_cache[device_code]["errors"].append(error)
|
|
|
+
|
|
|
+ # ========================================
|
|
|
+ # 保存音频数据用于计算频谱图
|
|
|
+ # ========================================
|
|
|
+ self.device_cache[device_code]["audio_data"].append(y)
|
|
|
+
|
|
|
+ # ========================================
|
|
|
+ # 暂存文件路径,等待周期聚合判定后再归档
|
|
|
+ # ========================================
|
|
|
+ if "pending_files" not in self.device_cache[device_code]:
|
|
|
+ self.device_cache[device_code]["pending_files"] = []
|
|
|
+ self.device_cache[device_code]["pending_files"].append(wav_file)
|
|
|
+
|
|
|
+ # ========================================
|
|
|
+ # 记录到音频历史缓存(用于异常上下文捕获)
|
|
|
+ # ========================================
|
|
|
+ if self.context_capture_enabled:
|
|
|
+ self.audio_file_history[device_code].append((datetime.now(), wav_file))
|
|
|
+
|
|
|
+ # 初始化上次上报时间
|
|
|
+ if self.device_cache[device_code]["last_upload"] is None:
|
|
|
+ self.device_cache[device_code]["last_upload"] = datetime.now()
|
|
|
+
|
|
|
+ # 获取阈值并判断结果
|
|
|
+ threshold = self._get_threshold(device_code)
|
|
|
+
|
|
|
+ # ========================================
|
|
|
+ # 快速通道:连续多个文件误差极高时快速预警(暂时关闭)
|
|
|
+ # 不走投票窗口,用于捕获突发性严重故障
|
|
|
+ # ========================================
|
|
|
+ # if error is not None and threshold is not None and threshold > 0:
|
|
|
+ # # 维护快速通道缓冲区
|
|
|
+ # if "fast_alert_buffer" not in self.device_cache[device_code]:
|
|
|
+ # self.device_cache[device_code]["fast_alert_buffer"] = []
|
|
|
+ #
|
|
|
+ # fast_buf = self.device_cache[device_code]["fast_alert_buffer"]
|
|
|
+ # # 连续性检查:误差超过 2x 阈值时记录,否则清空缓冲区
|
|
|
+ # if error > threshold * 2.0:
|
|
|
+ # fast_buf.append(error)
|
|
|
+ # else:
|
|
|
+ # fast_buf.clear()
|
|
|
+ #
|
|
|
+ # # 连续 3 个文件(~24秒)都超过 2x 阈值 → 触发快速预警
|
|
|
+ # FAST_ALERT_CONSECUTIVE = 3
|
|
|
+ # if len(fast_buf) >= FAST_ALERT_CONSECUTIVE:
|
|
|
+ # # 检查是否已触发过(避免重复告警)
|
|
|
+ # last_fast = self.device_cache[device_code].get("last_fast_alert_time")
|
|
|
+ # now = datetime.now()
|
|
|
+ # can_fast_alert = (last_fast is None or
|
|
|
+ # (now - last_fast).total_seconds() > 300) # 5分钟冷却
|
|
|
+ #
|
|
|
+ # if can_fast_alert:
|
|
|
+ # # 快速通道同样受抑制逻辑约束
|
|
|
+ # suppress = False
|
|
|
+ # # 泵过渡期抑制
|
|
|
+ # if self.pump_state_monitor and stream_config:
|
|
|
+ # pump_name = stream_config.pump_name
|
|
|
+ # pump_configs = self.pump_status_plc_configs.get(pump_name, [])
|
|
|
+ # if pump_configs:
|
|
|
+ # in_transition, _ = self.pump_state_monitor.check_pumps_transition(pump_configs)
|
|
|
+ # if in_transition:
|
|
|
+ # suppress = True
|
|
|
+ # # 人体检测抑制
|
|
|
+ # if self.human_detection_enabled and self.human_reader:
|
|
|
+ # if self.human_reader.is_in_cooldown():
|
|
|
+ # suppress = True
|
|
|
+ # # alert_enabled 开关
|
|
|
+ # if not self.alert_enabled:
|
|
|
+ # suppress = True
|
|
|
+ #
|
|
|
+ # if not suppress:
|
|
|
+ # avg_fast = float(np.mean(fast_buf))
|
|
|
+ # logger.warning(
|
|
|
+ # f"[!!] 快速通道触发: {device_code} | "
|
|
|
+ # f"连续{len(fast_buf)}个文件异常 | "
|
|
|
+ # f"平均误差={avg_fast:.6f} 阈值={threshold:.6f}")
|
|
|
+ # self.device_cache[device_code]["last_fast_alert_time"] = now
|
|
|
+ # fast_buf.clear()
|
|
|
+ # # 标记快速预警,在下次周期上报时一并处理
|
|
|
+ # self.device_cache[device_code]["fast_alert_pending"] = True
|
|
|
+
|
|
|
+ if error is not None and threshold is not None:
|
|
|
+ is_anomaly = error > threshold
|
|
|
+ result_tag = "!!" if is_anomaly else "OK"
|
|
|
+ logger.info(f"[{result_tag}] {device_code} | {wav_file.name} | "
|
|
|
+ f"误差={error:.6f} 阈值={threshold:.6f}")
|
|
|
+ elif error is not None:
|
|
|
+ logger.debug(f"文件预测: {wav_file.name} | 误差={error:.6f} | 阀值=未设置")
|
|
|
+ else:
|
|
|
+ logger.warning(f"预测跳过: {wav_file.name} | 误差计算失败")
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"处理文件失败: {wav_file.name} | 错误: {e}")
|
|
|
+
|
|
|
+ def _compute_reconstruction_error(self, wav_file, device_predictor):
|
|
|
+ """
|
|
|
+ 计算单个音频文件的重建误差(Min-Max 标准化)
|
|
|
+
|
|
|
+ 使用8秒窗口、4秒步长切割音频,提取多个patches分别计算误差后取平均值。
|
|
|
+
|
|
|
+ 参数:
|
|
|
+ wav_file: 音频文件路径
|
|
|
+ device_predictor: 设备预测器实例
|
|
|
+
|
|
|
+ 返回:
|
|
|
+ 重建误差值(所有patches的平均MSE),失败返回None
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ import torch
|
|
|
+ from predictor.utils import align_to_target
|
|
|
+
|
|
|
+ # Min-Max 标准化参数
|
|
|
+ global_min = device_predictor.global_min
|
|
|
+ global_max = device_predictor.global_max
|
|
|
+
|
|
|
+ # 加载音频
|
|
|
+ y, _ = librosa.load(str(wav_file), sr=CFG.SR, mono=True)
|
|
|
+
|
|
|
+ win_samples = int(CFG.WIN_SEC * CFG.SR)
|
|
|
+ hop_samples = int(CFG.HOP_SEC * CFG.SR)
|
|
|
+
|
|
|
+ if len(y) < win_samples:
|
|
|
+ logger.warning(f"音频太短,无法提取patches: {wav_file.name}")
|
|
|
+ return None
|
|
|
+
|
|
|
+ patches = []
|
|
|
+ for start in range(0, len(y) - win_samples + 1, hop_samples):
|
|
|
+ window = y[start:start + win_samples]
|
|
|
+
|
|
|
+ S = librosa.feature.melspectrogram(
|
|
|
+ y=window, sr=CFG.SR, n_fft=CFG.N_FFT,
|
|
|
+ hop_length=CFG.HOP_LENGTH, n_mels=CFG.N_MELS, power=2.0
|
|
|
+ )
|
|
|
+ S_db = librosa.power_to_db(S, ref=np.max)
|
|
|
+
|
|
|
+ if S_db.shape[1] < CFG.TARGET_FRAMES:
|
|
|
+ S_db = np.pad(S_db, ((0, 0), (0, CFG.TARGET_FRAMES - S_db.shape[1])))
|
|
|
+ else:
|
|
|
+ S_db = S_db[:, :CFG.TARGET_FRAMES]
|
|
|
+
|
|
|
+ # Min-Max 标准化
|
|
|
+ S_norm = (S_db - global_min) / (global_max - global_min + 1e-6)
|
|
|
+ patches.append(S_norm.astype(np.float32))
|
|
|
+
|
|
|
+ if not patches:
|
|
|
+ logger.warning(f"未能提取任何patches: {wav_file.name}")
|
|
|
+ return None
|
|
|
+
|
|
|
+ arr = np.stack(patches, 0)
|
|
|
+ arr = np.expand_dims(arr, 1)
|
|
|
+ tensor = torch.from_numpy(arr)
|
|
|
+
|
|
|
+ torch_device = device_predictor.torch_device
|
|
|
+ tensor = tensor.to(torch_device)
|
|
|
+
|
|
|
+ with torch.no_grad():
|
|
|
+ recon = device_predictor.model(tensor)
|
|
|
+ recon = align_to_target(recon, tensor)
|
|
|
+ mse_per_patch = torch.mean((recon - tensor) ** 2, dim=[1, 2, 3])
|
|
|
+ mean_mse = torch.mean(mse_per_patch).item()
|
|
|
+
|
|
|
+ logger.debug(f"重建误差: {wav_file.name} | patches={len(patches)} | MSE={mean_mse:.6f}")
|
|
|
+ return mean_mse
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"计算重建误差失败: {wav_file.name} | {e}")
|
|
|
+ return None
|
|
|
+
|
|
|
+ def _check_periodic_upload(self):
|
|
|
+ """
|
|
|
+ 检查是否需要进行周期性上报
|
|
|
+
|
|
|
+ 每分钟汇总一次各设备的检测结果并上报
|
|
|
+ """
|
|
|
+ now = datetime.now()
|
|
|
+
|
|
|
+ for device_code, cache in self.device_cache.items():
|
|
|
+ # 检查上报时间间隔
|
|
|
+ last_upload = cache.get("last_upload")
|
|
|
+ if last_upload is None:
|
|
|
+ continue
|
|
|
+
|
|
|
+ elapsed = (now - last_upload).total_seconds()
|
|
|
+
|
|
|
+ # 达到上报周期
|
|
|
+ if elapsed >= self.segment_duration:
|
|
|
+ # 获取该设备的流配置
|
|
|
+ stream_config = self.device_map.get(device_code)
|
|
|
+
|
|
|
+ # 计算平均重建误差
|
|
|
+ errors = cache.get("errors", [])
|
|
|
+ avg_error = float(np.mean(errors)) if errors else 0.0
|
|
|
+
|
|
|
+ # 获取阈值
|
|
|
+ threshold = self._get_threshold(device_code)
|
|
|
+
|
|
|
+ # 判断当前周期是否异常(带容差区间)
|
|
|
+ # 容差区间:避免边界值反复跳变,但灰区内仍与阈值比较
|
|
|
+ if threshold:
|
|
|
+ upper_bound = threshold * (1 + self.tolerance_ratio) # 确定异常边界
|
|
|
+ lower_bound = threshold * (1 - self.tolerance_ratio) # 确定正常边界
|
|
|
+
|
|
|
+ if avg_error > upper_bound:
|
|
|
+ # 超过上边界 -> 确定异常
|
|
|
+ is_current_anomaly = True
|
|
|
+ elif avg_error < lower_bound:
|
|
|
+ # 低于下边界 -> 确定正常
|
|
|
+ is_current_anomaly = False
|
|
|
+ else:
|
|
|
+ # 灰区 -> 与阈值比较(避免异常状态延长)
|
|
|
+ is_current_anomaly = avg_error > threshold
|
|
|
+
|
|
|
+ # 记录本次判定结果
|
|
|
+ self.last_single_anomaly[device_code] = is_current_anomaly
|
|
|
+ else:
|
|
|
+ is_current_anomaly = False
|
|
|
+
|
|
|
+ # ========================================
|
|
|
+ # 滑动窗口投票:5次中有3次异常才判定为异常
|
|
|
+ # ========================================
|
|
|
+ if device_code not in self.detection_history:
|
|
|
+ self.detection_history[device_code] = []
|
|
|
+
|
|
|
+ # 记录本次检测结果
|
|
|
+ self.detection_history[device_code].append(is_current_anomaly)
|
|
|
+
|
|
|
+ # 保持窗口大小
|
|
|
+ if len(self.detection_history[device_code]) > self.voting_window_size:
|
|
|
+ self.detection_history[device_code].pop(0)
|
|
|
+
|
|
|
+ # 投票判定最终异常状态
|
|
|
+ if self.voting_enabled and len(self.detection_history[device_code]) >= self.voting_window_size:
|
|
|
+ anomaly_count = sum(self.detection_history[device_code])
|
|
|
+ is_anomaly = anomaly_count >= self.voting_threshold
|
|
|
+ window_info = f"窗口[{anomaly_count}/{self.voting_window_size}]"
|
|
|
+ else:
|
|
|
+ is_anomaly = is_current_anomaly
|
|
|
+ window_info = "窗口未满"
|
|
|
+
|
|
|
+ # ========================================
|
|
|
+ # 状态变更检测
|
|
|
+ # trigger_alert: 仅在 正常(或初始) -> 异常 时为True
|
|
|
+ # 冷却逻辑已移至 AlertAggregator 内部处理
|
|
|
+ # ========================================
|
|
|
+ last_is_anomaly = self.last_report_status.get(device_code)
|
|
|
+ trigger_alert = False
|
|
|
+
|
|
|
+ if is_anomaly:
|
|
|
+ # 只有状态变化(正常->异常) 才触发报警流程
|
|
|
+ if last_is_anomaly is None or not last_is_anomaly:
|
|
|
+ # ========================================
|
|
|
+ # 泵启停过渡期检查(抑制误报)
|
|
|
+ # ----------------------------------------
|
|
|
+ # 逻辑:检测到异常时,检查关联泵是否刚启停
|
|
|
+ # 作用:泵启停过程中音频特征剧烈变化,易被误判为异常
|
|
|
+ # 过渡期内抑制告警,避免误报
|
|
|
+ # 过渡期窗口:配置中的 transition_window_minutes(默认15分钟)
|
|
|
+ # ========================================
|
|
|
+ pump_in_transition = False
|
|
|
+ transition_pump_names = []
|
|
|
+
|
|
|
+ if self.pump_state_monitor and stream_config:
|
|
|
+ pump_name = stream_config.pump_name
|
|
|
+ pump_configs = self.pump_status_plc_configs.get(pump_name, [])
|
|
|
+
|
|
|
+ if pump_configs:
|
|
|
+ # 批量检查所有关联泵是否有处于过渡期的
|
|
|
+ pump_in_transition, transition_pump_names = \
|
|
|
+ self.pump_state_monitor.check_pumps_transition(pump_configs)
|
|
|
+
|
|
|
+ if pump_in_transition:
|
|
|
+ # 有泵处于过渡期 -> 抑制本次告警
|
|
|
+ trigger_alert = False
|
|
|
+ logger.info(f"泵启停过渡期,抑制告警: {device_code} | "
|
|
|
+ f"过渡期泵: {', '.join(transition_pump_names)}")
|
|
|
+ else:
|
|
|
+ # 检查 alert_enabled 开关:如果禁用则不触发告警
|
|
|
+ if self.alert_enabled:
|
|
|
+ # ========================================
|
|
|
+ # 人体检测抑制:任意摄像头检测到人则不报警
|
|
|
+ # ========================================
|
|
|
+ if self.human_detection_enabled and self.human_reader:
|
|
|
+ if self.human_reader.is_in_cooldown():
|
|
|
+ trigger_alert = False
|
|
|
+ status_info = self.human_reader.get_status_info()
|
|
|
+ logger.info(f"人体检测抑制: {device_code} | {status_info},跳过报警")
|
|
|
+ else:
|
|
|
+ trigger_alert = True
|
|
|
+ else:
|
|
|
+ trigger_alert = True
|
|
|
+ else:
|
|
|
+ trigger_alert = False
|
|
|
+ logger.debug(f"异常告警已禁用,跳过告警: {device_code}")
|
|
|
+
|
|
|
+ # 获取运行状态
|
|
|
+ running_status = cache.get("status", "未知")
|
|
|
+
|
|
|
+ # 获取进水流量
|
|
|
+ inlet_flow = self._get_inlet_flow(stream_config) if stream_config else 0.0
|
|
|
+
|
|
|
+ # 计算本次频谱图
|
|
|
+ audio_data = cache.get("audio_data", [])
|
|
|
+ freq_db = self._compute_frequency_spectrum(audio_data)
|
|
|
+
|
|
|
+ # 保存到频谱图历史(只保存dB值)
|
|
|
+ if self.freq_history_enabled and freq_db:
|
|
|
+ self.freq_history[device_code].append((now, freq_db))
|
|
|
+ # 清理过期历史
|
|
|
+ cutoff = now - timedelta(minutes=self.freq_history_minutes)
|
|
|
+ self.freq_history[device_code] = [
|
|
|
+ (t, d) for t, d in self.freq_history[device_code]
|
|
|
+ if t > cutoff
|
|
|
+ ]
|
|
|
+
|
|
|
+ # 计算历史频谱图平均值(normal_frequency_middle)
|
|
|
+ freq_middle_db = self._compute_frequency_middle(device_code)
|
|
|
+
|
|
|
+ # ========================================
|
|
|
+ # 异常分类:只在状态从正常变为异常时进行分类
|
|
|
+ # 持续异常期间沿用上次分类结果,保持一致性
|
|
|
+ # ========================================
|
|
|
+ anomaly_type_code = 6 # 默认:未分类异常
|
|
|
+ type_name = "未分类异常"
|
|
|
+
|
|
|
+ if is_anomaly and audio_data:
|
|
|
+ # 检查是否是新的异常(从正常变为异常)
|
|
|
+ is_new_anomaly = (last_is_anomaly is None or not last_is_anomaly)
|
|
|
+
|
|
|
+ if is_new_anomaly:
|
|
|
+ # 新异常:进行分类并锁定结果
|
|
|
+ try:
|
|
|
+ from core.anomaly_classifier import classify_anomaly
|
|
|
+ if len(audio_data) > 0:
|
|
|
+ y = audio_data[-1] if isinstance(audio_data[-1], np.ndarray) else np.array(audio_data[-1])
|
|
|
+ anomaly_type_code, type_name, confidence = classify_anomaly(y, sr=16000)
|
|
|
+ # 锁定分类结果
|
|
|
+ self.locked_anomaly_type[device_code] = (anomaly_type_code, type_name)
|
|
|
+ logger.info(f"异常分类(新异常): {type_name} (code={anomaly_type_code}, 置信度={confidence:.2f})")
|
|
|
+ except Exception as e:
|
|
|
+ logger.warning(f"异常分类失败: {e}")
|
|
|
+ else:
|
|
|
+ # 持续异常:沿用锁定的分类结果
|
|
|
+ if device_code in self.locked_anomaly_type:
|
|
|
+ anomaly_type_code, type_name = self.locked_anomaly_type[device_code]
|
|
|
+ logger.debug(f"异常分类(沿用): {type_name} (code={anomaly_type_code})")
|
|
|
+ else:
|
|
|
+ # 状态正常时清除锁定的分类结果
|
|
|
+ if device_code in self.locked_anomaly_type:
|
|
|
+ del self.locked_anomaly_type[device_code]
|
|
|
+
|
|
|
+ # 上报逻辑
|
|
|
+ if self.push_enabled and stream_config:
|
|
|
+ # 预读异常音频base64:在文件被归档/清空之前立即读取
|
|
|
+ # 解决竞态问题:异步推送或聚合器延迟推送时文件可能已被移走
|
|
|
+ pre_read_wav_b64 = ""
|
|
|
+ if trigger_alert:
|
|
|
+ try:
|
|
|
+ current_pending = cache.get("pending_files", [])
|
|
|
+ if current_pending and current_pending[0].exists():
|
|
|
+ with open(current_pending[0], "rb") as f:
|
|
|
+ pre_read_wav_b64 = base64.b64encode(f.read()).decode('utf-8')
|
|
|
+ logger.debug(f"预读异常音频成功: {current_pending[0].name} | size={len(pre_read_wav_b64)}")
|
|
|
+ except Exception as e:
|
|
|
+ logger.warning(f"预读异常音频失败: {e}")
|
|
|
+
|
|
|
+ if trigger_alert and self.alert_aggregator:
|
|
|
+ # 报警走聚合器:跨设备聚合判定 + 分类型冷却
|
|
|
+ # 聚合器会在窗口到期后决定是否真正推送
|
|
|
+ self.alert_aggregator.submit_alert(
|
|
|
+ plant_name=stream_config.plant_name,
|
|
|
+ device_code=device_code,
|
|
|
+ anomaly_type_code=anomaly_type_code,
|
|
|
+ push_kwargs=dict(
|
|
|
+ stream_config=stream_config,
|
|
|
+ device_code=device_code,
|
|
|
+ is_anomaly=is_anomaly,
|
|
|
+ trigger_alert=True,
|
|
|
+ abnormal_score=avg_error,
|
|
|
+ score_threshold=threshold,
|
|
|
+ running_status=running_status,
|
|
|
+ inlet_flow=inlet_flow,
|
|
|
+ freq_db=freq_db,
|
|
|
+ freq_middle_db=freq_middle_db,
|
|
|
+ anomaly_type_code=anomaly_type_code,
|
|
|
+ abnormal_wav_b64=pre_read_wav_b64
|
|
|
+ )
|
|
|
+ )
|
|
|
+ else:
|
|
|
+ # 非报警(心跳)或聚合器不可用 -> 提交到线程池异步推送
|
|
|
+ self._push_executor.submit(
|
|
|
+ self._push_detection_result,
|
|
|
+ stream_config=stream_config,
|
|
|
+ device_code=device_code,
|
|
|
+ is_anomaly=is_anomaly,
|
|
|
+ trigger_alert=trigger_alert,
|
|
|
+ abnormal_score=avg_error,
|
|
|
+ score_threshold=threshold,
|
|
|
+ running_status=running_status,
|
|
|
+ inlet_flow=inlet_flow,
|
|
|
+ freq_db=freq_db,
|
|
|
+ freq_middle_db=freq_middle_db,
|
|
|
+ anomaly_type_code=anomaly_type_code,
|
|
|
+ abnormal_wav_b64=pre_read_wav_b64
|
|
|
+ )
|
|
|
+
|
|
|
+ # 更新上一次状态
|
|
|
+ self.last_report_status[device_code] = is_anomaly
|
|
|
+
|
|
|
+ # 日志记录
|
|
|
+ thr_str = f"{threshold:.6f}" if threshold else "未设置"
|
|
|
+ # 报警去向说明
|
|
|
+ if trigger_alert and self.alert_aggregator:
|
|
|
+ alert_dest = "-> 聚合器"
|
|
|
+ elif trigger_alert:
|
|
|
+ alert_dest = "-> 直接推送"
|
|
|
+ else:
|
|
|
+ alert_dest = ""
|
|
|
+
|
|
|
+ # 使用设备名作为标识,增加视觉分隔
|
|
|
+ cam_label = ""
|
|
|
+ if stream_config:
|
|
|
+ cam_label = f"({stream_config.camera_name})"
|
|
|
+
|
|
|
+ result_emoji = "!!" if is_anomaly else "OK"
|
|
|
+ alert_str = f"报警=是 {alert_dest}" if trigger_alert else "报警=否"
|
|
|
+
|
|
|
+ logger.info(
|
|
|
+ f"[{result_emoji}] {device_code}{cam_label} | "
|
|
|
+ f"误差={avg_error:.6f} 阈值={thr_str} | "
|
|
|
+ f"{window_info} | {running_status} | "
|
|
|
+ f"{'异常' if is_anomaly else '正常'} | {alert_str}"
|
|
|
+ )
|
|
|
+
|
|
|
+ # ========================================
|
|
|
+ # 根据单次检测结果归档文件
|
|
|
+ # 归档基于 is_current_anomaly(单次检测),而非投票后的 is_anomaly
|
|
|
+ # 投票机制只影响是否推送报警,不影响文件分类
|
|
|
+ # ========================================
|
|
|
+ pending_files = cache.get("pending_files", [])
|
|
|
+
|
|
|
+ # 检查是否是新的异常(从正常变为异常)
|
|
|
+ is_new_anomaly = is_anomaly and (last_is_anomaly is None or not last_is_anomaly)
|
|
|
+
|
|
|
+ # ========================================
|
|
|
+ # 异常上下文捕获逻辑
|
|
|
+ # ========================================
|
|
|
+ if self.context_capture_enabled:
|
|
|
+ # 检查并更新捕获状态
|
|
|
+ self._update_anomaly_capture_state(device_code, is_anomaly, is_new_anomaly,
|
|
|
+ avg_error, threshold, now, pending_files)
|
|
|
+
|
|
|
+ if pending_files:
|
|
|
+ if is_current_anomaly:
|
|
|
+ # 单次检测为异常 -> 归档到异常目录
|
|
|
+ # 检查异常保存冷却时间
|
|
|
+ last_save = self.last_anomaly_save_time.get(device_code)
|
|
|
+ should_save = True
|
|
|
+
|
|
|
+ if last_save:
|
|
|
+ elapsed_minutes = (now - last_save).total_seconds() / 60
|
|
|
+ should_save = elapsed_minutes >= self.anomaly_save_cooldown_minutes
|
|
|
+
|
|
|
+ if should_save and not self.context_capture_enabled:
|
|
|
+ # 上下文捕获禁用时,使用原有逻辑:移动到异常待排查目录
|
|
|
+ for f in pending_files:
|
|
|
+ self._move_audio_to_anomaly_pending(f)
|
|
|
+ self.last_anomaly_save_time[device_code] = now
|
|
|
+ logger.warning(f"已隔离 {len(pending_files)} 个异常文件到待排查目录")
|
|
|
+ elif self.context_capture_enabled:
|
|
|
+ # 上下文捕获启用时:
|
|
|
+ # - 异常文件已在 _update_anomaly_capture_state 中记录路径
|
|
|
+ # - 文件会在 _save_anomaly_context 中被移动到异常目录
|
|
|
+ # - 这里暂时保留文件,不做处理
|
|
|
+ pass # 文件由捕获逻辑处理
|
|
|
+ else:
|
|
|
+ # 冷却时间内,删除文件不保存
|
|
|
+ for f in pending_files:
|
|
|
+ try:
|
|
|
+ f.unlink()
|
|
|
+ except Exception as e:
|
|
|
+ logger.debug(f"删除冷却期内文件失败: {f.name} | {e}")
|
|
|
+ remaining_minutes = self.anomaly_save_cooldown_minutes - elapsed_minutes
|
|
|
+ logger.debug(f"异常保存冷却中: {device_code} | 剩余 {remaining_minutes:.1f} 分钟")
|
|
|
+ else:
|
|
|
+ # 单次检测为正常 -> 移到日期目录归档
|
|
|
+ for f in pending_files:
|
|
|
+ self._move_audio_to_date_dir(f)
|
|
|
+ # 状态恢复正常时,清除保存冷却时间(下次异常时可立即保存)
|
|
|
+ if device_code in self.last_anomaly_save_time:
|
|
|
+ del self.last_anomaly_save_time[device_code]
|
|
|
+
|
|
|
+ # 重置缓存
|
|
|
+ cache["errors"] = []
|
|
|
+ cache["audio_data"] = []
|
|
|
+ cache["pending_files"] = []
|
|
|
+ cache["last_upload"] = now
|
|
|
+ logger.info("─" * 60)
|
|
|
+
|
|
|
+ def _update_anomaly_capture_state(self, device_code, is_anomaly,
|
|
|
+ is_new_anomaly, avg_error,
|
|
|
+ threshold, now, pending_files):
|
|
|
+ """
|
|
|
+ 更新异常上下文捕获状态
|
|
|
+
|
|
|
+ 状态机:
|
|
|
+ 1. 未触发 -> 检测到新异常 -> 触发捕获,从 audio_file_history 回溯获取文件
|
|
|
+ 2. 已触发,等待后续 -> 持续收集后续文件
|
|
|
+ 3. 已触发,时间到 -> 保存所有文件和元数据
|
|
|
+
|
|
|
+ 修复说明:
|
|
|
+ - anomaly_files 不再依赖 pending_files(可能为空)
|
|
|
+ - 改为从 audio_file_history 回溯获取触发时刻前 1 分钟内的文件
|
|
|
+ - pre_files 改为获取触发时刻前 1~N+1 分钟的文件(排除 anomaly 时间段)
|
|
|
+
|
|
|
+ 参数:
|
|
|
+ device_code: 设备编号
|
|
|
+ is_anomaly: 当前周期是否异常
|
|
|
+ is_new_anomaly: 是否是新异常(从正常变为异常)
|
|
|
+ avg_error: 平均重建误差
|
|
|
+ threshold: 阈值
|
|
|
+ now: 当前时间
|
|
|
+ pending_files: 当前周期的待处理文件
|
|
|
+ """
|
|
|
+ import shutil
|
|
|
+ import json
|
|
|
+
|
|
|
+ state = self.anomaly_capture_state.get(device_code)
|
|
|
+
|
|
|
+ if is_new_anomaly and state is None:
|
|
|
+ # 新异常触发:开始捕获
|
|
|
+ # ========================================
|
|
|
+ # 从 audio_file_history 回溯获取文件
|
|
|
+ # ----------------------------------------
|
|
|
+ # anomaly_files: 触发时刻前 1 分钟内的文件(最接近异常的时间段)
|
|
|
+ # pre_files: 触发时刻前 1~(N+1) 分钟的文件(更早的正常时间段)
|
|
|
+ # ========================================
|
|
|
+ anomaly_cutoff = now - timedelta(minutes=1) # 前1分钟
|
|
|
+ pre_cutoff = now - timedelta(minutes=self.context_pre_minutes + 1) # 前N+1分钟
|
|
|
+
|
|
|
+ pre_files = []
|
|
|
+ anomaly_files = []
|
|
|
+
|
|
|
+ for ts, fpath in self.audio_file_history[device_code]:
|
|
|
+ if not fpath.exists():
|
|
|
+ continue
|
|
|
+ if ts >= anomaly_cutoff:
|
|
|
+ # 触发时刻前1分钟内 -> anomaly_files
|
|
|
+ anomaly_files.append(fpath)
|
|
|
+ elif ts >= pre_cutoff:
|
|
|
+ # 触发时刻前1~(N+1)分钟 -> pre_files
|
|
|
+ pre_files.append(fpath)
|
|
|
+
|
|
|
+ # 如果 anomaly_files 仍为空,把当前 pending_files 加入(兜底)
|
|
|
+ if not anomaly_files and pending_files:
|
|
|
+ anomaly_files = [f for f in pending_files if f.exists()]
|
|
|
+
|
|
|
+ # 初始化捕获状态
|
|
|
+ self.anomaly_capture_state[device_code] = {
|
|
|
+ "trigger_time": now,
|
|
|
+ "avg_error": avg_error,
|
|
|
+ "threshold": threshold,
|
|
|
+ "pre_files": pre_files,
|
|
|
+ "anomaly_files": anomaly_files,
|
|
|
+ "post_files": [],
|
|
|
+ "post_start_time": now
|
|
|
+ }
|
|
|
+
|
|
|
+ logger.info(f"异常上下文捕获已触发: {device_code} | "
|
|
|
+ f"前置文件={len(pre_files)}个 | 异常文件={len(anomaly_files)}个 | "
|
|
|
+ f"等待后续{self.context_post_minutes}分钟")
|
|
|
+
|
|
|
+ elif state is not None:
|
|
|
+ # 已触发状态:收集后续文件
|
|
|
+ elapsed_post = (now - state["post_start_time"]).total_seconds() / 60
|
|
|
+
|
|
|
+ if elapsed_post < self.context_post_minutes:
|
|
|
+ # 还在收集后续文件
|
|
|
+ for f in pending_files:
|
|
|
+ if f.exists():
|
|
|
+ state["post_files"].append(f)
|
|
|
+ else:
|
|
|
+ # 时间到,保存所有文件
|
|
|
+ self._save_anomaly_context(device_code, state)
|
|
|
+ # 清除状态
|
|
|
+ del self.anomaly_capture_state[device_code]
|
|
|
+ # 更新保存冷却时间
|
|
|
+ self.last_anomaly_save_time[device_code] = now
|
|
|
+
|
|
|
+ def _save_anomaly_context(self, device_code: str, state: dict):
|
|
|
+ """
|
|
|
+ 保存异常上下文文件到独立目录
|
|
|
+
|
|
|
+ 目录结构:
|
|
|
+ data/anomaly_detected/{device_code}/{异常文件名(不含扩展名)}/
|
|
|
+ ├── 92_1#-1_20260130140313.wav (保持原文件名)
|
|
|
+ ├── ...
|
|
|
+ └── metadata.json
|
|
|
+
|
|
|
+ metadata.json 字段说明:
|
|
|
+ - before_trigger: 触发前 1~(N+1) 分钟的文件(正常时期,用于对比)
|
|
|
+ - at_trigger: 触发时刻前 1 分钟内的文件(异常开始出现的时期)
|
|
|
+ - after_trigger: 触发后 N 分钟的文件(不管异常是否恢复)
|
|
|
+
|
|
|
+ 参数:
|
|
|
+ device_code: 设备编号
|
|
|
+ state: 捕获状态字典
|
|
|
+ """
|
|
|
+ import shutil
|
|
|
+ import json
|
|
|
+
|
|
|
+ try:
|
|
|
+ # 获取第一个异常文件名作为文件夹名
|
|
|
+ anomaly_files = state.get("anomaly_files", [])
|
|
|
+ if not anomaly_files:
|
|
|
+ logger.warning(f"无异常文件,跳过保存: {device_code}")
|
|
|
+ return
|
|
|
+
|
|
|
+ # 用第一个异常文件名(不含扩展名)作为文件夹名
|
|
|
+ first_anomaly = anomaly_files[0]
|
|
|
+ folder_name = first_anomaly.stem # 如 92_1#-1_20260130140313
|
|
|
+ save_dir = self.save_anomaly_dir / device_code / folder_name
|
|
|
+ save_dir.mkdir(parents=True, exist_ok=True)
|
|
|
+
|
|
|
+ # 收集所有文件名(使用新命名)
|
|
|
+ all_files = {"before_trigger": [], "at_trigger": [], "after_trigger": []}
|
|
|
+
|
|
|
+ # 移动前置文件(来自日期目录,移动后原位置不再保留)
|
|
|
+ for fpath in state.get("pre_files", []):
|
|
|
+ if fpath.exists():
|
|
|
+ dest = save_dir / fpath.name
|
|
|
+ shutil.move(str(fpath), str(dest)) # 移动而非复制,避免重复
|
|
|
+ all_files["before_trigger"].append(fpath.name)
|
|
|
+
|
|
|
+ # 移动触发时刻文件(保持原名)
|
|
|
+ for fpath in anomaly_files:
|
|
|
+ if fpath.exists():
|
|
|
+ dest = save_dir / fpath.name
|
|
|
+ shutil.move(str(fpath), str(dest))
|
|
|
+ all_files["at_trigger"].append(fpath.name)
|
|
|
+
|
|
|
+ # 移动后续文件(保持原名)
|
|
|
+ for fpath in state.get("post_files", []):
|
|
|
+ if fpath.exists():
|
|
|
+ dest = save_dir / fpath.name
|
|
|
+ shutil.move(str(fpath), str(dest))
|
|
|
+ all_files["after_trigger"].append(fpath.name)
|
|
|
+
|
|
|
+ # 生成精简的元数据文件
|
|
|
+ trigger_time = state.get("trigger_time")
|
|
|
+ metadata = {
|
|
|
+ "device_code": device_code,
|
|
|
+ "trigger_time": trigger_time.strftime("%Y-%m-%d %H:%M:%S") if trigger_time else None,
|
|
|
+ "avg_error": round(state.get("avg_error", 0.0), 6),
|
|
|
+ "threshold": round(state.get("threshold", 0.0), 6),
|
|
|
+ "files": all_files
|
|
|
+ }
|
|
|
+
|
|
|
+ metadata_path = save_dir / "metadata.json"
|
|
|
+ with open(metadata_path, 'w', encoding='utf-8') as f:
|
|
|
+ json.dump(metadata, f, ensure_ascii=False, indent=2)
|
|
|
+
|
|
|
+ total = sum(len(v) for v in all_files.values())
|
|
|
+ logger.warning(f"异常上下文已保存: {device_code}/{folder_name} | "
|
|
|
+ f"共{total}个文件 (前{len(all_files['before_trigger'])}+异常{len(all_files['at_trigger'])}+后{len(all_files['after_trigger'])})")
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"保存异常上下文失败: {device_code} | {e}")
|
|
|
+
|
|
|
+ def _compute_frequency_middle(self, device_code):
|
|
|
+ """
|
|
|
+ 计算历史频谱图平均值(normal_frequency_middle)
|
|
|
+
|
|
|
+ 参数:
|
|
|
+ device_code: 设备编号
|
|
|
+
|
|
|
+ 返回:
|
|
|
+ dB值列表的平均值
|
|
|
+ """
|
|
|
+ history = self.freq_history.get(device_code, [])
|
|
|
+ if not history or len(history) < 2:
|
|
|
+ return []
|
|
|
+
|
|
|
+ try:
|
|
|
+ # 收集所有历史dB值(只保留长度一致的)
|
|
|
+ all_db = []
|
|
|
+ ref_len = len(history[0][1]) # 使用第一条记录的长度作为参考
|
|
|
+
|
|
|
+ for _, db in history:
|
|
|
+ if len(db) == ref_len:
|
|
|
+ all_db.append(db)
|
|
|
+
|
|
|
+ if not all_db:
|
|
|
+ return []
|
|
|
+
|
|
|
+ # 计算各频率点的平均dB
|
|
|
+ avg_db = np.mean(all_db, axis=0).tolist()
|
|
|
+ return avg_db
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"计算频谱图历史平均失败: {e}")
|
|
|
+ return []
|
|
|
+
|
|
|
+ def _get_threshold(self, device_code):
|
|
|
+ """
|
|
|
+ 获取指定设备的阈值
|
|
|
+
|
|
|
+ 优先级:
|
|
|
+ 1. 从 multi_predictor 获取设备对应模型的阈值
|
|
|
+ 2. 使用配置文件中的默认阈值
|
|
|
+ 3. 返回0.0(不进行异常判定)
|
|
|
+
|
|
|
+ 参数:
|
|
|
+ device_code: 设备编号(如 "LT-1")
|
|
|
+
|
|
|
+ 返回:
|
|
|
+ 阈值,未找到返回默认值或0.0
|
|
|
+ """
|
|
|
+ # 方式1:从 multi_predictor 获取设备阈值
|
|
|
+ thr = self.multi_predictor.get_threshold(device_code)
|
|
|
+ if thr is not None:
|
|
|
+ logger.debug(f"阈值来源: {device_code} -> 设备模型阈值 = {thr:.6f}")
|
|
|
+ return thr
|
|
|
+
|
|
|
+ # 方式2:使用配置中的默认阈值
|
|
|
+ default_threshold = self.config.get('prediction', {}).get('default_threshold', 0.0)
|
|
|
+
|
|
|
+ if default_threshold > 0:
|
|
|
+ logger.debug(f"阈值来源: {device_code} -> 配置默认值 = {default_threshold:.6f}")
|
|
|
+ return default_threshold
|
|
|
+
|
|
|
+ # 首次找不到阈值时记录警告
|
|
|
+ if device_code not in getattr(self, '_threshold_warned', set()):
|
|
|
+ if not hasattr(self, '_threshold_warned'):
|
|
|
+ self._threshold_warned = set()
|
|
|
+ self._threshold_warned.add(device_code)
|
|
|
+ logger.warning(f"未找到阈值: {device_code},将跳过异常判定")
|
|
|
+
|
|
|
+ return 0.0
|
|
|
+
|
|
|
+ def _get_inlet_flow(self, stream_config: RTSPStreamConfig) -> float:
|
|
|
+ """
|
|
|
+ 获取进水流量(使用实时数据接口)
|
|
|
+
|
|
|
+ 使用 current-data 接口直接获取最新一条数据
|
|
|
+
|
|
|
+ 参数:
|
|
|
+ stream_config: 流配置
|
|
|
+
|
|
|
+ 返回:
|
|
|
+ 进水流量值,失败返回0.0
|
|
|
+ """
|
|
|
+ if not self.scada_enabled or not stream_config:
|
|
|
+ logger.debug(f"流量跳过: scada_enabled={self.scada_enabled}, stream_config={stream_config}")
|
|
|
+ return 0.0
|
|
|
+
|
|
|
+ # 获取PLC数据点位
|
|
|
+ plc_address = stream_config.get_flow_plc_address()
|
|
|
+ if not plc_address:
|
|
|
+ logger.debug(f"流量跳过(无PLC地址): {stream_config.device_code} | pump_name='{stream_config.pump_name}'")
|
|
|
+ return 0.0
|
|
|
+
|
|
|
+ # 使用水厂配置的 project_id
|
|
|
+ project_id = stream_config.project_id
|
|
|
+
|
|
|
+ try:
|
|
|
+ # 当前时间戳(毫秒)
|
|
|
+ now_ms = int(datetime.now().timestamp() * 1000)
|
|
|
+
|
|
|
+ # 请求参数
|
|
|
+ params = {"time": now_ms}
|
|
|
+
|
|
|
+ # 请求体:使用实时数据接口格式
|
|
|
+ request_body = [
|
|
|
+ {
|
|
|
+ "deviceId": "1",
|
|
|
+ "deviceItems": plc_address,
|
|
|
+ "deviceName": f"流量_{stream_config.pump_name}",
|
|
|
+ "project_id": project_id
|
|
|
+ }
|
|
|
+ ]
|
|
|
+
|
|
|
+ # 请求头
|
|
|
+ headers = {
|
|
|
+ "Content-Type": "application/json",
|
|
|
+ "JWT-TOKEN": self.scada_jwt
|
|
|
+ }
|
|
|
+
|
|
|
+ logger.debug(f"流量请求: {stream_config.device_code} | project_id={project_id} | plc={plc_address}")
|
|
|
+
|
|
|
+ # 发送 POST 请求到实时接口
|
|
|
+ response = requests.post(
|
|
|
+ self.scada_realtime_url,
|
|
|
+ params=params,
|
|
|
+ json=request_body,
|
|
|
+ headers=headers,
|
|
|
+ timeout=self.scada_timeout
|
|
|
+ )
|
|
|
+
|
|
|
+ if response.status_code == 200:
|
|
|
+ data = response.json()
|
|
|
+ if data.get("code") == 200:
|
|
|
+ if data.get("data"):
|
|
|
+ # 获取第一条数据(实时接口只返回最新一条)
|
|
|
+ latest = data["data"][0]
|
|
|
+ if "val" in latest:
|
|
|
+ flow = float(latest["val"])
|
|
|
+ logger.debug(f"流量获取成功: {stream_config.device_code} | 流量={flow}")
|
|
|
+ return flow
|
|
|
+ else:
|
|
|
+ logger.debug(f"流量数据无val字段: {stream_config.device_code}")
|
|
|
+ else:
|
|
|
+ # API正常返回但无数据
|
|
|
+ logger.debug(f"流量查询无数据: {stream_config.device_code}")
|
|
|
+ else:
|
|
|
+ logger.warning(f"流量API返回异常: {stream_config.device_code} | code={data.get('code')} | msg={data.get('msg')}")
|
|
|
+ else:
|
|
|
+ logger.warning(f"流量HTTP错误: {stream_config.device_code} | status={response.status_code}")
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.warning(f"流量获取异常: {stream_config.device_code} | {e}")
|
|
|
+
|
|
|
+ return 0.0
|
|
|
+
|
|
|
+ def _compute_frequency_spectrum(self, audio_data):
|
|
|
+ """
|
|
|
+ 计算频谱图数据
|
|
|
+
|
|
|
+ 将1分钟内的多个8秒音频片段合并,计算整体FFT
|
|
|
+
|
|
|
+ 参数:
|
|
|
+ audio_data: 音频数据列表
|
|
|
+
|
|
|
+ 返回:
|
|
|
+ dB值列表(0-8000Hz均匀分布,共400个点)
|
|
|
+ """
|
|
|
+ if not audio_data:
|
|
|
+ return []
|
|
|
+
|
|
|
+ try:
|
|
|
+ # 合并所有音频片段
|
|
|
+ combined = np.concatenate(audio_data)
|
|
|
+
|
|
|
+ # 计算FFT
|
|
|
+ n = len(combined)
|
|
|
+ fft_result = np.fft.rfft(combined)
|
|
|
+ freqs = np.fft.rfftfreq(n, 1.0 / CFG.SR)
|
|
|
+
|
|
|
+ # 计算幅度(转换为dB)
|
|
|
+ magnitude = np.abs(fft_result)
|
|
|
+ # 避免log(0)
|
|
|
+ magnitude = np.maximum(magnitude, 1e-10)
|
|
|
+ db = 20 * np.log10(magnitude)
|
|
|
+
|
|
|
+ # 降采样到400个点(0-8000Hz均匀分布)
|
|
|
+ max_freq = 8000
|
|
|
+ num_points = 400
|
|
|
+
|
|
|
+ db_list = []
|
|
|
+
|
|
|
+ for i in range(num_points):
|
|
|
+ # 计算目标频率(0到8000Hz均匀分布)
|
|
|
+ target_freq = (i / (num_points - 1)) * max_freq
|
|
|
+ # 找到最接近的频率索引
|
|
|
+ idx = np.argmin(np.abs(freqs - target_freq))
|
|
|
+ if idx < len(freqs):
|
|
|
+ db_list.append(float(db[idx]))
|
|
|
+
|
|
|
+ return db_list
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"计算频谱图失败: {e}")
|
|
|
+ return []
|
|
|
+
|
|
|
+ def _push_detection_result(self, stream_config, device_code,
|
|
|
+ is_anomaly, trigger_alert, abnormal_score, score_threshold,
|
|
|
+ running_status, inlet_flow,
|
|
|
+ freq_db, freq_middle_db=None,
|
|
|
+ anomaly_type_code=6,
|
|
|
+ abnormal_wav_b64=""):
|
|
|
+ """
|
|
|
+ 推送检测结果到远程服务器
|
|
|
+
|
|
|
+ 上报格式符合用户要求:
|
|
|
+ - 包含频谱图数据(this_frequency + normal_frequency_middle)
|
|
|
+ - 包含启停状态和进水流量
|
|
|
+ - sound_detection.status 和 abnormalwav 仅在 trigger_alert=True 时上报异常(0)和base64,否则为正常(1)和空
|
|
|
+
|
|
|
+ 参数:
|
|
|
+ stream_config: 流配置
|
|
|
+ device_code: 设备编号
|
|
|
+ is_anomaly: 实际检测是否异常
|
|
|
+ trigger_alert: 是否触发报警(仅在新异常产生时为True)
|
|
|
+ abnormal_score: 平均重建误差
|
|
|
+ score_threshold: 阈值
|
|
|
+ running_status: 启停状态
|
|
|
+ inlet_flow: 进水流量
|
|
|
+ freq_db: 本次频谱图dB值列表
|
|
|
+ freq_middle_db: 历史平均频谱图dB值列表
|
|
|
+ abnormal_wav_b64: 预读的异常音频base64编码(在文件归档前读取,避免竞态)
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ import time as time_module
|
|
|
+
|
|
|
+ # 获取设备名称
|
|
|
+ camera_name = stream_config.camera_name
|
|
|
+
|
|
|
+ # 从 push_base_urls 列表构建推送目标(每个 base_url + /{project_id})
|
|
|
+ if not self.push_base_urls:
|
|
|
+ logger.warning(f"未配置push_base_urls: {device_code}")
|
|
|
+ return
|
|
|
+
|
|
|
+ # 获取该设备对应的 project_id,用于拼接最终推送URL
|
|
|
+ project_id = stream_config.project_id
|
|
|
+
|
|
|
+ # 构建推送消息
|
|
|
+ request_time = int(time_module.time() * 1000)
|
|
|
+
|
|
|
+ # 异常音频已通过参数 abnormal_wav_b64 传入(调用方在文件归档前预读)
|
|
|
+ # 如果调用方未提供,记录警告以便排查
|
|
|
+ if trigger_alert and not abnormal_wav_b64:
|
|
|
+ logger.warning(f"报警推送但无异常音频数据: {device_code}")
|
|
|
+
|
|
|
+ # 决定 sound_detection.status
|
|
|
+ # 只有在 trigger_alert=True 时才报 0 (异常)
|
|
|
+ # 其他情况(正常、持续异常)都报 1 (正常/空) -- 根据用户要求: "正常情况下 ... 是空 就行"
|
|
|
+ report_status = 0 if trigger_alert else 1
|
|
|
+
|
|
|
+ payload = {
|
|
|
+ "message": {
|
|
|
+ # 通道信息
|
|
|
+ "channelInfo": {"name": camera_name},
|
|
|
+ # 请求时间戳
|
|
|
+ "requestTime": request_time,
|
|
|
+ # 分类信息
|
|
|
+ "classification": {
|
|
|
+ "level_one": 2, # 音频检测大类
|
|
|
+ "level_two": anomaly_type_code # 异常类型小类(6=未分类, 7=轴承, 8=气蚀, 9=松动, 10=叶轮, 11=阀件)
|
|
|
+ },
|
|
|
+ # 技能信息
|
|
|
+ "skillInfo": {"name": "异响检测"},
|
|
|
+ # 声音检测数据
|
|
|
+ "sound_detection": {
|
|
|
+ # 异常音频
|
|
|
+ "abnormalwav": abnormal_wav_b64,
|
|
|
+ # 状态:0=异常(仅新异常), 1=正常(或持续异常)
|
|
|
+ "status": report_status,
|
|
|
+ # 设备状态信息
|
|
|
+ "condition": {
|
|
|
+ "running_status": "运行中" if running_status == "开机" else "停机中",
|
|
|
+ "inlet_flow": inlet_flow
|
|
|
+ },
|
|
|
+ # 得分信息
|
|
|
+ "score": {
|
|
|
+ "abnormal_score": abnormal_score, # 当前1分钟平均重构误差
|
|
|
+ "score_threshold": score_threshold # 该设备异常阀值
|
|
|
+ },
|
|
|
+ # 频谱图数据
|
|
|
+ "frequency": {
|
|
|
+ "this_frequency": freq_db, # 当前1分钟的频谱图
|
|
|
+ "normal_frequency_middle": freq_middle_db or [], # 过去10分钟的频谱图平均
|
|
|
+ "normal_frequency_upper": [], # 上限(暂为空)
|
|
|
+ "normal_frequency_lower": [] # 下限(暂为空)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ # 遍历所有推送基地址,逐个拼接 project_id 发送(各目标互不影响)
|
|
|
+ push_targets = [
|
|
|
+ (f"{item['url']}/{project_id}", item['label'])
|
|
|
+ for item in self.push_base_urls
|
|
|
+ ]
|
|
|
+
|
|
|
+ # 逐个目标推送(各目标互不影响)
|
|
|
+ for target_url, target_label in push_targets:
|
|
|
+ self._send_to_target(
|
|
|
+ target_url, target_label, payload,
|
|
|
+ device_code, camera_name, trigger_alert, abnormal_score
|
|
|
+ )
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"推送通知异常: {e}")
|
|
|
+
|
|
|
+ def _send_to_target(self, target_url, target_label, payload,
|
|
|
+ device_code, camera_name, trigger_alert, abnormal_score):
|
|
|
+ # 向单个推送目标发送数据(含重试逻辑)
|
|
|
+ # 各目标独立调用,某个目标失败不影响其他目标
|
|
|
+ import time as time_module
|
|
|
+
|
|
|
+ push_success = False
|
|
|
+ for attempt in range(self.push_retry_count + 1):
|
|
|
+ try:
|
|
|
+ response = requests.post(
|
|
|
+ target_url,
|
|
|
+ json=payload,
|
|
|
+ timeout=self.push_timeout,
|
|
|
+ headers={"Content-Type": "application/json"}
|
|
|
+ )
|
|
|
+
|
|
|
+ if response.status_code == 200:
|
|
|
+ alert_tag = "报警" if trigger_alert else "心跳"
|
|
|
+ logger.info(
|
|
|
+ f" [{alert_tag}][{target_label}] {device_code}({camera_name}) | "
|
|
|
+ f"误差={abnormal_score:.6f}"
|
|
|
+ )
|
|
|
+ push_success = True
|
|
|
+ break
|
|
|
+ else:
|
|
|
+ logger.warning(
|
|
|
+ f"推送失败[{target_label}]: {device_code} | URL={target_url} | "
|
|
|
+ f"状态码={response.status_code} | 内容={response.text[:100]}"
|
|
|
+ )
|
|
|
+
|
|
|
+ except requests.exceptions.Timeout:
|
|
|
+ logger.warning(f"推送超时[{target_label}]: {device_code} | URL={target_url} | 尝试 {attempt + 1}/{self.push_retry_count + 1}")
|
|
|
+ except requests.exceptions.RequestException as e:
|
|
|
+ logger.warning(f"推送异常[{target_label}]: {device_code} | URL={target_url} | {e}")
|
|
|
+
|
|
|
+ # 重试间隔
|
|
|
+ if attempt < self.push_retry_count:
|
|
|
+ time_module.sleep(1)
|
|
|
+
|
|
|
+ if not push_success:
|
|
|
+ logger.error(f"推送失败[{target_label}]: {device_code} | URL={target_url} | 已达最大重试次数")
|
|
|
+
|
|
|
+ def _move_audio_to_date_dir(self, wav_file):
|
|
|
+ """
|
|
|
+ 将音频移动到日期目录归档
|
|
|
+
|
|
|
+ 目录结构:
|
|
|
+ deploy_pickup/data/{device_code}/{日期}/{文件名}
|
|
|
+
|
|
|
+ 参数:
|
|
|
+ wav_file: 音频文件路径
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ import shutil
|
|
|
+
|
|
|
+ # 从文件名提取device_code和日期
|
|
|
+ # 格式: 92_1#-1_20251218142000.wav
|
|
|
+ match = re.match(r'\d+_(.+)_(\d{8})\d{6}\.wav', wav_file.name)
|
|
|
+ if not match:
|
|
|
+ logger.warning(f"无法从文件名提取信息: {wav_file.name}")
|
|
|
+ return
|
|
|
+
|
|
|
+ device_code = match.group(1) # 如 1#-1
|
|
|
+ date_str = match.group(2) # YYYYMMDD
|
|
|
+
|
|
|
+ # 构建目标目录: data/{device_code}/{日期}/
|
|
|
+ date_dir = self.audio_dir / device_code / date_str
|
|
|
+ date_dir.mkdir(parents=True, exist_ok=True)
|
|
|
+
|
|
|
+ # 移动文件
|
|
|
+ dest_file = date_dir / wav_file.name
|
|
|
+ shutil.move(str(wav_file), str(dest_file))
|
|
|
+ logger.debug(f"音频已归档: {device_code}/{date_str}/{wav_file.name}")
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"移动音频失败: {wav_file.name} | 错误: {e}")
|
|
|
+
|
|
|
+ def _move_audio_to_transition_dir(self, wav_file, reason):
|
|
|
+ """
|
|
|
+ 将泵停机/过渡期的音频移动到过渡期目录
|
|
|
+
|
|
|
+ 目录结构:
|
|
|
+ deploy_pickup/data/{device_code}/pump_transition/{文件名}
|
|
|
+
|
|
|
+ 这些音频不会被用于模型训练,但保留用于分析调试
|
|
|
+
|
|
|
+ 参数:
|
|
|
+ wav_file: 音频文件路径
|
|
|
+ reason: 原因标识(stopped=停机, transition=过渡期)
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ import shutil
|
|
|
+
|
|
|
+ # 从文件名提取device_code
|
|
|
+ # 格式: 92_1#-1_20251218142000.wav
|
|
|
+ match = re.match(r'\d+_(.+)_\d{14}\.wav', wav_file.name)
|
|
|
+ if not match:
|
|
|
+ logger.warning(f"无法从文件名提取信息: {wav_file.name}")
|
|
|
+ return
|
|
|
+
|
|
|
+ device_code = match.group(1) # 如 1#-1
|
|
|
+
|
|
|
+ # 构建目标目录: data/{device_code}/pump_transition/
|
|
|
+ transition_dir = self.audio_dir / device_code / "pump_transition"
|
|
|
+ transition_dir.mkdir(parents=True, exist_ok=True)
|
|
|
+
|
|
|
+ # 移动文件
|
|
|
+ dest_file = transition_dir / wav_file.name
|
|
|
+ shutil.move(str(wav_file), str(dest_file))
|
|
|
+ logger.debug(f"过渡期音频已归档: {device_code}/pump_transition/{wav_file.name} ({reason})")
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"移动过渡期音频失败: {wav_file.name} | 错误: {e}")
|
|
|
+
|
|
|
+ def _move_audio_to_anomaly_pending(self, wav_file):
|
|
|
+ """
|
|
|
+ 将异常音频移动到待排查目录
|
|
|
+
|
|
|
+ 目录结构:
|
|
|
+ deploy_pickup/data/{device_code}/anomaly_pending/{文件名}
|
|
|
+
|
|
|
+ 用户确认是误报后,可手动将文件移到日期目录参与增训
|
|
|
+
|
|
|
+ 参数:
|
|
|
+ wav_file: 音频文件路径
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ import shutil
|
|
|
+
|
|
|
+ # 从文件名提取device_code
|
|
|
+ # 格式: 92_1#-1_20251218142000.wav
|
|
|
+ match = re.match(r'\d+_(.+)_\d{14}\.wav', wav_file.name)
|
|
|
+ if not match:
|
|
|
+ logger.warning(f"无法从文件名提取信息: {wav_file.name}")
|
|
|
+ return
|
|
|
+
|
|
|
+ device_code = match.group(1) # 如 1#-1
|
|
|
+
|
|
|
+ # 构建目标目录: data/{device_code}/anomaly_pending/
|
|
|
+ pending_dir = self.audio_dir / device_code / "anomaly_pending"
|
|
|
+ pending_dir.mkdir(parents=True, exist_ok=True)
|
|
|
+
|
|
|
+ # 移动文件
|
|
|
+ dest_file = pending_dir / wav_file.name
|
|
|
+ shutil.move(str(wav_file), str(dest_file))
|
|
|
+ logger.debug(f"异常音频已隔离: {device_code}/anomaly_pending/{wav_file.name}")
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"移动异常音频失败: {wav_file.name} | 错误: {e}")
|
|
|
+
|
|
|
+ def _cleanup_old_files(self, days: int = 7):
|
|
|
+ """
|
|
|
+ 清理超过指定天数的正常音频文件
|
|
|
+
|
|
|
+ 清理规则:
|
|
|
+ - 只清理日期归档目录(data/{device_code}/{日期}/)
|
|
|
+ - 保留current目录和anomaly_detected目录
|
|
|
+ - 超过days天的文件删除
|
|
|
+
|
|
|
+ 参数:
|
|
|
+ days: 保留天数,默认7天
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ import shutil
|
|
|
+ from datetime import datetime, timedelta
|
|
|
+
|
|
|
+ # 计算截止日期
|
|
|
+ cutoff_date = datetime.now() - timedelta(days=days)
|
|
|
+ cutoff_str = cutoff_date.strftime("%Y%m%d")
|
|
|
+
|
|
|
+ deleted_count = 0
|
|
|
+
|
|
|
+ # 遍历每个设备目录
|
|
|
+ for device_dir in self.audio_dir.iterdir():
|
|
|
+ if not device_dir.is_dir():
|
|
|
+ continue
|
|
|
+
|
|
|
+ # 检查是否是日期目录(跳过current和其他特殊目录)
|
|
|
+ for subdir in device_dir.iterdir():
|
|
|
+ if not subdir.is_dir():
|
|
|
+ continue
|
|
|
+
|
|
|
+ # 跳过current目录
|
|
|
+ if subdir.name == "current":
|
|
|
+ continue
|
|
|
+
|
|
|
+ # 检查是否为日期目录(YYYYMMDD格式)
|
|
|
+ if not re.match(r'^\d{8}$', subdir.name):
|
|
|
+ continue
|
|
|
+
|
|
|
+ # 如果日期早于截止日期,删除整个目录
|
|
|
+ if subdir.name < cutoff_str:
|
|
|
+ try:
|
|
|
+ shutil.rmtree(subdir)
|
|
|
+ deleted_count += 1
|
|
|
+ logger.debug(f"清理过期目录: {device_dir.name}/{subdir.name}")
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"删除目录失败: {subdir} | {e}")
|
|
|
+
|
|
|
+ if deleted_count > 0:
|
|
|
+ logger.info(f"清理完成: 共删除{deleted_count}个过期目录")
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"清理过期文件失败: {e}")
|
|
|
+
|
|
|
+
|
|
|
+class PickupMonitoringSystem:
|
|
|
+ """
|
|
|
+ 拾音器监控系统
|
|
|
+
|
|
|
+ 管理FFmpeg进程和监控线程
|
|
|
+ """
|
|
|
+
|
|
|
+ def __init__(self, db_path=None):
|
|
|
+ """
|
|
|
+ 初始化监控系统
|
|
|
+
|
|
|
+ 参数:
|
|
|
+ db_path: SQLite 数据库路径(为 None 时使用默认路径 config/pickup_config.db)
|
|
|
+ """
|
|
|
+ self.db_path = db_path
|
|
|
+
|
|
|
+ # 初始化 ConfigManager
|
|
|
+ actual_db = Path(db_path) if db_path else get_db_path()
|
|
|
+ if not actual_db.exists():
|
|
|
+ raise FileNotFoundError(
|
|
|
+ f"\u914d\u7f6e\u6570\u636e\u5e93\u4e0d\u5b58\u5728: {actual_db}\n"
|
|
|
+ f"\u8bf7\u5148\u8fd0\u884c\u8fc1\u79fb\u811a\u672c: python tool/migrate_yaml_to_db.py"
|
|
|
+ )
|
|
|
+ self.config_manager = ConfigManager(str(actual_db))
|
|
|
+ print(f"\u914d\u7f6e\u6e90: SQLite ({actual_db})")
|
|
|
+
|
|
|
+ self.config = self._load_config()
|
|
|
+
|
|
|
+ # 冷启动模式标记
|
|
|
+ self.cold_start_mode = False
|
|
|
+
|
|
|
+ # 初始化多模型预测器(支持每个设备独立模型)
|
|
|
+ self.multi_predictor = MultiModelPredictor()
|
|
|
+ self.predictor = None # 兼容性保留,已废弃
|
|
|
+
|
|
|
+ # 从配置中注册所有设备的模型目录映射
|
|
|
+ print("\n正在初始化多模型预测器...")
|
|
|
+ for plant in self.config.get('plants', []):
|
|
|
+ if not plant.get('enabled', False):
|
|
|
+ continue
|
|
|
+ for stream in plant.get('rtsp_streams', []):
|
|
|
+ device_code = stream.get('device_code', '')
|
|
|
+ model_subdir = stream.get('model_subdir', device_code)
|
|
|
+ if device_code and model_subdir:
|
|
|
+ self.multi_predictor.register_device(device_code, model_subdir)
|
|
|
+ print(f" 注册设备: {device_code} -> models/{model_subdir}/")
|
|
|
+
|
|
|
+ print(f"已注册 {len(self.multi_predictor.registered_devices)} 个设备模型映射")
|
|
|
+
|
|
|
+ # 进程和监控器列表
|
|
|
+ self.ffmpeg_processes = []
|
|
|
+ self.monitors = []
|
|
|
+
|
|
|
+ # 信号处理
|
|
|
+ signal.signal(signal.SIGINT, self._signal_handler)
|
|
|
+ signal.signal(signal.SIGTERM, self._signal_handler)
|
|
|
+
|
|
|
+ def _load_config(self):
|
|
|
+ """
|
|
|
+ 从 SQLite DB 加载配置
|
|
|
+
|
|
|
+ 返回:
|
|
|
+ Dict: 配置字典
|
|
|
+ """
|
|
|
+ config = self.config_manager.get_full_config()
|
|
|
+ logger.info("配置已从 SQLite 加载")
|
|
|
+ return config
|
|
|
+
|
|
|
+ def _parse_rtsp_streams(self):
|
|
|
+ """
|
|
|
+ 解析配置文件中的RTSP流信息
|
|
|
+
|
|
|
+ 返回:
|
|
|
+ List[RTSPStreamConfig]: RTSP流配置列表
|
|
|
+ """
|
|
|
+ streams = []
|
|
|
+
|
|
|
+ plants = self.config.get('plants', [])
|
|
|
+ if not plants:
|
|
|
+ raise ValueError("配置文件中未找到水厂配置")
|
|
|
+
|
|
|
+ for plant in plants:
|
|
|
+ plant_name = plant.get('name')
|
|
|
+ if not plant_name:
|
|
|
+ print("警告: 跳过未命名的区域配置")
|
|
|
+ continue
|
|
|
+
|
|
|
+ # 检查是否启用该水厂(默认启用以兼容旧配置)
|
|
|
+ if not plant.get('enabled', True):
|
|
|
+ logger.debug(f"跳过禁用的水厂: {plant_name}")
|
|
|
+ continue
|
|
|
+
|
|
|
+ # 获取流量PLC配置
|
|
|
+ flow_plc = plant.get('flow_plc', {})
|
|
|
+
|
|
|
+ # 获取该水厂的project_id(每个plant有自己的project_id)
|
|
|
+ project_id = plant.get('project_id', 92)
|
|
|
+ logger.info(f"加载区域配置: {plant_name} | project_id={project_id}")
|
|
|
+
|
|
|
+ rtsp_streams = plant.get('rtsp_streams', [])
|
|
|
+ for stream in rtsp_streams:
|
|
|
+ url = stream.get('url')
|
|
|
+ channel = stream.get('channel')
|
|
|
+ camera_name = stream.get('name', '')
|
|
|
+ device_code = stream.get('device_code', '')
|
|
|
+ pump_name = stream.get('pump_name', '')
|
|
|
+
|
|
|
+ if not url or channel is None:
|
|
|
+ print(f"警告: 跳过不完整的RTSP流配置 (区域: {plant_name})")
|
|
|
+ continue
|
|
|
+
|
|
|
+ streams.append(RTSPStreamConfig(
|
|
|
+ plant_name=plant_name,
|
|
|
+ rtsp_url=url,
|
|
|
+ channel=channel,
|
|
|
+ camera_name=camera_name,
|
|
|
+ device_code=device_code,
|
|
|
+ pump_name=pump_name,
|
|
|
+ flow_plc=flow_plc,
|
|
|
+ project_id=project_id
|
|
|
+ ))
|
|
|
+
|
|
|
+ return streams
|
|
|
+
|
|
|
+ def start(self):
|
|
|
+ """
|
|
|
+ 启动监控系统
|
|
|
+ """
|
|
|
+ print("=" * 70)
|
|
|
+ print("拾音器异响检测系统")
|
|
|
+ print("=" * 70)
|
|
|
+
|
|
|
+ # 解析流配置
|
|
|
+ streams = self._parse_rtsp_streams()
|
|
|
+ print(f"\n共配置 {len(streams)} 个拾音设备:")
|
|
|
+ for stream in streams:
|
|
|
+ print(f" - {stream.device_code} | {stream.camera_name}")
|
|
|
+
|
|
|
+ # 启动FFmpeg进程
|
|
|
+ print("\n启动FFmpeg进程...")
|
|
|
+ for stream in streams:
|
|
|
+ ffmpeg = FFmpegProcess(stream, CFG.AUDIO_DIR, self.config)
|
|
|
+ if ffmpeg.start():
|
|
|
+ self.ffmpeg_processes.append(ffmpeg)
|
|
|
+ else:
|
|
|
+ print(f"警告: FFmpeg启动失败,跳过该流: {stream}")
|
|
|
+
|
|
|
+ if not self.ffmpeg_processes:
|
|
|
+ print("\n错误: 所有FFmpeg进程均启动失败")
|
|
|
+ sys.exit(1)
|
|
|
+
|
|
|
+ print(f"\n成功启动 {len(self.ffmpeg_processes)}/{len(streams)} 个FFmpeg进程")
|
|
|
+
|
|
|
+ # 收集所有流配置(用于PickupMonitor)
|
|
|
+ all_stream_configs = [p.stream_config for p in self.ffmpeg_processes]
|
|
|
+
|
|
|
+ # 启动监控线程(统一一个监控器)
|
|
|
+ print("\n启动监控线程...")
|
|
|
+ check_interval = self.config.get('prediction', {}).get('check_interval', 1.0)
|
|
|
+
|
|
|
+ monitor = PickupMonitor(
|
|
|
+ audio_dir=CFG.AUDIO_DIR,
|
|
|
+ multi_predictor=self.multi_predictor,
|
|
|
+ stream_configs=all_stream_configs,
|
|
|
+ check_interval=check_interval,
|
|
|
+ config=self.config,
|
|
|
+ config_manager=self.config_manager
|
|
|
+ )
|
|
|
+ monitor.start()
|
|
|
+ self.monitors.append(monitor)
|
|
|
+
|
|
|
+ print(f"\n成功启动监控线程")
|
|
|
+ print("\n" + "=" * 70)
|
|
|
+ print("系统已启动,开始监控...")
|
|
|
+ print("按 Ctrl+C 停止系统")
|
|
|
+ print("=" * 70 + "\n")
|
|
|
+
|
|
|
+ # FFmpeg重启配置
|
|
|
+ max_restart_attempts = 5 # 单个进程最大重启次数
|
|
|
+ restart_interval_base = 30 # 基础重启间隔(秒)-> 改为30s
|
|
|
+ restart_counts = {id(p): 0 for p in self.ffmpeg_processes} # 重启计数
|
|
|
+
|
|
|
+ # 主循环(带自动重启)
|
|
|
+ try:
|
|
|
+ while True:
|
|
|
+ # 检查每个FFmpeg进程状态
|
|
|
+ for ffmpeg in self.ffmpeg_processes:
|
|
|
+ if not ffmpeg.is_running():
|
|
|
+ pid = id(ffmpeg)
|
|
|
+ device_code = ffmpeg.stream_config.device_code
|
|
|
+
|
|
|
+ # 检查重启次数
|
|
|
+ if restart_counts.get(pid, 0) < max_restart_attempts:
|
|
|
+ # 计算等待时间(指数退避)
|
|
|
+ wait_time = restart_interval_base * (2 ** restart_counts.get(pid, 0))
|
|
|
+ logger.warning(f"FFmpeg进程停止: {device_code} | 将在{wait_time}秒后重启 | "
|
|
|
+ f"重试次数: {restart_counts.get(pid, 0) + 1}/{max_restart_attempts}")
|
|
|
+
|
|
|
+ time.sleep(wait_time)
|
|
|
+
|
|
|
+ # 尝试重启
|
|
|
+ if ffmpeg.start():
|
|
|
+ logger.debug(f"FFmpeg重启成功: {device_code}")
|
|
|
+ restart_counts[pid] = 0 # 重置计数
|
|
|
+ else:
|
|
|
+ restart_counts[pid] = restart_counts.get(pid, 0) + 1
|
|
|
+ logger.error(f"FFmpeg重启失败: {device_code}")
|
|
|
+ else:
|
|
|
+ logger.error(f"FFmpeg达到最大重启次数: {device_code} | 已放弃重启")
|
|
|
+
|
|
|
+ # 检查是否所有进程都已放弃
|
|
|
+ running_count = sum(1 for p in self.ffmpeg_processes if p.is_running())
|
|
|
+ all_abandoned = all(restart_counts.get(id(p), 0) >= max_restart_attempts
|
|
|
+ for p in self.ffmpeg_processes if not p.is_running())
|
|
|
+
|
|
|
+ if running_count == 0 and all_abandoned:
|
|
|
+ print("\n错误: 所有FFmpeg进程均已停止且无法重启")
|
|
|
+ break
|
|
|
+
|
|
|
+ # 每天0点执行7天清理
|
|
|
+ current_hour = datetime.now().hour
|
|
|
+ current_date = datetime.now().strftime("%Y%m%d")
|
|
|
+
|
|
|
+ if not hasattr(self, '_last_cleanup_date'):
|
|
|
+ self._last_cleanup_date = ""
|
|
|
+
|
|
|
+ # 在0点且今天还没清理过时执行
|
|
|
+ if current_hour == 0 and self._last_cleanup_date != current_date:
|
|
|
+ logger.info("执行7天过期文件清理...")
|
|
|
+ for monitor in self.monitors:
|
|
|
+ monitor._cleanup_old_files(days=7)
|
|
|
+ self._last_cleanup_date = current_date
|
|
|
+
|
|
|
+ # 定期打印RTSP状态(每分钟一次)
|
|
|
+ if not hasattr(self, '_last_status_log'):
|
|
|
+ self._last_status_log = datetime.now()
|
|
|
+
|
|
|
+ if (datetime.now() - self._last_status_log).total_seconds() >= 60:
|
|
|
+ running = sum(1 for p in self.ffmpeg_processes if p.is_running())
|
|
|
+ total = len(self.ffmpeg_processes)
|
|
|
+ logger.info(f"RTSP状态: {running}/{total} 个FFmpeg进程运行中")
|
|
|
+ logger.info("─" * 60)
|
|
|
+ self._last_status_log = datetime.now()
|
|
|
+
|
|
|
+ time.sleep(10)
|
|
|
+ except KeyboardInterrupt:
|
|
|
+ print("\n\n收到停止信号,正在关闭系统...")
|
|
|
+ finally:
|
|
|
+ self.stop()
|
|
|
+
|
|
|
+ def stop(self):
|
|
|
+ """
|
|
|
+ 停止监控系统
|
|
|
+ """
|
|
|
+ print("\n正在停止监控系统...")
|
|
|
+
|
|
|
+ print("停止监控线程...")
|
|
|
+ for monitor in self.monitors:
|
|
|
+ monitor.stop()
|
|
|
+
|
|
|
+ print("停止FFmpeg进程...")
|
|
|
+ for ffmpeg in self.ffmpeg_processes:
|
|
|
+ ffmpeg.stop()
|
|
|
+
|
|
|
+ print("系统已完全停止")
|
|
|
+
|
|
|
+ def _signal_handler(self, signum, frame):
|
|
|
+ """
|
|
|
+ 信号处理函数
|
|
|
+ """
|
|
|
+ print(f"\n\n收到信号 {signum},正在关闭系统...")
|
|
|
+ self.stop()
|
|
|
+ sys.exit(0)
|
|
|
+
|
|
|
+
|
|
|
+def _start_config_api_server(config_manager, multi_predictor=None, port=18080):
|
|
|
+ # 在后台线程中启动 FastAPI 配置管理 API
|
|
|
+ try:
|
|
|
+ import uvicorn
|
|
|
+ init_config_api(config_manager, multi_predictor)
|
|
|
+ logger.info(f"启动配置管理 API: http://0.0.0.0:{port}")
|
|
|
+ uvicorn.run(config_app, host="0.0.0.0", port=port, log_level="warning")
|
|
|
+ except ImportError:
|
|
|
+ logger.warning("uvicorn 未安装,配置管理 API 无法启动。请安装: pip install uvicorn")
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"配置管理 API 启动失败: {e}")
|
|
|
+
|
|
|
+
|
|
|
+def main():
|
|
|
+ """
|
|
|
+ 主函数
|
|
|
+ """
|
|
|
+ # 检测 DB 是否存在
|
|
|
+ db_path = get_db_path(Path(__file__).parent / "config")
|
|
|
+ if not db_path.exists():
|
|
|
+ print(f"错误: 配置数据库不存在: {db_path}")
|
|
|
+ print(f"\n请先运行迁移脚本: python tool/migrate_yaml_to_db.py")
|
|
|
+ sys.exit(1)
|
|
|
+
|
|
|
+ try:
|
|
|
+ system = PickupMonitoringSystem()
|
|
|
+
|
|
|
+ # 后台线程启动配置管理 API
|
|
|
+ api_port = 18080
|
|
|
+ api_thread = threading.Thread(
|
|
|
+ target=_start_config_api_server,
|
|
|
+ args=(system.config_manager, system.multi_predictor, api_port),
|
|
|
+ daemon=True,
|
|
|
+ name="config-api"
|
|
|
+ )
|
|
|
+ api_thread.start()
|
|
|
+ system.start()
|
|
|
+ except FileNotFoundError as e:
|
|
|
+ print(f"\n错误: {e}")
|
|
|
+ print("\n请确保:")
|
|
|
+ print("1. 已完成训练并计算阈值")
|
|
|
+ print("2. 已复制必要的模型文件到 models/ 目录")
|
|
|
+ sys.exit(1)
|
|
|
+ except Exception as e:
|
|
|
+ print(f"\n严重错误: {e}")
|
|
|
+ import traceback
|
|
|
+ traceback.print_exc()
|
|
|
+ sys.exit(1)
|
|
|
+
|
|
|
+
|
|
|
+if __name__ == "__main__":
|
|
|
+ main()
|