uf_physics.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577
  1. """
  2. uf_physics_48h.py
  3. 超滤(UF)系统物理模型与确定性计算规则模块。
  4. 本模块定义:
  5. - 与强化学习算法无关的物理规律
  6. - 与环境状态更新相关的确定性计算
  7. - 超滤系统中的基础物理量转换关系
  8. 设计原则:
  9. - 不依赖 env / agent / trainer
  10. - 不包含强化学习语义
  11. - 不负责参数加载策略(由上层控制)
  12. 该模块应可被:
  13. - 强化学习环境
  14. - 离线仿真
  15. - 工艺分析脚本
  16. 独立复用。
  17. """
  18. import numpy as np
  19. import copy
  20. from env.env_params import UFState, UFPhysicsParams, UFStateBounds
  21. class UFPhysicsModel:
  22. """
  23. 超滤系统无状态物理模型(Physical Rules)
  24. 说明:
  25. - 本类不保存任何动态状态
  26. - 所有计算结果仅依赖输入参数
  27. - 表达的是“物理规律”,而不是“设备实例”
  28. """
  29. def __init__(
  30. self,
  31. phys_params: UFPhysicsParams,
  32. state_bounds: UFStateBounds,
  33. resistance_model_fp=None,
  34. resistance_model_bw=None,
  35. IS_TIMES: bool = False,
  36. ):
  37. """
  38. 参数:
  39. phys_params: 物理/工艺固定参数
  40. resistance_model_fp: 过滤阶段阻力增长模型
  41. resistance_model_bw: 反洗阶段阻力下降模型
  42. IS_TIMES: CEB是否为固定次数,T为固定次数
  43. """
  44. self.p = phys_params
  45. self.state_bounds = state_bounds
  46. self.model_fp = resistance_model_fp
  47. self.model_bw = resistance_model_bw
  48. self.IS_TIMES = IS_TIMES
  49. # ==========================================================
  50. # 水温-粘度关系
  51. # ==========================================================
  52. def viscosity(self, temp: float) -> float:
  53. """
  54. 锡山水厂水温粘度修正公式
  55. 功能:根据水温计算水的动力粘度(考虑温度影响)
  56. 参数:
  57. temp (float): 水温(摄氏度)
  58. 返回:
  59. float: 水的动力粘度 μ (Pa·s)
  60. 原理:
  61. - 水的粘度随温度升高而降低
  62. - 25℃时纯水粘度约为 0.00089 Pa·s
  63. - 本公式基于锡山水厂PLC系统的经验修正因子
  64. 注意:
  65. - 本公式基于纯水粘度修正
  66. - 实际水厂水质与纯水有差异,对粘度有一定影响
  67. - 未来可根据实际水质进一步校准
  68. """
  69. # 温度归一化(相对于300K)
  70. x = (temp + 273.15) / 300 # 摄氏度转开尔文
  71. # 温度修正因子(经验公式,基于锡山水厂PLC)
  72. factor = 890 / (
  73. 280.68 * x ** -1.9 +
  74. 511.45 * x ** -7.7 +
  75. 61.131 * x ** -19.6 +
  76. 0.45903 * x ** -40
  77. )
  78. # 计算修正后的粘度(25℃标准粘度 / 修正因子)
  79. mu = 0.00089 / factor # [Pa·s]
  80. return mu
  81. # ==========================================================
  82. # TMP → 膜阻力
  83. # ==========================================================
  84. def resistance_from_tmp(
  85. self,
  86. tmp: float,
  87. q_UF: float,
  88. temp: float
  89. ) -> float:
  90. """
  91. 由跨膜压差计算膜阻力
  92. 功能:根据 Darcy 定律,由跨膜压差反推膜阻力
  93. 参数:
  94. tmp (float): 跨膜压差 TMP (MPa)
  95. q_UF (float): 过滤流量 (m³/h)
  96. temp (float): 水温 (℃)
  97. 返回:
  98. float: 膜阻力 R(已缩放 1e10)
  99. 原理:
  100. Darcy 定律:J = TMP / (μ × R)
  101. 其中:
  102. - J: 膜通量 [m/s]
  103. - TMP: 跨膜压差 [Pa]
  104. - μ: 水的动力粘度 [Pa·s]
  105. - R: 膜阻力 [m⁻¹]
  106. 反解得:R = TMP / (J × μ)
  107. 注意:
  108. - 超滤膜阻力实际量级为 1e12 m⁻¹
  109. - 为便于数值计算,已缩放 1e10 倍至 1e2 量级
  110. """
  111. # 温度修正后的水粘度
  112. mu = self.viscosity(temp) # [Pa·s]
  113. # 膜有效面积(在参数文件中配置)
  114. A = self.p.A
  115. # 跨膜压差单位转换:MPa → Pa
  116. TMP_Pa = tmp * 1e6 # [Pa]
  117. # 计算膜通量:流量 / 面积
  118. J = q_UF / A / 3600 # [m³/h] → [m³/(m²·s)] = [m/s]
  119. # 物理约束检查:通量和粘度必须为正
  120. if J <= 0 or mu <= 0:
  121. return np.nan
  122. # 根据 Darcy 定律计算膜阻力并缩放
  123. R = TMP_Pa / (J * mu) / 1e10 # [m⁻¹] → [缩放单位]
  124. return float(R)
  125. # ==========================================================
  126. # 膜阻力 → TMP
  127. # ==========================================================
  128. def tmp_from_resistance(
  129. self,
  130. R: float,
  131. q_UF: float,
  132. temp: float
  133. ) -> float:
  134. """
  135. 由膜阻力计算跨膜压差
  136. 功能:根据 Darcy 定律,由膜阻力计算跨膜压差(_calculate_resistance 的逆运算)
  137. 参数:
  138. R (float): 膜阻力(已缩放 1e10)
  139. q_UF (float): 过滤流量 (m³/h)
  140. temp (float): 水温 (℃)
  141. 返回:
  142. float: 跨膜压差 TMP (MPa)
  143. 原理:
  144. Darcy 定律:TMP = J × μ × R
  145. 其中:
  146. - J: 膜通量 [m/s]
  147. - μ: 水的动力粘度 [Pa·s]
  148. - R: 膜阻力 [m⁻¹]
  149. """
  150. # 温度修正后的水粘度
  151. mu = self.viscosity(temp) # [Pa·s]
  152. # 膜有效面积(在参数文件中配置)
  153. A = self.p.A
  154. # 计算膜通量
  155. J = q_UF / A / 3600 # [m/s]
  156. # 根据 Darcy 定律计算跨膜压差(还原缩放)
  157. TMP_Pa = R * J * mu * 1e10 # [缩放单位] → [Pa]
  158. # 单位转换:Pa → MPa
  159. tmp = TMP_Pa / 1e6 # [MPa]
  160. return float(tmp)
  161. def delta_resistance_filter(
  162. self,
  163. state,
  164. L_s: float
  165. ) -> float:
  166. """
  167. 过滤阶段膜阻力上升量(数据驱动模型)
  168. 参数:
  169. state (UFState): 当前运行状态
  170. L_s (float): 过滤时长 [s]
  171. 返回:
  172. float: 阻力增量 ΔR
  173. """
  174. if self.model_fp is None:
  175. raise RuntimeError("过滤阶段阻力模型未注入")
  176. return float(self.model_fp(state, L_s))
  177. def delta_resistance_backwash(
  178. self,
  179. state,
  180. R0: float,
  181. R_end: float,
  182. L_h_next_start: float,
  183. t_bw_s: float
  184. ) -> float:
  185. """
  186. 物理反洗可去除的膜阻力(数据驱动模型)
  187. 返回:
  188. float: 可去除阻力
  189. """
  190. if self.model_bw is None:
  191. raise RuntimeError("反洗阶段阻力模型未注入")
  192. return float(
  193. self.model_bw(
  194. state,
  195. R0,
  196. R_end,
  197. L_h_next_start,
  198. t_bw_s
  199. )
  200. )
  201. def backwash_water_volume(self, t_bw_s: float) -> float:
  202. """
  203. 计算物理反洗水耗
  204. 参数:
  205. t_bw_s (float): 反洗时长 [s]
  206. 返回:
  207. float: 反洗水耗 [m³]
  208. """
  209. return float(self.p.q_bw_m3ph * t_bw_s / 3600.0)
  210. def simulate_one_supercycle(self, state: UFState, L_s: float, t_bw_s: float) -> UFState:
  211. """
  212. 模拟一个超级周期(Super Cycle)
  213. 返回 info 字典 + 更新后的 UFState
  214. IS_TIMES: False CEB执行是否采用次数控制,默认为 False, 采用时长控制
  215. """
  216. # ========== 初始化周期参数 ==========
  217. L_h = float(L_s) / 3600.0 # 过滤时长转换:秒 → 小时
  218. # 初始状态(统一用 TMP / R,与当前 state 保持一致)
  219. initial_tmp = state.TMP # 记录周期初始跨膜压差
  220. initial_R = state.R # 记录周期初始膜阻力
  221. tmp = initial_tmp # 当前跨膜压差
  222. # 跟踪变量(用于记录周期内的极值)
  223. max_tmp_during_filtration = tmp # 周期内最大TMP
  224. min_tmp_during_filtration = tmp # 周期内最小TMP
  225. max_residual_increase = 0.0 # 周期内最大残余污染增量
  226. # ========== 计算小周期数量 ==========
  227. # 小周期时长 = 过滤时长 + 物理反洗时长
  228. t_small_cycle_h = (L_s + t_bw_s) / 3600.0 # [小时]
  229. # 计算一个超级周期内包含多少个小周期
  230. if self.IS_TIMES: # 采用次数控制
  231. k_bw_per_ceb = self.p.T_ceb_interval_times
  232. else: # 采用时长控制
  233. # k = floor(CEB间隔时间 / 小周期时长)
  234. k_bw_per_ceb = int(np.floor(self.p.T_ceb_interval_h / t_small_cycle_h))
  235. if k_bw_per_ceb < 1:
  236. k_bw_per_ceb = 1 # 至少包含1个小周期
  237. # ========== 循环模拟每个小周期(过滤 + 物理反洗) ==========
  238. for idx in range(k_bw_per_ceb):
  239. # --- 小周期开始状态 ---
  240. tmp_run_start = tmp # 本次过滤开始时的TMP
  241. q_UF = state.q_UF # 过滤流量
  242. temp = state.temp # 水温
  243. # --- 过滤阶段:膜阻力上升 ---
  244. R_run_start = self.resistance_from_tmp(tmp_run_start, q_UF, temp) # 过滤开始时的膜阻力
  245. d_R = self.delta_resistance_filter(state, L_s) # 过滤阶段膜阻力增量
  246. R_peak = R_run_start + d_R # 过滤结束时的膜阻力(峰值)
  247. tmp_peak = self.tmp_from_resistance(R_peak, q_UF, temp) # 过滤结束时的TMP(峰值)
  248. # 更新TMP极值记录
  249. max_tmp_during_filtration = max(max_tmp_during_filtration, tmp_peak)
  250. min_tmp_during_filtration = min(min_tmp_during_filtration, tmp_run_start)
  251. # --- 物理反洗阶段:膜阻力下降 ---
  252. # 计算累积运行时间(用于长期污染模型)
  253. L_h_next_start = (L_s + t_bw_s) / 3600.0 * (idx + 1) # 下一小周期起始时间
  254. # 调用膜阻力下降模型,计算物理反洗可去除的阻力
  255. reversible_R = self.delta_resistance_backwash(state, initial_R, R_peak, L_h_next_start, t_bw_s)
  256. # 物理反洗后的膜阻力
  257. R_after_bw = R_peak - reversible_R
  258. tmp_after_bw = self.tmp_from_resistance(R_after_bw, q_UF, temp)
  259. # 计算残余污染增量(反洗后的TMP相对本次开始的增加)
  260. residual_inc = tmp_after_bw - tmp_run_start
  261. max_residual_increase = max(max_residual_increase, residual_inc)
  262. # 更新TMP(作为下一小周期的起始TMP)
  263. tmp = tmp_after_bw
  264. # ========== 化学增强反洗 (CEB) ==========
  265. # CEB比物理反洗更彻底,可去除部分不可逆污染
  266. R_after_ceb = R_peak - state.ceb_removal # CEB后的膜阻力
  267. tmp_after_ceb = self.tmp_from_resistance(R_after_ceb, q_UF, temp) # CEB后的TMP
  268. # ============================================================
  269. # 计算周期性能指标
  270. # ============================================================
  271. # ========== 水量平衡计算 ==========
  272. # 进水总量(所有小周期的过滤进水之和)
  273. V_feed_super = k_bw_per_ceb * state.q_UF * L_h
  274. # 损失水量(物理反洗 + 化学反洗)
  275. V_loss_super = k_bw_per_ceb * self.backwash_water_volume(t_bw_s) + self.p.v_ceb_m3
  276. V_net = max(0.0, V_feed_super - V_loss_super)
  277. # 回收率(净产水 / 进水总量)
  278. # 加1e-12避免除零,max确保非负
  279. recovery = max(0.0, V_net / max(V_feed_super, 1e-12))
  280. # ========== 时间与能耗计算 ==========
  281. # 超级周期总时长
  282. T_super_h = k_bw_per_ceb * (L_s + t_bw_s) / 3600.0 + self.p.t_ceb_s / 3600.0 # [小时]
  283. # 日均产水时间(24小时内实际产水的时间)
  284. daily_prod_time_h = k_bw_per_ceb * L_h / T_super_h * 24.0 # [小时]
  285. # 参考吨水电耗(从查找表获取最接近的值)
  286. # 从物理参数类中获取查找表
  287. closest_L = min(self.p.energy_lookup.keys(), key=lambda x: abs(x - L_s))
  288. refer_ton_water_energy = self.p.energy_lookup[closest_L] # [kWh/m³]
  289. # 实际吨水电耗计算
  290. # 进水时间(小时)
  291. t_feed_total_h = k_bw_per_ceb * L_h
  292. # 反洗时间(小时)
  293. t_bw_total_h = k_bw_per_ceb * t_bw_s / 3600.0
  294. # 总能耗 (kWh)
  295. E_total = t_feed_total_h * self.p.p_feed_kw+ t_bw_total_h * self.p.p_bw_kw
  296. # 吨水电耗 (kWh/吨)
  297. ton_water_energy = E_total / max(V_net, 1e-12)
  298. # 吨水药耗计算
  299. # ========== 吨水药耗计算 ==========
  300. R_removed = state.ceb_removal
  301. # 参数
  302. R_min = self.state_bounds.ceb_removal_min
  303. R_max = self.state_bounds.ceb_removal_max
  304. dose_min = self.p.dose_min
  305. dose_max = self.p.dose_max
  306. # 防止越界
  307. R_removed_clip = np.clip(R_removed, R_min, R_max)
  308. # 线性映射计算加药量
  309. dose = dose_min + (R_removed_clip - R_min) / (R_max - R_min) * (dose_max - dose_min)
  310. # 吨水药耗
  311. ton_water_chem = dose / max(V_net, 1e-12)
  312. # ===== 新指标:膜阻力允许上升空间 =====
  313. # 该指标根据当前最大跨膜压差距离软约束跨膜压差的距离,动态计算当前周期允许上升的膜阻力值,用于后续清洗效果奖励计算
  314. delta_R = R_after_ceb - initial_R
  315. delta_R_allow = max(
  316. self.resistance_from_tmp(self.p.global_TMP_hard_limit, state.q_UF, state.temp) -
  317. self.resistance_from_tmp(max_tmp_during_filtration, state.q_UF, state.temp),
  318. 1e-6
  319. )
  320. if delta_R_allow > 50:
  321. residual_ratio = max(delta_R / delta_R_allow, 0.0)
  322. else:
  323. residual_ratio = 1.0
  324. # ========== 构建性能指标字典 ==========
  325. info = {
  326. # 运行参数
  327. "q_UF": state.q_UF, # 过滤流量
  328. "temp": state.temp, # 水温
  329. # 水量指标
  330. "recovery": recovery, # 回收率
  331. "V_feed_super_m3": V_feed_super, # 进水总量
  332. "V_loss_super_m3": V_loss_super, # 损失水量
  333. "V_net_super_m3": V_net, # 净产水量
  334. # 时间指标
  335. "supercycle_time_h": T_super_h, # 超级周期时长
  336. "daily_prod_time_h": daily_prod_time_h, # 日均产水时间
  337. "k_bw_per_ceb": k_bw_per_ceb, # 小周期数量
  338. # TMP指标
  339. "max_TMP_during_filtration": max_tmp_during_filtration, # 周期内最大TMP
  340. "min_TMP_during_filtration": min_tmp_during_filtration, # 周期内最小TMP
  341. "initial_tmp": initial_tmp, # 周期初始TMP
  342. "tmp_after_ceb": tmp_after_ceb, # CEB后TMP
  343. # 膜阻力指标
  344. "initial_R": initial_R, # 周期初始膜阻力
  345. "R_after_ceb": R_after_ceb, # CEB后膜阻力
  346. "max_residual_increase_per_run": max_residual_increase, # 最大残余污染增量
  347. "delta_R_allow": delta_R_allow, # 污染允许增长空间
  348. "residual_ratio" : residual_ratio, # 污染上升比例
  349. # 能耗指标
  350. "refer_ton_water_energy": refer_ton_water_energy, # 参考吨水电耗
  351. "ton_water_energy": ton_water_energy, # 吨水电耗
  352. "ton_water_chem": ton_water_chem, # 吨水药耗
  353. }
  354. # 更新 state
  355. next_state = copy.deepcopy(state)
  356. next_state.TMP = tmp_after_ceb
  357. next_state.R = R_after_ceb
  358. # ========== 可选更新的参数 ==========
  359. # 这些参数可根据实际情况动态调整,预留扩展接口
  360. next_state.nuK = state.nuK # 短期污染系数
  361. next_state.slope = state.slope # 长期污染斜率
  362. next_state.power = state.power # 长期污染幂次
  363. next_state.ceb_removal = state.ceb_removal # CEB去除能力
  364. next_state.q_UF = state.q_UF # 过滤流量
  365. next_state.temp = state.temp # 水温
  366. return info, next_state
  367. def is_dead_cycle(self, info: dict) -> bool:
  368. """
  369. 判断当前超级周期是否成功(可行)
  370. 功能:
  371. - 检查超级周期是否违反运行约束
  372. - 用于强化学习环境单步step的失败判定(terminated条件)
  373. - True表示成功,False表示失败
  374. 参数:
  375. info (dict): simulate_one_supercycle() 返回的性能指标字典
  376. 返回:
  377. bool: True表示成功周期,False表示失败周期
  378. 失败条件(任一满足即失败):
  379. 1. TMP超限:max_TMP > global_TMP_limit
  380. - 原因:TMP过高会损坏膜或影响产水质量
  381. - 阈值:0.08 MPa(可配置)
  382. 2. 回收率过低:recovery < 0.75
  383. - 原因:回收率太低说明反洗水耗过大,经济性差
  384. - 阈值:75%(可调整)
  385. 3. 残余污染累积过快:(R_after_ceb - R0) / R0 > 0.05
  386. - 原因:单个超级周期污染增长超过5%,长期运行不可持续
  387. - 阈值:10%(可调整)
  388. """
  389. # ========== 获取关键指标 ==========
  390. TMP_limit = self.p.global_TMP_hard_limit # TMP硬约束上限
  391. max_tmp = info.get("max_TMP_during_filtration", 0) # 周期内最大TMP
  392. recovery = info.get("recovery", 1.0) # 回收率
  393. R_after_ceb = info.get("R_after_ceb", 0) # CEB后膜阻力
  394. R0 = info.get("initial_R", 1e-6) # 初始膜阻力
  395. delta_R_allow = info.get("delta_R_allow", 1e-6) # 允许上升的膜阻力(加小值避免除零)
  396. # ========== 失败条件检查 ==========
  397. # 条件1:TMP超限
  398. if max_tmp > TMP_limit:
  399. return False # 失败
  400. # 条件2:回收率过低
  401. if recovery < 0.75:
  402. return False # 失败
  403. # 条件3:污染增长比例超过容许范围
  404. residual_increase = (R_after_ceb - R0) / delta_R_allow
  405. if residual_increase > 1 / 20:
  406. return False # 失败
  407. # 所有条件通过
  408. return True # 成功
  409. def check_dead_initial_state(
  410. self,
  411. init_state: UFState,
  412. max_steps: int = 45,
  413. L_s: float = 3800.0,
  414. t_bw_s: float = 60.0
  415. ) -> bool:
  416. """
  417. 判断给定初始状态在指定动作策略下,是否为物理可行(non-dead)。
  418. 从 init_state 出发,使用固定动作 (L_s, t_bw_s)
  419. 连续模拟 max_steps 个 supercycle:
  420. - 任意一次 is_dead_cycle(info) 为 False → 必死
  421. - 任意一步 TMP0 < 0 → 必死
  422. - 任意异常 → 必死
  423. 参数:
  424. init_state:
  425. 初始 UFState(不会被原地修改)
  426. max_steps:
  427. 前向模拟的最大步数
  428. L_s:
  429. 过滤时长(秒)
  430. t_bw_s:
  431. 物理反洗时长(秒)
  432. 返回:
  433. bool:
  434. True → 物理可行(non-dead)
  435. False → 必死状态
  436. """
  437. import copy
  438. # 使用局部副本,确保 physics 无副作用
  439. curr_state = copy.deepcopy(init_state)
  440. for step in range(max_steps):
  441. try:
  442. info, next_state = self.simulate_one_supercycle(
  443. curr_state,
  444. L_s=L_s,
  445. t_bw_s=t_bw_s
  446. )
  447. except Exception:
  448. # 任何异常都视为物理不可行
  449. return False
  450. # 单步物理可行性判定
  451. if not self.is_dead_cycle(info):
  452. return False
  453. # 物理硬约束:TMP 不允许为负
  454. if next_state.TMP < 0:
  455. return False
  456. # 进入下一步
  457. curr_state = next_state
  458. return True