import re
from pathlib import Path
from typing import Dict, Any

import yaml
import numpy as np
import pandas as pd
# NOTE(review): `segment` is not referenced anywhere in the visible code; this
# looks like an accidental IDE auto-import -- confirm before removing.
from oauthlib.uri_validate import segment

# -------------------------------
# Environment state template (final output)
# -------------------------------
from uf_train.env.env_params import UFState

# -------------------------------
# Analysis classes
# -------------------------------
from uf_data_process.load import UFConfigLoader
from uf_data_process.label import UFEventClassifier, PostBackwashInletMarker
from uf_data_process.filter import ConstantFlowFilter, EventQualityFilter, InletSegmentFilter
from uf_data_process.calculate import UFResistanceCalculator, UFResistanceAnalyzer
from uf_data_process.fit import ShortTermCycleFoulingFitter, LongTermFoulingFitter


class DQNStateBuilder:
    """
    Helper that builds the state handed to the DQN before a decision.

    Input data:
    * CSV1 = the previous complete chemical-cleaning cycle
    * CSV2 = the initial inlet segment of the new cycle
    """

    def __init__(self, config_path: str):
        """
        Load the analysis configuration and cache the values used later.

        Parameters
        ----------
        config_path : str
            Path to uf_analyze_config.yaml
        """
        self.cfg = UFConfigLoader(config_path)
        self.uf_cfg = self.cfg.uf
        self.params = self.cfg.params

        self.units = self.uf_cfg["units"]
        self.area_m2 = self.uf_cfg["area_m2"]

        # Optional tuning parameters; the defaults apply when the key is absent.
        self.scale_factor = self.params.get("scale_factor", 1e10)
        self.segment_head_n = self.params.get("segment_head_n", 10)
        self.segment_tail_n = self.params.get("segment_tail_n", 10)

    # ======================================================================
    # Public entry point
    # ======================================================================
    def build_from_csv_pair(
        self,
        prev_cycle_csv: str,
        init_cycle_csv: str,
    ) -> UFState:
        """
        Build a UFState from the previous-cycle CSV and the current-cycle
        initial CSV.

        Parameters
        ----------
        prev_cycle_csv : str
            Path of the CSV holding the previous complete chemical cycle.
        init_cycle_csv : str
            Path of the CSV holding the initial part of the current cycle.

        Returns
        -------
        UFState
            q_UF / TMP / temp / R come from the initial CSV, nuK / slope /
            power from the previous cycle, and ceb_removal from both.

        Raises
        ------
        ValueError
            Propagated from the helpers when the unit id cannot be inferred
            or when either CSV contains no usable inlet data.
        """
        df_prev = pd.read_csv(prev_cycle_csv)
        df_init = pd.read_csv(init_cycle_csv)

        # Infer the UF unit id (UF1 / UF2 / ...) from the previous-cycle
        # columns; the same id is then used for both CSVs.
        unit_id = self._infer_unit_id(df_prev)

        # Process the two CSVs separately.
        prev_features = self._analyze_previous_cycle_csv(df_prev, unit_id)
        init_features = self._analyze_init_cycle_csv(df_init, unit_id)

        # Resistance removed by chemical cleaning (end of previous cycle minus
        # start of current cycle), clamped at zero.
        ceb_removal = max(
            prev_features["R_end"] - init_features["R_start"],
            0.0,
        )

        return UFState(
            q_UF=init_features["q_mean"],
            TMP=init_features["tmp_mean"],
            temp=init_features["temp_mean"],
            R=init_features["R_start"],
            nuK=prev_features["nuK"],
            slope=prev_features["slope"],
            power=prev_features["power"],
            ceb_removal=ceb_removal,
        )

    # ======================================================================
    # Previous complete chemical cycle
    # ======================================================================
    def _analyze_previous_cycle_csv(
        self,
        df: pd.DataFrame,
        unit_id: str,
    ) -> Dict[str, float]:
        """
        Analyze the previous complete chemical cycle.

        Steps:
        1. Event labelling
        2. Inlet-segment extraction and quality filtering
        3. Membrane resistance calculation
        4. Extraction of the end-of-cycle resistance
        5. Short-term fouling fit (nuK)
        6. Long-term irreversible fouling fit (slope / power)

        Returns
        -------
        Dict[str, float]
            Keys "R_end", "nuK", "slope" and "power".

        Raises
        ------
        ValueError
            If no segment survives the extraction and quality filters.
        """
        # -------- 1. Event labelling --------
        df = self._label_events(df, unit_id)

        # -------- 2. Keep only filtered inlet segments --------
        inlet_filter = InletSegmentFilter(
            control_col=f"C.M.{unit_id}_DB@word_control",
            stable_value=self.uf_cfg["stable_inlet_code"],
            min_points=self.params["min_stable_points"],
        )
        segments = inlet_filter.extract(df)

        quality_filter = EventQualityFilter(
            min_points=self.params["min_stable_points"]
        )
        segments = quality_filter.filter(segments)

        if len(segments) == 0:
            raise ValueError("上一周期无有效稳定进水段,无法构建状态")

        # -------- 3. Membrane resistance calculation --------
        segments = self._calculate_resistance(segments, unit_id)

        # -------- 4. Resistance statistics --------
        ura = UFResistanceAnalyzer(
            resistance_col=f"{unit_id}_R_scaled",
            head_n=self.segment_head_n,
            tail_n=self.segment_tail_n,
        )
        segments = ura.analyze_segments(segments)

        # End-of-cycle resistance: first row of "R_scaled_end" in the last
        # segment. Cast to a plain float so every returned value has the same
        # type as the annotated Dict[str, float].
        last_segment = segments[-1]
        R_end = float(last_segment["R_scaled_end"].iloc[0])

        # Make sure "time" is a datetime column in every segment; rows whose
        # timestamp cannot be parsed are dropped.
        for i, seg in enumerate(segments):
            if pd.api.types.is_datetime64_any_dtype(seg["time"]):
                continue
            seg = seg.copy()  # do not mutate the analyzer's output in place
            seg["time"] = pd.to_datetime(seg["time"], errors="coerce")
            segments[i] = seg.dropna(subset=["time"])

        # -------- 5. Short-term fouling fit (nuK) --------
        st_fitter = ShortTermCycleFoulingFitter(unit_id)
        nuK, _ = st_fitter.fit_cycle(segments)

        # -------- 6. Long-term irreversible fouling fit --------
        lt_fitter = LongTermFoulingFitter(unit_id)
        slope, power, _ = lt_fitter.fit_cycle(segments)

        return {
            "R_end": R_end,
            "nuK": float(nuK),
            "slope": float(slope),
            "power": float(power),
        }

    # ======================================================================
    # Initial inlet segment of the current cycle
    # ======================================================================
    def _analyze_init_cycle_csv(
        self,
        df: pd.DataFrame,
        unit_id: str,
    ) -> Dict[str, float]:
        """
        Analyze the initial inlet data of the current cycle.

        Characteristics:
        - no segmentation
        - no quality filtering
        - only column means are computed

        Returns
        -------
        Dict[str, float]
            Keys "q_mean", "tmp_mean", "temp_mean" and "R_start", each the
            mean of the corresponding column over all inlet rows.

        Raises
        ------
        ValueError
            If the frame contains no row labelled "inlet".
        """
        # -------- 1. Event labelling --------
        df = self._label_events(df, unit_id)

        # -------- 2. Keep inlet rows only --------
        df = df[df["event_type"] == "inlet"].copy()

        if df.empty:
            raise ValueError("初始 CSV 中无进水数据")

        # -------- 3. Membrane resistance calculation --------
        # The calculator works on a list of segments, so wrap the single frame
        # and take the (only) result back out.
        df = self._calculate_resistance([df], unit_id)[-1]

        flow_col = self.uf_cfg["flow_col_template"].format(unit=unit_id)
        temp_col = self.uf_cfg["temp_col"]
        press_col = f"C.M.{unit_id}_DB@press_PV"
        res_col = f"{unit_id}_R_scaled"

        return {
            "q_mean": float(df[flow_col].mean()),
            "tmp_mean": float(df[press_col].mean()),
            "temp_mean": float(df[temp_col].mean()),
            "R_start": float(df[res_col].mean()),
        }

    # ======================================================================
    # Utilities
    # ======================================================================
    def _calculate_resistance(self, segments: list, unit_id: str) -> list:
        """Run UFResistanceCalculator over ``segments`` for a single unit."""
        res_calc = UFResistanceCalculator(
            units=[unit_id],
            area_m2=self.area_m2,
            scale_factor=self.scale_factor,
        )
        return res_calc.calculate_for_segments(
            segments,
            temp_col=self.uf_cfg["temp_col"],
            flow_col=self.uf_cfg["flow_col_template"].format(unit=unit_id),
        )

    def _infer_unit_id(self, df: pd.DataFrame) -> str:
        """
        Infer the UF unit id from the column names.

        Returns the first configured unit whose "C.M.<unit>_FT_JS@out" column
        is present in ``df``.

        Raises
        ------
        ValueError
            If no configured unit has a matching column.
        """
        for unit in self.units:
            if f"C.M.{unit}_FT_JS@out" in df.columns:
                return unit

        raise ValueError("无法从 CSV 列名识别 UF 单元编号")

    def _label_events(self, df: pd.DataFrame, unit_id: str) -> pd.DataFrame:
        """
        Label ``df`` with event information for ``unit_id``.

        Runs UFEventClassifier.classify followed by UFEventClassifier.segment
        and returns the resulting frame.
        """
        clf = UFEventClassifier(
            unit_name=unit_id,
            inlet_codes=self.uf_cfg["inlet_codes"],
            physical_code=self.uf_cfg["physical_bw_code"],
            chemical_code=self.uf_cfg["chemical_bw_code"],
        )

        df = clf.classify(df)
        df = clf.segment(df)

        return df