| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- import sys
- import yaml
- import logging
- from pathlib import Path
- # 将项目根目录加入 sys.path
- PROJECT_ROOT = Path(__file__).resolve().parent.parent
- sys.path.insert(0, str(PROJECT_ROOT))
- from config.config_manager import ConfigManager
- from config.db_models import get_db_path, get_connection
- logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
- logger = logging.getLogger(__name__)
- def _clear_all_tables(db_path):
- # 清空所有业务表数据,保留表结构
- conn = get_connection(db_path)
- try:
- # 按外键依赖顺序删除(先子表后父表)
- tables = ['pump_status_plc', 'flow_plc', 'rtsp_stream', 'system_config', 'plant']
- for table in tables:
- conn.execute(f"DELETE FROM {table}")
- conn.commit()
- logger.info("已清空所有配置数据")
- finally:
- conn.close()
- def migrate_yaml_to_db(yaml_path: str, db_path: str = None, force: bool = False):
- """
- 将 rtsp_config.yaml 中的全部数据迁移到 SQLite 数据库
- Args:
- yaml_path: YAML 配置文件路径
- db_path: 数据库路径(默认 config/pickup_config.db)
- force: 是否清空现有数据后重建(幂等操作)
- """
- yaml_file = Path(yaml_path)
- if not yaml_file.exists():
- logger.error(f"YAML 文件不存在: {yaml_file}")
- sys.exit(1)
- with open(yaml_file, 'r', encoding='utf-8') as f:
- config = yaml.safe_load(f)
- logger.info(f"已读取 YAML: {yaml_file}")
- # 解析实际 DB 路径(用于 _clear_all_tables)
- actual_db_path = Path(db_path) if db_path else get_db_path()
- # --force 模式:先清空所有数据再导入
- if force:
- if actual_db_path.exists():
- _clear_all_tables(actual_db_path)
- logger.info("--force 模式:清空后重建")
- # 初始化 ConfigManager(自动创建表结构)
- mgr = ConfigManager(db_path)
- # ========================================
- # 1. 迁移水厂和关联数据
- # ========================================
- plants = config.get('plants', [])
- logger.info(f"开始迁移 {len(plants)} 个水厂配置...")
- for plant_data in plants:
- plant_name = plant_data.get('name', '')
- if not plant_name:
- logger.warning("跳过无名称的水厂配置")
- continue
- plant_id = mgr.create_plant(
- name=plant_name,
- project_id=plant_data.get('project_id', 0),
- push_url=plant_data.get('push_url', ''),
- enabled=plant_data.get('enabled', False)
- )
- logger.info(f" 水厂: {plant_name} (id={plant_id})")
- # 流量 PLC 映射
- flow_plc = plant_data.get('flow_plc', {})
- if flow_plc:
- for pump_name, plc_address in flow_plc.items():
- mgr.set_flow_plc(plant_id, pump_name, plc_address)
- logger.info(f" 流量PLC: {len(flow_plc)} 条")
- # 泵状态 PLC 点位
- pump_status_plc = plant_data.get('pump_status_plc', {})
- if pump_status_plc:
- total_points = 0
- for pump_name, points in pump_status_plc.items():
- for point_data in points:
- mgr.add_pump_status_plc(
- plant_id,
- pump_name,
- point_data.get('point', ''),
- point_data.get('name', '')
- )
- total_points += 1
- logger.info(f" 泵状态PLC: {total_points} 条")
- # RTSP 流
- streams = plant_data.get('rtsp_streams', [])
- for stream in streams:
- url = stream.get('url', '')
- if not url:
- logger.warning(f" 跳过空URL的流: {stream.get('name', '未知')}")
- continue
- mgr.create_stream(
- plant_id=plant_id,
- name=stream.get('name', ''),
- url=url,
- channel=stream.get('channel', 0),
- device_code=stream.get('device_code', ''),
- pump_name=stream.get('pump_name', ''),
- model_subdir=stream.get('model_subdir', ''),
- enabled=True
- )
- logger.info(f" RTSP流: {len(streams)} 条")
- # ========================================
- # 2. 迁移系统级配置
- # ========================================
- system_sections = ['audio', 'prediction', 'push_notification', 'scada_api', 'human_detection']
- for section in system_sections:
- section_data = config.get(section, {})
- if section_data:
- mgr.update_section_config(section, section_data)
- flat = mgr._flatten_dict(section_data)
- logger.info(f"系统配置 [{section}]: {len(flat)} 项")
- # ========================================
- # 3. 验证迁移结果
- # ========================================
- logger.info("\n" + "=" * 60)
- logger.info("迁移完成,开始验证...")
- restored = mgr.get_full_config()
- orig_plants = [p for p in config.get('plants', [])]
- db_plants = restored.get('plants', [])
- logger.info(f"水厂数量: YAML={len(orig_plants)}, DB={len(db_plants)}")
- for section in system_sections:
- orig_flat = mgr._flatten_dict(config.get(section, {}))
- db_flat = mgr._flatten_dict(restored.get(section, {}))
- match = len(orig_flat) == len(db_flat)
- status = "OK" if match else "FAIL"
- logger.info(f"配置 [{section}]: YAML={len(orig_flat)}项, DB={len(db_flat)}项 {status}")
- for i, (orig_p, db_p) in enumerate(zip(orig_plants, db_plants)):
- orig_streams = len(orig_p.get('rtsp_streams', []))
- db_streams = len(db_p.get('rtsp_streams', []))
- name = orig_p.get('name', f'水厂{i}')
- status = "OK" if orig_streams == db_streams else "FAIL"
- logger.info(f" {name}: RTSP流 YAML={orig_streams}, DB={db_streams} {status}")
- logger.info("=" * 60)
- logger.info("迁移验证完成")
- mgr.close()
- if __name__ == '__main__':
- import argparse
- parser = argparse.ArgumentParser(description='YAML 配置迁移至 SQLite')
- parser.add_argument('--yaml', type=str,
- default=str(PROJECT_ROOT / 'config' / 'rtsp_config.yaml'),
- help='YAML 配置文件路径')
- parser.add_argument('--db', type=str, default=None,
- help='SQLite 数据库路径(默认: config/pickup_config.db)')
- parser.add_argument('--force', action='store_true',
- help='清空现有数据后重建(幂等操作),适合重复导入同一份 YAML')
- args = parser.parse_args()
- migrate_yaml_to_db(args.yaml, args.db, force=args.force)
|