|
|
@@ -0,0 +1,353 @@
|
|
|
+import os
|
|
|
+from typing import List, Dict
|
|
|
+
|
|
|
+import numpy as np
|
|
|
+import pandas as pd
|
|
|
+
|
|
|
+from .load import UFConfigLoader, UFDataLoader
|
|
|
+from .label import UFEventClassifier, PostBackwashInletMarker
|
|
|
+from .filter import ConstantFlowFilter, EventQualityFilter, InletSegmentFilter
|
|
|
+from .calculate import UFResistanceCalculator, VariableStabilityAnalyzer, UFResistanceAnalyzer, UFCEBMatcher
|
|
|
+from .fit import ChemicalCycleSegmenter, ChemicalBackwashCleaner, ShortTermCycleFoulingFitter,LongTermFoulingFitter
|
|
|
+
|
|
|
class UFAnalysisPipeline:
    """
    Pipeline:
    - Marks events and builds segments unit by unit (four UF units)
    - Filters segments and extracts stable inlet sections
    - Computes membrane resistance row by row per segment and runs
      multi-variable stability analysis
    - Computes removal resistance only when the nearest segments before and
      after a CEB are both valid stable inlet segments
    - Builds a per-segment summary table and saves it
    """

    def __init__(self, cfg: 'UFConfigLoader'):
        """
        Wire up loaders, filters and calculators from a UFConfigLoader.

        Args:
            cfg: configuration object exposing ``uf`` (unit/sensor config),
                 ``params`` (numeric tuning parameters) and ``paths``
                 (raw-data input path and output directory).
        """
        self.cfg = cfg
        uf_cfg = cfg.uf
        params = cfg.params
        paths = cfg.paths

        # Unit list (taken from config when present).
        self.units = uf_cfg.get("units", ["UF1", "UF2", "UF3", "UF4"])
        self.stable_inlet_code = uf_cfg.get("stable_inlet_code", "26.0")
        self.loader = UFDataLoader(paths["raw_data_path"])
        self.output_dir = paths["output_path"]

        # Filters.
        self.min_points = params.get("min_points", 40)
        self.initial_points = params.get("initial_points", 10)

        self.quality_filter = EventQualityFilter(min_points=self.min_points)
        self.initial_label = PostBackwashInletMarker(n_points=self.initial_points)

        # Resistance calculator.
        self.res_calc = UFResistanceCalculator(self.units, area_m2=uf_cfg["area_m2"], scale_factor=params.get("scale_factor", 1e7))
        self.segment_head_n = params.get("segment_head_n", 20)
        self.segment_tail_n = params.get("segment_tail_n", 20)

        # BUG FIX: stability thresholds used by analyze_variables_stability()
        # were never initialized, so the first call raised AttributeError.
        # Defaults are conservative placeholders — TODO confirm against the
        # project's configuration schema.
        self.var_max_std = params.get("var_max_std", 1.0)
        self.var_max_slope = params.get("var_max_slope", 0.05)

        # Constant-column names to check besides flow.
        self.temp_col = uf_cfg.get("temp_col", "C.M.RO_TT_ZJS@out")
        self.orp_col = uf_cfg.get("orp_col", "C.M.UF_ORP_ZCS@out")
        self.cond_col = uf_cfg.get("cond_col", "C.M.RO_Cond_ZJS@out")
|
|
|
+
|
|
|
    # ----------------------------
    # Load all CSVs and merge them
    # ----------------------------
    def load_all(self) -> pd.DataFrame:
        """
        Load and merge every CSV file under the configured raw-data path.

        Returns:
            pd.DataFrame: the merged raw data; empty when no CSV files
            were found (callers such as run() check ``df.empty``).
        """
        print(f"[INFO] Loading raw data from: {self.loader.data_path}")
        df = self.loader.load_all_csv()

        if df.empty:
            # Best-effort warning only; the caller decides how to react.
            print("[WARN] No CSV files found!")
        return df
|
|
|
+
|
|
|
+ # ----------------------------
|
|
|
+ # 多变量稳定性检查(针对单个 segment)
|
|
|
+ # 返回每个变量的稳定性 bool 与总体稳定 bool(所有变量均稳定)
|
|
|
+ # ----------------------------
|
|
|
+ def analyze_variables_stability(self, seg: pd.DataFrame, flow_col: str) -> Dict[str, bool]:
|
|
|
+ cols = {
|
|
|
+ "flow": flow_col,
|
|
|
+ "temp": self.temp_col,
|
|
|
+ "orp": self.orp_col,
|
|
|
+ "cond": self.cond_col
|
|
|
+ }
|
|
|
+ results = {}
|
|
|
+ for name, col in cols.items():
|
|
|
+ if col not in seg.columns:
|
|
|
+ results[name] = False
|
|
|
+ continue
|
|
|
+ series = seg[col].dropna()
|
|
|
+ if len(series) < 2:
|
|
|
+ results[name] = False
|
|
|
+ continue
|
|
|
+ std = float(series.std())
|
|
|
+ x = np.arange(len(series))
|
|
|
+ try:
|
|
|
+ slope = float(np.polyfit(x, series.values, 1)[0])
|
|
|
+ except Exception:
|
|
|
+ slope = np.inf
|
|
|
+ results[name] = (std <= self.var_max_std) and (abs(slope) <= self.var_max_slope)
|
|
|
+ results["all_stable"] = all(results[k] for k in ["flow", "temp", "orp", "cond"])
|
|
|
+ return results
|
|
|
+
|
|
|
+ # ----------------------------
|
|
|
+ # 从 stable_segments 列表计算每段的 R_start/R_end(按机组)
|
|
|
+ # 返回 seg_summary DataFrame(每行对应一个 stable inlet segment)
|
|
|
+ # ----------------------------
|
|
|
+ def summarize_R_for_unit(self, stable_segments: List[pd.DataFrame], unit: str) -> pd.DataFrame:
|
|
|
+ rows = []
|
|
|
+ res_col = f"{unit}_R"
|
|
|
+ for seg in stable_segments:
|
|
|
+ seg = seg.sort_values("time").reset_index(drop=True)
|
|
|
+ if len(seg) < (self.segment_head_n + self.segment_tail_n):
|
|
|
+ # 跳过过短的
|
|
|
+ continue
|
|
|
+ R_start = seg[res_col].iloc[:self.segment_head_n].median()
|
|
|
+ R_end = seg[res_col].iloc[-self.segment_tail_n:].median()
|
|
|
+ rows.append({
|
|
|
+ "unit": unit,
|
|
|
+ "segment_id": int(seg["segment_id"].iloc[0]),
|
|
|
+ "start_time": seg["time"].iloc[0],
|
|
|
+ "end_time": seg["time"].iloc[-1],
|
|
|
+ "R_start": float(R_start),
|
|
|
+ "R_end": float(R_end),
|
|
|
+ "n_points": len(seg)
|
|
|
+ })
|
|
|
+ return pd.DataFrame(rows)
|
|
|
+
|
|
|
    # ----------------------------
    # Main run logic
    # ----------------------------
    def run(self) -> None:
        """
        Execute the full analysis for every configured UF unit: load and
        clean the raw data, classify and segment events, extract stable
        inlet segments, compute per-segment membrane resistance, fit
        short- and long-term fouling models per chemical cycle, compute
        CEB removal resistance, and write per-unit CSV summaries into
        ``self.output_dir``.

        Raises:
            ValueError: when no raw data is found or the merged data is empty.
        """
        df = self.load_all()
        if df.empty:
            raise ValueError("未找到原始数据或数据为空")

        df = df.sort_values("time").reset_index(drop=True)
        df = df.ffill().bfill()
        cols_to_convert = df.columns.difference(["time"])
        # Coerce everything except "time" to numeric, then refill the gaps
        # introduced by coercion.
        df[cols_to_convert] = df[cols_to_convert].apply(pd.to_numeric, errors='coerce')
        df[cols_to_convert] = df[cols_to_convert].ffill().bfill()

        # Safety: make sure the time column is datetime.
        if not np.issubdtype(df["time"].dtype, np.datetime64):
            df["time"] = pd.to_datetime(df["time"])

        # Process each unit independently.
        for unit in self.units:
            print(f"Processing {unit} ...")
            ctrl_col = f"C.M.{unit}_DB@word_control"
            flow_col = f"C.M.{unit}_FT_JS@out"

            # Drop columns belonging to the other units.
            other_units = [u for u in self.units if u != unit]
            cols_to_drop = [col for col in df.columns if any(ou in col for ou in other_units)]
            df_unit = df.drop(columns=cols_to_drop)

            # Per-unit event recognition via UFEventClassifier
            # (instantiated for this single unit).
            event_clf = UFEventClassifier(unit, self.cfg.uf["inlet_codes"],
                                          self.cfg.uf["physical_bw_code"], self.cfg.uf["chemical_bw_code"])
            df_unit = event_clf.classify(df_unit)  # adds the event_type column
            df_unit_mark = self.initial_label.mark(df_unit)
            seg_df = event_clf.segment(df_unit_mark)

            # Group seg_df by segment, then filter segment by segment:
            const_flow_filter = ConstantFlowFilter(flow_col=flow_col, repeat_len=20)
            segments = const_flow_filter.filter(seg_df)  # drop inlet segments with network glitches (frozen flow)
            segments = self.quality_filter.filter(segments)  # drop inlet segments that are too short
            stable_extractor = InletSegmentFilter(ctrl_col,
                                                  stable_value=self.stable_inlet_code,
                                                  min_points=self.min_points)

            stable_segments = stable_extractor.extract(segments)  # extract stable inlet data

            # If no stable inlet segments, log it and skip this unit.
            if len(stable_segments) == 0:
                print(f" No stable segments found for {unit}")
                continue

            # -----------------------------
            # Per-segment membrane resistance
            # -----------------------------
            stable_segments = self.res_calc.calculate_for_segments(
                stable_segments,
                temp_col=self.temp_col,
                flow_col=flow_col
            )

            # -----------------------------
            # Per-segment scaled R_start / R_end plus inlet variable stability
            # -----------------------------

            # Variable stability analysis.
            vsa = VariableStabilityAnalyzer()
            stable_segments = vsa.analyze_segments(stable_segments, col=flow_col, prefix="flow")
            stable_segments = vsa.analyze_segments(stable_segments, col=self.temp_col, prefix="temp")

            # Membrane resistance statistics.
            res_scaled_col = f"{unit}_R_scaled"
            ura = UFResistanceAnalyzer(
                resistance_col=res_scaled_col,
                head_n=self.segment_head_n,
                tail_n=self.segment_tail_n
            )
            stable_segments = ura.analyze_segments(stable_segments)

            # ----------------------------------
            # 1. Chemical-cycle segmentation
            # ----------------------------------
            cycle_segmenter = ChemicalCycleSegmenter(max_hours=60)
            cycles = cycle_segmenter.assign_cycles(df_unit, stable_segments)

            # ----------------------------------
            # 2. Fit short-term fouling nuK for each cycle
            # ----------------------------------
            stf = ShortTermCycleFoulingFitter(unit)
            for cid, cycle in cycles.items():
                if not cycle["valid"]:
                    cycle["nuk"] = None
                    cycle["nuk_r2"] = None
                    continue

                nuk, r2 = stf.fit_cycle(cycle["segments"])
                cycle["nuk"] = nuk
                cycle["nuk_r2"] = r2

                for seg in cycle["segments"]:
                    seg["cycle_nuK"] = nuk
                    seg["cycle_nuK_R2"] = r2

            # ----------------------------------
            # 3. Long-term fouling model: power-law fit
            # ----------------------------------
            ltf = LongTermFoulingFitter(unit)
            for cid, cycle in cycles.items():
                if not cycle["valid"]:
                    cycle["a"] = None
                    cycle["b"] = None
                    cycle["lt_r2"] = None
                    continue

                a, b, r2 = ltf.fit_cycle(cycle["segments"])
                cycle["a"] = a
                cycle["b"] = b
                cycle["lt_r2"] = r2

                for seg in cycle["segments"]:
                    seg["cycle_long_a"] = a
                    seg["cycle_long_b"] = b
                    seg["cycle_long_r2"] = r2

            # ----------------------------------
            # 4. Membrane resistance removed by chemical backwash (CEB)
            # ----------------------------------
            cbc = ChemicalBackwashCleaner(unit)
            cycles = cbc.compute_removal(cycles)

            # Write removal results back onto the stable segments.
            for cid, cycle in cycles.items():
                R_removed = cycle["R_removed"]
                for seg in cycle["segments"]:
                    seg["cycle_R_removed"] = R_removed

            # -----------------------------
            # Record every stable inlet segment
            # (stable_segments contains stable segments only)
            # -----------------------------
            all_inlet_segment_rows = []

            for seg in stable_segments:  # seg is a DataFrame
                seg_id = int(seg["segment_id"].iloc[0])

                row = {
                    "seg_id": seg_id,
                    "start_time": seg["time"].iloc[0],
                    "end_time": seg["time"].iloc[-1],
                }

                # Add membrane resistance and variable stability columns.
                row.update({
                    "R_scaled_start": seg["R_scaled_start"].iloc[0] if "R_scaled_start" in seg.columns else None,
                    "R_scaled_end": seg["R_scaled_end"].iloc[-1] if "R_scaled_end" in seg.columns else None,

                    "flow_mean": seg["flow_mean"].iloc[0] if "flow_mean" in seg.columns else None,
                    "flow_std": seg["flow_std"].iloc[0] if "flow_std" in seg.columns else None,
                    "flow_cv": seg["flow_cv"].iloc[0] if "flow_cv" in seg.columns else None,

                    "temp_mean": seg["temp_mean"].iloc[0] if "temp_mean" in seg.columns else None,
                    "temp_std": seg["temp_std"].iloc[0] if "temp_std" in seg.columns else None,
                    "temp_cv": seg["temp_cv"].iloc[0] if "temp_cv" in seg.columns else None,
                })

                # === Copy chemical-cycle fields into row ===
                chem_cols = [col for col in seg.columns if col.startswith("chem_cycle_")]
                for col in chem_cols:
                    row[col] = seg[col].iloc[0]

                # === Copy nuK, long-term fouling params and CEB removal into row ===
                cycle_cols = [col for col in seg.columns if col.startswith("cycle_")]
                for col in cycle_cols:
                    row[col] = seg[col].iloc[0]

                all_inlet_segment_rows.append(row)

            # -----------------------------
            # Build the final DataFrame
            # -----------------------------
            df_segments = pd.DataFrame(all_inlet_segment_rows)

            # Output filename: unit_xxx_segments.csv
            filename = f"{unit}_segments.csv"
            output_path = os.path.join(self.output_dir, filename)

            df_segments.to_csv(output_path, index=False)
            print(f"Saved inlet segment data to: {output_path}")

            # ======================================================
            # Extension: chemical-cycle filtering and export
            # (based on chem_cycle_id)
            # ======================================================

            print(f">>> Post-filtering chemical cycles for unit {unit} ...")

            # 1. Keep only segments that carry a chemical-cycle id.
            if "chem_cycle_id" not in df_segments.columns:
                print(f"No chem_cycle_id found for unit {unit}, skipping cycle filtering.")
            else:
                df_valid = df_segments.dropna(subset=["chem_cycle_id"]).copy()
                df_valid["chem_cycle_id"] = df_valid["chem_cycle_id"].astype(int)

                # 2. Filter chemical cycles by R2.
                grouped = df_valid.groupby("chem_cycle_id")

                valid_cycle_ids = []
                for cid, g in grouped:
                    # R2 is identical within one chemical cycle
                    # (cycle-level fit), so the first row suffices.
                    nuk_r2 = g["cycle_nuK_R2"].iloc[0] if "cycle_nuK_R2" in g.columns else None
                    long_r2 = g["cycle_long_r2"].iloc[0] if "cycle_long_r2" in g.columns else None

                    if nuk_r2 is not None and long_r2 is not None:
                        if nuk_r2 > 0.5 and long_r2 > 0.5:
                            valid_cycle_ids.append(cid)

                # 3. Cycle data retained after filtering.
                df_cycles_kept = df_valid[df_valid["chem_cycle_id"].isin(valid_cycle_ids)]

                # 4. Save the filtered result.
                cycle_filename = f"{unit}_filtered_cycles.csv"
                cycle_output_path = os.path.join(self.output_dir, cycle_filename)

                df_cycles_kept.to_csv(cycle_output_path, index=False)
                print(f"Saved filtered chemical cycles to: {cycle_output_path}")

                # 5. Report the fraction of rows kept.
                total_rows = len(df_valid)
                kept_rows = len(df_cycles_kept)
                pct = kept_rows / total_rows * 100 if total_rows > 0 else 0

                print(
                    f"Unit {unit}: kept {kept_rows}/{total_rows} rows "
                    f"({pct:.2f} percent) after cycle R2 filtering."
                )
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|