| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541 |
- """
- uf_physics_48h.py
- 超滤(UF)系统物理模型与确定性计算规则模块。
- 本模块定义:
- - 与强化学习算法无关的物理规律
- - 与环境状态更新相关的确定性计算
- - 超滤系统中的基础物理量转换关系
- 设计原则:
- - 不依赖 env / agent / trainer
- - 不包含强化学习语义
- - 不负责参数加载策略(由上层控制)
- 该模块应可被:
- - 强化学习环境
- - 离线仿真
- - 工艺分析脚本
- 独立复用。
- """
- import numpy as np
- import copy
- from env.env_params import UFState, UFPhysicsParams
- class UFPhysicsModel:
- """
- 超滤系统无状态物理模型(Physical Rules)
- 说明:
- - 本类不保存任何动态状态
- - 所有计算结果仅依赖输入参数
- - 表达的是“物理规律”,而不是“设备实例”
- """
- def __init__(
- self,
- phys_params: UFPhysicsParams,
- resistance_model_fp=None,
- resistance_model_bw=None,
- IS_TIMES: bool = False,
- ):
- """
- 参数:
- phys_params: 物理/工艺固定参数
- resistance_model_fp: 过滤阶段阻力增长模型
- resistance_model_bw: 反洗阶段阻力下降模型
- IS_TIMES: CEB是否为固定次数,T为固定次数
- """
- self.p = phys_params
- self.model_fp = resistance_model_fp
- self.model_bw = resistance_model_bw
- self.IS_TIMES = IS_TIMES
- # ==========================================================
- # 水温-粘度关系
- # ==========================================================
- def viscosity(self, temp: float) -> float:
- """
- 锡山水厂水温粘度修正公式
- 功能:根据水温计算水的动力粘度(考虑温度影响)
- 参数:
- temp (float): 水温(摄氏度)
- 返回:
- float: 水的动力粘度 μ (Pa·s)
- 原理:
- - 水的粘度随温度升高而降低
- - 25℃时纯水粘度约为 0.00089 Pa·s
- - 本公式基于锡山水厂PLC系统的经验修正因子
- 注意:
- - 本公式基于纯水粘度修正
- - 实际水厂水质与纯水有差异,对粘度有一定影响
- - 未来可根据实际水质进一步校准
- """
- # 温度归一化(相对于300K)
- x = (temp + 273.15) / 300 # 摄氏度转开尔文
- # 温度修正因子(经验公式,基于锡山水厂PLC)
- factor = 890 / (
- 280.68 * x ** -1.9 +
- 511.45 * x ** -7.7 +
- 61.131 * x ** -19.6 +
- 0.45903 * x ** -40
- )
- # 计算修正后的粘度(25℃标准粘度 / 修正因子)
- mu = 0.00089 / factor # [Pa·s]
- return mu
- # ==========================================================
- # TMP → 膜阻力
- # ==========================================================
- def resistance_from_tmp(
- self,
- tmp: float,
- q_UF: float,
- temp: float
- ) -> float:
- """
- 由跨膜压差计算膜阻力
- 功能:根据 Darcy 定律,由跨膜压差反推膜阻力
- 参数:
- tmp (float): 跨膜压差 TMP (MPa)
- q_UF (float): 过滤流量 (m³/h)
- temp (float): 水温 (℃)
- 返回:
- float: 膜阻力 R(已缩放 1e10)
- 原理:
- Darcy 定律:J = TMP / (μ × R)
- 其中:
- - J: 膜通量 [m/s]
- - TMP: 跨膜压差 [Pa]
- - μ: 水的动力粘度 [Pa·s]
- - R: 膜阻力 [m⁻¹]
- 反解得:R = TMP / (J × μ)
- 注意:
- - 超滤膜阻力实际量级为 1e12 m⁻¹
- - 为便于数值计算,已缩放 1e10 倍至 1e2 量级
- """
- # 温度修正后的水粘度
- mu = self.viscosity(temp) # [Pa·s]
- # 膜有效面积(在参数文件中配置)
- A = self.p.A
- # 跨膜压差单位转换:MPa → Pa
- TMP_Pa = tmp * 1e6 # [Pa]
- # 计算膜通量:流量 / 面积
- J = q_UF / A / 3600 # [m³/h] → [m³/(m²·s)] = [m/s]
- # 物理约束检查:通量和粘度必须为正
- if J <= 0 or mu <= 0:
- return np.nan
- # 根据 Darcy 定律计算膜阻力并缩放
- R = TMP_Pa / (J * mu) / 1e10 # [m⁻¹] → [缩放单位]
- return float(R)
- # ==========================================================
- # 膜阻力 → TMP
- # ==========================================================
- def tmp_from_resistance(
- self,
- R: float,
- q_UF: float,
- temp: float
- ) -> float:
- """
- 由膜阻力计算跨膜压差
- 功能:根据 Darcy 定律,由膜阻力计算跨膜压差(_calculate_resistance 的逆运算)
- 参数:
- R (float): 膜阻力(已缩放 1e10)
- q_UF (float): 过滤流量 (m³/h)
- temp (float): 水温 (℃)
- 返回:
- float: 跨膜压差 TMP (MPa)
- 原理:
- Darcy 定律:TMP = J × μ × R
- 其中:
- - J: 膜通量 [m/s]
- - μ: 水的动力粘度 [Pa·s]
- - R: 膜阻力 [m⁻¹]
- """
- # 温度修正后的水粘度
- mu = self.viscosity(temp) # [Pa·s]
- # 膜有效面积(在参数文件中配置)
- A = self.p.A
- # 计算膜通量
- J = q_UF / A / 3600 # [m/s]
- # 根据 Darcy 定律计算跨膜压差(还原缩放)
- TMP_Pa = R * J * mu * 1e10 # [缩放单位] → [Pa]
- # 单位转换:Pa → MPa
- tmp = TMP_Pa / 1e6 # [MPa]
- return float(tmp)
- def delta_resistance_filter(
- self,
- state,
- L_s: float
- ) -> float:
- """
- 过滤阶段膜阻力上升量(数据驱动模型)
- 参数:
- state (UFState): 当前运行状态
- L_s (float): 过滤时长 [s]
- 返回:
- float: 阻力增量 ΔR
- """
- if self.model_fp is None:
- raise RuntimeError("过滤阶段阻力模型未注入")
- return float(self.model_fp(state, L_s))
- def delta_resistance_backwash(
- self,
- state,
- R0: float,
- R_end: float,
- L_h_next_start: float,
- t_bw_s: float
- ) -> float:
- """
- 物理反洗可去除的膜阻力(数据驱动模型)
- 返回:
- float: 可去除阻力
- """
- if self.model_bw is None:
- raise RuntimeError("反洗阶段阻力模型未注入")
- return float(
- self.model_bw(
- state,
- R0,
- R_end,
- L_h_next_start,
- t_bw_s
- )
- )
- def backwash_water_volume(self, t_bw_s: float) -> float:
- """
- 计算物理反洗水耗
- 参数:
- t_bw_s (float): 反洗时长 [s]
- 返回:
- float: 反洗水耗 [m³]
- """
- return float(self.p.q_bw_m3ph * t_bw_s / 3600.0)
- def simulate_one_supercycle(self, state: UFState, L_s: float, t_bw_s: float) -> UFState:
- """
- 模拟一个超级周期(Super Cycle)
- 返回 info 字典 + 更新后的 UFState
- IS_TIMES: False CEB执行是否采用次数控制,默认为 False, 采用时长控制
- """
- # ========== 初始化周期参数 ==========
- L_h = float(L_s) / 3600.0 # 过滤时长转换:秒 → 小时
- # 初始状态(统一用 TMP / R,与当前 state 保持一致)
- initial_tmp = state.TMP # 记录周期初始跨膜压差
- initial_R = state.R # 记录周期初始膜阻力
- tmp = initial_tmp # 当前跨膜压差
- # 跟踪变量(用于记录周期内的极值)
- max_tmp_during_filtration = tmp # 周期内最大TMP
- min_tmp_during_filtration = tmp # 周期内最小TMP
- max_residual_increase = 0.0 # 周期内最大残余污染增量
- # ========== 计算小周期数量 ==========
- # 小周期时长 = 过滤时长 + 物理反洗时长
- t_small_cycle_h = (L_s + t_bw_s) / 3600.0 # [小时]
- # 计算一个超级周期内包含多少个小周期
- if self.IS_TIMES: # 采用次数控制
- k_bw_per_ceb = self.p.T_ceb_interval_times
- else: # 采用时长控制
- # k = floor(CEB间隔时间 / 小周期时长)
- k_bw_per_ceb = int(np.floor(self.p.T_ceb_interval_h / t_small_cycle_h))
- if k_bw_per_ceb < 1:
- k_bw_per_ceb = 1 # 至少包含1个小周期
- # ========== 循环模拟每个小周期(过滤 + 物理反洗) ==========
- for idx in range(k_bw_per_ceb):
- # --- 小周期开始状态 ---
- tmp_run_start = tmp # 本次过滤开始时的TMP
- q_UF = state.q_UF # 过滤流量
- temp = state.temp # 水温
- # --- 过滤阶段:膜阻力上升 ---
- R_run_start = self.resistance_from_tmp(tmp_run_start, q_UF, temp) # 过滤开始时的膜阻力
- d_R = self.delta_resistance_filter(state, L_s) # 过滤阶段膜阻力增量
- R_peak = R_run_start + d_R # 过滤结束时的膜阻力(峰值)
- tmp_peak = self.tmp_from_resistance(R_peak, q_UF, temp) # 过滤结束时的TMP(峰值)
- # 更新TMP极值记录
- max_tmp_during_filtration = max(max_tmp_during_filtration, tmp_peak)
- min_tmp_during_filtration = min(min_tmp_during_filtration, tmp_run_start)
- # --- 物理反洗阶段:膜阻力下降 ---
- # 计算累积运行时间(用于长期污染模型)
- L_h_next_start = (L_s + t_bw_s) / 3600.0 * (idx + 1) # 下一小周期起始时间
- # 调用膜阻力下降模型,计算物理反洗可去除的阻力
- reversible_R = self.delta_resistance_backwash(state, initial_R, R_peak, L_h_next_start, t_bw_s)
- # 物理反洗后的膜阻力
- R_after_bw = R_peak - reversible_R
- tmp_after_bw = self.tmp_from_resistance(R_after_bw, q_UF, temp)
- # 计算残余污染增量(反洗后的TMP相对本次开始的增加)
- residual_inc = tmp_after_bw - tmp_run_start
- max_residual_increase = max(max_residual_increase, residual_inc)
- # 更新TMP(作为下一小周期的起始TMP)
- tmp = tmp_after_bw
- # ========== 化学增强反洗 (CEB) ==========
- # CEB比物理反洗更彻底,可去除部分不可逆污染
- R_after_ceb = R_peak - state.ceb_removal # CEB后的膜阻力
- tmp_after_ceb = self.tmp_from_resistance(R_after_ceb, q_UF, temp) # CEB后的TMP
- # ============================================================
- # 计算周期性能指标
- # ============================================================
- # ========== 水量平衡计算 ==========
- # 进水总量(所有小周期的过滤进水之和)
- V_feed_super = k_bw_per_ceb * state.q_UF * L_h
- # 损失水量(物理反洗 + 化学反洗)
- V_loss_super = k_bw_per_ceb * self.backwash_water_volume(t_bw_s) + self.p.v_ceb_m3
- V_net = max(0.0, V_feed_super - V_loss_super)
- # 回收率(净产水 / 进水总量)
- # 加1e-12避免除零,max确保非负
- recovery = max(0.0, V_net / max(V_feed_super, 1e-12))
- # ========== 时间与能耗计算 ==========
- # 超级周期总时长
- T_super_h = k_bw_per_ceb * (L_s + t_bw_s) / 3600.0 + self.p.t_ceb_s / 3600.0 # [小时]
- # 日均产水时间(24小时内实际产水的时间)
- daily_prod_time_h = k_bw_per_ceb * L_h / T_super_h * 24.0 # [小时]
- # 吨水电耗(从查找表获取最接近的值)
- # 从物理参数类中获取查找表
- closest_L = min(self.p.energy_lookup.keys(), key=lambda x: abs(x - L_s))
- ton_water_energy = self.p.energy_lookup[closest_L] # [kWh/m³]
- # ===== 新指标:膜阻力允许上升空间 =====
- # 该指标根据当前最大跨膜压差距离软约束跨膜压差的距离,动态计算当前周期允许上升的膜阻力值,用于后续清洗效果奖励计算
- delta_R_allow = max(
- self.resistance_from_tmp(self.p.global_TMP_soft_limit, state.q_UF, state.temp) -
- self.resistance_from_tmp(max_tmp_during_filtration, state.q_UF, state.temp),
- 1e-6
- )
- if delta_R_allow > 50:
- residual_ratio = (R_after_ceb - initial_R) / delta_R_allow
- else:
- residual_ratio = 1.0
- # ========== 构建性能指标字典 ==========
- info = {
- # 运行参数
- "q_UF": state.q_UF, # 过滤流量
- "temp": state.temp, # 水温
- # 水量指标
- "recovery": recovery, # 回收率
- "V_feed_super_m3": V_feed_super, # 进水总量
- "V_loss_super_m3": V_loss_super, # 损失水量
- "V_net_super_m3": V_net, # 净产水量
- # 时间指标
- "supercycle_time_h": T_super_h, # 超级周期时长
- "daily_prod_time_h": daily_prod_time_h, # 日均产水时间
- "k_bw_per_ceb": k_bw_per_ceb, # 小周期数量
- # TMP指标
- "max_TMP_during_filtration": max_tmp_during_filtration, # 周期内最大TMP
- "min_TMP_during_filtration": min_tmp_during_filtration, # 周期内最小TMP
- "initial_tmp": initial_tmp, # 周期初始TMP
- "tmp_after_ceb": tmp_after_ceb, # CEB后TMP
- # 膜阻力指标
- "initial_R": initial_R, # 周期初始膜阻力
- "R_after_ceb": R_after_ceb, # CEB后膜阻力
- "max_residual_increase_per_run": max_residual_increase, # 最大残余污染增量
- "delta_R_allow": delta_R_allow, # 污染允许增长空间
- "residual_ratio" : residual_ratio, # 污染上升比例
- # 能耗指标
- "ton_water_energy_kWh_per_m3": ton_water_energy, # 吨水电耗
- }
- # 更新 state
- next_state = copy.deepcopy(state)
- next_state.TMP = tmp_after_ceb
- next_state.R = R_after_ceb
- # ========== 可选更新的参数 ==========
- # 这些参数可根据实际情况动态调整,预留扩展接口
- next_state.nuK = state.nuK # 短期污染系数
- next_state.slope = state.slope # 长期污染斜率
- next_state.power = state.power # 长期污染幂次
- next_state.ceb_removal = state.ceb_removal # CEB去除能力
- next_state.q_UF = state.q_UF # 过滤流量
- next_state.temp = state.temp # 水温
- return info, next_state
- def is_dead_cycle(self, info: dict) -> bool:
- """
- 判断当前超级周期是否成功(可行)
- 功能:
- - 检查超级周期是否违反运行约束
- - 用于强化学习环境单步step的失败判定(terminated条件)
- - True表示成功,False表示失败
- 参数:
- info (dict): simulate_one_supercycle() 返回的性能指标字典
- 返回:
- bool: True表示成功周期,False表示失败周期
- 失败条件(任一满足即失败):
- 1. TMP超限:max_TMP > global_TMP_limit
- - 原因:TMP过高会损坏膜或影响产水质量
- - 阈值:0.08 MPa(可配置)
- 2. 回收率过低:recovery < 0.75
- - 原因:回收率太低说明反洗水耗过大,经济性差
- - 阈值:75%(可调整)
- 3. 残余污染累积过快:(R_after_ceb - R0) / R0 > 0.1
- - 原因:单个超级周期污染增长超过10%,长期运行不可持续
- - 阈值:10%(可调整)
- """
- # ========== 获取关键指标 ==========
- TMP_limit = self.p.global_TMP_hard_limit # TMP硬约束上限
- max_tmp = info.get("max_TMP_during_filtration", 0) # 周期内最大TMP
- recovery = info.get("recovery", 1.0) # 回收率
- R_after_ceb = info.get("R_after_ceb", 0) # CEB后膜阻力
- R0 = info.get("initial_R", 1e-6) # 初始膜阻力
- delta_R_allow = info.get("delta_R_allow", 1e-6) # 允许上升的膜阻力(加小值避免除零)
- # ========== 失败条件检查 ==========
- # 条件1:TMP超限
- if max_tmp > TMP_limit:
- return False # 失败
- # 条件2:回收率过低
- if recovery < 0.75:
- return False # 失败
- # 条件3:污染增长比例超过容许范围
- residual_increase = (R_after_ceb - R0) / delta_R_allow
- if residual_increase > 1 / 30:
- return False # 失败
- # 所有条件通过
- return True # 成功
- def check_dead_initial_state(
- self,
- init_state: UFState,
- max_steps: int = 45,
- L_s: float = 3800.0,
- t_bw_s: float = 60.0
- ) -> bool:
- """
- 判断给定初始状态在指定动作策略下,是否为物理可行(non-dead)。
- 从 init_state 出发,使用固定动作 (L_s, t_bw_s)
- 连续模拟 max_steps 个 supercycle:
- - 任意一次 is_dead_cycle(info) 为 False → 必死
- - 任意一步 TMP0 < 0 → 必死
- - 任意异常 → 必死
- 参数:
- init_state:
- 初始 UFState(不会被原地修改)
- max_steps:
- 前向模拟的最大步数
- L_s:
- 过滤时长(秒)
- t_bw_s:
- 物理反洗时长(秒)
- 返回:
- bool:
- True → 物理可行(non-dead)
- False → 必死状态
- """
- import copy
- # 使用局部副本,确保 physics 无副作用
- curr_state = copy.deepcopy(init_state)
- for step in range(max_steps):
- try:
- info, next_state = self.simulate_one_supercycle(
- curr_state,
- L_s=L_s,
- t_bw_s=t_bw_s
- )
- except Exception:
- # 任何异常都视为物理不可行
- return False
- # 单步物理可行性判定
- if not self.is_dead_cycle(info):
- return False
- # 物理硬约束:TMP 不允许为负
- if next_state.TMP < 0:
- return False
- # 进入下一步
- curr_state = next_state
- return True
|