位置:UF_models.py
问题描述:
class TMPIncreaseModel(torch.nn.Module):
def forward(self, p, L_h):
# 这不是神经网络,只是数学公式!
return float(p.alpha * (p.q_UF ** p.belta) * L_h)
问题分析:
nn.Module但没有定义任何nn.Parameterstate_dict()是空字典验证问题:
model = TMPIncreaseModel()
print(model.state_dict()) # 输出:OrderedDict()
print(list(model.parameters())) # 输出:[]
影响:
评级:🔴 严重问题
位置:DQN_env.py - _get_obs()
当前状态:
state = [
TMP0_norm, # 当前TMP
L_norm, # 上次产水时长
t_bw_norm, # 上次反洗时长
max_TMP_norm # 周期最高TMP
] # 仅4维
缺失信息:
后果:
评级:🟡 中等问题
位置:DQN_env.py - _score()
问题代码:
def _score(p, rec):
base_reward = 0.8 × recovery + 0.2 × rate_norm - 0.2 × headroom_penalty
# 基础奖励范围:0.6 ~ 0.9
# 非线性放大
amplified = (base_reward - 0.5) ** 2 * 5.0
if base_reward < 0.5:
amplified = -amplified
return amplified
问题1:非线性变换过于激进
奖励映射示例: | base_reward | amplified | 倍数变化 | |-------------|-----------|---------| | 0.85 | 0.613 | - | | 0.80 | 0.450 | ↓36% | | 0.75 | 0.313 | ↓31% | | 0.70 | 0.200 | ↓36% |
后果:
问题2:约束违反惩罚不合理
if not feasible:
reward = -20 # 硬编码的大惩罚
分析:
-20与正常奖励(0.2~0.8)相差25-100倍问题3:sigmoid惩罚形式复杂
headroom_penalty = 1 / (1 + exp(-10 × (tmp_ratio - 1.0)))
评级:🟡 中等问题
位置:DQN_env.py - __init__()
问题代码:
class UFSuperCycleEnv:
def __init__(self, base_params, max_episode_steps=20):
self.max_episode_steps = 20 # 固定20步
问题分析:
太短:20步约等于20个超级周期(40-60天)
固定长度:
截断vs终止混淆:
truncated = self.current_step >= 20 # 强制截断
建议:
评级:🟡 中等问题
位置:DQN_train.py - _create_model()
冲突代码:
class DQNParams:
target_update_interval: int = 2000 # 参数说明:每2000步更新
def _create_model(self):
return DQN(
...
target_update_interval=1, # 实际代码:每1步更新
tau=0.005, # 软更新系数
...
)
问题分析:
target_update_interval=1 + tau=0.005 → 软更新soft update(当前实际使用):
θ_target = 0.005 × θ_current + 0.995 × θ_target
hard update(注释说明):
Every 2000 steps: θ_target = θ_current
建议:
# 改为经典DQN的硬更新
target_update_interval=1000, # 每1000步硬更新
tau=1.0, # tau=1表示完全复制
评级:🔴 严重问题
位置:DQN_train.py - DQNParams
问题代码:
buffer_size: int = 10000 # 仅10000条经验
问题分析:
相对于动作空间太小:
经验覆盖率低:
旧经验快速被覆盖:
建议:
buffer_size: int = 50000 # 增加到50000
评级:🟡 中等问题
位置:DQN_train.py - DQNParams
问题代码:
learning_starts: int = 200 # 仅200步随机探索
问题分析:
预填充不足:
早期训练不稳定:
标准实践:
buffer_size × 0.1 = 5000步action_space × 10 = 1850步建议:
learning_starts: int = 5000 # 增加到5000
评级:🟡 中等问题
位置:DQN_env.py - _get_obs()
问题代码:
def _get_obs(self):
TMP0_norm = (TMP0 - 0.01) / (0.05 - 0.01) # 硬编码范围
...
问题分析:
缺乏灵活性:
边界处理不当:
TMP0 = 0.03 # 如果0.03对应的归一化值?
norm = (0.03 - 0.01) / 0.04 = 0.5 # 中间值
不同维度归一化不一致:
建议:
class Normalizer:
def __init__(self):
self.tmp_min = 0.01
self.tmp_max = 0.05
# ...可配置
def normalize_tmp(self, tmp):
return np.clip((tmp - self.tmp_min) / (self.tmp_max - self.tmp_min), 0, 1)
评级:🟢 轻微问题
位置:DQN_env.py - 顶层
问题代码:
# 全局加载(模块导入时执行)
model_fp = TMPIncreaseModel()
model_bw = TMPDecreaseModel()
model_fp.load_state_dict(torch.load("uf_fp.pth"))
model_bw.load_state_dict(torch.load("uf_bw.pth"))
model_fp.eval()
model_bw.eval()
问题分析:
不支持多环境并行:
SubprocVecEnv(多进程),每个进程都会加载路径硬编码:
uf_fp.pth无法动态切换模型:
测试困难:
建议:
class UFSuperCycleEnv:
def __init__(self, base_params, model_dir="./"):
self.model_fp = TMPIncreaseModel()
self.model_bw = TMPDecreaseModel()
self.model_fp.load_state_dict(torch.load(f"{model_dir}/uf_fp.pth"))
self.model_bw.load_state_dict(torch.load(f"{model_dir}/uf_bw.pth"))
评级:🟡 中等问题
位置:DQN_train.py - train()
问题代码:
def train(self, total_timesteps: int):
self.model.learn(total_timesteps=total_timesteps, callback=self.callback)
# 训练结束后才保存一次
问题分析:
训练中断风险:
无法回滚到最佳模型:
难以对比不同阶段:
建议:
from stable_baselines3.common.callbacks import CheckpointCallback
checkpoint_callback = CheckpointCallback(
save_freq=5000, # 每5000步保存
save_path='./checkpoints/',
name_prefix='uf_dqn'
)
model.learn(..., callback=[checkpoint_callback, training_callback])
评级:🟡 中等问题
当前选择:DQN(Deep Q-Network)
DQN特点:
问题分析:
动作空间其实是连续的:
更适合的算法:
| 算法 | 优点 | 缺点 | 适用性 |
|---|---|---|---|
| SAC | 连续动作、样本高效、稳定 | 稍复杂 | ⭐⭐⭐⭐⭐ |
| TD3 | 连续动作、稳定 | 探索能力弱 | ⭐⭐⭐⭐ |
| PPO | 稳定、易调参 | 样本效率低 | ⭐⭐⭐ |
| DQN | 简单 | 连续动作支持差 | ⭐⭐ |
推荐改用SAC:
from stable_baselines3 import SAC
model = SAC(
policy="MlpPolicy",
env=env,
learning_rate=3e-4,
buffer_size=100000,
batch_size=256,
tau=0.005,
gamma=0.99,
verbose=1
)
改用SAC的好处:
评级:🟡 中等问题
当前训练:
def reset(self):
self.TMP0 = uniform(0.01, 0.03) # 固定范围
问题:
curriculum learning思路:
# 阶段1:简单场景(0-10k步)
TMP_range = [0.025, 0.03] # 窄范围
constraint_relaxed = True # 放宽约束
# 阶段2:中等场景(10k-30k步)
TMP_range = [0.02, 0.035]
constraint_relaxed = False
# 阶段3:困难场景(30k-50k步)
TMP_range = [0.01, 0.04] # 全范围
add_noise = True # 增加噪声
实现示例:
class CurriculumEnv(UFSuperCycleEnv):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.difficulty = 1 # 难度等级
def reset(self):
if self.difficulty == 1:
self.TMP0 = uniform(0.025, 0.03)
elif self.difficulty == 2:
self.TMP0 = uniform(0.02, 0.035)
else:
self.TMP0 = uniform(0.01, 0.04)
return super().reset()
def increase_difficulty(self):
self.difficulty = min(3, self.difficulty + 1)
评级:🟢 轻微问题
当前状态:无任何测试代码
关键模块应测试:
# tests/test_env.py
def test_env_reset():
env = UFSuperCycleEnv(UFParams())
obs, info = env.reset()
assert obs.shape == (4,)
assert 0 <= obs.all() <= 1
def test_env_step():
env = UFSuperCycleEnv(UFParams())
env.reset()
obs, reward, done, truncated, info = env.step(0)
assert isinstance(reward, float)
assert isinstance(done, bool)
def test_simulate_feasibility():
p = UFParams()
# 测试可行动作
feasible, info = simulate_one_supercycle(p, 4000, 50)
assert feasible == True
# 测试不可行动作(过长时间)
feasible, info = simulate_one_supercycle(p, 7000, 50)
assert feasible == False
def test_reward_range():
"""测试奖励是否在合理范围"""
rewards = []
for _ in range(1000):
# 采样不同状态和动作
reward = _score(params, info)
rewards.append(reward)
assert min(rewards) > -30 # 避免过大负奖励
assert max(rewards) < 10 # 避免奖励爆炸
评级:🟡 中等问题
当前状态:参数散落在多个类中
建议结构:
# config.yaml
environment:
tmp_range: [0.01, 0.05]
action_range:
L_s: [3800, 6000]
t_bw_s: [40, 60]
constraints:
tmp_max: 0.06
dTMP: 0.001
dqn:
learning_rate: 1e-4
buffer_size: 50000
batch_size: 64
gamma: 0.95
training:
total_timesteps: 100000
checkpoint_freq: 5000
eval_freq: 2000
# config.py
import yaml
from dataclasses import dataclass
@dataclass
class Config:
@staticmethod
def from_yaml(path):
with open(path) as f:
data = yaml.safe_load(f)
return Config(**data)
评级:🟢 轻微问题
当前状态:只有print语句
建议:
import logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s',
handlers=[
logging.FileHandler('training.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# 使用
logger.info(f"Episode {ep} - reward: {reward:.3f}")
logger.warning(f"Constraint violation at step {step}")
logger.error(f"Training failed: {error}")
评级:🟢 轻微问题
目标:用真实神经网络替代数学公式
# 收集真实运行数据
data = {
'L_h': [...], # 产水时长
'q_UF': [...], # 流量
'temp': [...], # 温度
'delta_TMP': [...] # 实测TMP增长
}
class RealTMPIncreaseModel(nn.Module):
def __init__(self):
super().__init__()
self.net = nn.Sequential(
nn.Linear(10, 64), # 输入:L_h, q_UF, temp, ...
nn.ReLU(),
nn.Linear(64, 32),
nn.ReLU(),
nn.Linear(32, 1) # 输出:delta_TMP
)
def forward(self, features):
return self.net(features)
# 监督学习训练
model = RealTMPIncreaseModel()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
for epoch in range(100):
for batch in data_loader:
pred = model(batch['features'])
loss = F.mse_loss(pred, batch['delta_TMP'])
optimizer.zero_grad()
loss.backward()
optimizer.step()
def _delta_tmp(p, L_h):
features = torch.FloatTensor([
L_h,
p.q_UF,
p.temp,
...
])
with torch.no_grad():
delta = model_fp(features).item()
return delta
收益:
新状态设计:
def _get_obs(self):
state = [
# 基础状态(4维)
TMP0_norm,
L_norm,
t_bw_norm,
max_TMP_norm,
# 水质特征(3维)
turbidity_norm, # 浊度
conductivity_norm, # 电导率
temperature_norm, # 温度
# 历史趋势(4维)
tmp_change_rate, # TMP变化速率
avg_L_last_5, # 最近5次平均产水时长
avg_recovery_last_5, # 最近5次平均回收率
days_since_ceb, # 距上次CEB天数
# 膜状态(2维)
membrane_age, # 膜龄(归一化)
total_cycles, # 总运行周期数
]
return np.array(state, dtype=np.float32) # 13维
实现历史追踪:
class UFSuperCycleEnv:
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.history = {
'L_s': deque(maxlen=5),
'recovery': deque(maxlen=5),
'TMP': deque(maxlen=10)
}
def step(self, action):
...
self.history['L_s'].append(L_s)
self.history['recovery'].append(info['recovery'])
self.history['TMP'].append(self.TMP0)
...
收益:
新奖励设计:
def _score_v2(p, rec, constraint_violation=None):
# 1. 基础奖励(保持简单)
recovery_reward = rec['recovery'] # [0.9, 0.98]
rate_reward = rec['net_rate'] / p.q_UF # [0.85, 0.95]
# 2. TMP惩罚(线性,避免非线性)
tmp_penalty = max(0, (rec['max_TMP'] / p.TMP_max - 0.9)) * 2
# TMP<90%上限:无惩罚
# TMP=95%上限:惩罚0.1
# TMP=100%上限:惩罚0.2
# 3. 约束违反惩罚(分级)
if constraint_violation:
if constraint_violation == 'tmp_peak':
penalty = -5 # 峰值超限
elif constraint_violation == 'residual':
penalty = -3 # 残余增量超限
elif constraint_violation == 'headroom':
penalty = -2 # 贴边过度
else:
penalty = 0
# 4. 稳定性奖励(鼓励平稳操作)
stability_bonus = 0
if hasattr(env, 'last_action'):
action_change = abs(current_action - env.last_action)
if action_change < 0.1: # 动作变化小
stability_bonus = 0.05
# 5. 总奖励(加权和,无非线性变换)
reward = (
0.6 * recovery_reward
+ 0.3 * rate_reward
- 0.2 * tmp_penalty
+ penalty
+ stability_bonus
)
return reward
关键改进:
收益:
完整实现:
from stable_baselines3 import SAC
from gymnasium import spaces
class UFSuperCycleEnvContinuous(UFSuperCycleEnv):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# 改为连续动作空间
self.action_space = spaces.Box(
low=np.array([0.0, 0.0]), # [L_norm, t_bw_norm]
high=np.array([1.0, 1.0]),
dtype=np.float32
)
def step(self, action):
# 反归一化
L_s = self.L_min + action[0] * (self.L_max - self.L_min)
t_bw_s = self.t_bw_min + action[1] * (self.t_bw_max - self.t_bw_min)
# 其余逻辑相同
...
# 训练
model = SAC(
policy="MlpPolicy",
env=env,
learning_rate=3e-4,
buffer_size=100000,
batch_size=256,
tau=0.005,
gamma=0.99,
ent_coef='auto', # 自动调整熵系数
target_entropy='auto',
verbose=1,
tensorboard_log="./sac_tensorboard/"
)
model.learn(total_timesteps=100000)
SAC优势:
收益:
实现:
from stable_baselines3.common.callbacks import EvalCallback
eval_env = UFSuperCycleEnv(UFParams())
eval_callback = EvalCallback(
eval_env,
best_model_save_path='./best_model/',
log_path='./eval_logs/',
eval_freq=2000, # 每2000步评估一次
n_eval_episodes=10, # 每次评估10个episode
deterministic=True, # 确定性策略评估
render=False
)
model.learn(
total_timesteps=100000,
callback=[eval_callback, checkpoint_callback, training_callback]
)
收益:
阶段1:修复关键bug(1-2天)
1. 修复目标网络更新冲突 → 硬更新
2. 增大buffer_size到50000
3. 增大learning_starts到5000
4. 添加checkpoint保存
5. 修复奖励函数(移除非线性)
阶段2:优化训练(3-5天)
1. 扩展状态空间(添加历史信息)
2. 增加episode长度到50步
3. 实现curriculum learning
4. 添加evaluation循环
阶段3:算法升级(1周)
1. 改用SAC算法
2. 连续动作空间
3. 超参数调优
阶段4:模型升级(2-4周)
1. 收集真实运行数据
2. 训练神经网络物理模型
3. 集成到环境
4. 验证Sim-to-Real性能
新架构设计:
项目结构:
uf_rl_v2/
├── config/
│ ├── env_config.yaml
│ ├── sac_config.yaml
│ └── train_config.yaml
├── models/
│ ├── physics/
│ │ ├── tmp_model.py # 神经网络物理模型
│ │ └── train_physics.py # 物理模型训练脚本
│ ├── policy/
│ │ └── sac_policy.py # SAC策略网络
│ └── reward/
│ └── reward_shaping.py # 奖励函数设计
├── envs/
│ ├── uf_env_v2.py # 重构的环境
│ └── wrappers.py # 环境包装器
├── utils/
│ ├── logger.py # 日志系统
│ ├── callbacks.py # 训练回调
│ └── evaluation.py # 评估工具
├── tests/
│ ├── test_env.py
│ ├── test_physics.py
│ └── test_training.py
├── train.py # 训练入口
└── requirements.txt
核心改进:
总计:半天可完成
总计:1周可完成
总计:1-2周可完成
# 1. 修复目标网络(5分钟)
target_update_interval=1000, tau=1.0
# 2. 简化奖励函数(10分钟)
reward = 0.6*recovery + 0.3*rate - 0.2*tmp_penalty + constraint_penalty
# 3. 增大经验池(1行)
buffer_size=50000, learning_starts=5000
# 4. 添加checkpoint(5分钟)
CheckpointCallback(save_freq=5000, ...)
# 5. 添加evaluation(5分钟)
EvalCallback(eval_freq=2000, ...)
总时间:不到1小时
预期提升:训练稳定性提升50%+,最终性能提升20%+
总时间:3-4周
预期提升:训练效率提升3倍+,策略性能提升50%+,工业可用性大幅提升