uf_physics.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527
  1. """
  2. uf_physics.py
  3. 超滤(UF)系统物理模型与确定性计算规则模块。
  4. 本模块定义:
  5. - 与强化学习算法无关的物理规律
  6. - 与环境状态更新相关的确定性计算
  7. - 超滤系统中的基础物理量转换关系
  8. 设计原则:
  9. - 不依赖 env / agent / trainer
  10. - 不包含强化学习语义
  11. - 不负责参数加载策略(由上层控制)
  12. 该模块应可被:
  13. - 强化学习环境
  14. - 离线仿真
  15. - 工艺分析脚本
  16. 独立复用。
  17. """
  18. import numpy as np
  19. import copy
  20. from uf_train.env.env_params import UFState, UFPhysicsParams
  21. class UFPhysicsModel:
  22. """
  23. 超滤系统无状态物理模型(Physical Rules)
  24. 说明:
  25. - 本类不保存任何动态状态
  26. - 所有计算结果仅依赖输入参数
  27. - 表达的是“物理规律”,而不是“设备实例”
  28. """
  29. def __init__(
  30. self,
  31. phys_params: UFPhysicsParams,
  32. resistance_model_fp=None,
  33. resistance_model_bw=None,
  34. ):
  35. """
  36. 参数:
  37. phys_params: 物理/工艺固定参数
  38. resistance_model_fp: 过滤阶段阻力增长模型
  39. resistance_model_bw: 反洗阶段阻力下降模型
  40. """
  41. self.p = phys_params
  42. self.model_fp = resistance_model_fp
  43. self.model_bw = resistance_model_bw
  44. # ==========================================================
  45. # 水温-粘度关系
  46. # ==========================================================
  47. def viscosity(self, temp: float) -> float:
  48. """
  49. 锡山水厂水温粘度修正公式
  50. 功能:根据水温计算水的动力粘度(考虑温度影响)
  51. 参数:
  52. temp (float): 水温(摄氏度)
  53. 返回:
  54. float: 水的动力粘度 μ (Pa·s)
  55. 原理:
  56. - 水的粘度随温度升高而降低
  57. - 25℃时纯水粘度约为 0.00089 Pa·s
  58. - 本公式基于锡山水厂PLC系统的经验修正因子
  59. 注意:
  60. - 本公式基于纯水粘度修正
  61. - 实际水厂水质与纯水有差异,对粘度有一定影响
  62. - 未来可根据实际水质进一步校准
  63. """
  64. # 温度归一化(相对于300K)
  65. x = (temp + 273.15) / 300 # 摄氏度转开尔文
  66. # 温度修正因子(经验公式,基于锡山水厂PLC)
  67. factor = 890 / (
  68. 280.68 * x ** -1.9 +
  69. 511.45 * x ** -7.7 +
  70. 61.131 * x ** -19.6 +
  71. 0.45903 * x ** -40
  72. )
  73. # 计算修正后的粘度(25℃标准粘度 / 修正因子)
  74. mu = 0.00089 / factor # [Pa·s]
  75. return mu
  76. # ==========================================================
  77. # TMP → 膜阻力
  78. # ==========================================================
  79. def resistance_from_tmp(
  80. self,
  81. tmp: float,
  82. q_UF: float,
  83. temp: float
  84. ) -> float:
  85. """
  86. 由跨膜压差计算膜阻力
  87. 功能:根据 Darcy 定律,由跨膜压差反推膜阻力
  88. 参数:
  89. tmp (float): 跨膜压差 TMP (MPa)
  90. q_UF (float): 过滤流量 (m³/h)
  91. temp (float): 水温 (℃)
  92. 返回:
  93. float: 膜阻力 R(已缩放 1e10)
  94. 原理:
  95. Darcy 定律:J = TMP / (μ × R)
  96. 其中:
  97. - J: 膜通量 [m/s]
  98. - TMP: 跨膜压差 [Pa]
  99. - μ: 水的动力粘度 [Pa·s]
  100. - R: 膜阻力 [m⁻¹]
  101. 反解得:R = TMP / (J × μ)
  102. 注意:
  103. - 超滤膜阻力实际量级为 1e12 m⁻¹
  104. - 为便于数值计算,已缩放 1e10 倍至 1e2 量级
  105. """
  106. # 温度修正后的水粘度
  107. mu = self.viscosity(temp) # [Pa·s]
  108. # 膜有效面积(在参数文件中配置)
  109. A = self.p.A
  110. # 跨膜压差单位转换:MPa → Pa
  111. TMP_Pa = tmp * 1e6 # [Pa]
  112. # 计算膜通量:流量 / 面积
  113. J = q_UF / A / 3600 # [m³/h] → [m³/(m²·s)] = [m/s]
  114. # 物理约束检查:通量和粘度必须为正
  115. if J <= 0 or mu <= 0:
  116. return np.nan
  117. # 根据 Darcy 定律计算膜阻力并缩放
  118. R = TMP_Pa / (J * mu) / 1e10 # [m⁻¹] → [缩放单位]
  119. return float(R)
  120. # ==========================================================
  121. # 膜阻力 → TMP
  122. # ==========================================================
  123. def tmp_from_resistance(
  124. self,
  125. R: float,
  126. q_UF: float,
  127. temp: float
  128. ) -> float:
  129. """
  130. 由膜阻力计算跨膜压差
  131. 功能:根据 Darcy 定律,由膜阻力计算跨膜压差(_calculate_resistance 的逆运算)
  132. 参数:
  133. R (float): 膜阻力(已缩放 1e10)
  134. q_UF (float): 过滤流量 (m³/h)
  135. temp (float): 水温 (℃)
  136. 返回:
  137. float: 跨膜压差 TMP (MPa)
  138. 原理:
  139. Darcy 定律:TMP = J × μ × R
  140. 其中:
  141. - J: 膜通量 [m/s]
  142. - μ: 水的动力粘度 [Pa·s]
  143. - R: 膜阻力 [m⁻¹]
  144. """
  145. # 温度修正后的水粘度
  146. mu = self.viscosity(temp) # [Pa·s]
  147. # 膜有效面积(在参数文件中配置)
  148. A = self.p.A
  149. # 计算膜通量
  150. J = q_UF / A / 3600 # [m/s]
  151. # 根据 Darcy 定律计算跨膜压差(还原缩放)
  152. TMP_Pa = R * J * mu * 1e10 # [缩放单位] → [Pa]
  153. # 单位转换:Pa → MPa
  154. tmp = TMP_Pa / 1e6 # [MPa]
  155. return float(tmp)
  156. def delta_resistance_filter(
  157. self,
  158. state,
  159. L_s: float
  160. ) -> float:
  161. """
  162. 过滤阶段膜阻力上升量(数据驱动模型)
  163. 参数:
  164. state (UFState): 当前运行状态
  165. L_s (float): 过滤时长 [s]
  166. 返回:
  167. float: 阻力增量 ΔR
  168. """
  169. if self.model_fp is None:
  170. raise RuntimeError("过滤阶段阻力模型未注入")
  171. return float(self.model_fp(state, L_s))
  172. def delta_resistance_backwash(
  173. self,
  174. state,
  175. R0: float,
  176. R_end: float,
  177. L_h_next_start: float,
  178. t_bw_s: float
  179. ) -> float:
  180. """
  181. 物理反洗可去除的膜阻力(数据驱动模型)
  182. 返回:
  183. float: 可去除阻力
  184. """
  185. if self.model_bw is None:
  186. raise RuntimeError("反洗阶段阻力模型未注入")
  187. return float(
  188. self.model_bw(
  189. state,
  190. R0,
  191. R_end,
  192. L_h_next_start,
  193. t_bw_s
  194. )
  195. )
  196. def backwash_water_volume(self, t_bw_s: float) -> float:
  197. """
  198. 计算物理反洗水耗
  199. 参数:
  200. t_bw_s (float): 反洗时长 [s]
  201. 返回:
  202. float: 反洗水耗 [m³]
  203. """
  204. return float(self.p.q_bw_m3ph * t_bw_s / 3600.0)
  205. def simulate_one_supercycle(self, state: UFState, L_s: float, t_bw_s: float):
  206. """
  207. 模拟一个超级周期(Super Cycle)
  208. 返回 info 字典 + 更新后的 UFState
  209. """
  210. # ========== 初始化周期参数 ==========
  211. L_h = float(L_s) / 3600.0 # 过滤时长转换:秒 → 小时
  212. # 初始状态(统一用 TMP / R,与当前 state 保持一致)
  213. initial_tmp = state.TMP # 记录周期初始跨膜压差
  214. initial_R = state.R # 记录周期初始膜阻力
  215. tmp = initial_tmp # 当前跨膜压差
  216. # 跟踪变量(用于记录周期内的极值)
  217. max_tmp_during_filtration = tmp # 周期内最大TMP
  218. min_tmp_during_filtration = tmp # 周期内最小TMP
  219. max_residual_increase = 0.0 # 周期内最大残余污染增量
  220. # ========== 计算小周期数量 ==========
  221. # 小周期时长 = 过滤时长 + 物理反洗时长
  222. t_small_cycle_h = (L_s + t_bw_s) / 3600.0 # [小时]
  223. # 计算一个超级周期内包含多少个小周期
  224. # k = floor(CEB间隔时间 / 小周期时长)
  225. k_bw_per_ceb = int(np.floor(self.p.T_ceb_interval_h / t_small_cycle_h))
  226. if k_bw_per_ceb < 1:
  227. k_bw_per_ceb = 1 # 至少包含1个小周期
  228. # ========== 循环模拟每个小周期(过滤 + 物理反洗) ==========
  229. for idx in range(k_bw_per_ceb):
  230. # --- 小周期开始状态 ---
  231. tmp_run_start = tmp # 本次过滤开始时的TMP
  232. q_UF = state.q_UF # 过滤流量
  233. temp = state.temp # 水温
  234. # --- 过滤阶段:膜阻力上升 ---
  235. R_run_start = self.resistance_from_tmp(tmp_run_start, q_UF, temp) # 过滤开始时的膜阻力
  236. d_R = self.delta_resistance_filter(state, L_s) # 过滤阶段膜阻力增量
  237. R_peak = R_run_start + d_R # 过滤结束时的膜阻力(峰值)
  238. tmp_peak = self.tmp_from_resistance(R_peak, q_UF, temp) # 过滤结束时的TMP(峰值)
  239. # 更新TMP极值记录
  240. max_tmp_during_filtration = max(max_tmp_during_filtration, tmp_peak)
  241. min_tmp_during_filtration = min(min_tmp_during_filtration, tmp_run_start)
  242. # --- 物理反洗阶段:膜阻力下降 ---
  243. # 计算累积运行时间(用于长期污染模型)
  244. L_h_next_start = (L_s + t_bw_s) / 3600.0 * (idx + 1) # 下一小周期起始时间
  245. # 调用膜阻力下降模型,计算物理反洗可去除的阻力
  246. reversible_R = self.delta_resistance_backwash(state, initial_R, R_peak, L_h_next_start, t_bw_s)
  247. # 物理反洗后的膜阻力
  248. R_after_bw = R_peak - reversible_R
  249. tmp_after_bw = self.tmp_from_resistance(R_after_bw, q_UF, temp)
  250. # 计算残余污染增量(反洗后的TMP相对本次开始的增加)
  251. residual_inc = tmp_after_bw - tmp_run_start
  252. max_residual_increase = max(max_residual_increase, residual_inc)
  253. # 更新TMP(作为下一小周期的起始TMP)
  254. tmp = tmp_after_bw
  255. # ========== 化学增强反洗 (CEB) ==========
  256. # CEB比物理反洗更彻底,可去除部分不可逆污染
  257. R_after_ceb = R_peak - state.ceb_removal # CEB后的膜阻力
  258. tmp_after_ceb = self.tmp_from_resistance(R_after_ceb, q_UF, temp) # CEB后的TMP
  259. # ============================================================
  260. # 计算周期性能指标
  261. # ============================================================
  262. # ========== 水量平衡计算 ==========
  263. # 进水总量(所有小周期的过滤进水之和)
  264. V_feed_super = k_bw_per_ceb * state.q_UF * L_h
  265. # 损失水量(物理反洗 + 化学反洗)
  266. V_loss_super = k_bw_per_ceb * self.backwash_water_volume(t_bw_s) + self.p.v_ceb_m3
  267. V_net = max(0.0, V_feed_super - V_loss_super)
  268. # 回收率(净产水 / 进水总量)
  269. # 加1e-12避免除零,max确保非负
  270. recovery = max(0.0, V_net / max(V_feed_super, 1e-12))
  271. # ========== 时间与能耗计算 ==========
  272. # 超级周期总时长
  273. T_super_h = k_bw_per_ceb * (L_s + t_bw_s) / 3600.0 + self.p.t_ceb_s / 3600.0 # [小时]
  274. # 日均产水时间(24小时内实际产水的时间)
  275. daily_prod_time_h = k_bw_per_ceb * L_h / T_super_h * 24.0 # [小时]
  276. # 吨水电耗(从查找表获取最接近的值)
  277. # 从物理参数类中获取查找表
  278. closest_L = min(self.p.energy_lookup.keys(), key=lambda x: abs(x - L_s))
  279. ton_water_energy = self.p.energy_lookup[closest_L] # [kWh/m³]
  280. # ===== 新指标:膜阻力允许上升空间 =====
  281. # 该指标根据当前最大跨膜压差距离软约束跨膜压差的距离,动态计算当前周期允许上升的膜阻力值,用于后续清洗效果奖励计算
  282. delta_R_allow = max(
  283. self.resistance_from_tmp(self.p.global_TMP_soft_limit, state.q_UF, state.temp) -
  284. self.resistance_from_tmp(max_tmp_during_filtration, state.q_UF, state.temp),
  285. 1e-6
  286. )
  287. # ========== 构建性能指标字典 ==========
  288. info = {
  289. # 运行参数
  290. "q_UF": state.q_UF, # 过滤流量
  291. "temp": state.temp, # 水温
  292. # 水量指标
  293. "recovery": recovery, # 回收率
  294. "V_feed_super_m3": V_feed_super, # 进水总量
  295. "V_loss_super_m3": V_loss_super, # 损失水量
  296. "V_net_super_m3": V_net, # 净产水量
  297. # 时间指标
  298. "supercycle_time_h": T_super_h, # 超级周期时长
  299. "daily_prod_time_h": daily_prod_time_h, # 日均产水时间
  300. "k_bw_per_ceb": k_bw_per_ceb, # 小周期数量
  301. # TMP指标
  302. "max_TMP_during_filtration": max_tmp_during_filtration, # 周期内最大TMP
  303. "min_TMP_during_filtration": min_tmp_during_filtration, # 周期内最小TMP
  304. "initial_tmp": initial_tmp, # 周期初始TMP
  305. "tmp_after_ceb": tmp_after_ceb, # CEB后TMP
  306. # 膜阻力指标
  307. "initial_R": initial_R, # 周期初始膜阻力
  308. "R_after_ceb": R_after_ceb, # CEB后膜阻力
  309. "max_residual_increase_per_run": max_residual_increase, # 最大残余污染增量
  310. "delta_R_allow": delta_R_allow, # 污染允许增长空间
  311. # 能耗指标
  312. "ton_water_energy_kWh_per_m3": ton_water_energy, # 吨水电耗
  313. }
  314. # 更新 state
  315. next_state = copy.deepcopy(state)
  316. next_state.TMP = tmp_after_ceb
  317. next_state.R = R_after_ceb
  318. # ========== 可选更新的参数(当前保持不变) ==========
  319. # 这些参数可根据实际情况动态调整,预留扩展接口
  320. next_state.slope = state.slope # 长期污染斜率
  321. next_state.power = state.power # 长期污染幂次
  322. next_state.ceb_removal = state.ceb_removal # CEB去除能力
  323. next_state.nuK = state.nuK # 短期污染系数
  324. next_state.q_UF = state.q_UF # 过滤流量
  325. next_state.temp = state.temp # 水温
  326. return info, next_state
  327. def is_dead_cycle(self, info: dict) -> bool:
  328. """
  329. 判断当前超级周期是否成功(可行)
  330. 功能:
  331. - 检查超级周期是否违反运行约束
  332. - 用于强化学习环境单步step的失败判定(terminated条件)
  333. - True表示成功,False表示失败
  334. 参数:
  335. info (dict): simulate_one_supercycle() 返回的性能指标字典
  336. 返回:
  337. bool: True表示成功周期,False表示失败周期
  338. 失败条件(任一满足即失败):
  339. 1. TMP超限:max_TMP > global_TMP_limit
  340. - 原因:TMP过高会损坏膜或影响产水质量
  341. - 阈值:0.08 MPa(可配置)
  342. 2. 回收率过低:recovery < 0.75
  343. - 原因:回收率太低说明反洗水耗过大,经济性差
  344. - 阈值:75%(可调整)
  345. 3. 残余污染累积过快:(R_after_ceb - R0) / R0 > 0.1
  346. - 原因:单个超级周期污染增长超过10%,长期运行不可持续
  347. - 阈值:10%(可调整)
  348. """
  349. # ========== 获取关键指标 ==========
  350. TMP_limit = self.p.global_TMP_hard_limit # TMP硬约束上限
  351. max_tmp = info.get("max_TMP_during_filtration", 0) # 周期内最大TMP
  352. recovery = info.get("recovery", 1.0) # 回收率
  353. R_after_ceb = info.get("R_after_ceb", 0) # CEB后膜阻力
  354. R0 = info.get("initial_R", 1e-6) # 初始膜阻力
  355. delta_R_allow = info.get("delta_R_allow", 1e-6) # 允许上升的膜阻力(加小值避免除零)
  356. # ========== 失败条件检查 ==========
  357. # 条件1:TMP超限
  358. if max_tmp > TMP_limit:
  359. return False # 失败
  360. # 条件2:回收率过低
  361. if recovery < 0.75:
  362. return False # 失败
  363. # 条件3:污染增长比例超过容许范围
  364. residual_increase = (R_after_ceb - R0) / delta_R_allow
  365. if residual_increase > 1 / 45:
  366. return False # 失败
  367. # 所有条件通过
  368. return True # 成功
  369. def check_dead_initial_state(
  370. self,
  371. init_state: UFState,
  372. max_steps: int = 15,
  373. L_s: float = 3800.0,
  374. t_bw_s: float = 60.0
  375. ) -> bool:
  376. """
  377. 判断给定初始状态在指定动作策略下,是否为物理可行(non-dead)。
  378. 从 init_state 出发,使用固定动作 (L_s, t_bw_s)
  379. 连续模拟 max_steps 个 supercycle:
  380. - 任意一次 is_dead_cycle(info) 为 False → 必死
  381. - 任意一步 TMP0 < 0 → 必死
  382. - 任意异常 → 必死
  383. 参数:
  384. init_state:
  385. 初始 UFState(不会被原地修改)
  386. max_steps:
  387. 前向模拟的最大步数
  388. L_s:
  389. 过滤时长(秒)
  390. t_bw_s:
  391. 物理反洗时长(秒)
  392. 返回:
  393. bool:
  394. True → 物理可行(non-dead)
  395. False → 必死状态
  396. """
  397. import copy
  398. # 使用局部副本,确保 physics 无副作用
  399. curr_state = copy.deepcopy(init_state)
  400. for step in range(max_steps):
  401. try:
  402. info, next_state = self.simulate_one_supercycle(
  403. curr_state,
  404. L_s=L_s,
  405. t_bw_s=t_bw_s
  406. )
  407. except Exception:
  408. # 任何异常都视为物理不可行
  409. return False
  410. # 单步物理可行性判定
  411. if not self.is_dead_cycle(info):
  412. return False
  413. # 物理硬约束:TMP 不允许为负
  414. if next_state.TMP < 0:
  415. return False
  416. # 进入下一步
  417. curr_state = next_state
  418. return True