2 Komitmen 62fc4eb3e5 ... f7461dd03c

Pembuat SHA1 Pesan Tanggal
  junc_WHU f7461dd03c feat: 新增工厂历史数据处理 pipeline(UF-RL) 3 bulan lalu
  junc_WHU 1b486a9e31 更新项目 3 bulan lalu

+ 1 - 1
README.md

@@ -178,7 +178,7 @@ python api_main.py
 ```bash
 # 训练
 cd models/uf-rl/超滤训练源码
-python DQN_train.py
+python fixed_DQN_train.py
 
 # 推理
 cd models/uf-rl/Ultrafiltration_model

+ 178 - 0
models/uf-rl/uf_data_process/calculate.py

@@ -0,0 +1,178 @@
+from typing import List
+
+import numpy as np
+import pandas as pd
+
class UFResistanceCalculator:
    """Compute membrane flux, TMP, viscosity and resistance for segments.

    Parameters
    ----------
    units : list
        Unit names, e.g. ``["UF1", "UF2"]``; used to derive per-unit column names.
    area_m2 : float | str
        Membrane area in m^2. A string such as ``"128*40"`` is evaluated.
    scale_factor : float | str
        Divisor applied to the raw resistance to produce ``<unit>_R_scaled``.
    """

    def __init__(self, units: list, area_m2, scale_factor):
        self.units = units

        # Membrane area; the config may supply an arithmetic expression string.
        self.A = area_m2
        if isinstance(self.A, str):
            try:
                # SECURITY NOTE: eval of a config string. Acceptable only because
                # the value comes from a local, trusted YAML file; builtins are
                # stripped to limit the blast radius. Never feed untrusted input.
                self.A = float(eval(self.A, {"__builtins__": {}}, {}))  # "128*40" -> 5120.0
            except Exception as e:
                print(f"无法解析面积 self.A: {self.A}, 错误: {e}")

        # Scale factor, same string convention as the area.
        self.scale_factor = scale_factor
        if isinstance(self.scale_factor, str):
            try:
                # SECURITY NOTE: same restricted eval as above.
                self.scale_factor = float(eval(self.scale_factor, {"__builtins__": {}}, {}))
            except Exception as e:
                print(f"无法解析 scale_factor: {self.scale_factor}, 错误: {e}")

    @staticmethod
    def xishan_viscosity(T_celsius):
        """Temperature-corrected water viscosity for the Xishan plant, in Pa·s."""
        x = (T_celsius + 273.15) / 300
        factor = 890 / (
            280.68 * x ** -1.9 +
            511.45 * x ** -7.7 +
            61.131 * x ** -19.6 +
            0.45903 * x ** -40
        )
        mu = 0.00089 / factor
        return mu

    def calculate_for_segment(self, seg_df: pd.DataFrame, temp_col="C.M.RO_TT_ZJS@out", flow_col=None):
        """Compute TMP, flux J, viscosity mu and resistance R for one segment.

        Adds per-unit columns ``<unit>_TMP_Pa``, ``<unit>_J``, ``<unit>_mu``,
        ``<unit>_R`` and ``<unit>_R_scaled``. Units whose input columns are
        missing are skipped silently. Returns a copy; the input is untouched.
        """
        seg_df = seg_df.copy()

        for unit in self.units:
            unit_flow_col = flow_col or f"C.M.{unit}_FT_JS@out"
            press_col = f"C.M.{unit}_DB@press_PV"

            if not all(col in seg_df.columns for col in [unit_flow_col, press_col, temp_col]):
                continue

            # Sanity-check the inputs before computing.
            for col in (press_col, unit_flow_col, temp_col):
                # Coerce non-numeric columns; unparsable cells become NaN.
                if not pd.api.types.is_numeric_dtype(seg_df[col]):
                    seg_df[col] = pd.to_numeric(seg_df[col], errors='coerce')

                # Log (but do not drop) NaN / non-finite values.
                if seg_df[col].isna().any() or not np.isfinite(seg_df[col]).all():
                    print(f"⚠️ 列 {col} 包含 NaN 或非有限值")
                    print(seg_df[col][~np.isfinite(seg_df[col])])

            # Pressure column appears to be in MPa (×1e6 -> Pa) — TODO confirm sensor unit.
            seg_df[f"{unit}_TMP_Pa"] = seg_df[press_col] * 1e6
            # Flux: volumetric flow divided by area, converted per-hour -> per-second.
            seg_df[f"{unit}_J"] = seg_df[unit_flow_col] / self.A / 3600
            seg_df[f"{unit}_mu"] = self.xishan_viscosity(seg_df[temp_col])
            # Darcy resistance: R = TMP / (J * mu)
            seg_df[f"{unit}_R"] = seg_df[f"{unit}_TMP_Pa"] / (seg_df[f"{unit}_J"] * seg_df[f"{unit}_mu"])
            seg_df[f"{unit}_R_scaled"] = seg_df[f"{unit}_R"] / self.scale_factor

        return seg_df

    def calculate_for_segments(self, segments: list, temp_col="C.M.RO_TT_ZJS@out", flow_col=None):
        """Apply :meth:`calculate_for_segment` to every segment in a list."""
        return [
            self.calculate_for_segment(seg, temp_col=temp_col, flow_col=flow_col)
            for seg in segments
        ]
+
+
# =============================
#   Single-variable stability statistics (e.g. flow / TMP)
# =============================
class VariableStabilityAnalyzer:
    """Per-segment statistics for a single variable.

    Adds three constant columns to each segment: mean, standard deviation
    and coefficient of variation (CV = std / mean).
    """

    def __init__(self):
        pass

    def analyze(self, seg: pd.DataFrame, col, prefix=None) -> pd.DataFrame:
        """Annotate one segment with ``<prefix>_mean / _std / _cv`` columns."""
        annotated = seg.copy()
        label = prefix or col

        avg = annotated[col].mean()
        spread = annotated[col].std()
        # CV is undefined for a zero mean; report NaN in that case.
        variation = spread / avg if avg != 0 else np.nan

        annotated[f"{label}_mean"] = avg
        annotated[f"{label}_std"] = spread
        annotated[f"{label}_cv"] = variation
        return annotated

    def analyze_segments(self, segments: list, col, prefix=None) -> list:
        """Annotate every segment in a list; returns a new list."""
        annotated_all = []
        for seg in segments:
            annotated_all.append(self.analyze(seg, col, prefix))
        return annotated_all
+
+
# =============================
#       R_scaled_start / R_scaled_end computation
# =============================
class UFResistanceAnalyzer:
    """Attach start/end scaled-resistance features to each segment.

    The start feature is the median of the first ``head_n`` rows (by time),
    the end feature the median of the last ``tail_n`` rows; segments shorter
    than ``head_n + tail_n`` rows get ``None`` for both.
    """

    def __init__(self, resistance_col, head_n=20, tail_n=20, feature_start="R_scaled_start", feature_end="R_scaled_end"):
        self.res_col = resistance_col
        self.head_n = head_n
        self.tail_n = tail_n
        self.feature_start = feature_start
        self.feature_end = feature_end

    def analyze(self, seg: pd.DataFrame) -> pd.DataFrame:
        """Return a time-sorted copy of ``seg`` with both feature columns added."""
        ordered = seg.sort_values("time").reset_index(drop=True).copy()

        if len(ordered) < self.head_n + self.tail_n:
            start_val = None
            end_val = None
        else:
            start_val = ordered[self.res_col].iloc[:self.head_n].median()
            end_val = ordered[self.res_col].iloc[-self.tail_n:].median()

        ordered[self.feature_start] = start_val
        ordered[self.feature_end] = end_val
        return ordered

    def analyze_segments(self, segments: list) -> list:
        """Apply :meth:`analyze` to every segment in a list."""
        processed = []
        for seg in segments:
            processed.append(self.analyze(seg))
        return processed
+
+
+
# =============================
#       CEB matching analysis
# =============================
class UFCEBMatcher:
    """Match each CEB (chemical backwash) event with the nearest stable
    segment before and after it, and compute the resistance difference."""

    def __init__(self, unit_name, chemical_code):
        self.unit = unit_name
        self.chemical_code = chemical_code
        # Control-word column for this unit.
        self.ctrl_col = f"C.M.{unit_name}_DB@word_control"

    def match(self, df, seg_summary):
        """Return one row per CEB that has a stable segment on both sides.

        Parameters
        ----------
        df : pd.DataFrame
            Raw time series with the control-word column and ``time``.
        seg_summary : pd.DataFrame
            Per-segment summary with ``segment_id``, ``start_time``,
            ``end_time``, ``R_start`` and ``R_end`` columns.
        """
        ceb_times = df[df[self.ctrl_col] == self.chemical_code]["time"].sort_values()

        res = []
        for ceb_t in ceb_times:
            prev_seg = seg_summary[seg_summary["end_time"] < ceb_t].tail(1)
            next_seg = seg_summary[seg_summary["start_time"] > ceb_t].head(1)
            if prev_seg.empty or next_seg.empty:
                continue

            # BUG FIX: use scalar rows. The previous code subtracted the two
            # one-row Series prev_seg["R_end"] - next_seg["R_start"], which
            # aligned on their (different) indexes and produced all-NaN values;
            # float()/int() on a Series is also deprecated/ambiguous.
            prev_row = prev_seg.iloc[0]
            next_row = next_seg.iloc[0]

            res.append({
                "ceb_time": ceb_t,
                "prev_seg_id": int(prev_row["segment_id"]),
                "next_seg_id": int(next_row["segment_id"]),
                "R_before": float(prev_row["R_end"]),
                "R_after": float(next_row["R_start"]),
                "delta_R": float(prev_row["R_end"] - next_row["R_start"])
            })

        return pd.DataFrame(res)
+

+ 113 - 0
models/uf-rl/uf_data_process/filter.py

@@ -0,0 +1,113 @@
+import os
+from typing import List
+import numpy as np
+import pandas as pd
+import yaml
+
# =============================
#       Filter duplicated rows uploaded by network faults
# =============================
class ConstantFlowFilter:
    """Drop segments containing a run of ``repeat_len`` identical flow
    readings — the signature of a network fault replaying stale data."""

    def __init__(self, flow_col, repeat_len=20):
        self.flow_col = flow_col
        self.repeat_len = repeat_len

    def filter(self, seg_df):
        """Return the list of per-segment DataFrames that pass the check.

        Segments shorter than ``repeat_len`` rows are always kept.
        """
        valid_segments = []

        for sid, g in seg_df.groupby("segment_id"):
            g = g.sort_values("time").reset_index(drop=True)

            # Too short to contain a suspicious run — keep unconditionally.
            if len(g) < self.repeat_len:
                valid_segments.append(g)
                continue

            # Run-length check: label each run of consecutive equal values and
            # inspect the longest run. Equivalent to the former sliding-window
            # nunique scan but O(n) instead of O(n * repeat_len). NaN breaks a
            # run (NaN != NaN), so NaN-padded stretches are not flagged.
            flow = g[self.flow_col]
            run_ids = flow.ne(flow.shift()).cumsum()
            longest_run = run_ids.value_counts().max()

            if longest_run < self.repeat_len:
                valid_segments.append(g)

        return valid_segments
+
+
# =============================
#       Filter out too-short events
# =============================
class EventQualityFilter:
    """Basic segment quality gate: keep only segments with enough rows."""

    def __init__(self, min_points=40):
        self.min_points = min_points

    def filter(self, seg_df):
        """Accept a DataFrame or a list of DataFrames.

        Returns a list of per-``segment_id`` groups that have at least
        ``min_points`` rows; raises ValueError on any other input type.
        """
        # Normalise the input to a list of frames.
        if isinstance(seg_df, pd.DataFrame):
            frames = [seg_df]
        elif isinstance(seg_df, list):
            frames = seg_df
        else:
            raise ValueError(f"Unsupported type for seg_df: {type(seg_df)}")

        return [
            group
            for frame in frames
            for _, group in frame.groupby("segment_id")
            if len(group) >= self.min_points
        ]
+
+
+
# =============================
#       Stable-feed data extraction
# =============================
class InletSegmentFilter:
    """Extract the stable-feed rows (control word == ``stable_value``) from
    each segment, keeping only extractions with enough rows."""

    def __init__(self, control_col, stable_value=26.0, min_points=40):
        self.control_col = control_col
        self.stable_value = stable_value
        self.min_points = min_points

    def extract(self, seg_df):
        """Accept a DataFrame or a list of DataFrames.

        Returns a list of stable sub-segments (one per qualifying
        ``segment_id`` group); raises ValueError on any other input type.
        """
        # Normalise the input to a list of frames.
        if isinstance(seg_df, pd.DataFrame):
            frames = [seg_df]
        elif isinstance(seg_df, list):
            frames = seg_df
        else:
            raise ValueError(f"Unsupported type for seg_df: {type(seg_df)}")

        stable_segments = []
        for frame in frames:
            for _, group in frame.groupby("segment_id"):
                stable_rows = group[group[self.control_col] == self.stable_value]
                if len(stable_rows) >= self.min_points:
                    stable_segments.append(stable_rows)

        return stable_segments
+
+
+
+
+

+ 308 - 0
models/uf-rl/uf_data_process/fit.py

@@ -0,0 +1,308 @@
+# cycle_analysis.py
+import numpy as np
+import pandas as pd
+from scipy.optimize import minimize, curve_fit
+from sklearn.metrics import r2_score
+from datetime import timedelta
+
class ChemicalCycleSegmenter:
    """Split stable inlet segments into chemical-backwash cycles.

    A cycle spans the interval between two consecutive ``bw_chem`` events;
    cycles longer than ``max_hours`` are kept but flagged invalid.
    """

    def __init__(self, max_hours=60):
        self.max_hours = max_hours

    def assign_cycles(self, df_unit, stable_segments):
        """Build ``{cycle_id: {...}}`` and tag each member segment in place.

        ``df_unit`` must carry ``time`` and ``event_type``; each element of
        ``stable_segments`` must carry a ``time`` column.
        """
        # All chemical-backwash timestamps, in chronological order.
        chem_mask = df_unit["event_type"] == "bw_chem"
        boundaries = df_unit.loc[chem_mask, "time"].sort_values().to_list()

        cycles = {}
        if len(boundaries) < 2:
            return cycles

        for cycle_id, (t_start, t_end) in enumerate(zip(boundaries[:-1], boundaries[1:]), start=1):
            # Segments fully contained in [t_start, t_end].
            members = [
                seg for seg in stable_segments
                if seg["time"].iloc[0] >= t_start and seg["time"].iloc[-1] <= t_end
            ]
            if not members:
                continue

            duration_h = (t_end - t_start).total_seconds() / 3600
            is_valid = duration_h <= self.max_hours

            cycles[cycle_id] = {
                "segments": members,
                "start": t_start,
                "end": t_end,
                "hours": duration_h,
                "valid": is_valid
            }

            # Write the cycle tag back onto every member segment (in place).
            for seg in members:
                seg["chem_cycle_id"] = cycle_id
                seg["chem_cycle_valid"] = is_valid

        return cycles
+
class ShortTermCycleFoulingFitter:
    """
    Fit the short-term fouling growth rate nuK jointly over all inlet
    segments of one chemical cycle, using the linear model
    R(t) = R0 + nuK * J * t with a robust (Huber) loss.
    """

    def __init__(self, unit):
        # Unit name (e.g. "UF1"); selects the per-unit data columns below.
        self.unit = unit

    @staticmethod
    def _is_invalid(x):
        # True for None, any NaN/Inf, or absurd magnitudes (> 1e20) that
        # would destabilise the optimisation.
        return (
            x is None
            or np.any(pd.isna(x))
            or np.any(np.isinf(x))
            or np.any(np.abs(x) > 1e20)
        )

    def fit_cycle(self, segments):
        """Fit nuK over a list of segment DataFrames.

        Each segment must carry "time", "<unit>_R_scaled" and "<unit>_J"
        columns. Returns (nuk, r2); (nan, nan) on any failure.
        """
        # ============================
        # 1. Preliminary checks
        # ============================
        if len(segments) == 0:
            return np.nan, np.nan

        try:
            df = pd.concat(segments, ignore_index=True).copy()
        except Exception:
            return np.nan, np.nan

        required_cols = ["time", f"{self.unit}_R_scaled", f"{self.unit}_J"]
        if any(col not in df.columns for col in required_cols):
            return np.nan, np.nan

        # Sort chronologically and drop duplicated timestamps
        df = df.sort_values("time").drop_duplicates(subset=["time"])

        if len(df) < 3:
            return np.nan, np.nan

        # ============================
        # 2. Elapsed time t (seconds since the first sample)
        # ============================
        try:
            df["t"] = (df["time"] - df["time"].iloc[0]).dt.total_seconds()
        except Exception:
            return np.nan, np.nan

        # Drop t <= 0 (the reference sample has t == 0 and is removed)
        df = df[df["t"] > 0].copy()
        if len(df) < 2:
            return np.nan, np.nan

        # Extract fitting arrays
        R = df[f"{self.unit}_R_scaled"].astype(float).values
        J = df[f"{self.unit}_J"].astype(float).values
        t = df["t"].astype(float).values
        R0 = R[0]

        # ============================
        # 3. Input validation
        # ============================
        if any(self._is_invalid(x) for x in [R, J, t, R0]):
            return np.nan, np.nan

        # Guard against a degenerate all-zero flux
        if np.all(J == 0):
            return np.nan, np.nan

        # ============================
        # 4. Robust (Huber) loss definition
        # ============================
        delta = 1.0  # Huber threshold; tune if needed

        def huber_loss(residual):
            # Quadratic near zero, linear beyond |residual| > delta.
            abs_r = np.abs(residual)
            quadratic = np.minimum(abs_r, delta)
            linear = abs_r - quadratic
            return 0.5 * quadratic**2 + delta * linear

        def loss(nuk):
            # Model: R(t) = R0 + nuK * J * t
            pred = R0 + nuk * J * t
            residual = R - pred
            return np.sum(huber_loss(residual))

        # ============================
        # 5. Bounded optimisation (nuK >= 0)
        # ============================
        try:
            res = minimize(
                loss,
                x0=[1e-6],
                bounds=[(0, None)],
                options={"maxiter": 500, "disp": False}
            )
        except Exception:
            return np.nan, np.nan

        # Optimiser did not converge
        if not res.success:
            return np.nan, np.nan

        nuk_opt = float(res.x[0])

        # ============================
        # 6. Predictions and R2 (guarding against NaN/Inf)
        # ============================
        pred = R0 + nuk_opt * J * t

        if self._is_invalid(pred):
            return nuk_opt, np.nan

        try:
            r2 = r2_score(R, pred)
        except Exception:
            r2 = np.nan

        return nuk_opt, r2
+
+
+
class LongTermFoulingFitter:
    """Fit the long-term fouling trend of one cycle's segment-start
    resistances with a power law: R(t) = R0 + a * t**b."""

    def __init__(self, unit):
        # Unit name; kept for symmetry with the other fitters (the columns
        # used below are not unit-prefixed).
        self.unit = unit

    @staticmethod
    def _power_law(t, a, b, R0):
        # Power-law fouling model with fixed intercept R0.
        return R0 + a * np.power(t, b)

    @staticmethod
    def _is_invalid(x):
        # True for None, any NaN/Inf, or absurd magnitudes.
        return (
            x is None
            or np.any(pd.isna(x))
            or np.any(np.isinf(x))
            or np.any(np.abs(x) > 1e20)  # guard against numeric blow-up
        )

    def fit_cycle(self, segments):
        """Fit (a, b) on the "R_scaled_start" series of ``segments``.

        Returns (a, b, r2); (nan, nan, nan) on any failure.
        """
        # 1. Empty input
        if not segments:
            return np.nan, np.nan, np.nan

        # 2. Merge all segments into one frame
        try:
            df = pd.concat(segments, ignore_index=True).copy()
        except Exception:
            return np.nan, np.nan, np.nan

        if "time" not in df.columns or "R_scaled_start" not in df.columns:
            return np.nan, np.nan, np.nan

        df = df.sort_values("time").drop_duplicates(subset=["time"])

        if len(df) < 3:
            return np.nan, np.nan, np.nan

        # 3. Elapsed time t in seconds since the first sample
        t0 = df["time"].iloc[0]
        df["t"] = (df["time"] - t0).dt.total_seconds()

        df_valid = df[df["t"] > 0].copy()  # drop the t == 0 reference row
        if len(df_valid) < 2:
            return np.nan, np.nan, np.nan

        t = df_valid["t"].values.astype(float)
        R = df_valid["R_scaled_start"].values.astype(float)
        R0 = df["R_scaled_start"].iloc[0]  # fixed intercept: resistance at t0

        # 4. Input validation
        if self._is_invalid(t) or self._is_invalid(R) or self._is_invalid(R0):
            return np.nan, np.nan, np.nan

        # 5. Non-linear least squares with R0 held fixed
        try:
            popt, _ = curve_fit(
                lambda tt, a, b: self._power_law(tt, a, b, R0),
                t, R,
                p0=(0.01, 1.0),
                maxfev=8000
            )
            a, b = popt

            # Reject diverged parameter estimates
            if self._is_invalid(a) or self._is_invalid(b):
                return np.nan, np.nan, np.nan

            pred = self._power_law(t, a, b, R0)

            # Reject non-finite predictions
            if self._is_invalid(pred):
                return np.nan, np.nan, np.nan

            r2 = r2_score(R, pred)

            # Reject non-finite goodness of fit
            if self._is_invalid(r2):
                return np.nan, np.nan, np.nan

            return a, b, r2

        except Exception:
            return np.nan, np.nan, np.nan
+
+
class ChemicalBackwashCleaner:
    """Compute the membrane resistance removed by each CEB:
    ΔR = R at the end of one valid cycle minus R at the start of the next.
    """

    def __init__(self, unit):
        self.unit = unit

    def compute_removal(self, cycles):
        """Annotate each cycle dict with "R_removed" and return ``cycles``.

        Only consecutive pairs of *valid* cycles get a value; the last cycle
        (no following CEB to attribute removal to) is always None.
        """
        ids = sorted(cycles.keys())

        for i in range(len(ids) - 1):
            current = cycles[ids[i]]
            following = cycles[ids[i + 1]]

            if not (current["valid"] and following["valid"]):
                current["R_removed"] = None
                continue

            # BUG FIX: extract scalar values. The previous code subtracted two
            # whole pandas Series taken from different DataFrames, which
            # aligned on their indexes and produced NaN/Series results instead
            # of the intended scalar ΔR.
            seg_last = current["segments"][-1]        # last segment of this cycle
            R_end = seg_last["R_scaled_end"].iloc[-1]

            seg_first = following["segments"][0]      # first segment of the next cycle
            R_start = seg_first["R_scaled_start"].iloc[0]

            current["R_removed"] = R_end - R_start

        # The final cycle has no subsequent CEB.
        if ids:
            cycles[ids[-1]]["R_removed"] = None

        return cycles

+ 79 - 0
models/uf-rl/uf_data_process/label.py

@@ -0,0 +1,79 @@
+import numpy as np
+import pandas as pd
+
# =============================
#     Event identification and segmentation
# =============================
class UFEventClassifier:
    """Label each raw sample with an event type (inlet / physical backwash /
    chemical backwash / other) from the unit's control word, and split the
    inlet rows into contiguous segments."""

    def __init__(self, unit_name, inlet_codes, physical_code, chemical_code):
        self.unit = unit_name
        self.inlet_codes = inlet_codes
        self.physical_code = physical_code
        self.chemical_code = chemical_code
        self.ctrl_col = f"C.M.{unit_name}_DB@word_control"

    def classify(self, df):
        """Return a copy of ``df`` with an "event_type" column added."""
        df = df.copy()
        df["event_type"] = "other"

        ctrl = df[self.ctrl_col]
        df.loc[ctrl.isin(self.inlet_codes), "event_type"] = "inlet"
        # Backwash codes match with a ±5 tolerance around the configured word.
        df.loc[(ctrl >= self.physical_code - 5) & (ctrl <= self.physical_code + 5), "event_type"] = "bw_phys"
        df.loc[(ctrl >= self.chemical_code - 5) & (ctrl <= self.chemical_code + 5), "event_type"] = "bw_chem"

        return df

    def segment(self, df):
        """Return only the inlet rows, with a 1-based "segment_id" column
        numbering each contiguous run of inlet samples.

        BUG FIX: the previous implementation wrote via df.loc[i, ...] with a
        positional counter from enumerate, which silently corrupts the frame
        when the index is not a clean RangeIndex. This version is vectorised
        and index-safe.
        """
        df = df.copy()
        is_inlet = df["event_type"] == "inlet"
        # A new segment starts on each inlet row not preceded by an inlet row.
        seg_start = is_inlet & ~is_inlet.shift(fill_value=False)
        df["segment_id"] = seg_start.cumsum().where(is_inlet)

        df = df[df["segment_id"].notna()].copy()
        df["segment_id"] = df["segment_id"].astype(int)
        return df
+
+
class PostBackwashInletMarker:
    """Flag the first N inlet samples that follow each backwash event."""

    def __init__(self, n_points=10):
        self.n_points = n_points
        self.label_col = "post_bw_inlet"  # name of the boolean marker column

    def mark(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return a copy of ``df`` with a boolean ``post_bw_inlet`` column.

        For every 'bw_phys' / 'bw_chem' row, the next ``n_points`` inlet
        rows (by index order) are marked True.
        """
        df = df.copy()

        # Normalise event_type to clean strings.
        # BUG FIX (dead code removed): the old .fillna('') placed *after*
        # .astype(str) could never fire — astype(str) already turns NaN into
        # the string "nan".
        df['event_type'] = df['event_type'].astype(str).str.strip()

        df[self.label_col] = False

        # Indexes of all backwash events, and of all inlet rows — computed
        # once instead of re-scanning the whole frame per backwash.
        bw_idx = df.index[df['event_type'].isin(['bw_phys', 'bw_chem'])]
        inlet_idx = df.index[df['event_type'] == 'inlet']

        for idx in bw_idx:
            # First n_points inlet rows strictly after this backwash row.
            post_idx = inlet_idx[inlet_idx > idx][: self.n_points]
            df.loc[post_idx, self.label_col] = True

        return df

+ 41 - 0
models/uf-rl/uf_data_process/load.py

@@ -0,0 +1,41 @@
+import os
+import pandas as pd
+import yaml
+
+
# =============================
#        Config loader
# =============================
class UFConfigLoader:
    """Load the UF analysis YAML config and expose its top-level sections."""

    def __init__(self, config_path="config/uf_analyze_config.yaml"):
        # Parse once at construction; sections are then served from memory.
        with open(config_path, "r", encoding="utf-8") as fh:
            self.cfg = yaml.safe_load(fh)

    @property
    def uf(self):
        """The "UF" section of the config."""
        return self.cfg["UF"]

    @property
    def params(self):
        """The "Params" section of the config."""
        return self.cfg["Params"]

    @property
    def paths(self):
        """The "Paths" section of the config."""
        return self.cfg["Paths"]
+
# =============================
#        Data loader
# =============================
class UFDataLoader:
    """Load raw UF time-series CSV files from a directory."""

    def __init__(self, data_path: str):
        self.data_path = data_path

    def load_all_csv(self) -> pd.DataFrame:
        """Read every CSV under ``data_path`` and concatenate them.

        BUG FIX: returns an empty DataFrame when the directory holds no CSV
        files — the old code crashed in pd.concat([]) with ValueError. Files
        are read in sorted name order so the output is deterministic.
        """
        files = sorted(f for f in os.listdir(self.data_path) if f.endswith(".csv"))
        if not files:
            return pd.DataFrame()
        dfs = [
            pd.read_csv(os.path.join(self.data_path, f), parse_dates=["time"])
            for f in files
        ]
        return pd.concat(dfs, ignore_index=True)

    def load_single_csv(self, file_name: str) -> pd.DataFrame:
        """Read one CSV (relative to ``data_path``) with a parsed "time" column."""
        return pd.read_csv(os.path.join(self.data_path, file_name), parse_dates=["time"])

+ 353 - 0
models/uf-rl/uf_data_process/pipeline.py

@@ -0,0 +1,353 @@
+import os
+from typing import List, Dict
+
+import numpy as np
+import pandas as pd
+
+from .load import UFConfigLoader, UFDataLoader
+from .label import UFEventClassifier, PostBackwashInletMarker
+from .filter import ConstantFlowFilter, EventQualityFilter, InletSegmentFilter
+from .calculate import UFResistanceCalculator, VariableStabilityAnalyzer, UFResistanceAnalyzer, UFCEBMatcher
+from .fit import ChemicalCycleSegmenter, ChemicalBackwashCleaner, ShortTermCycleFoulingFitter,LongTermFoulingFitter
+
+class UFAnalysisPipeline:
+    """
+    Pipeline:
+    - 支持四个机组逐机组标记事件并分段
+    - 过滤与稳定段提取
+    - 每段逐行计算膜阻力,并对多变量做稳定性分析
+    - 仅在 CEB 前后最近段均为有效稳定进水段时计算去除阻力
+    - 生成每段汇总表并保存
+    """
+
+    def __init__(self, cfg: 'UFConfigLoader'):
+        self.cfg = cfg
+        uf_cfg = cfg.uf
+        params = cfg.params
+        paths = cfg.paths
+
+        # 机组列表(优先从配置读取)
+        self.units = uf_cfg.get("units", ["UF1", "UF2", "UF3", "UF4"])
+        self.stable_inlet_code = uf_cfg.get("stable_inlet_code", "26.0")
+        self.loader = UFDataLoader(paths["raw_data_path"])
+        self.output_dir = paths["output_path"]
+
+        # 过滤器
+        self.min_points = params.get("min_points", 40)
+        self.initial_points = params.get("initial_points", 10)
+
+        self.quality_filter = EventQualityFilter(min_points=self.min_points)
+        self.initial_label = PostBackwashInletMarker(n_points=self.initial_points)
+
+        # 阻力计算器
+        self.res_calc = UFResistanceCalculator(self.units, area_m2=uf_cfg["area_m2"], scale_factor=params.get("scale_factor", 1e7))
+        self.segment_head_n = params.get("segment_head_n", 20)
+        self.segment_tail_n = params.get("segment_tail_n", 20)
+
+        # 需要检查的常量列名(除流量外)
+        self.temp_col = uf_cfg.get("temp_col", "C.M.RO_TT_ZJS@out")
+        self.orp_col = uf_cfg.get("orp_col", "C.M.UF_ORP_ZCS@out")
+        self.cond_col = uf_cfg.get("cond_col", "C.M.RO_Cond_ZJS@out")
+
+    # ----------------------------
+    # 加载所有 CSV 并合并
+    # ----------------------------
+    def load_all(self) -> pd.DataFrame:
+        print(f"[INFO] Loading raw data from: {self.loader.data_path}")
+        df = self.loader.load_all_csv()
+
+        if df.empty:
+            print("[WARN] No CSV files found!")
+        return df
+
+    # ----------------------------
+    # 多变量稳定性检查(针对单个 segment)
+    # 返回每个变量的稳定性 bool 与总体稳定 bool(所有变量均稳定)
+    # ----------------------------
+    def analyze_variables_stability(self, seg: pd.DataFrame, flow_col: str) -> Dict[str, bool]:
+        cols = {
+            "flow": flow_col,
+            "temp": self.temp_col,
+            "orp": self.orp_col,
+            "cond": self.cond_col
+        }
+        results = {}
+        for name, col in cols.items():
+            if col not in seg.columns:
+                results[name] = False
+                continue
+            series = seg[col].dropna()
+            if len(series) < 2:
+                results[name] = False
+                continue
+            std = float(series.std())
+            x = np.arange(len(series))
+            try:
+                slope = float(np.polyfit(x, series.values, 1)[0])
+            except Exception:
+                slope = np.inf
+            results[name] = (std <= self.var_max_std) and (abs(slope) <= self.var_max_slope)
+        results["all_stable"] = all(results[k] for k in ["flow", "temp", "orp", "cond"])
+        return results
+
+    # ----------------------------
+    # 从 stable_segments 列表计算每段的 R_start/R_end(按机组)
+    # 返回 seg_summary DataFrame(每行对应一个 stable inlet segment)
+    # ----------------------------
+    def summarize_R_for_unit(self, stable_segments: List[pd.DataFrame], unit: str) -> pd.DataFrame:
+        rows = []
+        res_col = f"{unit}_R"
+        for seg in stable_segments:
+            seg = seg.sort_values("time").reset_index(drop=True)
+            if len(seg) < (self.segment_head_n + self.segment_tail_n):
+                # 跳过过短的
+                continue
+            R_start = seg[res_col].iloc[:self.segment_head_n].median()
+            R_end = seg[res_col].iloc[-self.segment_tail_n:].median()
+            rows.append({
+                "unit": unit,
+                "segment_id": int(seg["segment_id"].iloc[0]),
+                "start_time": seg["time"].iloc[0],
+                "end_time": seg["time"].iloc[-1],
+                "R_start": float(R_start),
+                "R_end": float(R_end),
+                "n_points": len(seg)
+            })
+        return pd.DataFrame(rows)
+
+    # ----------------------------
+    # 主运行逻辑
+    # ----------------------------
+    def run(self):
+        df = self.load_all()
+        if df.empty:
+            raise ValueError("未找到原始数据或数据为空")
+
+        df = df.sort_values("time").reset_index(drop=True)
+        df = df.ffill().bfill()
+        cols_to_convert = df.columns.difference(["time"])
+        df[cols_to_convert] = df[cols_to_convert].apply(pd.to_numeric, errors='coerce')
+        df[cols_to_convert] = df[cols_to_convert].ffill().bfill()
+
+        # 为安全:确保 time 列为 datetime
+        if not np.issubdtype(df["time"].dtype, np.datetime64):
+            df["time"] = pd.to_datetime(df["time"])
+
+        # 逐机组处理
+        for unit in self.units:
+            print(f"Processing {unit} ...")
+            ctrl_col = f"C.M.{unit}_DB@word_control"
+            flow_col = f"C.M.{unit}_FT_JS@out"
+
+            # 去除无关列
+            other_units = [u for u in self.units if u != unit]
+            cols_to_drop = [col for col in df.columns if any(ou in col for ou in other_units)]
+            df_unit = df.drop(columns=cols_to_drop)
+
+            # 逐机组事件识别:使用 UFEventClassifier(实例化单机组)
+            event_clf = UFEventClassifier(unit, self.cfg.uf["inlet_codes"],
+                                          self.cfg.uf["physical_bw_code"], self.cfg.uf["chemical_bw_code"])
+            df_unit = event_clf.classify(df_unit)  # 产生 event_type 列
+            df_unit_mark = self.initial_label.mark(df_unit)
+            seg_df = event_clf.segment(df_unit_mark)
+
+            # 对 seg_df 进行按 segment 分组后逐段过滤:
+            const_flow_filter = ConstantFlowFilter(flow_col=flow_col, repeat_len=20)
+            segments = const_flow_filter.filter(seg_df) # 去除出现网络错误的进水段
+            segments = self.quality_filter.filter(segments) # 去除时间过短的进水段
+            stable_extractor = InletSegmentFilter(ctrl_col,
+                                                  stable_value=self.stable_inlet_code,
+                                                  min_points=self.min_points)
+
+            stable_segments = stable_extractor.extract(segments)  # 提取稳定进水数据
+
+            # 若无稳定进水段,则记录不稳定段并跳过该机组
+            if len(stable_segments) == 0:
+                print(f"  No stable segments found for {unit}")
+                continue
+
+            # -----------------------------
+            # 逐段计算膜阻力
+            # -----------------------------
+            stable_segments = self.res_calc.calculate_for_segments(
+                stable_segments,
+                temp_col=self.temp_col,
+                flow_col=flow_col
+            )
+
+            # -----------------------------
+            # 逐段计算 放缩后的R_start / R_end 与进水变量稳定性
+            # -----------------------------
+
+            # 变量稳定性分析
+            vsa = VariableStabilityAnalyzer()
+            stable_segments = vsa.analyze_segments(stable_segments, col=flow_col, prefix="flow")
+            stable_segments = vsa.analyze_segments(stable_segments, col=self.temp_col, prefix="temp")
+
+            # 膜阻力统计
+            res_scaled_col = f"{unit}_R_scaled"
+            ura = UFResistanceAnalyzer(
+                resistance_col=res_scaled_col,
+                head_n=self.segment_head_n,
+                tail_n=self.segment_tail_n
+            )
+            stable_segments = ura.analyze_segments(stable_segments)
+
+            # ----------------------------------
+            # 1. 化学周期划分
+            # ----------------------------------
+            cycle_segmenter = ChemicalCycleSegmenter(max_hours=60)
+            cycles = cycle_segmenter.assign_cycles(df_unit, stable_segments)
+
+            # ----------------------------------
+            # 2. 每个周期拟合短期污染 nuK
+            # ----------------------------------
+            stf = ShortTermCycleFoulingFitter(unit)
+            for cid, cycle in cycles.items():
+                if not cycle["valid"]:
+                    cycle["nuk"] = None
+                    cycle["nuk_r2"] = None
+                    continue
+
+                nuk, r2 = stf.fit_cycle(cycle["segments"])
+                cycle["nuk"] = nuk
+                cycle["nuk_r2"] = r2
+
+                for seg in cycle["segments"]:
+                    seg["cycle_nuK"] = nuk
+                    seg["cycle_nuK_R2"] = r2
+
+            # ----------------------------------
+            # 3. 长期污染模型幂律拟合
+            # ----------------------------------
+            ltf = LongTermFoulingFitter(unit)
+            for cid, cycle in cycles.items():
+                if not cycle["valid"]:
+                    cycle["a"] = None
+                    cycle["b"] = None
+                    cycle["lt_r2"] = None
+                    continue
+
+                a, b, r2 = ltf.fit_cycle(cycle["segments"])
+                cycle["a"] = a
+                cycle["b"] = b
+                cycle["lt_r2"] = r2
+
+                for seg in cycle["segments"]:
+                    seg["cycle_long_a"] = a
+                    seg["cycle_long_b"] = b
+                    seg["cycle_long_r2"] = r2
+
+            # ----------------------------------
+            # 4. 计算化学反冲洗去除膜阻力
+            # ----------------------------------
+            cbc = ChemicalBackwashCleaner(unit)
+            cycles = cbc.compute_removal(cycles)
+
+            # 回写到稳定段
+            for cid, cycle in cycles.items():
+                R_removed = cycle["R_removed"]
+                for seg in cycle["segments"]:
+                    seg["cycle_R_removed"] = R_removed
+
+            # -----------------------------
+            # 记录所有稳定进水段信息(stable_segments 仅包含稳定段)
+            # -----------------------------
+            all_inlet_segment_rows = []
+
+            for seg in stable_segments:  # seg 是一个 DataFrame
+                seg_id = int(seg["segment_id"].iloc[0])
+
+                row = {
+                    "seg_id": seg_id,
+                    "start_time": seg["time"].iloc[0],
+                    "end_time": seg["time"].iloc[-1],
+                }
+
+                # 添加膜阻力与变量波动性分析列
+                row.update({
+                    "R_scaled_start": seg["R_scaled_start"].iloc[0] if "R_scaled_start" in seg.columns else None,
+                    "R_scaled_end": seg["R_scaled_end"].iloc[-1] if "R_scaled_end" in seg.columns else None,
+
+                    "flow_mean": seg["flow_mean"].iloc[0] if "flow_mean" in seg.columns else None,
+                    "flow_std": seg["flow_std"].iloc[0] if "flow_std" in seg.columns else None,
+                    "flow_cv": seg["flow_cv"].iloc[0] if "flow_cv" in seg.columns else None,
+
+                    "temp_mean": seg["temp_mean"].iloc[0] if "temp_mean" in seg.columns else None,
+                    "temp_std": seg["temp_std"].iloc[0] if "temp_std" in seg.columns else None,
+                    "temp_cv": seg["temp_cv"].iloc[0] if "temp_cv" in seg.columns else None,
+                })
+
+                # === 将化学周期字段写入 row ===
+                chem_cols = [col for col in seg.columns if col.startswith("chem_cycle_")]
+                for col in chem_cols:
+                    row[col] = seg[col].iloc[0]
+
+                # === 将 nuK、长期污染参数、CEB 去除阻力写入 row ===
+                cycle_cols = [col for col in seg.columns if col.startswith("cycle_")]
+                for col in cycle_cols:
+                    row[col] = seg[col].iloc[0]
+
+                all_inlet_segment_rows.append(row)
+
+            # -----------------------------
+            # 最终构建 DataFrame
+            # -----------------------------
+            df_segments = pd.DataFrame(all_inlet_segment_rows)
+
+            # 输出文件名:unit_xxx_segments.csv
+            filename = f"{unit}_segments.csv"
+            output_path = os.path.join(self.output_dir, filename)
+
+            df_segments.to_csv(output_path, index=False)
+            print(f"Saved inlet segment data to: {output_path}")
+
+            # ======================================================
+            # 扩展功能:化学周期过滤与导出(基于 chem_cycle_id)
+            # ======================================================
+
+            print(f">>> Post-filtering chemical cycles for unit {unit} ...")
+
+            # 1. 仅保留有化学周期标识的段
+            if "chem_cycle_id" not in df_segments.columns:
+                print(f"No chem_cycle_id found for unit {unit}, skipping cycle filtering.")
+            else:
+                df_valid = df_segments.dropna(subset=["chem_cycle_id"]).copy()
+                df_valid["chem_cycle_id"] = df_valid["chem_cycle_id"].astype(int)
+
+                # 2. 根据 R2 过滤化学周期
+                grouped = df_valid.groupby("chem_cycle_id")
+
+                valid_cycle_ids = []
+                for cid, g in grouped:
+                    # 同一化学周期的 R2 相同(周期级拟合),用第一行即可
+                    nuk_r2 = g["cycle_nuK_R2"].iloc[0] if "cycle_nuK_R2" in g.columns else None
+                    long_r2 = g["cycle_long_r2"].iloc[0] if "cycle_long_r2" in g.columns else None
+
+                    if nuk_r2 is not None and long_r2 is not None:
+                        if nuk_r2 > 0.5 and long_r2 > 0.5:
+                            valid_cycle_ids.append(cid)
+
+                # 3. 过滤后的化学周期数据
+                df_cycles_kept = df_valid[df_valid["chem_cycle_id"].isin(valid_cycle_ids)]
+
+                # 4. 保存过滤结果
+                cycle_filename = f"{unit}_filtered_cycles.csv"
+                cycle_output_path = os.path.join(self.output_dir, cycle_filename)
+
+                df_cycles_kept.to_csv(cycle_output_path, index=False)
+                print(f"Saved filtered chemical cycles to: {cycle_output_path}")
+
+                # 5. 统计保留行占比
+                total_rows = len(df_valid)
+                kept_rows = len(df_cycles_kept)
+                pct = kept_rows / total_rows * 100 if total_rows > 0 else 0
+
+                print(
+                    f"Unit {unit}: kept {kept_rows}/{total_rows} rows "
+                    f"({pct:.2f} percent) after cycle R2 filtering."
+                )
+
+
+
+

+ 34 - 0
models/uf-rl/uf_data_process/run_ufdata_pipeline.py

@@ -0,0 +1,34 @@
+import os
+import sys
+
+SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
+PROJECT_ROOT = os.path.dirname(SCRIPT_DIR)
+sys.path.append(PROJECT_ROOT)
+
+from uf_data.load import UFConfigLoader
+from uf_data.pipeline import UFAnalysisPipeline
+
+
+def main():
+    print("=====================================")
+    print("    UF Multi-Unit Analysis Pipeline")
+    print("=====================================")
+
+    # 1. 加载配置文件
+    config_path = os.path.join(PROJECT_ROOT, "config", "uf_analyze_config.yaml")
+    cfg = UFConfigLoader(config_path)
+
+    # 2. 创建 pipeline
+    pipeline = UFAnalysisPipeline(cfg)
+
+    # 3. 执行 pipeline
+    print(">>> Running pipeline ...")
+    results = pipeline.run()
+
+    print("=========================================")
+    print("Pipeline finished. Results saved.")
+    print("=========================================")
+
+
+if __name__ == "__main__":
+    main()

+ 39 - 0
models/uf-rl/uf_data_process/uf_analyze_config.yaml

@@ -0,0 +1,39 @@
+UF:
+  # Ultrafiltration trains to process; used to expand the *_col templates below.
+  units: ["UF1", "UF2", "UF3", "UF4"]
+  # Loaded by YAML as the literal string "128 * 40"; the consuming code
+  # (UFResistanceCalculator) eval()s it into the membrane area in m^2.
+  area_m2: 128 * 40
+
+  # Process state codes read from the plant data.
+  # Presumably: inlet_codes mark feed/filtration states and
+  # stable_inlet_code the steady filtration state — TODO confirm with PLC docs.
+  inlet_codes: [21.0, 22.0, 23.0, 24.0, 25.0, 26.0]
+  stable_inlet_code: 26.0
+
+  # Backwash state codes (physical vs. chemical) — assumed from naming; verify.
+  physical_bw_code: 45.0
+  chemical_bw_code: 95.0
+
+  # Plant tag names. {unit} in the template is replaced with an entry of
+  # `units` above (matches the f-string used in the resistance calculator).
+  flow_col_template: "C.M.{unit}_FT_JS@out"
+  temp_col: "C.M.RO_TT_ZJS@out"
+  orp_col: "C.M.UF_ORP_ZCS@out"
+  cond_col: "C.M.RO_Cond_ZJS@out"
+
+Params:
+  # Stable-segment extraction: minimum points per segment and the number of
+  # initial points (semantics assumed from names — confirm in segmenter code).
+  min_stable_points: 40
+  initial_points: 10
+
+  # Resistance-trend calculation: sample counts taken from the head/tail of
+  # each segment.
+  segment_head_n: 10
+  segment_tail_n: 10
+
+  # Kept as a string by the YAML 1.1 resolver (no decimal point); the loader
+  # eval()s it to a float, mirroring the area_m2 handling.
+  scale_factor: 1e7
+
+
+Plot:
+  # Matplotlib figure settings and series colors (inlet / physical backwash /
+  # chemical backwash).
+  figsize: [12, 6]
+  dpi: 120
+  color_inlet: "#1f77b4"
+  color_bw_phys: "#ff7f0e"
+  color_bw_chem: "#d62728"
+
+Paths:
+  # Machine-specific Windows paths — adjust per deployment.
+  raw_data_path: "E:/Greentech/datasets/raw"
+  output_path: "E:/Greentech/datasets/results"
+  output_format: "csv"