run_pickup_monitor.py 133 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. """
  4. run_pickup_monitor.py
  5. ---------------------
  6. 拾音器异响检测主程序
  7. 功能说明:
  8. 该程序用于音频采集设备(拾音器)的实时异常检测。
  9. 与摄像头版本不同,本版本:
  10. 1. 没有视频/图片采集和上报
  11. 2. 上报时包含音频频谱图分析数据
  12. 3. 支持进水流量PLC数据读取
  13. 4. 每1分钟计算一次平均重建误差并上报
  14. 上报数据结构:
  15. {
  16. "message": {
  17. "channelInfo": {"name": "设备信息"},
  18. "requestTime": 时间戳,
  19. "classification": {
  20. "level_one": 2, // 音频检测大类
  21. "level_two": 6 // 异常类型小类: 6=未分类, 7=轴承, 8=气蚀, 9=松动/共振, 10=叶轮, 11=阀件
  22. },
  23. "skillInfo": {"name": "异响检测"},
  24. "sound_detection": {
  25. "abnormalwav": "", // base64编码的音频
  26. "status": 0/1, // 0=异常, 1=正常
  27. "condition": {
  28. "running_status": "运行中/停机中", // 启停状态
  29. "inlet_flow": 0.0 // 进水流量
  30. },
  31. "score": {
  32. "abnormal_score": 0.0, // 当前1分钟内8s平均重构误差
  33. "score_threshold": 0.0 // 该设备的异常阈值
  34. },
  35. "frequency": {
  36. "this_frequency": [], // 当前1分钟的频谱图
  37. "normal_frequency_middle": [], // 过去10分钟的频谱图平均
  38. "normal_frequency_upper": [], // 上限(暂为空)
  39. "normal_frequency_lower": [] // 下限(暂为空)
  40. }
  41. }
  42. }
  43. }
  44. 使用方法:
  45. python run_pickup_monitor.py
  46. 配置文件:
  47. config/rtsp_config.yaml
  48. """
  49. import subprocess
  50. import time
  51. import re
  52. import signal
  53. import sys
  54. import threading
  55. import logging
  56. import base64
  57. from concurrent.futures import ThreadPoolExecutor
  58. from pathlib import Path
  59. from datetime import datetime, timedelta
  60. from collections import defaultdict
  61. # 用于计算FFT
  62. import numpy as np
  63. try:
  64. import librosa
  65. except ImportError:
  66. print("错误:缺少librosa库,请安装: pip install librosa")
  67. sys.exit(1)
  68. try:
  69. import requests
  70. except ImportError:
  71. print("错误:缺少requests库,请安装: pip install requests")
  72. sys.exit(1)
  73. import json as _json
  74. from urllib import request as _urllib_request, error as _urllib_error
  75. # 导入预测器模块
  76. from predictor import MultiModelPredictor, CFG
  77. # 导入配置管理模块(SQLite)
  78. from config.config_manager import ConfigManager
  79. from config.config_api import app as config_app, init_config_api
  80. from config.db_models import get_db_path
  81. # 导入泵状态监控模块(用于检测启停过渡期)
  82. try:
  83. from core.pump_state_monitor import PumpStateMonitor
  84. PUMP_STATE_MONITOR_AVAILABLE = True
  85. except ImportError:
  86. PUMP_STATE_MONITOR_AVAILABLE = False
  87. # 导入人体检测读取模块(用于抑制有人时的误报)
  88. try:
  89. from core.human_detection_reader import HumanDetectionReader
  90. HUMAN_DETECTION_AVAILABLE = True
  91. except ImportError:
  92. HUMAN_DETECTION_AVAILABLE = False
  93. # 导入报警聚合器(跨设备聚合抑制 + 分类型冷却)
  94. try:
  95. from core.alert_aggregator import AlertAggregator
  96. ALERT_AGGREGATOR_AVAILABLE = True
  97. except ImportError:
  98. ALERT_AGGREGATOR_AVAILABLE = False
  99. # 导入音频上传旁路(边云协同 - 训练数据上传)
  100. # enabled=false 时所有入队操作都是 no-op,零开销
  101. try:
  102. from core.audio_uploader import AudioUploader
  103. AUDIO_UPLOADER_AVAILABLE = True
  104. except ImportError:
  105. AUDIO_UPLOADER_AVAILABLE = False
  106. # ========================================
  107. # 配置日志系统
  108. # ========================================
  109. def setup_logging():
  110. # 配置日志系统(RotatingFileHandler -> system.log)
  111. from logging.handlers import RotatingFileHandler
  112. # 如果根 logger 已经被上层调用者配置过,则直接复用
  113. root = logging.getLogger()
  114. if root.handlers:
  115. return logging.getLogger('PickupMonitor')
  116. # 日志配置
  117. log_dir = Path(__file__).parent / "logs"
  118. log_dir.mkdir(parents=True, exist_ok=True)
  119. log_file = log_dir / "system.log"
  120. formatter = logging.Formatter(
  121. '%(asctime)s | %(levelname)-8s | %(name)-20s | %(message)s',
  122. datefmt='%Y-%m-%d %H:%M:%S'
  123. )
  124. # 按文件大小轮转,最多保留 2 个备份(共 30MB)
  125. file_handler = RotatingFileHandler(
  126. log_file,
  127. maxBytes=10 * 1024 * 1024,
  128. backupCount=2,
  129. encoding='utf-8'
  130. )
  131. file_handler.setFormatter(formatter)
  132. # 控制台输出(前台运行时可见,后台运行时 stdout 已被丢弃不影响)
  133. console_handler = logging.StreamHandler(sys.stdout)
  134. console_handler.setFormatter(formatter)
  135. logging.basicConfig(
  136. level=logging.INFO,
  137. handlers=[file_handler, console_handler]
  138. )
  139. return logging.getLogger('PickupMonitor')
  140. # 初始化日志系统
  141. logger = setup_logging()
  142. # 导入能量基线模块
  143. try:
  144. from core.energy_baseline import EnergyBaseline
  145. ENERGY_BASELINE_AVAILABLE = True
  146. except ImportError:
  147. ENERGY_BASELINE_AVAILABLE = False
  148. logger.warning("能量基线模块未找到,泵状态检测功能禁用")
  149. class RTSPStreamConfig:
  150. """
  151. RTSP流配置类
  152. 封装单个RTSP流的配置信息,包含拾音器特有字段
  153. """
  154. def __init__(self, plant_name, rtsp_url, channel,
  155. camera_name, device_code, pump_name,
  156. flow_plc, project_id):
  157. """
  158. 初始化RTSP流配置
  159. 参数:
  160. plant_name: 区域名称(泵房-反渗透高压泵等)
  161. rtsp_url: RTSP流URL
  162. channel: 通道号
  163. camera_name: 设备名称
  164. device_code: 设备编号(如1#-1)
  165. pump_name: 泵名称(A/B/C/D),用于匹配流量PLC
  166. flow_plc: 流量PLC地址映射
  167. """
  168. self.plant_name = plant_name
  169. self.rtsp_url = rtsp_url
  170. self.channel = channel
  171. self.pump_id = f"ch{channel}"
  172. self.camera_name = camera_name or f"ch{channel}"
  173. self.device_code = device_code
  174. self.pump_name = pump_name
  175. self.flow_plc = flow_plc or {}
  176. self.project_id = project_id
  177. def get_flow_plc_address(self):
  178. """
  179. 获取该设备对应的进水流量PLC地址
  180. 返回:
  181. PLC地址字符串,不存在则返回空字符串
  182. """
  183. if self.pump_name and self.flow_plc:
  184. return self.flow_plc.get(self.pump_name, "")
  185. return ""
  186. def __repr__(self):
  187. return f"RTSPStreamConfig(plant='{self.plant_name}', camera='{self.camera_name}', pump='{self.pump_id}')"
  188. class FFmpegProcess:
  189. """
  190. FFmpeg进程管理类
  191. 负责启动和管理单个RTSP流的FFmpeg进程。
  192. 进程将RTSP流转换为固定时长的WAV音频文件。
  193. 文件名格式: {project_id}_{device_code}_{时间戳}.wav
  194. """
  195. def __init__(self, stream_config, output_dir, config=None):
  196. """
  197. 初始化FFmpeg进程管理器
  198. 参数:
  199. stream_config: RTSP流配置
  200. output_dir: 音频输出目录
  201. config: 全局配置字典
  202. """
  203. self.config_dict = config or {}
  204. self.stream_config = stream_config
  205. self.output_dir = output_dir
  206. self.process = None
  207. # 从配置中读取文件时长,默认8秒
  208. audio_cfg = self.config_dict.get('audio', {})
  209. self.file_duration = audio_cfg.get('file_duration', 8)
  210. # 获取project_id(从 stream_config 中读取)
  211. self.project_id = stream_config.project_id
  212. def start(self):
  213. """
  214. 启动FFmpeg进程
  215. 返回:
  216. bool: 启动成功返回True,失败返回False
  217. """
  218. # 获取设备编号
  219. device_code = self.stream_config.device_code or self.stream_config.pump_id
  220. # 创建输出目录(每个设备独立目录)
  221. # 结构: data/{device_code}/current/
  222. current_dir = self.output_dir / device_code / "current"
  223. current_dir.mkdir(parents=True, exist_ok=True)
  224. # 存放人工核查确认为正常的异常音频,增训时全量参与训练
  225. verified_dir = self.output_dir / device_code / "verified_normal"
  226. verified_dir.mkdir(parents=True, exist_ok=True)
  227. # 构建输出文件名模板
  228. # 格式: {project_id}_{device_code}_{时间戳}.wav
  229. # 例如: 92_1#-1_20251218142000.wav
  230. output_pattern = str(current_dir / f"{self.project_id}_{device_code}_%Y%m%d%H%M%S.wav")
  231. # 构建FFmpeg命令
  232. # 添加内存限制参数,防止 RTSP 缓冲区无限增长导致 OOM
  233. cmd = [
  234. "ffmpeg",
  235. # RTSP 输入参数(内存限制)
  236. "-rtsp_transport", "tcp", # 使用TCP传输
  237. "-probesize", "1000000", # 限制探测大小为1MB(默认5MB)
  238. "-analyzeduration", "1000000", # 限制分析时长为1秒(默认5秒)
  239. "-max_delay", "500000", # 最大延迟500ms
  240. "-fflags", "nobuffer", # 禁用输入缓冲
  241. "-flags", "low_delay", # 低延迟模式
  242. "-i", self.stream_config.rtsp_url, # 输入RTSP流
  243. # 音频输出参数
  244. "-vn", # 不处理视频
  245. "-ac", "1", # 单声道
  246. "-ar", str(CFG.SR), # 采样率(16000Hz)
  247. "-f", "segment", # 分段模式
  248. "-segment_time", str(int(self.file_duration)), # 每段时长
  249. "-strftime", "1", # 启用时间格式化
  250. "-loglevel", "error", # 只输出错误日志
  251. "-y", # 覆盖已存在的文件
  252. output_pattern,
  253. ]
  254. try:
  255. # 启动FFmpeg进程
  256. self.process = subprocess.Popen(
  257. cmd,
  258. stdout=subprocess.DEVNULL,
  259. stderr=subprocess.DEVNULL
  260. )
  261. logger.info(f"FFmpeg已启动: {device_code} | {self.stream_config.camera_name} | "
  262. f"文件时长: {self.file_duration}秒 | PID={self.process.pid}")
  263. return True
  264. except FileNotFoundError:
  265. logger.error("FFmpeg错误: 未找到ffmpeg命令,请确保已安装FFmpeg")
  266. return False
  267. except Exception as e:
  268. logger.error(f"FFmpeg启动失败: {device_code} | 错误: {e}")
  269. return False
  270. def is_running(self):
  271. """
  272. 检查FFmpeg进程是否在运行
  273. 返回:
  274. bool: 进程运行中返回True,否则返回False
  275. """
  276. if self.process is None:
  277. return False
  278. return self.process.poll() is None
  279. def stop(self):
  280. """
  281. 停止FFmpeg进程
  282. """
  283. if self.process is not None and self.is_running():
  284. logger.info(f"FFmpeg停止: {self.stream_config.plant_name} | {self.stream_config.camera_name} | PID={self.process.pid}")
  285. self.process.terminate()
  286. try:
  287. self.process.wait(timeout=5)
  288. except subprocess.TimeoutExpired:
  289. logger.warning(f"FFmpeg强制终止: PID={self.process.pid}")
  290. self.process.kill()
  291. class PickupMonitor:
  292. """
  293. 拾音器监控线程类
  294. 监控音频目录,调用预测器检测异常,推送告警。
  295. 主要功能:
  296. 1. 不截取视频帧(纯音频)
  297. 2. 计算并上报频谱图数据(this_frequency + normal_frequency_middle)
  298. 3. 每分钟汇总上报一次
  299. 4. 使用SCADA API获取进水流量
  300. """
    def __init__(self, audio_dir, multi_predictor,
                 stream_configs,
                 check_interval=1.0, config=None,
                 config_manager=None,
                 test_mode=False):
        """
        Initialise the monitor: store collaborators, read every tunable from
        ``config`` and build the optional helpers (alert aggregator, pump
        state monitor, human-detection reader, audio uploader).

        No thread is started here; ``self.running`` is False and
        ``self.thread`` is None on return.

        Args:
            audio_dir: Audio root directory
            multi_predictor: Multi-model predictor instance
            stream_configs: All RTSP stream configurations
            check_interval: Check interval (seconds)
            config: Configuration dict (None is treated as an empty dict)
            config_manager: ConfigManager instance (used for hot reload;
                hot reload is disabled when None)
            test_mode: When True, the alert aggregator is not created and
                ``alert_enabled`` is forced to True
        """
        # Audio root directory (per-device directories live beneath it)
        self.audio_dir = audio_dir
        self.multi_predictor = multi_predictor
        self.predictor = None  # kept for compatibility; deprecated
        self.stream_configs = stream_configs
        self.check_interval = check_interval
        self.config = config or {}
        # Test mode: disables aggregate suppression and forces alert_enabled=True.
        # Intended for on-site playback tests at the water plant, so that every
        # device alerts independently without being suppressed by aggregation.
        self.test_mode = test_mode
        if test_mode:
            logger.warning("测试模式已启用: 聚合抑制=关闭 | alert_enabled=强制开启")
        # Hot reload: keep a ConfigManager reference and periodically refresh
        # the configuration from the DB
        self._config_manager = config_manager
        self._last_config_reload = 0  # timestamp of the last config refresh
        self._config_reload_interval = 30  # config refresh interval (seconds)
        # Take the project_id of the first enabled plant in the plants list.
        # Backward compatible with old configs: prefer plants, fall back to
        # platform.project_id
        self.project_id = 0
        for plant in self.config.get('plants', []):
            if plant.get('enabled', False):
                self.project_id = plant.get('project_id', 0)
                break
        if self.project_id == 0:
            self.project_id = self.config.get('platform', {}).get('project_id', 92)
        # Build the device_code -> stream_config mapping
        # (streams without a device_code are not included)
        self.device_map = {}
        for cfg in stream_configs:
            if cfg.device_code:
                self.device_map[cfg.device_code] = cfg
        # Set of already-processed files
        self.seen_files = set()
        # Per-device detection result cache (for the 1-minute summary)
        # key: device_code (e.g. "1#-1")
        self.device_cache = defaultdict(lambda: {
            "errors": [],  # mean reconstruction error of each file (for sustained weak-signal detection)
            "anomaly_ratios": [],  # per-file patch anomaly ratio (for burst-fault detection)
            "last_upload": None,  # time of the last report
            "audio_data": [],  # audio data used to compute the spectrum
            "status": None  # most recent running status
        })
        # Spectrum history cache (used to compute normal_frequency_middle)
        # key: device_code, value: list of (timestamp, freq_db)
        freq_cfg = self.config.get('prediction', {}).get('frequency_history', {})
        self.freq_history_enabled = freq_cfg.get('enabled', True)
        self.freq_history_minutes = freq_cfg.get('history_minutes', 10)
        self.freq_history = defaultdict(list)
        # Last reported status (True=anomalous, False=normal, None=initial)
        # Used to de-duplicate on status change and avoid continuous alerting
        self.last_report_status = {}
        # Reporting period (seconds)
        audio_cfg = self.config.get('audio', {})
        self.segment_duration = audio_cfg.get('segment_duration', 60)
        # Anomaly audio saving configuration
        save_cfg = self.config.get('prediction', {}).get('save_anomaly_audio', {})
        self.save_anomaly_enabled = save_cfg.get('enabled', True)
        self.save_anomaly_dir = Path(__file__).parent / save_cfg.get('save_dir', 'data/anomaly_detected')
        if self.save_anomaly_enabled:
            self.save_anomaly_dir.mkdir(parents=True, exist_ok=True)
        # Anomaly push configuration
        push_cfg = self.config.get('push_notification', {})
        self.push_enabled = push_cfg.get('enabled', False)
        # In test mode anomaly alerts are forced on, regardless of
        # alert_enabled in the configuration file
        if self.test_mode:
            self.alert_enabled = True
        else:
            self.alert_enabled = push_cfg.get('alert_enabled', True)
        self.push_timeout = push_cfg.get('timeout', 30)
        self.push_retry_count = push_cfg.get('retry_count', 2)
        # Push base-URL groups (production / test, routed by mode)
        url_groups = push_cfg.get('push_base_urls', {})
        # Accept both the old format (list) and the new format (dict)
        if isinstance(url_groups, dict):
            self.push_production_urls = [u.rstrip('/') for u in url_groups.get('production', []) if u]
            self.push_test_urls = [u.rstrip('/') for u in url_groups.get('test', []) if u]
        else:
            # Old-format fallback: everything is treated as production.
            # Each item is accessed with .get('url'), i.e. items must be dict-like.
            self.push_production_urls = [item.get('url', '').rstrip('/') for item in url_groups if item.get('url')]
            self.push_test_urls = []
        # Diagnostic log: confirm the result of parsing the push URLs
        logger.info(f"推送URL解析结果 | production={self.push_production_urls} | test={self.push_test_urls} | url_groups类型={type(url_groups).__name__}")
        # Path of the file that records failed pushes
        failed_log_path = push_cfg.get('failed_push_log', 'data/push_failures.jsonl')
        self.failed_push_log = Path(__file__).parent / failed_log_path
        self.failed_push_log.parent.mkdir(parents=True, exist_ok=True)
        # If alert_enabled is False, log a reminder
        if not self.alert_enabled:
            logger.warning("异常告警已禁用(alert_enabled=false),仅上报心跳状态")
        # ========================================
        # Alert aggregator (replaces the former fixed cooldown_minutes)
        # ----------------------------------------
        # Feature 1: cross-device aggregate suppression - several devices of the
        #            same plant alerting together -> ambient noise, suppress all
        # Feature 2: per-type cooldown - same type 24h, different type 1h
        # Feature 3: debug-mode awareness - with mode_id=4 use a short 5-minute
        #            cooldown and skip the aggregation window
        # ========================================
        agg_cfg = push_cfg.get('alert_aggregate', {})
        self.alert_aggregator = None
        if self.test_mode:
            # Test mode: no aggregator, every device alerts independently
            logger.info("测试模式: 聚合器已禁用")
        elif ALERT_AGGREGATOR_AVAILABLE:
            self.alert_aggregator = AlertAggregator(
                push_callback=self._push_detection_result,
                aggregate_enabled=agg_cfg.get('enabled', True),
                window_seconds=agg_cfg.get('window_seconds', 300),
                min_devices=agg_cfg.get('min_devices', 2),
                cooldown_same_type_hours=push_cfg.get('cooldown_same_type_hours', 24),
                cooldown_diff_type_hours=push_cfg.get('cooldown_diff_type_hours', 1),
                # Mode callback so the aggregator can see mode_id=4 (debug mode).
                # The lambda reads self._remote_mode_id lazily; that attribute
                # is assigned further down in this constructor.
                mode_provider=lambda: self._remote_mode_id,
                debug_cooldown_minutes=push_cfg.get('debug_cooldown_minutes', 5)
            )
        else:
            logger.warning("报警聚合器模块未找到,使用默认报警逻辑")
        # Time of the last anomaly-audio save (for the save cooldown)
        self.last_anomaly_save_time = {}
        # Anomaly save cooldown (minutes): while one device stays anomalous,
        # save only once every N minutes
        self.anomaly_save_cooldown_minutes = save_cfg.get('cooldown_minutes', 10)
        # Locked anomaly classification (kept unchanged while the anomaly persists)
        # key: device_code, value: (anomaly_type_code, type_name)
        self.locked_anomaly_type = {}
        # Sliding-window voting configuration (3 anomalies out of 5 -> anomalous)
        voting_cfg = self.config.get('prediction', {}).get('voting', {})
        self.voting_enabled = voting_cfg.get('enabled', True)
        self.voting_window_size = voting_cfg.get('window_size', 5)  # window size
        self.voting_threshold = voting_cfg.get('threshold', 3)  # anomaly threshold (>=3 -> anomalous)
        self.detection_history = {}  # per-device detection history (True=anomalous)
        # Threshold tolerance band (avoids flapping around the boundary).
        # Errors within threshold*(1-tolerance) ~ threshold*(1+tolerance) are a
        # grey zone in which the previous state is kept
        self.tolerance_ratio = voting_cfg.get('tolerance_ratio', 0.05)  # 5% tolerance by default
        self.last_single_anomaly = {}  # per-device result of the previous single-cycle decision
        # Thresholds are now managed by multi_predictor; each device loads its
        # own from its model directory
        # Energy detection configuration
        energy_cfg = self.config.get('prediction', {}).get('energy_detection', {})
        self.energy_detection_enabled = ENERGY_BASELINE_AVAILABLE and energy_cfg.get('enabled', True)
        self.skip_detection_when_stopped = energy_cfg.get('skip_when_stopped', True)
        # Energy baseline detectors (one per device); None when disabled
        if self.energy_detection_enabled:
            self.energy_baselines = {}
        else:
            self.energy_baselines = None
        # SCADA API configuration (for pump status and inlet flow)
        scada_cfg = self.config.get('scada_api', {})
        self.scada_enabled = scada_cfg.get('enabled', False)
        self.scada_url = scada_cfg.get('base_url', '')  # historical data endpoint (backup)
        self.scada_realtime_url = scada_cfg.get('realtime_url', '')  # real-time data endpoint (primary)
        self.scada_jwt = scada_cfg.get('jwt_token', '')
        self.scada_timeout = scada_cfg.get('timeout', 10)
        # SCADA automatic login configuration
        self._scada_login_url = scada_cfg.get('login_url', '')
        self._scada_login_username = scada_cfg.get('login_username', '')
        self._scada_login_password = scada_cfg.get('login_password', '')
        self._scada_login_type = scada_cfg.get('login_type', 'account')
        self._scada_login_dep_id = scada_cfg.get('login_dep_id', '')
        self._scada_auth_lock = threading.Lock()
        # Pump state monitor (detects start/stop transition periods)
        self.pump_state_monitor = None
        self.pump_status_plc_configs = {}  # {pump_name: [{point, name}, ...]}
        if PUMP_STATE_MONITOR_AVAILABLE and self.scada_enabled:
            # Initialise the pump state monitor.
            # Use the project_id of the first enabled plant (no fallback to
            # platform.project_id here, unlike self.project_id above)
            project_id = 0
            for plant in self.config.get('plants', []):
                if plant.get('enabled', False):
                    project_id = plant.get('project_id', 0)
                    # Load the pump status point configuration
                    pump_status_plc = plant.get('pump_status_plc', {})
                    self.pump_status_plc_configs = pump_status_plc
                    break
            if project_id > 0:
                # Pump status queries go through the real-time endpoint URL
                self.pump_state_monitor = PumpStateMonitor(
                    scada_url=self.scada_realtime_url,  # real-time data endpoint
                    scada_jwt=self.scada_jwt,
                    project_id=project_id,
                    timeout=self.scada_timeout,
                    transition_window_minutes=15,  # 15 minutes after a start/stop count as transition
                    login_url=self._scada_login_url,
                    login_username=self._scada_login_username,
                    login_password=self._scada_login_password,
                    login_type=self._scada_login_type,
                    login_dep_id=self._scada_login_dep_id,
                )
                logger.info(f"泵状态监控器已启用 (project_id={project_id}, 过渡期窗口=15分钟, 使用实时接口)")
        # Thread control
        self.running = False
        self.thread = None
        # Push thread pool (keeps push timeouts from blocking the main monitor loop)
        self._push_executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="push")
        # Startup time (used to skip status-change logs during startup)
        self.startup_time = None
        self.startup_warmup_seconds = 120  # no status-change logs for 120 s after startup
        # ========================================
        # Anomaly context capture configuration
        # ========================================
        context_cfg = save_cfg.get('context_capture', {})
        self.context_pre_minutes = context_cfg.get('pre_minutes', 2)
        self.context_post_minutes = context_cfg.get('post_minutes', 2)
        # Audio file history cache (to capture audio from before an anomaly)
        # key: device_code, value: deque of (timestamp, file_path)
        from collections import deque
        # Number of history files to keep (computed per minute, roughly one
        # file per minute).
        # NOTE(review): FFmpegProcess defaults to 8-second segments, so
        # pre_minutes + 2 entries may cover far less than pre_minutes of
        # audio - confirm what is appended to audio_file_history.
        history_size = self.context_pre_minutes + 2  # keep 2 extra as a buffer
        self.audio_file_history = defaultdict(lambda: deque(maxlen=history_size))
        # Anomaly context capture state
        # key: device_code, value: dict
        self.anomaly_capture_state = {}
        logger.info(f"异常上下文捕获: 前{self.context_pre_minutes}分钟 + 后{self.context_post_minutes}分钟")
        # ========================================
        # Human-detection alert suppression configuration
        # ========================================
        # As long as any camera detected a person within the cooldown time,
        # anomaly alerts of all devices are suppressed
        human_cfg = self.config.get('human_detection', {})
        self.human_detection_enabled = human_cfg.get('enabled', False) and HUMAN_DETECTION_AVAILABLE
        self.human_reader = None
        if self.human_detection_enabled:
            db_path = human_cfg.get('db_path', '')
            cooldown_minutes = human_cfg.get('cooldown_minutes', 5)
            self.human_reader = HumanDetectionReader(
                db_path=db_path,
                cooldown_minutes=cooldown_minutes
            )
            logger.info(f"人体检测报警抑制已启用 (冷却时间={cooldown_minutes}分钟, 数据库={db_path})")
        # ========================================
        # Remote abnormal-sound detection schedule configuration
        # ========================================
        # A periodic GET request to a remote endpoint returns mode_type,
        # model_list and time windows, which dynamically control the
        # alert_enabled switch
        schedule_cfg = self.config.get('detection_schedule', {})
        self._detection_schedule_url = schedule_cfg.get('url', '')
        self._detection_schedule_interval = schedule_cfg.get('poll_interval', 60)
        self._detection_schedule_timeout = schedule_cfg.get('request_timeout', 10)
        # Independent timer, decoupled from the 30-second config hot-reload interval
        self._last_detection_schedule_check = 0
        # Project mode returned by the remote schedule
        # (1=daily, 2=visit, 3=maintenance, 4=debug)
        # 1/2: inference + report to all push targets (external/internal/test)
        # 3: maintenance mode, captured audio is discarded: not saved, not
        #    inferred, not reported
        # 4: debug mode, inference + report only to targets with label="测试"
        self._remote_mode_id = 1
        self._schedule_first_poll = True
        if self._detection_schedule_url:
            logger.info(f"远程异响调度已启用 | URL={self._detection_schedule_url} | 间隔={self._detection_schedule_interval}秒")
        # ========================================
        # Audio upload side channel (edge-cloud collaboration - training data upload)
        # ========================================
        # With cloud_sync.enabled=false every AudioUploader operation is a no-op.
        # The actual upload is performed by the separate process
        # run_upload_worker.py and does not affect the main inference
        self.audio_uploader = None
        if AUDIO_UPLOADER_AVAILABLE:
            self.audio_uploader = AudioUploader(
                config=self.config,
                project_id=self.project_id,
                audio_dir=self.audio_dir
            )
        # ========================================
        # Normal-audio diversity sampling configuration
        # ========================================
        # Diversity sampling based on reconstruction error: keep files whose
        # error differs a lot from already-kept samples, drop redundant ones.
        # Only affects saving of normal audio; anomaly audio goes through the
        # separate context-capture branch and is not affected
        auto_training_cfg = self.config.get('auto_training', {})
        cleanup_cfg = self.config.get('audio', {}).get('auto_cleanup', {})
        # Training mode: self=edge self-training (keep data),
        # cloud=cloud training (optionally delete after upload)
        self._training_mode = auto_training_cfg.get('training_mode', 'self')
        # Master switch: with incremental training off, normal audio is
        # deleted right after inference
        self._auto_training_enabled = auto_training_cfg.get('enabled', False)
        # Per-device hourly retention quota (total = keep_hourly_samples x 24)
        self._keep_hourly_samples = cleanup_cfg.get('keep_hourly_samples', 21)
        # Number of days to retain data of past dates
        self._keep_days = cleanup_cfg.get('keep_days', 14)
        # Diversity sampling: minimum error distance (smaller = redundant).
        # Adjusted adaptively at runtime according to progress within the hour
        self._diversity_base_epsilon = cleanup_cfg.get('diversity_epsilon', 0.001)
        # Sampling runtime state (tracked per device + hour)
        # key: (device_code, hour), value: {"kept_errors": [float], "kept_count": int}
        self._sample_state = {}
  def start(self):
      """Launch the background monitoring thread.

      Does nothing when the monitor is already flagged as running. Otherwise
      leftover files in the ``current`` directories are purged, the start
      time is recorded, and ``_monitor_loop`` is started on a daemon thread.
      """
      if not self.running:
          # Purge files left behind by a previous run before scanning begins.
          self._cleanup_current_on_startup()

          self.running = True
          self.startup_time = datetime.now()  # remembered as the start time

          worker = threading.Thread(target=self._monitor_loop, daemon=True)
          self.thread = worker
          worker.start()

          # Report which devices are being watched.
          monitored = [*self.device_map.keys()]
          logger.info(f"监控线程已启动 | 目录: {self.audio_dir} | 监控设备: {monitored}")
  def _cleanup_current_on_startup(self):
      """Delete WAV files left in each device's ``current`` directory.

      Called once before the monitor thread starts so that recordings from a
      previous run are not mixed with freshly captured audio. A file that
      cannot be deleted is logged and skipped; the sweep continues.
      """
      removed = 0
      for code in self.device_map.keys():
          leftover_dir = self.audio_dir / code / "current"
          if leftover_dir.exists():
              for stale in leftover_dir.glob("*.wav"):
                  try:
                      stale.unlink()
                  except Exception as e:
                      logger.warning(f"清理遗留文件失败: {stale.name} | {e}")
                  else:
                      removed += 1

      # Stay quiet when there was nothing to clean.
      if removed > 0:
          logger.info(f"启动清理: 已删除current目录中 {removed} 个遗留文件")
  def stop(self):
      """Stop the monitoring thread and shut down the push executor.

      Does nothing when the monitor is not running. The loop flag is cleared
      first, then the worker thread is joined for at most 5 seconds.
      """
      if self.running:
          self.running = False

          worker = self.thread
          if worker is not None:
              worker.join(timeout=5)

          # wait=False returns immediately; tasks that have not started yet
          # are cancelled, tasks already executing are not waited for here.
          self._push_executor.shutdown(wait=False, cancel_futures=True)
          logger.info("监控线程已停止")
  def _reload_hot_config(self):
      """Refresh runtime-tunable settings from the config manager.

      Only parameters that are read on every cycle are refreshed (push,
      voting, energy detection, human-detection suppression, remote schedule
      polling). Nothing is done when no config manager is attached or when
      the last reload happened less than ``_config_reload_interval`` seconds
      ago. Any failure is logged and the reload is abandoned for this round;
      settings already applied before the failure stay applied.
      """
      if self._config_manager is None:
          return

      now = time.time()
      # Throttle: skip until the reload interval has elapsed.
      if now - self._last_config_reload < self._config_reload_interval:
          return
      self._last_config_reload = now

      try:
          # Pull the latest full configuration.
          fresh = self._config_manager.get_full_config()
          self.config = fresh

          # --- push settings ---
          push_cfg = fresh.get('push_notification', {})
          self.push_enabled = push_cfg.get('enabled', False)
          self.alert_enabled = push_cfg.get('alert_enabled', True)
          self.push_timeout = push_cfg.get('timeout', 30)
          self.push_retry_count = push_cfg.get('retry_count', 2)
          url_groups = push_cfg.get('push_base_urls', {})
          if isinstance(url_groups, dict):
              # Dict form: {"production": [...], "test": [...]}
              self.push_production_urls = [u.rstrip('/') for u in url_groups.get('production', []) if u]
              self.push_test_urls = [u.rstrip('/') for u in url_groups.get('test', []) if u]
          else:
              # Non-dict form: an iterable of {"url": ...} items, all treated
              # as production targets.
              self.push_production_urls = [item.get('url', '').rstrip('/') for item in url_groups if item.get('url')]
              self.push_test_urls = []

          # --- voting settings ---
          voting_cfg = fresh.get('prediction', {}).get('voting', {})
          self.voting_enabled = voting_cfg.get('enabled', True)
          self.voting_window_size = voting_cfg.get('window_size', 5)
          self.voting_threshold = voting_cfg.get('threshold', 3)
          self.tolerance_ratio = voting_cfg.get('tolerance_ratio', 0.05)

          # --- energy detection settings ---
          energy_cfg = fresh.get('prediction', {}).get('energy_detection', {})
          self.skip_detection_when_stopped = energy_cfg.get('skip_when_stopped', True)

          # --- human-detection suppression ---
          # Mirror the constructor: the feature is only on when the optional
          # module is importable, and it needs a reader instance to work.
          human_cfg = fresh.get('human_detection', {})
          new_human_enabled = bool(human_cfg.get('enabled', False)) and HUMAN_DETECTION_AVAILABLE
          if new_human_enabled and self.human_reader is None:
              # Enabled at runtime but never initialised: build the reader now,
              # otherwise the flag would flip while suppression stays inactive.
              try:
                  self.human_reader = HumanDetectionReader(
                      db_path=human_cfg.get('db_path', ''),
                      cooldown_minutes=human_cfg.get('cooldown_minutes', 5)
                  )
              except Exception as e:
                  # Keep the rest of the reload going; leave the feature off.
                  logger.error(f"人体检测读取器初始化失败,保持禁用: {e}")
                  new_human_enabled = False
          if new_human_enabled != self.human_detection_enabled:
              logger.info(f"人体检测抑制配置变更: {self.human_detection_enabled} -> {new_human_enabled}")
              self.human_detection_enabled = new_human_enabled

          # --- remote detection schedule ---
          schedule_cfg = fresh.get('detection_schedule', {})
          self._detection_schedule_url = schedule_cfg.get('url', '')
          self._detection_schedule_interval = schedule_cfg.get('poll_interval', 60)
          self._detection_schedule_timeout = schedule_cfg.get('request_timeout', 10)

          logger.debug("配置热刷新完成")
      except Exception as e:
          logger.error(f"配置热刷新失败: {e}")
  def _check_detection_schedule(self):
      """Poll the remote project-mode endpoint and apply mode changes.

      The endpoint is queried with ``project_id`` and a ``JWT-TOKEN`` header.
      Mode ids, as described by the original notes: 1 = daily, 2 = visit,
      3 = maintenance, 4 = debug. A maintenance entry whose
      ``scheduled_end_time`` has already passed is treated as mode 1.

      Polling is throttled by ``_detection_schedule_interval`` using its own
      timestamp. Any failure is logged and the current mode is kept.
      """
      if not self._detection_schedule_url:
          return

      polled_at = time.time()
      if polled_at - self._last_detection_schedule_check < self._detection_schedule_interval:
          return
      self._last_detection_schedule_check = polled_at

      try:
          for attempt in range(2):
              if not self._ensure_jwt():
                  logger.warning("远程调度接口: JWT 不可用")
                  return
              resp = requests.get(
                  self._detection_schedule_url,
                  params={'project_id': self.project_id},
                  headers={"JWT-TOKEN": self.scada_jwt},
                  timeout=self._detection_schedule_timeout
              )
              # On 401/403 drop the token and retry once (first attempt only).
              auth_rejected = resp.status_code in (401, 403)
              if auth_rejected and self._can_auto_login() and attempt == 0:
                  self._clear_jwt()
                  continue
              break

          resp.raise_for_status()
          result = resp.json()
          if result.get('code') != 200:
              logger.warning(f"远程调度接口返回异常: {result}")
              return

          # ``data`` is a list; pick the first record for this project.
          records = result.get('data', [])
          entry = next(
              (rec for rec in records if rec.get('project_id') == self.project_id),
              None
          )
          if entry is None:
              logger.debug(f"远程调度未返回 project_id={self.project_id} 的记录,保持当前状态")
              return

          new_mode_id = int(entry.get('mode_id', 1))

          # Maintenance mode only: fall back to daily mode once the scheduled
          # end time has passed, in case the platform never switches back.
          # End times starting with '0001' are ignored.
          if new_mode_id == 3:
              end_str = entry.get('scheduled_end_time', '')
              if end_str and not end_str.startswith('0001'):
                  end_dt = self._parse_schedule_time(end_str)
                  if end_dt is not None and datetime.now() > end_dt:
                      new_mode_id = 1

          labels = {1: '日常', 2: '参观', 3: '检修', 4: '调试'}
          previous_mode = self._remote_mode_id
          if new_mode_id == previous_mode:
              logger.info(f"远程调度轮询 | 当前模式: {labels.get(new_mode_id, '未知')}(mode_id={new_mode_id})")
          else:
              logger.info(
                  f"项目模式变更: {labels.get(previous_mode, '未知')} -> "
                  f"{labels.get(new_mode_id, '未知')}(mode_id={new_mode_id})"
              )
              self._remote_mode_id = new_mode_id
              # Start the new mode from a clean alert state.
              self._reset_alert_state_on_mode_change(previous_mode, new_mode_id)
      except Exception as e:
          logger.warning(f"远程异响调度请求失败,保持当前状态: {e}")
  @staticmethod
  def _parse_schedule_time(time_str):
      """Parse a schedule timestamp into a naive ``datetime``.

      Accepted layouts are ``YYYY-MM-DD`` followed by ``T`` or a space and
      ``HH:MM`` or ``HH:MM:SS``. A trailing ``Z``, a trailing UTC offset
      (``+08:00``, ``-05:00``, ``+0800``) and fractional seconds are dropped;
      the remaining wall-clock value is returned unchanged, i.e. no timezone
      conversion is performed.

      Args:
          time_str: timestamp text from the remote schedule API.

      Returns:
          The parsed ``datetime``, or ``None`` when the input is not a string
          or matches none of the accepted layouts.
      """
      if not isinstance(time_str, str):
          return None

      clean = time_str.strip()
      if clean.endswith(('Z', 'z')):
          clean = clean[:-1]

      # The date part occupies indexes 0-9, so a '+' or '-' found past index
      # 10 can only introduce a UTC offset; cut it off.
      for pos in range(len(clean) - 1, 10, -1):
          if clean[pos] in '+-':
              clean = clean[:pos]
              break

      # Sub-second precision is irrelevant here and may carry more digits
      # than %f accepts, so drop the fraction instead of parsing it.
      clean = clean.split('.', 1)[0].rstrip()

      for fmt in ('%Y-%m-%dT%H:%M:%S', '%Y-%m-%dT%H:%M',
                  '%Y-%m-%d %H:%M:%S', '%Y-%m-%d %H:%M'):
          try:
              return datetime.strptime(clean, fmt)
          except ValueError:
              continue
      return None
  def _reset_alert_state_on_mode_change(self, old_mode_id, new_mode_id):
      """Wipe all alert-related state after a project-mode switch.

      Clears the voting history, the last reported status, the locked
      anomaly types, every device's accumulated errors and anomaly ratios,
      and (when an aggregator exists) its cooldown records and pending
      windows, so the new mode is judged from scratch.

      Args:
          old_mode_id: mode id before the switch (used for logging only).
          new_mode_id: mode id after the switch (used for logging only).
      """
      labels = {1: '日常', 2: '参观', 3: '检修', 4: '调试'}
      previous = labels.get(old_mode_id, '?')
      current = labels.get(new_mode_id, '?')
      logger.info(f"重置报警状态: {previous} -> {current}")

      # Voting window, last reported status, locked anomaly classification.
      for tracker in (self.detection_history,
                      self.last_report_status,
                      self.locked_anomaly_type):
          tracker.clear()

      # Per-device accumulated results for the current reporting period.
      for cache in list(self.device_cache.values()):
          cache["errors"].clear()
          cache["anomaly_ratios"].clear()

      # Aggregator cooldowns and open aggregation windows.
      aggregator = self.alert_aggregator
      if aggregator:
          aggregator.cooldown_records.clear()
          aggregator.pending_windows.clear()
          logger.info("聚合器冷却记录和聚合窗口已清空")

      logger.info("报警状态重置完成")
  def _monitor_loop(self):
      """Main body of the monitoring thread.

      While ``self.running`` is set, each pass refreshes hot configuration,
      polls the remote schedule, scans every device's ``current`` directory
      for new WAV files, then runs the periodic upload check, the aggregator
      flush and the stale-file cleanup, and finally sleeps for
      ``check_interval`` seconds.
      """
      # Make sure the audio root exists before the first scan.
      self.audio_dir.mkdir(parents=True, exist_ok=True)

      while self.running:
          try:
              # Periodic hot reload of tunable configuration.
              self._reload_hot_config()
              # Remote project-mode polling (throttled internally).
              self._check_detection_schedule()

              for device_code in self.device_map.keys():
                  current_dir = self.audio_dir / device_code / "current"
                  if not current_dir.exists():
                      continue

                  for wav_file in current_dir.glob("*.wav"):
                      if wav_file in self.seen_files:
                          continue

                      # Decide whether the file is complete and plausible.
                      try:
                          info = wav_file.stat()
                          age = time.time() - info.st_mtime
                          size = info.st_size

                          # Possibly still being written: look again on a
                          # later pass (not marked as seen).
                          if age < 12.0:
                              continue

                          settled = age > 20.0

                          # Too small (the original note estimates a normal
                          # file at roughly 1.9 MB).
                          if size < 500_000:
                              if settled:
                                  # Treated as corrupt: delete and mark.
                                  try:
                                      wav_file.unlink()
                                      logger.debug(f"删除异常小文件: {wav_file.name} ({size / 1000:.1f}KB)")
                                  except Exception as e:
                                      logger.error(f"删除文件失败: {wav_file.name} | {e}")
                                  self.seen_files.add(wav_file)
                              continue

                          # Too large: archive without processing.
                          if size > 3_000_000:
                              if settled:
                                  logger.warning(f"文件过大,直接归档: {wav_file.name} ({size / 1000:.1f}KB)")
                                  self._move_audio_to_date_dir(wav_file)
                                  self.seen_files.add(wav_file)
                              continue
                      except Exception as e:
                          logger.error(f"文件状态检查失败: {wav_file.name} | {e}")
                          continue

                      # Regular file: process it, then remember it.
                      self._process_new_file(wav_file)
                      self.seen_files.add(wav_file)

              # Periodic aggregation / reporting per device.
              self._check_periodic_upload()

              # Flush aggregation windows that have expired.
              if self.alert_aggregator:
                  self.alert_aggregator.check_and_flush()

              # Safety net for files that linger in ``current``.
              self._cleanup_stale_current_files()

              # Bound the size of the seen-set. A set has no order, so the
              # 5000 entries kept are an arbitrary subset; this avoids a
              # stat() call per entry.
              if len(self.seen_files) > 10000:
                  self.seen_files = set(list(self.seen_files)[-5000:])
          except Exception as e:
              logger.error(f"监控循环错误: {e}")
              # Defensive reset so buffered audio cannot pile up in memory
              # after a failure.
              for dc in list(self.device_cache.keys()):
                  self.device_cache[dc]["audio_data"] = []
                  self.device_cache[dc]["pending_files"] = []

          # Wait until the next scan.
          time.sleep(self.check_interval)
  def _process_new_file(self, wav_file):
      """Run one newly captured WAV file through the per-file pipeline.

      File names follow ``{project_id}_{device_code}_{timestamp}.wav``,
      e.g. ``92_1#-1_20251218142000.wav``.

      Steps, in order:
        1. Parse the device code from the file name.
        2. Query the pump state via PLC; files recorded while all pumps are
           stopped or while a pump is in transition are moved to the
           transition directory and processing ends.
        3. In maintenance mode (remote mode id 3) the file is deleted.
        4. Without a model for the device (cold start) the file is archived
           to the date directory.
        5. Otherwise the audio is loaded, the reconstruction error and patch
           anomaly ratio are computed, and audio, results and file path are
           buffered in ``device_cache`` for the periodic upload check.

      Any unexpected exception is logged and the file, if it still exists,
      is archived to the date directory so it does not linger in ``current``.

      Args:
          wav_file: path object of the WAV file (``stem``, ``name``,
              ``exists()`` and ``unlink()`` are used).
      """
      try:
          # Device code is the second '_'-separated field of the file stem.
          try:
              parts = wav_file.stem.split('_')
              device_code = parts[1] if len(parts) >= 3 else "unknown"
          except Exception:
              device_code = "unknown"

          # ========================================
          # Pump run/stop detection via PLC (runs before the model check)
          # ----------------------------------------
          # The pump state is queried through pump_state_monitor so that:
          #   1. cold-start data collection also skips audio recorded while
          #      the pumps are stopped,
          #   2. no detection is attempted on stopped equipment,
          #   3. the device status is available for later reporting.
          # Requires pump_state_monitor and PLC point configs for the device.
          # ========================================
          device_status = "未知"
          # Conservative default when no PLC information is available.
          pump_is_running = True

          stream_config = self.device_map.get(device_code)

          if self.pump_state_monitor and stream_config:
              # PLC point configs associated with this device's pump name.
              pump_name = stream_config.pump_name
              pump_configs = self.pump_status_plc_configs.get(pump_name, [])

              if pump_configs:
                  # The device counts as working when any associated pump
                  # runs. Every pump is queried (no early exit) so each
                  # pump's state gets updated.
                  any_pump_running = False
                  for pump_cfg in pump_configs:
                      point = pump_cfg.get("point", "")
                      name = pump_cfg.get("name", point)
                      pump_id = point
                      # Per the original note, the monitor caches results
                      # for 60 s, so SCADA is not queried on every file.
                      is_running, _ = self.pump_state_monitor.update_pump_state(pump_id, point, name)
                      if is_running:
                          any_pump_running = True

                  pump_is_running = any_pump_running
                  device_status = "开机" if pump_is_running else "停机"

                  # All pumps stopped: set the file aside (configurable).
                  if self.skip_detection_when_stopped and not pump_is_running:
                      logger.info(f"泵停机(PLC): {device_code} | 归档到过渡期目录(不用于训练)")
                      self._move_audio_to_transition_dir(wav_file, "stopped")
                      return

                  # Transition period after a pump start/stop (15 minutes
                  # according to the original note): the audio is considered
                  # unstable and is kept out of detection and training.
                  if self.skip_detection_when_stopped:
                      pump_in_transition, transition_pump_names = \
                          self.pump_state_monitor.check_pumps_transition(pump_configs)
                      if pump_in_transition:
                          logger.info(f"泵过渡期: {device_code} | 归档到过渡期目录(不用于训练) | "
                                      f"过渡期泵: {', '.join(transition_pump_names)}")
                          self._move_audio_to_transition_dir(wav_file, "transition")
                          return

          # Maintenance mode (mode_id=3): discard the audio entirely.
          if self._remote_mode_id == 3:
              try:
                  wav_file.unlink()
              except Exception as e:
                  # Best effort; report instead of failing silently.
                  logger.warning(f"检修模式丢弃音频失败: {wav_file.name} | {e}")
              return

          # Predictor for this device; None means no model is available.
          device_predictor = self.multi_predictor.get_predictor(device_code)
          if device_predictor is None:
              logger.info(f"冷启动模式(设备 {device_code} 无模型): 归档 {wav_file.name}")
              self._move_audio_to_date_dir(wav_file)
              return

          # Load the audio. On failure the file is left where it is.
          try:
              y, sr = librosa.load(str(wav_file), sr=CFG.SR, mono=True)
          except Exception as e:
              logger.error(f"音频加载失败: {wav_file.name} | {e}")
              return

          # Remember the pump status for the periodic report.
          self.device_cache[device_code]["status"] = device_status

          # Reconstruction error: mean MSE plus patch anomaly ratio.
          # NOTE(review): _compute_reconstruction_error loads the file again.
          threshold = self._get_threshold(device_code)
          error, anomaly_ratio = self._compute_reconstruction_error(
              wav_file, device_predictor, threshold=threshold
          )

          if error is not None:
              self.device_cache[device_code]["errors"].append(error)
          if anomaly_ratio is not None:
              self.device_cache[device_code]["anomaly_ratios"].append(anomaly_ratio)

          # Keep the samples for the spectrum computed at report time.
          self.device_cache[device_code]["audio_data"].append(y)

          # Hold the path back; archiving happens after the periodic verdict.
          if "pending_files" not in self.device_cache[device_code]:
              self.device_cache[device_code]["pending_files"] = []
          self.device_cache[device_code]["pending_files"].append(wav_file)

          # History used to capture audio preceding an anomaly.
          self.audio_file_history[device_code].append((datetime.now(), wav_file))

          # First file for this device starts the reporting clock.
          if self.device_cache[device_code]["last_upload"] is None:
              self.device_cache[device_code]["last_upload"] = datetime.now()

          # ========================================
          # Fast path (temporarily disabled): raise an early warning when
          # several consecutive files have a very high error, bypassing the
          # voting window, to catch sudden severe faults.
          # ========================================
          # if error is not None and threshold is not None and threshold > 0:
          #     # Maintain the fast-path buffer
          #     if "fast_alert_buffer" not in self.device_cache[device_code]:
          #         self.device_cache[device_code]["fast_alert_buffer"] = []
          #
          #     fast_buf = self.device_cache[device_code]["fast_alert_buffer"]
          #     # Continuity check: record errors above 2x threshold, otherwise reset
          #     if error > threshold * 2.0:
          #         fast_buf.append(error)
          #     else:
          #         fast_buf.clear()
          #
          #     # 3 consecutive files above 2x threshold -> fast warning
          #     FAST_ALERT_CONSECUTIVE = 3
          #     if len(fast_buf) >= FAST_ALERT_CONSECUTIVE:
          #         # Avoid repeated alerts
          #         last_fast = self.device_cache[device_code].get("last_fast_alert_time")
          #         now = datetime.now()
          #         can_fast_alert = (last_fast is None or
          #                           (now - last_fast).total_seconds() > 300)  # 5 min cooldown
          #
          #         if can_fast_alert:
          #             # The fast path obeys the same suppression rules
          #             suppress = False
          #             # Pump transition suppression
          #             if self.pump_state_monitor and stream_config:
          #                 pump_name = stream_config.pump_name
          #                 pump_configs = self.pump_status_plc_configs.get(pump_name, [])
          #                 if pump_configs:
          #                     in_transition, _ = self.pump_state_monitor.check_pumps_transition(pump_configs)
          #                     if in_transition:
          #                         suppress = True
          #             # Human detection suppression
          #             if self.human_detection_enabled and self.human_reader:
          #                 if self.human_reader.is_in_cooldown():
          #                     suppress = True
          #             # alert_enabled switch
          #             if not self.alert_enabled:
          #                 suppress = True
          #
          #             if not suppress:
          #                 avg_fast = float(np.mean(fast_buf))
          #                 logger.warning(
          #                     f"[!!] 快速通道触发: {device_code} | "
          #                     f"连续{len(fast_buf)}个文件异常 | "
          #                     f"平均误差={avg_fast:.6f} 阈值={threshold:.6f}")
          #                 self.device_cache[device_code]["last_fast_alert_time"] = now
          #                 fast_buf.clear()
          #                 # Flag the fast warning; handled at the next periodic report
          #                 self.device_cache[device_code]["fast_alert_pending"] = True

          # Per-file result log (the threshold fetched above is reused).
          if error is not None and threshold is not None:
              is_anomaly = error > threshold
              ratio_tag = f" patch比例={anomaly_ratio:.2f}" if anomaly_ratio is not None else ""
              result_tag = "!!" if is_anomaly else "OK"
              logger.info(f"[{result_tag}] {device_code} | {wav_file.name} | "
                          f"误差={error:.6f} 阈值={threshold:.6f}{ratio_tag}")
          elif error is not None:
              logger.debug(f"文件预测: {wav_file.name} | 误差={error:.6f} | 阀值=未设置")
          else:
              logger.warning(f"预测跳过: {wav_file.name} | 误差计算失败")

      except Exception as e:
          logger.error(f"处理文件失败: {wav_file.name} | 错误: {e}")
          # The caller marks the file as seen, so a file left in ``current``
          # after a failure would never be picked up again. Archive it to
          # keep the directory from filling up.
          try:
              if wav_file.exists():
                  self._move_audio_to_date_dir(wav_file)
                  logger.info(f"异常文件已归档: {wav_file.name}")
          except Exception as move_err:
              logger.error(f"异常文件归档失败: {wav_file.name} | {move_err}")
  def _compute_reconstruction_error(self, wav_file, device_predictor, threshold=None):
      """Compute the reconstruction error of one audio file.

      The audio is cut into windows of ``CFG.WIN_SEC`` seconds with a hop of
      ``CFG.HOP_SEC`` seconds (8 s / 4 s according to the original notes).
      Each window becomes a log-mel patch, is min-max normalised with the
      predictor's ``global_min`` / ``global_max`` and is fed through the
      predictor's model. Two figures are derived from the per-patch MSE:

        1. the mean MSE over all patches;
        2. the fraction of patches whose MSE exceeds ``threshold``.

      Args:
          wav_file: path of the audio file.
          device_predictor: predictor providing ``model``, ``torch_device``,
              ``global_min`` and ``global_max``.
          threshold: device threshold for the patch-level ratio. When it is
              falsy or not positive the ratio is reported as 0.0.

      Returns:
          ``(mean_mse, anomaly_ratio)``; ``(None, None)`` when the audio is
          shorter than one window, no patch could be extracted, or any
          error occurs.
      """
      try:
          import torch
          from predictor.utils import align_to_target

          # Min-max normalisation parameters.
          lo = device_predictor.global_min
          hi = device_predictor.global_max
          span = hi - lo + 1e-6

          samples, _ = librosa.load(str(wav_file), sr=CFG.SR, mono=True)

          win_len = int(CFG.WIN_SEC * CFG.SR)
          hop_len = int(CFG.HOP_SEC * CFG.SR)

          if len(samples) < win_len:
              logger.warning(f"音频太短,无法提取patches: {wav_file.name}")
              return None, None

          patches = []
          for offset in range(0, len(samples) - win_len + 1, hop_len):
              mel = librosa.feature.melspectrogram(
                  y=samples[offset:offset + win_len], sr=CFG.SR, n_fft=CFG.N_FFT,
                  hop_length=CFG.HOP_LENGTH, n_mels=CFG.N_MELS, power=2.0
              )
              mel_db = librosa.power_to_db(mel, ref=np.max)

              # Force exactly TARGET_FRAMES columns: pad on the right with
              # np.pad's default fill, or crop.
              missing = CFG.TARGET_FRAMES - mel_db.shape[1]
              if missing > 0:
                  mel_db = np.pad(mel_db, ((0, 0), (0, missing)))
              else:
                  mel_db = mel_db[:, :CFG.TARGET_FRAMES]

              patches.append(((mel_db - lo) / span).astype(np.float32))

          if not patches:
              logger.warning(f"未能提取任何patches: {wav_file.name}")
              return None, None

          # Stack the patches and insert a singleton axis at position 1.
          batch = np.expand_dims(np.stack(patches, 0), 1)
          tensor = torch.from_numpy(batch).to(device_predictor.torch_device)

          with torch.no_grad():
              recon = align_to_target(device_predictor.model(tensor), tensor)
              mse_per_patch = torch.mean((recon - tensor) ** 2, dim=[1, 2, 3])
              mean_mse = torch.mean(mse_per_patch).item()

          # Patch-level ratio: a patch counts as anomalous when its own MSE
          # exceeds the threshold.
          anomaly_ratio = 0.0
          if threshold and threshold > 0:
              patch_mses = mse_per_patch.cpu().numpy()
              anomaly_patch_count = int(np.sum(patch_mses > threshold))
              anomaly_ratio = anomaly_patch_count / len(patch_mses)
              logger.debug(
                  f"patch级分析: {wav_file.name} | "
                  f"异常patch={anomaly_patch_count}/{len(patch_mses)} "
                  f"比例={anomaly_ratio:.2f}"
              )

          logger.debug(f"重建误差: {wav_file.name} | patches={len(patches)} | MSE={mean_mse:.6f} | ratio={anomaly_ratio:.2f}")
          return mean_mse, anomaly_ratio

      except Exception as e:
          logger.error(f"计算重建误差失败: {wav_file.name} | {e}")
          return None, None
  1147. def _check_periodic_upload(self):
  1148. """
  1149. 检查是否需要进行周期性上报
  1150. 每分钟汇总一次各设备的检测结果并上报
  1151. """
  1152. # 检修模式(mode_id=3)跳过上报
  1153. if self._remote_mode_id == 3:
  1154. return
  1155. now = datetime.now()
  1156. for device_code, cache in self.device_cache.items():
  1157. # 检查上报时间间隔
  1158. last_upload = cache.get("last_upload")
  1159. if last_upload is None:
  1160. continue
  1161. elapsed = (now - last_upload).total_seconds()
  1162. # 达到上报周期
  1163. if elapsed >= self.segment_duration:
  1164. # 获取该设备的流配置
  1165. stream_config = self.device_map.get(device_code)
  1166. # 计算平均重建误差
  1167. errors = cache.get("errors", [])
  1168. avg_error = float(np.mean(errors)) if errors else 0.0
  1169. # 获取阈值
  1170. threshold = self._get_threshold(device_code)
  1171. # ========================================
  1172. # 双轨判定:平均MSE(持续弱信号) OR patch异常比例(突发型故障)
  1173. # ========================================
  1174. # 轨道1:平均 MSE 判定(带容差区间,检测持续性退化型故障)
  1175. # 轨道2:patch 异常比例判定(检测瞬时突发型故障)
  1176. # 任一轨道触发即判定当前周期异常
  1177. anomaly_ratios = cache.get("anomaly_ratios", [])
  1178. avg_anomaly_ratio = float(np.mean(anomaly_ratios)) if anomaly_ratios else 0.0
  1179. if threshold:
  1180. # 轨道1:平均 MSE 容差区间判定
  1181. upper_bound = threshold * (1 + self.tolerance_ratio)
  1182. lower_bound = threshold * (1 - self.tolerance_ratio)
  1183. if avg_error > upper_bound:
  1184. # 超过上边界 -> 确定异常
  1185. mse_anomaly = True
  1186. elif avg_error < lower_bound:
  1187. # 低于下边界 -> 确定正常
  1188. mse_anomaly = False
  1189. else:
  1190. # 灰区 -> 与阈值比较
  1191. mse_anomaly = avg_error > threshold
  1192. # 轨道2:patch 异常比例判定
  1193. # ANOMALY_RATIO_THRESHOLD 定义在 predictor/config.py(默认 0.1 = 10% 的 patch 超阈值)
  1194. ratio_anomaly = avg_anomaly_ratio >= CFG.ANOMALY_RATIO_THRESHOLD
  1195. # OR 逻辑:任一轨道触发即判定异常
  1196. is_current_anomaly = mse_anomaly or ratio_anomaly
  1197. # 判定来源日志(仅异常时记录来源,便于调试)
  1198. if is_current_anomaly:
  1199. sources = []
  1200. if mse_anomaly:
  1201. sources.append(f"平均MSE={avg_error:.6f}")
  1202. if ratio_anomaly:
  1203. sources.append(f"patch比例={avg_anomaly_ratio:.2f}")
  1204. logger.debug(f"双轨判定异常: {device_code} | 触发源: {', '.join(sources)}")
  1205. # 记录本次判定结果
  1206. self.last_single_anomaly[device_code] = is_current_anomaly
  1207. else:
  1208. is_current_anomaly = False
  1209. # ========================================
  1210. # 滑动窗口投票:5次中有3次异常才判定为异常
  1211. # ========================================
  1212. if device_code not in self.detection_history:
  1213. self.detection_history[device_code] = []
  1214. # 按照用户要求:如果本次是正常判定,直接清空历史滑动窗口,强制要求连续异常
  1215. if not is_current_anomaly:
  1216. self.detection_history[device_code].clear()
  1217. # 记录本次检测结果
  1218. self.detection_history[device_code].append(is_current_anomaly)
  1219. # 保持窗口大小
  1220. if len(self.detection_history[device_code]) > self.voting_window_size:
  1221. self.detection_history[device_code].pop(0)
  1222. # 投票判定最终异常状态
  1223. if self.voting_enabled:
  1224. anomaly_count = sum(self.detection_history[device_code])
  1225. is_anomaly = anomaly_count >= self.voting_threshold
  1226. if len(self.detection_history[device_code]) >= self.voting_window_size:
  1227. window_info = f"窗口[{anomaly_count}/{self.voting_window_size}]"
  1228. else:
  1229. window_info = f"窗口未满[{anomaly_count}/{self.voting_window_size}]"
  1230. else:
  1231. is_anomaly = is_current_anomaly
  1232. window_info = "投票未开启"
  1233. # ========================================
  1234. # 状态变更检测
  1235. # trigger_alert: 仅在 正常(或初始) -> 异常 时为True
  1236. # 冷却逻辑已移至 AlertAggregator 内部处理
  1237. # ========================================
  1238. last_is_anomaly = self.last_report_status.get(device_code)
  1239. trigger_alert = False
  1240. if is_anomaly:
  1241. # 只有状态变化(正常->异常) 才触发报警流程
  1242. # 泵过渡期检查已在 _process_new_file 中完成,过渡期内的文件不会进入 pending
  1243. if last_is_anomaly is None or not last_is_anomaly:
  1244. if self.alert_enabled:
  1245. # 调试模式裕量检查:误差 barely 超阈值时不触发
  1246. # 防止边界抖动导致状态反复跳变,每隔几分钟刷一次报警
  1247. # 正常模式不加此限制(投票+容差已足够)
  1248. if self._remote_mode_id == 4 and threshold:
  1249. margin_ratio = self.config.get('push_notification', {}).get('debug_alert_margin', 0.05)
  1250. if avg_error < threshold * (1 + margin_ratio):
  1251. trigger_alert = False
  1252. logger.info(
  1253. f"[调试模式] 裕量不足,跳过报警: {device_code} | "
  1254. f"误差={avg_error:.6f} 阈值={threshold:.6f} "
  1255. f"(需超过 {threshold * (1 + margin_ratio):.6f})"
  1256. )
  1257. else:
  1258. trigger_alert = True
  1259. # 人体检测抑制:任意摄像头检测到人则不报警
  1260. elif self.human_detection_enabled and self.human_reader:
  1261. if self.human_reader.is_in_cooldown():
  1262. trigger_alert = False
  1263. status_info = self.human_reader.get_status_info()
  1264. logger.info(f"人体检测抑制: {device_code} | {status_info},跳过报警")
  1265. else:
  1266. trigger_alert = True
  1267. else:
  1268. trigger_alert = True
  1269. else:
  1270. trigger_alert = False
  1271. logger.debug(f"异常告警已禁用,跳过告警: {device_code}")
  1272. # 获取运行状态
  1273. running_status = cache.get("status", "未知")
  1274. # 获取进水流量
  1275. inlet_flow = self._get_inlet_flow(stream_config) if stream_config else 0.0
  1276. # 计算本次频谱图
  1277. audio_data = cache.get("audio_data", [])
  1278. freq_db = self._compute_frequency_spectrum(audio_data)
  1279. # 保存到频谱图历史(只保存dB值)
  1280. if self.freq_history_enabled and freq_db:
  1281. self.freq_history[device_code].append((now, freq_db))
  1282. # 清理过期历史
  1283. cutoff = now - timedelta(minutes=self.freq_history_minutes)
  1284. self.freq_history[device_code] = [
  1285. (t, d) for t, d in self.freq_history[device_code]
  1286. if t > cutoff
  1287. ]
  1288. # 计算历史频谱图平均值(normal_frequency_middle)
  1289. freq_middle_db = self._compute_frequency_middle(device_code)
  1290. # ========================================
  1291. # 异常分类:只在状态从正常变为异常时进行分类
  1292. # 持续异常期间沿用上次分类结果,保持一致性
  1293. # ========================================
  1294. anomaly_type_code = 6 # 默认:未分类异常
  1295. type_name = "未分类异常"
  1296. if is_anomaly and audio_data:
  1297. # 检查是否是新的异常(从正常变为异常)
  1298. is_new_anomaly = (last_is_anomaly is None or not last_is_anomaly)
  1299. if is_new_anomaly:
  1300. # 新异常:使用 ML 分类器进行故障类型判定并锁定结果
  1301. try:
  1302. from core.ml_classifier import classify_fault
  1303. if len(audio_data) > 0:
  1304. y = audio_data[-1] if isinstance(audio_data[-1], np.ndarray) else np.array(audio_data[-1])
  1305. # ML 分类器返回 4 元组:(编码, 名称, 置信度, 细分详情)
  1306. anomaly_type_code, type_name, confidence, detail = classify_fault(y, sr=16000)
  1307. # 锁定分类结果
  1308. self.locked_anomaly_type[device_code] = (anomaly_type_code, type_name)
  1309. # 细分类型用于日志追踪,便于后续模型迭代分析
  1310. fine_info = detail.get('fine_name', '') if detail else ''
  1311. logger.info(f"异常分类(ML): {type_name} (code={anomaly_type_code}, "
  1312. f"细分={fine_info}, 置信度={confidence:.2f})")
  1313. except Exception as e:
  1314. logger.warning(f"异常分类失败: {e}")
  1315. else:
  1316. # 持续异常:沿用锁定的分类结果
  1317. if device_code in self.locked_anomaly_type:
  1318. anomaly_type_code, type_name = self.locked_anomaly_type[device_code]
  1319. logger.debug(f"异常分类(沿用): {type_name} (code={anomaly_type_code})")
  1320. else:
  1321. # 状态正常时清除锁定的分类结果
  1322. if device_code in self.locked_anomaly_type:
  1323. del self.locked_anomaly_type[device_code]
  1324. # 上报逻辑
  1325. if self.push_enabled and stream_config:
  1326. # 预读异常音频base64:在文件被归档/清空之前立即读取
  1327. # 解决竞态问题:异步推送或聚合器延迟推送时文件可能已被移走
  1328. pre_read_wav_b64 = ""
  1329. if trigger_alert:
  1330. try:
  1331. current_pending = cache.get("pending_files", [])
  1332. if current_pending and current_pending[0].exists():
  1333. with open(current_pending[0], "rb") as f:
  1334. pre_read_wav_b64 = base64.b64encode(f.read()).decode('utf-8')
  1335. logger.debug(f"预读异常音频成功: {current_pending[0].name} | size={len(pre_read_wav_b64)}")
  1336. except Exception as e:
  1337. logger.warning(f"预读异常音频失败: {e}")
  1338. if trigger_alert and self.alert_aggregator and not self.test_mode:
  1339. # 报警走聚合器:跨设备聚合判定 + 分类型冷却
  1340. # 聚合器会在窗口到期后决定是否真正推送
  1341. # 测试模式下跳过聚合器,直接推送
  1342. self.alert_aggregator.submit_alert(
  1343. plant_name=stream_config.plant_name,
  1344. device_code=device_code,
  1345. anomaly_type_code=anomaly_type_code,
  1346. push_kwargs=dict(
  1347. stream_config=stream_config,
  1348. device_code=device_code,
  1349. is_anomaly=is_anomaly,
  1350. trigger_alert=True,
  1351. abnormal_score=avg_error,
  1352. score_threshold=threshold,
  1353. running_status=running_status,
  1354. inlet_flow=inlet_flow,
  1355. freq_db=freq_db,
  1356. freq_middle_db=freq_middle_db,
  1357. anomaly_type_code=anomaly_type_code,
  1358. abnormal_wav_b64=pre_read_wav_b64
  1359. )
  1360. )
  1361. else:
  1362. # 非报警(心跳)或聚合器不可用或测试模式 -> 提交到线程池异步推送
  1363. self._push_executor.submit(
  1364. self._push_detection_result,
  1365. stream_config=stream_config,
  1366. device_code=device_code,
  1367. is_anomaly=is_anomaly,
  1368. trigger_alert=trigger_alert,
  1369. abnormal_score=avg_error,
  1370. score_threshold=threshold,
  1371. running_status=running_status,
  1372. inlet_flow=inlet_flow,
  1373. freq_db=freq_db,
  1374. freq_middle_db=freq_middle_db,
  1375. anomaly_type_code=anomaly_type_code,
  1376. abnormal_wav_b64=pre_read_wav_b64
  1377. )
  1378. # 更新上一次状态
  1379. self.last_report_status[device_code] = is_anomaly
  1380. # 日志记录
  1381. thr_str = f"{threshold:.6f}" if threshold else "未设置"
  1382. # 报警去向说明
  1383. if trigger_alert and self.alert_aggregator:
  1384. alert_dest = "-> 聚合器"
  1385. elif trigger_alert:
  1386. alert_dest = "-> 直接推送"
  1387. else:
  1388. alert_dest = ""
  1389. # 使用设备名作为标识,增加视觉分隔
  1390. cam_label = ""
  1391. if stream_config:
  1392. cam_label = f"({stream_config.camera_name})"
  1393. result_emoji = "!!" if is_anomaly else "OK"
  1394. alert_str = f"报警=是 {alert_dest}" if trigger_alert else "报警=否"
  1395. # patch异常比例信息
  1396. ratio_str = f"patch比例={avg_anomaly_ratio:.2f}" if anomaly_ratios else "patch比例=N/A"
  1397. logger.info(
  1398. f"[{result_emoji}] {device_code}{cam_label} | "
  1399. f"误差={avg_error:.6f} 阈值={thr_str} {ratio_str} | "
  1400. f"{window_info} | {running_status} | "
  1401. f"{'异常' if is_anomaly else '正常'} | {alert_str}"
  1402. )
  1403. # ========================================
  1404. # 根据投票结果归档文件
  1405. # 用投票后的 is_anomaly 决定归档,避免单次波动误归
  1406. # ========================================
  1407. pending_files = cache.get("pending_files", [])
  1408. # 检查是否是新的异常(从正常变为异常)
  1409. is_new_anomaly = is_anomaly and (last_is_anomaly is None or not last_is_anomaly)
  1410. # 更新异常上下文捕获状态机
  1411. self._update_anomaly_capture_state(device_code, is_anomaly, is_new_anomaly,
  1412. avg_error, threshold, now, pending_files)
  1413. if pending_files:
  1414. if is_anomaly:
  1415. # 异常文件由上下文捕获状态机统一管理
  1416. # 状态机会在收集完前+后文件后一次性保存到 anomaly_detected
  1417. pass
  1418. else:
  1419. # 正常文件:基于重建误差的多样性采样
  1420. for f in pending_files:
  1421. should_keep = self._should_keep_normal_sample(
  1422. device_code, avg_error, now
  1423. )
  1424. if should_keep:
  1425. # 归档到日期目录(供增训使用)
  1426. self._move_audio_to_date_dir(f)
  1427. # 边云协同:正常抽样入队上传
  1428. # AudioUploader 内部自动执行抽样间隔和每日上限控制
  1429. # enabled=false 时 enqueue_normal_sample 直接 return
  1430. if self.audio_uploader:
  1431. model_group = self.multi_predictor.device_model_map.get(device_code, device_code)
  1432. self.audio_uploader.enqueue_normal_sample(
  1433. device_code=device_code,
  1434. wav_path=f,
  1435. model_group=model_group,
  1436. avg_error=avg_error,
  1437. threshold=threshold
  1438. )
  1439. else:
  1440. # 冗余或增训关闭,直接删除节省磁盘
  1441. try:
  1442. f.unlink()
  1443. except Exception as e:
  1444. logger.warning(f"删除冗余正常音频失败: {f.name} | {e}")
  1445. # 状态恢复正常时,清除保存冷却时间
  1446. if device_code in self.last_anomaly_save_time:
  1447. del self.last_anomaly_save_time[device_code]
  1448. # 重置缓存
  1449. cache["errors"] = []
  1450. cache["anomaly_ratios"] = []
  1451. cache["audio_data"] = []
  1452. cache["pending_files"] = []
  1453. cache["last_upload"] = now
  1454. logger.info("─" * 60)
  1455. def _update_anomaly_capture_state(self, device_code, is_anomaly,
  1456. is_new_anomaly, avg_error,
  1457. threshold, now, pending_files):
  1458. """
  1459. 异常上下文捕获状态机
  1460. 状态流转:
  1461. 1. 无状态 + 新异常 -> 触发捕获,回溯 audio_file_history 获取前置文件
  1462. 2. 已触发 + 异常持续 -> 持续收集异常文件
  1463. 3. 已触发 + 异常结束 -> 开始收集后续文件(post阶段)
  1464. 4. post阶段 + 时间到 -> 保存所有文件到 anomaly_detected
  1465. 所有异常文件(从触发到结束)统一保存到 anomaly_detected 目录,
  1466. 包含 metadata.json 记录误差/阈值等信息。
  1467. """
  1468. state = self.anomaly_capture_state.get(device_code)
  1469. if is_new_anomaly and state is None:
  1470. # 新异常触发:回溯获取前置文件
  1471. anomaly_cutoff = now - timedelta(minutes=1)
  1472. pre_cutoff = now - timedelta(minutes=self.context_pre_minutes + 1)
  1473. pre_files = []
  1474. anomaly_files = []
  1475. for ts, fpath in self.audio_file_history[device_code]:
  1476. if not fpath.exists():
  1477. continue
  1478. if ts >= anomaly_cutoff:
  1479. anomaly_files.append(fpath)
  1480. elif ts >= pre_cutoff:
  1481. pre_files.append(fpath)
  1482. # 兜底:如果回溯没找到,把当前 pending 加入
  1483. if not anomaly_files and pending_files:
  1484. anomaly_files = [f for f in pending_files if f.exists()]
  1485. self.anomaly_capture_state[device_code] = {
  1486. "trigger_time": now,
  1487. "avg_error": avg_error,
  1488. "threshold": threshold,
  1489. "pre_files": pre_files,
  1490. "anomaly_files": anomaly_files,
  1491. "post_files": [],
  1492. "anomaly_ended": False,
  1493. "post_start_time": None
  1494. }
  1495. logger.info(f"异常上下文捕获已触发: {device_code} | "
  1496. f"前置={len(pre_files)}个 | 异常={len(anomaly_files)}个")
  1497. elif state is not None:
  1498. if is_anomaly and not state["anomaly_ended"]:
  1499. # 异常持续中:继续收集异常文件
  1500. for f in pending_files:
  1501. if f.exists():
  1502. state["anomaly_files"].append(f)
  1503. else:
  1504. # 异常结束(或之前已结束,在收集后续文件)
  1505. if not state["anomaly_ended"]:
  1506. state["anomaly_ended"] = True
  1507. state["post_start_time"] = now
  1508. logger.info(f"异常结束,开始收集后续文件: {device_code} | "
  1509. f"异常文件共{len(state['anomaly_files'])}个 | "
  1510. f"等待{self.context_post_minutes}分钟")
  1511. # 收集后续文件
  1512. for f in pending_files:
  1513. if f.exists():
  1514. state["post_files"].append(f)
  1515. # 检查 post 阶段是否到时间
  1516. elapsed_post = (now - state["post_start_time"]).total_seconds() / 60
  1517. if elapsed_post >= self.context_post_minutes:
  1518. self._save_anomaly_context(device_code, state)
  1519. # 边云协同:异常事件入队上传
  1520. # 异常上下文保存完成后,将整个事件目录加入上传队列
  1521. # enabled=false 时 enqueue_anomaly_event 直接 return
  1522. if self.audio_uploader:
  1523. anomaly_files = state.get('anomaly_files', [])
  1524. if anomaly_files:
  1525. event_dir = self.save_anomaly_dir / device_code / anomaly_files[0].stem
  1526. model_group = self.multi_predictor.device_model_map.get(device_code, device_code)
  1527. self.audio_uploader.enqueue_anomaly_event(
  1528. device_code=device_code,
  1529. event_dir=event_dir,
  1530. metadata=state,
  1531. model_group=model_group
  1532. )
  1533. del self.anomaly_capture_state[device_code]
  1534. self.last_anomaly_save_time[device_code] = now
  1535. def _save_anomaly_context(self, device_code: str, state: dict):
  1536. """
  1537. 保存异常上下文文件到独立目录
  1538. 目录结构:
  1539. data/anomaly_detected/{device_code}/{异常文件名(不含扩展名)}/
  1540. ├── 92_1#-1_20260130140313.wav (保持原文件名)
  1541. ├── ...
  1542. └── metadata.json
  1543. metadata.json 字段说明:
  1544. - before_trigger: 触发前 1~(N+1) 分钟的文件(正常时期,用于对比)
  1545. - at_trigger: 触发时刻前 1 分钟内的文件(异常开始出现的时期)
  1546. - after_trigger: 触发后 N 分钟的文件(不管异常是否恢复)
  1547. 参数:
  1548. device_code: 设备编号
  1549. state: 捕获状态字典
  1550. """
  1551. import shutil
  1552. import json
  1553. try:
  1554. # 获取第一个异常文件名作为文件夹名
  1555. anomaly_files = state.get("anomaly_files", [])
  1556. if not anomaly_files:
  1557. logger.warning(f"无异常文件,跳过保存: {device_code}")
  1558. return
  1559. # 用第一个异常文件名(不含扩展名)作为文件夹名
  1560. first_anomaly = anomaly_files[0]
  1561. folder_name = first_anomaly.stem # 如 92_1#-1_20260130140313
  1562. save_dir = self.save_anomaly_dir / device_code / folder_name
  1563. save_dir.mkdir(parents=True, exist_ok=True)
  1564. # 收集所有文件名(使用新命名)
  1565. all_files = {"before_trigger": [], "at_trigger": [], "after_trigger": []}
  1566. # 移动前置文件(来自日期目录,移动后原位置不再保留)
  1567. for fpath in state.get("pre_files", []):
  1568. if fpath.exists():
  1569. dest = save_dir / fpath.name
  1570. shutil.move(str(fpath), str(dest)) # 移动而非复制,避免重复
  1571. all_files["before_trigger"].append(fpath.name)
  1572. # 移动触发时刻文件(保持原名)
  1573. for fpath in anomaly_files:
  1574. if fpath.exists():
  1575. dest = save_dir / fpath.name
  1576. shutil.move(str(fpath), str(dest))
  1577. all_files["at_trigger"].append(fpath.name)
  1578. # 移动后续文件(保持原名)
  1579. for fpath in state.get("post_files", []):
  1580. if fpath.exists():
  1581. dest = save_dir / fpath.name
  1582. shutil.move(str(fpath), str(dest))
  1583. all_files["after_trigger"].append(fpath.name)
  1584. # 生成精简的元数据文件
  1585. trigger_time = state.get("trigger_time")
  1586. metadata = {
  1587. "device_code": device_code,
  1588. "trigger_time": trigger_time.strftime("%Y-%m-%d %H:%M:%S") if trigger_time else None,
  1589. "avg_error": round(state.get("avg_error", 0.0), 6),
  1590. "threshold": round(state.get("threshold", 0.0), 6),
  1591. "files": all_files
  1592. }
  1593. metadata_path = save_dir / "metadata.json"
  1594. with open(metadata_path, 'w', encoding='utf-8') as f:
  1595. json.dump(metadata, f, ensure_ascii=False, indent=2)
  1596. total = sum(len(v) for v in all_files.values())
  1597. logger.warning(f"异常上下文已保存: {device_code}/{folder_name} | "
  1598. f"共{total}个文件 (前{len(all_files['before_trigger'])}+异常{len(all_files['at_trigger'])}+后{len(all_files['after_trigger'])})")
  1599. except Exception as e:
  1600. logger.error(f"保存异常上下文失败: {device_code} | {e}")
  1601. def _compute_frequency_middle(self, device_code):
  1602. """
  1603. 计算历史频谱图平均值(normal_frequency_middle)
  1604. 参数:
  1605. device_code: 设备编号
  1606. 返回:
  1607. dB值列表的平均值
  1608. """
  1609. history = self.freq_history.get(device_code, [])
  1610. if not history or len(history) < 2:
  1611. return []
  1612. try:
  1613. # 收集所有历史dB值(只保留长度一致的)
  1614. all_db = []
  1615. ref_len = len(history[0][1]) # 使用第一条记录的长度作为参考
  1616. for _, db in history:
  1617. if len(db) == ref_len:
  1618. all_db.append(db)
  1619. if not all_db:
  1620. return []
  1621. # 计算各频率点的平均dB
  1622. avg_db = np.mean(all_db, axis=0).tolist()
  1623. return avg_db
  1624. except Exception as e:
  1625. logger.error(f"计算频谱图历史平均失败: {e}")
  1626. return []
  1627. # ------------------------------------------------------------------ #
  1628. # SCADA JWT 自动管理 #
  1629. # ------------------------------------------------------------------ #
  1630. def _can_auto_login(self) -> bool:
  1631. return bool(self._scada_login_url and self._scada_login_username and self._scada_login_password)
  1632. @staticmethod
  1633. def _extract_token(payload: dict) -> str:
  1634. if not isinstance(payload, dict):
  1635. return ""
  1636. candidates = []
  1637. data = payload.get("data")
  1638. if isinstance(data, dict):
  1639. candidates.extend([
  1640. data.get("token"), data.get("jwt"), data.get("jwtToken"),
  1641. data.get("accessToken"), data.get("access_token"),
  1642. ])
  1643. elif isinstance(data, str):
  1644. candidates.append(data)
  1645. candidates.extend([
  1646. payload.get("token"), payload.get("jwt"), payload.get("jwtToken"),
  1647. payload.get("accessToken"), payload.get("access_token"),
  1648. ])
  1649. for value in candidates:
  1650. if isinstance(value, str) and value.strip():
  1651. return value.strip()
  1652. return ""
  1653. def _login_and_get_jwt(self) -> bool:
  1654. if not self._can_auto_login():
  1655. return False
  1656. body = _json.dumps({
  1657. "UserName": self._scada_login_username,
  1658. "Password": self._scada_login_password,
  1659. "type": self._scada_login_type,
  1660. "DepId": self._scada_login_dep_id,
  1661. }).encode("utf-8")
  1662. req = _urllib_request.Request(
  1663. self._scada_login_url, data=body,
  1664. headers={"Content-Type": "application/json"}, method="POST",
  1665. )
  1666. try:
  1667. with _urllib_request.urlopen(req, timeout=self.scada_timeout) as resp:
  1668. data = _json.loads(resp.read().decode("utf-8"))
  1669. token = self._extract_token(data)
  1670. if token:
  1671. self.scada_jwt = token
  1672. logger.info("SCADA 登录成功,JWT 已刷新")
  1673. return True
  1674. logger.warning("SCADA 登录成功但响应内无 JWT 字段")
  1675. except (_urllib_error.URLError, TimeoutError, _json.JSONDecodeError, Exception) as e:
  1676. logger.warning("SCADA 登录失败: %s", e)
  1677. return False
  1678. def _ensure_jwt(self) -> bool:
  1679. if self.scada_jwt:
  1680. return True
  1681. with self._scada_auth_lock:
  1682. if self.scada_jwt:
  1683. return True
  1684. return self._login_and_get_jwt()
  1685. def _clear_jwt(self) -> None:
  1686. with self._scada_auth_lock:
  1687. self.scada_jwt = ""
  1688. def _get_threshold(self, device_code):
  1689. """
  1690. 获取指定设备的阈值
  1691. 优先级:
  1692. 1. 从 multi_predictor 获取设备对应模型的阈值
  1693. 2. 使用配置文件中的默认阈值
  1694. 3. 返回0.0(不进行异常判定)
  1695. 参数:
  1696. device_code: 设备编号(如 "LT-1")
  1697. 返回:
  1698. 阈值,未找到返回默认值或0.0
  1699. """
  1700. # 方式1:从 multi_predictor 获取设备阈值
  1701. thr = self.multi_predictor.get_threshold(device_code)
  1702. # 应用倍率放宽 (如 1.2 倍),用于防止训练基线过严导致常亮误报
  1703. multiplier = self.config.get('prediction', {}).get('threshold_multiplier', 1.0)
  1704. if thr is not None:
  1705. if multiplier != 1.0:
  1706. thr = thr * multiplier
  1707. logger.debug(f"阈值来源: {device_code} -> 原始模型={thr/multiplier if multiplier!=0 else 0:.6f} × 倍数{multiplier} = 最终判定阈值{thr:.6f}")
  1708. return thr
  1709. # 方式2:使用配置中的默认阈值
  1710. default_threshold = self.config.get('prediction', {}).get('default_threshold', 0.0)
  1711. if default_threshold > 0:
  1712. logger.debug(f"阈值来源: {device_code} -> 配置默认值 = {default_threshold:.6f}")
  1713. return default_threshold
  1714. # 首次找不到阈值时记录警告
  1715. if device_code not in getattr(self, '_threshold_warned', set()):
  1716. if not hasattr(self, '_threshold_warned'):
  1717. self._threshold_warned = set()
  1718. self._threshold_warned.add(device_code)
  1719. logger.warning(f"未找到阈值: {device_code},将跳过异常判定")
  1720. return 0.0
  1721. def _get_inlet_flow(self, stream_config: RTSPStreamConfig) -> float:
  1722. """
  1723. 获取进水流量(使用实时数据接口)
  1724. 使用 current-data 接口直接获取最新一条数据
  1725. 参数:
  1726. stream_config: 流配置
  1727. 返回:
  1728. 进水流量值,失败返回0.0
  1729. """
  1730. if not self.scada_enabled or not stream_config:
  1731. logger.debug(f"流量跳过: scada_enabled={self.scada_enabled}, stream_config={stream_config}")
  1732. return 0.0
  1733. # 获取PLC数据点位
  1734. plc_address = stream_config.get_flow_plc_address()
  1735. if not plc_address:
  1736. logger.debug(f"流量跳过(无PLC地址): {stream_config.device_code} | pump_name='{stream_config.pump_name}'")
  1737. return 0.0
  1738. # 使用水厂配置的 project_id
  1739. project_id = stream_config.project_id
  1740. try:
  1741. # 当前时间戳(毫秒)
  1742. now_ms = int(datetime.now().timestamp() * 1000)
  1743. # 请求参数
  1744. params = {"time": now_ms}
  1745. # 请求体:使用实时数据接口格式
  1746. request_body = [
  1747. {
  1748. "deviceId": "1",
  1749. "deviceItems": plc_address,
  1750. "deviceName": f"流量_{stream_config.pump_name}",
  1751. "project_id": project_id
  1752. }
  1753. ]
  1754. logger.debug(f"流量请求: {stream_config.device_code} | project_id={project_id} | plc={plc_address}")
  1755. for attempt in range(2):
  1756. if not self._ensure_jwt():
  1757. logger.warning("流量查询: JWT 不可用")
  1758. break
  1759. headers = {
  1760. "Content-Type": "application/json",
  1761. "JWT-TOKEN": self.scada_jwt
  1762. }
  1763. # 发送 POST 请求到实时接口
  1764. response = requests.post(
  1765. self.scada_realtime_url,
  1766. params=params,
  1767. json=request_body,
  1768. headers=headers,
  1769. timeout=self.scada_timeout
  1770. )
  1771. if response.status_code == 200:
  1772. data = response.json()
  1773. code = data.get("code")
  1774. # 业务层 401/403 也触发 token 刷新
  1775. if code in (401, 403) and self._can_auto_login() and attempt == 0:
  1776. self._clear_jwt()
  1777. continue
  1778. if code == 200:
  1779. if data.get("data"):
  1780. latest = data["data"][0]
  1781. if "val" in latest:
  1782. flow = float(latest["val"])
  1783. logger.debug(f"流量获取成功: {stream_config.device_code} | 流量={flow}")
  1784. return flow
  1785. else:
  1786. logger.debug(f"流量数据无val字段: {stream_config.device_code}")
  1787. else:
  1788. logger.debug(f"流量查询无数据: {stream_config.device_code}")
  1789. else:
  1790. logger.warning(f"流量API返回异常: {stream_config.device_code} | code={code} | msg={data.get('msg')}")
  1791. elif response.status_code in (401, 403) and self._can_auto_login() and attempt == 0:
  1792. self._clear_jwt()
  1793. if self._ensure_jwt():
  1794. continue
  1795. break
  1796. else:
  1797. logger.warning(f"流量HTTP错误: {stream_config.device_code} | status={response.status_code}")
  1798. break
  1799. except Exception as e:
  1800. logger.warning(f"流量获取异常: {stream_config.device_code} | {e}")
  1801. return 0.0
  1802. def _compute_frequency_spectrum(self, audio_data):
  1803. """
  1804. 计算频谱图数据
  1805. 将1分钟内的多个8秒音频片段合并,计算整体FFT
  1806. 参数:
  1807. audio_data: 音频数据列表
  1808. 返回:
  1809. dB值列表(0-8000Hz均匀分布,共400个点)
  1810. """
  1811. if not audio_data:
  1812. return []
  1813. try:
  1814. # 合并所有音频片段
  1815. combined = np.concatenate(audio_data)
  1816. # 计算FFT
  1817. n = len(combined)
  1818. fft_result = np.fft.rfft(combined)
  1819. freqs = np.fft.rfftfreq(n, 1.0 / CFG.SR)
  1820. # 计算幅度(转换为dB)
  1821. magnitude = np.abs(fft_result)
  1822. # 避免log(0)
  1823. magnitude = np.maximum(magnitude, 1e-10)
  1824. db = 20 * np.log10(magnitude)
  1825. # 降采样到400个点(0-8000Hz均匀分布)
  1826. max_freq = 8000
  1827. num_points = 400
  1828. db_list = []
  1829. for i in range(num_points):
  1830. # 计算目标频率(0到8000Hz均匀分布)
  1831. target_freq = (i / (num_points - 1)) * max_freq
  1832. # 找到最接近的频率索引
  1833. idx = np.argmin(np.abs(freqs - target_freq))
  1834. if idx < len(freqs):
  1835. db_list.append(float(db[idx]))
  1836. return db_list
  1837. except Exception as e:
  1838. logger.error(f"计算频谱图失败: {e}")
  1839. return []
  1840. def _push_detection_result(self, stream_config, device_code,
  1841. is_anomaly, trigger_alert, abnormal_score, score_threshold,
  1842. running_status, inlet_flow,
  1843. freq_db, freq_middle_db=None,
  1844. anomaly_type_code=6,
  1845. abnormal_wav_b64=""):
  1846. """
  1847. 推送检测结果到远程服务器
  1848. 上报格式符合用户要求:
  1849. - 包含频谱图数据(this_frequency + normal_frequency_middle)
  1850. - 包含启停状态和进水流量
  1851. - sound_detection.status 和 abnormalwav 仅在 trigger_alert=True 时上报异常(0)和base64,否则为正常(1)和空
  1852. 参数:
  1853. stream_config: 流配置
  1854. device_code: 设备编号
  1855. is_anomaly: 实际检测是否异常
  1856. trigger_alert: 是否触发报警(仅在新异常产生时为True)
  1857. abnormal_score: 平均重建误差
  1858. score_threshold: 阈值
  1859. running_status: 启停状态
  1860. inlet_flow: 进水流量
  1861. freq_db: 本次频谱图dB值列表
  1862. freq_middle_db: 历史平均频谱图dB值列表
  1863. abnormal_wav_b64: 预读的异常音频base64编码(在文件归档前读取,避免竞态)
  1864. """
  1865. try:
  1866. import time as time_module
  1867. # 获取设备名称
  1868. camera_name = stream_config.camera_name
  1869. # 根据远程模式构建推送目标列表
  1870. if not self.push_production_urls and not self.push_test_urls:
  1871. logger.warning(f"未配置push_base_urls: {device_code}")
  1872. return
  1873. # 获取该设备对应的 project_id,用于拼接最终推送URL
  1874. project_id = stream_config.project_id
  1875. # 构建推送消息
  1876. request_time = int(time_module.time() * 1000)
  1877. # 异常音频已通过参数 abnormal_wav_b64 传入(调用方在文件归档前预读)
  1878. # 如果调用方未提供,记录警告以便排查
  1879. if trigger_alert and not abnormal_wav_b64:
  1880. logger.warning(f"报警推送但无异常音频数据: {device_code}")
  1881. # 决定 sound_detection.status
  1882. # 只有在 trigger_alert=True 时才报 0 (异常)
  1883. # 其他情况(正常、持续异常)都报 1 (正常/空) -- 根据用户要求: "正常情况下 ... 是空 就行"
  1884. report_status = 0 if trigger_alert else 1
  1885. payload = {
  1886. "message": {
  1887. # 通道信息
  1888. "channelInfo": {"name": camera_name},
  1889. # 请求时间戳
  1890. "requestTime": request_time,
  1891. # 分类信息
  1892. "classification": {
  1893. "level_one": 2, # 音频检测大类
  1894. "level_two": anomaly_type_code # 异常类型小类(6=未分类, 7=轴承, 8=气蚀, 9=松动, 10=叶轮, 11=阀件)
  1895. },
  1896. # 技能信息
  1897. "skillInfo": {"name": "异响检测"},
  1898. # 声音检测数据
  1899. "sound_detection": {
  1900. # 异常音频
  1901. "abnormalwav": abnormal_wav_b64,
  1902. # 状态:0=异常(仅新异常), 1=正常(或持续异常)
  1903. "status": report_status,
  1904. # 设备状态信息
  1905. "condition": {
  1906. "running_status": "运行中" if running_status == "开机" else "停机中",
  1907. "inlet_flow": inlet_flow
  1908. },
  1909. # 得分信息
  1910. "score": {
  1911. "abnormal_score": abnormal_score, # 当前1分钟平均重构误差
  1912. "score_threshold": score_threshold # 该设备异常阀值
  1913. },
  1914. # 频谱图数据
  1915. "frequency": {
  1916. "this_frequency": freq_db, # 当前1分钟的频谱图
  1917. "normal_frequency_middle": freq_middle_db or [], # 过去10分钟的频谱图平均
  1918. "normal_frequency_upper": [], # 上限(暂为空)
  1919. "normal_frequency_lower": [] # 下限(暂为空)
  1920. }
  1921. }
  1922. }
  1923. }
  1924. # 根据远程项目模式筛选推送目标
  1925. # mode 1/2(日常/参观):推 production + test
  1926. # mode 3(检修):不推送(已由 _check_periodic_upload 拦截,此处兜底)
  1927. # mode 4(调试):仅推 test
  1928. if self._remote_mode_id == 3:
  1929. logger.debug(f"检修模式,跳过推送: {device_code}")
  1930. return
  1931. if self._remote_mode_id == 4:
  1932. # 调试模式:仅推送测试地址
  1933. active_urls = [(u, '测试') for u in self.push_test_urls]
  1934. else:
  1935. # 日常/参观模式:推送所有地址
  1936. active_urls = [(u, '正式') for u in self.push_production_urls]
  1937. active_urls += [(u, '测试') for u in self.push_test_urls]
  1938. if not active_urls:
  1939. logger.warning(f"当前模式(mode_id={self._remote_mode_id})下无可用推送目标: {device_code}")
  1940. return
  1941. push_targets = [
  1942. (f"{url}/{project_id}", label)
  1943. for url, label in active_urls
  1944. ]
  1945. # 逐个目标推送(各目标互不影响)
  1946. for target_url, target_label in push_targets:
  1947. self._send_to_target(
  1948. target_url, target_label, payload,
  1949. device_code, camera_name, trigger_alert, abnormal_score
  1950. )
  1951. except Exception as e:
  1952. logger.error(f"推送通知异常: {e}")
  1953. def _send_to_target(self, target_url, target_label, payload,
  1954. device_code, camera_name, trigger_alert, abnormal_score):
  1955. # 向单个推送目标发送数据(含重试逻辑)
  1956. # 各目标独立调用,某个目标失败不影响其他目标
  1957. import time as time_module
  1958. push_success = False
  1959. for attempt in range(self.push_retry_count + 1):
  1960. try:
  1961. response = requests.post(
  1962. target_url,
  1963. json=payload,
  1964. timeout=self.push_timeout,
  1965. headers={"Content-Type": "application/json"}
  1966. )
  1967. if response.status_code == 200:
  1968. alert_tag = "报警" if trigger_alert else "心跳"
  1969. logger.info(
  1970. f" [{alert_tag}][{target_label}] {device_code}({camera_name}) | "
  1971. f"误差={abnormal_score:.6f}"
  1972. )
  1973. push_success = True
  1974. break
  1975. else:
  1976. logger.warning(
  1977. f"推送失败[{target_label}]: {device_code} | URL={target_url} | "
  1978. f"状态码={response.status_code} | 内容={response.text[:100]}"
  1979. )
  1980. except requests.exceptions.Timeout:
  1981. logger.warning(f"推送超时[{target_label}]: {device_code} | URL={target_url} | 尝试 {attempt + 1}/{self.push_retry_count + 1}")
  1982. except requests.exceptions.RequestException as e:
  1983. logger.warning(f"推送异常[{target_label}]: {device_code} | URL={target_url} | {e}")
  1984. # 重试间隔
  1985. if attempt < self.push_retry_count:
  1986. time_module.sleep(1)
  1987. if not push_success:
  1988. logger.error(f"推送失败[{target_label}]: {device_code} | URL={target_url} | 已达最大重试次数")
  1989. def _should_keep_normal_sample(self, device_code, avg_error, now):
  1990. # 基于重建误差的多样性采样,决定正常音频是否保留
  1991. #
  1992. # 策略(不论增训是否开启,数据始终是训练的基础):
  1993. # 1. 每小时配额制 + 误差多样性判定
  1994. # - 已保留样本中有误差接近的 → 视为冗余,丢弃
  1995. # - 误差有新信息 → 保留
  1996. # - 最后 10 分钟配额未满 → 放宽标准凑够配额
  1997. current_hour = now.hour
  1998. current_minute = now.minute
  1999. state_key = (device_code, current_hour)
  2000. # 获取或创建当前小时的采样状态
  2001. if state_key not in self._sample_state:
  2002. self._sample_state[state_key] = {
  2003. "kept_errors": [],
  2004. "kept_count": 0
  2005. }
  2006. # 清理过期的小时状态(只保留当前小时和上一小时)
  2007. expired_keys = [
  2008. k for k in self._sample_state
  2009. if k[0] == device_code and k[1] != current_hour and k[1] != (current_hour - 1) % 24
  2010. ]
  2011. for k in expired_keys:
  2012. del self._sample_state[k]
  2013. state = self._sample_state[state_key]
  2014. # 配额已满,直接丢弃
  2015. if state["kept_count"] >= self._keep_hourly_samples:
  2016. return False
  2017. # 自适应 epsilon:前 50 分钟用正常值,最后 10 分钟逐步降为 0(放宽标准凑配额)
  2018. if current_minute >= 50:
  2019. # 最后 10 分钟:epsilon 线性衰减到 0
  2020. decay_ratio = (60 - current_minute) / 10.0
  2021. epsilon = self._diversity_base_epsilon * decay_ratio
  2022. else:
  2023. epsilon = self._diversity_base_epsilon
  2024. # 计算与已保留样本的最小误差距离
  2025. kept_errors = state["kept_errors"]
  2026. if kept_errors:
  2027. min_distance = min(abs(avg_error - e) for e in kept_errors)
  2028. # 误差太接近已保留的样本,视为冗余
  2029. if min_distance < epsilon:
  2030. return False
  2031. # 通过多样性检查,保留此样本
  2032. state["kept_errors"].append(avg_error)
  2033. state["kept_count"] += 1
  2034. logger.debug(
  2035. f"正常音频采样保留: {device_code} | "
  2036. f"误差={avg_error:.6f} | "
  2037. f"本小时已保留={state['kept_count']}/{self._keep_hourly_samples}"
  2038. )
  2039. return True
  2040. def _move_audio_to_date_dir(self, wav_file):
  2041. """
  2042. 将音频移动到日期目录归档
  2043. 目录结构:
  2044. deploy_pickup/data/{device_code}/{日期}/{文件名}
  2045. 参数:
  2046. wav_file: 音频文件路径
  2047. """
  2048. try:
  2049. import shutil
  2050. # 从文件名提取device_code和日期
  2051. # 格式: 92_1#-1_20251218142000.wav
  2052. match = re.match(r'\d+_(.+)_(\d{8})\d{6}\.wav', wav_file.name)
  2053. if not match:
  2054. logger.warning(f"无法从文件名提取信息: {wav_file.name}")
  2055. return
  2056. device_code = match.group(1) # 如 1#-1
  2057. date_str = match.group(2) # YYYYMMDD
  2058. # 构建目标目录: data/{device_code}/{日期}/
  2059. date_dir = self.audio_dir / device_code / date_str
  2060. date_dir.mkdir(parents=True, exist_ok=True)
  2061. # 移动文件
  2062. dest_file = date_dir / wav_file.name
  2063. shutil.move(str(wav_file), str(dest_file))
  2064. logger.debug(f"音频已归档: {device_code}/{date_str}/{wav_file.name}")
  2065. except Exception as e:
  2066. logger.error(f"移动音频失败: {wav_file.name} | 错误: {e}")
  2067. def _move_audio_to_transition_dir(self, wav_file, reason):
  2068. """
  2069. 将泵停机/过渡期的音频移动到过渡期目录
  2070. 目录结构:
  2071. deploy_pickup/data/{device_code}/pump_transition/{文件名}
  2072. 这些音频不会被用于模型训练,但保留用于分析调试
  2073. 参数:
  2074. wav_file: 音频文件路径
  2075. reason: 原因标识(stopped=停机, transition=过渡期)
  2076. """
  2077. try:
  2078. import shutil
  2079. # 从文件名提取device_code
  2080. # 格式: 92_1#-1_20251218142000.wav
  2081. match = re.match(r'\d+_(.+)_\d{14}\.wav', wav_file.name)
  2082. if not match:
  2083. logger.warning(f"无法从文件名提取信息: {wav_file.name}")
  2084. return
  2085. device_code = match.group(1) # 如 1#-1
  2086. # 构建目标目录: data/{device_code}/pump_transition/
  2087. transition_dir = self.audio_dir / device_code / "pump_transition"
  2088. transition_dir.mkdir(parents=True, exist_ok=True)
  2089. # 移动文件
  2090. dest_file = transition_dir / wav_file.name
  2091. shutil.move(str(wav_file), str(dest_file))
  2092. logger.debug(f"过渡期音频已归档: {device_code}/pump_transition/{wav_file.name} ({reason})")
  2093. except Exception as e:
  2094. logger.error(f"移动过渡期音频失败: {wav_file.name} | 错误: {e}")
  2095. def _cleanup_stale_current_files(self):
  2096. # 定期清理 current 目录中超龄滞留的文件(兜底机制)
  2097. # 正常文件应在 ~60s 内被处理并移走,超过 5 分钟仍在 current 的视为异常
  2098. # 每 5 分钟执行一次(通过时间戳控制频率,避免每秒都遍历目录浪费 CPU)
  2099. now_ts = time.time()
  2100. if not hasattr(self, '_last_stale_cleanup'):
  2101. self._last_stale_cleanup = now_ts
  2102. # 5 分钟执行一次
  2103. if now_ts - self._last_stale_cleanup < 300:
  2104. return
  2105. self._last_stale_cleanup = now_ts
  2106. stale_threshold = 300 # 5 分钟
  2107. cleaned = 0
  2108. for device_code in self.device_map.keys():
  2109. current_dir = self.audio_dir / device_code / "current"
  2110. if not current_dir.exists():
  2111. continue
  2112. for wav_file in current_dir.glob("*.wav"):
  2113. try:
  2114. age = now_ts - wav_file.stat().st_mtime
  2115. if age > stale_threshold:
  2116. # 超龄文件归档到日期目录(保留数据价值)
  2117. self._move_audio_to_date_dir(wav_file)
  2118. # 从 seen_files 中移除(Path 对象,移走后路径已失效)
  2119. self.seen_files.discard(wav_file)
  2120. cleaned += 1
  2121. except Exception:
  2122. pass
  2123. if cleaned > 0:
  2124. logger.info(f"current目录兜底清理: 移走 {cleaned} 个超龄滞留文件")
  2125. def _cleanup_old_files(self, days: int = None):
  2126. # 定期清理音频归档目录
  2127. #
  2128. # 清理规则:
  2129. # 1. 超过 keep_days 天的日期目录 → 整个删除
  2130. # 2. 未过期但非当天的日期目录 → 按小时分桶抽样保留
  2131. # 3. 当天的日期目录 → 不清理
  2132. # 4. current → 跳过(由 _cleanup_stale_current_files 负责)
  2133. # 5. pump_transition → 按 mtime 清理超过 keep_days 的文件
  2134. # 6. verified_normal → 按 mtime 清理超过 keep_days 的文件
  2135. keep_days = days if days is not None else self._keep_days
  2136. try:
  2137. import shutil
  2138. now = datetime.now()
  2139. cutoff_date = now - timedelta(days=keep_days)
  2140. cutoff_str = cutoff_date.strftime("%Y%m%d")
  2141. cutoff_ts = (now - timedelta(days=keep_days)).timestamp()
  2142. today_str = now.strftime("%Y%m%d")
  2143. deleted_dirs = 0
  2144. trimmed_files = 0
  2145. transition_cleaned = 0
  2146. for device_dir in self.audio_dir.iterdir():
  2147. if not device_dir.is_dir():
  2148. continue
  2149. for subdir in device_dir.iterdir():
  2150. if not subdir.is_dir():
  2151. continue
  2152. # current 由 _cleanup_stale_current_files 负责
  2153. if subdir.name == "current":
  2154. continue
  2155. # pump_transition / verified_normal:按文件 mtime 清理
  2156. if subdir.name in ("pump_transition", "verified_normal"):
  2157. transition_cleaned += self._cleanup_flat_dir_by_mtime(
  2158. subdir, cutoff_ts
  2159. )
  2160. continue
  2161. # 只处理日期目录(YYYYMMDD 格式)
  2162. if not re.match(r'^\d{8}$', subdir.name):
  2163. continue
  2164. # 过期目录:整个删除
  2165. if subdir.name < cutoff_str:
  2166. try:
  2167. shutil.rmtree(subdir)
  2168. deleted_dirs += 1
  2169. logger.debug(f"清理过期目录: {device_dir.name}/{subdir.name}")
  2170. except Exception as e:
  2171. logger.error(f"删除目录失败: {subdir} | {e}")
  2172. continue
  2173. # 当天目录不清理
  2174. if subdir.name == today_str:
  2175. continue
  2176. # 历史未过期目录:按小时分桶抽样保留
  2177. trimmed_files += self._trim_date_dir_by_hour(subdir)
  2178. if deleted_dirs > 0 or trimmed_files > 0 or transition_cleaned > 0:
  2179. logger.info(
  2180. f"清理完成: 删除{deleted_dirs}个过期目录, "
  2181. f"裁剪{trimmed_files}个冗余文件, "
  2182. f"清理{transition_cleaned}个过渡期/核查文件"
  2183. )
  2184. except Exception as e:
  2185. logger.error(f"清理过期文件失败: {e}")
  2186. def _cleanup_flat_dir_by_mtime(self, target_dir, cutoff_ts):
  2187. # 按文件修改时间清理平铺目录(pump_transition / verified_normal)
  2188. # 这些目录没有日期子目录结构,直接存放 wav 文件
  2189. # 删除 mtime 早于 cutoff_ts 的文件,空目录不删除(保持结构)
  2190. cleaned = 0
  2191. freed_bytes = 0
  2192. for f in target_dir.glob("*.wav"):
  2193. try:
  2194. st = f.stat()
  2195. if st.st_mtime < cutoff_ts:
  2196. freed_bytes += st.st_size
  2197. f.unlink()
  2198. cleaned += 1
  2199. except Exception:
  2200. pass
  2201. if cleaned > 0:
  2202. logger.info(
  2203. f"清理 {target_dir.parent.name}/{target_dir.name}: "
  2204. f"删除 {cleaned} 个过期文件, 释放 {freed_bytes / 1e6:.1f}MB"
  2205. )
  2206. return cleaned
  2207. def _trim_date_dir_by_hour(self, date_dir):
  2208. # 对单个日期目录按小时分桶,每桶只保留 keep_hourly_samples 个文件
  2209. # 超出的文件删除,返回删除的文件数
  2210. wav_files = list(date_dir.glob("*.wav"))
  2211. if not wav_files:
  2212. return 0
  2213. # 按小时分桶(从文件名提取小时:{project}_{device}_{YYYYMMDDHHMMSS}.wav)
  2214. hour_buckets = defaultdict(list)
  2215. for f in wav_files:
  2216. match = re.match(r'\d+_.+_\d{8}(\d{2})\d{4}\.wav', f.name)
  2217. if match:
  2218. hour = int(match.group(1))
  2219. hour_buckets[hour].append(f)
  2220. else:
  2221. # 无法解析小时的文件直接保留
  2222. hour_buckets[-1].append(f)
  2223. deleted = 0
  2224. for hour, files in hour_buckets.items():
  2225. if len(files) <= self._keep_hourly_samples:
  2226. continue
  2227. # 按修改时间排序,均匀保留(等间隔取样)
  2228. files_sorted = sorted(files, key=lambda f: f.stat().st_mtime)
  2229. # 计算保留间隔
  2230. keep_indices = set()
  2231. step = len(files_sorted) / self._keep_hourly_samples
  2232. for i in range(self._keep_hourly_samples):
  2233. keep_indices.add(int(i * step))
  2234. for idx, f in enumerate(files_sorted):
  2235. if idx not in keep_indices:
  2236. try:
  2237. f.unlink()
  2238. deleted += 1
  2239. except Exception:
  2240. pass
  2241. return deleted
  2242. class PickupMonitoringSystem:
  2243. """
  2244. 拾音器监控系统
  2245. 管理FFmpeg进程和监控线程
  2246. """
  2247. def __init__(self, db_path=None, yaml_config=None, test_mode=False):
  2248. """
  2249. 初始化监控系统
  2250. 参数:
  2251. db_path: SQLite 数据库路径(为 None 时使用默认路径 config/pickup_config.db)
  2252. yaml_config: 若不为 None,则直接使用该 dict 作为配置(跳过 DB)
  2253. test_mode: 测试模式,禁用聚合/冷却/投票,所有URL都推送
  2254. """
  2255. from config.loader import load_project_config
  2256. # 测试模式标记,传递给 PickupMonitor
  2257. self.test_mode = test_mode
  2258. if yaml_config is not None:
  2259. # 向下兼容:如果外部强行传入了 yaml_config(例如从 run_with_auto_training),直接使用
  2260. self.config = yaml_config
  2261. self.config_manager = None
  2262. print("配置源: YAML (外部传入)")
  2263. else:
  2264. # 统一加载
  2265. config, source, cm = load_project_config()
  2266. self.config = config
  2267. self.config_manager = cm
  2268. # 冷启动模式标记
  2269. self.cold_start_mode = False
  2270. # 初始化多模型预测器(支持每个设备独立模型)
  2271. self.multi_predictor = MultiModelPredictor()
  2272. self.predictor = None # 兼容性保留,已废弃
  2273. # 从配置中注册所有设备的模型目录映射
  2274. print("\n正在初始化多模型预测器...")
  2275. for plant in self.config.get('plants', []):
  2276. if not plant.get('enabled', False):
  2277. continue
  2278. for stream in plant.get('rtsp_streams', []):
  2279. device_code = stream.get('device_code', '')
  2280. model_subdir = stream.get('model_subdir', device_code)
  2281. if device_code and model_subdir:
  2282. self.multi_predictor.register_device(device_code, model_subdir)
  2283. print(f" 注册设备: {device_code} -> models/{model_subdir}/")
  2284. print(f"已注册 {len(self.multi_predictor.registered_devices)} 个设备模型映射")
  2285. # 进程和监控器列表
  2286. self.ffmpeg_processes = []
  2287. self.monitors = []
  2288. # 信号处理
  2289. signal.signal(signal.SIGINT, self._signal_handler)
  2290. signal.signal(signal.SIGTERM, self._signal_handler)
  2291. def _load_config(self):
  2292. """
  2293. 从 SQLite DB 加载配置
  2294. 返回:
  2295. Dict: 配置字典
  2296. """
  2297. config = self.config_manager.get_full_config()
  2298. logger.info("配置已从 SQLite 加载")
  2299. return config
  2300. def _parse_rtsp_streams(self):
  2301. """
  2302. 解析配置文件中的RTSP流信息
  2303. 返回:
  2304. List[RTSPStreamConfig]: RTSP流配置列表
  2305. """
  2306. streams = []
  2307. plants = self.config.get('plants', [])
  2308. if not plants:
  2309. raise ValueError("配置文件中未找到水厂配置")
  2310. for plant in plants:
  2311. plant_name = plant.get('name')
  2312. if not plant_name:
  2313. print("警告: 跳过未命名的区域配置")
  2314. continue
  2315. # 检查是否启用该水厂(默认启用以兼容旧配置)
  2316. if not plant.get('enabled', True):
  2317. logger.debug(f"跳过禁用的水厂: {plant_name}")
  2318. continue
  2319. # 获取流量PLC配置
  2320. flow_plc = plant.get('flow_plc', {})
  2321. # 获取该水厂的project_id(每个plant有自己的project_id)
  2322. project_id = plant.get('project_id', 92)
  2323. logger.info(f"加载区域配置: {plant_name} | project_id={project_id}")
  2324. rtsp_streams = plant.get('rtsp_streams', [])
  2325. for stream in rtsp_streams:
  2326. url = stream.get('url')
  2327. channel = stream.get('channel')
  2328. camera_name = stream.get('name', '')
  2329. device_code = stream.get('device_code', '')
  2330. pump_name = stream.get('pump_name', '')
  2331. if not url or channel is None:
  2332. print(f"警告: 跳过不完整的RTSP流配置 (区域: {plant_name})")
  2333. continue
  2334. streams.append(RTSPStreamConfig(
  2335. plant_name=plant_name,
  2336. rtsp_url=url,
  2337. channel=channel,
  2338. camera_name=camera_name,
  2339. device_code=device_code,
  2340. pump_name=pump_name,
  2341. flow_plc=flow_plc,
  2342. project_id=project_id
  2343. ))
  2344. return streams
  2345. def start(self):
  2346. """
  2347. 启动监控系统
  2348. """
  2349. print("=" * 70)
  2350. print("拾音器异响检测系统")
  2351. print("=" * 70)
  2352. # 解析流配置
  2353. streams = self._parse_rtsp_streams()
  2354. print(f"\n共配置 {len(streams)} 个拾音设备:")
  2355. for stream in streams:
  2356. print(f" - {stream.device_code} | {stream.camera_name}")
  2357. # 启动FFmpeg进程
  2358. print("\n启动FFmpeg进程...")
  2359. for stream in streams:
  2360. ffmpeg = FFmpegProcess(stream, CFG.AUDIO_DIR, self.config)
  2361. if ffmpeg.start():
  2362. self.ffmpeg_processes.append(ffmpeg)
  2363. else:
  2364. print(f"警告: FFmpeg启动失败,跳过该流: {stream}")
  2365. if not self.ffmpeg_processes:
  2366. print("\n错误: 所有FFmpeg进程均启动失败")
  2367. sys.exit(1)
  2368. print(f"\n成功启动 {len(self.ffmpeg_processes)}/{len(streams)} 个FFmpeg进程")
  2369. # 收集所有流配置(用于PickupMonitor)
  2370. all_stream_configs = [p.stream_config for p in self.ffmpeg_processes]
  2371. # 启动监控线程(统一一个监控器)
  2372. print("\n启动监控线程...")
  2373. check_interval = self.config.get('prediction', {}).get('check_interval', 1.0)
  2374. monitor = PickupMonitor(
  2375. audio_dir=CFG.AUDIO_DIR,
  2376. multi_predictor=self.multi_predictor,
  2377. stream_configs=all_stream_configs,
  2378. check_interval=check_interval,
  2379. config=self.config,
  2380. config_manager=self.config_manager,
  2381. test_mode=self.test_mode
  2382. )
  2383. monitor.start()
  2384. self.monitors.append(monitor)
  2385. print(f"\n成功启动监控线程")
  2386. print("\n" + "=" * 70)
  2387. print("系统已启动,开始监控...")
  2388. print("按 Ctrl+C 停止系统")
  2389. print("=" * 70 + "\n")
  2390. # FFmpeg重启配置
  2391. max_restart_attempts = 5 # 单个进程最大重启次数
  2392. restart_interval_base = 30 # 基础重启间隔(秒)-> 改为30s
  2393. restart_counts = {id(p): 0 for p in self.ffmpeg_processes} # 重启计数
  2394. restart_timers = {} # {pid: next_restart_time} 非阻塞重启调度
  2395. # 主循环(带自动重启)
  2396. try:
  2397. while True:
  2398. now_ts = time.time()
  2399. # 检查每个FFmpeg进程状态
  2400. for ffmpeg in self.ffmpeg_processes:
  2401. if not ffmpeg.is_running():
  2402. pid = id(ffmpeg)
  2403. device_code = ffmpeg.stream_config.device_code
  2404. # 检查重启次数
  2405. if restart_counts.get(pid, 0) < max_restart_attempts:
  2406. # 非阻塞:检查是否到达重启时间
  2407. next_time = restart_timers.get(pid, 0)
  2408. if next_time == 0:
  2409. # 首次检测到停止,记录重启时间
  2410. wait_time = restart_interval_base * (2 ** restart_counts.get(pid, 0))
  2411. restart_timers[pid] = now_ts + wait_time
  2412. logger.warning(f"FFmpeg进程停止: {device_code} | 将在{wait_time}秒后重启 | "
  2413. f"重试次数: {restart_counts.get(pid, 0) + 1}/{max_restart_attempts}")
  2414. elif now_ts >= next_time:
  2415. # 到达重启时间,尝试重启
  2416. if ffmpeg.start():
  2417. logger.info(f"FFmpeg重启成功: {device_code}")
  2418. restart_counts[pid] = 0
  2419. restart_timers.pop(pid, None)
  2420. else:
  2421. restart_counts[pid] = restart_counts.get(pid, 0) + 1
  2422. logger.error(f"FFmpeg重启失败: {device_code}")
  2423. # 安排下一次重启
  2424. wait_time = restart_interval_base * (2 ** restart_counts.get(pid, 0))
  2425. restart_timers[pid] = now_ts + wait_time
  2426. else:
  2427. logger.error(f"FFmpeg达到最大重启次数: {device_code} | 已放弃重启")
  2428. # 检查是否所有进程都已放弃
  2429. running_count = sum(1 for p in self.ffmpeg_processes if p.is_running())
  2430. all_abandoned = all(restart_counts.get(id(p), 0) >= max_restart_attempts
  2431. for p in self.ffmpeg_processes if not p.is_running())
  2432. if running_count == 0 and all_abandoned:
  2433. print("\n错误: 所有FFmpeg进程均已停止且无法重启")
  2434. break
  2435. # 每天0点执行清理(整个0点小时内都尝试,避免sleep跨过错过)
  2436. current_hour = datetime.now().hour
  2437. current_date = datetime.now().strftime("%Y%m%d")
  2438. if not hasattr(self, '_last_cleanup_date'):
  2439. self._last_cleanup_date = ""
  2440. # 在0点小时内且今天还没清理过时执行
  2441. if 0 <= current_hour < 1 and self._last_cleanup_date != current_date:
  2442. logger.info("执行定期文件清理...")
  2443. for monitor in self.monitors:
  2444. monitor._cleanup_old_files()
  2445. self._last_cleanup_date = current_date
  2446. # 定期打印RTSP状态(每分钟一次)
  2447. if not hasattr(self, '_last_status_log'):
  2448. self._last_status_log = datetime.now()
  2449. if (datetime.now() - self._last_status_log).total_seconds() >= 60:
  2450. running = sum(1 for p in self.ffmpeg_processes if p.is_running())
  2451. total = len(self.ffmpeg_processes)
  2452. logger.info(f"RTSP状态: {running}/{total} 个FFmpeg进程运行中")
  2453. logger.info("─" * 60)
  2454. self._last_status_log = datetime.now()
  2455. time.sleep(10)
  2456. except KeyboardInterrupt:
  2457. print("\n\n收到停止信号,正在关闭系统...")
  2458. finally:
  2459. self.stop()
  2460. def stop(self):
  2461. """
  2462. 停止监控系统
  2463. """
  2464. print("\n正在停止监控系统...")
  2465. print("停止监控线程...")
  2466. for monitor in self.monitors:
  2467. monitor.stop()
  2468. print("停止FFmpeg进程...")
  2469. for ffmpeg in self.ffmpeg_processes:
  2470. ffmpeg.stop()
  2471. print("系统已完全停止")
  2472. def _signal_handler(self, signum, frame):
  2473. """
  2474. 信号处理函数
  2475. """
  2476. print(f"\n\n收到信号 {signum},正在关闭系统...")
  2477. self.stop()
  2478. sys.exit(0)
  2479. def _start_config_api_server(config_manager, multi_predictor=None, port=18080):
  2480. # 在后台线程中启动 FastAPI 配置管理 API
  2481. try:
  2482. import uvicorn
  2483. init_config_api(config_manager, multi_predictor)
  2484. logger.info(f"启动配置管理 API: http://0.0.0.0:{port}")
  2485. uvicorn.run(config_app, host="0.0.0.0", port=port, log_level="warning")
  2486. except ImportError:
  2487. logger.warning("uvicorn 未安装,配置管理 API 无法启动。请安装: pip install uvicorn")
  2488. except Exception as e:
  2489. logger.error(f"配置管理 API 启动失败: {e}")
  2490. def main():
  2491. """
  2492. 主函数
  2493. """
  2494. # 检测 DB 是否存在
  2495. db_path = get_db_path(Path(__file__).parent / "config")
  2496. if not db_path.exists():
  2497. print(f"错误: 配置数据库不存在: {db_path}")
  2498. print(f"\n请先运行迁移脚本: python tool/migrate_yaml_to_db.py")
  2499. sys.exit(1)
  2500. try:
  2501. system = PickupMonitoringSystem()
  2502. # 后台线程启动配置管理 API
  2503. api_port = 18080
  2504. api_thread = threading.Thread(
  2505. target=_start_config_api_server,
  2506. args=(system.config_manager, system.multi_predictor, api_port),
  2507. daemon=True,
  2508. name="config-api"
  2509. )
  2510. api_thread.start()
  2511. system.start()
  2512. except FileNotFoundError as e:
  2513. print(f"\n错误: {e}")
  2514. print("\n请确保:")
  2515. print("1. 已完成训练并计算阈值")
  2516. print("2. 已复制必要的模型文件到 models/ 目录")
  2517. sys.exit(1)
  2518. except Exception as e:
  2519. print(f"\n严重错误: {e}")
  2520. import traceback
  2521. traceback.print_exc()
  2522. sys.exit(1)
# Script entry guard: run main() only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()