migrate_yaml_to_db.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. import sys
  4. import yaml
  5. import logging
  6. from pathlib import Path
# Add the project root (the parent of this script's directory) to sys.path.
# This must run before the `config.*` imports that follow, so that they
# resolve when this file is executed directly as a script.
PROJECT_ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(PROJECT_ROOT))
  10. from config.config_manager import ConfigManager
  11. from config.db_models import get_db_path, get_connection
# Emit INFO and above, formatted as "<timestamp> [<LEVEL>] <message>".
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
# Module-level logger used by the functions in this script.
logger = logging.getLogger(__name__)
  14. def _clear_all_tables(db_path):
  15. # 清空所有业务表数据,保留表结构
  16. conn = get_connection(db_path)
  17. try:
  18. # 按外键依赖顺序删除(先子表后父表)
  19. tables = ['pump_status_plc', 'flow_plc', 'rtsp_stream', 'system_config', 'plant']
  20. for table in tables:
  21. conn.execute(f"DELETE FROM {table}")
  22. conn.commit()
  23. logger.info("已清空所有配置数据")
  24. finally:
  25. conn.close()
  26. def migrate_yaml_to_db(yaml_path: str, db_path: str = None):
  27. """
  28. 将 rtsp_config.yaml 中的全部数据迁移到 SQLite 数据库
  29. Args:
  30. yaml_path: YAML 配置文件路径
  31. db_path: 数据库路径(默认 config/pickup_config.db)
  32. """
  33. yaml_file = Path(yaml_path)
  34. if not yaml_file.exists():
  35. logger.error(f"YAML 文件不存在: {yaml_file}")
  36. sys.exit(1)
  37. with open(yaml_file, 'r', encoding='utf-8') as f:
  38. config = yaml.safe_load(f)
  39. logger.info(f"已读取 YAML: {yaml_file}")
  40. # 解析实际 DB 路径(用于 _clear_all_tables)
  41. actual_db_path = Path(db_path) if db_path else get_db_path()
  42. # 默认每次都清空旧数据再导入,保证是最干净的状态
  43. if actual_db_path.exists():
  44. _clear_all_tables(actual_db_path)
  45. logger.info("已清空旧数据库数据,准备重新导入")
  46. # 初始化 ConfigManager(自动创建表结构)
  47. mgr = ConfigManager(db_path)
  48. # ========================================
  49. # 1. 迁移水厂和关联数据
  50. # ========================================
  51. plants = config.get('plants', [])
  52. logger.info(f"开始迁移 {len(plants)} 个水厂配置...")
  53. for plant_data in plants:
  54. plant_name = plant_data.get('name', '')
  55. if not plant_name:
  56. logger.warning("跳过无名称的水厂配置")
  57. continue
  58. plant_id = mgr.create_plant(
  59. name=plant_name,
  60. project_id=plant_data.get('project_id', 0),
  61. push_url=plant_data.get('push_url', ''),
  62. enabled=plant_data.get('enabled', False)
  63. )
  64. logger.info(f" 水厂: {plant_name} (id={plant_id})")
  65. # 流量 PLC 映射
  66. flow_plc = plant_data.get('flow_plc', {})
  67. if flow_plc:
  68. for pump_name, plc_address in flow_plc.items():
  69. mgr.set_flow_plc(plant_id, pump_name, plc_address)
  70. logger.info(f" 流量PLC: {len(flow_plc)} 条")
  71. # 泵状态 PLC 点位
  72. pump_status_plc = plant_data.get('pump_status_plc', {})
  73. if pump_status_plc:
  74. total_points = 0
  75. for pump_name, points in pump_status_plc.items():
  76. for point_data in points:
  77. mgr.add_pump_status_plc(
  78. plant_id,
  79. pump_name,
  80. point_data.get('point', ''),
  81. point_data.get('name', '')
  82. )
  83. total_points += 1
  84. logger.info(f" 泵状态PLC: {total_points} 条")
  85. # RTSP 流
  86. streams = plant_data.get('rtsp_streams', [])
  87. for stream in streams:
  88. url = stream.get('url', '')
  89. if not url:
  90. logger.warning(f" 跳过空URL的流: {stream.get('name', '未知')}")
  91. continue
  92. mgr.create_stream(
  93. plant_id=plant_id,
  94. name=stream.get('name', ''),
  95. url=url,
  96. channel=stream.get('channel', 0),
  97. device_code=stream.get('device_code', ''),
  98. pump_name=stream.get('pump_name', ''),
  99. model_subdir=stream.get('model_subdir', ''),
  100. enabled=True
  101. )
  102. logger.info(f" RTSP流: {len(streams)} 条")
  103. # ========================================
  104. # 2. 迁移系统级配置
  105. # ========================================
  106. system_sections = ['audio', 'prediction', 'push_notification', 'scada_api', 'human_detection', 'auto_training']
  107. for section in system_sections:
  108. section_data = config.get(section, {})
  109. if section_data:
  110. mgr.update_section_config(section, section_data)
  111. flat = mgr._flatten_dict(section_data)
  112. logger.info(f"系统配置 [{section}]: {len(flat)} 项")
  113. # ========================================
  114. # 3. 验证迁移结果
  115. # ========================================
  116. logger.info("\n" + "=" * 60)
  117. logger.info("迁移完成,开始验证...")
  118. restored = mgr.get_full_config()
  119. orig_plants = [p for p in config.get('plants', [])]
  120. db_plants = restored.get('plants', [])
  121. logger.info(f"水厂数量: YAML={len(orig_plants)}, DB={len(db_plants)}")
  122. for section in system_sections:
  123. orig_flat = mgr._flatten_dict(config.get(section, {}))
  124. db_flat = mgr._flatten_dict(restored.get(section, {}))
  125. match = len(orig_flat) == len(db_flat)
  126. status = "OK" if match else "FAIL"
  127. logger.info(f"配置 [{section}]: YAML={len(orig_flat)}项, DB={len(db_flat)}项 {status}")
  128. for i, (orig_p, db_p) in enumerate(zip(orig_plants, db_plants)):
  129. orig_streams = len(orig_p.get('rtsp_streams', []))
  130. db_streams = len(db_p.get('rtsp_streams', []))
  131. name = orig_p.get('name', f'水厂{i}')
  132. status = "OK" if orig_streams == db_streams else "FAIL"
  133. logger.info(f" {name}: RTSP流 YAML={orig_streams}, DB={db_streams} {status}")
  134. logger.info("=" * 60)
  135. logger.info("迁移验证完成")
  136. mgr.close()
  137. if __name__ == '__main__':
  138. import argparse
  139. parser = argparse.ArgumentParser(description='YAML 配置迁移至 SQLite (默认每次都会先清空旧数据)')
  140. parser.add_argument('--yaml', type=str,
  141. default=str(PROJECT_ROOT / 'config' / 'rtsp_config.yaml'),
  142. help='YAML 配置文件路径')
  143. parser.add_argument('--db', type=str, default=None,
  144. help='SQLite 数据库路径(默认: config/pickup_config.db)')
  145. args = parser.parse_args()
  146. migrate_yaml_to_db(args.yaml, args.db)