""" 超滤强化学习环境模块 ======================== 本模块定义了超滤系统的强化学习环境,包括: 1. UFParams: 超滤系统参数配置类 2. 膜阻力与跨膜压差转换函数 3. simulate_one_supercycle: 超级周期模拟函数 4. calculate_reward: 奖励函数 5. is_dead_cycle: 失败判定函数 6. UFSuperCycleEnv: Gymnasium环境类 模块设计说明: - 基于 Gymnasium (原OpenAI Gym) 标准接口 - 模拟超滤膜的"超级周期"运行(多次物理反洗 + 一次化学反洗) - 强化学习智能体通过优化过滤时长和反洗时长来最大化回收率并控制污染累积 """ import os import torch import numpy as np import gymnasium as gym from gymnasium import spaces from uf_train.env.env_params import UFState, UFPhysicsParams, UFStateBounds, UFRewardParams, UFActionSpec from uf_train.env.uf_physics import UFPhysicsModel from uf_train.env.env_reset import ResetSampler import copy class UFSuperCycleEnv(gym.Env): """ 超滤系统强化学习环境(Gymnasium标准接口) 功能: - 模拟超滤膜的超级周期运行 - 智能体在每个超级周期选择过滤时长和反洗时长 - 目标:最大化回收率同时控制污染累积 状态空间 (8维,归一化到 [0,1]): 1. TMP0: 初始跨膜压差 2. q_UF: 过滤流量 3. temp: 水温 4. R0: 初始膜阻力 5. nuK: 短期污染系数 6. slope: 长期污染斜率 7. power: 长期污染幂次 8. ceb_removal: CEB去除能力 动作空间 (离散): - 二维离散动作组合:(过滤时长, 反洗时长) - 过滤时长: L_min_s ~ L_max_s,步长 L_step_s - 反洗时长: t_bw_min_s ~ t_bw_max_s,步长 t_bw_step_s - 总动作数 = len(L_values) × len(t_bw_values) 奖励机制: - 基于回收率和残余污染的平衡 - 失败 (TMP超限、回收率过低、污染过快) 时给予大负奖励 (-10) 终止条件: - terminated: 违反运行约束(失败) - truncated: 达到最大步数 (max_episode_steps) """ metadata = {"render_modes": ["human"]} def __init__( self, physics: UFPhysicsModel, reward_params: UFRewardParams, action_spec:UFActionSpec, statebounds:UFStateBounds, real_state_pool, max_episode_steps: int = 45, RANDOM_SEED = 1024 ): """ 超滤强化学习环境 参数: physics(UFPhysicsModel): 超滤物理模型 reward_params(UFRewardParams): 奖励函数参数 max_episode_steps (int): 每个episode的最大步数,默认45 注:每步代表一个超级周期(约2-3天),45步约三个月 """ super(UFSuperCycleEnv, self).__init__() self.RANDOM_SEED = RANDOM_SEED self.physics = physics self.reward_params = reward_params self.max_episode_steps = max_episode_steps self.current_step = 0 # -------- 动作空间 -------- self.action_spec = action_spec self.L_values = np.arange( self.action_spec.L_min_s, self.action_spec.L_max_s + self.action_spec.L_step_s, self.action_spec.L_step_s, ) self.t_bw_values = np.arange( self.action_spec.t_bw_min_s, self.action_spec.t_bw_max_s + self.action_spec.t_bw_step_s, self.action_spec.t_bw_step_s, ) self.num_L = len(self.L_values) self.num_bw = len(self.t_bw_values) self.action_space = spaces.Discrete(self.num_L * self.num_bw) # -------- 状态空间 -------- self.observation_space = spaces.Box( low=0.0, high=1.0, shape=(8,), dtype=np.float32, ) self.state_bounds = statebounds # 状态边界 self.real_state_pool = real_state_pool self.reset_sampler = ResetSampler( bounds=self.state_bounds, physics=physics, real_state_pool=self.real_state_pool, max_resample_attempts=50, random_state=np.random.RandomState(RANDOM_SEED) ) def _generate_initial_state(self) -> UFState | None: """ 在 UFStateBounds 定义的范围内采样一个【合法】初始状态。 若采样失败(约束不满足)返回 None,由 reset() 负责重试。 """ b = self.state_bounds A = 128 * 40.0 # 有效膜面积 # ---- 1. 基础工况 ---- # ---- 随机生成 TMP、q_UF、温度 ---- TMP0 = np.random.uniform(b.TMP0_min, b.TMP0_max) q_UF = np.random.uniform(b.q_UF_min, b.q_UF_max) temp = np.random.uniform(b.temp_min, b.temp_max) # ---- 2. 污染增长参数 ---- slope = np.random.uniform(b.slope_min, b.slope_max) power = np.random.uniform(b.power_min, b.power_max) # ---- 3. 约束:污染增长速率可实现 ---- t_max = 60 if power >= 1 else 1 required_nuK_min = slope * power * (t_max ** (power - 1)) * (A / q_UF) # 若 required_nuK_min 超过可选范围 → 初始状态非法 if required_nuK_min > b.nuK_max: return None # 在可行范围中采样 nuK nuK = np.random.uniform( max(required_nuK_min, b.nuK_min), b.nuK_max ) # ---- 4. CEB 去除率 ---- ceb_removal = np.random.uniform( b.ceb_removal_min, b.ceb_removal_max ) # ---- 5. 初始膜阻力(物理模型) ---- R0 = self.physics.calculate_initial_resistance( TMP=TMP0, q_UF=q_UF, temp=temp ) return UFState( TMP=TMP0, q_UF=q_UF, temp=temp, R=R0, slope=slope, power=power, nuK=nuK, ceb_removal=ceb_removal, ) def _get_training_progress(self) -> float: """ 返回训练进度,用于 reset_sampler 的 curriculum sampling """ return min(1.0, self.current_step / self.max_episode_steps ) def reset(self, seed=None, options=None, max_attempts: int = 1000): super().reset(seed=seed) progress = self._get_training_progress() for _ in range(max_attempts): state = self.reset_sampler.sample(progress) if state is None: continue ok_run = self.physics.check_dead_initial_state( init_state=state, max_steps=self.max_episode_steps, L_s=self.action_spec.L_min_s, t_bw_s=self.action_spec.t_bw_max_s ) if ok_run: self.state = state break else: raise RuntimeError("无法生成可行初始状态") self.current_step = 0 self.last_action = None self.max_TMP_during_filtration = self.state.TMP return self.get_obs(self.state), {} def _get_state_copy(self): return copy.deepcopy(self.state) def get_obs(self, state): """ 构建当前环境归一化状态向量 """ # === 1. 从 state 读取动态参数 === TMP = state.TMP q_UF = state.q_UF temp = state.temp # === 2. 计算本周期初始膜阻力 === R = state.R # === 3. 从 self.state 读取膜阻力增长模型参数 === nuk = state.nuK slope = state.slope power = state.power ceb_removal = state.ceb_removal # === 4. 从 current_params 动态读取上下限 === TMP0_min, TMP0_max = self.state_bounds.TMP0_min, self.state_bounds.global_TMP_hard_limit q_UF_min, q_UF_max = self.state_bounds.q_UF_min, self.state_bounds.q_UF_max temp_min, temp_max = self.state_bounds.temp_min, self.state_bounds.temp_max nuK_min, nuK_max = self.state_bounds.nuK_min, self.state_bounds.nuK_max slope_min, slope_max = self.state_bounds.slope_min, self.state_bounds.slope_max power_min, power_max = self.state_bounds.power_min, self.state_bounds.power_max ceb_min, ceb_max = self.state_bounds.ceb_removal_min, self.state_bounds.ceb_removal_max # === 5. 归一化计算(clip防止越界) === TMP0_norm = np.clip((TMP - TMP0_min) / (TMP0_max - TMP0_min), 0, 1) q_UF_norm = np.clip((q_UF - q_UF_min) / (q_UF_max - q_UF_min), 0, 1) temp_norm = np.clip((temp - temp_min) / (temp_max - temp_min), 0, 1) # R0 不在 current_params 中定义上下限,设定经验范围 R0_norm = np.clip((R - 100.0) / (800.0 - 100.0), 0, 1) short_term_norm = np.clip((nuk - nuK_min) / (nuK_max - nuK_min), 0, 1) long_term_slope_norm = np.clip((slope - slope_min) / (slope_max - slope_min), 0, 1) long_term_power_norm = np.clip((power - power_min) / (power_max - power_min), 0, 1) ceb_removal_norm = np.clip((ceb_removal - ceb_min) / (ceb_max - ceb_min), 0, 1) # === 6. 构建观测向量 === obs = np.array([ TMP0_norm, q_UF_norm, temp_norm, R0_norm, short_term_norm, long_term_slope_norm, long_term_power_norm, ceb_removal_norm ], dtype=np.float32) return obs def get_action_values(self, action): """ 将动作还原为实际时长 """ L_idx = action // self.num_bw t_bw_idx = action % self.num_bw return self.L_values[L_idx], self.t_bw_values[t_bw_idx] def step(self, action): self.current_step += 1 L_s, t_bw_s = self.get_action_values(action) L_s = np.clip(L_s, self.action_spec.L_min_s, self.action_spec.L_max_s) t_bw_s = np.clip(t_bw_s, self.action_spec.t_bw_min_s, self.action_spec.t_bw_max_s) # 模拟超级周期 info, next_state = self.physics.simulate_one_supercycle(state=self.state,L_s=L_s, t_bw_s=t_bw_s) # 根据 info 判断是否成功 feasible = self.physics.is_dead_cycle(info) # True 表示成功循环,False 表示失败 if feasible: # 每步奖励 reward, rec_reward, energy_reward, res_penalty = self._calculate_reward(info) info["rec_reward"] = rec_reward info["energy_reward"] = energy_reward info["res_penalty"] = res_penalty self.state = next_state terminated = False else: # 中途失败惩罚 reward = -10 info["rec_reward"] = None info["energy_reward"] = None info["res_penalty"] = None terminated = True # 判断是否到达最大步数 truncated = self.current_step >= self.max_episode_steps self.last_action = (L_s, t_bw_s) next_obs = self.get_obs() info["feasible"] = feasible info["step"] = self.current_step info["L_s"] = L_s.copy() info["t_bw_s"] = t_bw_s.copy() # # ===================== 测试终末奖励:鼓励 TMP 接近初始状态 ===================== # # 仅在 episode 自然结束(满步但未提前失败)时触发 # if truncated and not terminated: # TMP_initial = self.TMP0 # reset 时记录的初始 TMP # TMP_final = next_obs[0] # next_obs 提供的最终 TMP # # delta_ratio = abs((TMP_final - TMP_initial) / TMP_initial) # # alpha = 4.0 # TMP 偏差敏感度 # gamma = 5.0 # 奖励幅度 # stability_reward = gamma * (np.exp(-alpha * delta_ratio) - 1) # 量级在0到-5之间 # # reward += stability_reward # terminated = True # episode 正式结束 # # ===================== 测试结果 ===================== # 增加该奖励后强化学习依然能保证奖励收敛,但是损失函数在2-3之间反复震荡,无法降低,见reward_test&loss_test # 原设想是只能听在大额偏移发生前能通过该奖励学习到提前减小偏移步伐,但是实际训练时该惩罚反复被触发 # 推测是终末的大额奖惩无法有效传递回过往时间步引导智能体学习,可能由于状态中缺少预测值,智能体会将其观测为不可控事件,暂时不添加该奖励,TODO:等待优化 return next_obs, reward, terminated, truncated, info def _calculate_reward(self, info: dict) -> float: """ 计算强化学习奖励函数(扩展版) 功能: - 平衡回收率、残余污染和吨水电耗三个目标 - TMP不直接参与奖励计算(通过失败判定间接影响) - 使用 tanh 函数实现平滑的非线性奖励 参数: info (dict): 周期性能指标字典,需包含 - recovery: 回收率 [0-1] - R_after_ceb: 本周期结束膜阻力 - initial_R: 本周期初始膜阻力 - delta_R_allow: 本周期允许最大阻力上升 - ton_water_energy_kWh_per_m3: 本周期吨水电耗 返回: float: 奖励值(通常在 -3 到 +3 之间) 设计思想: - 高回收率 → 水资源利用率高 → 正奖励 - 低残余污染 → 膜长期稳定运行 → 正奖励 - 低吨水电耗 → 节能 → 正奖励 - 三者需要权衡:过短的过滤时间提高回收率但污染去除不彻底;过长时间污染控制好但回收率下降,过高功率增加耗能 参考点设计: - 残余污染: - 高污染参考点 = 1 / self.max_episode_steps - 平衡点 = 0.5 / self.max_episode_steps - 吨水电耗: - 高点 = 0.1034 kWh/m³ - 平衡点 = 0.1011 kWh/m³ - 低点 = 0.0993 kWh/m³ - 回收率参考点保持原有设计 """ # ========== 提取性能指标 ========== recovery = info["recovery"] # 回收率 [0-1] # 污染比例:实际上升的阻力 / 允许上升的阻力 # 允许上升的阻力值 = 当前阻力值软上限 - 当前阻力 residual_ratio = info['residual_ratio'] # 吨水电耗指标 energy = info["ton_water_energy_kWh_per_m3"] # ========== 回收率奖励项 ========== # 将回收率归一化到 [0, 1] 区间(基于预期范围) rec_norm = (recovery - self.reward_params.rec_low) / (self.reward_params.rec_high - self.reward_params.rec_low) # 使用 tanh 函数构建平滑的 S 型奖励曲线 # - rec_norm = 0.5 时(回收率处于中间值),rec_reward = 0 # - rec_norm > 0.5 时,rec_reward > 0(鼓励高回收率) # - rec_norm < 0.5 时,rec_reward < 0(惩罚低回收率) # - k_rec 控制曲线陡峭程度,越大变化越陡 rec_reward = np.clip(np.tanh(self.reward_params.k_rec * (rec_norm - 0.5)), -1, 1) # ========== 残余污染惩罚项 ========== # 新参考点:每步允许上升比例 = 1 / max_episode_steps # 平衡点 = 0.5 / max_episode_steps ref_residual = 0.5 / self.max_episode_steps # 使用 tanh 构建惩罚曲线 # - residual_ratio < 平衡点时,res_penalty > 0(奖励低污染) # - residual_ratio > 平衡点时,res_penalty < 0(惩罚高污染) # - k_res 控制曲线陡峭程度 res_penalty = -np.tanh(self.reward_params.k_res * (residual_ratio / ref_residual - 1)) # ========== 吨水电耗奖励项 ========== # 设置高/平衡/低点 energy_low = 0.0993 energy_high = 0.1034 # 将能耗归一化到 [0, 1],平衡点对应 energy_norm = 0.5 energy_norm = (energy - energy_low) / (energy_high - energy_low) # 使用 tanh 构建平滑奖励 # - energy_norm < 0.5 时,energy_reward > 0(节能奖励) # - energy_norm > 0.5 时,energy_reward < 0(高能耗惩罚) # - k_energy 控制曲线陡峭程度 energy_reward = -np.tanh(self.reward_params.k_energy * (energy_norm - 0.5)) # ========== 组合奖励 ========== # 简单线性组合三项(为污染项加权) total_reward = rec_reward + 2.0 * res_penalty + energy_reward # 可选:添加平移项使特定点的奖励为零(当前未使用) # total_reward -= offset return total_reward, rec_reward, energy_reward, res_penalty