import argparse
import os

import cv2
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
from dotenv import load_dotenv
from labelme.utils import draw_grid
from PIL import Image
from torchvision import transforms
from torchvision.models import resnet18, resnet50

load_dotenv()
# os.environ['CUDA_LAUNCH_BLOCKING'] = '0'

# Patch size (pixels) and confidence cut-off, overridable through the
# environment / a .env file.
patch_w = int(os.getenv('PATCH_WIDTH', 256))
patch_h = int(os.getenv('PATCH_HEIGHT', 256))
confidence_threshold = float(os.getenv('CONFIDENCE_THRESHOLD', 0.80))
# Down-scaling factor applied to the annotated frame before it is displayed.
scale = 2

# Per-patch preprocessing. The original author's note says these steps are
# meant to be the same as the ones used at training time. Built once at import
# instead of on every preprocess_image() call.
_PATCH_TRANSFORM = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225]),
])


class Predictor:
    """Wrap a ResNet classifier: build it, load trained weights, run inference."""

    def __init__(self, model_name, weights_path, num_classes):
        """Store the configuration, pick a device and load the model.

        Args:
            model_name: 'resnet18' or 'resnet50'.
            weights_path: Path to a state-dict checkpoint for that architecture.
            num_classes: Output size of the replaced final fully connected layer.

        Raises:
            ValueError: If ``model_name`` is not a supported architecture.
        """
        self.model_name = model_name
        self.weights_path = weights_path
        self.num_classes = num_classes
        self.model = None
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        print(f"当前设备: {self.device}")
        # Load the model
        self.load_model()
        # Print the model structure for inspection
        print(self.model)

    def load_model(self):
        """Build the network, load the weights and move it to ``self.device``.

        Does nothing when a model is already loaded. ``self.model`` is only
        assigned once the weights were loaded successfully, so a failed load
        never leaves a half-initialised model that a later call would treat
        as ready.

        Raises:
            ValueError: If ``self.model_name`` is not a supported architecture.
        """
        if self.model is not None:
            return
        print(f"正在加载模型: {self.model_name}")
        if self.model_name == 'resnet18':
            model = resnet18(weights=None)
        elif self.model_name == 'resnet50':
            model = resnet50(weights=None)
        else:
            raise ValueError(f"不支持的模型类型: {self.model_name}")
        # Replace the final fully connected layer to match the class count
        model.fc = nn.Linear(model.fc.in_features, self.num_classes)
        # Load the trained weights onto the CPU first.
        # NOTE(review): torch.load unpickles the file; only load checkpoints
        # from a trusted source (consider weights_only=True where the
        # installed torch version supports it).
        state_dict = torch.load(self.weights_path, map_location=torch.device('cpu'))
        model.load_state_dict(state_dict)
        print(f"成功加载模型参数: {self.weights_path}")
        # Switch to inference mode and move to the selected device
        model.eval()
        self.model = model.to(self.device)
        print(f"成功加载模型: {self.model_name}")

    def predict(self, image_tensor):
        """Classify a batch of preprocessed images.

        Args:
            image_tensor: Batch tensor as produced by ``preprocess_image``
                (first dimension indexes the patches).

        Returns:
            A ``(confidence, predicted_class)`` tuple of NumPy arrays with one
            entry per batch element: the highest softmax probability and the
            index of the class that achieved it.
        """
        image_tensor = image_tensor.to(self.device)
        with torch.no_grad():
            outputs = self.model(image_tensor)
            # Softmax over the class dimension, one distribution per patch
            probabilities = torch.nn.functional.softmax(outputs, dim=1)
            confidence, predicted_class = torch.max(probabilities, 1)
        return confidence.cpu().numpy(), predicted_class.cpu().numpy()


def _grid_shape(img_w, img_h):
    """Return ``(rows, cols)`` of the patch grid covering an image.

    Uses ceiling division so the count matches exactly the non-empty patches
    produced by ``preprocess_image`` (a partial patch at the right/bottom edge
    counts, but no extra empty row/column is added when the image size is an
    exact multiple of the patch size).
    """
    rows = -(-img_h // patch_h)
    cols = -(-img_w // patch_w)
    return rows, cols


def preprocess_image(img):
    """Split an image into patches and preprocess each one.

    The image is tiled row by row with ``patch_w`` x ``patch_h`` tiles; tiles
    on the right and bottom edges are clipped to the image bounds.

    Args:
        img: PIL image.

    Returns:
        A ``(imgs_index, imgs_patch)`` tuple: ``imgs_index`` is the list of
        ``(left, top)`` pixel coordinates of every patch in row-major order,
        ``imgs_patch`` is the stacked tensor of the transformed patches in the
        same order.
    """
    img_w, img_h = img.size
    rows, cols = _grid_shape(img_w, img_h)

    imgs_index = []
    imgs_patch = []
    for i in range(rows):
        top = i * patch_h
        bottom = min(top + patch_h, img_h)
        for j in range(cols):
            left = j * patch_w
            right = min(left + patch_w, img_w)
            patch = img.crop((left, top, right, bottom))
            # Aspect-ratio filter (disabled in the original code):
            # rate = patch.height / (patch.width + 1e-6)
            # if rate > 1.314 or rate < 0.75:
            #     continue
            # NOTE: enabling it would drop patches, so main() could no longer
            # reshape the predictions into the full rows x cols grid.
            imgs_index.append((left, top))
            imgs_patch.append(_PATCH_TRANSFORM(patch))

    return imgs_index, torch.stack(imgs_patch)


def visualize_prediction(image_path, predicted_class, confidence, class_names):
    """Show an image with its predicted class and confidence as the title.

    Args:
        image_path: Path of the image to display.
        predicted_class: Index of the predicted class.
        confidence: Confidence of the prediction.
        class_names: Sequence of class names indexed by class index.
    """
    with Image.open(image_path) as raw:
        image = raw.convert('RGB')
    plt.figure(figsize=(8, 6))
    plt.imshow(image)
    plt.axis('off')
    plt.title(f'Predicted: {class_names[predicted_class]}\n'
              f'Confidence: {confidence:.4f}',
              fontsize=14)
    plt.show()


def main():
    """Classify every image in the input folder patch by patch and display it."""
    # Create the model instance
    predictor = Predictor(model_name='resnet50',
                          weights_path=r'D:\code\water_turbidity_det\resnet50_best_model_acc.pth',
                          num_classes=2)
    input_path = r'D:\code\water_turbidity_det\data\video1_20251129120104_20251129123102'

    # Collect the images. The extension check is case-insensitive, and the
    # list is sorted because os.listdir returns entries in arbitrary order.
    all_imgs = sorted(
        os.path.join(input_path, name)
        for name in os.listdir(input_path)
        if os.path.splitext(name)[1].lower() in ('.jpg', '.png')
    )

    for img_path in all_imgs:
        with Image.open(img_path) as raw:
            image = raw.convert('RGB')
        patches_index, image_tensor = preprocess_image(image)
        confidence, predicted_class = predictor.predict(image_tensor)

        # First false-alarm suppression layer: predictions below the
        # confidence threshold are ignored by resetting them to class 0
        # (their confidence is overwritten with 1.0, as in the original code).
        low_confidence = confidence < confidence_threshold
        confidence[low_confidence] = 1.0
        predicted_class[low_confidence] = 0

        # Second false-alarm suppression layer (spatial filtering).
        # TODO: only the grid layout is prepared here; no filtering is applied yet.
        rows, cols = _grid_shape(image.width, image.height)
        predicted_class_mat = predicted_class.reshape(rows, cols)

        # Visualise the result. The already loaded PIL image is converted to
        # BGR instead of re-reading the file with cv2.imread, which returns
        # None on unreadable paths and may orient the image differently from
        # the PIL copy the patch coordinates were computed on.
        canvas = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)
        canvas = draw_grid(canvas, patch_w, patch_h)
        dh = patch_h // 2
        for (left, top), cls, conf in zip(patches_index, predicted_class, confidence):
            cv2.circle(canvas, (left, top), 10, (0, 255, 0), -1)
            # BGR colours: red for a non-zero class, blue for class 0
            color = (0, 0, 255) if cls else (255, 0, 0)
            cv2.putText(canvas, f'cls:{cls}', (left, top + dh),
                        cv2.FONT_HERSHEY_SIMPLEX, 1.0, color, 2)
            cv2.putText(canvas, f'prob:{conf*100:.1f}%', (left, top + dh + 25),
                        cv2.FONT_HERSHEY_SIMPLEX, 1.0, color, 2)

        resized_img_w = canvas.shape[1] // scale
        resized_img_h = canvas.shape[0] // scale
        canvas = cv2.resize(canvas, (resized_img_w, resized_img_h))
        cv2.imshow('image', canvas)
        cv2.waitKey(20)

    # Close the preview window once all images have been shown
    cv2.destroyAllWindows()


if __name__ == "__main__":
    main()