# 微调pytorch的预训练模型,在自己的数据上训练,完成分类任务。 import time import torch import torch.nn as nn import torch.optim as optim from torch.utils.data import DataLoader import torchvision.transforms as transforms from torchvision.datasets import ImageFolder from model.model_zoon import load_model from torch.utils.tensorboard import SummaryWriter # 添加 TensorBoard 支持 from datetime import datetime import os from dotenv import load_dotenv load_dotenv() os.environ['CUDA_VISIBLE_DEVICES'] = os.getenv('CUDA_VISIBLE_DEVICES', '0') # 读取并打印.env文件中的变量 def print_env_variables(): print("从.env文件加载的变量:") env_vars = { 'PATCH_WIDTH': os.getenv('PATCH_WIDTH'), 'PATCH_HEIGHT': os.getenv('PATCH_HEIGHT'), 'CONFIDENCE_THRESHOLD': os.getenv('CONFIDENCE_THRESHOLD'), 'IMG_INPUT_SIZE': os.getenv('IMG_INPUT_SIZE'), 'WORKERS': os.getenv('WORKERS'), 'CUDA_VISIBLE_DEVICES': os.getenv('CUDA_VISIBLE_DEVICES') } for var, value in env_vars.items(): print(f"{var}: {value}") class Trainer: def __init__(self, batch_size, train_dir, val_dir, name, checkpoint:bool=False): # 定义一些参数 self.name = name # 采用的模型名称 self.img_size = int(os.getenv('IMG_INPUT_SIZE', 224)) # 输入图片尺寸 self.batch_size = batch_size # 批次大小 self.cls_map = {"0": "non-muddy", "1":"muddy"} # 类别名称映射词典 self.imagenet = os.getenv('PRETRAINED', True) # 是否使用ImageNet预训练权重 # 训练设备 - 优先使用GPU,如果不可用则使用CPU if torch.cuda.is_available(): try: # 尝试进行简单的CUDA操作以确认CUDA功能正常 _ = torch.zeros(1).cuda() self.device = torch.device("cuda") print("成功检测到CUDA设备,使用GPU进行训练") except Exception as e: print(f"CUDA设备存在问题: {e},回退到CPU") self.device = torch.device("cpu") else: self.device = torch.device("cpu") print("CUDA不可用,使用CPU进行训练") self.__global_step = 0 self.best_val_acc = 0.0 self.epoch = 0 self.best_val_loss = float('inf') self.checkpoint_root_path = './checkpoints' self.best_acc_model_path = os.path.join(self.checkpoint_root_path , f'{self.name}_best_model_acc.pth') self.best_loss_model_path = os.path.join(self.checkpoint_root_path , f'{self.name}_best_model_loss.pth') self.latest_checkpoint_path = os.path.join(self.checkpoint_root_path , f'{self.name}_latest_checkpoint.pth') self.workers = int(os.getenv('WORKERS', 0)) # 创建日志目录 timestamp = datetime.now().strftime("%Y%m%d-%H%M%S") # 获取当前时间戳 log_dir = f'runs/turbidity_{self.name}_{timestamp}'# 按照模型的名称和时间戳创建日志目录 self.writer = SummaryWriter(log_dir) # 创建 TensorBoard writer # 定义数据增强和预处理层 self.train_transforms = transforms.Compose([ transforms.Resize((self.img_size, self.img_size)), # 调整图像大小为256x256 (ResNet输入尺寸) transforms.RandomHorizontalFlip(p=0.3), # 随机水平翻转,增加数据多样性 transforms.RandomVerticalFlip(p=0.3), # 随机垂直翻转,增加数据多样性 transforms.RandomGrayscale(p=0.25), # 随机灰度化,增加数据多样性 transforms.RandomRotation(10), # 随机旋转±10度 transforms.ColorJitter(brightness=0.1, contrast=0.1, saturation=0, hue=0), # 颜色抖动 transforms.ToTensor(), # 转换为tensor并归一化到[0,1] transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) # ImageNet标准化 ]) # 测试集基础变换 self.val_transforms = transforms.Compose([ transforms.Resize((self.img_size, self.img_size)), transforms.ToTensor(), transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) ]) # 创建数据集对象 self.train_dataset = ImageFolder(root=train_dir, transform=self.train_transforms) print(f"训练样本数量: {len(self.train_dataset)}") self.val_dataset = ImageFolder(root=val_dir, transform=self.val_transforms) print(f"验证样本数量: {len(self.val_dataset)}") # 创建数据加载器 (Windows环境下设置num_workers=0避免多进程问题) self.train_loader = DataLoader(self.train_dataset, batch_size=self.batch_size, shuffle=True, num_workers=self.workers) # 可迭代对象,返回(inputs_tensor, label) self.val_loader = DataLoader(self.val_dataset, batch_size=self.batch_size, shuffle=False, num_workers=self.workers) # 获取类别数量 self.num_classes = len(self.train_dataset.classes) print(f"自动发现 {self.num_classes} 个类别:") # 打印类别和名称 for cls in self.train_dataset.classes: print(f"{cls}: {self.cls_map.get(cls,'None')}") # 打印训练集和测试集图像数量 print(f"训练集图像数量: {len(self.train_dataset)}") print(f"验证集图像数量: {len(self.val_dataset)}") # 创建模型 self.model = load_model(name=self.name, imagenet=self.imagenet, num_classes=self.num_classes, device=self.device) # 定义损失函数 self.loss = nn.CrossEntropyLoss() # 多分类常用的交叉熵损失 # 定义优化器 # 只更新requires_grad=True的参数 self.optimizer = optim.Adam(self.model.parameters(), lr=1e-3, weight_decay=1e-4) # 基于验证损失动态调整,更智能 self.scheduler = optim.lr_scheduler.ReduceLROnPlateau( self.optimizer, mode='min', factor=0.5, patience=5, min_lr=1e-7,cooldown=2 ) # 加载检查点 if checkpoint and os.path.exists(self.latest_checkpoint_path): self.load_checkpoint() def save_checkpoint(self): if not os.path.exists(self.checkpoint_root_path ): os.makedirs(self.checkpoint_root_path ) checkpoint = { 'epoch': self.epoch, 'model_state_dict': self.model.state_dict(), 'optimizer_state_dict': self.optimizer.state_dict(), 'scheduler_state_dict': self.scheduler.state_dict(), 'best_val_acc': self.best_val_acc, 'best_val_loss': self.best_val_loss } torch.save(checkpoint, self.latest_checkpoint_path) print(f"已保存检查点到 {self.latest_checkpoint_path}") def load_checkpoint(self, from_where='latest'): checkpoint = torch.load(self.latest_checkpoint_path) self.model.load_state_dict(checkpoint['model_state_dict']) self.optimizer.load_state_dict(checkpoint['optimizer_state_dict']) self.scheduler.load_state_dict(checkpoint['scheduler_state_dict']) self.best_val_acc = checkpoint['best_val_acc'] self.best_val_loss = checkpoint['best_val_loss'] self.epoch = checkpoint['epoch'] + 1 print(f"从 {self.latest_checkpoint_path} 加载检查点") def train_step(self): """ 单轮训练函数 Args: Returns: average_loss: 平均损失 accuracy: 准确率 """ self.model.train() # 设置模型为训练模式(启用dropout/batchnorm等) epoch_loss = 0.0 # epoch损失 correct_predictions = 0.0 # 预测正确的样本数 total_samples = 0.0 # 已经训练的总样本 # 遍历训练数据 for inputs, labels in self.train_loader: # 将数据移到指定设备上 inputs = inputs.to(self.device) # b c h w labels = labels.to(self.device) # b, # 清零梯度缓存 self.optimizer.zero_grad() # 前向传播 outputs = self.model(inputs) # b, 2 loss = self.loss(outputs, labels) # 标量 # 反向传播 loss.backward() # 更新参数 self.optimizer.step() # 统计信息 batch_loss = loss.item() * inputs.size(0) # 批损失 self.writer.add_scalar('Batch_Loss/Train', batch_loss, self.__global_step) print(f'Training | Batch Loss: {batch_loss:.4f}\t', end='\r',flush=True) epoch_loss += batch_loss # epoch损失 _, predicted = torch.max(outputs.data, 1) # predicted b,存储的是预测的类别,最大值的位置 total_samples += labels.size(0) # 统计总样本数量 correct_predictions += (predicted == labels).sum().item() # 统计预测正确的样本数 epoch_loss = epoch_loss / len(self.train_loader.dataset) epoch_acc = correct_predictions / total_samples return epoch_loss, epoch_acc def val_step(self): """ 验证模型性能 Args: Returns: average_loss: 平均损失 accuracy: 准确率 """ self.model.eval() # 设置模型为评估模式(关闭dropout/batchnorm等) epoch_loss = 0.0 correct_predictions = 0. total_samples = 0. # 不计算梯度,提高推理速度 with torch.no_grad(): for inputs, labels in self.val_loader: inputs = inputs.to(self.device) labels = labels.to(self.device) outputs = self.model(inputs) loss = self.loss(outputs, labels) epoch_loss += loss.item() * inputs.size(0) _, predicted = torch.max(outputs.data, 1) total_samples += labels.size(0) correct_predictions += (predicted == labels).sum().item() epoch_loss = epoch_loss / len(self.val_loader.dataset) # 平均损失 epoch_acc = correct_predictions / total_samples return epoch_loss, epoch_acc def train_and_validate(self, num_epochs=25): """ 训练和验证 Args: num_epochs: 训练轮数 Returns: train_losses: 每轮训练损失 train_accuracies: 每轮训练准确率 val_losses: 每轮验证损失 val_accuracies: 每轮验证准确率 """ # 在你的代码中调用 print_env_variables() print("开始训练...") for epoch in range(self.epoch, num_epochs): print(f'Epoch {epoch + 1}/{num_epochs}') print('-' * 20) # 单步训练 train_loss, train_acc = self.train_step() print(f'Train Loss: {train_loss:.4f} Acc: {train_acc:.4f}') # 验证阶段 val_loss, val_acc = self.val_step() print(f'Val Loss: {val_loss:.4f} Acc: {val_acc:.4f}') # 学习率调度 self.scheduler.step(val_loss) # 记录指标到 TensorBoard self.writer.add_scalar('Loss/Train', train_loss, epoch) self.writer.add_scalar('Loss/Validation', val_loss, epoch) self.writer.add_scalar('Accuracy/Train', train_acc, epoch) self.writer.add_scalar('Accuracy/Validation', val_acc, epoch) self.writer.add_scalar('Learning Rate', self.optimizer.param_groups[0]['lr'], epoch) # 保存最佳模型 (基于验证准确率) if val_acc > self.best_val_acc: best_val_acc = val_acc torch.save(self.model.state_dict(), f'{self.name}_best_model_acc.pth') print(f"保存了新的最佳准确率模型,验证准确率: {best_val_acc:.4f}") # 保存最低验证损失模型 if val_loss < self.best_val_loss: best_val_loss = val_loss torch.save(self.model.state_dict(), f'{self.name}_best_model_loss.pth') print(f"保存了新的最低损失模型,验证损失: {best_val_loss:.4f}") self.save_checkpoint() self.epoch += 1 # 关闭 TensorBoard writer self.writer.close() print(f"训练完成! 最佳验证准确率: {self.best_val_acc:.4f}, 最低验证损失: {self.best_val_loss:.4f}") return 1 if __name__ == '__main__': # 开始训练 import argparse parser = argparse.ArgumentParser('预训练模型调参') parser.add_argument('--train_dir',default='./label_data/train',help='help') parser.add_argument('--val_dir', default='./label_data/test',help='help') parser.add_argument('--model', default='squeezenet',help='help') parser.add_argument('--resume', action='store_true',help='是否恢复继续训练') args = parser.parse_args() num_epochs = 100 trainer = Trainer(batch_size=int(os.getenv('BATCH_SIZE', 32)), train_dir=args.train_dir, val_dir=args.val_dir, name=args.model, checkpoint=args.resume) trainer.train_and_validate(num_epochs)