import os
import time
import random
import copy
from dataclasses import dataclass, asdict
from typing import Dict, Tuple, Optional

import numpy as np
import torch
import torch.nn as nn
import gymnasium as gym
from gymnasium import spaces
from stable_baselines3 import DQN
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.vec_env import DummyVecEnv
from stable_baselines3.common.callbacks import BaseCallback

from UF_models import TMPIncreaseModel, TMPDecreaseModel  # surrogate model classes


# ==== Base operating parameters of the UF membrane ====
@dataclass
class UFParams:
    """Operating parameters for one ultrafiltration (UF) supercycle simulation."""

    # -- Membrane and operating parameters --
    q_UF: float = 360.0            # filtration feed flow (m^3/h)
    TMP0: float = 0.03             # initial trans-membrane pressure, TMP (MPa)
    TMP_max: float = 0.06          # hard upper limit on TMP (MPa)

    # -- Membrane fouling kinetics --
    alpha: float = 1e-6            # TMP growth coefficient
    belta: float = 1.1             # power-law exponent

    # -- Physical backwash (BW) parameters (fixed) --
    q_bw_m3ph: float = 1000.0      # physical backwash flow (m^3/h)

    # -- CEB (chemically enhanced backwash) parameters (fixed) --
    T_ceb_interval_h: float = 48.0  # run one CEB every k hours (fixed interval)
    v_ceb_m3: float = 30.0          # water volume used per CEB (m^3)
    t_ceb_s: float = 40 * 60.0      # CEB duration (s)
    phi_ceb: float = 1.0            # CEB removal ratio (simplified: full recovery to TMP0)

    # -- Constraints and convergence --
    dTMP: float = 0.001            # max allowed TMP rise vs. run start after one filtration run (MPa)

    # -- Search ranges (seconds) --
    L_min_s: float = 3800.0        # lower bound on filtration duration (s)
    L_max_s: float = 6000.0        # upper bound on filtration duration (s)
    t_bw_min_s: float = 40.0       # lower bound on backwash duration (s)
    t_bw_max_s: float = 60.0       # upper bound on backwash duration (s)

    # -- Physical backwash recovery-function parameters --
    phi_bw_min: float = 0.7        # minimum backwash removal ratio
    phi_bw_max: float = 1.0        # maximum backwash removal ratio
    L_ref_s: float = 4000.0        # time scale of filtration-duration influence
    tau_bw_s: float = 20.0         # time scale of backwash-duration influence
    gamma_t: float = 1.0           # exponent of backwash-duration effect

    # -- Search grid --
    L_step_s: float = 60.0         # filtration-duration step (s)
    t_bw_step_s: float = 5.0       # backwash-duration step (s)

    # -- Multi-objective weights and high-TMP penalty --
    w_rec: float = 0.8             # weight of recovery rate
    w_rate: float = 0.2            # weight of net delivery rate
    w_headroom: float = 0.2        # weight of near-limit (headroom) penalty
    r_headroom: float = 2.0        # power of the headroom penalty
    headroom_hardcap: float = 0.98  # above this ratio the operating point is rejected outright


# ==== Load the surrogate simulation models ====
model_fp = TMPIncreaseModel()
model_bw = TMPDecreaseModel()
# SECURITY NOTE(review): torch.load unpickles arbitrary objects; only load
# trusted checkpoint files (consider weights_only=True on torch >= 2.0).
model_fp.load_state_dict(torch.load("uf_fp.pth"))
model_bw.load_state_dict(torch.load("uf_bw.pth"))
# Switch to inference mode.
model_fp.eval()
model_bw.eval()


def _delta_tmp(p, L_h: float) -> float:
    """TMP rise over one filtration run: delegates to the uf_fp.pth model."""
    return model_fp(p, L_h)


def phi_bw_of(p, L_s: float, t_bw_s: float) -> float:
    """Physical-backwash removal ratio: delegates to the uf_bw.pth model."""
    return model_bw(p, L_s, t_bw_s)


def _tmp_after_ceb(p, L_s: float, t_bw_s: float) -> float:
    """TMP after a chemically enhanced backwash (CEB).

    Currently simplified to a full recovery back to the initial TMP.
    """
    return p.TMP0


def _v_bw_m3(p, t_bw_s: float) -> float:
    """Water consumed by one physical backwash (m^3)."""
    return float(p.q_bw_m3ph * (float(t_bw_s) / 3600.0))


def simulate_one_supercycle(p: UFParams, L_s: float, t_bw_s: float):
    """Simulate one supercycle and return ``(feasible, metrics_dict)``.

    - Supports a dynamic number of small cycles per CEB (fixed 48 h interval).
    - Reports daily production time and per-ton energy consumption.
    - Tracks the minimum TMP seen during filtration.

    Args:
        p: operating parameters.
        L_s: filtration duration per small cycle (s).
        t_bw_s: physical backwash duration per small cycle (s).

    Returns:
        (True, metrics) when all constraints hold, otherwise (False, failure info).
    """
    L_h = float(L_s) / 3600.0  # filtration time of one small cycle (h)
    tmp = p.TMP0
    max_tmp_during_filtration = tmp
    min_tmp_during_filtration = tmp  # track the minimum TMP as well
    max_residual_increase = 0.0

    # Total duration of one small cycle (h).
    t_small_cycle_h = (L_s + t_bw_s) / 3600.0

    # Number of small (filtration + BW) cycles between consecutive CEBs.
    k_bw_per_ceb = int(np.floor(p.T_ceb_interval_h / t_small_cycle_h))
    if k_bw_per_ceb < 1:
        k_bw_per_ceb = 1  # at least one small cycle

    # Lookup table: per-ton water energy (kWh/m^3) keyed by filtration duration (s).
    energy_lookup = {
        3600: 0.1034, 3660: 0.1031, 3720: 0.1029, 3780: 0.1026,
        3840: 0.1023, 3900: 0.1021, 3960: 0.1019, 4020: 0.1017,
        4080: 0.1015, 4140: 0.1012, 4200: 0.1011
    }

    for _ in range(k_bw_per_ceb):
        tmp_run_start = tmp

        # Filtration phase: TMP rises.
        dtmp = _delta_tmp(p, L_h)
        tmp_peak = tmp_run_start + dtmp

        # Constraint 1: the peak must not exceed the hard limit.
        if tmp_peak > p.TMP_max + 1e-12:
            return False, {"reason": "TMP_max violated during filtration", "TMP_peak": tmp_peak}

        # Update running max / min TMP.
        if tmp_peak > max_tmp_during_filtration:
            max_tmp_during_filtration = tmp_peak
        if tmp_run_start < min_tmp_during_filtration:  # record minimum at run start
            min_tmp_during_filtration = tmp_run_start

        # Physical backwash: removes a fraction phi of this run's TMP rise.
        phi = phi_bw_of(p, L_s, t_bw_s)
        tmp_after_bw = tmp_peak - phi * (tmp_peak - tmp_run_start)

        # Constraint 2: residual TMP increase per run is bounded by dTMP.
        residual_inc = tmp_after_bw - tmp_run_start
        if residual_inc > p.dTMP + 1e-12:
            return False, {
                "reason": "residual TMP increase after BW exceeded dTMP",
                "residual_increase": residual_inc,
                "limit_dTMP": p.dTMP
            }
        if residual_inc > max_residual_increase:
            max_residual_increase = residual_inc

        tmp = tmp_after_bw

    # CEB: simplified full recovery to the initial TMP.
    tmp_after_ceb = p.TMP0

    # Volumes and recovery rate.
    V_feed_super = k_bw_per_ceb * p.q_UF * L_h
    V_loss_super = k_bw_per_ceb * _v_bw_m3(p, t_bw_s) + p.v_ceb_m3
    V_net = max(0.0, V_feed_super - V_loss_super)
    recovery = max(0.0, V_net / max(V_feed_super, 1e-12))

    # Time and net delivery rate.
    T_super_h = k_bw_per_ceb * (L_s + t_bw_s) / 3600.0 + p.t_ceb_s / 3600.0
    net_delivery_rate_m3ph = V_net / max(T_super_h, 1e-12)

    # Headroom ratio and its hard cap.
    headroom_ratio = max_tmp_during_filtration / max(p.TMP_max, 1e-12)
    if headroom_ratio > p.headroom_hardcap + 1e-12:
        return False, {"reason": "headroom hardcap exceeded", "headroom_ratio": headroom_ratio}

    # -- Extra metric 1: daily production time (h/d) --
    daily_prod_time_h = k_bw_per_ceb * L_h / T_super_h * 24.0

    # -- Extra metric 2: per-ton water energy (kWh/m^3), nearest-key lookup --
    closest_L = min(energy_lookup.keys(), key=lambda x: abs(x - L_s))
    ton_water_energy = energy_lookup[closest_L]

    info = {
        "recovery": recovery,
        "V_feed_super_m3": V_feed_super,
        "V_loss_super_m3": V_loss_super,
        "V_net_super_m3": V_net,
        "supercycle_time_h": T_super_h,
        "net_delivery_rate_m3ph": net_delivery_rate_m3ph,
        "max_TMP_during_filtration": max_tmp_during_filtration,
        "min_TMP_during_filtration": min_tmp_during_filtration,  # minimum TMP
        "max_residual_increase_per_run": max_residual_increase,
        "phi_bw_effective": phi,
        "TMP_after_ceb": tmp_after_ceb,
        "headroom_ratio": headroom_ratio,
        "daily_prod_time_h": daily_prod_time_h,
        "ton_water_energy_kWh_per_m3": ton_water_energy,
        "k_bw_per_ceb": k_bw_per_ceb
    }
    return True, info


def _score(p: UFParams, rec: dict) -> float:
    """Composite score (higher is better).

    Non-linearly amplifies reward differences to better separate good and bad
    actions for the RL agent.
    """
    # -- Dimensionless net delivery rate --
    rate_norm = rec["net_delivery_rate_m3ph"] / max(p.q_UF, 1e-12)

    # -- Soft TMP penalty (sigmoid around the hard limit) --
    tmp_ratio = rec["max_TMP_during_filtration"] / max(p.TMP_max, 1e-12)
    k = 10.0
    headroom_penalty = 1.0 / (1.0 + np.exp(-k * (tmp_ratio - 1.0)))

    # -- Base reward (roughly in the 0.6-0.9 range) --
    base_reward = (
        p.w_rec * rec["recovery"]
        + p.w_rate * rate_norm
        - p.w_headroom * headroom_penalty
    )

    # -- Non-linear amplification: squared mapping + scaling.
    # Widens the gap between good and bad actions while bounding the maximum,
    # which keeps TD-errors from blowing up.
    amplified_reward = (base_reward - 0.5) ** 2 * 5.0

    # -- Keep the sign so that below-threshold rewards stay negative.
    if base_reward < 0.5:
        amplified_reward = -amplified_reward

    return amplified_reward


class UFSuperCycleEnv(gym.Env):
    """Ultrafiltration system environment (decisions at supercycle granularity)."""

    metadata = {"render_modes": ["human"]}

    def __init__(self, base_params, max_episode_steps: int = 20):
        super().__init__()
        self.base_params = base_params
        self.current_params = copy.deepcopy(base_params)
        self.max_episode_steps = max_episode_steps
        self.current_step = 0

        # Discretized action grid.
        self.L_values = np.arange(
            self.base_params.L_min_s,
            self.base_params.L_max_s + self.base_params.L_step_s,
            self.base_params.L_step_s
        )
        self.t_bw_values = np.arange(
            self.base_params.t_bw_min_s,
            self.base_params.t_bw_max_s + self.base_params.t_bw_step_s,
            self.base_params.t_bw_step_s
        )
        self.num_L = len(self.L_values)
        self.num_bw = len(self.t_bw_values)

        # Single flat discrete action space (L index x backwash index).
        self.action_space = spaces.Discrete(self.num_L * self.num_bw)

        # Observation: TMP0, previous action (L_s, t_bw_s), last cycle's max TMP.
        # All normalization happens inside _get_obs.
        self.observation_space = spaces.Box(
            low=np.zeros(4, dtype=np.float32),
            high=np.ones(4, dtype=np.float32),
            dtype=np.float32
        )

        # Initial state.
        self.last_action = (self.base_params.L_min_s, self.base_params.t_bw_min_s)
        self.max_TMP_during_filtration = self.current_params.TMP0
        self.reset(seed=None)

    def _get_obs(self):
        """Return the normalized 4-dim observation vector."""
        # NOTE(review): normalization bounds 0.01/0.05 are hard-coded and should
        # stay consistent with the TMP0 sampling range in reset().
        TMP0 = self.current_params.TMP0
        TMP0_norm = (TMP0 - 0.01) / (0.05 - 0.01)
        L_s, t_bw_s = self.last_action
        L_norm = (L_s - self.base_params.L_min_s) / (self.base_params.L_max_s - self.base_params.L_min_s)
        t_bw_norm = (t_bw_s - self.base_params.t_bw_min_s) / (self.base_params.t_bw_max_s - self.base_params.t_bw_min_s)
        max_TMP_norm = (self.max_TMP_during_filtration - 0.01) / (0.05 - 0.01)
        return np.array([TMP0_norm, L_norm, t_bw_norm, max_TMP_norm], dtype=np.float32)

    def _get_action_values(self, action):
        """Decode a flat discrete action into (L_s, t_bw_s)."""
        L_idx = action // self.num_bw
        t_bw_idx = action % self.num_bw
        return self.L_values[L_idx], self.t_bw_values[t_bw_idx]

    def reset(self, seed=None, options=None):
        """Reset the environment; TMP0 is resampled uniformly in [0.01, 0.03]."""
        super().reset(seed=seed)
        # FIX: sample from the env's seeded RNG (self.np_random) instead of the
        # global np.random, so that reset(seed=...) actually makes episodes
        # reproducible.
        self.current_params.TMP0 = self.np_random.uniform(0.01, 0.03)
        self.current_step = 0
        self.last_action = (self.base_params.L_min_s, self.base_params.t_bw_min_s)
        self.max_TMP_during_filtration = self.current_params.TMP0
        return self._get_obs(), {}

    def step(self, action):
        """Run one supercycle for the chosen (L_s, t_bw_s) action."""
        self.current_step += 1
        L_s, t_bw_s = self._get_action_values(action)
        L_s = np.clip(L_s, self.base_params.L_min_s, self.base_params.L_max_s)
        t_bw_s = np.clip(t_bw_s, self.base_params.t_bw_min_s, self.base_params.t_bw_max_s)

        # Simulate one supercycle.
        feasible, info = simulate_one_supercycle(self.current_params, L_s, t_bw_s)
        if feasible:
            reward = _score(self.current_params, info)
            self.current_params.TMP0 = info["TMP_after_ceb"]
            self.max_TMP_during_filtration = info["max_TMP_during_filtration"]
            terminated = False
        else:
            # Infeasible operating point: large penalty and episode ends.
            reward = -20
            terminated = True

        truncated = self.current_step >= self.max_episode_steps
        self.last_action = (L_s, t_bw_s)
        next_obs = self._get_obs()
        info["feasible"] = feasible
        info["step"] = self.current_step
        return next_obs, reward, terminated, truncated, info