# -*- coding: utf-8 -*-
"""
video_frame_extractor.py
Python 3.8+

Fixes the usual causes of corrupted NPU (BM) frames (garbled output, shifted
blocks, grey blocks): the pipe's real pixel format was not bgr24, or the frames
had stride/alignment problems.

Changes (NPU path):
1) ffmpeg emits rawvideo + yuv420p (I420) instead of bgr24, which is unreliable
   through the BM pipeline.
2) Python reads w*h*3/2 bytes per frame as yuv420p and converts them to a BGR
   ndarray with OpenCV.
3) CPU/CUDA still emit bgr24 directly, but are scaled to the requested output
   size so the per-frame byte count always matches what the reader expects.

Also retained:
- automatic reconnect
- resolution/codec probing (with the special ffprobe output parser)
- NPU/CUDA detection and the ``prefer`` policy
"""
import subprocess as sp
import threading
import queue
import time
import os
import signal
import logging
from collections import deque
from logging.handlers import RotatingFileHandler
from typing import Optional, Tuple

import numpy as np
import cv2


class RTSPFrameExtractor:
    """
    RTSP -> numpy frame extractor with automatic reconnect and optional
    CUDA / NPU (BM) accelerated decoding.

    prefer:
    - "npu":  NPU first, then CUDA, otherwise CPU
    - "cuda": CUDA first, then NPU, otherwise CPU
    - "cpu":  force CPU

    Frames are BGR ``np.ndarray`` of shape (height, width, 3). ``width`` /
    ``height`` select the output size; ``None`` means "use the stream's native
    size" as reported by ffprobe.
    """

    # Number of trailing ffmpeg stderr lines kept for diagnostics on exit.
    _STDERR_TAIL = 20
    # ffprobe output lines containing any of these substrings are treated as
    # log noise rather than CSV data.
    _PROBE_IGNORE = ('interleave', 'TCP based', 'BMvid', 'libbmvideo', 'firmware', 'VERSION')

    def __init__(self,
                 rtsp_url: str,
                 fps: int = 1,
                 width: Optional[int] = 1280,
                 height: Optional[int] = 720,
                 queue_size: int = 200,
                 no_frame_timeout: int = 300,
                 log_path: str = "./logs/frame_extractor.log",
                 use_cuda: bool = True,
                 use_npu: bool = True,
                 prefer: str = "npu",
                 ):
        # ----------------- basic parameters -----------------
        self.rtsp_url = rtsp_url
        self.fps = int(fps)
        # Requested output size (None = follow the stream's native size). Kept
        # separately so restarts do not overwrite the caller's choice.
        self._req_width = width
        self._req_height = height
        # Effective output size; resolved by _apply_stream_info().
        self.width = width
        self.height = height
        # Native stream size from the most recent successful probe.
        self.src_width = 0
        self.src_height = 0
        self.codec_name: str = ""            # h264/hevc/...
        self.output_pix_fmt: str = "bgr24"   # switched to yuv420p for NPU
        self.frame_size: int = 0             # bytes per frame on the pipe

        self.queue = queue.Queue(maxsize=queue_size)
        self.no_frame_timeout = int(no_frame_timeout)

        self.proc: Optional[sp.Popen] = None
        self.read_thread: Optional[threading.Thread] = None
        self.stderr_thread: Optional[threading.Thread] = None
        self._stderr_tail: deque = deque(maxlen=self._STDERR_TAIL)
        self.running = threading.Event()
        # Set once by stop(); blocks every later automatic restart.
        self._closed = threading.Event()
        self.last_frame_ts = 0.0
        self.last_probe_ts = 0.0
        self._restart_lock = threading.Lock()

        # ----------------- acceleration config -----------------
        self.use_cuda_cfg = bool(use_cuda)
        self.use_npu_cfg = bool(use_npu)
        self.prefer = (prefer or "npu").lower().strip()
        self.cuda_enabled = False
        self.npu_enabled = False

        # ----------------- logging -----------------
        log_dir = os.path.dirname(log_path)
        if log_dir and (not os.path.exists(log_dir)):
            os.makedirs(log_dir, exist_ok=True)
        if not os.path.exists(log_path):
            open(log_path, 'w').close()
        self.logger = self._init_logger(log_path)

        # ----------------- detect acceleration -----------------
        npu_ok = self._npu_available() if self.use_npu_cfg else False
        cuda_ok = self._cuda_available() if self.use_cuda_cfg else False
        self._select_accel(npu_ok=npu_ok, cuda_ok=cuda_ok)

        self.logger.info(
            "Accel decision: prefer=%s, npu_ok=%s, cuda_ok=%s -> NPU=%s CUDA=%s",
            self.prefer, npu_ok, cuda_ok, self.npu_enabled, self.cuda_enabled
        )

        # ----------------- start -----------------
        self._bootstrap()

    # --------------------------------------------------------------------- #
    #                              PUBLIC API                               #
    # --------------------------------------------------------------------- #
    def get_frame(self, timeout: float = 1.0) -> Optional[Tuple[np.ndarray, float]]:
        """
        Wait up to ``timeout`` seconds for the next frame.

        :return: ``(frame, timestamp)`` where frame is a BGR ndarray and
            timestamp is the ``time.time()`` value taken right after the frame
            was read; ``None`` on timeout. If no frame arrived for longer than
            ``no_frame_timeout`` seconds (and the extractor is not stopped),
            the pipeline is restarted before returning ``None``.
        """
        try:
            return self.queue.get(timeout=timeout)
        except queue.Empty:
            if (not self._closed.is_set()
                    and time.time() - self.last_frame_ts > self.no_frame_timeout):
                self.logger.warning("No frame for %.1f sec, restarting...", self.no_frame_timeout)
                self._restart()
            return None

    def stop(self):
        """Stop ffmpeg and the reader thread; no automatic restart happens afterwards."""
        self._closed.set()
        self.running.clear()
        # Wait for an in-progress restart to finish (or abort on _closed), so
        # it cannot start a fresh ffmpeg behind our back.
        with self._restart_lock:
            # Kill ffmpeg first: a reader blocked in read() only wakes on EOF.
            if self.proc:
                self._kill_proc(self.proc)
                self.proc = None
            reader = self.read_thread
            if reader and reader.is_alive() and reader is not threading.current_thread():
                reader.join(timeout=2)
        self.logger.info("RTSPFrameExtractor stopped.")

    def close(self):
        """Alias for :meth:`stop`."""
        self.stop()

    # --------------------------------------------------------------------- #
    #                               INTERNAL                                #
    # --------------------------------------------------------------------- #
    def _select_accel(self, npu_ok: bool, cuda_ok: bool):
        """Set npu_enabled / cuda_enabled from availability and ``prefer``."""
        if self.prefer == "npu":
            self.npu_enabled = bool(npu_ok)
            self.cuda_enabled = bool((not self.npu_enabled) and cuda_ok)
        elif self.prefer == "cuda":
            self.cuda_enabled = bool(cuda_ok)
            self.npu_enabled = bool((not self.cuda_enabled) and npu_ok)
        elif self.prefer == "cpu":
            self.cuda_enabled = False
            self.npu_enabled = False
        else:
            # Fallback order for unknown values: npu -> cuda -> cpu
            self.npu_enabled = bool(npu_ok)
            self.cuda_enabled = bool((not self.npu_enabled) and cuda_ok)

    def _apply_stream_info(self, w: int, h: int, codec: str):
        """Record probed native stream info and derive the output size from it."""
        self.src_width, self.src_height = int(w), int(h)
        self.codec_name = codec or "h264"
        self.width = self._req_width if self._req_width is not None else self.src_width
        self.height = self._req_height if self._req_height is not None else self.src_height

    def _bootstrap(self):
        """Probe the stream (blocking until it answers), then start ffmpeg and the reader."""
        # codec_name is empty on first start, so this always probes.
        if self.width is None or self.height is None or not self.codec_name:
            self.logger.info("Probing RTSP resolution/codec...")
            w, h, codec = self._probe_resolution_loop()
            self._apply_stream_info(w, h, codec)
            self.logger.info("Got stream info: %dx%d codec=%s", self.width, self.height, self.codec_name)

        self._recompute_output_format_and_size()
        self._start_ffmpeg()
        self._start_reader()

    def _recompute_output_format_and_size(self):
        """
        Choose the pipe pixel format and the per-frame byte count.

        NPU emits yuv420p (I420, w*h*3/2 bytes); CPU/CUDA emit bgr24 (w*h*3 bytes).

        :raises ValueError: if width/height are not set.
        """
        if not self.width or not self.height:
            raise ValueError("width/height not set")
        w, h = int(self.width), int(self.height)

        if self.npu_enabled:
            # w*h*3/2 is only the true I420 size for even dimensions, and
            # cv2's I420 conversion requires even dimensions too.
            if w % 2 or h % 2:
                even_w, even_h = max(2, w - w % 2), max(2, h - h % 2)
                self.logger.warning("yuv420p needs even dimensions, using %dx%d instead of %dx%d",
                                    even_w, even_h, w, h)
                w, h = even_w, even_h
                self.width, self.height = w, h
            self.output_pix_fmt = "yuv420p"
            self.frame_size = w * h * 3 // 2
        else:
            self.output_pix_fmt = "bgr24"
            self.frame_size = w * h * 3

    # ----------------------------- probing -------------------------------- #
    def _probe_resolution_loop(self) -> Tuple[int, int, str]:
        """
        Probe until ffprobe reports a size; retry every 2 s.

        :return: ``(width, height, codec)``, or ``(0, 0, "")`` if stop() was
            called while waiting.
        """
        while not self._closed.is_set():
            w, h, c = self._probe_once()
            if w and h:
                return w, h, (c or "h264")
            self.logger.warning("ffprobe failed, retry in 2s...")
            # Event.wait doubles as a sleep that stop() can interrupt.
            self._closed.wait(2)
        return 0, 0, ""

    def _probe_once(self) -> Tuple[int, int, str]:
        """
        Run ffprobe once and parse ``width,height,codec_name`` from its CSV output.

        Lines that look like log noise are skipped. Within a line, numeric
        fields are taken in order as width/height, and the last short
        non-numeric field is taken as the codec (normalized to "h264"/"hevc").

        :return: ``(width, height, codec)``, or ``(0, 0, "")`` on failure.
        """
        cmd = [
            "ffprobe", "-v", "error",
            "-rtsp_transport", "tcp",
            "-select_streams", "v:0",
            "-show_entries", "stream=width,height,codec_name",
            "-of", "csv=p=0",
            self.rtsp_url
        ]
        try:
            process = sp.run(cmd, stdout=sp.PIPE, stderr=sp.STDOUT, timeout=10)
            output = process.stdout.decode('utf-8', errors='ignore')
            for line in output.splitlines():
                line = line.strip()
                if not line or any(k in line for k in self._PROBE_IGNORE):
                    continue
                parts = [p.strip() for p in line.split(',') if p.strip()]
                if len(parts) < 2:
                    continue

                nums = []
                codec_candidate = ""
                for p_str in parts:
                    if p_str.isdigit():
                        nums.append(int(p_str))
                    elif len(p_str) < 20:
                        codec_candidate = p_str.lower()

                if len(nums) >= 2:
                    codec_candidate = codec_candidate or "h264"
                    if codec_candidate in ("h265", "hevc"):
                        codec_candidate = "hevc"
                    elif codec_candidate in ("h264", "avc1", "avc"):
                        codec_candidate = "h264"
                    return nums[0], nums[1], codec_candidate
        except Exception as e:
            self.logger.error("ffprobe unexpected error: %s", e)
        return 0, 0, ""

    # --------------------------- ffmpeg management ------------------------ #
    def _ffmpeg_cmd(self):
        """
        Build the ffmpeg command line.

        NPU(BM):  -c:v h264_bm/hevc_bm -vf "scale_bm=...:format=yuv420p,fps=..."
                  output: -f rawvideo -pix_fmt yuv420p -
        CPU/CUDA: -vf "fps=...,scale=w=...:h=..."
                  output: -f rawvideo -pix_fmt bgr24 -
        Both paths scale to self.width x self.height, so the frame size on the
        pipe always equals self.frame_size.
        """
        cmd = ['ffmpeg', '-loglevel', 'error', '-rtsp_transport', 'tcp']
        w, h, fps = int(self.width), int(self.height), int(self.fps)

        if self.npu_enabled:
            codec = (self.codec_name or "h264").lower()
            dec = "hevc_bm" if codec in ("hevc", "h265") else "h264_bm"
            # Always go through scale_bm with yuv420p output to reduce
            # stride/format uncertainty.
            vf = ",".join([
                f"scale_bm=w={w}:h={h}:format=yuv420p",
                f"fps={fps}",
            ])
            cmd += ['-c:v', dec, '-i', self.rtsp_url,
                    '-vf', vf,
                    '-an', '-f', 'rawvideo', '-pix_fmt', 'yuv420p', '-']
            return cmd

        # CUDA: hardware decode only; frames are downloaded to system memory
        # (no -hwaccel_output_format), so the software scale filter applies.
        if self.cuda_enabled:
            cmd += ['-hwaccel', 'cuda']

        cmd += ['-i', self.rtsp_url,
                # fps first so only the kept frames are scaled.
                '-vf', f'fps={fps},scale=w={w}:h={h}',
                '-an', '-f', 'rawvideo', '-pix_fmt', 'bgr24', '-']
        return cmd

    def _start_ffmpeg(self):
        """(Re)start ffmpeg in its own process group and begin draining its stderr."""
        if self.proc:
            self._kill_proc(self.proc)

        cmd = self._ffmpeg_cmd()
        self.logger.info("Starting ffmpeg... (NPU=%s CUDA=%s codec=%s out_pix=%s)",
                         self.npu_enabled, self.cuda_enabled, self.codec_name, self.output_pix_fmt)
        self.logger.debug("ffmpeg cmd → %s", " ".join(cmd))

        kwargs = {}
        if os.name == 'posix':
            # Thread-safe equivalent of preexec_fn=os.setsid.
            kwargs['start_new_session'] = True
        elif os.name == 'nt':
            kwargs['creationflags'] = sp.CREATE_NEW_PROCESS_GROUP

        self.proc = sp.Popen(
            cmd,
            stdout=sp.PIPE,
            stderr=sp.PIPE,
            bufsize=self.frame_size * 10,
            **kwargs
        )

        # stderr must be consumed, otherwise a full pipe would block ffmpeg.
        tail: deque = deque(maxlen=self._STDERR_TAIL)
        self._stderr_tail = tail
        self.stderr_thread = threading.Thread(
            target=self._drain_stderr, args=(self.proc, tail), daemon=True
        )
        self.stderr_thread.start()

        self.last_frame_ts = time.time()
        self.last_probe_ts = time.time()

    def _drain_stderr(self, proc: sp.Popen, tail: deque):
        """Read *proc*'s stderr until EOF, keeping the last lines in *tail*."""
        stream = proc.stderr
        if stream is None:
            return
        # read1 returns whatever is available instead of waiting for a full chunk.
        read = getattr(stream, 'read1', stream.read)
        pending = b''
        try:
            while True:
                chunk = read(4096)
                if not chunk:
                    break
                # ffmpeg may terminate status lines with '\r' only.
                lines = (pending + chunk).replace(b'\r', b'\n').split(b'\n')
                pending = lines.pop()[-4096:]
                for raw_line in lines:
                    text = raw_line.decode('utf-8', errors='ignore').strip()
                    if text:
                        tail.append(text)
                        self.logger.debug("ffmpeg stderr: %s", text)
        except (OSError, ValueError):
            # Pipe closed by _kill_proc; nothing more to read.
            pass

    # -------------------------- Reader Thread ----------------------------- #
    def _start_reader(self):
        """Mark the pipeline running and start a fresh reader thread."""
        self.running.set()
        self.read_thread = threading.Thread(target=self._reader_loop, daemon=True)
        self.read_thread.start()

    def _put_latest(self, item):
        """Enqueue *item*, dropping the oldest queued entry when the queue is full."""
        while True:
            try:
                self.queue.put_nowait(item)
                return
            except queue.Full:
                try:
                    self.queue.get_nowait()
                except queue.Empty:
                    # A consumer emptied the queue in between; just retry.
                    pass

    def _reader_loop(self):
        """
        Read fixed-size raw frames from ffmpeg's stdout, decode them to BGR and
        enqueue ``(frame, timestamp)``.

        Restarts the pipeline when ffmpeg exits or when the hourly probe sees a
        change in native size or codec.
        """
        self.logger.info("Reader thread started.")
        w = int(self.width)
        h = int(self.height)

        while self.running.is_set():
            proc = self.proc
            if not proc or not proc.stdout:
                time.sleep(0.1)
                continue
            try:
                raw = proc.stdout.read(self.frame_size)
            except Exception as e:
                if not self.running.is_set():
                    break
                self.logger.error("Read error: %s", e)
                raw = b''
            ts = time.time()

            if len(raw) != self.frame_size:
                if not self.running.is_set():
                    # stop()/_restart() killed ffmpeg on purpose; just exit.
                    break
                self.logger.warning("Incomplete frame (%d/%d bytes).", len(raw), self.frame_size)
                if proc.poll() is not None:
                    self.logger.warning("ffmpeg exited with code=%s, restarting...", proc.returncode)
                    tail = list(self._stderr_tail)
                    if tail:
                        self.logger.warning("ffmpeg stderr (last %d lines):\n%s", len(tail), "\n".join(tail))
                    self._restart(from_reader=True)
                    return
                time.sleep(0.05)
                continue

            # --------- unpack according to the pipe pixel format ---------
            if self.output_pix_fmt == "yuv420p":
                # I420 planes stacked vertically: shape (h*3/2, w)
                yuv = np.frombuffer(raw, np.uint8).reshape((h * 3 // 2, w))
                frame = cv2.cvtColor(yuv, cv2.COLOR_YUV2BGR_I420)
            else:
                # copy(): frombuffer over bytes is read-only.
                frame = np.frombuffer(raw, np.uint8).reshape((h, w, 3)).copy()

            self._put_latest((frame, ts))
            self.last_frame_ts = time.time()

            # Probe once an hour; restart if the native size or codec changed.
            if time.time() - self.last_probe_ts >= 3600:
                self.last_probe_ts = time.time()
                nw, nh, ncodec = self._probe_once()
                if nw and nh:
                    changed = (nw != self.src_width or nh != self.src_height or
                               (ncodec and ncodec != self.codec_name))
                    if changed:
                        self.logger.warning("Stream info changed %dx%d/%s → %dx%d/%s, restarting...",
                                            self.src_width, self.src_height, self.codec_name,
                                            nw, nh, ncodec)
                        # _restart() re-probes and applies the new info itself.
                        self._restart(from_reader=True)
                        return

        self.logger.info("Reader thread exit.")

    # ----------------------------- Restart -------------------------------- #
    def _restart(self, from_reader: bool = False):
        """
        Tear down ffmpeg and the reader, re-probe the stream, re-detect the
        accelerators and start again. Does nothing once stop() was called.

        :param from_reader: True when called by a reader thread. A reader that
            is no longer current (another thread already restarted) is ignored.
        """
        with self._restart_lock:
            current = threading.current_thread()
            if from_reader and current is not self.read_thread:
                return
            if self._closed.is_set():
                return

            self.running.clear()
            # Kill ffmpeg first so a reader blocked in read() wakes up (EOF).
            if self.proc:
                self._kill_proc(self.proc)
                self.proc = None
            reader = self.read_thread
            # Thread.join() raises RuntimeError when joining the current thread.
            if reader and reader is not current and reader.is_alive():
                reader.join(timeout=2)

            self.logger.info("Restarting, probing resolution/codec...")
            w, h, codec = self._probe_resolution_loop()
            if self._closed.is_set() or not (w and h):
                return
            self._apply_stream_info(w, h, codec or self.codec_name)

            # Re-detect device capabilities on every restart.
            npu_ok = self._npu_available() if self.use_npu_cfg else False
            cuda_ok = self._cuda_available() if self.use_cuda_cfg else False
            self._select_accel(npu_ok=npu_ok, cuda_ok=cuda_ok)

            self._recompute_output_format_and_size()
            self.logger.info("New stream info: %dx%d codec=%s | Accel: NPU=%s CUDA=%s | out_pix=%s",
                             self.width, self.height, self.codec_name,
                             self.npu_enabled, self.cuda_enabled, self.output_pix_fmt)

            self._start_ffmpeg()
            self._start_reader()

    # ----------------------------- Utils ---------------------------------- #
    @staticmethod
    def _kill_proc(proc: sp.Popen):
        """
        Terminate *proc* (whole process group on POSIX), escalating to kill()
        after 3 s, then close its pipes once it has exited.
        """
        if proc is None:
            return
        if proc.poll() is None:
            try:
                if os.name == 'posix':
                    os.killpg(os.getpgid(proc.pid), signal.SIGTERM)
                elif os.name == 'nt':
                    proc.send_signal(signal.CTRL_BREAK_EVENT)
            except Exception:
                try:
                    proc.terminate()
                except Exception:
                    pass
            try:
                proc.wait(timeout=3)
            except sp.TimeoutExpired:
                try:
                    proc.kill()
                except Exception:
                    pass
                try:
                    proc.wait(timeout=3)
                except sp.TimeoutExpired:
                    pass
        # Release the pipe FDs now instead of leaking them until GC on every restart.
        if proc.poll() is not None:
            for stream in (proc.stdout, proc.stderr):
                if stream is not None:
                    try:
                        stream.close()
                    except Exception:
                        pass

    def _cuda_available(self) -> bool:
        """True if ffmpeg lists the cuda hwaccel and ``nvidia-smi -L`` succeeds."""
        try:
            out = sp.check_output(
                ['ffmpeg', '-hide_banner', '-hwaccels'],
                stderr=sp.STDOUT, timeout=3
            ).decode(errors='ignore').lower()
            if 'cuda' not in out:
                return False
        except Exception:
            return False

        try:
            sp.check_output(['nvidia-smi', '-L'], stderr=sp.STDOUT, timeout=3)
        except Exception:
            return False
        return True

    def _npu_available(self) -> bool:
        """
        Detect BM NPU support:
        - ``ffmpeg -codecs`` lists h264_bm or hevc_bm
        - ``ffmpeg -filters`` lists scale_bm
        """
        try:
            codecs = sp.check_output(
                ['ffmpeg', '-hide_banner', '-codecs'],
                stderr=sp.STDOUT, timeout=3
            ).decode(errors='ignore').lower()
            if ('h264_bm' not in codecs) and ('hevc_bm' not in codecs):
                return False
        except Exception:
            return False

        try:
            filters = sp.check_output(
                ['ffmpeg', '-hide_banner', '-filters'],
                stderr=sp.STDOUT, timeout=3
            ).decode(errors='ignore').lower()
            if 'scale_bm' not in filters:
                return False
        except Exception:
            return False

        return True

    @staticmethod
    def _init_logger(log_path: str):
        """
        Return the shared "FrameExtractor" logger (rotating file + console).

        NOTE: the logger name is fixed, so only the first instance's log_path
        gets a handler; later instances reuse it.
        """
        logger = logging.getLogger("FrameExtractor")
        if logger.handlers:
            return logger
        logger.setLevel(logging.INFO)

        handler = RotatingFileHandler(
            log_path, maxBytes=10 * 1024 * 1024, backupCount=5, encoding='utf-8'
        )
        fmt = logging.Formatter(
            fmt="%(asctime)s %(levelname)s: %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S"
        )
        handler.setFormatter(fmt)
        logger.addHandler(handler)

        console = logging.StreamHandler()
        console.setFormatter(fmt)
        logger.addHandler(console)
        return logger


# ----------------------------- Demo ---------------------------------- #
if __name__ == "__main__":
    import sys

    # Never hard-code stream credentials: take the URL from argv or $RTSP_URL.
    RTSP = sys.argv[1] if len(sys.argv) > 1 else os.environ.get("RTSP_URL", "")
    if not RTSP:
        sys.exit("usage: python video_frame_extractor.py <rtsp_url>  (or set RTSP_URL)")

    extractor = RTSPFrameExtractor(
        rtsp_url=RTSP,
        fps=1,
        use_npu=True,
        use_cuda=True,
        prefer="npu",
        width=1920,
        height=1080
    )

    try:
        while True:
            item = extractor.get_frame(timeout=2)
            if item is None:
                continue
            frame, ts = item
            cv2.putText(frame, f"Time: {ts:.3f}", (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
            cv2.imshow("RTSP Stream", frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        extractor.close()
        cv2.destroyAllWindows()