| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261 |
- import re
- from pathlib import Path
- from typing import Dict, Any
- import yaml
- import numpy as np
- import pandas as pd
- from oauthlib.uri_validate import segment
- # -------------------------------
- # 引入环境状态模板(最终输出)
- # -------------------------------
- from uf_train.env.env_params import UFState
- # -------------------------------
- # 引入分析类
- # -------------------------------
- from uf_data_process.load import UFConfigLoader
- from uf_data_process.label import UFEventClassifier, PostBackwashInletMarker
- from uf_data_process.filter import ConstantFlowFilter, EventQualityFilter, InletSegmentFilter
- from uf_data_process.calculate import UFResistanceCalculator, UFResistanceAnalyzer
- from uf_data_process.fit import ShortTermCycleFoulingFitter, LongTermFoulingFitter
class DQNStateBuilder:
    """Build the environment state that the DQN consumes before a decision.

    Two CSV inputs are involved:
      * CSV1 - the previous, complete chemical-cleaning cycle
      * CSV2 - the initial inlet stretch of the new cycle
    """

    def __init__(self, config_path: str):
        """Load the analysis configuration.

        Parameters
        ----------
        config_path : str
            Path to ``uf_analyze_config.yaml``.
        """
        self.cfg = UFConfigLoader(config_path)
        self.uf_cfg = self.cfg.uf
        self.params = self.cfg.params

        self.units = self.uf_cfg["units"]
        self.area_m2 = self.uf_cfg["area_m2"]

        # Optional tuning parameters; the defaults below apply when the
        # corresponding key is absent from ``params``.
        self.scale_factor = self.params.get("scale_factor", 1e10)
        self.segment_head_n = self.params.get("segment_head_n", 10)
        self.segment_tail_n = self.params.get("segment_tail_n", 10)

    # ======================================================================
    # Public entry point
    # ======================================================================
    def build_from_csv_pair(
        self,
        prev_cycle_csv: str,
        init_cycle_csv: str,
    ) -> "UFState":
        """Build a ``UFState`` from the previous-cycle CSV and the new-cycle CSV.

        Parameters
        ----------
        prev_cycle_csv : str
            CSV holding the previous complete chemical cycle.
        init_cycle_csv : str
            CSV holding the initial inlet stretch of the current cycle.

        Returns
        -------
        UFState
            ``q_UF``, ``TMP``, ``temp`` and ``R`` come from the initial CSV;
            ``nuK``, ``slope`` and ``power`` come from the previous cycle;
            ``ceb_removal`` is the resistance drop between the two.

        Raises
        ------
        ValueError
            If the unit id cannot be inferred, the previous cycle has no
            usable inlet segment, or the initial CSV has no inlet rows.
        """
        df_prev = pd.read_csv(prev_cycle_csv)
        df_init = pd.read_csv(init_cycle_csv)

        # The unit id (UF1 / UF2 / ...) is inferred from the previous-cycle
        # CSV only and then reused for the initial CSV.
        unit_id = self._infer_unit_id(df_prev)

        prev_features = self._analyze_previous_cycle_csv(df_prev, unit_id)
        init_features = self._analyze_init_cycle_csv(df_init, unit_id)

        # Resistance removed by chemical cleaning: end of previous cycle minus
        # start of current cycle, clamped at zero.
        # NOTE(review): if either resistance is NaN, ``max(nan, 0.0)`` returns
        # NaN, so the clamp does not sanitise missing data.
        ceb_removal = max(
            prev_features["R_end"] - init_features["R_start"],
            0.0,
        )

        return UFState(
            q_UF=init_features["q_mean"],
            TMP=init_features["tmp_mean"],
            temp=init_features["temp_mean"],
            R=init_features["R_start"],
            nuK=prev_features["nuK"],
            slope=prev_features["slope"],
            power=prev_features["power"],
            ceb_removal=ceb_removal,
        )

    # ======================================================================
    # Previous complete chemical cycle
    # ======================================================================
    def _analyze_previous_cycle_csv(
        self,
        df: pd.DataFrame,
        unit_id: str,
    ) -> Dict[str, float]:
        """Extract fouling features from the previous complete chemical cycle.

        Steps:
          1. label events
          2. extract inlet segments and apply the quality filter
          3. compute membrane resistance per segment
          4. read the end-of-cycle resistance from the last segment
          5. fit the short-term fouling coefficient (nuK)
          6. fit the long-term irreversible fouling (slope / power)

        Returns
        -------
        Dict[str, float]
            Keys ``R_end``, ``nuK``, ``slope`` and ``power``.

        Raises
        ------
        ValueError
            If no segment survives the inlet / quality filtering.
        """
        # -------- event labelling --------
        df = self._label_events(df, unit_id)

        # -------- inlet segment extraction + quality filtering --------
        inlet_filter = InletSegmentFilter(
            control_col=f"C.M.{unit_id}_DB@word_control",
            stable_value=self.uf_cfg["stable_inlet_code"],
            min_points=self.params["min_stable_points"],
        )
        segments = inlet_filter.extract(df)

        quality_filter = EventQualityFilter(
            min_points=self.params["min_stable_points"]
        )
        segments = quality_filter.filter(segments)

        if len(segments) == 0:
            raise ValueError("上一周期无有效稳定进水段,无法构建状态")

        # -------- membrane resistance --------
        segments = self._calculate_resistance(segments, unit_id)

        # -------- per-segment resistance statistics --------
        analyzer = UFResistanceAnalyzer(
            resistance_col=f"{unit_id}_R_scaled",
            head_n=self.segment_head_n,
            tail_n=self.segment_tail_n,
        )
        segments = analyzer.analyze_segments(segments)

        # End-of-cycle resistance: first row of ``R_scaled_end`` in the last
        # segment. Cast to a plain float so every returned feature has the
        # same type as the other (already float-cast) entries.
        R_end = float(segments[-1]["R_scaled_end"].iloc[0])

        # -------- make sure ``time`` is a datetime column --------
        for idx, seg in enumerate(segments):
            if pd.api.types.is_datetime64_any_dtype(seg["time"]):
                continue
            seg = seg.copy()
            # Unparseable timestamps become NaT and those rows are dropped.
            seg["time"] = pd.to_datetime(seg["time"], errors="coerce")
            segments[idx] = seg.dropna(subset=["time"])

        # -------- short-term fouling fit (nuK) --------
        st_fitter = ShortTermCycleFoulingFitter(unit_id)
        nuK, _ = st_fitter.fit_cycle(segments)

        # -------- long-term irreversible fouling fit --------
        lt_fitter = LongTermFoulingFitter(unit_id)
        slope, power, _ = lt_fitter.fit_cycle(segments)

        return {
            "R_end": R_end,
            "nuK": float(nuK),
            "slope": float(slope),
            "power": float(power),
        }

    # ======================================================================
    # Initial inlet stretch of the current cycle
    # ======================================================================
    def _analyze_init_cycle_csv(
        self,
        df: pd.DataFrame,
        unit_id: str,
    ) -> Dict[str, float]:
        """Summarise the initial inlet stretch of the current cycle.

        Unlike the previous-cycle analysis, the data is not split into
        segments and no quality filter is applied; all rows labelled
        ``inlet`` are treated as one segment and only column means are
        reported.

        Returns
        -------
        Dict[str, float]
            Keys ``q_mean``, ``tmp_mean``, ``temp_mean`` and ``R_start``.

        Raises
        ------
        ValueError
            If no row is labelled as inlet.
        """
        # -------- event labelling --------
        df = self._label_events(df, unit_id)

        # -------- keep inlet rows only --------
        df = df[df["event_type"] == "inlet"].copy()
        if df.empty:
            raise ValueError("初始 CSV 中无进水数据")

        # -------- membrane resistance on the single pseudo-segment --------
        df = self._calculate_resistance([df], unit_id)[-1]

        flow_col = self.uf_cfg["flow_col_template"].format(unit=unit_id)
        temp_col = self.uf_cfg["temp_col"]
        press_col = f"C.M.{unit_id}_DB@press_PV"
        res_col = f"{unit_id}_R_scaled"

        return {
            "q_mean": float(df[flow_col].mean()),
            "tmp_mean": float(df[press_col].mean()),
            "temp_mean": float(df[temp_col].mean()),
            "R_start": float(df[res_col].mean()),
        }

    # ======================================================================
    # Helpers
    # ======================================================================
    def _calculate_resistance(self, segments: list, unit_id: str) -> list:
        """Run ``UFResistanceCalculator`` over ``segments`` for one unit."""
        calculator = UFResistanceCalculator(
            units=[unit_id],
            area_m2=self.area_m2,
            scale_factor=self.scale_factor,
        )
        return calculator.calculate_for_segments(
            segments,
            temp_col=self.uf_cfg["temp_col"],
            flow_col=self.uf_cfg["flow_col_template"].format(unit=unit_id),
        )

    def _infer_unit_id(self, df: pd.DataFrame) -> str:
        """Return the first configured unit whose flow column is in ``df``.

        A unit matches when ``C.M.<unit>_FT_JS@out`` is one of the columns;
        units are tried in the order given by ``self.units``.

        Raises
        ------
        ValueError
            If no configured unit matches.
        """
        for unit in self.units:
            if f"C.M.{unit}_FT_JS@out" in df.columns:
                return unit
        raise ValueError("无法从 CSV 列名识别 UF 单元编号")

    def _label_events(self, df: pd.DataFrame, unit_id: str) -> pd.DataFrame:
        """Classify and segment ``df`` with ``UFEventClassifier``.

        The result is expected to carry an ``event_type`` column (the
        initial-cycle analysis filters on it).
        """
        classifier = UFEventClassifier(
            unit_name=unit_id,
            inlet_codes=self.uf_cfg["inlet_codes"],
            physical_code=self.uf_cfg["physical_bw_code"],
            chemical_code=self.uf_cfg["chemical_bw_code"],
        )
        df = classifier.classify(df)
        df = classifier.segment(df)
        return df
|