| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599 |
- import os
- import torch
- from pathlib import Path
- import numpy as np
- import gymnasium as gym
- from gymnasium import spaces
- from typing import Dict, Tuple, Optional
- import torch
- import torch.nn as nn
- from dataclasses import dataclass, asdict
- from UF_resistance_models import ResistanceIncreaseModel, ResistanceDecreaseModel # 导入模型类
- import copy
- # =======================
- # 膜运行参数类:定义膜的基础运行参数
- # =======================
- @dataclass
- class UFParams:
- # —— 膜动态运行参数 ——
- q_UF: float = 360.0 # 过滤进水流量(m^3/h)
- TMP0: float = 0.03 # 初始跨膜压差
- temp: float = 25.0 # 水温,摄氏度
- # —— 膜阻力模型参数 ——
- nuK: float =4.92e+01 # 过滤阶段膜阻力增长模型参数
- slope: float = 3.44e-01 # 全周期不可逆污染阻力增长斜率
- power: float = 1.032 # 全周期不可逆污染阻力增长幂次
- tau_bw_s: float = 30.0 # 物洗时长影响时间尺度
- gamma_t: float = 1.0 # 物洗时长作用指数
- ceb_removal: float = 150 # CEB去除膜阻力
- # —— 膜运行约束参数 ——
- global_TMP_limit: float = 0.08 # TMP硬上限(MPa)
- TMP0_max: float = 0.035 # 初始TMP上限(MPa)
- TMP0_min: float = 0.01 # 初始TMP下限(MPa)
- q_UF_max: float = 400.0 # 进水流量上限(m^3/h)
- q_UF_min: float = 250.0 # 进水流量上限(m^3/h)
- temp_max: float = 40.0 # 温度上限(摄氏度)
- temp_min: float = 10.0 # 温度下限(摄氏度)
- nuK_max: float = 6e+01 # 物理周期总阻力增速上限(m^-1/s)
- nuK_min: float = 3e+01 # 物理周期总阻力增速下限(m^-1/s)
- slope_max: float = 10 # 化学周期长期阻力增速斜率上限
- slope_min: float = 0.1 # 化学周期长期阻力增速斜率下限
- power_max: float = 1.3 # 化学周期长期阻力增速幂次上限
- power_min: float = 0.8 # 化学周期长期阻力增速幂次下限
- ceb_removal_max: float = 150 # CEB去除阻力(已缩放)上限(m^-1)
- ceb_removal_min: float = 100 # CEB去除阻力(已缩放)下限(m^-1)
- # —— 反洗参数(固定) ——
- q_bw_m3ph: float = 1000.0 # 物理反洗流量(m^3/h)
- # —— CEB参数 ——
- T_ceb_interval_h: float = 60.0 # 固定每 k 小时做一次CEB
- v_ceb_m3: float = 30.0 # CEB用水体积(m^3)
- t_ceb_s: float = 40 * 60.0 # CEB时长(s)
- # —— 搜索范围(秒) ——
- L_min_s: float = 3800.0 # 过滤时长下限(s)
- L_max_s: float = 6000.0 # 过滤时长上限(s)
- t_bw_min_s: float = 40.0 # 物洗时长下限(s)
- t_bw_max_s: float = 60.0 # 物洗时长上限(s)
- # —— 网格 ——
- L_step_s: float = 60.0 # 过滤时长步长(s)
- t_bw_step_s: float = 5.0 # 物洗时长步长(s)
- # —— 奖励函数参数 ——
- k_rec = 5.0 # 回收率敏感度
- k_res = 10.0 # 残余污染敏感度
- rec_low, rec_high = 0.92, 0.99
- rr0 = 0.08
- # =======================
- # 辅助函数:转换膜阻力与跨膜压差
- # =======================
- def xishan_viscosity(temp):
- # temp: 水温,单位摄氏度
- """
- 锡山水厂 PLC水温校正因子经验公式(25摄氏度标准)
- 返回温度修正后的水粘度(纯水修正),TODO:水厂水质与纯水相差较大,对粘度有一定影响
- """
- x = (temp + 273.15) / 300
- factor = 890 / (280.68 * x ** -1.9 + 511.45 * x ** -7.7 + 61.131 * x ** -19.6 + 0.45903 * x ** -40)
- mu = 0.00089 / factor
- return mu
- def _calculate_resistance(tmp, q_UF, temp):
- """
- 计算超滤膜阻力 R = TMP / (J * μ)
- 返回缩小1e10的膜阻力(超滤原膜阻力量级为1e12,过大的绝对值容易导致平稳拟合)
- """
- A = 128 * 40 # m²,有效膜面积
- mu = xishan_viscosity(temp) # 温度修正后的水粘度
- TMP_Pa = tmp * 1e6 # 跨膜压差 MPa -> Pa
- J = q_UF / A / 3600 # 通量 m³/h -> m³/(m²·s)
- if J <= 0 or mu <= 0:
- return np.nan
- R = TMP_Pa / (J * mu) / 1e10 # 缩放膜阻力
- return float(R)
- def _calculate_tmp(R, q_UF, temp):
- """
- 还原超滤跨膜压差 TMP
- """
- A = 128 * 40 # m²,有效膜面积
- mu = xishan_viscosity(temp) # 温度修正后的水粘度
- J = q_UF / A / 3600 # 通量 m³/h -> m³/(m²·s)
- TMP_Pa = R * J * mu * 1e10
- tmp = TMP_Pa / 1e6
- return float(tmp)
- # =======================
- # 环境体模型加载函数
- # =======================
- def load_resistance_models():
- """加载阻力变化模型,仅在首次调用时执行"""
- global resistance_model_fp, resistance_model_bw
- # 如果全局模型已存在,则直接返回
- if "resistance_model_fp" in globals() and resistance_model_fp is not None:
- return resistance_model_fp, resistance_model_bw
- print("🔄 Loading resistance models...")
- # 初始化模型
- resistance_model_fp = ResistanceIncreaseModel()
- resistance_model_bw = ResistanceDecreaseModel()
- # 取得当前脚本所在目录(即 rl_dqn_env.py 或 check_initial_state.py 同目录)
- base_dir = Path(__file__).resolve().parent
- # 构造模型路径
- fp_path = base_dir / "resistance_model_fp.pth"
- bw_path = base_dir / "resistance_model_bw.pth"
- # 检查文件存在性
- assert fp_path.exists(), f"缺少 {fp_path.name}"
- assert bw_path.exists(), f"缺少 {bw_path.name}"
- # 加载权重
- resistance_model_fp.load_state_dict(torch.load(fp_path, map_location="cpu"))
- resistance_model_bw.load_state_dict(torch.load(bw_path, map_location="cpu"))
- # 设置推理模式
- resistance_model_fp.eval()
- resistance_model_bw.eval()
- print("✅ Resistance models loaded successfully from current directory.")
- return resistance_model_fp, resistance_model_bw
- # =======================
- # 环境体模型模拟函数
- # =======================
- def _delta_resistance(p, L_h: float) -> float:
- """
- 过滤时段膜阻力上升量:调用 resistance_model_fp.pth 模型
- """
- return resistance_model_fp(p, L_h)
- def phi_bw_of(p, R0: float, R_end: float, L_h_start: float, L_h_next_start: float, t_bw_s: float) -> float:
- """
- 物理冲洗去除膜阻力值:调用 resistance_model_bw 模型
- """
- return resistance_model_bw(p, R0, R_end, L_h_start, L_h_next_start, t_bw_s)
- def _v_bw_m3(p, t_bw_s: float) -> float:
- """
- 物理反洗水耗
- """
- return float(p.q_bw_m3ph * (float(t_bw_s) / 3600.0))
- def simulate_one_supercycle(p: UFParams, L_s: float, t_bw_s: float):
- """
- 模拟一个超级周期(多次物理反洗 + 一次化学反洗)
- 返回: (info, next_params)
- """
- L_h = float(L_s) / 3600.0 # 小周期过滤时间(h)
- tmp = p.TMP0
- R0 = _calculate_resistance(p.TMP0, p.q_UF, p.temp)
- max_tmp_during_filtration = tmp
- min_tmp_during_filtration = tmp
- max_residual_increase = 0.0
- t_small_cycle_h = (L_s + t_bw_s) / 3600.0
- k_bw_per_ceb = int(np.floor(p.T_ceb_interval_h / t_small_cycle_h))
- if k_bw_per_ceb < 1:
- k_bw_per_ceb = 1
- energy_lookup = {
- 3600: 0.1034, 3660: 0.1031, 3720: 0.1029, 3780: 0.1026,
- 3840: 0.1023, 3900: 0.1021, 3960: 0.1019, 4020: 0.1017,
- 4080: 0.1015, 4140: 0.1012, 4200: 0.1011
- }
- # --- 循环模拟物理反洗 ---
- for idx in range(k_bw_per_ceb):
- tmp_run_start = tmp
- q_UF = p.q_UF
- temp = p.temp
- R_run_start = _calculate_resistance(tmp_run_start, q_UF, temp)
- d_R = _delta_resistance(p, L_s)
- R_peak = R_run_start + d_R
- tmp_peak = _calculate_tmp(R_peak, q_UF, temp)
- max_tmp_during_filtration = max(max_tmp_during_filtration, tmp_peak)
- min_tmp_during_filtration = min(min_tmp_during_filtration, tmp_run_start)
- # 物洗膜阻力减小
- L_h_start = (L_s + t_bw_s) / 3600.0 * idx
- L_h_next_start = (L_s + t_bw_s) / 3600.0 * (idx + 1)
- reversible_R = phi_bw_of(p, R_run_start, R_peak, L_h_start, L_h_next_start, t_bw_s)
- R_after_bw = R_peak - reversible_R
- tmp_after_bw = _calculate_tmp(R_after_bw, q_UF, temp)
- residual_inc = tmp_after_bw - tmp_run_start
- max_residual_increase = max(max_residual_increase, residual_inc)
- tmp = tmp_after_bw
- # --- CEB反洗 ---
- R_after_ceb = R_peak - p.ceb_removal
- tmp_after_ceb = _calculate_tmp(R_after_ceb, q_UF, temp)
- # ============================================================
- # 生成本周期指标
- # ============================================================
- # --- 体积与能耗 ---
- V_feed_super = k_bw_per_ceb * p.q_UF * L_h
- V_loss_super = k_bw_per_ceb * _v_bw_m3(p, t_bw_s) + p.v_ceb_m3
- V_net = max(0.0, V_feed_super - V_loss_super)
- recovery = max(0.0, V_net / max(V_feed_super, 1e-12))
- T_super_h = k_bw_per_ceb * (L_s + t_bw_s) / 3600.0 + p.t_ceb_s / 3600.0
- daily_prod_time_h = k_bw_per_ceb * L_h / T_super_h * 24.0
- closest_L = min(energy_lookup.keys(), key=lambda x: abs(x - L_s))
- ton_water_energy = energy_lookup[closest_L] #TODO:需确认新过滤时间范围下的吨水电耗
- # --- 信息输出 ---
- info = {
- "q_UF": p.q_UF,
- "temp": p.temp,
- "recovery": recovery,
- "V_feed_super_m3": V_feed_super,
- "V_loss_super_m3": V_loss_super,
- "V_net_super_m3": V_net,
- "supercycle_time_h": T_super_h,
- "max_TMP_during_filtration": max_tmp_during_filtration,
- "min_TMP_during_filtration": min_tmp_during_filtration,
- "global_TMP_limit":p.global_TMP_limit,
- "max_residual_increase_per_run": max_residual_increase,
- "R0": R0,
- "R_after_ceb": R_after_ceb,
- "TMP0":p.TMP0,
- "TMP_after_ceb": tmp_after_ceb,
- "daily_prod_time_h": daily_prod_time_h,
- "ton_water_energy_kWh_per_m3": ton_water_energy,
- "k_bw_per_ceb": k_bw_per_ceb
- }
- # ============================================================
- # 状态更新:生成 next_params(新状态)
- # ============================================================
- next_params = copy.deepcopy(p)
- # 更新跨膜压差(TMP)
- next_params.TMP0 = tmp_after_ceb
- # 可选参数(当前保持不变,未来可扩展更新逻辑)
- next_params.slope = p.slope
- next_params.power = p.power
- next_params.ceb_removal = p.ceb_removal
- next_params.nuK = p.nuK
- next_params.q_UF = p.q_UF
- next_params.temp = p.temp
- return info, next_params
- def calculate_reward(p: UFParams, info: dict) -> float:
- """
- TMP不参与奖励计算,仅考虑回收率与残余污染比例之间的权衡。
- 满足:
- - 当 recovery=0.97, residual_ratio=0.1 → reward = 0
- - 当 recovery=0.90, residual_ratio=0.0 → reward = 0
- - 在两者之间平衡(如 recovery≈0.94, residual_ratio≈0.05)→ reward > 0
- """
- recovery = info["recovery"]
- residual_ratio = (info["R_after_ceb"] - info["R0"]) / info["R0"]
- # 回收率奖励(在 [rec_low, rec_high] 内平滑上升)
- rec_norm = (recovery - p.rec_low) / (p.rec_high - p.rec_low)
- rec_reward = np.clip(np.tanh(p.k_rec * (rec_norm - 0.5)), -1, 1)
- # 残余比惩罚(超过rr0时快速变为负值)
- res_penalty = -np.tanh(p.k_res * (residual_ratio / p.rr0 - 1))
- # 组合逻辑:权衡二者
- total_reward = rec_reward + res_penalty
- # 再平移使指定点为零:
- # recovery=0.97, residual=0.1 → 0
- # recovery=0.90, residual=0.0 → 0
- # 经验上,这两点几乎对称,因此无需额外线性偏移
- # 若希望严格归零,可用线性校正:
- total_reward -= 0.0
- return total_reward
- def is_dead_cycle(info: dict) -> bool:
- """
- 判断当前循环是否为成功循环(True)或失败循环(False)
- 失败条件:
- 1. 最大TMP超过设定上限;
- 2. 回收率低于75%;
- 3. 化学反冲洗后膜阻力上升超过10%。
- 参数:
- info: dict
- simulate_one_supercycle() 返回的指标字典,需包含:
- - max_TMP_during_filtration
- - recovery
- - R_after_ceb
- - R_run_start
- - TMP_limit(如果有定义)
- 返回:
- bool: True 表示成功循环,False 表示失败循环。
- """
- TMP_limit = info.get("global_TMP_limit", 0.08) # 默认硬约束上限
- max_tmp = info.get("max_TMP_during_filtration", 0)
- recovery = info.get("recovery", 1.0)
- R_after_ceb = info.get("R_after_ceb", 0)
- R0 = info.get("R0", 1e-6)
- # 判断条件
- if max_tmp > TMP_limit:
- return False
- if recovery < 0.75:
- return False
- if (R_after_ceb - R0) / R0 > 0.1:
- return False
- return True
- class UFSuperCycleEnv(gym.Env):
- """超滤系统环境(超级周期级别决策)"""
- metadata = {"render_modes": ["human"]}
- def __init__(self, base_params, resistance_models=None, max_episode_steps: int = 15):
- super(UFSuperCycleEnv, self).__init__()
- self.base_params = base_params
- self.current_params = copy.deepcopy(base_params)
- self.max_episode_steps = max_episode_steps
- self.current_step = 0
- if resistance_models is None:
- self.resistance_model_fp, self.resistance_model_bw = load_resistance_models()
- else:
- self.resistance_model_fp, self.resistance_model_bw = resistance_models
- # 计算离散动作空间
- self.L_values = np.arange(
- self.base_params.L_min_s,
- self.base_params.L_max_s,
- self.base_params.L_step_s
- )
- self.t_bw_values = np.arange(
- self.base_params.t_bw_min_s,
- self.base_params.t_bw_max_s + self.base_params.t_bw_step_s,
- self.base_params.t_bw_step_s
- )
- self.num_L = len(self.L_values)
- self.num_bw = len(self.t_bw_values)
- # 单一离散动作空间
- self.action_space = spaces.Discrete(self.num_L * self.num_bw)
- # 状态空间,归一化在 _get_obs 中处理
- self.observation_space = spaces.Box(
- low=np.zeros(8, dtype=np.float32),
- high=np.ones(8, dtype=np.float32),
- dtype=np.float32
- )
- # 初始化环境
- self.reset(seed=None)
- def generate_initial_state(self):
- """
- 随机生成一个初始状态,不进行死状态判断
- """
- self.current_params.TMP0 = np.random.uniform(
- self.current_params.TMP0_min, self.current_params.TMP0_max
- )
- self.current_params.q_UF = np.random.uniform(
- self.current_params.q_UF_min, self.current_params.q_UF_max
- )
- self.current_params.temp = np.random.uniform(
- self.current_params.temp_min, self.current_params.temp_max
- )
- self.current_params.R0 = _calculate_resistance(
- self.current_params.TMP0,
- self.current_params.q_UF,
- self.current_params.temp
- )
- self.current_params.nuK = np.random.uniform(
- self.current_params.nuK_min, self.current_params.nuK_max
- )
- self.current_params.slope = np.random.uniform(
- self.current_params.slope_min, self.current_params.slope_max
- )
- self.current_params.power = np.random.uniform(
- self.current_params.power_min, self.current_params.power_max
- )
- self.current_params.ceb_removal = np.random.uniform(
- self.current_params.ceb_removal_min, self.current_params.ceb_removal_max
- )
- return self._get_state_copy()
- def reset(self, seed=None, options=None, max_attempts: int = 200):
- super().reset(seed=seed)
- attempts = 0
- while attempts < max_attempts:
- attempts += 1
- self.generate_initial_state() # 生成随机初始状态
- if self.check_dead_initial_state(max_steps=getattr(self, "max_episode_steps", 15),
- L_s=3800, t_bw_s=60):
- # True 表示可行,退出循环
- break
- else:
- # 超过最大尝试次数仍未生成可行状态
- raise RuntimeError(f"在 {max_attempts} 次尝试后仍无法生成可行初始状态。")
- # 初始化步数、动作、最大 TMP
- self.current_step = 0
- self.last_action = (self.base_params.L_min_s, self.base_params.t_bw_min_s)
- self.max_TMP_during_filtration = self.current_params.TMP0
- return self._get_obs(), {}
- def check_dead_initial_state(self, max_steps: int = None,
- L_s: int = 4900, t_bw_s: int = 50) -> bool:
- """
- 判断当前环境生成的初始状态是否为可行(non-dead)。
- 使用最保守策略连续模拟 max_steps 次:
- 若任意一次 is_dead_cycle(info) 返回 False,则视为必死状态。
- 参数:
- max_steps: 模拟步数,默认使用 self.max_episode_steps
- L_s: 过滤时长(s),默认 3800
- t_bw_s: 物理反洗时长(s),默认 60
- 返回:
- bool: True 表示可行状态(non-dead),False 表示必死状态
- """
- if max_steps is None:
- max_steps = getattr(self, "max_episode_steps", 15)
- # 生成初始状态
- self.generate_initial_state()
- if not hasattr(self, "current_params"):
- raise AttributeError("generate_initial_state() 未设置 current_params。")
- import copy
- curr_p = copy.deepcopy(self.current_params)
- # 逐步模拟
- for step in range(max_steps):
- try:
- info, next_params = simulate_one_supercycle(curr_p, L_s, t_bw_s)
- except Exception:
- # 异常即视为不可行
- return False
- if not is_dead_cycle(info):
- # 任意一次失败即为必死状态
- return False
- curr_p = next_params
- return True
- def _get_state_copy(self):
- return copy.deepcopy(self.current_params)
- def _get_obs(self):
- """
- 构建当前环境归一化状态向量
- """
- # === 1. 从 current_params 读取动态参数 ===
- TMP0 = self.current_params.TMP0
- q_UF = self.current_params.q_UF
- temp = self.current_params.temp
- # === 2. 计算本周期初始膜阻力 ===
- R0 = _calculate_resistance(TMP0, q_UF, temp)
- # === 3. 从 current_params 读取膜阻力增长模型参数 ===
- nuk = self.current_params.nuK
- slope = self.current_params.slope
- power = self.current_params.power
- ceb_removal = self.current_params.ceb_removal
- # === 4. 从 current_params 动态读取上下限 ===
- TMP0_min, TMP0_max = self.current_params.TMP0_min, self.current_params.TMP0_max
- q_UF_min, q_UF_max = self.current_params.q_UF_min, self.current_params.q_UF_max
- temp_min, temp_max = self.current_params.temp_min, self.current_params.temp_max
- nuK_min, nuK_max = self.current_params.nuK_min, self.current_params.nuK_max
- slope_min, slope_max = self.current_params.slope_min, self.current_params.slope_max
- power_min, power_max = self.current_params.power_min, self.current_params.power_max
- ceb_min, ceb_max = self.current_params.ceb_removal_min, self.current_params.ceb_removal_max
- # === 5. 归一化计算(clip防止越界) ===
- TMP0_norm = np.clip((TMP0 - TMP0_min) / (TMP0_max - TMP0_min), 0, 1)
- q_UF_norm = np.clip((q_UF - q_UF_min) / (q_UF_max - q_UF_min), 0, 1)
- temp_norm = np.clip((temp - temp_min) / (temp_max - temp_min), 0, 1)
- # R0 不在 current_params 中定义上下限,设定经验范围
- R0_norm = np.clip((R0 - 100.0) / (800.0 - 100.0), 0, 1)
- short_term_norm = np.clip((nuk - nuK_min) / (nuK_max - nuK_min), 0, 1)
- long_term_slope_norm = np.clip((slope - slope_min) / (slope_max - slope_min), 0, 1)
- long_term_power_norm = np.clip((power - power_min) / (power_max - power_min), 0, 1)
- ceb_removal_norm = np.clip((ceb_removal - ceb_min) / (ceb_max - ceb_min), 0, 1)
- # === 6. 构建观测向量 ===
- obs = np.array([
- TMP0_norm,
- q_UF_norm,
- temp_norm,
- R0_norm,
- short_term_norm,
- long_term_slope_norm,
- long_term_power_norm,
- ceb_removal_norm
- ], dtype=np.float32)
- return obs
- def _get_action_values(self, action):
- """
- 将动作还原为实际时长
- """
- L_idx = action // self.num_bw
- t_bw_idx = action % self.num_bw
- return self.L_values[L_idx], self.t_bw_values[t_bw_idx]
- def step(self, action):
- self.current_step += 1
- L_s, t_bw_s = self._get_action_values(action)
- L_s = np.clip(L_s, self.base_params.L_min_s, self.base_params.L_max_s)
- t_bw_s = np.clip(t_bw_s, self.base_params.t_bw_min_s, self.base_params.t_bw_max_s)
- # 模拟超级周期
- info, next_params = simulate_one_supercycle(self.current_params, L_s, t_bw_s)
- # 根据 info 判断是否成功
- feasible = is_dead_cycle(info) # True 表示成功循环,False 表示失败
- if feasible:
- reward = calculate_reward(self.current_params, info)
- self.current_params = next_params
- terminated = False
- else:
- reward = -10
- terminated = True
- truncated = self.current_step >= self.max_episode_steps
- self.last_action = (L_s, t_bw_s)
- next_obs = self._get_obs()
- info["feasible"] = feasible
- info["step"] = self.current_step
- return next_obs, reward, terminated, truncated, info
|