| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308 |
- # cycle_analysis.py
- import numpy as np
- import pandas as pd
- from scipy.optimize import minimize, curve_fit
- from sklearn.metrics import r2_score
- from datetime import timedelta
- class ChemicalCycleSegmenter:
- """
- 根据 event_type == 'bw_chem' 划分化学周期。
- 每两个化学反冲洗之间所有 inlet 稳定段属于一个化学周期。
- 若化学周期时间长度 > max_hours,则标记为无效周期。
- """
- def __init__(self, max_hours=60):
- self.max_hours = max_hours
- def assign_cycles(self, df_unit, stable_segments):
- """
- df_unit:含 time, event_type
- stable_segments:列表,每段有 segment_id, time 等
- 返回 cycles: dict(cycle_id -> { segments:[], valid:True/False, start_time/end_time })
- """
- # 找出所有化学反冲洗的位置
- ceb_times = df_unit.loc[df_unit["event_type"] == "bw_chem", "time"].sort_values().to_list()
- cycles = {}
- if len(ceb_times) < 2:
- return cycles
- for i in range(len(ceb_times) - 1):
- cycle_id = i + 1
- t_start = ceb_times[i]
- t_end = ceb_times[i + 1]
- # 该周期内的稳定 segment
- segs = []
- for seg in stable_segments:
- if seg["time"].iloc[0] >= t_start and seg["time"].iloc[-1] <= t_end:
- segs.append(seg)
- if len(segs) == 0:
- continue
- # 判断周期有效性(长度 <= max_hours)
- hours = (t_end - t_start).total_seconds() / 3600
- valid = hours <= self.max_hours
- cycles[cycle_id] = {
- "segments": segs,
- "start": t_start,
- "end": t_end,
- "hours": hours,
- "valid": valid
- }
- # 回写到段中
- for seg in segs:
- seg["chem_cycle_id"] = cycle_id
- seg["chem_cycle_valid"] = valid
- return cycles
- class ShortTermCycleFoulingFitter:
- """
- 对一个化学周期内所有 inlet 段统一拟合短期污染增长速率 nuK。
- """
- def __init__(self, unit):
- self.unit = unit
- @staticmethod
- def _is_invalid(x):
- return (
- x is None
- or np.any(pd.isna(x))
- or np.any(np.isinf(x))
- or np.any(np.abs(x) > 1e20)
- )
- def fit_cycle(self, segments):
- # ============================
- # 1. 初步检查
- # ============================
- if len(segments) == 0:
- return np.nan, np.nan
- try:
- df = pd.concat(segments, ignore_index=True).copy()
- except Exception:
- return np.nan, np.nan
- required_cols = ["time", f"{self.unit}_R_scaled", f"{self.unit}_J"]
- if any(col not in df.columns for col in required_cols):
- return np.nan, np.nan
- # 去重、排序
- df = df.sort_values("time").drop_duplicates(subset=["time"])
- if len(df) < 3:
- return np.nan, np.nan
- # ============================
- # 2. 计算 t
- # ============================
- try:
- df["t"] = (df["time"] - df["time"].iloc[0]).dt.total_seconds()
- except Exception:
- return np.nan, np.nan
- # 舍弃 t <= 0
- df = df[df["t"] > 0].copy()
- if len(df) < 2:
- return np.nan, np.nan
- # 取数据
- R = df[f"{self.unit}_R_scaled"].astype(float).values
- J = df[f"{self.unit}_J"].astype(float).values
- t = df["t"].astype(float).values
- R0 = R[0]
- # ============================
- # 3. 输入检查
- # ============================
- if any(self._is_invalid(x) for x in [R, J, t, R0]):
- return np.nan, np.nan
- # 防止 J=0 或极端 t
- if np.all(J == 0):
- return np.nan, np.nan
- # ============================
- # 4. 定义鲁棒损失(Huber)
- # ============================
- delta = 1.0 # Huber 阈值,可视需要调节
- def huber_loss(residual):
- abs_r = np.abs(residual)
- quadratic = np.minimum(abs_r, delta)
- linear = abs_r - quadratic
- return 0.5 * quadratic**2 + delta * linear
- def loss(nuk):
- pred = R0 + nuk * J * t
- residual = R - pred
- return np.sum(huber_loss(residual))
- # ============================
- # 5. 优化求解
- # ============================
- try:
- res = minimize(
- loss,
- x0=[1e-6],
- bounds=[(0, None)],
- options={"maxiter": 500, "disp": False}
- )
- except Exception:
- return np.nan, np.nan
- # 若优化失败
- if not res.success:
- return np.nan, np.nan
- nuk_opt = float(res.x[0])
- # ============================
- # 6. 计算预测值与 R2(保证无 NaN/Inf)
- # ============================
- pred = R0 + nuk_opt * J * t
- if self._is_invalid(pred):
- return nuk_opt, np.nan
- try:
- r2 = r2_score(R, pred)
- except Exception:
- r2 = np.nan
- return nuk_opt, r2
- class LongTermFoulingFitter:
- def __init__(self, unit):
- self.unit = unit
- @staticmethod
- def _power_law(t, a, b, R0):
- return R0 + a * np.power(t, b)
- @staticmethod
- def _is_invalid(x):
- return (
- x is None
- or np.any(pd.isna(x))
- or np.any(np.isinf(x))
- or np.any(np.abs(x) > 1e20) # 防止极端爆炸
- )
- def fit_cycle(self, segments):
- # 1. 空数据
- if not segments:
- return np.nan, np.nan, np.nan
- # 2. 合并
- try:
- df = pd.concat(segments, ignore_index=True).copy()
- except Exception:
- return np.nan, np.nan, np.nan
- if "time" not in df.columns or "R_scaled_start" not in df.columns:
- return np.nan, np.nan, np.nan
- df = df.sort_values("time").drop_duplicates(subset=["time"])
- if len(df) < 3:
- return np.nan, np.nan, np.nan
- # 3. 计算 t
- t0 = df["time"].iloc[0]
- df["t"] = (df["time"] - t0).dt.total_seconds()
- df_valid = df[df["t"] > 0].copy()
- if len(df_valid) < 2:
- return np.nan, np.nan, np.nan
- t = df_valid["t"].values.astype(float)
- R = df_valid["R_scaled_start"].values.astype(float)
- R0 = df["R_scaled_start"].iloc[0]
- # 4. 输入检查
- if self._is_invalid(t) or self._is_invalid(R) or self._is_invalid(R0):
- return np.nan, np.nan, np.nan
- # 5. 非线性拟合
- try:
- popt, _ = curve_fit(
- lambda tt, a, b: self._power_law(tt, a, b, R0),
- t, R,
- p0=(0.01, 1.0),
- maxfev=8000
- )
- a, b = popt
- # 检查结果是否爆炸
- if self._is_invalid(a) or self._is_invalid(b):
- return np.nan, np.nan, np.nan
- pred = self._power_law(t, a, b, R0)
- # 再检查预测值
- if self._is_invalid(pred):
- return np.nan, np.nan, np.nan
- r2 = r2_score(R, pred)
- # 再检查 r2
- if self._is_invalid(r2):
- return np.nan, np.nan, np.nan
- return a, b, r2
- except Exception:
- return np.nan, np.nan, np.nan
- class ChemicalBackwashCleaner:
- """
- 根据有效化学周期计算 CEB 去除膜阻力:
- ΔR = 上周期末 R_end - 下周期初 R_start
- """
- def __init__(self, unit):
- self.unit = unit
- def compute_removal(self, cycles):
- ids = sorted(cycles.keys())
- for i in range(len(ids) - 1):
- cid1 = ids[i]
- cid2 = ids[i + 1]
- c1 = cycles[cid1]
- c2 = cycles[cid2]
- if not (c1["valid"] and c2["valid"]):
- c1["R_removed"] = None
- continue
- # 上周期末:最后段的 R_end
- seg_last = c1["segments"][-1]
- R_end = seg_last[f"R_scaled_end"]
- # 下周期初:第一段的 R_start
- seg_first = c2["segments"][0]
- R_start = seg_first[f"R_scaled_start"]
- c1["R_removed"] = R_end - R_start
- # 最后一个周期无 CEB 去除值
- if ids:
- cycles[ids[-1]]["R_removed"] = None
- return cycles
|