import os import torch from pathlib import Path import numpy as np import gymnasium as gym from gymnasium import spaces from typing import Dict, Tuple, Optional import torch import torch.nn as nn from dataclasses import dataclass, asdict from UF_resistance_models import ResistanceIncreaseModel, ResistanceDecreaseModel # 导入模型类 import copy # ======================= # 膜运行参数类:定义膜的基础运行参数 # ======================= @dataclass class UFParams: # —— 膜动态运行参数 —— q_UF: float = 360.0 # 过滤进水流量(m^3/h) TMP0: float = 0.03 # 初始跨膜压差 temp: float = 25.0 # 水温,摄氏度 # —— 膜阻力模型参数 —— nuK: float =4.92e+01 # 过滤阶段膜阻力增长模型参数 slope: float = 3.44e-01 # 全周期不可逆污染阻力增长斜率 power: float = 1.032 # 全周期不可逆污染阻力增长幂次 tau_bw_s: float = 30.0 # 物洗时长影响时间尺度 gamma_t: float = 1.0 # 物洗时长作用指数 ceb_removal: float = 150 # CEB去除膜阻力 # —— 膜运行约束参数 —— global_TMP_limit: float = 0.08 # TMP硬上限(MPa) TMP0_max: float = 0.035 # 初始TMP上限(MPa) TMP0_min: float = 0.01 # 初始TMP下限(MPa) q_UF_max: float = 400.0 # 进水流量上限(m^3/h) q_UF_min: float = 250.0 # 进水流量上限(m^3/h) temp_max: float = 40.0 # 温度上限(摄氏度) temp_min: float = 10.0 # 温度下限(摄氏度) nuK_max: float = 6e+01 # 物理周期总阻力增速上限(m^-1/s) nuK_min: float = 3e+01 # 物理周期总阻力增速下限(m^-1/s) slope_max: float = 10 # 化学周期长期阻力增速斜率上限 slope_min: float = 0.1 # 化学周期长期阻力增速斜率下限 power_max: float = 1.3 # 化学周期长期阻力增速幂次上限 power_min: float = 0.8 # 化学周期长期阻力增速幂次下限 ceb_removal_max: float = 150 # CEB去除阻力(已缩放)上限(m^-1) ceb_removal_min: float = 100 # CEB去除阻力(已缩放)下限(m^-1) # —— 反洗参数(固定) —— q_bw_m3ph: float = 1000.0 # 物理反洗流量(m^3/h) # —— CEB参数 —— T_ceb_interval_h: float = 60.0 # 固定每 k 小时做一次CEB v_ceb_m3: float = 30.0 # CEB用水体积(m^3) t_ceb_s: float = 40 * 60.0 # CEB时长(s) # —— 搜索范围(秒) —— L_min_s: float = 3800.0 # 过滤时长下限(s) L_max_s: float = 6000.0 # 过滤时长上限(s) t_bw_min_s: float = 40.0 # 物洗时长下限(s) t_bw_max_s: float = 60.0 # 物洗时长上限(s) # —— 网格 —— L_step_s: float = 60.0 # 过滤时长步长(s) t_bw_step_s: float = 5.0 # 物洗时长步长(s) # —— 奖励函数参数 —— k_rec = 5.0 # 回收率敏感度 k_res = 10.0 # 残余污染敏感度 rec_low, rec_high = 0.92, 0.99 rr0 = 0.08 # ======================= # 辅助函数:转换膜阻力与跨膜压差 # ======================= def xishan_viscosity(temp): # temp: 水温,单位摄氏度 """ 锡山水厂 PLC水温校正因子经验公式(25摄氏度标准) 返回温度修正后的水粘度(纯水修正),TODO:水厂水质与纯水相差较大,对粘度有一定影响 """ x = (temp + 273.15) / 300 factor = 890 / (280.68 * x ** -1.9 + 511.45 * x ** -7.7 + 61.131 * x ** -19.6 + 0.45903 * x ** -40) mu = 0.00089 / factor return mu def _calculate_resistance(tmp, q_UF, temp): """ 计算超滤膜阻力 R = TMP / (J * μ) 返回缩小1e10的膜阻力(超滤原膜阻力量级为1e12,过大的绝对值容易导致平稳拟合) """ A = 128 * 40 # m²,有效膜面积 mu = xishan_viscosity(temp) # 温度修正后的水粘度 TMP_Pa = tmp * 1e6 # 跨膜压差 MPa -> Pa J = q_UF / A / 3600 # 通量 m³/h -> m³/(m²·s) if J <= 0 or mu <= 0: return np.nan R = TMP_Pa / (J * mu) / 1e10 # 缩放膜阻力 return float(R) def _calculate_tmp(R, q_UF, temp): """ 还原超滤跨膜压差 TMP """ A = 128 * 40 # m²,有效膜面积 mu = xishan_viscosity(temp) # 温度修正后的水粘度 J = q_UF / A / 3600 # 通量 m³/h -> m³/(m²·s) TMP_Pa = R * J * mu * 1e10 tmp = TMP_Pa / 1e6 return float(tmp) # ======================= # 环境体模型加载函数 # ======================= def load_resistance_models(): """加载阻力变化模型,仅在首次调用时执行""" global resistance_model_fp, resistance_model_bw # 如果全局模型已存在,则直接返回 if "resistance_model_fp" in globals() and resistance_model_fp is not None: return resistance_model_fp, resistance_model_bw print("🔄 Loading resistance models...") # 初始化模型 resistance_model_fp = ResistanceIncreaseModel() resistance_model_bw = ResistanceDecreaseModel() # 取得当前脚本所在目录(即 rl_dqn_env.py 或 check_initial_state.py 同目录) base_dir = Path(__file__).resolve().parent # 构造模型路径 fp_path = base_dir / "resistance_model_fp.pth" bw_path = base_dir / "resistance_model_bw.pth" # 检查文件存在性 assert fp_path.exists(), f"缺少 {fp_path.name}" assert bw_path.exists(), f"缺少 {bw_path.name}" # 加载权重 resistance_model_fp.load_state_dict(torch.load(fp_path, map_location="cpu")) resistance_model_bw.load_state_dict(torch.load(bw_path, map_location="cpu")) # 设置推理模式 resistance_model_fp.eval() resistance_model_bw.eval() print("✅ Resistance models loaded successfully from current directory.") return resistance_model_fp, resistance_model_bw # ======================= # 环境体模型模拟函数 # ======================= def _delta_resistance(p, L_h: float) -> float: """ 过滤时段膜阻力上升量:调用 resistance_model_fp.pth 模型 """ return resistance_model_fp(p, L_h) def phi_bw_of(p, R0: float, R_end: float, L_h_start: float, L_h_next_start: float, t_bw_s: float) -> float: """ 物理冲洗去除膜阻力值:调用 resistance_model_bw 模型 """ return resistance_model_bw(p, R0, R_end, L_h_start, L_h_next_start, t_bw_s) def _v_bw_m3(p, t_bw_s: float) -> float: """ 物理反洗水耗 """ return float(p.q_bw_m3ph * (float(t_bw_s) / 3600.0)) def simulate_one_supercycle(p: UFParams, L_s: float, t_bw_s: float): """ 模拟一个超级周期(多次物理反洗 + 一次化学反洗) 返回: (info, next_params) """ L_h = float(L_s) / 3600.0 # 小周期过滤时间(h) tmp = p.TMP0 R0 = _calculate_resistance(p.TMP0, p.q_UF, p.temp) max_tmp_during_filtration = tmp min_tmp_during_filtration = tmp max_residual_increase = 0.0 t_small_cycle_h = (L_s + t_bw_s) / 3600.0 k_bw_per_ceb = int(np.floor(p.T_ceb_interval_h / t_small_cycle_h)) if k_bw_per_ceb < 1: k_bw_per_ceb = 1 energy_lookup = { 3600: 0.1034, 3660: 0.1031, 3720: 0.1029, 3780: 0.1026, 3840: 0.1023, 3900: 0.1021, 3960: 0.1019, 4020: 0.1017, 4080: 0.1015, 4140: 0.1012, 4200: 0.1011 } # --- 循环模拟物理反洗 --- for idx in range(k_bw_per_ceb): tmp_run_start = tmp q_UF = p.q_UF temp = p.temp R_run_start = _calculate_resistance(tmp_run_start, q_UF, temp) d_R = _delta_resistance(p, L_s) R_peak = R_run_start + d_R tmp_peak = _calculate_tmp(R_peak, q_UF, temp) max_tmp_during_filtration = max(max_tmp_during_filtration, tmp_peak) min_tmp_during_filtration = min(min_tmp_during_filtration, tmp_run_start) # 物洗膜阻力减小 L_h_start = (L_s + t_bw_s) / 3600.0 * idx L_h_next_start = (L_s + t_bw_s) / 3600.0 * (idx + 1) reversible_R = phi_bw_of(p, R_run_start, R_peak, L_h_start, L_h_next_start, t_bw_s) R_after_bw = R_peak - reversible_R tmp_after_bw = _calculate_tmp(R_after_bw, q_UF, temp) residual_inc = tmp_after_bw - tmp_run_start max_residual_increase = max(max_residual_increase, residual_inc) tmp = tmp_after_bw # --- CEB反洗 --- R_after_ceb = R_peak - p.ceb_removal tmp_after_ceb = _calculate_tmp(R_after_ceb, q_UF, temp) # ============================================================ # 生成本周期指标 # ============================================================ # --- 体积与能耗 --- V_feed_super = k_bw_per_ceb * p.q_UF * L_h V_loss_super = k_bw_per_ceb * _v_bw_m3(p, t_bw_s) + p.v_ceb_m3 V_net = max(0.0, V_feed_super - V_loss_super) recovery = max(0.0, V_net / max(V_feed_super, 1e-12)) T_super_h = k_bw_per_ceb * (L_s + t_bw_s) / 3600.0 + p.t_ceb_s / 3600.0 daily_prod_time_h = k_bw_per_ceb * L_h / T_super_h * 24.0 closest_L = min(energy_lookup.keys(), key=lambda x: abs(x - L_s)) ton_water_energy = energy_lookup[closest_L] #TODO:需确认新过滤时间范围下的吨水电耗 # --- 信息输出 --- info = { "q_UF": p.q_UF, "temp": p.temp, "recovery": recovery, "V_feed_super_m3": V_feed_super, "V_loss_super_m3": V_loss_super, "V_net_super_m3": V_net, "supercycle_time_h": T_super_h, "max_TMP_during_filtration": max_tmp_during_filtration, "min_TMP_during_filtration": min_tmp_during_filtration, "global_TMP_limit":p.global_TMP_limit, "max_residual_increase_per_run": max_residual_increase, "R0": R0, "R_after_ceb": R_after_ceb, "TMP0":p.TMP0, "TMP_after_ceb": tmp_after_ceb, "daily_prod_time_h": daily_prod_time_h, "ton_water_energy_kWh_per_m3": ton_water_energy, "k_bw_per_ceb": k_bw_per_ceb } # ============================================================ # 状态更新:生成 next_params(新状态) # ============================================================ next_params = copy.deepcopy(p) # 更新跨膜压差(TMP) next_params.TMP0 = tmp_after_ceb # 可选参数(当前保持不变,未来可扩展更新逻辑) next_params.slope = p.slope next_params.power = p.power next_params.ceb_removal = p.ceb_removal next_params.nuK = p.nuK next_params.q_UF = p.q_UF next_params.temp = p.temp return info, next_params def calculate_reward(p: UFParams, info: dict) -> float: """ TMP不参与奖励计算,仅考虑回收率与残余污染比例之间的权衡。 满足: - 当 recovery=0.97, residual_ratio=0.1 → reward = 0 - 当 recovery=0.90, residual_ratio=0.0 → reward = 0 - 在两者之间平衡(如 recovery≈0.94, residual_ratio≈0.05)→ reward > 0 """ recovery = info["recovery"] residual_ratio = (info["R_after_ceb"] - info["R0"]) / info["R0"] # 回收率奖励(在 [rec_low, rec_high] 内平滑上升) rec_norm = (recovery - p.rec_low) / (p.rec_high - p.rec_low) rec_reward = np.clip(np.tanh(p.k_rec * (rec_norm - 0.5)), -1, 1) # 残余比惩罚(超过rr0时快速变为负值) res_penalty = -np.tanh(p.k_res * (residual_ratio / p.rr0 - 1)) # 组合逻辑:权衡二者 total_reward = rec_reward + res_penalty # 再平移使指定点为零: # recovery=0.97, residual=0.1 → 0 # recovery=0.90, residual=0.0 → 0 # 经验上,这两点几乎对称,因此无需额外线性偏移 # 若希望严格归零,可用线性校正: total_reward -= 0.0 return total_reward def is_dead_cycle(info: dict) -> bool: """ 判断当前循环是否为成功循环(True)或失败循环(False) 失败条件: 1. 最大TMP超过设定上限; 2. 回收率低于75%; 3. 化学反冲洗后膜阻力上升超过10%。 参数: info: dict simulate_one_supercycle() 返回的指标字典,需包含: - max_TMP_during_filtration - recovery - R_after_ceb - R_run_start - TMP_limit(如果有定义) 返回: bool: True 表示成功循环,False 表示失败循环。 """ TMP_limit = info.get("global_TMP_limit", 0.08) # 默认硬约束上限 max_tmp = info.get("max_TMP_during_filtration", 0) recovery = info.get("recovery", 1.0) R_after_ceb = info.get("R_after_ceb", 0) R0 = info.get("R0", 1e-6) # 判断条件 if max_tmp > TMP_limit: return False if recovery < 0.75: return False if (R_after_ceb - R0) / R0 > 0.1: return False return True class UFSuperCycleEnv(gym.Env): """超滤系统环境(超级周期级别决策)""" metadata = {"render_modes": ["human"]} def __init__(self, base_params, resistance_models=None, max_episode_steps: int = 15): super(UFSuperCycleEnv, self).__init__() self.base_params = base_params self.current_params = copy.deepcopy(base_params) self.max_episode_steps = max_episode_steps self.current_step = 0 if resistance_models is None: self.resistance_model_fp, self.resistance_model_bw = load_resistance_models() else: self.resistance_model_fp, self.resistance_model_bw = resistance_models # 计算离散动作空间 self.L_values = np.arange( self.base_params.L_min_s, self.base_params.L_max_s, self.base_params.L_step_s ) self.t_bw_values = np.arange( self.base_params.t_bw_min_s, self.base_params.t_bw_max_s + self.base_params.t_bw_step_s, self.base_params.t_bw_step_s ) self.num_L = len(self.L_values) self.num_bw = len(self.t_bw_values) # 单一离散动作空间 self.action_space = spaces.Discrete(self.num_L * self.num_bw) # 状态空间,归一化在 _get_obs 中处理 self.observation_space = spaces.Box( low=np.zeros(8, dtype=np.float32), high=np.ones(8, dtype=np.float32), dtype=np.float32 ) # 初始化环境 self.reset(seed=None) def generate_initial_state(self): """ 随机生成一个初始状态,不进行死状态判断 """ self.current_params.TMP0 = np.random.uniform( self.current_params.TMP0_min, self.current_params.TMP0_max ) self.current_params.q_UF = np.random.uniform( self.current_params.q_UF_min, self.current_params.q_UF_max ) self.current_params.temp = np.random.uniform( self.current_params.temp_min, self.current_params.temp_max ) self.current_params.R0 = _calculate_resistance( self.current_params.TMP0, self.current_params.q_UF, self.current_params.temp ) self.current_params.nuK = np.random.uniform( self.current_params.nuK_min, self.current_params.nuK_max ) self.current_params.slope = np.random.uniform( self.current_params.slope_min, self.current_params.slope_max ) self.current_params.power = np.random.uniform( self.current_params.power_min, self.current_params.power_max ) self.current_params.ceb_removal = np.random.uniform( self.current_params.ceb_removal_min, self.current_params.ceb_removal_max ) return self._get_state_copy() def reset(self, seed=None, options=None, max_attempts: int = 200): super().reset(seed=seed) attempts = 0 while attempts < max_attempts: attempts += 1 self.generate_initial_state() # 生成随机初始状态 if self.check_dead_initial_state(max_steps=getattr(self, "max_episode_steps", 15), L_s=3800, t_bw_s=60): # True 表示可行,退出循环 break else: # 超过最大尝试次数仍未生成可行状态 raise RuntimeError(f"在 {max_attempts} 次尝试后仍无法生成可行初始状态。") # 初始化步数、动作、最大 TMP self.current_step = 0 self.last_action = (self.base_params.L_min_s, self.base_params.t_bw_min_s) self.max_TMP_during_filtration = self.current_params.TMP0 return self._get_obs(), {} def check_dead_initial_state(self, max_steps: int = None, L_s: int = 4900, t_bw_s: int = 50) -> bool: """ 判断当前环境生成的初始状态是否为可行(non-dead)。 使用最保守策略连续模拟 max_steps 次: 若任意一次 is_dead_cycle(info) 返回 False,则视为必死状态。 参数: max_steps: 模拟步数,默认使用 self.max_episode_steps L_s: 过滤时长(s),默认 3800 t_bw_s: 物理反洗时长(s),默认 60 返回: bool: True 表示可行状态(non-dead),False 表示必死状态 """ if max_steps is None: max_steps = getattr(self, "max_episode_steps", 15) # 生成初始状态 self.generate_initial_state() if not hasattr(self, "current_params"): raise AttributeError("generate_initial_state() 未设置 current_params。") import copy curr_p = copy.deepcopy(self.current_params) # 逐步模拟 for step in range(max_steps): try: info, next_params = simulate_one_supercycle(curr_p, L_s, t_bw_s) except Exception: # 异常即视为不可行 return False if not is_dead_cycle(info): # 任意一次失败即为必死状态 return False curr_p = next_params return True def _get_state_copy(self): return copy.deepcopy(self.current_params) def _get_obs(self): """ 构建当前环境归一化状态向量 """ # === 1. 从 current_params 读取动态参数 === TMP0 = self.current_params.TMP0 q_UF = self.current_params.q_UF temp = self.current_params.temp # === 2. 计算本周期初始膜阻力 === R0 = _calculate_resistance(TMP0, q_UF, temp) # === 3. 从 current_params 读取膜阻力增长模型参数 === nuk = self.current_params.nuK slope = self.current_params.slope power = self.current_params.power ceb_removal = self.current_params.ceb_removal # === 4. 从 current_params 动态读取上下限 === TMP0_min, TMP0_max = self.current_params.TMP0_min, self.current_params.TMP0_max q_UF_min, q_UF_max = self.current_params.q_UF_min, self.current_params.q_UF_max temp_min, temp_max = self.current_params.temp_min, self.current_params.temp_max nuK_min, nuK_max = self.current_params.nuK_min, self.current_params.nuK_max slope_min, slope_max = self.current_params.slope_min, self.current_params.slope_max power_min, power_max = self.current_params.power_min, self.current_params.power_max ceb_min, ceb_max = self.current_params.ceb_removal_min, self.current_params.ceb_removal_max # === 5. 归一化计算(clip防止越界) === TMP0_norm = np.clip((TMP0 - TMP0_min) / (TMP0_max - TMP0_min), 0, 1) q_UF_norm = np.clip((q_UF - q_UF_min) / (q_UF_max - q_UF_min), 0, 1) temp_norm = np.clip((temp - temp_min) / (temp_max - temp_min), 0, 1) # R0 不在 current_params 中定义上下限,设定经验范围 R0_norm = np.clip((R0 - 100.0) / (800.0 - 100.0), 0, 1) short_term_norm = np.clip((nuk - nuK_min) / (nuK_max - nuK_min), 0, 1) long_term_slope_norm = np.clip((slope - slope_min) / (slope_max - slope_min), 0, 1) long_term_power_norm = np.clip((power - power_min) / (power_max - power_min), 0, 1) ceb_removal_norm = np.clip((ceb_removal - ceb_min) / (ceb_max - ceb_min), 0, 1) # === 6. 构建观测向量 === obs = np.array([ TMP0_norm, q_UF_norm, temp_norm, R0_norm, short_term_norm, long_term_slope_norm, long_term_power_norm, ceb_removal_norm ], dtype=np.float32) return obs def _get_action_values(self, action): """ 将动作还原为实际时长 """ L_idx = action // self.num_bw t_bw_idx = action % self.num_bw return self.L_values[L_idx], self.t_bw_values[t_bw_idx] def step(self, action): self.current_step += 1 L_s, t_bw_s = self._get_action_values(action) L_s = np.clip(L_s, self.base_params.L_min_s, self.base_params.L_max_s) t_bw_s = np.clip(t_bw_s, self.base_params.t_bw_min_s, self.base_params.t_bw_max_s) # 模拟超级周期 info, next_params = simulate_one_supercycle(self.current_params, L_s, t_bw_s) # 根据 info 判断是否成功 feasible = is_dead_cycle(info) # True 表示成功循环,False 表示失败 if feasible: reward = calculate_reward(self.current_params, info) self.current_params = next_params terminated = False else: reward = -10 terminated = True truncated = self.current_step >= self.max_episode_steps self.last_action = (L_s, t_bw_s) next_obs = self._get_obs() info["feasible"] = feasible info["step"] = self.current_step return next_obs, reward, terminated, truncated, info