|
|
před 3 dny | |
|---|---|---|
| .vscode | před 1 měsícem | |
| auto_training | před 1 měsícem | |
| config | před 1 měsícem | |
| core | před 1 měsícem | |
| data | před 1 měsícem | |
| models | před 1 měsícem | |
| predictor | před 1 měsícem | |
| tool | před 1 měsícem | |
| .gitignore | před 1 měsícem | |
| README.md | před 3 týdny | |
| requirements.txt | před 1 měsícem | |
| run_pickup_monitor.py | před 1 měsícem | |
| run_pickup_monitor_old.py | před 3 dny | |
| run_with_auto_training.py | před 1 měsícem | |
| start.sh | před 1 měsícem | |
| 泵异响模型瞬时故障逻辑优化.docx | před 1 měsícem | |
| 瞬时异响方案.md | před 1 měsícem |
水泵/设备异响实时检测系统。通过 RTSP 拾音器采集音频,基于 AutoEncoder 模型进行异常检测。
系统采用 双模配置设计:
config/rtsp_config.yaml 文件存在,系统强制读取它。修改该文件并执行 ./start.sh restart 即可在一张表、所有相关进程(推理、上传、同步)中全局生效。rtsp_config.yaml,系统会自动尝试读取 config/pickup_config.db,此时支持通过 :8080/api/config 接口进行在线热更新配置。如果你想使用 DB 模式,需要先将 YAML 导入到 DB:
# 执行迁移脚本(默认每次都会先清空 DB 里的旧配置,保证状态最新)
python tool/migrate_yaml_to_db.py --yaml config/rtsp_config.yaml
导入完毕后,记得把 YAML 删掉或者改名。
导入后可检查 DB 信息:
python -c "
import sys; sys.path.insert(0,'.')
from config.config_manager import ConfigManager
mgr = ConfigManager()
cfg = mgr.get_full_config()
for p in cfg['plants']:
print(f\"水厂: {p['name']} (project_id={p['project_id']}, enabled={p['enabled']})\")
for s in p.get('rtsp_streams', []):
print(f\" 设备: {s['device_code']} | {s['name']} | model={s.get('model_subdir','')} | url={s['url'][:50]}...\")
mgr.close()
"
python auto_training/standalone_train.py --data-dir /你的音频数据目录
就这一条命令。 训练结果自动保存到 models/{设备编码}/ 目录。
数据目录要求:
你的音频数据目录/
├── LT-2/ ← 子文件夹名 = 设备编码(与 DB 中 device_code 一致)
│ ├── xxx.wav
│ └── 2025-01-01/
│ └── yyy.wav
└── LT-5/
└── ...
可选参数:
--devices LT-2 LT-5 # 只训练指定设备
--epochs 100 # 训练轮数
--lr 0.00005 # 学习率
./start.sh # 前台运行(调试用)
./start.sh -d # 后台运行(主推理、自动模型训练等全部托管)
./start.sh stop # 一键停止(含所有辅助进程)
./start.sh restart # 重启
./start.sh status # 查看所有关联进程状态
注:
./start.sh会自动检测配置中是否启用了边云协同(cloud_sync/model_sync),如果是,则会自动拉起数据上传 (run_upload_worker.py) 和模型同步 (run_model_sync.py) 等辅助进程,不需要手动分开管理。
| 方式 | 操作 |
|---|---|
| 自动热加载 | 替换 models/{设备编码}/ 下的文件,60 秒内自动生效 |
| API 上传 | curl -X POST http://IP:8080/api/model/upload/LT-2 -F "model_file=@ae_model.pth" |
| API 触发重载 | curl -X POST http://IP:8080/api/model/reload/LT-2 |
| 查看状态 | curl http://IP:8080/api/model/status |
系统启动后在 :8080 自动提供,支持 Web 端实时修改配置。
| 接口 | 方法 | 说明 |
|---|---|---|
/api/config |
GET | 获取全量配置 |
/api/config/plants |
GET/POST | 水厂列表 / 创建 |
/api/config/plants/{id} |
GET/PUT/DELETE | 单个水厂 CRUD |
/api/config/streams |
GET/POST | RTSP 流列表 / 创建 |
/api/config/streams/{id} |
PUT/DELETE | 单个流更新 / 删除 |
/api/config/{section} |
GET/PUT | 系统配置读写(audio/prediction/push 等) |
/api/model/status |
GET | 模型加载状态 |
/api/model/reload/{code} |
POST | 重载指定设备模型 |
/api/model/reload-all |
POST | 重载所有模型 |
/api/model/upload/{code} |
POST | 上传模型文件并自动重载 |
可热更新(30 秒自动生效):推送开关、告警阈值、投票参数、人体检测开关等。
需重启生效:新增/删除 RTSP 流、修改采样率。
为支持多水厂、大规模设备的高效集约化管理,系统内置了完善的“边云协同”机制。云端服务位于 cloud_server/(基于 FastAPI)。在边缘端(一体机)开启协同后,将形成如下三进程架构(统一被 ./start.sh 所管理):
run_with_auto_training.py + run_pickup_monitor.py)
负责 RTSP 拉流、异常检测及本地冷启动训练。检测到异常时,将相关上下文生成任务写入本地队列。run_upload_worker.py)
开关:cloud_sync.enabled = true
通过监控本地队列(data/upload_queue/),将正常的音频抽样和异常事件音频进行高压转码(转为 FLAC 格式,体积小50%)并异步推送到云端服务,彻底解耦网络延迟对主推理环的阻塞。模型同步客户端 (run_model_sync.py)
开关:model_sync.enabled = true
定时长轮询云端的 manifest.json,发现新版本后自动下载压缩包、比对 SHA256,然后安全原子替换到边缘机,并通知主监控进程进行亚秒级的模型热重载。
依靠标准的 HTTP/FastAPI 接口,实现边缘端与云端的解耦互通:
1. 边缘端 ⬆️ 上传数据到云端 (Data Upload API)
采用 multipart/form-data 以流式上传高压转码后的媒体文件与环境特征数据。
POST http://<云端IP>:8800/api/audio/uploadfile: 实际媒体文件流(压缩的 .flac 等)project_id: 水厂编号(如 1450)device_code: 具体设备号(如 "LT-2")model_group: 所属模型分类(如 "LT_ZDB" 驻底泵)media_type: 媒体大类("audio" / "image" / "video")sample_type: 样本分类("normal" 正常底噪 / "anomaly" 异常告警)metadata: Blackbox 维度的设备元信息 JSON (含 edge_node_id, avg_error, threshold, 告警时间等)。2. 云端 ⬇️ 下发模型到边缘端 (Model Sync API) 被动式“边缘长轮询 Pull”拉取分发,降低云端带宽瞬时波峰。
GET http://<云端IP>:8800/api/model/check?project_id=1450&device_codes=LT-1,LT-2&feature_version=mel_v1download_url, 最新 version, sha256 校验和,以及专属的新检测阈值 threshold。download_url 触发 ZIP 包下载,落盘 /tmp。sha256 防篡改校验通过后,自动备份老模型并原地执行安全原子级别的 os.rename 替换,主监控进程无感热重入。💡 附录:边缘端(发送端)代码移植指北
如果其他边缘端/项目室的同事希望复用这套边云通信逻辑将数据传往统一部署的云端(如:8800),只需向其提供以下 2 个核心代码文件即可:
core/audio_uploader.py:此为底层发送客户端灵魂,包含了调用 FFmpeg 高压转密、封装multipart/form-data、断网重试队列兜底机制及所有网络请求逻辑的示例。run_upload_worker.py:此为专职异步调度器。提供了一种解耦思路,使得主 AI 推理进程只需将报错文件扔进pending/暂存区,由该 worker 专职执行发送,从而避免网络波动阻塞算力轮转。
RTSP 拾音器 ──FFmpeg──> 8秒WAV切片 ──librosa──> Mel频谱图 ──AutoEncoder──> 重建误差 ──汇总──> 1分钟上报
│ [1,1,64,504] 推理 │
│ │
data/audio/{设备}/current/ 每分钟取平均 abnormal_score
abnormal_score4 层卷积自编码器(ConvAutoencoder),参数量 ~49K,权重文件 ~192KB:
| 组件 | 结构 | 维度变化 |
|---|---|---|
| 编码器 | 4×Conv2d(stride=2)+BN+ReLU | [B,1,64,504] → [B,64,4,32] |
| 解码器 | 4×ConvTranspose2d(stride=2)+BN+ReLU | [B,64,4,32] → [B,1,64,504] |
异常检测原理:正常音频重建误差低,异常音频重建误差高于阈值(3σ 法则)。
系统支持在 BM1684X NPU 上执行推理(通过 sophon.sail.Engine),模型转换流程:
ae_model.pth ──torch.onnx.export──> ae_model.onnx ──TPU-MLIR──> ae_model.bmodel
# 导出 ONNX + 生成 BModel(需 TPU-MLIR 环境)
python tool/convert_to_bmodel.py --all --with-bmodel --quantize fp16
当 models/{设备}/ae_model.bmodel 存在且 sophon.sail 可用时,推理引擎自动切换到 NPU。
系统支持三种训练模式,按项目阶段选择:
| 模式 | 命令 | 适用场景 |
|---|---|---|
| 本地离线训练(推荐) | python auto_training/standalone_train.py --data-dir /数据目录 |
初期部署,水厂少,快速落地 |
| NPU 端自训练 | python run_with_auto_training.py(定时 02:00 触发) |
边缘盒子 CPU 资源充足时 |
| 云端训练+下发 | 需额外开发数据上传和模型下发模块 | 规模化阶段(>3 个水厂) |
训练参数(config/auto_training.yaml):
| 参数 | 默认值 | 说明 |
|---|---|---|
| epochs | 30 | 训练轮数(配合早停,实际通常更少) |
| learning_rate | 0.0001 | 学习率 |
| batch_size | 32 | 批大小 |
| early_stop_patience | 5 | 连续 N 轮无改善则早停 |
| training_device | auto | auto/cpu/cuda,auto 自动检测 GPU 显存 |
| min_samples | 50 | 最少样本数,不足则跳过训练 |
安全机制:
详细方案对比见
docs/NPU自编码器模型训练方案.docx
每个水厂独立部署一个实例,各自拥有独立的 pickup_config.db。
服务器A: deploy_pickup/ + pickup_config.db(锡山) + models/(锡山设备模型)
服务器B: deploy_pickup/ + pickup_config.db(龙亭) + models/(龙亭设备模型)
deploy_pickup/
├── run_with_auto_training.py # 边缘端主入口(冷启动 + 监控主控)
├── run_pickup_monitor.py # 实时监控核心逻辑(采集 + 投票 + 入队)
├── run_upload_worker.py # 边缘端数据异步上传进程
├── run_model_sync.py # 边缘端模型热更新同步进程
├── start.sh # 统一启停管控脚本(自动拉起辅助进程)
├── requirements.txt
│
├── cloud_server/ # 云端管理服务
│ ├── main.py # 云端 FastAPI 启动入口 (:8800)
│ └── routers/ # 云端接口 (数据接收 / 模型分发)
│
├── config/ # 配置
│ ├── loader.py # 统一配置加载入口(双模解析核心)
│ ├── pickup_config.db # 运行时 SQLite 配置中心
│ ├── config_manager.py # 配置读写与热更新逻辑
│ ├── config_api.py # 边缘端管理 API(:18080)
│ ├── db_models.py # 数据库表与 ORM 定义
│ └── auto_training.yaml # 训练参数(备用)
│
├── predictor/ # 推理与模型
│ ├── model_def.py # 核心自编码器(ConvAutoencoder)
│ ├── multi_model_predictor.py # 多设备模型管理与无缝热加载
│ ├── config.py / datasets.py / utils.py
│
├── core/ # 业务辅助模块
│ ├── alert_aggregator.py # 跨设备告警聚合抑制
│ ├── ml_classifier.py # 异常发生后的二次分类(接管原方案)
│ ├── audio_uploader.py # 主进程录音转码入队封装
│ ├── model_sync.py # 模型版本比对与原子替换封装
│ ├── pump_state_monitor.py # PLC 泵状态联动读取
│ ├── energy_baseline.py # 设备底噪级停机判断
│ └── human_detection_reader.py # 人形入侵检测抑制
│
├── auto_training/ # 二次学习与自动训练
│ ├── standalone_train.py # 离线/单次手动全量训练入口
│ ├── incremental_trainer.py # 增量训练/冷启动训练核心
│ ├── verify_normal.py # 误报打标校验工具
│ └── data_cleanup.py # 磁盘轮转与过期数据清理
│
├── tool/ # 运维与实施工具
│ ├── migrate_yaml_to_db.py # 一键从 YAML 重置 SQLite 配置
│ └── convert_to_bmodel.py # NPU 加速用 BModel 转换工具
│
├── models/{设备编码}/ # 本地自训练产出的模型存放区
└── data/ # 运行时音频 / 队列暂存 / 异常快照库