""" 超滤强化学习环境模块 ======================== 本模块定义了超滤系统的强化学习环境,包括: 1. UFParams: 超滤系统参数配置类 2. 膜阻力与跨膜压差转换函数 3. simulate_one_supercycle: 超级周期模拟函数 4. calculate_reward: 奖励函数 5. is_dead_cycle: 失败判定函数 6. UFSuperCycleEnv: Gymnasium环境类 模块设计说明: - 基于 Gymnasium (原OpenAI Gym) 标准接口 - 模拟超滤膜的"超级周期"运行(多次物理反洗 + 一次化学反洗) - 强化学习智能体通过优化过滤时长和反洗时长来最大化回收率并控制污染累积 """ import os import torch from pathlib import Path import numpy as np import gymnasium as gym from gymnasium import spaces from typing import Dict, Tuple, Optional import torch import torch.nn as nn from dataclasses import dataclass, asdict from UF_resistance_models import ResistanceIncreaseModel, ResistanceDecreaseModel # 导入膜阻力模型 import copy # ==================== 超滤系统参数配置类 ==================== @dataclass class UFParams: """ 超滤系统参数配置类 功能:统一管理超滤系统的所有运行参数,包括: - 膜动态运行参数(流量、温度、压差等) - 膜阻力模型参数(污染增长速率、去除效率等) - 膜运行约束参数(各参数的上下限) - 反洗参数(物理反洗、化学反洗) - 动作搜索范围(过滤时长、反洗时长的取值范围) - 奖励函数参数 设计思想: - 使用 dataclass 装饰器,自动生成 __init__、__repr__ 等方法 - 所有参数带有类型注解和默认值 - 参数值基于锡山水厂的实际运行数据和经验 """ # ========== 膜动态运行参数 ========== # 这些参数描述超滤膜的实时运行状态,在环境模拟中会动态变化 q_UF: float = 360.0 # 过滤进水流量(m³/h) # 说明:影响膜通量,进而影响污染速率 # 典型范围:250-400 m³/h TMP0: float = 0.03 # 初始跨膜压差(MPa,兆帕) # 说明:反映膜阻力状态,TMP 越高表示膜污染越严重 # 正常范围:0.01-0.035 MPa,超过 0.08 MPa 需停机检修 temp: float = 25.0 # 水温(摄氏度) # 说明:影响水的粘度,进而影响跨膜压差 # 典型范围:10-40℃,25℃为标准温度 # ========== 膜阻力模型参数 ========== # 这些参数描述膜污染的物理化学特性,基于历史数据拟合得到 nuK: float = 6.92e+01 # 过滤阶段膜阻力增长系数(缩放后单位) # 说明:反映水质污染特性,nuK 越大表示水质越差、膜污染越快 # 物理意义:单位膜通量、单位时间的阻力增长速率 slope: float = 3.44e-01 # 全周期不可逆污染增长斜率 # 说明:描述长期不可逆污染的累积速率(幂律模型的系数) power: float = 1.032 # 全周期不可逆污染增长幂次 # 说明:描述长期污染的非线性特性(幂律模型的指数) # power > 1 表示污染加速累积,power < 1 表示污染增速放缓 tau_bw_s: float = 30.0 # 物理反洗时长影响的时间尺度(秒) # 说明:反洗效率的特征时间,当反洗时长 = tau 时,达到约 63% 效率 gamma_t: float = 1.0 # 物理反洗时长作用指数(保留参数,当前未使用) ceb_removal: float = 150 # 化学增强反洗(CEB)可去除的膜阻力(缩放后单位) # 说明:CEB 比物理反洗更彻底,可去除部分不可逆污染 # ========== 膜运行约束参数 ========== # 定义各运行参数的物理约束和安全限制 global_TMP_hard_limit: float = 0.08 # TMP 硬上限(MPa) # 说明:超过此值将导致episode失败,需立即停机 global_TMP_soft_limit: float = 0.06 # TMP 软上限 (MPa) # 说明:此上限用于指导奖励函数中膜阻力允许上升值,越接近该上限,系统对膜阻力上升控制的更严格 # --- 初始TMP约束 --- TMP0_max: float = 0.035 # 初始TMP上限(MPa) TMP0_min: float = 0.01 # 初始TMP下限(MPa) # --- 流量约束 --- q_UF_max: float = 400.0 # 进水流量上限(m³/h) q_UF_min: float = 250.0 # 进水流量下限(m³/h) # --- 温度约束 --- temp_max: float = 40.0 # 温度上限(℃) temp_min: float = 10.0 # 温度下限(℃) # --- 短期污染模型参数约束 --- nuK_max: float = 1e+02 # 阻力增长系数上限 nuK_min: float = 6e+01 # 阻力增长系数下限 # --- 长期污染模型参数约束 --- slope_max: float = 10 # 不可逆污染斜率上限 slope_min: float = 0.5 # 不可逆污染斜率下限 power_max: float = 1.5 # 不可逆污染幂次上限 power_min: float = 0.5 # 不可逆污染幂次下限 # --- CEB去除能力约束 --- ceb_removal_max: float = 200 # CEB去除阻力上限(缩放后) ceb_removal_min: float = 100 # CEB去除阻力下限(缩放后) # ========== 反洗参数(固定配置) ========== q_bw_m3ph: float = 1000.0 # 物理反洗流量(m³/h) # 说明:反洗流量通常为正常过滤流量的 2-3 倍 fixed_t_bw_s = 60 # 固定物理反洗时间 # ========== CEB 化学反洗参数 ========== T_ceb_interval_h: float = 48.0 # CEB 间隔时间(小时) # 说明:每运行约 60 小时执行一次化学增强反洗 v_ceb_m3: float = 20.0 # CEB 用水体积(m³) t_ceb_s: float = 40 * 60.0 # CEB 时长(秒,这里为 40 分钟) # ========== 强化学习动作空间搜索范围 ========== # 定义智能体可选择的动作范围(离散化) L_min_s: float = 3800.0 # 过滤时长下限(秒,约 63 分钟) L_max_s: float = 4800.0 # 过滤时长上限,改为 4800s t_bw_min_s: float = 40.0 # 物理反洗时长下限(秒) t_bw_max_s: float = 60.0 # 物理反洗时长上限(秒) # ========== 动作离散化网格 ========== L_step_s: float = 60.0 # 过滤时长步长(秒) t_bw_step_s: float = 5.0 # 物理反洗时长步长(秒) # ========== 奖励函数参数 ========== k_rec = 5.0 # 回收率敏感度系数(控制回收率奖励的陡峭程度) k_res = 10.0 # 残余污染敏感度系数(控制污染惩罚的陡峭程度) rec_low, rec_high = 0.92, 0.99 # 回收率的正常范围 rr0 = 0.08 # 残余污染比例的参考值 # ==================== 辅助函数:膜阻力与跨膜压差转换 ==================== def xishan_viscosity(temp): """ 锡山水厂水温粘度修正公式 功能:根据水温计算水的动力粘度(考虑温度影响) 参数: temp (float): 水温(摄氏度) 返回: float: 水的动力粘度 μ (Pa·s) 原理: - 水的粘度随温度升高而降低 - 25℃时纯水粘度约为 0.00089 Pa·s - 本公式基于锡山水厂PLC系统的经验修正因子 注意: - 本公式基于纯水粘度修正 - 实际水厂水质与纯水有差异,对粘度有一定影响 - 未来可根据实际水质进一步校准 """ # 温度归一化(相对于300K) x = (temp + 273.15) / 300 # 摄氏度转开尔文 # 温度修正因子(经验公式,基于锡山水厂PLC) factor = 890 / ( 280.68 * x ** -1.9 + 511.45 * x ** -7.7 + 61.131 * x ** -19.6 + 0.45903 * x ** -40 ) # 计算修正后的粘度(25℃标准粘度 / 修正因子) mu = 0.00089 / factor # [Pa·s] return mu def _calculate_resistance(tmp, q_UF, temp): """ 由跨膜压差计算膜阻力 功能:根据 Darcy 定律,由跨膜压差反推膜阻力 参数: tmp (float): 跨膜压差 TMP (MPa) q_UF (float): 过滤流量 (m³/h) temp (float): 水温 (℃) 返回: float: 膜阻力 R(已缩放 1e10) 原理: Darcy 定律:J = TMP / (μ × R) 其中: - J: 膜通量 [m/s] - TMP: 跨膜压差 [Pa] - μ: 水的动力粘度 [Pa·s] - R: 膜阻力 [m⁻¹] 反解得:R = TMP / (J × μ) 注意: - 超滤膜阻力实际量级为 1e12 m⁻¹ - 为便于数值计算,已缩放 1e10 倍至 1e2 量级 """ # 膜有效面积(锡山水厂配置:128组 × 40 m²/组) A = 128 * 40 # [m²] # 温度修正后的水粘度 mu = xishan_viscosity(temp) # [Pa·s] # 跨膜压差单位转换:MPa → Pa TMP_Pa = tmp * 1e6 # [Pa] # 计算膜通量:流量 / 面积 J = q_UF / A / 3600 # [m³/h] → [m³/(m²·s)] = [m/s] # 物理约束检查:通量和粘度必须为正 if J <= 0 or mu <= 0: return np.nan # 根据 Darcy 定律计算膜阻力并缩放 R = TMP_Pa / (J * mu) / 1e10 # [m⁻¹] → [缩放单位] return float(R) def _calculate_tmp(R, q_UF, temp): """ 由膜阻力计算跨膜压差 功能:根据 Darcy 定律,由膜阻力计算跨膜压差(_calculate_resistance 的逆运算) 参数: R (float): 膜阻力(已缩放 1e10) q_UF (float): 过滤流量 (m³/h) temp (float): 水温 (℃) 返回: float: 跨膜压差 TMP (MPa) 原理: Darcy 定律:TMP = J × μ × R 其中: - J: 膜通量 [m/s] - μ: 水的动力粘度 [Pa·s] - R: 膜阻力 [m⁻¹] """ # 膜有效面积 A = 128 * 40 # [m²] # 温度修正后的水粘度 mu = xishan_viscosity(temp) # [Pa·s] # 计算膜通量 J = q_UF / A / 3600 # [m/s] # 根据 Darcy 定律计算跨膜压差(还原缩放) TMP_Pa = R * J * mu * 1e10 # [缩放单位] → [Pa] # 单位转换:Pa → MPa tmp = TMP_Pa / 1e6 # [MPa] return float(tmp) # ==================== 膜阻力模型加载函数 ==================== def load_resistance_models(): """ 加载膜阻力预测模型(单例模式) 功能: - 加载预训练的膜阻力上升模型和下降模型 - 使用全局变量实现单例模式,避免重复加载 - 仅在首次调用时执行加载操作 返回: tuple: (resistance_model_fp, resistance_model_bw) - resistance_model_fp: 过滤阶段阻力上升模型 - resistance_model_bw: 反洗阶段阻力下降模型 注意: - 模型文件必须与本脚本位于同一目录 - 模型已设置为推理模式(eval),不会更新参数 """ # 声明全局变量(实现单例模式) global resistance_model_fp, resistance_model_bw # 检查模型是否已加载(避免重复加载) if "resistance_model_fp" in globals() and resistance_model_fp is not None: return resistance_model_fp, resistance_model_bw print("🔄 正在加载膜阻力模型...") # 初始化模型对象 resistance_model_fp = ResistanceIncreaseModel() resistance_model_bw = ResistanceDecreaseModel() # 获取当前脚本所在目录 base_dir = Path(__file__).resolve().parent # 构造模型文件路径 fp_path = base_dir / "resistance_model_fp.pth" # 过滤阶段模型 bw_path = base_dir / "resistance_model_bw.pth" # 反洗阶段模型 # 检查模型文件是否存在 assert fp_path.exists(), f"缺少膜阻力上升模型文件: {fp_path.name}" assert bw_path.exists(), f"缺少膜阻力下降模型文件: {bw_path.name}" # 加载模型权重(map_location="cpu" 确保在没有GPU的环境也能运行) resistance_model_fp.load_state_dict(torch.load(fp_path, map_location="cpu")) resistance_model_bw.load_state_dict(torch.load(bw_path, map_location="cpu")) # 设置为推理模式(禁用 dropout、batchnorm 等训练特性) resistance_model_fp.eval() resistance_model_bw.eval() print("✅ 膜阻力模型加载成功!") return resistance_model_fp, resistance_model_bw # ==================== 膜阻力模型调用函数 ==================== def _delta_resistance(p, L_s: float) -> float: """ 计算过滤阶段膜阻力上升量 功能:调用预训练的膜阻力上升模型 参数: p (UFParams): 超滤运行参数 L_s (float): 过滤时长(秒) 返回: float: 膜阻力上升量 ΔR """ return resistance_model_fp(p, L_s) def phi_bw_of(p, R0: float, R_end: float, L_h_next_start: float, t_bw_s: float) -> float: """ 计算物理反洗可去除的膜阻力 功能:调用预训练的膜阻力下降模型 参数: p (UFParams): 超滤运行参数 R0 (float): 本超级周期初始膜阻力 R_end (float): 过滤结束时的膜阻力 L_h_start (float): 本小周期起始累积运行时间(小时) L_h_next_start (float): 下一小周期起始累积运行时间(小时) t_bw_s (float): 物理反洗时长(秒) 返回: float: 物理反洗去除的膜阻力量 """ return resistance_model_bw(p, R0, R_end, L_h_next_start, t_bw_s) def _v_bw_m3(p, t_bw_s: float) -> float: """ 计算物理反洗水耗 参数: p (UFParams): 超滤运行参数(包含反洗流量 q_bw_m3ph) t_bw_s (float): 物理反洗时长(秒) 返回: float: 反洗水耗(立方米) 公式: V = Q × t 其中 Q 为反洗流量 [m³/h],t 为反洗时长 [h] """ return float(p.q_bw_m3ph * (float(t_bw_s) / 3600.0)) def simulate_one_supercycle(p: UFParams, L_s: float, t_bw_s: float): """ 模拟一个完整的超级周期(Super Cycle) 功能: - 模拟超滤膜的完整运行周期:多次"过滤+物理反洗"小周期 + 一次化学增强反洗(CEB) - 计算周期内的各项性能指标(回收率、TMP变化、污染累积等) - 返回周期结束后的状态参数,用于下一周期的模拟 参数: p (UFParams): 当前超滤系统参数 L_s (float): 过滤时长(秒) t_bw_s (float): 物理反洗时长(秒) 返回: tuple: (info, next_params) - info (dict): 本周期的性能指标字典 - next_params (UFParams): 更新后的系统参数(用于下一周期) 超级周期结构: 超级周期 = k 次小周期 + 1 次 CEB 小周期 = 过滤 + 物理反洗 时间轴: |-- 小周期1 --|-- 小周期2 --|-- ... --|-- 小周期k --|-- CEB --| | 滤 | 物洗 | 滤 | 物洗 | ... | 滤 | 物洗 | 化洗 | """ # ========== 初始化周期参数 ========== L_h = float(L_s) / 3600.0 # 过滤时长转换:秒 → 小时 # 初始状态 tmp = p.TMP0 # 当前跨膜压差 R0 = _calculate_resistance(p.TMP0, p.q_UF, p.temp) # 初始膜阻力 # 跟踪变量(用于记录周期内的极值) max_tmp_during_filtration = tmp # 周期内最大TMP min_tmp_during_filtration = tmp # 周期内最小TMP max_residual_increase = 0.0 # 周期内最大残余污染增量 # ========== 计算小周期数量 ========== # 小周期时长 = 过滤时长 + 物理反洗时长 t_small_cycle_h = (L_s + t_bw_s) / 3600.0 # [小时] # 计算一个超级周期内包含多少个小周期 # k = floor(CEB间隔时间 / 小周期时长) k_bw_per_ceb = int(np.floor(p.T_ceb_interval_h / t_small_cycle_h)) if k_bw_per_ceb < 1: k_bw_per_ceb = 1 # 至少包含1个小周期 # ========== 吨水电耗查找表 ========== # 键:过滤时长(秒),值:吨水电耗(kWh/m³) energy_lookup = { 3600: 0.1034, 3660: 0.1031, 3720: 0.1029, 3780: 0.1026, 3840: 0.1023, 3900: 0.1021, 3960: 0.1019, 4020: 0.1017, 4080: 0.1015, 4140: 0.1012, 4200: 0.1011, 4260: 0.1008, 4320: 0.1007, 4380: 0.1005, 4440: 0.1003, 4500: 0.1001, 4560: 0.0999, 4620: 0.0998, 4680: 0.0996, 4740: 0.0995, 4800: 0.0993, } # ========== 循环模拟每个小周期(过滤 + 物理反洗) ========== for idx in range(k_bw_per_ceb): # --- 小周期开始状态 --- tmp_run_start = tmp # 本次过滤开始时的TMP q_UF = p.q_UF # 过滤流量 temp = p.temp # 水温 # --- 过滤阶段:膜阻力上升 --- R_run_start = _calculate_resistance(tmp_run_start, q_UF, temp) # 过滤开始时的膜阻力 d_R = _delta_resistance(p, L_s) # 过滤阶段膜阻力增量 R_peak = R_run_start + d_R # 过滤结束时的膜阻力(峰值) tmp_peak = _calculate_tmp(R_peak, q_UF, temp) # 过滤结束时的TMP(峰值) # 更新TMP极值记录 max_tmp_during_filtration = max(max_tmp_during_filtration, tmp_peak) min_tmp_during_filtration = min(min_tmp_during_filtration, tmp_run_start) # --- 物理反洗阶段:膜阻力下降 --- # 计算累积运行时间(用于长期污染模型) L_h_next_start = (L_s + t_bw_s) / 3600.0 * (idx + 1) # 下一小周期起始时间 # 调用膜阻力下降模型,计算物理反洗可去除的阻力 reversible_R = phi_bw_of(p, R0, R_peak, L_h_next_start, t_bw_s) # 物理反洗后的膜阻力 R_after_bw = R_peak - reversible_R tmp_after_bw = _calculate_tmp(R_after_bw, q_UF, temp) # 计算残余污染增量(反洗后的TMP相对本次开始的增加) residual_inc = tmp_after_bw - tmp_run_start max_residual_increase = max(max_residual_increase, residual_inc) # 更新TMP(作为下一小周期的起始TMP) tmp = tmp_after_bw # ========== 化学增强反洗 (CEB) ========== # CEB比物理反洗更彻底,可去除部分不可逆污染 R_after_ceb = R_peak - p.ceb_removal # CEB后的膜阻力 tmp_after_ceb = _calculate_tmp(R_after_ceb, q_UF, temp) # CEB后的TMP # ============================================================ # 计算周期性能指标 # ============================================================ # ========== 水量平衡计算 ========== # 进水总量(所有小周期的过滤进水之和) V_feed_super = k_bw_per_ceb * p.q_UF * L_h # [m³] # 损失水量(物理反洗 + 化学反洗) V_loss_super = k_bw_per_ceb * _v_bw_m3(p, t_bw_s) + p.v_ceb_m3 # [m³] # 净产水量 V_net = max(0.0, V_feed_super - V_loss_super) # [m³] # 回收率(净产水 / 进水总量) # 加1e-12避免除零,max确保非负 recovery = max(0.0, V_net / max(V_feed_super, 1e-12)) # [无量纲,0-1之间] # ========== 时间与能耗计算 ========== # 超级周期总时长 T_super_h = k_bw_per_ceb * (L_s + t_bw_s) / 3600.0 + p.t_ceb_s / 3600.0 # [小时] # 日均产水时间(24小时内实际产水的时间) daily_prod_time_h = k_bw_per_ceb * L_h / T_super_h * 24.0 # [小时] # 吨水电耗(从查找表获取最接近的值) closest_L = min(energy_lookup.keys(), key=lambda x: abs(x - L_s)) ton_water_energy = energy_lookup[closest_L] # [kWh/m³] # ===== 新指标:膜阻力允许上升空间 ===== R_max = _calculate_resistance(max_tmp_during_filtration, p.q_UF, p.temp) R_soft_limit = _calculate_resistance(p.global_TMP_soft_limit, p.q_UF, p.temp) delta_R_allow = max(R_soft_limit - R_max, 1e-6) # 供奖励函数使用的“污染允许增长空间” # ========== 构建性能指标字典 ========== info = { # 运行参数 "q_UF": p.q_UF, # 过滤流量 "temp": p.temp, # 水温 # 水量指标 "recovery": recovery, # 回收率 "V_feed_super_m3": V_feed_super, # 进水总量 "V_loss_super_m3": V_loss_super, # 损失水量 "V_net_super_m3": V_net, # 净产水量 # 时间指标 "supercycle_time_h": T_super_h, # 超级周期时长 "daily_prod_time_h": daily_prod_time_h, # 日均产水时间 "k_bw_per_ceb": k_bw_per_ceb, # 小周期数量 # TMP指标 "max_TMP_during_filtration": max_tmp_during_filtration, # 周期内最大TMP "min_TMP_during_filtration": min_tmp_during_filtration, # 周期内最小TMP "global_TMP_limit": p.global_TMP_hard_limit, # TMP限制 "TMP0": p.TMP0, # 周期初始TMP "TMP_after_ceb": tmp_after_ceb, # CEB后TMP # 膜阻力指标 "R0": R0, # 周期初始膜阻力 "R_after_ceb": R_after_ceb, # CEB后膜阻力 "max_residual_increase_per_run": max_residual_increase, # 最大残余污染增量 "delta_R_allow": delta_R_allow, # 污染允许增长空间 # 能耗指标 "ton_water_energy_kWh_per_m3": ton_water_energy, # 吨水电耗 } # ============================================================ # 状态更新:生成下一周期的初始参数 # ============================================================ # 深拷贝当前参数(避免修改原对象) next_params = copy.deepcopy(p) # ========== 必须更新的参数 ========== # 更新TMP为CEB后的值(作为下一超级周期的起始TMP) next_params.TMP0 = tmp_after_ceb # ========== 可选更新的参数(当前保持不变) ========== # 这些参数可根据实际情况动态调整,预留扩展接口 next_params.slope = p.slope # 长期污染斜率 next_params.power = p.power # 长期污染幂次 next_params.ceb_removal = p.ceb_removal # CEB去除能力 next_params.nuK = p.nuK # 短期污染系数 next_params.q_UF = p.q_UF # 过滤流量 next_params.temp = p.temp # 水温 return info, next_params def calculate_reward(p: UFParams, info: dict) -> float: """ 计算强化学习奖励函数 功能: - 平衡回收率和残余污染两个目标 - TMP不直接参与奖励计算(通过失败判定间接影响) - 使用 tanh 函数实现平滑的非线性奖励 参数: p (UFParams): 超滤参数(包含奖励函数超参数) info (dict): 周期性能指标字典 返回: float: 奖励值(通常在 -2 到 +2 之间) 设计思想: - 高回收率 → 水资源利用率高 → 正奖励 - 低残余污染 → 膜长期稳定运行 → 正奖励 - 两者需要权衡:过短的过滤时间提高回收率但污染去除不彻底; 过长的过滤时间污染控制好但回收率下降 参考点设计: - (recovery=0.97, residual_ratio=0.1) → reward ≈ 0(高回收但污染高) - (recovery=0.90, residual_ratio=0.0) → reward ≈ 0(低污染但回收率低) - (recovery≈0.94, residual_ratio≈0.05) → reward > 0(平衡点) """ # ========== 提取性能指标 ========== recovery = info["recovery"] # 回收率 [0-1] # 污染比例:实际上升的阻力 / 允许上升的阻力 # 允许上升的阻力值 = 当前阻力值软上限 - 当前阻力 residual_ratio = (info["R_after_ceb"] - info["R0"]) / info["delta_R_allow"] # ========== 回收率奖励项 ========== # 将回收率归一化到 [0, 1] 区间(基于预期范围) rec_norm = (recovery - p.rec_low) / (p.rec_high - p.rec_low) # 使用 tanh 函数构建平滑的 S 型奖励曲线 # - rec_norm = 0.5 时(回收率处于中间值),rec_reward = 0 # - rec_norm > 0.5 时,rec_reward > 0(鼓励高回收率) # - rec_norm < 0.5 时,rec_reward < 0(惩罚低回收率) # - k_rec 控制曲线陡峭程度,越大变化越陡 rec_reward = np.clip(np.tanh(p.k_rec * (rec_norm - 0.5)), -1, 1) # ========== 污染惩罚项 ========== # 使用 tanh 函数构建惩罚曲线 # - residual_ratio < rr0 时,res_penalty > 0(奖励低污染) # - residual_ratio > rr0 时,res_penalty < 0(惩罚高污染) # - k_res 控制曲线陡峭程度 res_penalty = -np.tanh(p.k_res * (residual_ratio / p.rr0 - 1)) # ========== 组合奖励 ========== # 简单线性组合两项(也可以加权) total_reward = rec_reward + res_penalty # 可选:添加平移项使特定点的奖励为零(当前未使用) # total_reward -= offset return total_reward def is_dead_cycle(info: dict) -> bool: """ 判断当前超级周期是否成功(可行) 功能: - 检查超级周期是否违反运行约束 - 用于强化学习的失败判定(terminated条件) - True表示成功,False表示失败 参数: info (dict): simulate_one_supercycle() 返回的性能指标字典 返回: bool: True表示成功周期,False表示失败周期 失败条件(任一满足即失败): 1. TMP超限:max_TMP > global_TMP_limit - 原因:TMP过高会损坏膜或影响产水质量 - 阈值:0.08 MPa(可配置) 2. 回收率过低:recovery < 0.75 - 原因:回收率太低说明反洗水耗过大,经济性差 - 阈值:75%(可调整) 3. 残余污染累积过快:(R_after_ceb - R0) / R0 > 0.1 - 原因:单个超级周期污染增长超过10%,长期运行不可持续 - 阈值:10%(可调整) """ # ========== 获取关键指标 ========== TMP_limit = info.get("global_TMP_limit", 0.08) # TMP硬约束上限 max_tmp = info.get("max_TMP_during_filtration", 0) # 周期内最大TMP recovery = info.get("recovery", 1.0) # 回收率 R_after_ceb = info.get("R_after_ceb", 0) # CEB后膜阻力 R0 = info.get("R0", 1e-6) # 初始膜阻力 delta_R_allow = info.get("delta_R_allow", 1e-6) # 允许上升的膜阻力(加小值避免除零) # ========== 失败条件检查 ========== # 条件1:TMP超限 if max_tmp > TMP_limit: return False # 失败 # 条件2:回收率过低 if recovery < 0.75: return False # 失败 # 条件3:污染增长量超过容许范围 residual_increase = (R_after_ceb - R0) / delta_R_allow if residual_increase > 1/15: return False # 失败 # 所有条件通过 return True # 成功 class UFSuperCycleEnv(gym.Env): """ 超滤系统强化学习环境(Gymnasium标准接口) 功能: - 模拟超滤膜的超级周期运行 - 智能体在每个超级周期选择过滤时长和反洗时长 - 目标:最大化回收率同时控制污染累积 状态空间 (8维,归一化到 [0,1]): 1. TMP0: 初始跨膜压差 2. q_UF: 过滤流量 3. temp: 水温 4. R0: 初始膜阻力 5. nuK: 短期污染系数 6. slope: 长期污染斜率 7. power: 长期污染幂次 8. ceb_removal: CEB去除能力 动作空间 (离散): - 二维离散动作组合:(过滤时长, 反洗时长) - 过滤时长: L_min_s ~ L_max_s,步长 L_step_s - 反洗时长: t_bw_min_s ~ t_bw_max_s,步长 t_bw_step_s - 总动作数 = len(L_values) × len(t_bw_values) 奖励机制: - 基于回收率和残余污染的平衡 - 失败 (TMP超限、回收率过低、污染过快) 时给予大负奖励 (-10) 终止条件: - terminated: 违反运行约束(失败) - truncated: 达到最大步数 (max_episode_steps) """ metadata = {"render_modes": ["human"]} def __init__(self, base_params, resistance_models=None, max_episode_steps: int = 15): """ 初始化超滤强化学习环境 参数: base_params (UFParams): 基础超滤参数配置 resistance_models (tuple, optional): 预加载的膜阻力模型,默认None(自动加载) max_episode_steps (int): 每个episode的最大步数,默认15 注:每步代表一个超级周期(约2-3天),15步约一个月 """ super(UFSuperCycleEnv, self).__init__() # ========== 参数初始化 ========== self.base_params = base_params # 基础参数(不变) self.current_params = copy.deepcopy(base_params) # 当前参数(动态更新) self.max_episode_steps = max_episode_steps # 最大步数 self.current_step = 0 # 当前步数计数器 # ========== 加载膜阻力模型 ========== if resistance_models is None: # 自动加载模型 self.resistance_model_fp, self.resistance_model_bw = load_resistance_models() else: # 使用预加载的模型(用于并行环境避免重复加载) self.resistance_model_fp, self.resistance_model_bw = resistance_models # ========== 构建进水时间离散动作空间 ========== # 过滤时长候选值(例:3800, 3860, 3920, ..., 5940, 6000秒) self.L_values = np.arange( self.base_params.L_min_s, self.base_params.L_max_s, self.base_params.L_step_s ) self.action_space = spaces.Discrete(len(self.L_values)) # ========== 定义状态空间 ========== # 8维连续状态,归一化到 [0, 1] self.observation_space = spaces.Box( low=np.zeros(8, dtype=np.float32), high=np.ones(8, dtype=np.float32), dtype=np.float32 ) # ========== 初始化环境(调用reset) ========== self.reset(seed=None) def generate_initial_state(self): """ 随机生成一个初始状态,并判断其污染增长速率约束是否满足。 若满足返回 True,不满足返回 False。 (不负责重复采样,由 reset() 控制) """ # 基础常数 A = 128 * 40.0 # 有效膜面积 # ---- 1. 随机生成 TMP、q_UF、温度 ---- self.current_params.TMP0 = np.random.uniform( self.current_params.TMP0_min, self.current_params.TMP0_max ) self.current_params.q_UF = np.random.uniform( self.current_params.q_UF_min, self.current_params.q_UF_max ) self.current_params.temp = np.random.uniform( self.current_params.temp_min, self.current_params.temp_max ) q_UF = self.current_params.q_UF # ---- 2. 随机 slope、power ---- slope = np.random.uniform(self.current_params.slope_min, self.current_params.slope_max) power = np.random.uniform(self.current_params.power_min, self.current_params.power_max) # ---- 3. 计算满足条件所需的最小 nuK ---- # 计算 t_max t_max = 60 if power >= 1 else 1 required_nuK_min = ( slope * power * (t_max ** (power - 1)) * (A / q_UF) ) # 若 required_nuK_min 超过可选范围 → 初始状态非法 if required_nuK_min > self.current_params.nuK_max: return False # ---- 4. 在可行范围中采样 nuK ---- nuK_low = max(required_nuK_min, self.current_params.nuK_min) nuK_high = self.current_params.nuK_max self.current_params.nuK = np.random.uniform(nuK_low, nuK_high) # ---- 5. 生成 CEB 去除率 ---- self.current_params.ceb_removal = np.random.uniform( self.current_params.ceb_removal_min, self.current_params.ceb_removal_max ) # ---- 6. 计算初始膜阻力 ---- self.current_params.R0 = _calculate_resistance( self.current_params.TMP0, self.current_params.q_UF, self.current_params.temp ) # ---- 7. slope/power 写入 ---- self.current_params.slope = slope self.current_params.power = power return True def reset(self, seed=None, options=None, max_attempts: int = 1000): super().reset(seed=seed) attempts = 0 while attempts < max_attempts: attempts += 1 # ==== Step 0: 生成初始状态(含污染速率约束) ==== ok_init = self.generate_initial_state() if not ok_init: continue # 运行稳定性检测 ok_run = self.check_dead_initial_state( max_steps=getattr(self, "max_episode_steps", 15), L_s=3800, t_bw_s=60 ) # 满足 → 成功生成初始状态 if ok_run: break else: raise RuntimeError(f"在 {max_attempts} 次尝试后仍无法生成可行初始状态。") # 初始化环境状态 self.current_step = 0 self.last_action = (self.base_params.L_min_s, self.base_params.t_bw_min_s) self.max_TMP_during_filtration = self.current_params.TMP0 self.TMP0 = self.current_params.TMP0 return self._get_obs(), {} def check_dead_initial_state(self, max_steps: int = None, L_s: int = 3800, t_bw_s: int = 60) -> bool: """ 判断当前环境生成的初始状态是否为可行(non-dead)。 使用最保守策略连续模拟 max_steps 次: 若任意一次 is_dead_cycle(info) 返回 False 或 next_params[0] < 0,则视为必死状态。 参数: max_steps: 模拟步数,默认使用 self.max_episode_steps L_s: 过滤时长(s),默认 3800 t_bw_s: 物理反洗时长(s),默认 60 返回: bool: True 表示可行状态(non-dead),False 表示必死状态 """ if max_steps is None: max_steps = getattr(self, "max_episode_steps", 15) if not hasattr(self, "current_params"): raise AttributeError("generate_initial_state() 未设置 current_params。") import copy curr_p = copy.deepcopy(self.current_params) # 逐步模拟 for step in range(max_steps): try: info, next_params = simulate_one_supercycle(curr_p, L_s, t_bw_s) except Exception: # 异常即视为不可行 return False # 任意一次不可行即为必死状态 if not is_dead_cycle(info): return False # 新增判断:下一步跨膜压差 TMP 不可能为负 if next_params.TMP0 < 0: return False # 更新参数,进入下一步模拟 curr_p = next_params return True def _get_state_copy(self): return copy.deepcopy(self.current_params) def _get_obs(self): """ 构建当前环境归一化状态向量 """ # === 1. 从 current_params 读取动态参数 === TMP0 = self.current_params.TMP0 q_UF = self.current_params.q_UF temp = self.current_params.temp # === 2. 计算本周期初始膜阻力 === R0 = _calculate_resistance(TMP0, q_UF, temp) # === 3. 从 current_params 读取膜阻力增长模型参数 === nuk = self.current_params.nuK slope = self.current_params.slope power = self.current_params.power ceb_removal = self.current_params.ceb_removal # === 4. 从 current_params 动态读取上下限 === TMP0_min, TMP0_max = self.current_params.TMP0_min, self.current_params.global_TMP_hard_limit q_UF_min, q_UF_max = self.current_params.q_UF_min, self.current_params.q_UF_max temp_min, temp_max = self.current_params.temp_min, self.current_params.temp_max nuK_min, nuK_max = self.current_params.nuK_min, self.current_params.nuK_max slope_min, slope_max = self.current_params.slope_min, self.current_params.slope_max power_min, power_max = self.current_params.power_min, self.current_params.power_max ceb_min, ceb_max = self.current_params.ceb_removal_min, self.current_params.ceb_removal_max # === 5. 归一化计算(clip防止越界) === TMP0_norm = np.clip((TMP0 - TMP0_min) / (TMP0_max - TMP0_min), 0, 1) q_UF_norm = np.clip((q_UF - q_UF_min) / (q_UF_max - q_UF_min), 0, 1) temp_norm = np.clip((temp - temp_min) / (temp_max - temp_min), 0, 1) # R0 不在 current_params 中定义上下限,设定经验范围 R0_norm = np.clip((R0 - 100.0) / (600.0 - 100.0), 0, 1) short_term_norm = np.clip((nuk - nuK_min) / (nuK_max - nuK_min), 0, 1) long_term_slope_norm = np.clip((slope - slope_min) / (slope_max - slope_min), 0, 1) long_term_power_norm = np.clip((power - power_min) / (power_max - power_min), 0, 1) ceb_removal_norm = np.clip((ceb_removal - ceb_min) / (ceb_max - ceb_min), 0, 1) # === 6. 构建观测向量 === obs = np.array([ TMP0_norm, q_UF_norm, temp_norm, R0_norm, short_term_norm, long_term_slope_norm, long_term_power_norm, ceb_removal_norm ], dtype=np.float32) return obs def _get_action_values(self, action: int): """ 新版动作解释函数: action 只对应一个 L_s,不再包含 t_bw_s。 """ L_s = self.L_values[action] return L_s def step(self, action): self.current_step += 1 L_s= self._get_action_values(action) L_s = np.clip(L_s, self.base_params.L_min_s, self.base_params.L_max_s) t_bw_s = self.current_params.fixed_t_bw_s # 模拟超级周期 info, next_params = simulate_one_supercycle(self.current_params, L_s, t_bw_s) # 根据 info 判断是否成功 feasible = is_dead_cycle(info) # True 表示成功循环,False 表示失败 if feasible: # 每步奖励 reward = calculate_reward(self.current_params, info) self.current_params = next_params terminated = False else: # 中途失败惩罚 reward = -20 terminated = True # 判断是否到达最大步数 truncated = self.current_step >= self.max_episode_steps self.last_action = (L_s, t_bw_s) next_obs = self._get_obs() info["feasible"] = feasible info["step"] = self.current_step # ===================== 测试终末奖励:鼓励 TMP 接近初始状态 ===================== # 仅在 episode 自然结束(满步但未提前失败)时触发 if truncated and not terminated: TMP_initial = self.TMP0 # reset 时记录的初始 TMP TMP_final = next_obs[0] # next_obs 提供的最终 TMP delta_ratio = abs((TMP_final - TMP_initial) / TMP_initial) alpha = 4.0 # TMP 偏差敏感度 gamma = 5.0 # 奖励幅度 stability_reward = gamma * (np.exp(-alpha * delta_ratio) - 1) # 量级在0到-5之间 reward += stability_reward terminated = True # episode 正式结束 return next_obs, reward, terminated, truncated, info