Selaa lähdekoodia

feat: 训练显存策略优化 - GPU/NPU/CPU 自动选择,OOM 回退,验证集早停,音频质量预筛,阈值漂移监控

wmy 3 viikkoa sitten
vanhempi
commit
1ba497e368

+ 1 - 0
.vscode/settings.json

@@ -0,0 +1 @@
+{}

+ 572 - 131
auto_training/incremental_trainer.py

@@ -192,15 +192,9 @@ class IncrementalTrainer:
             device_code = device_dir.name
             audio_files = []
 
-            # 冷启动模式:收集所有目录的数据
+            # 冷启动模式:收集所有已归档日期目录的数据(跳过 current/)
             if self.cold_start_mode:
-                # 收集current目录
-                current_dir = device_dir / "current"
-                if current_dir.exists():
-                    audio_files.extend(list(current_dir.glob("*.wav")))
-                    audio_files.extend(list(current_dir.glob("*.mp4")))
-
-                # 收集所有日期目录
+                # 注意:跳过 current/ 目录,因其中可能包含 FFmpeg 正在写入的不完整文件
                 for sub_dir in device_dir.iterdir():
                     if sub_dir.is_dir() and sub_dir.name.isdigit() and len(sub_dir.name) == 8:
                         audio_files.extend(list(sub_dir.glob("*.wav")))
@@ -221,6 +215,14 @@ class IncrementalTrainer:
             # 去重
             audio_files = list(set(audio_files))
 
+            # 数据质量预筛:过滤 RMS 能量异常的音频
+            if audio_files and not self.cold_start_mode:
+                before_count = len(audio_files)
+                audio_files = self._filter_audio_quality(audio_files, device_code)
+                filtered = before_count - len(audio_files)
+                if filtered > 0:
+                    logger.info(f"  {device_code}: 质量预筛过滤 {filtered} 个异常音频")
+
             # 随机采样(如果配置了采样时长)
             if sample_hours > 0 and audio_files:
                 files_needed = int(sample_hours * 3600 / 60)
@@ -239,25 +241,84 @@ class IncrementalTrainer:
         logger.info(f"总计: {total} 个音频文件,{len(device_files)} 个设备")
         return device_files
 
+    def _filter_audio_quality(self, audio_files: List[Path],
+                               device_code: str) -> List[Path]:
+        """
+        音频质量预筛:基于 RMS 能量和频谱质心过滤明显异常的样本
+
+        使用 IQR (四分位距) 方法检测离群值:
+        - 计算所有文件的 RMS 能量和频谱质心
+        - 过滤超出 [Q1 - 2*IQR, Q3 + 2*IQR] 范围的文件
+
+        需要至少 10 个文件才执行过滤,否则样本太少无统计意义。
+
+        Args:
+            audio_files: 待过滤的音频文件列表
+            device_code: 设备编码(用于日志)
+
+        Returns:
+            过滤后的文件列表
+        """
+        if len(audio_files) < 10:
+            return audio_files
+
+        import librosa
+
+        # 快速计算每个文件的 RMS 能量
+        rms_values = []
+        valid_files = []
+
+        for wav_file in audio_files:
+            try:
+                y, _ = librosa.load(str(wav_file), sr=CFG.SR, mono=True,
+                                     duration=10)  # 只读前10秒加速
+                if len(y) < CFG.SR:
+                    continue
+                rms = float(np.sqrt(np.mean(y ** 2)))
+                rms_values.append(rms)
+                valid_files.append(wav_file)
+            except Exception:
+                continue
+
+        if len(rms_values) < 10:
+            return audio_files
+
+        # IQR 离群值检测
+        rms_arr = np.array(rms_values)
+        q1, q3 = np.percentile(rms_arr, [25, 75])
+        iqr = q3 - q1
+        lower_bound = q1 - 2 * iqr
+        upper_bound = q3 + 2 * iqr
+
+        filtered = []
+        for f, rms in zip(valid_files, rms_values):
+            if lower_bound <= rms <= upper_bound:
+                filtered.append(f)
+
+        return filtered
+
     # ========================================
     # 特征提取(每设备独立标准化参数)
     # ========================================
 
     def _extract_mel_for_device(self, device_code: str,
-                                wav_files: List[Path]) -> Tuple[Optional[Path], Optional[Tuple[float, float]]]:
+                                wav_files: List[Path],
+                                inherit_scale: bool = False
+                                ) -> Tuple[Optional[Path], Optional[Tuple[float, float]]]:
         """
-        为单个设备提取 Mel 特征并计算独立的 Z-score 标准化参数
+        为单个设备提取 Mel 特征并计算独立的 Min-Max 标准化参数
 
-        两遍扫描:
-        1. 第一遍:收集所有 mel_db 计算 mean/std
-        2. 第二遍:Z-score 标准化后保存 npy 文件
+        流式两遍扫描(内存优化):
+        1. 第一遍:只计算 running min/max(O(1) 内存),不保存 mel_db
+        2. 第二遍:用第一遍的 min/max 标准化后直接写 npy 文件
 
         Args:
             device_code: 设备编码
             wav_files: 该设备的音频文件列表
+            inherit_scale: 增量训练时是否继承已部署的 scale 参数
 
         Returns:
-            (mel_dir, (global_mean, global_std)),失败返回 (None, None)
+            (mel_dir, (global_min, global_max)),失败返回 (None, None)
         """
         import librosa
 
@@ -265,67 +326,90 @@ class IncrementalTrainer:
         win_samples = int(CFG.WIN_SEC * CFG.SR)
         hop_samples = int(CFG.HOP_SEC * CFG.SR)
 
-        # 第一遍:收集所有 mel_db 值,用于计算 mean/std
-        all_mel_data = []
-        all_values = []  # 收集所有像素值用于全局统计
-
-        for wav_file in wav_files:
-            try:
-                y, _ = librosa.load(str(wav_file), sr=CFG.SR, mono=True)
-
-                # 跳过过短的音频
-                if len(y) < CFG.SR:
+        def _iter_mel_patches(files):
+            """生成器:逐文件逐 patch 产出 mel_db,避免全量加载到内存"""
+            for wav_file in files:
+                try:
+                    y, _ = librosa.load(str(wav_file), sr=CFG.SR, mono=True)
+                    if len(y) < CFG.SR:
+                        continue
+                    for idx, start in enumerate(range(0, len(y) - win_samples + 1, hop_samples)):
+                        segment = y[start:start + win_samples]
+                        mel_spec = librosa.feature.melspectrogram(
+                            y=segment, sr=CFG.SR, n_fft=CFG.N_FFT,
+                            hop_length=CFG.HOP_LENGTH, n_mels=CFG.N_MELS, power=2.0
+                        )
+                        mel_db = librosa.power_to_db(mel_spec, ref=np.max)
+                        # 对齐帧数
+                        if mel_db.shape[1] < CFG.TARGET_FRAMES:
+                            pad = CFG.TARGET_FRAMES - mel_db.shape[1]
+                            mel_db = np.pad(mel_db, ((0, 0), (0, pad)), mode="constant")
+                        else:
+                            mel_db = mel_db[:, :CFG.TARGET_FRAMES]
+                        yield wav_file, idx, mel_db
+                except Exception as e:
+                    logger.warning(f"跳过文件 {wav_file.name}: {e}")
                     continue
 
-                for idx, start in enumerate(range(0, len(y) - win_samples + 1, hop_samples)):
-                    segment = y[start:start + win_samples]
-
-                    mel_spec = librosa.feature.melspectrogram(
-                        y=segment, sr=CFG.SR, n_fft=CFG.N_FFT,
-                        hop_length=CFG.HOP_LENGTH, n_mels=CFG.N_MELS, power=2.0
-                    )
-                    mel_db = librosa.power_to_db(mel_spec, ref=np.max)
-
-                    # 对齐帧数
-                    if mel_db.shape[1] < CFG.TARGET_FRAMES:
-                        pad = CFG.TARGET_FRAMES - mel_db.shape[1]
-                        mel_db = np.pad(mel_db, ((0, 0), (0, pad)), mode="constant")
-                    else:
-                        mel_db = mel_db[:, :CFG.TARGET_FRAMES]
-
-                    # 收集所有值用于 min/max 计算
-                    all_values.append(mel_db.flatten())
-                    all_mel_data.append((wav_file, idx, mel_db))
-
-            except Exception as e:
-                logger.warning(f"跳过文件 {wav_file.name}: {e}")
-                continue
-
-        if not all_mel_data:
+        # ── 第一遍:流式计算 running min/max(O(1) 内存) ──
+        global_min = float('inf')
+        global_max = float('-inf')
+        patch_count = 0
+
+        for _, _, mel_db in _iter_mel_patches(wav_files):
+            local_min = float(mel_db.min())
+            local_max = float(mel_db.max())
+            if local_min < global_min:
+                global_min = local_min
+            if local_max > global_max:
+                global_max = local_max
+            patch_count += 1
+
+        if patch_count == 0:
             logger.warning(f"  {device_code}: 无有效数据")
             return None, None
 
-        # 计算全局 min/max(Min-Max 标准化参数)
-        all_values_concat = np.concatenate(all_values)
-        global_min = float(np.min(all_values_concat))
-        global_max = float(np.max(all_values_concat))
-
-        logger.info(f"  {device_code}: {len(all_mel_data)} patches | "
+        # 增量训练时,用已部署的 scale 做 EMA 平滑,避免剧烈偏移
+        if inherit_scale:
+            old_scale = self._load_deployed_scale(device_code)
+            if old_scale is not None:
+                ema_alpha = 0.3  # 新数据权重
+                old_min, old_max = old_scale
+                global_min = ema_alpha * global_min + (1 - ema_alpha) * old_min
+                global_max = ema_alpha * global_max + (1 - ema_alpha) * old_max
+                logger.info(f"  {device_code}: scale EMA 融合 | "
+                            f"old=[{old_min:.4f}, {old_max:.4f}] → "
+                            f"new=[{global_min:.4f}, {global_max:.4f}]")
+
+        logger.info(f"  {device_code}: {patch_count} patches | "
                     f"min={global_min:.4f} max={global_max:.4f}")
 
-        # 第二遍:Min-Max 标准化并保存
+        # ── 第二遍:Min-Max 标准化并保存 ──
         device_mel_dir = self.temp_mel_dir / device_code
         device_mel_dir.mkdir(parents=True, exist_ok=True)
 
-        for wav_file, idx, mel_db in all_mel_data:
-            # Min-Max: (x - min) / (max - min)
-            mel_norm = (mel_db - global_min) / (global_max - global_min + 1e-6)
+        scale_range = global_max - global_min + 1e-6
+        for wav_file, idx, mel_db in _iter_mel_patches(wav_files):
+            mel_norm = (mel_db - global_min) / scale_range
             npy_name = f"{device_code}@@{wav_file.stem}@@win{idx:05d}.npy"
             np.save(device_mel_dir / npy_name, mel_norm.astype(np.float32))
 
         return device_mel_dir, (global_min, global_max)
 
-    def prepare_mel_features_per_device(self, device_files: Dict[str, List[Path]]
+    def _load_deployed_scale(self, device_code: str) -> Optional[Tuple[float, float]]:
+        """加载已部署的 global_scale.npy,用于增量训练时的 scale 继承"""
+        scale_path = self.model_root / device_code / "global_scale.npy"
+        if not scale_path.exists():
+            return None
+        try:
+            scale = np.load(scale_path)
+            return float(scale[0]), float(scale[1])
+        except Exception as e:
+            logger.warning(f"加载旧 scale 失败: {device_code} | {e}")
+            return None
+
+    def prepare_mel_features_per_device(self, device_files: Dict[str, List[Path]],
+                                        inherit_scale: bool = False
                                         ) -> Dict[str, Tuple[Path, Tuple[float, float]]]:
         """
         为每个设备独立提取 Mel 特征
@@ -334,6 +418,7 @@ class IncrementalTrainer:
 
         Args:
             device_files: {device_code: [wav_files]}
+            inherit_scale: 增量训练时传 True,将新旧 scale 做 EMA 融合
 
         Returns:
             {device_code: (mel_dir, (global_min, global_max))}
@@ -348,7 +433,9 @@ class IncrementalTrainer:
         device_results = {}
 
         for device_code, wav_files in device_files.items():
-            mel_dir, scale = self._extract_mel_for_device(device_code, wav_files)
+            mel_dir, scale = self._extract_mel_for_device(
+                device_code, wav_files, inherit_scale=inherit_scale
+            )
             if mel_dir is not None:
                 device_results[device_code] = (mel_dir, scale)
 
@@ -363,75 +450,279 @@ class IncrementalTrainer:
     # 模型训练(每设备独立)
     # ========================================
 
+    def _select_training_device(self) -> torch.device:
+        # 智能选择训练设备:GPU/NPU 显存充足则使用,否则回退 CPU
+        # 训练配置中可通过 training_device 强制指定 (auto/cpu/cuda/npu)
+        training_cfg = self.config['auto_training']['incremental']
+        forced_device = training_cfg.get('training_device', 'auto')
+
+        # 强制指定设备时直接返回
+        if forced_device == 'cpu':
+            logger.info("训练设备: CPU(配置强制指定)")
+            return torch.device('cpu')
+        if forced_device == 'cuda':
+            if torch.cuda.is_available():
+                return torch.device('cuda')
+            logger.warning("配置指定 CUDA 但不可用,回退 CPU")
+            return torch.device('cpu')
+        if forced_device == 'npu':
+            if self._npu_available():
+                return torch.device('npu')
+            logger.warning("配置指定 NPU 但不可用,回退 CPU")
+            return torch.device('cpu')
+
+        # auto 模式:依次检测 CUDA → NPU → CPU
+        # 1. 检测 CUDA
+        if torch.cuda.is_available():
+            try:
+                free_mem = torch.cuda.mem_get_info()[0] / (1024 * 1024)
+                min_gpu_mem_mb = training_cfg.get('min_gpu_mem_mb', 512)
+                if free_mem >= min_gpu_mem_mb:
+                    logger.info(f"训练设备: CUDA(空闲显存 {free_mem:.0f}MB)")
+                    return torch.device('cuda')
+                logger.info(
+                    f"CUDA 空闲显存不足 ({free_mem:.0f}MB < {min_gpu_mem_mb}MB)"
+                )
+            except Exception as e:
+                logger.warning(f"CUDA 显存检测失败: {e}")
+
+        # 2. 检测 NPU (华为昇腾)
+        if self._npu_available():
+            try:
+                free_mem = torch.npu.mem_get_info()[0] / (1024 * 1024)
+                min_gpu_mem_mb = training_cfg.get('min_gpu_mem_mb', 512)
+                if free_mem >= min_gpu_mem_mb:
+                    logger.info(f"训练设备: NPU(空闲显存 {free_mem:.0f}MB)")
+                    return torch.device('npu')
+                logger.info(
+                    f"NPU 空闲显存不足 ({free_mem:.0f}MB < {min_gpu_mem_mb}MB)"
+                )
+            except Exception as e:
+                logger.warning(f"NPU 显存检测失败: {e},回退 CPU")
+
+        logger.info("训练设备: CPU")
+        return torch.device('cpu')
+
+    @staticmethod
+    def _npu_available() -> bool:
+        """检查华为昇腾 NPU 是否可用"""
+        try:
+            import torch_npu  # noqa: F401
+            return torch.npu.is_available()
+        except ImportError:
+            return False
+
+    def _run_training_loop(self, device_code: str, model: nn.Module,
+                           train_loader, val_loader, epochs: int, lr: float,
+                           device: torch.device) -> Tuple[nn.Module, float]:
+        # 执行实际的训练循环,与设备选择解耦
+        # 早停基于验证集损失(如有),否则基于训练损失
+        model = model.to(device)
+        model.train()
+        optimizer = torch.optim.Adam(model.parameters(), lr=lr)
+        criterion = nn.MSELoss()
+
+        # AMP 混合精度(GPU/NPU 生效,可显著降低显存占用)
+        use_amp = device.type in ('cuda', 'npu')
+        scaler = torch.amp.GradScaler(device.type) if use_amp else None
+
+        # 早停配置
+        early_stop_cfg = self.config['auto_training']['incremental']
+        patience = early_stop_cfg.get('early_stop_patience', 5)
+        best_loss = float('inf')
+        no_improve_count = 0
+
+        avg_loss = 0.0
+        actual_epochs = 0
+
+        for epoch in range(epochs):
+            # ── 训练阶段 ──
+            model.train()
+            epoch_loss = 0.0
+            batch_count = 0
+
+            for batch in train_loader:
+                batch = batch.to(device)
+                optimizer.zero_grad()
+
+                if use_amp:
+                    with torch.amp.autocast(device.type):
+                        output = model(batch)
+                        output = align_to_target(output, batch)
+                        loss = criterion(output, batch)
+                    scaler.scale(loss).backward()
+                    scaler.step(optimizer)
+                    scaler.update()
+                else:
+                    output = model(batch)
+                    output = align_to_target(output, batch)
+                    loss = criterion(output, batch)
+                    loss.backward()
+                    optimizer.step()
+
+                epoch_loss += loss.item()
+                batch_count += 1
+
+            avg_loss = epoch_loss / batch_count
+            actual_epochs = epoch + 1
+
+            # ── 验证阶段(如有验证集) ──
+            if val_loader is not None:
+                model.eval()
+                val_loss = 0.0
+                val_count = 0
+                with torch.no_grad():
+                    for batch in val_loader:
+                        batch = batch.to(device)
+                        if use_amp:
+                            with torch.amp.autocast(device.type):
+                                output = model(batch)
+                                output = align_to_target(output, batch)
+                                loss = criterion(output, batch)
+                        else:
+                            output = model(batch)
+                            output = align_to_target(output, batch)
+                            loss = criterion(output, batch)
+                        val_loss += loss.item()
+                        val_count += 1
+                avg_val_loss = val_loss / val_count
+                monitor_loss = avg_val_loss  # 早停监控验证损失
+            else:
+                avg_val_loss = None
+                monitor_loss = avg_loss  # 无验证集时回退训练损失
+
+            if actual_epochs % 10 == 0 or epoch == epochs - 1:
+                val_str = f" | ValLoss: {avg_val_loss:.6f}" if avg_val_loss is not None else ""
+                logger.info(f"  [{device_code}] Epoch {actual_epochs}/{epochs} | "
+                            f"Loss: {avg_loss:.6f}{val_str} | device={device.type}")
+
+            # 早停检测:连续 patience 轮无改善则提前终止
+            if monitor_loss < best_loss:
+                best_loss = monitor_loss
+                no_improve_count = 0
+            else:
+                no_improve_count += 1
+
+            if no_improve_count >= patience and actual_epochs >= 10:
+                logger.info(f"  [{device_code}] 早停触发: 连续{patience}轮无改善 | "
+                           f"最终轮数={actual_epochs}/{epochs} | Loss={avg_loss:.6f}")
+                break
+
+        # 训练后清理加速器缓存
+        if device.type == 'cuda':
+            torch.cuda.empty_cache()
+        elif device.type == 'npu':
+            torch.npu.empty_cache()
+
+        if actual_epochs < epochs:
+            logger.info(f"  [{device_code}] 早停节省 {epochs - actual_epochs} 轮训练")
+
+        return model, avg_loss
+
     def train_single_device(self, device_code: str, mel_dir: Path,
                             epochs: int, lr: float,
                             from_scratch: bool = True
                             ) -> Tuple[nn.Module, float]:
-        """
-        训练单个设备的独立模型
-
-        Args:
-            device_code: 设备编码
-            mel_dir: 该设备的 Mel 特征目录
-            epochs: 训练轮数
-            lr: 学习率
-            from_scratch: True=从零训练(全量),False=加载已有模型微调(增量)
-
-        Returns:
-            (model, final_loss)
-        """
+        # 训练单个设备的独立模型
+        # 策略:优先 GPU 训练,显存不足自动回退 CPU;训练中 OOM 也会捕获并用 CPU 重试
         logger.info(f"训练设备 {device_code}: epochs={epochs}, lr={lr}, "
                     f"mode={'全量' if from_scratch else '增量'}")
 
-        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
-        model = ConvAutoencoder().to(device)
+        # 智能选择训练设备
+        device = self._select_training_device()
+
+        # 训练前清理加速器缓存,释放推理残留的显存碎片
+        if device.type == 'cuda':
+            torch.cuda.empty_cache()
+            import gc
+            gc.collect()
+        elif device.type == 'npu':
+            torch.npu.empty_cache()
+            import gc
+            gc.collect()
+
+        model = ConvAutoencoder()
 
         # 增量模式下加载已有模型
         if not from_scratch:
             model_path = self.model_root / device_code / "ae_model.pth"
             if model_path.exists():
-                model.load_state_dict(torch.load(model_path, map_location=device))
+                model.load_state_dict(torch.load(model_path, map_location='cpu'))
                 logger.info(f"  已加载已有模型: {model_path}")
             else:
                 logger.warning(f"  模型不存在,自动切换为全量训练: {model_path}")
 
-        # 加载数据
+        # 加载数据并按 80/20 划分训练集/验证集
         dataset = MelNPYDataset(mel_dir)
         if len(dataset) == 0:
             raise ValueError(f"设备 {device_code} 无训练数据")
 
         batch_size = self.config['auto_training']['incremental']['batch_size']
-        dataloader = torch.utils.data.DataLoader(
-            dataset, batch_size=batch_size, shuffle=True, num_workers=0
-        )
-
-        # 训练
-        model.train()
-        optimizer = torch.optim.Adam(model.parameters(), lr=lr)
-        criterion = nn.MSELoss()
 
-        avg_loss = 0.0
-        for epoch in range(epochs):
-            epoch_loss = 0.0
-            batch_count = 0
-
-            for batch in dataloader:
-                batch = batch.to(device)
-                optimizer.zero_grad()
-                output = model(batch)
-                output = align_to_target(output, batch)
-                loss = criterion(output, batch)
-                loss.backward()
-                optimizer.step()
-                epoch_loss += loss.item()
-                batch_count += 1
+        # 验证集划分:数据量 >= 20 时才划分(否则太少无统计意义)
+        val_loader = None
+        if len(dataset) >= 20:
+            val_size = max(1, int(len(dataset) * 0.2))
+            train_size = len(dataset) - val_size
+            train_dataset, val_dataset = torch.utils.data.random_split(
+                dataset, [train_size, val_size]
+            )
+            train_loader = torch.utils.data.DataLoader(
+                train_dataset, batch_size=batch_size, shuffle=True,
+                num_workers=0, pin_memory=False
+            )
+            val_loader = torch.utils.data.DataLoader(
+                val_dataset, batch_size=batch_size, shuffle=False,
+                num_workers=0, pin_memory=False
+            )
+            logger.info(f"  数据集划分: 训练={train_size}, 验证={val_size}")
+        else:
+            train_loader = torch.utils.data.DataLoader(
+                dataset, batch_size=batch_size, shuffle=True,
+                num_workers=0, pin_memory=False
+            )
+            logger.info(f"  数据量不足20,跳过验证集划分(共{len(dataset)}样本)")
+
+        # 尝试在选定设备上训练
+        if device.type in ('cuda', 'npu'):
+            try:
+                return self._run_training_loop(
+                    device_code, model, train_loader, val_loader,
+                    epochs, lr, device
+                )
+            except (torch.cuda.OutOfMemoryError, RuntimeError) as e:
+                # GPU/NPU OOM -> 清理显存后回退 CPU 重试
+                if 'out of memory' not in str(e).lower() and isinstance(e, RuntimeError):
+                    raise  # 非 OOM 的 RuntimeError 不拦截
+                logger.warning(
+                    f"  [{device_code}] {device.type.upper()} OOM,"
+                    f"清理显存后回退 CPU 训练"
+                )
+                import gc
+                gc.collect()
+                if device.type == 'cuda':
+                    torch.cuda.empty_cache()
+                elif device.type == 'npu':
+                    torch.npu.empty_cache()
+                # 模型可能处于脏状态,重新初始化
+                model = ConvAutoencoder()
+                if not from_scratch:
+                    model_path = self.model_root / device_code / "ae_model.pth"
+                    if model_path.exists():
+                        model.load_state_dict(
+                            torch.load(model_path, map_location='cpu')
+                        )
+                return self._run_training_loop(
+                    device_code, model, train_loader, val_loader,
+                    epochs, lr, torch.device('cpu')
+                )
+        else:
+            # CPU 训练,无需 OOM 保护
+            return self._run_training_loop(
+                device_code, model, train_loader, val_loader,
+                epochs, lr, device
+            )
 
-            avg_loss = epoch_loss / batch_count
-            # 每10轮或最后一轮打印日志,避免日志刷屏
-            if (epoch + 1) % 10 == 0 or epoch == epochs - 1:
-                logger.info(f"  [{device_code}] Epoch {epoch+1}/{epochs} | Loss: {avg_loss:.6f}")
-
-        return model, avg_loss
 
     # ========================================
     # 产出部署(每设备独立目录)
@@ -616,14 +907,23 @@ class IncrementalTrainer:
     # 增量训练入口(保留兼容)
     # ========================================
 
-    def run_daily_training(self) -> bool:
+    def run_daily_training(self, on_device_trained=None) -> bool:
         """
-        执行每日增量训练(保留原有逻辑)
+        执行每日增量训练 — 逐设备串行处理
+
+        流程(每个设备完整走完再处理下一个,降低内存/CPU 峰值):
+        1. 收集所有设备的文件列表(仅路径,开销极低)
+        2. 备份模型
+        3. 逐设备:提取特征 → 训练 → 验证 → 部署 → 清理临时文件 → 回调通知
+        4. 更新分类器基线
 
-        改造点:每设备独立训练+部署,不再共享模型
+        Args:
+            on_device_trained: 可选回调 fn(device_code: str),
+                               每个设备训练+部署成功后调用,
+                               用于即时触发该设备的模型热重载
 
         Returns:
-            bool: 是否成功
+            bool: 是否至少有一个设备成功
         """
         try:
             days_ago = (self.use_days_ago if self.use_days_ago is not None
@@ -635,7 +935,7 @@ class IncrementalTrainer:
             logger.info(f"{mode_str} - {target_date}")
             logger.info("=" * 60)
 
-            # 1. 收集数据
+            # 1. 收集数据(仅文件路径,不加载音频,开销极低)
             device_files = self.collect_training_data(target_date)
             total = sum(len(f) for f in device_files.values())
             min_samples = self.config['auto_training']['incremental']['min_samples']
@@ -647,43 +947,130 @@ class IncrementalTrainer:
             if self.config['auto_training']['model']['backup_before_train']:
                 self.backup_model(target_date)
 
-            # 3. 每设备提取特征
-            device_results = self.prepare_mel_features_per_device(device_files)
-            if not device_results:
-                logger.error("特征提取失败")
-                return False
-
-            # 4. 训练参数
+            # 3. 训练参数
             epochs = (self.epochs if self.epochs is not None
                       else self.config['auto_training']['incremental']['epochs'])
             lr = (self.learning_rate if self.learning_rate is not None
                   else self.config['auto_training']['incremental']['learning_rate'])
 
-            # 5. 每设备独立训练+部署
-            # 冷启动=全量训练(从零),增量=加载已有模型微调
             from_scratch = self.cold_start_mode
+            inherit_scale = not self.cold_start_mode
+
+            model_cfg = self.config['auto_training']['model']
+            rollback_enabled = model_cfg.get('rollback_on_degradation', True)
+            rollback_factor = model_cfg.get('rollback_factor', 2.0)
 
+            # 4. 逐设备串行处理:提取 → 训练 → 部署 → 清理
             success_count = 0
-            for device_code, (mel_dir, scale) in device_results.items():
+            degraded_count = 0
+            device_count = len(device_files)
+
+            for idx, (device_code, wav_files) in enumerate(device_files.items(), 1):
+                logger.info(f"\n{'='*40}")
+                logger.info(f"[{idx}/{device_count}] 设备: {device_code}")
+                logger.info(f"{'='*40}")
+
                 try:
-                    model, _ = self.train_single_device(
+                    # ── 4a. 单设备特征提取 ──
+                    mel_dir, scale = self._extract_mel_for_device(
+                        device_code, wav_files, inherit_scale=inherit_scale
+                    )
+                    if mel_dir is None:
+                        logger.warning(f"{device_code}: 特征提取无有效数据,跳过")
+                        continue
+
+                    # ── 4b. 训练 ──
+                    model, final_loss = self.train_single_device(
                         device_code, mel_dir, epochs, lr, from_scratch=from_scratch
                     )
 
-                    if self._validate_model(model):
-                        if self.config['auto_training']['model']['auto_deploy']:
-                            self.deploy_device_model(device_code, model, scale, mel_dir)
-                        success_count += 1
-                    else:
-                        logger.error(f"{device_code}: 验证失败,跳过部署")
+                    # ── 4c. 形状验证 ──
+                    if not self._validate_model(model):
+                        logger.error(f"{device_code}: 形状验证失败,跳过部署")
+                        continue
+
+                    # ── 4d. 损失退化检测(增量训练时生效) ──
+                    if rollback_enabled and not self.cold_start_mode:
+                        old_threshold = self._get_old_threshold(device_code)
+                        if old_threshold and old_threshold > 0:
+                            if final_loss > old_threshold * rollback_factor:
+                                logger.warning(
+                                    f"{device_code}: 损失退化检测触发 | "
+                                    f"训练损失={final_loss:.6f} > "
+                                    f"旧阈值={old_threshold:.6f} × {rollback_factor} = "
+                                    f"{old_threshold * rollback_factor:.6f}"
+                                )
+                                degraded_count += 1
+                                continue
+
+                    # ── 4e. 阈值偏移检测 + 部署 ──
+                    if model_cfg.get('auto_deploy', True):
+                        if rollback_enabled and not self.cold_start_mode:
+                            old_threshold = self._get_old_threshold(device_code)
+                            if old_threshold and old_threshold > 0:
+                                new_threshold = self._compute_threshold(model, mel_dir)
+                                drift_ratio = abs(new_threshold - old_threshold) / old_threshold
+                                # 记录阈值变化趋势(用于长期漂移监控)
+                                self._log_threshold_history(
+                                    device_code, target_date,
+                                    old_threshold, new_threshold, final_loss
+                                )
+                                if drift_ratio > 0.3:
+                                    logger.warning(
+                                        f"{device_code}: 阈值偏移告警 | "
+                                        f"旧={old_threshold:.6f} → "
+                                        f"新={new_threshold:.6f} | "
+                                        f"偏移={drift_ratio:.1%}"
+                                    )
+                                    if drift_ratio > 1.0:
+                                        logger.warning(
+                                            f"{device_code}: 阈值偏移过大"
+                                            f"(>{drift_ratio:.0%}),跳过部署"
+                                        )
+                                        degraded_count += 1
+                                        continue
+                        self.deploy_device_model(device_code, model, scale, mel_dir)
+
+                    success_count += 1
+                    logger.info(f"{device_code}: 训练+部署完成 | loss={final_loss:.6f}")
+
+                    # ── 4f. 即时通知该设备模型重载 ──
+                    if on_device_trained:
+                        try:
+                            on_device_trained(device_code)
+                        except Exception as e:
+                            logger.warning(f"{device_code}: 模型重载回调失败 | {e}")
+
                 except Exception as e:
-                    logger.error(f"{device_code}: 训练失败 | {e}")
+                    logger.error(f"{device_code}: 训练失败 | {e}", exc_info=True)
+
+                finally:
+                    # ── 4g. 清理该设备的临时 Mel 文件,释放磁盘空间 ──
+                    device_mel_dir = self.temp_mel_dir / device_code
+                    if device_mel_dir.exists():
+                        shutil.rmtree(device_mel_dir)
+
+            # 5. 如果所有设备都退化,整体回滚到训练前备份
+            if degraded_count > 0 and success_count == 0:
+                logger.error(
+                    f"所有设备训练后损失退化({degraded_count}个),执行整体回滚"
+                )
+                self.restore_backup(target_date)
+                return False
+
+            if degraded_count > 0:
+                logger.warning(
+                    f"{degraded_count} 个设备因损失退化跳过部署,"
+                    f"{success_count} 个设备部署成功"
+                )
 
             # 6. 更新分类器基线
             self._update_classifier_baseline(device_files)
 
             logger.info("=" * 60)
-            logger.info(f"增量训练完成: {success_count}/{len(device_results)} 个设备成功")
+            logger.info(f"增量训练完成: {success_count}/{device_count} 个设备成功")
+            if degraded_count > 0:
+                logger.info(f"  其中 {degraded_count} 个设备因损失退化跳过")
             logger.info("=" * 60)
             return success_count > 0
 
@@ -699,6 +1086,60 @@ class IncrementalTrainer:
     # 辅助方法
     # ========================================
 
+    def _get_old_threshold(self, device_code: str) -> float:
+        """
+        读取设备当前已部署的阈值(训练前的旧阈值)
+
+        用于损失退化校验:新模型的训练损失不应远超旧阈值。
+        阈值文件路径: models/{device_code}/thresholds/threshold_{device_code}.npy
+
+        返回:
+            阈值浮点数,文件不存在时返回 0.0
+        """
+        threshold_file = self.model_root / device_code / "thresholds" / f"threshold_{device_code}.npy"
+        if not threshold_file.exists():
+            return 0.0
+        try:
+            data = np.load(threshold_file)
+            return float(data.flat[0])
+        except Exception as e:
+            logger.warning(f"读取旧阈值失败: {device_code} | {e}")
+            return 0.0
+
+    def _log_threshold_history(self, device_code: str, date_str: str,
+                                old_threshold: float, new_threshold: float,
+                                train_loss: float):
+        """
+        记录阈值变化历史到 CSV,用于监控模型长期漂移趋势
+
+        文件路径: logs/threshold_history.csv
+        格式: date,device_code,old_threshold,new_threshold,drift_ratio,train_loss
+        """
+        import csv
+
+        log_dir = self.deploy_root / "logs"
+        log_dir.mkdir(parents=True, exist_ok=True)
+        csv_path = log_dir / "threshold_history.csv"
+
+        drift_ratio = (new_threshold - old_threshold) / old_threshold if old_threshold > 0 else 0.0
+        write_header = not csv_path.exists()
+
+        try:
+            with open(csv_path, 'a', newline='', encoding='utf-8') as f:
+                writer = csv.writer(f)
+                if write_header:
+                    writer.writerow([
+                        'date', 'device_code', 'old_threshold', 'new_threshold',
+                        'drift_ratio', 'train_loss'
+                    ])
+                writer.writerow([
+                    date_str, device_code,
+                    f"{old_threshold:.8f}", f"{new_threshold:.8f}",
+                    f"{drift_ratio:.4f}", f"{train_loss:.8f}"
+                ])
+        except Exception as e:
+            logger.warning(f"写入阈值历史失败: {e}")
+
     def _validate_model(self, model: nn.Module) -> bool:
         # 验证模型输出形状是否合理
         if not self.config['auto_training']['validation']['enabled']:

+ 10 - 4
config/auto_training.yaml

@@ -17,13 +17,17 @@ auto_training:
     
     # 数据采样
     use_days_ago: 1             # 使用N天前的数据(1=昨天)
-    sample_hours: 2             # 随机采样时长(小时),0=使用全部
+    sample_hours: 1             # 随机采样时长(小时),0=使用全部
     min_samples: 50             # 最少样本数,不足则跳过
     
-    # 训练参数
-    epochs: 50                   # 训练轮数
+    # 训练参数(低配服务器优化)
+    epochs: 30                   # 训练轮数(配合早停,实际通常更少)
     learning_rate: 0.0001       # 学习率
-    batch_size: 64              # 批大小
+    batch_size: 32              # 批大小(降低显存占用)
+    early_stop_patience: 5      # 早停耐心值:连续N轮loss无改善则停止
+    training_device: cpu           # 训练设备选择:auto(自动检测显存)/cpu/cuda/npu
+                                    # 低配服务器推荐 cpu,模型小(~214KB) CPU训练30epoch耗时可接受
+    min_gpu_mem_mb: 512          # auto模式下,GPU空闲显存低于此值(MB)时回退CPU
     
   # 模型管理
   model:
@@ -31,6 +35,8 @@ auto_training:
     keep_backups: 7             # 保留备份数量
     auto_deploy: true           # 自动部署新模型
     update_thresholds: true     # 训练后更新阈值npy
+    rollback_on_degradation: true   # 训练后损失异常时自动回滚到备份
+    rollback_factor: 2.0            # 新模型损失 > 旧阈值 * 此因子 则判定为退化
     
   # 验证配置
   validation:

+ 14 - 3
predictor/utils.py

@@ -33,11 +33,22 @@ def ensure_dirs():
 def get_device():
     """
     获取可用的计算设备
-    
+
+    优先级: CUDA > NPU (华为昇腾) > CPU
+
     返回:
-        str: "cuda" 或 "cpu"
+        str: "cuda", "npu" 或 "cpu"
     """
-    return "cuda" if torch.cuda.is_available() else "cpu"
+    if torch.cuda.is_available():
+        return "cuda"
+    # 华为昇腾 NPU
+    try:
+        import torch_npu  # noqa: F401
+        if torch.npu.is_available():
+            return "npu"
+    except ImportError:
+        pass
+    return "cpu"
 
 
 def align_to_target(pred, target):

+ 485 - 0
run_with_auto_training.py

@@ -0,0 +1,485 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+run_with_auto_training.py
+-------------------------
+带自动训练的主程序
+
+功能:
+1. RTSP拾音器监控(通过 PickupMonitoringSystem)
+2. 冷启动检测(无模型时等待数据收集后自动训练)
+3. 每日增量训练(默认02:00,可配置)
+4. 每日数据清理(默认00:00,可配置)
+5. 训练完成后主动通知 MultiModelPredictor 重载,无需等待60秒轮询
+"""
+
+import sys
+import signal
+import logging
+import threading
+import time
+from pathlib import Path
+from datetime import datetime
+
+# 确保项目根目录在 sys.path 中
+sys.path.insert(0, str(Path(__file__).parent))
+
+try:
+    from apscheduler.schedulers.background import BackgroundScheduler
+    from apscheduler.triggers.cron import CronTrigger
+    import yaml
+except ImportError:
+    print("错误:缺少依赖库")
+    print("请运行:pip install apscheduler pyyaml")
+    sys.exit(1)
+
+
+def setup_logging():
+    # 配置日志系统(按文件大小轮转),与 run_pickup_monitor.py 共用同一日志文件
+    from logging.handlers import RotatingFileHandler
+
+    # 如果根 logger 已经被上层调用者配置过,则直接复用
+    root = logging.getLogger()
+    if root.handlers:
+        return
+
+    log_dir = Path(__file__).parent / "logs"
+    log_dir.mkdir(parents=True, exist_ok=True)
+
+    log_file = log_dir / "system.log"
+
+    formatter = logging.Formatter(
+        '%(asctime)s | %(levelname)-8s | %(name)-20s | %(message)s',
+        datefmt='%Y-%m-%d %H:%M:%S'
+    )
+
+    # 10MB 单文件上限,保留 2 个备份(总计约 30MB)
+    file_handler = RotatingFileHandler(
+        log_file,
+        maxBytes=10 * 1024 * 1024,
+        backupCount=2,
+        encoding='utf-8'
+    )
+    file_handler.setFormatter(formatter)
+
+    console_handler = logging.StreamHandler(sys.stdout)
+    console_handler.setFormatter(formatter)
+
+    logging.basicConfig(
+        level=logging.INFO,
+        handlers=[file_handler, console_handler]
+    )
+
+
+logger = logging.getLogger('MainSystem')
+
+
+class ColdStartManager:
+    """
+    冷启动管理器
+
+    检查注册的设备是否都有对应的模型文件,
+    缺少模型的设备会被标记为冷启动状态,等待数据收集后自动训练。
+    """
+
+    def __init__(self, deploy_root: Path, config: dict):
+        self.deploy_root = deploy_root
+        self.config = config
+
+        # 模型根目录(每设备子目录)
+        self.model_root = deploy_root / "models"
+        # 音频数据根目录
+        self.audio_root = deploy_root / "data" / "audio"
+
+        # 冷启动配置
+        cold_start_cfg = config.get('auto_training', {}).get('cold_start', {})
+        self.enabled = cold_start_cfg.get('enabled', True)
+        # 等待收集数据的最短时长(小时),避免数据量过少就开始训练
+        self.wait_hours = cold_start_cfg.get('wait_hours', 2)
+        # 每设备最少样本数
+        self.min_samples = cold_start_cfg.get('min_samples', 100)
+
+        # 运行时状态
+        self.is_cold_start = False
+        self.cold_start_time = None
+        # 缺少模型的设备列表
+        self.missing_devices = []
+
+    def check_cold_start_needed(self, registered_devices: dict) -> bool:
+        """
+        检查是否有设备缺少模型
+
+        参数:
+            registered_devices: {device_code: model_subdir} 已注册的设备映射
+
+        返回:
+            True 表示至少有一个设备缺少模型,需要冷启动
+        """
+        if not self.enabled:
+            return False
+
+        self.missing_devices = []
+        for device_code, model_subdir in registered_devices.items():
+            model_path = self.model_root / model_subdir / "ae_model.pth"
+            if not model_path.exists():
+                self.missing_devices.append(device_code)
+                logger.warning(f"模型不存在: {device_code} -> {model_path}")
+
+        if self.missing_devices:
+            logger.warning(f"以下设备缺少模型,进入冷启动模式: {self.missing_devices}")
+            return True
+
+        return False
+
+    def start_cold_start_mode(self):
+        # 进入冷启动模式,开始计时
+        self.is_cold_start = True
+        self.cold_start_time = datetime.now()
+
+        logger.info("=" * 70)
+        logger.info("冷启动模式")
+        logger.info("=" * 70)
+        logger.info(f"缺少模型的设备: {self.missing_devices}")
+        logger.info(f"等待时长: {self.wait_hours} 小时")
+        logger.info(f"每设备最少样本: {self.min_samples}")
+        logger.info("系统将只采集音频,不进行检测(缺模型的设备)")
+        logger.info("=" * 70)
+
+    def check_ready_for_training(self) -> bool:
+        """
+        检查是否已收集到足够的数据可以开始首次训练
+
+        条件:
+        1. 已等待超过 wait_hours 小时
+        2. 所有缺模型设备的音频文件数 >= min_samples
+
+        返回:
+            True 表示数据已就绪
+        """
+        if not self.is_cold_start:
+            return False
+
+        # 条件1:等待时间
+        elapsed = (datetime.now() - self.cold_start_time).total_seconds() / 3600
+        if elapsed < self.wait_hours:
+            return False
+
+        # 条件2:逐设备检查数据量
+        for device_code in self.missing_devices:
+            device_dir = self.audio_root / device_code
+            if not device_dir.exists():
+                logger.info(f"数据目录不存在: {device_code}")
+                return False
+
+            # 统计 current 目录和日期目录下的 wav 文件数
+            # 注意:冷启动训练本身会跳过 current/(可能含 FFmpeg 未写完的文件),
+            # 此处计入 current/ 仅作为数据就绪的粗略参考,实际可用样本可能略少
+            total_samples = 0
+            for sub_dir in device_dir.iterdir():
+                if sub_dir.is_dir():
+                    total_samples += len(list(sub_dir.glob("*.wav")))
+
+            if total_samples < self.min_samples:
+                logger.info(f"数据量不足: {device_code} -> {total_samples}/{self.min_samples}")
+                return False
+
+        logger.info("所有冷启动设备数据收集完成")
+        return True
+
+    def run_initial_training(self, on_device_trained=None) -> bool:
+        """
+        执行首次训练(冷启动)
+
+        使用 IncrementalTrainer 的 cold_start_mode,收集所有可用数据进行全量训练。
+
+        Args:
+            on_device_trained: 可选回调 fn(device_code),单设备训练完成后调用
+
+        返回:
+            True 表示训练成功
+        """
+        logger.info("=" * 70)
+        logger.info("开始首次训练(冷启动)")
+        logger.info("=" * 70)
+
+        try:
+            from auto_training.incremental_trainer import IncrementalTrainer
+
+            config_file = self.deploy_root / "config" / "auto_training.yaml"
+            trainer = IncrementalTrainer(config_file)
+
+            # 冷启动模式:收集所有目录的数据,用全量训练
+            trainer.cold_start_mode = True
+            trainer.use_days_ago = 0
+            trainer.sample_hours = 0
+            # 首次训练用更多轮数和稍高学习率
+            trainer.epochs = 90
+            trainer.learning_rate = 0.001
+
+            success = trainer.run_daily_training(
+                on_device_trained=on_device_trained
+            )
+
+            if success:
+                logger.info("=" * 70)
+                logger.info("首次训练完成,切换到正常检测模式")
+                logger.info("=" * 70)
+                self.is_cold_start = False
+
+            return success
+
+        except Exception as e:
+            logger.error(f"首次训练失败: {e}", exc_info=True)
+            return False
+
+
+class IntegratedSystem:
+    """
+    集成系统:冷启动 + 拾音器监控 + 自动增量训练 + 数据清理
+
+    作为 run_pickup_monitor.py 的上层封装,在其基础上增加:
+    1. 冷启动自动训练
+    2. APScheduler 定时增量训练和数据清理
+    3. 训练完成后主动触发模型热更新
+    """
+
+    def __init__(self):
+        self.deploy_root = Path(__file__).parent
+        self.auto_config_file = self.deploy_root / "config" / "auto_training.yaml"
+
+        # 加载自动训练配置
+        self.auto_config = self._load_yaml(self.auto_config_file)
+
+        # 运行时对象
+        self.scheduler = None
+        self.pickup_system = None
+        self.cold_start_manager = None
+        self.cold_start_thread = None
+
+    def _load_yaml(self, config_file: Path) -> dict:
+        # 加载 YAML 配置文件,不存在时返回空字典
+        if not config_file.exists():
+            logger.warning(f"配置文件不存在: {config_file}")
+            return {}
+
+        with open(config_file, 'r', encoding='utf-8') as f:
+            return yaml.safe_load(f) or {}
+
+    def _check_and_handle_cold_start(self) -> bool:
+        """
+        检查并处理冷启动
+
+        利用 PickupMonitoringSystem 中已注册的设备列表,
+        检查哪些设备缺少模型文件,对缺失设备启动冷启动训练。
+
+        返回:
+            True 表示处于冷启动模式
+        """
+        self.cold_start_manager = ColdStartManager(
+            self.deploy_root,
+            self.auto_config
+        )
+
+        # 从 pickup_system 的 multi_predictor 获取已注册设备
+        registered = self.pickup_system.multi_predictor.device_model_map
+
+        if not self.cold_start_manager.check_cold_start_needed(registered):
+            logger.info("所有设备模型完整,进入正常检测模式")
+            return False
+
+        # 进入冷启动模式
+        self.cold_start_manager.start_cold_start_mode()
+
+        # 启动后台线程监控冷启动状态
+        self.cold_start_thread = threading.Thread(
+            target=self._cold_start_monitor_loop,
+            daemon=True,
+            name="cold-start-monitor"
+        )
+        self.cold_start_thread.start()
+
+        return True
+
+    def _cold_start_monitor_loop(self):
+        # 后台线程:每分钟检查冷启动数据是否就绪,就绪后触发首次训练
+        while self.cold_start_manager.is_cold_start:
+            time.sleep(60)
+
+            if self.cold_start_manager.check_ready_for_training():
+                success = self.cold_start_manager.run_initial_training(
+                    on_device_trained=self._reload_single_device
+                )
+
+                if success:
+                    break
+                else:
+                    # 训练失败,等待10分钟后重试
+                    logger.warning("首次训练失败,将在10分钟后重试")
+                    time.sleep(600)
+
+    def _reload_single_device(self, device_code: str):
+        """训练回调:单个设备训练完成后即时重载该设备模型"""
+        if not self.pickup_system:
+            return
+        mp = self.pickup_system.multi_predictor
+        try:
+            if mp.reload_device(device_code):
+                logger.info(f"模型即时重载成功: {device_code}")
+            else:
+                logger.warning(f"模型即时重载失败: {device_code}")
+        except Exception as e:
+            logger.error(f"模型即时重载异常: {device_code} | {e}")
+
+    def _reload_models_after_training(self):
+        """
+        训练完成后主动触发 MultiModelPredictor 重载所有设备模型
+
+        相比被动等待60秒轮询检测 mtime 变化,主动调用 reload_device() 可以:
+        1. 即时生效(零延迟)
+        2. 有明确的成功/失败反馈
+        """
+        if not self.pickup_system:
+            return
+
+        mp = self.pickup_system.multi_predictor
+        success_count = 0
+        fail_count = 0
+
+        for device_code in mp.registered_devices:
+            try:
+                if mp.reload_device(device_code):
+                    success_count += 1
+                else:
+                    fail_count += 1
+            except Exception as e:
+                logger.error(f"重载设备模型失败: {device_code} | {e}")
+                fail_count += 1
+
+        logger.info(f"模型重载完成: 成功={success_count}, 失败={fail_count}")
+
+    def _setup_auto_training_tasks(self):
+        # 配置 APScheduler 定时任务:增量训练 + 数据清理
+        if not self.auto_config.get('auto_training', {}).get('enabled', False):
+            logger.info("自动训练已禁用(auto_training.enabled=false),跳过定时任务配置")
+            return
+
+        logger.info("=" * 70)
+        logger.info("配置定时任务")
+        logger.info("=" * 70)
+
+        self.scheduler = BackgroundScheduler()
+
+        # 定时增量训练
+        incremental_cfg = self.auto_config['auto_training'].get('incremental', {})
+        if incremental_cfg.get('enabled', False):
+            schedule_time = incremental_cfg.get('schedule_time', '02:00')
+            hour, minute = map(int, schedule_time.split(':'))
+
+            self.scheduler.add_job(
+                self._run_incremental_training,
+                trigger=CronTrigger(hour=hour, minute=minute),
+                id='incremental_training',
+                name='每日增量训练',
+                misfire_grace_time=3600  # 错过1小时内仍执行
+            )
+            logger.info(f"每日增量训练: 每天 {schedule_time}")
+
+        # 定时数据清理
+        data_cfg = self.auto_config['auto_training'].get('data', {})
+        cleanup_time = data_cfg.get('cleanup_time', '00:00')
+        hour, minute = map(int, cleanup_time.split(':'))
+
+        self.scheduler.add_job(
+            self._run_data_cleanup,
+            trigger=CronTrigger(hour=hour, minute=minute),
+            id='data_cleanup',
+            name='每日数据清理',
+            misfire_grace_time=3600
+        )
+        logger.info(f"每日数据清理: 每天 {cleanup_time}")
+
+        self.scheduler.start()
+        logger.info("定时任务调度器已启动")
+        logger.info("=" * 70)
+
+    def _run_incremental_training(self):
+        # 定时任务回调:执行增量训练(逐设备串行,每完成一个即时重载)
+        try:
+            logger.info("定时任务触发:增量训练开始")
+
+            from auto_training.incremental_trainer import IncrementalTrainer
+            trainer = IncrementalTrainer(self.auto_config_file)
+            success = trainer.run_daily_training(
+                on_device_trained=self._reload_single_device
+            )
+
+            if not success:
+                logger.warning("增量训练返回失败,保持当前推理模型不变")
+
+        except Exception as e:
+            logger.error(f"增量训练异常: {e}", exc_info=True)
+
+
+
+    def _run_data_cleanup(self):
+        # 定时任务回调:执行每日数据清理
+        try:
+            logger.info("定时任务触发:数据清理开始")
+            from auto_training.data_cleanup import DataCleaner
+
+            cleaner = DataCleaner(self.auto_config_file)
+            cleaner.run_cleanup()
+        except Exception as e:
+            logger.error(f"数据清理异常: {e}", exc_info=True)
+
+    def start(self):
+        # 主启动流程
+        logger.info("=" * 70)
+        logger.info("拾音器异响检测系统(带自动训练)")
+        logger.info("=" * 70)
+
+        # 1. 创建 PickupMonitoringSystem(会初始化 multi_predictor + 注册设备)
+        logger.info("初始化监控系统...")
+        from run_pickup_monitor import PickupMonitoringSystem
+        self.pickup_system = PickupMonitoringSystem()
+
+        # 2. 检查冷启动(需要在 pickup_system 初始化之后,因为需要设备注册信息)
+        is_cold_start = self._check_and_handle_cold_start()
+
+        # 3. 设置定时任务
+        self._setup_auto_training_tasks()
+
+        # 4. 覆盖信号处理(确保优雅关闭 scheduler)
+        signal.signal(signal.SIGINT, self._signal_handler)
+        signal.signal(signal.SIGTERM, self._signal_handler)
+
+        # 5. 启动拾音器监控(这是阻塞调用,包含主循环)
+        logger.info("启动拾音器监控...")
+        self.pickup_system.start()
+
+    def stop(self):
+        # 关闭所有组件
+        logger.info("停止系统...")
+
+        # 先关 scheduler,避免训练任务在关停过程中触发
+        if self.scheduler and self.scheduler.running:
+            self.scheduler.shutdown(wait=False)
+            logger.info("定时任务调度器已停止")
+
+        if self.pickup_system:
+            self.pickup_system.stop()
+
+        logger.info("系统已停止")
+
+    def _signal_handler(self, signum, frame):
+        logger.info(f"收到信号 {signum}")
+        self.stop()
+        sys.exit(0)
+
+
+def main():
+    setup_logging()
+    system = IntegratedSystem()
+    system.start()
+
+
+if __name__ == "__main__":
+    main()

+ 6 - 6
start.sh

@@ -74,8 +74,8 @@ start_service() {
     activate_conda
     
     # 检查必要文件
-    if [ ! -f "run_pickup_monitor.py" ]; then
-        echo "错误: run_pickup_monitor.py 不存在"
+    if [ ! -f "run_with_auto_training.py" ]; then
+        echo "错误: run_with_auto_training.py 不存在"
         exit 1
     fi
     
@@ -91,7 +91,7 @@ start_service() {
     # 启动服务
     echo "后台运行模式..."
     # stdout/stderr 丢弃,所有日志由 RotatingFileHandler 写入 logs/system.log
-    nohup python run_pickup_monitor.py > /dev/null 2>&1 &
+    nohup python run_with_auto_training.py > /dev/null 2>&1 &
     PID=$!
     echo $PID > "$PID_FILE"
     
@@ -205,8 +205,8 @@ run_foreground() {
     activate_conda
     
     # 检查必要文件
-    if [ ! -f "run_pickup_monitor.py" ]; then
-        echo "错误: run_pickup_monitor.py 不存在"
+    if [ ! -f "run_with_auto_training.py" ]; then
+        echo "错误: run_with_auto_training.py 不存在"
         exit 1
     fi
     
@@ -220,7 +220,7 @@ run_foreground() {
     mkdir -p logs
     
     echo "前台运行模式..."
-    python run_pickup_monitor.py
+    python run_with_auto_training.py
 }
 
 # ========================================

BIN
泵异响模型瞬时故障逻辑优化.docx


+ 92 - 0
瞬时异响方案.md

@@ -0,0 +1,92 @@
+# 泵异响模型瞬时故障逻辑优化
+
+> 日期:2026-03-13
+
+---
+
+## 一、现状
+
+当前检测链路:8秒音频 → AutoEncoder重建误差 → 60秒聚合取均值 → 投票窗口(5中3) → 告警聚合 → 推送。
+
+异常检测**只有重建误差一个指标**。AnomalyClassifier 虽然能提取 RMS、能量波动、频谱质心、周期性等10维特征,但只在异常确认后用于分类(判断是轴承/气蚀/松动等),从未参与检测触发。
+
+快速通道代码已有(`run_pickup_monitor.py:913-965`)但被注释,且存在严格连续、无推送路径等问题。
+
+---
+
+## 二、问题
+
+1. **单次高幅度异响漏检**:单文件极端误差被周期内6-7个正常文件均值拉平,投票窗口无法触发
+2. **间断性异响漏检**:非连续异常在投票窗口中永远凑不够3次
+3. **特征能力浪费**:已有的 RMS/能量/频谱/周期性特征只做事后分类,不做检测
+4. **快速通道不可用**:代码注释 + 严格连续 + pending无消费
+5. **停机前证据丢失**:过渡期抑制丢弃异常记录,无停机前音频归档,无法事后分析
+6. **无PLC场景无关联能力**:EnergyBaseline 能检测启停但没有回调,无法触发归档和关联
+
+---
+
+## 三、方案
+
+### 3.1 瞬时异常检测 + 特征增强
+
+- 将 AnomalyClassifier 的特征提取前移到每文件处理阶段,维护每设备特征滑动基线(最近30个文件)
+- 特征突变检测:RMS突变、能量波动突变、频谱质心跳变、周期性冲击出现,任一满足即标记特征异常
+- 双重确认:重建误差 > 5倍阈值 + 特征异常 → 瞬时告警;误差2-5倍 + 特征异常 → 加速快速通道
+- 启用快速通道:注释代码改为滑动窗口(最近5文件中3个超2倍阈值),补全推送路径
+- AlertAggregator 增加告警类型路由:瞬时告警直接推送、快速预警30秒短窗口、投票告警走现有5分钟窗口
+
+### 3.2 异响-停机关联分析
+
+- 所有通道检测到异常时记入环形缓冲(含过渡期被抑制的,保留证据不丢弃)
+- PumpStateMonitor 和 EnergyBaseline 增加状态变化回调,统一进入停机处理
+- 停机时回溯120秒异常记录,有异常标记"故障停机",归档停机前音频 + 写入元数据
+
+---
+
+## 四、排期
+
+统一下周(3/16 - 3/20)完成开发和测试。
+
+| 日期 | 任务 | 产出 |
+|------|------|------|
+| 周一 3/16 | 特征提取前移 + 特征滑动基线 + 突变检测逻辑 | `_process_new_file()` 新增特征检测 |
+| 周二 3/17 | 快速通道改造(滑动窗口 + 推送路径) + 瞬时告警通道 | 快速通道可用,瞬时告警可用 |
+| 周三 3/18 | AlertAggregator 告警类型路由 + 双重确认矩阵 | 三通道告警路径打通 |
+| 周四 3/19 | 异常事件缓冲 + 停机回调 + 关联分析 + 音频归档 | 停机关联完整可用 |
+| 周五 3/20 | 集成测试 + 参数调优 + 上线观察 | 全流程验证通过 |
+
+---
+
+## 五、测试方案
+
+### 5.1 单元测试(开发过程中同步进行)
+
+| 测试项 | 方法 | 预期 |
+|--------|------|------|
+| 特征突变检测 | 构造正常特征序列,插入一个RMS突增3倍的样本 | `_check_feature_anomaly()` 返回 True |
+| 滑动窗口快速通道 | 喂入5个误差值,其中3个超2倍阈值(不连续) | 触发快速预警 |
+| 瞬时告警双重确认 | 构造误差5倍 + 特征异常 / 仅误差5倍 / 仅特征异常 | 仅双重满足时触发 |
+| 异常事件记录 | 过渡期内触发异常 | 事件记录到缓冲且 suppressed=True |
+| 停机关联 | 记录异常事件后触发停机回调 | 返回"故障停机" + 证据链 |
+| 正常停机 | 无异常事件时触发停机回调 | 返回"正常停机" |
+
+### 5.2 模拟音频测试(周五)
+
+准备三类测试音频:
+
+1. **单次冲击音频**:在正常背景音中插入一段高能量脉冲(模拟瞬时故障)
+2. **间断异常音频**:每隔2-3个正常文件放一个异常文件(模拟间歇性问题)
+3. **正常音频序列**:确保新逻辑不会对正常音频产生误报
+
+测试步骤:
+- 将测试音频放入监控目录,观察日志输出
+- 验证瞬时告警是否在8秒内触发(单文件级别)
+- 验证快速通道是否在24秒内触发(3文件级别)
+- 验证正常音频不触发新通道
+- 手动模拟停机(停止放入音频 / 放入低能量音频),验证关联分析和音频归档
+
+### 5.3 线上灰度验证(周五-下周)
+
+- 新通道先设为 **仅记录不推送**(`enabled: true` + 日志输出但不走推送)
+- 观察1-2天日志,统计触发次数和场景
+- 确认误报率可接受后再开启推送