run_pickup_monitor_old.py 103 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
7227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. """
  4. run_pickup_monitor.py
  5. ---------------------
  6. 拾音器异响检测主程序
  7. 功能说明:
  8. 该程序用于音频采集设备(拾音器)的实时异常检测。
  9. 与摄像头版本不同,本版本:
  10. 1. 没有视频/图片采集和上报
  11. 2. 上报时包含音频频谱图分析数据
  12. 3. 支持进水流量PLC数据读取
  13. 4. 每1分钟计算一次平均重建误差并上报
  14. 上报数据结构:
  15. {
  16. "message": {
  17. "channelInfo": {"name": "设备信息"},
  18. "requestTime": 时间戳,
  19. "classification": {
  20. "level_one": 2, // 音频检测大类
  21. "level_two": 6 // 异常类型小类: 6=未分类, 7=轴承, 8=气蚀, 9=松动/共振, 10=叶轮, 11=阀件
  22. },
  23. "skillInfo": {"name": "异响检测"},
  24. "sound_detection": {
  25. "abnormalwav": "", // base64编码的音频
  26. "status": 0/1, // 0=异常, 1=正常
  27. "condition": {
  28. "running_status": "运行中/停机中", // 启停状态
  29. "inlet_flow": 0.0 // 进水流量
  30. },
  31. "score": {
  32. "abnormal_score": 0.0, // 当前1分钟内8s平均重构误差
  33. "score_threshold": 0.0 // 该设备的异常阀值
  34. },
  35. "frequency": {
  36. "this_frequency": [], // 当前1分钟的频谱图
  37. "normal_frequency_middle": [], // 过去10分钟的频谱图平均
  38. "normal_frequency_upper": [], // 上限(暂为空)
  39. "normal_frequency_lower": [] // 下限(暂为空)
  40. }
  41. }
  42. }
  43. }
  44. 使用方法:
  45. python run_pickup_monitor.py
  46. 配置文件:
  47. config/rtsp_config.yaml
  48. """
  49. import subprocess
  50. import time
  51. import re
  52. import signal
  53. import sys
  54. import threading
  55. import logging
  56. import base64
  57. from concurrent.futures import ThreadPoolExecutor
  58. from pathlib import Path
  59. from datetime import datetime, timedelta
  60. from collections import defaultdict
  61. # 用于计算FFT
  62. import numpy as np
  63. try:
  64. import librosa
  65. except ImportError:
  66. print("错误:缺少librosa库,请安装: pip install librosa")
  67. sys.exit(1)
  68. try:
  69. import requests
  70. except ImportError:
  71. print("错误:缺少requests库,请安装: pip install requests")
  72. sys.exit(1)
  73. # 导入预测器模块
  74. from predictor import MultiModelPredictor, CFG
  75. # 导入配置管理模块(SQLite)
  76. from config.config_manager import ConfigManager
  77. from config.config_api import app as config_app, init_config_api
  78. from config.db_models import get_db_path
  79. # 导入泵状态监控模块(用于检测启停过渡期)
  80. try:
  81. from core.pump_state_monitor import PumpStateMonitor
  82. PUMP_STATE_MONITOR_AVAILABLE = True
  83. except ImportError:
  84. PUMP_STATE_MONITOR_AVAILABLE = False
  85. # 导入人体检测读取模块(用于抑制有人时的误报)
  86. try:
  87. from core.human_detection_reader import HumanDetectionReader
  88. HUMAN_DETECTION_AVAILABLE = True
  89. except ImportError:
  90. HUMAN_DETECTION_AVAILABLE = False
  91. # 导入报警聚合器(跨设备聚合抑制 + 分类型冷却)
  92. try:
  93. from core.alert_aggregator import AlertAggregator
  94. ALERT_AGGREGATOR_AVAILABLE = True
  95. except ImportError:
  96. ALERT_AGGREGATOR_AVAILABLE = False
  97. # ========================================
  98. # 配置日志系统
  99. # ========================================
  100. def setup_logging():
  101. # 配置日志系统(RotatingFileHandler -> system.log)
  102. from logging.handlers import RotatingFileHandler
  103. # 如果根 logger 已经被上层调用者配置过,则直接复用
  104. root = logging.getLogger()
  105. if root.handlers:
  106. return logging.getLogger('PickupMonitor')
  107. # 日志配置
  108. log_dir = Path(__file__).parent / "logs"
  109. log_dir.mkdir(parents=True, exist_ok=True)
  110. log_file = log_dir / "system.log"
  111. formatter = logging.Formatter(
  112. '%(asctime)s | %(levelname)-8s | %(name)-20s | %(message)s',
  113. datefmt='%Y-%m-%d %H:%M:%S'
  114. )
  115. # 按文件大小轮转,最多保留 2 个备份(共 30MB)
  116. file_handler = RotatingFileHandler(
  117. log_file,
  118. maxBytes=10 * 1024 * 1024,
  119. backupCount=2,
  120. encoding='utf-8'
  121. )
  122. file_handler.setFormatter(formatter)
  123. # 控制台输出(前台运行时可见,后台运行时 stdout 已被丢弃不影响)
  124. console_handler = logging.StreamHandler(sys.stdout)
  125. console_handler.setFormatter(formatter)
  126. logging.basicConfig(
  127. level=logging.INFO,
  128. handlers=[file_handler, console_handler]
  129. )
  130. return logging.getLogger('PickupMonitor')
  131. # 初始化日志系统
  132. logger = setup_logging()
  133. # 导入能量基线模块
  134. try:
  135. from core.energy_baseline import EnergyBaseline
  136. ENERGY_BASELINE_AVAILABLE = True
  137. except ImportError:
  138. ENERGY_BASELINE_AVAILABLE = False
  139. logger.warning("能量基线模块未找到,泵状态检测功能禁用")
  140. class RTSPStreamConfig:
  141. """
  142. RTSP流配置类
  143. 封装单个RTSP流的配置信息,包含拾音器特有字段
  144. """
  145. def __init__(self, plant_name, rtsp_url, channel,
  146. camera_name, device_code, pump_name,
  147. flow_plc, project_id):
  148. """
  149. 初始化RTSP流配置
  150. 参数:
  151. plant_name: 区域名称(泵房-反渗透高压泵等)
  152. rtsp_url: RTSP流URL
  153. channel: 通道号
  154. camera_name: 设备名称
  155. device_code: 设备编号(如1#-1)
  156. pump_name: 泵名称(A/B/C/D),用于匹配流量PLC
  157. flow_plc: 流量PLC地址映射
  158. """
  159. self.plant_name = plant_name
  160. self.rtsp_url = rtsp_url
  161. self.channel = channel
  162. self.pump_id = f"ch{channel}"
  163. self.camera_name = camera_name or f"ch{channel}"
  164. self.device_code = device_code
  165. self.pump_name = pump_name
  166. self.flow_plc = flow_plc or {}
  167. self.project_id = project_id
  168. def get_flow_plc_address(self):
  169. """
  170. 获取该设备对应的进水流量PLC地址
  171. 返回:
  172. PLC地址字符串,不存在则返回空字符串
  173. """
  174. if self.pump_name and self.flow_plc:
  175. return self.flow_plc.get(self.pump_name, "")
  176. return ""
  177. def __repr__(self):
  178. return f"RTSPStreamConfig(plant='{self.plant_name}', camera='{self.camera_name}', pump='{self.pump_id}')"
  179. class FFmpegProcess:
  180. """
  181. FFmpeg进程管理类
  182. 负责启动和管理单个RTSP流的FFmpeg进程。
  183. 进程将RTSP流转换为固定时长的WAV音频文件。
  184. 文件名格式: {project_id}_{device_code}_{时间戳}.wav
  185. """
  186. def __init__(self, stream_config, output_dir, config=None):
  187. """
  188. 初始化FFmpeg进程管理器
  189. 参数:
  190. stream_config: RTSP流配置
  191. output_dir: 音频输出目录
  192. config: 全局配置字典
  193. """
  194. self.config_dict = config or {}
  195. self.stream_config = stream_config
  196. self.output_dir = output_dir
  197. self.process = None
  198. # 从配置中读取文件时长,默认8秒
  199. audio_cfg = self.config_dict.get('audio', {})
  200. self.file_duration = audio_cfg.get('file_duration', 8)
  201. # 获取project_id(从 stream_config 中读取)
  202. self.project_id = stream_config.project_id
  203. def start(self):
  204. """
  205. 启动FFmpeg进程
  206. 返回:
  207. bool: 启动成功返回True,失败返回False
  208. """
  209. # 获取设备编号
  210. device_code = self.stream_config.device_code or self.stream_config.pump_id
  211. # 创建输出目录(每个设备独立目录)
  212. # 结构: data/{device_code}/current/
  213. current_dir = self.output_dir / device_code / "current"
  214. current_dir.mkdir(parents=True, exist_ok=True)
  215. # 存放人工核查确认为正常的异常音频,增训时全量参与训练
  216. verified_dir = self.output_dir / device_code / "verified_normal"
  217. verified_dir.mkdir(parents=True, exist_ok=True)
  218. # 构建输出文件名模板
  219. # 格式: {project_id}_{device_code}_{时间戳}.wav
  220. # 例如: 92_1#-1_20251218142000.wav
  221. output_pattern = str(current_dir / f"{self.project_id}_{device_code}_%Y%m%d%H%M%S.wav")
  222. # 构建FFmpeg命令
  223. # 添加内存限制参数,防止 RTSP 缓冲区无限增长导致 OOM
  224. cmd = [
  225. "ffmpeg",
  226. # RTSP 输入参数(内存限制)
  227. "-rtsp_transport", "tcp", # 使用TCP传输
  228. "-probesize", "1000000", # 限制探测大小为1MB(默认5MB)
  229. "-analyzeduration", "1000000", # 限制分析时长为1秒(默认5秒)
  230. "-max_delay", "500000", # 最大延迟500ms
  231. "-fflags", "nobuffer", # 禁用输入缓冲
  232. "-flags", "low_delay", # 低延迟模式
  233. "-i", self.stream_config.rtsp_url, # 输入RTSP流
  234. # 音频输出参数
  235. "-vn", # 不处理视频
  236. "-ac", "1", # 单声道
  237. "-ar", str(CFG.SR), # 采样率(16000Hz)
  238. "-f", "segment", # 分段模式
  239. "-segment_time", str(int(self.file_duration)), # 每段时长
  240. "-strftime", "1", # 启用时间格式化
  241. "-loglevel", "error", # 只输出错误日志
  242. "-y", # 覆盖已存在的文件
  243. output_pattern,
  244. ]
  245. try:
  246. # 启动FFmpeg进程
  247. self.process = subprocess.Popen(
  248. cmd,
  249. stdout=subprocess.PIPE,
  250. stderr=subprocess.PIPE
  251. )
  252. logger.info(f"FFmpeg已启动: {device_code} | {self.stream_config.camera_name} | "
  253. f"文件时长: {self.file_duration}秒 | PID={self.process.pid}")
  254. return True
  255. except FileNotFoundError:
  256. logger.error("FFmpeg错误: 未找到ffmpeg命令,请确保已安装FFmpeg")
  257. return False
  258. except Exception as e:
  259. logger.error(f"FFmpeg启动失败: {device_code} | 错误: {e}")
  260. return False
  261. def is_running(self):
  262. """
  263. 检查FFmpeg进程是否在运行
  264. 返回:
  265. bool: 进程运行中返回True,否则返回False
  266. """
  267. if self.process is None:
  268. return False
  269. return self.process.poll() is None
  270. def stop(self):
  271. """
  272. 停止FFmpeg进程
  273. """
  274. if self.process is not None and self.is_running():
  275. logger.info(f"FFmpeg停止: {self.stream_config.plant_name} | {self.stream_config.camera_name} | PID={self.process.pid}")
  276. self.process.terminate()
  277. try:
  278. self.process.wait(timeout=5)
  279. except subprocess.TimeoutExpired:
  280. logger.warning(f"FFmpeg强制终止: PID={self.process.pid}")
  281. self.process.kill()
  282. class PickupMonitor:
  283. """
  284. 拾音器监控线程类
  285. 监控音频目录,调用预测器检测异常,推送告警。
  286. 主要功能:
  287. 1. 不截取视频帧(纯音频)
  288. 2. 计算并上报频谱图数据(this_frequency + normal_frequency_middle)
  289. 3. 每分钟汇总上报一次
  290. 4. 使用SCADA API获取进水流量
  291. """
  292. def __init__(self, audio_dir, multi_predictor,
  293. stream_configs,
  294. check_interval=1.0, config=None,
  295. config_manager=None):
  296. """
  297. 初始化监控器
  298. Args:
  299. audio_dir: 音频根目录
  300. multi_predictor: 多模型预测器实例
  301. stream_configs: 所有RTSP流配置
  302. check_interval: 检查间隔(秒)
  303. config: 配置字典
  304. config_manager: ConfigManager 实例(用于热更新,为 None 时禁用热更新)
  305. """
  306. # 音频根目录(各设备目录在其下)
  307. self.audio_dir = audio_dir
  308. self.multi_predictor = multi_predictor
  309. self.predictor = None # 兼容性保留,已废弃
  310. self.stream_configs = stream_configs
  311. self.check_interval = check_interval
  312. self.config = config or {}
  313. # 热更新:持有 ConfigManager 引用,定期从 DB 刷新配置
  314. self._config_manager = config_manager
  315. self._last_config_reload = 0 # 上次配置刷新时间戳
  316. self._config_reload_interval = 30 # 配置刷新间隔(秒)
  317. # project_id
  318. self.project_id = self.config.get('platform', {}).get('project_id', 92)
  319. # 构建device_code到stream_config的映射
  320. self.device_map = {}
  321. for cfg in stream_configs:
  322. if cfg.device_code:
  323. self.device_map[cfg.device_code] = cfg
  324. # 已处理文件集合
  325. self.seen_files = set()
  326. # 每个设备的检测结果缓存(用于1分钟汇总)
  327. # key: device_code(如 "1#-1")
  328. self.device_cache = defaultdict(lambda: {
  329. "errors": [], # 每8秒的重建误差列表
  330. "last_upload": None, # 上次上报时间
  331. "audio_data": [], # 用于计算频谱图的音频数据
  332. "status": None # 最近的运行状态
  333. })
  334. # 频谱图历史缓存(用于计算normal_frequency_middle)
  335. # key: device_code, value: list of (timestamp, freq_db)
  336. freq_cfg = self.config.get('prediction', {}).get('frequency_history', {})
  337. self.freq_history_enabled = freq_cfg.get('enabled', True)
  338. self.freq_history_minutes = freq_cfg.get('history_minutes', 10)
  339. self.freq_history = defaultdict(list)
  340. # 上一次上报状态(True=异常,False=正常,None=初始)
  341. # 用于状态变更时去重,防止持续报警
  342. self.last_report_status = {}
  343. # 上报周期(秒)
  344. audio_cfg = self.config.get('audio', {})
  345. self.segment_duration = audio_cfg.get('segment_duration', 60)
  346. # 异常音频保存配置
  347. save_cfg = self.config.get('prediction', {}).get('save_anomaly_audio', {})
  348. self.save_anomaly_enabled = save_cfg.get('enabled', True)
  349. self.save_anomaly_dir = Path(__file__).parent / save_cfg.get('save_dir', 'data/anomaly_detected')
  350. if self.save_anomaly_enabled:
  351. self.save_anomaly_dir.mkdir(parents=True, exist_ok=True)
  352. # 异常推送配置
  353. push_cfg = self.config.get('push_notification', {})
  354. self.push_enabled = push_cfg.get('enabled', False)
  355. self.alert_enabled = push_cfg.get('alert_enabled', True) # 是否启用异常告警(false=暂时禁用异常上报)
  356. self.push_timeout = push_cfg.get('timeout', 30)
  357. self.push_retry_count = push_cfg.get('retry_count', 2)
  358. # 推送基地址列表(统一用 base_url/{project_id} 拼接,可无限扩展)
  359. raw_urls = push_cfg.get('push_base_urls', [])
  360. self.push_base_urls = [
  361. {"label": item.get("label", f"target-{i}"), "url": item.get("url", "").rstrip("/")}
  362. for i, item in enumerate(raw_urls)
  363. if item.get("url") # 跳过空URL
  364. ]
  365. # 推送失败记录文件路径
  366. failed_log_path = push_cfg.get('failed_push_log', 'data/push_failures.jsonl')
  367. self.failed_push_log = Path(__file__).parent / failed_log_path
  368. self.failed_push_log.parent.mkdir(parents=True, exist_ok=True)
  369. # 如果 alert_enabled 为 False,记录日志提醒
  370. if not self.alert_enabled:
  371. logger.warning("异常告警已禁用(alert_enabled=false),仅上报心跳状态")
  372. # ========================================
  373. # 报警聚合器(替代原有固定 cooldown_minutes)
  374. # ----------------------------------------
  375. # 功能1:跨设备聚合抑制 - 同一水厂多设备同时报警 -> 环境噪声,全部抑制
  376. # 功能2:分类型冷却 - 同类型24h,不同类型1h
  377. # ========================================
  378. agg_cfg = push_cfg.get('alert_aggregate', {})
  379. self.alert_aggregator = None
  380. if ALERT_AGGREGATOR_AVAILABLE:
  381. self.alert_aggregator = AlertAggregator(
  382. push_callback=self._push_detection_result,
  383. aggregate_enabled=agg_cfg.get('enabled', True),
  384. window_seconds=agg_cfg.get('window_seconds', 300),
  385. min_devices=agg_cfg.get('min_devices', 2),
  386. cooldown_same_type_hours=push_cfg.get('cooldown_same_type_hours', 24),
  387. cooldown_diff_type_hours=push_cfg.get('cooldown_diff_type_hours', 1)
  388. )
  389. else:
  390. logger.warning("报警聚合器模块未找到,使用默认报警逻辑")
  391. # 上次异常音频保存时间(用于保存冷却时间计算)
  392. self.last_anomaly_save_time = {}
  393. # 异常保存冷却时间(分钟),同一设备连续异常时,每N分钟只保存一次
  394. self.anomaly_save_cooldown_minutes = save_cfg.get('cooldown_minutes', 10)
  395. # 当前异常分类结果锁定(持续异常期间保持分类结果不变)
  396. # key: device_code, value: (anomaly_type_code, type_name)
  397. self.locked_anomaly_type = {}
  398. # 滑动窗口投票配置(5次中有3次异常才判定为异常)
  399. voting_cfg = self.config.get('prediction', {}).get('voting', {})
  400. self.voting_enabled = voting_cfg.get('enabled', True)
  401. self.voting_window_size = voting_cfg.get('window_size', 5) # 窗口大小
  402. self.voting_threshold = voting_cfg.get('threshold', 3) # 异常阈值(>=3次则判定异常)
  403. self.detection_history = {} # 每个设备的检测历史(True=异常)
  404. # 阈值容差区间配置(避免边界值反复跳变)
  405. # 误差在 threshold*(1-tolerance) ~ threshold*(1+tolerance) 范围内为灰区,维持上一状态
  406. self.tolerance_ratio = voting_cfg.get('tolerance_ratio', 0.05) # 默认5%容差
  407. self.last_single_anomaly = {} # 每个设备上一次的单周期判定结果
  408. # 阈值现在由 multi_predictor 管理,每个设备从其对应模型目录加载
  409. # 能量检测配置
  410. energy_cfg = self.config.get('prediction', {}).get('energy_detection', {})
  411. self.energy_detection_enabled = ENERGY_BASELINE_AVAILABLE and energy_cfg.get('enabled', True)
  412. self.skip_detection_when_stopped = energy_cfg.get('skip_when_stopped', True)
  413. # 能量基线检测器(每个设备一个)
  414. if self.energy_detection_enabled:
  415. self.energy_baselines = {}
  416. else:
  417. self.energy_baselines = None
  418. # SCADA API配置(用于获取泵状态和进水流量)
  419. scada_cfg = self.config.get('scada_api', {})
  420. self.scada_enabled = scada_cfg.get('enabled', False)
  421. self.scada_url = scada_cfg.get('base_url', '') # 历史数据接口(备用)
  422. self.scada_realtime_url = scada_cfg.get('realtime_url', '') # 实时数据接口(主用)
  423. self.scada_jwt = scada_cfg.get('jwt_token', '')
  424. self.scada_timeout = scada_cfg.get('timeout', 10)
  425. # 泵状态监控器(用于检测启停过渡期)
  426. self.pump_state_monitor = None
  427. self.pump_status_plc_configs = {} # {pump_name: [{point, name}, ...]}
  428. if PUMP_STATE_MONITOR_AVAILABLE and self.scada_enabled:
  429. # 初始化泵状态监控器
  430. # 获取第一个启用水厂的project_id
  431. project_id = 0
  432. for plant in self.config.get('plants', []):
  433. if plant.get('enabled', False):
  434. project_id = plant.get('project_id', 0)
  435. # 加载泵状态点位配置
  436. pump_status_plc = plant.get('pump_status_plc', {})
  437. self.pump_status_plc_configs = pump_status_plc
  438. break
  439. if project_id > 0:
  440. # 使用实时接口 URL 进行泵状态查询
  441. self.pump_state_monitor = PumpStateMonitor(
  442. scada_url=self.scada_realtime_url, # 使用实时数据接口
  443. scada_jwt=self.scada_jwt,
  444. project_id=project_id,
  445. timeout=self.scada_timeout,
  446. transition_window_minutes=15 # 启停后15分钟内视为过渡期
  447. )
  448. logger.info(f"泵状态监控器已启用 (project_id={project_id}, 过渡期窗口=15分钟, 使用实时接口)")
  449. # 线程控制
  450. self.running = False
  451. self.thread = None
  452. # 推送线程池(避免推送超时阻塞主监控循环)
  453. self._push_executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="push")
  454. # 启动时间(用于跳过启动期间的状态变化日志)
  455. self.startup_time = None
  456. self.startup_warmup_seconds = 120 # 启动后120秒内不记录状态变化
  457. # ========================================
  458. # 异常上下文捕获配置
  459. # ========================================
  460. context_cfg = save_cfg.get('context_capture', {})
  461. self.context_pre_minutes = context_cfg.get('pre_minutes', 2)
  462. self.context_post_minutes = context_cfg.get('post_minutes', 2)
  463. # 音频文件历史缓存(用于捕获异常前的音频)
  464. # key: device_code, value: deque of (timestamp, file_path)
  465. from collections import deque
  466. # 计算需要保留的历史文件数量(按分钟计算,每分钟约1个文件)
  467. history_size = self.context_pre_minutes + 2 # 多保留2个作为缓冲
  468. self.audio_file_history = defaultdict(lambda: deque(maxlen=history_size))
  469. # 异常上下文捕获状态
  470. # key: device_code, value: dict
  471. self.anomaly_capture_state = {}
  472. logger.info(f"异常上下文捕获: 前{self.context_pre_minutes}分钟 + 后{self.context_post_minutes}分钟")
  473. # ========================================
  474. # 人体检测报警抑制配置
  475. # ========================================
  476. # 只要任意摄像头在冷却时间内检测到人,所有设备的异常报警都会被抑制
  477. human_cfg = self.config.get('human_detection', {})
  478. self.human_detection_enabled = human_cfg.get('enabled', False) and HUMAN_DETECTION_AVAILABLE
  479. self.human_reader = None
  480. if self.human_detection_enabled:
  481. db_path = human_cfg.get('db_path', '')
  482. cooldown_minutes = human_cfg.get('cooldown_minutes', 5)
  483. self.human_reader = HumanDetectionReader(
  484. db_path=db_path,
  485. cooldown_minutes=cooldown_minutes
  486. )
  487. logger.info(f"人体检测报警抑制已启用 (冷却时间={cooldown_minutes}分钟, 数据库={db_path})")
  488. # ========================================
  489. # 远程异响检测调度配置
  490. # ========================================
  491. # 通过定时 GET 请求远程接口,根据返回的 mode_type、model_list、时间窗口
  492. # 动态控制 alert_enabled 开关
  493. schedule_cfg = self.config.get('detection_schedule', {})
  494. self._detection_schedule_url = schedule_cfg.get('url', '')
  495. self._detection_schedule_interval = schedule_cfg.get('poll_interval', 60)
  496. self._detection_schedule_timeout = schedule_cfg.get('request_timeout', 10)
  497. # 独立计时器,与配置热刷新的 30 秒间隔解耦
  498. self._last_detection_schedule_check = 0
  499. # 远程调度控制的暂停标志(优先级最高,True 时跳过整个检测流程,包括模型推理)
  500. self.detection_paused = False
  501. if self._detection_schedule_url:
  502. logger.info(f"远程异响调度已启用 | URL={self._detection_schedule_url} | 间隔={self._detection_schedule_interval}秒")
  503. def start(self):
  504. """
  505. 启动监控线程
  506. """
  507. if self.running:
  508. return
  509. # 启动前清理current目录中的遗留文件
  510. self._cleanup_current_on_startup()
  511. self.running = True
  512. self.startup_time = datetime.now() # 记录启动时间
  513. self.thread = threading.Thread(target=self._monitor_loop, daemon=True)
  514. self.thread.start()
  515. # 打印监控的设备列表
  516. device_codes = list(self.device_map.keys())
  517. logger.info(f"监控线程已启动 | 目录: {self.audio_dir} | 监控设备: {device_codes}")
  518. def _cleanup_current_on_startup(self):
  519. """
  520. 启动时清理current目录中的遗留文件
  521. 删除上次运行遗留的文件,避免混入新采集数据影响检测准确性
  522. """
  523. cleaned_count = 0
  524. for device_code in self.device_map.keys():
  525. current_dir = self.audio_dir / device_code / "current"
  526. if not current_dir.exists():
  527. continue
  528. for wav_file in current_dir.glob("*.wav"):
  529. try:
  530. wav_file.unlink()
  531. cleaned_count += 1
  532. except Exception as e:
  533. logger.warning(f"清理遗留文件失败: {wav_file.name} | {e}")
  534. if cleaned_count > 0:
  535. logger.info(f"启动清理: 已删除current目录中 {cleaned_count} 个遗留文件")
  536. def stop(self):
  537. """
  538. 停止监控线程
  539. """
  540. if not self.running:
  541. return
  542. self.running = False
  543. if self.thread is not None:
  544. self.thread.join(timeout=5)
  545. # 关闭推送线程池,等待已提交的推送任务完成
  546. self._push_executor.shutdown(wait=True, cancel_futures=False)
  547. logger.info(f"监控线程已停止")
  548. def _reload_hot_config(self):
  549. # 从 DB 热加载可变配置项,无需重启服务
  550. # 只刷新运行时可安全变更的参数,不影响 FFmpeg 进程和流映射
  551. if self._config_manager is None:
  552. return
  553. now = time.time()
  554. # 未达到刷新间隔则跳过
  555. if now - self._last_config_reload < self._config_reload_interval:
  556. return
  557. self._last_config_reload = now
  558. try:
  559. # 从 DB 读取最新配置
  560. fresh = self._config_manager.get_full_config()
  561. self.config = fresh
  562. # 刷新推送配置
  563. push_cfg = fresh.get('push_notification', {})
  564. self.push_enabled = push_cfg.get('enabled', False)
  565. self.alert_enabled = push_cfg.get('alert_enabled', True)
  566. self.push_timeout = push_cfg.get('timeout', 30)
  567. self.push_retry_count = push_cfg.get('retry_count', 2)
  568. raw_urls = push_cfg.get('push_base_urls', [])
  569. self.push_base_urls = [
  570. {"label": item.get("label", f"target-{i}"), "url": item.get("url", "").rstrip("/")}
  571. for i, item in enumerate(raw_urls)
  572. if item.get("url")
  573. ]
  574. # 刷新投票配置
  575. voting_cfg = fresh.get('prediction', {}).get('voting', {})
  576. self.voting_enabled = voting_cfg.get('enabled', True)
  577. self.voting_window_size = voting_cfg.get('window_size', 5)
  578. self.voting_threshold = voting_cfg.get('threshold', 3)
  579. self.tolerance_ratio = voting_cfg.get('tolerance_ratio', 0.05)
  580. # 刷新能量检测配置
  581. energy_cfg = fresh.get('prediction', {}).get('energy_detection', {})
  582. self.skip_detection_when_stopped = energy_cfg.get('skip_when_stopped', True)
  583. # 刷新人体检测配置
  584. human_cfg = fresh.get('human_detection', {})
  585. new_human_enabled = human_cfg.get('enabled', False)
  586. if new_human_enabled != self.human_detection_enabled:
  587. logger.info(f"人体检测抑制配置变更: {self.human_detection_enabled} -> {new_human_enabled}")
  588. self.human_detection_enabled = new_human_enabled
  589. # 刷新远程异响调度配置
  590. schedule_cfg = fresh.get('detection_schedule', {})
  591. self._detection_schedule_url = schedule_cfg.get('url', '')
  592. self._detection_schedule_interval = schedule_cfg.get('poll_interval', 60)
  593. self._detection_schedule_timeout = schedule_cfg.get('request_timeout', 10)
  594. logger.debug("配置热刷新完成")
  595. except Exception as e:
  596. logger.error(f"配置热刷新失败: {e}")
  597. def _check_detection_schedule(self):
  598. # 远程异响检测调度检查
  599. # 独立于 DB 配置热刷新,有自己的 60 秒计时器
  600. # 条件全部满足才开启 alert_enabled:
  601. # 1. model_list 中包含 "异响"
  602. # 2. mode_type == 1
  603. # 3. start_time <= 当前时间 <= end_time
  604. if not self._detection_schedule_url:
  605. return
  606. now_ts = time.time()
  607. if now_ts - self._last_detection_schedule_check < self._detection_schedule_interval:
  608. return
  609. self._last_detection_schedule_check = now_ts
  610. try:
  611. resp = requests.get(
  612. self._detection_schedule_url,
  613. params={'project_id': self.project_id},
  614. timeout=self._detection_schedule_timeout
  615. )
  616. resp.raise_for_status()
  617. data = resp.json()
  618. model_list = data.get('model_list', [])
  619. mode_type = int(data.get('mode_type', 0))
  620. start_str = data.get('start_time', '')
  621. end_str = data.get('end_time', '')
  622. # model_list 中没有"异响",说明本项目不涉及异响检测,保持现状不做干预
  623. if '异响' not in model_list:
  624. return
  625. if not start_str or not end_str:
  626. logger.warning("远程调度时间窗口不完整,保持当前状态")
  627. return
  628. # 解析时间,支持多种常见格式
  629. now = datetime.now()
  630. start_dt = self._parse_schedule_time(start_str)
  631. end_dt = self._parse_schedule_time(end_str)
  632. if start_dt is None or end_dt is None:
  633. logger.warning(f"时间解析失败: start='{start_str}', end='{end_str}'")
  634. return
  635. # 核心判定:mode_type==1 且当前时间在窗口内 -> 检测启用
  636. # 反之 -> 暂停整个检测流程(不做模型推理,不上报)
  637. should_detect = (mode_type == 1) and (start_dt <= now <= end_dt)
  638. new_paused = not should_detect
  639. if new_paused != self.detection_paused:
  640. logger.info(
  641. f"异响检测调度变更: {'暂停' if new_paused else '恢复'} | "
  642. f"mode_type={mode_type} | "
  643. f"窗口=[{start_str}, {end_str}] | 当前={now.strftime('%Y-%m-%d %H:%M:%S')}"
  644. )
  645. self.detection_paused = new_paused
  646. except Exception as e:
  647. # 请求失败保持当前状态不变
  648. logger.warning(f"远程异响调度请求失败,保持当前状态: {e}")
  649. @staticmethod
  650. def _parse_schedule_time(time_str):
  651. # 尝试多种格式解析时间字符串,全部失败返回 None
  652. for fmt in ('%Y-%m-%d %H:%M:%S', '%Y-%m-%d %H:%M',
  653. '%Y-%m-%dT%H:%M:%S', '%Y-%m-%dT%H:%M'):
  654. try:
  655. return datetime.strptime(time_str, fmt)
  656. except ValueError:
  657. continue
  658. return None
  659. def _monitor_loop(self):
  660. """
  661. 监控循环(线程主函数)
  662. 持续检查各设备的音频目录,处理新生成的WAV文件。
  663. 每分钟汇总一次结果进行上报。
  664. """
  665. # 确保根目录存在
  666. self.audio_dir.mkdir(parents=True, exist_ok=True)
  667. while self.running:
  668. try:
  669. # 热更新:定期从 DB 刷新可变配置
  670. self._reload_hot_config()
  671. # 远程异响调度:每60秒检查一次远程接口,控制 detection_paused
  672. self._check_detection_schedule()
  673. # 远程调度暂停时,从源头跳过整个扫描+推理+上报流程
  674. # 只保留热更新和调度检查,连目录遍历都不做
  675. if self.detection_paused:
  676. time.sleep(self.check_interval)
  677. continue
  678. # 扫描所有设备目录下的current文件夹
  679. for device_code in self.device_map.keys():
  680. device_current_dir = self.audio_dir / device_code / "current"
  681. # 目录不存在则跳过
  682. if not device_current_dir.exists():
  683. continue
  684. for wav_file in device_current_dir.glob("*.wav"):
  685. # 跳过已处理的文件
  686. if wav_file in self.seen_files:
  687. continue
  688. # 检查文件是否完整
  689. try:
  690. stat_info = wav_file.stat()
  691. file_age = time.time() - stat_info.st_mtime
  692. file_size = stat_info.st_size
  693. # 文件还在写入中,下次再检查(不加入 seen_files)
  694. if file_age < 12.0:
  695. continue
  696. # 文件大小异常处理(60秒 × 16000Hz × 2字节 ≈ 1.9MB)
  697. if file_size < 500_000:
  698. if file_age > 20.0:
  699. # 确认是损坏文件,删除并标记
  700. try:
  701. wav_file.unlink()
  702. logger.debug(f"删除异常小文件: {wav_file.name} ({file_size / 1000:.1f}KB)")
  703. except Exception as e:
  704. logger.error(f"删除文件失败: {wav_file.name} | {e}")
  705. self.seen_files.add(wav_file)
  706. continue
  707. if file_size > 3_000_000:
  708. if file_age > 20.0:
  709. # 文件过大,直接归档到日期目录(不丢弃,不加 seen)
  710. logger.warning(f"文件过大,直接归档: {wav_file.name} ({file_size / 1000:.1f}KB)")
  711. self._move_audio_to_date_dir(wav_file)
  712. self.seen_files.add(wav_file)
  713. continue
  714. except Exception as e:
  715. logger.error(f"文件状态检查失败: {wav_file.name} | {e}")
  716. continue
  717. # 处理新文件
  718. self._process_new_file(wav_file)
  719. # 标记为已处理
  720. self.seen_files.add(wav_file)
  721. # 检查是否需要进行周期性上报
  722. self._check_periodic_upload()
  723. # 检查聚合窗口是否到期,到期则执行聚合判定并推送
  724. if self.alert_aggregator:
  725. self.alert_aggregator.check_and_flush()
  726. # 清理过大的已处理文件集合
  727. if len(self.seen_files) > 10000:
  728. recent_files = sorted(self.seen_files,
  729. key=lambda f: f.stat().st_mtime if f.exists() else 0)[-5000:]
  730. self.seen_files = set(recent_files)
  731. except Exception as e:
  732. logger.error(f"监控循环错误: {e}")
  733. # 等待下一次检查
  734. time.sleep(self.check_interval)
  735. def _process_new_file(self, wav_file):
  736. """
  737. 处理新的音频文件
  738. 文件名格式: {project_id}_{device_code}_{时间戳}.wav
  739. 例如: 92_1#-1_20251218142000.wav
  740. 流程:
  741. 1. 加载音频
  742. 2. 计算能量判断设备状态
  743. 3. 进行AE异常检测,记录重建误差
  744. 4. 保存音频数据用于计算频谱图
  745. """
  746. try:
  747. # 从文件名解析device_code
  748. # 格式: {project_id}_{device_code}_{时间戳}.wav
  749. try:
  750. parts = wav_file.stem.split('_')
  751. if len(parts) >= 3:
  752. device_code = parts[1] # 第二部分是device_code
  753. else:
  754. device_code = "unknown"
  755. except:
  756. device_code = "unknown"
  757. # ========================================
  758. # 泵启停状态检测(基于 PLC 查询)- 优先于模型检查
  759. # ----------------------------------------
  760. # 逻辑:通过 SCADA API 查询 PLC 点位判断泵是否运行
  761. # 作用:
  762. # 1. 冷启动模式下也能过滤停机时的音频(保证训练数据质量)
  763. # 2. 泵停机时跳过异常检测(避免无意义的检测)
  764. # 3. 记录设备状态用于后续上报
  765. # 依赖:pump_state_monitor + rtsp_config 中的 pump_status_plc 配置
  766. # ========================================
  767. device_status = "未知"
  768. pump_is_running = True # 默认认为运行中(PLC 查询失败时的保守策略)
  769. # 获取该设备对应的流配置
  770. stream_config = self.device_map.get(device_code)
  771. if self.pump_state_monitor and stream_config:
  772. # 根据设备的 pump_name 找到关联的 PLC 点位配置
  773. pump_name = stream_config.pump_name
  774. pump_configs = self.pump_status_plc_configs.get(pump_name, [])
  775. if pump_configs:
  776. # 遍历所有关联泵,只要有一个运行就认为设备在工作
  777. any_pump_running = False
  778. for pump_cfg in pump_configs:
  779. point = pump_cfg.get("point", "")
  780. name = pump_cfg.get("name", point)
  781. pump_id = point
  782. # 查询泵状态(带 60 秒缓存,不会频繁请求 SCADA)
  783. is_running, _ = self.pump_state_monitor.update_pump_state(pump_id, point, name)
  784. if is_running:
  785. any_pump_running = True
  786. pump_is_running = any_pump_running
  787. device_status = "开机" if pump_is_running else "停机"
  788. # 泵全部停机时跳过(可通过配置禁用此行为)
  789. # 冷启动和正常模式都适用,确保训练数据质量
  790. if self.skip_detection_when_stopped and not pump_is_running:
  791. logger.info(f"泵停机(PLC): {device_code} | 归档到过渡期目录(不用于训练)")
  792. self._move_audio_to_transition_dir(wav_file, "stopped")
  793. return
  794. # ========================================
  795. # 过渡期检测(泵启停后15分钟内)
  796. # ----------------------------------------
  797. # 目的:过滤启停过程中的不稳定音频
  798. # 确保训练数据只包含稳定运行期的音频
  799. # ========================================
  800. if self.skip_detection_when_stopped:
  801. pump_in_transition, transition_pump_names = \
  802. self.pump_state_monitor.check_pumps_transition(pump_configs)
  803. if pump_in_transition:
  804. logger.info(f"泵过渡期: {device_code} | 归档到过渡期目录(不用于训练) | "
  805. f"过渡期泵: {', '.join(transition_pump_names)}")
  806. self._move_audio_to_transition_dir(wav_file, "transition")
  807. return
  808. # 获取该设备的预测器(懒加载模型)
  809. device_predictor = self.multi_predictor.get_predictor(device_code)
  810. if device_predictor is None:
  811. logger.info(f"冷启动模式(设备 {device_code} 无模型): 归档 {wav_file.name}")
  812. self._move_audio_to_date_dir(wav_file)
  813. return
  814. # 加载音频
  815. try:
  816. y, sr = librosa.load(str(wav_file), sr=CFG.SR, mono=True)
  817. except Exception as e:
  818. logger.error(f"音频加载失败: {wav_file.name} | {e}")
  819. return
  820. # 记录状态到缓存(用于周期上报)
  821. # 注意:泵状态检测已在前面完成,这里只记录状态
  822. self.device_cache[device_code]["status"] = device_status
  823. # ========================================
  824. # 计算重建误差
  825. # ========================================
  826. error = self._compute_reconstruction_error(wav_file, device_predictor)
  827. if error is not None:
  828. self.device_cache[device_code]["errors"].append(error)
  829. # ========================================
  830. # 保存音频数据用于计算频谱图
  831. # ========================================
  832. self.device_cache[device_code]["audio_data"].append(y)
  833. # ========================================
  834. # 暂存文件路径,等待周期聚合判定后再归档
  835. # ========================================
  836. if "pending_files" not in self.device_cache[device_code]:
  837. self.device_cache[device_code]["pending_files"] = []
  838. self.device_cache[device_code]["pending_files"].append(wav_file)
  839. # ========================================
  840. # 记录到音频历史缓存(用于异常上下文捕获)
  841. # ========================================
  842. self.audio_file_history[device_code].append((datetime.now(), wav_file))
  843. # 初始化上次上报时间
  844. if self.device_cache[device_code]["last_upload"] is None:
  845. self.device_cache[device_code]["last_upload"] = datetime.now()
  846. # 获取阈值并判断结果
  847. threshold = self._get_threshold(device_code)
  848. # ========================================
  849. # 快速通道:连续多个文件误差极高时快速预警(暂时关闭)
  850. # 不走投票窗口,用于捕获突发性严重故障
  851. # ========================================
  852. # if error is not None and threshold is not None and threshold > 0:
  853. # # 维护快速通道缓冲区
  854. # if "fast_alert_buffer" not in self.device_cache[device_code]:
  855. # self.device_cache[device_code]["fast_alert_buffer"] = []
  856. #
  857. # fast_buf = self.device_cache[device_code]["fast_alert_buffer"]
  858. # # 连续性检查:误差超过 2x 阈值时记录,否则清空缓冲区
  859. # if error > threshold * 2.0:
  860. # fast_buf.append(error)
  861. # else:
  862. # fast_buf.clear()
  863. #
  864. # # 连续 3 个文件(~24秒)都超过 2x 阈值 → 触发快速预警
  865. # FAST_ALERT_CONSECUTIVE = 3
  866. # if len(fast_buf) >= FAST_ALERT_CONSECUTIVE:
  867. # # 检查是否已触发过(避免重复告警)
  868. # last_fast = self.device_cache[device_code].get("last_fast_alert_time")
  869. # now = datetime.now()
  870. # can_fast_alert = (last_fast is None or
  871. # (now - last_fast).total_seconds() > 300) # 5分钟冷却
  872. #
  873. # if can_fast_alert:
  874. # # 快速通道同样受抑制逻辑约束
  875. # suppress = False
  876. # # 泵过渡期抑制
  877. # if self.pump_state_monitor and stream_config:
  878. # pump_name = stream_config.pump_name
  879. # pump_configs = self.pump_status_plc_configs.get(pump_name, [])
  880. # if pump_configs:
  881. # in_transition, _ = self.pump_state_monitor.check_pumps_transition(pump_configs)
  882. # if in_transition:
  883. # suppress = True
  884. # # 人体检测抑制
  885. # if self.human_detection_enabled and self.human_reader:
  886. # if self.human_reader.is_in_cooldown():
  887. # suppress = True
  888. # # alert_enabled 开关
  889. # if not self.alert_enabled:
  890. # suppress = True
  891. #
  892. # if not suppress:
  893. # avg_fast = float(np.mean(fast_buf))
  894. # logger.warning(
  895. # f"[!!] 快速通道触发: {device_code} | "
  896. # f"连续{len(fast_buf)}个文件异常 | "
  897. # f"平均误差={avg_fast:.6f} 阈值={threshold:.6f}")
  898. # self.device_cache[device_code]["last_fast_alert_time"] = now
  899. # fast_buf.clear()
  900. # # 标记快速预警,在下次周期上报时一并处理
  901. # self.device_cache[device_code]["fast_alert_pending"] = True
  902. if error is not None and threshold is not None:
  903. is_anomaly = error > threshold
  904. result_tag = "!!" if is_anomaly else "OK"
  905. logger.info(f"[{result_tag}] {device_code} | {wav_file.name} | "
  906. f"误差={error:.6f} 阈值={threshold:.6f}")
  907. elif error is not None:
  908. logger.debug(f"文件预测: {wav_file.name} | 误差={error:.6f} | 阀值=未设置")
  909. else:
  910. logger.warning(f"预测跳过: {wav_file.name} | 误差计算失败")
  911. except Exception as e:
  912. logger.error(f"处理文件失败: {wav_file.name} | 错误: {e}")
  913. def _compute_reconstruction_error(self, wav_file, device_predictor):
  914. """
  915. 计算单个音频文件的重建误差(Min-Max 标准化)
  916. 使用8秒窗口、4秒步长切割音频,提取多个patches分别计算误差后取平均值。
  917. 参数:
  918. wav_file: 音频文件路径
  919. device_predictor: 设备预测器实例
  920. 返回:
  921. 重建误差值(所有patches的平均MSE),失败返回None
  922. """
  923. try:
  924. import torch
  925. from predictor.utils import align_to_target
  926. # Min-Max 标准化参数
  927. global_min = device_predictor.global_min
  928. global_max = device_predictor.global_max
  929. # 加载音频
  930. y, _ = librosa.load(str(wav_file), sr=CFG.SR, mono=True)
  931. win_samples = int(CFG.WIN_SEC * CFG.SR)
  932. hop_samples = int(CFG.HOP_SEC * CFG.SR)
  933. if len(y) < win_samples:
  934. logger.warning(f"音频太短,无法提取patches: {wav_file.name}")
  935. return None
  936. patches = []
  937. for start in range(0, len(y) - win_samples + 1, hop_samples):
  938. window = y[start:start + win_samples]
  939. S = librosa.feature.melspectrogram(
  940. y=window, sr=CFG.SR, n_fft=CFG.N_FFT,
  941. hop_length=CFG.HOP_LENGTH, n_mels=CFG.N_MELS, power=2.0
  942. )
  943. S_db = librosa.power_to_db(S, ref=np.max)
  944. if S_db.shape[1] < CFG.TARGET_FRAMES:
  945. S_db = np.pad(S_db, ((0, 0), (0, CFG.TARGET_FRAMES - S_db.shape[1])))
  946. else:
  947. S_db = S_db[:, :CFG.TARGET_FRAMES]
  948. # Min-Max 标准化
  949. S_norm = (S_db - global_min) / (global_max - global_min + 1e-6)
  950. patches.append(S_norm.astype(np.float32))
  951. if not patches:
  952. logger.warning(f"未能提取任何patches: {wav_file.name}")
  953. return None
  954. arr = np.stack(patches, 0)
  955. arr = np.expand_dims(arr, 1)
  956. tensor = torch.from_numpy(arr)
  957. torch_device = device_predictor.torch_device
  958. tensor = tensor.to(torch_device)
  959. with torch.no_grad():
  960. recon = device_predictor.model(tensor)
  961. recon = align_to_target(recon, tensor)
  962. mse_per_patch = torch.mean((recon - tensor) ** 2, dim=[1, 2, 3])
  963. mean_mse = torch.mean(mse_per_patch).item()
  964. logger.debug(f"重建误差: {wav_file.name} | patches={len(patches)} | MSE={mean_mse:.6f}")
  965. return mean_mse
  966. except Exception as e:
  967. logger.error(f"计算重建误差失败: {wav_file.name} | {e}")
  968. return None
  969. def _check_periodic_upload(self):
  970. """
  971. 检查是否需要进行周期性上报
  972. 每分钟汇总一次各设备的检测结果并上报
  973. """
  974. # 远程调度暂停时跳过上报,与 _monitor_loop 中的推理拦截保持一致
  975. if self.detection_paused:
  976. return
  977. now = datetime.now()
  978. for device_code, cache in self.device_cache.items():
  979. # 检查上报时间间隔
  980. last_upload = cache.get("last_upload")
  981. if last_upload is None:
  982. continue
  983. elapsed = (now - last_upload).total_seconds()
  984. # 达到上报周期
  985. if elapsed >= self.segment_duration:
  986. # 获取该设备的流配置
  987. stream_config = self.device_map.get(device_code)
  988. # 计算平均重建误差
  989. errors = cache.get("errors", [])
  990. avg_error = float(np.mean(errors)) if errors else 0.0
  991. # 获取阈值
  992. threshold = self._get_threshold(device_code)
  993. # 判断当前周期是否异常(带容差区间)
  994. # 容差区间:避免边界值反复跳变,但灰区内仍与阈值比较
  995. if threshold:
  996. upper_bound = threshold * (1 + self.tolerance_ratio) # 确定异常边界
  997. lower_bound = threshold * (1 - self.tolerance_ratio) # 确定正常边界
  998. if avg_error > upper_bound:
  999. # 超过上边界 -> 确定异常
  1000. is_current_anomaly = True
  1001. elif avg_error < lower_bound:
  1002. # 低于下边界 -> 确定正常
  1003. is_current_anomaly = False
  1004. else:
  1005. # 灰区 -> 与阈值比较(避免异常状态延长)
  1006. is_current_anomaly = avg_error > threshold
  1007. # 记录本次判定结果
  1008. self.last_single_anomaly[device_code] = is_current_anomaly
  1009. else:
  1010. is_current_anomaly = False
  1011. # ========================================
  1012. # 滑动窗口投票:5次中有3次异常才判定为异常
  1013. # ========================================
  1014. if device_code not in self.detection_history:
  1015. self.detection_history[device_code] = []
  1016. # 记录本次检测结果
  1017. self.detection_history[device_code].append(is_current_anomaly)
  1018. # 保持窗口大小
  1019. if len(self.detection_history[device_code]) > self.voting_window_size:
  1020. self.detection_history[device_code].pop(0)
  1021. # 投票判定最终异常状态
  1022. if self.voting_enabled and len(self.detection_history[device_code]) >= self.voting_window_size:
  1023. anomaly_count = sum(self.detection_history[device_code])
  1024. is_anomaly = anomaly_count >= self.voting_threshold
  1025. window_info = f"窗口[{anomaly_count}/{self.voting_window_size}]"
  1026. else:
  1027. is_anomaly = is_current_anomaly
  1028. window_info = "窗口未满"
  1029. # ========================================
  1030. # 状态变更检测
  1031. # trigger_alert: 仅在 正常(或初始) -> 异常 时为True
  1032. # 冷却逻辑已移至 AlertAggregator 内部处理
  1033. # ========================================
  1034. last_is_anomaly = self.last_report_status.get(device_code)
  1035. trigger_alert = False
  1036. if is_anomaly:
  1037. # 只有状态变化(正常->异常) 才触发报警流程
  1038. # 泵过渡期检查已在 _process_new_file 中完成,过渡期内的文件不会进入 pending
  1039. if last_is_anomaly is None or not last_is_anomaly:
  1040. if self.alert_enabled:
  1041. # 人体检测抑制:任意摄像头检测到人则不报警
  1042. if self.human_detection_enabled and self.human_reader:
  1043. if self.human_reader.is_in_cooldown():
  1044. trigger_alert = False
  1045. status_info = self.human_reader.get_status_info()
  1046. logger.info(f"人体检测抑制: {device_code} | {status_info},跳过报警")
  1047. else:
  1048. trigger_alert = True
  1049. else:
  1050. trigger_alert = True
  1051. else:
  1052. trigger_alert = False
  1053. logger.debug(f"异常告警已禁用,跳过告警: {device_code}")
  1054. # 获取运行状态
  1055. running_status = cache.get("status", "未知")
  1056. # 获取进水流量
  1057. inlet_flow = self._get_inlet_flow(stream_config) if stream_config else 0.0
  1058. # 计算本次频谱图
  1059. audio_data = cache.get("audio_data", [])
  1060. freq_db = self._compute_frequency_spectrum(audio_data)
  1061. # 保存到频谱图历史(只保存dB值)
  1062. if self.freq_history_enabled and freq_db:
  1063. self.freq_history[device_code].append((now, freq_db))
  1064. # 清理过期历史
  1065. cutoff = now - timedelta(minutes=self.freq_history_minutes)
  1066. self.freq_history[device_code] = [
  1067. (t, d) for t, d in self.freq_history[device_code]
  1068. if t > cutoff
  1069. ]
  1070. # 计算历史频谱图平均值(normal_frequency_middle)
  1071. freq_middle_db = self._compute_frequency_middle(device_code)
  1072. # ========================================
  1073. # 异常分类:只在状态从正常变为异常时进行分类
  1074. # 持续异常期间沿用上次分类结果,保持一致性
  1075. # ========================================
  1076. anomaly_type_code = 6 # 默认:未分类异常
  1077. type_name = "未分类异常"
  1078. if is_anomaly and audio_data:
  1079. # 检查是否是新的异常(从正常变为异常)
  1080. is_new_anomaly = (last_is_anomaly is None or not last_is_anomaly)
  1081. if is_new_anomaly:
  1082. # 新异常:进行分类并锁定结果
  1083. try:
  1084. from core.anomaly_classifier import classify_anomaly
  1085. if len(audio_data) > 0:
  1086. y = audio_data[-1] if isinstance(audio_data[-1], np.ndarray) else np.array(audio_data[-1])
  1087. anomaly_type_code, type_name, confidence = classify_anomaly(y, sr=16000)
  1088. # 锁定分类结果
  1089. self.locked_anomaly_type[device_code] = (anomaly_type_code, type_name)
  1090. logger.info(f"异常分类(新异常): {type_name} (code={anomaly_type_code}, 置信度={confidence:.2f})")
  1091. except Exception as e:
  1092. logger.warning(f"异常分类失败: {e}")
  1093. else:
  1094. # 持续异常:沿用锁定的分类结果
  1095. if device_code in self.locked_anomaly_type:
  1096. anomaly_type_code, type_name = self.locked_anomaly_type[device_code]
  1097. logger.debug(f"异常分类(沿用): {type_name} (code={anomaly_type_code})")
  1098. else:
  1099. # 状态正常时清除锁定的分类结果
  1100. if device_code in self.locked_anomaly_type:
  1101. del self.locked_anomaly_type[device_code]
  1102. # 上报逻辑
  1103. if self.push_enabled and stream_config:
  1104. # 预读异常音频base64:在文件被归档/清空之前立即读取
  1105. # 解决竞态问题:异步推送或聚合器延迟推送时文件可能已被移走
  1106. pre_read_wav_b64 = ""
  1107. if trigger_alert:
  1108. try:
  1109. current_pending = cache.get("pending_files", [])
  1110. if current_pending and current_pending[0].exists():
  1111. with open(current_pending[0], "rb") as f:
  1112. pre_read_wav_b64 = base64.b64encode(f.read()).decode('utf-8')
  1113. logger.debug(f"预读异常音频成功: {current_pending[0].name} | size={len(pre_read_wav_b64)}")
  1114. except Exception as e:
  1115. logger.warning(f"预读异常音频失败: {e}")
  1116. if trigger_alert and self.alert_aggregator:
  1117. # 报警走聚合器:跨设备聚合判定 + 分类型冷却
  1118. # 聚合器会在窗口到期后决定是否真正推送
  1119. self.alert_aggregator.submit_alert(
  1120. plant_name=stream_config.plant_name,
  1121. device_code=device_code,
  1122. anomaly_type_code=anomaly_type_code,
  1123. push_kwargs=dict(
  1124. stream_config=stream_config,
  1125. device_code=device_code,
  1126. is_anomaly=is_anomaly,
  1127. trigger_alert=True,
  1128. abnormal_score=avg_error,
  1129. score_threshold=threshold,
  1130. running_status=running_status,
  1131. inlet_flow=inlet_flow,
  1132. freq_db=freq_db,
  1133. freq_middle_db=freq_middle_db,
  1134. anomaly_type_code=anomaly_type_code,
  1135. abnormal_wav_b64=pre_read_wav_b64
  1136. )
  1137. )
  1138. else:
  1139. # 非报警(心跳)或聚合器不可用 -> 提交到线程池异步推送
  1140. self._push_executor.submit(
  1141. self._push_detection_result,
  1142. stream_config=stream_config,
  1143. device_code=device_code,
  1144. is_anomaly=is_anomaly,
  1145. trigger_alert=trigger_alert,
  1146. abnormal_score=avg_error,
  1147. score_threshold=threshold,
  1148. running_status=running_status,
  1149. inlet_flow=inlet_flow,
  1150. freq_db=freq_db,
  1151. freq_middle_db=freq_middle_db,
  1152. anomaly_type_code=anomaly_type_code,
  1153. abnormal_wav_b64=pre_read_wav_b64
  1154. )
  1155. # 更新上一次状态
  1156. self.last_report_status[device_code] = is_anomaly
  1157. # 日志记录
  1158. thr_str = f"{threshold:.6f}" if threshold else "未设置"
  1159. # 报警去向说明
  1160. if trigger_alert and self.alert_aggregator:
  1161. alert_dest = "-> 聚合器"
  1162. elif trigger_alert:
  1163. alert_dest = "-> 直接推送"
  1164. else:
  1165. alert_dest = ""
  1166. # 使用设备名作为标识,增加视觉分隔
  1167. cam_label = ""
  1168. if stream_config:
  1169. cam_label = f"({stream_config.camera_name})"
  1170. result_emoji = "!!" if is_anomaly else "OK"
  1171. alert_str = f"报警=是 {alert_dest}" if trigger_alert else "报警=否"
  1172. logger.info(
  1173. f"[{result_emoji}] {device_code}{cam_label} | "
  1174. f"误差={avg_error:.6f} 阈值={thr_str} | "
  1175. f"{window_info} | {running_status} | "
  1176. f"{'异常' if is_anomaly else '正常'} | {alert_str}"
  1177. )
  1178. # ========================================
  1179. # 根据投票结果归档文件
  1180. # 用投票后的 is_anomaly 决定归档,避免单次波动误归
  1181. # ========================================
  1182. pending_files = cache.get("pending_files", [])
  1183. # 检查是否是新的异常(从正常变为异常)
  1184. is_new_anomaly = is_anomaly and (last_is_anomaly is None or not last_is_anomaly)
  1185. # 更新异常上下文捕获状态机
  1186. self._update_anomaly_capture_state(device_code, is_anomaly, is_new_anomaly,
  1187. avg_error, threshold, now, pending_files)
  1188. if pending_files:
  1189. if is_anomaly:
  1190. # 异常文件由上下文捕获状态机统一管理
  1191. # 状态机会在收集完前+后文件后一次性保存到 anomaly_detected
  1192. pass
  1193. else:
  1194. # 正常 -> 移到日期目录归档
  1195. for f in pending_files:
  1196. self._move_audio_to_date_dir(f)
  1197. # 状态恢复正常时,清除保存冷却时间
  1198. if device_code in self.last_anomaly_save_time:
  1199. del self.last_anomaly_save_time[device_code]
  1200. # 重置缓存
  1201. cache["errors"] = []
  1202. cache["audio_data"] = []
  1203. cache["pending_files"] = []
  1204. cache["last_upload"] = now
  1205. logger.info("─" * 60)
  1206. def _update_anomaly_capture_state(self, device_code, is_anomaly,
  1207. is_new_anomaly, avg_error,
  1208. threshold, now, pending_files):
  1209. """
  1210. 异常上下文捕获状态机
  1211. 状态流转:
  1212. 1. 无状态 + 新异常 -> 触发捕获,回溯 audio_file_history 获取前置文件
  1213. 2. 已触发 + 异常持续 -> 持续收集异常文件
  1214. 3. 已触发 + 异常结束 -> 开始收集后续文件(post阶段)
  1215. 4. post阶段 + 时间到 -> 保存所有文件到 anomaly_detected
  1216. 所有异常文件(从触发到结束)统一保存到 anomaly_detected 目录,
  1217. 包含 metadata.json 记录误差/阈值等信息。
  1218. """
  1219. state = self.anomaly_capture_state.get(device_code)
  1220. if is_new_anomaly and state is None:
  1221. # 新异常触发:回溯获取前置文件
  1222. anomaly_cutoff = now - timedelta(minutes=1)
  1223. pre_cutoff = now - timedelta(minutes=self.context_pre_minutes + 1)
  1224. pre_files = []
  1225. anomaly_files = []
  1226. for ts, fpath in self.audio_file_history[device_code]:
  1227. if not fpath.exists():
  1228. continue
  1229. if ts >= anomaly_cutoff:
  1230. anomaly_files.append(fpath)
  1231. elif ts >= pre_cutoff:
  1232. pre_files.append(fpath)
  1233. # 兜底:如果回溯没找到,把当前 pending 加入
  1234. if not anomaly_files and pending_files:
  1235. anomaly_files = [f for f in pending_files if f.exists()]
  1236. self.anomaly_capture_state[device_code] = {
  1237. "trigger_time": now,
  1238. "avg_error": avg_error,
  1239. "threshold": threshold,
  1240. "pre_files": pre_files,
  1241. "anomaly_files": anomaly_files,
  1242. "post_files": [],
  1243. "anomaly_ended": False,
  1244. "post_start_time": None
  1245. }
  1246. logger.info(f"异常上下文捕获已触发: {device_code} | "
  1247. f"前置={len(pre_files)}个 | 异常={len(anomaly_files)}个")
  1248. elif state is not None:
  1249. if is_anomaly and not state["anomaly_ended"]:
  1250. # 异常持续中:继续收集异常文件
  1251. for f in pending_files:
  1252. if f.exists():
  1253. state["anomaly_files"].append(f)
  1254. else:
  1255. # 异常结束(或之前已结束,在收集后续文件)
  1256. if not state["anomaly_ended"]:
  1257. state["anomaly_ended"] = True
  1258. state["post_start_time"] = now
  1259. logger.info(f"异常结束,开始收集后续文件: {device_code} | "
  1260. f"异常文件共{len(state['anomaly_files'])}个 | "
  1261. f"等待{self.context_post_minutes}分钟")
  1262. # 收集后续文件
  1263. for f in pending_files:
  1264. if f.exists():
  1265. state["post_files"].append(f)
  1266. # 检查 post 阶段是否到时间
  1267. elapsed_post = (now - state["post_start_time"]).total_seconds() / 60
  1268. if elapsed_post >= self.context_post_minutes:
  1269. self._save_anomaly_context(device_code, state)
  1270. del self.anomaly_capture_state[device_code]
  1271. self.last_anomaly_save_time[device_code] = now
  1272. def _save_anomaly_context(self, device_code: str, state: dict):
  1273. """
  1274. 保存异常上下文文件到独立目录
  1275. 目录结构:
  1276. data/anomaly_detected/{device_code}/{异常文件名(不含扩展名)}/
  1277. ├── 92_1#-1_20260130140313.wav (保持原文件名)
  1278. ├── ...
  1279. └── metadata.json
  1280. metadata.json 字段说明:
  1281. - before_trigger: 触发前 1~(N+1) 分钟的文件(正常时期,用于对比)
  1282. - at_trigger: 触发时刻前 1 分钟内的文件(异常开始出现的时期)
  1283. - after_trigger: 触发后 N 分钟的文件(不管异常是否恢复)
  1284. 参数:
  1285. device_code: 设备编号
  1286. state: 捕获状态字典
  1287. """
  1288. import shutil
  1289. import json
  1290. try:
  1291. # 获取第一个异常文件名作为文件夹名
  1292. anomaly_files = state.get("anomaly_files", [])
  1293. if not anomaly_files:
  1294. logger.warning(f"无异常文件,跳过保存: {device_code}")
  1295. return
  1296. # 用第一个异常文件名(不含扩展名)作为文件夹名
  1297. first_anomaly = anomaly_files[0]
  1298. folder_name = first_anomaly.stem # 如 92_1#-1_20260130140313
  1299. save_dir = self.save_anomaly_dir / device_code / folder_name
  1300. save_dir.mkdir(parents=True, exist_ok=True)
  1301. # 收集所有文件名(使用新命名)
  1302. all_files = {"before_trigger": [], "at_trigger": [], "after_trigger": []}
  1303. # 移动前置文件(来自日期目录,移动后原位置不再保留)
  1304. for fpath in state.get("pre_files", []):
  1305. if fpath.exists():
  1306. dest = save_dir / fpath.name
  1307. shutil.move(str(fpath), str(dest)) # 移动而非复制,避免重复
  1308. all_files["before_trigger"].append(fpath.name)
  1309. # 移动触发时刻文件(保持原名)
  1310. for fpath in anomaly_files:
  1311. if fpath.exists():
  1312. dest = save_dir / fpath.name
  1313. shutil.move(str(fpath), str(dest))
  1314. all_files["at_trigger"].append(fpath.name)
  1315. # 移动后续文件(保持原名)
  1316. for fpath in state.get("post_files", []):
  1317. if fpath.exists():
  1318. dest = save_dir / fpath.name
  1319. shutil.move(str(fpath), str(dest))
  1320. all_files["after_trigger"].append(fpath.name)
  1321. # 生成精简的元数据文件
  1322. trigger_time = state.get("trigger_time")
  1323. metadata = {
  1324. "device_code": device_code,
  1325. "trigger_time": trigger_time.strftime("%Y-%m-%d %H:%M:%S") if trigger_time else None,
  1326. "avg_error": round(state.get("avg_error", 0.0), 6),
  1327. "threshold": round(state.get("threshold", 0.0), 6),
  1328. "files": all_files
  1329. }
  1330. metadata_path = save_dir / "metadata.json"
  1331. with open(metadata_path, 'w', encoding='utf-8') as f:
  1332. json.dump(metadata, f, ensure_ascii=False, indent=2)
  1333. total = sum(len(v) for v in all_files.values())
  1334. logger.warning(f"异常上下文已保存: {device_code}/{folder_name} | "
  1335. f"共{total}个文件 (前{len(all_files['before_trigger'])}+异常{len(all_files['at_trigger'])}+后{len(all_files['after_trigger'])})")
  1336. except Exception as e:
  1337. logger.error(f"保存异常上下文失败: {device_code} | {e}")
  1338. def _compute_frequency_middle(self, device_code):
  1339. """
  1340. 计算历史频谱图平均值(normal_frequency_middle)
  1341. 参数:
  1342. device_code: 设备编号
  1343. 返回:
  1344. dB值列表的平均值
  1345. """
  1346. history = self.freq_history.get(device_code, [])
  1347. if not history or len(history) < 2:
  1348. return []
  1349. try:
  1350. # 收集所有历史dB值(只保留长度一致的)
  1351. all_db = []
  1352. ref_len = len(history[0][1]) # 使用第一条记录的长度作为参考
  1353. for _, db in history:
  1354. if len(db) == ref_len:
  1355. all_db.append(db)
  1356. if not all_db:
  1357. return []
  1358. # 计算各频率点的平均dB
  1359. avg_db = np.mean(all_db, axis=0).tolist()
  1360. return avg_db
  1361. except Exception as e:
  1362. logger.error(f"计算频谱图历史平均失败: {e}")
  1363. return []
  1364. def _get_threshold(self, device_code):
  1365. """
  1366. 获取指定设备的阈值
  1367. 优先级:
  1368. 1. 从 multi_predictor 获取设备对应模型的阈值
  1369. 2. 使用配置文件中的默认阈值
  1370. 3. 返回0.0(不进行异常判定)
  1371. 参数:
  1372. device_code: 设备编号(如 "LT-1")
  1373. 返回:
  1374. 阈值,未找到返回默认值或0.0
  1375. """
  1376. # 方式1:从 multi_predictor 获取设备阈值
  1377. thr = self.multi_predictor.get_threshold(device_code)
  1378. if thr is not None:
  1379. logger.debug(f"阈值来源: {device_code} -> 设备模型阈值 = {thr:.6f}")
  1380. return thr
  1381. # 方式2:使用配置中的默认阈值
  1382. default_threshold = self.config.get('prediction', {}).get('default_threshold', 0.0)
  1383. if default_threshold > 0:
  1384. logger.debug(f"阈值来源: {device_code} -> 配置默认值 = {default_threshold:.6f}")
  1385. return default_threshold
  1386. # 首次找不到阈值时记录警告
  1387. if device_code not in getattr(self, '_threshold_warned', set()):
  1388. if not hasattr(self, '_threshold_warned'):
  1389. self._threshold_warned = set()
  1390. self._threshold_warned.add(device_code)
  1391. logger.warning(f"未找到阈值: {device_code},将跳过异常判定")
  1392. return 0.0
  def _get_inlet_flow(self, stream_config: RTSPStreamConfig) -> float:
      """Fetch the current inlet flow for a stream from the SCADA API.

      A single POST is sent to ``self.scada_realtime_url`` and the ``val``
      field of the first returned record is used.

      Args:
          stream_config: Stream configuration; supplies the flow PLC
              address, the project id and names used in the request.

      Returns:
          The flow value as a float, or ``0.0`` when SCADA is disabled,
          no PLC address is configured, the response carries no usable
          value, or the request fails.
      """
      if not self.scada_enabled or not stream_config:
          logger.debug(f"流量跳过: scada_enabled={self.scada_enabled}, stream_config={stream_config}")
          return 0.0

      # PLC data point for the flow value.
      plc_address = stream_config.get_flow_plc_address()
      if not plc_address:
          logger.debug(f"流量跳过(无PLC地址): {stream_config.device_code} | pump_name='{stream_config.pump_name}'")
          return 0.0

      # The project id comes from the stream configuration.
      project_id = stream_config.project_id

      try:
          # Query parameter: current timestamp in milliseconds.
          query = {"time": int(datetime.now().timestamp() * 1000)}

          # Request body in the realtime-interface format.
          payload = [
              {
                  "deviceId": "1",
                  "deviceItems": plc_address,
                  "deviceName": f"流量_{stream_config.pump_name}",
                  "project_id": project_id
              }
          ]

          request_headers = {
              "Content-Type": "application/json",
              "JWT-TOKEN": self.scada_jwt
          }

          logger.debug(f"流量请求: {stream_config.device_code} | project_id={project_id} | plc={plc_address}")

          response = requests.post(
              self.scada_realtime_url,
              params=query,
              json=payload,
              headers=request_headers,
              timeout=self.scada_timeout
          )

          if response.status_code != 200:
              logger.warning(f"流量HTTP错误: {stream_config.device_code} | status={response.status_code}")
              return 0.0

          data = response.json()
          if data.get("code") != 200:
              logger.warning(f"流量API返回异常: {stream_config.device_code} | code={data.get('code')} | msg={data.get('msg')}")
              return 0.0

          if not data.get("data"):
              # The API answered normally but returned no records.
              logger.debug(f"流量查询无数据: {stream_config.device_code}")
              return 0.0

          # Only the first record is used.
          latest = data["data"][0]
          if "val" not in latest:
              logger.debug(f"流量数据无val字段: {stream_config.device_code}")
              return 0.0

          flow = float(latest["val"])
          logger.debug(f"流量获取成功: {stream_config.device_code} | 流量={flow}")
          return flow
      except Exception as e:
          logger.warning(f"流量获取异常: {stream_config.device_code} | {e}")

      return 0.0
  1462. def _compute_frequency_spectrum(self, audio_data):
  1463. """
  1464. 计算频谱图数据
  1465. 将1分钟内的多个8秒音频片段合并,计算整体FFT
  1466. 参数:
  1467. audio_data: 音频数据列表
  1468. 返回:
  1469. dB值列表(0-8000Hz均匀分布,共400个点)
  1470. """
  1471. if not audio_data:
  1472. return []
  1473. try:
  1474. # 合并所有音频片段
  1475. combined = np.concatenate(audio_data)
  1476. # 计算FFT
  1477. n = len(combined)
  1478. fft_result = np.fft.rfft(combined)
  1479. freqs = np.fft.rfftfreq(n, 1.0 / CFG.SR)
  1480. # 计算幅度(转换为dB)
  1481. magnitude = np.abs(fft_result)
  1482. # 避免log(0)
  1483. magnitude = np.maximum(magnitude, 1e-10)
  1484. db = 20 * np.log10(magnitude)
  1485. # 降采样到400个点(0-8000Hz均匀分布)
  1486. max_freq = 8000
  1487. num_points = 400
  1488. db_list = []
  1489. for i in range(num_points):
  1490. # 计算目标频率(0到8000Hz均匀分布)
  1491. target_freq = (i / (num_points - 1)) * max_freq
  1492. # 找到最接近的频率索引
  1493. idx = np.argmin(np.abs(freqs - target_freq))
  1494. if idx < len(freqs):
  1495. db_list.append(float(db[idx]))
  1496. return db_list
  1497. except Exception as e:
  1498. logger.error(f"计算频谱图失败: {e}")
  1499. return []
  1500. def _push_detection_result(self, stream_config, device_code,
  1501. is_anomaly, trigger_alert, abnormal_score, score_threshold,
  1502. running_status, inlet_flow,
  1503. freq_db, freq_middle_db=None,
  1504. anomaly_type_code=6,
  1505. abnormal_wav_b64=""):
  1506. """
  1507. 推送检测结果到远程服务器
  1508. 上报格式符合用户要求:
  1509. - 包含频谱图数据(this_frequency + normal_frequency_middle)
  1510. - 包含启停状态和进水流量
  1511. - sound_detection.status 和 abnormalwav 仅在 trigger_alert=True 时上报异常(0)和base64,否则为正常(1)和空
  1512. 参数:
  1513. stream_config: 流配置
  1514. device_code: 设备编号
  1515. is_anomaly: 实际检测是否异常
  1516. trigger_alert: 是否触发报警(仅在新异常产生时为True)
  1517. abnormal_score: 平均重建误差
  1518. score_threshold: 阈值
  1519. running_status: 启停状态
  1520. inlet_flow: 进水流量
  1521. freq_db: 本次频谱图dB值列表
  1522. freq_middle_db: 历史平均频谱图dB值列表
  1523. abnormal_wav_b64: 预读的异常音频base64编码(在文件归档前读取,避免竞态)
  1524. """
  1525. try:
  1526. import time as time_module
  1527. # 获取设备名称
  1528. camera_name = stream_config.camera_name
  1529. # 从 push_base_urls 列表构建推送目标(每个 base_url + /{project_id})
  1530. if not self.push_base_urls:
  1531. logger.warning(f"未配置push_base_urls: {device_code}")
  1532. return
  1533. # 获取该设备对应的 project_id,用于拼接最终推送URL
  1534. project_id = stream_config.project_id
  1535. # 构建推送消息
  1536. request_time = int(time_module.time() * 1000)
  1537. # 异常音频已通过参数 abnormal_wav_b64 传入(调用方在文件归档前预读)
  1538. # 如果调用方未提供,记录警告以便排查
  1539. if trigger_alert and not abnormal_wav_b64:
  1540. logger.warning(f"报警推送但无异常音频数据: {device_code}")
  1541. # 决定 sound_detection.status
  1542. # 只有在 trigger_alert=True 时才报 0 (异常)
  1543. # 其他情况(正常、持续异常)都报 1 (正常/空) -- 根据用户要求: "正常情况下 ... 是空 就行"
  1544. report_status = 0 if trigger_alert else 1
  1545. payload = {
  1546. "message": {
  1547. # 通道信息
  1548. "channelInfo": {"name": camera_name},
  1549. # 请求时间戳
  1550. "requestTime": request_time,
  1551. # 分类信息
  1552. "classification": {
  1553. "level_one": 2, # 音频检测大类
  1554. "level_two": anomaly_type_code # 异常类型小类(6=未分类, 7=轴承, 8=气蚀, 9=松动, 10=叶轮, 11=阀件)
  1555. },
  1556. # 技能信息
  1557. "skillInfo": {"name": "异响检测"},
  1558. # 声音检测数据
  1559. "sound_detection": {
  1560. # 异常音频
  1561. "abnormalwav": abnormal_wav_b64,
  1562. # 状态:0=异常(仅新异常), 1=正常(或持续异常)
  1563. "status": report_status,
  1564. # 设备状态信息
  1565. "condition": {
  1566. "running_status": "运行中" if running_status == "开机" else "停机中",
  1567. "inlet_flow": inlet_flow
  1568. },
  1569. # 得分信息
  1570. "score": {
  1571. "abnormal_score": abnormal_score, # 当前1分钟平均重构误差
  1572. "score_threshold": score_threshold # 该设备异常阀值
  1573. },
  1574. # 频谱图数据
  1575. "frequency": {
  1576. "this_frequency": freq_db, # 当前1分钟的频谱图
  1577. "normal_frequency_middle": freq_middle_db or [], # 过去10分钟的频谱图平均
  1578. }
  1579. }
  1580. }
  1581. }
  1582. # 遍历所有推送基地址,逐个拼接 project_id 发送(各目标互不影响)
  1583. push_targets = [
  1584. (f"{item['url']}/{project_id}", item['label'])
  1585. for item in self.push_base_urls
  1586. ]
  1587. # 逐个目标推送(各目标互不影响)
  1588. for target_url, target_label in push_targets:
  1589. self._send_to_target(
  1590. target_url, target_label, payload,
  1591. device_code, camera_name, trigger_alert, abnormal_score
  1592. )
  1593. except Exception as e:
  1594. logger.error(f"推送通知异常: {e}")
  1595. def _send_to_target(self, target_url, target_label, payload,
  1596. device_code, camera_name, trigger_alert, abnormal_score):
  1597. # 向单个推送目标发送数据(含重试逻辑)
  1598. # 各目标独立调用,某个目标失败不影响其他目标
  1599. import time as time_module
  1600. push_success = False
  1601. for attempt in range(self.push_retry_count + 1):
  1602. try:
  1603. response = requests.post(
  1604. target_url,
  1605. json=payload,
  1606. timeout=self.push_timeout,
  1607. headers={"Content-Type": "application/json"}
  1608. )
  1609. if response.status_code == 200:
  1610. alert_tag = "报警" if trigger_alert else "心跳"
  1611. logger.info(
  1612. f" [{alert_tag}][{target_label}] {device_code}({camera_name}) | "
  1613. f"误差={abnormal_score:.6f}"
  1614. )
  1615. push_success = True
  1616. break
  1617. else:
  1618. logger.warning(
  1619. f"推送失败[{target_label}]: {device_code} | URL={target_url} | "
  1620. f"状态码={response.status_code} | 内容={response.text[:100]}"
  1621. )
  1622. except requests.exceptions.Timeout:
  1623. logger.warning(f"推送超时[{target_label}]: {device_code} | URL={target_url} | 尝试 {attempt + 1}/{self.push_retry_count + 1}")
  1624. except requests.exceptions.RequestException as e:
  1625. logger.warning(f"推送异常[{target_label}]: {device_code} | URL={target_url} | {e}")
  1626. # 重试间隔
  1627. if attempt < self.push_retry_count:
  1628. time_module.sleep(1)
  1629. if not push_success:
  1630. logger.error(f"推送失败[{target_label}]: {device_code} | URL={target_url} | 已达最大重试次数")
  1631. def _move_audio_to_date_dir(self, wav_file):
  1632. """
  1633. 将音频移动到日期目录归档
  1634. 目录结构:
  1635. deploy_pickup/data/{device_code}/{日期}/{文件名}
  1636. 参数:
  1637. wav_file: 音频文件路径
  1638. """
  1639. try:
  1640. import shutil
  1641. # 从文件名提取device_code和日期
  1642. # 格式: 92_1#-1_20251218142000.wav
  1643. match = re.match(r'\d+_(.+)_(\d{8})\d{6}\.wav', wav_file.name)
  1644. if not match:
  1645. logger.warning(f"无法从文件名提取信息: {wav_file.name}")
  1646. return
  1647. device_code = match.group(1) # 如 1#-1
  1648. date_str = match.group(2) # YYYYMMDD
  1649. # 构建目标目录: data/{device_code}/{日期}/
  1650. date_dir = self.audio_dir / device_code / date_str
  1651. date_dir.mkdir(parents=True, exist_ok=True)
  1652. # 移动文件
  1653. dest_file = date_dir / wav_file.name
  1654. shutil.move(str(wav_file), str(dest_file))
  1655. logger.debug(f"音频已归档: {device_code}/{date_str}/{wav_file.name}")
  1656. except Exception as e:
  1657. logger.error(f"移动音频失败: {wav_file.name} | 错误: {e}")
  1658. def _move_audio_to_transition_dir(self, wav_file, reason):
  1659. """
  1660. 将泵停机/过渡期的音频移动到过渡期目录
  1661. 目录结构:
  1662. deploy_pickup/data/{device_code}/pump_transition/{文件名}
  1663. 这些音频不会被用于模型训练,但保留用于分析调试
  1664. 参数:
  1665. wav_file: 音频文件路径
  1666. reason: 原因标识(stopped=停机, transition=过渡期)
  1667. """
  1668. try:
  1669. import shutil
  1670. # 从文件名提取device_code
  1671. # 格式: 92_1#-1_20251218142000.wav
  1672. match = re.match(r'\d+_(.+)_\d{14}\.wav', wav_file.name)
  1673. if not match:
  1674. logger.warning(f"无法从文件名提取信息: {wav_file.name}")
  1675. return
  1676. device_code = match.group(1) # 如 1#-1
  1677. # 构建目标目录: data/{device_code}/pump_transition/
  1678. transition_dir = self.audio_dir / device_code / "pump_transition"
  1679. transition_dir.mkdir(parents=True, exist_ok=True)
  1680. # 移动文件
  1681. dest_file = transition_dir / wav_file.name
  1682. shutil.move(str(wav_file), str(dest_file))
  1683. logger.debug(f"过渡期音频已归档: {device_code}/pump_transition/{wav_file.name} ({reason})")
  1684. except Exception as e:
  1685. logger.error(f"移动过渡期音频失败: {wav_file.name} | 错误: {e}")
  1686. def _cleanup_old_files(self, days: int = 7):
  1687. """
  1688. 清理超过指定天数的正常音频文件
  1689. 清理规则:
  1690. - 只清理日期归档目录(data/{device_code}/{日期}/)
  1691. - 保留current目录和anomaly_detected目录
  1692. - 超过days天的文件删除
  1693. 参数:
  1694. days: 保留天数,默认7天
  1695. """
  1696. try:
  1697. import shutil
  1698. from datetime import datetime, timedelta
  1699. # 计算截止日期
  1700. cutoff_date = datetime.now() - timedelta(days=days)
  1701. cutoff_str = cutoff_date.strftime("%Y%m%d")
  1702. deleted_count = 0
  1703. # 遍历每个设备目录
  1704. for device_dir in self.audio_dir.iterdir():
  1705. if not device_dir.is_dir():
  1706. continue
  1707. # 检查是否是日期目录(跳过current和其他特殊目录)
  1708. for subdir in device_dir.iterdir():
  1709. if not subdir.is_dir():
  1710. continue
  1711. # 跳过current目录
  1712. if subdir.name == "current":
  1713. continue
  1714. # 检查是否为日期目录(YYYYMMDD格式)
  1715. if not re.match(r'^\d{8}$', subdir.name):
  1716. continue
  1717. # 如果日期早于截止日期,删除整个目录
  1718. if subdir.name < cutoff_str:
  1719. try:
  1720. shutil.rmtree(subdir)
  1721. deleted_count += 1
  1722. logger.debug(f"清理过期目录: {device_dir.name}/{subdir.name}")
  1723. except Exception as e:
  1724. logger.error(f"删除目录失败: {subdir} | {e}")
  1725. if deleted_count > 0:
  1726. logger.info(f"清理完成: 共删除{deleted_count}个过期目录")
  1727. except Exception as e:
  1728. logger.error(f"清理过期文件失败: {e}")
  1729. class PickupMonitoringSystem:
  1730. """
  1731. 拾音器监控系统
  1732. 管理FFmpeg进程和监控线程
  1733. """
  1734. def __init__(self, db_path=None, yaml_config=None):
  1735. """
  1736. 初始化监控系统
  1737. 参数:
  1738. db_path: SQLite 数据库路径(为 None 时使用默认路径 config/pickup_config.db)
  1739. yaml_config: 若不为 None,则直接使用该 dict 作为配置(跳过 DB)
  1740. """
  1741. self.db_path = db_path
  1742. self.config_manager = None
  1743. if yaml_config is not None:
  1744. # 直接使用传入的 YAML 配置字典
  1745. self.config = yaml_config
  1746. print(f"配置源: YAML (外部传入)")
  1747. else:
  1748. # 从 SQLite DB 加载
  1749. actual_db = Path(db_path) if db_path else get_db_path()
  1750. if not actual_db.exists():
  1751. raise FileNotFoundError(
  1752. f"\u914d\u7f6e\u6570\u636e\u5e93\u4e0d\u5b58\u5728: {actual_db}\n"
  1753. f"\u8bf7\u5148\u8fd0\u884c\u8fc1\u79fb\u811a\u672c: python tool/migrate_yaml_to_db.py"
  1754. )
  1755. self.config_manager = ConfigManager(str(actual_db))
  1756. print(f"配置源: SQLite ({actual_db})")
  1757. self.config = self._load_config()
  1758. # 冷启动模式标记
  1759. self.cold_start_mode = False
  1760. # 初始化多模型预测器(支持每个设备独立模型)
  1761. self.multi_predictor = MultiModelPredictor()
  1762. self.predictor = None # 兼容性保留,已废弃
  1763. # 从配置中注册所有设备的模型目录映射
  1764. print("\n正在初始化多模型预测器...")
  1765. for plant in self.config.get('plants', []):
  1766. if not plant.get('enabled', False):
  1767. continue
  1768. for stream in plant.get('rtsp_streams', []):
  1769. device_code = stream.get('device_code', '')
  1770. model_subdir = stream.get('model_subdir', device_code)
  1771. if device_code and model_subdir:
  1772. self.multi_predictor.register_device(device_code, model_subdir)
  1773. print(f" 注册设备: {device_code} -> models/{model_subdir}/")
  1774. print(f"已注册 {len(self.multi_predictor.registered_devices)} 个设备模型映射")
  1775. # 进程和监控器列表
  1776. self.ffmpeg_processes = []
  1777. self.monitors = []
  1778. # 信号处理
  1779. signal.signal(signal.SIGINT, self._signal_handler)
  1780. signal.signal(signal.SIGTERM, self._signal_handler)
  1781. def _load_config(self):
  1782. """
  1783. 从 SQLite DB 加载配置
  1784. 返回:
  1785. Dict: 配置字典
  1786. """
  1787. config = self.config_manager.get_full_config()
  1788. logger.info("配置已从 SQLite 加载")
  1789. return config
  1790. def _parse_rtsp_streams(self):
  1791. """
  1792. 解析配置文件中的RTSP流信息
  1793. 返回:
  1794. List[RTSPStreamConfig]: RTSP流配置列表
  1795. """
  1796. streams = []
  1797. plants = self.config.get('plants', [])
  1798. if not plants:
  1799. raise ValueError("配置文件中未找到水厂配置")
  1800. for plant in plants:
  1801. plant_name = plant.get('name')
  1802. if not plant_name:
  1803. print("警告: 跳过未命名的区域配置")
  1804. continue
  1805. # 检查是否启用该水厂(默认启用以兼容旧配置)
  1806. if not plant.get('enabled', True):
  1807. logger.debug(f"跳过禁用的水厂: {plant_name}")
  1808. continue
  1809. # 获取流量PLC配置
  1810. flow_plc = plant.get('flow_plc', {})
  1811. # 获取该水厂的project_id(每个plant有自己的project_id)
  1812. project_id = plant.get('project_id', 92)
  1813. logger.info(f"加载区域配置: {plant_name} | project_id={project_id}")
  1814. rtsp_streams = plant.get('rtsp_streams', [])
  1815. for stream in rtsp_streams:
  1816. url = stream.get('url')
  1817. channel = stream.get('channel')
  1818. camera_name = stream.get('name', '')
  1819. device_code = stream.get('device_code', '')
  1820. pump_name = stream.get('pump_name', '')
  1821. if not url or channel is None:
  1822. print(f"警告: 跳过不完整的RTSP流配置 (区域: {plant_name})")
  1823. continue
  1824. streams.append(RTSPStreamConfig(
  1825. plant_name=plant_name,
  1826. rtsp_url=url,
  1827. channel=channel,
  1828. camera_name=camera_name,
  1829. device_code=device_code,
  1830. pump_name=pump_name,
  1831. flow_plc=flow_plc,
  1832. project_id=project_id
  1833. ))
  1834. return streams
  1835. def start(self):
  1836. """
  1837. 启动监控系统
  1838. """
  1839. print("=" * 70)
  1840. print("拾音器异响检测系统")
  1841. print("=" * 70)
  1842. # 解析流配置
  1843. streams = self._parse_rtsp_streams()
  1844. print(f"\n共配置 {len(streams)} 个拾音设备:")
  1845. for stream in streams:
  1846. print(f" - {stream.device_code} | {stream.camera_name}")
  1847. # 启动FFmpeg进程
  1848. print("\n启动FFmpeg进程...")
  1849. for stream in streams:
  1850. ffmpeg = FFmpegProcess(stream, CFG.AUDIO_DIR, self.config)
  1851. if ffmpeg.start():
  1852. self.ffmpeg_processes.append(ffmpeg)
  1853. else:
  1854. print(f"警告: FFmpeg启动失败,跳过该流: {stream}")
  1855. if not self.ffmpeg_processes:
  1856. print("\n错误: 所有FFmpeg进程均启动失败")
  1857. sys.exit(1)
  1858. print(f"\n成功启动 {len(self.ffmpeg_processes)}/{len(streams)} 个FFmpeg进程")
  1859. # 收集所有流配置(用于PickupMonitor)
  1860. all_stream_configs = [p.stream_config for p in self.ffmpeg_processes]
  1861. # 启动监控线程(统一一个监控器)
  1862. print("\n启动监控线程...")
  1863. check_interval = self.config.get('prediction', {}).get('check_interval', 1.0)
  1864. monitor = PickupMonitor(
  1865. audio_dir=CFG.AUDIO_DIR,
  1866. multi_predictor=self.multi_predictor,
  1867. stream_configs=all_stream_configs,
  1868. check_interval=check_interval,
  1869. config=self.config,
  1870. config_manager=self.config_manager
  1871. )
  1872. monitor.start()
  1873. self.monitors.append(monitor)
  1874. print(f"\n成功启动监控线程")
  1875. print("\n" + "=" * 70)
  1876. print("系统已启动,开始监控...")
  1877. print("按 Ctrl+C 停止系统")
  1878. print("=" * 70 + "\n")
  1879. # FFmpeg重启配置
  1880. max_restart_attempts = 5 # 单个进程最大重启次数
  1881. restart_interval_base = 30 # 基础重启间隔(秒)-> 改为30s
  1882. restart_counts = {id(p): 0 for p in self.ffmpeg_processes} # 重启计数
  1883. # 主循环(带自动重启)
  1884. try:
  1885. while True:
  1886. # 检查每个FFmpeg进程状态
  1887. for ffmpeg in self.ffmpeg_processes:
  1888. if not ffmpeg.is_running():
  1889. pid = id(ffmpeg)
  1890. device_code = ffmpeg.stream_config.device_code
  1891. # 检查重启次数
  1892. if restart_counts.get(pid, 0) < max_restart_attempts:
  1893. # 计算等待时间(指数退避)
  1894. wait_time = restart_interval_base * (2 ** restart_counts.get(pid, 0))
  1895. logger.warning(f"FFmpeg进程停止: {device_code} | 将在{wait_time}秒后重启 | "
  1896. f"重试次数: {restart_counts.get(pid, 0) + 1}/{max_restart_attempts}")
  1897. time.sleep(wait_time)
  1898. # 尝试重启
  1899. if ffmpeg.start():
  1900. logger.debug(f"FFmpeg重启成功: {device_code}")
  1901. restart_counts[pid] = 0 # 重置计数
  1902. else:
  1903. restart_counts[pid] = restart_counts.get(pid, 0) + 1
  1904. logger.error(f"FFmpeg重启失败: {device_code}")
  1905. else:
  1906. logger.error(f"FFmpeg达到最大重启次数: {device_code} | 已放弃重启")
  1907. # 检查是否所有进程都已放弃
  1908. running_count = sum(1 for p in self.ffmpeg_processes if p.is_running())
  1909. all_abandoned = all(restart_counts.get(id(p), 0) >= max_restart_attempts
  1910. for p in self.ffmpeg_processes if not p.is_running())
  1911. if running_count == 0 and all_abandoned:
  1912. print("\n错误: 所有FFmpeg进程均已停止且无法重启")
  1913. break
  1914. # 每天0点执行7天清理
  1915. current_hour = datetime.now().hour
  1916. current_date = datetime.now().strftime("%Y%m%d")
  1917. if not hasattr(self, '_last_cleanup_date'):
  1918. self._last_cleanup_date = ""
  1919. # 在0点且今天还没清理过时执行
  1920. if current_hour == 0 and self._last_cleanup_date != current_date:
  1921. logger.info("执行7天过期文件清理...")
  1922. for monitor in self.monitors:
  1923. monitor._cleanup_old_files(days=7)
  1924. self._last_cleanup_date = current_date
  1925. # 定期打印RTSP状态(每分钟一次)
  1926. if not hasattr(self, '_last_status_log'):
  1927. self._last_status_log = datetime.now()
  1928. if (datetime.now() - self._last_status_log).total_seconds() >= 60:
  1929. running = sum(1 for p in self.ffmpeg_processes if p.is_running())
  1930. total = len(self.ffmpeg_processes)
  1931. logger.info(f"RTSP状态: {running}/{total} 个FFmpeg进程运行中")
  1932. logger.info("─" * 60)
  1933. self._last_status_log = datetime.now()
  1934. time.sleep(10)
  1935. except KeyboardInterrupt:
  1936. print("\n\n收到停止信号,正在关闭系统...")
  1937. finally:
  1938. self.stop()
  1939. def stop(self):
  1940. """
  1941. 停止监控系统
  1942. """
  1943. print("\n正在停止监控系统...")
  1944. print("停止监控线程...")
  1945. for monitor in self.monitors:
  1946. monitor.stop()
  1947. print("停止FFmpeg进程...")
  1948. for ffmpeg in self.ffmpeg_processes:
  1949. ffmpeg.stop()
  1950. print("系统已完全停止")
  1951. def _signal_handler(self, signum, frame):
  1952. """
  1953. 信号处理函数
  1954. """
  1955. print(f"\n\n收到信号 {signum},正在关闭系统...")
  1956. self.stop()
  1957. sys.exit(0)
  1958. def _start_config_api_server(config_manager, multi_predictor=None, port=18080):
  1959. # 在后台线程中启动 FastAPI 配置管理 API
  1960. try:
  1961. import uvicorn
  1962. init_config_api(config_manager, multi_predictor)
  1963. logger.info(f"启动配置管理 API: http://0.0.0.0:{port}")
  1964. uvicorn.run(config_app, host="0.0.0.0", port=port, log_level="warning")
  1965. except ImportError:
  1966. logger.warning("uvicorn 未安装,配置管理 API 无法启动。请安装: pip install uvicorn")
  1967. except Exception as e:
  1968. logger.error(f"配置管理 API 启动失败: {e}")
  1969. def main():
  1970. """
  1971. 主函数
  1972. """
  1973. # 检测 DB 是否存在
  1974. db_path = get_db_path(Path(__file__).parent / "config")
  1975. if not db_path.exists():
  1976. print(f"错误: 配置数据库不存在: {db_path}")
  1977. print(f"\n请先运行迁移脚本: python tool/migrate_yaml_to_db.py")
  1978. sys.exit(1)
  1979. try:
  1980. system = PickupMonitoringSystem()
  1981. # 后台线程启动配置管理 API
  1982. api_port = 18080
  1983. api_thread = threading.Thread(
  1984. target=_start_config_api_server,
  1985. args=(system.config_manager, system.multi_predictor, api_port),
  1986. daemon=True,
  1987. name="config-api"
  1988. )
  1989. api_thread.start()
  1990. system.start()
  1991. except FileNotFoundError as e:
  1992. print(f"\n错误: {e}")
  1993. print("\n请确保:")
  1994. print("1. 已完成训练并计算阈值")
  1995. print("2. 已复制必要的模型文件到 models/ 目录")
  1996. sys.exit(1)
  1997. except Exception as e:
  1998. print(f"\n严重错误: {e}")
  1999. import traceback
  2000. traceback.print_exc()
  2001. sys.exit(1)
  2002. if __name__ == "__main__":
  2003. main()