""" uf_physics_48h.py 超滤(UF)系统物理模型与确定性计算规则模块。 本模块定义: - 与强化学习算法无关的物理规律 - 与环境状态更新相关的确定性计算 - 超滤系统中的基础物理量转换关系 设计原则: - 不依赖 env / agent / trainer - 不包含强化学习语义 - 不负责参数加载策略(由上层控制) 该模块应可被: - 强化学习环境 - 离线仿真 - 工艺分析脚本 独立复用。 """ import numpy as np import copy from uf_train.env.env_params import UFState, UFPhysicsParams class UFPhysicsModel: """ 超滤系统无状态物理模型(Physical Rules) 说明: - 本类不保存任何动态状态 - 所有计算结果仅依赖输入参数 - 表达的是“物理规律”,而不是“设备实例” """ def __init__( self, phys_params: UFPhysicsParams, resistance_model_fp=None, resistance_model_bw=None, IS_TIMES: bool = False, ): """ 参数: phys_params: 物理/工艺固定参数 resistance_model_fp: 过滤阶段阻力增长模型 resistance_model_bw: 反洗阶段阻力下降模型 IS_TIMES: CEB是否为固定次数,T为固定次数 """ self.p = phys_params self.model_fp = resistance_model_fp self.model_bw = resistance_model_bw self.IS_TIMES = IS_TIMES # ========================================================== # 水温-粘度关系 # ========================================================== def viscosity(self, temp: float) -> float: """ 锡山水厂水温粘度修正公式 功能:根据水温计算水的动力粘度(考虑温度影响) 参数: temp (float): 水温(摄氏度) 返回: float: 水的动力粘度 μ (Pa·s) 原理: - 水的粘度随温度升高而降低 - 25℃时纯水粘度约为 0.00089 Pa·s - 本公式基于锡山水厂PLC系统的经验修正因子 注意: - 本公式基于纯水粘度修正 - 实际水厂水质与纯水有差异,对粘度有一定影响 - 未来可根据实际水质进一步校准 """ # 温度归一化(相对于300K) x = (temp + 273.15) / 300 # 摄氏度转开尔文 # 温度修正因子(经验公式,基于锡山水厂PLC) factor = 890 / ( 280.68 * x ** -1.9 + 511.45 * x ** -7.7 + 61.131 * x ** -19.6 + 0.45903 * x ** -40 ) # 计算修正后的粘度(25℃标准粘度 / 修正因子) mu = 0.00089 / factor # [Pa·s] return mu # ========================================================== # TMP → 膜阻力 # ========================================================== def resistance_from_tmp( self, tmp: float, q_UF: float, temp: float ) -> float: """ 由跨膜压差计算膜阻力 功能:根据 Darcy 定律,由跨膜压差反推膜阻力 参数: tmp (float): 跨膜压差 TMP (MPa) q_UF (float): 过滤流量 (m³/h) temp (float): 水温 (℃) 返回: float: 膜阻力 R(已缩放 1e10) 原理: Darcy 定律:J = TMP / (μ × R) 其中: - J: 膜通量 [m/s] - TMP: 跨膜压差 [Pa] - μ: 水的动力粘度 [Pa·s] - R: 膜阻力 [m⁻¹] 反解得:R = TMP / (J × μ) 注意: - 超滤膜阻力实际量级为 1e12 m⁻¹ - 为便于数值计算,已缩放 1e10 倍至 1e2 量级 """ # 温度修正后的水粘度 mu = self.viscosity(temp) # [Pa·s] # 膜有效面积(在参数文件中配置) A = self.p.A # 跨膜压差单位转换:MPa → Pa TMP_Pa = tmp * 1e6 # [Pa] # 计算膜通量:流量 / 面积 J = q_UF / A / 3600 # [m³/h] → [m³/(m²·s)] = [m/s] # 物理约束检查:通量和粘度必须为正 if J <= 0 or mu <= 0: return np.nan # 根据 Darcy 定律计算膜阻力并缩放 R = TMP_Pa / (J * mu) / 1e10 # [m⁻¹] → [缩放单位] return float(R) # ========================================================== # 膜阻力 → TMP # ========================================================== def tmp_from_resistance( self, R: float, q_UF: float, temp: float ) -> float: """ 由膜阻力计算跨膜压差 功能:根据 Darcy 定律,由膜阻力计算跨膜压差(_calculate_resistance 的逆运算) 参数: R (float): 膜阻力(已缩放 1e10) q_UF (float): 过滤流量 (m³/h) temp (float): 水温 (℃) 返回: float: 跨膜压差 TMP (MPa) 原理: Darcy 定律:TMP = J × μ × R 其中: - J: 膜通量 [m/s] - μ: 水的动力粘度 [Pa·s] - R: 膜阻力 [m⁻¹] """ # 温度修正后的水粘度 mu = self.viscosity(temp) # [Pa·s] # 膜有效面积(在参数文件中配置) A = self.p.A # 计算膜通量 J = q_UF / A / 3600 # [m/s] # 根据 Darcy 定律计算跨膜压差(还原缩放) TMP_Pa = R * J * mu * 1e10 # [缩放单位] → [Pa] # 单位转换:Pa → MPa tmp = TMP_Pa / 1e6 # [MPa] return float(tmp) def delta_resistance_filter( self, state, L_s: float ) -> float: """ 过滤阶段膜阻力上升量(数据驱动模型) 参数: state (UFState): 当前运行状态 L_s (float): 过滤时长 [s] 返回: float: 阻力增量 ΔR """ if self.model_fp is None: raise RuntimeError("过滤阶段阻力模型未注入") return float(self.model_fp(state, L_s)) def delta_resistance_backwash( self, state, R0: float, R_end: float, L_h_next_start: float, t_bw_s: float ) -> float: """ 物理反洗可去除的膜阻力(数据驱动模型) 返回: float: 可去除阻力 """ if self.model_bw is None: raise RuntimeError("反洗阶段阻力模型未注入") return float( self.model_bw( state, R0, R_end, L_h_next_start, t_bw_s ) ) def backwash_water_volume(self, t_bw_s: float) -> float: """ 计算物理反洗水耗 参数: t_bw_s (float): 反洗时长 [s] 返回: float: 反洗水耗 [m³] """ return float(self.p.q_bw_m3ph * t_bw_s / 3600.0) def simulate_one_supercycle(self, state: UFState, L_s: float, t_bw_s: float) -> UFState: """ 模拟一个超级周期(Super Cycle) 返回 info 字典 + 更新后的 UFState IS_TIMES: False CEB执行是否采用次数控制,默认为 False, 采用时长控制 """ # ========== 初始化周期参数 ========== L_h = float(L_s) / 3600.0 # 过滤时长转换:秒 → 小时 # 初始状态(统一用 TMP / R,与当前 state 保持一致) initial_tmp = state.TMP # 记录周期初始跨膜压差 initial_R = state.R # 记录周期初始膜阻力 tmp = initial_tmp # 当前跨膜压差 # 跟踪变量(用于记录周期内的极值) max_tmp_during_filtration = tmp # 周期内最大TMP min_tmp_during_filtration = tmp # 周期内最小TMP max_residual_increase = 0.0 # 周期内最大残余污染增量 # ========== 计算小周期数量 ========== # 小周期时长 = 过滤时长 + 物理反洗时长 t_small_cycle_h = (L_s + t_bw_s) / 3600.0 # [小时] # 计算一个超级周期内包含多少个小周期 if self.IS_TIMES: # 采用次数控制 k_bw_per_ceb = self.p.T_ceb_interval_times else: # 采用时长控制 # k = floor(CEB间隔时间 / 小周期时长) k_bw_per_ceb = int(np.floor(self.p.T_ceb_interval_h / t_small_cycle_h)) if k_bw_per_ceb < 1: k_bw_per_ceb = 1 # 至少包含1个小周期 # ========== 循环模拟每个小周期(过滤 + 物理反洗) ========== for idx in range(k_bw_per_ceb): # --- 小周期开始状态 --- tmp_run_start = tmp # 本次过滤开始时的TMP q_UF = state.q_UF # 过滤流量 temp = state.temp # 水温 # --- 过滤阶段:膜阻力上升 --- R_run_start = self.resistance_from_tmp(tmp_run_start, q_UF, temp) # 过滤开始时的膜阻力 d_R = self.delta_resistance_filter(state, L_s) # 过滤阶段膜阻力增量 R_peak = R_run_start + d_R # 过滤结束时的膜阻力(峰值) tmp_peak = self.tmp_from_resistance(R_peak, q_UF, temp) # 过滤结束时的TMP(峰值) # 更新TMP极值记录 max_tmp_during_filtration = max(max_tmp_during_filtration, tmp_peak) min_tmp_during_filtration = min(min_tmp_during_filtration, tmp_run_start) # --- 物理反洗阶段:膜阻力下降 --- # 计算累积运行时间(用于长期污染模型) L_h_next_start = (L_s + t_bw_s) / 3600.0 * (idx + 1) # 下一小周期起始时间 # 调用膜阻力下降模型,计算物理反洗可去除的阻力 reversible_R = self.delta_resistance_backwash(state, initial_R, R_peak, L_h_next_start, t_bw_s) # 物理反洗后的膜阻力 R_after_bw = R_peak - reversible_R tmp_after_bw = self.tmp_from_resistance(R_after_bw, q_UF, temp) # 计算残余污染增量(反洗后的TMP相对本次开始的增加) residual_inc = tmp_after_bw - tmp_run_start max_residual_increase = max(max_residual_increase, residual_inc) # 更新TMP(作为下一小周期的起始TMP) tmp = tmp_after_bw # ========== 化学增强反洗 (CEB) ========== # CEB比物理反洗更彻底,可去除部分不可逆污染 R_after_ceb = R_peak - state.ceb_removal # CEB后的膜阻力 tmp_after_ceb = self.tmp_from_resistance(R_after_ceb, q_UF, temp) # CEB后的TMP # ============================================================ # 计算周期性能指标 # ============================================================ # ========== 水量平衡计算 ========== # 进水总量(所有小周期的过滤进水之和) V_feed_super = k_bw_per_ceb * state.q_UF * L_h # 损失水量(物理反洗 + 化学反洗) V_loss_super = k_bw_per_ceb * self.backwash_water_volume(t_bw_s) + self.p.v_ceb_m3 V_net = max(0.0, V_feed_super - V_loss_super) # 回收率(净产水 / 进水总量) # 加1e-12避免除零,max确保非负 recovery = max(0.0, V_net / max(V_feed_super, 1e-12)) # ========== 时间与能耗计算 ========== # 超级周期总时长 T_super_h = k_bw_per_ceb * (L_s + t_bw_s) / 3600.0 + self.p.t_ceb_s / 3600.0 # [小时] # 日均产水时间(24小时内实际产水的时间) daily_prod_time_h = k_bw_per_ceb * L_h / T_super_h * 24.0 # [小时] # 吨水电耗(从查找表获取最接近的值) # 从物理参数类中获取查找表 closest_L = min(self.p.energy_lookup.keys(), key=lambda x: abs(x - L_s)) ton_water_energy = self.p.energy_lookup[closest_L] # [kWh/m³] # ===== 新指标:膜阻力允许上升空间 ===== # 该指标根据当前最大跨膜压差距离软约束跨膜压差的距离,动态计算当前周期允许上升的膜阻力值,用于后续清洗效果奖励计算 delta_R_allow = max( self.resistance_from_tmp(self.p.global_TMP_soft_limit, state.q_UF, state.temp) - self.resistance_from_tmp(max_tmp_during_filtration, state.q_UF, state.temp), 1e-6 ) if delta_R_allow > 50: residual_ratio = (R_after_ceb - initial_R) / delta_R_allow else: residual_ratio = 1.0 # ========== 构建性能指标字典 ========== info = { # 运行参数 "q_UF": state.q_UF, # 过滤流量 "temp": state.temp, # 水温 # 水量指标 "recovery": recovery, # 回收率 "V_feed_super_m3": V_feed_super, # 进水总量 "V_loss_super_m3": V_loss_super, # 损失水量 "V_net_super_m3": V_net, # 净产水量 # 时间指标 "supercycle_time_h": T_super_h, # 超级周期时长 "daily_prod_time_h": daily_prod_time_h, # 日均产水时间 "k_bw_per_ceb": k_bw_per_ceb, # 小周期数量 # TMP指标 "max_TMP_during_filtration": max_tmp_during_filtration, # 周期内最大TMP "min_TMP_during_filtration": min_tmp_during_filtration, # 周期内最小TMP "initial_tmp": initial_tmp, # 周期初始TMP "tmp_after_ceb": tmp_after_ceb, # CEB后TMP # 膜阻力指标 "initial_R": initial_R, # 周期初始膜阻力 "R_after_ceb": R_after_ceb, # CEB后膜阻力 "max_residual_increase_per_run": max_residual_increase, # 最大残余污染增量 "delta_R_allow": delta_R_allow, # 污染允许增长空间 "residual_ratio" : residual_ratio, # 污染上升比例 # 能耗指标 "ton_water_energy_kWh_per_m3": ton_water_energy, # 吨水电耗 } # 更新 state next_state = copy.deepcopy(state) next_state.TMP = tmp_after_ceb next_state.R = R_after_ceb # ========== 可选更新的参数 ========== # 这些参数可根据实际情况动态调整,预留扩展接口 next_state.nuK = state.nuK # 短期污染系数 next_state.slope = state.slope # 长期污染斜率 next_state.power = state.power # 长期污染幂次 next_state.ceb_removal = state.ceb_removal # CEB去除能力 next_state.q_UF = state.q_UF # 过滤流量 next_state.temp = state.temp # 水温 return info, next_state def is_dead_cycle(self, info: dict) -> bool: """ 判断当前超级周期是否成功(可行) 功能: - 检查超级周期是否违反运行约束 - 用于强化学习环境单步step的失败判定(terminated条件) - True表示成功,False表示失败 参数: info (dict): simulate_one_supercycle() 返回的性能指标字典 返回: bool: True表示成功周期,False表示失败周期 失败条件(任一满足即失败): 1. TMP超限:max_TMP > global_TMP_limit - 原因:TMP过高会损坏膜或影响产水质量 - 阈值:0.08 MPa(可配置) 2. 回收率过低:recovery < 0.75 - 原因:回收率太低说明反洗水耗过大,经济性差 - 阈值:75%(可调整) 3. 残余污染累积过快:(R_after_ceb - R0) / R0 > 0.1 - 原因:单个超级周期污染增长超过10%,长期运行不可持续 - 阈值:10%(可调整) """ # ========== 获取关键指标 ========== TMP_limit = self.p.global_TMP_hard_limit # TMP硬约束上限 max_tmp = info.get("max_TMP_during_filtration", 0) # 周期内最大TMP recovery = info.get("recovery", 1.0) # 回收率 R_after_ceb = info.get("R_after_ceb", 0) # CEB后膜阻力 R0 = info.get("initial_R", 1e-6) # 初始膜阻力 delta_R_allow = info.get("delta_R_allow", 1e-6) # 允许上升的膜阻力(加小值避免除零) # ========== 失败条件检查 ========== # 条件1:TMP超限 if max_tmp > TMP_limit: return False # 失败 # 条件2:回收率过低 if recovery < 0.75: return False # 失败 # 条件3:污染增长比例超过容许范围 residual_increase = (R_after_ceb - R0) / delta_R_allow if residual_increase > 1 / 30: return False # 失败 # 所有条件通过 return True # 成功 def check_dead_initial_state( self, init_state: UFState, max_steps: int = 45, L_s: float = 3800.0, t_bw_s: float = 60.0 ) -> bool: """ 判断给定初始状态在指定动作策略下,是否为物理可行(non-dead)。 从 init_state 出发,使用固定动作 (L_s, t_bw_s) 连续模拟 max_steps 个 supercycle: - 任意一次 is_dead_cycle(info) 为 False → 必死 - 任意一步 TMP0 < 0 → 必死 - 任意异常 → 必死 参数: init_state: 初始 UFState(不会被原地修改) max_steps: 前向模拟的最大步数 L_s: 过滤时长(秒) t_bw_s: 物理反洗时长(秒) 返回: bool: True → 物理可行(non-dead) False → 必死状态 """ import copy # 使用局部副本,确保 physics 无副作用 curr_state = copy.deepcopy(init_state) for step in range(max_steps): try: info, next_state = self.simulate_one_supercycle( curr_state, L_s=L_s, t_bw_s=t_bw_s ) except Exception: # 任何异常都视为物理不可行 return False # 单步物理可行性判定 if not self.is_dead_cycle(info): return False # 物理硬约束:TMP 不允许为负 if next_state.TMP < 0: return False # 进入下一步 curr_state = next_state return True