#!/usr/bin/env python # -*- coding: utf-8 -*- import sys import yaml import logging from pathlib import Path # 将项目根目录加入 sys.path PROJECT_ROOT = Path(__file__).resolve().parent.parent sys.path.insert(0, str(PROJECT_ROOT)) from config.config_manager import ConfigManager from config.db_models import get_db_path, get_connection logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s') logger = logging.getLogger(__name__) def _clear_all_tables(db_path): # 清空所有业务表数据,保留表结构 conn = get_connection(db_path) try: # 按外键依赖顺序删除(先子表后父表) tables = ['pump_status_plc', 'flow_plc', 'rtsp_stream', 'system_config', 'plant'] for table in tables: conn.execute(f"DELETE FROM {table}") conn.commit() logger.info("已清空所有配置数据") finally: conn.close() def migrate_yaml_to_db(yaml_path: str, db_path: str = None, force: bool = False): """ 将 rtsp_config.yaml 中的全部数据迁移到 SQLite 数据库 Args: yaml_path: YAML 配置文件路径 db_path: 数据库路径(默认 config/pickup_config.db) force: 是否清空现有数据后重建(幂等操作) """ yaml_file = Path(yaml_path) if not yaml_file.exists(): logger.error(f"YAML 文件不存在: {yaml_file}") sys.exit(1) with open(yaml_file, 'r', encoding='utf-8') as f: config = yaml.safe_load(f) logger.info(f"已读取 YAML: {yaml_file}") # 解析实际 DB 路径(用于 _clear_all_tables) actual_db_path = Path(db_path) if db_path else get_db_path() # --force 模式:先清空所有数据再导入 if force: if actual_db_path.exists(): _clear_all_tables(actual_db_path) logger.info("--force 模式:清空后重建") # 初始化 ConfigManager(自动创建表结构) mgr = ConfigManager(db_path) # ======================================== # 1. 迁移水厂和关联数据 # ======================================== plants = config.get('plants', []) logger.info(f"开始迁移 {len(plants)} 个水厂配置...") for plant_data in plants: plant_name = plant_data.get('name', '') if not plant_name: logger.warning("跳过无名称的水厂配置") continue plant_id = mgr.create_plant( name=plant_name, project_id=plant_data.get('project_id', 0), push_url=plant_data.get('push_url', ''), enabled=plant_data.get('enabled', False) ) logger.info(f" 水厂: {plant_name} (id={plant_id})") # 流量 PLC 映射 flow_plc = plant_data.get('flow_plc', {}) if flow_plc: for pump_name, plc_address in flow_plc.items(): mgr.set_flow_plc(plant_id, pump_name, plc_address) logger.info(f" 流量PLC: {len(flow_plc)} 条") # 泵状态 PLC 点位 pump_status_plc = plant_data.get('pump_status_plc', {}) if pump_status_plc: total_points = 0 for pump_name, points in pump_status_plc.items(): for point_data in points: mgr.add_pump_status_plc( plant_id, pump_name, point_data.get('point', ''), point_data.get('name', '') ) total_points += 1 logger.info(f" 泵状态PLC: {total_points} 条") # RTSP 流 streams = plant_data.get('rtsp_streams', []) for stream in streams: url = stream.get('url', '') if not url: logger.warning(f" 跳过空URL的流: {stream.get('name', '未知')}") continue mgr.create_stream( plant_id=plant_id, name=stream.get('name', ''), url=url, channel=stream.get('channel', 0), device_code=stream.get('device_code', ''), pump_name=stream.get('pump_name', ''), model_subdir=stream.get('model_subdir', ''), enabled=True ) logger.info(f" RTSP流: {len(streams)} 条") # ======================================== # 2. 迁移系统级配置 # ======================================== system_sections = ['audio', 'prediction', 'push_notification', 'scada_api', 'human_detection'] for section in system_sections: section_data = config.get(section, {}) if section_data: mgr.update_section_config(section, section_data) flat = mgr._flatten_dict(section_data) logger.info(f"系统配置 [{section}]: {len(flat)} 项") # ======================================== # 3. 验证迁移结果 # ======================================== logger.info("\n" + "=" * 60) logger.info("迁移完成,开始验证...") restored = mgr.get_full_config() orig_plants = [p for p in config.get('plants', [])] db_plants = restored.get('plants', []) logger.info(f"水厂数量: YAML={len(orig_plants)}, DB={len(db_plants)}") for section in system_sections: orig_flat = mgr._flatten_dict(config.get(section, {})) db_flat = mgr._flatten_dict(restored.get(section, {})) match = len(orig_flat) == len(db_flat) status = "OK" if match else "FAIL" logger.info(f"配置 [{section}]: YAML={len(orig_flat)}项, DB={len(db_flat)}项 {status}") for i, (orig_p, db_p) in enumerate(zip(orig_plants, db_plants)): orig_streams = len(orig_p.get('rtsp_streams', [])) db_streams = len(db_p.get('rtsp_streams', [])) name = orig_p.get('name', f'水厂{i}') status = "OK" if orig_streams == db_streams else "FAIL" logger.info(f" {name}: RTSP流 YAML={orig_streams}, DB={db_streams} {status}") logger.info("=" * 60) logger.info("迁移验证完成") mgr.close() if __name__ == '__main__': import argparse parser = argparse.ArgumentParser(description='YAML 配置迁移至 SQLite') parser.add_argument('--yaml', type=str, default=str(PROJECT_ROOT / 'config' / 'rtsp_config.yaml'), help='YAML 配置文件路径') parser.add_argument('--db', type=str, default=None, help='SQLite 数据库路径(默认: config/pickup_config.db)') parser.add_argument('--force', action='store_true', help='清空现有数据后重建(幂等操作),适合重复导入同一份 YAML') args = parser.parse_args() migrate_yaml_to_db(args.yaml, args.db, force=args.force)