import torch
import numpy as np
import gymnasium as gym
from gymnasium import spaces
from stable_baselines3 import PPO
from stable_baselines3.common.callbacks import BaseCallback
import torch.optim as optim
from gat import GAT
from data_trainer import DataTrainer


class GATEnv(gym.Env):
    metadata = {'render_modes': ['human'], 'render_fps': 4}

    def __init__(self, preprocessor, train_loader, val_loader, adj, args, logger):
        super().__init__()
        self.preprocessor = preprocessor
        self.train_loader = train_loader
        self.val_loader = val_loader
        # Use the specified device (GPU supported)
        self.eval_device = torch.device(args.device)
        self.adj = adj.to(self.eval_device)
        self.args = args
        self.logger = logger

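        # Action vector: [learning_rate, hidden_dim, num_heads, dropout];
        # the integer-valued entries are rounded and clamped inside step().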
        self.action_space = spaces.Box(
            low=np.array([1e-5, 32, 2, 0.1], dtype=np.float32),
            high=np.array([1e-2, 128, 8, 0.5], dtype=np.float32),
            shape=(4,),
            dtype=np.float32
        )
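        # Observation: the current hyperparameters plus the most recent validation loss.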
        self.observation_space = spaces.Box(
            low=np.array([1e-5, 32, 2, 0.1, 0], dtype=np.float32),
            high=np.array([1e-2, 128, 8, 0.5, 100], dtype=np.float32),
            shape=(5,),
            dtype=np.float32
        )
        self.best_val_loss = float('inf')
        self.current_step = 0
        self.max_steps = args.rl_max_steps
        self.render_mode = None

    def reset(self, seed=None, options=None):
        super().reset(seed=seed)
        self.current_step = 0
        self.best_val_loss = float('inf')
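        # Initial observation: the default hyperparameters from args plus a placeholder validation loss.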
        self.current_state = np.array([
            float(self.args.lr),
            float(self.args.hidden_dim),
            float(self.args.num_heads),
            float(self.args.dropout),
            10.0
        ], dtype=np.float32)
        return self.current_state, {}

    def step(self, action):
        self.current_step += 1
        lr = float(action[0])
        hidden_dim = int(round(float(action[1])))
        num_heads = int(round(float(action[2])))
        dropout = float(action[3])

        # Clamp all hyperparameters to the search bounds in case the env is stepped
        # with out-of-range actions (the original code left lr unclamped).
        lr = max(1e-5, min(1e-2, lr))
        hidden_dim = max(32, min(128, hidden_dim))
        num_heads = max(2, min(8, num_heads))
        dropout = max(0.1, min(0.5, dropout))

        # Build and evaluate the candidate model on the specified device (GPU supported)
        model = GAT(
            nfeat=1,
            nhid=hidden_dim,
            noutput=self.args.num_targets,
            dropout=dropout,
            nheads=num_heads,
            alpha=0.2
        ).to(self.eval_device)
        optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=self.args.weight_decay)

        # Use the specified device
        rl_args = self.args
        trainer = DataTrainer(model, rl_args, self.preprocessor, optimizer, logger=self.logger)
        val_loss = self._short_evaluate(trainer)

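        # Reward shaping: inverse of the validation loss, plus a bonus whenever a new best is reached.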
        reward = 1.0 / (1.0 + val_loss)
        if val_loss < self.best_val_loss:
            reward += 0.5
            self.best_val_loss = val_loss

        self.current_state = np.array([lr, hidden_dim, num_heads, dropout, val_loss], dtype=np.float32)
        # Reaching the step budget is a time limit, so report it as truncation (Gymnasium convention)
        terminated = False
        truncated = self.current_step >= self.max_steps
        return self.current_state, float(reward), terminated, truncated, {}

    def _short_evaluate(self, trainer):
        """
        Key speed-up: evaluate with only a handful of batches so that a single
        env.step() finishes in milliseconds to seconds.
        """
        # Train on 1 batch, repeated twice, to produce a usable gradient signal
        for _ in range(2):
            trainer.train_epoch(self.train_loader, self.adj, max_batches=1)
        # Validate on 2 batches to reduce variance
        val_loss, _ = trainer.validate(self.val_loader, self.adj, max_batches=2)
        return float(val_loss)

    def render(self):
        if self.render_mode == 'human':
            print(f"[RL] Step: {self.current_step}, Best Val Loss: {self.best_val_loss:.6f}")

    def close(self):
        pass


class TrainingCallback(BaseCallback):
    def __init__(self, verbose=0, print_every=100):
        super().__init__(verbose)
        self.print_every = print_every

    def _on_step(self) -> bool:
        # BaseCallback.logger is not a logging.Logger; use print or self.logger.record instead.
        if self.n_calls % self.print_every == 0:
            # Some stable-baselines3 versions do not expose a 'rewards' key in self.locals, so guard defensively
            rew = None
            try:
                r = self.locals.get('rewards', None)
                if r is not None:
                    rew = float(r[0])
            except Exception:
                pass
            print(f"[RL] timesteps={self.num_timesteps} calls={self.n_calls} reward={rew}")
        return True


class RLOptimizer:
    def __init__(self, args, preprocessor, train_loader, val_loader, adj, logger):
        self.args = args
        self.preprocessor = preprocessor
        self.train_loader = train_loader
        self.val_loader = val_loader
        self.adj = adj
        self.logger = logger

    def optimize(self):
        env = GATEnv(
            self.preprocessor, self.train_loader, self.val_loader,
            self.adj, self.args, self.logger
        )

        # Key point: keep the PPO rollout and update configuration small so that a
        # single rollout does not take too long (each env.step() trains a GAT briefly)
        model = PPO(
            "MlpPolicy",
            env,
            verbose=1,
            learning_rate=3e-4,
            n_steps=32,            # was 2048 -> 32
            batch_size=32,         # was 64 -> 32
            n_epochs=1,            # was 10 -> 1
            gamma=0.99,
            gae_lambda=0.95,
            clip_range=0.2,
            ent_coef=0.01,
            device=self.args.device  # use the specified device (GPU supported)
        )
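
        # With n_steps=32, batch_size=32 and n_epochs=1, each PPO update collects 32
        # env.step() calls (32 short GAT train/validate cycles) and makes a single pass over them.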

        self.logger.info("Starting training of the reinforcement learning agent...")
        callback = TrainingCallback(verbose=1, print_every=100)
        model.learn(total_timesteps=self.args.rl_timesteps, callback=callback)
        model.save("gat_ppo_agent")

        # Evaluate the learned policy and pick the best action
        self.logger.info("Searching for the best hyperparameter combination...")
        best_reward = -1.0
        best_action = None
        eval_env = GATEnv(
            self.preprocessor, self.train_loader, self.val_loader,
            self.adj, self.args, self.logger
        )
        for _ in range(self.args.rl_eval_episodes):
            obs, _ = eval_env.reset()
            action, _ = model.predict(obs, deterministic=True)
            _, reward, _, _, _ = eval_env.step(action)
            if reward > best_reward:
                best_reward = reward
                best_action = action

        best_hparams = {
            'lr': float(best_action[0]),
            'hidden_dim': int(round(float(best_action[1]))),
            'num_heads': int(round(float(best_action[2]))),
            'dropout': float(best_action[3])
        }
        self.logger.info(f"\nBest hyperparameters: {best_hparams}")
        return best_hparams
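

# ---------------------------------------------------------------------------
# Minimal usage sketch (not part of the original module). It assumes that
# `build_data()` is a hypothetical helper returning
# (preprocessor, train_loader, val_loader, adj), and that `args` carries the
# fields referenced above (device, lr, hidden_dim, num_heads, dropout,
# num_targets, weight_decay, rl_max_steps, rl_timesteps, rl_eval_episodes).
# The concrete values below are illustrative only.
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    import argparse
    import logging

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger('rl_hpo')

    args = argparse.Namespace(
        device='cuda' if torch.cuda.is_available() else 'cpu',
        lr=1e-3, hidden_dim=64, num_heads=4, dropout=0.3,
        num_targets=1, weight_decay=5e-4,
        rl_max_steps=10, rl_timesteps=256, rl_eval_episodes=5,
    )

    # Hypothetical data-preparation helper; replace with the project's own pipeline.
    preprocessor, train_loader, val_loader, adj = build_data(args)

    optimizer = RLOptimizer(args, preprocessor, train_loader, val_loader, adj, logger)
    best_hparams = optimizer.optimize()
    logger.info(f"Retrain the final GAT with: {best_hparams}")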