| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130 |
- import argparse
- import sophon.sail as sail
- import cv2
- import os
- import logging
- import json
- import numpy as np
class Predictor:
    """Image classifier backed by a Sophon SAIL bmodel engine.

    The constructor loads the bmodel, caches the first graph's input/output
    names and shapes, and prints a summary. Typical use is
    ``preprocess`` -> ``predict`` (or calling the instance) -> ``postprocess``.
    """

    def __init__(self, bmodel=None, dev_id=None):
        """Load the bmodel and cache the graph metadata.

        Args:
            bmodel: Path of the bmodel file. When ``None``, the module-level
                ``args.bmodel`` is used.
            dev_id: TPU device id. When ``None``, the module-level
                ``args.dev_id`` is used.
        """
        # Fall back to the module-level command-line arguments so that the
        # historical no-argument call `Predictor()` keeps working.
        if bmodel is None:
            bmodel = args.bmodel
        if dev_id is None:
            dev_id = args.dev_id

        # Load the inference engine.
        self.net = sail.Engine(bmodel, dev_id, sail.IOMode.SYSIO)
        self.graph_name = self.net.get_graph_names()[0]
        self.input_names = self.net.get_input_names(self.graph_name)
        self.input_shapes = [
            self.net.get_input_shape(self.graph_name, name)
            for name in self.input_names
        ]
        self.output_names = self.net.get_output_names(self.graph_name)
        self.output_shapes = [
            self.net.get_output_shape(self.graph_name, name)
            for name in self.output_names
        ]  # e.g. [[1, 2]]

        # Only the first input of the graph is used.
        self.input_name = self.input_names[0]
        self.input_shape = self.input_shapes[0]  # e.g. [1, 3, 256, 256]
        self.batch_size = self.input_shape[0]
        self.net_h = self.input_shape[2]  # input image height
        self.net_w = self.input_shape[3]  # input image width

        # Normalisation parameters (ImageNet pre-training statistics).
        self.mean = [0.485, 0.456, 0.406]
        self.std = [0.229, 0.224, 0.225]
        self.print_network_info()

    def __call__(self, img):
        """Shortcut for :meth:`predict`."""
        return self.predict(img)

    def print_network_info(self):
        """Print the cached graph, shape and normalisation settings."""
        info = {
            'Graph Name': self.graph_name,
            'Input Name': self.input_name,
            'Output Names': self.output_names,
            'Output Shapes': self.output_shapes,
            'Input Shape': self.input_shape,
            'Batch Size': self.batch_size,
            'Height': self.net_h,
            'Width': self.net_w,
            'Mean': self.mean,
            'Std': self.std
        }
        print("=" * 50)
        print("Network Configuration Info")
        print("=" * 50)
        for key, value in info.items():
            print(f"{key:<18}: {value}")
        print("=" * 50)

    def predict(self, input_img):
        """Run the engine on one input and return its first output.

        Args:
            input_img: Array fed to the graph's first input.

        Returns:
            The first value of the output mapping returned by the engine.
        """
        input_data = {self.input_name: input_img}
        outputs = self.net.process(self.graph_name, input_data)
        return list(outputs.values())[0]

    def preprocess(self, img):
        """Turn a BGR image into a normalised channel-first float32 array.

        Args:
            img: Image in height x width x channel layout with BGR channel
                order, or ``None`` for an image that could not be read.

        Returns:
            A contiguous float32 array in channel x height x width layout,
            or ``None`` when ``img`` is ``None``.
        """
        if img is None:
            # An unreadable image is reported to the caller instead of
            # crashing on `.shape`.
            return None

        h, w = img.shape[:2]
        # Resize to the size the model actually expects rather than a
        # hard-coded 256x256.
        if h != self.net_h or w != self.net_w:
            img = cv2.resize(img, (self.net_w, self.net_h))
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = img.astype('float32')
        # Model compilation does not do the normalisation for us, so it has
        # to happen here; otherwise predictions can be badly off.
        img = (img / 255 - self.mean) / self.std
        img = np.transpose(img, (2, 0, 1))
        # Subtracting the Python-list mean promotes the array to float64 and
        # the transpose is a strided view; hand back contiguous float32.
        # NOTE(review): assumes the bmodel input is float32 - confirm.
        return np.ascontiguousarray(img, dtype=np.float32)

    def postprocess(self, outputs):
        """Convert raw scores into predicted class indices.

        Args:
            outputs: 2-D array of scores, one row per sample.

        Returns:
            1-D array holding the index of the most probable class per row.
        """
        outputs = np.asarray(outputs)
        # Subtract the row maximum before exponentiating so large scores do
        # not overflow to inf (which would turn the softmax into NaN).
        shifted = outputs - np.max(outputs, axis=1, keepdims=True)
        outputs_exp = np.exp(shifted)
        probabilities = outputs_exp / np.sum(outputs_exp, axis=1, keepdims=True)
        # Index of the highest-probability class for every row.
        return np.argmax(probabilities, axis=1)
def argsparser():
    """Parse the command-line options of this script.

    Returns:
        The ``argparse.Namespace`` with ``input`` (image directory),
        ``bmodel`` (model path) and ``dev_id`` (TPU id).
    """
    cli = argparse.ArgumentParser(prog=__file__)
    # (flag, type, default, help) for every supported option.
    options = (
        ('--input', str, './test', 'path of input, must be image directory'),
        ('--bmodel', str, './shufflenet_f32.bmodel', 'path of bmodel'),
        ('--dev_id', int, 0, 'tpu id'),
    )
    for flag, value_type, default_value, help_text in options:
        cli.add_argument(flag, type=value_type, default=default_value, help=help_text)
    return cli.parse_args()
def main(args):
    """Classify every image under ``args.input`` and save the results as JSON.

    ``args.input`` is expected to contain one sub-directory per class; the
    sub-directory name is recorded as the true label and must be an integer
    literal, because it is converted with ``int()`` when the results are
    saved. Images that cannot be read are recorded with prediction ``-1``.

    Args:
        args: Parsed command-line namespace; ``args.input`` is read here.

    Raises:
        ValueError: If a class directory name is not an integer literal.
    """
    # Load the inference engine. Predictor() takes its model path and device
    # id from the module-level `args`.
    predictor = Predictor()

    # Collect the images of every class directory. Plain files sitting next
    # to the class directories are skipped instead of crashing os.listdir.
    filename = {}
    for c in sorted(os.listdir(args.input)):
        class_dir = os.path.join(args.input, c)
        if not os.path.isdir(class_dir):
            continue
        filename[c] = [os.path.join(class_dir, p) for p in sorted(os.listdir(class_dir))]

    print('find data:')
    for k, v in filename.items():
        print(f'{k}: {len(v)}')

    print('开始推理...')
    res_list = []
    for k, v in filename.items():
        for i in v:
            res = [-1]  # placeholder prediction for unreadable images
            src_img = cv2.imread(i, cv2.IMREAD_COLOR)
            # cv2.imread signals failure by returning None, so check before
            # preprocessing rather than after.
            if src_img is not None:
                batch = np.stack([predictor.preprocess(src_img)])
                res = predictor.postprocess(predictor(batch))
            print(f'{k}: {res[0]}')
            res_list.append([i, k, res[0]])

    # Save the results as JSON: image path -> {'y': true label, 'x': prediction}.
    res_dict = {
        path: {'y': int(real_value), 'x': int(pred_value)}
        for path, real_value, pred_value in res_list
    }
    res_save_path = f'{predictor.graph_name}_prediction_result.json'
    with open(res_save_path, 'w') as f:
        json.dump(res_dict, f)
    print(f'保存结果为 {res_save_path} 成功!')
if __name__ == '__main__':
    # `args` is bound at module level on purpose: Predictor.__init__ reads the
    # global `args` (bmodel path and device id), so it must exist before
    # main() constructs the predictor.
    args = argsparser()
    main(args)
|