jiyuhang 3 mēneši atpakaļ
vecāks
revīzija
84d6653bc0

+ 2 - 1
.env

@@ -1,2 +1,3 @@
 PATCH_WIDTH=256
-PATCH_HEIGHT=256
+PATCH_HEIGHT=256
+CONFIDENCE_THRESHOLD=0.90

+ 3 - 0
.gitignore

@@ -37,3 +37,6 @@ __pycache__/
 *.temp
 *.log
 *.bak
+
+# Model weight files
+*.pth

+ 10 - 7
labelme/crop_patch.py

@@ -5,6 +5,7 @@ sys.path.append(os.path.dirname(os.path.abspath(__file__)))
 
 import numpy as np
 import cv2
+import gc
 from dotenv import load_dotenv
 load_dotenv()
 
@@ -14,9 +15,9 @@ patch_h = int(os.getenv('PATCH_HEIGHT', 256))
 
 def main():
     # TODO:需要修改为标注好的图片路径
-    input_path = r'D:\code\water_turbidity_det\data\video4_20251129120320_20251129123514'
+    input_path = r'D:\code\water_turbidity_det\data\4_video_202511211127'
     # TODO: 需要修改为保存patch的根目录
-    output_path_root = r'D:\code\water_turbidity_det\label_data\train'
+    output_path_root = r'D:\code\water_turbidity_det\label_data\test'
 
     # 读取标注文件
     label_path = os.path.join(input_path, 'label.txt')
@@ -66,16 +67,18 @@ def main():
                 patch = img[i*patch_h:min(i*patch_h+patch_h, img_h), j*patch_w:min(j*patch_w+patch_w, img_w), :]
                 patch_name = f'{img_base_name}_{j*patch_w}_{i*patch_h}_0.jpg'
                 # 长宽比过滤
-                if patch.shape[0] / patch.shape[1] > 1.314 or patch.shape[0] / patch.shape[1] < 0.75:
-                    print(f"长宽比过滤: {patch_name}")
+                if patch.shape[0] / (patch.shape[1]+1e-6) > 1.314 or patch.shape[0] / (patch.shape[1]+1e-6) < 0.75:
+                    #print(f"长宽比过滤: {patch_name}")
                     continue
                 # 纯黑图像过滤
                 if np.mean(patch) < 10.10:
-                    print(f"纯黑图像过滤: {patch_name}")
+                    #print(f"纯黑图像过滤: {patch_name}")
                     continue
                 cv2.imwrite(os.path.join(output_path_root, '0', patch_name), patch)
-                print(f"保存图块: {patch_name}到{os.path.join(output_path_root, '0', patch_name)}")
-
+                #print(f"保存图块: {patch_name}到{os.path.join(output_path_root, '0', patch_name)}")
+        print(f"处理图片: {img_path}完成")
+        # del patch, img
+        # gc.collect()
 
 if __name__ == '__main__':
     main()

+ 2 - 44
main.py

@@ -1,46 +1,4 @@
-import cv2
+import torch
+from torchvision.models import resnet50, ResNet50_Weights
 
 
-img_path = 'qingche.jpg'
-
-bgr_img = cv2.resize(cv2.imread(img_path), (512, 512))
-if bgr_img is None:
-    raise ValueError(f"无法读取图像: {img_path}")
-b_channel, g_channel, r_channel = cv2.split(bgr_img)
-
-def mouse_callback(event, x, y, flags, param):
-    """鼠标回调函数:点击时显示灰度值"""
-    if event == cv2.EVENT_LBUTTONDOWN:
-        # 获取灰度值(灰度图中直接取值)
-        img = param.get('img')
-        window = param.get('window')
-        gray_value = img[y, x]
-        # 在彩色原图的副本上显示文本
-        text = f'Gray: {gray_value}'
-        cv2.putText(img, text, (x, y), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)
-        cv2.imshow(window, img)
-
-# 创建窗口并绑定回调函数
-for pic, name in [(b_channel, 'B Channel'),(g_channel, 'G Channel'),(r_channel, 'R Channel')]:
-    img = pic
-    window = name
-    params = {
-        'img':img,
-        'window':window
-    }
-    cv2.namedWindow(window)
-    cv2.setMouseCallback(window, mouse_callback, param=params)
-
-# 显示图像并等待退出
-while True:
-    cv2.imshow('B Channel', b_channel)
-    cv2.imshow('G Channel', g_channel)
-    cv2.imshow('R Channel', r_channel)
-    cv2.imshow('rgb', bgr_img)
-    key = cv2.waitKey(1) & 0xFF
-    if key == ord('q') or key == 27:  # 按'q'或ESC键退出
-        break
-
-cv2.destroyAllWindows()
-pass
-

+ 10 - 0
run.bash

@@ -0,0 +1,10 @@
#!/usr/bin/env bash
# Train each candidate backbone in sequence, pausing 60s between runs
# (no pause after the final run). Failures do not stop the remaining runs.
models=(swin_v2_b swin_v2_s squeezenet shufflenet resnet50)
for idx in "${!models[@]}"; do
    (( idx > 0 )) && sleep 60
    python train.py --model "${models[$idx]}"
done
+

BIN
runs/turbidity_classification/events.out.tfevents.1766071975.240.2967095.0


BIN
runs/turbidity_classification/events.out.tfevents.1766081826.240.3294804.0


+ 195 - 0
test.py

@@ -0,0 +1,195 @@
+import torch
+import torch.nn as nn
+from torchvision import transforms
+from torchvision.models import resnet18, resnet50
+import numpy as np
+from PIL import Image
+import os
+import argparse
+from labelme.utils import draw_grid
+import cv2
+import matplotlib.pyplot as plt
+from dotenv import load_dotenv
+load_dotenv()
+# os.environ['CUDA_LAUNCH_BLOCKING'] = '0'
+patch_w = int(os.getenv('PATCH_WIDTH', 256))
+patch_h = int(os.getenv('PATCH_HEIGHT', 256))
+confidence_threshold = float(os.getenv('CONFIDENCE_THRESHOLD', 0.80))
+scale = 2
+
+
+class Predictor:
+    def __init__(self, model_name, weights_path, num_classes):
+        self.model_name = model_name
+        self.weights_path = weights_path
+        self.num_classes = num_classes
+        self.model = None
+        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
+        print(f"当前设备: {self.device}")
+        # 加载模型
+        self.load_model()
+
+        # 检查模型结构
+        print(self.model)
+
+
+    def load_model(self):
+        if self.model is not None:
+            return
+        print(f"正在加载模型: {self.model_name}")
+        if self.model_name == 'resnet18':
+            self.model = resnet18(weights=None)
+        elif self.model_name == 'resnet50':
+            self.model = resnet50(weights=None)
+        else:
+            raise ValueError(f"不支持的模型类型: {self.model_name}")
+        # 修改最后的全连接层
+        self.model.fc = nn.Linear(self.model.fc.in_features, self.num_classes)
+        # 加载训练好的权重
+        self.model.load_state_dict(torch.load(self.weights_path, map_location=torch.device('cpu')))
+        print(f"成功加载模型参数: {self.weights_path}")
+        # 将模型移动到GPU
+        self.model.eval()
+        self.model = self.model.to(self.device)
+        print(f"成功加载模型: {self.model_name}")
+
+    def predict(self, image_tensor):
+        """
+        对单张图像进行预测
+
+        Args:
+            image_tensor: 预处理后的图像张量
+
+        Returns:
+            predicted_class: 预测的类别索引
+            confidence: 预测置信度
+            probabilities: 各类别的概率
+        """
+
+        image_tensor = image_tensor.to(self.device)
+
+        with torch.no_grad():
+            outputs = self.model(image_tensor)
+            probabilities = torch.nn.functional.softmax(outputs, dim=1)  # 沿行计算softmax
+            confidence, predicted_class = torch.max(probabilities, 1)
+
+        return confidence.cpu().numpy(), predicted_class.cpu().numpy()
+
+
def preprocess_image(img, patch_width=None, patch_height=None):
    """
    Split an image into a grid of patches and preprocess them to match training.

    Args:
        img: PIL image (RGB).
        patch_width: patch width in pixels; defaults to the module-level
            ``patch_w`` loaded from the environment.
        patch_height: patch height in pixels; defaults to the module-level
            ``patch_h``.

    Returns:
        (imgs_index, imgs_patch): list of (left, top) pixel offsets, one per
        kept patch, and a stacked float tensor of the normalized patches.
    """
    if patch_width is None:
        patch_width = patch_w
    if patch_height is None:
        patch_height = patch_h

    # Same preprocessing pipeline as used at training time.
    transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])

    img_w, img_h = img.size
    imgs_patch = []
    imgs_index = []
    # +1 so a partial strip along the right/bottom border is still covered.
    for i in range(img_h // patch_height + 1):
        for j in range(img_w // patch_width + 1):
            left = j * patch_width            # patch left edge (px)
            top = i * patch_height            # patch top edge (px)
            right = min(left + patch_width, img_w)    # clamped right edge
            bottom = min(top + patch_height, img_h)   # clamped bottom edge
            # Skip degenerate (zero-area) border regions.
            if right > left and bottom > top:
                imgs_patch.append(img.crop((left, top, right, bottom)))
                imgs_index.append((left, top))

    # NOTE(review): torch.stack raises if no patch survives; current callers
    # always pass non-empty images, so this matches prior behavior.
    imgs_patch = torch.stack([transform(p) for p in imgs_patch])
    return imgs_index, imgs_patch
+
+
+
def visualize_prediction(image_path, predicted_class, confidence, class_names):
    """Display an image annotated with its predicted class and confidence.

    Args:
        image_path: path to the image file on disk.
        predicted_class: index into ``class_names``.
        confidence: prediction confidence in [0, 1].
        class_names: list of human-readable class names.
    """
    pil_img = Image.open(image_path).convert('RGB')

    plt.figure(figsize=(8, 6))
    plt.imshow(pil_img)
    plt.axis('off')
    caption = (f'Predicted: {class_names[predicted_class]}\n'
               f'Confidence: {confidence:.4f}')
    plt.title(caption, fontsize=14)
    plt.show()
+
+
def main():
    """Run patch-level turbidity inference over a folder of frames and show overlays."""
    # Model wrapper: resnet50 backbone with its head adapted to two classes.
    predictor = Predictor(model_name='resnet50',
                          weights_path=r'D:\code\water_turbidity_det\resnet50_best_model_acc.pth',
                          num_classes=2)
    input_path = r'D:\code\water_turbidity_det\data\video1_20251129120104_20251129123102'
    # Collect jpg/png frames from the input folder.
    frame_paths = [os.path.join(input_path, name)
                   for name in os.listdir(input_path)
                   if name.split('.')[-1] in ['jpg', 'png']]
    for img_path in frame_paths:
        pil_img = Image.open(img_path).convert('RGB')
        patches_index, image_tensor = preprocess_image(pil_img)
        confidence, predicted_class = predictor.predict(image_tensor)
        # First false-alarm suppression stage: patches below the confidence
        # threshold are forced to class 0 with confidence 1.0.
        low_conf = confidence < confidence_threshold
        confidence[low_conf] = 1.0
        predicted_class[low_conf] = 0
        # Second stage (spatial filtering) — placeholder, not implemented yet.
        # NOTE(review): np.resize repeats/truncates data to fit the grid shape
        # and this matrix is currently unused downstream — confirm intent.
        predicted_class_mat = np.resize(
            predicted_class,
            (pil_img.height // patch_h + 1, pil_img.width // patch_w + 1))
        # Draw the patch grid plus per-patch class/confidence annotations.
        canvas = cv2.imread(img_path)
        canvas = draw_grid(canvas, patch_w, patch_h)
        dw = patch_w // 2
        dh = patch_h // 2
        half_h = canvas.shape[0] // 2
        half_w = canvas.shape[1] // 2
        for i, (idx_w, idx_h) in enumerate(patches_index):
            cv2.circle(canvas, (idx_w, idx_h), 10, (0, 255, 0), -1)
            text1 = f'cls:{predicted_class[i]}'
            text2 = f'prob:{confidence[i]*100:.1f}%'
            # Red for positive class, blue otherwise (BGR colors).
            color = (0, 0, 255) if predicted_class[i] else (255, 0, 0)
            cv2.putText(canvas, text1, (idx_w, idx_h + dh),
                        cv2.FONT_HERSHEY_SIMPLEX, 1.0, color, 2)
            cv2.putText(canvas, text2, (idx_w, idx_h + dh + 25),
                        cv2.FONT_HERSHEY_SIMPLEX, 1.0, color, 2)
        # Show at half resolution; brief wait so the window refreshes per frame.
        canvas = cv2.resize(canvas, (half_w, half_h))
        cv2.imshow('image', canvas)
        cv2.waitKey(20)

if __name__ == "__main__":
    main()

+ 292 - 0
train.py

@@ -0,0 +1,292 @@
+# 微调pytorch的预训练模型,在自己的数据上训练,完成分类任务。
+import time
+
+import torch
+import torch.nn as nn
+import torch.optim as optim
+from torch.utils.data import DataLoader
+import torchvision.transforms as transforms
+from torchvision.datasets import ImageFolder
+from torchvision.models import resnet18, ResNet18_Weights,resnet50,ResNet50_Weights, squeezenet1_0, SqueezeNet1_0_Weights,\
+    shufflenet_v2_x1_0, ShuffleNet_V2_X1_0_Weights, swin_v2_s, Swin_V2_S_Weights, swin_v2_b, Swin_V2_B_Weights
+import matplotlib.pyplot as plt
+import numpy as np
+from torch.utils.tensorboard import SummaryWriter  # 添加 TensorBoard 支持
+from datetime import datetime
+import os
+os.environ['CUDA_VISIBLE_DEVICES'] = '1'
+
class Trainer:
    """Fine-tunes a pretrained torchvision backbone for turbidity classification.

    All pretrained feature-extraction weights are frozen; only the freshly
    initialised classifier head is trained.
    """

    def __init__(self, batch_size, train_dir, val_dir, name, checkpoint):
        """
        Args:
            batch_size: mini-batch size for both data loaders.
            train_dir: ImageFolder-style root of the training set.
            val_dir: ImageFolder-style root of the validation set.
            name: backbone identifier ('resnet50', 'squeezenet', 'shufflenet',
                'swin_v2_s' or 'swin_v2_b').
            checkpoint: reserved for resume-from-checkpoint; currently unused.

        Raises:
            ValueError: if ``name`` is not a supported backbone.
        """
        self.name = name
        self.checkpoint = checkpoint  # NOTE(review): not used anywhere yet

        # One TensorBoard run directory per (model, start-time) pair.
        timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
        log_dir = f'runs/turbidity_{self.name}_{timestamp}'
        self.writer = SummaryWriter(log_dir)

        # Training-time augmentation + ImageNet normalization.
        self.train_transforms = transforms.Compose([
            transforms.Resize((256, 256)),
            transforms.RandomHorizontalFlip(p=0.5),
            transforms.RandomRotation(10),
            transforms.ColorJitter(brightness=0.1, contrast=0.1, saturation=0, hue=0),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])

        # Validation: deterministic resize + normalization only.
        self.val_transforms = transforms.Compose([
            transforms.Resize((256, 256)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])

        self.train_dataset = ImageFolder(root=train_dir, transform=self.train_transforms)
        self.val_dataset = ImageFolder(root=val_dir, transform=self.val_transforms)

        # NOTE: num_workers>0 assumes a Unix host; use 0 on Windows to avoid
        # multiprocessing spawn issues.
        self.batch_size = batch_size
        self.train_loader = DataLoader(self.train_dataset, batch_size=batch_size, shuffle=True, num_workers=10)
        self.val_loader = DataLoader(self.val_dataset, batch_size=batch_size, shuffle=False, num_workers=10)

        # Class count is derived from the training folder layout.
        self.num_classes = len(self.train_dataset.classes)
        print(f"发现 {self.num_classes} 个类别: {self.train_dataset.classes}")

        # Load the ImageNet-pretrained backbone.
        backbones = {
            'resnet50': (resnet50, ResNet50_Weights.IMAGENET1K_V2),
            'squeezenet': (squeezenet1_0, SqueezeNet1_0_Weights.IMAGENET1K_V1),
            'shufflenet': (shufflenet_v2_x1_0, ShuffleNet_V2_X1_0_Weights.IMAGENET1K_V1),
            'swin_v2_s': (swin_v2_s, Swin_V2_S_Weights.IMAGENET1K_V1),
            'swin_v2_b': (swin_v2_b, Swin_V2_B_Weights.IMAGENET1K_V1),
        }
        if name not in backbones:
            raise ValueError(f"Invalid model name: {name}")
        factory, self.weights = backbones[name]
        self.model = factory(weights=self.weights)
        print(self.model)

        # Freeze the feature extractor; only the replaced head will train.
        for param in self.model.parameters():
            param.requires_grad = False

        # Replace the final classification layer for our task.
        self._replace_head(name)

        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model = self.model.to(self.device)

        # Standard multi-class cross-entropy loss.
        self.loss = nn.CrossEntropyLoss()

        # Optimize only the trainable (head) parameters, as intended.
        trainable_params = [p for p in self.model.parameters() if p.requires_grad]
        self.optimizer = optim.Adam(trainable_params, lr=1e-3, weight_decay=1e-4)

        # Halve the learning rate whenever validation loss plateaus.
        self.scheduler = optim.lr_scheduler.ReduceLROnPlateau(
            self.optimizer, mode='min', factor=0.5, patience=5, min_lr=1e-7
        )

    def _replace_head(self, name):
        """Swap the pretrained classifier for a fresh head over ``num_classes`` outputs.

        Raises:
            ValueError: if the model exposes no recognizable classifier layer.
        """
        def make_head(in_features):
            # Two-layer MLP head: Linear -> ReLU -> Dropout -> Linear.
            return nn.Sequential(
                nn.Linear(int(in_features), int(in_features) // 2, bias=True),
                nn.ReLU(inplace=True),
                nn.Dropout(0.5),
                nn.Linear(int(in_features) // 2, self.num_classes, bias=False)
            )

        if hasattr(self.model, 'fc'):
            # ResNet / ShuffleNet expose a single Linear `fc`.
            self.model.fc = make_head(self.model.fc.in_features)
        elif hasattr(self.model, 'head'):
            # Swin Transformer exposes a Linear `head`.
            self.model.head = make_head(self.model.head.in_features)
        elif hasattr(self.model, 'classifier'):
            head = self.model.classifier
            if isinstance(head, nn.Linear):
                self.model.classifier = make_head(head.in_features)
            elif isinstance(head, nn.Sequential) and isinstance(head[1], nn.Conv2d):
                # SqueezeNet: classifier is Sequential(Dropout, Conv2d, ReLU,
                # AdaptiveAvgPool2d) — it has no `.in_features`, so the old
                # generic access crashed here. Replace the 1x1 conv so the
                # network emits `num_classes` channels instead.
                head[1] = nn.Conv2d(head[1].in_channels, self.num_classes, kernel_size=1)
                self.model.num_classes = self.num_classes
            else:
                raise ValueError(f"Model {name} does not have recognizable classifier layer")
        else:
            raise ValueError(f"Model {name} does not have recognizable classifier layer")

    def train_model(self):
        """
        Run one training epoch over the full training loader.

        Returns:
            (epoch_loss, epoch_acc): sample-weighted average loss and accuracy.
        """
        self.model.train()  # enable dropout / batchnorm training behavior
        running_loss = 0.0
        correct_predictions = 0
        total_samples = 0

        for inputs, labels in self.train_loader:
            inputs = inputs.to(self.device)
            labels = labels.to(self.device)

            # Standard step: zero grads, forward, backward, update.
            self.optimizer.zero_grad()
            outputs = self.model(inputs)
            loss = self.loss(outputs, labels)
            loss.backward()
            self.optimizer.step()

            # Accumulate sample-weighted loss and top-1 accuracy stats.
            running_loss += loss.item() * inputs.size(0)
            _, predicted = torch.max(outputs.data, 1)
            total_samples += labels.size(0)
            correct_predictions += (predicted == labels).sum().item()

        epoch_loss = running_loss / len(self.train_loader.dataset)
        epoch_acc = correct_predictions / total_samples
        return epoch_loss, epoch_acc

    def validate_model(self):
        """
        Evaluate the model over the full validation loader (no gradients).

        Returns:
            (epoch_loss, epoch_acc): sample-weighted average loss and accuracy.
        """
        self.model.eval()  # disable dropout / use batchnorm running stats

        running_loss = 0.0
        correct_predictions = 0
        total_samples = 0

        with torch.no_grad():
            for inputs, labels in self.val_loader:
                inputs = inputs.to(self.device)
                labels = labels.to(self.device)

                outputs = self.model(inputs)
                loss = self.loss(outputs, labels)

                running_loss += loss.item() * inputs.size(0)
                _, predicted = torch.max(outputs.data, 1)
                total_samples += labels.size(0)
                correct_predictions += (predicted == labels).sum().item()

        epoch_loss = running_loss / len(self.val_loader.dataset)
        epoch_acc = correct_predictions / total_samples
        return epoch_loss, epoch_acc

    def train_and_validate(self, num_epochs=25):
        """
        Run the full train/validate loop, logging to TensorBoard and saving
        the best-accuracy and lowest-loss checkpoints.

        Args:
            num_epochs: number of epochs to run.

        Returns:
            (train_losses, train_accuracies, val_losses, val_accuracies):
            per-epoch metric histories.
        """
        train_losses = []
        train_accuracies = []
        val_losses = []
        val_accuracies = []

        best_val_acc = 0.0
        best_val_loss = float('inf')

        print("开始训练...")
        for epoch in range(num_epochs):
            print(f'Epoch {epoch + 1}/{num_epochs}')
            print('-' * 20)

            train_loss, train_acc = self.train_model()
            print(f'Train Loss: {train_loss:.4f} Acc: {train_acc:.4f}')

            val_loss, val_acc = self.validate_model()
            print(f'Val Loss: {val_loss:.4f} Acc: {val_acc:.4f}')

            # Learning-rate scheduling driven by validation loss.
            self.scheduler.step(val_loss)

            # Log metrics to TensorBoard. Read the lr from the optimizer:
            # ReduceLROnPlateau has no get_last_lr() on older torch releases.
            current_lr = self.optimizer.param_groups[0]['lr']
            self.writer.add_scalar('Loss/Train', train_loss, epoch)
            self.writer.add_scalar('Loss/Validation', val_loss, epoch)
            self.writer.add_scalar('Accuracy/Train', train_acc, epoch)
            self.writer.add_scalar('Accuracy/Validation', val_acc, epoch)
            self.writer.add_scalar('Learning Rate', current_lr, epoch)

            train_losses.append(train_loss)
            train_accuracies.append(train_acc)
            val_losses.append(val_loss)
            val_accuracies.append(val_acc)

            # Checkpoint on best validation accuracy.
            if val_acc > best_val_acc:
                best_val_acc = val_acc
                torch.save(self.model.state_dict(), f'{self.name}_best_model_acc.pth')
                print(f"保存了新的最佳准确率模型,验证准确率: {best_val_acc:.4f}")

            # Checkpoint on lowest validation loss.
            if val_loss < best_val_loss:
                best_val_loss = val_loss
                torch.save(self.model.state_dict(), f'{self.name}_best_model_loss.pth')
                print(f"保存了新的最低损失模型,验证损失: {best_val_loss:.4f}")

        self.writer.close()

        print(f"训练完成! 最佳验证准确率: {best_val_acc:.4f}, 最低验证损失: {best_val_loss:.4f}")
        return train_losses, train_accuracies, val_losses, val_accuracies
+
if __name__ == '__main__':
    # CLI entry point: fine-tune one backbone on the labelled patch data.
    import argparse

    parser = argparse.ArgumentParser('预训练模型调参')
    parser.add_argument('--train_dir', default='./label_data/train',
                        help='training set root (ImageFolder layout)')
    parser.add_argument('--val_dir', default='./label_data/test',
                        help='validation set root (ImageFolder layout)')
    # NOTE(review): the default 'resnet18' is not accepted by Trainer
    # (it raises ValueError) — kept for backward compatibility; always
    # pass --model explicitly (see run.bash).
    parser.add_argument('--model', default='resnet18',
                        help='backbone: resnet50 | squeezenet | shufflenet | swin_v2_s | swin_v2_b')
    parser.add_argument('--epochs', type=int, default=100,
                        help='number of training epochs')
    parser.add_argument('--batch_size', type=int, default=64,
                        help='mini-batch size')
    args = parser.parse_args()

    trainer = Trainer(batch_size=args.batch_size,
                      train_dir=args.train_dir,
                      val_dir=args.val_dir,
                      name=args.model,
                      checkpoint=False)
    train_losses, train_accuracies, val_losses, val_accuracies = trainer.train_and_validate(args.epochs)