# Fine-tune a torchvision pretrained model on custom data for image classification.
import time
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import torchvision.transforms as transforms
from torchvision.datasets import ImageFolder
from torchvision.models import resnet18, resnet50, squeezenet1_0, shufflenet_v2_x1_0, shufflenet_v2_x2_0
from torch.utils.tensorboard import SummaryWriter  # TensorBoard support
from datetime import datetime
import os
from dotenv import load_dotenv

load_dotenv()
os.environ['CUDA_VISIBLE_DEVICES'] = os.getenv('CUDA_VISIBLE_DEVICES', '0')


def _env_bool(key, default=True):
    """Parse a boolean environment variable.

    BUGFIX: ``os.getenv(key, True)`` returns a *string* whenever the variable
    is set, and any non-empty string (including "False") is truthy.  This
    helper interprets the usual textual spellings explicitly.

    Args:
        key: environment variable name.
        default: value returned when the variable is unset.
    Returns:
        bool
    """
    raw = os.getenv(key)
    if raw is None:
        return default
    return raw.strip().lower() in ('1', 'true', 'yes', 'on')


def print_env_variables():
    """Print the configuration variables loaded from the .env file."""
    print("从.env文件加载的变量:")
    env_vars = {
        'PATCH_WIDTH': os.getenv('PATCH_WIDTH'),
        'PATCH_HEIGHT': os.getenv('PATCH_HEIGHT'),
        'CONFIDENCE_THRESHOLD': os.getenv('CONFIDENCE_THRESHOLD'),
        'IMG_INPUT_SIZE': os.getenv('IMG_INPUT_SIZE'),
        'WORKERS': os.getenv('WORKERS'),
        'CUDA_VISIBLE_DEVICES': os.getenv('CUDA_VISIBLE_DEVICES')
    }
    for var, value in env_vars.items():
        print(f"{var}: {value}")


class Trainer:
    """Fine-tune a torchvision classifier on an ImageFolder dataset.

    Args:
        batch_size: mini-batch size for both loaders.
        train_dir:  root directory of the training ImageFolder.
        val_dir:    root directory of the validation ImageFolder.
        name:       model identifier ('resnet50', 'squeezenet', 'shufflenet',
                    'shufflenet-x1', 'shufflenet-x2').
        checkpoint: stored for callers; not consulted by this class
                    (kept for interface compatibility).
    """

    def __init__(self, batch_size, train_dir, val_dir, name, checkpoint):
        # --- hyper-parameters / configuration ---
        self.name = name                                    # model architecture name
        self.img_size = int(os.getenv('IMG_INPUT_SIZE', 224))  # input image size
        self.batch_size = batch_size
        self.cls_map = {"0": "non-muddy", "1": "muddy"}     # class index -> human-readable name
        # BUGFIX: parse PRETRAINED as a real boolean; the old code treated the
        # string "False" as truthy, so the flag could never be turned off.
        self.imagenet = _env_bool('PRETRAINED', True)       # use ImageNet pretrained weights?

        # --- device selection: prefer GPU, fall back to CPU ---
        if torch.cuda.is_available():
            try:
                # A tiny CUDA op confirms the device actually works
                # (cuda.is_available() can be true on a broken driver).
                _ = torch.zeros(1).cuda()
                self.device = torch.device("cuda")
                print("成功检测到CUDA设备,使用GPU进行训练")
            except Exception as e:
                print(f"CUDA设备存在问题: {e},回退到CPU")
                self.device = torch.device("cpu")
        else:
            self.device = torch.device("cpu")
            print("CUDA不可用,使用CPU进行训练")

        self.checkpoint = checkpoint
        self.__global_step = 0                              # per-batch TensorBoard step counter
        self.workers = int(os.getenv('WORKERS', 0))

        # --- TensorBoard log directory: runs/turbidity_<model>_<timestamp> ---
        timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
        log_dir = f'runs/turbidity_{self.name}_{timestamp}'
        self.writer = SummaryWriter(log_dir)

        # --- data augmentation / preprocessing ---
        self.train_transforms = transforms.Compose([
            transforms.Resize((self.img_size, self.img_size)),
            transforms.RandomHorizontalFlip(p=0.3),
            transforms.RandomVerticalFlip(p=0.3),
            transforms.RandomGrayscale(p=0.25),
            transforms.RandomRotation(10),                  # random rotation within +/-10 degrees
            transforms.ColorJitter(brightness=0.1, contrast=0.1, saturation=0, hue=0),
            transforms.ToTensor(),                          # HWC uint8 -> CHW float in [0,1]
            transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225])  # ImageNet statistics
        ])
        # Validation: deterministic resize + normalize only.
        self.val_transforms = transforms.Compose([
            transforms.Resize((self.img_size, self.img_size)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225])
        ])

        # --- datasets and loaders ---
        self.train_dataset = ImageFolder(root=train_dir, transform=self.train_transforms)
        self.val_dataset = ImageFolder(root=val_dir, transform=self.val_transforms)
        # num_workers comes from the WORKERS env var (0 avoids multiprocessing
        # issues on Windows).
        self.train_loader = DataLoader(self.train_dataset, batch_size=self.batch_size,
                                       shuffle=True, num_workers=self.workers)
        self.val_loader = DataLoader(self.val_dataset, batch_size=self.batch_size,
                                     shuffle=False, num_workers=self.workers)

        # --- class discovery ---
        self.num_classes = len(self.train_dataset.classes)
        print(f"自动发现 {self.num_classes} 个类别:")
        for cls in self.train_dataset.classes:
            # NOTE(review): cls is the folder name; cls_map assumes folders
            # are named "0"/"1" — unmapped names print as 'None'.
            print(f"{cls}: {self.cls_map.get(cls,'None')}")
        print(f"训练集图像数量: {len(self.train_dataset)}")
        print(f"验证集图像数量: {len(self.val_dataset)}")

        # --- model, loss, optimizer, scheduler ---
        self.model = self.__load_model()
        self.loss = nn.CrossEntropyLoss()                   # standard multi-class loss
        # Only optimize trainable parameters (frozen backbone params are
        # excluded, matching the fine-tuning intent).
        trainable_params = (p for p in self.model.parameters() if p.requires_grad)
        self.optimizer = optim.Adam(trainable_params, lr=1e-3, weight_decay=1e-4)
        # Reduce LR when validation loss plateaus.
        self.scheduler = optim.lr_scheduler.ReduceLROnPlateau(
            self.optimizer, mode='min', factor=0.5, patience=5, min_lr=1e-7, cooldown=2
        )

    def __load_model(self):
        """Build the backbone, optionally freeze it, and attach a new head.

        Returns:
            nn.Module moved to ``self.device``.
        Raises:
            ValueError: unknown model name or unrecognizable classifier layer.
        """
        pretrained = bool(self.imagenet)
        if self.name == 'resnet50':
            self.model = resnet50(pretrained=pretrained)
        elif self.name == 'squeezenet':
            self.model = squeezenet1_0(pretrained=pretrained)
        elif self.name == 'shufflenet' or self.name == 'shufflenet-x1':
            self.model = shufflenet_v2_x1_0(pretrained=pretrained)
        elif self.name == 'shufflenet-x2':
            # No pretrained weights available for this variant in this
            # torchvision version; train everything from scratch.
            self.model = shufflenet_v2_x2_0(pretrained=False)
            self.imagenet = False
            print('shufflenet-x2无预训练权重,重新训练所有权重')
        else:
            raise ValueError(f"Invalid model name: {self.name}")

        # Freeze the feature extractor when using pretrained weights; the new
        # head created below has requires_grad=True by default.
        if self.imagenet:
            for param in self.model.parameters():
                param.requires_grad = False

        # Replace the classification head to match self.num_classes.
        if self.name == 'squeezenet':
            # BUGFIX: SqueezeNet's classifier is an nn.Sequential with no
            # ``in_features``; the old Linear replacement raised
            # AttributeError.  Swap the final 1x1 conv instead.
            self.model.classifier[1] = nn.Conv2d(512, self.num_classes, kernel_size=1)
            self.model.num_classes = self.num_classes
        elif hasattr(self.model, 'fc'):
            # ResNet / ShuffleNet family
            self.model.fc = nn.Linear(int(self.model.fc.in_features), self.num_classes, bias=True)
        elif hasattr(self.model, 'classifier'):
            self.model.classifier = nn.Linear(int(self.model.classifier.in_features), self.num_classes, bias=True)
        elif hasattr(self.model, 'head'):
            self.model.head = nn.Linear(int(self.model.head.in_features), self.num_classes, bias=True)
        else:
            raise ValueError(f"Model {self.name} does not have recognizable classifier layer")

        print(self.model)
        print(f'模型{self.name}结构已经加载,移动到设备{self.device}')
        self.model = self.model.to(self.device)
        return self.model

    def train_step(self):
        """Run one training epoch.

        Returns:
            (epoch_loss, epoch_acc): average per-sample loss and accuracy.
        """
        self.model.train()              # enable dropout / batchnorm updates
        epoch_loss = 0.0
        correct_predictions = 0.0
        total_samples = 0.0

        for inputs, labels in self.train_loader:
            inputs = inputs.to(self.device)   # (B, C, H, W)
            labels = labels.to(self.device)   # (B,)

            self.optimizer.zero_grad()
            outputs = self.model(inputs)      # (B, num_classes)
            loss = self.loss(outputs, labels) # scalar
            loss.backward()
            self.optimizer.step()

            # batch_loss is the summed (not mean) loss over the batch.
            batch_loss = loss.item() * inputs.size(0)
            self.writer.add_scalar('Batch_Loss/Train', batch_loss, self.__global_step)
            # BUGFIX: the step counter was never incremented, so every batch
            # was logged at step 0 in TensorBoard.
            self.__global_step += 1
            print(f'Training | Batch Loss: {batch_loss:.4f}\t', end='\r', flush=True)

            epoch_loss += batch_loss
            _, predicted = torch.max(outputs.data, 1)  # predicted class indices (B,)
            total_samples += labels.size(0)
            correct_predictions += (predicted == labels).sum().item()

        epoch_loss = epoch_loss / len(self.train_loader.dataset)
        epoch_acc = correct_predictions / total_samples
        return epoch_loss, epoch_acc

    def val_step(self):
        """Evaluate the model on the validation set.

        Returns:
            (epoch_loss, epoch_acc): average per-sample loss and accuracy.
        """
        self.model.eval()               # disable dropout / freeze batchnorm stats
        epoch_loss = 0.0
        correct_predictions = 0.
        total_samples = 0.

        with torch.no_grad():           # no gradients needed during evaluation
            for inputs, labels in self.val_loader:
                inputs = inputs.to(self.device)
                labels = labels.to(self.device)
                outputs = self.model(inputs)
                loss = self.loss(outputs, labels)
                epoch_loss += loss.item() * inputs.size(0)
                _, predicted = torch.max(outputs.data, 1)
                total_samples += labels.size(0)
                correct_predictions += (predicted == labels).sum().item()

        epoch_loss = epoch_loss / len(self.val_loader.dataset)
        epoch_acc = correct_predictions / total_samples
        return epoch_loss, epoch_acc

    def train_and_validate(self, num_epochs=25):
        """Full training loop with validation, LR scheduling and checkpointing.

        Saves ``<name>_best_model_acc.pth`` on best validation accuracy and
        ``<name>_best_model_loss.pth`` on lowest validation loss.

        Args:
            num_epochs: number of epochs to train.
        Returns:
            1 on completion.
        """
        best_val_acc = 0.0
        best_val_loss = float('inf')

        print_env_variables()
        print("开始训练...")
        for epoch in range(num_epochs):
            print(f'Epoch {epoch + 1}/{num_epochs}')
            print('-' * 20)

            train_loss, train_acc = self.train_step()
            print(f'Train Loss: {train_loss:.4f} Acc: {train_acc:.4f}')

            val_loss, val_acc = self.val_step()
            print(f'Val Loss: {val_loss:.4f} Acc: {val_acc:.4f}')

            # Scheduler reacts to validation loss plateaus.
            self.scheduler.step(val_loss)

            # Per-epoch TensorBoard metrics.
            self.writer.add_scalar('Loss/Train', train_loss, epoch)
            self.writer.add_scalar('Loss/Validation', val_loss, epoch)
            self.writer.add_scalar('Accuracy/Train', train_acc, epoch)
            self.writer.add_scalar('Accuracy/Validation', val_acc, epoch)
            self.writer.add_scalar('Learning Rate', self.optimizer.param_groups[0]['lr'], epoch)

            # Checkpoint on best validation accuracy.
            if val_acc > best_val_acc:
                best_val_acc = val_acc
                torch.save(self.model.state_dict(), f'{self.name}_best_model_acc.pth')
                print(f"保存了新的最佳准确率模型,验证准确率: {best_val_acc:.4f}")
            # Checkpoint on lowest validation loss.
            if val_loss < best_val_loss:
                best_val_loss = val_loss
                torch.save(self.model.state_dict(), f'{self.name}_best_model_loss.pth')
                print(f"保存了新的最低损失模型,验证损失: {best_val_loss:.4f}")

        self.writer.close()
        print(f"训练完成! 最佳验证准确率: {best_val_acc:.4f}, 最低验证损失: {best_val_loss:.4f}")
        return 1


if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser('预训练模型调参')
    parser.add_argument('--train_dir', default='./label_data/train', help='help')
    parser.add_argument('--val_dir', default='./label_data/test', help='help')
    parser.add_argument('--model', default='shufflenet', help='help')
    args = parser.parse_args()

    num_epochs = 100
    trainer = Trainer(batch_size=int(os.getenv('BATCH_SIZE', 32)),
                      train_dir=args.train_dir,
                      val_dir=args.val_dir,
                      name=args.model,
                      checkpoint=False)
    trainer.train_and_validate(num_epochs)