泵异响项目

wmy f1af45dc1d 移除上报格式中预留的 upper/lower 频谱字段 3 dagar sedan
.vscode 1ba497e368 feat: 训练显存策略优化 - GPU/NPU/CPU 自动选择,OOM 回退,验证集早停,音频质量预筛,阈值漂移监控 1 månad sedan
auto_training 33c6123101 更新 1 månad sedan
config 33c6123101 更新 1 månad sedan
core 33c6123101 更新 1 månad sedan
data 24de9b11cf init: 初始化项目提交 1 månad sedan
models 33c6123101 更新 1 månad sedan
predictor 33c6123101 更新 1 månad sedan
tool 33c6123101 更新 1 månad sedan
.gitignore 24de9b11cf init: 初始化项目提交 1 månad sedan
README.md 715cf492a2 docs: 补充边缘端上传逻辑复用与移植代码指南 3 veckor sedan
requirements.txt 33c6123101 更新 1 månad sedan
run_pickup_monitor.py 24de9b11cf init: 初始化项目提交 1 månad sedan
run_pickup_monitor_old.py f1af45dc1d 移除上报格式中预留的 upper/lower 频谱字段 3 dagar sedan
run_with_auto_training.py 33c6123101 更新 1 månad sedan
start.sh 33c6123101 更新 1 månad sedan
泵异响模型瞬时故障逻辑优化.docx 1ba497e368 feat: 训练显存策略优化 - GPU/NPU/CPU 自动选择,OOM 回退,验证集早停,音频质量预筛,阈值漂移监控 1 månad sedan
瞬时异响方案.md 1ba497e368 feat: 训练显存策略优化 - GPU/NPU/CPU 自动选择,OOM 回退,验证集早停,音频质量预筛,阈值漂移监控 1 månad sedan

README.md

拾音器异响检测系统

水泵/设备异响实时检测系统。通过 RTSP 拾音器采集音频,基于 AutoEncoder 模型进行异常检测。


只需 3 步

第 1 步:配置水厂信息(只需做一次)

系统采用 双模配置设计

  • 测试/单机模式(优先读 YAML):只要 config/rtsp_config.yaml 文件存在,系统强制读取它。修改该文件并执行 ./start.sh restart 即可在一张表、所有相关进程(推理、上传、同步)中全局生效。
  • 生产/多机模式(回退读 DB):如果重命名或移走 rtsp_config.yaml,系统会自动尝试读取 config/pickup_config.db,此时支持通过 :8080/api/config 接口进行在线热更新配置。

如果你想使用 DB 模式,需要先将 YAML 导入到 DB:

# 执行迁移脚本(默认每次都会先清空 DB 里的旧配置,保证状态最新)
python tool/migrate_yaml_to_db.py --yaml config/rtsp_config.yaml

导入完毕后,记得把 YAML 删掉或者改名。

导入后可检查 DB 信息:

python -c "
import sys; sys.path.insert(0,'.')
from config.config_manager import ConfigManager
mgr = ConfigManager()
cfg = mgr.get_full_config()
for p in cfg['plants']:
    print(f\"水厂: {p['name']} (project_id={p['project_id']}, enabled={p['enabled']})\")
    for s in p.get('rtsp_streams', []):
        print(f\"  设备: {s['device_code']} | {s['name']} | model={s.get('model_subdir','')} | url={s['url'][:50]}...\")
mgr.close()
"

第 2 步:训练模型

python auto_training/standalone_train.py --data-dir /你的音频数据目录

就这一条命令。 训练结果自动保存到 models/{设备编码}/ 目录。

数据目录要求:

你的音频数据目录/
├── LT-2/          ← 子文件夹名 = 设备编码(与 DB 中 device_code 一致)
│   ├── xxx.wav
│   └── 2025-01-01/
│       └── yyy.wav
└── LT-5/
    └── ...

可选参数:

--devices LT-2 LT-5    # 只训练指定设备
--epochs 100            # 训练轮数
--lr 0.00005            # 学习率

第 3 步:启动运行

./start.sh              # 前台运行(调试用)
./start.sh -d           # 后台运行(主推理、自动模型训练等全部托管)
./start.sh stop         # 一键停止(含所有辅助进程)
./start.sh restart      # 重启
./start.sh status       # 查看所有关联进程状态

./start.sh 会自动检测配置中是否启用了边云协同(cloud_sync / model_sync),如果是,则会自动拉起数据上传 (run_upload_worker.py) 和模型同步 (run_model_sync.py) 等辅助进程,不需要手动分开管理。


模型更新(后期维护)

方式 操作
自动热加载 替换 models/{设备编码}/ 下的文件,60 秒内自动生效
API 上传 curl -X POST http://IP:8080/api/model/upload/LT-2 -F "model_file=@ae_model.pth"
API 触发重载 curl -X POST http://IP:8080/api/model/reload/LT-2
查看状态 curl http://IP:8080/api/model/status

配置管理 API

系统启动后在 :8080 自动提供,支持 Web 端实时修改配置。

接口 方法 说明
/api/config GET 获取全量配置
/api/config/plants GET/POST 水厂列表 / 创建
/api/config/plants/{id} GET/PUT/DELETE 单个水厂 CRUD
/api/config/streams GET/POST RTSP 流列表 / 创建
/api/config/streams/{id} PUT/DELETE 单个流更新 / 删除
/api/config/{section} GET/PUT 系统配置读写(audio/prediction/push 等)
/api/model/status GET 模型加载状态
/api/model/reload/{code} POST 重载指定设备模型
/api/model/reload-all POST 重载所有模型
/api/model/upload/{code} POST 上传模型文件并自动重载

可热更新(30 秒自动生效):推送开关、告警阈值、投票参数、人体检测开关等。

需重启生效:新增/删除 RTSP 流、修改采样率。


边云协同架构 (Edge-Cloud)

为支持多水厂、大规模设备的高效集约化管理,系统内置了完善的“边云协同”机制。云端服务位于 cloud_server/(基于 FastAPI)。在边缘端(一体机)开启协同后,将形成如下三进程架构(统一被 ./start.sh 所管理):

  1. 主监控进程 (run_with_auto_training.py + run_pickup_monitor.py) 负责 RTSP 拉流、异常检测及本地冷启动训练。检测到异常时,将相关上下文生成任务写入本地队列。
  2. 异步上传 Worker (run_upload_worker.py) 开关:cloud_sync.enabled = true 通过监控本地队列(data/upload_queue/),将正常的音频抽样和异常事件音频进行高压转码(转为 FLAC 格式,体积小50%)并异步推送到云端服务,彻底解耦网络延迟对主推理环的阻塞。
  3. 模型同步客户端 (run_model_sync.py) 开关:model_sync.enabled = true 定时长轮询云端的 manifest.json,发现新版本后自动下载压缩包、比对 SHA256,然后安全原子替换到边缘机,并通知主监控进程进行亚秒级的模型热重载。

    核心交互与数据流转契约

依靠标准的 HTTP/FastAPI 接口,实现边缘端与云端的解耦互通:

1. 边缘端 ⬆️ 上传数据到云端 (Data Upload API) 采用 multipart/form-data 以流式上传高压转码后的媒体文件与环境特征数据。

  • 接口:POST http://<云端IP>:8800/api/audio/upload
  • 表单数据结构:
    • file: 实际媒体文件流(压缩的 .flac 等)
    • project_id: 水厂编号(如 1450)
    • device_code: 具体设备号(如 "LT-2")
    • model_group: 所属模型分类(如 "LT_ZDB" 驻底泵)
    • media_type: 媒体大类("audio" / "image" / "video")
    • sample_type: 样本分类("normal" 正常底噪 / "anomaly" 异常告警)
    • metadata: Blackbox 维度的设备元信息 JSON (含 edge_node_id, avg_error, threshold, 告警时间等)。

2. 云端 ⬇️ 下发模型到边缘端 (Model Sync API) 被动式“边缘长轮询 Pull”拉取分发,降低云端带宽瞬时波峰。

  • 阶段一 (版本探针)
    • GET http://<云端IP>:8800/api/model/check?project_id=1450&device_codes=LT-1,LT-2&feature_version=mel_v1
    • 返回字典,如果存在更新,包含对应设备下的 download_url, 最新 version, sha256 校验和,以及专属的新检测阈值 threshold
  • 阶段二 (增量下载与热替换)
    • 边缘端凭 download_url 触发 ZIP 包下载,落盘 /tmp
    • 算力对齐与 sha256 防篡改校验通过后,自动备份老模型并原地执行安全原子级别的 os.rename 替换,主监控进程无感热重入。

💡 附录:边缘端(发送端)代码移植指北
如果其他边缘端/项目室的同事希望复用这套边云通信逻辑将数据传往统一部署的云端(如 :8800),只需向其提供以下 2 个核心代码文件即可:

  1. core/audio_uploader.py:此为底层发送客户端灵魂,包含了调用 FFmpeg 高压转密、封装 multipart/form-data、断网重试队列兜底机制及所有网络请求逻辑的示例。
  2. run_upload_worker.py:此为专职异步调度器。提供了一种解耦思路,使得主 AI 推理进程只需将报错文件扔进 pending/ 暂存区,由该 worker 专职执行发送,从而避免网络波动阻塞算力轮转。

技术架构

音频处理流水线

RTSP 拾音器 ──FFmpeg──> 8秒WAV切片 ──librosa──> Mel频谱图 ──AutoEncoder──> 重建误差 ──汇总──> 1分钟上报
                         │                    [1,1,64,504]     推理            │
                         │                                                     │
                    data/audio/{设备}/current/                    每分钟取平均 abnormal_score
  • 8 秒:模型推理的最小单元。FFmpeg 按 8 秒切片产出 WAV,每个文件对应一个 Mel 频谱图输入
  • 1 分钟:业务上报周期。汇总该分钟内所有 8 秒片段的平均重建误差作为 abnormal_score
  • 滑动窗口投票:5 次中 >= 3 次异常才判定为异常,避免单次毛刺误报

模型架构

4 层卷积自编码器(ConvAutoencoder),参数量 ~49K,权重文件 ~192KB:

组件 结构 维度变化
编码器 4×Conv2d(stride=2)+BN+ReLU [B,1,64,504] → [B,64,4,32]
解码器 4×ConvTranspose2d(stride=2)+BN+ReLU [B,64,4,32] → [B,1,64,504]

异常检测原理:正常音频重建误差低,异常音频重建误差高于阈值(3σ 法则)。

NPU 推理

系统支持在 BM1684X NPU 上执行推理(通过 sophon.sail.Engine),模型转换流程:

ae_model.pth ──torch.onnx.export──> ae_model.onnx ──TPU-MLIR──> ae_model.bmodel
# 导出 ONNX + 生成 BModel(需 TPU-MLIR 环境)
python tool/convert_to_bmodel.py --all --with-bmodel --quantize fp16

models/{设备}/ae_model.bmodel 存在且 sophon.sail 可用时,推理引擎自动切换到 NPU。

模型训练方案

系统支持三种训练模式,按项目阶段选择:

模式 命令 适用场景
本地离线训练(推荐) python auto_training/standalone_train.py --data-dir /数据目录 初期部署,水厂少,快速落地
NPU 端自训练 python run_with_auto_training.py(定时 02:00 触发) 边缘盒子 CPU 资源充足时
云端训练+下发 需额外开发数据上传和模型下发模块 规模化阶段(>3 个水厂)

训练参数config/auto_training.yaml):

参数 默认值 说明
epochs 30 训练轮数(配合早停,实际通常更少)
learning_rate 0.0001 学习率
batch_size 32 批大小
early_stop_patience 5 连续 N 轮无改善则早停
training_device auto auto/cpu/cuda,auto 自动检测 GPU 显存
min_samples 50 最少样本数,不足则跳过训练

安全机制

  • 训练前自动备份模型(保留 7 份)
  • 新旧模型误差对比,退化超 2 倍自动回滚
  • 音频质量预筛(IQR 离群值过滤)
  • 验证集早停防过拟合

详细方案对比见 docs/NPU自编码器模型训练方案.docx


多水厂部署

每个水厂独立部署一个实例,各自拥有独立的 pickup_config.db

服务器A: deploy_pickup/ + pickup_config.db(锡山) + models/(锡山设备模型)
服务器B: deploy_pickup/ + pickup_config.db(龙亭) + models/(龙亭设备模型)

项目结构

deploy_pickup/
├── run_with_auto_training.py  # 边缘端主入口(冷启动 + 监控主控)
├── run_pickup_monitor.py      # 实时监控核心逻辑(采集 + 投票 + 入队)
├── run_upload_worker.py       # 边缘端数据异步上传进程
├── run_model_sync.py          # 边缘端模型热更新同步进程
├── start.sh                   # 统一启停管控脚本(自动拉起辅助进程)
├── requirements.txt
│
├── cloud_server/              # 云端管理服务
│   ├── main.py                #   云端 FastAPI 启动入口 (:8800)
│   └── routers/               #   云端接口 (数据接收 / 模型分发)
│
├── config/                    # 配置
│   ├── loader.py              #   统一配置加载入口(双模解析核心)
│   ├── pickup_config.db       #   运行时 SQLite 配置中心
│   ├── config_manager.py      #   配置读写与热更新逻辑
│   ├── config_api.py          #   边缘端管理 API(:18080)
│   ├── db_models.py           #   数据库表与 ORM 定义
│   └── auto_training.yaml     #   训练参数(备用)
│
├── predictor/                 # 推理与模型
│   ├── model_def.py           #   核心自编码器(ConvAutoencoder)
│   ├── multi_model_predictor.py   #   多设备模型管理与无缝热加载
│   ├── config.py / datasets.py / utils.py
│
├── core/                      # 业务辅助模块
│   ├── alert_aggregator.py    #   跨设备告警聚合抑制
│   ├── ml_classifier.py       #   异常发生后的二次分类(接管原方案)
│   ├── audio_uploader.py      #   主进程录音转码入队封装
│   ├── model_sync.py          #   模型版本比对与原子替换封装
│   ├── pump_state_monitor.py  #   PLC 泵状态联动读取
│   ├── energy_baseline.py     #   设备底噪级停机判断
│   └── human_detection_reader.py  #   人形入侵检测抑制
│
├── auto_training/             # 二次学习与自动训练
│   ├── standalone_train.py    #   离线/单次手动全量训练入口
│   ├── incremental_trainer.py #   增量训练/冷启动训练核心
│   ├── verify_normal.py       #   误报打标校验工具
│   └── data_cleanup.py        #   磁盘轮转与过期数据清理
│
├── tool/                      # 运维与实施工具
│   ├── migrate_yaml_to_db.py  #   一键从 YAML 重置 SQLite 配置
│   └── convert_to_bmodel.py   #   NPU 加速用 BModel 转换工具
│
├── models/{设备编码}/          # 本地自训练产出的模型存放区
└── data/                      # 运行时音频 / 队列暂存 / 异常快照库