""" 因果推理模型主程序(Causal Inference Main Program) 本程序实现了基于强化学习优化的图注意力网络训练流程,用于工业时间序列预测。 整个系统分为三个核心阶段: 1. 数据预处理阶段: 数据加载、清洗、降噪、归一化、图构建 2. RL超参数优化阶段: 使用PPO算法自动搜索最优超参数 3. 最终训练评估阶段: 使用最优参数训练模型并在测试集上评估 核心特点: - 自动化超参数优化: 无需手动调参,RL智能体自动寻找最优配置 - 有向图注意力: 建模特征间的因果关系,支持非对称影响 - 小波降噪预处理: 提升数据质量,增强模型精度 - 完善的监控机制: 日志记录、早停、学习率调度、模型保存 技术栈: - PyTorch: 深度学习框架 - Stable-Baselines3: 强化学习库(PPO算法) - PyWavelets: 小波变换库 - Scikit-learn: 数据预处理 工作流程: main() → 数据预处理 → RL优化超参数 → 训练最终模型 → 测试评估 """ import torch.optim as optim from args import get_args from data_preprocessor import DataPreprocessor from gat import GAT from data_trainer import DataTrainer from rl_optimizer import RLOptimizer import logging import os def setup_logger(args): """ 配置日志系统 功能: 创建并配置日志记录器,同时输出到控制台和文件。 日志文件以训练数据文件数量命名,便于区分不同实验。 参数: args: 命令行参数对象 - args.num_files: 数据文件数量,用于日志文件命名 返回: logging.Logger: 配置好的日志记录器 日志级别: INFO: 记录关键步骤和指标信息 输出位置: - 控制台: 实时查看训练进度 - 文件: logs/training_{num_files}.log,便于事后分析 日志格式: 时间戳 - 记录器名称 - 日志级别 - 消息内容 示例: 2025-01-10 10:30:45 - GAT-Training - INFO - 开始训练 技术要点: - 自动创建logs目录 - 文件和控制台使用相同的格式化器 - 避免重复添加处理器 """ # 创建日志目录(如果不存在) if not os.path.exists('logs'): os.makedirs('logs') # 创建日志记录器 logger = logging.getLogger('GAT-Training') logger.setLevel(logging.INFO) # 文件处理器: 将日志写入文件 file_handler = logging.FileHandler(f'logs/training_{args.num_files}.log') file_handler.setLevel(logging.INFO) # 控制台处理器: 将日志输出到终端 console_handler = logging.StreamHandler() console_handler.setLevel(logging.INFO) # 格式化器: 定义日志消息的格式 formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') file_handler.setFormatter(formatter) console_handler.setFormatter(formatter) # 添加处理器到记录器 logger.addHandler(file_handler) # 添加文件处理器 logger.addHandler(console_handler) # 添加控制台处理器 return logger def main(): """ 主程序入口 功能: 协调整个训练流程,包括数据预处理、RL优化、模型训练和测试评估。 这是整个系统的控制中心,按顺序执行各个阶段的任务。 执行流程: 第一阶段: 数据预处理 1. 加载50个CSV数据文件 2. 时间特征分解(年月日时分秒) 3. 小波降噪(db4小波,1层分解) 4. 数据归一化(StandardScaler) 5. 划分训练集/验证集/测试集(70%/10%/20%) 6. 构建有向图邻接矩阵(相关性阈值0.3) 第二阶段: RL超参数优化 1. 创建GATEnv强化学习环境 2. 使用PPO算法训练5000时间步 3. 搜索最优超参数(lr, hidden_dim, num_heads, dropout) 4. 快速评估策略(1-2个batch)加速收敛 5. 选择奖励最高的超参数组合 第三阶段: 最终模型训练 1. 使用最优超参数创建GAT模型 2. 配置Adam优化器和学习率调度器 3. 训练最多100轮,早停耐心20轮 4. 保存最佳模型和最终模型 5. 生成训练曲线图 第四阶段: 测试评估 1. 加载最佳模型 2. 在测试集上评估性能 3. 计算归一化和原始尺度的MSE/MAE/RMSE 4. 生成预测对比图 输出文件: 日志文件: - logs/training_{num_files}.log 归一化器: - scalers/features_scaler.joblib - scalers/targets_scaler.joblib 模型文件: - models/best_model.pth (验证损失最低的模型) - models/final_model.pth (训练完成后的最终模型) - gat_ppo_agent (RL优化器模型) 可视化图表: - plots/loss_curve.png (训练/验证损失曲线) - plots/mae_curve.png (训练/验证MAE曲线) - plots/prediction_examples.png (预测vs真实值对比) 关键技术: 1. RL自动调参: 避免手动网格搜索,智能寻优 2. 有向图建模: 捕捉特征间的因果关系 3. 小波降噪: 提升数据质量 4. 早停机制: 防止过拟合 5. 学习率调度: 自适应调整学习率 性能优化: - GPU加速: 自动检测并使用CUDA - 梯度裁剪: 防止梯度爆炸 - Dropout正则化: 防止过拟合 - ReduceLROnPlateau: 验证损失停滞时降低学习率 使用示例: >>> python main.py # 使用默认参数训练 >>> python main.py --num_files 30 --epochs 50 # 自定义参数训练 """ # ========== 阶段0: 初始化配置 ========== # 获取命令行参数(或使用默认值) args = get_args() # 配置日志系统 logger = setup_logger(args) logger.info(f"使用设备: {args.device}") logger.info("=" * 80) logger.info("因果推理模型训练系统启动") logger.info("=" * 80) # ========== 阶段1: 数据预处理 ========== logger.info("\n" + "=" * 80) logger.info("阶段1: 数据预处理") logger.info("=" * 80) # 创建数据预处理器 preprocessor = DataPreprocessor(args, logger) # 执行完整的预处理流程 # 返回: train_loader(训练数据加载器), val_loader(验证数据加载器), # test_loader(测试数据加载器), preprocessor(预处理器对象) train_loader, val_loader, test_loader, preprocessor = preprocessor.preprocess() logger.info("数据预处理完成!") # 创建有向图邻接矩阵 # 基于特征相关性构建图结构,相关性>0.3的特征对之间建立有向边 adj = preprocessor.create_adjacency_matrix() logger.info(f"邻接矩阵形状: {adj.shape}") logger.info(f"边的数量: {int(adj.sum())}") # ========== 阶段2: RL超参数优化 ========== logger.info("\n" + "=" * 80) logger.info("阶段2: 强化学习超参数优化") logger.info("=" * 80) logger.info("使用PPO算法搜索最优超参数...") # 创建RL优化器 # 在环境中评估不同的超参数组合,找到使验证损失最小的配置 rl_optimizer = RLOptimizer(args, preprocessor, train_loader, val_loader, adj, logger) # 执行优化,返回最优超参数字典 # best_hparams包含: lr(学习率), hidden_dim(隐藏层维度), # num_heads(注意力头数), dropout(dropout率) best_hparams = rl_optimizer.optimize() logger.info(f"最优超参数: {best_hparams}") # ========== 阶段3: 使用最优超参数训练最终模型 ========== logger.info("\n" + "=" * 80) logger.info("阶段3: 训练最终模型") logger.info("=" * 80) logger.info("使用RL优化得到的最优超参数...") # 创建GAT模型,使用最优超参数 final_model = GAT( nfeat=1, # 输入特征维度(每个节点1维) nhid=best_hparams['hidden_dim'], # 隐藏层维度(RL优化得到) noutput=args.num_targets, # 输出维度(47个目标变量) dropout=best_hparams['dropout'], # Dropout率(RL优化得到) nheads=best_hparams['num_heads'], # 注意力头数(RL优化得到) alpha=0.2 # LeakyReLU斜率(固定值) ).to(args.device) # 移动到GPU(如果可用) logger.info(f"模型结构: nfeat=1, nhid={best_hparams['hidden_dim']}, " f"noutput={args.num_targets}, dropout={best_hparams['dropout']}, " f"nheads={best_hparams['num_heads']}") # 配置优化器 # Adam优化器: 自适应学习率,使用RL优化得到的学习率 optimizer = optim.Adam( final_model.parameters(), lr=best_hparams['lr'], # 学习率(RL优化得到) weight_decay=args.weight_decay # L2正则化系数 ) logger.info(f"优化器: Adam(lr={best_hparams['lr']}, weight_decay={args.weight_decay})") # 配置学习率调度器 # ReduceLROnPlateau: 当验证损失停滞时,将学习率降低一半 scheduler = optim.lr_scheduler.ReduceLROnPlateau( optimizer, mode='min', # 监控指标越小越好(损失函数) factor=0.5, # 降低因子(新lr = 旧lr * 0.5) patience=10, # 容忍10轮无改善 verbose=True # 打印学习率变化信息 ) logger.info("学习率调度器: ReduceLROnPlateau(factor=0.5, patience=10)") # 创建训练器 # 负责模型训练、验证、测试和可视化 trainer = DataTrainer( model=final_model, args=args, preprocessor=preprocessor, optimizer=optimizer, scheduler=scheduler, logger=logger ) # 执行训练 # 训练最多100轮,使用早停机制(耐心20轮) # 自动保存最佳模型(验证损失最低)和最终模型 logger.info("开始训练循环...") trained_model = trainer.train(train_loader, val_loader, adj) logger.info("模型训练完成!") # ========== 阶段4: 在测试集上评估 ========== logger.info("\n" + "=" * 80) logger.info("阶段4: 测试集评估") logger.info("=" * 80) logger.info("在测试集上评估最终模型性能...") # 测试模型性能 # 返回归一化和原始尺度的MSE/MAE/RMSE指标 test_results = trainer.test(test_loader, adj) # 打印最终结果摘要 logger.info("\n" + "=" * 80) logger.info("训练完成总结") logger.info("=" * 80) logger.info(f"最优超参数: {best_hparams}") logger.info(f"测试集性能(归一化):") logger.info(f" - MSE: {test_results['normalized_mse']:.6f}") logger.info(f" - MAE: {test_results['normalized_mae']:.6f}") logger.info(f" - RMSE: {test_results['normalized_rmse']:.6f}") logger.info(f"测试集性能(原始尺度):") logger.info(f" - MSE: {test_results['original_mse']:.6f}") logger.info(f" - MAE: {test_results['original_mae']:.6f}") logger.info(f" - RMSE: {test_results['original_rmse']:.6f}") logger.info("=" * 80) logger.info("所有任务完成!") logger.info("=" * 80) if __name__ == "__main__": """ 程序入口点 直接运行此文件时执行main()函数。 支持命令行参数自定义配置,详见args.py。 运行方式: python main.py # 使用默认参数 python main.py --epochs 50 # 自定义训练轮数 python main.py --num_files 30 # 自定义数据文件数量 """ main()