# pipeline.py
  1. import os
  2. from typing import List, Dict
  3. import numpy as np
  4. import pandas as pd
  5. from .load import UFConfigLoader, UFDataLoader
  6. from .label import UFEventClassifier, PostBackwashInletMarker
  7. from .filter import ConstantFlowFilter, EventQualityFilter, InletSegmentFilter
  8. from .calculate import UFResistanceCalculator, VariableStabilityAnalyzer, UFResistanceAnalyzer, UFCEBMatcher
  9. from .fit import ChemicalCycleSegmenter, ChemicalBackwashCleaner, ShortTermCycleFoulingFitter,LongTermFoulingFitter
  10. class UFAnalysisPipeline:
  11. """
  12. Pipeline:
  13. - 支持四个机组逐机组标记事件并分段
  14. - 过滤与稳定段提取
  15. - 每段逐行计算膜阻力,并对多变量做稳定性分析
  16. - 仅在 CEB 前后最近段均为有效稳定进水段时计算去除阻力
  17. - 生成每段汇总表并保存
  18. """
  19. def __init__(self, cfg: 'UFConfigLoader'):
  20. self.cfg = cfg
  21. uf_cfg = cfg.uf
  22. params = cfg.params
  23. paths = cfg.paths
  24. # 机组列表(优先从配置读取)
  25. self.units = uf_cfg.get("units", ["UF1", "UF2", "UF3", "UF4"])
  26. self.stable_inlet_code = uf_cfg.get("stable_inlet_code", "26.0")
  27. self.loader = UFDataLoader(paths["raw_data_path"])
  28. self.output_dir = paths["output_path"]
  29. # 过滤器
  30. self.min_points = params.get("min_points", 40)
  31. self.initial_points = params.get("initial_points", 10)
  32. self.quality_filter = EventQualityFilter(min_points=self.min_points)
  33. self.initial_label = PostBackwashInletMarker(n_points=self.initial_points)
  34. # 阻力计算器
  35. self.res_calc = UFResistanceCalculator(self.units, area_m2=uf_cfg["area_m2"], scale_factor=params.get("scale_factor", 1e7))
  36. self.segment_head_n = params.get("segment_head_n", 20)
  37. self.segment_tail_n = params.get("segment_tail_n", 20)
  38. # 需要检查的常量列名(除流量外)
  39. self.temp_col = uf_cfg.get("temp_col", "C.M.RO_TT_ZJS@out")
  40. self.orp_col = uf_cfg.get("orp_col", "C.M.UF_ORP_ZCS@out")
  41. self.cond_col = uf_cfg.get("cond_col", "C.M.RO_Cond_ZJS@out")
  42. # ----------------------------
  43. # 加载所有 CSV 并合并
  44. # ----------------------------
  45. def load_all(self) -> pd.DataFrame:
  46. print(f"[INFO] Loading raw data from: {self.loader.data_path}")
  47. df = self.loader.load_all_csv()
  48. if df.empty:
  49. print("[WARN] No CSV files found!")
  50. return df
  51. # ----------------------------
  52. # 多变量稳定性检查(针对单个 segment)
  53. # 返回每个变量的稳定性 bool 与总体稳定 bool(所有变量均稳定)
  54. # ----------------------------
  55. def analyze_variables_stability(self, seg: pd.DataFrame, flow_col: str) -> Dict[str, bool]:
  56. cols = {
  57. "flow": flow_col,
  58. "temp": self.temp_col,
  59. "orp": self.orp_col,
  60. "cond": self.cond_col
  61. }
  62. results = {}
  63. for name, col in cols.items():
  64. if col not in seg.columns:
  65. results[name] = False
  66. continue
  67. series = seg[col].dropna()
  68. if len(series) < 2:
  69. results[name] = False
  70. continue
  71. std = float(series.std())
  72. x = np.arange(len(series))
  73. try:
  74. slope = float(np.polyfit(x, series.values, 1)[0])
  75. except Exception:
  76. slope = np.inf
  77. results[name] = (std <= self.var_max_std) and (abs(slope) <= self.var_max_slope)
  78. results["all_stable"] = all(results[k] for k in ["flow", "temp", "orp", "cond"])
  79. return results
  80. # ----------------------------
  81. # 从 stable_segments 列表计算每段的 R_start/R_end(按机组)
  82. # 返回 seg_summary DataFrame(每行对应一个 stable inlet segment)
  83. # ----------------------------
  84. def summarize_R_for_unit(self, stable_segments: List[pd.DataFrame], unit: str) -> pd.DataFrame:
  85. rows = []
  86. res_col = f"{unit}_R"
  87. for seg in stable_segments:
  88. seg = seg.sort_values("time").reset_index(drop=True)
  89. if len(seg) < (self.segment_head_n + self.segment_tail_n):
  90. # 跳过过短的
  91. continue
  92. R_start = seg[res_col].iloc[:self.segment_head_n].median()
  93. R_end = seg[res_col].iloc[-self.segment_tail_n:].median()
  94. rows.append({
  95. "unit": unit,
  96. "segment_id": int(seg["segment_id"].iloc[0]),
  97. "start_time": seg["time"].iloc[0],
  98. "end_time": seg["time"].iloc[-1],
  99. "R_start": float(R_start),
  100. "R_end": float(R_end),
  101. "n_points": len(seg)
  102. })
  103. return pd.DataFrame(rows)
    # ----------------------------
    # Main run logic
    # ----------------------------
    def run(self):
        """
        Execute the full analysis for every configured unit.

        Per unit: event classification and segmentation, segment filtering,
        per-row resistance calculation, variable-stability and resistance
        statistics, chemical-cycle assignment, short- and long-term fouling
        fits, CEB removal computation, then CSV export of the per-segment
        summary and of the R2-filtered chemical cycles.

        Raises
        ------
        ValueError
            If no raw data is found (empty frame from ``load_all``).
        """
        df = self.load_all()
        if df.empty:
            raise ValueError("未找到原始数据或数据为空")
        # Sort chronologically, then fill gaps forward and backward.
        df = df.sort_values("time").reset_index(drop=True)
        df = df.ffill().bfill()
        # Coerce all non-time columns to numeric (bad cells -> NaN), then
        # fill the NaNs introduced by the coercion.
        cols_to_convert = df.columns.difference(["time"])
        df[cols_to_convert] = df[cols_to_convert].apply(pd.to_numeric, errors='coerce')
        df[cols_to_convert] = df[cols_to_convert].ffill().bfill()
        # Safety: make sure the time column is a datetime dtype.
        if not np.issubdtype(df["time"].dtype, np.datetime64):
            df["time"] = pd.to_datetime(df["time"])
        # Process unit by unit.
        for unit in self.units:
            print(f"Processing {unit} ...")
            ctrl_col = f"C.M.{unit}_DB@word_control"
            flow_col = f"C.M.{unit}_FT_JS@out"
            # Drop columns that belong to the other units (matched by the
            # unit name appearing anywhere in the column name).
            other_units = [u for u in self.units if u != unit]
            cols_to_drop = [col for col in df.columns if any(ou in col for ou in other_units)]
            df_unit = df.drop(columns=cols_to_drop)
            # Per-unit event recognition via UFEventClassifier (single-unit instance).
            event_clf = UFEventClassifier(unit, self.cfg.uf["inlet_codes"],
                                          self.cfg.uf["physical_bw_code"], self.cfg.uf["chemical_bw_code"])
            df_unit = event_clf.classify(df_unit)  # adds the event_type column
            df_unit_mark = self.initial_label.mark(df_unit)
            seg_df = event_clf.segment(df_unit_mark)
            # Filter seg_df segment by segment:
            const_flow_filter = ConstantFlowFilter(flow_col=flow_col, repeat_len=20)
            segments = const_flow_filter.filter(seg_df)  # drop inlet segments with network glitches (frozen flow)
            segments = self.quality_filter.filter(segments)  # drop inlet segments that are too short
            stable_extractor = InletSegmentFilter(ctrl_col,
                                                  stable_value=self.stable_inlet_code,
                                                  min_points=self.min_points)
            stable_segments = stable_extractor.extract(segments)  # extract stable inlet data
            # No stable inlet segments: note it and skip this unit.
            if len(stable_segments) == 0:
                print(f" No stable segments found for {unit}")
                continue
            # -----------------------------
            # Per-segment membrane resistance calculation
            # -----------------------------
            stable_segments = self.res_calc.calculate_for_segments(
                stable_segments,
                temp_col=self.temp_col,
                flow_col=flow_col
            )
            # -----------------------------
            # Per-segment scaled R_start / R_end and inlet-variable stability
            # -----------------------------
            # Variable stability analysis (flow and temperature).
            vsa = VariableStabilityAnalyzer()
            stable_segments = vsa.analyze_segments(stable_segments, col=flow_col, prefix="flow")
            stable_segments = vsa.analyze_segments(stable_segments, col=self.temp_col, prefix="temp")
            # Membrane resistance statistics on the scaled column.
            res_scaled_col = f"{unit}_R_scaled"
            ura = UFResistanceAnalyzer(
                resistance_col=res_scaled_col,
                head_n=self.segment_head_n,
                tail_n=self.segment_tail_n
            )
            stable_segments = ura.analyze_segments(stable_segments)
            # ----------------------------------
            # 1. Chemical-cycle segmentation
            # ----------------------------------
            cycle_segmenter = ChemicalCycleSegmenter(max_hours=60)
            cycles = cycle_segmenter.assign_cycles(df_unit, stable_segments)
            # ----------------------------------
            # 2. Fit short-term fouling nuK for each cycle
            # ----------------------------------
            stf = ShortTermCycleFoulingFitter(unit)
            for cid, cycle in cycles.items():
                if not cycle["valid"]:
                    cycle["nuk"] = None
                    cycle["nuk_r2"] = None
                    continue
                nuk, r2 = stf.fit_cycle(cycle["segments"])
                cycle["nuk"] = nuk
                cycle["nuk_r2"] = r2
                # Write the cycle-level fit back onto every member segment.
                for seg in cycle["segments"]:
                    seg["cycle_nuK"] = nuk
                    seg["cycle_nuK_R2"] = r2
            # ----------------------------------
            # 3. Long-term fouling model: power-law fit
            # ----------------------------------
            ltf = LongTermFoulingFitter(unit)
            for cid, cycle in cycles.items():
                if not cycle["valid"]:
                    cycle["a"] = None
                    cycle["b"] = None
                    cycle["lt_r2"] = None
                    continue
                a, b, r2 = ltf.fit_cycle(cycle["segments"])
                cycle["a"] = a
                cycle["b"] = b
                cycle["lt_r2"] = r2
                for seg in cycle["segments"]:
                    seg["cycle_long_a"] = a
                    seg["cycle_long_b"] = b
                    seg["cycle_long_r2"] = r2
            # ----------------------------------
            # 4. Resistance removed by chemical backwash (CEB)
            # ----------------------------------
            cbc = ChemicalBackwashCleaner(unit)
            cycles = cbc.compute_removal(cycles)
            # Write the removal back onto the stable segments.
            for cid, cycle in cycles.items():
                R_removed = cycle["R_removed"]
                for seg in cycle["segments"]:
                    seg["cycle_R_removed"] = R_removed
            # -----------------------------
            # Collect one summary row per stable inlet segment
            # (stable_segments contains stable segments only)
            # -----------------------------
            all_inlet_segment_rows = []
            for seg in stable_segments:  # seg is a DataFrame
                seg_id = int(seg["segment_id"].iloc[0])
                row = {
                    "seg_id": seg_id,
                    "start_time": seg["time"].iloc[0],
                    "end_time": seg["time"].iloc[-1],
                }
                # Membrane resistance and variable-volatility columns
                # (these are segment-constant except R_scaled_end, which is
                # read from the last row).
                row.update({
                    "R_scaled_start": seg["R_scaled_start"].iloc[0] if "R_scaled_start" in seg.columns else None,
                    "R_scaled_end": seg["R_scaled_end"].iloc[-1] if "R_scaled_end" in seg.columns else None,
                    "flow_mean": seg["flow_mean"].iloc[0] if "flow_mean" in seg.columns else None,
                    "flow_std": seg["flow_std"].iloc[0] if "flow_std" in seg.columns else None,
                    "flow_cv": seg["flow_cv"].iloc[0] if "flow_cv" in seg.columns else None,
                    "temp_mean": seg["temp_mean"].iloc[0] if "temp_mean" in seg.columns else None,
                    "temp_std": seg["temp_std"].iloc[0] if "temp_std" in seg.columns else None,
                    "temp_cv": seg["temp_cv"].iloc[0] if "temp_cv" in seg.columns else None,
                })
                # === Copy chemical-cycle fields into the row ===
                chem_cols = [col for col in seg.columns if col.startswith("chem_cycle_")]
                for col in chem_cols:
                    row[col] = seg[col].iloc[0]
                # === Copy nuK, long-term fouling params and CEB removal into the row ===
                cycle_cols = [col for col in seg.columns if col.startswith("cycle_")]
                for col in cycle_cols:
                    row[col] = seg[col].iloc[0]
                all_inlet_segment_rows.append(row)
            # -----------------------------
            # Build the final per-segment DataFrame
            # -----------------------------
            df_segments = pd.DataFrame(all_inlet_segment_rows)
            # Output file name: <unit>_segments.csv
            filename = f"{unit}_segments.csv"
            output_path = os.path.join(self.output_dir, filename)
            df_segments.to_csv(output_path, index=False)
            print(f"Saved inlet segment data to: {output_path}")
            # ======================================================
            # Extension: chemical-cycle filtering and export
            # (based on chem_cycle_id)
            # ======================================================
            print(f">>> Post-filtering chemical cycles for unit {unit} ...")
            # 1. Keep only segments that carry a chemical-cycle id.
            if "chem_cycle_id" not in df_segments.columns:
                print(f"No chem_cycle_id found for unit {unit}, skipping cycle filtering.")
            else:
                df_valid = df_segments.dropna(subset=["chem_cycle_id"]).copy()
                df_valid["chem_cycle_id"] = df_valid["chem_cycle_id"].astype(int)
                # 2. Filter chemical cycles by fit R2.
                grouped = df_valid.groupby("chem_cycle_id")
                valid_cycle_ids = []
                for cid, g in grouped:
                    # R2 is identical within a cycle (cycle-level fit), so the
                    # first row suffices.
                    nuk_r2 = g["cycle_nuK_R2"].iloc[0] if "cycle_nuK_R2" in g.columns else None
                    long_r2 = g["cycle_long_r2"].iloc[0] if "cycle_long_r2" in g.columns else None
                    if nuk_r2 is not None and long_r2 is not None:
                        if nuk_r2 > 0.5 and long_r2 > 0.5:
                            valid_cycle_ids.append(cid)
                # 3. Rows belonging to the retained chemical cycles.
                df_cycles_kept = df_valid[df_valid["chem_cycle_id"].isin(valid_cycle_ids)]
                # 4. Save the filtered result.
                cycle_filename = f"{unit}_filtered_cycles.csv"
                cycle_output_path = os.path.join(self.output_dir, cycle_filename)
                df_cycles_kept.to_csv(cycle_output_path, index=False)
                print(f"Saved filtered chemical cycles to: {cycle_output_path}")
                # 5. Report the fraction of rows kept.
                total_rows = len(df_valid)
                kept_rows = len(df_cycles_kept)
                pct = kept_rows / total_rows * 100 if total_rows > 0 else 0
                print(
                    f"Unit {unit}: kept {kept_rows}/{total_rows} rows "
                    f"({pct:.2f} percent) after cycle R2 filtering."
                )