calculate.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. from typing import List
  2. import numpy as np
  3. import pandas as pd
  4. class UFResistanceCalculator:
  5. """针对单个段或段列表计算膜通量、TMP、粘度和膜阻力"""
  6. def __init__(self, units: list, area_m2, scale_factor):
  7. self.units = units
  8. # 处理面积
  9. self.A = area_m2
  10. if isinstance(self.A, str):
  11. try:
  12. self.A = float(eval(self.A)) # 将 "128*40" -> 5120.0
  13. except Exception as e:
  14. print(f"无法解析面积 self.A: {self.A}, 错误: {e}")
  15. # 处理 scale_factor
  16. self.scale_factor = scale_factor
  17. if isinstance(self.scale_factor, str):
  18. try:
  19. self.scale_factor = float(eval(self.scale_factor))
  20. except Exception as e:
  21. print(f"无法解析 scale_factor: {self.scale_factor}, 错误: {e}")
  22. @staticmethod
  23. def xishan_viscosity(T_celsius):
  24. """锡山水厂水温校正粘度,单位 Pa·s"""
  25. x = (T_celsius + 273.15) / 300
  26. factor = 890 / (
  27. 280.68 * x ** -1.9 +
  28. 511.45 * x ** -7.7 +
  29. 61.131 * x ** -19.6 +
  30. 0.45903 * x ** -40
  31. )
  32. mu = 0.00089 / factor
  33. return mu
  34. def calculate_for_segment(self, seg_df: pd.DataFrame, temp_col="C.M.RO_TT_ZJS@out", flow_col=None, tmp_col=None):
  35. """计算单个稳定进水段的 TMP、通量、粘度和膜阻力"""
  36. seg_df = seg_df.copy()
  37. for unit in self.units:
  38. unit_flow_col = flow_col or f"C.M.{unit}_FT_JS@out"
  39. tmp_col = tmp_col or f"C.M.{unit}_DB@press_PV"
  40. if not all(col in seg_df.columns for col in [unit_flow_col, tmp_col, temp_col]):
  41. continue
  42. # 计算前检查
  43. cols_to_check = [tmp_col, unit_flow_col, temp_col]
  44. for col in cols_to_check:
  45. # 检查非数值
  46. if not pd.api.types.is_numeric_dtype(seg_df[col]):
  47. # 尝试强制转换为数值
  48. seg_df[col] = pd.to_numeric(seg_df[col], errors='coerce')
  49. # 检查是否还有 NaN 或非有限值
  50. if seg_df[col].isna().any() or not np.isfinite(seg_df[col]).all():
  51. print(f"⚠️ 列 {col} 包含 NaN 或非有限值")
  52. print(seg_df[col][~np.isfinite(seg_df[col])])
  53. seg_df[f"{unit}_TMP_Pa"] = seg_df[tmp_col] * 1e6
  54. seg_df[f"{unit}_J"] = seg_df[unit_flow_col] / self.A / 3600
  55. seg_df[f"{unit}_mu"] = self.xishan_viscosity(seg_df[temp_col])
  56. seg_df[f"{unit}_R"] = seg_df[f"{unit}_TMP_Pa"] / (seg_df[f"{unit}_J"] * seg_df[f"{unit}_mu"])
  57. seg_df[f"{unit}_R_scaled"] = seg_df[f"{unit}_R"] / self.scale_factor
  58. return seg_df
  59. def calculate_for_segments(self, segments: list, temp_col="C.M.RO_TT_ZJS@out", flow_col=None, tmp_col=None):
  60. result_segments = []
  61. for seg in segments:
  62. seg_res = self.calculate_for_segment(seg, temp_col=temp_col, flow_col=flow_col, tmp_col=tmp_col)
  63. result_segments.append(seg_res)
  64. return result_segments
  65. # =============================
  66. # 单变量稳定性分析(例如 flow/TMP)
  67. # =============================
  68. class VariableStabilityAnalyzer:
  69. """
  70. 单变量统计特征分析
  71. 为每段数据增加:均值 / 标准差 / CV 系数
  72. """
  73. def __init__(self):
  74. pass
  75. def analyze(self, seg: pd.DataFrame, col, prefix=None) -> pd.DataFrame:
  76. """分析单段:计算均值、标准差、CV,并添加三列"""
  77. seg = seg.copy()
  78. prefix = prefix or col
  79. mean_val = seg[col].mean()
  80. std_val = seg[col].std()
  81. cv_val = std_val / mean_val if mean_val != 0 else np.nan
  82. # 添加三列(整段相同)
  83. seg[f"{prefix}_mean"] = mean_val
  84. seg[f"{prefix}_std"] = std_val
  85. seg[f"{prefix}_cv"] = cv_val
  86. return seg
  87. def analyze_segments(self, segments: list, col, prefix=None) -> list:
  88. """批量处理段列表"""
  89. return [self.analyze(seg, col, prefix) for seg in segments]
  90. # =============================
  91. # tmp_start / tmp_end 计算
  92. # =============================
  93. class UFPressureAnalyzer:
  94. """
  95. 统计每段的压力起止平均值 tmp_start / tmp_end
  96. """
  97. def __init__(
  98. self,
  99. tmp_col,
  100. head_n=20,
  101. tail_n=20,
  102. feature_start="tmp_start",
  103. feature_end="tmp_end",
  104. ):
  105. self.tmp_col = tmp_col
  106. self.head_n = head_n
  107. self.tail_n = tail_n
  108. self.feature_start = feature_start
  109. self.feature_end = feature_end
  110. def analyze(self, seg: pd.DataFrame) -> pd.DataFrame:
  111. """分析单段,将 tmp_start / tmp_end 加入 DataFrame"""
  112. seg = seg.sort_values("time").reset_index(drop=True).copy()
  113. if len(seg) < self.head_n + self.tail_n:
  114. seg[self.feature_start] = None
  115. seg[self.feature_end] = None
  116. else:
  117. seg[self.feature_start] = (
  118. seg[self.tmp_col].iloc[: self.head_n].mean()
  119. )
  120. seg[self.feature_end] = (
  121. seg[self.tmp_col].iloc[-self.tail_n :].mean()
  122. )
  123. return seg
  124. def analyze_segments(self, segments: list) -> list:
  125. """批量处理段列表"""
  126. return [self.analyze(seg) for seg in segments]
  127. # =============================
  128. # R_scaled_start / R_scaled_end 计算
  129. # =============================
  130. class UFResistanceAnalyzer:
  131. """
  132. 统计每段的放缩后膜阻力起止值 R_scaled_start / R_scaled_end
  133. """
  134. def __init__(self, resistance_col, head_n=20, tail_n=20, feature_start="R_scaled_start", feature_end="R_scaled_end"):
  135. self.res_col = resistance_col
  136. self.head_n = head_n
  137. self.tail_n = tail_n
  138. self.feature_start = feature_start
  139. self.feature_end = feature_end
  140. def analyze(self, seg: pd.DataFrame) -> pd.DataFrame:
  141. """分析单段,将 R_scaled_start / R_scaled_end 加入 DataFrame"""
  142. seg = seg.sort_values("time").reset_index(drop=True).copy()
  143. if len(seg) < self.head_n + self.tail_n:
  144. seg[self.feature_start] = None
  145. seg[self.feature_end] = None
  146. else:
  147. seg[self.feature_start] = seg[self.res_col].iloc[:self.head_n].median()
  148. seg[self.feature_end] = seg[self.res_col].iloc[-self.tail_n:].median()
  149. return seg
  150. def analyze_segments(self, segments: list) -> list:
  151. """批量处理段列表"""
  152. return [self.analyze(seg) for seg in segments]