dqn_statebuilder.py 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262
  1. from typing import Dict
  2. import pandas as pd
  3. # -------------------------------
  4. # 引入环境状态模板(最终输出)
  5. # -------------------------------
  6. from env.env_params import UFState
  7. # -------------------------------
  8. # 引入分析类
  9. # -------------------------------
  10. from uf_data_process.load import UFConfigLoader
  11. from uf_data_process.label import UFEventClassifier
  12. from uf_data_process.filter import EventQualityFilter, InletSegmentFilter
  13. from uf_data_process.calculate import UFResistanceCalculator, UFResistanceAnalyzer
  14. from uf_data_process.fit import ShortTermCycleFoulingFitter, LongTermFoulingFitter
  15. class DQNStateBuilder:
  16. """
  17. 在 DQN 决策前构建状态的工具类
  18. 相关数据:
  19. * CSV1 = 上一完整化学周期
  20. * CSV2 = 新周期初始进水段
  21. """
  22. def __init__(self, config_path: str):
  23. """
  24. Parameters
  25. ----------
  26. config_path : str
  27. uf_analyze_config.yaml 路径
  28. """
  29. self.cfg = UFConfigLoader(config_path)
  30. self.uf_cfg = self.cfg.uf
  31. self.params = self.cfg.params
  32. self.units = self.uf_cfg["units"]
  33. self.column_formats = self.uf_cfg.get("column_formats", {})
  34. self.flow_format = self.column_formats.get("flow_col", "C.M.{unit}_FT_JS@out")
  35. self.ctrl_format = self.column_formats.get("ctrl_col", "C.M.{unit}_DB@word_control")
  36. self.area_m2 = self.uf_cfg["area_m2"]
  37. self.scale_factor = self.params.get("scale_factor", 1e10)
  38. self.segment_head_n = self.params.get("segment_head_n", 10)
  39. self.segment_tail_n = self.params.get("segment_tail_n", 10)
  40. # ======================================================================
  41. # 对外主接口
  42. # ======================================================================
  43. def build_from_csv_pair(
  44. self,
  45. prev_cycle_csv: str,
  46. init_cycle_csv: str,
  47. ) -> UFState:
  48. """
  49. 使用【上一完整化学周期 CSV】+【当前周期初始 CSV】构建 UFState
  50. """
  51. df_prev = pd.read_csv(prev_cycle_csv)
  52. df_init = pd.read_csv(init_cycle_csv)
  53. # 自动识别 UF 单元编号(UF1 / UF2 / ...)
  54. unit_id = self._infer_unit_id(df_prev)
  55. # 分别处理两个 CSV
  56. prev_features = self._analyze_previous_cycle_csv(df_prev, unit_id)
  57. init_features = self._analyze_init_cycle_csv(df_init, unit_id)
  58. # 化学清洗去除阻力(上一周期末 - 当前初始)
  59. ceb_removal = max(
  60. prev_features["R_end"] - init_features["R_start"],
  61. 0.0
  62. )
  63. # 构建 UFState
  64. state = UFState(
  65. q_UF=init_features["q_mean"],
  66. TMP=init_features["tmp_mean"],
  67. temp=init_features["temp_mean"],
  68. R=init_features["R_start"],
  69. nuK=prev_features["nuK"],
  70. slope=prev_features["slope"],
  71. power=prev_features["power"],
  72. ceb_removal=ceb_removal,
  73. )
  74. return state
  75. # ======================================================================
  76. # 上一完整化学周期分析
  77. # ======================================================================
  78. def _analyze_previous_cycle_csv(
  79. self,
  80. df: pd.DataFrame,
  81. unit_id: str,
  82. ) -> Dict[str, float]:
  83. """
  84. 上一完整化学周期分析逻辑
  85. 步骤:
  86. 1. 事件标注
  87. 2. 进水段过滤(质量过滤)
  88. 3. 膜阻力计算
  89. 4. 提取周期末稳定阻力
  90. 5. 拟合 nuK
  91. 6. 拟合长期不可逆污染(slope / power)
  92. """
  93. # -------- 事件标注 --------
  94. df = self._label_events(df, unit_id)
  95. # -------- 保留过滤进水段 --------
  96. inlet_filter = InletSegmentFilter(
  97. control_col=f"C.M.{unit_id}_DB@word_control",
  98. stable_value=self.uf_cfg["stable_inlet_code"],
  99. min_points=self.params["min_stable_points"],
  100. )
  101. segments = inlet_filter.extract(df)
  102. quality_filter = EventQualityFilter(
  103. min_points=self.params["min_stable_points"]
  104. )
  105. segments = quality_filter.filter(segments)
  106. if len(segments) == 0:
  107. raise ValueError("上一周期无有效稳定进水段,无法构建状态")
  108. # -------- 3️⃣ 膜阻力计算 --------
  109. res_calc = UFResistanceCalculator(
  110. units=[unit_id],
  111. area_m2=self.area_m2,
  112. scale_factor=self.scale_factor,
  113. )
  114. segments = res_calc.calculate_for_segments(
  115. segments,
  116. temp_col=self.uf_cfg["temp_col"],
  117. flow_col=self.uf_cfg["flow_col_template"].format(unit=unit_id),
  118. )
  119. # -------- 膜阻力统计 --------
  120. res_col = f"{unit_id}_R_scaled"
  121. ura = UFResistanceAnalyzer(
  122. resistance_col=res_col,
  123. head_n=self.segment_head_n,
  124. tail_n=self.segment_tail_n
  125. )
  126. segments = ura.analyze_segments(segments)
  127. df_all = segments[-1]
  128. R_end = df_all["R_scaled_end"].iloc[0]
  129. # ===== 确保 time 为 datetime =====
  130. for i, seg in enumerate(segments):
  131. if not pd.api.types.is_datetime64_any_dtype(seg["time"]):
  132. seg = seg.copy()
  133. seg["time"] = pd.to_datetime(seg["time"], errors="coerce")
  134. seg = seg.dropna(subset=["time"])
  135. segments[i] = seg
  136. # -------- 5️⃣ 短期污染拟合(nuK)--------
  137. st_fitter = ShortTermCycleFoulingFitter(unit_id)
  138. nuK, _ = st_fitter.fit_cycle(segments)
  139. # -------- 6️⃣ 长期不可逆污染拟合 --------
  140. lt_fitter = LongTermFoulingFitter(unit_id)
  141. slope, power, _ = lt_fitter.fit_cycle(segments)
  142. return {
  143. "R_end": R_end,
  144. "nuK": float(nuK),
  145. "slope": float(slope),
  146. "power": float(power),
  147. }
  148. # ======================================================================
  149. # 当前周期初始进水段分析
  150. # ======================================================================
  151. def _analyze_init_cycle_csv(
  152. self,
  153. df: pd.DataFrame,
  154. unit_id: str,
  155. ) -> Dict[str, float]:
  156. """
  157. 当前周期初始进水段分析
  158. 特点:
  159. - 不切段
  160. - 不过滤
  161. - 只计算均值
  162. """
  163. # -------- 1️⃣ 事件标注 --------
  164. df = self._label_events(df, unit_id)
  165. # -------- 2️⃣ 仅保留进水行 --------
  166. df = df[df["event_type"] == "inlet"].copy()
  167. if df.empty:
  168. raise ValueError("初始 CSV 中无进水数据")
  169. # -------- 3️⃣ 膜阻力计算 --------
  170. res_calc = UFResistanceCalculator(
  171. units=[unit_id],
  172. area_m2=self.area_m2,
  173. scale_factor=self.scale_factor,
  174. )
  175. segments = [df]
  176. segments = res_calc.calculate_for_segments(
  177. segments,
  178. temp_col=self.uf_cfg["temp_col"],
  179. flow_col=self.uf_cfg["flow_col_template"].format(unit=unit_id),
  180. )
  181. df = segments[-1]
  182. flow_col = self.uf_cfg["flow_col_template"].format(unit=unit_id)
  183. temp_col = self.uf_cfg["temp_col"]
  184. press_col = f"C.M.{unit_id}_DB@press_PV"
  185. res_col = f"{unit_id}_R_scaled"
  186. return {
  187. "q_mean": float(df[flow_col].mean()),
  188. "tmp_mean": float(df[press_col].mean()),
  189. "temp_mean": float(df[temp_col].mean()),
  190. "R_start": float(df[res_col].mean()),
  191. }
  192. # ======================================================================
  193. # 工具函数
  194. # ======================================================================
  195. def _infer_unit_id(self, df: pd.DataFrame) -> str:
  196. """
  197. 根据列名自动识别 UF 单元编号
  198. """
  199. for unit in self.units:
  200. key = self.flow_format.format(unit=unit)
  201. if key in df.columns:
  202. return unit
  203. raise ValueError("无法从 CSV 列名识别 UF 单元编号")
  204. def _label_events(self, df: pd.DataFrame, unit_id: str) -> pd.DataFrame:
  205. """
  206. 为 DataFrame 标注 event_type
  207. """
  208. ctrl_col = self.uf_cfg["flow_col_template"].format(unit=unit_id)
  209. clf = UFEventClassifier(
  210. unit_name=unit_id,
  211. inlet_codes=self.uf_cfg["inlet_codes"],
  212. physical_code=self.uf_cfg["physical_bw_code"],
  213. chemical_code=self.uf_cfg["chemical_bw_code"],
  214. ctrl_col
  215. )
  216. df = clf.classify(df)
  217. df = clf.segment(df)
  218. return df