| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190 |
- from typing import List
- import numpy as np
- import pandas as pd
class UFResistanceCalculator:
    """Compute flux, TMP, viscosity and membrane resistance for segments.

    Works on a single segment DataFrame or on a list of them. For every unit
    in ``units`` whose input columns are present, the columns
    ``{unit}_TMP_Pa``, ``{unit}_J``, ``{unit}_mu``, ``{unit}_R`` and
    ``{unit}_R_scaled`` are appended to a copy of the segment.
    """

    def __init__(self, units: list, area_m2, scale_factor):
        """Store the unit names and the numeric configuration.

        Args:
            units: Unit identifiers used to build the default column names.
            area_m2: Membrane area, either a number or a string arithmetic
                expression such as ``"128*40"``.
            scale_factor: Divisor applied to the resistance, either a number
                or a string arithmetic expression.

        If a string cannot be evaluated, a message is printed and the
        attribute keeps the raw string (later arithmetic on it will fail).
        """
        self.units = units

        # Membrane area
        self.A = area_m2
        if isinstance(self.A, str):
            try:
                # SECURITY: eval() executes arbitrary Python. Only pass strings
                # that come from trusted configuration.
                # TODO(review): replace with a restricted arithmetic parser.
                self.A = float(eval(self.A))  # e.g. "128*40" -> 5120.0
            except Exception as e:
                print(f"无法解析面积 self.A: {self.A}, 错误: {e}")

        # Resistance scale factor
        self.scale_factor = scale_factor
        if isinstance(self.scale_factor, str):
            try:
                # SECURITY: same eval() caveat as for the area above.
                self.scale_factor = float(eval(self.scale_factor))
            except Exception as e:
                print(f"无法解析 scale_factor: {self.scale_factor}, 错误: {e}")

    @staticmethod
    def xishan_viscosity(T_celsius):
        """Return the temperature-corrected water viscosity in Pa·s.

        This is the correlation used for the Xishan water plant.

        Args:
            T_celsius: Water temperature in degrees Celsius (scalar or
                array-like supporting arithmetic, e.g. a pandas Series).
        """
        x = (T_celsius + 273.15) / 300
        factor = 890 / (
            280.68 * x ** -1.9
            + 511.45 * x ** -7.7
            + 61.131 * x ** -19.6
            + 0.45903 * x ** -40
        )
        return 0.00089 / factor

    def calculate_for_segment(
        self,
        seg_df: pd.DataFrame,
        temp_col="C.M.RO_TT_ZJS@out",
        flow_col=None,
        tmp_col=None,
    ):
        """Compute TMP, flux, viscosity and resistance for one segment.

        Args:
            seg_df: Segment data; it is copied, never modified in place.
            temp_col: Name of the water-temperature column.
            flow_col: Flow column to use for every unit. When falsy, the
                per-unit default ``C.M.{unit}_FT_JS@out`` is used.
            tmp_col: Pressure column to use for every unit. When falsy, the
                per-unit default ``C.M.{unit}_DB@press_PV`` is used.

        Returns:
            A copy of ``seg_df`` with the derived columns added for each unit
            whose three input columns exist; other units are skipped.
        """
        seg_df = seg_df.copy()

        for unit in self.units:
            unit_flow_col = flow_col or f"C.M.{unit}_FT_JS@out"
            # Resolve into a per-unit local. Rebinding the ``tmp_col``
            # parameter here would make every later unit reuse the first
            # unit's pressure column.
            unit_tmp_col = tmp_col or f"C.M.{unit}_DB@press_PV"

            required = [unit_tmp_col, unit_flow_col, temp_col]
            if not all(col in seg_df.columns for col in required):
                continue

            # Pre-computation sanity checks on the input columns.
            for col in required:
                if not pd.api.types.is_numeric_dtype(seg_df[col]):
                    # Force non-numeric data to numbers; bad cells become NaN.
                    seg_df[col] = pd.to_numeric(seg_df[col], errors="coerce")
                # Warn (without aborting) about NaN or non-finite values.
                if seg_df[col].isna().any() or not np.isfinite(seg_df[col]).all():
                    print(f"⚠️ 列 {col} 包含 NaN 或非有限值")
                    print(seg_df[col][~np.isfinite(seg_df[col])])

            # NOTE(review): the 1e6 factor suggests pressure is logged in MPa
            # and /3600 suggests flow is per hour; confirm against the tags.
            seg_df[f"{unit}_TMP_Pa"] = seg_df[unit_tmp_col] * 1e6
            seg_df[f"{unit}_J"] = seg_df[unit_flow_col] / self.A / 3600
            seg_df[f"{unit}_mu"] = self.xishan_viscosity(seg_df[temp_col])
            # Resistance: R = TMP / (J * mu)
            seg_df[f"{unit}_R"] = seg_df[f"{unit}_TMP_Pa"] / (
                seg_df[f"{unit}_J"] * seg_df[f"{unit}_mu"]
            )
            seg_df[f"{unit}_R_scaled"] = seg_df[f"{unit}_R"] / self.scale_factor

        return seg_df

    def calculate_for_segments(
        self,
        segments: list,
        temp_col="C.M.RO_TT_ZJS@out",
        flow_col=None,
        tmp_col=None,
    ):
        """Apply :meth:`calculate_for_segment` to each segment in order.

        Returns:
            A new list with one processed DataFrame per input segment.
        """
        return [
            self.calculate_for_segment(
                seg, temp_col=temp_col, flow_col=flow_col, tmp_col=tmp_col
            )
            for seg in segments
        ]
- # =============================
- # 单变量稳定性分析(例如 flow/TMP)
- # =============================
class VariableStabilityAnalyzer:
    """Attach whole-segment summary statistics for a single column.

    Each analysed segment gains three constant-valued columns holding the
    mean, the standard deviation and the coefficient of variation (CV).
    """

    def __init__(self):
        """No configuration is required."""
        pass

    def analyze(self, seg: pd.DataFrame, col, prefix=None) -> pd.DataFrame:
        """Return a copy of ``seg`` with mean/std/CV columns for ``col``.

        Args:
            seg: Segment data; it is copied, not modified.
            col: Name of the column to summarise.
            prefix: Prefix for the new column names; falls back to ``col``
                when falsy.

        Returns:
            The copy with ``{prefix}_mean``, ``{prefix}_std`` and
            ``{prefix}_cv`` added. CV is NaN when the mean equals zero.
        """
        label = prefix or col
        out = seg.copy()

        values = out[col]
        average = values.mean()
        spread = values.std()
        if average != 0:
            ratio = spread / average
        else:
            ratio = np.nan

        # Every row of the segment carries the same statistic.
        for suffix, stat in (("mean", average), ("std", spread), ("cv", ratio)):
            out[f"{label}_{suffix}"] = stat
        return out

    def analyze_segments(self, segments: list, col, prefix=None) -> list:
        """Run :meth:`analyze` over every segment and collect the results."""
        analysed = []
        for segment in segments:
            analysed.append(self.analyze(segment, col, prefix))
        return analysed
- # =============================
- # tmp_start / tmp_end 计算
- # =============================
class UFPressureAnalyzer:
    """Compute per-segment start/end pressure averages.

    The start feature is the mean of the first ``head_n`` rows of the
    pressure column and the end feature is the mean of the last ``tail_n``
    rows, after sorting the segment by its ``"time"`` column.
    """

    def __init__(
        self,
        tmp_col,
        head_n=20,
        tail_n=20,
        feature_start="tmp_start",
        feature_end="tmp_end",
    ):
        """Configure the analyzer.

        Args:
            tmp_col: Name of the pressure column to average.
            head_n: Number of leading rows used for the start feature.
            tail_n: Number of trailing rows used for the end feature.
            feature_start: Name of the output column for the start value.
            feature_end: Name of the output column for the end value.
        """
        self.tmp_col = tmp_col
        self.head_n = head_n
        self.tail_n = tail_n
        self.feature_start = feature_start
        self.feature_end = feature_end

    def analyze(self, seg: pd.DataFrame) -> pd.DataFrame:
        """Return a time-sorted copy of ``seg`` with start/end features added.

        Segments shorter than ``head_n + tail_n`` rows get ``None`` in both
        feature columns. The input must contain a ``"time"`` column.
        """
        # sort_values/reset_index already return a new frame, so the
        # caller's DataFrame is never modified.
        seg = seg.sort_values("time").reset_index(drop=True)

        if len(seg) < self.head_n + self.tail_n:
            seg[self.feature_start] = None
            seg[self.feature_end] = None
            return seg

        pressure = seg[self.tmp_col]
        seg[self.feature_start] = pressure.head(self.head_n).mean()
        # .tail() rather than iloc[-n:]: with n == 0 the slice would be
        # iloc[0:], i.e. the whole segment instead of an empty window.
        seg[self.feature_end] = pressure.tail(self.tail_n).mean()
        return seg

    def analyze_segments(self, segments: list) -> list:
        """Apply :meth:`analyze` to every segment in the list."""
        return [self.analyze(seg) for seg in segments]
- # =============================
- # R_scaled_start / R_scaled_end 计算
- # =============================
class UFResistanceAnalyzer:
    """Compute per-segment start/end values of the scaled resistance.

    The start feature is the median of the first ``head_n`` rows of the
    resistance column and the end feature is the median of the last
    ``tail_n`` rows, after sorting the segment by its ``"time"`` column.
    """

    def __init__(
        self,
        resistance_col,
        head_n=20,
        tail_n=20,
        feature_start="R_scaled_start",
        feature_end="R_scaled_end",
    ):
        """Configure the analyzer.

        Args:
            resistance_col: Name of the resistance column to summarise.
            head_n: Number of leading rows used for the start feature.
            tail_n: Number of trailing rows used for the end feature.
            feature_start: Name of the output column for the start value.
            feature_end: Name of the output column for the end value.
        """
        self.res_col = resistance_col
        self.head_n = head_n
        self.tail_n = tail_n
        self.feature_start = feature_start
        self.feature_end = feature_end

    def analyze(self, seg: pd.DataFrame) -> pd.DataFrame:
        """Return a time-sorted copy of ``seg`` with start/end features added.

        Segments shorter than ``head_n + tail_n`` rows get ``None`` in both
        feature columns. The input must contain a ``"time"`` column.
        """
        # sort_values/reset_index already return a new frame, so the
        # caller's DataFrame is never modified.
        seg = seg.sort_values("time").reset_index(drop=True)

        if len(seg) < self.head_n + self.tail_n:
            seg[self.feature_start] = None
            seg[self.feature_end] = None
            return seg

        resistance = seg[self.res_col]
        seg[self.feature_start] = resistance.head(self.head_n).median()
        # .tail() rather than iloc[-n:]: with n == 0 the slice would be
        # iloc[0:], i.e. the whole segment instead of an empty window.
        seg[self.feature_end] = resistance.tail(self.tail_n).median()
        return seg

    def analyze_segments(self, segments: list) -> list:
        """Apply :meth:`analyze` to every segment in the list."""
        return [self.analyze(seg) for seg in segments]
|