import os
from pathlib import Path
from typing import Dict, List

import numpy as np
import pandas as pd

from load import UFConfigLoader, UFDataLoader
from label import UFEventClassifier, PostBackwashInletMarker
from filter import (
    ConstantFlowFilter,
    EventQualityFilter,
    InletSegmentFilter,
    FlowOutlierFilter,
)
from calculate import (
    UFResistanceCalculator,
    PumpPowerCalculator,
    VariableStabilityAnalyzer,
    UFResistanceAnalyzer,
    UFPressureAnalyzer,
)
from fit import (
    ChemicalCycleSegmenter,
    ChemicalBackwashCleaner,
    ShortTermCycleFoulingFitter,
    LongTermFoulingFitter,
)


class UFAnalysisPipeline:
    """
    End-to-end analysis pipeline for the UF units.

    Summary carried over from the original description:
    - every configured unit (four by default) is labelled and segmented
      on its own
    - segments are filtered and the stable inlet segments are extracted
    - membrane resistance is computed per row for every stable segment and
      several variables are analysed for stability
    - the resistance removed by a chemical backwash (CEB) is only computed
      when the nearest segments before and after it are both valid stable
      inlet segments (implemented by ChemicalBackwashCleaner, not visible
      here)
    - one summary table per unit is built and written to disk
    """

    # Columns that must be present before chemical cycles can be filtered
    # by their fit quality.
    _R2_COLUMNS = ("cycle_nuK_R2", "cycle_long_r2")

    def __init__(self, cfg: 'UFConfigLoader'):
        """
        Read the configuration and build the reusable helper objects.

        Reads ``cfg.uf``, ``cfg.params`` and ``cfg.paths``. Both output
        directories are created if they do not exist yet.

        Optional ``cfg.params`` keys introduced here (all have defaults that
        keep the previous behaviour):
        - ``var_max_std`` / ``var_max_slope``: thresholds used by
          ``analyze_variables_stability`` (default: no limit)
        - ``start_date``: rows before this timestamp are dropped in ``run``
          (default ``"2025-06-10"``)
        - ``min_cycle_r2``: minimum R2 a chemical cycle needs in both fits to
          be exported (default ``0.4``)
        """
        self.cfg = cfg
        uf_cfg = cfg.uf
        params = cfg.params
        paths = cfg.paths

        # Unit list (the configuration takes precedence over the default).
        self.units = uf_cfg.get("units", ["UF1", "UF2", "UF3", "UF4"])
        # The default used to be the *string* "[24.0, 26.0]".
        # NOTE(review): assumes InletSegmentFilter expects a list-like of
        # control codes -- confirm against its implementation.
        self.stable_codes = uf_cfg.get("stable_codes", [24.0, 26.0])

        project_root = Path(paths["project_root"])
        raw_data_path = project_root / paths["raw_data_path"]
        output_path = project_root / paths["output_path"]
        filter_output_path = project_root / paths["filter_output_path"]

        self.loader = UFDataLoader(raw_data_path)
        self.output_dir = output_path
        self.filter_output_dir = filter_output_path
        output_path.mkdir(parents=True, exist_ok=True)
        # run() writes the filtered cycles here, so it has to exist as well.
        filter_output_path.mkdir(parents=True, exist_ok=True)

        # Column-name configuration. The *_format entries contain a
        # "{unit}" placeholder that is filled in per unit.
        column_formats = uf_cfg.get("column_formats", {})
        self.ctrl_format = column_formats.get("ctrl_col", "C.M.{unit}_DB@word_control")
        self.flow_format = column_formats.get("flow_col", "C.M.{unit}_FT_JS@out")
        self.tmp_format = column_formats.get("tmp_col", "C.M.{unit}_DB@press_PV")
        self.BWB_POWER_format = column_formats.get("BWB_POWER_col", "ns=3;s=ZZ_{unit}#UFBWB_POWER")
        self.GSB_POWER_col = column_formats.get("GSB_POWER_col", "ns=3;s=ZZ_UFGSB_POWER")
        self.temp_col = column_formats.get("temp_col", "C.M.RO_TT_ZJS@out")
        self.orp_col = column_formats.get("orp_col", "C.M.UF_ORP_ZCS@out")
        self.NaClO_col = column_formats.get("NaClO_col", "ns=3;s=CN_LEVEL_O")
        self.HCl_col = column_formats.get("HCl_col", "ns=3;s=S_LEVEL_O")
        self.NaOH_col = column_formats.get("NaOH_col", "ns=3;s=J_LEVEL_O")

        # Filters and markers.
        self.min_points = params.get("min_points", 20)
        self.initial_points = params.get("initial_points", 10)
        self.quality_filter = EventQualityFilter(min_points=self.min_points)
        self.flow_filter = FlowOutlierFilter(n_sigma=3)
        self.initial_label = PostBackwashInletMarker(n_points=self.initial_points)

        # Resistance calculator.
        self.res_calc = UFResistanceCalculator(
            self.units,
            area_m2=uf_cfg["area_m2"],
            scale_factor=params.get("scale_factor", 1e10),
        )
        self.segment_head_n = params.get("segment_head_n", 10)
        self.segment_tail_n = params.get("segment_tail_n", 10)

        # Pump power calculator.
        self.power_calc = PumpPowerCalculator(
            event_col=column_formats.get("event_col", "event_type")
        )

        # Thresholds for analyze_variables_stability. They were read there
        # but never initialised, so the method raised AttributeError.
        # Without configuration no limit is imposed.
        self.var_max_std = float(params.get("var_max_std", np.inf))
        self.var_max_slope = float(params.get("var_max_slope", np.inf))

        # First timestamp kept by run(). The previous hard-coded value was
        # 2025-06-10 (its comment said 06-11); the code's value is kept.
        self.start_date = pd.Timestamp(params.get("start_date", "2025-06-10"))

        # Minimum R2 (short-term and long-term fit) for a chemical cycle to
        # be exported; previously the literal 0.4.
        self.min_cycle_r2 = params.get("min_cycle_r2", 0.4)

    # ----------------------------
    # Load all CSV files and merge them
    # ----------------------------
    def load_all(self) -> pd.DataFrame:
        """
        Return whatever ``self.loader.load_all_csv()`` produces.

        A warning is printed when the returned frame is empty; the frame is
        returned either way.
        """
        print(f"[INFO] Loading raw data from: {self.loader.data_path}")
        df = self.loader.load_all_csv()
        if df.empty:
            print("[WARN] No CSV files found!")
        return df

    # ----------------------------
    # Multi-variable stability check for a single segment
    # ----------------------------
    def _is_series_stable(self, seg: pd.DataFrame, col: str) -> bool:
        """Return True when ``seg[col]`` passes the std and slope thresholds."""
        if col not in seg.columns:
            return False
        series = seg[col].dropna()
        if len(series) < 2:
            # Neither a standard deviation nor a slope is meaningful.
            return False

        std = float(series.std())
        x = np.arange(len(series))
        try:
            slope = float(np.polyfit(x, series.values, 1)[0])
        except (np.linalg.LinAlgError, TypeError, ValueError):
            # A failed fit counts as "not stable" instead of aborting.
            slope = np.inf
        return bool(std <= self.var_max_std and abs(slope) <= self.var_max_slope)

    def analyze_variables_stability(self, seg: pd.DataFrame, flow_col: str) -> Dict[str, bool]:
        """
        Check flow, temperature and ORP of one segment for stability.

        A variable is stable when its standard deviation is at most
        ``self.var_max_std`` and the absolute slope of a first-degree
        ``np.polyfit`` over the sample index is at most
        ``self.var_max_slope``. A missing column, or fewer than two non-NaN
        values, counts as unstable.

        Returns a dict with the keys ``"flow"``, ``"temp"``, ``"orp"`` and
        ``"all_stable"`` (True only when all three are stable).
        """
        cols = {
            "flow": flow_col,
            "temp": self.temp_col,
            "orp": self.orp_col,
        }
        results = {name: self._is_series_stable(seg, col) for name, col in cols.items()}
        # Only aggregate what was actually checked; the former "cond" key
        # was never computed and caused a KeyError.
        results["all_stable"] = all(results[name] for name in cols)
        return results

    # ----------------------------
    # Compute R_start / R_end for every stable segment of one unit
    # ----------------------------
    def summarize_R_for_unit(self, stable_segments: List[pd.DataFrame], unit: str) -> pd.DataFrame:
        """
        Build one summary row per stable inlet segment.

        ``R_start`` is the median of the first ``self.segment_head_n``
        resistance values (after sorting by ``"time"``) and ``R_end`` the
        median of the last ``self.segment_tail_n`` values. Segments shorter
        than ``segment_head_n + segment_tail_n`` rows are skipped.

        The resistance column is ``"UF{unit}_R"`` when ``unit`` is one of
        1-4 (as int or str), otherwise ``"{unit}_R"``.

        The returned frame always has the columns ``unit``, ``segment_id``,
        ``start_time``, ``end_time``, ``R_start``, ``R_end`` and
        ``n_points``, even when no segment qualifies.
        """
        columns = [
            "unit", "segment_id", "start_time", "end_time",
            "R_start", "R_end", "n_points",
        ]
        # Accept the bare unit number both as string and as integer.
        if unit in ("1", "2", "3", "4", 1, 2, 3, 4):
            res_col = f"UF{unit}_R"
        else:
            res_col = f"{unit}_R"

        min_len = self.segment_head_n + self.segment_tail_n
        rows = []
        for seg in stable_segments:
            seg = seg.sort_values("time").reset_index(drop=True)
            if len(seg) < min_len:
                # Too short for separate head and tail windows.
                continue
            resistance = seg[res_col]
            rows.append({
                "unit": unit,
                "segment_id": int(seg["segment_id"].iloc[0]),
                "start_time": seg["time"].iloc[0],
                "end_time": seg["time"].iloc[-1],
                "R_start": float(resistance.iloc[:self.segment_head_n].median()),
                "R_end": float(resistance.iloc[-self.segment_tail_n:].median()),
                "n_points": len(seg),
            })
        return pd.DataFrame(rows, columns=columns)

    # ----------------------------
    # Main entry point
    # ----------------------------
    def run(self):
        """
        Load the raw data and process every configured unit.

        For each unit with at least one stable segment this writes
        ``{unit}_segments.csv`` to ``self.output_dir`` and, when the cycle
        columns are available, ``{unit}_filtered_cycles.csv`` to
        ``self.filter_output_dir``.

        Raises:
            ValueError: when no raw data could be loaded.
        """
        df = self.load_all()
        if df.empty:
            raise ValueError("未找到原始数据或数据为空")

        df = self._prepare_frame(df)

        # Process unit by unit.
        for unit in self.units:
            print(f"Processing {unit} ...")
            self._process_unit(df, unit)

    def _prepare_frame(self, df: pd.DataFrame) -> pd.DataFrame:
        """Sort by time, coerce value columns to numbers, fill gaps, apply the cut-off.

        May modify ``df`` in place (the ``"time"`` column).
        """
        # Convert before sorting so the order is chronological even when the
        # raw timestamps are strings.
        if not pd.api.types.is_datetime64_any_dtype(df["time"]):
            df["time"] = pd.to_datetime(df["time"])

        df = df.sort_values("time").reset_index(drop=True)
        df = df.ffill().bfill()

        value_cols = df.columns.difference(["time"])
        df[value_cols] = df[value_cols].apply(pd.to_numeric, errors='coerce')
        # Coercion may have introduced new NaNs; fill them again.
        df[value_cols] = df[value_cols].ffill().bfill()

        # Keep only rows at or after the configured start date.
        return df[df["time"] >= self.start_date].reset_index(drop=True)

    def _process_unit(self, df: pd.DataFrame, unit) -> None:
        """Run labelling, filtering, calculations, fitting and export for one unit."""
        ctrl_col = self.ctrl_format.format(unit=unit)
        flow_col = self.flow_format.format(unit=unit)
        tmp_col = self.tmp_format.format(unit=unit)
        bwb_power_col = self.BWB_POWER_format.format(unit=unit)

        # Drop every column whose name contains another unit's name.
        other_units = [u for u in self.units if u != unit]
        cols_to_drop = [
            col for col in df.columns
            if any(other in col for other in other_units)
        ]
        df_unit = df.drop(columns=cols_to_drop)

        # Event detection for this unit only.
        event_clf = UFEventClassifier(
            unit,
            self.cfg.uf["inlet_codes"],
            self.cfg.uf["physical_bw_code"],
            self.cfg.uf["chemical_bw_code"],
            ctrl_col,
        )
        # Original notes: classify() adds the event_type column, mark() flags
        # the first N inlet points after a backwash, segment() numbers the
        # event segments.
        df_unit = event_clf.classify(df_unit)
        df_unit_mark = self.initial_label.mark(df_unit)
        seg_df = event_clf.segment(df_unit_mark)

        # Per-segment filtering. Original notes: the constant-flow filter
        # removes inlet segments affected by network errors, the quality
        # filter removes segments that are too short.
        const_flow_filter = ConstantFlowFilter(flow_col=flow_col, repeat_len=20)
        segments = const_flow_filter.filter(seg_df)
        segments = self.quality_filter.filter(segments)

        # Inlet pump and backwash pump power per segment.
        segments = self.power_calc.calculate_for_segments(
            segments,
            inlet_power_col=self.GSB_POWER_col,
            bw_power_col=bwb_power_col,
        )

        # Extract the stable inlet segments.
        stable_extractor = InletSegmentFilter(
            ctrl_col,
            stable_codes=self.stable_codes,
            min_points=self.min_points,
        )
        stable_segments = stable_extractor.extract(segments)

        # Nothing to analyse for this unit.
        if len(stable_segments) == 0:
            print(f" No stable segments found for {unit}")
            return

        # Membrane resistance per segment.
        stable_segments = self.res_calc.calculate_for_segments(
            stable_segments,
            temp_col=self.temp_col,
            flow_col=flow_col,
            tmp_col=tmp_col,
        )

        # Variable stability statistics and flow outlier filtering.
        vsa = VariableStabilityAnalyzer()
        stable_segments = vsa.analyze_segments(stable_segments, col=flow_col, prefix="flow")
        stable_segments = vsa.analyze_segments(stable_segments, col=self.temp_col, prefix="temp")
        stable_segments = self.flow_filter.filter_segments(
            stable_segments, flow_col=flow_col, prefix="flow"
        )

        # Trans-membrane pressure statistics.
        upa = UFPressureAnalyzer(
            tmp_col=tmp_col,
            head_n=self.segment_head_n,
            tail_n=self.segment_tail_n,
        )
        stable_segments = upa.analyze_segments(stable_segments)

        # Scaled membrane resistance statistics.
        ura = UFResistanceAnalyzer(
            resistance_col=f"{unit}_R_scaled",
            head_n=self.segment_head_n,
            tail_n=self.segment_tail_n,
        )
        stable_segments = ura.analyze_segments(stable_segments)

        # 1. Split into chemical cycles.
        cycle_segmenter = ChemicalCycleSegmenter(max_hours=100)
        cycles = cycle_segmenter.assign_cycles(df_unit, stable_segments)

        # 2. + 3. Short-term and long-term fouling fits per cycle.
        self._fit_cycles(unit, cycles)

        # 4. Resistance removed by the chemical backwash and chemical dose.
        cbc = ChemicalBackwashCleaner(unit)
        cycles = cbc.compute_removal(cycles)
        cycles = cbc.compute_dose(cycles, self.NaClO_col, self.HCl_col, self.NaOH_col)

        # Write the removed resistance back onto the segments of each cycle.
        for cycle in cycles.values():
            removed = cycle["R_removed"]
            for seg in cycle["segments"]:
                seg["cycle_R_removed"] = removed

        # One row per stable inlet segment.
        df_segments = pd.DataFrame(
            [self._segment_row(seg) for seg in stable_segments]
        )

        # Output file name: {unit}_segments.csv
        output_path = os.path.join(self.output_dir, f"{unit}_segments.csv")
        df_segments.to_csv(output_path, index=False)
        print(f"Saved inlet segment data to: {output_path}")

        self._export_filtered_cycles(df_segments, unit)

    @staticmethod
    def _fit_cycles(unit, cycles) -> None:
        """Fit short-term (nuK) and long-term (a, b) fouling for every valid cycle.

        Results are stored on the cycle dict and copied onto each of its
        segments. Invalid cycles get ``None`` on the cycle dict and their
        segments are left untouched.
        """
        # Short-term fouling per cycle.
        stf = ShortTermCycleFoulingFitter(unit)
        for cycle in cycles.values():
            if not cycle["valid"]:
                cycle["nuk"] = None
                cycle["nuk_r2"] = None
                continue
            nuk, r2 = stf.fit_cycle(cycle["segments"])
            cycle["nuk"] = nuk
            cycle["nuk_r2"] = r2
            for seg in cycle["segments"]:
                seg["cycle_nuK"] = nuk
                seg["cycle_nuK_R2"] = r2

        # Long-term fouling model (described as a power-law fit in the
        # original notes).
        ltf = LongTermFoulingFitter(unit)
        for cycle in cycles.values():
            if not cycle["valid"]:
                cycle["a"] = None
                cycle["b"] = None
                cycle["lt_r2"] = None
                continue
            a, b, r2 = ltf.fit_cycle(cycle["segments"])
            cycle["a"] = a
            cycle["b"] = b
            cycle["lt_r2"] = r2
            for seg in cycle["segments"]:
                seg["cycle_long_a"] = a
                seg["cycle_long_b"] = b
                seg["cycle_long_r2"] = r2

    @staticmethod
    def _segment_row(seg: pd.DataFrame) -> dict:
        """Collapse one stable segment (a DataFrame) into a summary dict."""

        def cell(col, pos=0):
            # Value of `col` at row `pos`, or None when the column is absent.
            return seg[col].iloc[pos] if col in seg.columns else None

        row = {
            "seg_id": int(seg["segment_id"].iloc[0]),
            "start_time": seg["time"].iloc[0],
            "end_time": seg["time"].iloc[-1],
            # Pressure / resistance: "start" from the first row, "end" from
            # the last row.
            "tmp_start": cell("tmp_start"),
            "tmp_end": cell("tmp_end", -1),
            "R_scaled_start": cell("R_scaled_start"),
            "R_scaled_end": cell("R_scaled_end", -1),
        }
        # Variable fluctuation statistics.
        for col in ("flow_mean", "flow_std", "flow_cv",
                    "temp_mean", "temp_std", "temp_cv"):
            row[col] = cell(col)

        # Chemical-cycle fields first, then the cycle-level fit results
        # (nuK, long-term parameters, removed resistance). Two passes keep
        # the column order of the exported table unchanged.
        for prefix in ("chem_cycle_", "cycle_"):
            for col in seg.columns:
                if col.startswith(prefix):
                    row[col] = seg[col].iloc[0]
        return row

    def _export_filtered_cycles(self, df_segments: pd.DataFrame, unit) -> None:
        """Keep cycles whose two fit R2 values exceed ``self.min_cycle_r2`` and save them."""
        print(f">>> Post-filtering chemical cycles for unit {unit} ...")

        # 1. Only segments that carry a chemical cycle id can be filtered.
        if "chem_cycle_id" not in df_segments.columns:
            print(f"No chem_cycle_id found for unit {unit}, skipping cycle filtering.")
            return
        r2_cols = list(self._R2_COLUMNS)
        missing = [col for col in r2_cols if col not in df_segments.columns]
        if missing:
            # Happens e.g. when no cycle was valid, so no fit columns were
            # written; dropna(subset=...) would raise KeyError.
            print(f"No fitted R2 columns {missing} for unit {unit}, skipping cycle filtering.")
            return

        df_valid = df_segments.dropna(subset=["chem_cycle_id"] + r2_cols).copy()
        df_valid["chem_cycle_id"] = df_valid["chem_cycle_id"].astype(int)

        # 2. Filter cycles by R2. The R2 values are cycle-level results, so
        # the first row of each cycle is representative.
        first_rows = df_valid.drop_duplicates(subset="chem_cycle_id")
        good = (
            (first_rows["cycle_nuK_R2"] > self.min_cycle_r2)
            & (first_rows["cycle_long_r2"] > self.min_cycle_r2)
        )
        valid_cycle_ids = first_rows.loc[good, "chem_cycle_id"].tolist()

        # 3. Segments of the cycles that passed.
        df_cycles_kept = df_valid[df_valid["chem_cycle_id"].isin(valid_cycle_ids)]

        # 4. Save the filtered result.
        cycle_output_path = os.path.join(
            self.filter_output_dir, f"{unit}_filtered_cycles.csv"
        )
        df_cycles_kept.to_csv(cycle_output_path, index=False)
        print(f"Saved filtered chemical cycles to: {cycle_output_path}")

        # 5. Share of rows kept.
        total_rows = len(df_valid)
        kept_rows = len(df_cycles_kept)
        pct = kept_rows / total_rows * 100 if total_rows > 0 else 0
        print(
            f"Unit {unit}: kept {kept_rows}/{total_rows} rows "
            f"({pct:.2f} percent) after cycle R2 filtering."
        )