migrate_yaml_to_db.py 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. import sys
  4. import yaml
  5. import logging
  6. from pathlib import Path
  7. # 将项目根目录加入 sys.path
  8. PROJECT_ROOT = Path(__file__).resolve().parent.parent
  9. sys.path.insert(0, str(PROJECT_ROOT))
  10. from config.config_manager import ConfigManager
  11. from config.db_models import get_db_path, get_connection
  12. logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
  13. logger = logging.getLogger(__name__)
  14. def _clear_all_tables(db_path):
  15. # 清空所有业务表数据,保留表结构
  16. conn = get_connection(db_path)
  17. try:
  18. # 按外键依赖顺序删除(先子表后父表)
  19. tables = ['pump_status_plc', 'flow_plc', 'rtsp_stream', 'system_config', 'plant']
  20. for table in tables:
  21. conn.execute(f"DELETE FROM {table}")
  22. conn.commit()
  23. logger.info("已清空所有配置数据")
  24. finally:
  25. conn.close()
  26. def migrate_yaml_to_db(yaml_path: str, db_path: str = None, force: bool = False):
  27. """
  28. 将 rtsp_config.yaml 中的全部数据迁移到 SQLite 数据库
  29. Args:
  30. yaml_path: YAML 配置文件路径
  31. db_path: 数据库路径(默认 config/pickup_config.db)
  32. force: 是否清空现有数据后重建(幂等操作)
  33. """
  34. yaml_file = Path(yaml_path)
  35. if not yaml_file.exists():
  36. logger.error(f"YAML 文件不存在: {yaml_file}")
  37. sys.exit(1)
  38. with open(yaml_file, 'r', encoding='utf-8') as f:
  39. config = yaml.safe_load(f)
  40. logger.info(f"已读取 YAML: {yaml_file}")
  41. # 解析实际 DB 路径(用于 _clear_all_tables)
  42. actual_db_path = Path(db_path) if db_path else get_db_path()
  43. # --force 模式:先清空所有数据再导入
  44. if force:
  45. if actual_db_path.exists():
  46. _clear_all_tables(actual_db_path)
  47. logger.info("--force 模式:清空后重建")
  48. # 初始化 ConfigManager(自动创建表结构)
  49. mgr = ConfigManager(db_path)
  50. # ========================================
  51. # 1. 迁移水厂和关联数据
  52. # ========================================
  53. plants = config.get('plants', [])
  54. logger.info(f"开始迁移 {len(plants)} 个水厂配置...")
  55. for plant_data in plants:
  56. plant_name = plant_data.get('name', '')
  57. if not plant_name:
  58. logger.warning("跳过无名称的水厂配置")
  59. continue
  60. plant_id = mgr.create_plant(
  61. name=plant_name,
  62. project_id=plant_data.get('project_id', 0),
  63. push_url=plant_data.get('push_url', ''),
  64. enabled=plant_data.get('enabled', False)
  65. )
  66. logger.info(f" 水厂: {plant_name} (id={plant_id})")
  67. # 流量 PLC 映射
  68. flow_plc = plant_data.get('flow_plc', {})
  69. if flow_plc:
  70. for pump_name, plc_address in flow_plc.items():
  71. mgr.set_flow_plc(plant_id, pump_name, plc_address)
  72. logger.info(f" 流量PLC: {len(flow_plc)} 条")
  73. # 泵状态 PLC 点位
  74. pump_status_plc = plant_data.get('pump_status_plc', {})
  75. if pump_status_plc:
  76. total_points = 0
  77. for pump_name, points in pump_status_plc.items():
  78. for point_data in points:
  79. mgr.add_pump_status_plc(
  80. plant_id,
  81. pump_name,
  82. point_data.get('point', ''),
  83. point_data.get('name', '')
  84. )
  85. total_points += 1
  86. logger.info(f" 泵状态PLC: {total_points} 条")
  87. # RTSP 流
  88. streams = plant_data.get('rtsp_streams', [])
  89. for stream in streams:
  90. url = stream.get('url', '')
  91. if not url:
  92. logger.warning(f" 跳过空URL的流: {stream.get('name', '未知')}")
  93. continue
  94. mgr.create_stream(
  95. plant_id=plant_id,
  96. name=stream.get('name', ''),
  97. url=url,
  98. channel=stream.get('channel', 0),
  99. device_code=stream.get('device_code', ''),
  100. pump_name=stream.get('pump_name', ''),
  101. model_subdir=stream.get('model_subdir', ''),
  102. enabled=True
  103. )
  104. logger.info(f" RTSP流: {len(streams)} 条")
  105. # ========================================
  106. # 2. 迁移系统级配置
  107. # ========================================
  108. system_sections = ['audio', 'prediction', 'push_notification', 'scada_api', 'human_detection']
  109. for section in system_sections:
  110. section_data = config.get(section, {})
  111. if section_data:
  112. mgr.update_section_config(section, section_data)
  113. flat = mgr._flatten_dict(section_data)
  114. logger.info(f"系统配置 [{section}]: {len(flat)} 项")
  115. # ========================================
  116. # 3. 验证迁移结果
  117. # ========================================
  118. logger.info("\n" + "=" * 60)
  119. logger.info("迁移完成,开始验证...")
  120. restored = mgr.get_full_config()
  121. orig_plants = [p for p in config.get('plants', [])]
  122. db_plants = restored.get('plants', [])
  123. logger.info(f"水厂数量: YAML={len(orig_plants)}, DB={len(db_plants)}")
  124. for section in system_sections:
  125. orig_flat = mgr._flatten_dict(config.get(section, {}))
  126. db_flat = mgr._flatten_dict(restored.get(section, {}))
  127. match = len(orig_flat) == len(db_flat)
  128. status = "OK" if match else "FAIL"
  129. logger.info(f"配置 [{section}]: YAML={len(orig_flat)}项, DB={len(db_flat)}项 {status}")
  130. for i, (orig_p, db_p) in enumerate(zip(orig_plants, db_plants)):
  131. orig_streams = len(orig_p.get('rtsp_streams', []))
  132. db_streams = len(db_p.get('rtsp_streams', []))
  133. name = orig_p.get('name', f'水厂{i}')
  134. status = "OK" if orig_streams == db_streams else "FAIL"
  135. logger.info(f" {name}: RTSP流 YAML={orig_streams}, DB={db_streams} {status}")
  136. logger.info("=" * 60)
  137. logger.info("迁移验证完成")
  138. mgr.close()
  139. if __name__ == '__main__':
  140. import argparse
  141. parser = argparse.ArgumentParser(description='YAML 配置迁移至 SQLite')
  142. parser.add_argument('--yaml', type=str,
  143. default=str(PROJECT_ROOT / 'config' / 'rtsp_config.yaml'),
  144. help='YAML 配置文件路径')
  145. parser.add_argument('--db', type=str, default=None,
  146. help='SQLite 数据库路径(默认: config/pickup_config.db)')
  147. parser.add_argument('--force', action='store_true',
  148. help='清空现有数据后重建(幂等操作),适合重复导入同一份 YAML')
  149. args = parser.parse_args()
  150. migrate_yaml_to_db(args.yaml, args.db, force=args.force)