"""Patch-based turbid-water detection over a directory of image frames.

Each frame is cut into fixed-size patches, every patch is classified by a
CNN, and the per-patch predictions go through two false-alarm suppression
stages (confidence threshold, 3x3 neighbourhood filter) followed by two
temporal criteria (consecutive-frame counter, positive-frame ratio).
"""
import time
import torch
import torch.nn as nn
from torchvision import transforms
from model.model_zoon import load_model
import numpy as np
from PIL import Image
import os
import argparse
from labelme.utils import draw_grid, draw_predict_grid
import cv2
import matplotlib.pyplot as plt
from dotenv import load_dotenv

load_dotenv()

# Patch size in pixels used to tile each frame (overridable through .env).
patch_w = int(os.getenv('PATCH_WIDTH', 256))
patch_h = int(os.getenv('PATCH_HEIGHT', 256))
# Positive predictions below this confidence are reset to background in main().
confidence_threshold = float(os.getenv('CONFIDENCE_THRESHOLD', 0.80))
scale = 2

# Per-patch preprocessing; the original author's note says it mirrors the
# training-time preprocessing. Built once because it does not depend on the
# frame being processed.
_PATCH_TRANSFORM = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225]),
])


def _grid_shape(img_w: int, img_h: int) -> tuple:
    """Return (rows, cols) of the patch grid that preprocess_image produces.

    Ceiling division is used so that a dimension that is an exact multiple of
    the patch size does not get an extra, empty row or column.
    """
    rows = (img_h + patch_h - 1) // patch_h
    cols = (img_w + patch_w - 1) // patch_w
    return rows, cols


class Predictor:
    """Wrap a classification model together with the alarm parameters."""

    def __init__(self, model_name, weights_path, num_classes):
        """Load the model and set the temporal-judgement parameters.

        Args:
            model_name: network name forwarded to ``load_model``.
            weights_path: path of the weights file forwarded to ``load_model``.
            num_classes: number of output classes forwarded to ``load_model``.
        """
        self.model_name = model_name
        self.weights_path = weights_path
        self.num_classes = num_classes
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        print(f"当前设备: {self.device}")
        # NOTE(review): assumes load_model returns the network already in
        # eval mode; confirm, since predict() does not call .eval() itself.
        self.model = self.load_model()
        # Number of consecutive frames used to decide turbid water is present.
        self.max_continuous_frames = 100
        # Frame count after which the ratio-based judgement starts.
        self.start_frame_num = self.max_continuous_frames
        # Threshold of the ratio-based judgement.
        self.ratio_threshold = 0.90
        # Alarm confidence threshold. Not read anywhere in this file; main()
        # uses the module-level ``confidence_threshold`` instead.
        self.confidence_threshold = 0.90

    def load_model(self):
        """Build the model through the project's ``load_model`` factory."""
        return load_model(
            name=self.model_name,
            num_classes=self.num_classes,
            weights_path=self.weights_path,
            device=self.device,
        )

    def predict(self, image_tensor):
        """Classify a batch of preprocessed patches.

        Args:
            image_tensor: batch tensor as produced by ``preprocess_image``.

        Returns:
            tuple: ``(confidence, predicted_class)`` as NumPy arrays, one entry
            per patch. ``confidence`` is the maximum softmax probability and
            ``predicted_class`` the index of that maximum.
        """
        image_tensor = image_tensor.to(self.device)
        with torch.no_grad():
            outputs = self.model(image_tensor)
            # Softmax over the class dimension.
            probabilities = torch.softmax(outputs, dim=1)
            confidence, predicted_class = torch.max(probabilities, 1)
        return confidence.cpu().numpy(), predicted_class.cpu().numpy()


def preprocess_image(img):
    """Tile a PIL image into patches and convert them into one batch tensor.

    Patches are taken row by row, left to right. Patches on the right and
    bottom edges are cropped to the image border and may therefore be smaller
    than the nominal patch size before being resized by the transform.

    Args:
        img: PIL image.

    Returns:
        tuple: ``(imgs_index, imgs_patch)`` where ``imgs_index`` is a list of
        ``(left, top)`` pixel offsets and ``imgs_patch`` is the stacked tensor
        of transformed patches in the same order.
    """
    img_w, img_h = img.size
    patches = []
    imgs_index = []
    # Stepping by the patch size only visits offsets strictly inside the
    # image, so every crop box is non-empty.
    for top in range(0, img_h, patch_h):
        bottom = min(top + patch_h, img_h)
        for left in range(0, img_w, patch_w):
            right = min(left + patch_w, img_w)
            patches.append(img.crop((left, top, right, bottom)))
            imgs_index.append((left, top))

    imgs_patch = torch.stack([_PATCH_TRANSFORM(patch) for patch in patches])
    return imgs_index, imgs_patch


def visualize_prediction(image_path, predicted_class, confidence, class_names):
    """Show an image with its predicted class and confidence as the title.

    Args:
        image_path: path of the image to display.
        predicted_class: index into ``class_names``.
        confidence: prediction confidence, formatted with four decimals.
        class_names: sequence of class names.
    """
    with Image.open(image_path) as raw:
        image = raw.convert('RGB')
    plt.figure(figsize=(8, 6))
    plt.imshow(image)
    plt.axis('off')
    plt.title(f'Predicted: {class_names[predicted_class]}\n'
              f'Confidence: {confidence:.4f}', fontsize=14)
    plt.show()


def get_33_patch(arr: np.ndarray, center_row: int, center_col: int):
    """Return the 3x3 neighbourhood of ``arr`` centred on the given cell.

    The window is clipped at the array borders, so it is smaller than 3x3 for
    cells on an edge or corner. ``arr`` must be two-dimensional.
    """
    h, w = arr.shape
    row_start = max(0, center_row - 1)
    row_stop = min(h, center_row + 2)
    col_start = max(0, center_col - 1)
    col_stop = min(w, center_col + 2)
    return arr[row_start:row_stop, col_start:col_stop]


def fileter_prediction(predicted_class, confidence, pre_rows, pre_cols, filter_down_limit=3):
    """Spatially filter per-patch predictions on the patch grid.

    A positive cell is kept only when the sum of the class labels in its
    clipped 3x3 neighbourhood (the cell itself included) is at least
    ``filter_down_limit``; otherwise it is reset to class 0 with confidence
    1.0. The inputs are not modified.

    Args:
        predicted_class: flat sequence of class indices, row-major.
        confidence: flat sequence of confidences, same order.
        pre_rows: number of grid rows.
        pre_cols: number of grid columns.
        filter_down_limit: minimum neighbourhood sum for a positive cell to
            survive.

    Returns:
        tuple: ``(confidence, predicted_class)`` as flat arrays after
        filtering. Note the order is swapped relative to the arguments.

    Raises:
        ValueError: if the inputs do not contain ``pre_rows * pre_cols``
            elements. ``np.resize`` would silently repeat the data in that
            case and produce a misaligned grid.
    """
    class_arr = np.asarray(predicted_class)
    conf_arr = np.asarray(confidence)
    expected = pre_rows * pre_cols
    if class_arr.size != expected or conf_arr.size != expected:
        raise ValueError(
            f'expected {expected} predictions for a {pre_rows}x{pre_cols} grid, '
            f'got {class_arr.size} classes and {conf_arr.size} confidences'
        )

    predicted_class_mat = class_arr.reshape(pre_rows, pre_cols)
    new_predicted_class_mat = predicted_class_mat.copy()
    new_predicted_conf_mat = conf_arr.reshape(pre_rows, pre_cols).copy()

    for i in range(pre_rows):
        for j in range(pre_cols):
            if (1. - predicted_class_mat[i, j]) > 0.1:
                continue  # background cell, nothing to filter
            core_region = get_33_patch(predicted_class_mat, i, j)
            if np.sum(core_region) < filter_down_limit:
                new_predicted_class_mat[i, j] = 0  # reset to background
                new_predicted_conf_mat[i, j] = 1.0
    return new_predicted_conf_mat.flatten(), new_predicted_class_mat.flatten()


def discriminate_ratio(water_pre_list: list, right_ratio: float):
    """Ratio criterion over all collected frames.

    A patch counts as turbid when the sum of its labels over the frames is at
    least ``right_ratio * len(water_pre_list)``. The criterion fires when more
    than two patches qualify.

    Args:
        water_pre_list: list of per-frame flat prediction arrays.
        right_ratio: required fraction of positive frames per patch.

    Returns:
        bool: whether turbid water is considered present.
    """
    frame_stack = np.array(water_pre_list, dtype=np.float32)
    positives_per_patch = np.sum(frame_stack, axis=0)
    bad_water = np.array(
        positives_per_patch >= right_ratio * len(water_pre_list), dtype=np.int32
    )
    # More than two patches must satisfy the ratio.
    bad_flag = bool(np.sum(bad_water, dtype=np.int32) > 2)
    print(f'浑浊比例判别:该时间段是否存在浑浊水体:{bad_flag}')
    return bad_flag


def discriminate_count(pre_class_arr, continuous_count_mat, max_continuous_frames):
    """Consecutive-frame criterion; updates ``continuous_count_mat`` in place.

    Per patch, the counter is increased by 1 for a positive prediction and
    decreased by 3 for a background prediction, then clamped at zero. The
    criterion fires when more than two counters exceed
    ``max_continuous_frames``.

    Args:
        pre_class_arr: flat per-patch class predictions of the current frame.
        continuous_count_mat: integer counter array, one entry per patch;
            modified in place.
        max_continuous_frames: counter value a patch must exceed.

    Returns:
        bool: whether turbid water is considered present.
    """
    labels = np.asarray(pre_class_arr, dtype=np.int32)
    positive_index = labels > 0
    negative_index = labels == 0
    # Background patches decay three times faster than positives accumulate.
    continuous_count_mat[negative_index] -= 3
    continuous_count_mat[positive_index] += 1
    # Counters never go negative.
    continuous_count_mat[continuous_count_mat < 0] = 0
    bad_flag = bool(np.sum(continuous_count_mat > max_continuous_frames) > 2)
    print(f'连续帧方式:该时间段是否存在浑浊水体:{bad_flag}')
    return bad_flag


def main():
    """Run detection over every frame in a directory and display the results.

    Raises:
        FileNotFoundError: if the input directory contains no .jpg/.png files.
    """
    # TODO: adjust the model name / weights path / input frame directory.
    predictor = Predictor(model_name='shufflenet-x2',
                          weights_path=r'./shufflenet-x2.pth',
                          num_classes=2)
    input_path = r'D:\code\water_turbidity_det\tem_test\2_ch52_20260113011503_0'

    # os.listdir returns entries in arbitrary order; the temporal criteria
    # below need a stable frame order, so sort the paths.
    all_imgs = sorted(
        os.path.join(input_path, name)
        for name in os.listdir(input_path)
        if os.path.splitext(name)[1].lower() in ('.jpg', '.png')
    )
    if not all_imgs:
        raise FileNotFoundError(f'no .jpg/.png frames found in {input_path!r}')

    with Image.open(all_imgs[0]) as first_frame:
        frame_w, frame_h = first_frame.size
    # Grid shape used to reshape flat predictions; must agree with the number
    # of patches preprocess_image produces.
    pre_rows, pre_cols = _grid_shape(frame_w, frame_h)
    # Display size (half resolution).
    resized_img_h = frame_h // 2
    resized_img_w = frame_w // 2

    # NOTE(review): this list grows by one array per frame and is never
    # trimmed, so the ratio criterion covers all frames seen so far.
    water_pre_list = []
    continuous_count_mat = np.zeros(pre_rows * pre_cols, dtype=np.int32)
    flag = False
    try:
        for img_path in all_imgs:
            with Image.open(img_path) as raw:
                image = raw.convert('RGB')
            patches_index, image_tensor = preprocess_image(image)
            confidence, predicted_class = predictor.predict(image_tensor)

            # Stage 1 suppression: low-confidence positives become background.
            low_conf = (predicted_class == 1) & (confidence < confidence_threshold)
            confidence[low_conf] = 1.0
            predicted_class[low_conf] = 0

            # Stage 2 suppression: spatial 3x3 filter.
            new_confidence, new_predicted_class = fileter_prediction(
                predicted_class, confidence, pre_rows, pre_cols, filter_down_limit=3)

            # Visualisation: decode once and draw both views on copies.
            frame_bgr = cv2.imread(img_path)
            if frame_bgr is None:
                # cv2.imread signals failure by returning None; fall back to
                # the frame PIL already decoded.
                frame_bgr = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)
            raw_view = draw_grid(frame_bgr.copy(), patch_w, patch_h)
            raw_view = draw_predict_grid(raw_view, patches_index, predicted_class, confidence)
            filtered_view = draw_grid(frame_bgr.copy(), patch_w, patch_h)
            filtered_view = draw_predict_grid(
                filtered_view, patches_index, new_predicted_class, new_confidence)
            raw_view = cv2.resize(raw_view, (resized_img_w, resized_img_h))
            filtered_view = cv2.resize(filtered_view, (resized_img_w, resized_img_h))
            cv2.imshow('image', raw_view)
            cv2.imshow('image_filter', filtered_view)
            cv2.waitKey(25)

            water_pre_list.append(new_predicted_class)
            # Criterion 2: consecutive frames.
            flag = discriminate_count(
                new_predicted_class, continuous_count_mat, predictor.max_continuous_frames)
            # Criterion 1: ratio, only once enough frames were collected.
            if len(water_pre_list) > predictor.start_frame_num:
                flag = discriminate_ratio(water_pre_list, predictor.ratio_threshold) and flag
            # NOTE(review): reported once per frame; confirm this is the
            # intended granularity.
            print('综合判别结果:', flag)
    finally:
        cv2.destroyAllWindows()


if __name__ == "__main__":
    main()