| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374 |
- import os
- from typing import List, Dict
- import numpy as np
- import pandas as pd
- from pathlib import Path
- from load import UFConfigLoader, UFDataLoader
- from label import UFEventClassifier, PostBackwashInletMarker
- from filter import ConstantFlowFilter, EventQualityFilter, InletSegmentFilter
- from calculate import UFResistanceCalculator, VariableStabilityAnalyzer, UFResistanceAnalyzer, UFPressureAnalyzer
- from fit import ChemicalCycleSegmenter, ChemicalBackwashCleaner, ShortTermCycleFoulingFitter,LongTermFoulingFitter
class UFAnalysisPipeline:
    """
    UF membrane analysis pipeline.

    For each configured unit (four by default) the pipeline:
    - marks events and splits the data into segments,
    - filters segments and extracts stable inlet segments,
    - computes membrane resistance row by row in each segment and runs a
      stability analysis on several variables,
    - fits short-term (nuK) and long-term (power-law) fouling models per
      chemical cycle and computes the resistance removed by CEB (the stated
      intent is to do this only when the nearest segments before and after
      the CEB are both valid stable inlet segments; that rule lives in
      ChemicalBackwashCleaner),
    - builds a per-segment summary table, saves it, and exports the chemical
      cycles whose fits pass an R2 threshold.
    """

    # (column, row position) pairs copied into the per-segment summary row.
    # "*_end" values are read from the last row, everything else from the first.
    _SUMMARY_COLUMNS = (
        ("tmp_start", 0),
        ("tmp_end", -1),
        ("R_scaled_start", 0),
        ("R_scaled_end", -1),
        ("flow_mean", 0),
        ("flow_std", 0),
        ("flow_cv", 0),
        ("temp_mean", 0),
        ("temp_std", 0),
        ("temp_cv", 0),
    )

    def __init__(self, cfg: "UFConfigLoader"):
        """Build the pipeline from a config object exposing `.uf`, `.params` and `.paths` mappings.

        Creates the output directory and the filtered-output directory if they
        do not exist.
        """
        self.cfg = cfg
        uf_cfg = cfg.uf
        params = cfg.params
        paths = cfg.paths

        # Unit list (prefer the config value)
        self.units = uf_cfg.get("units", ["UF1", "UF2", "UF3", "UF4"])
        self.stable_inlet_code = uf_cfg.get("stable_inlet_code", "26.0")

        project_root = Path(paths["project_root"])
        raw_data_path = project_root / paths["raw_data_path"]
        output_path = project_root / paths["output_path"]
        filter_output_path = project_root / paths["filter_output_path"]
        self.loader = UFDataLoader(raw_data_path)
        self.output_dir = output_path
        self.filter_output_dir = filter_output_path
        output_path.mkdir(parents=True, exist_ok=True)
        # Filtered-cycle CSVs are written here, so create it up front as well;
        # otherwise to_csv fails when the directory does not exist yet.
        filter_output_path.mkdir(parents=True, exist_ok=True)

        # Filters
        self.min_points = params.get("min_points", 40)
        self.initial_points = params.get("initial_points", 10)
        self.quality_filter = EventQualityFilter(min_points=self.min_points)
        self.initial_label = PostBackwashInletMarker(n_points=self.initial_points)

        # Resistance calculator
        self.res_calc = UFResistanceCalculator(
            self.units,
            area_m2=uf_cfg["area_m2"],
            scale_factor=params.get("scale_factor", 1e10),
        )
        self.segment_head_n = params.get("segment_head_n", 10)
        self.segment_tail_n = params.get("segment_tail_n", 10)

        # Thresholds used by analyze_variables_stability. They were previously
        # never assigned, so that method always raised AttributeError.
        # Default: no limit unless configured. TODO(review): confirm sensible defaults.
        self.var_max_std = params.get("var_max_std", np.inf)
        self.var_max_slope = params.get("var_max_slope", np.inf)

        # Tunables that used to be hard-coded in run(); defaults keep the old values.
        self.const_flow_repeat_len = params.get("const_flow_repeat_len", 20)
        self.cycle_max_hours = params.get("cycle_max_hours", 60)
        self.cycle_r2_threshold = params.get("cycle_r2_threshold", 0.5)

        # Variable column names checked for stability (besides flow)
        self.temp_col = uf_cfg.get("temp_col", "C.M.RO_TT_ZJS@out")
        self.orp_col = uf_cfg.get("orp_col", "C.M.UF_ORP_ZCS@out")
        self.cond_col = uf_cfg.get("cond_col", "C.M.RO_Cond_ZJS@out")

    # ----------------------------
    # Load all CSVs and merge
    # ----------------------------
    def load_all(self) -> pd.DataFrame:
        """Load all CSV files via the data loader; warn and return the empty frame if none were found."""
        print(f"[INFO] Loading raw data from: {self.loader.data_path}")
        df = self.loader.load_all_csv()
        if df.empty:
            print("[WARN] No CSV files found!")
        return df

    # ----------------------------
    # Multi-variable stability check (single segment)
    # ----------------------------
    @staticmethod
    def _series_is_stable(series: pd.Series, max_std: float, max_slope: float) -> bool:
        """Return True when *series* (NaN dropped) has >= 2 points, std <= max_std
        and |slope of a degree-1 least-squares fit against the row index| <= max_slope."""
        values = series.dropna()
        if len(values) < 2:
            return False
        std = float(values.std())
        x = np.arange(len(values))
        try:
            slope = float(np.polyfit(x, values.to_numpy(dtype=float), 1)[0])
        except (np.linalg.LinAlgError, ValueError, TypeError):
            # A failed fit counts as unstable.
            slope = np.inf
        return bool(std <= max_std and abs(slope) <= max_slope)

    def analyze_variables_stability(self, seg: pd.DataFrame, flow_col: str) -> Dict[str, bool]:
        """Check flow/temp/orp/cond stability for one segment.

        Returns a dict with one bool per variable ("flow", "temp", "orp",
        "cond") plus "all_stable" (True only if all four are stable). A missing
        column or fewer than two non-NaN values counts as unstable.
        """
        cols = {
            "flow": flow_col,
            "temp": self.temp_col,
            "orp": self.orp_col,
            "cond": self.cond_col,
        }
        results: Dict[str, bool] = {}
        for name, col in cols.items():
            if col not in seg.columns:
                results[name] = False
                continue
            results[name] = self._series_is_stable(seg[col], self.var_max_std, self.var_max_slope)
        results["all_stable"] = all(results[k] for k in cols)
        return results

    # ----------------------------
    # Per-segment R_start / R_end for one unit
    # ----------------------------
    def summarize_R_for_unit(self, stable_segments: List[pd.DataFrame], unit: str) -> pd.DataFrame:
        """Summarise `{unit}_R` per stable inlet segment (one output row per segment).

        R_start / R_end are the medians over the first `segment_head_n` /
        last `segment_tail_n` rows (after sorting by time). Segments shorter
        than head_n + tail_n rows are skipped. run() does not call this.
        """
        rows = []
        res_col = f"{unit}_R"
        for seg in stable_segments:
            seg = seg.sort_values("time").reset_index(drop=True)
            if len(seg) < (self.segment_head_n + self.segment_tail_n):
                # Too short for disjoint head/tail windows.
                continue
            R_start = seg[res_col].iloc[:self.segment_head_n].median()
            R_end = seg[res_col].iloc[-self.segment_tail_n:].median()
            rows.append({
                "unit": unit,
                "segment_id": int(seg["segment_id"].iloc[0]),
                "start_time": seg["time"].iloc[0],
                "end_time": seg["time"].iloc[-1],
                "R_start": float(R_start),
                "R_end": float(R_end),
                "n_points": len(seg),
            })
        return pd.DataFrame(rows)

    # ----------------------------
    # Main entry point
    # ----------------------------
    def run(self):
        """Load and clean raw data, then process every configured unit.

        Raises:
            ValueError: if no raw data was loaded.
        """
        df = self.load_all()
        if df.empty:
            raise ValueError("未找到原始数据或数据为空")
        df = self._prepare_raw(df)
        for unit in self.units:
            print(f"Processing {unit} ...")
            self._process_unit(df, unit)

    @staticmethod
    def _prepare_raw(df: pd.DataFrame) -> pd.DataFrame:
        """Return a copy with `time` parsed as datetime, rows sorted by time,
        other columns coerced to numeric, and gaps forward/back filled."""
        df = df.copy()
        # Parse BEFORE sorting: sorting raw timestamp strings orders them
        # lexicographically, not chronologically. is_datetime64_any_dtype also
        # accepts tz-aware dtypes, on which np.issubdtype raises TypeError.
        if not pd.api.types.is_datetime64_any_dtype(df["time"]):
            df["time"] = pd.to_datetime(df["time"])
        df = df.sort_values("time").reset_index(drop=True)
        df = df.ffill().bfill()
        value_cols = df.columns.difference(["time"])
        df[value_cols] = df[value_cols].apply(pd.to_numeric, errors="coerce")
        # Values that failed numeric coercion become NaN; fill them again.
        df[value_cols] = df[value_cols].ffill().bfill()
        return df

    def _process_unit(self, df: pd.DataFrame, unit: str) -> None:
        """Run the full analysis for *unit* on the cleaned frame and write its CSV outputs."""
        ctrl_col = f"C.M.{unit}_DB@word_control"
        flow_col = f"C.M.{unit}_FT_JS@out"

        # Drop columns whose name contains another unit's name (substring match).
        other_units = [u for u in self.units if u != unit]
        cols_to_drop = [col for col in df.columns if any(ou in col for ou in other_units)]
        df_unit = df.drop(columns=cols_to_drop)

        # Per-unit event classification and segmentation
        event_clf = UFEventClassifier(unit, self.cfg.uf["inlet_codes"],
                                      self.cfg.uf["physical_bw_code"], self.cfg.uf["chemical_bw_code"])
        df_unit = event_clf.classify(df_unit)  # adds the event_type column
        df_unit_mark = self.initial_label.mark(df_unit)
        seg_df = event_clf.segment(df_unit_mark)

        stable_segments = self._extract_stable_segments(seg_df, ctrl_col, flow_col)
        # No stable inlet segments: log and skip this unit (nothing is written).
        if len(stable_segments) == 0:
            print(f" No stable segments found for {unit}")
            return

        stable_segments = self._annotate_segments(stable_segments, unit, flow_col)
        # Adds cycle_* columns in place to the segment frames held by the cycles.
        self._fit_cycles(df_unit, stable_segments, unit)

        df_segments = pd.DataFrame([self._segment_row(seg) for seg in stable_segments])
        output_path = os.path.join(self.output_dir, f"{unit}_segments.csv")
        df_segments.to_csv(output_path, index=False)
        print(f"Saved inlet segment data to: {output_path}")

        self._export_filtered_cycles(df_segments, unit)

    def _extract_stable_segments(self, seg_df: pd.DataFrame, ctrl_col: str, flow_col: str) -> list:
        """Apply the segment filters and return the list of stable inlet segment frames."""
        const_flow_filter = ConstantFlowFilter(flow_col=flow_col, repeat_len=self.const_flow_repeat_len)
        segments = const_flow_filter.filter(seg_df)  # drop inlet segments affected by network errors
        segments = self.quality_filter.filter(segments)  # drop inlet segments that are too short
        stable_extractor = InletSegmentFilter(ctrl_col,
                                              stable_value=self.stable_inlet_code,
                                              min_points=self.min_points)
        return stable_extractor.extract(segments)  # keep stable inlet data only

    def _annotate_segments(self, stable_segments: list, unit: str, flow_col: str) -> list:
        """Add resistance, flow/temp stability, TMP and scaled-resistance statistics to each segment."""
        # Row-wise membrane resistance
        stable_segments = self.res_calc.calculate_for_segments(
            stable_segments,
            temp_col=self.temp_col,
            flow_col=flow_col,
        )
        # Variable stability
        vsa = VariableStabilityAnalyzer()
        stable_segments = vsa.analyze_segments(stable_segments, col=flow_col, prefix="flow")
        stable_segments = vsa.analyze_segments(stable_segments, col=self.temp_col, prefix="temp")
        # Transmembrane pressure statistics (tmp_start / tmp_end)
        upa = UFPressureAnalyzer(
            press_col=f"C.M.{unit}_DB@press_PV",
            head_n=self.segment_head_n,
            tail_n=self.segment_tail_n,
        )
        stable_segments = upa.analyze_segments(stable_segments)
        # Scaled resistance statistics (R_scaled_start / R_scaled_end)
        ura = UFResistanceAnalyzer(
            resistance_col=f"{unit}_R_scaled",
            head_n=self.segment_head_n,
            tail_n=self.segment_tail_n,
        )
        return ura.analyze_segments(stable_segments)

    def _fit_cycles(self, df_unit: pd.DataFrame, stable_segments: list, unit: str) -> dict:
        """Split into chemical cycles, fit short/long-term fouling and CEB removal, and write
        the results back onto each cycle's segment frames as cycle_* columns."""
        # 1. Chemical cycle partitioning
        cycle_segmenter = ChemicalCycleSegmenter(max_hours=self.cycle_max_hours)
        cycles = cycle_segmenter.assign_cycles(df_unit, stable_segments)

        # 2. Short-term fouling nuK per cycle
        stf = ShortTermCycleFoulingFitter(unit)
        for cycle in cycles.values():
            if not cycle["valid"]:
                cycle["nuk"] = None
                cycle["nuk_r2"] = None
                continue
            nuk, r2 = stf.fit_cycle(cycle["segments"])
            cycle["nuk"] = nuk
            cycle["nuk_r2"] = r2
            for seg in cycle["segments"]:
                seg["cycle_nuK"] = nuk
                seg["cycle_nuK_R2"] = r2

        # 3. Long-term fouling power-law fit per cycle
        ltf = LongTermFoulingFitter(unit)
        for cycle in cycles.values():
            if not cycle["valid"]:
                cycle["a"] = None
                cycle["b"] = None
                cycle["lt_r2"] = None
                continue
            a, b, r2 = ltf.fit_cycle(cycle["segments"])
            cycle["a"] = a
            cycle["b"] = b
            cycle["lt_r2"] = r2
            for seg in cycle["segments"]:
                seg["cycle_long_a"] = a
                seg["cycle_long_b"] = b
                seg["cycle_long_r2"] = r2

        # 4. Resistance removed by chemical backwash, written back to the segments
        cbc = ChemicalBackwashCleaner(unit)
        cycles = cbc.compute_removal(cycles)
        for cycle in cycles.values():
            R_removed = cycle["R_removed"]
            for seg in cycle["segments"]:
                seg["cycle_R_removed"] = R_removed
        return cycles

    @classmethod
    def _segment_row(cls, seg: pd.DataFrame) -> Dict[str, object]:
        """Build the one-row summary dict for a stable inlet segment.

        Includes id/start/end, the statistics in _SUMMARY_COLUMNS (None when
        absent), then every chem_cycle_* column and every cycle_* column
        (first-row values).
        """
        row: Dict[str, object] = {
            "seg_id": int(seg["segment_id"].iloc[0]),
            "start_time": seg["time"].iloc[0],
            "end_time": seg["time"].iloc[-1],
        }
        for col, pos in cls._SUMMARY_COLUMNS:
            row[col] = seg[col].iloc[pos] if col in seg.columns else None
        # Chemical-cycle fields first, then nuK / long-term / CEB-removal fields.
        for prefix in ("chem_cycle_", "cycle_"):
            for col in seg.columns:
                if col.startswith(prefix):
                    row[col] = seg[col].iloc[0]
        return row

    @staticmethod
    def _filter_cycles_by_r2(df_segments: pd.DataFrame, threshold: float = 0.5) -> tuple:
        """Return (df_valid, df_kept).

        df_valid holds the rows with a non-null chem_cycle_id (cast to int).
        df_kept is the subset whose cycle has both cycle_nuK_R2 and
        cycle_long_r2 present and strictly greater than *threshold*.
        """
        df_valid = df_segments.dropna(subset=["chem_cycle_id"]).copy()
        df_valid["chem_cycle_id"] = df_valid["chem_cycle_id"].astype(int)
        has_nuk = "cycle_nuK_R2" in df_valid.columns
        has_long = "cycle_long_r2" in df_valid.columns
        kept_ids = []
        for cid, group in df_valid.groupby("chem_cycle_id"):
            # R2 comes from a cycle-level fit, so the first row is representative.
            nuk_r2 = group["cycle_nuK_R2"].iloc[0] if has_nuk else None
            long_r2 = group["cycle_long_r2"].iloc[0] if has_long else None
            # pd.notna rejects both None and NaN before comparing.
            if pd.notna(nuk_r2) and pd.notna(long_r2) and nuk_r2 > threshold and long_r2 > threshold:
                kept_ids.append(cid)
        df_kept = df_valid[df_valid["chem_cycle_id"].isin(kept_ids)]
        return df_valid, df_kept

    def _export_filtered_cycles(self, df_segments: pd.DataFrame, unit: str) -> None:
        """Save the segments of cycles passing the R2 filter and print the kept-row ratio."""
        print(f">>> Post-filtering chemical cycles for unit {unit} ...")
        if "chem_cycle_id" not in df_segments.columns:
            print(f"No chem_cycle_id found for unit {unit}, skipping cycle filtering.")
            return
        df_valid, df_cycles_kept = self._filter_cycles_by_r2(df_segments, self.cycle_r2_threshold)

        cycle_output_path = os.path.join(self.filter_output_dir, f"{unit}_filtered_cycles.csv")
        df_cycles_kept.to_csv(cycle_output_path, index=False)
        print(f"Saved filtered chemical cycles to: {cycle_output_path}")

        total_rows = len(df_valid)
        kept_rows = len(df_cycles_kept)
        pct = kept_rows / total_rows * 100 if total_rows > 0 else 0
        print(
            f"Unit {unit}: kept {kept_rows}/{total_rows} rows "
            f"({pct:.2f} percent) after cycle R2 filtering."
        )
|