| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195 |
- import torch
- import torch.nn as nn
- from torchvision import transforms
- from torchvision.models import resnet18, resnet50
- import numpy as np
- from PIL import Image
- import os
- import argparse
- from labelme.utils import draw_grid
- import cv2
- import matplotlib.pyplot as plt
- from dotenv import load_dotenv
# Load variables from a .env file into the process environment before the
# settings below are read.
load_dotenv()

# Debugging toggle, left disabled:
# os.environ['CUDA_LAUNCH_BLOCKING'] = '0'

# Patch (tile) size in pixels; override with PATCH_WIDTH / PATCH_HEIGHT.
patch_w = int(os.environ.get('PATCH_WIDTH', '256'))
patch_h = int(os.environ.get('PATCH_HEIGHT', '256'))

# Threshold compared against each patch's softmax confidence in main();
# override with CONFIDENCE_THRESHOLD.
confidence_threshold = float(os.environ.get('CONFIDENCE_THRESHOLD', '0.80'))

# NOTE(review): not referenced anywhere in the visible code; presumably the
# display down-scaling factor -- confirm before relying on it.
scale = 2
class Predictor:
    """Inference wrapper around a torchvision ResNet classifier.

    Builds a ResNet-18 or ResNet-50 without pretrained weights, swaps its
    final fully connected layer for a ``num_classes``-way linear head, loads
    a trained state dict from ``weights_path`` and runs on CUDA when it is
    available, otherwise on the CPU.
    """

    def __init__(self, model_name, weights_path, num_classes):
        """Store the configuration and load the model right away.

        Args:
            model_name: ``'resnet18'`` or ``'resnet50'``.
            weights_path: Path to the state-dict checkpoint to load.
            num_classes: Output size of the replaced final layer.

        Raises:
            ValueError: If ``model_name`` is not a supported architecture.
        """
        self.model_name = model_name
        self.weights_path = weights_path
        self.num_classes = num_classes
        self.model = None
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        print(f"当前设备: {self.device}")
        # Load the model.
        self.load_model()
        # Print the network structure for inspection.
        print(self.model)

    def load_model(self):
        """Build the network and load its weights; no-op if already loaded.

        ``self.model`` is only assigned once the weights have been loaded
        successfully, so a failed load never leaves a half-initialised model
        that a later call would mistake for a loaded one.

        Raises:
            ValueError: If ``self.model_name`` is not a supported architecture.
        """
        if self.model is not None:
            return
        print(f"正在加载模型: {self.model_name}")
        if self.model_name == 'resnet18':
            model = resnet18(weights=None)
        elif self.model_name == 'resnet50':
            model = resnet50(weights=None)
        else:
            raise ValueError(f"不支持的模型类型: {self.model_name}")
        # Replace the final fully connected layer with a num_classes-way head.
        model.fc = nn.Linear(model.fc.in_features, self.num_classes)
        # Load the trained weights onto the CPU first.
        # NOTE(security): torch.load unpickles the file; only load checkpoints
        # from a trusted source (consider weights_only=True where the
        # installed torch version supports it).
        state_dict = torch.load(self.weights_path, map_location=torch.device('cpu'))
        model.load_state_dict(state_dict)
        print(f"成功加载模型参数: {self.weights_path}")
        # Switch to inference mode, then move the model to the target device.
        model.eval()
        self.model = model.to(self.device)
        print(f"成功加载模型: {self.model_name}")

    def predict(self, image_tensor):
        """Classify a batch of preprocessed images.

        Args:
            image_tensor: Preprocessed image tensor. A 3-D tensor is treated
                as a single image and given a leading batch dimension; any
                other rank is passed to the model unchanged.

        Returns:
            tuple: ``(confidence, predicted_class)`` as NumPy arrays with one
            entry per image -- the highest softmax probability and the index
            of the class it belongs to.
        """
        if image_tensor.dim() == 3:
            # Single image without a batch axis: add one so the model and the
            # class-axis softmax below see a batch of size 1.
            image_tensor = image_tensor.unsqueeze(0)
        image_tensor = image_tensor.to(self.device)
        with torch.no_grad():
            outputs = self.model(image_tensor)
            # Softmax over the class axis of each row.
            probabilities = torch.nn.functional.softmax(outputs, dim=1)
            confidence, predicted_class = torch.max(probabilities, 1)
        return confidence.cpu().numpy(), predicted_class.cpu().numpy()
def preprocess_image(img):
    """Cut an image into patches and convert them into one batched tensor.

    The image is tiled row by row, left to right, using the module-level
    ``patch_w`` x ``patch_h`` patch size. Patches on the right and bottom
    edges are clipped to the image bounds, so they may be smaller than a
    full patch; every patch is then resized to 224x224, converted to a
    tensor and normalised.

    Args:
        img: PIL image.

    Returns:
        tuple: ``(imgs_index, imgs_patch)`` where ``imgs_index`` is a list of
        ``(left, top)`` pixel offsets, one per patch, and ``imgs_patch`` is
        the stack of transformed patches in the same order.
    """
    # Per the original author's note this is meant to mirror the
    # preprocessing used at training time.
    to_model_input = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])

    width, height = img.size
    n_rows = height // patch_h + 1
    n_cols = width // patch_w + 1

    origins = []
    tiles = []
    for row in range(n_rows):
        top = row * patch_h
        bottom = min(top + patch_h, height)
        if bottom <= top:
            # Image height is an exact multiple of patch_h: the extra row is empty.
            continue
        for col in range(n_cols):
            left = col * patch_w
            right = min(left + patch_w, width)
            if right <= left:
                # Same situation along the width: the extra column is empty.
                continue
            # An aspect-ratio filter for clipped edge patches was sketched
            # here by the original author but is disabled.
            tiles.append(img.crop((left, top, right, bottom)))
            origins.append((left, top))

    # Stacking supplies the batch dimension, so no unsqueeze is needed.
    batch = torch.stack([to_model_input(tile) for tile in tiles])
    return origins, batch
def visualize_prediction(image_path, predicted_class, confidence, class_names):
    """Display an image titled with its predicted label and confidence.

    Args:
        image_path: Path of the image to show.
        predicted_class: Index of the predicted class, used to look up
            ``class_names``.
        confidence: Prediction confidence, shown with four decimal places.
        class_names: Sequence of class names indexed by class index.
    """
    picture = Image.open(image_path).convert('RGB')

    plt.figure(figsize=(8, 6))
    plt.imshow(picture)
    plt.axis('off')

    label = class_names[predicted_class]
    caption = (f'Predicted: {label}\n'
               f'Confidence: {confidence:.4f}')
    plt.title(caption, fontsize=14)

    # Blocks until the window is closed when an interactive backend is used.
    plt.show()
def main():
    """Classify every image in a folder patch by patch and display the result.

    For each ``.jpg`` / ``.png`` file in the hard-coded input folder the image
    is tiled by ``preprocess_image``, each patch is classified by a ResNet-50
    ``Predictor``, low-confidence predictions are reset to class 0, and the
    per-patch class and confidence are drawn onto the image, which is shown
    at half size in an OpenCV window.
    """
    # Create the model instance.
    predictor = Predictor(model_name='resnet50',
                          weights_path=r'D:\code\water_turbidity_det\resnet50_best_model_acc.pth',
                          num_classes=2)
    input_path = r'D:\code\water_turbidity_det\data\video1_20251129120104_20251129123102'

    # Collect the image files. The extension is compared case-insensitively
    # (so "frame.JPG" is accepted) and the list is sorted so frames are shown
    # in a stable order instead of the arbitrary os.listdir order.
    image_exts = {'.jpg', '.png'}
    all_imgs = sorted(
        os.path.join(input_path, name)
        for name in os.listdir(input_path)
        if os.path.splitext(name)[1].lower() in image_exts
    )

    for img_path in all_imgs:
        pil_image = Image.open(img_path).convert('RGB')
        patches_index, image_tensor = preprocess_image(pil_image)
        confidence, predicted_class = predictor.predict(image_tensor)

        # First false-alarm suppression stage: predictions below the
        # confidence threshold are ignored by resetting them to class 0
        # (their confidence is then reported as 1.0).
        low_confidence = confidence < confidence_threshold
        confidence[low_confidence] = 1.0
        predicted_class[low_confidence] = 0

        # Second false-alarm suppression stage (spatial filtering): arrange
        # the per-patch classes on the patch grid. preprocess_image emits
        # ceil(height / patch_h) rows of ceil(width / patch_w) patches in
        # row-major order; reshape raises on any mismatch, whereas np.resize
        # would silently repeat data to fill a wrongly sized grid.
        grid_rows = (pil_image.height + patch_h - 1) // patch_h
        grid_cols = (pil_image.width + patch_w - 1) // patch_w
        # Not consumed yet: the spatial filter itself is still to be written.
        predicted_class_mat = predicted_class.reshape(grid_rows, grid_cols)

        # Visualise the predictions. The already decoded PIL image is reused
        # (converted to OpenCV's BGR order) rather than re-read with
        # cv2.imread, which can return None for unreadable paths and may
        # orient the image differently from PIL when EXIF data is present.
        image = cv2.cvtColor(np.array(pil_image), cv2.COLOR_RGB2BGR)
        # NOTE(review): draw_grid comes from labelme.utils; presumably it
        # overlays the patch grid and returns the image -- confirm.
        image = draw_grid(image, patch_w, patch_h)
        dh = patch_h // 2
        for (idx_w, idx_h), cls, conf in zip(patches_index, predicted_class, confidence):
            # Mark the patch's top-left corner.
            cv2.circle(image, (idx_w, idx_h), 10, (0, 255, 0), -1)
            # BGR colours: red for a non-zero class, blue for class 0.
            color = (0, 0, 255) if cls else (255, 0, 0)
            cv2.putText(image, f'cls:{cls}', (idx_w, idx_h + dh),
                        cv2.FONT_HERSHEY_SIMPLEX, 1.0, color, 2)
            cv2.putText(image, f'prob:{conf*100:.1f}%', (idx_w, idx_h + dh + 25),
                        cv2.FONT_HERSHEY_SIMPLEX, 1.0, color, 2)

        # Show at half resolution; cv2.resize expects (width, height).
        half_size = (image.shape[1] // 2, image.shape[0] // 2)
        image = cv2.resize(image, half_size)
        cv2.imshow('image', image)
        cv2.waitKey(20)

    # Close the preview window once all images have been shown.
    cv2.destroyAllWindows()
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|