import os
import time
import random

import numpy as np
import torch
from stable_baselines3 import DQN
from stable_baselines3.common.callbacks import BaseCallback
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.vec_env import DummyVecEnv

from DQN_env import UFParams, UFSuperCycleEnv

# ==== RL hyperparameter definitions ====
class DQNParams:
    """
    DQN hyperparameter container, used to manage the model's
    training parameters in one place.
    """
    # Learning rate: step size of the neural-network updates
    learning_rate: float = 1e-4
    # Replay buffer capacity (in steps)
    buffer_size: int = 10000
    # Number of steps to collect before learning starts
    learning_starts: int = 200
    # Number of samples drawn from the replay buffer per update
    batch_size: int = 32
    # Discount factor; the closer to 1, the more weight on long-term reward
    gamma: float = 0.95
    # Train the network once every this many steps
    train_freq: int = 4
    # Target-network update interval (in steps)
    target_update_interval: int = 2000
    # Initial exploration rate (epsilon)
    exploration_initial_eps: float = 1.0
    # Fraction of training over which epsilon decays from initial to final
    exploration_fraction: float = 0.3
    # Final exploration rate (epsilon)
    exploration_final_eps: float = 0.02
    # Log remark (used to tell different experiments apart)
    remark: str = "default"

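# Because the hyperparameters above are plain class attributes, a variant
# experiment can shadow them on an instance without touching the defaults.
# A minimal sketch; the helper and the values it sets are illustrative, not tuned:
def make_sweep_params(lr: float, remark: str) -> DQNParams:
    params = DQNParams()
    params.learning_rate = lr  # instance attribute shadows the class default
    params.remark = remark     # shows up in the TensorBoard log-dir name
    return params
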
class UFEpisodeRecorder:
    """Records the decisions and outcomes within each episode."""
    def __init__(self):
        self.episode_data = []
        self.current_episode = []

    def record_step(self, obs, action, reward, done, info):
        """Record a single step."""
        step_data = {
            "obs": np.copy(obs),
            # np.copy handles scalar actions and plain Python ints,
            # which would fail with a bare action.copy() call
            "action": np.copy(action) if action is not None else None,
            "reward": reward,
            "done": done,
            "info": info.copy() if info else {}
        }
        self.current_episode.append(step_data)
        if done:
            self.episode_data.append(self.current_episode)
            self.current_episode = []

    def get_episode_stats(self, episode_idx=-1):
        """Return summary statistics for one episode."""
        if not self.episode_data:
            return {}
        episode = self.episode_data[episode_idx]
        total_reward = sum(step["reward"] for step in episode)
        recoveries = [step["info"]["recovery"] for step in episode if "recovery" in step["info"]]
        # Guard against an empty list, which would make np.mean return NaN
        avg_recovery = float(np.mean(recoveries)) if recoveries else 0.0
        feasible_steps = sum(1 for step in episode if step["info"].get("feasible", False))
        return {
            "total_reward": total_reward,
            "avg_recovery": avg_recovery,
            "feasible_steps": feasible_steps,
            "total_steps": len(episode)
        }

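# Hypothetical stand-alone usage sketch for UFEpisodeRecorder; the observation,
# action, and info values below are placeholders, not real environment output.
def _recorder_demo():
    recorder = UFEpisodeRecorder()
    recorder.record_step(
        obs=np.zeros(4),     # placeholder observation
        action=np.int64(0),  # placeholder discrete action
        reward=1.0,
        done=True,           # done=True closes the episode
        info={"recovery": 0.9, "feasible": True},
    )
    print(recorder.get_episode_stats())  # stats of the completed episode
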
# ==== RL training callback ====
class UFTrainingCallback(BaseCallback):
    """
    Training callback that records every step into the recorder.

    1. Does not depend on the environment's internal last_* attributes.
    2. Uses the obs, actions, rewards, dones, and infos exposed by the
       training loop.
    3. Episode-end bookkeeping is handled automatically by the recorder.
    """
    def __init__(self, recorder, verbose=0):
        super().__init__(verbose)
        self.recorder = recorder

    def _on_step(self) -> bool:
        try:
            new_obs = self.locals.get("new_obs")
            actions = self.locals.get("actions")
            rewards = self.locals.get("rewards")
            dones = self.locals.get("dones")
            infos = self.locals.get("infos")
            if new_obs is not None and len(new_obs) > 0:
                step_obs = new_obs[0]
                step_action = actions[0] if actions is not None else None
                step_reward = rewards[0] if rewards is not None else 0.0
                step_done = dones[0] if dones is not None else False
                step_info = infos[0] if infos is not None else {}
                # Print information about the current step
                if self.verbose:
                    print(f"[Step {self.num_timesteps}] action={step_action}, reward={step_reward:.3f}, done={step_done}")
                # Record the step
                self.recorder.record_step(
                    obs=step_obs,
                    action=step_action,
                    reward=step_reward,
                    done=step_done,
                    info=step_info,
                )
        except Exception as e:
            if self.verbose:
                print(f"[Callback Error] {e}")
        return True

class DQNTrainer:
    def __init__(self, env, params, callback=None):
        self.env = env
        self.params = params
        self.callback = callback
        self.log_dir = self._create_log_dir()
        self.model = self._create_model()

    def _create_log_dir(self):
        # Create a log directory named after the key hyperparameters
        timestamp = time.strftime("%Y%m%d-%H%M%S")
        log_name = (
            f"DQN_lr{self.params.learning_rate}_buf{self.params.buffer_size}_bs{self.params.batch_size}"
            f"_gamma{self.params.gamma}_exp{self.params.exploration_fraction}"
            f"_{self.params.remark}_{timestamp}"
        )
        log_dir = os.path.join("./uf_dqn_tensorboard", log_name)
        os.makedirs(log_dir, exist_ok=True)
        return log_dir

    def _create_model(self):
        return DQN(
            policy="MlpPolicy",
            env=self.env,
            learning_rate=self.params.learning_rate,
            buffer_size=self.params.buffer_size,
            learning_starts=self.params.learning_starts,
            batch_size=self.params.batch_size,
            gamma=self.params.gamma,
            train_freq=self.params.train_freq,
            # Wire the configured interval through instead of hard-coding it,
            # so that DQNParams.target_update_interval actually takes effect
            target_update_interval=self.params.target_update_interval,
            exploration_initial_eps=self.params.exploration_initial_eps,
            exploration_fraction=self.params.exploration_fraction,
            exploration_final_eps=self.params.exploration_final_eps,
            verbose=1,
            tensorboard_log=self.log_dir
        )

    def train(self, total_timesteps: int):
        if self.callback:
            self.model.learn(total_timesteps=total_timesteps, callback=self.callback)
        else:
            self.model.learn(total_timesteps=total_timesteps)
        print(f"Training finished; logs saved to: {self.log_dir}")

    def save(self, path=None):
        if path is None:
            path = os.path.join(self.log_dir, "dqn_model.zip")
        self.model.save(path)
        print(f"Model saved to: {path}")

    def load(self, path):
        self.model = DQN.load(path, env=self.env)
        print(f"Model loaded from {path}")

def set_global_seed(seed: int):
    """Fix all global random seeds so that training is reproducible."""
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)  # in case a GPU is used
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

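# Note: Stable-Baselines3 models also accept a `seed` constructor argument
# (e.g. DQN(..., seed=seed)), which additionally seeds the environment and
# action-space sampling; the global seeding above does not cover that.
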
def train_uf_rl_agent(params: UFParams, total_timesteps: int = 10000, seed: int = 2025):
    set_global_seed(seed)
    recorder = UFEpisodeRecorder()
    callback = UFTrainingCallback(recorder, verbose=1)

    def make_env():
        env = UFSuperCycleEnv(params)
        env = Monitor(env)
        return env

    env = DummyVecEnv([make_env])
    dqn_params = DQNParams()
    trainer = DQNTrainer(env, dqn_params, callback=callback)
    trainer.train(total_timesteps)
    trainer.save()
    stats = callback.recorder.get_episode_stats()
    print(f"Training complete - total reward: {stats.get('total_reward', 0):.2f}, "
          f"average recovery: {stats.get('avg_recovery', 0):.3f}")
    return trainer.model

# Training entry point
if __name__ == "__main__":
    # Initialize environment parameters
    params = UFParams()
    # Train the RL agent
    print("Starting RL agent training...")
    train_uf_rl_agent(params, total_timesteps=50000)
|