| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519 |
- # -*- coding: utf-8 -*-
- """
- video_frame_extractor.py
- Python 3.8+
- 修复 NPU(BM) 抽帧“花屏/块状错位/灰块”的常见原因:pipe 实际输出像素格式不是 bgr24
- 或存在 stride/对齐问题。
- 改动要点(NPU 路径):
- 1) ffmpeg 输出改为 rawvideo + yuv420p(I420),避免 bgr24 在 BM 链路中不稳定
- 2) Python 按 yuv420p 每帧读取 w*h*3/2 字节,再用 OpenCV 转成 BGR ndarray
- 3) CPU/CUDA 仍然直接输出 bgr24(保持原行为)
- 同时保留:
- - 自动重连
- - 分辨率/codec 探测(容错解析 ffprobe 输出,跳过 BM SDK 等打印的日志行)
- - NPU/CUDA 检测与 prefer 策略
- """
- import subprocess as sp
- import threading
- import queue
- import time
- import os
- import signal
- import logging
- from logging.handlers import RotatingFileHandler
- from typing import Optional, Tuple
- import numpy as np
- import cv2
class RTSPFrameExtractor:
    """
    High-concurrency, auto-reconnecting RTSP -> numpy frame extractor with optional
    CUDA or BM NPU hardware decoding.

    An ``ffmpeg`` subprocess decodes the stream and writes raw frames to its stdout.
    A daemon reader thread slices that byte stream into fixed-size frames, converts
    them to BGR ``np.ndarray`` and pushes ``(frame, timestamp)`` tuples into a bounded
    queue. When the queue is full, the oldest entry is dropped.

    Output format per decoder path:
      - NPU (BM): ``scale_bm`` -> rawvideo yuv420p (I420), converted to BGR with OpenCV
      - CUDA/CPU: ``fps`` + ``scale`` -> rawvideo bgr24

    ``width``/``height`` are the requested *output* size; ``None`` keeps the stream's
    native value. Every path scales to that size, so each frame on the pipe is
    exactly ``frame_size`` bytes.

    prefer:
      - "npu":  NPU first, then CUDA, otherwise CPU
      - "cuda": CUDA first, then NPU, otherwise CPU
      - "cpu":  force CPU
      - any other value behaves like "npu"
    """

    # Lines containing any of these substrings are treated as SDK/transport log noise:
    # they are skipped when parsing ffprobe output and logged at DEBUG from ffmpeg stderr.
    _LOG_NOISE_KEYWORDS = ('interleave', 'TCP based', 'BMvid', 'libbmvideo', 'firmware', 'VERSION')

    # Seconds between periodic ffprobe runs that detect resolution/codec changes.
    _REPROBE_INTERVAL = 3600

    def __init__(self,
                 rtsp_url: str,
                 fps: float = 1,
                 width: Optional[int] = 1280,
                 height: Optional[int] = 720,
                 queue_size: int = 200,
                 no_frame_timeout: int = 300,
                 log_path: str = "./logs/frame_extractor.log",
                 use_cuda: bool = True,
                 use_npu: bool = True,
                 prefer: str = "npu",
                 ):
        """
        Probe the stream, pick a decoder, and start ffmpeg plus the reader thread.

        Blocks until ffprobe succeeds at least once, retrying every 2 s.

        Args:
            rtsp_url: stream URL.
            fps: output frame rate. Fractional values such as 0.5 (one frame every
                2 s) are allowed.
            width, height: output frame size in pixels. ``None`` keeps the stream's
                native value.
            queue_size: capacity of the frame queue; the oldest frame is dropped
                when it is full.
            no_frame_timeout: seconds without a frame after which ``get_frame()``
                restarts ffmpeg.
            log_path: rotating log file; its directory is created if missing.
            use_cuda, use_npu: allow these accelerators when they are detected.
            prefer: accelerator priority, see the class docstring.

        Raises:
            ValueError: if ``fps`` is not a positive number.
        """
        fps_value = float(fps)
        if not fps_value > 0:  # also rejects NaN
            raise ValueError(f"fps must be a positive number, got {fps!r}")

        # ----------------- basic parameters -----------------
        self.rtsp_url = rtsp_url
        # Keep integral rates as int (filter text "fps=1"); int() used to truncate 0.5 to 0.
        self.fps = int(fps_value) if fps_value.is_integer() else fps_value
        self._req_width = width             # requested output size; None -> native size
        self._req_height = height
        self.width = width                  # effective output size, resolved after probing
        self.height = height
        self._src_width = 0                 # native stream size from the last successful probe
        self._src_height = 0
        self.codec_name: str = ""           # h264/hevc/...
        self.output_pix_fmt: str = "bgr24"  # switched to yuv420p on the NPU path
        self.frame_size: int = 0            # bytes per frame on the pipe
        self.queue = queue.Queue(maxsize=queue_size)
        self.no_frame_timeout = int(no_frame_timeout)
        self.proc: Optional[sp.Popen] = None
        self.read_thread: Optional[threading.Thread] = None
        self.running = threading.Event()
        self._stopped = threading.Event()   # set by stop(); disables all auto-restarts
        self._generation = 0                # incremented for every ffmpeg process started
        self.last_frame_ts = 0.0
        self.last_probe_ts = 0.0
        self._restart_lock = threading.Lock()

        # ----------------- acceleration config -----------------
        self.use_cuda_cfg = bool(use_cuda)
        self.use_npu_cfg = bool(use_npu)
        self.prefer = (prefer or "npu").lower().strip()
        self.cuda_enabled = False
        self.npu_enabled = False

        # ----------------- logging -----------------
        log_dir = os.path.dirname(log_path)
        if log_dir:
            os.makedirs(log_dir, exist_ok=True)
        self.logger = self._init_logger(log_path)

        # ----------------- accelerator detection -----------------
        npu_ok = self._npu_available() if self.use_npu_cfg else False
        cuda_ok = self._cuda_available() if self.use_cuda_cfg else False
        self._select_accel(npu_ok=npu_ok, cuda_ok=cuda_ok)
        self.logger.info(
            "Accel decision: prefer=%s, npu_ok=%s, cuda_ok=%s -> NPU=%s CUDA=%s",
            self.prefer, npu_ok, cuda_ok, self.npu_enabled, self.cuda_enabled
        )

        # ----------------- start -----------------
        self._bootstrap()

    # --------------------------------------------------------------------- #
    #                               PUBLIC API                              #
    # --------------------------------------------------------------------- #
    def get_frame(self, timeout: float = 1.0) -> Optional[Tuple[np.ndarray, float]]:
        """
        Pop the oldest queued frame.

        Returns:
            ``(frame, timestamp)``: a BGR ndarray plus the ``time.time()`` at which
            it was read from ffmpeg. Returns ``None`` when no frame arrived within
            ``timeout``.

        If no frame has been produced for ``no_frame_timeout`` seconds, the
        pipeline is restarted from the calling thread. That call may block while
        the stream is re-probed.
        """
        try:
            return self.queue.get(timeout=timeout)
        except queue.Empty:
            pass
        # Read the generation BEFORE the staleness check: if a restart is already in
        # flight, our request becomes stale and _restart() ignores it.
        generation = self._generation
        if (not self._stopped.is_set()
                and time.time() - self.last_frame_ts > self.no_frame_timeout):
            self.logger.warning("No frame for %.1f sec, restarting...", self.no_frame_timeout)
            self._restart(generation)
        return None

    def stop(self):
        """Stop ffmpeg and the reader thread. Idempotent; no auto-restart happens afterwards."""
        self._stopped.set()   # makes an in-flight restart / probe retry loop bail out
        self.running.clear()
        with self._restart_lock:  # wait until an in-flight restart finished or aborted
            proc, self.proc = self.proc, None
            reader = self.read_thread
        if proc:
            # Kill before joining: the reader is normally blocked in read() and only
            # returns once ffmpeg's stdout reaches EOF.
            self._kill_proc(proc)
        if reader and reader.is_alive() and reader is not threading.current_thread():
            reader.join(timeout=2)
        self.logger.info("RTSPFrameExtractor stopped.")

    def close(self):
        """Alias for :meth:`stop`."""
        self.stop()

    def __enter__(self) -> "RTSPFrameExtractor":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()

    # --------------------------------------------------------------------- #
    #                                INTERNAL                               #
    # --------------------------------------------------------------------- #
    def _select_accel(self, npu_ok: bool, cuda_ok: bool):
        """Set npu_enabled/cuda_enabled from availability and ``self.prefer`` (at most one is True)."""
        if self.prefer == "cpu":
            self.npu_enabled = False
            self.cuda_enabled = False
        elif self.prefer == "cuda":
            self.cuda_enabled = bool(cuda_ok)
            self.npu_enabled = bool(npu_ok) and not self.cuda_enabled
        else:
            # "npu" and any unknown value: npu -> cuda -> cpu
            self.npu_enabled = bool(npu_ok)
            self.cuda_enabled = bool(cuda_ok) and not self.npu_enabled

    def _bootstrap(self):
        """Probe the stream once (blocking), then start ffmpeg and the reader thread."""
        self.logger.info("Probing RTSP resolution/codec...")
        w, h, codec = self._probe_resolution_loop()
        self._apply_stream_info(w, h, codec)
        self.logger.info("Got stream info: src=%dx%d out=%dx%d codec=%s",
                         self._src_width, self._src_height, self.width, self.height, self.codec_name)
        self._recompute_output_format_and_size()
        self._start_ffmpeg()
        self._start_reader()

    def _apply_stream_info(self, src_w: int, src_h: int, codec: str):
        """Record probed native stream info and resolve the output size (a requested size wins)."""
        self._src_width, self._src_height = int(src_w), int(src_h)
        self.codec_name = codec or self.codec_name or "h264"
        self.width = self._req_width if self._req_width is not None else self._src_width
        self.height = self._req_height if self._req_height is not None else self._src_height

    def _recompute_output_format_and_size(self):
        """
        Choose the pipe pixel format and per-frame byte count for the active decoder.

        NPU -> yuv420p (I420), ``w*h*3/2`` bytes. CPU/CUDA -> bgr24, ``w*h*3`` bytes.

        Raises:
            ValueError: if width/height have not been resolved yet.
        """
        if not self.width or not self.height:
            raise ValueError("width/height not set")
        w, h = int(self.width), int(self.height)
        if self.npu_enabled:
            # yuv420p is used on the BM path because bgr24 output was unstable there.
            # NOTE(review): w*h*3//2 and the I420 reshape in _decode_frame assume even w/h.
            self.output_pix_fmt = "yuv420p"
            self.frame_size = w * h * 3 // 2
        else:
            self.output_pix_fmt = "bgr24"
            self.frame_size = w * h * 3

    # ----------------------------- probing -------------------------------- #
    def _probe_resolution_loop(self) -> Tuple[int, int, str]:
        """
        Retry ffprobe every 2 s until it succeeds.

        Returns:
            ``(width, height, codec)``, or ``(0, 0, "")`` if stop() was called meanwhile.
        """
        while not self._stopped.is_set():
            w, h, c = self._probe_once()
            if w and h:
                return w, h, (c or "h264")
            self.logger.warning("ffprobe failed, retry in 2s...")
            self._stopped.wait(2)  # like sleep(2), but wakes immediately on stop()
        return 0, 0, ""

    def _probe_once(self) -> Tuple[int, int, str]:
        """
        Run ffprobe once on the first video stream.

        Returns:
            ``(width, height, codec)`` on success, ``(0, 0, "")`` on any failure.
        """
        cmd = [
            "ffprobe", "-v", "error", "-rtsp_transport", "tcp",
            "-select_streams", "v:0",
            "-show_entries", "stream=width,height,codec_name",
            "-of", "csv=p=0",
            self.rtsp_url
        ]
        try:
            process = sp.run(cmd, stdin=sp.DEVNULL, stdout=sp.PIPE, stderr=sp.STDOUT, timeout=10)
        except sp.TimeoutExpired:
            # Do not log the exception text: it contains the full command, URL credentials included.
            self.logger.error("ffprobe timed out after 10s")
            return 0, 0, ""
        except Exception as e:  # e.g. ffprobe binary missing
            self.logger.error("ffprobe unexpected error: %s", e)
            return 0, 0, ""
        return self._parse_probe_output(process.stdout.decode('utf-8', errors='ignore'))

    @classmethod
    def _parse_probe_output(cls, output: str) -> Tuple[int, int, str]:
        """
        Extract ``(width, height, codec)`` from ffprobe ``csv=p=0`` output.

        Tolerates SDK log lines mixed into the output (stderr is merged into stdout)
        and any field order. The first two numeric fields are taken as width, height.
        The codec defaults to "h264", and h265/avc aliases are normalized.

        Returns:
            ``(0, 0, "")`` if no line holds at least two numbers.
        """
        for raw_line in output.splitlines():
            line = raw_line.strip()
            if not line or any(k in line for k in cls._LOG_NOISE_KEYWORDS):
                continue
            parts = [p.strip() for p in line.split(',') if p.strip()]
            if len(parts) < 2:
                continue
            nums = []
            codec = ""
            for part in parts:
                if part.isdecimal():  # isdecimal (not isdigit) so int() can never fail
                    nums.append(int(part))
                elif len(part) < 20:
                    codec = part.lower()
            if len(nums) >= 2:
                codec = codec or "h264"
                if codec in ("h265", "hevc"):
                    codec = "hevc"
                elif codec in ("h264", "avc1", "avc"):
                    codec = "h264"
                return nums[0], nums[1], codec
        return 0, 0, ""

    # --------------------------- ffmpeg management ------------------------ #
    def _ffmpeg_cmd(self):
        """
        Build the ffmpeg argv for the active decoder path.

        NPU (BM):  -c:v h264_bm/hevc_bm -vf "scale_bm=w:h:format=yuv420p,fps=N"
                   -> -f rawvideo -pix_fmt yuv420p -
        CPU/CUDA:  -vf "fps=N,scale=w:h" -> -f rawvideo -pix_fmt bgr24 -

        NOTE(review): frames are read from stdout. If the BM SDK ever prints
        banner/firmware text to stdout rather than stderr (the ffprobe parser has to
        skip such lines, but it merges both streams), the raw stream would be
        misaligned. Confirm the SDK only writes to stderr.
        """
        w, h = int(self.width), int(self.height)
        # -nostdin: never read/alter the controlling terminal (keystrokes, raw mode).
        cmd = ['ffmpeg', '-nostdin', '-loglevel', 'error', '-rtsp_transport', 'tcp']
        if self.npu_enabled:
            codec = (self.codec_name or "h264").lower()
            dec = "hevc_bm" if codec in ("hevc", "h265") else "h264_bm"
            # Force scale_bm with an explicit yuv420p output to pin stride/format.
            vf = f"scale_bm=w={w}:h={h}:format=yuv420p,fps={self.fps}"
            return cmd + ['-c:v', dec, '-i', self.rtsp_url, '-vf', vf, '-an',
                          '-f', 'rawvideo', '-pix_fmt', 'yuv420p', '-']
        if self.cuda_enabled:
            # Hardware decode only; without -hwaccel_output_format the decoded frames
            # end up in system memory, so the software filters below still apply.
            cmd += ['-hwaccel', 'cuda']
        # Always scale to the output size, otherwise frame_size does not match the data
        # whenever the native resolution differs from width/height.
        vf = f"fps={self.fps},scale={w}:{h}"
        return cmd + ['-i', self.rtsp_url, '-vf', vf, '-an',
                      '-f', 'rawvideo', '-pix_fmt', 'bgr24', '-']

    def _start_ffmpeg(self):
        """Launch ffmpeg in its own session/process group and start draining its stderr."""
        if self.proc:
            self._kill_proc(self.proc)
        cmd = self._ffmpeg_cmd()
        self.logger.info("Starting ffmpeg... (NPU=%s CUDA=%s codec=%s out_pix=%s)",
                         self.npu_enabled, self.cuda_enabled, self.codec_name, self.output_pix_fmt)
        self.logger.debug("ffmpeg cmd → %s", " ".join(cmd))
        kwargs = {}
        if os.name == 'posix':
            # Own session so _kill_proc can signal the whole group. Unlike
            # preexec_fn=os.setsid, start_new_session is safe in multi-threaded programs.
            kwargs['start_new_session'] = True
        elif os.name == 'nt':
            kwargs['creationflags'] = sp.CREATE_NEW_PROCESS_GROUP
        proc = sp.Popen(
            cmd,
            stdin=sp.DEVNULL,
            stdout=sp.PIPE,
            stderr=sp.PIPE,
            bufsize=self.frame_size * 10,
            **kwargs
        )
        self.proc = proc
        self._generation += 1
        self._drain_stderr(proc)
        now = time.time()
        self.last_frame_ts = now
        self.last_probe_ts = now

    def _drain_stderr(self, proc: sp.Popen):
        """
        Consume ffmpeg's stderr on a daemon thread.

        Without this, the stderr pipe (64 KiB by default on Linux) eventually fills
        up. ffmpeg then blocks on its next log write and stops producing frames.
        """
        noise = self._LOG_NOISE_KEYWORDS

        def pump():
            try:
                for raw in iter(proc.stderr.readline, b''):
                    line = raw.decode('utf-8', errors='replace').strip()
                    if not line:
                        continue
                    if any(k in line for k in noise):
                        self.logger.debug("ffmpeg: %s", line)
                    else:
                        self.logger.warning("ffmpeg: %s", line)
            except (OSError, ValueError):
                pass  # pipe closed while shutting down
            finally:
                try:
                    proc.stderr.close()
                except OSError:
                    pass

        threading.Thread(target=pump, name="ffmpeg-stderr", daemon=True).start()

    # -------------------------- Reader Thread ----------------------------- #
    def _start_reader(self):
        """Start a reader thread bound to the current ffmpeg process and generation."""
        proc, generation = self.proc, self._generation
        self.running.set()
        self.read_thread = threading.Thread(target=self._reader_loop,
                                            args=(proc, generation), daemon=True)
        self.read_thread.start()

    def _reader_loop(self, proc: Optional[sp.Popen] = None, generation: Optional[int] = None):
        """
        Read fixed-size frames from *proc*'s stdout until shutdown, EOF or a stream change.

        Each reader is bound to a single ffmpeg process. A lingering old reader can
        therefore never consume bytes from a newer process, which would misalign frames.
        """
        if proc is None:
            proc = self.proc
        if generation is None:
            generation = self._generation
        self.logger.info("Reader thread started.")
        # Snapshot the geometry that matches this process's command line.
        w, h = int(self.width), int(self.height)
        frame_size = self.frame_size
        is_i420 = self.output_pix_fmt == "yuv420p"
        stdout = proc.stdout if proc is not None else None
        try:
            while stdout is not None and self.running.is_set():
                try:
                    raw = stdout.read(frame_size)
                except (OSError, ValueError) as e:  # ValueError: pipe closed during shutdown
                    self.logger.error("Read error: %s", e)
                    raw = b''
                ts = time.time()
                if len(raw) != frame_size:
                    # A blocking BufferedReader.read(n) returns fewer than n bytes only
                    # at EOF: ffmpeg exited or closed stdout, so no more data will come.
                    self._handle_stream_end(proc, generation, len(raw), frame_size)
                    return
                self._push_frame(self._decode_frame(raw, w, h, is_i420), ts)
                self.last_frame_ts = time.time()
                if (time.time() - self.last_probe_ts >= self._REPROBE_INTERVAL
                        and self._stream_info_changed()):
                    self._restart(generation)
                    return
        finally:
            if stdout is not None:
                try:
                    stdout.close()
                except OSError:
                    pass
            self.logger.info("Reader thread exit.")

    @staticmethod
    def _decode_frame(raw: bytes, w: int, h: int, is_i420: bool) -> np.ndarray:
        """Turn one raw pipe frame into a writable BGR ``(h, w, 3)`` uint8 array."""
        buf = np.frombuffer(raw, np.uint8)
        if is_i420:
            # I420 layout: Y plane (h rows) followed by U and V planes (h/2 rows in total).
            return cv2.cvtColor(buf.reshape((h * 3 // 2, w)), cv2.COLOR_YUV2BGR_I420)
        # frombuffer over bytes is read-only; copy so callers may draw on the frame.
        return buf.reshape((h, w, 3)).copy()

    def _push_frame(self, frame, ts: float):
        """Enqueue ``(frame, ts)``, dropping the oldest entry when the queue is full."""
        item = (frame, ts)
        try:
            self.queue.put_nowait(item)
            return
        except queue.Full:
            pass
        try:
            self.queue.get_nowait()
        except queue.Empty:
            pass  # the consumer drained it concurrently
        try:
            self.queue.put_nowait(item)
        except queue.Full:
            pass  # lost a race with another producer; dropping this frame is acceptable

    def _handle_stream_end(self, proc: sp.Popen, generation: int, got: int, expected: int):
        """Log a truncated read, give ffmpeg a moment to exit, and restart if this reader is still current."""
        self.logger.warning("Incomplete frame (%d/%d bytes).", got, expected)
        try:
            proc.wait(timeout=5)
        except sp.TimeoutExpired:
            pass  # still alive after closing stdout; _restart() will kill it
        if self._stopped.is_set() or not self.running.is_set() or generation != self._generation:
            return  # stop() or another restart already owns the lifecycle
        self.logger.warning("ffmpeg exited with code=%s, restarting...", proc.returncode)
        self._restart(generation)

    def _stream_info_changed(self) -> bool:
        """Re-probe the stream and report whether native resolution or codec changed."""
        self.last_probe_ts = time.time()
        nw, nh, ncodec = self._probe_once()
        if not (nw and nh):
            return False
        # Compare against the *native* size; the output size may be a requested scale.
        changed = ((nw, nh) != (self._src_width, self._src_height)
                   or bool(ncodec and ncodec != self.codec_name))
        if changed:
            self.logger.warning("Stream info changed %dx%d/%s → %dx%d/%s, restarting...",
                                self._src_width, self._src_height, self.codec_name, nw, nh, ncodec)
        return changed

    # ----------------------------- Restart -------------------------------- #
    def _restart(self, generation: Optional[int] = None):
        """
        Kill ffmpeg, re-probe the stream, re-detect accelerators and start a fresh
        ffmpeg + reader pair.

        Safe to call from the reader thread itself: a thread is never joined from
        within. Blocks until ffprobe succeeds again or stop() is called.

        Args:
            generation: the ``_generation`` the caller observed when it decided to
                restart. If a newer process has been started since, the request is
                stale and ignored. ``None`` always restarts.
        """
        with self._restart_lock:
            if self._stopped.is_set():
                return
            if generation is not None and generation != self._generation:
                return
            self.running.clear()
            proc, self.proc = self.proc, None
            if proc:
                # Killing first unblocks a reader stuck in read() so the join below is quick.
                self._kill_proc(proc)
            reader = self.read_thread
            if reader and reader.is_alive() and reader is not threading.current_thread():
                reader.join(timeout=2)

            self.logger.info("Restarting, probing resolution/codec...")
            w, h, codec = self._probe_resolution_loop()
            if self._stopped.is_set() or not (w and h):
                return
            self._apply_stream_info(w, h, codec)
            # Re-detect device capabilities on every restart.
            npu_ok = self._npu_available() if self.use_npu_cfg else False
            cuda_ok = self._cuda_available() if self.use_cuda_cfg else False
            self._select_accel(npu_ok=npu_ok, cuda_ok=cuda_ok)
            self._recompute_output_format_and_size()
            self.logger.info("New stream info: src=%dx%d out=%dx%d codec=%s | Accel: NPU=%s CUDA=%s | out_pix=%s",
                             self._src_width, self._src_height, self.width, self.height,
                             self.codec_name, self.npu_enabled, self.cuda_enabled, self.output_pix_fmt)
            self._start_ffmpeg()
            self._start_reader()

    # ----------------------------- Utils ---------------------------------- #
    @staticmethod
    def _kill_proc(proc: sp.Popen):
        """
        Terminate *proc* gracefully, escalating to kill after 3 s. Reaps the child.

        On POSIX the whole process group is signalled (ffmpeg runs in its own session).
        """
        if not proc or proc.poll() is not None:
            return
        try:
            if os.name == 'posix':
                os.killpg(os.getpgid(proc.pid), signal.SIGTERM)
            elif os.name == 'nt':
                proc.send_signal(signal.CTRL_BREAK_EVENT)
            else:
                proc.terminate()
        except Exception:
            try:
                proc.terminate()
            except Exception:
                pass
        try:
            proc.wait(timeout=3)
        except sp.TimeoutExpired:
            try:
                proc.kill()
                proc.wait(timeout=3)  # reap so no zombie is left behind
            except Exception:
                pass

    def _cuda_available(self) -> bool:
        """True if ffmpeg lists the ``cuda`` hwaccel and ``nvidia-smi -L`` succeeds."""
        try:
            out = sp.check_output(
                ['ffmpeg', '-hide_banner', '-hwaccels'],
                stdin=sp.DEVNULL, stderr=sp.STDOUT, timeout=3
            ).decode(errors='ignore').lower()
            if 'cuda' not in out:
                return False
        except Exception:
            return False
        try:
            sp.check_output(['nvidia-smi', '-L'], stdin=sp.DEVNULL, stderr=sp.STDOUT, timeout=3)
        except Exception:
            return False
        return True

    def _npu_available(self) -> bool:
        """
        Detect BM NPU support in the installed ffmpeg:
          - ``-codecs`` mentions h264_bm or hevc_bm
          - ``-filters`` mentions scale_bm
        """
        try:
            codecs = sp.check_output(
                ['ffmpeg', '-hide_banner', '-codecs'],
                stdin=sp.DEVNULL, stderr=sp.STDOUT, timeout=3
            ).decode(errors='ignore').lower()
            if ('h264_bm' not in codecs) and ('hevc_bm' not in codecs):
                return False
        except Exception:
            return False
        try:
            filters = sp.check_output(
                ['ffmpeg', '-hide_banner', '-filters'],
                stdin=sp.DEVNULL, stderr=sp.STDOUT, timeout=3
            ).decode(errors='ignore').lower()
            if 'scale_bm' not in filters:
                return False
        except Exception:
            return False
        return True

    @staticmethod
    def _init_logger(log_path: str):
        """
        Return the shared "FrameExtractor" logger, configuring it on first use.

        First use adds a 10 MB x 5 rotating file handler plus a console handler.
        Later calls, including calls with a different ``log_path``, reuse the
        existing handlers.
        """
        logger = logging.getLogger("FrameExtractor")
        if logger.handlers:
            return logger
        logger.setLevel(logging.INFO)
        handler = RotatingFileHandler(
            log_path, maxBytes=10 * 1024 * 1024,
            backupCount=5, encoding='utf-8'
        )
        fmt = logging.Formatter(
            fmt="%(asctime)s %(levelname)s: %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S"
        )
        handler.setFormatter(fmt)
        logger.addHandler(handler)
        console = logging.StreamHandler()
        console.setFormatter(fmt)
        logger.addHandler(console)
        return logger
- # ----------------------------- Demo ---------------------------------- #
if __name__ == "__main__":
    # Demo: show frames in an OpenCV window; press "q" in the window to quit.
    # The stream URL comes from argv[1] or $RTSP_URL. Never hard-code credentials in source.
    import sys

    rtsp_url = sys.argv[1] if len(sys.argv) > 1 else os.environ.get("RTSP_URL", "")
    if not rtsp_url:
        sys.exit(f"usage: {sys.argv[0]} <rtsp_url>   (or set the RTSP_URL environment variable)")

    extractor = RTSPFrameExtractor(
        rtsp_url=rtsp_url,
        fps=1,
        use_npu=True,
        use_cuda=True,
        prefer="npu",
        width=1920,
        height=1080,
    )
    try:
        while True:
            item = extractor.get_frame(timeout=2)
            if item is not None:
                frame, ts = item
                cv2.putText(frame, f"Time: {ts:.3f}", (10, 30),
                            cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
                cv2.imshow("RTSP Stream", frame)
            # Pump GUI events even when no frame arrived, so the window stays
            # responsive and "q" is still detected.
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        extractor.close()
        cv2.destroyAllWindows()
|