| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
77717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762
2772278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- """
- run_pickup_monitor.py
- ---------------------
- 拾音器异响检测主程序
- 功能说明:
- 该程序用于音频采集设备(拾音器)的实时异常检测。
- 与摄像头版本不同,本版本:
- 1. 没有视频/图片采集和上报
- 2. 上报时包含音频频谱图分析数据
- 3. 支持进水流量PLC数据读取
- 4. 每1分钟计算一次平均重建误差并上报
- 上报数据结构:
- {
- "message": {
- "channelInfo": {"name": "设备信息"},
- "requestTime": 时间戳,
- "classification": {
- "level_one": 2, // 音频检测大类
- "level_two": 6 // 异常类型小类: 6=未分类, 7=轴承, 8=气蚀, 9=松动/共振, 10=叶轮, 11=阀件
- },
- "skillInfo": {"name": "异响检测"},
- "sound_detection": {
- "abnormalwav": "", // base64编码的音频
- "status": 0/1, // 0=异常, 1=正常
- "condition": {
- "running_status": "运行中/停机中", // 启停状态
- "inlet_flow": 0.0 // 进水流量
- },
- "score": {
- "abnormal_score": 0.0, // 当前1分钟内8s平均重构误差
- "score_threshold": 0.0 // 该设备的异常阀值
- },
- "frequency": {
- "this_frequency": [], // 当前1分钟的频谱图
- "normal_frequency_middle": [], // 过去10分钟的频谱图平均
- "normal_frequency_upper": [], // 上限(暂为空)
- "normal_frequency_lower": [] // 下限(暂为空)
- }
- }
- }
- }
- 使用方法:
- python run_pickup_monitor.py
- 配置文件:
- config/rtsp_config.yaml
- """
- import subprocess
- import time
- import re
- import signal
- import sys
- import threading
- import logging
- import base64
- from concurrent.futures import ThreadPoolExecutor
- from pathlib import Path
- from datetime import datetime, timedelta
- from collections import defaultdict
- # 用于计算FFT
- import numpy as np
- try:
- import librosa
- except ImportError:
- print("错误:缺少librosa库,请安装: pip install librosa")
- sys.exit(1)
- try:
- import requests
- except ImportError:
- print("错误:缺少requests库,请安装: pip install requests")
- sys.exit(1)
- # 导入预测器模块
- from predictor import MultiModelPredictor, CFG
- # 导入配置管理模块(SQLite)
- from config.config_manager import ConfigManager
- from config.config_api import app as config_app, init_config_api
- from config.db_models import get_db_path
- # 导入泵状态监控模块(用于检测启停过渡期)
- try:
- from core.pump_state_monitor import PumpStateMonitor
- PUMP_STATE_MONITOR_AVAILABLE = True
- except ImportError:
- PUMP_STATE_MONITOR_AVAILABLE = False
- # 导入人体检测读取模块(用于抑制有人时的误报)
- try:
- from core.human_detection_reader import HumanDetectionReader
- HUMAN_DETECTION_AVAILABLE = True
- except ImportError:
- HUMAN_DETECTION_AVAILABLE = False
- # 导入报警聚合器(跨设备聚合抑制 + 分类型冷却)
- try:
- from core.alert_aggregator import AlertAggregator
- ALERT_AGGREGATOR_AVAILABLE = True
- except ImportError:
- ALERT_AGGREGATOR_AVAILABLE = False
- # ========================================
- # 配置日志系统
- # ========================================
def setup_logging():
    """
    Configure process-wide logging and return the 'PickupMonitor' logger.

    If the root logger already has handlers (configured by a caller), nothing
    is reconfigured. Otherwise a size-rotated ``logs/system.log`` next to this
    file and a stdout stream handler are installed at INFO level.

    Returns:
        logging.Logger: the logger named 'PickupMonitor'.
    """
    from logging.handlers import RotatingFileHandler

    if not logging.getLogger().handlers:
        # Log directory lives beside this source file.
        logs_path = Path(__file__).parent / "logs"
        logs_path.mkdir(parents=True, exist_ok=True)

        line_format = logging.Formatter(
            '%(asctime)s | %(levelname)-8s | %(name)-20s | %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S'
        )

        installed = [
            # Rotate by size: 10 MB per file, 2 backups kept.
            RotatingFileHandler(
                logs_path / "system.log",
                maxBytes=10 * 1024 * 1024,
                backupCount=2,
                encoding='utf-8'
            ),
            # Console output for foreground runs.
            logging.StreamHandler(sys.stdout),
        ]
        for handler in installed:
            handler.setFormatter(line_format)

        logging.basicConfig(level=logging.INFO, handlers=installed)

    return logging.getLogger('PickupMonitor')
- # 初始化日志系统
- logger = setup_logging()
- # 导入能量基线模块
- try:
- from core.energy_baseline import EnergyBaseline
- ENERGY_BASELINE_AVAILABLE = True
- except ImportError:
- ENERGY_BASELINE_AVAILABLE = False
- logger.warning("能量基线模块未找到,泵状态检测功能禁用")
class RTSPStreamConfig:
    """
    Configuration record for a single RTSP audio stream.

    Bundles the stream URL with the pickup-specific fields (device code,
    pump name, flow PLC mapping, project id) used by the rest of the monitor.
    """

    def __init__(self, plant_name, rtsp_url, channel,
                 camera_name, device_code, pump_name,
                 flow_plc, project_id):
        """
        Store the stream settings.

        Args:
            plant_name: area / plant name
            rtsp_url: RTSP stream URL
            channel: channel number; also used to derive ``pump_id``
            camera_name: device name; falls back to ``ch{channel}`` when falsy
            device_code: device code (e.g. ``1#-1``)
            pump_name: pump name, used as the key into ``flow_plc``
            flow_plc: mapping of pump name -> flow PLC address (falsy -> ``{}``)
            project_id: project identifier
        """
        channel_tag = f"ch{channel}"

        self.plant_name = plant_name
        self.rtsp_url = rtsp_url
        self.channel = channel
        self.pump_id = channel_tag
        self.camera_name = camera_name if camera_name else channel_tag
        self.device_code = device_code
        self.pump_name = pump_name
        self.flow_plc = flow_plc if flow_plc else {}
        self.project_id = project_id

    def get_flow_plc_address(self):
        """
        Look up the inlet-flow PLC address for this device.

        Returns:
            The address mapped to ``pump_name``, or an empty string when the
            pump name or mapping is empty or has no matching entry.
        """
        if not (self.pump_name and self.flow_plc):
            return ""
        return self.flow_plc.get(self.pump_name, "")

    def __repr__(self):
        return f"RTSPStreamConfig(plant='{self.plant_name}', camera='{self.camera_name}', pump='{self.pump_id}')"
class FFmpegProcess:
    """
    Manage one FFmpeg child process for a single RTSP stream.

    The process converts the RTSP stream into fixed-length WAV segments.

    Output file name format: {project_id}_{device_code}_{timestamp}.wav
    """

    def __init__(self, stream_config, output_dir, config=None):
        """
        Initialize the FFmpeg process manager.

        Args:
            stream_config: RTSP stream configuration (provides rtsp_url,
                device_code, pump_id, camera_name, plant_name, project_id)
            output_dir: root output directory for audio (path-like supporting ``/``)
            config: global configuration dict; ``audio.file_duration`` is read
                from it (default 8 seconds)
        """
        self.config_dict = config or {}
        self.stream_config = stream_config
        self.output_dir = output_dir
        self.process = None

        # Segment length in seconds, default 8.
        audio_cfg = self.config_dict.get('audio', {})
        self.file_duration = audio_cfg.get('file_duration', 8)

        # project_id comes from the stream configuration.
        self.project_id = stream_config.project_id

    def start(self):
        """
        Launch the FFmpeg process.

        Returns:
            bool: True when the process was spawned, False on failure
            (including a missing ``ffmpeg`` executable).
        """
        # Fall back to the channel-derived id when no device code is set.
        device_code = self.stream_config.device_code or self.stream_config.pump_id

        # One directory per device: {output_dir}/{device_code}/current/
        current_dir = self.output_dir / device_code / "current"
        current_dir.mkdir(parents=True, exist_ok=True)

        # Output name template, expanded by FFmpeg's strftime support,
        # e.g. 92_1#-1_20251218142000.wav
        output_pattern = str(current_dir / f"{self.project_id}_{device_code}_%Y%m%d%H%M%S.wav")

        # Input options are kept small to bound RTSP buffering.
        cmd = [
            "ffmpeg",
            # RTSP input options
            "-rtsp_transport", "tcp",            # use TCP transport
            "-probesize", "1000000",             # limit probe size to 1 MB
            "-analyzeduration", "1000000",       # limit analysis duration to 1 s
            "-max_delay", "500000",              # max delay 500 ms
            "-fflags", "nobuffer",               # disable input buffering
            "-flags", "low_delay",               # low-delay mode
            "-i", self.stream_config.rtsp_url,   # RTSP input
            # Audio output options
            "-vn",                               # drop video
            "-ac", "1",                          # mono
            "-ar", str(CFG.SR),                  # sample rate from CFG.SR
            "-f", "segment",                     # segment muxer
            "-segment_time", str(int(self.file_duration)),  # segment length
            "-strftime", "1",                    # enable strftime in file names
            "-loglevel", "error",                # only log errors
            "-y",                                # overwrite existing files
            output_pattern,
        ]

        try:
            # NOTE(review): nothing in this class drains stdout/stderr; a child
            # that writes enough to a pipe can block on it. Left unchanged in
            # case code outside this class reads these pipes - confirm.
            self.process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE
            )

            logger.info(f"FFmpeg已启动: {device_code} | {self.stream_config.camera_name} | "
                        f"文件时长: {self.file_duration}秒 | PID={self.process.pid}")
            return True

        except FileNotFoundError:
            logger.error("FFmpeg错误: 未找到ffmpeg命令,请确保已安装FFmpeg")
            return False
        except Exception as e:
            logger.error(f"FFmpeg启动失败: {device_code} | 错误: {e}")
            return False

    def is_running(self):
        """
        Report whether the FFmpeg process is alive.

        Returns:
            bool: True if a process was started and has not exited.
        """
        if self.process is None:
            return False
        return self.process.poll() is None

    def stop(self):
        """
        Stop the FFmpeg process.

        Sends terminate and waits up to 5 seconds; if the process is still
        alive it is killed and then waited on again so the child is reaped
        instead of being left as a zombie. Does nothing when no process is
        running.
        """
        if self.process is None or not self.is_running():
            return

        logger.info(f"FFmpeg停止: {self.stream_config.plant_name} | {self.stream_config.camera_name} | PID={self.process.pid}")
        self.process.terminate()
        try:
            self.process.wait(timeout=5)
        except subprocess.TimeoutExpired:
            logger.warning(f"FFmpeg强制终止: PID={self.process.pid}")
            self.process.kill()
            # Reap the killed child; without this wait its exit status is
            # never collected.
            try:
                self.process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                logger.error(f"FFmpeg进程未能退出: PID={self.process.pid}")
- class PickupMonitor:
- """
- 拾音器监控线程类
-
- 监控音频目录,调用预测器检测异常,推送告警。
-
- 主要功能:
- 1. 不截取视频帧(纯音频)
- 2. 计算并上报频谱图数据(this_frequency + normal_frequency_middle)
- 3. 每分钟汇总上报一次
- 4. 使用SCADA API获取进水流量
- """
-
    def __init__(self, audio_dir, multi_predictor,
                 stream_configs,
                 check_interval=1.0, config=None,
                 config_manager=None):
        """
        Initialize the monitor.

        Reads all runtime settings from ``config``, builds the per-device
        caches, and constructs the optional collaborators (alert aggregator,
        pump state monitor, human detection reader) when their modules are
        available and enabled.

        Args:
            audio_dir: audio root directory
            multi_predictor: multi-model predictor instance
            stream_configs: all RTSP stream configurations
            check_interval: polling interval in seconds
            config: configuration dict
            config_manager: ConfigManager instance used for hot reload;
                hot reload is disabled when None
        """
        # Audio root directory (each device has its own directory below it)
        self.audio_dir = audio_dir
        self.multi_predictor = multi_predictor
        self.predictor = None  # kept for compatibility; deprecated
        self.stream_configs = stream_configs
        self.check_interval = check_interval
        self.config = config or {}

        # Hot reload: keep a ConfigManager reference and refresh periodically from the DB
        self._config_manager = config_manager
        self._last_config_reload = 0  # timestamp of the last config refresh
        self._config_reload_interval = 30  # config refresh interval (seconds)

        # project_id
        self.project_id = self.config.get('platform', {}).get('project_id', 92)

        # Map device_code -> stream_config (streams without a device_code are left out)
        self.device_map = {}
        for cfg in stream_configs:
            if cfg.device_code:
                self.device_map[cfg.device_code] = cfg

        # Set of files already handled
        self.seen_files = set()

        # Per-device detection cache (for the per-period summary)
        # key: device_code (e.g. "1#-1")
        self.device_cache = defaultdict(lambda: {
            "errors": [],  # reconstruction errors, one per audio file
            "last_upload": None,  # time of the last upload
            "audio_data": [],  # audio samples used to compute the spectrum
            "status": None  # most recent running status
        })

        # Spectrum history cache (used to compute normal_frequency_middle)
        # key: device_code, value: list of (timestamp, freq_db)
        freq_cfg = self.config.get('prediction', {}).get('frequency_history', {})
        self.freq_history_enabled = freq_cfg.get('enabled', True)
        self.freq_history_minutes = freq_cfg.get('history_minutes', 10)
        self.freq_history = defaultdict(list)

        # Last reported status (True=abnormal, False=normal, None=initial)
        # Used to de-duplicate on status change and avoid continuous alarms
        self.last_report_status = {}

        # Reporting period (seconds)
        audio_cfg = self.config.get('audio', {})
        self.segment_duration = audio_cfg.get('segment_duration', 60)

        # Anomaly audio saving settings
        save_cfg = self.config.get('prediction', {}).get('save_anomaly_audio', {})
        self.save_anomaly_enabled = save_cfg.get('enabled', True)
        self.save_anomaly_dir = Path(__file__).parent / save_cfg.get('save_dir', 'data/anomaly_detected')
        if self.save_anomaly_enabled:
            self.save_anomaly_dir.mkdir(parents=True, exist_ok=True)

        # Anomaly push settings
        push_cfg = self.config.get('push_notification', {})
        self.push_enabled = push_cfg.get('enabled', False)
        self.alert_enabled = push_cfg.get('alert_enabled', True)  # whether anomaly alerts are enabled (false = temporarily disable anomaly reporting)
        self.push_timeout = push_cfg.get('timeout', 30)
        self.push_retry_count = push_cfg.get('retry_count', 2)
        # Push base URL list (targets are built as base_url/{project_id}; extensible)
        raw_urls = push_cfg.get('push_base_urls', [])
        self.push_base_urls = [
            {"label": item.get("label", f"target-{i}"), "url": item.get("url", "").rstrip("/")}
            for i, item in enumerate(raw_urls)
            if item.get("url")  # skip entries with an empty URL
        ]
        # Path of the failed-push record file
        failed_log_path = push_cfg.get('failed_push_log', 'data/push_failures.jsonl')
        self.failed_push_log = Path(__file__).parent / failed_log_path
        self.failed_push_log.parent.mkdir(parents=True, exist_ok=True)

        # Log a reminder when alert_enabled is False
        if not self.alert_enabled:
            logger.warning("异常告警已禁用(alert_enabled=false),仅上报心跳状态")

        # ========================================
        # Alert aggregator (replaces the former fixed cooldown_minutes)
        # ----------------------------------------
        # Feature 1: cross-device aggregation - several devices alarming at
        #            once are treated as environmental noise and suppressed
        # Feature 2: per-type cooldown - defaults: same type 24h, different type 1h
        # ========================================
        agg_cfg = push_cfg.get('alert_aggregate', {})
        self.alert_aggregator = None
        if ALERT_AGGREGATOR_AVAILABLE:
            self.alert_aggregator = AlertAggregator(
                push_callback=self._push_detection_result,
                aggregate_enabled=agg_cfg.get('enabled', True),
                window_seconds=agg_cfg.get('window_seconds', 300),
                min_devices=agg_cfg.get('min_devices', 2),
                cooldown_same_type_hours=push_cfg.get('cooldown_same_type_hours', 24),
                cooldown_diff_type_hours=push_cfg.get('cooldown_diff_type_hours', 1)
            )
        else:
            logger.warning("报警聚合器模块未找到,使用默认报警逻辑")

        # Time of the last anomaly audio save (for the save cooldown)
        self.last_anomaly_save_time = {}
        # Save cooldown in minutes: during a continuous anomaly on one device, save only once every N minutes
        self.anomaly_save_cooldown_minutes = save_cfg.get('cooldown_minutes', 10)

        # Locked anomaly classification (kept unchanged while an anomaly persists)
        # key: device_code, value: (anomaly_type_code, type_name)
        self.locked_anomaly_type = {}

        # Sliding-window voting settings (defaults: 3 abnormal results out of 5)
        voting_cfg = self.config.get('prediction', {}).get('voting', {})
        self.voting_enabled = voting_cfg.get('enabled', True)
        self.voting_window_size = voting_cfg.get('window_size', 5)  # window size
        self.voting_threshold = voting_cfg.get('threshold', 3)  # abnormal count needed to declare an anomaly
        self.detection_history = {}  # per-device detection history (True=abnormal)

        # Threshold tolerance band (avoids flapping around the boundary)
        # Errors within threshold*(1-tolerance) ~ threshold*(1+tolerance) are a grey zone that keeps the previous state
        self.tolerance_ratio = voting_cfg.get('tolerance_ratio', 0.05)  # default 5% tolerance
        self.last_single_anomaly = {}  # per-device result of the previous single-cycle decision

        # Thresholds are now managed by multi_predictor, loaded per device from its model directory

        # Energy detection settings
        energy_cfg = self.config.get('prediction', {}).get('energy_detection', {})
        self.energy_detection_enabled = ENERGY_BASELINE_AVAILABLE and energy_cfg.get('enabled', True)
        self.skip_detection_when_stopped = energy_cfg.get('skip_when_stopped', True)

        # Energy baseline detectors (one per device); None when energy detection is disabled
        if self.energy_detection_enabled:
            self.energy_baselines = {}
        else:
            self.energy_baselines = None

        # SCADA API settings (used for pump status and inlet flow)
        scada_cfg = self.config.get('scada_api', {})
        self.scada_enabled = scada_cfg.get('enabled', False)
        self.scada_url = scada_cfg.get('base_url', '')  # historical data endpoint (fallback)
        self.scada_realtime_url = scada_cfg.get('realtime_url', '')  # realtime data endpoint (primary)
        self.scada_jwt = scada_cfg.get('jwt_token', '')
        self.scada_timeout = scada_cfg.get('timeout', 10)

        # Pump state monitor (used to detect start/stop transition periods)
        self.pump_state_monitor = None
        self.pump_status_plc_configs = {}  # {pump_name: [{point, name}, ...]}
        if PUMP_STATE_MONITOR_AVAILABLE and self.scada_enabled:
            # Initialize the pump state monitor
            # Use the project_id of the first enabled plant
            project_id = 0
            for plant in self.config.get('plants', []):
                if plant.get('enabled', False):
                    project_id = plant.get('project_id', 0)
                    # Load the pump status point configuration
                    pump_status_plc = plant.get('pump_status_plc', {})
                    self.pump_status_plc_configs = pump_status_plc
                    break

            if project_id > 0:
                # Query pump status through the realtime endpoint URL
                self.pump_state_monitor = PumpStateMonitor(
                    scada_url=self.scada_realtime_url,  # realtime data endpoint
                    scada_jwt=self.scada_jwt,
                    project_id=project_id,
                    timeout=self.scada_timeout,
                    transition_window_minutes=15  # 15 minutes after a start/stop count as transition
                )
                logger.info(f"泵状态监控器已启用 (project_id={project_id}, 过渡期窗口=15分钟, 使用实时接口)")

        # Thread control
        self.running = False
        self.thread = None
        # Push thread pool (keeps push timeouts from blocking the main monitor loop)
        self._push_executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="push")

        # Startup time (used to skip status-change logging during startup)
        self.startup_time = None
        self.startup_warmup_seconds = 120  # no status-change logging for 120 s after startup

        # ========================================
        # Anomaly context capture settings
        # ========================================
        context_cfg = save_cfg.get('context_capture', {})
        self.context_capture_enabled = context_cfg.get('enabled', False)
        self.context_pre_minutes = context_cfg.get('pre_minutes', 2)
        self.context_post_minutes = context_cfg.get('post_minutes', 2)

        # Audio file history cache (used to capture audio preceding an anomaly)
        # key: device_code, value: deque of (timestamp, file_path)
        from collections import deque
        # Number of history entries to keep (sized in minutes, assuming about one file per minute)
        history_size = self.context_pre_minutes + 2  # keep 2 extra as a buffer
        self.audio_file_history = defaultdict(lambda: deque(maxlen=history_size))

        # Anomaly context capture state
        # key: device_code, value: dict
        self.anomaly_capture_state = {}

        if self.context_capture_enabled:
            logger.info(f"异常上下文捕获已启用 (前{self.context_pre_minutes}分钟 + 后{self.context_post_minutes}分钟)")

        # ========================================
        # Human-detection alert suppression settings
        # ========================================
        # If any camera detected a person within the cooldown, anomaly alerts of all devices are suppressed
        human_cfg = self.config.get('human_detection', {})
        self.human_detection_enabled = human_cfg.get('enabled', False) and HUMAN_DETECTION_AVAILABLE
        self.human_reader = None

        if self.human_detection_enabled:
            db_path = human_cfg.get('db_path', '')
            cooldown_minutes = human_cfg.get('cooldown_minutes', 5)
            self.human_reader = HumanDetectionReader(
                db_path=db_path,
                cooldown_minutes=cooldown_minutes
            )
            logger.info(f"人体检测报警抑制已启用 (冷却时间={cooldown_minutes}分钟, 数据库={db_path})")
-
- def start(self):
- """
- 启动监控线程
- """
- if self.running:
- return
-
- # 启动前清理current目录中的遗留文件
- self._cleanup_current_on_startup()
-
- self.running = True
- self.startup_time = datetime.now() # 记录启动时间
- self.thread = threading.Thread(target=self._monitor_loop, daemon=True)
- self.thread.start()
- # 打印监控的设备列表
- device_codes = list(self.device_map.keys())
- logger.info(f"监控线程已启动 | 目录: {self.audio_dir} | 监控设备: {device_codes}")
-
- def _cleanup_current_on_startup(self):
- """
- 启动时清理current目录中的遗留文件
-
- 删除上次运行遗留的文件,避免混入新采集数据影响检测准确性
- """
- cleaned_count = 0
-
- for device_code in self.device_map.keys():
- current_dir = self.audio_dir / device_code / "current"
- if not current_dir.exists():
- continue
-
- for wav_file in current_dir.glob("*.wav"):
- try:
- wav_file.unlink()
- cleaned_count += 1
- except Exception as e:
- logger.warning(f"清理遗留文件失败: {wav_file.name} | {e}")
-
- if cleaned_count > 0:
- logger.info(f"启动清理: 已删除current目录中 {cleaned_count} 个遗留文件")
-
- def stop(self):
- """
- 停止监控线程
- """
- if not self.running:
- return
-
- self.running = False
- if self.thread is not None:
- self.thread.join(timeout=5)
- # 关闭推送线程池,等待已提交的推送任务完成
- self._push_executor.shutdown(wait=True, cancel_futures=False)
- logger.info(f"监控线程已停止")
-
- def _reload_hot_config(self):
- # 从 DB 热加载可变配置项,无需重启服务
- # 只刷新运行时可安全变更的参数,不影响 FFmpeg 进程和流映射
- if self._config_manager is None:
- return
-
- now = time.time()
- # 未达到刷新间隔则跳过
- if now - self._last_config_reload < self._config_reload_interval:
- return
- self._last_config_reload = now
-
- try:
- # 从 DB 读取最新配置
- fresh = self._config_manager.get_full_config()
- self.config = fresh
-
- # 刷新推送配置
- push_cfg = fresh.get('push_notification', {})
- self.push_enabled = push_cfg.get('enabled', False)
- self.alert_enabled = push_cfg.get('alert_enabled', True)
- self.push_timeout = push_cfg.get('timeout', 30)
- self.push_retry_count = push_cfg.get('retry_count', 2)
- raw_urls = push_cfg.get('push_base_urls', [])
- self.push_base_urls = [
- {"label": item.get("label", f"target-{i}"), "url": item.get("url", "").rstrip("/")}
- for i, item in enumerate(raw_urls)
- if item.get("url")
- ]
-
- # 刷新投票配置
- voting_cfg = fresh.get('prediction', {}).get('voting', {})
- self.voting_enabled = voting_cfg.get('enabled', True)
- self.voting_window_size = voting_cfg.get('window_size', 5)
- self.voting_threshold = voting_cfg.get('threshold', 3)
- self.tolerance_ratio = voting_cfg.get('tolerance_ratio', 0.05)
-
- # 刷新能量检测配置
- energy_cfg = fresh.get('prediction', {}).get('energy_detection', {})
- self.skip_detection_when_stopped = energy_cfg.get('skip_when_stopped', True)
-
- # 刷新人体检测配置
- human_cfg = fresh.get('human_detection', {})
- new_human_enabled = human_cfg.get('enabled', False)
- if new_human_enabled != self.human_detection_enabled:
- logger.info(f"人体检测抑制配置变更: {self.human_detection_enabled} -> {new_human_enabled}")
- self.human_detection_enabled = new_human_enabled
-
- logger.debug("配置热刷新完成")
- except Exception as e:
- logger.error(f"配置热刷新失败: {e}")
-
    def _monitor_loop(self):
        """
        Monitoring loop (thread main function).

        While ``self.running`` is set, repeatedly: refresh hot-reloadable
        config, scan each device's ``current`` directory for new WAV files,
        hand complete files to ``_process_new_file``, run the periodic upload
        check, flush the alert aggregator, and trim ``seen_files``. Any
        exception inside one iteration is logged and the loop continues after
        sleeping ``check_interval`` seconds.
        """
        # Make sure the root directory exists
        self.audio_dir.mkdir(parents=True, exist_ok=True)

        while self.running:
            try:
                # Hot reload: periodically refresh mutable config from the DB
                self._reload_hot_config()
                # Scan the "current" folder of every device directory
                for device_code in self.device_map.keys():
                    device_current_dir = self.audio_dir / device_code / "current"

                    # Skip devices whose directory does not exist
                    if not device_current_dir.exists():
                        continue

                    for wav_file in device_current_dir.glob("*.wav"):
                        # Skip files that were already handled
                        if wav_file in self.seen_files:
                            continue

                        # Check that the file is complete
                        try:
                            stat_info = wav_file.stat()
                            file_age = time.time() - stat_info.st_mtime
                            file_size = stat_info.st_size

                            # Modification-time check (make sure writing has finished):
                            # a file modified less than 12 s ago may still be written by FFmpeg
                            if file_age < 12.0:
                                continue

                            # File size check (60 s x 16000 Hz x 2 bytes ~= 1.9 MB)
                            # Accepted range: 500 KB - 3 MB
                            if file_size < 500_000 or file_size > 3_000_000:
                                # Only treat the size as abnormal once the file has been idle for a while (>20 s),
                                # so a file that is still being written is not reported by mistake
                                if file_age > 20.0:
                                    logger.warning(f"文件大小异常: {wav_file.name} ({file_size / 1000:.1f}KB)")

                                    # Automatically delete undersized files (usually 0 KB or corrupted)
                                    if file_size < 500_000:
                                        try:
                                            wav_file.unlink()
                                            logger.debug(f"删除异常小文件: {wav_file.name}")
                                        except Exception as e:
                                            logger.error(f"删除文件失败: {wav_file.name} | {e}")

                                    self.seen_files.add(wav_file)
                                    continue  # move on to the next file

                                # Undersized but not yet older than 20 s: retry on a later pass
                                # NOTE(review): an oversized file aged 12-20 s appears to fall through to
                                # processing here rather than being skipped - confirm this is intended
                                if file_size < 500_000:  # double check so nothing slips through
                                    continue

                        except Exception as e:
                            logger.error(f"文件状态检查失败: {wav_file.name} | {e}")
                            continue


                        # Process the new file
                        self._process_new_file(wav_file)

                        # Mark it as handled
                        self.seen_files.add(wav_file)

                # Check whether a periodic upload is due
                self._check_periodic_upload()

                # Check whether the aggregation window expired; if so run the aggregate decision and push
                if self.alert_aggregator:
                    self.alert_aggregator.check_and_flush()

                # Trim the handled-file set when it grows too large:
                # keep the 5000 entries with the newest mtime (missing files sort first)
                if len(self.seen_files) > 10000:
                    recent_files = sorted(self.seen_files,
                                          key=lambda f: f.stat().st_mtime if f.exists() else 0)[-5000:]
                    self.seen_files = set(recent_files)

            except Exception as e:
                logger.error(f"监控循环错误: {e}")

            # Wait until the next check
            time.sleep(self.check_interval)
-
- def _process_new_file(self, wav_file):
- """
- 处理新的音频文件
-
- 文件名格式: {project_id}_{device_code}_{时间戳}.wav
- 例如: 92_1#-1_20251218142000.wav
-
- 流程:
- 1. 加载音频
- 2. 计算能量判断设备状态
- 3. 进行AE异常检测,记录重建误差
- 4. 保存音频数据用于计算频谱图
- """
- try:
- # 从文件名解析device_code
- # 格式: {project_id}_{device_code}_{时间戳}.wav
- try:
- parts = wav_file.stem.split('_')
- if len(parts) >= 3:
- device_code = parts[1] # 第二部分是device_code
- else:
- device_code = "unknown"
- except:
- device_code = "unknown"
-
- # ========================================
- # 泵启停状态检测(基于 PLC 查询)- 优先于模型检查
- # ----------------------------------------
- # 逻辑:通过 SCADA API 查询 PLC 点位判断泵是否运行
- # 作用:
- # 1. 冷启动模式下也能过滤停机时的音频(保证训练数据质量)
- # 2. 泵停机时跳过异常检测(避免无意义的检测)
- # 3. 记录设备状态用于后续上报
- # 依赖:pump_state_monitor + rtsp_config 中的 pump_status_plc 配置
- # ========================================
- device_status = "未知"
- pump_is_running = True # 默认认为运行中(PLC 查询失败时的保守策略)
-
- # 获取该设备对应的流配置
- stream_config = self.device_map.get(device_code)
-
- if self.pump_state_monitor and stream_config:
- # 根据设备的 pump_name 找到关联的 PLC 点位配置
- pump_name = stream_config.pump_name
- pump_configs = self.pump_status_plc_configs.get(pump_name, [])
-
- if pump_configs:
- # 遍历所有关联泵,只要有一个运行就认为设备在工作
- any_pump_running = False
- for pump_cfg in pump_configs:
- point = pump_cfg.get("point", "")
- name = pump_cfg.get("name", point)
- pump_id = point
-
- # 查询泵状态(带 60 秒缓存,不会频繁请求 SCADA)
- is_running, _ = self.pump_state_monitor.update_pump_state(pump_id, point, name)
- if is_running:
- any_pump_running = True
-
- pump_is_running = any_pump_running
- device_status = "开机" if pump_is_running else "停机"
-
- # 泵全部停机时跳过(可通过配置禁用此行为)
- # 冷启动和正常模式都适用,确保训练数据质量
- if self.skip_detection_when_stopped and not pump_is_running:
- logger.info(f"泵停机(PLC): {device_code} | 归档到过渡期目录(不用于训练)")
- self._move_audio_to_transition_dir(wav_file, "stopped")
- return
-
- # ========================================
- # 过渡期检测(泵启停后15分钟内)
- # ----------------------------------------
- # 目的:过滤启停过程中的不稳定音频
- # 确保训练数据只包含稳定运行期的音频
- # ========================================
- if self.skip_detection_when_stopped:
- pump_in_transition, transition_pump_names = \
- self.pump_state_monitor.check_pumps_transition(pump_configs)
-
- if pump_in_transition:
- logger.info(f"泵过渡期: {device_code} | 归档到过渡期目录(不用于训练) | "
- f"过渡期泵: {', '.join(transition_pump_names)}")
- self._move_audio_to_transition_dir(wav_file, "transition")
- return
-
- # 获取该设备的预测器(懒加载模型)
- device_predictor = self.multi_predictor.get_predictor(device_code)
- if device_predictor is None:
- logger.info(f"冷启动模式(设备 {device_code} 无模型): 归档 {wav_file.name}")
- self._move_audio_to_date_dir(wav_file)
- return
- # 加载音频
- try:
- y, sr = librosa.load(str(wav_file), sr=CFG.SR, mono=True)
- except Exception as e:
- logger.error(f"音频加载失败: {wav_file.name} | {e}")
- return
-
- # 记录状态到缓存(用于周期上报)
- # 注意:泵状态检测已在前面完成,这里只记录状态
- self.device_cache[device_code]["status"] = device_status
-
- # ========================================
- # 计算重建误差
- # ========================================
- error = self._compute_reconstruction_error(wav_file, device_predictor)
-
- if error is not None:
- self.device_cache[device_code]["errors"].append(error)
-
- # ========================================
- # 保存音频数据用于计算频谱图
- # ========================================
- self.device_cache[device_code]["audio_data"].append(y)
-
- # ========================================
- # 暂存文件路径,等待周期聚合判定后再归档
- # ========================================
- if "pending_files" not in self.device_cache[device_code]:
- self.device_cache[device_code]["pending_files"] = []
- self.device_cache[device_code]["pending_files"].append(wav_file)
-
- # ========================================
- # 记录到音频历史缓存(用于异常上下文捕获)
- # ========================================
- if self.context_capture_enabled:
- self.audio_file_history[device_code].append((datetime.now(), wav_file))
-
- # 初始化上次上报时间
- if self.device_cache[device_code]["last_upload"] is None:
- self.device_cache[device_code]["last_upload"] = datetime.now()
-
- # 获取阈值并判断结果
- threshold = self._get_threshold(device_code)
-
- # ========================================
- # 快速通道:连续多个文件误差极高时快速预警(暂时关闭)
- # 不走投票窗口,用于捕获突发性严重故障
- # ========================================
- # if error is not None and threshold is not None and threshold > 0:
- # # 维护快速通道缓冲区
- # if "fast_alert_buffer" not in self.device_cache[device_code]:
- # self.device_cache[device_code]["fast_alert_buffer"] = []
- #
- # fast_buf = self.device_cache[device_code]["fast_alert_buffer"]
- # # 连续性检查:误差超过 2x 阈值时记录,否则清空缓冲区
- # if error > threshold * 2.0:
- # fast_buf.append(error)
- # else:
- # fast_buf.clear()
- #
- # # 连续 3 个文件(~24秒)都超过 2x 阈值 → 触发快速预警
- # FAST_ALERT_CONSECUTIVE = 3
- # if len(fast_buf) >= FAST_ALERT_CONSECUTIVE:
- # # 检查是否已触发过(避免重复告警)
- # last_fast = self.device_cache[device_code].get("last_fast_alert_time")
- # now = datetime.now()
- # can_fast_alert = (last_fast is None or
- # (now - last_fast).total_seconds() > 300) # 5分钟冷却
- #
- # if can_fast_alert:
- # # 快速通道同样受抑制逻辑约束
- # suppress = False
- # # 泵过渡期抑制
- # if self.pump_state_monitor and stream_config:
- # pump_name = stream_config.pump_name
- # pump_configs = self.pump_status_plc_configs.get(pump_name, [])
- # if pump_configs:
- # in_transition, _ = self.pump_state_monitor.check_pumps_transition(pump_configs)
- # if in_transition:
- # suppress = True
- # # 人体检测抑制
- # if self.human_detection_enabled and self.human_reader:
- # if self.human_reader.is_in_cooldown():
- # suppress = True
- # # alert_enabled 开关
- # if not self.alert_enabled:
- # suppress = True
- #
- # if not suppress:
- # avg_fast = float(np.mean(fast_buf))
- # logger.warning(
- # f"[!!] 快速通道触发: {device_code} | "
- # f"连续{len(fast_buf)}个文件异常 | "
- # f"平均误差={avg_fast:.6f} 阈值={threshold:.6f}")
- # self.device_cache[device_code]["last_fast_alert_time"] = now
- # fast_buf.clear()
- # # 标记快速预警,在下次周期上报时一并处理
- # self.device_cache[device_code]["fast_alert_pending"] = True
-
- if error is not None and threshold is not None:
- is_anomaly = error > threshold
- result_tag = "!!" if is_anomaly else "OK"
- logger.info(f"[{result_tag}] {device_code} | {wav_file.name} | "
- f"误差={error:.6f} 阈值={threshold:.6f}")
- elif error is not None:
- logger.debug(f"文件预测: {wav_file.name} | 误差={error:.6f} | 阀值=未设置")
- else:
- logger.warning(f"预测跳过: {wav_file.name} | 误差计算失败")
-
- except Exception as e:
- logger.error(f"处理文件失败: {wav_file.name} | 错误: {e}")
-
def _compute_reconstruction_error(self, wav_file, device_predictor):
    """
    Compute the reconstruction error of one audio file (Min-Max normalised).

    The waveform is cut into windows of ``CFG.WIN_SEC`` seconds taken every
    ``CFG.HOP_SEC`` seconds. Each window is turned into a dB-scaled mel
    spectrogram, padded or cropped to ``CFG.TARGET_FRAMES`` frames, scaled
    with the predictor's ``global_min`` / ``global_max`` and sent through the
    predictor's model as a single batch.

    Args:
        wav_file: Path of the audio file.
        device_predictor: Predictor object; this method reads its
            ``global_min``, ``global_max``, ``torch_device`` and ``model``.

    Returns:
        The mean of the per-patch MSE values, or ``None`` when the clip is
        shorter than one window, no patch was produced, or any step raised.
    """
    try:
        import torch
        from predictor.utils import align_to_target

        # Min-Max normalisation bounds stored on the predictor.
        lo = device_predictor.global_min
        hi = device_predictor.global_max

        samples, _ = librosa.load(str(wav_file), sr=CFG.SR, mono=True)

        window_len = int(CFG.WIN_SEC * CFG.SR)
        step = int(CFG.HOP_SEC * CFG.SR)

        if len(samples) < window_len:
            logger.warning(f"音频太短,无法提取patches: {wav_file.name}")
            return None

        patches = []
        last_start = len(samples) - window_len
        for offset in range(0, last_start + 1, step):
            mel = librosa.feature.melspectrogram(
                y=samples[offset:offset + window_len],
                sr=CFG.SR,
                n_fft=CFG.N_FFT,
                hop_length=CFG.HOP_LENGTH,
                n_mels=CFG.N_MELS,
                power=2.0,
            )
            mel_db = librosa.power_to_db(mel, ref=np.max)

            # Force a fixed number of frames: zero-pad on the right when the
            # patch is short, otherwise crop.
            missing = CFG.TARGET_FRAMES - mel_db.shape[1]
            if missing > 0:
                mel_db = np.pad(mel_db, ((0, 0), (0, missing)))
            else:
                mel_db = mel_db[:, :CFG.TARGET_FRAMES]

            scaled = (mel_db - lo) / (hi - lo + 1e-6)
            patches.append(scaled.astype(np.float32))

        if not patches:
            logger.warning(f"未能提取任何patches: {wav_file.name}")
            return None

        # Stack the patches and insert a singleton axis at position 1.
        batch = torch.from_numpy(np.expand_dims(np.stack(patches, 0), 1))
        batch = batch.to(device_predictor.torch_device)

        with torch.no_grad():
            recon = align_to_target(device_predictor.model(batch), batch)
            per_patch_mse = torch.mean((recon - batch) ** 2, dim=[1, 2, 3])
            mean_mse = torch.mean(per_patch_mse).item()

        logger.debug(f"重建误差: {wav_file.name} | patches={len(patches)} | MSE={mean_mse:.6f}")
        return mean_mse

    except Exception as e:
        logger.error(f"计算重建误差失败: {wav_file.name} | {e}")
        return None
-
def _check_periodic_upload(self):
    """
    Aggregate each device's cached results and report them once per period.

    For every device in ``self.device_cache`` whose ``last_upload`` is at
    least ``self.segment_duration`` seconds old, this method:

    1. averages the cached reconstruction errors and compares the mean with
       the device threshold, using a tolerance band around the threshold;
    2. feeds that single-cycle verdict into the sliding voting window;
    3. decides whether an alert fires (only on a normal -> anomaly change,
       and only if not suppressed by a pump transition, the
       ``alert_enabled`` switch or the human-detection cooldown);
    4. computes the spectrum, classifies a new anomaly and hands the result
       to the alert aggregator or to the push thread pool;
    5. isolates, keeps, deletes or archives the cycle's pending files and
       resets the device cache.
    """
    now = datetime.now()

    for device_code, cache in self.device_cache.items():
        last_upload = cache.get("last_upload")
        if last_upload is None:
            continue

        # Wait until a full reporting period has elapsed for this device.
        if (now - last_upload).total_seconds() < self.segment_duration:
            continue

        stream_config = self.device_map.get(device_code)

        # Mean reconstruction error of the cycle (0.0 when nothing cached).
        errors = cache.get("errors", [])
        avg_error = float(np.mean(errors)) if errors else 0.0

        threshold = self._get_threshold(device_code)

        # Single-cycle verdict with a tolerance band around the threshold.
        if not threshold:
            is_current_anomaly = False
        else:
            upper_bound = threshold * (1 + self.tolerance_ratio)
            lower_bound = threshold * (1 - self.tolerance_ratio)

            if avg_error > upper_bound:
                # Above the upper bound: definitely anomalous.
                is_current_anomaly = True
            elif avg_error < lower_bound:
                # Below the lower bound: definitely normal.
                is_current_anomaly = False
            else:
                # Inside the band: fall back to the plain threshold test.
                is_current_anomaly = avg_error > threshold

            # Remember the verdict of this cycle.
            self.last_single_anomaly[device_code] = is_current_anomaly

        # Sliding-window voting over the most recent single-cycle verdicts.
        votes = self.detection_history.setdefault(device_code, [])
        votes.append(is_current_anomaly)
        if len(votes) > self.voting_window_size:
            votes.pop(0)

        if self.voting_enabled and len(votes) >= self.voting_window_size:
            anomaly_count = sum(votes)
            is_anomaly = anomaly_count >= self.voting_threshold
            window_info = f"窗口[{anomaly_count}/{self.voting_window_size}]"
        else:
            is_anomaly = is_current_anomaly
            window_info = "窗口未满"

        # State-change detection: an alert may fire only when the previous
        # report was normal (or there was none) and this one is anomalous.
        last_is_anomaly = self.last_report_status.get(device_code)
        is_new_anomaly = is_anomaly and not last_is_anomaly
        trigger_alert = False

        if is_new_anomaly:
            # Ask the pump monitor whether any associated pump is inside its
            # start/stop transition window.
            pump_in_transition = False
            transition_pump_names = []

            if self.pump_state_monitor and stream_config:
                pump_name = stream_config.pump_name
                pump_configs = self.pump_status_plc_configs.get(pump_name, [])
                if pump_configs:
                    pump_in_transition, transition_pump_names = \
                        self.pump_state_monitor.check_pumps_transition(pump_configs)

            if pump_in_transition:
                # A pump is in transition: suppress this alert.
                logger.info(f"泵启停过渡期,抑制告警: {device_code} | "
                            f"过渡期泵: {', '.join(transition_pump_names)}")
            elif not self.alert_enabled:
                # Alerts are switched off.
                logger.debug(f"异常告警已禁用,跳过告警: {device_code}")
            elif (self.human_detection_enabled and self.human_reader
                  and self.human_reader.is_in_cooldown()):
                # Human-detection cooldown is active: suppress the alert.
                status_info = self.human_reader.get_status_info()
                logger.info(f"人体检测抑制: {device_code} | {status_info},跳过报警")
            else:
                trigger_alert = True

        # Running status cached by the per-file processing step.
        running_status = cache.get("status", "未知")

        # Inlet flow (0.0 when the device has no stream configuration).
        inlet_flow = self._get_inlet_flow(stream_config) if stream_config else 0.0

        # Spectrum of this cycle.
        audio_data = cache.get("audio_data", [])
        freq_db = self._compute_frequency_spectrum(audio_data)

        # Store the spectrum in the history and prune expired entries.
        if self.freq_history_enabled and freq_db:
            self.freq_history[device_code].append((now, freq_db))
            cutoff = now - timedelta(minutes=self.freq_history_minutes)
            self.freq_history[device_code] = [
                (stamp, values)
                for stamp, values in self.freq_history[device_code]
                if stamp > cutoff
            ]

        # Average of the stored spectra (normal_frequency_middle).
        freq_middle_db = self._compute_frequency_middle(device_code)

        # Anomaly classification: classify only when the anomaly is new and
        # reuse the locked result while the anomaly persists.
        anomaly_type_code = 6  # default: unclassified anomaly
        type_name = "未分类异常"

        if is_anomaly and audio_data:
            if is_new_anomaly:
                try:
                    from core.anomaly_classifier import classify_anomaly
                    if len(audio_data) > 0:
                        y = audio_data[-1]
                        if not isinstance(y, np.ndarray):
                            y = np.array(y)
                        anomaly_type_code, type_name, confidence = classify_anomaly(y, sr=16000)
                        # Lock the classification for the rest of this anomaly.
                        self.locked_anomaly_type[device_code] = (anomaly_type_code, type_name)
                        logger.info(f"异常分类(新异常): {type_name} (code={anomaly_type_code}, 置信度={confidence:.2f})")
                except Exception as e:
                    logger.warning(f"异常分类失败: {e}")
            elif device_code in self.locked_anomaly_type:
                anomaly_type_code, type_name = self.locked_anomaly_type[device_code]
                logger.debug(f"异常分类(沿用): {type_name} (code={anomaly_type_code})")
        elif device_code in self.locked_anomaly_type:
            # Otherwise drop any locked classification.
            del self.locked_anomaly_type[device_code]

        # Reporting.
        if self.push_enabled and stream_config:
            # Read the anomalous audio now, before the file is archived or
            # the pending list is cleared; the push itself happens later
            # (thread pool / aggregator).
            pre_read_wav_b64 = ""
            if trigger_alert:
                try:
                    current_pending = cache.get("pending_files", [])
                    if current_pending and current_pending[0].exists():
                        with open(current_pending[0], "rb") as f:
                            pre_read_wav_b64 = base64.b64encode(f.read()).decode('utf-8')
                        logger.debug(f"预读异常音频成功: {current_pending[0].name} | size={len(pre_read_wav_b64)}")
                except Exception as e:
                    logger.warning(f"预读异常音频失败: {e}")

            push_kwargs = dict(
                stream_config=stream_config,
                device_code=device_code,
                is_anomaly=is_anomaly,
                trigger_alert=trigger_alert,
                abnormal_score=avg_error,
                score_threshold=threshold,
                running_status=running_status,
                inlet_flow=inlet_flow,
                freq_db=freq_db,
                freq_middle_db=freq_middle_db,
                anomaly_type_code=anomaly_type_code,
                abnormal_wav_b64=pre_read_wav_b64
            )

            if trigger_alert and self.alert_aggregator:
                # Alerts are submitted to the aggregator together with the
                # keyword arguments for the eventual push.
                self.alert_aggregator.submit_alert(
                    plant_name=stream_config.plant_name,
                    device_code=device_code,
                    anomaly_type_code=anomaly_type_code,
                    push_kwargs=push_kwargs
                )
            else:
                # Heartbeat, or no aggregator available: push asynchronously
                # through the thread pool.
                self._push_executor.submit(self._push_detection_result, **push_kwargs)

        # Remember the reported state for the next cycle.
        self.last_report_status[device_code] = is_anomaly

        # Summary log line.
        thr_str = f"{threshold:.6f}" if threshold else "未设置"
        if not trigger_alert:
            alert_dest = ""
        elif self.alert_aggregator:
            alert_dest = "-> 聚合器"
        else:
            alert_dest = "-> 直接推送"

        cam_label = f"({stream_config.camera_name})" if stream_config else ""
        result_emoji = "!!" if is_anomaly else "OK"
        alert_str = f"报警=是 {alert_dest}" if trigger_alert else "报警=否"

        logger.info(
            f"[{result_emoji}] {device_code}{cam_label} | "
            f"误差={avg_error:.6f} 阈值={thr_str} | "
            f"{window_info} | {running_status} | "
            f"{'异常' if is_anomaly else '正常'} | {alert_str}"
        )

        # File archiving is driven by the single-cycle verdict
        # (is_current_anomaly), not by the voted is_anomaly.
        pending_files = cache.get("pending_files", [])

        # Anomaly-context capture bookkeeping.
        if self.context_capture_enabled:
            self._update_anomaly_capture_state(device_code, is_anomaly, is_new_anomaly,
                                               avg_error, threshold, now, pending_files)

        if pending_files:
            if is_current_anomaly:
                # Honour the anomaly-save cooldown.
                last_save = self.last_anomaly_save_time.get(device_code)
                should_save = True
                if last_save:
                    elapsed_minutes = (now - last_save).total_seconds() / 60
                    should_save = elapsed_minutes >= self.anomaly_save_cooldown_minutes

                if self.context_capture_enabled:
                    # Files are left in place here; the context-capture
                    # logic is responsible for them.
                    pass
                elif should_save:
                    # Context capture disabled: isolate the files for review.
                    for f in pending_files:
                        self._move_audio_to_anomaly_pending(f)
                    self.last_anomaly_save_time[device_code] = now
                    logger.warning(f"已隔离 {len(pending_files)} 个异常文件到待排查目录")
                else:
                    # Still cooling down: delete the files instead of saving.
                    for f in pending_files:
                        try:
                            f.unlink()
                        except Exception as e:
                            logger.debug(f"删除冷却期内文件失败: {f.name} | {e}")
                    remaining_minutes = self.anomaly_save_cooldown_minutes - elapsed_minutes
                    logger.debug(f"异常保存冷却中: {device_code} | 剩余 {remaining_minutes:.1f} 分钟")
            else:
                # Normal cycle: archive the files under the date directory.
                for f in pending_files:
                    self._move_audio_to_date_dir(f)
                # Clear the save cooldown so the next anomaly is saved at once.
                if device_code in self.last_anomaly_save_time:
                    del self.last_anomaly_save_time[device_code]

        # Reset the per-cycle cache.
        cache["errors"] = []
        cache["audio_data"] = []
        cache["pending_files"] = []
        cache["last_upload"] = now
        logger.info("─" * 60)
-
def _update_anomaly_capture_state(self, device_code, is_anomaly,
                                  is_new_anomaly, avg_error,
                                  threshold, now, pending_files):
    """
    Advance the anomaly-context capture state machine for one device.

    States:
        1. No capture running and a new anomaly arrives: start a capture and
           look back through ``self.audio_file_history[device_code]``.
        2. Capture running and the post window is still open: append the
           current cycle's existing pending files to ``post_files``.
        3. Capture running and the post window has elapsed: call
           ``_save_anomaly_context``, drop the state and stamp
           ``last_anomaly_save_time``.

    When a capture starts, history entries whose file still exists are split
    by timestamp: the last minute before ``now`` goes to ``anomaly_files``,
    the ``context_pre_minutes`` minutes before that go to ``pre_files``.
    If no history file lands in ``anomaly_files``, the existing pending
    files are used instead.

    Args:
        device_code: Device identifier.
        is_anomaly: Whether the current cycle is anomalous (not used here).
        is_new_anomaly: Whether this is a normal -> anomaly change.
        avg_error: Mean reconstruction error of the cycle.
        threshold: Threshold the error was compared with.
        now: Current time.
        pending_files: Files collected during the current cycle.
    """
    state = self.anomaly_capture_state.get(device_code)

    if state is None:
        if not is_new_anomaly:
            return

        # New anomaly: start a capture.
        anomaly_cutoff = now - timedelta(minutes=1)
        pre_cutoff = now - timedelta(minutes=self.context_pre_minutes + 1)

        pre_files = []
        anomaly_files = []
        for ts, fpath in self.audio_file_history[device_code]:
            if not fpath.exists():
                continue
            if ts >= anomaly_cutoff:
                # Within the last minute before the trigger.
                anomaly_files.append(fpath)
            elif ts >= pre_cutoff:
                # Between 1 and (N + 1) minutes before the trigger.
                pre_files.append(fpath)

        # Fallback: use the current pending files when history gave nothing.
        if not anomaly_files and pending_files:
            anomaly_files = [f for f in pending_files if f.exists()]

        self.anomaly_capture_state[device_code] = {
            "trigger_time": now,
            "avg_error": avg_error,
            "threshold": threshold,
            "pre_files": pre_files,
            "anomaly_files": anomaly_files,
            "post_files": [],
            "post_start_time": now
        }

        logger.info(f"异常上下文捕获已触发: {device_code} | "
                    f"前置文件={len(pre_files)}个 | 异常文件={len(anomaly_files)}个 | "
                    f"等待后续{self.context_post_minutes}分钟")
        return

    # A capture is already running.
    elapsed_post = (now - state["post_start_time"]).total_seconds() / 60

    if elapsed_post < self.context_post_minutes:
        # Post window still open: keep collecting follow-up files.
        state["post_files"].extend(f for f in pending_files if f.exists())
        return

    # Post window elapsed: persist everything and close the capture.
    self._save_anomaly_context(device_code, state)
    del self.anomaly_capture_state[device_code]
    self.last_anomaly_save_time[device_code] = now
-
def _save_anomaly_context(self, device_code: str, state: dict):
    """
    Move the captured context files into their own directory and describe
    them in a ``metadata.json`` file.

    Layout::

        {save_anomaly_dir}/{device_code}/{stem of first anomaly file}/
            <captured wav files, original names kept>
            metadata.json

    ``metadata.json`` holds ``device_code``, ``trigger_time``, ``avg_error``,
    ``threshold`` and a ``files`` mapping with three lists of file names:
    ``before_trigger`` (from ``pre_files``), ``at_trigger`` (from
    ``anomaly_files``) and ``after_trigger`` (from ``post_files``).

    Nothing is written when the state has no anomaly files. Any exception is
    logged and swallowed.

    Args:
        device_code: Device identifier.
        state: Capture state dictionary.
    """
    import shutil
    import json

    try:
        anomaly_files = state.get("anomaly_files", [])
        if not anomaly_files:
            logger.warning(f"无异常文件,跳过保存: {device_code}")
            return

        # The folder is named after the first anomaly file (without suffix).
        folder_name = anomaly_files[0].stem
        save_dir = self.save_anomaly_dir / device_code / folder_name
        save_dir.mkdir(parents=True, exist_ok=True)

        all_files = {"before_trigger": [], "at_trigger": [], "after_trigger": []}
        groups = (
            ("before_trigger", state.get("pre_files", [])),
            ("at_trigger", anomaly_files),
            ("after_trigger", state.get("post_files", [])),
        )

        # Files are moved (not copied); paths that no longer exist are skipped.
        for label, paths in groups:
            for fpath in paths:
                if not fpath.exists():
                    continue
                shutil.move(str(fpath), str(save_dir / fpath.name))
                all_files[label].append(fpath.name)

        trigger_time = state.get("trigger_time")
        metadata = {
            "device_code": device_code,
            "trigger_time": trigger_time.strftime("%Y-%m-%d %H:%M:%S") if trigger_time else None,
            "avg_error": round(state.get("avg_error", 0.0), 6),
            "threshold": round(state.get("threshold", 0.0), 6),
            "files": all_files
        }

        with open(save_dir / "metadata.json", 'w', encoding='utf-8') as f:
            json.dump(metadata, f, ensure_ascii=False, indent=2)

        total = sum(len(names) for names in all_files.values())
        logger.warning(f"异常上下文已保存: {device_code}/{folder_name} | "
                       f"共{total}个文件 (前{len(all_files['before_trigger'])}+异常{len(all_files['at_trigger'])}+后{len(all_files['after_trigger'])})")

    except Exception as e:
        logger.error(f"保存异常上下文失败: {device_code} | {e}")
-
def _compute_frequency_middle(self, device_code):
    """
    Average the stored spectra of a device (normal_frequency_middle).

    Only records whose dB list has the same length as the first record are
    averaged.

    Args:
        device_code: Device identifier.

    Returns:
        Element-wise mean of the matching dB lists as a list; an empty list
        when fewer than two records are stored or the computation fails.
    """
    records = self.freq_history.get(device_code, [])
    if not (records and len(records) >= 2):
        return []

    try:
        # The first record defines the expected length.
        expected_len = len(records[0][1])
        usable = [db for _, db in records if len(db) == expected_len]
        if not usable:
            return []

        # Mean dB per frequency point.
        return np.mean(usable, axis=0).tolist()

    except Exception as e:
        logger.error(f"计算频谱图历史平均失败: {e}")
        return []
-
def _get_threshold(self, device_code):
    """
    Return the anomaly threshold for a device.

    Lookup order:
        1. the threshold reported by ``self.multi_predictor`` for the device;
        2. ``prediction.default_threshold`` from ``self.config`` when it is a
           positive number;
        3. ``0.0``, which callers treat as "no threshold".

    A missing or null ``prediction`` section, or a null / non-numeric
    ``default_threshold``, is treated as "no default" instead of raising.
    The "no threshold" warning is logged once per device.

    Args:
        device_code: Device identifier (e.g. "LT-1").

    Returns:
        The threshold, the configured default, or ``0.0``.
    """
    # 1. Per-device threshold from the predictor.
    thr = self.multi_predictor.get_threshold(device_code)
    if thr is not None:
        logger.debug(f"阈值来源: {device_code} -> 设备模型阈值 = {thr:.6f}")
        return thr

    # 2. Default from the configuration; tolerate null / malformed values.
    prediction_cfg = self.config.get('prediction') or {}
    try:
        default_threshold = float(prediction_cfg.get('default_threshold') or 0.0)
    except (TypeError, ValueError):
        default_threshold = 0.0

    if default_threshold > 0:
        logger.debug(f"阈值来源: {device_code} -> 配置默认值 = {default_threshold:.6f}")
        return default_threshold

    # 3. Nothing found: warn the first time only. The set of warned devices
    # is created lazily on the instance.
    warned = getattr(self, '_threshold_warned', None)
    if warned is None:
        warned = self._threshold_warned = set()
    if device_code not in warned:
        warned.add(device_code)
        logger.warning(f"未找到阈值: {device_code},将跳过异常判定")

    return 0.0
-
def _get_inlet_flow(self, stream_config: RTSPStreamConfig) -> float:
    """
    Fetch the inlet flow with a POST to ``self.scada_realtime_url``.

    The first element of the response's ``data`` list is used and its
    ``val`` field is converted to float.

    Args:
        stream_config: Stream configuration of the device.

    Returns:
        The flow value, or ``0.0`` when SCADA is disabled, no PLC address is
        configured, the response carries no usable value, or the request
        fails.
    """
    if not (self.scada_enabled and stream_config):
        logger.debug(f"流量跳过: scada_enabled={self.scada_enabled}, stream_config={stream_config}")
        return 0.0

    # PLC data point for the flow value.
    plc_address = stream_config.get_flow_plc_address()
    if not plc_address:
        logger.debug(f"流量跳过(无PLC地址): {stream_config.device_code} | pump_name='{stream_config.pump_name}'")
        return 0.0

    project_id = stream_config.project_id

    try:
        # Current timestamp in milliseconds, sent as the query parameter.
        query = {"time": int(datetime.now().timestamp() * 1000)}

        body = [
            {
                "deviceId": "1",
                "deviceItems": plc_address,
                "deviceName": f"流量_{stream_config.pump_name}",
                "project_id": project_id
            }
        ]

        request_headers = {
            "Content-Type": "application/json",
            "JWT-TOKEN": self.scada_jwt
        }

        logger.debug(f"流量请求: {stream_config.device_code} | project_id={project_id} | plc={plc_address}")

        response = requests.post(
            self.scada_realtime_url,
            params=query,
            json=body,
            headers=request_headers,
            timeout=self.scada_timeout
        )

        if response.status_code != 200:
            logger.warning(f"流量HTTP错误: {stream_config.device_code} | status={response.status_code}")
            return 0.0

        data = response.json()
        if data.get("code") != 200:
            logger.warning(f"流量API返回异常: {stream_config.device_code} | code={data.get('code')} | msg={data.get('msg')}")
            return 0.0

        if not data.get("data"):
            # The API answered normally but returned no rows.
            logger.debug(f"流量查询无数据: {stream_config.device_code}")
            return 0.0

        # Only the first row is used.
        latest = data["data"][0]
        if "val" not in latest:
            logger.debug(f"流量数据无val字段: {stream_config.device_code}")
            return 0.0

        flow = float(latest["val"])
        logger.debug(f"流量获取成功: {stream_config.device_code} | 流量={flow}")
        return flow

    except Exception as e:
        logger.warning(f"流量获取异常: {stream_config.device_code} | {e}")

    return 0.0
-
def _compute_frequency_spectrum(self, audio_data, sr=None, max_freq=8000, num_points=400):
    """
    Compute a down-sampled magnitude spectrum (in dB) of the given audio.

    All clips are concatenated, one real FFT is taken over the result, and
    the magnitude (floored at 1e-10, then ``20 * log10``) is sampled at
    ``num_points`` frequencies spread evenly from 0 to ``max_freq`` Hz. Each
    target frequency takes the value of the nearest FFT bin; on a tie the
    lower bin wins.

    The nearest bins are found with one ``searchsorted`` over the sorted bin
    frequencies instead of a full-array scan per target frequency.

    Args:
        audio_data: List of 1-D sample arrays.
        sr: Sample rate in Hz; defaults to ``CFG.SR``.
        max_freq: Highest target frequency in Hz.
        num_points: Number of output points.

    Returns:
        List of ``num_points`` dB values, or an empty list when
        ``audio_data`` is empty or the computation fails.
    """
    if not audio_data:
        return []

    try:
        sample_rate = CFG.SR if sr is None else sr

        # Merge the clips and take a single FFT over the whole signal.
        combined = np.concatenate(audio_data)
        n = len(combined)
        fft_result = np.fft.rfft(combined)
        freqs = np.fft.rfftfreq(n, 1.0 / sample_rate)

        # Magnitude in dB; the floor avoids log(0).
        magnitude = np.maximum(np.abs(fft_result), 1e-10)
        db = 20 * np.log10(magnitude)

        # Target frequencies, evenly spread over [0, max_freq].
        targets = (np.arange(num_points) / max(num_points - 1, 1)) * max_freq

        if len(freqs) < 2:
            # Only one bin exists, so every target maps to it.
            nearest = np.zeros(len(targets), dtype=int)
        else:
            # freqs is sorted ascending, so the nearest bin is one of the two
            # neighbours around the insertion point; ties go to the lower one.
            upper = np.clip(np.searchsorted(freqs, targets), 1, len(freqs) - 1)
            lower = upper - 1
            nearest = np.where(
                targets - freqs[lower] <= freqs[upper] - targets, lower, upper
            )

        return db[nearest].tolist()

    except Exception as e:
        logger.error(f"计算频谱图失败: {e}")
        return []
-
def _push_detection_result(self, stream_config, device_code,
                           is_anomaly, trigger_alert, abnormal_score, score_threshold,
                           running_status, inlet_flow,
                           freq_db, freq_middle_db=None,
                           anomaly_type_code=6,
                           abnormal_wav_b64=""):
    """
    Build the detection payload and send it to every configured push target.

    The payload carries the spectrum data (``this_frequency`` and
    ``normal_frequency_middle``), the running status and the inlet flow.
    ``sound_detection.status`` is 0 only when ``trigger_alert`` is true and
    1 otherwise. Each entry of ``self.push_base_urls`` is pushed to as
    ``{url}/{project_id}`` through ``_send_to_target``.

    Args:
        stream_config: Stream configuration (``camera_name`` and
            ``project_id`` are read).
        device_code: Device identifier.
        is_anomaly: Whether the detection result is anomalous (not used in
            the payload).
        trigger_alert: Whether this push is an alert.
        abnormal_score: Mean reconstruction error.
        score_threshold: Threshold of the device.
        running_status: Running status; "开机" is reported as "运行中",
            anything else as "停机中".
        inlet_flow: Inlet flow value.
        freq_db: dB values of the current spectrum.
        freq_middle_db: dB values of the averaged historical spectrum.
        anomaly_type_code: Value sent as ``classification.level_two``.
        abnormal_wav_b64: Base64 audio sent as ``abnormalwav``.
    """
    try:
        import time as time_module

        camera_name = stream_config.camera_name

        # Without any base URL there is nowhere to push to.
        if not self.push_base_urls:
            logger.warning(f"未配置push_base_urls: {device_code}")
            return

        # project_id is appended to every base URL below.
        project_id = stream_config.project_id

        request_time = int(time_module.time() * 1000)

        # An alert without audio is still sent, but leave a trace for debugging.
        if trigger_alert and not abnormal_wav_b64:
            logger.warning(f"报警推送但无异常音频数据: {device_code}")

        sound_detection = {
            "abnormalwav": abnormal_wav_b64,
            # 0 = anomaly (alert pushes only), 1 = everything else.
            "status": 0 if trigger_alert else 1,
            "condition": {
                "running_status": "运行中" if running_status == "开机" else "停机中",
                "inlet_flow": inlet_flow
            },
            "score": {
                "abnormal_score": abnormal_score,
                "score_threshold": score_threshold
            },
            "frequency": {
                "this_frequency": freq_db,
                "normal_frequency_middle": freq_middle_db or [],
                "normal_frequency_upper": [],  # sent empty
                "normal_frequency_lower": []   # sent empty
            }
        }

        payload = {
            "message": {
                "channelInfo": {"name": camera_name},
                "requestTime": request_time,
                "classification": {
                    "level_one": 2,
                    "level_two": anomaly_type_code
                },
                "skillInfo": {"name": "异响检测"},
                "sound_detection": sound_detection
            }
        }

        # Resolve every target first, then push to them one by one.
        push_targets = [
            (f"{item['url']}/{project_id}", item['label'])
            for item in self.push_base_urls
        ]
        for target_url, target_label in push_targets:
            self._send_to_target(
                target_url, target_label, payload,
                device_code, camera_name, trigger_alert, abnormal_score
            )

    except Exception as e:
        logger.error(f"推送通知异常: {e}")
-
def _send_to_target(self, target_url, target_label, payload,
                    device_code, camera_name, trigger_alert, abnormal_score):
    """
    POST the payload to one push target, retrying on failure.

    Up to ``self.push_retry_count + 1`` attempts are made, one second apart.
    A 200 response ends the loop; other status codes, timeouts and request
    errors are logged and retried. ``requests`` errors are not propagated.
    """
    import time as time_module

    for attempt in range(self.push_retry_count + 1):
        try:
            response = requests.post(
                target_url,
                json=payload,
                timeout=self.push_timeout,
                headers={"Content-Type": "application/json"}
            )

            if response.status_code == 200:
                alert_tag = "报警" if trigger_alert else "心跳"
                logger.info(
                    f" [{alert_tag}][{target_label}] {device_code}({camera_name}) | "
                    f"误差={abnormal_score:.6f}"
                )
                break

            logger.warning(
                f"推送失败[{target_label}]: {device_code} | URL={target_url} | "
                f"状态码={response.status_code} | 内容={response.text[:100]}"
            )

        except requests.exceptions.Timeout:
            logger.warning(f"推送超时[{target_label}]: {device_code} | URL={target_url} | 尝试 {attempt + 1}/{self.push_retry_count + 1}")
        except requests.exceptions.RequestException as e:
            logger.warning(f"推送异常[{target_label}]: {device_code} | URL={target_url} | {e}")

        # Pause before the next attempt (not after the last one).
        if attempt < self.push_retry_count:
            time_module.sleep(1)
    else:
        # The loop ended without a successful push.
        logger.error(f"推送失败[{target_label}]: {device_code} | URL={target_url} | 已达最大重试次数")
-
def _move_audio_to_date_dir(self, wav_file):
    """
    Archive an audio file under its device's date directory.

    The device code and date are parsed from a file name such as
    ``92_1#-1_20251218142000.wav``; the file is moved to
    ``{audio_dir}/{device_code}/{YYYYMMDD}/``. Files whose name does not
    match are left where they are; failures are logged, not raised.

    Args:
        wav_file: Path of the audio file.
    """
    try:
        import shutil

        parsed = re.match(r'\d+_(.+)_(\d{8})\d{6}\.wav', wav_file.name)
        if parsed is None:
            logger.warning(f"无法从文件名提取信息: {wav_file.name}")
            return

        # e.g. ("1#-1", "20251218")
        device_code, date_str = parsed.groups()

        date_dir = self.audio_dir / device_code / date_str
        date_dir.mkdir(parents=True, exist_ok=True)

        shutil.move(str(wav_file), str(date_dir / wav_file.name))
        logger.debug(f"音频已归档: {device_code}/{date_str}/{wav_file.name}")

    except Exception as e:
        logger.error(f"移动音频失败: {wav_file.name} | 错误: {e}")
-
def _move_audio_to_transition_dir(self, wav_file, reason):
    """
    Move a pump-stopped / pump-transition audio file to the device's
    ``pump_transition`` directory.

    The device code is parsed from a file name such as
    ``92_1#-1_20251218142000.wav``; the file is moved to
    ``{audio_dir}/{device_code}/pump_transition/``. Files whose name does not
    match are left where they are; failures are logged, not raised.

    Args:
        wav_file: Path of the audio file.
        reason: Reason tag, only written to the debug log
            (the original documentation lists "stopped" and "transition").
    """
    try:
        import shutil

        parsed = re.match(r'\d+_(.+)_\d{14}\.wav', wav_file.name)
        if parsed is None:
            logger.warning(f"无法从文件名提取信息: {wav_file.name}")
            return

        device_code = parsed.group(1)  # e.g. 1#-1

        transition_dir = self.audio_dir / device_code / "pump_transition"
        transition_dir.mkdir(parents=True, exist_ok=True)

        shutil.move(str(wav_file), str(transition_dir / wav_file.name))
        logger.debug(f"过渡期音频已归档: {device_code}/pump_transition/{wav_file.name} ({reason})")

    except Exception as e:
        logger.error(f"移动过渡期音频失败: {wav_file.name} | 错误: {e}")
-
def _move_audio_to_anomaly_pending(self, wav_file):
    """
    Isolate an anomalous audio file in the device's ``anomaly_pending``
    directory.

    The device code is parsed from a file name such as
    ``92_1#-1_20251218142000.wav``; the file is moved to
    ``{audio_dir}/{device_code}/anomaly_pending/``. Files whose name does not
    match are left where they are; failures are logged, not raised.

    Args:
        wav_file: Path of the audio file.
    """
    try:
        import shutil

        parsed = re.match(r'\d+_(.+)_\d{14}\.wav', wav_file.name)
        if parsed is None:
            logger.warning(f"无法从文件名提取信息: {wav_file.name}")
            return

        device_code = parsed.group(1)  # e.g. 1#-1

        pending_dir = self.audio_dir / device_code / "anomaly_pending"
        pending_dir.mkdir(parents=True, exist_ok=True)

        shutil.move(str(wav_file), str(pending_dir / wav_file.name))
        logger.debug(f"异常音频已隔离: {device_code}/anomaly_pending/{wav_file.name}")

    except Exception as e:
        logger.error(f"移动异常音频失败: {wav_file.name} | 错误: {e}")
-
def _cleanup_old_files(self, days: int = 7):
    """
    Delete date-archive directories older than the retention period.

    Only sub-directories of ``{audio_dir}/{device}/`` whose name is exactly
    eight digits (YYYYMMDD) are considered, so ``current`` and other
    specially named directories are never touched. A date directory is
    removed, with its contents, when its name sorts before the cutoff date.

    Args:
        days: Number of days to keep (default 7).
    """
    try:
        import shutil
        from datetime import datetime, timedelta

        # Directory names are compared as YYYYMMDD strings.
        cutoff_str = (datetime.now() - timedelta(days=days)).strftime("%Y%m%d")

        deleted_count = 0

        for device_dir in self.audio_dir.iterdir():
            if not device_dir.is_dir():
                continue

            for subdir in device_dir.iterdir():
                # Skip plain files, the "current" directory and anything
                # that is not named like a date.
                if (not subdir.is_dir()
                        or subdir.name == "current"
                        or not re.match(r'^\d{8}$', subdir.name)):
                    continue

                # Still inside the retention window.
                if subdir.name >= cutoff_str:
                    continue

                try:
                    shutil.rmtree(subdir)
                    deleted_count += 1
                    logger.debug(f"清理过期目录: {device_dir.name}/{subdir.name}")
                except Exception as e:
                    logger.error(f"删除目录失败: {subdir} | {e}")

        if deleted_count > 0:
            logger.info(f"清理完成: 共删除{deleted_count}个过期目录")

    except Exception as e:
        logger.error(f"清理过期文件失败: {e}")
class PickupMonitoringSystem:
    """
    Pickup (microphone) monitoring system.

    Owns the FFmpeg capture processes and the monitor thread: loads the
    configuration from SQLite, registers the per-device model directories,
    starts one FFmpeg process per configured RTSP stream and supervises
    them from the main loop in ``start``.
    """

    def __init__(self, db_path=None):
        """
        Initialise the monitoring system.

        Args:
            db_path: Path of the SQLite configuration database. When None,
                the path returned by ``get_db_path()`` is used (documented
                in the original as config/pickup_config.db).

        Raises:
            FileNotFoundError: If the configuration database does not exist.
        """
        self.db_path = db_path

        # Set up the ConfigManager on the resolved database path.
        actual_db = Path(db_path) if db_path else get_db_path()
        if not actual_db.exists():
            raise FileNotFoundError(
                f"\u914d\u7f6e\u6570\u636e\u5e93\u4e0d\u5b58\u5728: {actual_db}\n"
                f"\u8bf7\u5148\u8fd0\u884c\u8fc1\u79fb\u811a\u672c: python tool/migrate_yaml_to_db.py"
            )
        self.config_manager = ConfigManager(str(actual_db))
        print(f"\u914d\u7f6e\u6e90: SQLite ({actual_db})")

        self.config = self._load_config()

        # Cold-start mode flag.
        self.cold_start_mode = False

        # Multi-model predictor (one independent model per device).
        self.multi_predictor = MultiModelPredictor()
        self.predictor = None  # deprecated, kept only for compatibility

        # Register the model directory of every device found in the config.
        # The enabled check is shared with _parse_rtsp_streams so that every
        # stream that gets started also has its model mapping registered.
        print("\n正在初始化多模型预测器...")
        for plant in self.config.get('plants', []):
            if not self._is_plant_enabled(plant):
                continue
            for stream in plant.get('rtsp_streams', []):
                device_code = stream.get('device_code', '')
                model_subdir = stream.get('model_subdir', device_code)
                if device_code and model_subdir:
                    self.multi_predictor.register_device(device_code, model_subdir)
                    print(f" 注册设备: {device_code} -> models/{model_subdir}/")

        print(f"已注册 {len(self.multi_predictor.registered_devices)} 个设备模型映射")

        # FFmpeg processes and monitors owned by this system.
        self.ffmpeg_processes = []
        self.monitors = []

        # Main-loop bookkeeping used by start()/stop().
        self._last_cleanup_date = ""   # YYYYMMDD of the last retention cleanup
        self._last_status_log = None   # datetime of the last status log line
        self._stopped = False          # makes stop() idempotent

        # Signal handling.
        signal.signal(signal.SIGINT, self._signal_handler)
        signal.signal(signal.SIGTERM, self._signal_handler)

    @staticmethod
    def _is_plant_enabled(plant):
        """Return True unless the plant is explicitly disabled (a missing key means enabled, for old configs)."""
        return bool(plant.get('enabled', True))

    def _load_config(self):
        """
        Load the configuration from the SQLite database.

        Returns:
            dict: The full configuration as returned by the ConfigManager.
        """
        config = self.config_manager.get_full_config()
        logger.info("配置已从 SQLite 加载")
        return config

    def _parse_rtsp_streams(self):
        """
        Build the RTSP stream configurations from the loaded configuration.

        Plants without a name and disabled plants are skipped, as are
        streams that lack a URL or a channel.

        Returns:
            list: One RTSPStreamConfig per usable stream.

        Raises:
            ValueError: If the configuration contains no plants at all.
        """
        streams = []

        plants = self.config.get('plants', [])
        if not plants:
            raise ValueError("配置文件中未找到水厂配置")

        for plant in plants:
            plant_name = plant.get('name')
            if not plant_name:
                print("警告: 跳过未命名的区域配置")
                continue

            # Plants are enabled by default to stay compatible with old configs.
            if not self._is_plant_enabled(plant):
                logger.debug(f"跳过禁用的水厂: {plant_name}")
                continue

            # Flow PLC configuration shared by all streams of this plant.
            flow_plc = plant.get('flow_plc', {})

            # Each plant carries its own project_id.
            project_id = plant.get('project_id', 92)
            logger.info(f"加载区域配置: {plant_name} | project_id={project_id}")

            rtsp_streams = plant.get('rtsp_streams', [])
            for stream in rtsp_streams:
                url = stream.get('url')
                channel = stream.get('channel')
                camera_name = stream.get('name', '')
                device_code = stream.get('device_code', '')
                pump_name = stream.get('pump_name', '')

                if not url or channel is None:
                    print(f"警告: 跳过不完整的RTSP流配置 (区域: {plant_name})")
                    continue

                streams.append(RTSPStreamConfig(
                    plant_name=plant_name,
                    rtsp_url=url,
                    channel=channel,
                    camera_name=camera_name,
                    device_code=device_code,
                    pump_name=pump_name,
                    flow_plc=flow_plc,
                    project_id=project_id
                ))

        return streams

    def start(self):
        """
        Start the monitoring system and run the supervision loop.

        Starts one FFmpeg process per stream and a single PickupMonitor, then
        loops every 10 seconds to:
            - restart stopped FFmpeg processes with exponential backoff
              (30 s base, at most 5 consecutive failed restart attempts);
            - run the 7-day retention cleanup once per day during hour 0;
            - log the FFmpeg process status once per minute.

        Backoff is scheduled rather than slept inline, so a stream that is
        waiting for its restart does not stall supervision of the others.
        The attempt counter only counts ``ffmpeg.start()`` calls that return
        a falsy value; a successful start resets it.

        Exits the process with status 1 if no FFmpeg process could be
        started. ``stop`` is always called when the loop ends.
        """
        self._stopped = False

        print("=" * 70)
        print("拾音器异响检测系统")
        print("=" * 70)

        # Resolve the stream configuration.
        streams = self._parse_rtsp_streams()
        print(f"\n共配置 {len(streams)} 个拾音设备:")
        for stream in streams:
            print(f" - {stream.device_code} | {stream.camera_name}")

        # Start the FFmpeg processes.
        print("\n启动FFmpeg进程...")
        for stream in streams:
            ffmpeg = FFmpegProcess(stream, CFG.AUDIO_DIR, self.config)
            if ffmpeg.start():
                self.ffmpeg_processes.append(ffmpeg)
            else:
                print(f"警告: FFmpeg启动失败,跳过该流: {stream}")

        if not self.ffmpeg_processes:
            print("\n错误: 所有FFmpeg进程均启动失败")
            sys.exit(1)

        print(f"\n成功启动 {len(self.ffmpeg_processes)}/{len(streams)} 个FFmpeg进程")

        # Stream configs of the processes that actually started (for PickupMonitor).
        all_stream_configs = [p.stream_config for p in self.ffmpeg_processes]

        # Start the monitor thread (a single monitor handles all streams).
        print("\n启动监控线程...")
        check_interval = self.config.get('prediction', {}).get('check_interval', 1.0)

        monitor = PickupMonitor(
            audio_dir=CFG.AUDIO_DIR,
            multi_predictor=self.multi_predictor,
            stream_configs=all_stream_configs,
            check_interval=check_interval,
            config=self.config,
            config_manager=self.config_manager
        )
        monitor.start()
        self.monitors.append(monitor)

        print(f"\n成功启动监控线程")
        print("\n" + "=" * 70)
        print("系统已启动,开始监控...")
        print("按 Ctrl+C 停止系统")
        print("=" * 70 + "\n")

        # FFmpeg restart policy.
        max_restart_attempts = 5      # max consecutive failed restarts per process
        restart_interval_base = 30    # base backoff in seconds
        restart_counts = {id(p): 0 for p in self.ffmpeg_processes}
        restart_due = {}              # id(process) -> monotonic time of next attempt
        abandoned = set()             # id(process) already reported as given up

        if getattr(self, '_last_status_log', None) is None:
            self._last_status_log = datetime.now()
        if not hasattr(self, '_last_cleanup_date'):
            self._last_cleanup_date = ""

        # Main loop with automatic restart.
        try:
            while True:
                for ffmpeg in self.ffmpeg_processes:
                    key = id(ffmpeg)
                    if ffmpeg.is_running():
                        # Drop any stale schedule so that the next failure
                        # waits for a full backoff interval again.
                        restart_due.pop(key, None)
                        continue

                    device_code = ffmpeg.stream_config.device_code
                    attempts = restart_counts.get(key, 0)

                    if attempts >= max_restart_attempts:
                        # Report the give-up once instead of on every tick.
                        if key not in abandoned:
                            abandoned.add(key)
                            logger.error(f"FFmpeg达到最大重启次数: {device_code} | 已放弃重启")
                        continue

                    now_mono = time.monotonic()
                    due_at = restart_due.get(key)
                    if due_at is None:
                        # First tick that sees the process down: schedule the
                        # restart (exponential backoff) without blocking.
                        wait_time = restart_interval_base * (2 ** attempts)
                        restart_due[key] = now_mono + wait_time
                        logger.warning(f"FFmpeg进程停止: {device_code} | 将在{wait_time}秒后重启 | "
                                       f"重试次数: {attempts + 1}/{max_restart_attempts}")
                        continue
                    if now_mono < due_at:
                        continue

                    # Backoff elapsed: try to restart now.
                    del restart_due[key]
                    if ffmpeg.start():
                        logger.debug(f"FFmpeg重启成功: {device_code}")
                        restart_counts[key] = 0  # reset the counter
                    else:
                        restart_counts[key] = attempts + 1
                        logger.error(f"FFmpeg重启失败: {device_code}")

                # Leave the loop once nothing runs and every process was given up.
                running_count = sum(1 for p in self.ffmpeg_processes if p.is_running())
                all_abandoned = all(
                    restart_counts.get(id(p), 0) >= max_restart_attempts
                    for p in self.ffmpeg_processes if not p.is_running()
                )

                if running_count == 0 and all_abandoned:
                    print("\n错误: 所有FFmpeg进程均已停止且无法重启")
                    break

                # Daily retention cleanup, run once during hour 0.
                now = datetime.now()
                current_date = now.strftime("%Y%m%d")
                if now.hour == 0 and self._last_cleanup_date != current_date:
                    logger.info("执行7天过期文件清理...")
                    for active_monitor in self.monitors:
                        active_monitor._cleanup_old_files(days=7)
                    self._last_cleanup_date = current_date

                # Periodic RTSP status log (once per minute).
                if (now - self._last_status_log).total_seconds() >= 60:
                    running = sum(1 for p in self.ffmpeg_processes if p.is_running())
                    total = len(self.ffmpeg_processes)
                    logger.info(f"RTSP状态: {running}/{total} 个FFmpeg进程运行中")
                    logger.info("─" * 60)
                    self._last_status_log = datetime.now()

                time.sleep(10)
        except KeyboardInterrupt:
            print("\n\n收到停止信号,正在关闭系统...")
        finally:
            self.stop()

    def stop(self):
        """
        Stop the monitor threads and the FFmpeg processes.

        Safe to call more than once: the signal handler calls it and the
        ``finally`` clause of ``start`` calls it again while SystemExit
        unwinds. A failure while stopping one component is logged and does
        not prevent the remaining components from being stopped.
        """
        if getattr(self, '_stopped', False):
            return
        self._stopped = True

        print("\n正在停止监控系统...")

        print("停止监控线程...")
        for monitor in self.monitors:
            try:
                monitor.stop()
            except Exception as e:
                logger.error(f"停止监控线程失败: {e}")

        print("停止FFmpeg进程...")
        for ffmpeg in self.ffmpeg_processes:
            try:
                ffmpeg.stop()
            except Exception as e:
                logger.error(f"停止FFmpeg进程失败: {e}")

        print("系统已完全停止")

    def _signal_handler(self, signum, frame):
        """
        Handle SIGINT/SIGTERM: stop the system and exit with status 0.

        Args:
            signum: Number of the received signal.
            frame: Current stack frame (unused).
        """
        print(f"\n\n收到信号 {signum},正在关闭系统...")
        self.stop()
        sys.exit(0)
def _start_config_api_server(config_manager, multi_predictor=None, port=18080):
    """
    Serve the FastAPI configuration-management API with uvicorn.

    Intended as the target of a background thread. All failures are logged
    and swallowed: a missing uvicorn package produces a warning with an
    install hint, any other error is logged as a startup failure.

    Args:
        config_manager: Configuration manager handed to ``init_config_api``.
        multi_predictor: Optional predictor handed to ``init_config_api``.
        port: TCP port to listen on (all interfaces). Defaults to 18080.
    """
    try:
        import uvicorn

        init_config_api(config_manager, multi_predictor)
        logger.info(f"启动配置管理 API: http://0.0.0.0:{port}")
        uvicorn.run(config_app, host="0.0.0.0", port=port, log_level="warning")
    except Exception as e:
        # ImportError gets its own message; everything else is a startup failure.
        if isinstance(e, ImportError):
            logger.warning("uvicorn 未安装,配置管理 API 无法启动。请安装: pip install uvicorn")
        else:
            logger.error(f"配置管理 API 启动失败: {e}")
def main():
    """
    Program entry point.

    Verifies that the configuration database exists, builds the monitoring
    system, starts the configuration API in a daemon thread and then runs
    the monitoring loop. Exits with status 1 on a missing database, on a
    FileNotFoundError or on any other unexpected error.

    NOTE(review): the existence check uses get_db_path(<script dir>/config)
    while PickupMonitoringSystem() resolves its own default path; confirm
    that both resolve to the same file.
    """
    # Make sure the configuration database is present before doing anything else.
    config_dir = Path(__file__).parent / "config"
    db_path = get_db_path(config_dir)
    if not db_path.exists():
        print(f"错误: 配置数据库不存在: {db_path}")
        print(f"\n请先运行迁移脚本: python tool/migrate_yaml_to_db.py")
        sys.exit(1)

    try:
        system = PickupMonitoringSystem()

        # Configuration-management API runs in a background daemon thread.
        api_port = 18080
        threading.Thread(
            target=_start_config_api_server,
            args=(system.config_manager, system.multi_predictor, api_port),
            daemon=True,
            name="config-api"
        ).start()

        system.start()
    except FileNotFoundError as e:
        for line in (
            f"\n错误: {e}",
            "\n请确保:",
            "1. 已完成训练并计算阈值",
            "2. 已复制必要的模型文件到 models/ 目录",
        ):
            print(line)
        sys.exit(1)
    except Exception as e:
        import traceback

        print(f"\n严重错误: {e}")
        traceback.print_exc()
        sys.exit(1)


if __name__ == "__main__":
    main()
|