# -*- coding: utf-8 -*-
"""
video_frame_extractor.py
Python 3.8+

Fixes the usual causes of corrupted frames on the NPU (BM) path -- garbled
images, misplaced blocks, grey blocks: the pixel format actually written to
the pipe is not bgr24, or there is a stride/alignment problem.

Key changes (NPU path):
1) ffmpeg output is switched to rawvideo + yuv420p (I420), because bgr24 is
   unstable in the BM pipeline.
2) Python reads w*h*3/2 bytes per yuv420p frame and converts them to a BGR
   ndarray with OpenCV.
3) CPU/CUDA still output bgr24 directly (original behaviour kept).

Also retained:
- automatic reconnection
- resolution/codec probing (using the custom ffprobe output parsing that was
  supplied for this deployment)
- NPU/CUDA detection and the `prefer` strategy
"""
import logging
import os
import queue
import signal
import subprocess as sp
import sys
import threading
import time
from logging.handlers import RotatingFileHandler
from typing import Optional, Tuple

import cv2
import numpy as np
  27. class RTSPFrameExtractor:
  28. """
  29. 高并发、自动重连、可选 CUDA / NPU(BM) 加速的 RTSP → Numpy 帧提取器
  30. prefer:
  31. - "npu": 优先 NPU,其次 CUDA,否则 CPU
  32. - "cuda": 优先 CUDA,其次 NPU,否则 CPU
  33. - "cpu": 强制 CPU
  34. """
  35. def __init__(self,
  36. rtsp_url: str,
  37. fps: int = 1,
  38. width: Optional[int] = 1280,
  39. height: Optional[int] = 720,
  40. queue_size: int = 200,
  41. no_frame_timeout: int = 300,
  42. log_path: str = "./logs/frame_extractor.log",
  43. use_cuda: bool = True,
  44. use_npu: bool = True,
  45. prefer: str = "npu",
  46. ):
  47. # ----------------- 基本参数 -----------------
  48. self.rtsp_url = rtsp_url
  49. self.fps = int(fps)
  50. self.width = width
  51. self.height = height
  52. self.codec_name: str = "" # h264/hevc/...
  53. self.output_pix_fmt: str = "bgr24" # NPU 会切到 yuv420p
  54. self.frame_size: int = 0 # bytes per frame (pipe)
  55. self.queue = queue.Queue(maxsize=queue_size)
  56. self.no_frame_timeout = int(no_frame_timeout)
  57. self.proc: Optional[sp.Popen] = None
  58. self.read_thread: Optional[threading.Thread] = None
  59. self.running = threading.Event()
  60. self.last_frame_ts = 0.0
  61. self.last_probe_ts = 0.0
  62. self._restart_lock = threading.Lock()
  63. # ----------------- 加速配置 -----------------
  64. self.use_cuda_cfg = bool(use_cuda)
  65. self.use_npu_cfg = bool(use_npu)
  66. self.prefer = (prefer or "npu").lower().strip()
  67. self.cuda_enabled = False
  68. self.npu_enabled = False
  69. # ----------------- 日志 -----------------
  70. log_dir = os.path.dirname(log_path)
  71. if log_dir and (not os.path.exists(log_dir)):
  72. os.makedirs(log_dir, exist_ok=True)
  73. if not os.path.exists(log_path):
  74. open(log_path, 'w').close()
  75. self.logger = self._init_logger(log_path)
  76. # ----------------- 检测加速能力 -----------------
  77. npu_ok = self._npu_available() if self.use_npu_cfg else False
  78. cuda_ok = self._cuda_available() if self.use_cuda_cfg else False
  79. # 按 prefer 选择
  80. self._select_accel(npu_ok=npu_ok, cuda_ok=cuda_ok)
  81. self.logger.info(
  82. "Accel decision: prefer=%s, npu_ok=%s, cuda_ok=%s -> NPU=%s CUDA=%s",
  83. self.prefer, npu_ok, cuda_ok, self.npu_enabled, self.cuda_enabled
  84. )
  85. # ----------------- 启动 -----------------
  86. self._bootstrap()
  87. # --------------------------------------------------------------------- #
  88. # PUBLIC API #
  89. # --------------------------------------------------------------------- #
  90. def get_frame(self, timeout: float = 1.0) -> Optional[Tuple[np.ndarray, float]]:
  91. """
  92. - 正常返回: (frame: np.ndarray[BGR], timestamp: float)
  93. - 超时 / 无帧: None
  94. """
  95. try:
  96. return self.queue.get(timeout=timeout)
  97. except queue.Empty:
  98. if time.time() - self.last_frame_ts > self.no_frame_timeout:
  99. self.logger.warning("No frame for %.1f sec, restarting...", self.no_frame_timeout)
  100. self._restart()
  101. return None
  102. def stop(self):
  103. self.running.clear()
  104. if self.read_thread and self.read_thread.is_alive():
  105. self.read_thread.join(timeout=2)
  106. if self.proc:
  107. self._kill_proc(self.proc)
  108. self.proc = None
  109. self.logger.info("RTSPFrameExtractor stopped.")
  110. def close(self):
  111. self.stop()
  112. # --------------------------------------------------------------------- #
  113. # INTERNAL #
  114. # --------------------------------------------------------------------- #
  115. def _select_accel(self, npu_ok: bool, cuda_ok: bool):
  116. if self.prefer == "npu":
  117. self.npu_enabled = bool(npu_ok)
  118. self.cuda_enabled = bool((not self.npu_enabled) and cuda_ok)
  119. elif self.prefer == "cuda":
  120. self.cuda_enabled = bool(cuda_ok)
  121. self.npu_enabled = bool((not self.cuda_enabled) and npu_ok)
  122. elif self.prefer == "cpu":
  123. self.cuda_enabled = False
  124. self.npu_enabled = False
  125. else:
  126. # 兜底:npu -> cuda -> cpu
  127. self.npu_enabled = bool(npu_ok)
  128. self.cuda_enabled = bool((not self.npu_enabled) and cuda_ok)
  129. def _bootstrap(self):
  130. if self.width is None or self.height is None or not self.codec_name:
  131. self.logger.info("Probing RTSP resolution/codec...")
  132. w, h, codec = self._probe_resolution_loop()
  133. if self.width is None:
  134. self.width = w
  135. if self.height is None:
  136. self.height = h
  137. self.codec_name = codec or "h264"
  138. self.logger.info("Got stream info: %dx%d codec=%s", self.width, self.height, self.codec_name)
  139. self._recompute_output_format_and_size()
  140. self._start_ffmpeg()
  141. self._start_reader()
  142. def _recompute_output_format_and_size(self):
  143. """
  144. 关键:NPU 走 yuv420p 输出,CPU/CUDA 走 bgr24 输出
  145. """
  146. if not self.width or not self.height:
  147. raise ValueError("width/height not set")
  148. if self.npu_enabled:
  149. # 为了稳定,NPU 输出 yuv420p (I420): size = w*h*3/2
  150. self.output_pix_fmt = "yuv420p"
  151. self.frame_size = int(self.width) * int(self.height) * 3 // 2
  152. else:
  153. self.output_pix_fmt = "bgr24"
  154. self.frame_size = int(self.width) * int(self.height) * 3
  155. # ----------------------------- probing -------------------------------- #
  156. def _probe_resolution_loop(self) -> Tuple[int, int, str]:
  157. while True:
  158. w, h, c = self._probe_once()
  159. if w and h:
  160. return w, h, (c or "h264")
  161. self.logger.warning("ffprobe failed, retry in 2s...")
  162. time.sleep(2)
  163. def _probe_once(self) -> Tuple[int, int, str]:
  164. """
  165. 你给的特殊 ffprobe 解析方式
  166. """
  167. cmd = [
  168. "ffprobe", "-v", "error", "-rtsp_transport", "tcp",
  169. "-select_streams", "v:0",
  170. "-show_entries", "stream=width,height,codec_name",
  171. "-of", "csv=p=0",
  172. self.rtsp_url
  173. ]
  174. try:
  175. process = sp.run(cmd, stdout=sp.PIPE, stderr=sp.STDOUT, timeout=10)
  176. output = process.stdout.decode('utf-8', errors='ignore')
  177. for line in output.splitlines():
  178. line = line.strip()
  179. if not line:
  180. continue
  181. ignore_keywords = ['interleave', 'TCP based', 'BMvid', 'libbmvideo', 'firmware', 'VERSION']
  182. if any(k in line for k in ignore_keywords):
  183. continue
  184. parts = [p.strip() for p in line.split(',') if p.strip()]
  185. if len(parts) < 2:
  186. continue
  187. nums = []
  188. codec_candidate = ""
  189. for p_str in parts:
  190. if p_str.isdigit():
  191. nums.append(int(p_str))
  192. elif len(p_str) < 20:
  193. codec_candidate = p_str.lower()
  194. if len(nums) >= 2:
  195. codec_candidate = codec_candidate or "h264"
  196. if codec_candidate in ("h265", "hevc"):
  197. codec_candidate = "hevc"
  198. elif codec_candidate in ("h264", "avc1", "avc"):
  199. codec_candidate = "h264"
  200. return nums[0], nums[1], codec_candidate
  201. except Exception as e:
  202. self.logger.error(f"ffprobe unexpected error: {e}")
  203. return 0, 0, ""
  204. # --------------------------- ffmpeg 管理 ------------------------------ #
  205. def _ffmpeg_cmd(self):
  206. """
  207. NPU(BM):
  208. -c:v h264_bm/hevc_bm
  209. -vf "scale_bm=...:format=yuv420p,fps=..."
  210. 输出:-f rawvideo -pix_fmt yuv420p -
  211. CPU/CUDA:
  212. 输出:-f rawvideo -pix_fmt bgr24 -
  213. """
  214. cmd = ['ffmpeg', '-loglevel', 'error', '-rtsp_transport', 'tcp']
  215. if self.npu_enabled:
  216. codec = (self.codec_name or "h264").lower()
  217. dec = "hevc_bm" if codec in ("hevc", "h265") else "h264_bm"
  218. vf_parts = []
  219. # 强制经过 scale_bm 并输出 yuv420p,减少 stride/格式不确定性
  220. vf_parts.append(f"scale_bm=w={int(self.width)}:h={int(self.height)}:format=yuv420p")
  221. vf_parts.append(f"fps={int(self.fps)}")
  222. vf = ",".join(vf_parts)
  223. cmd += ['-c:v', dec, '-i', self.rtsp_url, '-vf', vf, '-an',
  224. '-f', 'rawvideo', '-pix_fmt', 'yuv420p', '-']
  225. return cmd
  226. # CUDA(仅硬解,输出仍由 ffmpeg 转成 bgr24)
  227. if self.cuda_enabled:
  228. cmd += ['-hwaccel', 'cuda']
  229. cmd += ['-i', self.rtsp_url,
  230. '-vf', f'fps={int(self.fps)}',
  231. '-an',
  232. '-f', 'rawvideo',
  233. '-pix_fmt', 'bgr24',
  234. '-']
  235. return cmd
  236. def _start_ffmpeg(self):
  237. if self.proc:
  238. self._kill_proc(self.proc)
  239. self.logger.info("Starting ffmpeg... (NPU=%s CUDA=%s codec=%s out_pix=%s)",
  240. self.npu_enabled, self.cuda_enabled, self.codec_name, self.output_pix_fmt)
  241. self.logger.debug("ffmpeg cmd → %s", " ".join(self._ffmpeg_cmd()))
  242. kwargs = {}
  243. if os.name == 'posix':
  244. kwargs['preexec_fn'] = os.setsid
  245. elif os.name == 'nt':
  246. kwargs['creationflags'] = sp.CREATE_NEW_PROCESS_GROUP
  247. self.proc = sp.Popen(
  248. self._ffmpeg_cmd(),
  249. stdout=sp.PIPE,
  250. stderr=sp.PIPE,
  251. bufsize=self.frame_size * 10,
  252. **kwargs
  253. )
  254. self.last_frame_ts = time.time()
  255. self.last_probe_ts = time.time()
  256. # -------------------------- Reader Thread ----------------------------- #
  257. def _start_reader(self):
  258. self.running.set()
  259. self.read_thread = threading.Thread(target=self._reader_loop, daemon=True)
  260. self.read_thread.start()
  261. def _reader_loop(self):
  262. self.logger.info("Reader thread started.")
  263. w = int(self.width)
  264. h = int(self.height)
  265. while self.running.is_set():
  266. try:
  267. if not self.proc or not self.proc.stdout:
  268. time.sleep(0.1)
  269. continue
  270. raw = self.proc.stdout.read(self.frame_size)
  271. ts = time.time()
  272. except Exception as e:
  273. self.logger.error("Read error: %s", e)
  274. raw, ts = b'', time.time()
  275. if len(raw) != self.frame_size:
  276. self.logger.warning("Incomplete frame (%d/%d bytes).", len(raw), self.frame_size)
  277. # 进程退出则重启
  278. if self.proc and (self.proc.poll() is not None):
  279. self.logger.warning("ffmpeg exited with code=%s, restarting...", self.proc.returncode)
  280. self._restart()
  281. return
  282. time.sleep(0.05)
  283. continue
  284. # --------- 按输出像素格式解包 ---------
  285. if self.output_pix_fmt == "yuv420p":
  286. # I420: shape = (h*3/2, w)
  287. yuv = np.frombuffer(raw, np.uint8).reshape((h * 3 // 2, w))
  288. frame = cv2.cvtColor(yuv, cv2.COLOR_YUV2BGR_I420)
  289. else:
  290. frame = np.frombuffer(raw, np.uint8).reshape((h, w, 3)).copy()
  291. # 入队
  292. try:
  293. self.queue.put_nowait((frame, ts))
  294. except queue.Full:
  295. _ = self.queue.get_nowait()
  296. self.queue.put_nowait((frame, ts))
  297. self.last_frame_ts = time.time()
  298. # 每小时 probe 一次(分辨率/codec 变化就重启)
  299. if time.time() - self.last_probe_ts >= 3600:
  300. self.last_probe_ts = time.time()
  301. nw, nh, ncodec = self._probe_once()
  302. if nw and nh:
  303. changed = (nw != self.width or nh != self.height or (ncodec and ncodec != self.codec_name))
  304. if changed:
  305. self.logger.warning("Stream info changed %dx%d/%s → %dx%d/%s, restarting...",
  306. self.width, self.height, self.codec_name, nw, nh, ncodec)
  307. self.width, self.height = nw, nh
  308. self.codec_name = ncodec or self.codec_name
  309. self._recompute_output_format_and_size()
  310. self._restart()
  311. return
  312. self.logger.info("Reader thread exit.")
  313. # ----------------------------- Restart -------------------------------- #
  314. def _restart(self):
  315. with self._restart_lock:
  316. self.running.clear()
  317. if self.read_thread and self.read_thread.is_alive():
  318. self.read_thread.join(timeout=2)
  319. if self.proc:
  320. self._kill_proc(self.proc)
  321. self.proc = None
  322. self.logger.info("Restarting, probing resolution/codec...")
  323. w, h, codec = self._probe_resolution_loop()
  324. self.width, self.height, self.codec_name = w, h, (codec or self.codec_name or "h264")
  325. # 重启时重新检测设备能力
  326. npu_ok = self._npu_available() if self.use_npu_cfg else False
  327. cuda_ok = self._cuda_available() if self.use_cuda_cfg else False
  328. self._select_accel(npu_ok=npu_ok, cuda_ok=cuda_ok)
  329. self._recompute_output_format_and_size()
  330. self.logger.info("New stream info: %dx%d codec=%s | Accel: NPU=%s CUDA=%s | out_pix=%s",
  331. self.width, self.height, self.codec_name, self.npu_enabled, self.cuda_enabled, self.output_pix_fmt)
  332. self._start_ffmpeg()
  333. self._start_reader()
  334. # ----------------------------- Utils ---------------------------------- #
  335. @staticmethod
  336. def _kill_proc(proc: sp.Popen):
  337. if proc and proc.poll() is None:
  338. try:
  339. if os.name == 'posix':
  340. os.killpg(os.getpgid(proc.pid), signal.SIGTERM)
  341. elif os.name == 'nt':
  342. proc.send_signal(signal.CTRL_BREAK_EVENT)
  343. except Exception:
  344. try:
  345. proc.terminate()
  346. except Exception:
  347. pass
  348. try:
  349. proc.wait(timeout=3)
  350. except sp.TimeoutExpired:
  351. try:
  352. proc.kill()
  353. except Exception:
  354. pass
  355. def _cuda_available(self) -> bool:
  356. try:
  357. out = sp.check_output(
  358. ['ffmpeg', '-hide_banner', '-hwaccels'],
  359. stderr=sp.STDOUT, timeout=3
  360. ).decode(errors='ignore').lower()
  361. if 'cuda' not in out:
  362. return False
  363. except Exception:
  364. return False
  365. try:
  366. sp.check_output(['nvidia-smi', '-L'], stderr=sp.STDOUT, timeout=3)
  367. except Exception:
  368. return False
  369. return True
  370. def _npu_available(self) -> bool:
  371. """
  372. 检测 BM NPU 能力:
  373. - codecs 里存在 h264_bm/hevc_bm
  374. - filters 里存在 scale_bm
  375. """
  376. try:
  377. codecs = sp.check_output(
  378. ['ffmpeg', '-hide_banner', '-codecs'],
  379. stderr=sp.STDOUT, timeout=3
  380. ).decode(errors='ignore').lower()
  381. if ('h264_bm' not in codecs) and ('hevc_bm' not in codecs):
  382. return False
  383. except Exception:
  384. return False
  385. try:
  386. filters = sp.check_output(
  387. ['ffmpeg', '-hide_banner', '-filters'],
  388. stderr=sp.STDOUT, timeout=3
  389. ).decode(errors='ignore').lower()
  390. if 'scale_bm' not in filters:
  391. return False
  392. except Exception:
  393. return False
  394. return True
  395. @staticmethod
  396. def _init_logger(log_path: str):
  397. logger = logging.getLogger("FrameExtractor")
  398. if logger.handlers:
  399. return logger
  400. logger.setLevel(logging.INFO)
  401. handler = RotatingFileHandler(
  402. log_path, maxBytes=10 * 1024 * 1024,
  403. backupCount=5, encoding='utf-8'
  404. )
  405. fmt = logging.Formatter(
  406. fmt="%(asctime)s %(levelname)s: %(message)s",
  407. datefmt="%Y-%m-%d %H:%M:%S"
  408. )
  409. handler.setFormatter(fmt)
  410. logger.addHandler(handler)
  411. console = logging.StreamHandler()
  412. console.setFormatter(fmt)
  413. logger.addHandler(console)
  414. return logger
  415. # ----------------------------- Demo ---------------------------------- #
  416. if __name__ == "__main__":
  417. RTSP = "rtsp://rtsp:newwater123@222.130.26.194:59371/streaming/channels/401"
  418. extractor = RTSPFrameExtractor(
  419. rtsp_url=RTSP,
  420. fps=1,
  421. use_npu=True,
  422. use_cuda=True,
  423. prefer="npu",
  424. width=1920,
  425. height=1080
  426. )
  427. try:
  428. while True:
  429. item = extractor.get_frame(timeout=2)
  430. if item is None:
  431. continue
  432. frame, ts = item
  433. cv2.putText(frame, f"Time: {ts:.3f}", (10, 30),
  434. cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
  435. cv2.imshow("RTSP Stream", frame)
  436. if cv2.waitKey(1) & 0xFF == ord('q'):
  437. break
  438. finally:
  439. extractor.close()
  440. cv2.destroyAllWindows()