import sqlite3 import json import logging from pathlib import Path logger = logging.getLogger(__name__) # 数据库文件默认名(与 rtsp_config.yaml 同级) DEFAULT_DB_NAME = "pickup_config.db" def get_db_path(config_dir=None): # 默认放在 config/ 目录下 if config_dir is None: config_dir = Path(__file__).parent return Path(config_dir) / DEFAULT_DB_NAME def get_connection(db_path=None): # 获取 SQLite 连接,启用 WAL 模式以支持并发读写 if db_path is None: db_path = get_db_path() conn = sqlite3.connect(str(db_path), timeout=10) conn.row_factory = sqlite3.Row # WAL 模式:允许读写并发,避免锁阻塞 conn.execute("PRAGMA journal_mode=WAL") # 启用外键约束 conn.execute("PRAGMA foreign_keys=ON") return conn def init_db(db_path=None): # 初始化数据库表结构(幂等操作,已有表则跳过) conn = get_connection(db_path) try: conn.executescript(SCHEMA_SQL) conn.commit() logger.info(f"数据库初始化完成: {db_path or get_db_path()}") finally: conn.close() # 完整的表结构定义 SCHEMA_SQL = """ -- 水厂表:每个水厂一条记录 CREATE TABLE IF NOT EXISTS plant ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL UNIQUE, -- 水厂名称(如"锡山中荷") enabled INTEGER NOT NULL DEFAULT 0, -- 是否启用(0=禁用, 1=启用) project_id INTEGER NOT NULL, -- 项目ID(用于数据上报) push_url TEXT NOT NULL DEFAULT '', -- 异常推送接口URL created_at TEXT DEFAULT (datetime('now','localtime')), updated_at TEXT DEFAULT (datetime('now','localtime')) ); -- RTSP 流表:每个拾音器/摄像头一条记录 CREATE TABLE IF NOT EXISTS rtsp_stream ( id INTEGER PRIMARY KEY AUTOINCREMENT, plant_id INTEGER NOT NULL, -- 所属水厂 name TEXT NOT NULL DEFAULT '', -- 摄像头/拾音器名称(用于告警显示) url TEXT NOT NULL, -- RTSP 流地址 channel INTEGER NOT NULL, -- 通道号 device_code TEXT NOT NULL, -- 设备编码(唯一标识,用于文件命名) pump_name TEXT NOT NULL DEFAULT '', -- 关联泵名称(用于流量/状态查询) model_subdir TEXT DEFAULT '', -- 模型子目录(默认使用 device_code) enabled INTEGER NOT NULL DEFAULT 1, -- 是否启用该流 created_at TEXT DEFAULT (datetime('now','localtime')), updated_at TEXT DEFAULT (datetime('now','localtime')), FOREIGN KEY (plant_id) REFERENCES plant(id) ON DELETE CASCADE, UNIQUE(device_code) -- device_code 全局唯一 ); -- 流量 PLC 映射表:泵名 -> PLC 地址 CREATE TABLE IF NOT EXISTS flow_plc ( id INTEGER PRIMARY KEY AUTOINCREMENT, plant_id INTEGER NOT NULL, pump_name TEXT NOT NULL, -- 泵名称/标识(如"A", "高压泵1流量") plc_address TEXT NOT NULL, -- PLC 数据点位地址 FOREIGN KEY (plant_id) REFERENCES plant(id) ON DELETE CASCADE, UNIQUE(plant_id, pump_name) ); -- 泵状态 PLC 点位表:用于检测泵启停 CREATE TABLE IF NOT EXISTS pump_status_plc ( id INTEGER PRIMARY KEY AUTOINCREMENT, plant_id INTEGER NOT NULL, pump_name TEXT NOT NULL, -- 拾音器关联的泵名称(对应 rtsp_stream.pump_name) point TEXT NOT NULL, -- PLC 运行状态点位地址 point_name TEXT NOT NULL DEFAULT '', -- 点位显示名称(如"1#RO高压泵") FOREIGN KEY (plant_id) REFERENCES plant(id) ON DELETE CASCADE ); -- 系统级配置表:KV 存储,存放 audio/prediction/push_notification/scada_api/human_detection -- 通过 section + key 组合定位配置项 CREATE TABLE IF NOT EXISTS system_config ( id INTEGER PRIMARY KEY AUTOINCREMENT, section TEXT NOT NULL, -- 配置区域(如 audio, prediction, push_notification) key TEXT NOT NULL, -- 配置键(支持点号嵌套,如 voting.window_size) value TEXT NOT NULL DEFAULT '', -- 配置值(JSON 序列化存储) value_type TEXT NOT NULL DEFAULT 'str', -- 值类型标识(str/int/float/bool/json) description TEXT DEFAULT '', -- 配置项说明 updated_at TEXT DEFAULT (datetime('now','localtime')), UNIQUE(section, key) ); -- 更新时间触发器:plant 表 CREATE TRIGGER IF NOT EXISTS plant_updated_at AFTER UPDATE ON plant BEGIN UPDATE plant SET updated_at = datetime('now','localtime') WHERE id = NEW.id; END; -- 更新时间触发器:rtsp_stream 表 CREATE TRIGGER IF NOT EXISTS stream_updated_at AFTER UPDATE ON rtsp_stream BEGIN UPDATE rtsp_stream SET updated_at = datetime('now','localtime') WHERE id = NEW.id; END; -- 更新时间触发器:system_config 表 CREATE TRIGGER IF NOT EXISTS config_updated_at AFTER UPDATE ON system_config BEGIN UPDATE system_config SET updated_at = datetime('now','localtime') WHERE id = NEW.id; END; """