run_pickup_monitor.py 105 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221222222232224222522262227222822292230223122322233223422352236223722382239224022412242224322442245224622472248224922502251225222532254225522562257225822592260226122622263226422652266226722682269227022712272227322742275227622772278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. """
  4. run_pickup_monitor.py
  5. ---------------------
  6. 拾音器异响检测主程序
  7. 功能说明:
  8. 该程序用于音频采集设备(拾音器)的实时异常检测。
  9. 与摄像头版本不同,本版本:
  10. 1. 没有视频/图片采集和上报
  11. 2. 上报时包含音频频谱图分析数据
  12. 3. 支持进水流量PLC数据读取
  13. 4. 每1分钟计算一次平均重建误差并上报
  14. 上报数据结构:
  15. {
  16. "message": {
  17. "channelInfo": {"name": "设备信息"},
  18. "requestTime": 时间戳,
  19. "classification": {
  20. "level_one": 2, // 音频检测大类
  21. "level_two": 6 // 异常类型小类: 6=未分类, 7=轴承, 8=气蚀, 9=松动/共振, 10=叶轮, 11=阀件
  22. },
  23. "skillInfo": {"name": "异响检测"},
  24. "sound_detection": {
  25. "abnormalwav": "", // base64编码的音频
  26. "status": 0/1, // 0=异常, 1=正常
  27. "condition": {
  28. "running_status": "运行中/停机中", // 启停状态
  29. "inlet_flow": 0.0 // 进水流量
  30. },
  31. "score": {
  32. "abnormal_score": 0.0, // 当前1分钟内8s平均重构误差
  33. "score_threshold": 0.0 // 该设备的异常阀值
  34. },
  35. "frequency": {
  36. "this_frequency": [], // 当前1分钟的频谱图
  37. "normal_frequency_middle": [], // 过去10分钟的频谱图平均
  38. "normal_frequency_upper": [], // 上限(暂为空)
  39. "normal_frequency_lower": [] // 下限(暂为空)
  40. }
  41. }
  42. }
  43. }
  44. 使用方法:
  45. python run_pickup_monitor.py
  46. 配置文件:
  47. config/rtsp_config.yaml
  48. """
  49. import subprocess
  50. import time
  51. import re
  52. import signal
  53. import sys
  54. import threading
  55. import logging
  56. import base64
  57. from concurrent.futures import ThreadPoolExecutor
  58. from pathlib import Path
  59. from datetime import datetime, timedelta
  60. from collections import defaultdict
  61. # 用于计算FFT
  62. import numpy as np
  63. try:
  64. import librosa
  65. except ImportError:
  66. print("错误:缺少librosa库,请安装: pip install librosa")
  67. sys.exit(1)
  68. try:
  69. import requests
  70. except ImportError:
  71. print("错误:缺少requests库,请安装: pip install requests")
  72. sys.exit(1)
  73. # 导入预测器模块
  74. from predictor import MultiModelPredictor, CFG
  75. # 导入配置管理模块(SQLite)
  76. from config.config_manager import ConfigManager
  77. from config.config_api import app as config_app, init_config_api
  78. from config.db_models import get_db_path
  79. # 导入泵状态监控模块(用于检测启停过渡期)
  80. try:
  81. from core.pump_state_monitor import PumpStateMonitor
  82. PUMP_STATE_MONITOR_AVAILABLE = True
  83. except ImportError:
  84. PUMP_STATE_MONITOR_AVAILABLE = False
  85. # 导入人体检测读取模块(用于抑制有人时的误报)
  86. try:
  87. from core.human_detection_reader import HumanDetectionReader
  88. HUMAN_DETECTION_AVAILABLE = True
  89. except ImportError:
  90. HUMAN_DETECTION_AVAILABLE = False
  91. # 导入报警聚合器(跨设备聚合抑制 + 分类型冷却)
  92. try:
  93. from core.alert_aggregator import AlertAggregator
  94. ALERT_AGGREGATOR_AVAILABLE = True
  95. except ImportError:
  96. ALERT_AGGREGATOR_AVAILABLE = False
  97. # ========================================
  98. # 配置日志系统
  99. # ========================================
  100. def setup_logging():
  101. # 配置日志系统(RotatingFileHandler -> system.log)
  102. from logging.handlers import RotatingFileHandler
  103. # 如果根 logger 已经被上层调用者配置过,则直接复用
  104. root = logging.getLogger()
  105. if root.handlers:
  106. return logging.getLogger('PickupMonitor')
  107. # 日志配置
  108. log_dir = Path(__file__).parent / "logs"
  109. log_dir.mkdir(parents=True, exist_ok=True)
  110. log_file = log_dir / "system.log"
  111. formatter = logging.Formatter(
  112. '%(asctime)s | %(levelname)-8s | %(name)-20s | %(message)s',
  113. datefmt='%Y-%m-%d %H:%M:%S'
  114. )
  115. # 按文件大小轮转,最多保留 2 个备份(共 30MB)
  116. file_handler = RotatingFileHandler(
  117. log_file,
  118. maxBytes=10 * 1024 * 1024,
  119. backupCount=2,
  120. encoding='utf-8'
  121. )
  122. file_handler.setFormatter(formatter)
  123. # 控制台输出(前台运行时可见,后台运行时 stdout 已被丢弃不影响)
  124. console_handler = logging.StreamHandler(sys.stdout)
  125. console_handler.setFormatter(formatter)
  126. logging.basicConfig(
  127. level=logging.INFO,
  128. handlers=[file_handler, console_handler]
  129. )
  130. return logging.getLogger('PickupMonitor')
  131. # 初始化日志系统
  132. logger = setup_logging()
  133. # 导入能量基线模块
  134. try:
  135. from core.energy_baseline import EnergyBaseline
  136. ENERGY_BASELINE_AVAILABLE = True
  137. except ImportError:
  138. ENERGY_BASELINE_AVAILABLE = False
  139. logger.warning("能量基线模块未找到,泵状态检测功能禁用")
  140. class RTSPStreamConfig:
  141. """
  142. RTSP流配置类
  143. 封装单个RTSP流的配置信息,包含拾音器特有字段
  144. """
  145. def __init__(self, plant_name, rtsp_url, channel,
  146. camera_name, device_code, pump_name,
  147. flow_plc, project_id):
  148. """
  149. 初始化RTSP流配置
  150. 参数:
  151. plant_name: 区域名称(泵房-反渗透高压泵等)
  152. rtsp_url: RTSP流URL
  153. channel: 通道号
  154. camera_name: 设备名称
  155. device_code: 设备编号(如1#-1)
  156. pump_name: 泵名称(A/B/C/D),用于匹配流量PLC
  157. flow_plc: 流量PLC地址映射
  158. """
  159. self.plant_name = plant_name
  160. self.rtsp_url = rtsp_url
  161. self.channel = channel
  162. self.pump_id = f"ch{channel}"
  163. self.camera_name = camera_name or f"ch{channel}"
  164. self.device_code = device_code
  165. self.pump_name = pump_name
  166. self.flow_plc = flow_plc or {}
  167. self.project_id = project_id
  168. def get_flow_plc_address(self):
  169. """
  170. 获取该设备对应的进水流量PLC地址
  171. 返回:
  172. PLC地址字符串,不存在则返回空字符串
  173. """
  174. if self.pump_name and self.flow_plc:
  175. return self.flow_plc.get(self.pump_name, "")
  176. return ""
  177. def __repr__(self):
  178. return f"RTSPStreamConfig(plant='{self.plant_name}', camera='{self.camera_name}', pump='{self.pump_id}')"
  179. class FFmpegProcess:
  180. """
  181. FFmpeg进程管理类
  182. 负责启动和管理单个RTSP流的FFmpeg进程。
  183. 进程将RTSP流转换为固定时长的WAV音频文件。
  184. 文件名格式: {project_id}_{device_code}_{时间戳}.wav
  185. """
  186. def __init__(self, stream_config, output_dir, config=None):
  187. """
  188. 初始化FFmpeg进程管理器
  189. 参数:
  190. stream_config: RTSP流配置
  191. output_dir: 音频输出目录
  192. config: 全局配置字典
  193. """
  194. self.config_dict = config or {}
  195. self.stream_config = stream_config
  196. self.output_dir = output_dir
  197. self.process = None
  198. # 从配置中读取文件时长,默认8秒
  199. audio_cfg = self.config_dict.get('audio', {})
  200. self.file_duration = audio_cfg.get('file_duration', 8)
  201. # 获取project_id(从 stream_config 中读取)
  202. self.project_id = stream_config.project_id
  203. def start(self):
  204. """
  205. 启动FFmpeg进程
  206. 返回:
  207. bool: 启动成功返回True,失败返回False
  208. """
  209. # 获取设备编号
  210. device_code = self.stream_config.device_code or self.stream_config.pump_id
  211. # 创建输出目录(每个设备独立目录)
  212. # 结构: data/{device_code}/current/
  213. current_dir = self.output_dir / device_code / "current"
  214. current_dir.mkdir(parents=True, exist_ok=True)
  215. # 构建输出文件名模板
  216. # 格式: {project_id}_{device_code}_{时间戳}.wav
  217. # 例如: 92_1#-1_20251218142000.wav
  218. output_pattern = str(current_dir / f"{self.project_id}_{device_code}_%Y%m%d%H%M%S.wav")
  219. # 构建FFmpeg命令
  220. # 添加内存限制参数,防止 RTSP 缓冲区无限增长导致 OOM
  221. cmd = [
  222. "ffmpeg",
  223. # RTSP 输入参数(内存限制)
  224. "-rtsp_transport", "tcp", # 使用TCP传输
  225. "-probesize", "1000000", # 限制探测大小为1MB(默认5MB)
  226. "-analyzeduration", "1000000", # 限制分析时长为1秒(默认5秒)
  227. "-max_delay", "500000", # 最大延迟500ms
  228. "-fflags", "nobuffer", # 禁用输入缓冲
  229. "-flags", "low_delay", # 低延迟模式
  230. "-i", self.stream_config.rtsp_url, # 输入RTSP流
  231. # 音频输出参数
  232. "-vn", # 不处理视频
  233. "-ac", "1", # 单声道
  234. "-ar", str(CFG.SR), # 采样率(16000Hz)
  235. "-f", "segment", # 分段模式
  236. "-segment_time", str(int(self.file_duration)), # 每段时长
  237. "-strftime", "1", # 启用时间格式化
  238. "-loglevel", "error", # 只输出错误日志
  239. "-y", # 覆盖已存在的文件
  240. output_pattern,
  241. ]
  242. try:
  243. # 启动FFmpeg进程
  244. self.process = subprocess.Popen(
  245. cmd,
  246. stdout=subprocess.PIPE,
  247. stderr=subprocess.PIPE
  248. )
  249. logger.info(f"FFmpeg已启动: {device_code} | {self.stream_config.camera_name} | "
  250. f"文件时长: {self.file_duration}秒 | PID={self.process.pid}")
  251. return True
  252. except FileNotFoundError:
  253. logger.error("FFmpeg错误: 未找到ffmpeg命令,请确保已安装FFmpeg")
  254. return False
  255. except Exception as e:
  256. logger.error(f"FFmpeg启动失败: {device_code} | 错误: {e}")
  257. return False
  258. def is_running(self):
  259. """
  260. 检查FFmpeg进程是否在运行
  261. 返回:
  262. bool: 进程运行中返回True,否则返回False
  263. """
  264. if self.process is None:
  265. return False
  266. return self.process.poll() is None
  267. def stop(self):
  268. """
  269. 停止FFmpeg进程
  270. """
  271. if self.process is not None and self.is_running():
  272. logger.info(f"FFmpeg停止: {self.stream_config.plant_name} | {self.stream_config.camera_name} | PID={self.process.pid}")
  273. self.process.terminate()
  274. try:
  275. self.process.wait(timeout=5)
  276. except subprocess.TimeoutExpired:
  277. logger.warning(f"FFmpeg强制终止: PID={self.process.pid}")
  278. self.process.kill()
  279. class PickupMonitor:
  280. """
  281. 拾音器监控线程类
  282. 监控音频目录,调用预测器检测异常,推送告警。
  283. 主要功能:
  284. 1. 不截取视频帧(纯音频)
  285. 2. 计算并上报频谱图数据(this_frequency + normal_frequency_middle)
  286. 3. 每分钟汇总上报一次
  287. 4. 使用SCADA API获取进水流量
  288. """
  289. def __init__(self, audio_dir, multi_predictor,
  290. stream_configs,
  291. check_interval=1.0, config=None,
  292. config_manager=None):
  293. """
  294. 初始化监控器
  295. Args:
  296. audio_dir: 音频根目录
  297. multi_predictor: 多模型预测器实例
  298. stream_configs: 所有RTSP流配置
  299. check_interval: 检查间隔(秒)
  300. config: 配置字典
  301. config_manager: ConfigManager 实例(用于热更新,为 None 时禁用热更新)
  302. """
  303. # 音频根目录(各设备目录在其下)
  304. self.audio_dir = audio_dir
  305. self.multi_predictor = multi_predictor
  306. self.predictor = None # 兼容性保留,已废弃
  307. self.stream_configs = stream_configs
  308. self.check_interval = check_interval
  309. self.config = config or {}
  310. # 热更新:持有 ConfigManager 引用,定期从 DB 刷新配置
  311. self._config_manager = config_manager
  312. self._last_config_reload = 0 # 上次配置刷新时间戳
  313. self._config_reload_interval = 30 # 配置刷新间隔(秒)
  314. # project_id
  315. self.project_id = self.config.get('platform', {}).get('project_id', 92)
  316. # 构建device_code到stream_config的映射
  317. self.device_map = {}
  318. for cfg in stream_configs:
  319. if cfg.device_code:
  320. self.device_map[cfg.device_code] = cfg
  321. # 已处理文件集合
  322. self.seen_files = set()
  323. # 每个设备的检测结果缓存(用于1分钟汇总)
  324. # key: device_code(如 "1#-1")
  325. self.device_cache = defaultdict(lambda: {
  326. "errors": [], # 每8秒的重建误差列表
  327. "last_upload": None, # 上次上报时间
  328. "audio_data": [], # 用于计算频谱图的音频数据
  329. "status": None # 最近的运行状态
  330. })
  331. # 频谱图历史缓存(用于计算normal_frequency_middle)
  332. # key: device_code, value: list of (timestamp, freq_db)
  333. freq_cfg = self.config.get('prediction', {}).get('frequency_history', {})
  334. self.freq_history_enabled = freq_cfg.get('enabled', True)
  335. self.freq_history_minutes = freq_cfg.get('history_minutes', 10)
  336. self.freq_history = defaultdict(list)
  337. # 上一次上报状态(True=异常,False=正常,None=初始)
  338. # 用于状态变更时去重,防止持续报警
  339. self.last_report_status = {}
  340. # 上报周期(秒)
  341. audio_cfg = self.config.get('audio', {})
  342. self.segment_duration = audio_cfg.get('segment_duration', 60)
  343. # 异常音频保存配置
  344. save_cfg = self.config.get('prediction', {}).get('save_anomaly_audio', {})
  345. self.save_anomaly_enabled = save_cfg.get('enabled', True)
  346. self.save_anomaly_dir = Path(__file__).parent / save_cfg.get('save_dir', 'data/anomaly_detected')
  347. if self.save_anomaly_enabled:
  348. self.save_anomaly_dir.mkdir(parents=True, exist_ok=True)
  349. # 异常推送配置
  350. push_cfg = self.config.get('push_notification', {})
  351. self.push_enabled = push_cfg.get('enabled', False)
  352. self.alert_enabled = push_cfg.get('alert_enabled', True) # 是否启用异常告警(false=暂时禁用异常上报)
  353. self.push_timeout = push_cfg.get('timeout', 30)
  354. self.push_retry_count = push_cfg.get('retry_count', 2)
  355. # 推送基地址列表(统一用 base_url/{project_id} 拼接,可无限扩展)
  356. raw_urls = push_cfg.get('push_base_urls', [])
  357. self.push_base_urls = [
  358. {"label": item.get("label", f"target-{i}"), "url": item.get("url", "").rstrip("/")}
  359. for i, item in enumerate(raw_urls)
  360. if item.get("url") # 跳过空URL
  361. ]
  362. # 推送失败记录文件路径
  363. failed_log_path = push_cfg.get('failed_push_log', 'data/push_failures.jsonl')
  364. self.failed_push_log = Path(__file__).parent / failed_log_path
  365. self.failed_push_log.parent.mkdir(parents=True, exist_ok=True)
  366. # 如果 alert_enabled 为 False,记录日志提醒
  367. if not self.alert_enabled:
  368. logger.warning("异常告警已禁用(alert_enabled=false),仅上报心跳状态")
  369. # ========================================
  370. # 报警聚合器(替代原有固定 cooldown_minutes)
  371. # ----------------------------------------
  372. # 功能1:跨设备聚合抑制 - 同一水厂多设备同时报警 -> 环境噪声,全部抑制
  373. # 功能2:分类型冷却 - 同类型24h,不同类型1h
  374. # ========================================
  375. agg_cfg = push_cfg.get('alert_aggregate', {})
  376. self.alert_aggregator = None
  377. if ALERT_AGGREGATOR_AVAILABLE:
  378. self.alert_aggregator = AlertAggregator(
  379. push_callback=self._push_detection_result,
  380. aggregate_enabled=agg_cfg.get('enabled', True),
  381. window_seconds=agg_cfg.get('window_seconds', 300),
  382. min_devices=agg_cfg.get('min_devices', 2),
  383. cooldown_same_type_hours=push_cfg.get('cooldown_same_type_hours', 24),
  384. cooldown_diff_type_hours=push_cfg.get('cooldown_diff_type_hours', 1)
  385. )
  386. else:
  387. logger.warning("报警聚合器模块未找到,使用默认报警逻辑")
  388. # 上次异常音频保存时间(用于保存冷却时间计算)
  389. self.last_anomaly_save_time = {}
  390. # 异常保存冷却时间(分钟),同一设备连续异常时,每N分钟只保存一次
  391. self.anomaly_save_cooldown_minutes = save_cfg.get('cooldown_minutes', 10)
  392. # 当前异常分类结果锁定(持续异常期间保持分类结果不变)
  393. # key: device_code, value: (anomaly_type_code, type_name)
  394. self.locked_anomaly_type = {}
  395. # 滑动窗口投票配置(5次中有3次异常才判定为异常)
  396. voting_cfg = self.config.get('prediction', {}).get('voting', {})
  397. self.voting_enabled = voting_cfg.get('enabled', True)
  398. self.voting_window_size = voting_cfg.get('window_size', 5) # 窗口大小
  399. self.voting_threshold = voting_cfg.get('threshold', 3) # 异常阈值(>=3次则判定异常)
  400. self.detection_history = {} # 每个设备的检测历史(True=异常)
  401. # 阈值容差区间配置(避免边界值反复跳变)
  402. # 误差在 threshold*(1-tolerance) ~ threshold*(1+tolerance) 范围内为灰区,维持上一状态
  403. self.tolerance_ratio = voting_cfg.get('tolerance_ratio', 0.05) # 默认5%容差
  404. self.last_single_anomaly = {} # 每个设备上一次的单周期判定结果
  405. # 阈值现在由 multi_predictor 管理,每个设备从其对应模型目录加载
  406. # 能量检测配置
  407. energy_cfg = self.config.get('prediction', {}).get('energy_detection', {})
  408. self.energy_detection_enabled = ENERGY_BASELINE_AVAILABLE and energy_cfg.get('enabled', True)
  409. self.skip_detection_when_stopped = energy_cfg.get('skip_when_stopped', True)
  410. # 能量基线检测器(每个设备一个)
  411. if self.energy_detection_enabled:
  412. self.energy_baselines = {}
  413. else:
  414. self.energy_baselines = None
  415. # SCADA API配置(用于获取泵状态和进水流量)
  416. scada_cfg = self.config.get('scada_api', {})
  417. self.scada_enabled = scada_cfg.get('enabled', False)
  418. self.scada_url = scada_cfg.get('base_url', '') # 历史数据接口(备用)
  419. self.scada_realtime_url = scada_cfg.get('realtime_url', '') # 实时数据接口(主用)
  420. self.scada_jwt = scada_cfg.get('jwt_token', '')
  421. self.scada_timeout = scada_cfg.get('timeout', 10)
  422. # 泵状态监控器(用于检测启停过渡期)
  423. self.pump_state_monitor = None
  424. self.pump_status_plc_configs = {} # {pump_name: [{point, name}, ...]}
  425. if PUMP_STATE_MONITOR_AVAILABLE and self.scada_enabled:
  426. # 初始化泵状态监控器
  427. # 获取第一个启用水厂的project_id
  428. project_id = 0
  429. for plant in self.config.get('plants', []):
  430. if plant.get('enabled', False):
  431. project_id = plant.get('project_id', 0)
  432. # 加载泵状态点位配置
  433. pump_status_plc = plant.get('pump_status_plc', {})
  434. self.pump_status_plc_configs = pump_status_plc
  435. break
  436. if project_id > 0:
  437. # 使用实时接口 URL 进行泵状态查询
  438. self.pump_state_monitor = PumpStateMonitor(
  439. scada_url=self.scada_realtime_url, # 使用实时数据接口
  440. scada_jwt=self.scada_jwt,
  441. project_id=project_id,
  442. timeout=self.scada_timeout,
  443. transition_window_minutes=15 # 启停后15分钟内视为过渡期
  444. )
  445. logger.info(f"泵状态监控器已启用 (project_id={project_id}, 过渡期窗口=15分钟, 使用实时接口)")
  446. # 线程控制
  447. self.running = False
  448. self.thread = None
  449. # 推送线程池(避免推送超时阻塞主监控循环)
  450. self._push_executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="push")
  451. # 启动时间(用于跳过启动期间的状态变化日志)
  452. self.startup_time = None
  453. self.startup_warmup_seconds = 120 # 启动后120秒内不记录状态变化
  454. # ========================================
  455. # 异常上下文捕获配置
  456. # ========================================
  457. context_cfg = save_cfg.get('context_capture', {})
  458. self.context_capture_enabled = context_cfg.get('enabled', False)
  459. self.context_pre_minutes = context_cfg.get('pre_minutes', 2)
  460. self.context_post_minutes = context_cfg.get('post_minutes', 2)
  461. # 音频文件历史缓存(用于捕获异常前的音频)
  462. # key: device_code, value: deque of (timestamp, file_path)
  463. from collections import deque
  464. # 计算需要保留的历史文件数量(按分钟计算,每分钟约1个文件)
  465. history_size = self.context_pre_minutes + 2 # 多保留2个作为缓冲
  466. self.audio_file_history = defaultdict(lambda: deque(maxlen=history_size))
  467. # 异常上下文捕获状态
  468. # key: device_code, value: dict
  469. self.anomaly_capture_state = {}
  470. if self.context_capture_enabled:
  471. logger.info(f"异常上下文捕获已启用 (前{self.context_pre_minutes}分钟 + 后{self.context_post_minutes}分钟)")
  472. # ========================================
  473. # 人体检测报警抑制配置
  474. # ========================================
  475. # 只要任意摄像头在冷却时间内检测到人,所有设备的异常报警都会被抑制
  476. human_cfg = self.config.get('human_detection', {})
  477. self.human_detection_enabled = human_cfg.get('enabled', False) and HUMAN_DETECTION_AVAILABLE
  478. self.human_reader = None
  479. if self.human_detection_enabled:
  480. db_path = human_cfg.get('db_path', '')
  481. cooldown_minutes = human_cfg.get('cooldown_minutes', 5)
  482. self.human_reader = HumanDetectionReader(
  483. db_path=db_path,
  484. cooldown_minutes=cooldown_minutes
  485. )
  486. logger.info(f"人体检测报警抑制已启用 (冷却时间={cooldown_minutes}分钟, 数据库={db_path})")
  487. def start(self):
  488. """
  489. 启动监控线程
  490. """
  491. if self.running:
  492. return
  493. # 启动前清理current目录中的遗留文件
  494. self._cleanup_current_on_startup()
  495. self.running = True
  496. self.startup_time = datetime.now() # 记录启动时间
  497. self.thread = threading.Thread(target=self._monitor_loop, daemon=True)
  498. self.thread.start()
  499. # 打印监控的设备列表
  500. device_codes = list(self.device_map.keys())
  501. logger.info(f"监控线程已启动 | 目录: {self.audio_dir} | 监控设备: {device_codes}")
  502. def _cleanup_current_on_startup(self):
  503. """
  504. 启动时清理current目录中的遗留文件
  505. 删除上次运行遗留的文件,避免混入新采集数据影响检测准确性
  506. """
  507. cleaned_count = 0
  508. for device_code in self.device_map.keys():
  509. current_dir = self.audio_dir / device_code / "current"
  510. if not current_dir.exists():
  511. continue
  512. for wav_file in current_dir.glob("*.wav"):
  513. try:
  514. wav_file.unlink()
  515. cleaned_count += 1
  516. except Exception as e:
  517. logger.warning(f"清理遗留文件失败: {wav_file.name} | {e}")
  518. if cleaned_count > 0:
  519. logger.info(f"启动清理: 已删除current目录中 {cleaned_count} 个遗留文件")
  520. def stop(self):
  521. """
  522. 停止监控线程
  523. """
  524. if not self.running:
  525. return
  526. self.running = False
  527. if self.thread is not None:
  528. self.thread.join(timeout=5)
  529. # 关闭推送线程池,等待已提交的推送任务完成
  530. self._push_executor.shutdown(wait=True, cancel_futures=False)
  531. logger.info(f"监控线程已停止")
  532. def _reload_hot_config(self):
  533. # 从 DB 热加载可变配置项,无需重启服务
  534. # 只刷新运行时可安全变更的参数,不影响 FFmpeg 进程和流映射
  535. if self._config_manager is None:
  536. return
  537. now = time.time()
  538. # 未达到刷新间隔则跳过
  539. if now - self._last_config_reload < self._config_reload_interval:
  540. return
  541. self._last_config_reload = now
  542. try:
  543. # 从 DB 读取最新配置
  544. fresh = self._config_manager.get_full_config()
  545. self.config = fresh
  546. # 刷新推送配置
  547. push_cfg = fresh.get('push_notification', {})
  548. self.push_enabled = push_cfg.get('enabled', False)
  549. self.alert_enabled = push_cfg.get('alert_enabled', True)
  550. self.push_timeout = push_cfg.get('timeout', 30)
  551. self.push_retry_count = push_cfg.get('retry_count', 2)
  552. raw_urls = push_cfg.get('push_base_urls', [])
  553. self.push_base_urls = [
  554. {"label": item.get("label", f"target-{i}"), "url": item.get("url", "").rstrip("/")}
  555. for i, item in enumerate(raw_urls)
  556. if item.get("url")
  557. ]
  558. # 刷新投票配置
  559. voting_cfg = fresh.get('prediction', {}).get('voting', {})
  560. self.voting_enabled = voting_cfg.get('enabled', True)
  561. self.voting_window_size = voting_cfg.get('window_size', 5)
  562. self.voting_threshold = voting_cfg.get('threshold', 3)
  563. self.tolerance_ratio = voting_cfg.get('tolerance_ratio', 0.05)
  564. # 刷新能量检测配置
  565. energy_cfg = fresh.get('prediction', {}).get('energy_detection', {})
  566. self.skip_detection_when_stopped = energy_cfg.get('skip_when_stopped', True)
  567. # 刷新人体检测配置
  568. human_cfg = fresh.get('human_detection', {})
  569. new_human_enabled = human_cfg.get('enabled', False)
  570. if new_human_enabled != self.human_detection_enabled:
  571. logger.info(f"人体检测抑制配置变更: {self.human_detection_enabled} -> {new_human_enabled}")
  572. self.human_detection_enabled = new_human_enabled
  573. logger.debug("配置热刷新完成")
  574. except Exception as e:
  575. logger.error(f"配置热刷新失败: {e}")
  576. def _monitor_loop(self):
  577. """
  578. 监控循环(线程主函数)
  579. 持续检查各设备的音频目录,处理新生成的WAV文件。
  580. 每分钟汇总一次结果进行上报。
  581. """
  582. # 确保根目录存在
  583. self.audio_dir.mkdir(parents=True, exist_ok=True)
  584. while self.running:
  585. try:
  586. # 热更新:定期从 DB 刷新可变配置
  587. self._reload_hot_config()
  588. # 扫描所有设备目录下的current文件夹
  589. for device_code in self.device_map.keys():
  590. device_current_dir = self.audio_dir / device_code / "current"
  591. # 目录不存在则跳过
  592. if not device_current_dir.exists():
  593. continue
  594. for wav_file in device_current_dir.glob("*.wav"):
  595. # 跳过已处理的文件
  596. if wav_file in self.seen_files:
  597. continue
  598. # 检查文件是否完整
  599. try:
  600. stat_info = wav_file.stat()
  601. file_age = time.time() - stat_info.st_mtime
  602. file_size = stat_info.st_size
  603. # 文件修改时间检查(确保文件写入完成)
  604. # FFmpeg生成文件需要时间,太新的文件可能还没写完
  605. if file_age < 12.0:
  606. continue
  607. # 文件大小检查(60秒 × 16000Hz × 2字节 ≈ 1.9MB)
  608. # 范围:500KB - 3MB
  609. if file_size < 500_000 or file_size > 3_000_000:
  610. # 只有当文件生成已经有一段时间(>20秒)且依然很小,才判定为异常
  611. # 这样可以避免刚开始生成、正在写入的文件被误报
  612. if file_age > 20.0:
  613. logger.warning(f"文件大小异常: {wav_file.name} ({file_size / 1000:.1f}KB)")
  614. # 自动删除过小的文件(通常是0KB或损坏文件)
  615. if file_size < 500_000:
  616. try:
  617. wav_file.unlink()
  618. logger.debug(f"删除异常小文件: {wav_file.name}")
  619. except Exception as e:
  620. logger.error(f"删除文件失败: {wav_file.name} | {e}")
  621. self.seen_files.add(wav_file)
  622. continue # 继续下一个文件循环
  623. # 正常大小的文件继续处理
  624. if file_size < 500_000: # 双重检查,防止漏网
  625. continue
  626. except Exception as e:
  627. logger.error(f"文件状态检查失败: {wav_file.name} | {e}")
  628. continue
  629. # 处理新文件
  630. self._process_new_file(wav_file)
  631. # 标记为已处理
  632. self.seen_files.add(wav_file)
  633. # 检查是否需要进行周期性上报
  634. self._check_periodic_upload()
  635. # 检查聚合窗口是否到期,到期则执行聚合判定并推送
  636. if self.alert_aggregator:
  637. self.alert_aggregator.check_and_flush()
  638. # 清理过大的已处理文件集合
  639. if len(self.seen_files) > 10000:
  640. recent_files = sorted(self.seen_files,
  641. key=lambda f: f.stat().st_mtime if f.exists() else 0)[-5000:]
  642. self.seen_files = set(recent_files)
  643. except Exception as e:
  644. logger.error(f"监控循环错误: {e}")
  645. # 等待下一次检查
  646. time.sleep(self.check_interval)
  647. def _process_new_file(self, wav_file):
  648. """
  649. 处理新的音频文件
  650. 文件名格式: {project_id}_{device_code}_{时间戳}.wav
  651. 例如: 92_1#-1_20251218142000.wav
  652. 流程:
  653. 1. 加载音频
  654. 2. 计算能量判断设备状态
  655. 3. 进行AE异常检测,记录重建误差
  656. 4. 保存音频数据用于计算频谱图
  657. """
  658. try:
  659. # 从文件名解析device_code
  660. # 格式: {project_id}_{device_code}_{时间戳}.wav
  661. try:
  662. parts = wav_file.stem.split('_')
  663. if len(parts) >= 3:
  664. device_code = parts[1] # 第二部分是device_code
  665. else:
  666. device_code = "unknown"
  667. except:
  668. device_code = "unknown"
  669. # ========================================
  670. # 泵启停状态检测(基于 PLC 查询)- 优先于模型检查
  671. # ----------------------------------------
  672. # 逻辑:通过 SCADA API 查询 PLC 点位判断泵是否运行
  673. # 作用:
  674. # 1. 冷启动模式下也能过滤停机时的音频(保证训练数据质量)
  675. # 2. 泵停机时跳过异常检测(避免无意义的检测)
  676. # 3. 记录设备状态用于后续上报
  677. # 依赖:pump_state_monitor + rtsp_config 中的 pump_status_plc 配置
  678. # ========================================
  679. device_status = "未知"
  680. pump_is_running = True # 默认认为运行中(PLC 查询失败时的保守策略)
  681. # 获取该设备对应的流配置
  682. stream_config = self.device_map.get(device_code)
  683. if self.pump_state_monitor and stream_config:
  684. # 根据设备的 pump_name 找到关联的 PLC 点位配置
  685. pump_name = stream_config.pump_name
  686. pump_configs = self.pump_status_plc_configs.get(pump_name, [])
  687. if pump_configs:
  688. # 遍历所有关联泵,只要有一个运行就认为设备在工作
  689. any_pump_running = False
  690. for pump_cfg in pump_configs:
  691. point = pump_cfg.get("point", "")
  692. name = pump_cfg.get("name", point)
  693. pump_id = point
  694. # 查询泵状态(带 60 秒缓存,不会频繁请求 SCADA)
  695. is_running, _ = self.pump_state_monitor.update_pump_state(pump_id, point, name)
  696. if is_running:
  697. any_pump_running = True
  698. pump_is_running = any_pump_running
  699. device_status = "开机" if pump_is_running else "停机"
  700. # 泵全部停机时跳过(可通过配置禁用此行为)
  701. # 冷启动和正常模式都适用,确保训练数据质量
  702. if self.skip_detection_when_stopped and not pump_is_running:
  703. logger.info(f"泵停机(PLC): {device_code} | 归档到过渡期目录(不用于训练)")
  704. self._move_audio_to_transition_dir(wav_file, "stopped")
  705. return
  706. # ========================================
  707. # 过渡期检测(泵启停后15分钟内)
  708. # ----------------------------------------
  709. # 目的:过滤启停过程中的不稳定音频
  710. # 确保训练数据只包含稳定运行期的音频
  711. # ========================================
  712. if self.skip_detection_when_stopped:
  713. pump_in_transition, transition_pump_names = \
  714. self.pump_state_monitor.check_pumps_transition(pump_configs)
  715. if pump_in_transition:
  716. logger.info(f"泵过渡期: {device_code} | 归档到过渡期目录(不用于训练) | "
  717. f"过渡期泵: {', '.join(transition_pump_names)}")
  718. self._move_audio_to_transition_dir(wav_file, "transition")
  719. return
  720. # 获取该设备的预测器(懒加载模型)
  721. device_predictor = self.multi_predictor.get_predictor(device_code)
  722. if device_predictor is None:
  723. logger.info(f"冷启动模式(设备 {device_code} 无模型): 归档 {wav_file.name}")
  724. self._move_audio_to_date_dir(wav_file)
  725. return
  726. # 加载音频
  727. try:
  728. y, sr = librosa.load(str(wav_file), sr=CFG.SR, mono=True)
  729. except Exception as e:
  730. logger.error(f"音频加载失败: {wav_file.name} | {e}")
  731. return
  732. # 记录状态到缓存(用于周期上报)
  733. # 注意:泵状态检测已在前面完成,这里只记录状态
  734. self.device_cache[device_code]["status"] = device_status
  735. # ========================================
  736. # 计算重建误差
  737. # ========================================
  738. error = self._compute_reconstruction_error(wav_file, device_predictor)
  739. if error is not None:
  740. self.device_cache[device_code]["errors"].append(error)
  741. # ========================================
  742. # 保存音频数据用于计算频谱图
  743. # ========================================
  744. self.device_cache[device_code]["audio_data"].append(y)
  745. # ========================================
  746. # 暂存文件路径,等待周期聚合判定后再归档
  747. # ========================================
  748. if "pending_files" not in self.device_cache[device_code]:
  749. self.device_cache[device_code]["pending_files"] = []
  750. self.device_cache[device_code]["pending_files"].append(wav_file)
  751. # ========================================
  752. # 记录到音频历史缓存(用于异常上下文捕获)
  753. # ========================================
  754. if self.context_capture_enabled:
  755. self.audio_file_history[device_code].append((datetime.now(), wav_file))
  756. # 初始化上次上报时间
  757. if self.device_cache[device_code]["last_upload"] is None:
  758. self.device_cache[device_code]["last_upload"] = datetime.now()
  759. # 获取阈值并判断结果
  760. threshold = self._get_threshold(device_code)
  761. # ========================================
  762. # 快速通道:连续多个文件误差极高时快速预警(暂时关闭)
  763. # 不走投票窗口,用于捕获突发性严重故障
  764. # ========================================
  765. # if error is not None and threshold is not None and threshold > 0:
  766. # # 维护快速通道缓冲区
  767. # if "fast_alert_buffer" not in self.device_cache[device_code]:
  768. # self.device_cache[device_code]["fast_alert_buffer"] = []
  769. #
  770. # fast_buf = self.device_cache[device_code]["fast_alert_buffer"]
  771. # # 连续性检查:误差超过 2x 阈值时记录,否则清空缓冲区
  772. # if error > threshold * 2.0:
  773. # fast_buf.append(error)
  774. # else:
  775. # fast_buf.clear()
  776. #
  777. # # 连续 3 个文件(~24秒)都超过 2x 阈值 → 触发快速预警
  778. # FAST_ALERT_CONSECUTIVE = 3
  779. # if len(fast_buf) >= FAST_ALERT_CONSECUTIVE:
  780. # # 检查是否已触发过(避免重复告警)
  781. # last_fast = self.device_cache[device_code].get("last_fast_alert_time")
  782. # now = datetime.now()
  783. # can_fast_alert = (last_fast is None or
  784. # (now - last_fast).total_seconds() > 300) # 5分钟冷却
  785. #
  786. # if can_fast_alert:
  787. # # 快速通道同样受抑制逻辑约束
  788. # suppress = False
  789. # # 泵过渡期抑制
  790. # if self.pump_state_monitor and stream_config:
  791. # pump_name = stream_config.pump_name
  792. # pump_configs = self.pump_status_plc_configs.get(pump_name, [])
  793. # if pump_configs:
  794. # in_transition, _ = self.pump_state_monitor.check_pumps_transition(pump_configs)
  795. # if in_transition:
  796. # suppress = True
  797. # # 人体检测抑制
  798. # if self.human_detection_enabled and self.human_reader:
  799. # if self.human_reader.is_in_cooldown():
  800. # suppress = True
  801. # # alert_enabled 开关
  802. # if not self.alert_enabled:
  803. # suppress = True
  804. #
  805. # if not suppress:
  806. # avg_fast = float(np.mean(fast_buf))
  807. # logger.warning(
  808. # f"[!!] 快速通道触发: {device_code} | "
  809. # f"连续{len(fast_buf)}个文件异常 | "
  810. # f"平均误差={avg_fast:.6f} 阈值={threshold:.6f}")
  811. # self.device_cache[device_code]["last_fast_alert_time"] = now
  812. # fast_buf.clear()
  813. # # 标记快速预警,在下次周期上报时一并处理
  814. # self.device_cache[device_code]["fast_alert_pending"] = True
  815. if error is not None and threshold is not None:
  816. is_anomaly = error > threshold
  817. result_tag = "!!" if is_anomaly else "OK"
  818. logger.info(f"[{result_tag}] {device_code} | {wav_file.name} | "
  819. f"误差={error:.6f} 阈值={threshold:.6f}")
  820. elif error is not None:
  821. logger.debug(f"文件预测: {wav_file.name} | 误差={error:.6f} | 阀值=未设置")
  822. else:
  823. logger.warning(f"预测跳过: {wav_file.name} | 误差计算失败")
  824. except Exception as e:
  825. logger.error(f"处理文件失败: {wav_file.name} | 错误: {e}")
  826. def _compute_reconstruction_error(self, wav_file, device_predictor):
  827. """
  828. 计算单个音频文件的重建误差(Min-Max 标准化)
  829. 使用8秒窗口、4秒步长切割音频,提取多个patches分别计算误差后取平均值。
  830. 参数:
  831. wav_file: 音频文件路径
  832. device_predictor: 设备预测器实例
  833. 返回:
  834. 重建误差值(所有patches的平均MSE),失败返回None
  835. """
  836. try:
  837. import torch
  838. from predictor.utils import align_to_target
  839. # Min-Max 标准化参数
  840. global_min = device_predictor.global_min
  841. global_max = device_predictor.global_max
  842. # 加载音频
  843. y, _ = librosa.load(str(wav_file), sr=CFG.SR, mono=True)
  844. win_samples = int(CFG.WIN_SEC * CFG.SR)
  845. hop_samples = int(CFG.HOP_SEC * CFG.SR)
  846. if len(y) < win_samples:
  847. logger.warning(f"音频太短,无法提取patches: {wav_file.name}")
  848. return None
  849. patches = []
  850. for start in range(0, len(y) - win_samples + 1, hop_samples):
  851. window = y[start:start + win_samples]
  852. S = librosa.feature.melspectrogram(
  853. y=window, sr=CFG.SR, n_fft=CFG.N_FFT,
  854. hop_length=CFG.HOP_LENGTH, n_mels=CFG.N_MELS, power=2.0
  855. )
  856. S_db = librosa.power_to_db(S, ref=np.max)
  857. if S_db.shape[1] < CFG.TARGET_FRAMES:
  858. S_db = np.pad(S_db, ((0, 0), (0, CFG.TARGET_FRAMES - S_db.shape[1])))
  859. else:
  860. S_db = S_db[:, :CFG.TARGET_FRAMES]
  861. # Min-Max 标准化
  862. S_norm = (S_db - global_min) / (global_max - global_min + 1e-6)
  863. patches.append(S_norm.astype(np.float32))
  864. if not patches:
  865. logger.warning(f"未能提取任何patches: {wav_file.name}")
  866. return None
  867. arr = np.stack(patches, 0)
  868. arr = np.expand_dims(arr, 1)
  869. tensor = torch.from_numpy(arr)
  870. torch_device = device_predictor.torch_device
  871. tensor = tensor.to(torch_device)
  872. with torch.no_grad():
  873. recon = device_predictor.model(tensor)
  874. recon = align_to_target(recon, tensor)
  875. mse_per_patch = torch.mean((recon - tensor) ** 2, dim=[1, 2, 3])
  876. mean_mse = torch.mean(mse_per_patch).item()
  877. logger.debug(f"重建误差: {wav_file.name} | patches={len(patches)} | MSE={mean_mse:.6f}")
  878. return mean_mse
  879. except Exception as e:
  880. logger.error(f"计算重建误差失败: {wav_file.name} | {e}")
  881. return None
  882. def _check_periodic_upload(self):
  883. """
  884. 检查是否需要进行周期性上报
  885. 每分钟汇总一次各设备的检测结果并上报
  886. """
  887. now = datetime.now()
  888. for device_code, cache in self.device_cache.items():
  889. # 检查上报时间间隔
  890. last_upload = cache.get("last_upload")
  891. if last_upload is None:
  892. continue
  893. elapsed = (now - last_upload).total_seconds()
  894. # 达到上报周期
  895. if elapsed >= self.segment_duration:
  896. # 获取该设备的流配置
  897. stream_config = self.device_map.get(device_code)
  898. # 计算平均重建误差
  899. errors = cache.get("errors", [])
  900. avg_error = float(np.mean(errors)) if errors else 0.0
  901. # 获取阈值
  902. threshold = self._get_threshold(device_code)
  903. # 判断当前周期是否异常(带容差区间)
  904. # 容差区间:避免边界值反复跳变,但灰区内仍与阈值比较
  905. if threshold:
  906. upper_bound = threshold * (1 + self.tolerance_ratio) # 确定异常边界
  907. lower_bound = threshold * (1 - self.tolerance_ratio) # 确定正常边界
  908. if avg_error > upper_bound:
  909. # 超过上边界 -> 确定异常
  910. is_current_anomaly = True
  911. elif avg_error < lower_bound:
  912. # 低于下边界 -> 确定正常
  913. is_current_anomaly = False
  914. else:
  915. # 灰区 -> 与阈值比较(避免异常状态延长)
  916. is_current_anomaly = avg_error > threshold
  917. # 记录本次判定结果
  918. self.last_single_anomaly[device_code] = is_current_anomaly
  919. else:
  920. is_current_anomaly = False
  921. # ========================================
  922. # 滑动窗口投票:5次中有3次异常才判定为异常
  923. # ========================================
  924. if device_code not in self.detection_history:
  925. self.detection_history[device_code] = []
  926. # 记录本次检测结果
  927. self.detection_history[device_code].append(is_current_anomaly)
  928. # 保持窗口大小
  929. if len(self.detection_history[device_code]) > self.voting_window_size:
  930. self.detection_history[device_code].pop(0)
  931. # 投票判定最终异常状态
  932. if self.voting_enabled and len(self.detection_history[device_code]) >= self.voting_window_size:
  933. anomaly_count = sum(self.detection_history[device_code])
  934. is_anomaly = anomaly_count >= self.voting_threshold
  935. window_info = f"窗口[{anomaly_count}/{self.voting_window_size}]"
  936. else:
  937. is_anomaly = is_current_anomaly
  938. window_info = "窗口未满"
  939. # ========================================
  940. # 状态变更检测
  941. # trigger_alert: 仅在 正常(或初始) -> 异常 时为True
  942. # 冷却逻辑已移至 AlertAggregator 内部处理
  943. # ========================================
  944. last_is_anomaly = self.last_report_status.get(device_code)
  945. trigger_alert = False
  946. if is_anomaly:
  947. # 只有状态变化(正常->异常) 才触发报警流程
  948. if last_is_anomaly is None or not last_is_anomaly:
  949. # ========================================
  950. # 泵启停过渡期检查(抑制误报)
  951. # ----------------------------------------
  952. # 逻辑:检测到异常时,检查关联泵是否刚启停
  953. # 作用:泵启停过程中音频特征剧烈变化,易被误判为异常
  954. # 过渡期内抑制告警,避免误报
  955. # 过渡期窗口:配置中的 transition_window_minutes(默认15分钟)
  956. # ========================================
  957. pump_in_transition = False
  958. transition_pump_names = []
  959. if self.pump_state_monitor and stream_config:
  960. pump_name = stream_config.pump_name
  961. pump_configs = self.pump_status_plc_configs.get(pump_name, [])
  962. if pump_configs:
  963. # 批量检查所有关联泵是否有处于过渡期的
  964. pump_in_transition, transition_pump_names = \
  965. self.pump_state_monitor.check_pumps_transition(pump_configs)
  966. if pump_in_transition:
  967. # 有泵处于过渡期 -> 抑制本次告警
  968. trigger_alert = False
  969. logger.info(f"泵启停过渡期,抑制告警: {device_code} | "
  970. f"过渡期泵: {', '.join(transition_pump_names)}")
  971. else:
  972. # 检查 alert_enabled 开关:如果禁用则不触发告警
  973. if self.alert_enabled:
  974. # ========================================
  975. # 人体检测抑制:任意摄像头检测到人则不报警
  976. # ========================================
  977. if self.human_detection_enabled and self.human_reader:
  978. if self.human_reader.is_in_cooldown():
  979. trigger_alert = False
  980. status_info = self.human_reader.get_status_info()
  981. logger.info(f"人体检测抑制: {device_code} | {status_info},跳过报警")
  982. else:
  983. trigger_alert = True
  984. else:
  985. trigger_alert = True
  986. else:
  987. trigger_alert = False
  988. logger.debug(f"异常告警已禁用,跳过告警: {device_code}")
  989. # 获取运行状态
  990. running_status = cache.get("status", "未知")
  991. # 获取进水流量
  992. inlet_flow = self._get_inlet_flow(stream_config) if stream_config else 0.0
  993. # 计算本次频谱图
  994. audio_data = cache.get("audio_data", [])
  995. freq_db = self._compute_frequency_spectrum(audio_data)
  996. # 保存到频谱图历史(只保存dB值)
  997. if self.freq_history_enabled and freq_db:
  998. self.freq_history[device_code].append((now, freq_db))
  999. # 清理过期历史
  1000. cutoff = now - timedelta(minutes=self.freq_history_minutes)
  1001. self.freq_history[device_code] = [
  1002. (t, d) for t, d in self.freq_history[device_code]
  1003. if t > cutoff
  1004. ]
  1005. # 计算历史频谱图平均值(normal_frequency_middle)
  1006. freq_middle_db = self._compute_frequency_middle(device_code)
  1007. # ========================================
  1008. # 异常分类:只在状态从正常变为异常时进行分类
  1009. # 持续异常期间沿用上次分类结果,保持一致性
  1010. # ========================================
  1011. anomaly_type_code = 6 # 默认:未分类异常
  1012. type_name = "未分类异常"
  1013. if is_anomaly and audio_data:
  1014. # 检查是否是新的异常(从正常变为异常)
  1015. is_new_anomaly = (last_is_anomaly is None or not last_is_anomaly)
  1016. if is_new_anomaly:
  1017. # 新异常:进行分类并锁定结果
  1018. try:
  1019. from core.anomaly_classifier import classify_anomaly
  1020. if len(audio_data) > 0:
  1021. y = audio_data[-1] if isinstance(audio_data[-1], np.ndarray) else np.array(audio_data[-1])
  1022. anomaly_type_code, type_name, confidence = classify_anomaly(y, sr=16000)
  1023. # 锁定分类结果
  1024. self.locked_anomaly_type[device_code] = (anomaly_type_code, type_name)
  1025. logger.info(f"异常分类(新异常): {type_name} (code={anomaly_type_code}, 置信度={confidence:.2f})")
  1026. except Exception as e:
  1027. logger.warning(f"异常分类失败: {e}")
  1028. else:
  1029. # 持续异常:沿用锁定的分类结果
  1030. if device_code in self.locked_anomaly_type:
  1031. anomaly_type_code, type_name = self.locked_anomaly_type[device_code]
  1032. logger.debug(f"异常分类(沿用): {type_name} (code={anomaly_type_code})")
  1033. else:
  1034. # 状态正常时清除锁定的分类结果
  1035. if device_code in self.locked_anomaly_type:
  1036. del self.locked_anomaly_type[device_code]
  1037. # 上报逻辑
  1038. if self.push_enabled and stream_config:
  1039. # 预读异常音频base64:在文件被归档/清空之前立即读取
  1040. # 解决竞态问题:异步推送或聚合器延迟推送时文件可能已被移走
  1041. pre_read_wav_b64 = ""
  1042. if trigger_alert:
  1043. try:
  1044. current_pending = cache.get("pending_files", [])
  1045. if current_pending and current_pending[0].exists():
  1046. with open(current_pending[0], "rb") as f:
  1047. pre_read_wav_b64 = base64.b64encode(f.read()).decode('utf-8')
  1048. logger.debug(f"预读异常音频成功: {current_pending[0].name} | size={len(pre_read_wav_b64)}")
  1049. except Exception as e:
  1050. logger.warning(f"预读异常音频失败: {e}")
  1051. if trigger_alert and self.alert_aggregator:
  1052. # 报警走聚合器:跨设备聚合判定 + 分类型冷却
  1053. # 聚合器会在窗口到期后决定是否真正推送
  1054. self.alert_aggregator.submit_alert(
  1055. plant_name=stream_config.plant_name,
  1056. device_code=device_code,
  1057. anomaly_type_code=anomaly_type_code,
  1058. push_kwargs=dict(
  1059. stream_config=stream_config,
  1060. device_code=device_code,
  1061. is_anomaly=is_anomaly,
  1062. trigger_alert=True,
  1063. abnormal_score=avg_error,
  1064. score_threshold=threshold,
  1065. running_status=running_status,
  1066. inlet_flow=inlet_flow,
  1067. freq_db=freq_db,
  1068. freq_middle_db=freq_middle_db,
  1069. anomaly_type_code=anomaly_type_code,
  1070. abnormal_wav_b64=pre_read_wav_b64
  1071. )
  1072. )
  1073. else:
  1074. # 非报警(心跳)或聚合器不可用 -> 提交到线程池异步推送
  1075. self._push_executor.submit(
  1076. self._push_detection_result,
  1077. stream_config=stream_config,
  1078. device_code=device_code,
  1079. is_anomaly=is_anomaly,
  1080. trigger_alert=trigger_alert,
  1081. abnormal_score=avg_error,
  1082. score_threshold=threshold,
  1083. running_status=running_status,
  1084. inlet_flow=inlet_flow,
  1085. freq_db=freq_db,
  1086. freq_middle_db=freq_middle_db,
  1087. anomaly_type_code=anomaly_type_code,
  1088. abnormal_wav_b64=pre_read_wav_b64
  1089. )
  1090. # 更新上一次状态
  1091. self.last_report_status[device_code] = is_anomaly
  1092. # 日志记录
  1093. thr_str = f"{threshold:.6f}" if threshold else "未设置"
  1094. # 报警去向说明
  1095. if trigger_alert and self.alert_aggregator:
  1096. alert_dest = "-> 聚合器"
  1097. elif trigger_alert:
  1098. alert_dest = "-> 直接推送"
  1099. else:
  1100. alert_dest = ""
  1101. # 使用设备名作为标识,增加视觉分隔
  1102. cam_label = ""
  1103. if stream_config:
  1104. cam_label = f"({stream_config.camera_name})"
  1105. result_emoji = "!!" if is_anomaly else "OK"
  1106. alert_str = f"报警=是 {alert_dest}" if trigger_alert else "报警=否"
  1107. logger.info(
  1108. f"[{result_emoji}] {device_code}{cam_label} | "
  1109. f"误差={avg_error:.6f} 阈值={thr_str} | "
  1110. f"{window_info} | {running_status} | "
  1111. f"{'异常' if is_anomaly else '正常'} | {alert_str}"
  1112. )
  1113. # ========================================
  1114. # 根据单次检测结果归档文件
  1115. # 归档基于 is_current_anomaly(单次检测),而非投票后的 is_anomaly
  1116. # 投票机制只影响是否推送报警,不影响文件分类
  1117. # ========================================
  1118. pending_files = cache.get("pending_files", [])
  1119. # 检查是否是新的异常(从正常变为异常)
  1120. is_new_anomaly = is_anomaly and (last_is_anomaly is None or not last_is_anomaly)
  1121. # ========================================
  1122. # 异常上下文捕获逻辑
  1123. # ========================================
  1124. if self.context_capture_enabled:
  1125. # 检查并更新捕获状态
  1126. self._update_anomaly_capture_state(device_code, is_anomaly, is_new_anomaly,
  1127. avg_error, threshold, now, pending_files)
  1128. if pending_files:
  1129. if is_current_anomaly:
  1130. # 单次检测为异常 -> 归档到异常目录
  1131. # 检查异常保存冷却时间
  1132. last_save = self.last_anomaly_save_time.get(device_code)
  1133. should_save = True
  1134. if last_save:
  1135. elapsed_minutes = (now - last_save).total_seconds() / 60
  1136. should_save = elapsed_minutes >= self.anomaly_save_cooldown_minutes
  1137. if should_save and not self.context_capture_enabled:
  1138. # 上下文捕获禁用时,使用原有逻辑:移动到异常待排查目录
  1139. for f in pending_files:
  1140. self._move_audio_to_anomaly_pending(f)
  1141. self.last_anomaly_save_time[device_code] = now
  1142. logger.warning(f"已隔离 {len(pending_files)} 个异常文件到待排查目录")
  1143. elif self.context_capture_enabled:
  1144. # 上下文捕获启用时:
  1145. # - 异常文件已在 _update_anomaly_capture_state 中记录路径
  1146. # - 文件会在 _save_anomaly_context 中被移动到异常目录
  1147. # - 这里暂时保留文件,不做处理
  1148. pass # 文件由捕获逻辑处理
  1149. else:
  1150. # 冷却时间内,删除文件不保存
  1151. for f in pending_files:
  1152. try:
  1153. f.unlink()
  1154. except Exception as e:
  1155. logger.debug(f"删除冷却期内文件失败: {f.name} | {e}")
  1156. remaining_minutes = self.anomaly_save_cooldown_minutes - elapsed_minutes
  1157. logger.debug(f"异常保存冷却中: {device_code} | 剩余 {remaining_minutes:.1f} 分钟")
  1158. else:
  1159. # 单次检测为正常 -> 移到日期目录归档
  1160. for f in pending_files:
  1161. self._move_audio_to_date_dir(f)
  1162. # 状态恢复正常时,清除保存冷却时间(下次异常时可立即保存)
  1163. if device_code in self.last_anomaly_save_time:
  1164. del self.last_anomaly_save_time[device_code]
  1165. # 重置缓存
  1166. cache["errors"] = []
  1167. cache["audio_data"] = []
  1168. cache["pending_files"] = []
  1169. cache["last_upload"] = now
  1170. logger.info("─" * 60)
  1171. def _update_anomaly_capture_state(self, device_code, is_anomaly,
  1172. is_new_anomaly, avg_error,
  1173. threshold, now, pending_files):
  1174. """
  1175. 更新异常上下文捕获状态
  1176. 状态机:
  1177. 1. 未触发 -> 检测到新异常 -> 触发捕获,从 audio_file_history 回溯获取文件
  1178. 2. 已触发,等待后续 -> 持续收集后续文件
  1179. 3. 已触发,时间到 -> 保存所有文件和元数据
  1180. 修复说明:
  1181. - anomaly_files 不再依赖 pending_files(可能为空)
  1182. - 改为从 audio_file_history 回溯获取触发时刻前 1 分钟内的文件
  1183. - pre_files 改为获取触发时刻前 1~N+1 分钟的文件(排除 anomaly 时间段)
  1184. 参数:
  1185. device_code: 设备编号
  1186. is_anomaly: 当前周期是否异常
  1187. is_new_anomaly: 是否是新异常(从正常变为异常)
  1188. avg_error: 平均重建误差
  1189. threshold: 阈值
  1190. now: 当前时间
  1191. pending_files: 当前周期的待处理文件
  1192. """
  1193. import shutil
  1194. import json
  1195. state = self.anomaly_capture_state.get(device_code)
  1196. if is_new_anomaly and state is None:
  1197. # 新异常触发:开始捕获
  1198. # ========================================
  1199. # 从 audio_file_history 回溯获取文件
  1200. # ----------------------------------------
  1201. # anomaly_files: 触发时刻前 1 分钟内的文件(最接近异常的时间段)
  1202. # pre_files: 触发时刻前 1~(N+1) 分钟的文件(更早的正常时间段)
  1203. # ========================================
  1204. anomaly_cutoff = now - timedelta(minutes=1) # 前1分钟
  1205. pre_cutoff = now - timedelta(minutes=self.context_pre_minutes + 1) # 前N+1分钟
  1206. pre_files = []
  1207. anomaly_files = []
  1208. for ts, fpath in self.audio_file_history[device_code]:
  1209. if not fpath.exists():
  1210. continue
  1211. if ts >= anomaly_cutoff:
  1212. # 触发时刻前1分钟内 -> anomaly_files
  1213. anomaly_files.append(fpath)
  1214. elif ts >= pre_cutoff:
  1215. # 触发时刻前1~(N+1)分钟 -> pre_files
  1216. pre_files.append(fpath)
  1217. # 如果 anomaly_files 仍为空,把当前 pending_files 加入(兜底)
  1218. if not anomaly_files and pending_files:
  1219. anomaly_files = [f for f in pending_files if f.exists()]
  1220. # 初始化捕获状态
  1221. self.anomaly_capture_state[device_code] = {
  1222. "trigger_time": now,
  1223. "avg_error": avg_error,
  1224. "threshold": threshold,
  1225. "pre_files": pre_files,
  1226. "anomaly_files": anomaly_files,
  1227. "post_files": [],
  1228. "post_start_time": now
  1229. }
  1230. logger.info(f"异常上下文捕获已触发: {device_code} | "
  1231. f"前置文件={len(pre_files)}个 | 异常文件={len(anomaly_files)}个 | "
  1232. f"等待后续{self.context_post_minutes}分钟")
  1233. elif state is not None:
  1234. # 已触发状态:收集后续文件
  1235. elapsed_post = (now - state["post_start_time"]).total_seconds() / 60
  1236. if elapsed_post < self.context_post_minutes:
  1237. # 还在收集后续文件
  1238. for f in pending_files:
  1239. if f.exists():
  1240. state["post_files"].append(f)
  1241. else:
  1242. # 时间到,保存所有文件
  1243. self._save_anomaly_context(device_code, state)
  1244. # 清除状态
  1245. del self.anomaly_capture_state[device_code]
  1246. # 更新保存冷却时间
  1247. self.last_anomaly_save_time[device_code] = now
  1248. def _save_anomaly_context(self, device_code: str, state: dict):
  1249. """
  1250. 保存异常上下文文件到独立目录
  1251. 目录结构:
  1252. data/anomaly_detected/{device_code}/{异常文件名(不含扩展名)}/
  1253. ├── 92_1#-1_20260130140313.wav (保持原文件名)
  1254. ├── ...
  1255. └── metadata.json
  1256. metadata.json 字段说明:
  1257. - before_trigger: 触发前 1~(N+1) 分钟的文件(正常时期,用于对比)
  1258. - at_trigger: 触发时刻前 1 分钟内的文件(异常开始出现的时期)
  1259. - after_trigger: 触发后 N 分钟的文件(不管异常是否恢复)
  1260. 参数:
  1261. device_code: 设备编号
  1262. state: 捕获状态字典
  1263. """
  1264. import shutil
  1265. import json
  1266. try:
  1267. # 获取第一个异常文件名作为文件夹名
  1268. anomaly_files = state.get("anomaly_files", [])
  1269. if not anomaly_files:
  1270. logger.warning(f"无异常文件,跳过保存: {device_code}")
  1271. return
  1272. # 用第一个异常文件名(不含扩展名)作为文件夹名
  1273. first_anomaly = anomaly_files[0]
  1274. folder_name = first_anomaly.stem # 如 92_1#-1_20260130140313
  1275. save_dir = self.save_anomaly_dir / device_code / folder_name
  1276. save_dir.mkdir(parents=True, exist_ok=True)
  1277. # 收集所有文件名(使用新命名)
  1278. all_files = {"before_trigger": [], "at_trigger": [], "after_trigger": []}
  1279. # 移动前置文件(来自日期目录,移动后原位置不再保留)
  1280. for fpath in state.get("pre_files", []):
  1281. if fpath.exists():
  1282. dest = save_dir / fpath.name
  1283. shutil.move(str(fpath), str(dest)) # 移动而非复制,避免重复
  1284. all_files["before_trigger"].append(fpath.name)
  1285. # 移动触发时刻文件(保持原名)
  1286. for fpath in anomaly_files:
  1287. if fpath.exists():
  1288. dest = save_dir / fpath.name
  1289. shutil.move(str(fpath), str(dest))
  1290. all_files["at_trigger"].append(fpath.name)
  1291. # 移动后续文件(保持原名)
  1292. for fpath in state.get("post_files", []):
  1293. if fpath.exists():
  1294. dest = save_dir / fpath.name
  1295. shutil.move(str(fpath), str(dest))
  1296. all_files["after_trigger"].append(fpath.name)
  1297. # 生成精简的元数据文件
  1298. trigger_time = state.get("trigger_time")
  1299. metadata = {
  1300. "device_code": device_code,
  1301. "trigger_time": trigger_time.strftime("%Y-%m-%d %H:%M:%S") if trigger_time else None,
  1302. "avg_error": round(state.get("avg_error", 0.0), 6),
  1303. "threshold": round(state.get("threshold", 0.0), 6),
  1304. "files": all_files
  1305. }
  1306. metadata_path = save_dir / "metadata.json"
  1307. with open(metadata_path, 'w', encoding='utf-8') as f:
  1308. json.dump(metadata, f, ensure_ascii=False, indent=2)
  1309. total = sum(len(v) for v in all_files.values())
  1310. logger.warning(f"异常上下文已保存: {device_code}/{folder_name} | "
  1311. f"共{total}个文件 (前{len(all_files['before_trigger'])}+异常{len(all_files['at_trigger'])}+后{len(all_files['after_trigger'])})")
  1312. except Exception as e:
  1313. logger.error(f"保存异常上下文失败: {device_code} | {e}")
  1314. def _compute_frequency_middle(self, device_code):
  1315. """
  1316. 计算历史频谱图平均值(normal_frequency_middle)
  1317. 参数:
  1318. device_code: 设备编号
  1319. 返回:
  1320. dB值列表的平均值
  1321. """
  1322. history = self.freq_history.get(device_code, [])
  1323. if not history or len(history) < 2:
  1324. return []
  1325. try:
  1326. # 收集所有历史dB值(只保留长度一致的)
  1327. all_db = []
  1328. ref_len = len(history[0][1]) # 使用第一条记录的长度作为参考
  1329. for _, db in history:
  1330. if len(db) == ref_len:
  1331. all_db.append(db)
  1332. if not all_db:
  1333. return []
  1334. # 计算各频率点的平均dB
  1335. avg_db = np.mean(all_db, axis=0).tolist()
  1336. return avg_db
  1337. except Exception as e:
  1338. logger.error(f"计算频谱图历史平均失败: {e}")
  1339. return []
  1340. def _get_threshold(self, device_code):
  1341. """
  1342. 获取指定设备的阈值
  1343. 优先级:
  1344. 1. 从 multi_predictor 获取设备对应模型的阈值
  1345. 2. 使用配置文件中的默认阈值
  1346. 3. 返回0.0(不进行异常判定)
  1347. 参数:
  1348. device_code: 设备编号(如 "LT-1")
  1349. 返回:
  1350. 阈值,未找到返回默认值或0.0
  1351. """
  1352. # 方式1:从 multi_predictor 获取设备阈值
  1353. thr = self.multi_predictor.get_threshold(device_code)
  1354. if thr is not None:
  1355. logger.debug(f"阈值来源: {device_code} -> 设备模型阈值 = {thr:.6f}")
  1356. return thr
  1357. # 方式2:使用配置中的默认阈值
  1358. default_threshold = self.config.get('prediction', {}).get('default_threshold', 0.0)
  1359. if default_threshold > 0:
  1360. logger.debug(f"阈值来源: {device_code} -> 配置默认值 = {default_threshold:.6f}")
  1361. return default_threshold
  1362. # 首次找不到阈值时记录警告
  1363. if device_code not in getattr(self, '_threshold_warned', set()):
  1364. if not hasattr(self, '_threshold_warned'):
  1365. self._threshold_warned = set()
  1366. self._threshold_warned.add(device_code)
  1367. logger.warning(f"未找到阈值: {device_code},将跳过异常判定")
  1368. return 0.0
  1369. def _get_inlet_flow(self, stream_config: RTSPStreamConfig) -> float:
  1370. """
  1371. 获取进水流量(使用实时数据接口)
  1372. 使用 current-data 接口直接获取最新一条数据
  1373. 参数:
  1374. stream_config: 流配置
  1375. 返回:
  1376. 进水流量值,失败返回0.0
  1377. """
  1378. if not self.scada_enabled or not stream_config:
  1379. logger.debug(f"流量跳过: scada_enabled={self.scada_enabled}, stream_config={stream_config}")
  1380. return 0.0
  1381. # 获取PLC数据点位
  1382. plc_address = stream_config.get_flow_plc_address()
  1383. if not plc_address:
  1384. logger.debug(f"流量跳过(无PLC地址): {stream_config.device_code} | pump_name='{stream_config.pump_name}'")
  1385. return 0.0
  1386. # 使用水厂配置的 project_id
  1387. project_id = stream_config.project_id
  1388. try:
  1389. # 当前时间戳(毫秒)
  1390. now_ms = int(datetime.now().timestamp() * 1000)
  1391. # 请求参数
  1392. params = {"time": now_ms}
  1393. # 请求体:使用实时数据接口格式
  1394. request_body = [
  1395. {
  1396. "deviceId": "1",
  1397. "deviceItems": plc_address,
  1398. "deviceName": f"流量_{stream_config.pump_name}",
  1399. "project_id": project_id
  1400. }
  1401. ]
  1402. # 请求头
  1403. headers = {
  1404. "Content-Type": "application/json",
  1405. "JWT-TOKEN": self.scada_jwt
  1406. }
  1407. logger.debug(f"流量请求: {stream_config.device_code} | project_id={project_id} | plc={plc_address}")
  1408. # 发送 POST 请求到实时接口
  1409. response = requests.post(
  1410. self.scada_realtime_url,
  1411. params=params,
  1412. json=request_body,
  1413. headers=headers,
  1414. timeout=self.scada_timeout
  1415. )
  1416. if response.status_code == 200:
  1417. data = response.json()
  1418. if data.get("code") == 200:
  1419. if data.get("data"):
  1420. # 获取第一条数据(实时接口只返回最新一条)
  1421. latest = data["data"][0]
  1422. if "val" in latest:
  1423. flow = float(latest["val"])
  1424. logger.debug(f"流量获取成功: {stream_config.device_code} | 流量={flow}")
  1425. return flow
  1426. else:
  1427. logger.debug(f"流量数据无val字段: {stream_config.device_code}")
  1428. else:
  1429. # API正常返回但无数据
  1430. logger.debug(f"流量查询无数据: {stream_config.device_code}")
  1431. else:
  1432. logger.warning(f"流量API返回异常: {stream_config.device_code} | code={data.get('code')} | msg={data.get('msg')}")
  1433. else:
  1434. logger.warning(f"流量HTTP错误: {stream_config.device_code} | status={response.status_code}")
  1435. except Exception as e:
  1436. logger.warning(f"流量获取异常: {stream_config.device_code} | {e}")
  1437. return 0.0
  1438. def _compute_frequency_spectrum(self, audio_data):
  1439. """
  1440. 计算频谱图数据
  1441. 将1分钟内的多个8秒音频片段合并,计算整体FFT
  1442. 参数:
  1443. audio_data: 音频数据列表
  1444. 返回:
  1445. dB值列表(0-8000Hz均匀分布,共400个点)
  1446. """
  1447. if not audio_data:
  1448. return []
  1449. try:
  1450. # 合并所有音频片段
  1451. combined = np.concatenate(audio_data)
  1452. # 计算FFT
  1453. n = len(combined)
  1454. fft_result = np.fft.rfft(combined)
  1455. freqs = np.fft.rfftfreq(n, 1.0 / CFG.SR)
  1456. # 计算幅度(转换为dB)
  1457. magnitude = np.abs(fft_result)
  1458. # 避免log(0)
  1459. magnitude = np.maximum(magnitude, 1e-10)
  1460. db = 20 * np.log10(magnitude)
  1461. # 降采样到400个点(0-8000Hz均匀分布)
  1462. max_freq = 8000
  1463. num_points = 400
  1464. db_list = []
  1465. for i in range(num_points):
  1466. # 计算目标频率(0到8000Hz均匀分布)
  1467. target_freq = (i / (num_points - 1)) * max_freq
  1468. # 找到最接近的频率索引
  1469. idx = np.argmin(np.abs(freqs - target_freq))
  1470. if idx < len(freqs):
  1471. db_list.append(float(db[idx]))
  1472. return db_list
  1473. except Exception as e:
  1474. logger.error(f"计算频谱图失败: {e}")
  1475. return []
  1476. def _push_detection_result(self, stream_config, device_code,
  1477. is_anomaly, trigger_alert, abnormal_score, score_threshold,
  1478. running_status, inlet_flow,
  1479. freq_db, freq_middle_db=None,
  1480. anomaly_type_code=6,
  1481. abnormal_wav_b64=""):
  1482. """
  1483. 推送检测结果到远程服务器
  1484. 上报格式符合用户要求:
  1485. - 包含频谱图数据(this_frequency + normal_frequency_middle)
  1486. - 包含启停状态和进水流量
  1487. - sound_detection.status 和 abnormalwav 仅在 trigger_alert=True 时上报异常(0)和base64,否则为正常(1)和空
  1488. 参数:
  1489. stream_config: 流配置
  1490. device_code: 设备编号
  1491. is_anomaly: 实际检测是否异常
  1492. trigger_alert: 是否触发报警(仅在新异常产生时为True)
  1493. abnormal_score: 平均重建误差
  1494. score_threshold: 阈值
  1495. running_status: 启停状态
  1496. inlet_flow: 进水流量
  1497. freq_db: 本次频谱图dB值列表
  1498. freq_middle_db: 历史平均频谱图dB值列表
  1499. abnormal_wav_b64: 预读的异常音频base64编码(在文件归档前读取,避免竞态)
  1500. """
  1501. try:
  1502. import time as time_module
  1503. # 获取设备名称
  1504. camera_name = stream_config.camera_name
  1505. # 从 push_base_urls 列表构建推送目标(每个 base_url + /{project_id})
  1506. if not self.push_base_urls:
  1507. logger.warning(f"未配置push_base_urls: {device_code}")
  1508. return
  1509. # 获取该设备对应的 project_id,用于拼接最终推送URL
  1510. project_id = stream_config.project_id
  1511. # 构建推送消息
  1512. request_time = int(time_module.time() * 1000)
  1513. # 异常音频已通过参数 abnormal_wav_b64 传入(调用方在文件归档前预读)
  1514. # 如果调用方未提供,记录警告以便排查
  1515. if trigger_alert and not abnormal_wav_b64:
  1516. logger.warning(f"报警推送但无异常音频数据: {device_code}")
  1517. # 决定 sound_detection.status
  1518. # 只有在 trigger_alert=True 时才报 0 (异常)
  1519. # 其他情况(正常、持续异常)都报 1 (正常/空) -- 根据用户要求: "正常情况下 ... 是空 就行"
  1520. report_status = 0 if trigger_alert else 1
  1521. payload = {
  1522. "message": {
  1523. # 通道信息
  1524. "channelInfo": {"name": camera_name},
  1525. # 请求时间戳
  1526. "requestTime": request_time,
  1527. # 分类信息
  1528. "classification": {
  1529. "level_one": 2, # 音频检测大类
  1530. "level_two": anomaly_type_code # 异常类型小类(6=未分类, 7=轴承, 8=气蚀, 9=松动, 10=叶轮, 11=阀件)
  1531. },
  1532. # 技能信息
  1533. "skillInfo": {"name": "异响检测"},
  1534. # 声音检测数据
  1535. "sound_detection": {
  1536. # 异常音频
  1537. "abnormalwav": abnormal_wav_b64,
  1538. # 状态:0=异常(仅新异常), 1=正常(或持续异常)
  1539. "status": report_status,
  1540. # 设备状态信息
  1541. "condition": {
  1542. "running_status": "运行中" if running_status == "开机" else "停机中",
  1543. "inlet_flow": inlet_flow
  1544. },
  1545. # 得分信息
  1546. "score": {
  1547. "abnormal_score": abnormal_score, # 当前1分钟平均重构误差
  1548. "score_threshold": score_threshold # 该设备异常阀值
  1549. },
  1550. # 频谱图数据
  1551. "frequency": {
  1552. "this_frequency": freq_db, # 当前1分钟的频谱图
  1553. "normal_frequency_middle": freq_middle_db or [], # 过去10分钟的频谱图平均
  1554. "normal_frequency_upper": [], # 上限(暂为空)
  1555. "normal_frequency_lower": [] # 下限(暂为空)
  1556. }
  1557. }
  1558. }
  1559. }
  1560. # 遍历所有推送基地址,逐个拼接 project_id 发送(各目标互不影响)
  1561. push_targets = [
  1562. (f"{item['url']}/{project_id}", item['label'])
  1563. for item in self.push_base_urls
  1564. ]
  1565. # 逐个目标推送(各目标互不影响)
  1566. for target_url, target_label in push_targets:
  1567. self._send_to_target(
  1568. target_url, target_label, payload,
  1569. device_code, camera_name, trigger_alert, abnormal_score
  1570. )
  1571. except Exception as e:
  1572. logger.error(f"推送通知异常: {e}")
  1573. def _send_to_target(self, target_url, target_label, payload,
  1574. device_code, camera_name, trigger_alert, abnormal_score):
  1575. # 向单个推送目标发送数据(含重试逻辑)
  1576. # 各目标独立调用,某个目标失败不影响其他目标
  1577. import time as time_module
  1578. push_success = False
  1579. for attempt in range(self.push_retry_count + 1):
  1580. try:
  1581. response = requests.post(
  1582. target_url,
  1583. json=payload,
  1584. timeout=self.push_timeout,
  1585. headers={"Content-Type": "application/json"}
  1586. )
  1587. if response.status_code == 200:
  1588. alert_tag = "报警" if trigger_alert else "心跳"
  1589. logger.info(
  1590. f" [{alert_tag}][{target_label}] {device_code}({camera_name}) | "
  1591. f"误差={abnormal_score:.6f}"
  1592. )
  1593. push_success = True
  1594. break
  1595. else:
  1596. logger.warning(
  1597. f"推送失败[{target_label}]: {device_code} | URL={target_url} | "
  1598. f"状态码={response.status_code} | 内容={response.text[:100]}"
  1599. )
  1600. except requests.exceptions.Timeout:
  1601. logger.warning(f"推送超时[{target_label}]: {device_code} | URL={target_url} | 尝试 {attempt + 1}/{self.push_retry_count + 1}")
  1602. except requests.exceptions.RequestException as e:
  1603. logger.warning(f"推送异常[{target_label}]: {device_code} | URL={target_url} | {e}")
  1604. # 重试间隔
  1605. if attempt < self.push_retry_count:
  1606. time_module.sleep(1)
  1607. if not push_success:
  1608. logger.error(f"推送失败[{target_label}]: {device_code} | URL={target_url} | 已达最大重试次数")
  1609. def _move_audio_to_date_dir(self, wav_file):
  1610. """
  1611. 将音频移动到日期目录归档
  1612. 目录结构:
  1613. deploy_pickup/data/{device_code}/{日期}/{文件名}
  1614. 参数:
  1615. wav_file: 音频文件路径
  1616. """
  1617. try:
  1618. import shutil
  1619. # 从文件名提取device_code和日期
  1620. # 格式: 92_1#-1_20251218142000.wav
  1621. match = re.match(r'\d+_(.+)_(\d{8})\d{6}\.wav', wav_file.name)
  1622. if not match:
  1623. logger.warning(f"无法从文件名提取信息: {wav_file.name}")
  1624. return
  1625. device_code = match.group(1) # 如 1#-1
  1626. date_str = match.group(2) # YYYYMMDD
  1627. # 构建目标目录: data/{device_code}/{日期}/
  1628. date_dir = self.audio_dir / device_code / date_str
  1629. date_dir.mkdir(parents=True, exist_ok=True)
  1630. # 移动文件
  1631. dest_file = date_dir / wav_file.name
  1632. shutil.move(str(wav_file), str(dest_file))
  1633. logger.debug(f"音频已归档: {device_code}/{date_str}/{wav_file.name}")
  1634. except Exception as e:
  1635. logger.error(f"移动音频失败: {wav_file.name} | 错误: {e}")
  1636. def _move_audio_to_transition_dir(self, wav_file, reason):
  1637. """
  1638. 将泵停机/过渡期的音频移动到过渡期目录
  1639. 目录结构:
  1640. deploy_pickup/data/{device_code}/pump_transition/{文件名}
  1641. 这些音频不会被用于模型训练,但保留用于分析调试
  1642. 参数:
  1643. wav_file: 音频文件路径
  1644. reason: 原因标识(stopped=停机, transition=过渡期)
  1645. """
  1646. try:
  1647. import shutil
  1648. # 从文件名提取device_code
  1649. # 格式: 92_1#-1_20251218142000.wav
  1650. match = re.match(r'\d+_(.+)_\d{14}\.wav', wav_file.name)
  1651. if not match:
  1652. logger.warning(f"无法从文件名提取信息: {wav_file.name}")
  1653. return
  1654. device_code = match.group(1) # 如 1#-1
  1655. # 构建目标目录: data/{device_code}/pump_transition/
  1656. transition_dir = self.audio_dir / device_code / "pump_transition"
  1657. transition_dir.mkdir(parents=True, exist_ok=True)
  1658. # 移动文件
  1659. dest_file = transition_dir / wav_file.name
  1660. shutil.move(str(wav_file), str(dest_file))
  1661. logger.debug(f"过渡期音频已归档: {device_code}/pump_transition/{wav_file.name} ({reason})")
  1662. except Exception as e:
  1663. logger.error(f"移动过渡期音频失败: {wav_file.name} | 错误: {e}")
  1664. def _move_audio_to_anomaly_pending(self, wav_file):
  1665. """
  1666. 将异常音频移动到待排查目录
  1667. 目录结构:
  1668. deploy_pickup/data/{device_code}/anomaly_pending/{文件名}
  1669. 用户确认是误报后,可手动将文件移到日期目录参与增训
  1670. 参数:
  1671. wav_file: 音频文件路径
  1672. """
  1673. try:
  1674. import shutil
  1675. # 从文件名提取device_code
  1676. # 格式: 92_1#-1_20251218142000.wav
  1677. match = re.match(r'\d+_(.+)_\d{14}\.wav', wav_file.name)
  1678. if not match:
  1679. logger.warning(f"无法从文件名提取信息: {wav_file.name}")
  1680. return
  1681. device_code = match.group(1) # 如 1#-1
  1682. # 构建目标目录: data/{device_code}/anomaly_pending/
  1683. pending_dir = self.audio_dir / device_code / "anomaly_pending"
  1684. pending_dir.mkdir(parents=True, exist_ok=True)
  1685. # 移动文件
  1686. dest_file = pending_dir / wav_file.name
  1687. shutil.move(str(wav_file), str(dest_file))
  1688. logger.debug(f"异常音频已隔离: {device_code}/anomaly_pending/{wav_file.name}")
  1689. except Exception as e:
  1690. logger.error(f"移动异常音频失败: {wav_file.name} | 错误: {e}")
  1691. def _cleanup_old_files(self, days: int = 7):
  1692. """
  1693. 清理超过指定天数的正常音频文件
  1694. 清理规则:
  1695. - 只清理日期归档目录(data/{device_code}/{日期}/)
  1696. - 保留current目录和anomaly_detected目录
  1697. - 超过days天的文件删除
  1698. 参数:
  1699. days: 保留天数,默认7天
  1700. """
  1701. try:
  1702. import shutil
  1703. from datetime import datetime, timedelta
  1704. # 计算截止日期
  1705. cutoff_date = datetime.now() - timedelta(days=days)
  1706. cutoff_str = cutoff_date.strftime("%Y%m%d")
  1707. deleted_count = 0
  1708. # 遍历每个设备目录
  1709. for device_dir in self.audio_dir.iterdir():
  1710. if not device_dir.is_dir():
  1711. continue
  1712. # 检查是否是日期目录(跳过current和其他特殊目录)
  1713. for subdir in device_dir.iterdir():
  1714. if not subdir.is_dir():
  1715. continue
  1716. # 跳过current目录
  1717. if subdir.name == "current":
  1718. continue
  1719. # 检查是否为日期目录(YYYYMMDD格式)
  1720. if not re.match(r'^\d{8}$', subdir.name):
  1721. continue
  1722. # 如果日期早于截止日期,删除整个目录
  1723. if subdir.name < cutoff_str:
  1724. try:
  1725. shutil.rmtree(subdir)
  1726. deleted_count += 1
  1727. logger.debug(f"清理过期目录: {device_dir.name}/{subdir.name}")
  1728. except Exception as e:
  1729. logger.error(f"删除目录失败: {subdir} | {e}")
  1730. if deleted_count > 0:
  1731. logger.info(f"清理完成: 共删除{deleted_count}个过期目录")
  1732. except Exception as e:
  1733. logger.error(f"清理过期文件失败: {e}")
  1734. class PickupMonitoringSystem:
  1735. """
  1736. 拾音器监控系统
  1737. 管理FFmpeg进程和监控线程
  1738. """
  1739. def __init__(self, db_path=None):
  1740. """
  1741. 初始化监控系统
  1742. 参数:
  1743. db_path: SQLite 数据库路径(为 None 时使用默认路径 config/pickup_config.db)
  1744. """
  1745. self.db_path = db_path
  1746. # 初始化 ConfigManager
  1747. actual_db = Path(db_path) if db_path else get_db_path()
  1748. if not actual_db.exists():
  1749. raise FileNotFoundError(
  1750. f"\u914d\u7f6e\u6570\u636e\u5e93\u4e0d\u5b58\u5728: {actual_db}\n"
  1751. f"\u8bf7\u5148\u8fd0\u884c\u8fc1\u79fb\u811a\u672c: python tool/migrate_yaml_to_db.py"
  1752. )
  1753. self.config_manager = ConfigManager(str(actual_db))
  1754. print(f"\u914d\u7f6e\u6e90: SQLite ({actual_db})")
  1755. self.config = self._load_config()
  1756. # 冷启动模式标记
  1757. self.cold_start_mode = False
  1758. # 初始化多模型预测器(支持每个设备独立模型)
  1759. self.multi_predictor = MultiModelPredictor()
  1760. self.predictor = None # 兼容性保留,已废弃
  1761. # 从配置中注册所有设备的模型目录映射
  1762. print("\n正在初始化多模型预测器...")
  1763. for plant in self.config.get('plants', []):
  1764. if not plant.get('enabled', False):
  1765. continue
  1766. for stream in plant.get('rtsp_streams', []):
  1767. device_code = stream.get('device_code', '')
  1768. model_subdir = stream.get('model_subdir', device_code)
  1769. if device_code and model_subdir:
  1770. self.multi_predictor.register_device(device_code, model_subdir)
  1771. print(f" 注册设备: {device_code} -> models/{model_subdir}/")
  1772. print(f"已注册 {len(self.multi_predictor.registered_devices)} 个设备模型映射")
  1773. # 进程和监控器列表
  1774. self.ffmpeg_processes = []
  1775. self.monitors = []
  1776. # 信号处理
  1777. signal.signal(signal.SIGINT, self._signal_handler)
  1778. signal.signal(signal.SIGTERM, self._signal_handler)
  1779. def _load_config(self):
  1780. """
  1781. 从 SQLite DB 加载配置
  1782. 返回:
  1783. Dict: 配置字典
  1784. """
  1785. config = self.config_manager.get_full_config()
  1786. logger.info("配置已从 SQLite 加载")
  1787. return config
  1788. def _parse_rtsp_streams(self):
  1789. """
  1790. 解析配置文件中的RTSP流信息
  1791. 返回:
  1792. List[RTSPStreamConfig]: RTSP流配置列表
  1793. """
  1794. streams = []
  1795. plants = self.config.get('plants', [])
  1796. if not plants:
  1797. raise ValueError("配置文件中未找到水厂配置")
  1798. for plant in plants:
  1799. plant_name = plant.get('name')
  1800. if not plant_name:
  1801. print("警告: 跳过未命名的区域配置")
  1802. continue
  1803. # 检查是否启用该水厂(默认启用以兼容旧配置)
  1804. if not plant.get('enabled', True):
  1805. logger.debug(f"跳过禁用的水厂: {plant_name}")
  1806. continue
  1807. # 获取流量PLC配置
  1808. flow_plc = plant.get('flow_plc', {})
  1809. # 获取该水厂的project_id(每个plant有自己的project_id)
  1810. project_id = plant.get('project_id', 92)
  1811. logger.info(f"加载区域配置: {plant_name} | project_id={project_id}")
  1812. rtsp_streams = plant.get('rtsp_streams', [])
  1813. for stream in rtsp_streams:
  1814. url = stream.get('url')
  1815. channel = stream.get('channel')
  1816. camera_name = stream.get('name', '')
  1817. device_code = stream.get('device_code', '')
  1818. pump_name = stream.get('pump_name', '')
  1819. if not url or channel is None:
  1820. print(f"警告: 跳过不完整的RTSP流配置 (区域: {plant_name})")
  1821. continue
  1822. streams.append(RTSPStreamConfig(
  1823. plant_name=plant_name,
  1824. rtsp_url=url,
  1825. channel=channel,
  1826. camera_name=camera_name,
  1827. device_code=device_code,
  1828. pump_name=pump_name,
  1829. flow_plc=flow_plc,
  1830. project_id=project_id
  1831. ))
  1832. return streams
  1833. def start(self):
  1834. """
  1835. 启动监控系统
  1836. """
  1837. print("=" * 70)
  1838. print("拾音器异响检测系统")
  1839. print("=" * 70)
  1840. # 解析流配置
  1841. streams = self._parse_rtsp_streams()
  1842. print(f"\n共配置 {len(streams)} 个拾音设备:")
  1843. for stream in streams:
  1844. print(f" - {stream.device_code} | {stream.camera_name}")
  1845. # 启动FFmpeg进程
  1846. print("\n启动FFmpeg进程...")
  1847. for stream in streams:
  1848. ffmpeg = FFmpegProcess(stream, CFG.AUDIO_DIR, self.config)
  1849. if ffmpeg.start():
  1850. self.ffmpeg_processes.append(ffmpeg)
  1851. else:
  1852. print(f"警告: FFmpeg启动失败,跳过该流: {stream}")
  1853. if not self.ffmpeg_processes:
  1854. print("\n错误: 所有FFmpeg进程均启动失败")
  1855. sys.exit(1)
  1856. print(f"\n成功启动 {len(self.ffmpeg_processes)}/{len(streams)} 个FFmpeg进程")
  1857. # 收集所有流配置(用于PickupMonitor)
  1858. all_stream_configs = [p.stream_config for p in self.ffmpeg_processes]
  1859. # 启动监控线程(统一一个监控器)
  1860. print("\n启动监控线程...")
  1861. check_interval = self.config.get('prediction', {}).get('check_interval', 1.0)
  1862. monitor = PickupMonitor(
  1863. audio_dir=CFG.AUDIO_DIR,
  1864. multi_predictor=self.multi_predictor,
  1865. stream_configs=all_stream_configs,
  1866. check_interval=check_interval,
  1867. config=self.config,
  1868. config_manager=self.config_manager
  1869. )
  1870. monitor.start()
  1871. self.monitors.append(monitor)
  1872. print(f"\n成功启动监控线程")
  1873. print("\n" + "=" * 70)
  1874. print("系统已启动,开始监控...")
  1875. print("按 Ctrl+C 停止系统")
  1876. print("=" * 70 + "\n")
  1877. # FFmpeg重启配置
  1878. max_restart_attempts = 5 # 单个进程最大重启次数
  1879. restart_interval_base = 30 # 基础重启间隔(秒)-> 改为30s
  1880. restart_counts = {id(p): 0 for p in self.ffmpeg_processes} # 重启计数
  1881. # 主循环(带自动重启)
  1882. try:
  1883. while True:
  1884. # 检查每个FFmpeg进程状态
  1885. for ffmpeg in self.ffmpeg_processes:
  1886. if not ffmpeg.is_running():
  1887. pid = id(ffmpeg)
  1888. device_code = ffmpeg.stream_config.device_code
  1889. # 检查重启次数
  1890. if restart_counts.get(pid, 0) < max_restart_attempts:
  1891. # 计算等待时间(指数退避)
  1892. wait_time = restart_interval_base * (2 ** restart_counts.get(pid, 0))
  1893. logger.warning(f"FFmpeg进程停止: {device_code} | 将在{wait_time}秒后重启 | "
  1894. f"重试次数: {restart_counts.get(pid, 0) + 1}/{max_restart_attempts}")
  1895. time.sleep(wait_time)
  1896. # 尝试重启
  1897. if ffmpeg.start():
  1898. logger.debug(f"FFmpeg重启成功: {device_code}")
  1899. restart_counts[pid] = 0 # 重置计数
  1900. else:
  1901. restart_counts[pid] = restart_counts.get(pid, 0) + 1
  1902. logger.error(f"FFmpeg重启失败: {device_code}")
  1903. else:
  1904. logger.error(f"FFmpeg达到最大重启次数: {device_code} | 已放弃重启")
  1905. # 检查是否所有进程都已放弃
  1906. running_count = sum(1 for p in self.ffmpeg_processes if p.is_running())
  1907. all_abandoned = all(restart_counts.get(id(p), 0) >= max_restart_attempts
  1908. for p in self.ffmpeg_processes if not p.is_running())
  1909. if running_count == 0 and all_abandoned:
  1910. print("\n错误: 所有FFmpeg进程均已停止且无法重启")
  1911. break
  1912. # 每天0点执行7天清理
  1913. current_hour = datetime.now().hour
  1914. current_date = datetime.now().strftime("%Y%m%d")
  1915. if not hasattr(self, '_last_cleanup_date'):
  1916. self._last_cleanup_date = ""
  1917. # 在0点且今天还没清理过时执行
  1918. if current_hour == 0 and self._last_cleanup_date != current_date:
  1919. logger.info("执行7天过期文件清理...")
  1920. for monitor in self.monitors:
  1921. monitor._cleanup_old_files(days=7)
  1922. self._last_cleanup_date = current_date
  1923. # 定期打印RTSP状态(每分钟一次)
  1924. if not hasattr(self, '_last_status_log'):
  1925. self._last_status_log = datetime.now()
  1926. if (datetime.now() - self._last_status_log).total_seconds() >= 60:
  1927. running = sum(1 for p in self.ffmpeg_processes if p.is_running())
  1928. total = len(self.ffmpeg_processes)
  1929. logger.info(f"RTSP状态: {running}/{total} 个FFmpeg进程运行中")
  1930. logger.info("─" * 60)
  1931. self._last_status_log = datetime.now()
  1932. time.sleep(10)
  1933. except KeyboardInterrupt:
  1934. print("\n\n收到停止信号,正在关闭系统...")
  1935. finally:
  1936. self.stop()
  1937. def stop(self):
  1938. """
  1939. 停止监控系统
  1940. """
  1941. print("\n正在停止监控系统...")
  1942. print("停止监控线程...")
  1943. for monitor in self.monitors:
  1944. monitor.stop()
  1945. print("停止FFmpeg进程...")
  1946. for ffmpeg in self.ffmpeg_processes:
  1947. ffmpeg.stop()
  1948. print("系统已完全停止")
  1949. def _signal_handler(self, signum, frame):
  1950. """
  1951. 信号处理函数
  1952. """
  1953. print(f"\n\n收到信号 {signum},正在关闭系统...")
  1954. self.stop()
  1955. sys.exit(0)
  1956. def _start_config_api_server(config_manager, multi_predictor=None, port=18080):
  1957. # 在后台线程中启动 FastAPI 配置管理 API
  1958. try:
  1959. import uvicorn
  1960. init_config_api(config_manager, multi_predictor)
  1961. logger.info(f"启动配置管理 API: http://0.0.0.0:{port}")
  1962. uvicorn.run(config_app, host="0.0.0.0", port=port, log_level="warning")
  1963. except ImportError:
  1964. logger.warning("uvicorn 未安装,配置管理 API 无法启动。请安装: pip install uvicorn")
  1965. except Exception as e:
  1966. logger.error(f"配置管理 API 启动失败: {e}")
  1967. def main():
  1968. """
  1969. 主函数
  1970. """
  1971. # 检测 DB 是否存在
  1972. db_path = get_db_path(Path(__file__).parent / "config")
  1973. if not db_path.exists():
  1974. print(f"错误: 配置数据库不存在: {db_path}")
  1975. print(f"\n请先运行迁移脚本: python tool/migrate_yaml_to_db.py")
  1976. sys.exit(1)
  1977. try:
  1978. system = PickupMonitoringSystem()
  1979. # 后台线程启动配置管理 API
  1980. api_port = 18080
  1981. api_thread = threading.Thread(
  1982. target=_start_config_api_server,
  1983. args=(system.config_manager, system.multi_predictor, api_port),
  1984. daemon=True,
  1985. name="config-api"
  1986. )
  1987. api_thread.start()
  1988. system.start()
  1989. except FileNotFoundError as e:
  1990. print(f"\n错误: {e}")
  1991. print("\n请确保:")
  1992. print("1. 已完成训练并计算阈值")
  1993. print("2. 已复制必要的模型文件到 models/ 目录")
  1994. sys.exit(1)
  1995. except Exception as e:
  1996. print(f"\n严重错误: {e}")
  1997. import traceback
  1998. traceback.print_exc()
  1999. sys.exit(1)
  2000. if __name__ == "__main__":
  2001. main()