| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295 |
- import time
- import torch
- import torch.nn as nn
- from torchvision import transforms
- from torchvision.models import resnet18,resnet50, squeezenet1_0, shufflenet_v2_x1_0
- import numpy as np
- from PIL import Image
- import os
- import argparse
- from labelme.utils import draw_grid, draw_predict_grid
- import cv2
- import matplotlib.pyplot as plt
- from dotenv import load_dotenv
# Read configuration overrides from a local .env file (if present).
load_dotenv()
# os.environ['CUDA_LAUNCH_BLOCKING'] = '0'
# Patch (tile) size, in pixels, that input frames are split into.
patch_w = int(os.getenv('PATCH_WIDTH', 256))
patch_h = int(os.getenv('PATCH_HEIGHT', 256))
# Class-1 predictions below this softmax confidence are suppressed to background.
confidence_threshold = float(os.getenv('CONFIDENCE_THRESHOLD', 0.80))
# NOTE(review): appears unused — main() hardcodes the // 2 display downscale; verify.
scale = 2
class Predictor:
    """Wraps a torchvision classifier backbone for turbidity patch prediction."""

    def __init__(self, model_name, weights_path, num_classes):
        """
        Args:
            model_name: one of 'resnet18', 'resnet50', 'squeezenet', 'shufflenet'.
            weights_path: path to a state-dict checkpoint produced by training.
            num_classes: number of output classes the classifier head is replaced with.
        """
        self.model_name = model_name
        self.weights_path = weights_path
        self.num_classes = num_classes
        self.model = None
        # BUG FIX: os.getenv returns a *string* when the variable is set, so any
        # non-empty value (including "False") was truthy. Parse into a real bool;
        # the default (unset -> True) is unchanged.
        self.use_bias = str(os.getenv('USE_BIAS', True)).strip().lower() in ('1', 'true', 'yes')
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        print(f"当前设备: {self.device}")
        # Load eagerly so construction fails fast on a bad model name / checkpoint.
        self.load_model()

    def load_model(self):
        """Build the backbone, replace its classification head, and load weights.

        Idempotent: returns immediately if the model is already loaded.

        Raises:
            ValueError: for an unknown model name or an unrecognizable head layer.
        """
        if self.model is not None:
            return
        print(f"正在加载模型: {self.model_name}")
        # Instantiate the requested backbone. ('resnet18' is imported at the top
        # of the file but was previously not selectable — added for completeness.)
        if self.model_name == 'resnet18':
            self.model = resnet18()
        elif self.model_name == 'resnet50':
            self.model = resnet50()
        elif self.model_name == 'squeezenet':
            self.model = squeezenet1_0()
        elif self.model_name == 'shufflenet':
            self.model = shufflenet_v2_x1_0()
        else:
            raise ValueError(f"Invalid model name: {self.model_name}")
        # Replace the final classification layer to match the new task.
        if hasattr(self.model, 'fc'):
            # ResNet family exposes a Linear `fc` head.
            self.model.fc = nn.Linear(int(self.model.fc.in_features), self.num_classes, bias=self.use_bias)
        elif hasattr(self.model, 'classifier'):
            if self.model_name == 'squeezenet':
                # SqueezeNet classifies with a 1x1 conv, not a Linear layer.
                final_conv_in_channels = self.model.classifier[1].in_channels
                self.model.classifier = nn.Sequential(
                    nn.Dropout(p=0.5),
                    nn.Conv2d(final_conv_in_channels, self.num_classes, kernel_size=(1, 1)),
                    nn.ReLU(inplace=True),
                    nn.AdaptiveAvgPool2d((1, 1))
                )
            else:
                # Models exposing a Linear `classifier` head.
                # CONSISTENCY FIX: honour use_bias here too (was hard-coded True).
                self.model.classifier = nn.Linear(int(self.model.classifier.in_features), self.num_classes, bias=self.use_bias)
        elif hasattr(self.model, 'head'):
            # e.g. Swin Transformer exposes the head as `.head`.
            self.model.head = nn.Linear(int(self.model.head.in_features), self.num_classes, bias=self.use_bias)
        else:
            raise ValueError(f"Model {self.model_name} does not have recognizable classifier layer")
        # SECURITY NOTE: torch.load unpickles arbitrary objects — only load
        # checkpoints from trusted sources.
        self.model.load_state_dict(torch.load(self.weights_path, map_location=torch.device('cpu')))
        print(f"成功加载模型参数: {self.weights_path}")
        # Inference-only: freeze batch-norm/dropout behavior, then move to device.
        self.model.eval()
        self.model = self.model.to(self.device)
        print(f"成功加载模型: {self.model_name}")

    def predict(self, image_tensor):
        """
        Run inference on a batch of preprocessed patches.

        Args:
            image_tensor: float tensor, presumably (batch, 3, 224, 224) as
                produced by preprocess_image — confirm with caller.

        Returns:
            (confidence, predicted_class): two numpy arrays of shape (batch,)
            holding the max softmax probability and its class index per patch.
        """
        image_tensor = image_tensor.to(self.device)
        with torch.no_grad():
            outputs = self.model(image_tensor)
            probabilities = torch.softmax(outputs, dim=1)  # softmax per row
            confidence, predicted_class = torch.max(probabilities, 1)
        return confidence.cpu().numpy(), predicted_class.cpu().numpy()
def preprocess_image(img):
    """
    Split an image into a grid of patches and preprocess each patch to match
    the training-time transform.

    Args:
        img: PIL RGB image.

    Returns:
        (imgs_index, imgs_patch):
            imgs_index: list of (left, top) pixel offsets, one per patch.
            imgs_patch: float tensor of shape (num_patches, 3, 224, 224).
    """
    # Same normalization as training (ImageNet statistics).
    transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])
    img_w, img_h = img.size
    # NOTE: the former `global patch_w, patch_h` was removed — the module
    # globals are only read here, so the declaration was a no-op.
    imgs_patch = []
    imgs_index = []
    # Walk the patch grid; the +1 row/column catches the partial patches at the
    # right/bottom edges when the image size is not a multiple of the patch size.
    for i in range(img_h // patch_h + 1):
        for j in range(img_w // patch_w + 1):
            left = j * patch_w
            top = i * patch_h
            right = min(left + patch_w, img_w)
            bottom = min(top + patch_h, img_h)
            # Skip the zero-area slots produced by the +1 overshoot when the
            # image size is an exact multiple of the patch size.
            if right > left and bottom > top:
                imgs_patch.append(img.crop((left, top, right, bottom)))
                imgs_index.append((left, top))
    # Stack all patches into one inference batch. (The loop variable was
    # renamed: it previously shadowed the `img` argument.)
    imgs_patch = torch.stack([transform(patch) for patch in imgs_patch])
    return imgs_index, imgs_patch
def visualize_prediction(image_path, predicted_class, confidence, class_names):
    """
    Display an image with its predicted label and confidence in the title.

    Args:
        image_path: path to the image file to show.
        predicted_class: index into class_names for the predicted label.
        confidence: prediction confidence (float).
        class_names: list of human-readable class names.
    """
    rgb_image = Image.open(image_path).convert('RGB')

    plt.figure(figsize=(8, 6))
    plt.imshow(rgb_image)
    plt.axis('off')
    caption = (f'Predicted: {class_names[predicted_class]}\n'
               f'Confidence: {confidence:.4f}')
    plt.title(caption, fontsize=14)
    plt.show()
def get_33_patch(arr: np.ndarray, center_row: int, center_col: int):
    """Return the 3x3 neighbourhood of arr centred at (center_row, center_col).

    Slices are clipped at the array borders, so centres on an edge or corner
    yield a smaller (2x3, 3x2 or 2x2) view instead of raising.
    """
    n_rows, n_cols = arr.shape
    row_start, row_stop = max(0, center_row - 1), min(n_rows, center_row + 2)
    col_start, col_stop = max(0, center_col - 1), min(n_cols, center_col + 2)
    return arr[row_start:row_stop, col_start:col_stop]
def fileter_prediction(predicted_class, confidence, pre_rows, pre_cols, filter_down_limit=3):
    """Spatial filter over the prediction grid.

    A turbid (class-1) cell survives only if its border-clipped 3x3
    neighbourhood contains at least filter_down_limit turbid cells; otherwise
    it is demoted to background with confidence 1.0.

    Returns the filtered (confidence, class) pair, both flattened.
    """
    # NOTE(review): np.resize tiles or truncates when the input length differs
    # from pre_rows*pre_cols — presumably they always match here; verify.
    class_mat = np.resize(predicted_class, (pre_rows, pre_cols))
    conf_mat = np.resize(confidence, (pre_rows, pre_cols))
    out_class = class_mat.copy()
    out_conf = conf_mat.copy()
    for row in range(pre_rows):
        for col in range(pre_cols):
            # Background cells (class ~0) are left untouched.
            if (1. - class_mat[row, col]) > 0.1:
                continue
            # Border-clipped 3x3 neighbourhood around the cell (inlined helper).
            r0, r1 = max(0, row - 1), min(pre_rows, row + 2)
            c0, c1 = max(0, col - 1), min(pre_cols, col + 2)
            if np.sum(class_mat[r0:r1, c0:c1]) < filter_down_limit:
                # Isolated positive: reset to background, full confidence.
                out_class[row, col] = 0
                out_conf[row, col] = 1.0
    return out_conf.flatten(), out_class.flatten()
def discriminate_ratio(water_pre_list: list):
    """Frame-ratio discriminator.

    A patch counts as turbid when it was predicted positive in at least 60%
    of the collected frames; the clip is flagged when more than two patches
    qualify.
    """
    hits_per_patch = np.array(water_pre_list, dtype=np.float32).sum(axis=0)
    required_hits = 0.6 * len(water_pre_list)
    bad_water = (hits_per_patch >= required_hits).astype(np.int32)
    # More than two qualifying patches are required to raise the flag.
    bad_flag = bool(bad_water.sum(dtype=np.int32) > 2)
    print(f'浑浊比例判别:该时间段是否存在浑浊水体:{bad_flag}')
    return bad_flag
def discriminate_count(pre_class_arr, continuous_count_mat):
    """Consecutive-frame discriminator.

    Per-patch run counters are incremented on a positive prediction and
    decremented (floored at 0) on a background prediction. The clip is
    flagged when more than two patches have run lengths above 15.

    Mutates continuous_count_mat in place.
    """
    classes = np.array(pre_class_arr, dtype=np.int32)
    # The two masks are disjoint, so the update order is irrelevant.
    continuous_count_mat[classes == 0] -= 1
    continuous_count_mat[classes > 0] += 1
    # Floor the counters at zero so a long negative streak cannot build debt.
    np.clip(continuous_count_mat, 0, None, out=continuous_count_mat)
    bad_flag = bool(np.sum(continuous_count_mat > 15) > 2)
    print(f'连续帧方式:该时间段是否存在浑浊水体:{bad_flag}')
    return bad_flag
def main():
    """Run patch-wise turbidity inference over a directory of frames and
    combine two temporal discriminators into a final verdict."""
    # TODO: adjust model name / weights path / frame directory as needed.
    predictor = Predictor(model_name='shufflenet',
                          weights_path=r'./shufflenet.pth',
                          num_classes=2)
    input_path = r'D:\code\water_turbidity_det\frame_data\train\20251225\4_video_20251223163145'
    # BUG FIX: os.listdir order is filesystem-dependent, but the
    # consecutive-frame discriminator needs frames in temporal order.
    # Extension check also made case-insensitive and extended to .jpeg.
    all_imgs = [os.path.join(input_path, p) for p in sorted(os.listdir(input_path))
                if p.lower().rsplit('.', 1)[-1] in ('jpg', 'jpeg', 'png')]
    if not all_imgs:
        raise FileNotFoundError(f'No image frames found in {input_path}')
    image = Image.open(all_imgs[0]).convert('RGB')
    # Grid shape used when reshaping the flat prediction vector into a matrix.
    pre_rows = image.height // patch_h + 1
    pre_cols = image.width // patch_w + 1
    # Half-size resolution for on-screen display.
    resized_img_h = image.height // 2
    resized_img_w = image.width // 2
    water_pre_list = []
    continuous_count_mat = np.zeros(pre_rows * pre_cols, dtype=np.int32)
    flag = False
    try:
        for img_path in all_imgs:
            image = Image.open(img_path).convert('RGB')
            # Split into patches and build the inference batch.
            patches_index, image_tensor = preprocess_image(image)
            # confidence / predicted_class: np.ndarray, shape (num_patches,)
            confidence, predicted_class = predictor.predict(image_tensor)
            # First false-alarm suppression: low-confidence positives are
            # demoted to background with full confidence.
            for i in range(len(confidence)):
                if confidence[i] < confidence_threshold and predicted_class[i] == 1:
                    confidence[i] = 1.0
                    predicted_class[i] = 0
            # Second false-alarm suppression: spatial neighbourhood filter.
            new_confidence, new_predicted_class = fileter_prediction(
                predicted_class, confidence, pre_rows, pre_cols, filter_down_limit=3)
            # Visualize raw vs filtered predictions side by side.
            image = cv2.imread(img_path)
            image = draw_grid(image, patch_w, patch_h)
            image = draw_predict_grid(image, patches_index, predicted_class, confidence)
            new_image = cv2.imread(img_path)
            new_image = draw_grid(new_image, patch_w, patch_h)
            new_image = draw_predict_grid(new_image, patches_index, new_predicted_class, new_confidence)
            image = cv2.resize(image, (resized_img_w, resized_img_h))
            new_img = cv2.resize(new_image, (resized_img_w, resized_img_h))
            cv2.imshow('image', image)
            cv2.imshow('image_filter', new_img)
            cv2.waitKey(25)
            water_pre_list.append(new_predicted_class)
            # Discriminator 2: consecutive-frame run lengths (updated every frame).
            flag = discriminate_count(new_predicted_class, continuous_count_mat)
            # Discriminator 1: frame ratio, once enough frames are collected;
            # both discriminators must agree for a positive verdict.
            if len(water_pre_list) > 25:
                flag = discriminate_ratio(water_pre_list) and flag
            print('综合判别结果:', flag)
    finally:
        # BUG FIX: OpenCV HighGUI windows were never released.
        cv2.destroyAllWindows()


if __name__ == "__main__":
    main()
|