소스 검색

更新启动逻辑、优化算法

wmy 4 일 전
부모
커밋
7bc2c07c88

+ 28 - 12
auto_training/data_cleanup.py

@@ -50,9 +50,10 @@ class DataCleaner:
             return yaml.safe_load(f)
     
     def cleanup_old_normal_audio(self):
-        """清理过期的正常音频"""
+        """清理过期的正常音频(含 pump_transition 和 verified_normal)"""
         keep_days = self.config['auto_training']['data']['keep_normal_days']
         cutoff_date = (datetime.now() - timedelta(days=keep_days)).strftime('%Y%m%d')
+        cutoff_ts = (datetime.now() - timedelta(days=keep_days)).timestamp()
         
         logger.info(f"清理 {cutoff_date} 之前的正常音频(保留{keep_days}天)")
         
@@ -62,26 +63,41 @@ class DataCleaner:
         if not self.audio_root.exists():
             return
         
-        # deploy_pickup使用设备目录结构: audio/{device_code}/{date}/*.wav
         for device_dir in self.audio_root.iterdir():
             if not device_dir.is_dir():
                 continue
             
-            for date_dir in device_dir.iterdir():
-                # current: 正在写入的目录; verified_normal: 核查确认的正常音频(增训用)
-                if not date_dir.is_dir() or date_dir.name in ("current", "verified_normal"):
+            for sub_dir in device_dir.iterdir():
+                if not sub_dir.is_dir():
                     continue
                 
-                # 检查日期
-                if date_dir.name < cutoff_date:
-                    if date_dir.exists():
-                        # rglob 递归统计所有子目录(normal/ + pump_transition/)中的音频
-                        for f in date_dir.rglob("*.wav"):
+                # current: 正在写入的目录,由主进程负责清理
+                if sub_dir.name == "current":
+                    continue
+                
+                # pump_transition / verified_normal:平铺目录,按 mtime 清理
+                # 这两个目录没有日期子结构,只根据文件修改时间判断过期
+                if sub_dir.name in ("pump_transition", "verified_normal"):
+                    for f in sub_dir.glob("*.wav"):
+                        try:
+                            st = f.stat()
+                            if st.st_mtime < cutoff_ts:
+                                total_size += st.st_size
+                                f.unlink()
+                                total_deleted += 1
+                        except Exception:
+                            # Best-effort cleanup: a file may vanish between glob
+                            # and stat/unlink; skip it rather than abort the sweep.
+                            pass
+                    continue
+                
+                # 日期目录:按目录名比较
+                if sub_dir.name < cutoff_date:
+                    if sub_dir.exists():
+                        for f in sub_dir.rglob("*.wav"):
                             total_size += f.stat().st_size
                             total_deleted += 1
                         
-                        shutil.rmtree(date_dir)
-                        logger.info(f"已删除: {device_dir.name}/{date_dir.name}")
+                        shutil.rmtree(sub_dir)
+                        logger.info(f"已删除: {device_dir.name}/{sub_dir.name}")
         
         logger.info(f"正常音频清理完成: 删除 {total_deleted} 个文件, 释放 {total_size / 1e6:.2f} MB")
     

+ 1 - 1
config/auto_training.yaml

@@ -25,7 +25,7 @@ auto_training:
     learning_rate: 0.0001       # 学习率
     batch_size: 32              # 批大小(降低显存占用)
     early_stop_patience: 5      # 早停耐心值:连续N轮loss无改善则停止
-    training_device: cpu           # 训练设备选择:auto(自动检测GPU显存)/cpu/cuda
+    training_device: auto           # 训练设备选择:auto(自动检测GPU显存)/cpu/cuda
                                     # 低配服务器推荐 cpu,模型小(~192KB) CPU训练30epoch耗时可接受
     min_gpu_mem_mb: 512          # auto模式下,GPU空闲显存低于此值(MB)时回退CPU
     

BIN
config/pickup_config.db


+ 5 - 4
config/yaml_backup/rtsp_config_longting.yaml

@@ -91,10 +91,11 @@ push_notification:
   enabled: false                     # 总开关(false = 不推送任何消息)
   alert_enabled: false               # false = 只推心跳不推告警
   push_base_urls:
-    - label: "外网"
-      url: "http://120.55.44.4:8900/api/v1/dumu/push-msg"
-    - label: "内网"
-      url: "http://192.168.60.8:8900/api/v1/dumu/push-msg"
+    production:                        # 正式环境推送地址(mode 1/2/4 使用)
+      - "http://120.55.44.4:8900/api/v1/dumu/push-msg"
+      - "http://192.168.60.8:8900/api/v1/dumu/push-msg"
+    test:                              # 测试环境推送地址(mode 1/2/4 使用,mode 4 仅用此组)
+      - "http://47.96.12.136:8788/api/v1/dumu/push-msg"
   timeout: 30
   retry_count: 2
   cooldown_minutes: 15

+ 6 - 1
config/yaml_backup/rtsp_config_xishan.yaml

@@ -136,8 +136,13 @@ scada_api:
   enabled: true                      # 是否启用 PLC 查询(false 时用音频能量判断启停)
   base_url: http://120.55.44.4:8900/api/v1/jinke-cloud/db/device/history-data    # 历史数据接口
   realtime_url: http://120.55.44.4:8900/api/v1/jinke-cloud/device/current-data  # 实时数据接口
-  jwt_token: eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJJRCI6NywiVXNlcm5hbWUiOiJhZG1pbiIsIkRlcCI6IjEzNSIsImV4cCI6MTc3NjExOTExNCwiaXNzIjoiZ2luLWJsb2cifQ.0HTtzHZjyd2mHo8VCy8icYROxmntRMuQhyoZsAYRL_M
+  jwt_token: ""                              # 静态 JWT(优先使用自动登录,留空即可)
   timeout: 10                        # 查询超时(秒)
+  login_url: http://120.55.44.4:8900/api/v2/user/login
+  login_username: admin
+  login_password: JK20200508               # SECURITY(review): plaintext credential committed to the repo — move to an env var / secret store and rotate this password
+  login_type: account
+  login_dep_id: "135"
 
 # ----------------------------------------------------------
 # 人体检测抑制(有人在设备旁时抑制告警,减少误报)

+ 77 - 32
core/alert_aggregator.py

@@ -5,9 +5,10 @@ alert_aggregator.py
 -------------------
 报警聚合器 - 跨设备聚合抑制 + 分类型冷却
 
-两项核心功能:
+核心功能:
 1. 跨设备聚合抑制:同一水厂 N 分钟内 >=M 个设备同时报警 -> 全部抑制(环境噪声)
 2. 分类型冷却时间:同类型异常 24 小时冷却,不同类型异常 1 小时冷却
+3. 调试模式感知:mode_id=4 时使用短冷却并跳过聚合窗口,方便现场测试
 """
 
 import logging
@@ -25,7 +26,9 @@ class AlertAggregator:
                  window_seconds=300,
                  min_devices=2,
                  cooldown_same_type_hours=24,
-                 cooldown_diff_type_hours=1):
+                 cooldown_diff_type_hours=1,
+                 mode_provider=None,
+                 debug_cooldown_minutes=5):
         # 聚合窗口到期后,符合条件的报警通过此回调实际推送
         self.push_callback = push_callback
 
@@ -42,6 +45,12 @@ class AlertAggregator:
         # 同一设备不同类型异常的冷却时间(小时),允许较快报警新类型
         self.cooldown_diff_type_hours = cooldown_diff_type_hours
 
+        # 项目模式感知:回调返回当前 mode_id(1=日常, 2=参观, 3=检修, 4=调试)
+        # mode_id=4 时使用短冷却并跳过聚合窗口
+        self._mode_provider = mode_provider
+        # 调试模式下同类型异常的冷却时间(分钟),远小于正常的24h
+        self._debug_cooldown_minutes = debug_cooldown_minutes
+
         # 聚合窗口状态
         # key: plant_name
         # value: {"start_time": datetime, "alerts": [alert_info_dict, ...]}
@@ -56,9 +65,19 @@ class AlertAggregator:
             f"报警聚合器已初始化 | "
             f"聚合={'启用' if aggregate_enabled else '禁用'} "
             f"窗口={window_seconds}秒 最小设备数={min_devices} | "
-            f"冷却: 同类型={cooldown_same_type_hours}h 不同类型={cooldown_diff_type_hours}h"
+            f"冷却: 同类型={cooldown_same_type_hours}h 不同类型={cooldown_diff_type_hours}h | "
+            f"调试冷却={debug_cooldown_minutes}分钟"
         )
 
+    def _is_debug_mode(self):
+        # 判断当前是否为调试模式(mode_id=4)
+        if self._mode_provider is None:
+            return False
+        try:
+            return self._mode_provider() == 4
+        except Exception:
+            return False
+
     def check_cooldown(self, device_code, anomaly_type_code):
         # 检查该设备是否在冷却期内
         # 返回 True 表示冷却中(应抑制),False 表示可以报警
@@ -69,32 +88,52 @@ class AlertAggregator:
             # 该设备从未报过警,不在冷却期
             return False
 
+        # 调试模式使用分钟级短冷却,加速现场测试迭代
+        is_debug = self._is_debug_mode()
+
         if anomaly_type_code in records:
-            # 同类型异常:检查 24 小时冷却
             last_time = records[anomaly_type_code]
-            elapsed_hours = (now - last_time).total_seconds() / 3600
-            if elapsed_hours < self.cooldown_same_type_hours:
-                remaining = self.cooldown_same_type_hours - elapsed_hours
-                logger.info(
-                    f"同类型冷却中: {device_code} | "
-                    f"类型={anomaly_type_code} | "
-                    f"剩余 {remaining:.1f} 小时"
-                )
-                return True
-        else:
-            # 不同类型异常:检查最近一次任意类型报警的 1 小时冷却
-            # 取所有类型中最近的一次报警时间
-            last_any = max(records.values()) if records else None
-            if last_any:
-                elapsed_hours = (now - last_any).total_seconds() / 3600
-                if elapsed_hours < self.cooldown_diff_type_hours:
-                    remaining = self.cooldown_diff_type_hours - elapsed_hours
+            elapsed_seconds = (now - last_time).total_seconds()
+
+            if is_debug:
+                # 调试模式:同类型异常使用分钟级冷却
+                cooldown_seconds = self._debug_cooldown_minutes * 60
+                if elapsed_seconds < cooldown_seconds:
+                    remaining_min = (cooldown_seconds - elapsed_seconds) / 60
                     logger.info(
-                        f"不同类型冷却中: {device_code} | "
-                        f"新类型={anomaly_type_code} | "
+                        f"[调试模式] 同类型冷却中: {device_code} | "
+                        f"类型={anomaly_type_code} | "
+                        f"剩余 {remaining_min:.1f} 分钟"
+                    )
+                    return True
+            else:
+                # 正常模式:同类型异常 24 小时冷却
+                elapsed_hours = elapsed_seconds / 3600
+                if elapsed_hours < self.cooldown_same_type_hours:
+                    remaining = self.cooldown_same_type_hours - elapsed_hours
+                    logger.info(
+                        f"同类型冷却中: {device_code} | "
+                        f"类型={anomaly_type_code} | "
                         f"剩余 {remaining:.1f} 小时"
                     )
                     return True
+        else:
+            if is_debug:
+                # 调试模式:不同类型异常零冷却,切换场景时立即可报
+                pass
+            else:
+                # 正常模式:不同类型异常 1 小时冷却
+                last_any = max(records.values()) if records else None
+                if last_any:
+                    elapsed_hours = (now - last_any).total_seconds() / 3600
+                    if elapsed_hours < self.cooldown_diff_type_hours:
+                        remaining = self.cooldown_diff_type_hours - elapsed_hours
+                        logger.info(
+                            f"不同类型冷却中: {device_code} | "
+                            f"新类型={anomaly_type_code} | "
+                            f"剩余 {remaining:.1f} 小时"
+                        )
+                        return True
 
         return False
 
@@ -117,6 +156,13 @@ class AlertAggregator:
             logger.info(f"报警被冷却抑制: {device_code} | 类型={anomaly_type_code}")
             return
 
+        # 调试模式(mode_id=4):跳过聚合窗口,单次异常立即推送
+        # 调试目的是验证报警链路,不需要等待多设备比对
+        if self._is_debug_mode():
+            logger.info(f"[调试模式] 直接推送: {device_code} | 类型={anomaly_type_code}")
+            self._do_push(device_code, anomaly_type_code, push_kwargs)
+            return
+
         if not self.aggregate_enabled:
             # 聚合禁用时直接推送
             self._do_push(device_code, anomaly_type_code, push_kwargs)
@@ -169,15 +215,14 @@ class AlertAggregator:
                     f"{device_count}个设备同时报警,判定为环境噪声 | "
                     f"设备: {', '.join(device_names)}"
                 )
-            elif device_count == 1:
-                # 仅单个设备报警 -> 正常推送
-                alert = alerts[0]
-                self._do_push(
-                    alert["device_code"],
-                    alert["anomaly_type_code"],
-                    alert["push_kwargs"]
-                )
-            # device_count == 0 理论上不会出现,忽略
+            else:
+                # 设备数不足 min_devices -> 逐个推送(正常报警)
+                for alert in alerts:
+                    self._do_push(
+                        alert["device_code"],
+                        alert["anomaly_type_code"],
+                        alert["push_kwargs"]
+                    )
 
     def _do_push(self, device_code, anomaly_type_code, push_kwargs):
         # 实际执行推送,并记录冷却时间

+ 160 - 58
core/pump_state_monitor.py

@@ -60,10 +60,14 @@ for audio_file in get_audio_files():
 """
 
 
+import json
 import requests
 import logging
+import threading
 from datetime import datetime, timedelta
+from typing import List, Optional
 from collections import defaultdict
+from urllib import request as urllib_request, error as urllib_error
 
 
 logger = logging.getLogger(__name__)
@@ -79,23 +83,38 @@ class PumpStateMonitor:
         - 判断是否处于启停过渡期
     """
     
-    def __init__(self, scada_url, scada_jwt, project_id, 
-                 timeout=10, transition_window_minutes=15):
+    def __init__(self, scada_url, scada_jwt, project_id,
+                 timeout=10, transition_window_minutes=15,
+                 login_url="", login_username="", login_password="",
+                 login_type="account", login_dep_id=""):
         """
         初始化泵状态监控器
-        
+
         参数:
             scada_url: SCADA API地址
-            scada_jwt: JWT认证Token
+            scada_jwt: JWT认证Token(静态,可为空)
             project_id: 项目ID (用于API查询)
             timeout: 请求超时秒数
             transition_window_minutes: 启停过渡期窗口(默认 15分钟)
+            login_url: 登录接口地址(启用后自动获取 JWT)
+            login_username: 登录用户名
+            login_password: 登录密码
+            login_type: 登录类型
+            login_dep_id: 部门ID
         """
         self.scada_url = scada_url
-        self.scada_jwt = scada_jwt
+        self.scada_jwt = str(scada_jwt or "").strip()
         self.project_id = project_id
         self.timeout = timeout
         self.transition_window_minutes = transition_window_minutes
+
+        # 自动登录配置
+        self.login_url = str(login_url or "").strip()
+        self.login_username = str(login_username or "").strip()
+        self.login_password = str(login_password or "")
+        self.login_type = str(login_type or "account")
+        self.login_dep_id = str(login_dep_id or "")
+        self._auth_lock = threading.Lock()
         
         # 状态缓存: {pump_id: True/False}
         self.current_states = {}
@@ -107,33 +126,97 @@ class PumpStateMonitor:
         # 上次查询时间(避免频繁查询)
         self.last_query_time = {}
         self.min_query_interval_seconds = 30  # 30 秒查询一次
+
+    # ------------------------------------------------------------------ #
+    #                          Token 管理                                 #
+    # ------------------------------------------------------------------ #
+
+    def _can_auto_login(self) -> bool:
+        return bool(self.login_url and self.login_username and self.login_password)
+
+    @staticmethod
+    def _extract_token(payload: dict) -> str:
+        if not isinstance(payload, dict):
+            return ""
+        candidates: List[object] = []
+        data = payload.get("data")
+        if isinstance(data, dict):
+            candidates.extend([
+                data.get("token"), data.get("jwt"), data.get("jwtToken"),
+                data.get("accessToken"), data.get("access_token"),
+            ])
+        elif isinstance(data, str):
+            candidates.append(data)
+        candidates.extend([
+            payload.get("token"), payload.get("jwt"), payload.get("jwtToken"),
+            payload.get("accessToken"), payload.get("access_token"),
+        ])
+        for value in candidates:
+            if isinstance(value, str) and value.strip():
+                return value.strip()
+        return ""
+
+    def _login_and_get_jwt(self) -> bool:
+        if not self._can_auto_login():
+            return False
+        body = json.dumps({
+            "UserName": self.login_username,
+            "Password": self.login_password,
+            "type": self.login_type,
+            "DepId": self.login_dep_id,
+        }).encode("utf-8")
+        req = urllib_request.Request(
+            self.login_url, data=body,
+            headers={"Content-Type": "application/json"}, method="POST",
+        )
+        try:
+            with urllib_request.urlopen(req, timeout=self.timeout) as resp:
+                data = json.loads(resp.read().decode("utf-8"))
+                token = self._extract_token(data)
+                if token:
+                    self.scada_jwt = token
+                    logger.info("SCADA 登录成功,JWT 已刷新")
+                    return True
+                logger.warning("SCADA 登录成功但响应内无 JWT 字段")
+        except Exception as e:
+            # NOTE(review): the original tuple listed URLError/TimeoutError/
+            # JSONDecodeError alongside Exception — Exception already subsumes
+            # them all, so the broad catch alone is equivalent and clearer.
+            logger.warning("SCADA 登录失败: %s", e)
+        return False
+
+    def _ensure_jwt(self) -> bool:
+        if self.scada_jwt:
+            return True
+        with self._auth_lock:
+            if self.scada_jwt:
+                return True
+            return self._login_and_get_jwt()
+
+    def _clear_jwt(self) -> None:
+        with self._auth_lock:
+            self.scada_jwt = ""
     
     def query_pump_status(self, point, pump_name=""):
         """
         查询单个泵的运行状态(使用实时数据接口)
-        
+
         使用 current-data 接口直接获取最新一条数据,无需时间窗口查询。
-        
+        支持自动登录获取 JWT,401/403 时自动刷新重试。
+
         参数:
             point: 点位标识, 如 "C.M.RO1_GYB@run"
             pump_name: 泵名称, 用于日志
-        
+
         返回:
             (is_running, last_change_time):
                 is_running: True=运行中, False=停机
                 last_change_time: 最后一次状态变化时间 (来自 htime 字段)
         """
-        headers = {
-            "Content-Type": "application/json",
-            "JWT-TOKEN": self.scada_jwt
-        }
-        
+
         # 当前时间戳(毫秒)
         now_ms = int(datetime.now().timestamp() * 1000)
-        
+
         # 请求参数
         params = {"time": now_ms}
-        
+
         # 请求体:使用实时数据接口格式
         request_body = [
             {
@@ -143,49 +226,68 @@ class PumpStateMonitor:
                 "project_id": self.project_id
             }
         ]
-        
-        try:
-            response = requests.post(
-                self.scada_url,
-                params=params,
-                json=request_body,
-                headers=headers,
-                timeout=self.timeout
-            )
-            
-            if response.status_code == 200:
-                data = response.json()
-                if data.get("code") == 200 and data.get("data"):
-                    # 获取第一条数据(实时接口只返回最新一条)
-                    latest = data["data"][0]
-                    if "val" in latest:
-                        val = int(float(latest["val"]))
-                        is_running = val == 1
-                        
-                        # 解析 htime 时间字段(实时接口返回的是北京时间字符串)
-                        htime_str = latest.get("htime", "")
-                        last_change_time = None
-                        if htime_str:
-                            try:
-                                # 直接解析,不做时区转换(按北京时间处理)
-                                last_change_time = datetime.strptime(htime_str, "%Y-%m-%d %H:%M:%S")
-                            except ValueError:
-                                logger.warning(f"无法解析 htime: {htime_str}")
-                        
-                        logger.debug(f"泵状态查询: {pump_name or point} = {'运行' if is_running else '停机'} (变化时间={htime_str})")
-                        return is_running, last_change_time
-                
-                # 接口返回成功但无数据
-                logger.warning(f"泵状态查询无数据: {pump_name or point}")
-            else:
-                logger.warning(f"泵状态查询HTTP错误: {pump_name or point} | status={response.status_code}")
-                
-        except Exception as e:
-            logger.warning(f"泵状态查询失败: {pump_name or point} | {e}")
-        
-        # 查询失败时默认返回停机状态
-        logger.warning(f"泵状态查询: {pump_name or point} | 查询失败,默认视为停机")
-        return False, None
+
+        for attempt in range(2):
+            if not self._ensure_jwt():
+                logger.warning("泵状态查询失败: JWT 不可用")
+                break
+
+            headers = {
+                "Content-Type": "application/json",
+                "JWT-TOKEN": self.scada_jwt
+            }
+
+            try:
+                response = requests.post(
+                    self.scada_url,
+                    params=params,
+                    json=request_body,
+                    headers=headers,
+                    timeout=self.timeout
+                )
+
+                if response.status_code == 200:
+                    data = response.json()
+                    code = data.get("code")
+
+                    if code in (401, 403) and self._can_auto_login() and attempt == 0:
+                        self._clear_jwt()
+                        continue
+
+                    if code == 200 and data.get("data"):
+                        latest = data["data"][0]
+                        if "val" in latest:
+                            val = int(float(latest["val"]))
+                            is_running = val == 1
+
+                            htime_str = latest.get("htime", "")
+                            last_change_time = None
+                            if htime_str:
+                                try:
+                                    last_change_time = datetime.strptime(htime_str, "%Y-%m-%d %H:%M:%S")
+                                except ValueError:
+                                    logger.warning(f"无法解析 htime: {htime_str}")
+
+                            logger.debug(f"泵状态查询: {pump_name or point} = {'运行' if is_running else '停机'} (变化时间={htime_str})")
+                            return is_running, last_change_time
+
+                    logger.warning(f"泵状态查询无数据: {pump_name or point}")
+                elif response.status_code in (401, 403) and self._can_auto_login() and attempt == 0:
+                    self._clear_jwt()
+                    if self._ensure_jwt():
+                        continue
+                    break
+                else:
+                    logger.warning(f"泵状态查询HTTP错误: {pump_name or point} | status={response.status_code}")
+
+            except Exception as e:
+                logger.warning(f"泵状态查询失败: {pump_name or point} | {e}")
+                break
+
+        logger.warning(f"泵状态查询: {pump_name or point} | 查询失败,保持上次状态")
+        # 查询失败时返回上次已知状态,避免网络抖动误判停机触发15分钟过渡期
+        # FIX(review): `pump_id` is not defined in this scope — the method's
+        # parameters are `point` and `pump_name` — so this failure path raised
+        # NameError on every query failure instead of returning the last state.
+        # Key the lookup by `point` here; confirm this matches the key that
+        # update_pump_state() uses when it populates current_states, otherwise
+        # pass the pump_id down as an explicit (defaulted) parameter.
+        last_state = self.current_states.get(point, True)
+        return last_state, self.state_change_time.get(point)
     
     def update_pump_state(self, pump_id, point, pump_name=""):
         """

BIN
models/LT-2/ae_model.pth


BIN
models/LT-2/global_scale.npy


BIN
models/LT-5/ae_model.pth


BIN
models/LT-5/global_scale.npy


+ 1 - 1
predictor/config.py

@@ -114,7 +114,7 @@ class DeployConfig:
     THRESHOLD_QUANTILE = 0.95
     
     # 异常patch比例阈值
-    ANOMALY_RATIO_THRESHOLD = 0.1
+    ANOMALY_RATIO_THRESHOLD = 0.25
 
 
 # 全局配置实例

+ 1 - 0
requirements.txt

@@ -24,6 +24,7 @@ requests>=2.28.0          # SCADA API / 推送接口调用
 fastapi>=0.100.0          # 配置管理 RESTful 接口
 uvicorn>=0.22.0           # ASGI 服务器
 pydantic>=2.0             # 数据验证
+python-multipart>=0.0.6   # FastAPI 文件上传(云端服务必需)
 
 # 训练 / 迁移工具依赖
 pyyaml>=6.0               # YAML 解析(迁移脚本 + auto_training)

파일 크기가 너무 크기때문에 변경 상태를 표시하지 않습니다.
+ 520 - 225
run_pickup_monitor.py


+ 9 - 16
run_with_auto_training.py

@@ -252,25 +252,18 @@ class IntegratedSystem:
         self.deploy_root = Path(__file__).parent
 
         # =========================================================================
-        # 配置加载来源:自动检测
-        #   优先使用 YAML(config/rtsp_config.yaml 存在时)
-        #   否则使用 SQLite 数据库
+        # 配置加载来源:统一使用 config.loader
         # =========================================================================
-        yaml_path = self.deploy_root / "config" / "rtsp_config.yaml"
-
-        if yaml_path.exists():
-            import yaml
-            with open(yaml_path, 'r', encoding='utf-8') as f:
-                full_config = yaml.safe_load(f)
-            self.full_yaml_config = full_config
+        from config.loader import load_project_config
+        
+        full_config, source, cm = load_project_config()
+        self.full_yaml_config = full_config if source == 'yaml' else None
+        
+        if source == 'yaml':
             self.auto_config = {'auto_training': full_config.get('auto_training', {})}
-            logger.info(f"已从 YAML ({yaml_path.name}) 加载配置")
         else:
-            self.full_yaml_config = None
-            mgr = ConfigManager()
-            self.auto_config = {'auto_training': mgr.get_system_config('auto_training')}
-            mgr.close()
-            logger.info(f"已从数据库加载 auto_training 配置 ({len(self.auto_config.get('auto_training', {}))} 项)")
+            self.auto_config = {'auto_training': cm.get_system_config('auto_training')}
+            cm.close()
 
         # 运行时对象
         self.scheduler = None

+ 93 - 3
start.sh

@@ -182,6 +182,10 @@ start_service() {
         spawn_pid_watcher "$PID"
         echo "服务启动成功, PID: $PID"
         echo "日志文件: logs/system.log"
+
+        # 自动启动辅助进程(根据配置开关决定)
+        start_auxiliary_workers
+
         echo ""
         echo "查看日志: tail -f logs/system.log"
         echo "停止服务: ./start.sh stop"
@@ -193,18 +197,77 @@ start_service() {
     fi
 }
 
+# ========================================
+# 函数:启动辅助进程(上传Worker + 模型同步)
+# ========================================
+UPLOAD_PID_FILE="logs/upload_worker.pid"
+MODEL_SYNC_PID_FILE="logs/model_sync.pid"
+
+start_auxiliary_workers() {
+    # 读取 YAML 配置中的开关,决定是否启动辅助进程
+    local yaml_file="config/rtsp_config.yaml"
+    if [ ! -f "$yaml_file" ]; then
+        return 0
+    fi
+
+    # 检查 cloud_sync.enabled
+    # NOTE(review): `grep -A1` only inspects the single line after `cloud_sync:`;
+    # if `enabled` is not the first child key (or a comment sits between them)
+    # this silently reads as disabled. Same caveat applies to the `model_sync:`
+    # check below. Consider a small python -c yaml probe instead of grep.
+    if grep -A1 'cloud_sync:' "$yaml_file" | grep -q 'enabled: [Tt]rue'; then
+        if [ -f "run_upload_worker.py" ]; then
+            nohup nice -n 19 python run_upload_worker.py > /dev/null 2>&1 &
+            echo $! > "$UPLOAD_PID_FILE"
+            echo "上传Worker已启动, PID: $!"
+        fi
+    fi
+
+    # 检查 model_sync.enabled
+    if grep -A1 'model_sync:' "$yaml_file" | grep -q 'enabled: [Tt]rue'; then
+        if [ -f "run_model_sync.py" ]; then
+            nohup nice -n 19 python run_model_sync.py > /dev/null 2>&1 &
+            echo $! > "$MODEL_SYNC_PID_FILE"
+            echo "模型同步已启动, PID: $!"
+        fi
+    fi
+}
+
+stop_auxiliary_workers() {
+    # 停止上传Worker
+    if [ -f "$UPLOAD_PID_FILE" ]; then
+        local upid
+        upid=$(cat "$UPLOAD_PID_FILE" 2>/dev/null)
+        if [ -n "$upid" ] && ps -p "$upid" > /dev/null 2>&1; then
+            kill "$upid" 2>/dev/null
+            echo "上传Worker已停止, PID: $upid"
+        fi
+        rm -f "$UPLOAD_PID_FILE"
+    fi
+
+    # 停止模型同步
+    if [ -f "$MODEL_SYNC_PID_FILE" ]; then
+        local mpid
+        mpid=$(cat "$MODEL_SYNC_PID_FILE" 2>/dev/null)
+        if [ -n "$mpid" ] && ps -p "$mpid" > /dev/null 2>&1; then
+            kill "$mpid" 2>/dev/null
+            echo "模型同步已停止, PID: $mpid"
+        fi
+        rm -f "$MODEL_SYNC_PID_FILE"
+    fi
+}
+
 # ========================================
 # 函数:停止服务
 # ========================================
 stop_service() {
+    # 先停辅助进程
+    stop_auxiliary_workers
+
     if ! is_running; then
-        echo "服务未运行"
+        echo "服务未运行"
         cleanup_pid_file_if_matches ""
         return 0
     fi
     
     PID=$(get_pid)
-    echo "正在停止服务, PID: $PID"
+    echo "正在停止服务, PID: $PID"
     
     # 发送 SIGTERM 信号,优雅停止
     kill "$PID" 2>/dev/null
@@ -264,12 +327,39 @@ show_status() {
         echo "------------------------------------------"
         tail -10 logs/system.log 2>/dev/null || echo "(无日志)"
     else
-        echo "状态: 未运行"
+        echo "主服务状态: 未运行"
         if [ -f "$PID_FILE" ]; then
             echo "注意: PID文件存在但进程已停止,可能是异常退出"
             cleanup_pid_file_if_matches ""
         fi
     fi
+
+    # 辅助进程状态
+    echo ""
+    echo "辅助进程:"
+    if [ -f "$UPLOAD_PID_FILE" ]; then
+        local upid
+        upid=$(cat "$UPLOAD_PID_FILE" 2>/dev/null)
+        if [ -n "$upid" ] && ps -p "$upid" > /dev/null 2>&1; then
+            echo "  上传Worker:  运行中 (PID: $upid)"
+        else
+            echo "  上传Worker:  已停止"
+        fi
+    else
+        echo "  上传Worker:  未启动"
+    fi
+
+    if [ -f "$MODEL_SYNC_PID_FILE" ]; then
+        local mpid
+        mpid=$(cat "$MODEL_SYNC_PID_FILE" 2>/dev/null)
+        if [ -n "$mpid" ] && ps -p "$mpid" > /dev/null 2>&1; then
+            echo "  模型同步:    运行中 (PID: $mpid)"
+        else
+            echo "  模型同步:    已停止"
+        fi
+    else
+        echo "  模型同步:    未启动"
+    fi
 }
 
 # ========================================

+ 7 - 11
tool/migrate_yaml_to_db.py

@@ -30,14 +30,13 @@ def _clear_all_tables(db_path):
         conn.close()
 
 
-def migrate_yaml_to_db(yaml_path: str, db_path: str = None, force: bool = False):
+def migrate_yaml_to_db(yaml_path: str, db_path: str = None):
     """
     将 rtsp_config.yaml 中的全部数据迁移到 SQLite 数据库
 
     Args:
         yaml_path: YAML 配置文件路径
         db_path: 数据库路径(默认 config/pickup_config.db)
-        force: 是否清空现有数据后重建(幂等操作)
     """
     yaml_file = Path(yaml_path)
     if not yaml_file.exists():
@@ -52,11 +51,10 @@ def migrate_yaml_to_db(yaml_path: str, db_path: str = None, force: bool = False)
     # 解析实际 DB 路径(用于 _clear_all_tables)
     actual_db_path = Path(db_path) if db_path else get_db_path()
 
-    # --force 模式:先清空所有数据再导入
-    if force:
-        if actual_db_path.exists():
-            _clear_all_tables(actual_db_path)
-        logger.info("--force 模式:清空后重建")
+    # 默认每次都清空旧数据再导入,保证是最干净的状态
+    if actual_db_path.exists():
+        _clear_all_tables(actual_db_path)
+        logger.info("已清空旧数据库数据,准备重新导入")
 
     # 初始化 ConfigManager(自动创建表结构)
     mgr = ConfigManager(db_path)
@@ -169,14 +167,12 @@ def migrate_yaml_to_db(yaml_path: str, db_path: str = None, force: bool = False)
 if __name__ == '__main__':
     import argparse
 
-    parser = argparse.ArgumentParser(description='YAML 配置迁移至 SQLite')
+    parser = argparse.ArgumentParser(description='YAML 配置迁移至 SQLite (默认每次都会先清空旧数据)')
     parser.add_argument('--yaml', type=str,
                         default=str(PROJECT_ROOT / 'config' / 'rtsp_config.yaml'),
                         help='YAML 配置文件路径')
     parser.add_argument('--db', type=str, default=None,
                         help='SQLite 数据库路径(默认: config/pickup_config.db)')
-    parser.add_argument('--force', action='store_true',
-                        help='清空现有数据后重建(幂等操作),适合重复导入同一份 YAML')
     args = parser.parse_args()
 
-    migrate_yaml_to_db(args.yaml, args.db, force=args.force)
+    migrate_yaml_to_db(args.yaml, args.db)

이 변경점에서 너무 많은 파일들이 변경되어 몇몇 파일들은 표시되지 않았습니다.