"""RL-based hyperparameter search for the GAT model.

GATEnv wraps a short train/validate cycle of the GAT as a Gymnasium
environment whose action is a hyperparameter vector
(learning rate, hidden_dim, num_heads, dropout); a PPO agent from
stable-baselines3 explores this space, and RLOptimizer returns the best
combination it finds.
"""

import numpy as np
import torch
import torch.optim as optim
import gymnasium as gym
from gymnasium import spaces
from stable_baselines3 import PPO
from stable_baselines3.common.callbacks import BaseCallback

from gat import GAT
from data_trainer import DataTrainer


class GATEnv(gym.Env):
    metadata = {'render_modes': ['human'], 'render_fps': 4}

    def __init__(self, preprocessor, train_loader, val_loader, adj, args, logger):
        super().__init__()
        self.preprocessor = preprocessor
        self.train_loader = train_loader
        self.val_loader = val_loader
        # Use the configured device (GPU supported).
        self.eval_device = torch.device(args.device)
        self.adj = adj.to(self.eval_device)
        self.args = args
        self.logger = logger

        # Action = (learning rate, hidden_dim, num_heads, dropout).
        self.action_space = spaces.Box(
            low=np.array([1e-5, 32, 2, 0.1], dtype=np.float32),
            high=np.array([1e-2, 128, 8, 0.5], dtype=np.float32),
            shape=(4,),
            dtype=np.float32
        )
        # Observation = last action plus the resulting validation loss.
        self.observation_space = spaces.Box(
            low=np.array([1e-5, 32, 2, 0.1, 0], dtype=np.float32),
            high=np.array([1e-2, 128, 8, 0.5, 100], dtype=np.float32),
            shape=(5,),
            dtype=np.float32
        )

        self.best_val_loss = float('inf')
        self.current_step = 0
        self.max_steps = args.rl_max_steps
        self.render_mode = None

    def reset(self, seed=None, options=None):
        super().reset(seed=seed)
        self.current_step = 0
        self.best_val_loss = float('inf')
        self.current_state = np.array([
            float(self.args.lr),
            float(self.args.hidden_dim),
            float(self.args.num_heads),
            float(self.args.dropout),
            10.0
        ], dtype=np.float32)
        return self.current_state, {}

    def step(self, action):
        self.current_step += 1

        # Decode the action into hyperparameters and clamp them to the
        # bounds of the action space.
        lr = max(1e-5, min(1e-2, float(action[0])))
        hidden_dim = max(32, min(128, int(round(float(action[1])))))
        num_heads = max(2, min(8, int(round(float(action[2])))))
        dropout = max(0.1, min(0.5, float(action[3])))

        # Build and evaluate on the configured device (GPU supported).
        model = GAT(
            nfeat=1,
            nhid=hidden_dim,
            noutput=self.args.num_targets,
            dropout=dropout,
            nheads=num_heads,
            alpha=0.2
        ).to(self.eval_device)
        optimizer = optim.Adam(model.parameters(), lr=lr,
                               weight_decay=self.args.weight_decay)

        rl_args = self.args
        trainer = DataTrainer(model, rl_args, self.preprocessor, optimizer,
                              logger=self.logger)
        val_loss = self._short_evaluate(trainer)

        # Reward rises as the validation loss falls; a bonus is paid
        # whenever the episode's best loss improves.
        reward = 1.0 / (1.0 + val_loss)
        if val_loss < self.best_val_loss:
            reward += 0.5
            self.best_val_loss = val_loss

        # Clip the loss component so the observation stays inside the
        # declared observation space.
        obs_loss = float(np.clip(val_loss, 0.0, 100.0))
        self.current_state = np.array(
            [lr, hidden_dim, num_heads, dropout, obs_loss], dtype=np.float32)

        terminated = self.current_step >= self.max_steps
        truncated = False
        return self.current_state, float(reward), terminated, truncated, {}

    def _short_evaluate(self, trainer):
        """
        Key speed-up: approximate the evaluation with only a handful of
        batches so that a single env.step() finishes in milliseconds to
        seconds.
        """
        # Train on 1 batch, repeated twice, to get a usable gradient signal.
        for _ in range(2):
            trainer.train_epoch(self.train_loader, self.adj, max_batches=1)
        # Validate on 2 batches to reduce variance.
        val_loss, _ = trainer.validate(self.val_loader, self.adj, max_batches=2)
        return float(val_loss)

    def render(self):
        if self.render_mode == 'human':
            print(f"[RL] Step: {self.current_step}, "
                  f"Best Val Loss: {self.best_val_loss:.6f}")

    def close(self):
        pass


class TrainingCallback(BaseCallback):
    def __init__(self, verbose=0, print_every=100):
        super().__init__(verbose)
        self.print_every = print_every

    def _on_step(self) -> bool:
        # BaseCallback.logger is not a logging.Logger; use print or record().
        if self.n_calls % self.print_every == 0:
            # Some versions do not expose a 'rewards' key in self.locals,
            # so guard against its absence.
            rew = None
            try:
                r = self.locals.get('rewards', None)
                if r is not None:
                    rew = float(r[0])
            except Exception:
                pass
            print(f"[RL] timesteps={self.num_timesteps} "
                  f"calls={self.n_calls} reward={rew}")
        return True


class RLOptimizer:
    def __init__(self, args, preprocessor,
                 train_loader, val_loader, adj, logger):
        self.args = args
        self.preprocessor = preprocessor
        self.train_loader = train_loader
        self.val_loader = val_loader
        self.adj = adj
        self.logger = logger

    def optimize(self):
        env = GATEnv(
            self.preprocessor, self.train_loader, self.val_loader,
            self.adj, self.args, self.logger
        )

        # Keep the PPO rollout and update configuration small so a single
        # rollout does not take too long.
        model = PPO(
            "MlpPolicy",
            env,
            verbose=1,
            learning_rate=3e-4,
            n_steps=32,        # previously 2048 -> 32
            batch_size=32,     # previously 64 -> 32
            n_epochs=1,        # previously 10 -> 1
            gamma=0.99,
            gae_lambda=0.95,
            clip_range=0.2,
            ent_coef=0.01,
            device=self.args.device  # use the configured device (GPU supported)
        )

        self.logger.info("Starting reinforcement learning agent training...")
        callback = TrainingCallback(verbose=1, print_every=100)
        model.learn(total_timesteps=self.args.rl_timesteps, callback=callback)
        model.save("gat_ppo_agent")

        # Evaluate the trained policy and keep the best action.
        self.logger.info("Searching for the best hyperparameter combination...")
        best_reward = -1.0
        best_action = None
        eval_env = GATEnv(
            self.preprocessor, self.train_loader, self.val_loader,
            self.adj, self.args, self.logger
        )
        for _ in range(self.args.rl_eval_episodes):
            obs, _ = eval_env.reset()
            action, _ = model.predict(obs, deterministic=True)
            _, reward, _, _, _ = eval_env.step(action)
            if reward > best_reward:
                best_reward = reward
                best_action = action

        best_hparams = {
            'lr': float(best_action[0]),
            'hidden_dim': int(round(float(best_action[1]))),
            'num_heads': int(round(float(best_action[2]))),
            'dropout': float(best_action[3])
        }
        self.logger.info(f"\nBest hyperparameters: {best_hparams}")
        return best_hparams
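

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the pipeline above). The parser
# below is an assumption: it simply enumerates the attributes this module reads
# from `args` (device, lr, hidden_dim, num_heads, dropout, num_targets,
# weight_decay, rl_max_steps, rl_timesteps, rl_eval_episodes) with made-up
# defaults, not the project's actual CLI. The commented __main__ block shows
# how RLOptimizer could be driven once a preprocessor, data loaders, and an
# adjacency tensor exist; those objects come from the surrounding project and
# are not constructed here.
# ---------------------------------------------------------------------------
def build_rl_arg_parser():
    """Hypothetical CLI covering every attribute accessed on `args` above."""
    import argparse
    parser = argparse.ArgumentParser(
        description="RL hyperparameter search for GAT (sketch)")
    parser.add_argument("--device", default="cuda" if torch.cuda.is_available() else "cpu")
    parser.add_argument("--lr", type=float, default=1e-3)            # initial point of the search
    parser.add_argument("--hidden_dim", type=int, default=64)
    parser.add_argument("--num_heads", type=int, default=4)
    parser.add_argument("--dropout", type=float, default=0.3)
    parser.add_argument("--num_targets", type=int, default=1)
    parser.add_argument("--weight_decay", type=float, default=5e-4)
    parser.add_argument("--rl_max_steps", type=int, default=10)      # GATEnv episode length
    parser.add_argument("--rl_timesteps", type=int, default=256)     # PPO total_timesteps
    parser.add_argument("--rl_eval_episodes", type=int, default=5)   # greedy evaluation rollouts
    return parser


# if __name__ == "__main__":
#     import logging
#     logging.basicConfig(level=logging.INFO)
#     args = build_rl_arg_parser().parse_args()
#     # preprocessor, train_loader, val_loader, adj come from the project's
#     # preprocessing / DataTrainer pipeline.
#     optimizer = RLOptimizer(args, preprocessor, train_loader, val_loader,
#                             adj, logging.getLogger("rl_hpo"))
#     best_hparams = optimizer.optimize()
#     print(best_hparams)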