import os
import time
import random
import copy
from dataclasses import dataclass, asdict
from typing import Dict, Tuple, Optional

import numpy as np
import gymnasium as gym
from gymnasium import spaces
import torch
import torch.nn as nn
from stable_baselines3 import DQN
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.vec_env import DummyVecEnv
from stable_baselines3.common.callbacks import BaseCallback

from save_uf_models import TMPIncreaseModel, TMPDecreaseModel  # import the surrogate model classes
# ==== Base operating parameters of the membrane ====
@dataclass
class UFParams:
    # -- Membrane and operating parameters --
    q_UF: float = 360.0             # filtration feed flow rate (m^3/h)
    TMP0: float = 0.03              # initial TMP (MPa)
    TMP_max: float = 0.06           # hard upper limit on TMP (MPa)
    # -- Membrane fouling kinetics --
    alpha: float = 1e-6             # TMP growth coefficient
    belta: float = 1.1              # power-law exponent
    # -- Physical backwash parameters (fixed) --
    q_bw_m3ph: float = 1000.0       # physical backwash flow rate (m^3/h)
    # -- CEB parameters (fixed) --
    T_ceb_interval_h: float = 48.0  # fixed CEB interval (hours)
    v_ceb_m3: float = 30.0          # water volume used per CEB (m^3)
    t_ceb_s: float = 40 * 60.0      # CEB duration (s)
    phi_ceb: float = 1.0            # CEB removal ratio (simplified: full recovery to TMP0)
    # -- Constraints and convergence --
    dTMP: float = 0.001             # max TMP rise relative to TMP0 at the end of one filtration run (MPa)
    # -- Search ranges (seconds) --
    L_min_s: float = 3800.0         # lower bound on filtration duration (s)
    L_max_s: float = 6000.0         # upper bound on filtration duration (s)
    t_bw_min_s: float = 40.0        # lower bound on backwash duration (s)
    t_bw_max_s: float = 60.0        # upper bound on backwash duration (s)
    # -- Physical backwash recovery function parameters --
    phi_bw_min: float = 0.7         # minimum backwash removal ratio
    phi_bw_max: float = 1.0         # maximum backwash removal ratio
    L_ref_s: float = 4000.0         # time scale of the filtration-duration effect
    tau_bw_s: float = 20.0          # time scale of the backwash-duration effect
    gamma_t: float = 1.0            # exponent of the backwash-duration effect
    # -- Grid --
    L_step_s: float = 60.0          # filtration duration step (s)
    t_bw_step_s: float = 5.0        # backwash duration step (s)
    # Multi-objective weights and high-TMP penalty
    w_rec: float = 0.8              # recovery weight
    w_rate: float = 0.2             # net delivery rate weight
    w_headroom: float = 0.2         # headroom (near-limit) penalty weight
    r_headroom: float = 2.0         # headroom penalty exponent
    headroom_hardcap: float = 0.98  # above this ratio the solution is rejected outright
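

# Illustrative check of the supercycle structure under the default parameters above
# (the numbers are only an example, not a tuned operating point):
#   one small cycle = L_s + t_bw_s = 4200 s + 60 s = 4260 s ≈ 1.183 h
#   cycles per CEB  = floor(T_ceb_interval_h / 1.183 h) = floor(40.56) = 40
#   feed per cycle  = q_UF * L_h = 360 m^3/h * (4200 / 3600) h = 420 m^3
# simulate_one_supercycle() below performs exactly this bookkeeping.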
# ==== Reinforcement learning hyperparameters ====
@dataclass
class DQNParams:
    """
    DQN hyperparameter definition class.
    Used to manage the training configuration in one place.
    """
    # Learning rate, controls the step size of the network update
    learning_rate: float = 1e-4
    # Replay buffer size (in steps)
    buffer_size: int = 10000
    # Number of steps to collect before learning starts
    learning_starts: int = 200
    # Number of samples drawn from the replay buffer per update
    batch_size: int = 32
    # Discount factor; the closer to 1, the more weight on long-term reward
    gamma: float = 0.95
    # Train once every this many steps
    train_freq: int = 4
    # Target network update interval
    target_update_interval: int = 2000
    # Initial exploration rate epsilon
    exploration_initial_eps: float = 1.0
    # Fraction of training over which epsilon decays from initial to final
    exploration_fraction: float = 0.3
    # Final exploration rate epsilon
    exploration_final_eps: float = 0.02
    # Log remark (used to distinguish different experiments)
    remark: str = "default"
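

# Illustrative usage (not executed here): override a few fields for an experiment
# and serialize the full configuration for logging, e.g.
#   cfg = DQNParams(learning_rate=5e-4, remark="lr_sweep")
#   print(asdict(cfg))  # dataclasses.asdict() gives a plain dict of all hyperparameters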
# ==== Load the surrogate environment models ====
# Instantiate the models
model_fp = TMPIncreaseModel()
model_bw = TMPDecreaseModel()
# Load the trained weights
model_fp.load_state_dict(torch.load("uf_fp.pth"))
model_bw.load_state_dict(torch.load("uf_bw.pth"))
# Switch to inference mode
model_fp.eval()
model_bw.eval()
def _delta_tmp(p, L_h: float) -> float:
    """
    TMP rise during the filtration phase: delegates to the uf_fp.pth model.
    """
    return model_fp(p, L_h)


def phi_bw_of(p, L_s: float, t_bw_s: float) -> float:
    """
    Backwash removal ratio: delegates to the uf_bw.pth model.
    """
    return model_bw(p, L_s, t_bw_s)


def _tmp_after_ceb(p, L_s: float, t_bw_s: float) -> float:
    """
    TMP after chemically enhanced backwash (CEB); currently restores the initial TMP.
    """
    return p.TMP0


def _v_bw_m3(p, t_bw_s: float) -> float:
    """
    Water consumed by one physical backwash (m^3).
    """
    return float(p.q_bw_m3ph * (float(t_bw_s) / 3600.0))
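

# Sketch of how one filtration + backwash step uses the helpers above (assuming the
# surrogate models accept the argument lists shown in the wrappers and return floats):
#   dtmp         = _delta_tmp(p, L_h)                           # TMP rise over one run (MPa)
#   tmp_peak     = tmp_run_start + dtmp
#   phi          = phi_bw_of(p, L_s, t_bw_s)                    # fraction of the rise removed
#   tmp_after_bw = tmp_peak - phi * (tmp_peak - tmp_run_start)  # residual fouling remains
# simulate_one_supercycle() repeats this update k_bw_per_ceb times, then applies a CEB.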
def simulate_one_supercycle(p: UFParams, L_s: float, t_bw_s: float):
    """
    Returns (feasible, metrics dict).
    - Supports a dynamic number of small cycles per CEB (fixed 48 h interval)
    - Adds daily production time and specific energy consumption per ton of water
    - Adds tracking of the minimum TMP
    """
    L_h = float(L_s) / 3600.0  # filtration time of one small cycle (h)
    tmp = p.TMP0
    max_tmp_during_filtration = tmp
    min_tmp_during_filtration = tmp  # new: initialize the minimum TMP
    max_residual_increase = 0.0
    # Total duration of one small cycle (h)
    t_small_cycle_h = (L_s + t_bw_s) / 3600.0
    # Number of small cycles per CEB within the supercycle
    k_bw_per_ceb = int(np.floor(p.T_ceb_interval_h / t_small_cycle_h))
    if k_bw_per_ceb < 1:
        k_bw_per_ceb = 1  # at least one small cycle
    # Lookup table: specific energy consumption (kWh per ton of water), keyed by
    # filtration duration (s); durations outside the table fall back to the nearest key
    energy_lookup = {
        3600: 0.1034, 3660: 0.1031, 3720: 0.1029, 3780: 0.1026,
        3840: 0.1023, 3900: 0.1021, 3960: 0.1019, 4020: 0.1017,
        4080: 0.1015, 4140: 0.1012, 4200: 0.1011
    }
    for _ in range(k_bw_per_ceb):
        tmp_run_start = tmp
        # TMP growth during the filtration phase
        dtmp = _delta_tmp(p, L_h)
        tmp_peak = tmp_run_start + dtmp
        # Constraint 1: the peak must not exceed the hard upper limit
        if tmp_peak > p.TMP_max + 1e-12:
            return False, {"reason": "TMP_max violated during filtration", "TMP_peak": tmp_peak}
        # Update the maximum and minimum TMP
        if tmp_peak > max_tmp_during_filtration:
            max_tmp_during_filtration = tmp_peak
        if tmp_run_start < min_tmp_during_filtration:  # new: record the minimum TMP at run start
            min_tmp_during_filtration = tmp_run_start
        # Physical backwash
        phi = phi_bw_of(p, L_s, t_bw_s)
        tmp_after_bw = tmp_peak - phi * (tmp_peak - tmp_run_start)
        # Constraint 2: limit the residual TMP increase per run
        residual_inc = tmp_after_bw - tmp_run_start
        if residual_inc > p.dTMP + 1e-12:
            return False, {
                "reason": "residual TMP increase after BW exceeded dTMP",
                "residual_increase": residual_inc,
                "limit_dTMP": p.dTMP
            }
        if residual_inc > max_residual_increase:
            max_residual_increase = residual_inc
        tmp = tmp_after_bw
    # CEB
    tmp_after_ceb = p.TMP0
    # Volumes and recovery
    V_feed_super = k_bw_per_ceb * p.q_UF * L_h
    V_loss_super = k_bw_per_ceb * _v_bw_m3(p, t_bw_s) + p.v_ceb_m3
    V_net = max(0.0, V_feed_super - V_loss_super)
    recovery = max(0.0, V_net / max(V_feed_super, 1e-12))
    # Time and net delivery rate
    T_super_h = k_bw_per_ceb * (L_s + t_bw_s) / 3600.0 + p.t_ceb_s / 3600.0
    net_delivery_rate_m3ph = V_net / max(T_super_h, 1e-12)
    # Headroom ratio and hard cap
    headroom_ratio = max_tmp_during_filtration / max(p.TMP_max, 1e-12)
    if headroom_ratio > p.headroom_hardcap + 1e-12:
        return False, {"reason": "headroom hardcap exceeded", "headroom_ratio": headroom_ratio}
    # -- New metric 1: daily production time (h/d) --
    daily_prod_time_h = k_bw_per_ceb * L_h / T_super_h * 24.0
    # -- New metric 2: specific energy consumption (kWh/m^3) --
    closest_L = min(energy_lookup.keys(), key=lambda x: abs(x - L_s))
    ton_water_energy = energy_lookup[closest_L]
    info = {
        "recovery": recovery,
        "V_feed_super_m3": V_feed_super,
        "V_loss_super_m3": V_loss_super,
        "V_net_super_m3": V_net,
        "supercycle_time_h": T_super_h,
        "net_delivery_rate_m3ph": net_delivery_rate_m3ph,
        "max_TMP_during_filtration": max_tmp_during_filtration,
        "min_TMP_during_filtration": min_tmp_during_filtration,  # new: minimum TMP
        "max_residual_increase_per_run": max_residual_increase,
        "phi_bw_effective": phi,
        "TMP_after_ceb": tmp_after_ceb,
        "headroom_ratio": headroom_ratio,
        "daily_prod_time_h": daily_prod_time_h,
        "ton_water_energy_kWh_per_m3": ton_water_energy,
        "k_bw_per_ceb": k_bw_per_ceb
    }
    return True, info
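

# Illustrative call (not executed at import time); the exact metric values depend on the
# loaded surrogate models, so none are shown here:
#   ok, rec = simulate_one_supercycle(UFParams(), L_s=4200.0, t_bw_s=50.0)
#   if ok:
#       print(rec["recovery"], rec["net_delivery_rate_m3ph"], rec["k_bw_per_ceb"])
#   else:
#       print("infeasible:", rec["reason"])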
def _score(p: UFParams, rec: dict) -> float:
    """Composite score: larger is better. Differences are amplified non-linearly
    to sharpen the distinction between good and bad actions."""
    # -- Dimensionless net delivery rate --
    rate_norm = rec["net_delivery_rate_m3ph"] / max(p.q_UF, 1e-12)
    # -- Soft TMP penalty (sigmoid) --
    tmp_ratio = rec["max_TMP_during_filtration"] / max(p.TMP_max, 1e-12)
    k = 10.0
    headroom_penalty = 1.0 / (1.0 + np.exp(-k * (tmp_ratio - 1.0)))
    # -- Base reward (roughly in the 0.6-0.9 range) --
    base_reward = (
        p.w_rec * rec["recovery"]
        + p.w_rate * rate_norm
        - p.w_headroom * headroom_penalty
    )
    # -- Non-linear amplification: squared mapping plus scaling --
    # Widens the gap between good and bad actions while bounding the magnitude,
    # so TD errors do not blow up
    amplified_reward = (base_reward - 0.5) ** 2 * 5.0
    # -- Optional: keep the sign to distinguish negative rewards --
    if base_reward < 0.5:
        amplified_reward = -amplified_reward
    return amplified_reward
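

# Worked example of the amplification (illustrative numbers only):
#   base_reward = 0.85  ->  (0.85 - 0.5)^2 * 5 = +0.6125
#   base_reward = 0.60  ->  (0.60 - 0.5)^2 * 5 = +0.0500
#   base_reward = 0.40  -> -(0.40 - 0.5)^2 * 5 = -0.0500
# Rewards near the 0.5 pivot are flattened, clearly good or bad actions are pushed
# apart, and the magnitudes stay well below the -20 infeasibility penalty used in
# UFSuperCycleEnv.step().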
def set_global_seed(seed: int):
    """Fix all global random seeds so that training is reproducible."""
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)  # only relevant when a GPU is used
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
class UFSuperCycleEnv(gym.Env):
    """Ultrafiltration system environment (decisions at the supercycle level)."""
    metadata = {"render_modes": ["human"]}

    def __init__(self, base_params, max_episode_steps: int = 20):
        super().__init__()
        self.base_params = base_params
        self.current_params = copy.deepcopy(base_params)
        self.max_episode_steps = max_episode_steps
        self.current_step = 0
        # Build the discrete action grids
        self.L_values = np.arange(
            self.base_params.L_min_s,
            self.base_params.L_max_s + self.base_params.L_step_s,
            self.base_params.L_step_s
        )
        self.t_bw_values = np.arange(
            self.base_params.t_bw_min_s,
            self.base_params.t_bw_max_s + self.base_params.t_bw_step_s,
            self.base_params.t_bw_step_s
        )
        self.num_L = len(self.L_values)
        self.num_bw = len(self.t_bw_values)
        # Single flattened discrete action space
        self.action_space = spaces.Discrete(self.num_L * self.num_bw)
        # Observation: TMP0, the previous action (L_s, t_bw_s), and the maximum TMP of the
        # most recent supercycle; all normalization is handled inside _get_obs
        self.observation_space = spaces.Box(
            low=np.zeros(4, dtype=np.float32),
            high=np.ones(4, dtype=np.float32),
            dtype=np.float32
        )
        # Initialize the state
        self.last_action = (self.base_params.L_min_s, self.base_params.t_bw_min_s)
        self.max_TMP_during_filtration = self.current_params.TMP0
        self.reset(seed=None)

    def _get_obs(self):
        TMP0 = self.current_params.TMP0
        TMP0_norm = (TMP0 - 0.01) / (0.05 - 0.01)
        L_s, t_bw_s = self.last_action
        L_norm = (L_s - self.base_params.L_min_s) / (self.base_params.L_max_s - self.base_params.L_min_s)
        t_bw_norm = (t_bw_s - self.base_params.t_bw_min_s) / (self.base_params.t_bw_max_s - self.base_params.t_bw_min_s)
        max_TMP_norm = (self.max_TMP_during_filtration - 0.01) / (0.05 - 0.01)
        return np.array([TMP0_norm, L_norm, t_bw_norm, max_TMP_norm], dtype=np.float32)

    def _get_action_values(self, action):
        L_idx = action // self.num_bw
        t_bw_idx = action % self.num_bw
        return self.L_values[L_idx], self.t_bw_values[t_bw_idx]
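
    # How the flattened action index maps back onto the 2-D grid (with the default
    # UFParams: num_L = 38 filtration durations, num_bw = 5 backwash durations,
    # i.e. Discrete(190)):
    #   action = L_idx * num_bw + t_bw_idx
    #   e.g. action 7 -> L_idx = 1, t_bw_idx = 2 -> (L_s = 3860 s, t_bw_s = 50 s)
    # Note: because np.arange stops one step past L_max_s, the largest grid value
    # (6020 s) slightly exceeds L_max_s; step() clips it back to 6000 s.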
    def reset(self, seed=None, options=None):
        super().reset(seed=seed)
        self.current_params.TMP0 = np.random.uniform(0.01, 0.03)
        self.current_step = 0
        self.last_action = (self.base_params.L_min_s, self.base_params.t_bw_min_s)
        self.max_TMP_during_filtration = self.current_params.TMP0
        return self._get_obs(), {}

    def step(self, action):
        self.current_step += 1
        L_s, t_bw_s = self._get_action_values(action)
        L_s = np.clip(L_s, self.base_params.L_min_s, self.base_params.L_max_s)
        t_bw_s = np.clip(t_bw_s, self.base_params.t_bw_min_s, self.base_params.t_bw_max_s)
        # Simulate one supercycle
        feasible, info = simulate_one_supercycle(self.current_params, L_s, t_bw_s)
        if feasible:
            reward = _score(self.current_params, info)
            self.current_params.TMP0 = info["TMP_after_ceb"]
            self.max_TMP_during_filtration = info["max_TMP_during_filtration"]
            terminated = False
        else:
            reward = -20
            terminated = True
        truncated = self.current_step >= self.max_episode_steps
        self.last_action = (L_s, t_bw_s)
        next_obs = self._get_obs()
        info["feasible"] = feasible
        info["step"] = self.current_step
        return next_obs, reward, terminated, truncated, info
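

# Quick interaction sketch (illustrative only, not executed at import time):
#   env = UFSuperCycleEnv(UFParams())
#   obs, _ = env.reset(seed=0)
#   obs, reward, terminated, truncated, info = env.step(env.action_space.sample())
#   print(reward, info.get("recovery"), info.get("reason"))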
class UFEpisodeRecorder:
    """Records the decisions and results within each episode."""

    def __init__(self):
        self.episode_data = []
        self.current_episode = []

    def record_step(self, obs, action, reward, done, info):
        """Record a single step."""
        step_data = {
            "obs": obs.copy(),
            # actions from SB3 are numpy arrays; guard in case None is passed
            "action": action.copy() if hasattr(action, "copy") else action,
            "reward": reward,
            "done": done,
            "info": info.copy() if info else {}
        }
        self.current_episode.append(step_data)
        if done:
            self.episode_data.append(self.current_episode)
            self.current_episode = []

    def get_episode_stats(self, episode_idx=-1):
        """Return summary statistics for one episode."""
        if not self.episode_data:
            return {}
        episode = self.episode_data[episode_idx]
        total_reward = sum(step["reward"] for step in episode)
        avg_recovery = np.mean([step["info"].get("recovery", 0) for step in episode if "recovery" in step["info"]])
        feasible_steps = sum(1 for step in episode if step["info"].get("feasible", False))
        return {
            "total_reward": total_reward,
            "avg_recovery": avg_recovery,
            "feasible_steps": feasible_steps,
            "total_steps": len(episode)
        }
class UFTrainingCallback(BaseCallback):
    """
    Training callback for DQN that records every step into the recorder.
    Compared with the earlier RecordingCallback it is more robust:
    1. Does not depend on the environment's internal last_* attributes
    2. Uses the obs, actions, rewards, dones, and infos exposed by SB3
    3. Handles end-of-episode bookkeeping automatically
    """

    def __init__(self, recorder, verbose=0):
        super().__init__(verbose)
        self.recorder = recorder

    def _on_step(self) -> bool:
        try:
            new_obs = self.locals.get("new_obs")
            actions = self.locals.get("actions")
            rewards = self.locals.get("rewards")
            dones = self.locals.get("dones")
            infos = self.locals.get("infos")
            if new_obs is not None and len(new_obs) > 0:
                step_obs = new_obs[0]
                step_action = actions[0] if actions is not None else None
                step_reward = rewards[0] if rewards is not None else 0.0
                step_done = dones[0] if dones is not None else False
                step_info = infos[0] if infos is not None else {}
                # Print information about the current step
                if self.verbose:
                    print(f"[Step {self.num_timesteps}] action={step_action}, reward={step_reward:.3f}, done={step_done}")
                # Record the data
                self.recorder.record_step(
                    obs=step_obs,
                    action=step_action,
                    reward=step_reward,
                    done=step_done,
                    info=step_info,
                )
        except Exception as e:
            if self.verbose:
                print(f"[Callback Error] {e}")
        return True
class DQNTrainer:
    def __init__(self, env, params, callback=None):
        self.env = env
        self.params = params
        self.callback = callback
        self.log_dir = self._create_log_dir()
        self.model = self._create_model()

    def _create_log_dir(self):
        timestamp = time.strftime("%Y%m%d-%H%M%S")
        log_name = (
            f"DQN_lr{self.params.learning_rate}_buf{self.params.buffer_size}_bs{self.params.batch_size}"
            f"_gamma{self.params.gamma}_exp{self.params.exploration_fraction}"
            f"_{self.params.remark}_{timestamp}"
        )
        log_dir = os.path.join("./uf_dqn_tensorboard", log_name)
        os.makedirs(log_dir, exist_ok=True)
        return log_dir

    def _create_model(self):
        return DQN(
            policy="MlpPolicy",
            env=self.env,
            learning_rate=self.params.learning_rate,
            buffer_size=self.params.buffer_size,  # a large buffer keeps the experience diverse
            learning_starts=self.params.learning_starts,
            batch_size=self.params.batch_size,
            gamma=self.params.gamma,
            train_freq=self.params.train_freq,
            # Soft target updates: update every step with tau=0.005 rather than the
            # hard update interval defined in DQNParams.target_update_interval
            target_update_interval=1,
            tau=0.005,
            exploration_initial_eps=self.params.exploration_initial_eps,
            exploration_fraction=self.params.exploration_fraction,
            exploration_final_eps=self.params.exploration_final_eps,
            verbose=1,
            tensorboard_log=self.log_dir
            # replay_buffer_class is not specified; the default ReplayBuffer is used
        )

    def train(self, total_timesteps: int):
        if self.callback:
            self.model.learn(total_timesteps=total_timesteps, callback=self.callback)
        else:
            self.model.learn(total_timesteps=total_timesteps)
        print(f"Training finished; logs saved to: {self.log_dir}")

    def save(self, path=None):
        if path is None:
            path = os.path.join(self.log_dir, "dqn_model.zip")
        self.model.save(path)
        print(f"Model saved to: {path}")

    def load(self, path):
        self.model = DQN.load(path, env=self.env)
        print(f"Model loaded from {path}")
def train_uf_rl_agent(params: UFParams, total_timesteps: int = 10000, seed: int = 2025):
    set_global_seed(seed)
    recorder = UFEpisodeRecorder()
    callback = UFTrainingCallback(recorder, verbose=1)

    def make_env():
        env = UFSuperCycleEnv(params)
        env = Monitor(env)
        return env

    env = DummyVecEnv([make_env])
    dqn_params = DQNParams()
    trainer = DQNTrainer(env, dqn_params, callback=callback)
    trainer.train(total_timesteps)
    trainer.save()
    stats = callback.recorder.get_episode_stats()
    print(f"Training complete - total reward: {stats.get('total_reward', 0):.2f}, "
          f"average recovery: {stats.get('avg_recovery', 0):.3f}")
    return trainer.model
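

# Illustrative helper (an addition of this edit, not part of the original training flow):
# run the trained policy greedily for a few supercycles and print the chosen schedule.
# It is defined here but never called automatically.
def evaluate_uf_agent(model, params: UFParams, n_steps: int = 10):
    """Greedy rollout of a trained DQN policy on a fresh (non-vectorized) environment."""
    env = UFSuperCycleEnv(params)
    obs, _ = env.reset(seed=0)
    for _ in range(n_steps):
        action, _ = model.predict(obs, deterministic=True)
        obs, reward, terminated, truncated, info = env.step(int(action))
        L_s, t_bw_s = env.last_action  # clipped action actually applied in this step
        print(f"L_s={L_s:.0f}s, t_bw_s={t_bw_s:.0f}s, reward={reward:.3f}, "
              f"recovery={info.get('recovery', float('nan')):.4f}")
        if terminated or truncated:
            break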
# Training and testing example
if __name__ == "__main__":
    # Initialize the parameters
    params = UFParams()
    # Train the RL agent
    print("Starting RL agent training...")
    train_uf_rl_agent(params, total_timesteps=50000)