| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- data_cleanup.py
- ---------------
- 每日数据清理任务
- 功能:
- 1. 删除过期的正常音频(超过keep_normal_days天)
- 2. 异常音频永久保留(keep_anomaly_days=-1时不删除)
- 3. 删除过期的日志文件
- """
- import sys
- import logging
- import shutil
- from pathlib import Path
- from datetime import datetime, timedelta
- import yaml
logger = logging.getLogger('DataCleanup')


class DataCleaner:
    """Delete expired audio recordings and log files according to a YAML config.

    The configuration is read from ``config_file`` and is expected to contain
    an ``auto_training`` mapping with:

    * ``data.keep_normal_days``  - retention (days) for normal audio
    * ``data.keep_anomaly_days`` - retention (days) for anomaly audio
    * ``logs.keep_days``         - retention (days) for ``*.log`` files
      (optional, defaults to 30)

    For all three settings a negative value means "keep forever": the
    corresponding cleanup stage is skipped.
    """

    # Date directories are named YYYYMMDD, so lexicographic order of valid
    # names equals chronological order.
    _DATE_FORMAT = '%Y%m%d'

    def __init__(self, config_file: Path):
        """Load the configuration and derive the directories to clean.

        Args:
            config_file: Path to the YAML configuration file.

        Raises:
            OSError: If the configuration file cannot be opened.
        """
        self.config_file = config_file
        self.config = self._load_config()

        # All data directories live under the deployment root, which is two
        # levels above this file.
        self.deploy_root = Path(__file__).parent.parent
        self.audio_root = self.deploy_root / "data" / "audio"
        self.anomaly_root = self.deploy_root / "data" / "anomaly_detected"
        self.backup_dir = self.deploy_root / "models" / "backups"
        self.logs_dir = self.deploy_root / "logs"

    def _load_config(self):
        """Parse ``self.config_file`` as YAML and return the resulting object."""
        with open(self.config_file, 'r', encoding='utf-8') as f:
            return yaml.safe_load(f)

    @classmethod
    def _is_date_dir_name(cls, name: str) -> bool:
        """Return True only if ``name`` is a real calendar date in YYYYMMDD form.

        Guards the deletion logic: without it, any directory whose name merely
        sorts below the cutoff string (e.g. ``1_notes``) would be removed.
        """
        if len(name) != 8 or any(ch not in "0123456789" for ch in name):
            return False
        try:
            datetime.strptime(name, cls._DATE_FORMAT)
        except ValueError:
            return False
        return True

    @classmethod
    def _cutoff_date_str(cls, keep_days) -> str:
        """Return the YYYYMMDD string for ``keep_days`` days before now."""
        return (datetime.now() - timedelta(days=keep_days)).strftime(cls._DATE_FORMAT)

    def _remove_expired_date_dirs(self, parent: Path, cutoff_date: str, label: str) -> tuple:
        """Remove every date directory under ``parent`` older than ``cutoff_date``.

        Args:
            parent: Directory whose children are YYYYMMDD date directories.
            cutoff_date: YYYYMMDD string; directories strictly before it are removed.
            label: Prefix used in the "deleted" log line.

        Returns:
            ``(file_count, byte_count)`` of the ``*.wav`` files (counted
            recursively) inside the directories that were removed.

        Raises:
            OSError: If a file cannot be stat'ed or a directory cannot be removed.
        """
        removed_files = 0
        removed_bytes = 0

        # Materialise the listing first so we never delete from a directory
        # while still iterating over it.
        for date_dir in sorted(parent.iterdir()):
            # Skips plain files and non-date entries such as "current".
            if not date_dir.is_dir() or not self._is_date_dir_name(date_dir.name):
                continue
            if date_dir.name >= cutoff_date:
                continue

            wav_sizes = [f.stat().st_size for f in date_dir.rglob("*.wav")]
            shutil.rmtree(date_dir)

            # Only count what was actually removed.
            removed_files += len(wav_sizes)
            removed_bytes += sum(wav_sizes)
            logger.info(f"已删除: {label}/{date_dir.name}")

        return removed_files, removed_bytes

    def cleanup_old_normal_audio(self) -> None:
        """Delete normal-audio date directories older than ``keep_normal_days``.

        Expected layout: ``audio/{device_code}/{YYYYMMDD}/*.wav``. A negative
        ``keep_normal_days`` means keep forever and skips the stage.

        Raises:
            KeyError: If the retention setting is missing from the config.
            OSError: If a directory cannot be removed.
        """
        keep_days = self.config['auto_training']['data']['keep_normal_days']

        # A negative retention would put the cutoff in the future and wipe
        # everything, including today's data; treat it as "keep forever",
        # matching the anomaly-audio convention.
        if keep_days < 0:
            logger.info("正常音频配置为永久保留,跳过清理")
            return

        cutoff_date = self._cutoff_date_str(keep_days)
        logger.info(f"清理 {cutoff_date} 之前的正常音频(保留{keep_days}天)")

        if not self.audio_root.exists():
            return

        total_deleted = 0
        total_size = 0
        for device_dir in self.audio_root.iterdir():
            if not device_dir.is_dir():
                continue
            files, size = self._remove_expired_date_dirs(
                device_dir, cutoff_date, device_dir.name
            )
            total_deleted += files
            total_size += size

        logger.info(f"正常音频清理完成: 删除 {total_deleted} 个文件, 释放 {total_size / 1e6:.2f} MB")

    def cleanup_old_anomaly_audio(self) -> None:
        """Delete anomaly-audio date directories older than ``keep_anomaly_days``.

        Expected layout: ``anomaly_detected/{YYYYMMDD}/...``. A negative
        ``keep_anomaly_days`` (e.g. -1) means keep forever and skips the stage.

        Raises:
            KeyError: If the retention setting is missing from the config.
            OSError: If a directory cannot be removed.
        """
        keep_days = self.config['auto_training']['data']['keep_anomaly_days']

        if keep_days < 0:
            logger.info("异常音频配置为永久保留,跳过清理")
            return

        cutoff_date = self._cutoff_date_str(keep_days)
        logger.info(f"清理 {cutoff_date} 之前的异常音频(保留{keep_days}天)")

        if not self.anomaly_root.exists():
            return

        total_deleted, total_size = self._remove_expired_date_dirs(
            self.anomaly_root, cutoff_date, "anomaly"
        )

        logger.info(f"异常音频清理完成: 删除 {total_deleted} 个文件, 释放 {total_size / 1e6:.2f} MB")

    def cleanup_old_logs(self) -> None:
        """Delete ``*.log`` files in the logs directory older than ``logs.keep_days``.

        Age is based on each file's modification time. Deletion is best-effort:
        a failure on one file is logged as a warning and the rest are still
        processed. A negative ``keep_days`` means keep forever.
        """
        logs_config = self.config['auto_training'].get('logs', {})
        keep_days = logs_config.get('keep_days', 30)

        # A negative retention would otherwise delete every log file,
        # including the one currently being written.
        if keep_days < 0:
            logger.info("日志文件配置为永久保留,跳过清理")
            return

        cutoff_date = datetime.now() - timedelta(days=keep_days)

        logger.info(f"清理 {keep_days} 天前的日志文件")

        if not self.logs_dir.exists():
            return

        total_deleted = 0
        total_size = 0

        for log_file in self.logs_dir.glob("*.log"):
            try:
                # Single stat() call supplies both the mtime and the size.
                info = log_file.stat()
                if datetime.fromtimestamp(info.st_mtime) >= cutoff_date:
                    continue

                log_file.unlink()
                total_size += info.st_size
                total_deleted += 1
                logger.info(f"已删除日志: {log_file.name}")
            except Exception as e:
                logger.warning(f"删除日志失败: {log_file.name} | {e}")

        logger.info(f"日志清理完成: 删除 {total_deleted} 个文件, 释放 {total_size / 1e6:.2f} MB")

    def run_cleanup(self) -> bool:
        """Run all cleanup stages (main entry point).

        Each stage runs independently, so a failure in one (for example an
        undeletable audio directory) does not prevent the others from running.

        Returns:
            True if every stage completed without raising, False otherwise.
        """
        logger.info("=" * 70)
        logger.info("开始每日数据清理")
        logger.info("=" * 70)

        stages = (
            self.cleanup_old_normal_audio,
            self.cleanup_old_anomaly_audio,
            self.cleanup_old_logs,
        )

        success = True
        for stage in stages:
            try:
                stage()
            except Exception as e:
                logger.error(f"数据清理失败: {e}", exc_info=True)
                success = False

        if success:
            logger.info("数据清理完成")
        return success
def main():
    """Command-line entry point.

    Configures root logging, runs the cleaner against
    ``<deploy_root>/config/auto_training.yaml`` and exits with status 0 when
    the cleanup succeeded or 1 when it failed.
    """
    logging.basicConfig(
        format='%(asctime)s | %(levelname)-8s | %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
        level=logging.INFO,
    )

    # The deployment root is two levels above this script.
    deploy_root = Path(__file__).parent.parent
    config_file = deploy_root / "config" / "auto_training.yaml"

    if DataCleaner(config_file).run_cleanup():
        exit_code = 0
    else:
        exit_code = 1
    sys.exit(exit_code)


if __name__ == "__main__":
    main()
|