# cycle_analysis.py import numpy as np import pandas as pd from scipy.optimize import minimize, curve_fit from sklearn.metrics import r2_score from datetime import timedelta class ChemicalCycleSegmenter: """ 根据 event_type == 'bw_chem' 划分化学周期。 每两个化学反冲洗之间所有 inlet 稳定段属于一个化学周期。 若化学周期时间长度 > max_hours,则标记为无效周期。 """ def __init__(self, max_hours=60): self.max_hours = max_hours def assign_cycles(self, df_unit, stable_segments): """ df_unit:含 time, event_type stable_segments:列表,每段有 segment_id, time 等 返回 cycles: dict(cycle_id -> { segments:[], valid:True/False, start_time/end_time }) """ # 找出所有化学反冲洗的位置 ceb_times = df_unit.loc[df_unit["event_type"] == "bw_chem", "time"].sort_values().to_list() cycles = {} if len(ceb_times) < 2: return cycles for i in range(len(ceb_times) - 1): cycle_id = i + 1 t_start = ceb_times[i] t_end = ceb_times[i + 1] # 该周期内的稳定 segment segs = [] for seg in stable_segments: if seg["time"].iloc[0] >= t_start and seg["time"].iloc[-1] <= t_end: segs.append(seg) if len(segs) == 0: continue # 判断周期有效性(长度 <= max_hours) hours = (t_end - t_start).total_seconds() / 3600 valid = hours <= self.max_hours cycles[cycle_id] = { "segments": segs, "start": t_start, "end": t_end, "hours": hours, "valid": valid } # 回写到段中 for seg in segs: seg["chem_cycle_id"] = cycle_id seg["chem_cycle_valid"] = valid return cycles class ShortTermCycleFoulingFitter: """ 对一个化学周期内所有 inlet 段统一拟合短期污染增长速率 nuK。 """ def __init__(self, unit): self.unit = unit @staticmethod def _is_invalid(x): return ( x is None or np.any(pd.isna(x)) or np.any(np.isinf(x)) or np.any(np.abs(x) > 1e20) ) def fit_cycle(self, segments): # ============================ # 1. 初步检查 # ============================ if len(segments) == 0: return np.nan, np.nan try: df = pd.concat(segments, ignore_index=True).copy() except Exception: return np.nan, np.nan required_cols = ["time", f"{self.unit}_R_scaled", f"{self.unit}_J"] if any(col not in df.columns for col in required_cols): return np.nan, np.nan # 去重、排序 df = df.sort_values("time").drop_duplicates(subset=["time"]) if len(df) < 3: return np.nan, np.nan # ============================ # 2. 计算 t # ============================ try: df["t"] = (df["time"] - df["time"].iloc[0]).dt.total_seconds() except Exception: return np.nan, np.nan # 舍弃 t <= 0 df = df[df["t"] > 0].copy() if len(df) < 2: return np.nan, np.nan # 取数据 R = df[f"{self.unit}_R_scaled"].astype(float).values J = df[f"{self.unit}_J"].astype(float).values t = df["t"].astype(float).values R0 = R[0] # ============================ # 3. 输入检查 # ============================ if any(self._is_invalid(x) for x in [R, J, t, R0]): return np.nan, np.nan # 防止 J=0 或极端 t if np.all(J == 0): return np.nan, np.nan # ============================ # 4. 定义鲁棒损失(Huber) # ============================ delta = 1.0 # Huber 阈值,可视需要调节 def huber_loss(residual): abs_r = np.abs(residual) quadratic = np.minimum(abs_r, delta) linear = abs_r - quadratic return 0.5 * quadratic**2 + delta * linear def loss(nuk): pred = R0 + nuk * J * t residual = R - pred return np.sum(huber_loss(residual)) # ============================ # 5. 优化求解 # ============================ try: res = minimize( loss, x0=[1e-6], bounds=[(0, None)], options={"maxiter": 500, "disp": False} ) except Exception: return np.nan, np.nan # 若优化失败 if not res.success: return np.nan, np.nan nuk_opt = float(res.x[0]) # ============================ # 6. 计算预测值与 R2(保证无 NaN/Inf) # ============================ pred = R0 + nuk_opt * J * t if self._is_invalid(pred): return nuk_opt, np.nan try: r2 = r2_score(R, pred) except Exception: r2 = np.nan return nuk_opt, r2 class LongTermFoulingFitter: def __init__(self, unit): self.unit = unit @staticmethod def _power_law(t, a, b, R0): return R0 + a * np.power(t, b) @staticmethod def _is_invalid(x): return ( x is None or np.any(pd.isna(x)) or np.any(np.isinf(x)) or np.any(np.abs(x) > 1e20) # 防止极端爆炸 ) def fit_cycle(self, segments): # 1. 空数据 if not segments: return np.nan, np.nan, np.nan # 2. 合并 try: df = pd.concat(segments, ignore_index=True).copy() except Exception: return np.nan, np.nan, np.nan if "time" not in df.columns or "R_scaled_start" not in df.columns: return np.nan, np.nan, np.nan df = df.sort_values("time").drop_duplicates(subset=["time"]) if len(df) < 3: return np.nan, np.nan, np.nan # 3. 计算 t t0 = df["time"].iloc[0] df["t"] = (df["time"] - t0).dt.total_seconds() df_valid = df[df["t"] > 0].copy() if len(df_valid) < 2: return np.nan, np.nan, np.nan t = df_valid["t"].values.astype(float) R = df_valid["R_scaled_start"].values.astype(float) R0 = df["R_scaled_start"].iloc[0] # 4. 输入检查 if self._is_invalid(t) or self._is_invalid(R) or self._is_invalid(R0): return np.nan, np.nan, np.nan # 5. 非线性拟合 try: popt, _ = curve_fit( lambda tt, a, b: self._power_law(tt, a, b, R0), t, R, p0=(0.01, 1.0), maxfev=8000 ) a, b = popt # 检查结果是否爆炸 if self._is_invalid(a) or self._is_invalid(b): return np.nan, np.nan, np.nan pred = self._power_law(t, a, b, R0) # 再检查预测值 if self._is_invalid(pred): return np.nan, np.nan, np.nan r2 = r2_score(R, pred) # 再检查 r2 if self._is_invalid(r2): return np.nan, np.nan, np.nan return a, b, r2 except Exception: return np.nan, np.nan, np.nan class ChemicalBackwashCleaner: """ 根据有效化学周期计算 CEB 去除膜阻力: ΔR = 上周期末 R_end - 下周期初 R_start """ def __init__(self, unit): self.unit = unit def compute_removal(self, cycles): ids = sorted(cycles.keys()) for i in range(len(ids) - 1): cid1 = ids[i] cid2 = ids[i + 1] c1 = cycles[cid1] c2 = cycles[cid2] if not (c1["valid"] and c2["valid"]): c1["R_removed"] = None continue # 上周期末:最后段的 R_end seg_last = c1["segments"][-1] R_end = seg_last[f"R_scaled_end"] # 下周期初:第一段的 R_start seg_first = c2["segments"][0] R_start = seg_first[f"R_scaled_start"] c1["R_removed"] = R_end - R_start # 最后一个周期无 CEB 去除值 if ids: cycles[ids[-1]]["R_removed"] = None return cycles