# cycle_analysis.py import numpy as np import pandas as pd from scipy.optimize import minimize, curve_fit from sklearn.metrics import r2_score from datetime import timedelta class ChemicalCycleSegmenter: """ 根据 event_type == 'bw_chem' 划分化学周期。 每两个化学反冲洗之间所有 inlet 稳定段属于一个化学周期。 若化学周期时间长度 > max_hours,则标记为无效周期。 """ def __init__(self, max_hours=60): self.max_hours = max_hours def assign_cycles(self, df_unit, stable_segments): """ df_unit:含 time, event_type stable_segments:列表,每段有 segment_id, time 等 返回 cycles: dict(cycle_id -> { segments:[], valid:True/False, start_time/end_time }) """ # 找出所有化学反冲洗的位置 ceb_times = df_unit.loc[df_unit["event_type"] == "bw_chem", "time"].sort_values().to_list() cycles = {} if len(ceb_times) < 2: return cycles for i in range(len(ceb_times) - 1): cycle_id = i + 1 t_start = ceb_times[i] t_end = ceb_times[i + 1] # 该周期内的稳定 segment segs = [] for seg in stable_segments: if seg["time"].iloc[0] >= t_start and seg["time"].iloc[-1] <= t_end: segs.append(seg) if len(segs) == 0: continue # 判断周期有效性(长度 <= max_hours) hours = (t_end - t_start).total_seconds() / 3600 valid = hours <= self.max_hours cycles[cycle_id] = { "segments": segs, "start": t_start, "end": t_end, "hours": hours, "valid": valid } # 回写到段中 for seg in segs: seg["chem_cycle_id"] = cycle_id seg["chem_cycle_valid"] = valid return cycles class ShortTermCycleFoulingFitter: """ 在一个化学周期内: - 每个 segment 有独立的 R0 和 t - 所有 segment 共享同一个短期污染速率 νK """ def __init__(self, unit): self.unit = unit @staticmethod def _is_invalid(x): return ( x is None or np.any(pd.isna(x)) or np.any(np.isinf(x)) or np.any(np.abs(x) > 1e20) ) def fit_cycle(self, segments): # ============================ # 1. 初步检查 # ============================ if not segments: return np.nan, np.nan R_col = f"{self.unit}_R_scaled" J_col = f"{self.unit}_J" # ============================ # 2. 预处理每个 segment # ============================ seg_data = [] for seg in segments: if not {"time", R_col, J_col}.issubset(seg.columns): continue seg = seg.sort_values("time").drop_duplicates(subset=["time"]) if len(seg) < 2: continue if not pd.api.types.is_datetime64_any_dtype(seg["time"]): seg["time"] = pd.to_datetime(seg["time"], errors="coerce") seg = seg.dropna(subset=["time"]) if len(seg) == 0: # 该段完全无效,直接跳过 continue # 局部时间(秒) try: t = (seg["time"] - seg["time"].iloc[0]).dt.total_seconds().astype(float) except Exception: continue # 丢弃 t <= 0 mask = t > 0 if mask.sum() < 1: continue R = seg.loc[mask, R_col].astype(float).values J = seg.loc[mask, J_col].astype(float).values t = t[mask].values R0 = float(seg[R_col].iloc[0]) # 数值与物理检查 if any(self._is_invalid(x) for x in [R, J, t, R0]): continue if np.all(J == 0): continue seg_data.append((R, J, t, R0)) if len(seg_data) == 0: return np.nan, np.nan # ============================ # 3. 定义 Huber 损失 # ============================ delta = 1.0 def huber_loss(res): abs_r = np.abs(res) quad = np.minimum(abs_r, delta) lin = abs_r - quad return 0.5 * quad**2 + delta * lin # ============================ # 4. 定义 cycle 级 loss(共享 νK) # ============================ def loss(nuk): nuk = float(nuk[0]) total = 0.0 for R, J, t, R0 in seg_data: pred = R0 + nuk * J * t res = R - pred total += np.sum(huber_loss(res)) return total # ============================ # 5. 优化求解 νK # ============================ try: res = minimize( loss, x0=[1e-6], bounds=[(0.0, None)], options={"maxiter": 500, "disp": False} ) except Exception: return np.nan, np.nan if not res.success: return np.nan, np.nan nuk_opt = float(res.x[0]) # ============================ # 6. 计算整体 R²(跨 segment) # ============================ R_all = [] pred_all = [] for R, J, t, R0 in seg_data: pred = R0 + nuk_opt * J * t if self._is_invalid(pred): continue R_all.append(R) pred_all.append(pred) if not R_all: return nuk_opt, np.nan R_all = np.concatenate(R_all) pred_all = np.concatenate(pred_all) try: r2 = r2_score(R_all, pred_all) except Exception: r2 = np.nan return nuk_opt, r2 class LongTermFoulingFitter: """ 长期 fouling 拟合: - 使用每个 segment 起点的“稳健不可逆阻力估计” - 拟合不可逆阻力随 cycle 内累计时间的增长 """ def __init__(self, unit, max_hours=60.0): self.unit = unit self.max_hours = max_hours @staticmethod def _power_law(t, a, b, R0): return R0 + a * np.power(t, b) @staticmethod def _is_invalid(x): return ( x is None or np.any(pd.isna(x)) or np.any(np.isinf(x)) or np.any(np.abs(x) > 1e20) ) def _robust_segment_R0(self, seg): """ 计算单个 segment 的稳健不可逆阻力: 1. 优先使用 post_bw_inlet == True 的行 2. 否则使用前 10 行 """ R_col = "R_scaled_start" if "post_bw_inlet" in seg.columns: mask = seg["post_bw_inlet"] == True if mask.any(): R_vals = seg.loc[mask, R_col].astype(float).values if not self._is_invalid(R_vals): return float(np.mean(R_vals)) # fallback:前 10 行 R_vals = seg[R_col].iloc[:10].astype(float).values if not self._is_invalid(R_vals): return float(np.mean(R_vals)) return None def fit_cycle(self, segments): # ============================ # 1. 空检查 # ============================ if not segments: return np.nan, np.nan, np.nan T_list = [] R_list = [] # cycle 起点 try: cycle_start = min(seg["time"].iloc[0] for seg in segments) except Exception: return np.nan, np.nan, np.nan R0_cycle = None for seg in segments: if "time" not in seg.columns or "R_scaled_start" not in seg.columns: continue seg = seg.sort_values("time") if len(seg) < 1: continue t0 = seg["time"].iloc[0] # ===== 稳健 R0 ===== R0 = self._robust_segment_R0(seg) if R0 is None: continue # cycle 起点 R0 if t0 == cycle_start: R0_cycle = R0 # 累计时间(小时) T = (t0 - cycle_start).total_seconds() / 3600.0 if T < 0 or T > self.max_hours: continue if self._is_invalid(T) or self._is_invalid(R0): continue T_list.append(T) R_list.append(R0) # 至少 3 个点 if len(T_list) < 3 or R0_cycle is None: return np.nan, np.nan, np.nan T = np.asarray(T_list, dtype=float) R = np.asarray(R_list, dtype=float) if self._is_invalid(T) or self._is_invalid(R): return np.nan, np.nan, np.nan # ============================ # 2. 幂律拟合(b ∈ [0, 3]) # ============================ try: popt, _ = curve_fit( lambda tt, a, b: self._power_law(tt, a, b, R0_cycle), T, R, p0=(0.1, 1.0), bounds=([0.0, 0.0], [np.inf, 3.0]), maxfev=8000 ) a, b = popt except Exception: return np.nan, np.nan, np.nan if self._is_invalid(a) or self._is_invalid(b): return np.nan, np.nan, np.nan # ============================ # 3. 拟合质量 # ============================ pred = self._power_law(T, a, b, R0_cycle) if self._is_invalid(pred): return np.nan, np.nan, np.nan try: r2 = r2_score(R, pred) except Exception: r2 = np.nan return a, b, r2 class ChemicalBackwashCleaner: """ 根据有效化学周期计算 CEB 去除膜阻力: ΔR = 上周期末 R_end - 下周期初 R_start """ def __init__(self, unit): self.unit = unit def compute_removal(self, cycles): ids = sorted(cycles.keys()) for i in range(len(ids) - 1): cid1 = ids[i] cid2 = ids[i + 1] c1 = cycles[cid1] c2 = cycles[cid2] if not (c1["valid"] and c2["valid"]): c1["R_removed"] = None continue # 上周期末:最后段的 R_end seg_last = c1["segments"][-1] R_end = seg_last[f"R_scaled_end"] # 下周期初:第一段的 R_start seg_first = c2["segments"][0] R_start = seg_first[f"R_scaled_start"] c1["R_removed"] = R_end - R_start # 最后一个周期无 CEB 去除值 if ids: cycles[ids[-1]]["R_removed"] = None return cycles