# Fine-tune a pretrained PyTorch model on a custom dataset for a classification task.
import os

from dotenv import load_dotenv

# BUG FIX: load the .env file and pin CUDA_VISIBLE_DEVICES *before* importing
# torch. CUDA reads this variable when the driver is initialized, so setting it
# after torch may be silently ignored and the wrong GPU can be used.
load_dotenv()
os.environ['CUDA_VISIBLE_DEVICES'] = os.getenv('CUDA_VISIBLE_DEVICES', '0')

import time
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import torchvision.transforms as transforms
from torchvision.datasets import ImageFolder
from model.model_zoon import load_model
from torch.utils.tensorboard import SummaryWriter  # TensorBoard support
from datetime import datetime
# Read and print the configuration variables loaded from the .env file.
def print_env_variables():
    """Print the run-relevant environment variables loaded from .env."""
    print("从.env文件加载的变量:")
    names = (
        'PATCH_WIDTH',
        'PATCH_HEIGHT',
        'CONFIDENCE_THRESHOLD',
        'IMG_INPUT_SIZE',
        'WORKERS',
        'CUDA_VISIBLE_DEVICES',
    )
    for name in names:
        print(f"{name}: {os.getenv(name)}")
class Trainer:
    """Fine-tune a pretrained classification model on ImageFolder-style data.

    Handles data loading/augmentation, the train/validate loop, TensorBoard
    logging, best-model saving, and checkpoint-based resume.
    """

    def __init__(self, batch_size, train_dir, val_dir, name, checkpoint: bool = False):
        """
        Args:
            batch_size: mini-batch size for both data loaders.
            train_dir: root directory of the training ImageFolder.
            val_dir: root directory of the validation ImageFolder.
            name: model name forwarded to ``load_model``.
            checkpoint: if True, resume from the latest checkpoint when present.
        """
        self.name = name  # backbone model name
        self.img_size = int(os.getenv('IMG_INPUT_SIZE', 224))  # input image size
        self.batch_size = batch_size
        self.cls_map = {"0": "non-muddy", "1": "muddy"}  # class-index -> name mapping
        # BUG FIX: os.getenv returns a *string*, so any non-empty value —
        # including "False" — was truthy. Parse it into a real bool.
        # (assumes load_model expects a boolean pretrained flag — TODO confirm)
        self.imagenet = str(os.getenv('PRETRAINED', 'True')).strip().lower() in ('1', 'true', 'yes')
        # Training device: prefer GPU, fall back to CPU.
        if torch.cuda.is_available():
            try:
                # Run a trivial CUDA op to confirm the device actually works.
                _ = torch.zeros(1).cuda()
                self.device = torch.device("cuda")
                print("成功检测到CUDA设备,使用GPU进行训练")
            except Exception as e:
                print(f"CUDA设备存在问题: {e},回退到CPU")
                self.device = torch.device("cpu")
        else:
            self.device = torch.device("cpu")
            print("CUDA不可用,使用CPU进行训练")
        self.__global_step = 0  # per-batch TensorBoard step counter
        self.best_val_acc = 0.0
        self.epoch = 0  # next epoch index to run (supports resume)
        self.best_val_loss = float('inf')
        self.checkpoint_root_path = './checkpoints'
        self.best_acc_model_path = os.path.join(self.checkpoint_root_path, f'{self.name}_best_model_acc.pth')
        self.best_loss_model_path = os.path.join(self.checkpoint_root_path, f'{self.name}_best_model_loss.pth')
        self.latest_checkpoint_path = os.path.join(self.checkpoint_root_path, f'{self.name}_latest_checkpoint.pth')
        self.workers = int(os.getenv('WORKERS', 0))
        # Per-run TensorBoard log directory named by model and timestamp.
        timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
        log_dir = f'runs/turbidity_{self.name}_{timestamp}'
        self.writer = SummaryWriter(log_dir)
        # Training-time augmentation + ImageNet normalization.
        self.train_transforms = transforms.Compose([
            transforms.Resize((self.img_size, self.img_size)),  # fixed network input size
            transforms.RandomHorizontalFlip(p=0.3),
            transforms.RandomVerticalFlip(p=0.3),
            transforms.RandomGrayscale(p=0.25),
            transforms.RandomRotation(10),  # random rotation within ±10 degrees
            transforms.ColorJitter(brightness=0.1, contrast=0.1, saturation=0, hue=0),
            transforms.ToTensor(),  # to tensor, scaled to [0, 1]
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # ImageNet stats
        ])
        # Validation: deterministic resize + normalization only.
        self.val_transforms = transforms.Compose([
            transforms.Resize((self.img_size, self.img_size)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])
        # Datasets discovered from directory structure.
        self.train_dataset = ImageFolder(root=train_dir, transform=self.train_transforms)
        print(f"训练样本数量: {len(self.train_dataset)}")
        self.val_dataset = ImageFolder(root=val_dir, transform=self.val_transforms)
        print(f"验证样本数量: {len(self.val_dataset)}")
        # num_workers=0 avoids multiprocessing issues on Windows.
        self.train_loader = DataLoader(self.train_dataset, batch_size=self.batch_size, shuffle=True, num_workers=self.workers)
        self.val_loader = DataLoader(self.val_dataset, batch_size=self.batch_size, shuffle=False, num_workers=self.workers)
        # Number of classes is inferred from the training folder names.
        self.num_classes = len(self.train_dataset.classes)
        print(f"自动发现 {self.num_classes} 个类别:")
        for cls in self.train_dataset.classes:
            print(f"{cls}: {self.cls_map.get(cls,'None')}")
        print(f"训练集图像数量: {len(self.train_dataset)}")
        print(f"验证集图像数量: {len(self.val_dataset)}")
        # Model, loss, optimizer, and LR schedule.
        self.model = load_model(name=self.name, imagenet=self.imagenet, num_classes=self.num_classes, device=self.device)
        self.loss = nn.CrossEntropyLoss()  # standard multi-class loss
        self.optimizer = optim.Adam(self.model.parameters(), lr=1e-3, weight_decay=1e-4)
        # Halve the LR when the validation loss plateaus.
        self.scheduler = optim.lr_scheduler.ReduceLROnPlateau(
            self.optimizer, mode='min', factor=0.5, patience=5, min_lr=1e-7, cooldown=2
        )
        # Optionally resume from the most recent checkpoint.
        if checkpoint and os.path.exists(self.latest_checkpoint_path):
            self.load_checkpoint()

    def save_checkpoint(self):
        """Persist full training state (model/optimizer/scheduler/bests) for resume."""
        os.makedirs(self.checkpoint_root_path, exist_ok=True)
        checkpoint = {
            'epoch': self.epoch,
            'model_state_dict': self.model.state_dict(),
            'optimizer_state_dict': self.optimizer.state_dict(),
            'scheduler_state_dict': self.scheduler.state_dict(),
            'best_val_acc': self.best_val_acc,
            'best_val_loss': self.best_val_loss
        }
        torch.save(checkpoint, self.latest_checkpoint_path)
        print(f"已保存检查点到 {self.latest_checkpoint_path}")

    def load_checkpoint(self, from_where='latest'):
        """Restore training state from the latest checkpoint.

        Args:
            from_where: kept for interface compatibility; only the latest
                checkpoint is currently supported.
        """
        # BUG FIX: map_location lets a GPU-saved checkpoint load on a CPU-only host.
        checkpoint = torch.load(self.latest_checkpoint_path, map_location=self.device)
        self.model.load_state_dict(checkpoint['model_state_dict'])
        self.optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        self.scheduler.load_state_dict(checkpoint['scheduler_state_dict'])
        self.best_val_acc = checkpoint['best_val_acc']
        self.best_val_loss = checkpoint['best_val_loss']
        self.epoch = checkpoint['epoch'] + 1  # resume from the next epoch
        print(f"从 {self.latest_checkpoint_path} 加载检查点")

    def train_step(self):
        """Train for one epoch.

        Returns:
            (epoch_loss, epoch_acc): dataset-averaged loss and accuracy.
        """
        self.model.train()  # enable dropout/batch-norm training behavior
        epoch_loss = 0.0  # summed per-sample loss over the epoch
        correct_predictions = 0.0
        total_samples = 0.0
        for inputs, labels in self.train_loader:
            inputs = inputs.to(self.device)  # (B, C, H, W)
            labels = labels.to(self.device)  # (B,)
            self.optimizer.zero_grad()
            outputs = self.model(inputs)  # (B, num_classes) logits
            loss = self.loss(outputs, labels)  # scalar mean loss
            loss.backward()
            self.optimizer.step()
            batch_loss = loss.item() * inputs.size(0)  # loss summed over the batch
            self.writer.add_scalar('Batch_Loss/Train', batch_loss, self.__global_step)
            # BUG FIX: the step counter was never advanced, so every batch
            # point landed on step 0 in TensorBoard.
            self.__global_step += 1
            print(f'Training | Batch Loss: {batch_loss:.4f}\t', end='\r', flush=True)
            epoch_loss += batch_loss
            _, predicted = torch.max(outputs.data, 1)  # argmax -> predicted class ids
            total_samples += labels.size(0)
            correct_predictions += (predicted == labels).sum().item()
        epoch_loss = epoch_loss / len(self.train_loader.dataset)
        epoch_acc = correct_predictions / total_samples
        return epoch_loss, epoch_acc

    def val_step(self):
        """Evaluate on the validation set.

        Returns:
            (epoch_loss, epoch_acc): dataset-averaged loss and accuracy.
        """
        self.model.eval()  # disable dropout / freeze batch-norm statistics
        epoch_loss = 0.0
        correct_predictions = 0.
        total_samples = 0.
        # No gradients needed during evaluation.
        with torch.no_grad():
            for inputs, labels in self.val_loader:
                inputs = inputs.to(self.device)
                labels = labels.to(self.device)
                outputs = self.model(inputs)
                loss = self.loss(outputs, labels)
                epoch_loss += loss.item() * inputs.size(0)
                _, predicted = torch.max(outputs.data, 1)
                total_samples += labels.size(0)
                correct_predictions += (predicted == labels).sum().item()
        epoch_loss = epoch_loss / len(self.val_loader.dataset)  # mean per-sample loss
        epoch_acc = correct_predictions / total_samples
        return epoch_loss, epoch_acc

    def train_and_validate(self, num_epochs=25):
        """Run the full train/validate loop, logging and saving best models.

        Args:
            num_epochs: total number of epochs (resume-aware: starts at self.epoch).
        Returns:
            1 on completion (kept for backward compatibility).
        """
        print_env_variables()
        print("开始训练...")
        # Ensure the checkpoint directory exists for the best-model saves below.
        os.makedirs(self.checkpoint_root_path, exist_ok=True)
        for epoch in range(self.epoch, num_epochs):
            print(f'Epoch {epoch + 1}/{num_epochs}')
            print('-' * 20)
            train_loss, train_acc = self.train_step()
            print(f'Train Loss: {train_loss:.4f} Acc: {train_acc:.4f}')
            val_loss, val_acc = self.val_step()
            print(f'Val Loss: {val_loss:.4f} Acc: {val_acc:.4f}')
            self.scheduler.step(val_loss)  # LR schedule driven by validation loss
            # Epoch-level TensorBoard metrics.
            self.writer.add_scalar('Loss/Train', train_loss, epoch)
            self.writer.add_scalar('Loss/Validation', val_loss, epoch)
            self.writer.add_scalar('Accuracy/Train', train_acc, epoch)
            self.writer.add_scalar('Accuracy/Validation', val_acc, epoch)
            self.writer.add_scalar('Learning Rate', self.optimizer.param_groups[0]['lr'], epoch)
            # BUG FIX: the original assigned to *local* best_val_acc/best_val_loss,
            # so self.best_* never updated — every epoch looked like a new best,
            # checkpoints stored stale values, and the final report was wrong.
            if val_acc > self.best_val_acc:
                self.best_val_acc = val_acc
                # BUG FIX: save to the path computed in __init__ (./checkpoints/...),
                # which was previously dead code while files landed in the CWD.
                torch.save(self.model.state_dict(), self.best_acc_model_path)
                print(f"保存了新的最佳准确率模型,验证准确率: {self.best_val_acc:.4f}")

            # Also track the lowest-validation-loss model separately.
            if val_loss < self.best_val_loss:
                self.best_val_loss = val_loss
                torch.save(self.model.state_dict(), self.best_loss_model_path)
                print(f"保存了新的最低损失模型,验证损失: {self.best_val_loss:.4f}")
            self.save_checkpoint()
            self.epoch += 1
        # Flush and close the TensorBoard writer.
        self.writer.close()

        print(f"训练完成! 最佳验证准确率: {self.best_val_acc:.4f}, 最低验证损失: {self.best_val_loss:.4f}")
        return 1
if __name__ == '__main__':
    # Command-line entry point: fine-tune the selected model.
    import argparse

    parser = argparse.ArgumentParser('预训练模型调参')
    parser.add_argument('--train_dir', default='./label_data/train', help='help')
    parser.add_argument('--val_dir', default='./label_data/test', help='help')
    parser.add_argument('--model', default='squeezenet', help='help')
    parser.add_argument('--resume', action='store_true', help='是否恢复继续训练')
    cli_args = parser.parse_args()

    total_epochs = 100
    trainer = Trainer(
        batch_size=int(os.getenv('BATCH_SIZE', 32)),
        train_dir=cli_args.train_dir,
        val_dir=cli_args.val_dir,
        name=cli_args.model,
        checkpoint=cli_args.resume,
    )
    trainer.train_and_validate(total_epochs)