dqn_statebuilder.py 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256
  1. from typing import Dict
  2. import pandas as pd
  3. # -------------------------------
  4. # 引入环境状态模板(最终输出)
  5. # -------------------------------
  6. from env.env_params import UFState
  7. # -------------------------------
  8. # 引入分析类
  9. # -------------------------------
  10. from uf_data_process.load import UFConfigLoader
  11. from uf_data_process.label import UFEventClassifier
  12. from uf_data_process.filter import EventQualityFilter, InletSegmentFilter
  13. from uf_data_process.calculate import UFResistanceCalculator, UFResistanceAnalyzer
  14. from uf_data_process.fit import ShortTermCycleFoulingFitter, LongTermFoulingFitter
  15. class DQNStateBuilder:
  16. """
  17. 在 DQN 决策前构建状态的工具类
  18. 相关数据:
  19. * CSV1 = 上一完整化学周期
  20. * CSV2 = 新周期初始进水段
  21. """
  22. def __init__(self, config_path: str):
  23. """
  24. Parameters
  25. ----------
  26. config_path : str
  27. uf_analyze_config.yaml 路径
  28. """
  29. self.cfg = UFConfigLoader(config_path)
  30. self.uf_cfg = self.cfg.uf
  31. self.params = self.cfg.params
  32. self.units = self.uf_cfg["units"]
  33. self.area_m2 = self.uf_cfg["area_m2"]
  34. self.scale_factor = self.params.get("scale_factor", 1e10)
  35. self.segment_head_n = self.params.get("segment_head_n", 10)
  36. self.segment_tail_n = self.params.get("segment_tail_n", 10)
  37. # ======================================================================
  38. # 对外主接口
  39. # ======================================================================
  40. def build_from_csv_pair(
  41. self,
  42. prev_cycle_csv: str,
  43. init_cycle_csv: str,
  44. ) -> UFState:
  45. """
  46. 使用【上一完整化学周期 CSV】+【当前周期初始 CSV】构建 UFState
  47. """
  48. df_prev = pd.read_csv(prev_cycle_csv)
  49. df_init = pd.read_csv(init_cycle_csv)
  50. # 自动识别 UF 单元编号(UF1 / UF2 / ...)
  51. unit_id = self._infer_unit_id(df_prev)
  52. # 分别处理两个 CSV
  53. prev_features = self._analyze_previous_cycle_csv(df_prev, unit_id)
  54. init_features = self._analyze_init_cycle_csv(df_init, unit_id)
  55. # 化学清洗去除阻力(上一周期末 - 当前初始)
  56. ceb_removal = max(
  57. prev_features["R_end"] - init_features["R_start"],
  58. 0.0
  59. )
  60. # 构建 UFState
  61. state = UFState(
  62. q_UF=init_features["q_mean"],
  63. TMP=init_features["tmp_mean"],
  64. temp=init_features["temp_mean"],
  65. R=init_features["R_start"],
  66. nuK=prev_features["nuK"],
  67. slope=prev_features["slope"],
  68. power=prev_features["power"],
  69. ceb_removal=ceb_removal,
  70. )
  71. return state
  72. # ======================================================================
  73. # 上一完整化学周期分析
  74. # ======================================================================
  75. def _analyze_previous_cycle_csv(
  76. self,
  77. df: pd.DataFrame,
  78. unit_id: str,
  79. ) -> Dict[str, float]:
  80. """
  81. 上一完整化学周期分析逻辑
  82. 步骤:
  83. 1. 事件标注
  84. 2. 进水段过滤(质量过滤)
  85. 3. 膜阻力计算
  86. 4. 提取周期末稳定阻力
  87. 5. 拟合 nuK
  88. 6. 拟合长期不可逆污染(slope / power)
  89. """
  90. # -------- 事件标注 --------
  91. df = self._label_events(df, unit_id)
  92. # -------- 保留过滤进水段 --------
  93. inlet_filter = InletSegmentFilter(
  94. control_col=f"C.M.{unit_id}_DB@word_control",
  95. stable_value=self.uf_cfg["stable_inlet_code"],
  96. min_points=self.params["min_stable_points"],
  97. )
  98. segments = inlet_filter.extract(df)
  99. quality_filter = EventQualityFilter(
  100. min_points=self.params["min_stable_points"]
  101. )
  102. segments = quality_filter.filter(segments)
  103. if len(segments) == 0:
  104. raise ValueError("上一周期无有效稳定进水段,无法构建状态")
  105. # -------- 3️⃣ 膜阻力计算 --------
  106. res_calc = UFResistanceCalculator(
  107. units=[unit_id],
  108. area_m2=self.area_m2,
  109. scale_factor=self.scale_factor,
  110. )
  111. segments = res_calc.calculate_for_segments(
  112. segments,
  113. temp_col=self.uf_cfg["temp_col"],
  114. flow_col=self.uf_cfg["flow_col_template"].format(unit=unit_id),
  115. )
  116. # -------- 膜阻力统计 --------
  117. res_col = f"{unit_id}_R_scaled"
  118. ura = UFResistanceAnalyzer(
  119. resistance_col=res_col,
  120. head_n=self.segment_head_n,
  121. tail_n=self.segment_tail_n
  122. )
  123. segments = ura.analyze_segments(segments)
  124. df_all = segments[-1]
  125. R_end = df_all["R_scaled_end"].iloc[0]
  126. # ===== 确保 time 为 datetime =====
  127. for i, seg in enumerate(segments):
  128. if not pd.api.types.is_datetime64_any_dtype(seg["time"]):
  129. seg = seg.copy()
  130. seg["time"] = pd.to_datetime(seg["time"], errors="coerce")
  131. seg = seg.dropna(subset=["time"])
  132. segments[i] = seg
  133. # -------- 5️⃣ 短期污染拟合(nuK)--------
  134. st_fitter = ShortTermCycleFoulingFitter(unit_id)
  135. nuK, _ = st_fitter.fit_cycle(segments)
  136. # -------- 6️⃣ 长期不可逆污染拟合 --------
  137. lt_fitter = LongTermFoulingFitter(unit_id)
  138. slope, power, _ = lt_fitter.fit_cycle(segments)
  139. return {
  140. "R_end": R_end,
  141. "nuK": float(nuK),
  142. "slope": float(slope),
  143. "power": float(power),
  144. }
  145. # ======================================================================
  146. # 当前周期初始进水段分析
  147. # ======================================================================
  148. def _analyze_init_cycle_csv(
  149. self,
  150. df: pd.DataFrame,
  151. unit_id: str,
  152. ) -> Dict[str, float]:
  153. """
  154. 当前周期初始进水段分析
  155. 特点:
  156. - 不切段
  157. - 不过滤
  158. - 只计算均值
  159. """
  160. # -------- 1️⃣ 事件标注 --------
  161. df = self._label_events(df, unit_id)
  162. # -------- 2️⃣ 仅保留进水行 --------
  163. df = df[df["event_type"] == "inlet"].copy()
  164. if df.empty:
  165. raise ValueError("初始 CSV 中无进水数据")
  166. # -------- 3️⃣ 膜阻力计算 --------
  167. res_calc = UFResistanceCalculator(
  168. units=[unit_id],
  169. area_m2=self.area_m2,
  170. scale_factor=self.scale_factor,
  171. )
  172. segments = [df]
  173. segments = res_calc.calculate_for_segments(
  174. segments,
  175. temp_col=self.uf_cfg["temp_col"],
  176. flow_col=self.uf_cfg["flow_col_template"].format(unit=unit_id),
  177. )
  178. df = segments[-1]
  179. flow_col = self.uf_cfg["flow_col_template"].format(unit=unit_id)
  180. temp_col = self.uf_cfg["temp_col"]
  181. press_col = f"C.M.{unit_id}_DB@press_PV"
  182. res_col = f"{unit_id}_R_scaled"
  183. return {
  184. "q_mean": float(df[flow_col].mean()),
  185. "tmp_mean": float(df[press_col].mean()),
  186. "temp_mean": float(df[temp_col].mean()),
  187. "R_start": float(df[res_col].mean()),
  188. }
  189. # ======================================================================
  190. # 工具函数
  191. # ======================================================================
  192. def _infer_unit_id(self, df: pd.DataFrame) -> str:
  193. """
  194. 根据列名自动识别 UF 单元编号
  195. """
  196. for unit in self.units:
  197. key = f"C.M.{unit}_FT_JS@out"
  198. if key in df.columns:
  199. return unit
  200. raise ValueError("无法从 CSV 列名识别 UF 单元编号")
  201. def _label_events(self, df: pd.DataFrame, unit_id: str) -> pd.DataFrame:
  202. """
  203. 为 DataFrame 标注 event_type
  204. """
  205. clf = UFEventClassifier(
  206. unit_name=unit_id,
  207. inlet_codes=self.uf_cfg["inlet_codes"],
  208. physical_code=self.uf_cfg["physical_bw_code"],
  209. chemical_code=self.uf_cfg["chemical_bw_code"],
  210. )
  211. df = clf.classify(df)
  212. df = clf.segment(df)
  213. return df