human_detection_reader.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. """
  2. 人体检测 SQLite 读取模块
  3. 读取 YOLO 人体检测结果,判断是否在冷却期内
  4. 用于抑制异常报警:5 分钟内检测到人 → 不报警
  5. """
  6. import sqlite3
  7. from datetime import datetime, timedelta
  8. from pathlib import Path
  9. from typing import Optional
  10. import logging
  11. logger = logging.getLogger('PickupMonitor')
  12. class HumanDetectionReader:
  13. """
  14. 人体检测结果读取器
  15. 只要任意摄像头在指定时间窗口内检测到人,就返回 True(抑制报警)
  16. """
  17. def __init__(self, db_path: str, cooldown_minutes: int = 5):
  18. """
  19. 初始化读取器
  20. 参数:
  21. db_path: SQLite 数据库路径
  22. cooldown_minutes: 冷却时间(分钟),检测到人后多久内抑制报警
  23. """
  24. self.db_path = Path(db_path) if db_path else None
  25. self.cooldown_minutes = cooldown_minutes
  26. # 缓存:避免频繁查询数据库
  27. self._cache_result: Optional[bool] = None
  28. self._cache_time: Optional[datetime] = None
  29. self._cache_ttl_seconds = 10 # 缓存有效期(秒)
  30. def _get_connection(self) -> Optional[sqlite3.Connection]:
  31. """
  32. 获取数据库连接
  33. 返回:
  34. 连接对象,数据库不存在时返回 None
  35. """
  36. if self.db_path is None or not self.db_path.exists():
  37. return None
  38. try:
  39. conn = sqlite3.connect(str(self.db_path), timeout=5)
  40. return conn
  41. except Exception as e:
  42. logger.warning(f"人体检测数据库连接失败: {e}")
  43. return None
  44. def get_last_person_time(self) -> Optional[datetime]:
  45. """
  46. 获取最后一次检测到人的时间(任意摄像头)
  47. 返回:
  48. 最后检测到人的时间,无数据返回 None
  49. """
  50. conn = self._get_connection()
  51. if conn is None:
  52. return None
  53. try:
  54. row = conn.execute("""
  55. SELECT datetime FROM detections
  56. WHERE detected = 1
  57. ORDER BY datetime DESC
  58. LIMIT 1
  59. """).fetchone()
  60. conn.close()
  61. if row:
  62. # 解析时间字符串
  63. return datetime.strptime(row[0], "%Y-%m-%d %H:%M:%S")
  64. return None
  65. except Exception as e:
  66. logger.warning(f"查询人体检测数据失败: {e}")
  67. if conn:
  68. conn.close()
  69. return None
  70. def is_in_cooldown(self) -> bool:
  71. """
  72. 判断是否在冷却期内(任意摄像头检测到人后的 N 分钟内)
  73. 返回:
  74. True: 在冷却期内,应抑制报警
  75. False: 不在冷却期,可正常报警
  76. """
  77. now = datetime.now()
  78. # 检查缓存是否有效
  79. if (self._cache_result is not None and
  80. self._cache_time is not None and
  81. (now - self._cache_time).total_seconds() < self._cache_ttl_seconds):
  82. return self._cache_result
  83. # 查询数据库
  84. last_person_time = self.get_last_person_time()
  85. if last_person_time is None:
  86. result = False # 从未检测到人,不抑制
  87. else:
  88. # 判断是否在冷却期内
  89. cooldown_end = last_person_time + timedelta(minutes=self.cooldown_minutes)
  90. result = now < cooldown_end
  91. # 更新缓存
  92. self._cache_result = result
  93. self._cache_time = now
  94. return result
  95. def get_status_info(self) -> str:
  96. """
  97. 获取状态信息(用于日志)
  98. 返回:
  99. 状态描述字符串
  100. """
  101. last_time = self.get_last_person_time()
  102. if last_time is None:
  103. return "无人体检测数据"
  104. elapsed = (datetime.now() - last_time).total_seconds() / 60
  105. if elapsed < self.cooldown_minutes:
  106. remaining = self.cooldown_minutes - elapsed
  107. return f"冷却中(剩余{remaining:.1f}分钟)"
  108. else:
  109. return f"已超时({elapsed:.1f}分钟前有人)"