uf_physics.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541
  1. """
  2. uf_physics_48h.py
  3. 超滤(UF)系统物理模型与确定性计算规则模块。
  4. 本模块定义:
  5. - 与强化学习算法无关的物理规律
  6. - 与环境状态更新相关的确定性计算
  7. - 超滤系统中的基础物理量转换关系
  8. 设计原则:
  9. - 不依赖 env / agent / trainer
  10. - 不包含强化学习语义
  11. - 不负责参数加载策略(由上层控制)
  12. 该模块应可被:
  13. - 强化学习环境
  14. - 离线仿真
  15. - 工艺分析脚本
  16. 独立复用。
  17. """
  18. import numpy as np
  19. import copy
  20. from uf_train.env.env_params import UFState, UFPhysicsParams
  21. class UFPhysicsModel:
  22. """
  23. 超滤系统无状态物理模型(Physical Rules)
  24. 说明:
  25. - 本类不保存任何动态状态
  26. - 所有计算结果仅依赖输入参数
  27. - 表达的是“物理规律”,而不是“设备实例”
  28. """
  29. def __init__(
  30. self,
  31. phys_params: UFPhysicsParams,
  32. resistance_model_fp=None,
  33. resistance_model_bw=None,
  34. IS_TIMES: bool = False,
  35. ):
  36. """
  37. 参数:
  38. phys_params: 物理/工艺固定参数
  39. resistance_model_fp: 过滤阶段阻力增长模型
  40. resistance_model_bw: 反洗阶段阻力下降模型
  41. IS_TIMES: CEB是否为固定次数,T为固定次数
  42. """
  43. self.p = phys_params
  44. self.model_fp = resistance_model_fp
  45. self.model_bw = resistance_model_bw
  46. self.IS_TIMES = IS_TIMES
  47. # ==========================================================
  48. # 水温-粘度关系
  49. # ==========================================================
  50. def viscosity(self, temp: float) -> float:
  51. """
  52. 锡山水厂水温粘度修正公式
  53. 功能:根据水温计算水的动力粘度(考虑温度影响)
  54. 参数:
  55. temp (float): 水温(摄氏度)
  56. 返回:
  57. float: 水的动力粘度 μ (Pa·s)
  58. 原理:
  59. - 水的粘度随温度升高而降低
  60. - 25℃时纯水粘度约为 0.00089 Pa·s
  61. - 本公式基于锡山水厂PLC系统的经验修正因子
  62. 注意:
  63. - 本公式基于纯水粘度修正
  64. - 实际水厂水质与纯水有差异,对粘度有一定影响
  65. - 未来可根据实际水质进一步校准
  66. """
  67. # 温度归一化(相对于300K)
  68. x = (temp + 273.15) / 300 # 摄氏度转开尔文
  69. # 温度修正因子(经验公式,基于锡山水厂PLC)
  70. factor = 890 / (
  71. 280.68 * x ** -1.9 +
  72. 511.45 * x ** -7.7 +
  73. 61.131 * x ** -19.6 +
  74. 0.45903 * x ** -40
  75. )
  76. # 计算修正后的粘度(25℃标准粘度 / 修正因子)
  77. mu = 0.00089 / factor # [Pa·s]
  78. return mu
  79. # ==========================================================
  80. # TMP → 膜阻力
  81. # ==========================================================
  82. def resistance_from_tmp(
  83. self,
  84. tmp: float,
  85. q_UF: float,
  86. temp: float
  87. ) -> float:
  88. """
  89. 由跨膜压差计算膜阻力
  90. 功能:根据 Darcy 定律,由跨膜压差反推膜阻力
  91. 参数:
  92. tmp (float): 跨膜压差 TMP (MPa)
  93. q_UF (float): 过滤流量 (m³/h)
  94. temp (float): 水温 (℃)
  95. 返回:
  96. float: 膜阻力 R(已缩放 1e10)
  97. 原理:
  98. Darcy 定律:J = TMP / (μ × R)
  99. 其中:
  100. - J: 膜通量 [m/s]
  101. - TMP: 跨膜压差 [Pa]
  102. - μ: 水的动力粘度 [Pa·s]
  103. - R: 膜阻力 [m⁻¹]
  104. 反解得:R = TMP / (J × μ)
  105. 注意:
  106. - 超滤膜阻力实际量级为 1e12 m⁻¹
  107. - 为便于数值计算,已缩放 1e10 倍至 1e2 量级
  108. """
  109. # 温度修正后的水粘度
  110. mu = self.viscosity(temp) # [Pa·s]
  111. # 膜有效面积(在参数文件中配置)
  112. A = self.p.A
  113. # 跨膜压差单位转换:MPa → Pa
  114. TMP_Pa = tmp * 1e6 # [Pa]
  115. # 计算膜通量:流量 / 面积
  116. J = q_UF / A / 3600 # [m³/h] → [m³/(m²·s)] = [m/s]
  117. # 物理约束检查:通量和粘度必须为正
  118. if J <= 0 or mu <= 0:
  119. return np.nan
  120. # 根据 Darcy 定律计算膜阻力并缩放
  121. R = TMP_Pa / (J * mu) / 1e10 # [m⁻¹] → [缩放单位]
  122. return float(R)
  123. # ==========================================================
  124. # 膜阻力 → TMP
  125. # ==========================================================
  126. def tmp_from_resistance(
  127. self,
  128. R: float,
  129. q_UF: float,
  130. temp: float
  131. ) -> float:
  132. """
  133. 由膜阻力计算跨膜压差
  134. 功能:根据 Darcy 定律,由膜阻力计算跨膜压差(_calculate_resistance 的逆运算)
  135. 参数:
  136. R (float): 膜阻力(已缩放 1e10)
  137. q_UF (float): 过滤流量 (m³/h)
  138. temp (float): 水温 (℃)
  139. 返回:
  140. float: 跨膜压差 TMP (MPa)
  141. 原理:
  142. Darcy 定律:TMP = J × μ × R
  143. 其中:
  144. - J: 膜通量 [m/s]
  145. - μ: 水的动力粘度 [Pa·s]
  146. - R: 膜阻力 [m⁻¹]
  147. """
  148. # 温度修正后的水粘度
  149. mu = self.viscosity(temp) # [Pa·s]
  150. # 膜有效面积(在参数文件中配置)
  151. A = self.p.A
  152. # 计算膜通量
  153. J = q_UF / A / 3600 # [m/s]
  154. # 根据 Darcy 定律计算跨膜压差(还原缩放)
  155. TMP_Pa = R * J * mu * 1e10 # [缩放单位] → [Pa]
  156. # 单位转换:Pa → MPa
  157. tmp = TMP_Pa / 1e6 # [MPa]
  158. return float(tmp)
  159. def delta_resistance_filter(
  160. self,
  161. state,
  162. L_s: float
  163. ) -> float:
  164. """
  165. 过滤阶段膜阻力上升量(数据驱动模型)
  166. 参数:
  167. state (UFState): 当前运行状态
  168. L_s (float): 过滤时长 [s]
  169. 返回:
  170. float: 阻力增量 ΔR
  171. """
  172. if self.model_fp is None:
  173. raise RuntimeError("过滤阶段阻力模型未注入")
  174. return float(self.model_fp(state, L_s))
  175. def delta_resistance_backwash(
  176. self,
  177. state,
  178. R0: float,
  179. R_end: float,
  180. L_h_next_start: float,
  181. t_bw_s: float
  182. ) -> float:
  183. """
  184. 物理反洗可去除的膜阻力(数据驱动模型)
  185. 返回:
  186. float: 可去除阻力
  187. """
  188. if self.model_bw is None:
  189. raise RuntimeError("反洗阶段阻力模型未注入")
  190. return float(
  191. self.model_bw(
  192. state,
  193. R0,
  194. R_end,
  195. L_h_next_start,
  196. t_bw_s
  197. )
  198. )
  199. def backwash_water_volume(self, t_bw_s: float) -> float:
  200. """
  201. 计算物理反洗水耗
  202. 参数:
  203. t_bw_s (float): 反洗时长 [s]
  204. 返回:
  205. float: 反洗水耗 [m³]
  206. """
  207. return float(self.p.q_bw_m3ph * t_bw_s / 3600.0)
  208. def simulate_one_supercycle(self, state: UFState, L_s: float, t_bw_s: float) -> UFState:
  209. """
  210. 模拟一个超级周期(Super Cycle)
  211. 返回 info 字典 + 更新后的 UFState
  212. IS_TIMES: False CEB执行是否采用次数控制,默认为 False, 采用时长控制
  213. """
  214. # ========== 初始化周期参数 ==========
  215. L_h = float(L_s) / 3600.0 # 过滤时长转换:秒 → 小时
  216. # 初始状态(统一用 TMP / R,与当前 state 保持一致)
  217. initial_tmp = state.TMP # 记录周期初始跨膜压差
  218. initial_R = state.R # 记录周期初始膜阻力
  219. tmp = initial_tmp # 当前跨膜压差
  220. # 跟踪变量(用于记录周期内的极值)
  221. max_tmp_during_filtration = tmp # 周期内最大TMP
  222. min_tmp_during_filtration = tmp # 周期内最小TMP
  223. max_residual_increase = 0.0 # 周期内最大残余污染增量
  224. # ========== 计算小周期数量 ==========
  225. # 小周期时长 = 过滤时长 + 物理反洗时长
  226. t_small_cycle_h = (L_s + t_bw_s) / 3600.0 # [小时]
  227. # 计算一个超级周期内包含多少个小周期
  228. if self.IS_TIMES: # 采用次数控制
  229. k_bw_per_ceb = self.p.T_ceb_interval_times
  230. else: # 采用时长控制
  231. # k = floor(CEB间隔时间 / 小周期时长)
  232. k_bw_per_ceb = int(np.floor(self.p.T_ceb_interval_h / t_small_cycle_h))
  233. if k_bw_per_ceb < 1:
  234. k_bw_per_ceb = 1 # 至少包含1个小周期
  235. # ========== 循环模拟每个小周期(过滤 + 物理反洗) ==========
  236. for idx in range(k_bw_per_ceb):
  237. # --- 小周期开始状态 ---
  238. tmp_run_start = tmp # 本次过滤开始时的TMP
  239. q_UF = state.q_UF # 过滤流量
  240. temp = state.temp # 水温
  241. # --- 过滤阶段:膜阻力上升 ---
  242. R_run_start = self.resistance_from_tmp(tmp_run_start, q_UF, temp) # 过滤开始时的膜阻力
  243. d_R = self.delta_resistance_filter(state, L_s) # 过滤阶段膜阻力增量
  244. R_peak = R_run_start + d_R # 过滤结束时的膜阻力(峰值)
  245. tmp_peak = self.tmp_from_resistance(R_peak, q_UF, temp) # 过滤结束时的TMP(峰值)
  246. # 更新TMP极值记录
  247. max_tmp_during_filtration = max(max_tmp_during_filtration, tmp_peak)
  248. min_tmp_during_filtration = min(min_tmp_during_filtration, tmp_run_start)
  249. # --- 物理反洗阶段:膜阻力下降 ---
  250. # 计算累积运行时间(用于长期污染模型)
  251. L_h_next_start = (L_s + t_bw_s) / 3600.0 * (idx + 1) # 下一小周期起始时间
  252. # 调用膜阻力下降模型,计算物理反洗可去除的阻力
  253. reversible_R = self.delta_resistance_backwash(state, initial_R, R_peak, L_h_next_start, t_bw_s)
  254. # 物理反洗后的膜阻力
  255. R_after_bw = R_peak - reversible_R
  256. tmp_after_bw = self.tmp_from_resistance(R_after_bw, q_UF, temp)
  257. # 计算残余污染增量(反洗后的TMP相对本次开始的增加)
  258. residual_inc = tmp_after_bw - tmp_run_start
  259. max_residual_increase = max(max_residual_increase, residual_inc)
  260. # 更新TMP(作为下一小周期的起始TMP)
  261. tmp = tmp_after_bw
  262. # ========== 化学增强反洗 (CEB) ==========
  263. # CEB比物理反洗更彻底,可去除部分不可逆污染
  264. R_after_ceb = R_peak - state.ceb_removal # CEB后的膜阻力
  265. tmp_after_ceb = self.tmp_from_resistance(R_after_ceb, q_UF, temp) # CEB后的TMP
  266. # ============================================================
  267. # 计算周期性能指标
  268. # ============================================================
  269. # ========== 水量平衡计算 ==========
  270. # 进水总量(所有小周期的过滤进水之和)
  271. V_feed_super = k_bw_per_ceb * state.q_UF * L_h
  272. # 损失水量(物理反洗 + 化学反洗)
  273. V_loss_super = k_bw_per_ceb * self.backwash_water_volume(t_bw_s) + self.p.v_ceb_m3
  274. V_net = max(0.0, V_feed_super - V_loss_super)
  275. # 回收率(净产水 / 进水总量)
  276. # 加1e-12避免除零,max确保非负
  277. recovery = max(0.0, V_net / max(V_feed_super, 1e-12))
  278. # ========== 时间与能耗计算 ==========
  279. # 超级周期总时长
  280. T_super_h = k_bw_per_ceb * (L_s + t_bw_s) / 3600.0 + self.p.t_ceb_s / 3600.0 # [小时]
  281. # 日均产水时间(24小时内实际产水的时间)
  282. daily_prod_time_h = k_bw_per_ceb * L_h / T_super_h * 24.0 # [小时]
  283. # 吨水电耗(从查找表获取最接近的值)
  284. # 从物理参数类中获取查找表
  285. closest_L = min(self.p.energy_lookup.keys(), key=lambda x: abs(x - L_s))
  286. ton_water_energy = self.p.energy_lookup[closest_L] # [kWh/m³]
  287. # ===== 新指标:膜阻力允许上升空间 =====
  288. # 该指标根据当前最大跨膜压差距离软约束跨膜压差的距离,动态计算当前周期允许上升的膜阻力值,用于后续清洗效果奖励计算
  289. delta_R_allow = max(
  290. self.resistance_from_tmp(self.p.global_TMP_soft_limit, state.q_UF, state.temp) -
  291. self.resistance_from_tmp(max_tmp_during_filtration, state.q_UF, state.temp),
  292. 1e-6
  293. )
  294. if delta_R_allow > 50:
  295. residual_ratio = (R_after_ceb - initial_R) / delta_R_allow
  296. else:
  297. residual_ratio = 1.0
  298. # ========== 构建性能指标字典 ==========
  299. info = {
  300. # 运行参数
  301. "q_UF": state.q_UF, # 过滤流量
  302. "temp": state.temp, # 水温
  303. # 水量指标
  304. "recovery": recovery, # 回收率
  305. "V_feed_super_m3": V_feed_super, # 进水总量
  306. "V_loss_super_m3": V_loss_super, # 损失水量
  307. "V_net_super_m3": V_net, # 净产水量
  308. # 时间指标
  309. "supercycle_time_h": T_super_h, # 超级周期时长
  310. "daily_prod_time_h": daily_prod_time_h, # 日均产水时间
  311. "k_bw_per_ceb": k_bw_per_ceb, # 小周期数量
  312. # TMP指标
  313. "max_TMP_during_filtration": max_tmp_during_filtration, # 周期内最大TMP
  314. "min_TMP_during_filtration": min_tmp_during_filtration, # 周期内最小TMP
  315. "initial_tmp": initial_tmp, # 周期初始TMP
  316. "tmp_after_ceb": tmp_after_ceb, # CEB后TMP
  317. # 膜阻力指标
  318. "initial_R": initial_R, # 周期初始膜阻力
  319. "R_after_ceb": R_after_ceb, # CEB后膜阻力
  320. "max_residual_increase_per_run": max_residual_increase, # 最大残余污染增量
  321. "delta_R_allow": delta_R_allow, # 污染允许增长空间
  322. "residual_ratio" : residual_ratio, # 污染上升比例
  323. # 能耗指标
  324. "ton_water_energy_kWh_per_m3": ton_water_energy, # 吨水电耗
  325. }
  326. # 更新 state
  327. next_state = copy.deepcopy(state)
  328. next_state.TMP = tmp_after_ceb
  329. next_state.R = R_after_ceb
  330. # ========== 可选更新的参数 ==========
  331. # 这些参数可根据实际情况动态调整,预留扩展接口
  332. next_state.nuK = state.nuK # 短期污染系数
  333. next_state.slope = state.slope # 长期污染斜率
  334. next_state.power = state.power # 长期污染幂次
  335. next_state.ceb_removal = state.ceb_removal # CEB去除能力
  336. next_state.q_UF = state.q_UF # 过滤流量
  337. next_state.temp = state.temp # 水温
  338. return info, next_state
  339. def is_dead_cycle(self, info: dict) -> bool:
  340. """
  341. 判断当前超级周期是否成功(可行)
  342. 功能:
  343. - 检查超级周期是否违反运行约束
  344. - 用于强化学习环境单步step的失败判定(terminated条件)
  345. - True表示成功,False表示失败
  346. 参数:
  347. info (dict): simulate_one_supercycle() 返回的性能指标字典
  348. 返回:
  349. bool: True表示成功周期,False表示失败周期
  350. 失败条件(任一满足即失败):
  351. 1. TMP超限:max_TMP > global_TMP_limit
  352. - 原因:TMP过高会损坏膜或影响产水质量
  353. - 阈值:0.08 MPa(可配置)
  354. 2. 回收率过低:recovery < 0.75
  355. - 原因:回收率太低说明反洗水耗过大,经济性差
  356. - 阈值:75%(可调整)
  357. 3. 残余污染累积过快:(R_after_ceb - R0) / R0 > 0.1
  358. - 原因:单个超级周期污染增长超过10%,长期运行不可持续
  359. - 阈值:10%(可调整)
  360. """
  361. # ========== 获取关键指标 ==========
  362. TMP_limit = self.p.global_TMP_hard_limit # TMP硬约束上限
  363. max_tmp = info.get("max_TMP_during_filtration", 0) # 周期内最大TMP
  364. recovery = info.get("recovery", 1.0) # 回收率
  365. R_after_ceb = info.get("R_after_ceb", 0) # CEB后膜阻力
  366. R0 = info.get("initial_R", 1e-6) # 初始膜阻力
  367. delta_R_allow = info.get("delta_R_allow", 1e-6) # 允许上升的膜阻力(加小值避免除零)
  368. # ========== 失败条件检查 ==========
  369. # 条件1:TMP超限
  370. if max_tmp > TMP_limit:
  371. return False # 失败
  372. # 条件2:回收率过低
  373. if recovery < 0.75:
  374. return False # 失败
  375. # 条件3:污染增长比例超过容许范围
  376. residual_increase = (R_after_ceb - R0) / delta_R_allow
  377. if residual_increase > 1 / 30:
  378. return False # 失败
  379. # 所有条件通过
  380. return True # 成功
  381. def check_dead_initial_state(
  382. self,
  383. init_state: UFState,
  384. max_steps: int = 45,
  385. L_s: float = 3800.0,
  386. t_bw_s: float = 60.0
  387. ) -> bool:
  388. """
  389. 判断给定初始状态在指定动作策略下,是否为物理可行(non-dead)。
  390. 从 init_state 出发,使用固定动作 (L_s, t_bw_s)
  391. 连续模拟 max_steps 个 supercycle:
  392. - 任意一次 is_dead_cycle(info) 为 False → 必死
  393. - 任意一步 TMP0 < 0 → 必死
  394. - 任意异常 → 必死
  395. 参数:
  396. init_state:
  397. 初始 UFState(不会被原地修改)
  398. max_steps:
  399. 前向模拟的最大步数
  400. L_s:
  401. 过滤时长(秒)
  402. t_bw_s:
  403. 物理反洗时长(秒)
  404. 返回:
  405. bool:
  406. True → 物理可行(non-dead)
  407. False → 必死状态
  408. """
  409. import copy
  410. # 使用局部副本,确保 physics 无副作用
  411. curr_state = copy.deepcopy(init_state)
  412. for step in range(max_steps):
  413. try:
  414. info, next_state = self.simulate_one_supercycle(
  415. curr_state,
  416. L_s=L_s,
  417. t_bw_s=t_bw_s
  418. )
  419. except Exception:
  420. # 任何异常都视为物理不可行
  421. return False
  422. # 单步物理可行性判定
  423. if not self.is_dead_cycle(info):
  424. return False
  425. # 物理硬约束:TMP 不允许为负
  426. if next_state.TMP < 0:
  427. return False
  428. # 进入下一步
  429. curr_state = next_state
  430. return True