两种方式让你用Python轻松在RDKX5上部署推理

作者:SkyXZ

CSDN:SkyXZ~-CSDN博客

博客园:SkyXZ - 博客园

宿主机环境:WSL2-Ubuntu22.04+Cuda12.6、D-Robotics-OE 1.2.8、Ubuntu20.04 GPU Docker

端侧设备环境:RDK X5-Server-3.1.0

        2025年随着RDK X5的发布,地瓜官方随之也开放了RDK系列DNN推理部署的Python版的API进一步降低了使用X5的门槛,上一篇文章我介绍了C++版的API,这一篇文章我将介绍一下如何使用Python在我们的RDK X5上部署推理代码,地瓜的算法工具链的安装、模型的量化及C++端部署的教程在上一篇已经写了具体请参照:学弟一看就会的RDKX5模型转换及部署,你确定不学? - SkyXZ - 博客园

        地瓜这次释放的Python推理API可以分为两种版本,一种版本为BSP,使用hobot_dnn包,这是地瓜BSP(Board Support Package)提供的底层API,可以给用户提供更底层的控制,虽然需要手动实现更多的预处理和后处理步骤,但是可以更精细地控制内存和计算资源,适合需要深度优化或自定义处理的场景;另外一种版本为ModelZoo,使用bpu_infer_lib,这是地瓜ModelZoo提供的高级封装API适合用来进行快速验证和部署,其API更加简洁,使用也更方便,这个包提供更完整的预处理和后处理功能,包含了很多内置的辅助函数且模型加载和推理过程更简化,我们将在此结合手册手把手介绍这两种推理方式的使用

一、ModelZoo——bpu_infer_lib

        我们先从最简单最易用的包开始讲起叭,根据这个包中的.cpython-310-aarch64-linux-gnu.so文件可以知道bpu_infer_lib模块是为Python 3.10编译的,因此我们首先要确保我们的Python环境是3.10的版本,在安装完环境之后我们便可以从地瓜官方仓库下载bpu_infer_lib包啦(其他包如:OpenCV默认大家已经自行安装),具体命令如下大家可以根据自己的设备(X3 or X5)来选择下载:

# RDK X5 Python3.10 pip install bpu_infer_lib_x5 -i http://archive.d-robotics.cc/simple/ --trusted-host archive.d-robotics.cc # RDK X3 pip install bpu_infer_lib_x3 -i http://archive.d-robotics.cc/simple/ --trusted-host archive.d-robotics.cc 

        安装完之后我们先来查看地瓜官方的API手册,根据手册的介绍ModelZoo的推理API为一个Infer类,而这个类里面一共只有四个用法,分别是load_modelread_inputforwardget_output用于加载模型、读取输入、推理以及获取输出,整个推理的步骤使用起来非常的简单,接下来我将手把手带着大家结合API以及官方在ModelZoo中给出的jupyter_ModelZoo_YOLOv5.ipynb示例代码来从零部署一遍推理部分,首先我们先导入一些必要的包:

import cv2 import numpy as np from scipy.special import softmax from scipy.special import expit as sigmoid from time import time import bpu_infer_lib  # Model Zoo Python API from typing import Tuple import os 

(1)封装BPU_Detect类及初始化

        我们还是使用类的形式来实现我们的检测,因此我们还是来创建我们的BPU_Detect类,我们在这个类里首先将常见参数进行初始化,由于初始化特征图网络只需要加载一次因此我们创建了一个私有的_init_grids用于加载特征图,同时我们依旧按照前处理、推理、后处理的推理流程来对应创建了三个函数PreProcessdetectPostPrecess以及一系列的辅助函数:bgr2nv12_opencvdraw_detection和结果处理函数detect_result

class BPU_Detect:     def __init__(self, labelnames, model_path, conf, iou, anchors, strides, mode, is_save):# 初始化类     def _init_grids(self) : # 初始化特征图网格函数     def bgr2nv12_opencv(self, image): # OpenCV-BGR图像转推理NV12数据     def PreProcess(self, img): # 预处理函数用于图像预处理 	def PostPrecess(self, method): # 后处理函数     def draw_detection(self, img, box, score, class_id, labelname): # 结果可视化绘制函数     def detect_result(self, img): # 推理结果处理函数     def detect(self, img_path, method_pre, method_post): # 推理函数 

        在BPU_Detect类中我们首先初始化我们需要的所有参数比如我们最基本的模型路径、标签列表以及标签数量和置信度等参数,由于我们的特征图只需要初始化一次,因此我们便将特征图初始化函数添加至类的初始化中,在每次继承类的时候便完成特征图的初始化,最后我们继承bpu_infer_lib.Infer类,并直接完成模型的加载load_model

class BPU_Detect:     def __init__(self, model_path:str,                     labelnames:list,                     num_classes:int = None,                     conf:float = 0.45,                     iou:float = 0.45,                     anchors:np.array = np.array([                         [10,13, 16,30, 33,23],  # P3/8                         [30,61, 62,45, 59,119],  # P4/16                         [116,90, 156,198, 373,326],  # P5/32                        ]),                     strides = np.array([8, 16, 32]),                     mode:bool = False,                     is_save:bool = False                 			):         self.model = model_path         self.labelname = labelnames         self.conf = conf         self.iou = iou         self.anchors = anchors         self.strides = strides         self.input_w = 640         self.input_h = 640         self.nc = num_classes if num_classes is not None else len(self.labelname)         self.mode = mode         self.is_save = is_save         self._init_grids()                  self.inf = bpu_infer_lib.Infer(self.mode)         self.inf.load_model(self.model) 

(2)完成_init_grid()特征图及锚框生成函数

        接着便要来完成我们的_init_grids函数了,特征图大家应该不会陌生啦,在Yolov5中,特征图的生成与不同尺度的网格和锚框紧密相关,每个尺度的特征图通过步幅(strides)来计算出不同大小的网格,这些网格点会映射到输入图像的对应位置,通过这种方式,模型便能够对输入图像进行多尺度的目标检测,具体而言,我们需要根据输入图像的宽度和高度,结合每个尺度的步幅,生成相应的网格坐标,同时,为每个网格点分配合适的锚框,这样之后网络便可以更好地预测图像上物体的位置和大小。而在这里,我们使用np.tilenp.linspace来生成每个尺度的网格坐标,通过步幅对输入图像进行划分,计算出每个网格中心的位置,比如在生成小尺度网格时,我们使用以下代码:

self.s_grid = np.stack([np.tile(np.linspace(0.5, self.input_w//self.strides[0] - 0.5, self.input_w//self.strides[0]),                  	reps=self.input_h//self.strides[0]),np.repeat(np.arange(0.5, self.input_h//self.strides[0] + 0.5,                     1),self.input_w//self.strides[0])], axis=0).transpose(1,0) 

        这将计算出一个大小适应当前尺度的网格,其中np.linspace会生成从0.5到输入宽度除以步幅的均匀间隔值,np.repeat则生成每行的纵坐标,从而确保生成的网格覆盖整个特征图。这些网格的坐标随后会被平铺并重新组织成形状为(-1, 2)的矩阵,以便每个网格点对应一个特定的空间位置,为后续的锚框匹配和目标定位做好准备,接着我们使用以下代码将先前计算出的网格坐标(self.s_grid)进行扩展,使用np.hstack来将原本的网格坐标数组沿水平方向(即列方向)拼接三次用于给每个网格点重复分配多个坐标值:

self.s_grid = np.hstack([self.s_grid, self.s_grid, self.s_grid]).reshape(-1, 2) 

        在完成了网格的生成之后我们接着来完成锚框的分配,还是以小尺度网格为例,self.anchors[0] 是第一尺度的锚框尺寸,我们通过 np.tile 将其按网格点数量进行重复,确保每个网格点都有一个对应的锚框之后我们使用 .reshape(-1, 2) 将重复后的锚框数组重塑成每行包含两个值(宽度和高度)的形状,这样之后我们最终得到的 self.s_anchors 便是为第一尺度的特征图每个网格点分配的锚框集合:

self.s_anchors = np.tile(self.anchors[0], self.input_w//self.strides[0] * self.input_h//self.strides[0]).reshape(-1, 2) 

        不同的尺度的grid网格和anchors锚框完整计算代码如下:

# strides的grid网格 self.s_grid = np.stack([np.tile(np.linspace(0.5, self.input_w//self.strides[0] - 0.5, self.input_w//self.strides[0]),                  	reps=self.input_h//self.strides[0]),np.repeat(np.arange(0.5, self.input_h//self.strides[0] + 0.5,                     1),self.input_w//self.strides[0])], axis=0).transpose(1,0) self.s_grid = np.hstack([self.s_grid, self.s_grid, self.s_grid]).reshape(-1, 2)  self.m_grid = np.stack([np.tile(np.linspace(0.5, self.input_w//self.strides[1] - 0.5, self.input_w//self.strides[1]),                  	reps=self.input_h//self.strides[1]),np.repeat(np.arange(0.5, self.input_h//self.strides[1] + 0.5,                     1),self.input_w//self.strides[1])], axis=0).transpose(1,0) self.m_grid = np.hstack([self.m_grid, self.m_grid, self.m_grid]).reshape(-1, 2)  self.l_grid = np.stack([np.tile(np.linspace(0.5, self.input_w//self.strides[2] - 0.5, self.input_w//self.strides[2]),                  	reps=self.input_h//self.strides[2]),np.repeat(np.arange(0.5, self.input_h//self.strides[2] + 0.5,                     1),self.input_w//self.strides[2])], axis=0).transpose(1,0) self.l_grid = np.hstack([self.l_grid, self.l_grid, self.l_grid]).reshape(-1, 2) # 用于广播的anchors self.s_anchors = np.tile(self.anchors[0], self.input_w//self.strides[0] * self.input_h//self.strides[0]).reshape(-1, 2) self.m_anchors = np.tile(self.anchors[1], self.input_w//self.strides[1] * self.input_h//self.strides[1]).reshape(-1, 2) self.l_anchors = np.tile(self.anchors[2], self.input_w//self.strides[2] * self.input_h//self.strides[2]).reshape(-1, 2) 

        但是如果我们按照这样的写法的话代码的就有些许丑陋,我们优化一下通过将重复的网格生成和锚框分配逻辑提取成一个单独的函数 _create_grid,这样可以避免重复代码的冗余,同时使得每个尺度的网格和锚框的生成更加清晰易懂,我们只需要传入步幅(stride)参数,_create_grid 函数便可以灵活地为不同尺度的特征图生成对应的网格和锚框,因此我们的_init_grids函数完整代码如下:

def _init_grids(self) :     """初始化特征图网格"""     def _create_grid(stride: int) :         """创建单个stride的网格和anchors"""         grid = np.stack([             np.tile(np.linspace(0.5, self.input_w//stride - 0.5, self.input_w//stride),                     reps=self.input_h//stride),             np.repeat(np.arange(0.5, self.input_h//stride + 0.5, 1),                       self.input_w//stride)         ], axis=0).transpose(1,0)         grid = np.hstack([grid] * 3).reshape(-1, 2)          anchors = np.tile(             self.anchors[int(np.log2(stride/8))],              self.input_w//stride * self.input_h//stride         ).reshape(-1, 2)          return grid, anchors      # 创建不同尺度的网格     self.s_grid, self.s_anchors = _create_grid(self.strides[0])     self.m_grid, self.m_anchors = _create_grid(self.strides[1])      self.l_grid, self.l_anchors = _create_grid(self.strides[2])      print(f"网格尺寸: {self.s_grid.shape = }  {self.m_grid.shape = }  {self.l_grid.shape = }")     print(f"Anchors尺寸: {self.s_anchors.shape = }  {self.m_anchors.shape = }  {self.l_anchors.shape = }") 

(3)完成bgr2nv12_opencv()函数

        我们现在开始完成我们的工具函数bgr2nv12_opencv(),由于我们的BPU需要输入NV12格式的图像才能进行推理,因此我们这个函数主要用于将OpenCV读取的BGR格式的图像转换为BPU可以使用的NV12格式,NV12格式大家应该也不陌生,他是一种YUV格式,其中Y分量平面紧接在一起而UV分量则是交替排列,因此我们首先使用OpenCV将BGR图像转换为YUV420P格式,接着我们提取Y和UV分量并转换为符合NV12格式的结构即可,因此完整代码如下:

def bgr2nv12_opencv(self, image):     height, width = image.shape[0], image.shape[1]     area = height * width     yuv420p = cv2.cvtColor(image, cv2.COLOR_BGR2YUV_I420).reshape((area * 3 // 2,))     y = yuv420p[:area] # Y分量:前area个元素     uv_planar = yuv420p[area:].reshape((2, area // 4)) # UV分量:后面的元素,每2个元素分别为U和V分量     uv_packed = uv_planar.transpose((1, 0)).reshape((area // 2,)) # 将UV分量交替排列为交错的UV格式     nv12 = np.zeros_like(yuv420p) # 创建与原YUV数据形状相同的空数组用于存放NV12格式数据     nv12[:height * width] = y # 将Y分量直接赋值到NV12数组的前部     nv12[height * width:] = uv_packed  # 将交错的UV分量赋值到NV12数组的后部     return  nv12 

(4)完成PreProcess()预处理函数

        我们现在开始实现我们的预处理函数部分啦,预处理其实无非就两步,只需要修改输入图像的尺寸以及将输入图像的数据格式转换为NV12即可,接着我们查看用户手册中传入图像数据的read_input函数的介绍可以知道这个函数会自动校验我们输入的输入数据的尺寸和类型,因此,我们只需要将图像的尺寸调整符合目标大小,再将其转换为BPU所需要的NV12格式后,便可以通过 read_input 函数将数据传入模型啦

两种方式让你用Python轻松在RDKX5上部署推理

        因此我们读取图片后首先将输入图片resize到我们所需要的640上,接着使用我们先前定义的bgr2nv12_opencv()函数将输入图片的格式转化为NV12,最后调用API将准备好的数据输入进模型即可,具体代码如下:

orig_img = cv2.imread(img) # 读取图片 if orig_img is None:     raise ValueError(f"无法读取图片: {img}") input_tensor = cv2.resize(orig_img, (self.input_h, self.input_w)) # 将图片缩放到需求尺寸 input_tensor = self.bgr2nv12_opencv(input_tensor) # 转换图片格式 self.inf.read_input(input_tensor, 0) # 输入图像 

        虽然按照上述操作我们便可以完成预处理部分,但是!!!由于这个库是使用的 pybind11 封装的 C++ 库,我们在终端中打开Python环境导入bpu_infer_lib后我们使用help函数来打印出这个库的用法会发现关于输入这块,除了官方手册中提到的read_input,还暴露出了其他可用的接口函数:

(xq) root@ubuntu:~# python Python 3.10.16 (main, Dec 11 2024, 16:18:56) [GCC 11.2.0] on linux Type "help", "copyright", "credits" or "license" for more information. >>> import bpu_infer_lib >>> help(bpu_infer_lib) 

两种方式让你用Python轻松在RDKX5上部署推理

        我们在这里以其中一个read_img_to_nv12API为例子继续介绍,其他的函数大家可以自行探索或者是等地瓜官方完善手册后再使用。read_img_to_nv12函数的使用比read_input还简单,因为这个函数输入图像后会自动帮我们将图像转换为NV12格式,因此我们的代码可以如下所示更加简单,只需要用imread读取图像后直接传入这个函数即可:

orig_img = cv2.imread(img) # 读取图片 if orig_img is None:     raise ValueError(f"无法读取图片: {img}") img_h, img_w = orig_img.shape[0:2] self.inf.read_img_to_nv12(img, 0)# 使用API的方法直接读取 

        至此,我们便完成了PreProcess预处理函数,我们加上一些细节,以及图像读取API的选择后完整代码如下:

def PreProcess(self, img, method=0):     """     预处理函数     Args:         img: 输入图片路径         method: 选择使用的方法             - 0: 使用read_input             - 1: 使用read_img_to_nv12     """     # 获取原始图片和尺寸     if isinstance(img, str):         # 输入是图片路径         if method == 1:             # 先获取原始图片尺寸             orig_img = cv2.imread(img)             if orig_img is None:                 raise ValueError(f"无法读取图片: {img}")             img_h, img_w = orig_img.shape[0:2]             self.inf.read_img_to_nv12(img, 0)# 使用API的方法直接读取         elif method == 0:             # method == 0,读取图片后处理             orig_img = cv2.imread(img)             if orig_img is None:                 raise ValueError(f"无法读取图片: {img}")             input_tensor = cv2.resize(orig_img, (self.input_h, self.input_w))             input_tensor = self.bgr2nv12_opencv(input_tensor)             self.inf.read_input(input_tensor, 0)             img_h, img_w = orig_img.shape[0:2]     else:         print("输入格式有误")         return False      # 计算缩放比例     self.y_scale = img_h / self.input_h     self.x_scale = img_w / self.input_w      print(f"原始尺寸: {img_w}x{img_h}, 输入尺寸: {self.input_w}x{self.input_h}")     print(f"缩放比例: x_scale={self.x_scale}, y_scale={self.y_scale}")      return True 

(5)完成PostProcess()后处理函数

        完成了预处理后我们接着来完成后处理部分,后处理部分也无非就是几步,首先我们获取到模型的输出之后经过阈值处理将输出转换回我们可以理解的格式,接着根据输出的类别、位置等信息,进行非极大值抑制(NMS),去除冗余的框即完成整个后处理的流程。我们依旧翻开官方的API手册可以看到手册上给了我们get_output函数用来获取模型的输出,我们可以看到我们只需要使用这个函数便可以从outputs中获取classes_scoresbboxes

两种方式让你用Python轻松在RDKX5上部署推理

        我们首先使用get_output函数来获取输出,接着使用classes_scoresbboxes来获取结果,我们通过print这两个结果的维度可以发现classes_scores的形状为(1, 80, 80, 18),这意味着网络的输出是一个大小为80x80的网格,而每个网格点(即每个像素)有18个输出值,分别与每个锚框的6个参数(4个框坐标、1个物体得分、1个类别得分)相关,而bboxes的形状为(1, 40, 40, 18),这意味着bboxes的形状为(1, 40, 40, 18),这意味着网络的另一个输出是一个大小为40x40的网格,每个网格点同样有18个输出值,这些值也与3个锚框的6个参数相关

if not self.inf.get_output():     raise RuntimeError("获取输出失败") classes_scores = self.inf.outputs[0].data  # (1, 80, 80, 18) bboxes = self.inf.outputs[1].data         # (1, 40, 40, 18) 

        接着我们便可以从classes_scores中提取每个锚框的预测结果,我们设定num_anchors = 3pred_per_anchor = 6分别对应3个锚框和6个参数,初始化3个列表,scores_listboxes_listids_list,用来保存每个网格点经过筛选后的预测框、分数和类别ID

batch, height, width, channels = classes_scores.shape num_anchors = 3 pred_per_anchor = 6 # 提取每个anchor的预测 scores_list = [] boxes_list = [] ids_list = [] 

        之后我们便开始遍历每个网格点,并针对每个锚框提取其相应的预测值(框坐标、物体得分、类别得分)通过sigmoid激活函数计算得分,如果得分超过设定的阈值(score >= self.conf),便将框坐标进行解码,并转换为xyxy格式

# 处理每个网格点 for h in range(height):     for w in range(width):         for a in range(num_anchors):             # 获取当前anchor的预测值             start_idx = int(a * pred_per_anchor)  # 确保索引是整数             box = classes_scores[0, h, w, start_idx:start_idx+4].copy()  # 框坐标             obj_score = float(classes_scores[0, h, w, start_idx+4])      # objectness             cls_score = float(classes_scores[0, h, w, start_idx+5])      # 类别分数             # sigmoid激活             obj_score = 1 / (1 + np.exp(-obj_score))             cls_score = 1 / (1 + np.exp(-cls_score))             score = obj_score * cls_score             # 如果分数超过阈值,保存这个预测             if score >= self.conf:                 # 解码框坐标                 box = 1 / (1 + np.exp(-box))  # sigmoid                 cx = float((box[0] * 2.0 + w - 0.5) * self.strides[0])                 cy = float((box[1] * 2.0 + h - 0.5) * self.strides[0])                 w_pred = float((box[2] * 2.0) ** 2 * self.anchors[0][a*2])                 h_pred = float((box[3] * 2.0) ** 2 * self.anchors[0][a*2+1])                 # 转换为xyxy格式                 x1 = cx - w_pred/2                 y1 = cy - h_pred/2                 x2 = cx + w_pred/2                 y2 = cy + h_pred/2                 boxes_list.append([x1, y1, x2, y2])                 scores_list.append(float(score))  # 确保是标量                 ids_list.append(0)  # 假设只有一个类别 

        如果有检测结果,说明在遍历完所有网格点并进行筛选后,我们有符合条件的框,此时,我们将保存的框坐标列表boxes_list、分数列表scores_list和类别ID列表ids_list转化为NumPy数组,并确保它们的数据类型分别为np.float32np.int32。转换后的数组xyxy包含框的坐标信息,scores保存每个框的得分,ids保存类别标签。而如果没有检测结果,则说明所有框的得分都低于阈值,此时我们创建空的NumPy数组,xyxy的形状为(0, 4)表示没有框,scoresids也是空数组,分别表示没有得分和类别ID。这样,我们可以保证后续的代码在处理检测结果时不会出错,避免空列表带来的问题

# 如果有检测结果 if boxes_list:     xyxy = np.array(boxes_list, dtype=np.float32)     scores = np.array(scores_list, dtype=np.float32)     ids = np.array(ids_list, dtype=np.int32) else:     xyxy = np.array([], dtype=np.float32).reshape(0, 4)     scores = np.array([], dtype=np.float32)     ids = np.array([], dtype=np.int32) 

        最后在完成框筛选和得分计算后,我们使用OpenCV的cv2.dnn.NMSBoxes函数进行非极大值抑制(NMS),这个函数的介绍如下:

cv2.dnn.NMSBoxes(boxes, scores, score_threshold, nms_threshold, eta=1.0, top_k=0) # [boxes]  float 类型的数组,形状为 [num_boxes, 4],表示边界框的坐标。每个边界框由其左上角和右下角的坐标 [x, y, w, h] 表示。(x,y)为左上角坐标 # [scores] float 类型的数组,形状为 [num_boxes],表示每个边界框的置信度分数 # [score_threshold] float 类型,用于过滤低于这个分数阈值的边界框 # [nms_threshold] float 类型,用于确定哪些重叠的边界框应该被保留。如果两个边界框的重叠面积大于 nms_threshold,则其中一个边界框将被丢弃 # [eta] float 类型,默认值为 1,用于自适应调整 NMS 阈值。如果设置为小于 1 的值,则 NMS 阈值会随着迭代的进行而减小,这有助于在迭代过程中保留更多的边界框 # [top_k] int 类型,表示保留的最大边界框数量。默认值为 0,表示保留所有边界框 

        这个NMSBoxes函数将会返回一个indices,它是一个包含保留下来的框的索引的列表,我们通过检查indices的长度,如果大于零,表示有框通过了NMS,我们根据这些索引从原始的xyxyscoresids中提取对应的框坐标、得分和类别ID,然后,将框坐标进行缩放(根据输入图像与原始图像的比例),并将它们转换为np.int32类型。而如果indices为空,表示没有框通过NMS,这时我们便将bboxesscoresids设置为空数组,表示没有有效的检测结果

# NMS处理 indices = cv2.dnn.NMSBoxes(xyxy.tolist(), scores.tolist(), self.conf, self.iou) if len(indices) > 0:     indices = np.array(indices).flatten()     self.bboxes = (xyxy[indices] * np.array([self.x_scale, self.y_scale, self.x_scale, self.y_scale])).astype(np.int32)     self.scores = scores[indices]     self.ids = ids[indices] else:     print("No detections after NMS")     self.bboxes = np.array([], dtype=np.int32).reshape(0, 4)     self.scores = np.array([], dtype=np.float32)     self.ids = np.array([], dtype=np.int32) 

        和输入图像类似,bpu_infer_lib库中也提供了其他可用的输出获取接口,而ModelZoo中的官方示例代码jupyter_ModelZoo_YOLOv5.ipynb中使用的也不是get_output,而是get_infer_res_np_float32接口,因此我们接下来以官方的代码为例子介绍这个API

两种方式让你用Python轻松在RDKX5上部署推理

        之前介绍的get_output函数返回的是已经经过处理之后的classes_scoresbboxes,但是get_infer_res_np_float32输出的是未经处理的原始推理结果这些原始输出包含更基础的信息,例如每个网格点的所有锚框的回归值、置信度得分以及类别得分等,因此我们首先需要从get_infer_res_np_float32函数获取分别对应不同尺度的特征图的预测输出:s_predm_predl_pred,接着我们将每个尺度的输出通过reshape操作转化为一维数组,其中每个元素包含5个回归值(4个边界框坐标和1个置信度)以及类别得分部分,这样我们就得到了每个锚框的预测信息

# 获取不同尺度的特征图的预测输出 s_pred = self.inf.get_infer_res_np_float32(0) m_pred = self.inf.get_infer_res_np_float32(1) l_pred = self.inf.get_infer_res_np_float32(2) # reshape s_pred = s_pred.reshape([-1, (5 + self.nc)]) m_pred = m_pred.reshape([-1, (5 + self.nc)]) l_pred = l_pred.reshape([-1, (5 + self.nc)]) 

        接下来,我们使用numpy向量化操作进行阈值筛选,首先计算每个锚框的最大类别得分,并将物体置信度与最大类别得分进行组合,得到每个锚框的综合得分,我们以小尺寸s为例子进行介绍:

        首先,我们通过np.max函数便可以计算出每个锚框的最大类别得分s_raw_max_scores,这个里面保存的是每个锚框在所有类别中的最大值,然后,我们对物体置信度(s_pred[:, 4])和最大类别得分进行sigmoid激活,便可以进一步得到更直观的综合得分s_max_scores

s_raw_max_scores = np.max(s_pred[:, 5:], axis=1) s_max_scores = 1 / ((1 + np.exp(-s_pred[:, 4])) * (1 + np.exp(-s_raw_max_scores))) 

        接着,我们使用np.flatnonzero方法筛选出得分大于等于设定阈值(self.conf)的锚框索引,保证只有那些置信度较高的预测会被保留,最后我们利用np.argmax找到每个筛选后的锚框对应的最大类别ID,确保我们能够确定每个有效锚框所代表的物体类别

s_valid_indices = np.flatnonzero(s_max_scores >= self.conf) s_ids = np.argmax(s_pred[s_valid_indices, 5:], axis=1) s_scores = s_max_scores[s_valid_indices] 

        而对于其他尺寸的完整代码如下:

# classify: 利用numpy向量化操作完成阈值筛选 s_raw_max_scores = np.max(s_pred[:, 5:], axis=1) s_max_scores = 1 / ((1 + np.exp(-s_pred[:, 4]))*(1 + np.exp(-s_raw_max_scores))) s_valid_indices = np.flatnonzero(s_max_scores >= self.conf) s_ids = np.argmax(s_pred[s_valid_indices, 5:], axis=1) s_scores = s_max_scores[s_valid_indices]  m_raw_max_scores = np.max(m_pred[:, 5:], axis=1) m_max_scores = 1 / ((1 + np.exp(-m_pred[:, 4]))*(1 + np.exp(-m_raw_max_scores))) m_valid_indices = np.flatnonzero(m_max_scores >= self.conf) m_ids = np.argmax(m_pred[m_valid_indices, 5:], axis=1) m_scores = m_max_scores[m_valid_indices]  l_raw_max_scores = np.max(l_pred[:, 5:], axis=1) l_max_scores = 1 / ((1 + np.exp(-l_pred[:, 4]))*(1 + np.exp(-l_raw_max_scores))) l_valid_indices = np.flatnonzero(l_max_scores >= self.conf) l_ids = np.argmax(l_pred[l_valid_indices, 5:], axis=1) l_scores = l_max_scores[l_valid_indices] 

        在获得了经过阈值筛选的锚框之后,我们便需要进行特征解码,将网络输出的回归值(如中心坐标、宽高等)转化为实际的边界框坐标,我们还是以小尺度的s为例子,我们先通过sigmoid函数对每个有效锚框的回归值进行处理,得到位置和尺寸的偏移量也就是锚框回归值s_dxyhw,其包含包括锚框的中心坐标(dx, dy)和宽高(dw, dh),接着我们使用网格位置self.s_grid和步幅self.strides[0]便可以来还原边界框的真实位置

s_dxyhw = 1 / (1 + np.exp(-s_pred[s_valid_indices, :4])) s_xy = (s_dxyhw[:, 0:2] * 2.0 + self.s_grid[s_valid_indices,:] - 1.0) * self.strides[0] s_wh = (s_dxyhw[:, 2:4] * 2.0) ** 2 * self.s_anchors[s_valid_indices, :] s_xyxy = np.concatenate([s_xy - s_wh * 0.5, s_xy + s_wh * 0.5], axis=-1) 

        而对于其他尺寸的完整代码如下:

# 特征解码 s_dxyhw = 1 / (1 + np.exp(-s_pred[s_valid_indices, :4])) s_xy = (s_dxyhw[:, 0:2] * 2.0 + self.s_grid[s_valid_indices,:] - 1.0) * self.strides[0] s_wh = (s_dxyhw[:, 2:4] * 2.0) ** 2 * self.s_anchors[s_valid_indices, :] s_xyxy = np.concatenate([s_xy - s_wh * 0.5, s_xy + s_wh * 0.5], axis=-1)  m_dxyhw = 1 / (1 + np.exp(-m_pred[m_valid_indices, :4])) m_xy = (m_dxyhw[:, 0:2] * 2.0 + self.m_grid[m_valid_indices,:] - 1.0) * self.strides[1] m_wh = (m_dxyhw[:, 2:4] * 2.0) ** 2 * self.m_anchors[m_valid_indices, :] m_xyxy = np.concatenate([m_xy - m_wh * 0.5, m_xy + m_wh * 0.5], axis=-1)  l_dxyhw = 1 / (1 + np.exp(-l_pred[l_valid_indices, :4])) l_xy = (l_dxyhw[:, 0:2] * 2.0 + self.l_grid[l_valid_indices,:] - 1.0) * self.strides[2] l_wh = (l_dxyhw[:, 2:4] * 2.0) ** 2 * self.l_anchors[l_valid_indices, :] l_xyxy = np.concatenate([l_xy - l_wh * 0.5, l_xy + l_wh * 0.5], axis=-1) 

        最后我们将大中小特征层阈值筛选结果拼接即可得到最终的边界框、得分和类别信息,然后我们将这些信息通过get_output函数一样的NMS部分即可得到我们需要的检测结果

xyxy = np.concatenate((s_xyxy, m_xyxy, l_xyxy), axis=0) scores = np.concatenate((s_scores, m_scores, l_scores), axis=0) ids = np.concatenate((s_ids, m_ids, l_ids), axis=0) 

        至此,我们便完成了PostProcess预处理函数,我们加上一些细节以及输出获取方式后的完整代码如下:

def PostProcess(self, method=1):     """     后处理函数     Args:         method: 选择使用的方法             - '0': 使用get_output             - '1': 使用get_infer_res_np_float32     """     if method == 1 :         # 方法1:使用get_infer_res_np_float32获取原始输出并处理         print("n=== 方法1: 使用get_infer_res_np_float32 ===")         s_pred = self.inf.get_infer_res_np_float32(0)         m_pred = self.inf.get_infer_res_np_float32(1)         l_pred = self.inf.get_infer_res_np_float32(2)         print(f"原始输出: {s_pred.shape = }  {m_pred.shape = }  {l_pred.shape = }")          # reshape         s_pred = s_pred.reshape([-1, (5 + self.nc)])         m_pred = m_pred.reshape([-1, (5 + self.nc)])         l_pred = l_pred.reshape([-1, (5 + self.nc)])         print(f"Reshape后: {s_pred.shape = }  {m_pred.shape = }  {l_pred.shape = }")          # classify: 利用numpy向量化操作完成阈值筛选         s_raw_max_scores = np.max(s_pred[:, 5:], axis=1)         s_max_scores = 1 / ((1 + np.exp(-s_pred[:, 4]))*(1 + np.exp(-s_raw_max_scores)))         s_valid_indices = np.flatnonzero(s_max_scores >= self.conf)         s_ids = np.argmax(s_pred[s_valid_indices, 5:], axis=1)         s_scores = s_max_scores[s_valid_indices]          m_raw_max_scores = np.max(m_pred[:, 5:], axis=1)         m_max_scores = 1 / ((1 + np.exp(-m_pred[:, 4]))*(1 + np.exp(-m_raw_max_scores)))         m_valid_indices = np.flatnonzero(m_max_scores >= self.conf)         m_ids = np.argmax(m_pred[m_valid_indices, 5:], axis=1)         m_scores = m_max_scores[m_valid_indices]          l_raw_max_scores = np.max(l_pred[:, 5:], axis=1)         l_max_scores = 1 / ((1 + np.exp(-l_pred[:, 4]))*(1 + np.exp(-l_raw_max_scores)))         l_valid_indices = np.flatnonzero(l_max_scores >= self.conf)         l_ids = np.argmax(l_pred[l_valid_indices, 5:], axis=1)         l_scores = l_max_scores[l_valid_indices]          # 特征解码         s_dxyhw = 1 / (1 + np.exp(-s_pred[s_valid_indices, :4]))         s_xy = (s_dxyhw[:, 0:2] * 2.0 + self.s_grid[s_valid_indices,:] - 1.0) * self.strides[0]         s_wh = (s_dxyhw[:, 2:4] * 2.0) ** 2 * self.s_anchors[s_valid_indices, :]         s_xyxy = np.concatenate([s_xy - s_wh * 0.5, s_xy + s_wh * 0.5], axis=-1)          m_dxyhw = 1 / (1 + np.exp(-m_pred[m_valid_indices, :4]))         m_xy = (m_dxyhw[:, 0:2] * 2.0 + self.m_grid[m_valid_indices,:] - 1.0) * self.strides[1]         m_wh = (m_dxyhw[:, 2:4] * 2.0) ** 2 * self.m_anchors[m_valid_indices, :]         m_xyxy = np.concatenate([m_xy - m_wh * 0.5, m_xy + m_wh * 0.5], axis=-1)          l_dxyhw = 1 / (1 + np.exp(-l_pred[l_valid_indices, :4]))         l_xy = (l_dxyhw[:, 0:2] * 2.0 + self.l_grid[l_valid_indices,:] - 1.0) * self.strides[2]         l_wh = (l_dxyhw[:, 2:4] * 2.0) ** 2 * self.l_anchors[l_valid_indices, :]         l_xyxy = np.concatenate([l_xy - l_wh * 0.5, l_xy + l_wh * 0.5], axis=-1)          # 大中小特征层阈值筛选结果拼接         xyxy = np.concatenate((s_xyxy, m_xyxy, l_xyxy), axis=0)         scores = np.concatenate((s_scores, m_scores, l_scores), axis=0)         ids = np.concatenate((s_ids, m_ids, l_ids), axis=0)      elif method == 0:         # 方法2:使用get_output获取输出         print("n=== 方法2: 使用get_output ===")         if not self.inf.get_output():             raise RuntimeError("获取输出失败")          classes_scores = self.inf.outputs[0].data  # (1, 80, 80, 18)         bboxes = self.inf.outputs[1].data         # (1, 40, 40, 18)         print(f"classes_scores: shape={classes_scores.shape}")         print(f"bboxes: shape={bboxes.shape}")          # 直接使用4D数据         # 每个网格有3个anchor,每个anchor预测6个值(4个框坐标+1个objectness+1个类别)         batch, height, width, channels = classes_scores.shape         num_anchors = 3         pred_per_anchor = 6          scores_list = []         boxes_list = []         ids_list = []         # 处理每个网格点         for h in range(height):             for w in range(width):                 for a in range(num_anchors):                     # 获取当前anchor的预测值                     start_idx = int(a * pred_per_anchor)                     box = classes_scores[0, h, w, start_idx:start_idx+4].copy()  # 框坐标                     obj_score = float(classes_scores[0, h, w, start_idx+4])      # objectness                     cls_score = float(classes_scores[0, h, w, start_idx+5])      # 类别分数                     # sigmoid激活                     obj_score = 1 / (1 + np.exp(-obj_score))                     cls_score = 1 / (1 + np.exp(-cls_score))                     score = obj_score * cls_score                      # 如果分数超过阈值,保存这个预测                     if score >= self.conf:                         # 解码框坐标                         box = 1 / (1 + np.exp(-box))  # sigmoid                         cx = float((box[0] * 2.0 + w - 0.5) * self.strides[0])                         cy = float((box[1] * 2.0 + h - 0.5) * self.strides[0])                         w_pred = float((box[2] * 2.0) ** 2 * self.anchors[0][a*2])                         h_pred = float((box[3] * 2.0) ** 2 * self.anchors[0][a*2+1])                          # 转换为xyxy格式                         x1 = cx - w_pred/2                         y1 = cy - h_pred/2                         x2 = cx + w_pred/2                         y2 = cy + h_pred/2                          boxes_list.append([x1, y1, x2, y2])                         scores_list.append(float(score))  # 确保是标量                         ids_list.append(0)  # 假设只有一个类别         if boxes_list:             xyxy = np.array(boxes_list, dtype=np.float32)             scores = np.array(scores_list, dtype=np.float32)             ids = np.array(ids_list, dtype=np.int32)         else:             xyxy = np.array([], dtype=np.float32).reshape(0, 4)             scores = np.array([], dtype=np.float32)             ids = np.array([], dtype=np.int32)      else:         raise ValueError("method must be 0 or 1")      # NMS处理     indices = cv2.dnn.NMSBoxes(xyxy.tolist(), scores.tolist(), self.conf, self.iou)      if len(indices) > 0:         indices = np.array(indices).flatten()         self.bboxes = (xyxy[indices] * np.array([self.x_scale, self.y_scale, self.x_scale, self.y_scale])).astype(np.int32)         self.scores = scores[indices]         self.ids = ids[indices]     else:         print("No detections after NMS")         self.bboxes = np.array([], dtype=np.int32).reshape(0, 4)         self.scores = np.array([], dtype=np.float32)         self.ids = np.array([], dtype=np.int32) 

(6)完成draw_detection()结果绘制函数

        结果绘制函数比较简单,无非就是使用OpenCV将输入进来的检测结果绘制成检测框和类别信息,因此便不再详细叙述,直接贴出我的完整代码:

def draw_detection(self,img: np.array,                        box,                       score: float,                        class_id: int,                       labelname: list):     x1, y1, x2, y2 = box     rdk_colors = [         (255, 0, 0),    # 红色         (0, 255, 0),    # 绿色         (0, 0, 255),    # 蓝色         (255, 255, 0),  # 黄色         (255, 0, 255),  # 紫色         (0, 255, 255),  # 青色     ]     color = rdk_colors[class_id % len(rdk_colors)]     cv2.rectangle(img, (x1, y1), (x2, y2), color, 2)     label = f"{labelname[class_id]}: {score:.2f}"     (label_width, label_height), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)     label_x = x1     label_y = y1 - 10 if y1 - 10 > label_height else y1 + 10     cv2.rectangle(         img,          (label_x, label_y - label_height),          (label_x + label_width, label_y + label_height),          color,          cv2.FILLED     )     cv2.putText(img, label, (label_x, label_y),                  cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1, cv2.LINE_AA) 

(7)完成detect_result()结果处理函数

        接着我们完成detect_result()函数,这个函数我设计是用来实现对推理结果的处理,但是示例中我们只用来保存检测结果和输出检测信息,具体代码如下:

def detect_result(self, img):     if isinstance(img, str):         draw_img = cv2.imread(img)     else:         draw_img = img.copy()     for class_id, score, bbox in zip(self.ids, self.scores, self.bboxes):         x1, y1, x2, y2 = bbox         print("(%d, %d, %d, %d) -> %s: %.2f"%(x1,y1,x2,y2, self.labelname[class_id], score))         if self.is_save:             self.draw_detection(draw_img, (x1, y1, x2, y2), score, class_id, self.labelname)     if self.is_save:             cv2.imwrite("result.jpg", draw_img) 

(8)完成detect()检测函数

        最后我们便可以来完成detect()函数啦,经过我们上面的各种封装,我们只需要掉包就好啦,具体代码及推理API介绍如下:

两种方式让你用Python轻松在RDKX5上部署推理

def detect(self, img_path, method_pre=0, method_post=1):         """         检测函数         Args:             img_path: 图片路径或图片数组             method_pre: 预处理方法                 - 0: 使用read_input                 - 1: 使用read_img_to_nv12             method_post: 后处理方法                 - '0': 使用get_output                 - '1': 使用get_infer_res_np_float32         """         self.PreProcess(img_path, method=method_pre) # 预处理         self.inf.forward(self.mode) # 推理         self.PostProcess(method_post) # 后处理         self.detect_result(img) #获得结果 

(9)完成主函数main

        最后我们来完成主函数部分,这部分也非常简单,大家直接看叭:

if __name__ == "__main__":     labelname = ["tennis"]     test_img = "/path/to/your/img"     model_path = "/path/to/your/model"     infer = BPU_Detect( model_path, labelname)     infer.detect( test_img, method_pre=1, method_post=0) 

        那如果要实现实时推理我们该怎么办呢?很简单我们只需要在继承BPU_Detect类的时候传入mode=True参数就好啦,这样的化每次推理结束后便不会立即释放推理句柄啦!剩下的就只需要在循环中不断传入图像便可以得到结果啦!

infer = BPU_Detect( model_path, coconame, mode = True) 

两种方式让你用Python轻松在RDKX5上部署推理

两种方式让你用Python轻松在RDKX5上部署推理

(10)完整代码示例

        仅供参考!代码写的比较丑陋大家最好根据自己的需求自行修改

import cv2 import numpy as np from scipy.special import softmax from scipy.special import expit as sigmoid from time import time import bpu_infer_lib  # Model Zoo Python API from typing import Tuple import os  class BPU_Detect:     def __init__(self, model_path:str,                 labelnames:list,                 num_classes:int = None,                 conf:float = 0.45,                 iou:float = 0.45,                 anchors:np.array = np.array([                     [10,13, 16,30, 33,23],  # P3/8                     [30,61, 62,45, 59,119],  # P4/16                     [116,90, 156,198, 373,326],  # P5/32                    ]),                 strides = np.array([8, 16, 32]),                 mode:bool = False,                 is_save:bool = False                 ):         self.model = model_path         self.labelname = labelnames         self.inf = bpu_infer_lib.Infer(False)         self.inf.load_model(self.model)         self.conf = conf         self.iou = iou         self.anchors = anchors         self.strides = strides         self.input_w = 640         self.input_h = 640         self.nc = num_classes if num_classes is not None else len(self.labelname)         self.mode = mode         self.is_save = is_save         self._init_grids()              def _init_grids(self) :         """初始化特征图网格"""         def _create_grid(stride: int) :             """创建单个stride的网格和anchors"""             grid = np.stack([                 np.tile(np.linspace(0.5, self.input_w//stride - 0.5, self.input_w//stride),                         reps=self.input_h//stride),                 np.repeat(np.arange(0.5, self.input_h//stride + 0.5, 1),                           self.input_w//stride)             ], axis=0).transpose(1,0)             grid = np.hstack([grid] * 3).reshape(-1, 2)                          anchors = np.tile(                 self.anchors[int(np.log2(stride/8))],                  self.input_w//stride * self.input_h//stride             ).reshape(-1, 2)                          return grid, anchors                      # 创建不同尺度的网格         self.s_grid, self.s_anchors = _create_grid(self.strides[0])         self.m_grid, self.m_anchors = _create_grid(self.strides[1])          self.l_grid, self.l_anchors = _create_grid(self.strides[2])                  print(f"网格尺寸: {self.s_grid.shape = }  {self.m_grid.shape = }  {self.l_grid.shape = }")         print(f"Anchors尺寸: {self.s_anchors.shape = }  {self.m_anchors.shape = }  {self.l_anchors.shape = }")        def bgr2nv12_opencv(self, image):         height, width = image.shape[0], image.shape[1]         area = height * width         yuv420p = cv2.cvtColor(image, cv2.COLOR_BGR2YUV_I420).reshape((area * 3 // 2,))         y = yuv420p[:area]         uv_planar = yuv420p[area:].reshape((2, area // 4))         uv_packed = uv_planar.transpose((1, 0)).reshape((area // 2,))          nv12 = np.zeros_like(yuv420p)         nv12[:height * width] = y         nv12[height * width:] = uv_packed         return  nv12          def PreProcess(self, img, method=0):         """         预处理函数         Args:             img: 输入图像或图片路径             method: 选择使用的方法                 - 0: 使用read_input                 - 1: 使用read_img_to_nv12         """         # 获取原始图片和尺寸         if isinstance(img, str):             # 输入是图片路径             if method == 1:                 # 先获取原始图片尺寸                 orig_img = cv2.imread(img)                 if orig_img is None:                     raise ValueError(f"无法读取图片: {img}")                 img_h, img_w = orig_img.shape[0:2]                 self.inf.read_img_to_nv12(img, 0)# 使用API的方法直接读取             elif method == 0:                 # method == 0,读取图片后处理                 orig_img = cv2.imread(img)                 if orig_img is None:                     raise ValueError(f"无法读取图片: {img}")                 input_tensor = cv2.resize(orig_img, (self.input_h, self.input_w))                 input_tensor = self.bgr2nv12_opencv(input_tensor)                 self.inf.read_input(input_tensor, 0)                 img_h, img_w = orig_img.shape[0:2]         else:             print("输入格式有误")             return False                  # 计算缩放比例         self.y_scale = img_h / self.input_h         self.x_scale = img_w / self.input_w                  print(f"原始尺寸: {img_w}x{img_h}, 输入尺寸: {self.input_w}x{self.input_h}")         print(f"缩放比例: x_scale={self.x_scale}, y_scale={self.y_scale}")                  return True      def PostPrecess(self, method=1):         """         后处理函数         Args:             method: 选择使用的方法                 - '0': 使用get_output                 - '1': 使用get_infer_res_np_float32         """         if method == 1 :             # 方法1:使用get_infer_res_np_float32获取原始输出并处理             print("n=== 方法1: 使用get_infer_res_np_float32 ===")             s_pred = self.inf.get_infer_res_np_float32(0)             m_pred = self.inf.get_infer_res_np_float32(1)             l_pred = self.inf.get_infer_res_np_float32(2)             print(f"原始输出: {s_pred.shape = }  {m_pred.shape = }  {l_pred.shape = }")              # reshape             s_pred = s_pred.reshape([-1, (5 + self.nc)])             m_pred = m_pred.reshape([-1, (5 + self.nc)])             l_pred = l_pred.reshape([-1, (5 + self.nc)])             print(f"Reshape后: {s_pred.shape = }  {m_pred.shape = }  {l_pred.shape = }")              # classify: 利用numpy向量化操作完成阈值筛选             s_raw_max_scores = np.max(s_pred[:, 5:], axis=1)             s_max_scores = 1 / ((1 + np.exp(-s_pred[:, 4]))*(1 + np.exp(-s_raw_max_scores)))             s_valid_indices = np.flatnonzero(s_max_scores >= self.conf)             s_ids = np.argmax(s_pred[s_valid_indices, 5:], axis=1)             s_scores = s_max_scores[s_valid_indices]              m_raw_max_scores = np.max(m_pred[:, 5:], axis=1)             m_max_scores = 1 / ((1 + np.exp(-m_pred[:, 4]))*(1 + np.exp(-m_raw_max_scores)))             m_valid_indices = np.flatnonzero(m_max_scores >= self.conf)             m_ids = np.argmax(m_pred[m_valid_indices, 5:], axis=1)             m_scores = m_max_scores[m_valid_indices]              l_raw_max_scores = np.max(l_pred[:, 5:], axis=1)             l_max_scores = 1 / ((1 + np.exp(-l_pred[:, 4]))*(1 + np.exp(-l_raw_max_scores)))             l_valid_indices = np.flatnonzero(l_max_scores >= self.conf)             l_ids = np.argmax(l_pred[l_valid_indices, 5:], axis=1)             l_scores = l_max_scores[l_valid_indices]              # 特征解码             s_dxyhw = 1 / (1 + np.exp(-s_pred[s_valid_indices, :4]))             s_xy = (s_dxyhw[:, 0:2] * 2.0 + self.s_grid[s_valid_indices,:] - 1.0) * self.strides[0]             s_wh = (s_dxyhw[:, 2:4] * 2.0) ** 2 * self.s_anchors[s_valid_indices, :]             s_xyxy = np.concatenate([s_xy - s_wh * 0.5, s_xy + s_wh * 0.5], axis=-1)              m_dxyhw = 1 / (1 + np.exp(-m_pred[m_valid_indices, :4]))             m_xy = (m_dxyhw[:, 0:2] * 2.0 + self.m_grid[m_valid_indices,:] - 1.0) * self.strides[1]             m_wh = (m_dxyhw[:, 2:4] * 2.0) ** 2 * self.m_anchors[m_valid_indices, :]             m_xyxy = np.concatenate([m_xy - m_wh * 0.5, m_xy + m_wh * 0.5], axis=-1)              l_dxyhw = 1 / (1 + np.exp(-l_pred[l_valid_indices, :4]))             l_xy = (l_dxyhw[:, 0:2] * 2.0 + self.l_grid[l_valid_indices,:] - 1.0) * self.strides[2]             l_wh = (l_dxyhw[:, 2:4] * 2.0) ** 2 * self.l_anchors[l_valid_indices, :]             l_xyxy = np.concatenate([l_xy - l_wh * 0.5, l_xy + l_wh * 0.5], axis=-1)              # 大中小特征层阈值筛选结果拼接             xyxy = np.concatenate((s_xyxy, m_xyxy, l_xyxy), axis=0)             scores = np.concatenate((s_scores, m_scores, l_scores), axis=0)             ids = np.concatenate((s_ids, m_ids, l_ids), axis=0)          elif method == 0:             # 方法2:使用get_output获取输出             print("n=== 方法2: 使用get_output ===")             if not self.inf.get_output():                 raise RuntimeError("获取输出失败")                          classes_scores = self.inf.outputs[0].data  # (1, 80, 80, 18)             bboxes = self.inf.outputs[1].data         # (1, 40, 40, 18)             print(f"classes_scores: shape={classes_scores.shape}")             print(f"bboxes: shape={bboxes.shape}")                          # 直接使用4D数据             # 每个网格有3个anchor,每个anchor预测6个值(4个框坐标+1个objectness+1个类别)             batch, height, width, channels = classes_scores.shape             num_anchors = 3             pred_per_anchor = 6                          scores_list = []             boxes_list = []             ids_list = []             # 处理每个网格点             for h in range(height):                 for w in range(width):                     for a in range(num_anchors):                         # 获取当前anchor的预测值                         start_idx = int(a * pred_per_anchor)                         box = classes_scores[0, h, w, start_idx:start_idx+4].copy()  # 框坐标                         obj_score = float(classes_scores[0, h, w, start_idx+4])      # objectness                         cls_score = float(classes_scores[0, h, w, start_idx+5])      # 类别分数                         # sigmoid激活                         obj_score = 1 / (1 + np.exp(-obj_score))                         cls_score = 1 / (1 + np.exp(-cls_score))                         score = obj_score * cls_score                                                  # 如果分数超过阈值,保存这个预测                         if score >= self.conf:                             # 解码框坐标                             box = 1 / (1 + np.exp(-box))  # sigmoid                             cx = float((box[0] * 2.0 + w - 0.5) * self.strides[0])                             cy = float((box[1] * 2.0 + h - 0.5) * self.strides[0])                             w_pred = float((box[2] * 2.0) ** 2 * self.anchors[0][a*2])                             h_pred = float((box[3] * 2.0) ** 2 * self.anchors[0][a*2+1])                                                          # 转换为xyxy格式                             x1 = cx - w_pred/2                             y1 = cy - h_pred/2                             x2 = cx + w_pred/2                             y2 = cy + h_pred/2                                                          boxes_list.append([x1, y1, x2, y2])                             scores_list.append(float(score))  # 确保是标量                             ids_list.append(0)  # 假设只有一个类别             if boxes_list:                 xyxy = np.array(boxes_list, dtype=np.float32)                 scores = np.array(scores_list, dtype=np.float32)                 ids = np.array(ids_list, dtype=np.int32)             else:                 xyxy = np.array([], dtype=np.float32).reshape(0, 4)                 scores = np.array([], dtype=np.float32)                 ids = np.array([], dtype=np.int32)          else:             raise ValueError("method must be 0 or 1")          # NMS处理         indices = cv2.dnn.NMSBoxes(xyxy.tolist(), scores.tolist(), self.conf, self.iou)                  if len(indices) > 0:             indices = np.array(indices).flatten()             self.bboxes = (xyxy[indices] * np.array([self.x_scale, self.y_scale, self.x_scale, self.y_scale])).astype(np.int32)             self.scores = scores[indices]             self.ids = ids[indices]         else:             print("No detections after NMS")             self.bboxes = np.array([], dtype=np.int32).reshape(0, 4)             self.scores = np.array([], dtype=np.float32)             self.ids = np.array([], dtype=np.int32)        def draw_detection(self,img: np.array,                        box,                       score: float,                        class_id: int,                       labelname: list):         x1, y1, x2, y2 = box         rdk_colors = [             (255, 0, 0),    # 红色             (0, 255, 0),    # 绿色             (0, 0, 255),    # 蓝色             (255, 255, 0),  # 黄色             (255, 0, 255),  # 紫色             (0, 255, 255),  # 青色         ]         color = rdk_colors[class_id % len(rdk_colors)]         cv2.rectangle(img, (x1, y1), (x2, y2), color, 2)         label = f"{labelname[class_id]}: {score:.2f}"         (label_width, label_height), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)         label_x = x1         label_y = y1 - 10 if y1 - 10 > label_height else y1 + 10         cv2.rectangle(             img,              (label_x, label_y - label_height),              (label_x + label_width, label_y + label_height),              color,              cv2.FILLED         )         cv2.putText(img, label, (label_x, label_y),                      cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1, cv2.LINE_AA)              def detect_result(self, img):         if isinstance(img, str):             draw_img = cv2.imread(img)         else:             draw_img = img.copy()         for class_id, score, bbox in zip(self.ids, self.scores, self.bboxes):             x1, y1, x2, y2 = bbox             print("(%d, %d, %d, %d) -> %s: %.2f"%(x1,y1,x2,y2, self.labelname[class_id], score))             if self.is_save:                 self.draw_detection(draw_img, (x1, y1, x2, y2), score, class_id, self.labelname)         if self.is_save:                 cv2.imwrite("result.jpg", draw_img)      def detect(self, img, method_pre=0, method_post=1):         """         检测函数         Args:             img_path: 图片路径或图片数组             method_pre: 预处理方法                 - 0: 使用read_input(默认,读取图片后处理)                 - 1: 使用read_img_to_nv12(直接读取路径)             method_post: 后处理方法         """         # 预处理         self.PreProcess(img, method=method_pre)                  # 推理和后处理         self.inf.forward(self.mode)         self.PostPrecess(method_post)         self.detect_result(img)  if __name__ == "__main__":     coconame = ["tennis"]     test_img = "/root/Deep_Learning/YOLOv5/imgs/tennis_1_frame_0001.jpg"     model_path = "/root/Deep_Learning/YOLOv5/models/tennis_detect_640x640_bayese_.bin"     infer = BPU_Detect(model_path,coconame,conf=0.1)     infer.detect(test_img,method_pre=1,method_post=0) 

二、BSP——hobot_dnn

        在介绍了ModelZoo的推理方法后我们便能很快的完成BSP版本的使用啦,我们接下来将基于上述完成的ModelZoo代码主要介绍两版有变化的部分,个人感觉BSP的版本比较贴近C++版本的使用感觉

        首先我们先查看一下官方手册,我们可以发现BSP版本主要是使用的Model对象,它包含了inputsoutputsforward等成员和方法,因此首先我们导入pyeasy_dnn包:

from hobot_dnn import pyeasy_dnn as dnn  # BSP Python API 

(1)修改BPU_Detect类及初始化部分

        接着我们在类中使用pyeasy_dnn的接口加载模型以及获取输入的张量格式

self.models = dnn.load(self.model_path) self.model = self.models[0]  # 获取第一个模型  self.input_shape = self.model.inputs[0].properties.shape self.input_w = self.input_shape[2]  # NCHW格式 self.input_h = self.input_shape[3] 

(2)修改预处理PreProcess部分

        然后便是修改预处理部分,之前ModelZoo的版本需要使用read_input函数接口来传入图像数据,但是BSP版的将图像数据作为了推理接口的参数,因此预处理部分只需要处理图像尺寸和图像格式即可,修改后的代码如下:

def PreProcess(self, img):     """预处理函数"""     # 获取原始图片和尺寸     if isinstance(img, str):         # 输入是图片路径         orig_img = cv2.imread(img)         if orig_img is None:             raise ValueError(f"无法读取图片: {img}")     else:         orig_img = img     img_h, img_w = orig_img.shape[0:2]     # 调整图像大小并转换为NV12格式     input_tensor = cv2.resize(orig_img, (self.input_w, self.input_h))     input_tensor = self.bgr2nv12_opencv(input_tensor)     # 计算缩放比例     self.y_scale = img_h / self.input_h     self.x_scale = img_w / self.input_w     print(f"原始尺寸: {img_w}x{img_h}, 输入尺寸: {self.input_w}x{self.input_h}")     print(f"缩放比例: x_scale={self.x_scale}, y_scale={self.y_scale}")     return input_tensor 

(3)修改后处理PostProcess部分

        我们继续看到后处理部分,BSP版的后处理与ModelZoo版的get_infer_res_np_float32接口类似,我们首先需要获取模型的输出:

outputs = self.model_outputs 

        接着我们从outputs中获取三个尺度的输出即可,剩下的代码和ModelZoo版中get_infer_res_np_float32的处理方式一样

# 处理三个输出层 s_pred = outputs[0].buffer.reshape([-1, (5 + self.nc)]) m_pred = outputs[1].buffer.reshape([-1, (5 + self.nc)]) l_pred = outputs[2].buffer.reshape([-1, (5 + self.nc)] 

(4)修改推理detect()部分

        最后我们只需要修改推理部分的推理接口就好啦,我们查看一下官方的API手册,推理部分的介绍如下:

两种方式让你用Python轻松在RDKX5上部署推理

        据此我们可以知道我们只需要从预处理函数获取处理之后的输入数据input_tensor后即可调用forward进行推理啦,forward的三个参数分别是输入数据、推理模型的BPU_ID和推理任务的优先级,具体代码如下:

def detect(self, img):     """检测主函数"""     input_tensor = self.PreProcess(img)# 预处理     self.model_outputs = self.model.forward(input_tensor)# 模型推理     self.PostProcess()# 后处理     self.detect_result(img)# 显示结果 

(5)完整代码示例

import cv2 import numpy as np from scipy.special import softmax from scipy.special import expit as sigmoid from time import time from hobot_dnn import pyeasy_dnn as dnn  # BSP Python API from typing import Tuple import os  class BPU_Detect:     def __init__(self, model_path:str,                 labelnames:list,                 num_classes:int = None,                 conf:float = 0.45,                 iou:float = 0.45,                 anchors:np.array = np.array([                     [10,13, 16,30, 33,23],  # P3/8                     [30,61, 62,45, 59,119],  # P4/16                     [116,90, 156,198, 373,326],  # P5/32                    ]),                 strides = np.array([8, 16, 32]),                 mode:bool = False,                 is_save:bool = False                 ):         self.model_path = model_path         self.labelname = labelnames          # 加载模型         self.models = dnn.load(self.model_path)         self.model = self.models[0]  # 获取第一个模型                  self.conf = conf         self.iou = iou         self.anchors = anchors         self.strides = strides         # 从模型输入获取输入尺寸         self.input_shape = self.model.inputs[0].properties.shape         self.input_w = self.input_shape[2]  # NCHW格式         self.input_h = self.input_shape[3]         self.nc = num_classes if num_classes is not None else len(self.labelname)         self.mode = mode         self.is_save = is_save         self._init_grids()      def _init_grids(self) :         """初始化特征图网格"""         def _create_grid(stride: int) :             """创建单个stride的网格和anchors"""             grid = np.stack([                 np.tile(np.linspace(0.5, self.input_w//stride - 0.5, self.input_w//stride),                         reps=self.input_h//stride),                 np.repeat(np.arange(0.5, self.input_h//stride + 0.5, 1),                           self.input_w//stride)             ], axis=0).transpose(1,0)             grid = np.hstack([grid] * 3).reshape(-1, 2)                          anchors = np.tile(                 self.anchors[int(np.log2(stride/8))],                  self.input_w//stride * self.input_h//stride             ).reshape(-1, 2)                          return grid, anchors                      # 创建不同尺度的网格         self.s_grid, self.s_anchors = _create_grid(self.strides[0])         self.m_grid, self.m_anchors = _create_grid(self.strides[1])          self.l_grid, self.l_anchors = _create_grid(self.strides[2])                  print(f"网格尺寸: {self.s_grid.shape = }  {self.m_grid.shape = }  {self.l_grid.shape = }")         print(f"Anchors尺寸: {self.s_anchors.shape = }  {self.m_anchors.shape = }  {self.l_anchors.shape = }")      def bgr2nv12_opencv(self, image):         """将BGR图像转换为NV12格式"""         height, width = image.shape[0], image.shape[1]         area = height * width         yuv420p = cv2.cvtColor(image, cv2.COLOR_BGR2YUV_I420).reshape((area * 3 // 2,))         y = yuv420p[:area]         uv_planar = yuv420p[area:].reshape((2, area // 4))         uv_packed = uv_planar.transpose((1, 0)).reshape((area // 2,))          nv12 = np.zeros_like(yuv420p)         nv12[:height * width] = y         nv12[height * width:] = uv_packed         return nv12          def PreProcess(self, img):         """预处理函数"""         # 获取原始图片和尺寸         if isinstance(img, str):             # 输入是图片路径             orig_img = cv2.imread(img)             if orig_img is None:                 raise ValueError(f"无法读取图片: {img}")         else:             orig_img = img                      img_h, img_w = orig_img.shape[0:2]         # 调整图像大小并转换为NV12格式         input_tensor = cv2.resize(orig_img, (self.input_w, self.input_h))         input_tensor = self.bgr2nv12_opencv(input_tensor)                  # 计算缩放比例         self.y_scale = img_h / self.input_h         self.x_scale = img_w / self.input_w                  print(f"原始尺寸: {img_w}x{img_h}, 输入尺寸: {self.input_w}x{self.input_h}")         print(f"缩放比例: x_scale={self.x_scale}, y_scale={self.y_scale}")                  return input_tensor      def PostProcess(self):         """后处理函数"""         # 获取模型输出         outputs = self.model_outputs                  # 处理三个输出层         s_pred = outputs[0].buffer.reshape([-1, (5 + self.nc)])         m_pred = outputs[1].buffer.reshape([-1, (5 + self.nc)])         l_pred = outputs[2].buffer.reshape([-1, (5 + self.nc)])                  print(f"输出形状: {s_pred.shape = }  {m_pred.shape = }  {l_pred.shape = }")          # 处理小特征图输出         s_raw_max_scores = np.max(s_pred[:, 5:], axis=1)         s_max_scores = 1 / ((1 + np.exp(-s_pred[:, 4]))*(1 + np.exp(-s_raw_max_scores)))         s_valid_indices = np.flatnonzero(s_max_scores >= self.conf)         s_ids = np.argmax(s_pred[s_valid_indices, 5:], axis=1)         s_scores = s_max_scores[s_valid_indices]          # 处理中特征图输出         m_raw_max_scores = np.max(m_pred[:, 5:], axis=1)         m_max_scores = 1 / ((1 + np.exp(-m_pred[:, 4]))*(1 + np.exp(-m_raw_max_scores)))         m_valid_indices = np.flatnonzero(m_max_scores >= self.conf)         m_ids = np.argmax(m_pred[m_valid_indices, 5:], axis=1)         m_scores = m_max_scores[m_valid_indices]          # 处理大特征图输出         l_raw_max_scores = np.max(l_pred[:, 5:], axis=1)         l_max_scores = 1 / ((1 + np.exp(-l_pred[:, 4]))*(1 + np.exp(-l_raw_max_scores)))         l_valid_indices = np.flatnonzero(l_max_scores >= self.conf)         l_ids = np.argmax(l_pred[l_valid_indices, 5:], axis=1)         l_scores = l_max_scores[l_valid_indices]          # 特征解码         s_dxyhw = 1 / (1 + np.exp(-s_pred[s_valid_indices, :4]))         s_xy = (s_dxyhw[:, 0:2] * 2.0 + self.s_grid[s_valid_indices,:] - 1.0) * self.strides[0]         s_wh = (s_dxyhw[:, 2:4] * 2.0) ** 2 * self.s_anchors[s_valid_indices, :]         s_xyxy = np.concatenate([s_xy - s_wh * 0.5, s_xy + s_wh * 0.5], axis=-1)          m_dxyhw = 1 / (1 + np.exp(-m_pred[m_valid_indices, :4]))         m_xy = (m_dxyhw[:, 0:2] * 2.0 + self.m_grid[m_valid_indices,:] - 1.0) * self.strides[1]         m_wh = (m_dxyhw[:, 2:4] * 2.0) ** 2 * self.m_anchors[m_valid_indices, :]         m_xyxy = np.concatenate([m_xy - m_wh * 0.5, m_xy + m_wh * 0.5], axis=-1)          l_dxyhw = 1 / (1 + np.exp(-l_pred[l_valid_indices, :4]))         l_xy = (l_dxyhw[:, 0:2] * 2.0 + self.l_grid[l_valid_indices,:] - 1.0) * self.strides[2]         l_wh = (l_dxyhw[:, 2:4] * 2.0) ** 2 * self.l_anchors[l_valid_indices, :]         l_xyxy = np.concatenate([l_xy - l_wh * 0.5, l_xy + l_wh * 0.5], axis=-1)          # 合并所有预测结果         xyxy = np.concatenate((s_xyxy, m_xyxy, l_xyxy), axis=0)         scores = np.concatenate((s_scores, m_scores, l_scores), axis=0)         ids = np.concatenate((s_ids, m_ids, l_ids), axis=0)          # NMS处理         indices = cv2.dnn.NMSBoxes(xyxy.tolist(), scores.tolist(), self.conf, self.iou)                  if len(indices) > 0:             indices = np.array(indices).flatten()             self.bboxes = (xyxy[indices] * np.array([self.x_scale, self.y_scale, self.x_scale, self.y_scale])).astype(np.int32)             self.scores = scores[indices]             self.ids = ids[indices]         else:             print("未检测到目标")             self.bboxes = np.array([], dtype=np.int32).reshape(0, 4)             self.scores = np.array([], dtype=np.float32)             self.ids = np.array([], dtype=np.int32)      def draw_detection(self, img: np.array,                        box,                       score: float,                        class_id: int,                       labelname: list):         """绘制检测结果"""         x1, y1, x2, y2 = box         rdk_colors = [             (255, 0, 0),    # 红色             (0, 255, 0),    # 绿色             (0, 0, 255),    # 蓝色             (255, 255, 0),  # 黄色             (255, 0, 255),  # 紫色             (0, 255, 255),  # 青色         ]         color = rdk_colors[class_id % len(rdk_colors)]         cv2.rectangle(img, (x1, y1), (x2, y2), color, 2)         label = f"{labelname[class_id]}: {score:.2f}"         (label_width, label_height), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)         label_x = x1         label_y = y1 - 10 if y1 - 10 > label_height else y1 + 10         cv2.rectangle(             img,              (label_x, label_y - label_height),              (label_x + label_width, label_y + label_height),              color,              cv2.FILLED         )         cv2.putText(img, label, (label_x, label_y),                      cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1, cv2.LINE_AA)      def detect_result(self, img):         """显示检测结果"""         if isinstance(img, str):             draw_img = cv2.imread(img)         else:             draw_img = img.copy()                      for class_id, score, bbox in zip(self.ids, self.scores, self.bboxes):             x1, y1, x2, y2 = bbox             print("(%d, %d, %d, %d) -> %s: %.2f"%(x1,y1,x2,y2, self.labelname[class_id], score))             if self.is_save:                 self.draw_detection(draw_img, (x1, y1, x2, y2), score, class_id, self.labelname)                  if self.is_save:             cv2.imwrite("result.jpg", draw_img)      def detect(self, img):         """检测主函数"""         input_tensor = self.PreProcess(img)# 预处理         self.model_outputs = self.model.forward(input_tensor)# 模型推理         self.PostProcess()# 后处理         self.detect_result(img)# 显示结果  if __name__ == "__main__":     coconame = ["tennis"]     test_img = "/root/Deep_Learning/YOLOv5/imgs/tennis_1_frame_0001.jpg"     model_path = "/root/Deep_Learning/YOLOv5/models/tennis_detect_640x640_bayese_.bin"     infer = BPU_Detect(model_path, coconame, conf=0.1, is_save=True)     infer.detect(test_img)  

至此,我们便完成了RDK系列ModelZoo和BSP两种版本的推理介绍啦!!!

发表评论

评论已关闭。

相关文章