""" DQN 强化学习训练模块 ====================== 本模块实现基于 Stable-Baselines3 的 DQN 强化学习训练流程,包括: 1. DQNParams: DQN超参数配置类 2. UFEpisodeRecorder: Episode数据记录器 3. UFTrainingCallback: 训练回调器 4. DQNTrainer: DQN训练器封装 5. train_uf_rl_agent: 主训练函数 DQN算法简介: - Deep Q-Network(深度Q网络) - 基于价值的强化学习算法 - 使用经验回放和目标网络稳定训练 - 适用于离散动作空间 训练流程: 1. 初始化环境和DQN智能体 2. 收集经验(exploration) 3. 从经验池采样训练(exploitation) 4. 周期性更新目标网络 5. 记录训练指标到TensorBoard """ import os import time import random import numpy as np import torch from stable_baselines3 import DQN from stable_baselines3.common.monitor import Monitor from stable_baselines3.common.vec_env import DummyVecEnv from stable_baselines3.common.callbacks import BaseCallback from fixed_DQN_env import UFParams, UFSuperCycleEnv # ==================== DQN超参数配置类 ==================== class DQNParams: """ DQN 超参数配置类 功能:统一管理DQN算法的所有超参数 超参数说明: - learning_rate: 神经网络学习率,控制梯度下降的步长 - buffer_size: 经验回放缓冲区大小,存储历史经验 - learning_starts: 开始训练前先收集的经验数量(warm-up) - batch_size: 每次训练采样的batch大小 - gamma: 折扣因子,权衡即时奖励和长期奖励 - train_freq: 训练频率,每隔多少步训练一次 - target_update_interval: 目标网络更新频率 - tau: 软更新系数(soft update) - exploration_*: ε-贪心策略的探索率参数 """ # ========== 神经网络参数 ========== learning_rate: float = 1e-4 # 学习率,控制神经网络权重更新的步长 # 典型范围:1e-5 ~ 1e-3 # 过大:训练不稳定;过小:收敛慢 # ========== 经验回放参数 ========== buffer_size: int = 100000 # 经验回放缓冲区大小(可存储的transition数量) # 作用:打破样本间的时间相关性,提高训练稳定性 # 建议:至少存储几个完整episode的经验 learning_starts: int = 10000 # 开始训练前先收集的步数(预填充缓冲区) # 作用:确保缓冲区有足够的多样性样本再开始训练 # 建议:设为buffer_size的10%-20% batch_size: int = 32 # 每次训练从缓冲区采样的样本数量 # 典型值:32, 64, 128, 256 # 过大:显存占用高,训练慢;过小:梯度估计不准确 # ========== 强化学习参数 ========== gamma: float = 0.95 # 折扣因子(discount factor),γ ∈ [0, 1] # 作用:权衡即时奖励和长期奖励 # γ=0:只考虑当前奖励(短视) # γ=1:完全考虑未来奖励(长视) # 通常设为0.9-0.99 train_freq: int = 4 # 训练频率:每收集多少步执行一次训练 # 作用:平衡数据收集和网络更新 # 典型值:1(每步训练)或4-16(批量训练) # ========== 目标网络参数 ========== target_update_interval: int = 1 # 目标网络更新间隔(硬更新) # 作用:目标网络每隔多少次训练更新一次 # 注:使用软更新(tau)时此参数通常设为1 tau: float = 0.005 # 软更新系数(soft update) # θ_target = τ×θ + (1-τ)×θ_target # τ=1:硬更新(完全复制) # τ<<1:软更新(平滑过渡,更稳定) # 典型值:0.001 - 0.01 # ========== 探索策略参数(ε-greedy) ========== exploration_initial_eps: float = 1.0 # 初始探索率 ε_0 # ε=1:完全随机探索 # ε=0:完全利用已学知识 exploration_fraction: float = 0.3 # 探索率衰减比例 # 表示训练总步数的前30%进行ε衰减 # 例:总共10万步,前3万步ε从1.0衰减到0.02 exploration_final_eps: float = 0.02 # 最终探索率 ε_final # 衰减结束后保持此值(保留小概率探索) # 典型值:0.01 - 0.05 # ========== 日志参数 ========== remark: str = "default" # 实验备注,用于区分不同训练实验 # 会自动添加到TensorBoard日志目录名中 # ==================== Episode数据记录器 ==================== class UFEpisodeRecorder: """ Episode数据记录器 功能: - 记录训练过程中每个episode的详细数据 - 存储每步的状态、动作、奖励、info等信息 - 计算episode级别的统计指标 用途: - 训练监控:实时查看智能体表现 - 调试分析:定位问题episode - 数据分析:评估策略改进效果 """ def __init__(self): """初始化记录器""" self.episode_data = [] # 存储所有完成的episode数据 self.current_episode = [] # 当前正在进行的episode数据 def record_step(self, obs, action, reward, done, info): """ 记录单步交互数据 参数: obs: 当前状态观测 action: 执行的动作 reward: 获得的奖励 done: 是否结束 info: 额外信息字典 """ # 构建单步数据字典 step_data = { "obs": obs.copy(), # 状态(深拷贝避免引用问题) "action": action.copy(), # 动作 "reward": reward, # 奖励 "done": done, # 是否终止 "info": info.copy() if info else {} # 环境信息 } # 添加到当前episode self.current_episode.append(step_data) # 如果episode结束,保存并重置 if done: self.episode_data.append(self.current_episode) self.current_episode = [] def get_episode_stats(self, episode_idx=-1): """ 获取指定episode的统计信息 参数: episode_idx (int): episode索引,默认-1(最后一个) 返回: dict: 包含以下统计指标的字典 - total_reward: 总奖励 - avg_recovery: 平均回收率 - feasible_steps: 可行步数 - total_steps: 总步数 """ if not self.episode_data: return {} episode = self.episode_data[episode_idx] # 计算总奖励 total_reward = sum(step["reward"] for step in episode) # 计算平均回收率(从info中提取) recovery_values = [ step["info"].get("recovery", 0) for step in episode if "recovery" in step["info"] ] avg_recovery = np.mean(recovery_values) if recovery_values else 0.0 # 计算可行步数(成功的超级周期数) feasible_steps = sum( 1 for step in episode if step["info"].get("feasible", False) ) return { "total_reward": total_reward, "avg_recovery": avg_recovery, "feasible_steps": feasible_steps, "total_steps": len(episode) } # ==================== 训练回调器 ==================== class UFTrainingCallback(BaseCallback): """ 自定义训练回调器 功能: - 在每个训练步骤调用,记录数据到recorder - 兼容Stable-Baselines3的回调机制 - 不依赖环境内部属性,使用标准接口获取数据 回调时机: - _on_step(): 每执行一步环境交互后调用 设计特点: 1. 从self.locals获取当前步的数据(SB3提供的接口) 2. 处理向量化环境(DummyVecEnv)的数据格式 3. 自动检测episode结束并触发记录 """ def __init__(self, recorder, verbose=0): """ 初始化回调器 参数: recorder (UFEpisodeRecorder): 数据记录器实例 verbose (int): 日志详细程度,0=关闭,1=打印每步信息 """ super(UFTrainingCallback, self).__init__(verbose) self.recorder = recorder def _on_step(self) -> bool: """ 每步回调函数(Stable-Baselines3标准接口) 返回: bool: True表示继续训练,False表示提前终止 """ try: # 从SB3的self.locals获取当前步数据 new_obs = self.locals.get("new_obs") # 新状态 actions = self.locals.get("actions") # 执行的动作 rewards = self.locals.get("rewards") # 获得的奖励 dones = self.locals.get("dones") # 是否结束 infos = self.locals.get("infos") # 环境信息 # 处理向量化环境(取第一个环境的数据) if len(new_obs) > 0: step_obs = new_obs[0] step_action = actions[0] if actions is not None else None step_reward = rewards[0] if rewards is not None else 0.0 step_done = dones[0] if dones is not None else False step_info = infos[0] if infos is not None else {} # 可选:打印当前步信息(用于调试) if self.verbose: print(f"[Step {self.num_timesteps}] " f"动作={step_action}, " f"奖励={step_reward:.3f}, " f"Done={step_done}") # 记录数据到recorder self.recorder.record_step( obs=step_obs, action=step_action, reward=step_reward, done=step_done, info=step_info, ) except Exception as e: # 异常处理:避免回调错误中断训练 if self.verbose: print(f"[Callback Error] {e}") # 返回True继续训练 return True # ==================== DQN训练器封装类 ==================== class DQNTrainer: def __init__(self, env, params, callback=None): """ 初始化训练器 参数: env: Gymnasium环境 params: DQN 超参数配置 callback: 可选训练回调 """ self.env = env self.params = params self.callback = callback self.log_dir = self._create_log_dir() # 创建日志目录 self.model = self._create_model() # 创建 DQN 模型 def _create_log_dir(self): """ 创建 TensorBoard 日志目录,保证 Windows 下路径安全 返回: str: 可用的日志目录路径 """ timestamp = time.strftime("%Y%m%d-%H%M%S") # 用整数代替浮点数,避免路径中包含小数点 lr_int = int(self.params.learning_rate * 1e4) gamma_int = int(self.params.gamma * 100) exp_int = int(self.params.exploration_fraction * 100) # 生成目录名 log_name = f"DQN_lr{lr_int}_buf{self.params.buffer_size}_bs{self.params.batch_size}_gamma{gamma_int}_exp{exp_int}_{self.params.remark}_{timestamp}" # 使用短路径,避免 Windows 路径过长 base_dir = r"E:\Greentech\models\uf-rl\uf_dqn_tensorboard" os.makedirs(base_dir, exist_ok=True) log_dir = os.path.join(base_dir, log_name) # 尝试创建目录,防止偶发锁或占用 attempt = 0 while attempt < 5: try: os.makedirs(log_dir, exist_ok=True) if not os.path.isdir(log_dir): raise RuntimeError(f"{log_dir} 已存在但不是目录!") break except Exception as e: attempt += 1 time.sleep(0.1) log_dir += f"_{attempt}" else: raise RuntimeError(f"无法创建日志目录: {log_dir}") return log_dir def _create_model(self): """ 创建 Stable-Baselines3 DQN 模型 """ model = DQN( policy="MlpPolicy", env=self.env, learning_rate=self.params.learning_rate, buffer_size=self.params.buffer_size, learning_starts=self.params.learning_starts, batch_size=self.params.batch_size, gamma=self.params.gamma, train_freq=self.params.train_freq, target_update_interval=1, tau=0.005, exploration_initial_eps=self.params.exploration_initial_eps, exploration_fraction=self.params.exploration_fraction, exploration_final_eps=self.params.exploration_final_eps, verbose=1, tensorboard_log=self.log_dir ) return model def train(self, total_timesteps: int): """ 执行训练 参数: total_timesteps (int): 总训练步数 注:对于超滤环境,每步代表一个超级周期(约2-3天) 150000步 ≈ 10000个episode ≈ 10000个超级周期 ≈ 约54年 """ if self.callback: # 使用回调器训练 self.model.learn(total_timesteps=total_timesteps, callback=self.callback) else: # 不使用回调器训练 self.model.learn(total_timesteps=total_timesteps) print(f"✅ 模型训练完成!") print(f"📊 日志保存在:{self.log_dir}") print(f"💡 使用以下命令查看TensorBoard:") print(f" tensorboard --logdir={self.log_dir}") def save(self, path=None): """ 保存模型 参数: path (str, optional): 保存路径,默认保存到日志目录下的dqn_model.zip """ if path is None: path = os.path.join(self.log_dir, "dqn_model.zip") self.model.save(path) print(f"💾 模型已保存到:{path}") def load(self, path): """ 加载模型 参数: path (str): 模型文件路径(.zip文件) """ self.model = DQN.load(path, env=self.env) print(f"📥 模型已从 {path} 加载") # ==================== 辅助函数:随机种子设置 ==================== def set_global_seed(seed: int): """ 固定全局随机种子,保证训练可复现 参数: seed (int): 随机种子 作用: - 固定Python、NumPy、PyTorch的随机数生成器 - 确保相同种子产生相同的训练结果 - 便于实验对比和问题复现 注意: - 即使固定种子,多线程/多进程仍可能产生微小差异 - GPU运算的非确定性也可能影响复现性 """ random.seed(seed) # Python随机数 np.random.seed(seed) # NumPy随机数 torch.manual_seed(seed) # PyTorch CPU随机数 torch.cuda.manual_seed_all(seed) # PyTorch GPU随机数 # 设置PyTorch为确定性模式(可能影响性能) torch.backends.cudnn.deterministic = True torch.backends.cudnn.benchmark = False # ==================== 主训练函数 ==================== def train_uf_rl_agent(params: UFParams, total_timesteps: int = 10000, seed: int = 2025): """ 超滤强化学习智能体训练主函数 参数: params (UFParams): 超滤环境参数 total_timesteps (int): 总训练步数,默认10000 seed (int): 随机种子,默认2025 返回: DQN: 训练好的DQN模型 训练流程: 1. 固定随机种子(确保可复现) 2. 创建记录器和回调器 3. 创建并包装环境(Monitor + DummyVecEnv) 4. 初始化DQN训练器 5. 执行训练 6. 保存模型 7. 输出统计信息 """ # 步骤1:固定随机种子 set_global_seed(seed) print(f"🎲 随机种子已设置为: {seed}") # 步骤2:创建数据记录器和回调器 recorder = UFEpisodeRecorder() callback = UFTrainingCallback(recorder, verbose=1) # 步骤3:创建环境(使用闭包和向量化) def make_env(): """环境工厂函数""" env = UFSuperCycleEnv(params) # 创建超滤环境 env = Monitor(env) # 包装Monitor(记录episode统计) return env # 向量化环境(即使只有一个环境,也需要向量化以兼容SB3) env = DummyVecEnv([make_env]) # 步骤4:创建DQN训练器 dqn_params = DQNParams() trainer = DQNTrainer(env, dqn_params, callback=callback) # 步骤5:执行训练 trainer.train(total_timesteps) # 步骤6:保存模型 trainer.save() # 步骤7:输出最终统计信息 stats = callback.recorder.get_episode_stats() print("\n" + "="*60) print("📈 训练统计") print("="*60) print(f"总奖励: {stats.get('total_reward', 0):.2f}") print(f"平均回收率: {stats.get('avg_recovery', 0):.3f}") print(f"可行步数: {stats.get('feasible_steps', 0)}") print(f"总步数: {stats.get('total_steps', 0)}") print("="*60) return trainer.model # ==================== 主程序入口 ==================== if __name__ == "__main__": """ 训练脚本入口 使用方法: python fixed_DQN_train.py 训练参数: - total_timesteps=150000: 总训练步数 - 约10000个episode(每个episode最多15步) - 约需训练数小时至数天(取决于硬件) """ print("="*60) print("🚀 开始训练超滤强化学习智能体") print("="*60) # 初始化超滤参数 params = UFParams() # 执行训练 train_uf_rl_agent(params, total_timesteps=200000) print("\n🎉 训练流程全部完成!")