data_cleanup.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. data_cleanup.py
  5. ---------------
  6. 每日数据清理任务
  7. 功能:
  8. 1. 删除过期的正常音频(超过keep_normal_days天)
  9. 2. 异常音频永久保留(keep_anomaly_days=-1时不删除)
  10. 3. 删除过期的日志文件
  11. """
  12. import sys
  13. import logging
  14. import shutil
  15. from pathlib import Path
  16. from datetime import datetime, timedelta
  17. import yaml
  18. logger = logging.getLogger('DataCleanup')
  19. class DataCleaner:
  20. """数据清理器"""
  21. def __init__(self, config_file: Path = None, config: dict = None):
  22. # 支持两种初始化方式:
  23. # 1. 传 config dict(从数据库读取后直接传入,主程序使用)
  24. # 2. 传 config_file YAML 路径(命令行独立运行使用)
  25. if config is not None:
  26. self.config = config
  27. self.config_file = None
  28. elif config_file is not None:
  29. self.config_file = config_file
  30. self.config = self._load_config()
  31. else:
  32. raise ValueError("必须提供 config_file 或 config 之一")
  33. # 路径配置
  34. self.deploy_root = Path(__file__).parent.parent
  35. self.audio_root = self.deploy_root / "data" / "audio"
  36. self.anomaly_root = self.deploy_root / "data" / "anomaly_detected"
  37. self.backup_dir = self.deploy_root / "models" / "backups"
  38. self.logs_dir = self.deploy_root / "logs"
  39. def _load_config(self):
  40. # 从 YAML 文件加载配置(仅 config_file 模式使用)
  41. with open(self.config_file, 'r', encoding='utf-8') as f:
  42. return yaml.safe_load(f)
  43. def cleanup_old_normal_audio(self):
  44. """清理过期的正常音频"""
  45. keep_days = self.config['auto_training']['data']['keep_normal_days']
  46. cutoff_date = (datetime.now() - timedelta(days=keep_days)).strftime('%Y%m%d')
  47. logger.info(f"清理 {cutoff_date} 之前的正常音频(保留{keep_days}天)")
  48. total_deleted = 0
  49. total_size = 0
  50. if not self.audio_root.exists():
  51. return
  52. # deploy_pickup使用设备目录结构: audio/{device_code}/{date}/*.wav
  53. for device_dir in self.audio_root.iterdir():
  54. if not device_dir.is_dir():
  55. continue
  56. for date_dir in device_dir.iterdir():
  57. # current: 正在写入的目录; verified_normal: 核查确认的正常音频(增训用)
  58. if not date_dir.is_dir() or date_dir.name in ("current", "verified_normal"):
  59. continue
  60. # 检查日期
  61. if date_dir.name < cutoff_date:
  62. if date_dir.exists():
  63. # rglob 递归统计所有子目录(normal/ + pump_transition/)中的音频
  64. for f in date_dir.rglob("*.wav"):
  65. total_size += f.stat().st_size
  66. total_deleted += 1
  67. shutil.rmtree(date_dir)
  68. logger.info(f"已删除: {device_dir.name}/{date_dir.name}")
  69. logger.info(f"正常音频清理完成: 删除 {total_deleted} 个文件, 释放 {total_size / 1e6:.2f} MB")
  70. def cleanup_old_anomaly_audio(self):
  71. """清理过期的异常音频(-1表示永久保留)"""
  72. keep_days = self.config['auto_training']['data']['keep_anomaly_days']
  73. if keep_days < 0:
  74. logger.info("异常音频配置为永久保留,跳过清理")
  75. return
  76. cutoff_date = (datetime.now() - timedelta(days=keep_days)).strftime('%Y%m%d')
  77. logger.info(f"清理 {cutoff_date} 之前的异常音频(保留{keep_days}天)")
  78. total_deleted = 0
  79. total_size = 0
  80. if not self.anomaly_root.exists():
  81. return
  82. for date_dir in self.anomaly_root.iterdir():
  83. if not date_dir.is_dir():
  84. continue
  85. if date_dir.name < cutoff_date:
  86. for f in date_dir.glob("*.wav"):
  87. total_size += f.stat().st_size
  88. total_deleted += 1
  89. shutil.rmtree(date_dir)
  90. logger.info(f"已删除: anomaly/{date_dir.name}")
  91. logger.info(f"异常音频清理完成: 删除 {total_deleted} 个文件, 释放 {total_size / 1e6:.2f} MB")
  92. def cleanup_old_logs(self):
  93. """清理过期的日志文件"""
  94. logs_config = self.config['auto_training'].get('logs', {})
  95. keep_days = logs_config.get('keep_days', 30)
  96. cutoff_date = datetime.now() - timedelta(days=keep_days)
  97. logger.info(f"清理 {keep_days} 天前的日志文件")
  98. total_deleted = 0
  99. total_size = 0
  100. if not self.logs_dir.exists():
  101. return
  102. for log_file in self.logs_dir.glob("*.log"):
  103. try:
  104. mtime = datetime.fromtimestamp(log_file.stat().st_mtime)
  105. if mtime < cutoff_date:
  106. total_size += log_file.stat().st_size
  107. log_file.unlink()
  108. total_deleted += 1
  109. logger.info(f"已删除日志: {log_file.name}")
  110. except Exception as e:
  111. logger.warning(f"删除日志失败: {log_file.name} | {e}")
  112. logger.info(f"日志清理完成: 删除 {total_deleted} 个文件, 释放 {total_size / 1e6:.2f} MB")
  113. def run_cleanup(self):
  114. """执行清理(主入口)"""
  115. try:
  116. logger.info("=" * 70)
  117. logger.info("开始每日数据清理")
  118. logger.info("=" * 70)
  119. self.cleanup_old_normal_audio()
  120. self.cleanup_old_anomaly_audio()
  121. self.cleanup_old_logs()
  122. logger.info("数据清理完成")
  123. return True
  124. except Exception as e:
  125. logger.error(f"数据清理失败: {e}", exc_info=True)
  126. return False
  127. def main():
  128. """命令行入口"""
  129. logging.basicConfig(
  130. level=logging.INFO,
  131. format='%(asctime)s | %(levelname)-8s | %(message)s',
  132. datefmt='%Y-%m-%d %H:%M:%S'
  133. )
  134. config_file = Path(__file__).parent.parent / "config" / "auto_training.yaml"
  135. cleaner = DataCleaner(config_file)
  136. success = cleaner.run_cleanup()
  137. sys.exit(0 if success else 1)
  138. if __name__ == "__main__":
  139. main()