import argparse import sophon.sail as sail import cv2 import os import logging import json import numpy as np class Predictor: def __init__(self): # 加载推理引擎 self.net = sail.Engine(args.bmodel, args.dev_id, sail.IOMode.SYSIO) self.graph_name = self.net.get_graph_names()[0] self.input_names = self.net.get_input_names(self.graph_name) self.input_shapes = [self.net.get_input_shape(self.graph_name, name) for name in self.input_names] self.output_names = self.net.get_output_names(self.graph_name) self.output_shapes = [self.net.get_output_shape(self.graph_name, name) for name in self.output_names] # [[1, 2]] self.input_name = self.input_names[0] self.input_shape = self.input_shapes[0] # [1, 3, 256, 256] self.batch_size = self.input_shape[0] self.net_h = self.input_shape[2] # 输入图像的高 self.net_w = self.input_shape[3] # 输入图像的宽 # 归一化参数,采用imagenet预训练参数 self.mean = [0.485, 0.456, 0.406] self.std = [0.229, 0.224, 0.225] self.print_network_info() def __call__(self, img): return self.predict(img) def print_network_info(self): info = { 'Graph Name': self.graph_name, 'Input Name': self.input_name, 'Output Names': self.output_names, 'Output Shapes': self.output_shapes, 'Input Shape': self.input_shape, 'Batch Size': self.batch_size, 'Height': self.net_h, 'Width': self.net_w, 'Mean': self.mean, 'Std': self.std } print("=" * 50) print("Network Configuration Info") print("=" * 50) for key, value in info.items(): print(f"{key:<18}: {value}") print("=" * 50) def predict(self, input_img): input_data = {self.input_name: input_img} outputs = self.net.process(self.graph_name, input_data) # print('predict fun:', outputs) # print('predict fun return:', list(outputs.values())[0]) return list(outputs.values())[0] def preprocess(self, img): h, w, _ = img.shape if h != 256 or w != 256: img = cv2.resize(img, (self.net_w, self.net_h)) img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) img = img.astype('float32') img = (img / 255 - self.mean) / self.std # 这一步是很有必要的 # img = img / 255. # 编译过程并不会帮你做归一化,所以这里要自己做归一化,否则预测数值可能会非常不准确 img = np.transpose(img, (2, 0, 1)) return img def postprocess(self, outputs): outputs_exp = np.exp(outputs) # print('exp res:', outputs_exp) outputs = outputs_exp / np.sum(outputs_exp, axis=1)[:, None] # print('softmax res:', outputs) predictions = np.argmax(outputs, axis=1) # 返回最大概率的类别 # print('预测结果:', predictions) return predictions def argsparser(): parser = argparse.ArgumentParser(prog=__file__) parser.add_argument('--input','-i', type=str, default='./test', help='path of input, must be image directory') parser.add_argument('--bmodel','-b', type=str, default='./shufflenet_f32.bmodel', help='path of bmodel') parser.add_argument('--dev_id','-d', type=int, default=0, help='tpu id') args = parser.parse_args() return args def main(args): # 加载推理引擎 predictor = Predictor() # 获取图片 cls = os.listdir(args.input) # cls = os.listdir('./label_data/test') filename = {} for c in cls: # filename[c] = [os.path.join('./label_data/test', p) for p in os.listdir(os.path.join('./label_data/test', c))] filename[c] = [os.path.join(args.input,c, p) for p in os.listdir(os.path.join(args.input, c))] print('find data:') for k, v in filename.items(): print(f'{k}: {len(v)}') print('开始推理...') res_list=[] for k, v in filename.items(): for i in v: src_img = cv2.imread(i, cv2.IMREAD_COLOR) src_img = predictor.preprocess(src_img) res = [-1] if src_img is not None: src_img = np.stack([src_img]) res = predictor(src_img) res = predictor.postprocess(res) print(f'{k}: {res[0]}') res_list.append([i, k, res[0]]) # 保存结果为 json res_dict = {} for path, real_value, pred_value in res_list: res_dict[path] = {'y':int(real_value), 'x':int(pred_value)} res_save_path = f'{predictor.graph_name}_prediction_result.json' with open(res_save_path, 'w') as f: json.dump(res_dict, f) print(f'保存结果为 {res_save_path} 成功!') if __name__ == '__main__': args = argsparser() main(args)