| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
77717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762
277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- """
- run_pickup_monitor.py
- ---------------------
- 拾音器异响检测主程序
- 功能说明:
- 该程序用于音频采集设备(拾音器)的实时异常检测。
- 与摄像头版本不同,本版本:
- 1. 没有视频/图片采集和上报
- 2. 上报时包含音频频谱图分析数据
- 3. 支持进水流量PLC数据读取
- 4. 每1分钟计算一次平均重建误差并上报
- 上报数据结构:
- {
- "message": {
- "channelInfo": {"name": "设备信息"},
- "requestTime": 时间戳,
- "classification": {
- "level_one": 2, // 音频检测大类
- "level_two": 6 // 异常类型小类: 6=未分类, 7=轴承, 8=气蚀, 9=松动/共振, 10=叶轮, 11=阀件
- },
- "skillInfo": {"name": "异响检测"},
- "sound_detection": {
- "abnormalwav": "", // base64编码的音频
- "status": 0/1, // 0=异常, 1=正常
- "condition": {
- "running_status": "运行中/停机中", // 启停状态
- "inlet_flow": 0.0 // 进水流量
- },
- "score": {
- "abnormal_score": 0.0, // 当前1分钟内8s平均重构误差
- "score_threshold": 0.0 // 该设备的异常阀值
- },
- "frequency": {
- "this_frequency": [], // 当前1分钟的频谱图
- "normal_frequency_middle": [], // 过去10分钟的频谱图平均
- "normal_frequency_upper": [], // 上限(暂为空)
- "normal_frequency_lower": [] // 下限(暂为空)
- }
- }
- }
- }
- 使用方法:
- python run_pickup_monitor.py
- 配置文件:
- config/rtsp_config.yaml
- """
- import subprocess
- import time
- import re
- import signal
- import sys
- import threading
- import logging
- import base64
- from concurrent.futures import ThreadPoolExecutor
- from pathlib import Path
- from datetime import datetime, timedelta
- from collections import defaultdict
- # 用于计算FFT
- import numpy as np
- try:
- import librosa
- except ImportError:
- print("错误:缺少librosa库,请安装: pip install librosa")
- sys.exit(1)
- try:
- import requests
- except ImportError:
- print("错误:缺少requests库,请安装: pip install requests")
- sys.exit(1)
- # 导入预测器模块
- from predictor import MultiModelPredictor, CFG
- # 导入配置管理模块(SQLite)
- from config.config_manager import ConfigManager
- from config.config_api import app as config_app, init_config_api
- from config.db_models import get_db_path
- # 导入泵状态监控模块(用于检测启停过渡期)
- try:
- from core.pump_state_monitor import PumpStateMonitor
- PUMP_STATE_MONITOR_AVAILABLE = True
- except ImportError:
- PUMP_STATE_MONITOR_AVAILABLE = False
- # 导入人体检测读取模块(用于抑制有人时的误报)
- try:
- from core.human_detection_reader import HumanDetectionReader
- HUMAN_DETECTION_AVAILABLE = True
- except ImportError:
- HUMAN_DETECTION_AVAILABLE = False
- # 导入报警聚合器(跨设备聚合抑制 + 分类型冷却)
- try:
- from core.alert_aggregator import AlertAggregator
- ALERT_AGGREGATOR_AVAILABLE = True
- except ImportError:
- ALERT_AGGREGATOR_AVAILABLE = False
- # ========================================
- # 配置日志系统
- # ========================================
def setup_logging():
    # Configure the logging system (RotatingFileHandler -> system.log).
    from logging.handlers import RotatingFileHandler

    # Reuse the existing configuration when a caller higher up the stack
    # has already configured the root logger.
    if logging.getLogger().handlers:
        return logging.getLogger('PickupMonitor')

    # Log destination: <this dir>/logs/system.log
    log_dir = Path(__file__).parent / "logs"
    log_dir.mkdir(parents=True, exist_ok=True)

    fmt = logging.Formatter(
        '%(asctime)s | %(levelname)-8s | %(name)-20s | %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )

    # Rotate by file size; keep at most 2 backups (30MB total).
    rotating = RotatingFileHandler(
        log_dir / "system.log",
        maxBytes=10 * 1024 * 1024,
        backupCount=2,
        encoding='utf-8',
    )
    rotating.setFormatter(fmt)

    # Console output (visible when running in the foreground; when running
    # in the background stdout is discarded, which is harmless).
    console = logging.StreamHandler(sys.stdout)
    console.setFormatter(fmt)

    logging.basicConfig(level=logging.INFO, handlers=[rotating, console])
    return logging.getLogger('PickupMonitor')
- # 初始化日志系统
- logger = setup_logging()
- # 导入能量基线模块
- try:
- from core.energy_baseline import EnergyBaseline
- ENERGY_BASELINE_AVAILABLE = True
- except ImportError:
- ENERGY_BASELINE_AVAILABLE = False
- logger.warning("能量基线模块未找到,泵状态检测功能禁用")
class RTSPStreamConfig:
    """Configuration for a single RTSP stream (audio-pickup variant).

    Bundles the per-stream settings, including the fields specific to
    pickup devices: the pump name and the inlet-flow PLC mapping.
    """

    def __init__(self, plant_name, rtsp_url, channel,
                 camera_name, device_code, pump_name,
                 flow_plc, project_id):
        """Create a stream configuration.

        Args:
            plant_name: area name (e.g. pump house / RO high-pressure pumps)
            rtsp_url: RTSP stream URL
            channel: channel number
            camera_name: device name (falls back to "ch{channel}")
            device_code: device code (e.g. "1#-1")
            pump_name: pump name (A/B/C/D), used to match the flow PLC
            flow_plc: mapping of pump name -> flow PLC address
            project_id: platform project id
        """
        self.plant_name = plant_name
        self.rtsp_url = rtsp_url
        self.channel = channel
        self.pump_id = f"ch{channel}"
        # Fall back to the channel-derived id when no name was supplied.
        self.camera_name = camera_name if camera_name else f"ch{channel}"
        self.device_code = device_code
        self.pump_name = pump_name
        self.flow_plc = flow_plc if flow_plc else {}
        self.project_id = project_id

    def get_flow_plc_address(self):
        """Return the inlet-flow PLC address for this device ("" if unknown)."""
        if not (self.pump_name and self.flow_plc):
            return ""
        return self.flow_plc.get(self.pump_name, "")

    def __repr__(self):
        return f"RTSPStreamConfig(plant='{self.plant_name}', camera='{self.camera_name}', pump='{self.pump_id}')"
class FFmpegProcess:
    """Manage the FFmpeg process for a single RTSP stream.

    Starts and supervises one FFmpeg process that converts the RTSP stream
    into fixed-duration WAV audio files.

    Output file name format: {project_id}_{device_code}_{timestamp}.wav
    """

    def __init__(self, stream_config, output_dir, config=None):
        """
        Initialize the FFmpeg process manager.

        Args:
            stream_config: RTSP stream configuration
            output_dir: root directory for audio output
            config: global configuration dict
        """
        self.config_dict = config or {}
        self.stream_config = stream_config
        self.output_dir = output_dir
        self.process = None

        # Segment length in seconds, default 8s per WAV file.
        audio_cfg = self.config_dict.get('audio', {})
        self.file_duration = audio_cfg.get('file_duration', 8)

        # project_id comes from the stream configuration.
        self.project_id = stream_config.project_id

    def start(self):
        """
        Start the FFmpeg process.

        Returns:
            bool: True on success, False on failure.
        """
        # Device code (fall back to the channel-derived pump id).
        device_code = self.stream_config.device_code or self.stream_config.pump_id

        # Per-device output layout: data/{device_code}/current/
        current_dir = self.output_dir / device_code / "current"
        current_dir.mkdir(parents=True, exist_ok=True)
        # Anomaly audio that manual review confirmed as normal; fully
        # included when the model is retrained.
        verified_dir = self.output_dir / device_code / "verified_normal"
        verified_dir.mkdir(parents=True, exist_ok=True)

        # Output file template: {project_id}_{device_code}_{timestamp}.wav
        # e.g. 92_1#-1_20251218142000.wav
        output_pattern = str(current_dir / f"{self.project_id}_{device_code}_%Y%m%d%H%M%S.wav")

        # Build the FFmpeg command. The RTSP input options bound memory
        # usage so the receive buffer cannot grow without limit and OOM.
        cmd = [
            "ffmpeg",
            # RTSP input options (memory bounds)
            "-rtsp_transport", "tcp",           # use TCP transport
            "-probesize", "1000000",            # cap probe size at 1MB (default 5MB)
            "-analyzeduration", "1000000",      # cap analysis at 1s (default 5s)
            "-max_delay", "500000",             # max delay 500ms
            "-fflags", "nobuffer",              # disable input buffering
            "-flags", "low_delay",              # low-latency mode
            "-i", self.stream_config.rtsp_url,  # RTSP input stream
            # Audio output options
            "-vn",                              # drop video
            "-ac", "1",                         # mono
            "-ar", str(CFG.SR),                 # sample rate (16000Hz)
            "-f", "segment",                    # segmented output
            "-segment_time", str(int(self.file_duration)),  # segment length
            "-strftime", "1",                   # enable timestamp templating
            "-loglevel", "error",               # errors only
            "-y",                               # overwrite existing files
            output_pattern,
        ]

        try:
            # BUGFIX: stdout/stderr were previously subprocess.PIPE but the
            # pipes were never read; once the OS pipe buffer filled with
            # FFmpeg's error output the child would block forever (silent
            # capture stall). Discard the output instead.
            self.process = subprocess.Popen(
                cmd,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL
            )

            logger.info(f"FFmpeg已启动: {device_code} | {self.stream_config.camera_name} | "
                        f"文件时长: {self.file_duration}秒 | PID={self.process.pid}")
            return True

        except FileNotFoundError:
            logger.error("FFmpeg错误: 未找到ffmpeg命令,请确保已安装FFmpeg")
            return False
        except Exception as e:
            logger.error(f"FFmpeg启动失败: {device_code} | 错误: {e}")
            return False

    def is_running(self):
        """
        Check whether the FFmpeg process is alive.

        Returns:
            bool: True while the process is running, False otherwise.
        """
        if self.process is None:
            return False
        return self.process.poll() is None

    def stop(self):
        """
        Stop the FFmpeg process (terminate, then kill after a 5s grace period).
        """
        if self.process is not None and self.is_running():
            logger.info(f"FFmpeg停止: {self.stream_config.plant_name} | {self.stream_config.camera_name} | PID={self.process.pid}")
            self.process.terminate()
            try:
                self.process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                logger.warning(f"FFmpeg强制终止: PID={self.process.pid}")
                self.process.kill()
- class PickupMonitor:
- """
- 拾音器监控线程类
-
- 监控音频目录,调用预测器检测异常,推送告警。
-
- 主要功能:
- 1. 不截取视频帧(纯音频)
- 2. 计算并上报频谱图数据(this_frequency + normal_frequency_middle)
- 3. 每分钟汇总上报一次
- 4. 使用SCADA API获取进水流量
- """
-
    def __init__(self, audio_dir, multi_predictor,
                 stream_configs,
                 check_interval=1.0, config=None,
                 config_manager=None):
        """
        Initialize the monitor.

        Args:
            audio_dir: audio root directory (per-device folders live below it)
            multi_predictor: multi-model predictor instance
            stream_configs: all RTSP stream configurations
            check_interval: polling interval (seconds)
            config: configuration dict
            config_manager: ConfigManager instance (used for hot reload;
                None disables hot reload)
        """
        # Audio root directory (one sub-directory per device)
        self.audio_dir = audio_dir
        self.multi_predictor = multi_predictor
        self.predictor = None  # kept for backward compatibility, deprecated
        self.stream_configs = stream_configs
        self.check_interval = check_interval
        self.config = config or {}

        # Hot reload: hold a ConfigManager reference and periodically
        # refresh the configuration from the DB
        self._config_manager = config_manager
        self._last_config_reload = 0  # timestamp of the last config refresh
        self._config_reload_interval = 30  # refresh interval (seconds)

        # project_id
        self.project_id = self.config.get('platform', {}).get('project_id', 92)

        # Build the device_code -> stream_config mapping
        self.device_map = {}
        for cfg in stream_configs:
            if cfg.device_code:
                self.device_map[cfg.device_code] = cfg

        # Set of already-processed files
        self.seen_files = set()

        # Per-device detection cache (for the 1-minute aggregation)
        # key: device_code (e.g. "1#-1")
        self.device_cache = defaultdict(lambda: {
            "errors": [],  # reconstruction errors, one per 8s clip
            "last_upload": None,  # time of the last upload
            "audio_data": [],  # audio used to compute the spectrogram
            "status": None  # most recent running status
        })

        # Spectrogram history cache (for normal_frequency_middle)
        # key: device_code, value: list of (timestamp, freq_db)
        freq_cfg = self.config.get('prediction', {}).get('frequency_history', {})
        self.freq_history_enabled = freq_cfg.get('enabled', True)
        self.freq_history_minutes = freq_cfg.get('history_minutes', 10)
        self.freq_history = defaultdict(list)

        # Last reported status (True=abnormal, False=normal, None=initial);
        # used to deduplicate on state change and avoid continuous alarms
        self.last_report_status = {}

        # Upload period (seconds)
        audio_cfg = self.config.get('audio', {})
        self.segment_duration = audio_cfg.get('segment_duration', 60)

        # Anomaly-audio saving configuration
        save_cfg = self.config.get('prediction', {}).get('save_anomaly_audio', {})
        self.save_anomaly_enabled = save_cfg.get('enabled', True)
        self.save_anomaly_dir = Path(__file__).parent / save_cfg.get('save_dir', 'data/anomaly_detected')
        if self.save_anomaly_enabled:
            self.save_anomaly_dir.mkdir(parents=True, exist_ok=True)

        # Push-notification configuration
        push_cfg = self.config.get('push_notification', {})
        self.push_enabled = push_cfg.get('enabled', False)
        self.alert_enabled = push_cfg.get('alert_enabled', True)  # whether anomaly alerts are enabled (false = temporarily disable anomaly upload)
        self.push_timeout = push_cfg.get('timeout', 30)
        self.push_retry_count = push_cfg.get('retry_count', 2)
        # Push base-URL list (joined uniformly as base_url/{project_id}, extensible)
        raw_urls = push_cfg.get('push_base_urls', [])
        self.push_base_urls = [
            {"label": item.get("label", f"target-{i}"), "url": item.get("url", "").rstrip("/")}
            for i, item in enumerate(raw_urls)
            if item.get("url")  # skip empty URLs
        ]
        # Path of the failed-push log file
        failed_log_path = push_cfg.get('failed_push_log', 'data/push_failures.jsonl')
        self.failed_push_log = Path(__file__).parent / failed_log_path
        self.failed_push_log.parent.mkdir(parents=True, exist_ok=True)

        # Leave a reminder in the log when alert_enabled is False
        if not self.alert_enabled:
            logger.warning("异常告警已禁用(alert_enabled=false),仅上报心跳状态")

        # ========================================
        # Alert aggregator (replaces the old fixed cooldown_minutes)
        # ----------------------------------------
        # Feature 1: cross-device aggregation suppression - several devices
        #   of the same plant alarming together -> ambient noise, suppress all
        # Feature 2: per-type cooldown - 24h for the same type, 1h otherwise
        # ========================================
        agg_cfg = push_cfg.get('alert_aggregate', {})
        self.alert_aggregator = None
        if ALERT_AGGREGATOR_AVAILABLE:
            self.alert_aggregator = AlertAggregator(
                push_callback=self._push_detection_result,
                aggregate_enabled=agg_cfg.get('enabled', True),
                window_seconds=agg_cfg.get('window_seconds', 300),
                min_devices=agg_cfg.get('min_devices', 2),
                cooldown_same_type_hours=push_cfg.get('cooldown_same_type_hours', 24),
                cooldown_diff_type_hours=push_cfg.get('cooldown_diff_type_hours', 1)
            )
        else:
            logger.warning("报警聚合器模块未找到,使用默认报警逻辑")

        # Time of the last anomaly-audio save (drives the save cooldown)
        self.last_anomaly_save_time = {}
        # Anomaly-save cooldown (minutes): during a continuous anomaly the
        # same device saves at most once every N minutes
        self.anomaly_save_cooldown_minutes = save_cfg.get('cooldown_minutes', 10)

        # Locked anomaly classification (kept stable while an anomaly persists)
        # key: device_code, value: (anomaly_type_code, type_name)
        self.locked_anomaly_type = {}

        # Sliding-window voting (e.g. 3 abnormal out of 5 -> anomaly)
        voting_cfg = self.config.get('prediction', {}).get('voting', {})
        self.voting_enabled = voting_cfg.get('enabled', True)
        self.voting_window_size = voting_cfg.get('window_size', 5)  # window size
        self.voting_threshold = voting_cfg.get('threshold', 3)  # anomaly threshold (>=3 hits -> anomaly)
        self.detection_history = {}  # per-device detection history (True=anomaly)

        # Threshold tolerance band (avoids flapping around the boundary):
        # errors within threshold*(1-tolerance) ~ threshold*(1+tolerance)
        # are a grey zone that keeps the previous state
        self.tolerance_ratio = voting_cfg.get('tolerance_ratio', 0.05)  # default 5% tolerance
        self.last_single_anomaly = {}  # previous single-cycle verdict per device

        # Thresholds are managed by multi_predictor; each device loads its
        # threshold from its own model directory

        # Energy-detection configuration
        energy_cfg = self.config.get('prediction', {}).get('energy_detection', {})
        self.energy_detection_enabled = ENERGY_BASELINE_AVAILABLE and energy_cfg.get('enabled', True)
        self.skip_detection_when_stopped = energy_cfg.get('skip_when_stopped', True)

        # Energy-baseline detectors (one per device)
        if self.energy_detection_enabled:
            self.energy_baselines = {}
        else:
            self.energy_baselines = None

        # SCADA API configuration (pump status and inlet flow)
        scada_cfg = self.config.get('scada_api', {})
        self.scada_enabled = scada_cfg.get('enabled', False)
        self.scada_url = scada_cfg.get('base_url', '')  # historical-data endpoint (fallback)
        self.scada_realtime_url = scada_cfg.get('realtime_url', '')  # realtime endpoint (primary)
        self.scada_jwt = scada_cfg.get('jwt_token', '')
        self.scada_timeout = scada_cfg.get('timeout', 10)

        # Pump-state monitor (detects start/stop transition periods)
        self.pump_state_monitor = None
        self.pump_status_plc_configs = {}  # {pump_name: [{point, name}, ...]}
        if PUMP_STATE_MONITOR_AVAILABLE and self.scada_enabled:
            # Initialize the pump-state monitor with the project_id of the
            # first enabled plant
            project_id = 0
            for plant in self.config.get('plants', []):
                if plant.get('enabled', False):
                    project_id = plant.get('project_id', 0)
                    # Load the pump-status PLC point configuration
                    pump_status_plc = plant.get('pump_status_plc', {})
                    self.pump_status_plc_configs = pump_status_plc
                    break

            if project_id > 0:
                # Query pump status through the realtime endpoint
                self.pump_state_monitor = PumpStateMonitor(
                    scada_url=self.scada_realtime_url,  # use the realtime data endpoint
                    scada_jwt=self.scada_jwt,
                    project_id=project_id,
                    timeout=self.scada_timeout,
                    transition_window_minutes=15  # 15 min after start/stop counts as transition
                )
                logger.info(f"泵状态监控器已启用 (project_id={project_id}, 过渡期窗口=15分钟, 使用实时接口)")

        # Thread control
        self.running = False
        self.thread = None
        # Push thread pool (push timeouts must not block the monitor loop)
        self._push_executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="push")

        # Startup time (used to skip state-change logs during warm-up)
        self.startup_time = None
        self.startup_warmup_seconds = 120  # no state-change logs for 120s after start

        # ========================================
        # Anomaly-context capture configuration
        # ========================================
        context_cfg = save_cfg.get('context_capture', {})
        self.context_pre_minutes = context_cfg.get('pre_minutes', 2)
        self.context_post_minutes = context_cfg.get('post_minutes', 2)
        # Audio-file history cache (captures audio from before the anomaly)
        # key: device_code, value: deque of (timestamp, file_path)
        from collections import deque
        # Number of history files to keep (~1 file per minute)
        history_size = self.context_pre_minutes + 2  # keep 2 extra as buffer
        self.audio_file_history = defaultdict(lambda: deque(maxlen=history_size))
        # Anomaly-context capture state
        # key: device_code, value: dict
        self.anomaly_capture_state = {}
        logger.info(f"异常上下文捕获: 前{self.context_pre_minutes}分钟 + 后{self.context_post_minutes}分钟")

        # ========================================
        # Human-detection alarm suppression
        # ========================================
        # Whenever any camera has seen a person within the cooldown window,
        # anomaly alarms for ALL devices are suppressed
        human_cfg = self.config.get('human_detection', {})
        self.human_detection_enabled = human_cfg.get('enabled', False) and HUMAN_DETECTION_AVAILABLE
        self.human_reader = None

        if self.human_detection_enabled:
            db_path = human_cfg.get('db_path', '')
            cooldown_minutes = human_cfg.get('cooldown_minutes', 5)
            self.human_reader = HumanDetectionReader(
                db_path=db_path,
                cooldown_minutes=cooldown_minutes
            )
            logger.info(f"人体检测报警抑制已启用 (冷却时间={cooldown_minutes}分钟, 数据库={db_path})")

        # ========================================
        # Remote detection-schedule configuration
        # ========================================
        # Periodically GET a remote endpoint; the returned mode_type,
        # model_list and time window drive the alert_enabled switch
        schedule_cfg = self.config.get('detection_schedule', {})
        self._detection_schedule_url = schedule_cfg.get('url', '')
        self._detection_schedule_interval = schedule_cfg.get('poll_interval', 60)
        self._detection_schedule_timeout = schedule_cfg.get('request_timeout', 10)
        # Separate timer, decoupled from the 30s config-refresh interval
        self._last_detection_schedule_check = 0
        # Pause flag driven by the remote schedule (highest priority; True
        # skips the whole detection pipeline, including model inference)
        self.detection_paused = False

        if self._detection_schedule_url:
            logger.info(f"远程异响调度已启用 | URL={self._detection_schedule_url} | 间隔={self._detection_schedule_interval}秒")
-
- def start(self):
- """
- 启动监控线程
- """
- if self.running:
- return
-
- # 启动前清理current目录中的遗留文件
- self._cleanup_current_on_startup()
-
- self.running = True
- self.startup_time = datetime.now() # 记录启动时间
- self.thread = threading.Thread(target=self._monitor_loop, daemon=True)
- self.thread.start()
- # 打印监控的设备列表
- device_codes = list(self.device_map.keys())
- logger.info(f"监控线程已启动 | 目录: {self.audio_dir} | 监控设备: {device_codes}")
-
- def _cleanup_current_on_startup(self):
- """
- 启动时清理current目录中的遗留文件
-
- 删除上次运行遗留的文件,避免混入新采集数据影响检测准确性
- """
- cleaned_count = 0
-
- for device_code in self.device_map.keys():
- current_dir = self.audio_dir / device_code / "current"
- if not current_dir.exists():
- continue
-
- for wav_file in current_dir.glob("*.wav"):
- try:
- wav_file.unlink()
- cleaned_count += 1
- except Exception as e:
- logger.warning(f"清理遗留文件失败: {wav_file.name} | {e}")
-
- if cleaned_count > 0:
- logger.info(f"启动清理: 已删除current目录中 {cleaned_count} 个遗留文件")
-
- def stop(self):
- """
- 停止监控线程
- """
- if not self.running:
- return
-
- self.running = False
- if self.thread is not None:
- self.thread.join(timeout=5)
- # 关闭推送线程池,等待已提交的推送任务完成
- self._push_executor.shutdown(wait=True, cancel_futures=False)
- logger.info(f"监控线程已停止")
-
- def _reload_hot_config(self):
- # 从 DB 热加载可变配置项,无需重启服务
- # 只刷新运行时可安全变更的参数,不影响 FFmpeg 进程和流映射
- if self._config_manager is None:
- return
-
- now = time.time()
- # 未达到刷新间隔则跳过
- if now - self._last_config_reload < self._config_reload_interval:
- return
- self._last_config_reload = now
-
- try:
- # 从 DB 读取最新配置
- fresh = self._config_manager.get_full_config()
- self.config = fresh
-
- # 刷新推送配置
- push_cfg = fresh.get('push_notification', {})
- self.push_enabled = push_cfg.get('enabled', False)
- self.alert_enabled = push_cfg.get('alert_enabled', True)
- self.push_timeout = push_cfg.get('timeout', 30)
- self.push_retry_count = push_cfg.get('retry_count', 2)
- raw_urls = push_cfg.get('push_base_urls', [])
- self.push_base_urls = [
- {"label": item.get("label", f"target-{i}"), "url": item.get("url", "").rstrip("/")}
- for i, item in enumerate(raw_urls)
- if item.get("url")
- ]
-
- # 刷新投票配置
- voting_cfg = fresh.get('prediction', {}).get('voting', {})
- self.voting_enabled = voting_cfg.get('enabled', True)
- self.voting_window_size = voting_cfg.get('window_size', 5)
- self.voting_threshold = voting_cfg.get('threshold', 3)
- self.tolerance_ratio = voting_cfg.get('tolerance_ratio', 0.05)
-
- # 刷新能量检测配置
- energy_cfg = fresh.get('prediction', {}).get('energy_detection', {})
- self.skip_detection_when_stopped = energy_cfg.get('skip_when_stopped', True)
-
- # 刷新人体检测配置
- human_cfg = fresh.get('human_detection', {})
- new_human_enabled = human_cfg.get('enabled', False)
- if new_human_enabled != self.human_detection_enabled:
- logger.info(f"人体检测抑制配置变更: {self.human_detection_enabled} -> {new_human_enabled}")
- self.human_detection_enabled = new_human_enabled
-
- # 刷新远程异响调度配置
- schedule_cfg = fresh.get('detection_schedule', {})
- self._detection_schedule_url = schedule_cfg.get('url', '')
- self._detection_schedule_interval = schedule_cfg.get('poll_interval', 60)
- self._detection_schedule_timeout = schedule_cfg.get('request_timeout', 10)
-
- logger.debug("配置热刷新完成")
- except Exception as e:
- logger.error(f"配置热刷新失败: {e}")
-
- def _check_detection_schedule(self):
- # 远程异响检测调度检查
- # 独立于 DB 配置热刷新,有自己的 60 秒计时器
- # 条件全部满足才开启 alert_enabled:
- # 1. model_list 中包含 "异响"
- # 2. mode_type == 1
- # 3. start_time <= 当前时间 <= end_time
- if not self._detection_schedule_url:
- return
-
- now_ts = time.time()
- if now_ts - self._last_detection_schedule_check < self._detection_schedule_interval:
- return
- self._last_detection_schedule_check = now_ts
-
- try:
- resp = requests.get(
- self._detection_schedule_url,
- params={'project_id': self.project_id},
- timeout=self._detection_schedule_timeout
- )
- resp.raise_for_status()
- data = resp.json()
-
- model_list = data.get('model_list', [])
- mode_type = int(data.get('mode_type', 0))
- start_str = data.get('start_time', '')
- end_str = data.get('end_time', '')
-
- # model_list 中没有"异响",说明本项目不涉及异响检测,保持现状不做干预
- if '异响' not in model_list:
- return
-
- if not start_str or not end_str:
- logger.warning("远程调度时间窗口不完整,保持当前状态")
- return
-
- # 解析时间,支持多种常见格式
- now = datetime.now()
- start_dt = self._parse_schedule_time(start_str)
- end_dt = self._parse_schedule_time(end_str)
-
- if start_dt is None or end_dt is None:
- logger.warning(f"时间解析失败: start='{start_str}', end='{end_str}'")
- return
-
- # 核心判定:mode_type==1 且当前时间在窗口内 -> 检测启用
- # 反之 -> 暂停整个检测流程(不做模型推理,不上报)
- should_detect = (mode_type == 1) and (start_dt <= now <= end_dt)
- new_paused = not should_detect
-
- if new_paused != self.detection_paused:
- logger.info(
- f"异响检测调度变更: {'暂停' if new_paused else '恢复'} | "
- f"mode_type={mode_type} | "
- f"窗口=[{start_str}, {end_str}] | 当前={now.strftime('%Y-%m-%d %H:%M:%S')}"
- )
- self.detection_paused = new_paused
- except Exception as e:
- # 请求失败保持当前状态不变
- logger.warning(f"远程异响调度请求失败,保持当前状态: {e}")
-
- @staticmethod
- def _parse_schedule_time(time_str):
- # 尝试多种格式解析时间字符串,全部失败返回 None
- for fmt in ('%Y-%m-%d %H:%M:%S', '%Y-%m-%d %H:%M',
- '%Y-%m-%dT%H:%M:%S', '%Y-%m-%dT%H:%M'):
- try:
- return datetime.strptime(time_str, fmt)
- except ValueError:
- continue
- return None
-
    def _monitor_loop(self):
        """
        Monitoring loop (thread main function).

        Continuously scans every device's audio directory and processes
        newly produced WAV files. Results are aggregated and uploaded once
        per minute via _check_periodic_upload.
        """
        # Make sure the root directory exists
        self.audio_dir.mkdir(parents=True, exist_ok=True)

        while self.running:
            try:
                # Hot reload: periodically refresh mutable config from the DB
                self._reload_hot_config()
                # Remote schedule: poll the remote endpoint (every ~60s) to
                # drive detection_paused
                self._check_detection_schedule()
                # When paused by the remote schedule, skip the entire
                # scan + inference + upload pipeline at the source; only the
                # hot reload and the schedule check keep running — not even
                # the directory walk happens.
                if self.detection_paused:
                    time.sleep(self.check_interval)
                    continue
                # Scan the current/ folder under every device directory
                for device_code in self.device_map.keys():
                    device_current_dir = self.audio_dir / device_code / "current"

                    # Skip directories that do not exist yet
                    if not device_current_dir.exists():
                        continue

                    for wav_file in device_current_dir.glob("*.wav"):
                        # Skip files that were already processed
                        if wav_file in self.seen_files:
                            continue
                        # Check whether the file is complete
                        try:
                            stat_info = wav_file.stat()
                            file_age = time.time() - stat_info.st_mtime
                            file_size = stat_info.st_size
                            # Still being written; re-check next round
                            # (deliberately NOT added to seen_files)
                            if file_age < 12.0:
                                continue
                            # Abnormal file-size handling
                            # (60s x 16000Hz x 2 bytes ~ 1.9MB)
                            # NOTE(review): the 500KB lower bound assumes ~60s
                            # segments, but the default file_duration is 8s
                            # (~256KB) — confirm the configured segment length
                            # matches, otherwise valid clips get deleted here.
                            if file_size < 500_000:
                                if file_age > 20.0:
                                    # Confirmed broken file: delete and mark seen
                                    try:
                                        wav_file.unlink()
                                        logger.debug(f"删除异常小文件: {wav_file.name} ({file_size / 1000:.1f}KB)")
                                    except Exception as e:
                                        logger.error(f"删除文件失败: {wav_file.name} | {e}")
                                    self.seen_files.add(wav_file)
                                continue
                            if file_size > 3_000_000:
                                if file_age > 20.0:
                                    # Oversized file: archive straight into the
                                    # date directory (kept, and marked seen)
                                    logger.warning(f"文件过大,直接归档: {wav_file.name} ({file_size / 1000:.1f}KB)")
                                    self._move_audio_to_date_dir(wav_file)
                                    self.seen_files.add(wav_file)
                                continue
                        except Exception as e:
                            logger.error(f"文件状态检查失败: {wav_file.name} | {e}")
                            continue
                        # Process the new file
                        self._process_new_file(wav_file)
                        # Mark it as processed
                        self.seen_files.add(wav_file)

                # Check whether a periodic upload is due
                self._check_periodic_upload()

                # When the aggregation window has expired, run the aggregated
                # verdict and push
                if self.alert_aggregator:
                    self.alert_aggregator.check_and_flush()

                # Trim the processed-files set when it grows too large
                # (keep the 5000 most recently modified entries)
                if len(self.seen_files) > 10000:
                    recent_files = sorted(self.seen_files,
                                          key=lambda f: f.stat().st_mtime if f.exists() else 0)[-5000:]
                    self.seen_files = set(recent_files)

            except Exception as e:
                logger.error(f"监控循环错误: {e}")

            # Wait until the next scan
            time.sleep(self.check_interval)
-
    def _process_new_file(self, wav_file):
        """
        Process a newly detected audio file.

        File name format: {project_id}_{device_code}_{timestamp}.wav
        e.g. 92_1#-1_20251218142000.wav

        Steps:
        1. Parse the device code from the file name.
        2. Check pump running state (PLC) and the post-start/stop
           transition window; archive and return early when the recording
           is unusable for detection/training.
        3. Compute the AE reconstruction error and cache it.
        4. Cache the raw audio for the later spectrum computation.
        """
        try:
            # Parse device_code from the file name
            # format: {project_id}_{device_code}_{timestamp}.wav
            try:
                parts = wav_file.stem.split('_')
                if len(parts) >= 3:
                    device_code = parts[1]  # second part is the device_code
                else:
                    device_code = "unknown"
            except:
                device_code = "unknown"

            # ========================================
            # Pump on/off detection (PLC query) - runs before the model lookup
            # ----------------------------------------
            # Queries PLC points through the SCADA API to decide whether the
            # pump is running. This:
            #   1. filters stopped-pump audio even in cold-start mode
            #      (keeps training data clean);
            #   2. skips pointless anomaly detection while stopped;
            #   3. records the device status for later reporting.
            # Depends on pump_state_monitor + the pump_status_plc entries of
            # the rtsp_config.
            # ========================================
            device_status = "未知"
            pump_is_running = True  # conservative default when the PLC query fails

            # Stream configuration for this device
            stream_config = self.device_map.get(device_code)

            if self.pump_state_monitor and stream_config:
                # Find the PLC point configs linked to this device's pump_name
                pump_name = stream_config.pump_name
                pump_configs = self.pump_status_plc_configs.get(pump_name, [])

                if pump_configs:
                    # If any associated pump runs, treat the device as working
                    any_pump_running = False
                    for pump_cfg in pump_configs:
                        point = pump_cfg.get("point", "")
                        name = pump_cfg.get("name", point)
                        pump_id = point

                        # Query pump state (60s cache inside the monitor, so
                        # SCADA is not hit for every file)
                        is_running, _ = self.pump_state_monitor.update_pump_state(pump_id, point, name)
                        if is_running:
                            any_pump_running = True

                    pump_is_running = any_pump_running
                    device_status = "开机" if pump_is_running else "停机"

                    # Skip when all pumps are stopped (configurable behavior).
                    # Applies in both cold-start and normal mode to keep the
                    # training data clean.
                    if self.skip_detection_when_stopped and not pump_is_running:
                        logger.info(f"泵停机(PLC): {device_code} | 归档到过渡期目录(不用于训练)")
                        self._move_audio_to_transition_dir(wav_file, "stopped")
                        return

                    # ========================================
                    # Transition-window check (15 min after a pump start/stop)
                    # ----------------------------------------
                    # Filters unstable audio recorded while starting/stopping
                    # so training data only contains stable-operation audio.
                    # ========================================
                    if self.skip_detection_when_stopped:
                        pump_in_transition, transition_pump_names = \
                            self.pump_state_monitor.check_pumps_transition(pump_configs)

                        if pump_in_transition:
                            logger.info(f"泵过渡期: {device_code} | 归档到过渡期目录(不用于训练) | "
                                        f"过渡期泵: {', '.join(transition_pump_names)}")
                            self._move_audio_to_transition_dir(wav_file, "transition")
                            return

            # Lazily load the per-device predictor
            device_predictor = self.multi_predictor.get_predictor(device_code)
            if device_predictor is None:
                logger.info(f"冷启动模式(设备 {device_code} 无模型): 归档 {wav_file.name}")
                self._move_audio_to_date_dir(wav_file)
                return
            # Load the audio
            try:
                y, sr = librosa.load(str(wav_file), sr=CFG.SR, mono=True)
            except Exception as e:
                logger.error(f"音频加载失败: {wav_file.name} | {e}")
                return

            # Record the status for the periodic upload
            # (pump-state detection already ran above; this only caches it)
            self.device_cache[device_code]["status"] = device_status

            # ========================================
            # Reconstruction error
            # ========================================
            error = self._compute_reconstruction_error(wav_file, device_predictor)

            if error is not None:
                self.device_cache[device_code]["errors"].append(error)

            # ========================================
            # Keep the raw audio for the spectrum computation
            # ========================================
            self.device_cache[device_code]["audio_data"].append(y)

            # ========================================
            # Stash the file path; it is archived only after the periodic
            # aggregation decides normal/anomalous
            # ========================================
            if "pending_files" not in self.device_cache[device_code]:
                self.device_cache[device_code]["pending_files"] = []
            self.device_cache[device_code]["pending_files"].append(wav_file)

            # ========================================
            # Record into the audio history (used for anomaly context capture)
            # ========================================
            self.audio_file_history[device_code].append((datetime.now(), wav_file))

            # Initialize the last-upload timestamp on the first file
            if self.device_cache[device_code]["last_upload"] is None:
                self.device_cache[device_code]["last_upload"] = datetime.now()

            # Threshold lookup and a per-file verdict (logging only)
            threshold = self._get_threshold(device_code)

            # ========================================
            # Fast path: alert quickly when several consecutive files show a
            # very high error (currently disabled). Bypasses the voting
            # window; meant to catch sudden severe failures.
            # ========================================
            # if error is not None and threshold is not None and threshold > 0:
            #     # Maintain the fast-path buffer
            #     if "fast_alert_buffer" not in self.device_cache[device_code]:
            #         self.device_cache[device_code]["fast_alert_buffer"] = []
            #
            #     fast_buf = self.device_cache[device_code]["fast_alert_buffer"]
            #     # Consecutiveness check: record when the error exceeds 2x the
            #     # threshold, otherwise clear the buffer
            #     if error > threshold * 2.0:
            #         fast_buf.append(error)
            #     else:
            #         fast_buf.clear()
            #
            #     # 3 consecutive files (~24s) above 2x threshold -> fast alert
            #     FAST_ALERT_CONSECUTIVE = 3
            #     if len(fast_buf) >= FAST_ALERT_CONSECUTIVE:
            #         # Skip if already triggered recently (avoid duplicates)
            #         last_fast = self.device_cache[device_code].get("last_fast_alert_time")
            #         now = datetime.now()
            #         can_fast_alert = (last_fast is None or
            #                           (now - last_fast).total_seconds() > 300)  # 5-min cooldown
            #
            #         if can_fast_alert:
            #             # The fast path honours the same suppression rules
            #             suppress = False
            #             # Pump transition suppression
            #             if self.pump_state_monitor and stream_config:
            #                 pump_name = stream_config.pump_name
            #                 pump_configs = self.pump_status_plc_configs.get(pump_name, [])
            #                 if pump_configs:
            #                     in_transition, _ = self.pump_state_monitor.check_pumps_transition(pump_configs)
            #                     if in_transition:
            #                         suppress = True
            #             # Human-detection suppression
            #             if self.human_detection_enabled and self.human_reader:
            #                 if self.human_reader.is_in_cooldown():
            #                     suppress = True
            #             # alert_enabled switch
            #             if not self.alert_enabled:
            #                 suppress = True
            #
            #             if not suppress:
            #                 avg_fast = float(np.mean(fast_buf))
            #                 logger.warning(
            #                     f"[!!] 快速通道触发: {device_code} | "
            #                     f"连续{len(fast_buf)}个文件异常 | "
            #                     f"平均误差={avg_fast:.6f} 阈值={threshold:.6f}")
            #                 self.device_cache[device_code]["last_fast_alert_time"] = now
            #                 fast_buf.clear()
            #                 # Flag the fast alert; handled with the next periodic upload
            #                 self.device_cache[device_code]["fast_alert_pending"] = True

            if error is not None and threshold is not None:
                is_anomaly = error > threshold
                result_tag = "!!" if is_anomaly else "OK"
                logger.info(f"[{result_tag}] {device_code} | {wav_file.name} | "
                            f"误差={error:.6f} 阈值={threshold:.6f}")
            elif error is not None:
                logger.debug(f"文件预测: {wav_file.name} | 误差={error:.6f} | 阀值=未设置")
            else:
                logger.warning(f"预测跳过: {wav_file.name} | 误差计算失败")

        except Exception as e:
            logger.error(f"处理文件失败: {wav_file.name} | 错误: {e}")
-
    def _compute_reconstruction_error(self, wav_file, device_predictor):
        """
        Compute one audio file's reconstruction error (Min-Max scaled).

        The audio is cut into WIN_SEC-second windows with a HOP_SEC
        stride; each window becomes a mel-spectrogram patch and the
        returned value is the mean MSE over all patches.

        Parameters:
            wav_file: audio file path
            device_predictor: per-device predictor (provides the model,
                the torch device and the global min/max used for scaling)

        Returns:
            Mean reconstruction MSE over all patches, or None on failure.
        """
        try:
            import torch
            from predictor.utils import align_to_target

            # Min-Max scaling parameters learned at training time
            global_min = device_predictor.global_min
            global_max = device_predictor.global_max

            # Load the audio
            y, _ = librosa.load(str(wav_file), sr=CFG.SR, mono=True)

            win_samples = int(CFG.WIN_SEC * CFG.SR)
            hop_samples = int(CFG.HOP_SEC * CFG.SR)

            if len(y) < win_samples:
                logger.warning(f"音频太短,无法提取patches: {wav_file.name}")
                return None

            patches = []
            for start in range(0, len(y) - win_samples + 1, hop_samples):
                window = y[start:start + win_samples]

                S = librosa.feature.melspectrogram(
                    y=window, sr=CFG.SR, n_fft=CFG.N_FFT,
                    hop_length=CFG.HOP_LENGTH, n_mels=CFG.N_MELS, power=2.0
                )
                S_db = librosa.power_to_db(S, ref=np.max)

                # Pad/trim the time axis to the model's expected frame count
                if S_db.shape[1] < CFG.TARGET_FRAMES:
                    S_db = np.pad(S_db, ((0, 0), (0, CFG.TARGET_FRAMES - S_db.shape[1])))
                else:
                    S_db = S_db[:, :CFG.TARGET_FRAMES]

                # Min-Max scaling; epsilon guards against a zero range
                S_norm = (S_db - global_min) / (global_max - global_min + 1e-6)
                patches.append(S_norm.astype(np.float32))

            if not patches:
                logger.warning(f"未能提取任何patches: {wav_file.name}")
                return None

            # Build a (patches, 1, mels, frames) batch for the autoencoder
            arr = np.stack(patches, 0)
            arr = np.expand_dims(arr, 1)
            tensor = torch.from_numpy(arr)

            torch_device = device_predictor.torch_device
            tensor = tensor.to(torch_device)

            with torch.no_grad():
                recon = device_predictor.model(tensor)
                recon = align_to_target(recon, tensor)
                mse_per_patch = torch.mean((recon - tensor) ** 2, dim=[1, 2, 3])
                mean_mse = torch.mean(mse_per_patch).item()

            logger.debug(f"重建误差: {wav_file.name} | patches={len(patches)} | MSE={mean_mse:.6f}")
            return mean_mse

        except Exception as e:
            logger.error(f"计算重建误差失败: {wav_file.name} | {e}")
            return None
-
    def _check_periodic_upload(self):
        """
        Periodic aggregation and upload check.

        Once per reporting period (``segment_duration`` seconds, ~1 min)
        and per device: average the cached reconstruction errors, apply
        the tolerance band and the sliding-window vote, decide whether an
        alert fires, push the result, archive the pending files, and
        reset the per-device cache.
        """
        # Skip while remotely paused, consistent with the _monitor_loop gate
        if self.detection_paused:
            return

        now = datetime.now()

        for device_code, cache in self.device_cache.items():
            # last_upload is None until the device's first file arrives
            last_upload = cache.get("last_upload")
            if last_upload is None:
                continue

            elapsed = (now - last_upload).total_seconds()

            # Reporting period reached
            if elapsed >= self.segment_duration:
                # Stream configuration for this device
                stream_config = self.device_map.get(device_code)

                # Mean reconstruction error over the period
                errors = cache.get("errors", [])
                avg_error = float(np.mean(errors)) if errors else 0.0

                # Threshold
                threshold = self._get_threshold(device_code)

                # Per-period verdict with a tolerance band: avoids flapping
                # around the threshold; inside the gray zone the raw
                # threshold still decides
                if threshold:
                    upper_bound = threshold * (1 + self.tolerance_ratio)  # definitely anomalous above this
                    lower_bound = threshold * (1 - self.tolerance_ratio)  # definitely normal below this

                    if avg_error > upper_bound:
                        # Above the upper bound -> definitely anomalous
                        is_current_anomaly = True
                    elif avg_error < lower_bound:
                        # Below the lower bound -> definitely normal
                        is_current_anomaly = False
                    else:
                        # Gray zone -> compare with the raw threshold
                        # (avoids artificially prolonging an anomaly)
                        is_current_anomaly = avg_error > threshold

                    # Remember this single-period verdict
                    self.last_single_anomaly[device_code] = is_current_anomaly
                else:
                    is_current_anomaly = False

                # ========================================
                # Sliding-window vote: e.g. 3 anomalous out of 5 periods
                # ========================================
                if device_code not in self.detection_history:
                    self.detection_history[device_code] = []

                # Record this period's verdict
                self.detection_history[device_code].append(is_current_anomaly)

                # Bound the window size
                if len(self.detection_history[device_code]) > self.voting_window_size:
                    self.detection_history[device_code].pop(0)

                # Final verdict by vote (only once the window is full)
                if self.voting_enabled and len(self.detection_history[device_code]) >= self.voting_window_size:
                    anomaly_count = sum(self.detection_history[device_code])
                    is_anomaly = anomaly_count >= self.voting_threshold
                    window_info = f"窗口[{anomaly_count}/{self.voting_window_size}]"
                else:
                    is_anomaly = is_current_anomaly
                    window_info = "窗口未满"

                # ========================================
                # State-change detection
                # trigger_alert: True only on normal (or initial) -> anomalous
                # Cooldown handling lives inside AlertAggregator
                # ========================================
                last_is_anomaly = self.last_report_status.get(device_code)
                trigger_alert = False

                if is_anomaly:
                    # Alert only on a state change (normal -> anomalous).
                    # Transition-window files never reach pending (filtered
                    # in _process_new_file)
                    if last_is_anomaly is None or not last_is_anomaly:
                        if self.alert_enabled:
                            # Human-detection suppression: no alert while any
                            # camera sees a person
                            if self.human_detection_enabled and self.human_reader:
                                if self.human_reader.is_in_cooldown():
                                    trigger_alert = False
                                    status_info = self.human_reader.get_status_info()
                                    logger.info(f"人体检测抑制: {device_code} | {status_info},跳过报警")
                                else:
                                    trigger_alert = True
                            else:
                                trigger_alert = True
                        else:
                            trigger_alert = False
                            logger.debug(f"异常告警已禁用,跳过告警: {device_code}")

                # Running status recorded by _process_new_file
                running_status = cache.get("status", "未知")

                # Inlet flow from SCADA
                inlet_flow = self._get_inlet_flow(stream_config) if stream_config else 0.0

                # Spectrum for this period
                audio_data = cache.get("audio_data", [])
                freq_db = self._compute_frequency_spectrum(audio_data)

                # Append to the spectrum history (dB values only)
                if self.freq_history_enabled and freq_db:
                    self.freq_history[device_code].append((now, freq_db))
                    # Drop expired history entries
                    cutoff = now - timedelta(minutes=self.freq_history_minutes)
                    self.freq_history[device_code] = [
                        (t, d) for t, d in self.freq_history[device_code]
                        if t > cutoff
                    ]

                # Historical average spectrum (normal_frequency_middle)
                freq_middle_db = self._compute_frequency_middle(device_code)

                # ========================================
                # Anomaly classification: classify only when the state flips
                # from normal to anomalous; while the anomaly persists the
                # locked result is reused for consistency
                # ========================================
                anomaly_type_code = 6  # default: unclassified anomaly
                type_name = "未分类异常"

                if is_anomaly and audio_data:
                    # New anomaly = transition from normal to anomalous
                    is_new_anomaly = (last_is_anomaly is None or not last_is_anomaly)

                    if is_new_anomaly:
                        # New anomaly: classify and lock the result
                        try:
                            from core.anomaly_classifier import classify_anomaly
                            if len(audio_data) > 0:
                                y = audio_data[-1] if isinstance(audio_data[-1], np.ndarray) else np.array(audio_data[-1])
                                anomaly_type_code, type_name, confidence = classify_anomaly(y, sr=16000)
                                # Lock the classification result
                                self.locked_anomaly_type[device_code] = (anomaly_type_code, type_name)
                                logger.info(f"异常分类(新异常): {type_name} (code={anomaly_type_code}, 置信度={confidence:.2f})")
                        except Exception as e:
                            logger.warning(f"异常分类失败: {e}")
                    else:
                        # Ongoing anomaly: reuse the locked classification
                        if device_code in self.locked_anomaly_type:
                            anomaly_type_code, type_name = self.locked_anomaly_type[device_code]
                            logger.debug(f"异常分类(沿用): {type_name} (code={anomaly_type_code})")
                else:
                    # Back to normal: drop the locked classification
                    if device_code in self.locked_anomaly_type:
                        del self.locked_anomaly_type[device_code]

                # Upload logic
                if self.push_enabled and stream_config:
                    # Pre-read the anomaly audio as base64 BEFORE the file is
                    # archived/cleared - avoids the race where an async or
                    # aggregator-delayed push finds the file already moved
                    pre_read_wav_b64 = ""
                    if trigger_alert:
                        try:
                            current_pending = cache.get("pending_files", [])
                            if current_pending and current_pending[0].exists():
                                with open(current_pending[0], "rb") as f:
                                    pre_read_wav_b64 = base64.b64encode(f.read()).decode('utf-8')
                                logger.debug(f"预读异常音频成功: {current_pending[0].name} | size={len(pre_read_wav_b64)}")
                        except Exception as e:
                            logger.warning(f"预读异常音频失败: {e}")
                    if trigger_alert and self.alert_aggregator:
                        # Alerts go through the aggregator: cross-device
                        # aggregation + per-type cooldown; the aggregator
                        # decides at window expiry whether to actually push
                        self.alert_aggregator.submit_alert(
                            plant_name=stream_config.plant_name,
                            device_code=device_code,
                            anomaly_type_code=anomaly_type_code,
                            push_kwargs=dict(
                                stream_config=stream_config,
                                device_code=device_code,
                                is_anomaly=is_anomaly,
                                trigger_alert=True,
                                abnormal_score=avg_error,
                                score_threshold=threshold,
                                running_status=running_status,
                                inlet_flow=inlet_flow,
                                freq_db=freq_db,
                                freq_middle_db=freq_middle_db,
                                anomaly_type_code=anomaly_type_code,
                                abnormal_wav_b64=pre_read_wav_b64
                            )
                        )
                    else:
                        # Heartbeat (non-alert) or no aggregator -> push
                        # asynchronously through the thread pool
                        self._push_executor.submit(
                            self._push_detection_result,
                            stream_config=stream_config,
                            device_code=device_code,
                            is_anomaly=is_anomaly,
                            trigger_alert=trigger_alert,
                            abnormal_score=avg_error,
                            score_threshold=threshold,
                            running_status=running_status,
                            inlet_flow=inlet_flow,
                            freq_db=freq_db,
                            freq_middle_db=freq_middle_db,
                            anomaly_type_code=anomaly_type_code,
                            abnormal_wav_b64=pre_read_wav_b64
                        )

                # Remember this period's final status
                self.last_report_status[device_code] = is_anomaly

                # Logging
                thr_str = f"{threshold:.6f}" if threshold else "未设置"
                # Where the alert went
                if trigger_alert and self.alert_aggregator:
                    alert_dest = "-> 聚合器"
                elif trigger_alert:
                    alert_dest = "-> 直接推送"
                else:
                    alert_dest = ""

                # Use the camera name as a visual label when available
                cam_label = ""
                if stream_config:
                    cam_label = f"({stream_config.camera_name})"

                result_emoji = "!!" if is_anomaly else "OK"
                alert_str = f"报警=是 {alert_dest}" if trigger_alert else "报警=否"

                logger.info(
                    f"[{result_emoji}] {device_code}{cam_label} | "
                    f"误差={avg_error:.6f} 阈值={thr_str} | "
                    f"{window_info} | {running_status} | "
                    f"{'异常' if is_anomaly else '正常'} | {alert_str}"
                )

                # ========================================
                # Archive the pending files based on the voted verdict
                # (prevents one-off spikes from misfiling recordings)
                # ========================================
                pending_files = cache.get("pending_files", [])
                # New anomaly = transition from normal to anomalous
                is_new_anomaly = is_anomaly and (last_is_anomaly is None or not last_is_anomaly)
                # Advance the anomaly-context capture state machine
                self._update_anomaly_capture_state(device_code, is_anomaly, is_new_anomaly,
                                                   avg_error, threshold, now, pending_files)
                if pending_files:
                    if is_anomaly:
                        # Anomalous files are managed by the context-capture
                        # state machine; it saves pre+post files together into
                        # anomaly_detected once collection completes
                        pass
                    else:
                        # Normal -> archive to the date directory
                        for f in pending_files:
                            self._move_audio_to_date_dir(f)
                        # Clear the save-cooldown once the state is normal again
                        if device_code in self.last_anomaly_save_time:
                            del self.last_anomaly_save_time[device_code]

                # Reset the per-device cache for the next period
                cache["errors"] = []
                cache["audio_data"] = []
                cache["pending_files"] = []
                cache["last_upload"] = now
                logger.info("─" * 60)
-
    def _update_anomaly_capture_state(self, device_code, is_anomaly,
                                      is_new_anomaly, avg_error,
                                      threshold, now, pending_files):
        """
        Anomaly-context capture state machine.

        Transitions:
          1. no state + new anomaly -> start a capture; look back through
             audio_file_history for the pre-anomaly files
          2. capturing + anomaly persists -> keep collecting anomaly files
          3. capturing + anomaly ends -> start collecting post files
          4. post phase + timer elapsed -> save everything to
             anomaly_detected

        All files from trigger to recovery are saved together under the
        anomaly_detected directory, with a metadata.json recording the
        error/threshold context.
        """
        state = self.anomaly_capture_state.get(device_code)
        if is_new_anomaly and state is None:
            # New anomaly: look back for context files
            anomaly_cutoff = now - timedelta(minutes=1)
            pre_cutoff = now - timedelta(minutes=self.context_pre_minutes + 1)
            pre_files = []
            anomaly_files = []
            for ts, fpath in self.audio_file_history[device_code]:
                if not fpath.exists():
                    continue
                if ts >= anomaly_cutoff:
                    anomaly_files.append(fpath)
                elif ts >= pre_cutoff:
                    pre_files.append(fpath)
            # Fallback: if the look-back found nothing, use the current pending files
            if not anomaly_files and pending_files:
                anomaly_files = [f for f in pending_files if f.exists()]
            self.anomaly_capture_state[device_code] = {
                "trigger_time": now,
                "avg_error": avg_error,
                "threshold": threshold,
                "pre_files": pre_files,
                "anomaly_files": anomaly_files,
                "post_files": [],
                "anomaly_ended": False,
                "post_start_time": None
            }
            logger.info(f"异常上下文捕获已触发: {device_code} | "
                        f"前置={len(pre_files)}个 | 异常={len(anomaly_files)}个")
        elif state is not None:
            if is_anomaly and not state["anomaly_ended"]:
                # Anomaly still ongoing: keep collecting its files
                for f in pending_files:
                    if f.exists():
                        state["anomaly_files"].append(f)
            else:
                # Anomaly over (or already over and collecting post files)
                if not state["anomaly_ended"]:
                    state["anomaly_ended"] = True
                    state["post_start_time"] = now
                    logger.info(f"异常结束,开始收集后续文件: {device_code} | "
                                f"异常文件共{len(state['anomaly_files'])}个 | "
                                f"等待{self.context_post_minutes}分钟")
                # Collect post-anomaly files
                for f in pending_files:
                    if f.exists():
                        state["post_files"].append(f)
                # Persist once the post window has elapsed
                elapsed_post = (now - state["post_start_time"]).total_seconds() / 60
                if elapsed_post >= self.context_post_minutes:
                    self._save_anomaly_context(device_code, state)
                    del self.anomaly_capture_state[device_code]
                    self.last_anomaly_save_time[device_code] = now
-
    def _save_anomaly_context(self, device_code: str, state: dict):
        """
        Persist the captured anomaly-context files to their own directory.

        Layout:
            data/anomaly_detected/{device_code}/{first anomaly file stem}/
            ├── 92_1#-1_20260130140313.wav  (original names kept)
            ├── ...
            └── metadata.json

        metadata.json "files" groups:
            - before_trigger: files from 1~(N+1) minutes before the trigger
              (normal period, for comparison)
            - at_trigger: files within the minute before the trigger
              (where the anomaly first appeared)
            - after_trigger: files from N minutes after the trigger
              (regardless of whether the anomaly recovered)

        Parameters:
            device_code: device identifier
            state: capture-state dict built by _update_anomaly_capture_state
        """
        import shutil
        import json

        try:
            # The folder is named after the first anomaly file
            anomaly_files = state.get("anomaly_files", [])
            if not anomaly_files:
                logger.warning(f"无异常文件,跳过保存: {device_code}")
                return

            # First anomaly file's stem (no extension) names the folder
            first_anomaly = anomaly_files[0]
            folder_name = first_anomaly.stem  # e.g. 92_1#-1_20260130140313
            save_dir = self.save_anomaly_dir / device_code / folder_name
            save_dir.mkdir(parents=True, exist_ok=True)

            # Collected file names, grouped by phase
            all_files = {"before_trigger": [], "at_trigger": [], "after_trigger": []}

            # Move the pre-trigger files (taken out of the date archive)
            for fpath in state.get("pre_files", []):
                if fpath.exists():
                    dest = save_dir / fpath.name
                    shutil.move(str(fpath), str(dest))  # move, not copy, to avoid duplicates
                    all_files["before_trigger"].append(fpath.name)

            # Move the trigger-time files (names kept)
            for fpath in anomaly_files:
                if fpath.exists():
                    dest = save_dir / fpath.name
                    shutil.move(str(fpath), str(dest))
                    all_files["at_trigger"].append(fpath.name)

            # Move the post-trigger files (names kept)
            for fpath in state.get("post_files", []):
                if fpath.exists():
                    dest = save_dir / fpath.name
                    shutil.move(str(fpath), str(dest))
                    all_files["after_trigger"].append(fpath.name)

            # Minimal metadata file
            trigger_time = state.get("trigger_time")
            metadata = {
                "device_code": device_code,
                "trigger_time": trigger_time.strftime("%Y-%m-%d %H:%M:%S") if trigger_time else None,
                "avg_error": round(state.get("avg_error", 0.0), 6),
                "threshold": round(state.get("threshold", 0.0), 6),
                "files": all_files
            }

            metadata_path = save_dir / "metadata.json"
            with open(metadata_path, 'w', encoding='utf-8') as f:
                json.dump(metadata, f, ensure_ascii=False, indent=2)

            total = sum(len(v) for v in all_files.values())
            logger.warning(f"异常上下文已保存: {device_code}/{folder_name} | "
                           f"共{total}个文件 (前{len(all_files['before_trigger'])}+异常{len(all_files['at_trigger'])}+后{len(all_files['after_trigger'])})")

        except Exception as e:
            logger.error(f"保存异常上下文失败: {device_code} | {e}")
-
- def _compute_frequency_middle(self, device_code):
- """
- 计算历史频谱图平均值(normal_frequency_middle)
-
- 参数:
- device_code: 设备编号
-
- 返回:
- dB值列表的平均值
- """
- history = self.freq_history.get(device_code, [])
- if not history or len(history) < 2:
- return []
-
- try:
- # 收集所有历史dB值(只保留长度一致的)
- all_db = []
- ref_len = len(history[0][1]) # 使用第一条记录的长度作为参考
-
- for _, db in history:
- if len(db) == ref_len:
- all_db.append(db)
-
- if not all_db:
- return []
-
- # 计算各频率点的平均dB
- avg_db = np.mean(all_db, axis=0).tolist()
- return avg_db
-
- except Exception as e:
- logger.error(f"计算频谱图历史平均失败: {e}")
- return []
-
- def _get_threshold(self, device_code):
- """
- 获取指定设备的阈值
-
- 优先级:
- 1. 从 multi_predictor 获取设备对应模型的阈值
- 2. 使用配置文件中的默认阈值
- 3. 返回0.0(不进行异常判定)
-
- 参数:
- device_code: 设备编号(如 "LT-1")
-
- 返回:
- 阈值,未找到返回默认值或0.0
- """
- # 方式1:从 multi_predictor 获取设备阈值
- thr = self.multi_predictor.get_threshold(device_code)
- if thr is not None:
- logger.debug(f"阈值来源: {device_code} -> 设备模型阈值 = {thr:.6f}")
- return thr
-
- # 方式2:使用配置中的默认阈值
- default_threshold = self.config.get('prediction', {}).get('default_threshold', 0.0)
-
- if default_threshold > 0:
- logger.debug(f"阈值来源: {device_code} -> 配置默认值 = {default_threshold:.6f}")
- return default_threshold
-
- # 首次找不到阈值时记录警告
- if device_code not in getattr(self, '_threshold_warned', set()):
- if not hasattr(self, '_threshold_warned'):
- self._threshold_warned = set()
- self._threshold_warned.add(device_code)
- logger.warning(f"未找到阈值: {device_code},将跳过异常判定")
-
- return 0.0
-
    def _get_inlet_flow(self, stream_config: RTSPStreamConfig) -> float:
        """
        Fetch the inlet flow via the SCADA real-time data endpoint.

        Uses the current-data API, which returns only the latest sample
        for the configured PLC point.

        Parameters:
            stream_config: stream configuration of the device

        Returns:
            Inlet flow value, or 0.0 on any failure.
        """
        if not self.scada_enabled or not stream_config:
            logger.debug(f"流量跳过: scada_enabled={self.scada_enabled}, stream_config={stream_config}")
            return 0.0

        # PLC data point holding the flow value
        plc_address = stream_config.get_flow_plc_address()
        if not plc_address:
            logger.debug(f"流量跳过(无PLC地址): {stream_config.device_code} | pump_name='{stream_config.pump_name}'")
            return 0.0

        # project_id configured per plant
        project_id = stream_config.project_id

        try:
            # Current timestamp in milliseconds
            now_ms = int(datetime.now().timestamp() * 1000)

            # Query parameters
            params = {"time": now_ms}

            # Request body in the real-time endpoint's format
            request_body = [
                {
                    "deviceId": "1",
                    "deviceItems": plc_address,
                    "deviceName": f"流量_{stream_config.pump_name}",
                    "project_id": project_id
                }
            ]

            # Request headers
            headers = {
                "Content-Type": "application/json",
                "JWT-TOKEN": self.scada_jwt
            }

            logger.debug(f"流量请求: {stream_config.device_code} | project_id={project_id} | plc={plc_address}")

            # POST to the real-time endpoint
            response = requests.post(
                self.scada_realtime_url,
                params=params,
                json=request_body,
                headers=headers,
                timeout=self.scada_timeout
            )

            if response.status_code == 200:
                data = response.json()
                if data.get("code") == 200:
                    if data.get("data"):
                        # The real-time endpoint returns a single latest record
                        latest = data["data"][0]
                        if "val" in latest:
                            flow = float(latest["val"])
                            logger.debug(f"流量获取成功: {stream_config.device_code} | 流量={flow}")
                            return flow
                        else:
                            logger.debug(f"流量数据无val字段: {stream_config.device_code}")
                    else:
                        # API succeeded but carried no data
                        logger.debug(f"流量查询无数据: {stream_config.device_code}")
                else:
                    logger.warning(f"流量API返回异常: {stream_config.device_code} | code={data.get('code')} | msg={data.get('msg')}")
            else:
                logger.warning(f"流量HTTP错误: {stream_config.device_code} | status={response.status_code}")

        except Exception as e:
            logger.warning(f"流量获取异常: {stream_config.device_code} | {e}")

        return 0.0
-
- def _compute_frequency_spectrum(self, audio_data):
- """
- 计算频谱图数据
-
- 将1分钟内的多个8秒音频片段合并,计算整体FFT
-
- 参数:
- audio_data: 音频数据列表
-
- 返回:
- dB值列表(0-8000Hz均匀分布,共400个点)
- """
- if not audio_data:
- return []
-
- try:
- # 合并所有音频片段
- combined = np.concatenate(audio_data)
-
- # 计算FFT
- n = len(combined)
- fft_result = np.fft.rfft(combined)
- freqs = np.fft.rfftfreq(n, 1.0 / CFG.SR)
-
- # 计算幅度(转换为dB)
- magnitude = np.abs(fft_result)
- # 避免log(0)
- magnitude = np.maximum(magnitude, 1e-10)
- db = 20 * np.log10(magnitude)
-
- # 降采样到400个点(0-8000Hz均匀分布)
- max_freq = 8000
- num_points = 400
-
- db_list = []
-
- for i in range(num_points):
- # 计算目标频率(0到8000Hz均匀分布)
- target_freq = (i / (num_points - 1)) * max_freq
- # 找到最接近的频率索引
- idx = np.argmin(np.abs(freqs - target_freq))
- if idx < len(freqs):
- db_list.append(float(db[idx]))
-
- return db_list
-
- except Exception as e:
- logger.error(f"计算频谱图失败: {e}")
- return []
-
    def _push_detection_result(self, stream_config, device_code,
                               is_anomaly, trigger_alert, abnormal_score, score_threshold,
                               running_status, inlet_flow,
                               freq_db, freq_middle_db=None,
                               anomaly_type_code=6,
                               abnormal_wav_b64=""):
        """
        Push one detection result to the remote server(s).

        Payload format (per consumer requirements):
          - spectrum data (this_frequency + normal_frequency_middle)
          - running status and inlet flow
          - sound_detection.status / abnormalwav carry 0 (anomalous) and
            the base64 audio only when trigger_alert=True; otherwise
            1 (normal) and an empty string

        Parameters:
            stream_config: stream configuration
            device_code: device identifier
            is_anomaly: detection verdict of this period
            trigger_alert: True only when a NEW anomaly was raised
            abnormal_score: mean reconstruction error of the period
            score_threshold: per-device threshold
            running_status: pump running status string
            inlet_flow: inlet flow value
            freq_db: current-period spectrum dB list
            freq_middle_db: historical average spectrum dB list
            anomaly_type_code: classified anomaly type (6 = unclassified)
            abnormal_wav_b64: base64 of the anomaly audio, pre-read by the
                caller before the file is archived (avoids a race)
        """
        try:
            import time as time_module

            # Camera/device display name
            camera_name = stream_config.camera_name

            # Push targets are built from push_base_urls (base + /{project_id})
            if not self.push_base_urls:
                logger.warning(f"未配置push_base_urls: {device_code}")
                return

            # project_id used to build the final push URL
            project_id = stream_config.project_id

            # Build the message
            request_time = int(time_module.time() * 1000)

            # The anomaly audio arrives via abnormal_wav_b64 (pre-read by the
            # caller before archiving); warn when an alert has no audio so it
            # can be investigated
            if trigger_alert and not abnormal_wav_b64:
                logger.warning(f"报警推送但无异常音频数据: {device_code}")
            # sound_detection.status: 0 (anomalous) only when trigger_alert;
            # normal periods AND ongoing anomalies both report 1 - per the
            # consumer's requirement that non-alert pushes look "normal/empty"
            report_status = 0 if trigger_alert else 1

            payload = {
                "message": {
                    # Channel info
                    "channelInfo": {"name": camera_name},
                    # Request timestamp (ms)
                    "requestTime": request_time,
                    # Classification
                    "classification": {
                        "level_one": 2,  # audio-detection top-level category
                        "level_two": anomaly_type_code  # subtype (6=unclassified, 7=bearing, 8=cavitation, 9=looseness, 10=impeller, 11=valve)
                    },
                    # Skill info
                    "skillInfo": {"name": "异响检测"},
                    # Sound-detection payload
                    "sound_detection": {
                        # Anomaly audio (base64)
                        "abnormalwav": abnormal_wav_b64,
                        # Status: 0=anomalous (new anomaly only), 1=normal (or ongoing)
                        "status": report_status,
                        # Device condition
                        "condition": {
                            "running_status": "运行中" if running_status == "开机" else "停机中",
                            "inlet_flow": inlet_flow
                        },
                        # Scores
                        "score": {
                            "abnormal_score": abnormal_score,  # mean reconstruction error of the last minute
                            "score_threshold": score_threshold  # per-device anomaly threshold
                        },
                        # Spectrum data
                        "frequency": {
                            "this_frequency": freq_db,  # current minute's spectrum
                            "normal_frequency_middle": freq_middle_db or [],  # ~10-minute average spectrum
                        }
                    }
                }
            }

            # Build (url, label) targets; each base URL gets /{project_id} appended
            push_targets = [
                (f"{item['url']}/{project_id}", item['label'])
                for item in self.push_base_urls
            ]

            # Push to each target independently (one failure doesn't cascade)
            for target_url, target_label in push_targets:
                self._send_to_target(
                    target_url, target_label, payload,
                    device_code, camera_name, trigger_alert, abnormal_score
                )

        except Exception as e:
            logger.error(f"推送通知异常: {e}")
-
- def _send_to_target(self, target_url, target_label, payload,
- device_code, camera_name, trigger_alert, abnormal_score):
- # 向单个推送目标发送数据(含重试逻辑)
- # 各目标独立调用,某个目标失败不影响其他目标
- import time as time_module
-
- push_success = False
- for attempt in range(self.push_retry_count + 1):
- try:
- response = requests.post(
- target_url,
- json=payload,
- timeout=self.push_timeout,
- headers={"Content-Type": "application/json"}
- )
-
- if response.status_code == 200:
- alert_tag = "报警" if trigger_alert else "心跳"
- logger.info(
- f" [{alert_tag}][{target_label}] {device_code}({camera_name}) | "
- f"误差={abnormal_score:.6f}"
- )
- push_success = True
- break
- else:
- logger.warning(
- f"推送失败[{target_label}]: {device_code} | URL={target_url} | "
- f"状态码={response.status_code} | 内容={response.text[:100]}"
- )
-
- except requests.exceptions.Timeout:
- logger.warning(f"推送超时[{target_label}]: {device_code} | URL={target_url} | 尝试 {attempt + 1}/{self.push_retry_count + 1}")
- except requests.exceptions.RequestException as e:
- logger.warning(f"推送异常[{target_label}]: {device_code} | URL={target_url} | {e}")
-
- # 重试间隔
- if attempt < self.push_retry_count:
- time_module.sleep(1)
-
- if not push_success:
- logger.error(f"推送失败[{target_label}]: {device_code} | URL={target_url} | 已达最大重试次数")
-
- def _move_audio_to_date_dir(self, wav_file):
- """
- 将音频移动到日期目录归档
-
- 目录结构:
- deploy_pickup/data/{device_code}/{日期}/{文件名}
-
- 参数:
- wav_file: 音频文件路径
- """
- try:
- import shutil
-
- # 从文件名提取device_code和日期
- # 格式: 92_1#-1_20251218142000.wav
- match = re.match(r'\d+_(.+)_(\d{8})\d{6}\.wav', wav_file.name)
- if not match:
- logger.warning(f"无法从文件名提取信息: {wav_file.name}")
- return
-
- device_code = match.group(1) # 如 1#-1
- date_str = match.group(2) # YYYYMMDD
-
- # 构建目标目录: data/{device_code}/{日期}/
- date_dir = self.audio_dir / device_code / date_str
- date_dir.mkdir(parents=True, exist_ok=True)
-
- # 移动文件
- dest_file = date_dir / wav_file.name
- shutil.move(str(wav_file), str(dest_file))
- logger.debug(f"音频已归档: {device_code}/{date_str}/{wav_file.name}")
-
- except Exception as e:
- logger.error(f"移动音频失败: {wav_file.name} | 错误: {e}")
-
- def _move_audio_to_transition_dir(self, wav_file, reason):
- """
- 将泵停机/过渡期的音频移动到过渡期目录
-
- 目录结构:
- deploy_pickup/data/{device_code}/pump_transition/{文件名}
-
- 这些音频不会被用于模型训练,但保留用于分析调试
-
- 参数:
- wav_file: 音频文件路径
- reason: 原因标识(stopped=停机, transition=过渡期)
- """
- try:
- import shutil
-
- # 从文件名提取device_code
- # 格式: 92_1#-1_20251218142000.wav
- match = re.match(r'\d+_(.+)_\d{14}\.wav', wav_file.name)
- if not match:
- logger.warning(f"无法从文件名提取信息: {wav_file.name}")
- return
-
- device_code = match.group(1) # 如 1#-1
-
- # 构建目标目录: data/{device_code}/pump_transition/
- transition_dir = self.audio_dir / device_code / "pump_transition"
- transition_dir.mkdir(parents=True, exist_ok=True)
-
- # 移动文件
- dest_file = transition_dir / wav_file.name
- shutil.move(str(wav_file), str(dest_file))
- logger.debug(f"过渡期音频已归档: {device_code}/pump_transition/{wav_file.name} ({reason})")
-
- except Exception as e:
- logger.error(f"移动过渡期音频失败: {wav_file.name} | 错误: {e}")
-
- def _cleanup_old_files(self, days: int = 7):
- """
- 清理超过指定天数的正常音频文件
-
- 清理规则:
- - 只清理日期归档目录(data/{device_code}/{日期}/)
- - 保留current目录和anomaly_detected目录
- - 超过days天的文件删除
-
- 参数:
- days: 保留天数,默认7天
- """
- try:
- import shutil
- from datetime import datetime, timedelta
-
- # 计算截止日期
- cutoff_date = datetime.now() - timedelta(days=days)
- cutoff_str = cutoff_date.strftime("%Y%m%d")
-
- deleted_count = 0
-
- # 遍历每个设备目录
- for device_dir in self.audio_dir.iterdir():
- if not device_dir.is_dir():
- continue
-
- # 检查是否是日期目录(跳过current和其他特殊目录)
- for subdir in device_dir.iterdir():
- if not subdir.is_dir():
- continue
-
- # 跳过current目录
- if subdir.name == "current":
- continue
-
- # 检查是否为日期目录(YYYYMMDD格式)
- if not re.match(r'^\d{8}$', subdir.name):
- continue
-
- # 如果日期早于截止日期,删除整个目录
- if subdir.name < cutoff_str:
- try:
- shutil.rmtree(subdir)
- deleted_count += 1
- logger.debug(f"清理过期目录: {device_dir.name}/{subdir.name}")
- except Exception as e:
- logger.error(f"删除目录失败: {subdir} | {e}")
-
- if deleted_count > 0:
- logger.info(f"清理完成: 共删除{deleted_count}个过期目录")
-
- except Exception as e:
- logger.error(f"清理过期文件失败: {e}")
class PickupMonitoringSystem:
    """
    Pickup (audio sensor) monitoring system.

    Owns the FFmpeg capture processes and the monitoring thread: loads
    configuration (from a SQLite DB or an injected YAML dict), registers
    one anomaly-detection model per device, then supervises the capture
    processes, restarting dead ones with exponential backoff.
    """

    def __init__(self, db_path=None, yaml_config=None):
        """
        Initialize the monitoring system.

        Args:
            db_path: SQLite database path; None selects the default
                path returned by get_db_path() (config/pickup_config.db).
            yaml_config: if not None, this dict is used directly as the
                configuration and the DB is skipped (config_manager then
                stays None).

        Raises:
            FileNotFoundError: the configuration database is missing.
        """
        self.db_path = db_path
        self.config_manager = None
        if yaml_config is not None:
            # Use the externally supplied YAML configuration dict as-is.
            self.config = yaml_config
            print(f"配置源: YAML (外部传入)")
        else:
            # Load configuration from the SQLite DB.
            actual_db = Path(db_path) if db_path else get_db_path()
            if not actual_db.exists():
                # Message stored as \u escapes; it renders as Chinese
                # ("config DB missing, run the migration script first").
                raise FileNotFoundError(
                    f"\u914d\u7f6e\u6570\u636e\u5e93\u4e0d\u5b58\u5728: {actual_db}\n"
                    f"\u8bf7\u5148\u8fd0\u884c\u8fc1\u79fb\u811a\u672c: python tool/migrate_yaml_to_db.py"
                )
            self.config_manager = ConfigManager(str(actual_db))
            print(f"配置源: SQLite ({actual_db})")
            self.config = self._load_config()

        # Cold-start mode flag.
        self.cold_start_mode = False

        # Multi-model predictor: every device may use its own model.
        self.multi_predictor = MultiModelPredictor()
        self.predictor = None  # kept for backward compatibility; deprecated

        # Register each enabled device's model-directory mapping from config.
        # NOTE(review): 'enabled' defaults to False here but to True in
        # _parse_rtsp_streams — confirm that asymmetry is intended.
        print("\n正在初始化多模型预测器...")
        for plant in self.config.get('plants', []):
            if not plant.get('enabled', False):
                continue
            for stream in plant.get('rtsp_streams', []):
                device_code = stream.get('device_code', '')
                model_subdir = stream.get('model_subdir', device_code)
                if device_code and model_subdir:
                    self.multi_predictor.register_device(device_code, model_subdir)
                    print(f" 注册设备: {device_code} -> models/{model_subdir}/")

        print(f"已注册 {len(self.multi_predictor.registered_devices)} 个设备模型映射")

        # FFmpeg process handles and monitor threads, populated by start().
        self.ffmpeg_processes = []
        self.monitors = []

        # Shut down cleanly on Ctrl+C / SIGTERM.
        signal.signal(signal.SIGINT, self._signal_handler)
        signal.signal(signal.SIGTERM, self._signal_handler)

    def _load_config(self):
        """
        Load the configuration from the SQLite DB.

        Returns:
            Dict: the full configuration dictionary.
        """
        config = self.config_manager.get_full_config()
        logger.info("配置已从 SQLite 加载")
        return config

    def _parse_rtsp_streams(self):
        """
        Parse the RTSP stream entries from the configuration.

        Returns:
            List[RTSPStreamConfig]: one entry per valid stream of every
            enabled plant.

        Raises:
            ValueError: the configuration contains no plants at all.
        """
        streams = []

        plants = self.config.get('plants', [])
        if not plants:
            raise ValueError("配置文件中未找到水厂配置")

        for plant in plants:
            plant_name = plant.get('name')
            if not plant_name:
                print("警告: 跳过未命名的区域配置")
                continue

            # Skip disabled plants (default True for legacy configs).
            if not plant.get('enabled', True):
                logger.debug(f"跳过禁用的水厂: {plant_name}")
                continue

            # Flow-meter PLC settings, shared by all the plant's streams.
            flow_plc = plant.get('flow_plc', {})

            # Each plant carries its own project_id (default 92).
            project_id = plant.get('project_id', 92)
            logger.info(f"加载区域配置: {plant_name} | project_id={project_id}")

            rtsp_streams = plant.get('rtsp_streams', [])
            for stream in rtsp_streams:
                url = stream.get('url')
                channel = stream.get('channel')
                camera_name = stream.get('name', '')
                device_code = stream.get('device_code', '')
                pump_name = stream.get('pump_name', '')

                # url and channel are mandatory; everything else defaults.
                if not url or channel is None:
                    print(f"警告: 跳过不完整的RTSP流配置 (区域: {plant_name})")
                    continue

                streams.append(RTSPStreamConfig(
                    plant_name=plant_name,
                    rtsp_url=url,
                    channel=channel,
                    camera_name=camera_name,
                    device_code=device_code,
                    pump_name=pump_name,
                    flow_plc=flow_plc,
                    project_id=project_id
                ))

        return streams

    def start(self):
        """
        Start the system and block in the supervision loop.

        Sequence: parse stream configs -> start one FFmpeg process per
        stream -> start a single PickupMonitor over all started streams
        -> loop forever: restart dead FFmpeg processes with exponential
        backoff, run the 7-day cleanup once per day during hour 0, and
        log RTSP status once a minute. Returns after Ctrl+C or when all
        processes have been abandoned; stop() always runs on exit.
        """
        print("=" * 70)
        print("拾音器异响检测系统")
        print("=" * 70)

        # Parse stream configurations.
        streams = self._parse_rtsp_streams()
        print(f"\n共配置 {len(streams)} 个拾音设备:")
        for stream in streams:
            print(f" - {stream.device_code} | {stream.camera_name}")

        # Start one FFmpeg capture process per stream.
        print("\n启动FFmpeg进程...")
        for stream in streams:
            ffmpeg = FFmpegProcess(stream, CFG.AUDIO_DIR, self.config)
            if ffmpeg.start():
                self.ffmpeg_processes.append(ffmpeg)
            else:
                print(f"警告: FFmpeg启动失败,跳过该流: {stream}")

        if not self.ffmpeg_processes:
            print("\n错误: 所有FFmpeg进程均启动失败")
            sys.exit(1)

        print(f"\n成功启动 {len(self.ffmpeg_processes)}/{len(streams)} 个FFmpeg进程")

        # Configs of the streams that actually started (PickupMonitor input).
        all_stream_configs = [p.stream_config for p in self.ffmpeg_processes]

        # Start a single monitor thread covering all streams.
        print("\n启动监控线程...")
        check_interval = self.config.get('prediction', {}).get('check_interval', 1.0)

        monitor = PickupMonitor(
            audio_dir=CFG.AUDIO_DIR,
            multi_predictor=self.multi_predictor,
            stream_configs=all_stream_configs,
            check_interval=check_interval,
            config=self.config,
            config_manager=self.config_manager
        )
        monitor.start()
        self.monitors.append(monitor)

        print(f"\n成功启动监控线程")
        print("\n" + "=" * 70)
        print("系统已启动,开始监控...")
        print("按 Ctrl+C 停止系统")
        print("=" * 70 + "\n")

        # FFmpeg restart policy.
        max_restart_attempts = 5  # per-process cap on restart attempts
        restart_interval_base = 30  # base backoff interval (seconds)
        restart_counts = {id(p): 0 for p in self.ffmpeg_processes}  # attempts, keyed by object id

        # Supervision loop with automatic restart.
        try:
            while True:
                # Check every FFmpeg process.
                for ffmpeg in self.ffmpeg_processes:
                    if not ffmpeg.is_running():
                        pid = id(ffmpeg)
                        device_code = ffmpeg.stream_config.device_code

                        # Still under the retry cap?
                        if restart_counts.get(pid, 0) < max_restart_attempts:
                            # Exponential backoff: base * 2^attempts.
                            wait_time = restart_interval_base * (2 ** restart_counts.get(pid, 0))
                            logger.warning(f"FFmpeg进程停止: {device_code} | 将在{wait_time}秒后重启 | "
                                           f"重试次数: {restart_counts.get(pid, 0) + 1}/{max_restart_attempts}")

                            # NOTE(review): this sleep blocks the whole
                            # supervision loop, delaying checks on every
                            # other process — confirm that is acceptable.
                            time.sleep(wait_time)

                            # Attempt the restart.
                            if ffmpeg.start():
                                logger.debug(f"FFmpeg重启成功: {device_code}")
                                restart_counts[pid] = 0  # reset on success
                            else:
                                restart_counts[pid] = restart_counts.get(pid, 0) + 1
                                logger.error(f"FFmpeg重启失败: {device_code}")
                        else:
                            logger.error(f"FFmpeg达到最大重启次数: {device_code} | 已放弃重启")

                # Exit once nothing runs and every dead process is abandoned.
                running_count = sum(1 for p in self.ffmpeg_processes if p.is_running())
                all_abandoned = all(restart_counts.get(id(p), 0) >= max_restart_attempts
                                    for p in self.ffmpeg_processes if not p.is_running())

                if running_count == 0 and all_abandoned:
                    print("\n错误: 所有FFmpeg进程均已停止且无法重启")
                    break

                # Daily 7-day cleanup, run once per day during hour 0.
                current_hour = datetime.now().hour
                current_date = datetime.now().strftime("%Y%m%d")

                # Lazily created marker of the last cleanup day.
                if not hasattr(self, '_last_cleanup_date'):
                    self._last_cleanup_date = ""

                # Trigger at hour 0 if not already done today.
                if current_hour == 0 and self._last_cleanup_date != current_date:
                    logger.info("执行7天过期文件清理...")
                    for monitor in self.monitors:
                        monitor._cleanup_old_files(days=7)
                    self._last_cleanup_date = current_date

                # Periodic RTSP status log (once per minute).
                if not hasattr(self, '_last_status_log'):
                    self._last_status_log = datetime.now()

                if (datetime.now() - self._last_status_log).total_seconds() >= 60:
                    running = sum(1 for p in self.ffmpeg_processes if p.is_running())
                    total = len(self.ffmpeg_processes)
                    logger.info(f"RTSP状态: {running}/{total} 个FFmpeg进程运行中")
                    logger.info("─" * 60)
                    self._last_status_log = datetime.now()

                time.sleep(10)
        except KeyboardInterrupt:
            print("\n\n收到停止信号,正在关闭系统...")
        finally:
            self.stop()

    def stop(self):
        """
        Stop the monitor threads first, then the FFmpeg processes.
        """
        print("\n正在停止监控系统...")

        print("停止监控线程...")
        for monitor in self.monitors:
            monitor.stop()

        print("停止FFmpeg进程...")
        for ffmpeg in self.ffmpeg_processes:
            ffmpeg.stop()

        print("系统已完全停止")

    def _signal_handler(self, signum, frame):
        """
        SIGINT/SIGTERM handler: stop everything and exit the process.
        """
        print(f"\n\n收到信号 {signum},正在关闭系统...")
        self.stop()
        sys.exit(0)
def _start_config_api_server(config_manager, multi_predictor=None, port=18080):
    """
    Run the FastAPI configuration-management API in the calling thread.

    Blocks until the server exits, so callers normally run this in a
    background (daemon) thread. Failures are logged, never raised.

    Args:
        config_manager: ConfigManager instance backing the API.
        multi_predictor: optional MultiModelPredictor exposed to the API.
        port: TCP port to listen on (binds all interfaces).
    """
    try:
        import uvicorn

        init_config_api(config_manager, multi_predictor)
        logger.info(f"启动配置管理 API: http://0.0.0.0:{port}")
        uvicorn.run(config_app, host="0.0.0.0", port=port, log_level="warning")
    except ImportError:
        # uvicorn is optional; without it the API is simply unavailable.
        logger.warning("uvicorn 未安装,配置管理 API 无法启动。请安装: pip install uvicorn")
    except Exception as e:
        logger.error(f"配置管理 API 启动失败: {e}")
def main():
    """
    Entry point.

    Verifies the SQLite config DB exists, starts the config-management
    API in a daemon thread, then runs the monitoring system until it
    stops. Exits with status 1 on any startup failure.
    """
    # The SQLite config DB must exist before anything can start.
    db_path = get_db_path(Path(__file__).parent / "config")
    if not db_path.exists():
        print(f"错误: 配置数据库不存在: {db_path}")
        # Fix: dropped a pointless f-prefix on this placeholder-free string.
        print("\n请先运行迁移脚本: python tool/migrate_yaml_to_db.py")
        sys.exit(1)

    try:
        system = PickupMonitoringSystem()

        # Serve the config-management API from a background daemon thread
        # so it dies with the process.
        api_port = 18080
        api_thread = threading.Thread(
            target=_start_config_api_server,
            args=(system.config_manager, system.multi_predictor, api_port),
            daemon=True,
            name="config-api"
        )
        api_thread.start()
        system.start()
    except FileNotFoundError as e:
        # Missing model files / thresholds — actionable hints for the user.
        print(f"\n错误: {e}")
        print("\n请确保:")
        print("1. 已完成训练并计算阈值")
        print("2. 已复制必要的模型文件到 models/ 目录")
        sys.exit(1)
    except Exception as e:
        print(f"\n严重错误: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)
# Script entry point: run only when executed directly, not on import.
if __name__ == "__main__":
    main()
|