from typing import List

import numpy as np
import pandas as pd


class UFResistanceCalculator:
    """Compute membrane flux, TMP, viscosity and membrane resistance for a
    single segment or a list of segments.

    For every unit in ``units`` whose input columns are present, these
    columns are added to a copy of the segment:

    * ``{unit}_TMP_Pa``   -- TMP column multiplied by 1e6
    * ``{unit}_J``        -- flow column / area / 3600
    * ``{unit}_mu``       -- viscosity from :meth:`xishan_viscosity`
    * ``{unit}_R``        -- ``TMP_Pa / (J * mu)``
    * ``{unit}_R_scaled`` -- ``R / scale_factor``
    """

    def __init__(self, units: list, area_m2, scale_factor):
        """
        Args:
            units: unit identifiers used to build default column names such
                as ``C.M.{unit}_FT_JS@out`` and ``C.M.{unit}_DB@press_PV``.
            area_m2: membrane area; a number, or a string expression such as
                ``"128*40"`` (evaluated to ``5120.0``).
            scale_factor: divisor applied to the resistance; a number or a
                string expression.

        If a string cannot be evaluated, an error is printed and the raw
        string is kept, as a best-effort behaviour.
        """
        self.units = units
        # Area may be configured as an arithmetic string, e.g. "128*40" -> 5120.0
        self.A = self._parse_numeric(area_m2, "无法解析面积 self.A")
        self.scale_factor = self._parse_numeric(scale_factor, "无法解析 scale_factor")

    @staticmethod
    def _parse_numeric(value, error_prefix):
        """Evaluate a string expression to float; return non-strings unchanged.

        On failure, print ``"{error_prefix}: {value}, 错误: {e}"`` and return
        the original string unchanged.
        """
        if not isinstance(value, str):
            return value
        try:
            # NOTE(review): eval executes arbitrary code. This is only acceptable
            # for trusted configuration values; never pass user-supplied text.
            return float(eval(value))
        except Exception as e:
            print(f"{error_prefix}: {value}, 错误: {e}")
            return value

    @staticmethod
    def xishan_viscosity(T_celsius):
        """Temperature-corrected water viscosity (Xishan plant), in Pa·s.

        Works element-wise on scalars, numpy arrays or pandas Series.
        The bracketed sum is a reduced-temperature correlation (T / 300 K).
        The sum appears to be expressed in µPa·s (presumably the IAPWS
        liquid-water correlation; TODO confirm source). Hence
        ``0.00089 / (890 / sum) == sum * 1e-6`` Pa·s.
        """
        x = (T_celsius + 273.15) / 300
        factor = 890 / (
            280.68 * x ** -1.9
            + 511.45 * x ** -7.7
            + 61.131 * x ** -19.6
            + 0.45903 * x ** -40
        )
        mu = 0.00089 / factor
        return mu

    def calculate_for_segment(self, seg_df: pd.DataFrame, temp_col="C.M.RO_TT_ZJS@out",
                              flow_col=None, tmp_col=None):
        """Compute TMP, flux, viscosity and resistance for one stable-inflow segment.

        Args:
            seg_df: segment data; it is copied and never modified in place.
            temp_col: temperature column, in °C.
            flow_col: flow column to use for every unit. When ``None``, the
                per-unit default ``C.M.{unit}_FT_JS@out`` is used.
            tmp_col: TMP column to use for every unit. When ``None``, the
                per-unit default ``C.M.{unit}_DB@press_PV`` is used.

        Returns:
            A copy of ``seg_df`` with the per-unit result columns added.
            Units whose input columns are missing are skipped silently.
        """
        seg_df = seg_df.copy()
        for unit in self.units:
            # Resolve column names per unit WITHOUT overwriting the arguments;
            # otherwise the first unit's default would leak into later units.
            unit_flow_col = flow_col or f"C.M.{unit}_FT_JS@out"
            unit_tmp_col = tmp_col or f"C.M.{unit}_DB@press_PV"

            if not all(col in seg_df.columns for col in [unit_flow_col, unit_tmp_col, temp_col]):
                continue

            # Pre-computation check: coerce to numeric and report bad values.
            for col in [unit_tmp_col, unit_flow_col, temp_col]:
                if not pd.api.types.is_numeric_dtype(seg_df[col]):
                    # Force conversion; unparsable entries become NaN.
                    seg_df[col] = pd.to_numeric(seg_df[col], errors='coerce')
                if seg_df[col].isna().any() or not np.isfinite(seg_df[col]).all():
                    print(f"⚠️ 列 {col} 包含 NaN 或非有限值")
                    print(seg_df[col][~np.isfinite(seg_df[col])])

            seg_df[f"{unit}_TMP_Pa"] = seg_df[unit_tmp_col] * 1e6
            seg_df[f"{unit}_J"] = seg_df[unit_flow_col] / self.A / 3600
            seg_df[f"{unit}_mu"] = self.xishan_viscosity(seg_df[temp_col])
            seg_df[f"{unit}_R"] = seg_df[f"{unit}_TMP_Pa"] / (seg_df[f"{unit}_J"] * seg_df[f"{unit}_mu"])
            seg_df[f"{unit}_R_scaled"] = seg_df[f"{unit}_R"] / self.scale_factor
        return seg_df

    def calculate_for_segments(self, segments: list, temp_col="C.M.RO_TT_ZJS@out",
                               flow_col=None, tmp_col=None):
        """Apply :meth:`calculate_for_segment` to each segment; return a new list."""
        return [
            self.calculate_for_segment(seg, temp_col=temp_col, flow_col=flow_col, tmp_col=tmp_col)
            for seg in segments
        ]


class PumpPowerCalculator:
    """Compute per-segment mean power of the feed pump and the backwash pump."""

    def __init__(self, event_col="event_type"):
        # Column holding row event labels ("inlet", "bw_phys", ...).
        self.event_col = event_col

    def calculate_for_segment(
        self, seg_df: pd.DataFrame, inlet_power_col=None, bw_power_col=None
    ):
        """Compute mean pump power for a single segment.

        The feed-pump mean is taken over rows whose event is ``"inlet"``.
        The backwash-pump mean is taken over rows whose event is
        ``"bw_phys"``. Each mean is broadcast into the columns
        ``inlet_pump_power_mean`` and ``bw_pump_power_mean``.

        If any required column is missing, a warning is printed and an
        unmodified copy is returned.
        """
        seg_df = seg_df.copy()
        required_cols = [self.event_col, inlet_power_col, bw_power_col]
        if not all(col in seg_df.columns for col in required_cols):
            print("⚠️ segment缺少必要列")
            return seg_df

        # Coerce power readings to numeric; bad entries become NaN.
        seg_df[inlet_power_col] = pd.to_numeric(seg_df[inlet_power_col], errors="coerce")
        seg_df[bw_power_col] = pd.to_numeric(seg_df[bw_power_col], errors="coerce")

        inlet_mean = seg_df.loc[seg_df[self.event_col] == "inlet", inlet_power_col].mean()
        bw_mean = seg_df.loc[seg_df[self.event_col] == "bw_phys", bw_power_col].mean()

        seg_df["inlet_pump_power_mean"] = inlet_mean
        seg_df["bw_pump_power_mean"] = bw_mean
        return seg_df

    def calculate_for_segments(
        self, segments: list, inlet_power_col=None, bw_power_col=None
    ):
        """Apply :meth:`calculate_for_segment` to each segment; return a new list."""
        return [
            self.calculate_for_segment(
                seg, inlet_power_col=inlet_power_col, bw_power_col=bw_power_col
            )
            for seg in segments
        ]


# =============================
# Single-variable stability analysis (e.g. flow / TMP)
# =============================
class VariableStabilityAnalyzer:
    """Single-variable statistics.

    Adds mean / standard deviation / coefficient of variation (CV) columns
    to each segment.
    """

    def __init__(self):
        pass

    def analyze(self, seg: pd.DataFrame, col, prefix=None) -> pd.DataFrame:
        """Add ``{prefix}_mean``, ``{prefix}_std`` and ``{prefix}_cv`` to a copy of ``seg``.

        ``prefix`` defaults to ``col``. The std is the pandas default
        (``ddof=1``). The CV is NaN when the mean is exactly zero. Each
        value is constant over the whole segment.
        """
        seg = seg.copy()
        prefix = prefix or col

        mean_val = seg[col].mean()
        std_val = seg[col].std()
        cv_val = std_val / mean_val if mean_val != 0 else np.nan

        seg[f"{prefix}_mean"] = mean_val
        seg[f"{prefix}_std"] = std_val
        seg[f"{prefix}_cv"] = cv_val
        return seg

    def analyze_segments(self, segments: list, col, prefix=None) -> list:
        """Apply :meth:`analyze` to each segment; return a new list."""
        return [self.analyze(seg, col, prefix) for seg in segments]


# =============================
# tmp_start / tmp_end computation
# =============================
class UFPressureAnalyzer:
    """Compute mean pressure at the start and end of each segment (tmp_start / tmp_end)."""

    def __init__(
        self,
        tmp_col,
        head_n=20,
        tail_n=20,
        feature_start="tmp_start",
        feature_end="tmp_end",
    ):
        self.tmp_col = tmp_col
        self.head_n = head_n  # rows averaged at the start of the segment
        self.tail_n = tail_n  # rows averaged at the end of the segment
        self.feature_start = feature_start
        self.feature_end = feature_end

    def analyze(self, seg: pd.DataFrame) -> pd.DataFrame:
        """Sort by ``"time"`` and add the start/end mean columns to a copy of ``seg``.

        Segments shorter than ``head_n + tail_n`` rows get ``None`` in both columns.
        """
        seg = seg.sort_values("time").reset_index(drop=True).copy()
        if len(seg) < self.head_n + self.tail_n:
            seg[self.feature_start] = None
            seg[self.feature_end] = None
        else:
            seg[self.feature_start] = seg[self.tmp_col].iloc[: self.head_n].mean()
            seg[self.feature_end] = seg[self.tmp_col].iloc[-self.tail_n:].mean()
        return seg

    def analyze_segments(self, segments: list) -> list:
        """Apply :meth:`analyze` to each segment; return a new list."""
        return [self.analyze(seg) for seg in segments]


# =============================
# R_scaled_start / R_scaled_end computation
# =============================
class UFResistanceAnalyzer:
    """Compute scaled membrane resistance at the start and end of each segment.

    Produces ``R_scaled_start`` / ``R_scaled_end``, each the median of the
    first / last rows.
    """

    def __init__(self, resistance_col, head_n=20, tail_n=20,
                 feature_start="R_scaled_start", feature_end="R_scaled_end"):
        self.res_col = resistance_col
        self.head_n = head_n  # rows used for the start median
        self.tail_n = tail_n  # rows used for the end median
        self.feature_start = feature_start
        self.feature_end = feature_end

    def analyze(self, seg: pd.DataFrame) -> pd.DataFrame:
        """Sort by ``"time"`` and add the start/end median columns to a copy of ``seg``.

        Segments shorter than ``head_n + tail_n`` rows get ``None`` in both columns.
        """
        seg = seg.sort_values("time").reset_index(drop=True).copy()
        if len(seg) < self.head_n + self.tail_n:
            seg[self.feature_start] = None
            seg[self.feature_end] = None
        else:
            seg[self.feature_start] = seg[self.res_col].iloc[:self.head_n].median()
            seg[self.feature_end] = seg[self.res_col].iloc[-self.tail_n:].median()
        return seg

    def analyze_segments(self, segments: list) -> list:
        """Apply :meth:`analyze` to each segment; return a new list."""
        return [self.analyze(seg) for seg in segments]