import os
import time
import random

import numpy as np
import torch
import gymnasium as gym
from gymnasium import spaces
from stable_baselines3 import DQN
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.vec_env import DummyVecEnv
from stable_baselines3.common.callbacks import BaseCallback

from DQN_env import UFParams, UFSuperCycleEnv


# ==== RL hyperparameter definitions ====
class DQNParams:
    """
    Container for DQN hyperparameters.
    Keeps all model-training settings in one place.
    """
    # Learning rate: step size for neural-network updates
    learning_rate: float = 1e-4
    # Replay buffer capacity (in steps)
    buffer_size: int = 10000
    # Number of steps to collect before learning starts
    learning_starts: int = 200
    # Number of samples drawn from the replay buffer per update
    batch_size: int = 32
    # Discount factor; closer to 1 weights long-term reward more heavily
    gamma: float = 0.95
    # Train once every N environment steps
    train_freq: int = 4
    # Target-network update interval (in steps)
    target_update_interval: int = 2000
    # Initial exploration rate ε
    exploration_initial_eps: float = 1.0
    # Fraction of total training over which ε decays from initial to final
    exploration_fraction: float = 0.3
    # Final exploration rate ε
    exploration_final_eps: float = 0.02
    # Log remark (used to tell experiments apart)
    remark: str = "default"


class UFEpisodeRecorder:
    """Records the decisions and outcomes of each episode."""

    def __init__(self):
        self.episode_data = []
        self.current_episode = []

    def record_step(self, obs, action, reward, done, info):
        """Record a single step."""
        step_data = {
            "obs": np.asarray(obs).copy(),
            # action may be None when the callback cannot retrieve it
            "action": None if action is None else np.asarray(action).copy(),
            "reward": reward,
            "done": done,
            "info": info.copy() if info else {},
        }
        self.current_episode.append(step_data)
        if done:
            self.episode_data.append(self.current_episode)
            self.current_episode = []

    def get_episode_stats(self, episode_idx=-1):
        """Return summary statistics for one episode."""
        if not self.episode_data:
            return {}
        episode = self.episode_data[episode_idx]
        total_reward = sum(step["reward"] for step in episode)
        recoveries = [step["info"]["recovery"] for step in episode if "recovery" in step["info"]]
        # Guard against episodes where no step reported a recovery value
        avg_recovery = float(np.mean(recoveries)) if recoveries else 0.0
        feasible_steps = sum(1 for step in episode if step["info"].get("feasible", False))
        return {
            "total_reward": total_reward,
            "avg_recovery": avg_recovery,
            "feasible_steps": feasible_steps,
            "total_steps": len(episode),
        }
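
# The code in this script assumes `DQN_env.UFSuperCycleEnv` is a standard
# Gymnasium environment with a discrete action space (required by SB3's DQN)
# whose step `info` dict exposes the "recovery" and "feasible" keys read by
# UFEpisodeRecorder. The stub below is a minimal sketch of that assumed
# contract, for reference only; the observation shape and action count are
# placeholders, not the real DQN_env implementation.
class UFSuperCycleEnvSketch(gym.Env):
    """Hypothetical sketch of the interface the trainer expects."""

    def __init__(self, params: UFParams):
        super().__init__()
        self.params = params
        # Placeholder spaces: 4 continuous observations, 3 discrete actions
        self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(4,), dtype=np.float32)
        self.action_space = spaces.Discrete(3)

    def reset(self, *, seed=None, options=None):
        super().reset(seed=seed)
        return np.zeros(4, dtype=np.float32), {}

    def step(self, action):
        obs = np.zeros(4, dtype=np.float32)
        reward, terminated, truncated = 0.0, False, False
        # Keys consumed by UFEpisodeRecorder.get_episode_stats()
        info = {"recovery": 0.0, "feasible": True}
        return obs, reward, terminated, truncated, info
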
# ==== RL training callback ====
class UFTrainingCallback(BaseCallback):
    """
    Training callback that logs every step to a recorder.
    1. Does not rely on the environment's internal last_* attributes.
    2. Uses the obs, actions, rewards, dones, and infos exposed by the
       training loop via `self.locals`.
    3. Episode-end bookkeeping is handled automatically by the recorder.
    """

    def __init__(self, recorder, verbose=0):
        super().__init__(verbose)
        self.recorder = recorder

    def _on_step(self) -> bool:
        try:
            new_obs = self.locals.get("new_obs")
            actions = self.locals.get("actions")
            rewards = self.locals.get("rewards")
            dones = self.locals.get("dones")
            infos = self.locals.get("infos")

            if new_obs is not None and len(new_obs) > 0:
                step_obs = new_obs[0]
                step_action = actions[0] if actions is not None else None
                step_reward = rewards[0] if rewards is not None else 0.0
                step_done = dones[0] if dones is not None else False
                step_info = infos[0] if infos is not None else {}

                # Print information for the current step
                if self.verbose:
                    print(f"[Step {self.num_timesteps}] action={step_action}, "
                          f"reward={step_reward:.3f}, done={step_done}")

                # Record the step
                self.recorder.record_step(
                    obs=step_obs,
                    action=step_action,
                    reward=step_reward,
                    done=step_done,
                    info=step_info,
                )
        except Exception as e:
            if self.verbose:
                print(f"[Callback Error] {e}")
        return True


class DQNTrainer:
    def __init__(self, env, params, callback=None):
        self.env = env
        self.params = params
        self.callback = callback
        self.log_dir = self._create_log_dir()
        self.model = self._create_model()

    def _create_log_dir(self):
        # Build a unique log directory name from the hyperparameters
        timestamp = time.strftime("%Y%m%d-%H%M%S")
        log_name = (
            f"DQN_lr{self.params.learning_rate}_buf{self.params.buffer_size}_bs{self.params.batch_size}"
            f"_gamma{self.params.gamma}_exp{self.params.exploration_fraction}"
            f"_{self.params.remark}_{timestamp}"
        )
        log_dir = os.path.join("./uf_dqn_tensorboard", log_name)
        os.makedirs(log_dir, exist_ok=True)
        return log_dir

    def _create_model(self):
        return DQN(
            policy="MlpPolicy",
            env=self.env,
            learning_rate=self.params.learning_rate,
            buffer_size=self.params.buffer_size,
            learning_starts=self.params.learning_starts,
            batch_size=self.params.batch_size,
            gamma=self.params.gamma,
            train_freq=self.params.train_freq,
            target_update_interval=self.params.target_update_interval,
            exploration_initial_eps=self.params.exploration_initial_eps,
            exploration_fraction=self.params.exploration_fraction,
            exploration_final_eps=self.params.exploration_final_eps,
            verbose=1,
            tensorboard_log=self.log_dir,
        )

    def train(self, total_timesteps: int):
        if self.callback:
            self.model.learn(total_timesteps=total_timesteps, callback=self.callback)
        else:
            self.model.learn(total_timesteps=total_timesteps)
        print(f"Training finished; logs saved to: {self.log_dir}")

    def save(self, path=None):
        if path is None:
            path = os.path.join(self.log_dir, "dqn_model.zip")
        self.model.save(path)
        print(f"Model saved to: {path}")

    def load(self, path):
        self.model = DQN.load(path, env=self.env)
        print(f"Model loaded from {path}")


def set_global_seed(seed: int):
    """Fix all global random seeds so training runs are reproducible."""
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)  # no-op on CPU-only machines
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False


def train_uf_rl_agent(params: UFParams, total_timesteps: int = 10000, seed: int = 2025):
    set_global_seed(seed)

    recorder = UFEpisodeRecorder()
    callback = UFTrainingCallback(recorder, verbose=1)

    def make_env():
        env = UFSuperCycleEnv(params)
        env = Monitor(env)
        return env

    env = DummyVecEnv([make_env])

    dqn_params = DQNParams()
    trainer = DQNTrainer(env, dqn_params, callback=callback)
    trainer.train(total_timesteps)
    trainer.save()

    stats = callback.recorder.get_episode_stats()
    print(
        f"Training complete - total reward: {stats.get('total_reward', 0):.2f}, "
        f"average recovery: {stats.get('avg_recovery', 0):.3f}"
    )
    return trainer.model


# Training entry point
if __name__ == "__main__":
    # Initialize environment parameters
    params = UFParams()

    # Train the RL agent
    print("Starting RL agent training...")
    train_uf_rl_agent(params, total_timesteps=50000)
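
# --- Optional greedy evaluation (sketch) ---
# A minimal sketch of how a saved model could be rolled out deterministically
# as a sanity check after training. `model_path` is whatever DQNTrainer.save()
# printed, e.g. "<log_dir>/dqn_model.zip"; the function name and episode count
# here are illustrative, not part of the training pipeline above.
def evaluate_saved_model(model_path: str, params: UFParams, n_episodes: int = 5):
    env = Monitor(UFSuperCycleEnv(params))
    model = DQN.load(model_path, env=env)
    for ep in range(n_episodes):
        obs, _ = env.reset()
        done, total_reward = False, 0.0
        while not done:
            # deterministic=True disables ε-greedy exploration at test time
            action, _ = model.predict(obs, deterministic=True)
            obs, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            total_reward += reward
        print(f"Episode {ep}: total reward = {total_reward:.2f}")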