┌─────────────────────────────────────────────────────────┐
│                   DQN Training System                   │
│                                                         │
│   ┌──────────────┐          ┌──────────────┐            │
│   │  DQN_train   │─────────▶│  DQN Agent   │            │
│   │(train script)│          │ (neural net) │            │
│   └──────────────┘          └──────────────┘            │
│          │                         │                    │
│          │                         │ predict            │
│          ▼                         ▼                    │
│   ┌──────────────────────────────────────┐              │
│   │            UFSuperCycleEnv           │              │
│   │           (RL environment)           │              │
│   │                                      │              │
│   │  ┌────────────────────────────────┐  │              │
│   │  │   simulate_one_supercycle()    │  │              │
│   │  │      (physics simulator)       │  │              │
│   │  │                                │  │              │
│   │  │  ┌──────────┐  ┌──────────┐    │  │              │
│   │  │  │ model_fp │  │ model_bw │    │  │              │
│   │  │  │TMP growth│  │ backwash │    │  │              │
│   │  │  └──────────┘  └──────────┘    │  │              │
│   │  └────────────────────────────────┘  │              │
│   └──────────────────────────────────────┘              │
│                      │                                  │
│                      ▼                                  │
│              ┌──────────────┐                           │
│              │   Callback   │                           │
│              │  (recorder)  │                           │
│              └──────────────┘                           │
└─────────────────────────────────────────────────────────┘
| File | Responsibility | Core classes/functions |
|---|---|---|
| DQN_train.py | Training entry point, parameter configuration, training loop | DQNTrainer, train_uf_rl_agent() |
| DQN_env.py | RL environment, physics simulation | UFSuperCycleEnv, simulate_one_supercycle() |
| UF_models.py | TMP dynamics models | TMPIncreaseModel, TMPDecreaseModel |
| DQN_decide.py | Inference/decision interface | run_uf_DQN_decide(), generate_plc_instructions() |
import torch

class TMPIncreaseModel(torch.nn.Module):
    def forward(self, p, L_h):
        # Simplified membrane-fouling dynamics
        return float(p.alpha * (p.q_UF ** p.belta) * L_h)
Formula explanation:
ΔTMP = α × Q^β × t
where:
- α (alpha): fouling coefficient (1e-6)
- Q (q_UF): feed flow rate (360 m³/h)
- β (belta): power exponent (1.1)
- t (L_h): filtration duration (hours)
Physical meaning: TMP rises linearly with filtration time and super-linearly with feed flow (β > 1), i.e. fouling accumulates faster at higher flux.
import math

class TMPDecreaseModel(torch.nn.Module):
    def forward(self, p, L_s, t_bw_s):
        # Upper bound on the backwash removal fraction (decays with filtration duration)
        upper_L = p.phi_bw_min + (p.phi_bw_max - p.phi_bw_min) * math.exp(-L_s / p.L_ref)
        # Backwash-duration gain (saturating curve)
        time_gain = 1.0 - math.exp(-((t_bw_s / p.tau_bw) ** p.gamma_t))
        # Actual removal fraction
        phi = upper_L * time_gain
        return min(max(phi, 0.0), 0.999)
Formula explanation:
φ(L, t) = [φ_min + (φ_max - φ_min) × e^(-L/L_ref)] × [1 - e^(-(t/τ)^γ)]
where:
- φ_min = 0.7: minimum removal fraction (after long filtration runs)
- φ_max = 1.0: maximum removal fraction (after short filtration runs)
- L_ref = 4000 s: time scale of the filtration-duration effect
- τ = 20 s: time scale of the backwash-duration effect
- γ = 1.0: backwash-duration exponent
Physical meaning: the longer the filtration run, the harder the fouling is to remove (the removable fraction decays toward φ_min); a longer backwash removes more, but with rapidly diminishing returns beyond a few multiples of τ.
Key issue: ⚠️ these two "models" are actually closed-form formulas, not neural networks!
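Because both models are closed-form, they can be sanity-checked in a few lines. A minimal sketch assuming the parameter values quoted above (model_fp/model_bw are stand-in names taken from the architecture diagram, not the actual source):

import math

alpha, beta, q_UF = 1e-6, 1.1, 360.0
phi_min, phi_max, L_ref, tau, gamma = 0.7, 1.0, 4000.0, 20.0, 1.0

def model_fp(L_h):
    # dTMP = alpha * Q^beta * t
    return alpha * (q_UF ** beta) * L_h

def model_bw(L_s, t_bw_s):
    # phi(L, t) = [phi_min + (phi_max - phi_min)*e^(-L/L_ref)] * [1 - e^(-(t/tau)^gamma)]
    upper = phi_min + (phi_max - phi_min) * math.exp(-L_s / L_ref)
    gain = 1.0 - math.exp(-((t_bw_s / tau) ** gamma))
    return min(max(upper * gain, 0.0), 0.999)

print(model_fp(4880 / 3600))   # ≈ 0.00088 MPa per filtration run
print(model_bw(4880, 50))      # ≈ 0.724 removal fraction

These two values also anchor the worked example later in this section.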
from dataclasses import dataclass

@dataclass
class UFParams:
    # Membrane operating parameters
    q_UF: float = 360.0             # feed flow rate (m³/h)
    TMP0: float = 0.03              # initial TMP (MPa)
    TMP_max: float = 0.06           # TMP upper limit (MPa)
    # Fouling dynamics parameters
    alpha: float = 1e-6             # TMP growth coefficient
    belta: float = 1.1              # power exponent
    # Backwash parameters
    q_bw_m3ph: float = 1000.0       # backwash flow rate (m³/h)
    # CEB parameters
    T_ceb_interval_h: float = 48.0  # CEB interval (h)
    v_ceb_m3: float = 30.0          # CEB water use (m³)
    t_ceb_s: float = 2400.0         # CEB duration (s)
    # Constraints
    dTMP: float = 0.001             # per-cycle residual TMP increment limit (MPa)
    # Action space
    L_min_s: float = 3800.0         # filtration duration lower bound (s)
    L_max_s: float = 6000.0         # filtration duration upper bound (s)
    t_bw_min_s: float = 40.0        # backwash duration lower bound (s)
    t_bw_max_s: float = 60.0        # backwash duration upper bound (s)
    # Reward weights
    w_rec: float = 0.8              # recovery weight
    w_rate: float = 0.2             # net delivery rate weight
    w_headroom: float = 0.2         # TMP-headroom penalty weight
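With these defaults, the number of small cycles packed into one super-cycle follows directly from the CEB interval (a quick check; the numbers match the worked example further below):

p = UFParams()
L_s, t_bw_s = 4880.0, 50.0
cycle_h = (L_s + t_bw_s) / 3600.0            # one filtration + backwash cycle, in hours
k_bw_per_ceb = int(p.T_ceb_interval_h // cycle_h)
print(cycle_h, k_bw_per_ceb)                 # ≈ 1.369 h → 35 cycles per CEB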
def simulate_one_supercycle(p: UFParams, L_s: float, t_bw_s: float):
    """
    Simulate one full super-cycle (multiple small cycles + 1 CEB).
    Inputs:
        p: system parameters
        L_s: filtration duration per cycle (seconds)
        t_bw_s: backwash duration per cycle (seconds)
    Outputs:
        (feasible, info)
        - feasible: whether all constraints are satisfied
        - info: dict of performance metrics
    """
Execution flow:
1. Initialization
   tmp = TMP0
   max_tmp = TMP0
   min_tmp = TMP0
2. Compute the number of small cycles
   small-cycle duration = L_s + t_bw_s
   k_bw_per_ceb = floor(48 h / small-cycle duration)
3. Loop k_bw_per_ceb times (the small cycles)
   For i in range(k_bw_per_ceb):
     3.1 Filtration phase
         tmp_start = tmp
         Δtmp = model_fp(L_h)                # TMP growth
         tmp_peak = tmp_start + Δtmp
         Constraint check 1: tmp_peak ≤ TMP_max
         If violated: return False
     3.2 Backwash phase
         φ = model_bw(L_s, t_bw_s)           # removal fraction
         tmp_after_bw = tmp_peak - φ × (tmp_peak - tmp_start)
         Constraint check 2: (tmp_after_bw - tmp_start) ≤ dTMP
         If violated: return False
     3.3 Update TMP
         tmp = tmp_after_bw
         update max_tmp and min_tmp
4. CEB phase
   tmp = TMP0                                # full recovery
5. Compute performance metrics
   V_feed = k × q_UF × L_h                   # total feed volume
   V_loss = k × V_bw + V_ceb                 # total losses
   V_net = V_feed - V_loss                   # net production
   recovery = V_net / V_feed                 # recovery rate
   net_rate = V_net / T_super                # net delivery rate
   specific energy use = lookup_table(L_s)
   daily production hours = (k × L_h / T_super) × 24
6. Headroom check
   headroom_ratio = max_tmp / TMP_max
   If headroom_ratio > 0.98: return False
7. Return result
   return True, {
       "recovery": recovery,
       "net_delivery_rate_m3ph": net_rate,
       "max_TMP_during_filtration": max_tmp,
       ...
   }
Constraint system:
| Constraint | Checked at | Physical meaning |
|---|---|---|
| TMP peak ≤ 0.06 MPa | after filtration | prevent membrane rupture |
| Residual increment per cycle ≤ 0.001 MPa | after backwash | limit the fouling accumulation rate |
| TMP headroom < 98% | end of cycle | keep a safety margin |
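The walkthrough above condenses into a runnable sketch (an approximation under the parameter values quoted earlier, not the actual DQN_env.py source; model_fp/model_bw are inlined):

import math

def simulate_one_supercycle(p: UFParams, L_s: float, t_bw_s: float):
    """Sketch of the super-cycle simulator described above."""
    L_h = L_s / 3600.0
    cycle_h = (L_s + t_bw_s) / 3600.0
    k = int(p.T_ceb_interval_h // cycle_h)             # small cycles per CEB
    dtmp = p.alpha * (p.q_UF ** p.belta) * L_h         # model_fp: TMP growth per run
    # model_bw: removal fraction, with the quoted defaults inlined
    upper = 0.7 + (1.0 - 0.7) * math.exp(-L_s / 4000.0)
    phi = min(max(upper * (1.0 - math.exp(-t_bw_s / 20.0)), 0.0), 0.999)
    tmp = max_tmp = p.TMP0
    for _ in range(k):
        tmp_peak = tmp + dtmp
        if tmp_peak > p.TMP_max:                       # constraint 1: peak TMP
            return False, {}
        tmp_after = tmp_peak - phi * (tmp_peak - tmp)  # backwash
        if tmp_after - tmp > p.dTMP:                   # constraint 2: residual increment
            return False, {}
        max_tmp = max(max_tmp, tmp_peak)
        tmp = tmp_after
    if max_tmp / p.TMP_max > 0.98:                     # constraint 3: headroom
        return False, {}
    T_super_h = k * cycle_h + p.t_ceb_s / 3600.0
    V_feed = k * p.q_UF * L_h
    V_loss = k * p.q_bw_m3ph * t_bw_s / 3600.0 + p.v_ceb_m3
    V_net = V_feed - V_loss
    return True, {
        "recovery": V_net / V_feed,
        "net_delivery_rate_m3ph": V_net / T_super_h,
        "max_TMP_during_filtration": max_tmp,
        "TMP_after_ceb": p.TMP0,                       # CEB restores TMP fully
    }

feasible, info = simulate_one_supercycle(UFParams(TMP0=0.025), 4880, 50)
# → feasible=True, recovery ≈ 0.970, net rate ≈ 341 m³/h (matches the worked example below)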
import math

def _score(p: UFParams, rec: dict) -> float:
    # 1. Normalize the net delivery rate
    rate_norm = rec["net_delivery_rate_m3ph"] / p.q_UF
    # 2. Soft TMP penalty (sigmoid around the TMP ceiling)
    tmp_ratio = rec["max_TMP_during_filtration"] / p.TMP_max
    k = 10.0
    headroom_penalty = 1.0 / (1.0 + math.exp(-k * (tmp_ratio - 1.0)))
    # 3. Base reward (weighted sum); typical range: 0.6 ~ 0.9
    base_reward = (
        0.8 * rec["recovery"]
        + 0.2 * rate_norm
        - 0.2 * headroom_penalty
    )
    # 4. Nonlinear amplification around the 0.5 pivot
    amplified = (base_reward - 0.5) ** 2 * 5.0
    # 5. Preserve the sign
    if base_reward < 0.5:
        amplified = -amplified
    return amplified
Reward design logic:
Objective 1: high recovery (primary)
- recovery close to 1 → high reward
- weight 0.8
Objective 2: high net delivery rate (secondary)
- net delivery rate / feed flow → normalized to 0–1
- weight 0.2
Penalty: TMP near the ceiling
- TMP approaching the limit → sigmoid penalty
- TMP exceeding the limit → penalty grows sharply
- weight 0.2
Purpose of the nonlinear transform:
- amplify the gap between good and bad actions
- speed up Q-value learning
- typical reward range: -1.25 ~ 0.8
Reward curve analysis:
base_reward = 0.85 → amplified = (0.85-0.5)² × 5 = 0.6125
base_reward = 0.70 → amplified = (0.70-0.5)² × 5 = 0.2000
base_reward = 0.50 → amplified = 0.0000
base_reward = 0.30 → amplified = -(0.30-0.5)² × 5 = -0.2000
Issue: ⚠️ the nonlinear transform also compresses rewards near base_reward ≈ 0.5 toward zero, so mediocre actions look nearly identical to the agent and the learning signal in that region is weak.
class UFSuperCycleEnv(gym.Env):
    """
    Standard Gym environment interface.
    """
    def __init__(self, base_params, max_episode_steps=20):
        # Discrete action space
        L_values = arange(3800, 6001, 60)   # 37 options (3800, 3860, ..., 5960)
        t_bw_values = arange(40, 61, 5)     # 5 options (40, 45, 50, 55, 60)
        self.action_space = Discrete(37 * 5)   # 185 actions
        # Continuous state space (normalized to [0, 1])
        self.observation_space = Box(
            low=0, high=1, shape=(4,)
        )
State definition:
def _get_obs(self):
    # State vector: [TMP0, last_L, last_t_bw, max_TMP]
    return [
        (TMP0 - 0.01) / (0.05 - 0.01),    # current initial TMP
        (L_s - 3800) / (6000 - 3800),     # last filtration duration
        (t_bw_s - 40) / (60 - 40),        # last backwash duration
        (max_TMP - 0.01) / (0.05 - 0.01)  # highest TMP in this cycle
    ]
State-space analysis:
| Dimension | Physical meaning | Normalization range | Role |
|---|---|---|---|
| TMP0 | current initial TMP | [0.01, 0.05] MPa | primary state; determines the feasible action range |
| last_L | last filtration duration | [3800, 6000] s | history; captures trends |
| last_t_bw | last backwash duration | [40, 60] s | history; captures trends |
| max_TMP | highest TMP in the cycle | [0.01, 0.05] MPa | safety signal; avoids running near the ceiling |
Action mapping:
def _get_action_values(self, action):
    # action ∈ [0, 184]
    L_idx = action // 5         # filtration-duration index
    t_bw_idx = action % 5       # backwash-duration index
    L_s = 3800 + L_idx * 60     # 3800, 3860, ..., 5960
    t_bw_s = 40 + t_bw_idx * 5  # 40, 45, 50, 55, 60
    return (L_s, t_bw_s)
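A quick check of this encoding (the middle value matches the worked example later in this section):

for action in (0, 92, 184):
    L_idx, t_bw_idx = action // 5, action % 5
    print(action, 3800 + L_idx * 60, 40 + t_bw_idx * 5)
# 0   → (3800, 40)   most conservative action
# 92  → (4880, 50)
# 184 → (5960, 60)   longest filtration, longest backwash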
Training loop:
def reset(self):
    # Randomize the initial TMP (adds training diversity)
    self.TMP0 = uniform(0.01, 0.03)
    self.current_step = 0
    self.last_action = (3800, 40)   # start from the most conservative action
    return self._get_obs(), {}      # gymnasium API: (obs, info)
def step(self, action):
    self.current_step += 1
    # 1. Decode the action
    L_s, t_bw_s = self._get_action_values(action)
    # 2. Run the simulation
    feasible, info = simulate_one_supercycle(
        self.current_params, L_s, t_bw_s
    )
    # 3. Compute the reward
    if feasible:
        reward = _score(self.current_params, info)
        self.TMP0 = info["TMP_after_ceb"]   # update the state
        terminated = False
    else:
        reward = -20                        # large penalty for constraint violation
        terminated = True
    # 4. Check truncation
    truncated = (self.current_step >= 20)
    # 5. Return
    return next_obs, reward, terminated, truncated, info
Episode flow (illustrative):
reset() → TMP0=0.025
    ↓
step(action=92) → (L=4880, t_bw=50) → reward=0.45 → TMP0=0.025
    ↓
step(action=106) → (L=5060, t_bw=45) → reward=0.52 → TMP0=0.025
    ↓
... (at most 20 steps)
    ↓
truncated=True → episode ends
(Because CEB fully restores TMP, TMP0 stays constant between steps.)
class DQNParams:
    learning_rate = 1e-4             # Adam learning rate
    buffer_size = 10000              # replay buffer capacity
    learning_starts = 200            # random-exploration steps before learning begins
    batch_size = 32                  # samples per gradient update
    gamma = 0.95                     # discount factor
    train_freq = 4                   # train every 4 steps
    target_update_interval = 2000    # target-network update interval
    exploration_initial_eps = 1.0    # initial exploration rate
    exploration_fraction = 0.3       # fraction of training spent decaying epsilon
    exploration_final_eps = 0.02     # final exploration rate
Parameter details:
| Parameter | Role | Typical value | Current value | Assessment |
|---|---|---|---|---|
| learning_rate | gradient step size | 1e-4 ~ 1e-3 | 1e-4 | ✓ reasonable |
| buffer_size | replay capacity | 10k ~ 1M | 10k | ⚠️ on the small side |
| learning_starts | pre-fill steps | 1k ~ 10k | 200 | ⚠️ too small |
| batch_size | SGD batch size | 32 ~ 256 | 32 | ✓ reasonable |
| gamma | future-reward discount | 0.9 ~ 0.99 | 0.95 | ✓ reasonable |
| train_freq | training frequency | 1 ~ 16 | 4 | ✓ reasonable |
| target_update | target-network sync | 1k ~ 10k | 2000 | ⚠️ conflicts with the code |
| exploration | exploration schedule | first 20–50% | first 30% | ✓ reasonable |
class DQNTrainer:
    def __init__(self, env, params, callback=None):
        self.env = env
        self.params = params
        self.callback = callback
        # Create the log directory
        self.log_dir = self._create_log_dir()
        # Create the DQN model
        self.model = self._create_model()
Model creation:
def _create_model(self):
    return DQN(
        policy="MlpPolicy",           # multilayer perceptron
        env=self.env,
        learning_rate=1e-4,
        buffer_size=10000,
        learning_starts=200,
        batch_size=32,
        gamma=0.95,
        train_freq=4,
        # ⚠️ Note the conflict here
        target_update_interval=1,     # hard-coded to 1
        tau=0.005,                    # soft-update coefficient
        exploration_initial_eps=1.0,
        exploration_fraction=0.3,
        exploration_final_eps=0.02,
        verbose=1,
        tensorboard_log=self.log_dir
    )
Target-network update conflict:
# The parameter spec says:
target_update_interval = 2000   # hard update every 2000 steps
# But the code actually uses:
target_update_interval = 1      # soft update every step
tau = 0.005                     # soft-update coefficient
# Soft-update rule:
θ_target ← τ × θ_current + (1 - τ) × θ_target
Comparison of the two update strategies:
| Strategy | Pros | Cons | Typical use |
|---|---|---|---|
| Hard update | stable | lags behind | classic DQN |
| Soft update | smooth convergence | can be unstable | DDPG/TD3 |
The code actually performs soft updates while the comments describe hard updates, which is confusing.
def train(self, total_timesteps: int):
self.model.learn(
total_timesteps=total_timesteps,
callback=self.callback
)
Stable-Baselines3 internal flow (simplified):
# Inside learn()
for step in range(total_timesteps):
    # 1. ε-greedy action selection
    if random() < epsilon:
        action = env.action_space.sample()         # explore
    else:
        action = argmax(Q_network(state))          # exploit
    # 2. Step the environment
    next_state, reward, done, info = env.step(action)
    # 3. Store the transition
    replay_buffer.add(state, action, reward, next_state, done)
    # 4. Train (every train_freq=4 steps)
    if step % 4 == 0 and step > learning_starts:
        # Sample from the replay buffer
        batch = replay_buffer.sample(batch_size=32)
        # TD target
        with torch.no_grad():
            q_next = Q_target(next_state).max(dim=1)
            target = reward + gamma * q_next * (1 - done)
        # Current Q value
        q_current = Q_network(state)[action]
        # Loss
        loss = MSE(q_current, target)
        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    # 5. Soft-update the target network (every step)
    Q_target = tau * Q_network + (1 - tau) * Q_target
    # 6. Decay epsilon
    epsilon = max(
        epsilon_final,
        epsilon_initial - step / (total_steps * exploration_fraction)
    )
    # 7. Callback hook
    callback.on_step()
    # 8. Episode reset
    if done:
        state = env.reset()
class UFTrainingCallback(BaseCallback):
    def __init__(self, recorder, verbose=0):
        super().__init__(verbose)   # required by BaseCallback
        self.recorder = recorder

    def _on_step(self) -> bool:
        # Pull the current step's data out of locals
        obs = self.locals.get("new_obs")[0]
        action = self.locals.get("actions")[0]
        reward = self.locals.get("rewards")[0]
        done = self.locals.get("dones")[0]
        info = self.locals.get("infos")[0]
        # Record to the recorder
        self.recorder.record_step(obs, action, reward, done, info)
        # Print (if verbose=1)
        if self.verbose:
            print(f"[Step {self.num_timesteps}] "
                  f"action={action}, reward={reward:.3f}, done={done}")
        return True   # continue training
The UFEpisodeRecorder recorder:
class UFEpisodeRecorder:
    def __init__(self):
        self.episode_data = []      # records for all episodes
        self.current_episode = []   # per-step data for the current episode

    def record_step(self, obs, action, reward, done, info):
        step_data = {
            "obs": obs,
            "action": action,
            "reward": reward,
            "done": done,
            "info": info
        }
        self.current_episode.append(step_data)
        if done:
            self.episode_data.append(self.current_episode)
            self.current_episode = []

    def get_episode_stats(self, episode_idx=-1):
        episode = self.episode_data[episode_idx]
        return {
            "total_reward": sum(step["reward"] for step in episode),
            "avg_recovery": np.mean([step["info"]["recovery"] for step in episode]),
            "feasible_steps": sum(1 for s in episode if s["info"]["feasible"]),
            "total_steps": len(episode)
        }
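Typical wiring (a sketch; it mirrors the initialization sequence in the training flow below, and assumes every step's info dict carries "recovery" and "feasible"):

recorder = UFEpisodeRecorder()
callback = UFTrainingCallback(recorder, verbose=1)
# ... run training with callback=callback ...
stats = recorder.get_episode_stats(-1)   # stats for the last completed episode
print(stats["total_reward"], stats["avg_recovery"])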
┌─────────────────────────────────────────────────────────────┐
│                        Training Flow                         │
└─────────────────────────────────────────────────────────────┘
1. Initialization
   ├─ set_global_seed(2025)             # fix the random seed
   ├─ params = UFParams()               # create system parameters
   ├─ env = UFSuperCycleEnv(params)     # create the environment
   ├─ env = Monitor(env)                # wrap with monitoring
   ├─ env = DummyVecEnv([lambda: env])  # vectorize (DummyVecEnv takes callables)
   ├─ recorder = UFEpisodeRecorder()    # create the recorder
   ├─ callback = UFTrainingCallback()   # create the callback
   └─ model = DQN(...)                  # create the DQN model
2. Training loop (50000 steps)
   For step = 1 to 50000:
   ├─ Explore vs. exploit
   │    If random() < epsilon(step):
   │        action = random([0, 184])                  # explore
   │    Else:
   │        state_tensor = torch.FloatTensor(state)
   │        q_values = Q_network(state_tensor)         # [185]
   │        action = argmax(q_values)                  # exploit
   │
   ├─ Execute the action
   │    L_s, t_bw_s = decode_action(action)
   │    next_state, reward, done, info = env.step(action)
   │
   ├─ Store the transition
   │    replay_buffer.add(state, action, reward, next_state, done)
   │
   ├─ Train the network (every 4 steps, once step > 200)
   │    If step % 4 == 0 and step > 200:
   │        batch = replay_buffer.sample(32)
   │        ├─ Forward pass
   │        │    q_current = Q_network(batch.state)[batch.action]
   │        │    q_next_max = Q_target(batch.next_state).max()
   │        │    target = batch.reward + 0.95 × q_next_max × (1 - batch.done)
   │        ├─ Loss
   │        │    loss = MSE(q_current, target)
   │        ├─ Backpropagation
   │        │    optimizer.zero_grad()
   │        │    loss.backward()
   │        │    optimizer.step()
   │        └─ Soft-update the target network
   │             Q_target ← 0.005 × Q_network + 0.995 × Q_target
   │
   ├─ Decay epsilon
   │    epsilon = max(0.02, 1.0 - step/15000)          # linear decay over the first 30%
   │
   ├─ Record data
   │    callback.on_step()                             # log obs, action, reward
   │
   └─ Episode bookkeeping
        If done or truncated:
            state = env.reset()                        # reset the environment
            recorder.save_episode()
3. Save the model
   ├─ model.save("dqn_model.zip")
   └─ print(statistics)
Assuming 50000 training steps and ~10 steps per episode:
| Step range | epsilon | Behavior | Notes |
|---|---|---|---|
| 0–200 | ≈1.0 | pure random exploration | pre-fills the replay buffer |
| 200–15000 | 1.0 → 0.02 | exploration decay | gradual shift from exploring to exploiting |
| 15000–50000 | 0.02 | mostly exploiting, 2% exploration | steady policy refinement |
Training triggers:
- steps 0–200: no training, experience collection only
- steps 200–50000: one gradient update every 4 steps → ≈12450 updates in total
Target-network updates:
- soft update every step with tau=0.005
- equivalent to refreshing about 63% of the target network every 200 steps
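The 63% figure follows from compounding the per-step soft update; a one-liner to verify:

tau, n = 0.005, 200
print(1 - (1 - tau) ** n)   # ≈ 0.633 → ~63% of the target refreshed every 200 steps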
state = env.reset()
# Inside the environment:
TMP0 = uniform(0.01, 0.03)    # random initial TMP
current_step = 0
last_action = (3800, 40)
max_TMP = TMP0
obs = [
    (TMP0 - 0.01) / 0.04,     # e.g. 0.025 → 0.375
    (3800 - 3800) / 2200,     # 0.0
    (40 - 40) / 20,           # 0.0
    (TMP0 - 0.01) / 0.04      # 0.375
]
# 1. Action selection (epsilon=1.0, pure exploration)
action = random.randint(0, 184)   # suppose it picks 92
# 2. Decode the action
L_idx = 92 // 5 = 18
t_bw_idx = 92 % 5 = 2
L_s = 3800 + 18×60 = 4880
t_bw_s = 40 + 2×5 = 50
# 3. Run the simulation
simulate_one_supercycle(p, 4880, 50):
    L_h = 4880 / 3600 ≈ 1.3556 h
    k_bw = floor(48 / ((4880+50)/3600)) = 35 cycles
    For i in range(35):   # first cycle shown; TMP creeps up ≈0.00024 MPa per cycle
        # Filtration
        dtmp = 1e-6 × 360^1.1 × 1.3556 ≈ 0.00088 MPa
        tmp_peak = 0.025 + 0.00088 = 0.02588 MPa
        # Constraint check
        tmp_peak < 0.06 ✓
        # Backwash
        phi ≈ 0.724   # model_bw: [0.7 + 0.3·e^(-4880/4000)] × [1 - e^(-50/20)]
        tmp_after = 0.02588 - 0.724×0.00088 ≈ 0.02524 MPa
        # Constraint check
        residual = 0.02524 - 0.025 = 0.00024 < 0.001 ✓
        tmp = 0.02524
    # CEB
    tmp = 0.025
    # Metrics
    V_feed = 35 × 360 × 1.3556 ≈ 17080 m³
    V_loss = 35 × (1000×50/3600) + 30 ≈ 516 m³
    V_net ≈ 16564 m³
    recovery ≈ 0.970
    net_rate = 16564 / 48.6 ≈ 340.8 m³/h   # T_super = 35×1.369 h + 0.667 h ≈ 48.6 h
    return True, {recovery: 0.970, net_rate: 340.8, ...}
# 4. Reward computation
rate_norm = 340.8 / 360 ≈ 0.947
headroom_penalty = 1/(1+exp(-10×(0.034/0.06 - 1))) ≈ 0.01   # max TMP ≈ 0.034, far below the ceiling
base_reward = 0.8×0.970 + 0.2×0.947 - 0.2×0.01 ≈ 0.963
amplified = (0.963 - 0.5)² × 5 ≈ 1.07
reward = 1.07   # an excellent reward!
# 5. Next state
TMP0_new = 0.025
obs_new = [0.375, (4880-3800)/2200=0.491, (50-40)/20=0.5, (0.034-0.01)/0.04≈0.60]
# 6. Store the transition
buffer.add(
    state=[0.375, 0.0, 0.0, 0.375],
    action=92,
    reward=1.07,
    next_state=[0.375, 0.491, 0.5, 0.60],
    done=False
)
# 7. No training yet (step=1 < 200)
# The replay buffer now holds 204 transitions; training begins
# 1. Sample 32 transitions at random
batch = buffer.sample(32)
# batch.state: [32, 4]
# batch.action: [32]
# batch.reward: [32]
# batch.next_state: [32, 4]
# batch.done: [32]
# 2. Current Q values
state_tensor = torch.FloatTensor(batch.state)                     # [32, 4]
q_values = Q_network(state_tensor)                                # [32, 185]
q_current = q_values.gather(1, batch.action.unsqueeze(1))         # [32, 1]
# 3. Target Q values
with torch.no_grad():
    next_q_values = Q_target(batch.next_state)                    # [32, 185]
    next_q_max = next_q_values.max(dim=1).values                  # [32]
    target = batch.reward + 0.95 * next_q_max * (1 - batch.done)  # [32]
# 4. TD error
loss = F.mse_loss(q_current.squeeze(), target)
# e.g. loss = 0.523
# 5. Backpropagation
optimizer.zero_grad()
loss.backward()
optimizer.step()
# 6. Soft-update the target network
for param, target_param in zip(Q_network.parameters(), Q_target.parameters()):
    target_param.data.copy_(0.005 * param.data + 0.995 * target_param.data)
# Default MlpPolicy architecture in Stable-Baselines3
class QNetwork(nn.Module):
    def __init__(self, state_dim=4, action_dim=185):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(4, 64),     # input layer → hidden layer 1
            nn.ReLU(),
            nn.Linear(64, 64),    # hidden layer 1 → hidden layer 2
            nn.ReLU(),
            nn.Linear(64, 185)    # hidden layer 2 → output layer
        )

    def forward(self, state):
        # state: [batch, 4]
        return self.net(state)    # [batch, 185]
Parameter count:
Layer 1: 4×64 + 64 = 320
Layer 2: 64×64 + 64 = 4160
Layer 3: 64×185 + 185 = 12025
Total: 16505 parameters
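The count can be verified directly against the sketch above (assumes torch is installed):

import torch.nn as nn

net = nn.Sequential(nn.Linear(4, 64), nn.ReLU(),
                    nn.Linear(64, 64), nn.ReLU(),
                    nn.Linear(64, 185))
print(sum(p.numel() for p in net.parameters()))   # 16505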
def predict(self, observation, epsilon):
    if np.random.random() < epsilon:
        # Explore: uniform random action
        return self.action_space.sample()
    else:
        # Exploit: pick the action with the highest Q value
        with torch.no_grad():
            obs_tensor = torch.FloatTensor(observation).unsqueeze(0)
            q_values = self.q_network(obs_tensor)
            return q_values.argmax(dim=1).item()
class ReplayBuffer:
    def sample(self, batch_size):
        # Uniform random sampling
        indices = np.random.randint(0, len(self.buffer), size=batch_size)
        batch = {
            'state': np.array([self.buffer[i][0] for i in indices]),
            'action': np.array([self.buffer[i][1] for i in indices]),
            'reward': np.array([self.buffer[i][2] for i in indices]),
            'next_state': np.array([self.buffer[i][3] for i in indices]),
            'done': np.array([self.buffer[i][4] for i in indices])
        }
        return batch
# Metrics logged automatically (via the Monitor wrapper)
- rollout/ep_rew_mean: mean episode reward
- rollout/ep_len_mean: mean episode length
- time/fps: training speed (steps/second)
- train/loss: TD error
- train/learning_rate: current learning rate
- train/n_updates: number of gradient updates
Input data flow:
TMP0 (float) → [normalize] → state[0] (0~1)
      ↓
      ├─ last_L_s    → state[1]
      ├─ last_t_bw_s → state[2]
      └─ max_TMP     → state[3]
state (4 dims) → Q_network → q_values[185]
      ↓
argmax → action (int)
      ↓
decode → (L_s, t_bw_s)
      ↓
simulate_one_supercycle()
      ↓
 ┌────┴─────┐
 │ model_fp │ → ΔTMP
 │ model_bw │ → φ
 └────┬─────┘
      ↓
constraint checks → feasible (bool)
      ↓
metric computation → {recovery, net_rate, ...}
      ↓
_score() → reward (float)
      ↓
ReplayBuffer
main() entry point
      ↓
set_global_seed(2025)
      ↓
create UFParams
      ↓
create UFSuperCycleEnv
      ↓
wrap with Monitor & DummyVecEnv
      ↓
create the DQN model
      ↓
┌─────────────────────────┐
│  model.learn(50000)     │
│                         │
│  For step in range:     │
│    ├─ select_action     │
│    ├─ env.step()        │
│    ├─ buffer.add()      │
│    ├─ train_network     │← every 4 steps
│    ├─ update_target     │← every step (soft update)
│    └─ callback()        │
│                         │
│  If done: env.reset()   │
└─────────────────────────┘
      ↓
model.save("dqn_model.zip")
      ↓
print statistics
      ↓
done
Main thread:
├─ DQN training loop
│    ├─ network forward pass
│    ├─ environment interaction
│    └─ network backward pass
│
├─ Callback hook (optional)
│    └─ TensorBoard writes
│
└─ Monitor wrapper
     └─ statistics accumulation
Notes:
- DummyVecEnv is single-process vectorization (pseudo-parallel)
- for real parallelism, use SubprocVecEnv instead
- the current code uses no multiprocessing/multithreading
Main memory usage:
1. Replay buffer: 10000 × (4+1+1+4+1) × 4 bytes ≈ 440 KB
2. Q-network parameters: 16505 × 4 bytes ≈ 66 KB
3. Target-network parameters: 16505 × 4 bytes ≈ 66 KB
4. Gradient buffers: roughly the parameter count ≈ 66 KB
5. Training batch: 32 × (4+4+1) × 4 bytes ≈ 1.2 KB
Total: about 650 KB (very small)
Peak memory:
- temporary tensors during backprop: +100 KB
- TensorBoard buffers: +1 MB
- total peak < 2 MB
Launch:
tensorboard --logdir=./uf_dqn_tensorboard
Key curves:
rollout/ep_rew_mean: mean episode reward (the core metric)
train/loss: TD error (training stability)
rollout/ep_len_mean: mean episode length
# Add inside the callback
rewards_hist = []

def _on_step(self):
    rewards_hist.append(self.locals["rewards"][0])
    if len(rewards_hist) == 1000:
        print(f"Reward distribution: min={min(rewards_hist):.3f}, "
              f"mean={np.mean(rewards_hist):.3f}, "
              f"max={max(rewards_hist):.3f}")
        rewards_hist.clear()
    return True
constraint_violations = 0
total_steps = 0

def _on_step(self):
    global constraint_violations, total_steps
    total_steps += 1
    if self.locals["rewards"][0] == -20:
        constraint_violations += 1
    if total_steps % 1000 == 0:
        violation_rate = constraint_violations / total_steps
        print(f"Constraint violation rate: {violation_rate:.2%}")
    return True
# Log Q-value statistics every 1000 steps
if step % 1000 == 0:
    with torch.no_grad():
        samples = model.replay_buffer.sample(100)           # SB3 ReplayBufferSamples
        q_values = model.policy.q_net(samples.observations)
        print(f"Q-value range: [{q_values.min():.2f}, {q_values.max():.2f}]")
| Parameter | Value | Effect |
|---|---|---|
| total_timesteps | 50000 | total training steps |
| buffer_size | 10000 | replay buffer size |
| learning_rate | 1e-4 | learning speed |
| gamma | 0.95 | long-horizon planning ability |
| exploration | 1.0 → 0.02 | exploration capacity |
Signs of healthy training:
- rollout/ep_rew_mean rises steadily and settles in the positive range
- rollout/ep_len_mean approaches the 20-step truncation limit (episodes rarely end in constraint violations)
- train/loss stays bounded after the initial transient
- the constraint violation rate (reward = -20) trends toward zero
Signs of failed training:
- ep_rew_mean stuck near -20 (episodes keep terminating on constraint violations)
- ep_len_mean stays at 1–2 steps
- train/loss diverges, or the logged Q-value range explodes