| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536 |
- """
- DQN 强化学习训练模块
- ======================
- 本模块实现基于 Stable-Baselines3 的 DQN 强化学习训练流程,包括:
- 1. DQNParams: DQN超参数配置类
- 2. UFEpisodeRecorder: Episode数据记录器
- 3. UFTrainingCallback: 训练回调器
- 4. DQNTrainer: DQN训练器封装
- 5. train_uf_rl_agent: 主训练函数
- DQN算法简介:
- - Deep Q-Network(深度Q网络)
- - 基于价值的强化学习算法
- - 使用经验回放和目标网络稳定训练
- - 适用于离散动作空间
- 训练流程:
- 1. 初始化环境和DQN智能体
- 2. 收集经验(exploration)
- 3. 从经验池采样训练(exploitation)
- 4. 周期性更新目标网络
- 5. 记录训练指标到TensorBoard
- """
- import os
- import time
- import random
- import numpy as np
- import torch
- from stable_baselines3 import DQN
- from stable_baselines3.common.monitor import Monitor
- from stable_baselines3.common.vec_env import DummyVecEnv
- from stable_baselines3.common.callbacks import BaseCallback
- from fixed_DQN_env import UFParams, UFSuperCycleEnv
- # ==================== DQN超参数配置类 ====================
- class DQNParams:
- """
- DQN 超参数配置类
-
- 功能:统一管理DQN算法的所有超参数
-
- 超参数说明:
- - learning_rate: 神经网络学习率,控制梯度下降的步长
- - buffer_size: 经验回放缓冲区大小,存储历史经验
- - learning_starts: 开始训练前先收集的经验数量(warm-up)
- - batch_size: 每次训练采样的batch大小
- - gamma: 折扣因子,权衡即时奖励和长期奖励
- - train_freq: 训练频率,每隔多少步训练一次
- - target_update_interval: 目标网络更新频率
- - tau: 软更新系数(soft update)
- - exploration_*: ε-贪心策略的探索率参数
- """
- # ========== 神经网络参数 ==========
- learning_rate: float = 1e-4
- # 学习率,控制神经网络权重更新的步长
- # 典型范围:1e-5 ~ 1e-3
- # 过大:训练不稳定;过小:收敛慢
- # ========== 经验回放参数 ==========
- buffer_size: int = 100000
- # 经验回放缓冲区大小(可存储的transition数量)
- # 作用:打破样本间的时间相关性,提高训练稳定性
- # 建议:至少存储几个完整episode的经验
- learning_starts: int = 10000
- # 开始训练前先收集的步数(预填充缓冲区)
- # 作用:确保缓冲区有足够的多样性样本再开始训练
- # 建议:设为buffer_size的10%-20%
- batch_size: int = 32
- # 每次训练从缓冲区采样的样本数量
- # 典型值:32, 64, 128, 256
- # 过大:显存占用高,训练慢;过小:梯度估计不准确
- # ========== 强化学习参数 ==========
- gamma: float = 0.95
- # 折扣因子(discount factor),γ ∈ [0, 1]
- # 作用:权衡即时奖励和长期奖励
- # γ=0:只考虑当前奖励(短视)
- # γ=1:完全考虑未来奖励(长视)
- # 通常设为0.9-0.99
- train_freq: int = 4
- # 训练频率:每收集多少步执行一次训练
- # 作用:平衡数据收集和网络更新
- # 典型值:1(每步训练)或4-16(批量训练)
- # ========== 目标网络参数 ==========
- target_update_interval: int = 1
- # 目标网络更新间隔(硬更新)
- # 作用:目标网络每隔多少次训练更新一次
- # 注:使用软更新(tau)时此参数通常设为1
- tau: float = 0.005
- # 软更新系数(soft update)
- # θ_target = τ×θ + (1-τ)×θ_target
- # τ=1:硬更新(完全复制)
- # τ<<1:软更新(平滑过渡,更稳定)
- # 典型值:0.001 - 0.01
- # ========== 探索策略参数(ε-greedy) ==========
- exploration_initial_eps: float = 1.0
- # 初始探索率 ε_0
- # ε=1:完全随机探索
- # ε=0:完全利用已学知识
- exploration_fraction: float = 0.3
- # 探索率衰减比例
- # 表示训练总步数的前30%进行ε衰减
- # 例:总共10万步,前3万步ε从1.0衰减到0.02
- exploration_final_eps: float = 0.02
- # 最终探索率 ε_final
- # 衰减结束后保持此值(保留小概率探索)
- # 典型值:0.01 - 0.05
- # ========== 日志参数 ==========
- remark: str = "default"
- # 实验备注,用于区分不同训练实验
- # 会自动添加到TensorBoard日志目录名中
- # ==================== Episode数据记录器 ====================
- class UFEpisodeRecorder:
- """
- Episode数据记录器
-
- 功能:
- - 记录训练过程中每个episode的详细数据
- - 存储每步的状态、动作、奖励、info等信息
- - 计算episode级别的统计指标
-
- 用途:
- - 训练监控:实时查看智能体表现
- - 调试分析:定位问题episode
- - 数据分析:评估策略改进效果
- """
- def __init__(self):
- """初始化记录器"""
- self.episode_data = [] # 存储所有完成的episode数据
- self.current_episode = [] # 当前正在进行的episode数据
- def record_step(self, obs, action, reward, done, info):
- """
- 记录单步交互数据
-
- 参数:
- obs: 当前状态观测
- action: 执行的动作
- reward: 获得的奖励
- done: 是否结束
- info: 额外信息字典
- """
- # 构建单步数据字典
- step_data = {
- "obs": obs.copy(), # 状态(深拷贝避免引用问题)
- "action": action.copy(), # 动作
- "reward": reward, # 奖励
- "done": done, # 是否终止
- "info": info.copy() if info else {} # 环境信息
- }
-
- # 添加到当前episode
- self.current_episode.append(step_data)
- # 如果episode结束,保存并重置
- if done:
- self.episode_data.append(self.current_episode)
- self.current_episode = []
- def get_episode_stats(self, episode_idx=-1):
- """
- 获取指定episode的统计信息
-
- 参数:
- episode_idx (int): episode索引,默认-1(最后一个)
-
- 返回:
- dict: 包含以下统计指标的字典
- - total_reward: 总奖励
- - avg_recovery: 平均回收率
- - feasible_steps: 可行步数
- - total_steps: 总步数
- """
- if not self.episode_data:
- return {}
- episode = self.episode_data[episode_idx]
-
- # 计算总奖励
- total_reward = sum(step["reward"] for step in episode)
-
- # 计算平均回收率(从info中提取)
- recovery_values = [
- step["info"].get("recovery", 0)
- for step in episode
- if "recovery" in step["info"]
- ]
- avg_recovery = np.mean(recovery_values) if recovery_values else 0.0
-
- # 计算可行步数(成功的超级周期数)
- feasible_steps = sum(
- 1 for step in episode
- if step["info"].get("feasible", False)
- )
- return {
- "total_reward": total_reward,
- "avg_recovery": avg_recovery,
- "feasible_steps": feasible_steps,
- "total_steps": len(episode)
- }
- # ==================== 训练回调器 ====================
- class UFTrainingCallback(BaseCallback):
- """
- 自定义训练回调器
-
- 功能:
- - 在每个训练步骤调用,记录数据到recorder
- - 兼容Stable-Baselines3的回调机制
- - 不依赖环境内部属性,使用标准接口获取数据
-
- 回调时机:
- - _on_step(): 每执行一步环境交互后调用
-
- 设计特点:
- 1. 从self.locals获取当前步的数据(SB3提供的接口)
- 2. 处理向量化环境(DummyVecEnv)的数据格式
- 3. 自动检测episode结束并触发记录
- """
- def __init__(self, recorder, verbose=0):
- """
- 初始化回调器
-
- 参数:
- recorder (UFEpisodeRecorder): 数据记录器实例
- verbose (int): 日志详细程度,0=关闭,1=打印每步信息
- """
- super(UFTrainingCallback, self).__init__(verbose)
- self.recorder = recorder
- def _on_step(self) -> bool:
- """
- 每步回调函数(Stable-Baselines3标准接口)
-
- 返回:
- bool: True表示继续训练,False表示提前终止
- """
- try:
- # 从SB3的self.locals获取当前步数据
- new_obs = self.locals.get("new_obs") # 新状态
- actions = self.locals.get("actions") # 执行的动作
- rewards = self.locals.get("rewards") # 获得的奖励
- dones = self.locals.get("dones") # 是否结束
- infos = self.locals.get("infos") # 环境信息
- # 处理向量化环境(取第一个环境的数据)
- if len(new_obs) > 0:
- step_obs = new_obs[0]
- step_action = actions[0] if actions is not None else None
- step_reward = rewards[0] if rewards is not None else 0.0
- step_done = dones[0] if dones is not None else False
- step_info = infos[0] if infos is not None else {}
- # 可选:打印当前步信息(用于调试)
- if self.verbose:
- print(f"[Step {self.num_timesteps}] "
- f"动作={step_action}, "
- f"奖励={step_reward:.3f}, "
- f"Done={step_done}")
- # 记录数据到recorder
- self.recorder.record_step(
- obs=step_obs,
- action=step_action,
- reward=step_reward,
- done=step_done,
- info=step_info,
- )
- except Exception as e:
- # 异常处理:避免回调错误中断训练
- if self.verbose:
- print(f"[Callback Error] {e}")
- # 返回True继续训练
- return True
- # ==================== DQN训练器封装类 ====================
- class DQNTrainer:
- def __init__(self, env, params, callback=None):
- """
- 初始化训练器
- 参数:
- env: Gymnasium环境
- params: DQN 超参数配置
- callback: 可选训练回调
- """
- self.env = env
- self.params = params
- self.callback = callback
- self.log_dir = self._create_log_dir() # 创建日志目录
- self.model = self._create_model() # 创建 DQN 模型
- def _create_log_dir(self):
- """
- 创建 TensorBoard 日志目录,保证 Windows 下路径安全
- 返回:
- str: 可用的日志目录路径
- """
- timestamp = time.strftime("%Y%m%d-%H%M%S")
- # 用整数代替浮点数,避免路径中包含小数点
- lr_int = int(self.params.learning_rate * 1e4)
- gamma_int = int(self.params.gamma * 100)
- exp_int = int(self.params.exploration_fraction * 100)
- # 生成目录名
- log_name = f"DQN_lr{lr_int}_buf{self.params.buffer_size}_bs{self.params.batch_size}_gamma{gamma_int}_exp{exp_int}_{self.params.remark}_{timestamp}"
- # 使用短路径,避免 Windows 路径过长
- base_dir = r"E:\Greentech\models\uf-rl\uf_dqn_tensorboard"
- os.makedirs(base_dir, exist_ok=True)
- log_dir = os.path.join(base_dir, log_name)
- # 尝试创建目录,防止偶发锁或占用
- attempt = 0
- while attempt < 5:
- try:
- os.makedirs(log_dir, exist_ok=True)
- if not os.path.isdir(log_dir):
- raise RuntimeError(f"{log_dir} 已存在但不是目录!")
- break
- except Exception as e:
- attempt += 1
- time.sleep(0.1)
- log_dir += f"_{attempt}"
- else:
- raise RuntimeError(f"无法创建日志目录: {log_dir}")
- return log_dir
- def _create_model(self):
- """
- 创建 Stable-Baselines3 DQN 模型
- """
- model = DQN(
- policy="MlpPolicy",
- env=self.env,
- learning_rate=self.params.learning_rate,
- buffer_size=self.params.buffer_size,
- learning_starts=self.params.learning_starts,
- batch_size=self.params.batch_size,
- gamma=self.params.gamma,
- train_freq=self.params.train_freq,
- target_update_interval=1,
- tau=0.005,
- exploration_initial_eps=self.params.exploration_initial_eps,
- exploration_fraction=self.params.exploration_fraction,
- exploration_final_eps=self.params.exploration_final_eps,
- verbose=1,
- tensorboard_log=self.log_dir
- )
- return model
- def train(self, total_timesteps: int):
- """
- 执行训练
-
- 参数:
- total_timesteps (int): 总训练步数
- 注:对于超滤环境,每步代表一个超级周期(约2-3天)
- 150000步 ≈ 10000个episode ≈ 10000个超级周期 ≈ 约54年
- """
- if self.callback:
- # 使用回调器训练
- self.model.learn(total_timesteps=total_timesteps, callback=self.callback)
- else:
- # 不使用回调器训练
- self.model.learn(total_timesteps=total_timesteps)
-
- print(f"✅ 模型训练完成!")
- print(f"📊 日志保存在:{self.log_dir}")
- print(f"💡 使用以下命令查看TensorBoard:")
- print(f" tensorboard --logdir={self.log_dir}")
- def save(self, path=None):
- """
- 保存模型
-
- 参数:
- path (str, optional): 保存路径,默认保存到日志目录下的dqn_model.zip
- """
- if path is None:
- path = os.path.join(self.log_dir, "dqn_model.zip")
- self.model.save(path)
- print(f"💾 模型已保存到:{path}")
- def load(self, path):
- """
- 加载模型
-
- 参数:
- path (str): 模型文件路径(.zip文件)
- """
- self.model = DQN.load(path, env=self.env)
- print(f"📥 模型已从 {path} 加载")
- # ==================== 辅助函数:随机种子设置 ====================
- def set_global_seed(seed: int):
- """
- 固定全局随机种子,保证训练可复现
-
- 参数:
- seed (int): 随机种子
-
- 作用:
- - 固定Python、NumPy、PyTorch的随机数生成器
- - 确保相同种子产生相同的训练结果
- - 便于实验对比和问题复现
-
- 注意:
- - 即使固定种子,多线程/多进程仍可能产生微小差异
- - GPU运算的非确定性也可能影响复现性
- """
- random.seed(seed) # Python随机数
- np.random.seed(seed) # NumPy随机数
- torch.manual_seed(seed) # PyTorch CPU随机数
- torch.cuda.manual_seed_all(seed) # PyTorch GPU随机数
-
- # 设置PyTorch为确定性模式(可能影响性能)
- torch.backends.cudnn.deterministic = True
- torch.backends.cudnn.benchmark = False
- # ==================== 主训练函数 ====================
- def train_uf_rl_agent(params: UFParams, total_timesteps: int = 10000, seed: int = 2025):
- """
- 超滤强化学习智能体训练主函数
-
- 参数:
- params (UFParams): 超滤环境参数
- total_timesteps (int): 总训练步数,默认10000
- seed (int): 随机种子,默认2025
-
- 返回:
- DQN: 训练好的DQN模型
-
- 训练流程:
- 1. 固定随机种子(确保可复现)
- 2. 创建记录器和回调器
- 3. 创建并包装环境(Monitor + DummyVecEnv)
- 4. 初始化DQN训练器
- 5. 执行训练
- 6. 保存模型
- 7. 输出统计信息
- """
- # 步骤1:固定随机种子
- set_global_seed(seed)
- print(f"🎲 随机种子已设置为: {seed}")
-
- # 步骤2:创建数据记录器和回调器
- recorder = UFEpisodeRecorder()
- callback = UFTrainingCallback(recorder, verbose=1)
-
- # 步骤3:创建环境(使用闭包和向量化)
- def make_env():
- """环境工厂函数"""
- env = UFSuperCycleEnv(params) # 创建超滤环境
- env = Monitor(env) # 包装Monitor(记录episode统计)
- return env
-
- # 向量化环境(即使只有一个环境,也需要向量化以兼容SB3)
- env = DummyVecEnv([make_env])
-
- # 步骤4:创建DQN训练器
- dqn_params = DQNParams()
- trainer = DQNTrainer(env, dqn_params, callback=callback)
-
- # 步骤5:执行训练
- trainer.train(total_timesteps)
-
- # 步骤6:保存模型
- trainer.save()
-
- # 步骤7:输出最终统计信息
- stats = callback.recorder.get_episode_stats()
- print("\n" + "="*60)
- print("📈 训练统计")
- print("="*60)
- print(f"总奖励: {stats.get('total_reward', 0):.2f}")
- print(f"平均回收率: {stats.get('avg_recovery', 0):.3f}")
- print(f"可行步数: {stats.get('feasible_steps', 0)}")
- print(f"总步数: {stats.get('total_steps', 0)}")
- print("="*60)
-
- return trainer.model
- # ==================== 主程序入口 ====================
- if __name__ == "__main__":
- """
- 训练脚本入口
-
- 使用方法:
- python fixed_DQN_train.py
-
- 训练参数:
- - total_timesteps=150000: 总训练步数
- - 约10000个episode(每个episode最多15步)
- - 约需训练数小时至数天(取决于硬件)
- """
- print("="*60)
- print("🚀 开始训练超滤强化学习智能体")
- print("="*60)
-
- # 初始化超滤参数
- params = UFParams()
-
- # 执行训练
- train_uf_rl_agent(params, total_timesteps=200000)
-
- print("\n🎉 训练流程全部完成!")
|