"""Patch-based turbid-water detection over a directory of video frames.

Each frame is cut into a grid of fixed-size patches, every patch is classified
by a CNN, and the per-patch predictions go through two false-alarm suppression
stages (confidence threshold, 3x3 spatial filter) followed by two temporal
decision rules (ratio of positive frames, run of consecutive positive frames).
"""
import argparse
import os
import re
import time

import torch
import torch.nn as nn
from torchvision import transforms
from torchvision.models import resnet18, resnet50, squeezenet1_0, shufflenet_v2_x1_0
import numpy as np
from PIL import Image
from labelme.utils import draw_grid, draw_predict_grid
import cv2
import matplotlib.pyplot as plt
from dotenv import load_dotenv

# Must run before the os.getenv() calls below so that values from .env are seen.
load_dotenv()
# os.environ['CUDA_LAUNCH_BLOCKING'] = '0'

# Patch size in pixels and the minimum confidence for a positive prediction.
patch_w = int(os.getenv('PATCH_WIDTH', 256))
patch_h = int(os.getenv('PATCH_HEIGHT', 256))
confidence_threshold = float(os.getenv('CONFIDENCE_THRESHOLD', 0.80))
# Down-scaling factor applied to frames before they are displayed.
scale = 2

# Supported backbone names -> torchvision constructors.
_BACKBONES = {
    'resnet18': resnet18,
    'resnet50': resnet50,
    'squeezenet': squeezenet1_0,
    'shufflenet': shufflenet_v2_x1_0,
}

# Per-patch preprocessing. The original comment states this must match the
# preprocessing used at training time (the constants are the commonly used
# ImageNet mean/std) -- verify against the training script.
_PATCH_TRANSFORM = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

_IMAGE_EXTENSIONS = {'.jpg', '.jpeg', '.png'}


def _env_flag(name, default=True):
    """Read a boolean environment variable.

    ``os.getenv`` returns a string whenever the variable is set, so a value
    such as ``"False"`` or ``"0"`` would be truthy if used directly.
    """
    raw = os.getenv(name)
    if raw is None:
        return default
    return raw.strip().lower() not in ('', '0', 'false', 'no', 'off')


def _natural_key(text):
    """Sort key that orders embedded numbers numerically (frame_2 < frame_10)."""
    return [int(tok) if tok.isdigit() else tok.lower() for tok in re.split(r'(\d+)', text)]


def _grid_shape(img_w, img_h):
    """Return ``(rows, cols)`` of the patch grid covering an image.

    Uses ceiling division so that an image whose size is an exact multiple of
    the patch size does not get a phantom empty row/column.
    """
    rows = (img_h + patch_h - 1) // patch_h
    cols = (img_w + patch_w - 1) // patch_w
    return rows, cols


class Predictor:
    """Wraps a torchvision classifier with a replaced head and trained weights."""

    def __init__(self, model_name, weights_path, num_classes):
        """Build the network, load ``weights_path`` and move it to the device.

        Args:
            model_name: one of the keys of ``_BACKBONES``.
            weights_path: path to a ``state_dict`` checkpoint.
            num_classes: number of output classes of the new head.
        """
        self.model_name = model_name
        self.weights_path = weights_path
        self.num_classes = num_classes
        self.model = None
        # NOTE(review): previously any non-empty USE_BIAS string (even "False")
        # enabled the bias. If a checkpoint was trained under that behaviour,
        # load_state_dict will now report the mismatch loudly.
        self.use_bias = _env_flag('USE_BIAS', True)
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        print(f"当前设备: {self.device}")
        self.load_model()

    def load_model(self):
        """Create the backbone, swap its classification head and load weights.

        Does nothing if a model is already loaded.

        Raises:
            ValueError: unknown ``model_name`` or no recognisable head layer.
        """
        if self.model is not None:
            return
        print(f"正在加载模型: {self.model_name}")

        try:
            build = _BACKBONES[self.model_name]
        except KeyError:
            raise ValueError(f"Invalid model name: {self.model_name}") from None
        model = build()

        # Replace the final classification layer to fit the new task.
        if hasattr(model, 'fc'):
            model.fc = nn.Linear(int(model.fc.in_features), self.num_classes, bias=self.use_bias)
        elif hasattr(model, 'classifier'):
            if self.model_name == 'squeezenet':
                # SqueezeNet classifies with a 1x1 convolution; reuse its
                # input channel count for the new head.
                final_conv_in_channels = model.classifier[1].in_channels
                model.classifier = nn.Sequential(
                    nn.Dropout(p=0.5),
                    nn.Conv2d(final_conv_in_channels, self.num_classes, kernel_size=(1, 1)),
                    nn.ReLU(inplace=True),
                    nn.AdaptiveAvgPool2d((1, 1)),
                )
            else:
                # Models exposing a single Linear ``classifier``.
                model.classifier = nn.Linear(int(model.classifier.in_features),
                                             self.num_classes, bias=True)
        elif hasattr(model, 'head'):
            model.head = nn.Linear(int(model.head.in_features), self.num_classes, bias=self.use_bias)
        else:
            raise ValueError(f"Model {self.model_name} does not have recognizable classifier layer")
        print(model)

        # SECURITY NOTE: torch.load unpickles the file; only load checkpoints
        # from trusted sources (consider weights_only=True where supported).
        state_dict = torch.load(self.weights_path, map_location=torch.device('cpu'))
        model.load_state_dict(state_dict)
        print(f"成功加载模型参数: {self.weights_path}")

        model.eval()
        # Publish the model only after the weights loaded successfully, so a
        # failed load cannot leave an untrained network behind.
        self.model = model.to(self.device)
        print(f"成功加载模型: {self.model_name}")

    def predict(self, image_tensor):
        """Classify a batch of preprocessed patches.

        Args:
            image_tensor: batch tensor as produced by ``preprocess_image``.

        Returns:
            tuple ``(confidence, predicted_class)`` of NumPy arrays with one
            entry per patch: the maximum softmax probability and its index.
        """
        image_tensor = image_tensor.to(self.device)
        with torch.no_grad():
            outputs = self.model(image_tensor)
            probabilities = torch.softmax(outputs, dim=1)
            confidence, predicted_class = torch.max(probabilities, 1)
        return confidence.cpu().numpy(), predicted_class.cpu().numpy()


def preprocess_image(img):
    """Split a PIL image into grid patches and preprocess each one.

    Patches are emitted row by row; patches on the right/bottom border are
    cropped to the image and may therefore be smaller before resizing.

    Args:
        img: PIL image.

    Returns:
        tuple ``(imgs_index, imgs_patch)``: list of ``(left, top)`` pixel
        offsets, and the stacked tensor of transformed patches in that order.
    """
    img_w, img_h = img.size
    rows, cols = _grid_shape(img_w, img_h)

    imgs_index = []
    patches = []
    for i in range(rows):
        top = i * patch_h
        bottom = min(top + patch_h, img_h)
        for j in range(cols):
            left = j * patch_w
            right = min(left + patch_w, img_w)
            patches.append(img.crop((left, top, right, bottom)))
            imgs_index.append((left, top))

    imgs_patch = torch.stack([_PATCH_TRANSFORM(patch) for patch in patches])
    return imgs_index, imgs_patch


def visualize_prediction(image_path, predicted_class, confidence, class_names):
    """Show an image with its predicted class name and confidence as title.

    Args:
        image_path: path of the image to display.
        predicted_class: index into ``class_names``.
        confidence: confidence value, formatted with four decimals.
        class_names: sequence of class names.
    """
    with Image.open(image_path) as raw:
        image = raw.convert('RGB')

    plt.figure(figsize=(8, 6))
    plt.imshow(image)
    plt.axis('off')
    plt.title(f'Predicted: {class_names[predicted_class]}\n'
              f'Confidence: {confidence:.4f}',
              fontsize=14)
    plt.show()


def get_33_patch(arr: np.ndarray, center_row: int, center_col: int):
    """Return the 3x3 window of ``arr`` centred on the given cell.

    The window is clipped at the array border, so edge and corner cells yield
    a smaller slice. ``arr`` must be two-dimensional.
    """
    h, w = arr.shape
    row_start = max(0, center_row - 1)
    row_stop = min(h, center_row + 2)
    col_start = max(0, center_col - 1)
    col_stop = min(w, center_col + 2)
    return arr[row_start:row_stop, col_start:col_stop]


def fileter_prediction(predicted_class, confidence, pre_rows, pre_cols, filter_down_limit=3):
    """Spatially filter the per-patch predictions of one frame.

    A non-background patch is kept only if the sum of class values in its
    (border-clipped) 3x3 neighbourhood, itself included, is at least
    ``filter_down_limit``; otherwise it is reset to class 0 with confidence 1.0.
    The neighbourhood is always evaluated on the unfiltered predictions.

    Args:
        predicted_class: flat per-patch class indices, row-major.
        confidence: flat per-patch confidences, same order.
        pre_rows: number of grid rows.
        pre_cols: number of grid columns.
        filter_down_limit: minimum neighbourhood sum for a patch to survive.

    Returns:
        tuple ``(confidence, predicted_class)`` of flat filtered arrays.
        Note the order is swapped relative to the arguments.

    Raises:
        ValueError: if the inputs do not contain ``pre_rows * pre_cols`` items.
    """
    class_arr = np.asarray(predicted_class)
    conf_arr = np.asarray(confidence)
    expected = pre_rows * pre_cols
    # np.resize would silently repeat or truncate data on a size mismatch and
    # misalign predictions with their patches, so fail explicitly instead.
    if class_arr.size != expected or conf_arr.size != expected:
        raise ValueError(
            f"Expected {expected} predictions for a {pre_rows}x{pre_cols} grid, "
            f"got {class_arr.size} classes and {conf_arr.size} confidences"
        )

    class_mat = class_arr.reshape(pre_rows, pre_cols)
    conf_mat = conf_arr.reshape(pre_rows, pre_cols)
    new_class_mat = class_mat.copy()
    new_conf_mat = conf_mat.copy()

    for i in range(pre_rows):
        for j in range(pre_cols):
            if (1. - class_mat[i, j]) > 0.1:
                continue  # background patch, nothing to suppress
            core_region = get_33_patch(class_mat, i, j)
            if np.sum(core_region) < filter_down_limit:
                new_class_mat[i, j] = 0
                new_conf_mat[i, j] = 1.0
    return new_conf_mat.flatten(), new_class_mat.flatten()


def discriminate_ratio(water_pre_list: list, ratio_threshold=0.6, min_patches=2):
    """Ratio rule: decide whether a window of frames shows turbid water.

    A patch position counts as bad when its summed prediction over the window
    reaches ``ratio_threshold * len(water_pre_list)``; the window is flagged
    when more than ``min_patches`` positions are bad.

    Args:
        water_pre_list: list of flat per-frame prediction arrays.
        ratio_threshold: required fraction of positive frames per patch.
        min_patches: the number of bad patches must exceed this value.

    Returns:
        bool flag for the window.
    """
    water_pre_arr = np.array(water_pre_list, dtype=np.float32)
    positives_per_patch = np.sum(water_pre_arr, axis=0)
    bad_water = positives_per_patch >= ratio_threshold * len(water_pre_list)
    bad_flag = bool(np.count_nonzero(bad_water) > min_patches)
    print(f'浑浊比例判别:该时间段是否存在浑浊水体:{bad_flag}')
    return bad_flag


def discriminate_cont(pre_class_arr, continuous_count_mat, min_consecutive=30):
    """Consecutive-frame rule.

    Updates ``continuous_count_mat`` IN PLACE: counters of positive patches
    are incremented, counters of negative patches are reset to zero.

    Args:
        pre_class_arr: flat per-patch predictions of the current frame.
        continuous_count_mat: integer run-length counters, same length.
        min_consecutive: a counter must exceed this value to raise the flag.

    Returns:
        truthy value if any patch has been positive for more than
        ``min_consecutive`` consecutive frames.
    """
    classes = np.array(pre_class_arr, dtype=np.int32)
    continuous_count_mat[classes == 0] = 0
    continuous_count_mat[classes > 0] += 1

    bad_flag = np.max(continuous_count_mat) > min_consecutive
    if bad_flag:
        print(f'连续帧方式:该时间段是否存在浑浊水体:{bool(bad_flag)}')
    return bad_flag


def main():
    """Run detection over every frame of a directory and display the results."""
    # TODO: adjust the network name / weights path / frame directory.
    predictor = Predictor(model_name='shufflenet',
                          weights_path=r'./shufflenet.pth',
                          num_classes=2)
    input_path = r'D:\code\water_turbidity_det\frame_data\train\20251230\4video_20251229160514'

    # os.listdir returns entries in arbitrary order, but the consecutive-frame
    # rule depends on temporal order, so sort the frames explicitly.
    frame_names = sorted(
        (name for name in os.listdir(input_path)
         if os.path.splitext(name)[1].lower() in _IMAGE_EXTENSIONS),
        key=_natural_key,
    )
    if not frame_names:
        raise FileNotFoundError(f"No jpg/png frames found in: {input_path}")
    all_imgs = [os.path.join(input_path, name) for name in frame_names]

    # Grid and display sizes are derived from the first frame; all frames are
    # assumed to share its resolution.
    with Image.open(all_imgs[0]) as first:
        frame_w, frame_h = first.size
    pre_rows, pre_cols = _grid_shape(frame_w, frame_h)
    resized_img_h = frame_h // scale
    resized_img_w = frame_w // scale

    water_pre_list = []
    continuous_count_mat = np.zeros(pre_rows * pre_cols, dtype=np.int32)
    cont_flag = False
    try:
        for img_path in all_imgs:
            with Image.open(img_path) as raw:
                image = raw.convert('RGB')
            patches_index, image_tensor = preprocess_image(image)
            confidence, predicted_class = predictor.predict(image_tensor)

            # Stage 1: positives below the confidence threshold become background.
            weak = (predicted_class == 1) & (confidence < confidence_threshold)
            confidence[weak] = 1.0
            predicted_class[weak] = 0

            # Stage 2: spatial filtering over the patch grid.
            new_confidence, new_predicted_class = fileter_prediction(
                predicted_class, confidence, pre_rows, pre_cols, filter_down_limit=3)

            # Visualise unfiltered and filtered predictions side by side.
            frame = cv2.imread(img_path)
            if frame is None:
                # cv2.imread returns None when it cannot read the file;
                # fall back to the image that PIL already decoded.
                frame = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)
            raw_view = draw_grid(frame.copy(), patch_w, patch_h)
            raw_view = draw_predict_grid(raw_view, patches_index, predicted_class, confidence)
            filtered_view = draw_grid(frame.copy(), patch_w, patch_h)
            filtered_view = draw_predict_grid(filtered_view, patches_index,
                                              new_predicted_class, new_confidence)
            cv2.imshow('image', cv2.resize(raw_view, (resized_img_w, resized_img_h)))
            cv2.imshow('image_filter', cv2.resize(filtered_view, (resized_img_w, resized_img_h)))
            cv2.waitKey(25)

            # Rule 1: once the window holds more than 25 frames, combine the
            # ratio decision with the previous frame's consecutive decision.
            if len(water_pre_list) > 25:
                combined_flag = discriminate_ratio(water_pre_list) and cont_flag
                water_pre_list = []
                print('综合判别结果:', combined_flag)
            water_pre_list.append(new_predicted_class)

            # Rule 2: consecutive-frame decision for the current frame.
            cont_flag = discriminate_cont(new_predicted_class, continuous_count_mat)
    finally:
        cv2.destroyAllWindows()


if __name__ == "__main__":
    main()