data_cleanup.py 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. data_cleanup.py
  5. ---------------
  6. 每日数据清理任务
  7. 功能:
  8. 1. 删除过期的正常音频(超过keep_normal_days天)
  9. 2. 异常音频永久保留(keep_anomaly_days=-1时不删除)
  10. 3. 删除过期的日志文件
  11. """
  12. import sys
  13. import logging
  14. import shutil
  15. from pathlib import Path
  16. from datetime import datetime, timedelta
  17. import yaml
  18. logger = logging.getLogger('DataCleanup')
  19. class DataCleaner:
  20. """数据清理器"""
  21. def __init__(self, config_file: Path = None, config: dict = None):
  22. # 支持两种初始化方式:
  23. # 1. 传 config dict(从数据库读取后直接传入,主程序使用)
  24. # 2. 传 config_file YAML 路径(命令行独立运行使用)
  25. if config is not None:
  26. self.config = config
  27. self.config_file = None
  28. elif config_file is not None:
  29. self.config_file = config_file
  30. self.config = self._load_config()
  31. else:
  32. raise ValueError("必须提供 config_file 或 config 之一")
  33. # 路径配置
  34. self.deploy_root = Path(__file__).parent.parent
  35. self.audio_root = self.deploy_root / "data" / "audio"
  36. self.anomaly_root = self.deploy_root / "data" / "anomaly_detected"
  37. self.backup_dir = self.deploy_root / "models" / "backups"
  38. self.logs_dir = self.deploy_root / "logs"
  39. def _load_config(self):
  40. # 从 YAML 文件加载配置(仅 config_file 模式使用)
  41. with open(self.config_file, 'r', encoding='utf-8') as f:
  42. return yaml.safe_load(f)
  43. def cleanup_old_normal_audio(self):
  44. """清理过期的正常音频(含 pump_transition 和 verified_normal)"""
  45. keep_days = self.config['auto_training']['data']['keep_normal_days']
  46. cutoff_date = (datetime.now() - timedelta(days=keep_days)).strftime('%Y%m%d')
  47. cutoff_ts = (datetime.now() - timedelta(days=keep_days)).timestamp()
  48. logger.info(f"清理 {cutoff_date} 之前的正常音频(保留{keep_days}天)")
  49. total_deleted = 0
  50. total_size = 0
  51. if not self.audio_root.exists():
  52. return
  53. for device_dir in self.audio_root.iterdir():
  54. if not device_dir.is_dir():
  55. continue
  56. for sub_dir in device_dir.iterdir():
  57. if not sub_dir.is_dir():
  58. continue
  59. # current: 正在写入的目录,由主进程负责清理
  60. if sub_dir.name == "current":
  61. continue
  62. # pump_transition / verified_normal:平铺目录,按 mtime 清理
  63. # 这两个目录没有日期子结构,只根据文件修改时间判断过期
  64. if sub_dir.name in ("pump_transition", "verified_normal"):
  65. for f in sub_dir.glob("*.wav"):
  66. try:
  67. st = f.stat()
  68. if st.st_mtime < cutoff_ts:
  69. total_size += st.st_size
  70. f.unlink()
  71. total_deleted += 1
  72. except Exception:
  73. pass
  74. continue
  75. # 日期目录:按目录名比较
  76. if sub_dir.name < cutoff_date:
  77. if sub_dir.exists():
  78. for f in sub_dir.rglob("*.wav"):
  79. total_size += f.stat().st_size
  80. total_deleted += 1
  81. shutil.rmtree(sub_dir)
  82. logger.info(f"已删除: {device_dir.name}/{sub_dir.name}")
  83. logger.info(f"正常音频清理完成: 删除 {total_deleted} 个文件, 释放 {total_size / 1e6:.2f} MB")
  84. def cleanup_old_anomaly_audio(self):
  85. """清理过期的异常音频(-1表示永久保留)"""
  86. keep_days = self.config['auto_training']['data']['keep_anomaly_days']
  87. if keep_days < 0:
  88. logger.info("异常音频配置为永久保留,跳过清理")
  89. return
  90. cutoff_date = (datetime.now() - timedelta(days=keep_days)).strftime('%Y%m%d')
  91. logger.info(f"清理 {cutoff_date} 之前的异常音频(保留{keep_days}天)")
  92. total_deleted = 0
  93. total_size = 0
  94. if not self.anomaly_root.exists():
  95. return
  96. for date_dir in self.anomaly_root.iterdir():
  97. if not date_dir.is_dir():
  98. continue
  99. if date_dir.name < cutoff_date:
  100. for f in date_dir.glob("*.wav"):
  101. total_size += f.stat().st_size
  102. total_deleted += 1
  103. shutil.rmtree(date_dir)
  104. logger.info(f"已删除: anomaly/{date_dir.name}")
  105. logger.info(f"异常音频清理完成: 删除 {total_deleted} 个文件, 释放 {total_size / 1e6:.2f} MB")
  106. def cleanup_old_logs(self):
  107. """清理过期的日志文件"""
  108. logs_config = self.config['auto_training'].get('logs', {})
  109. keep_days = logs_config.get('keep_days', 30)
  110. cutoff_date = datetime.now() - timedelta(days=keep_days)
  111. logger.info(f"清理 {keep_days} 天前的日志文件")
  112. total_deleted = 0
  113. total_size = 0
  114. if not self.logs_dir.exists():
  115. return
  116. for log_file in self.logs_dir.glob("*.log"):
  117. try:
  118. mtime = datetime.fromtimestamp(log_file.stat().st_mtime)
  119. if mtime < cutoff_date:
  120. total_size += log_file.stat().st_size
  121. log_file.unlink()
  122. total_deleted += 1
  123. logger.info(f"已删除日志: {log_file.name}")
  124. except Exception as e:
  125. logger.warning(f"删除日志失败: {log_file.name} | {e}")
  126. logger.info(f"日志清理完成: 删除 {total_deleted} 个文件, 释放 {total_size / 1e6:.2f} MB")
  127. def run_cleanup(self):
  128. """执行清理(主入口)"""
  129. try:
  130. logger.info("=" * 70)
  131. logger.info("开始每日数据清理")
  132. logger.info("=" * 70)
  133. self.cleanup_old_normal_audio()
  134. self.cleanup_old_anomaly_audio()
  135. self.cleanup_old_logs()
  136. logger.info("数据清理完成")
  137. return True
  138. except Exception as e:
  139. logger.error(f"数据清理失败: {e}", exc_info=True)
  140. return False
  141. def main():
  142. """命令行入口"""
  143. logging.basicConfig(
  144. level=logging.INFO,
  145. format='%(asctime)s | %(levelname)-8s | %(message)s',
  146. datefmt='%Y-%m-%d %H:%M:%S'
  147. )
  148. config_file = Path(__file__).parent.parent / "config" / "auto_training.yaml"
  149. cleaner = DataCleaner(config_file)
  150. success = cleaner.run_cleanup()
  151. sys.exit(0 if success else 1)
  152. if __name__ == "__main__":
  153. main()