4 Commity fc84661288 ... 517a240c86

Autor SHA1 Wiadomość Data
  junc_WHU 517a240c86 Merge remote-tracking branch 'company/dev' into dev 3 tygodni temu
  junc_WHU eb67737d9c feat:增加了兰考和安镇的模型 3 tygodni temu
  junc_WHU 406baf4010 feat:增加了环境体中吨水电耗与药耗相关奖励的计算逻辑 3 tygodni temu
  junc_WHU d86a3e990d feat:增加了数据处理与物理模型中吨水电耗与药耗的计算逻辑 3 tygodni temu
29 zmienionych plików z 1629 dodań i 269 usunięć
  1. BIN
      models/uf-rl/anzhen/48h_dqn_model.zip
  2. 221 0
      models/uf-rl/anzhen/data_to_rl_config.yaml
  3. 26 0
      models/uf-rl/anzhen/dqn_config.yaml
  4. 139 0
      models/uf-rl/anzhen/env_config.yaml
  5. 49 0
      models/uf-rl/anzhen/uf_analyze_config.yaml
  6. 1 1
      models/uf-rl/data_to_rl/run_data_to_rl_pipeline.py
  7. 22 15
      models/uf-rl/env/env_params.py
  8. 3 3
      models/uf-rl/env/env_reset.py
  9. 10 7
      models/uf-rl/env/env_visual.py
  10. 87 84
      models/uf-rl/env/uf_env.py
  11. 46 10
      models/uf-rl/env/uf_physics.py
  12. BIN
      models/uf-rl/lankao/48h_dqn_model.zip
  13. 221 0
      models/uf-rl/lankao/data_to_rl_config.yaml
  14. 26 0
      models/uf-rl/lankao/dqn_config.yaml
  15. 139 0
      models/uf-rl/lankao/env_config.yaml
  16. 49 0
      models/uf-rl/lankao/uf_analyze_config.yaml
  17. 26 23
      models/uf-rl/longting/env_config.yaml
  18. 8 2
      models/uf-rl/longting/uf_analyze_config.yaml
  19. 7 1
      models/uf-rl/rl_model/DQN/dqn_model/dqn_statebuilder.py
  20. 30 11
      models/uf-rl/rl_model/DQN/uf_decide/run_dqn_deicde_totalstate.py
  21. 1 1
      models/uf-rl/rl_model/DQN/uf_train/dqn_trainer.py
  22. 4 3
      models/uf-rl/rl_model/DQN/uf_train/run_dqn_train.py
  23. 59 0
      models/uf-rl/uf_data_process/calculate.py
  24. 178 100
      models/uf-rl/uf_data_process/data_export.py
  25. 43 0
      models/uf-rl/uf_data_process/filter.py
  26. 39 0
      models/uf-rl/uf_data_process/fit.py
  27. 27 7
      models/uf-rl/uf_data_process/pipeline.py
  28. 1 1
      models/uf-rl/uf_data_process/run_ufdata_pipeline.py
  29. 167 0
      models/uf-rl/uf_data_process/水厂变量名称记录.md

BIN
models/uf-rl/anzhen/48h_dqn_model.zip


+ 221 - 0
models/uf-rl/anzhen/data_to_rl_config.yaml

@@ -0,0 +1,221 @@
+# ============================================================
+# 项目级路径配置
+# ============================================================
+
+Paths:
+  project_root: "E:/Greentech"
+  __comment__: >
+    项目根目录,所有路径均相对于该目录展开。
+    不同工程师只需修改这一项即可迁移环境。
+
+  raw_data:
+    filtered_cycles_dir: "models/uf-rl/datasets/UF_anzhen_data/processed/filter_segments"  # must match filter_output_path in anzhen/uf_analyze_config.yaml
+    cycle_file_pattern: "UF{unit}_filtered_cycles.csv"
+
+    enabled_units: [1, 2]
+    disabled_units: None  # NOTE(review): YAML loads this as the string "None", not null — confirm how the loader treats it
+
+    __comment__: >
+      已完成清洗与周期切分的真实工厂数据。
+      每个 CSV 对应一个机组,文件内每一行是一个【物理周期】。
+      data_to_rl 阶段会:
+        - 读取 enabled_units 中所有机组
+        - 合并为统一数据集
+        - 机组编号仅用于筛选,不进入状态空间
+
+  data_to_rl:
+    output_dir: "models/uf-rl/datasets/UF_anzhen_data/rl_ready/output"  # plant-specific so Longting results are not overwritten
+    cache_dir: "models/uf-rl/datasets/UF_anzhen_data/rl_ready/cache"
+
+    __comment__: >
+      data_to_rl 模块输出内容:
+        - 全机组合并后的状态空间范围
+        - reset 初始状态分布
+        - reward 统计量
+
+
+# ============================================================
+# 数据层级定义(非常重要)
+# ============================================================
+
+DataHierarchy:
+
+  physical_cycle:
+    id_column: "seg_id"
+    time_columns:
+      start: "start_time"
+      end: "end_time"
+    __comment__: >
+      物理周期是最细粒度的数据层级:
+      - 用于物理模型验证
+      - 用于环境内部子步模拟
+      - 不直接暴露给强化学习算法
+
+  chemical_cycle:
+    id_column: "chem_cycle_id"
+    validity_column: "chem_cycle_valid"
+    step_definition: "one_rl_step_per_chemical_cycle"
+    __comment__: >
+      化学周期是强化学习的【唯一 step 单位】。
+      强化学习中:
+        - 一个 step = 一个化学周期
+        - 一个状态 = 一个化学周期的整体状态
+
+# ============================================================
+# 强化学习状态定义(工程师重点关注)
+# ============================================================
+
+RLState:
+  level: chemical_cycle
+  dimension: 8
+  __comment__: >
+    强化学习 observation 的定义。
+    所有变量在一个 step(化学周期)内保持不变。
+
+  variables:
+
+    q_UF:
+      source: "flow_mean"
+      aggregation: "mean_within_cycle"
+      unit: "m3_per_h"
+      description: "化学周期代表性进水流量"
+      __comment__: >
+        原始数据在物理周期层级变化,
+        这里使用化学周期内的平均值作为状态。
+
+    temp:
+      source: "temp_mean"
+      aggregation: "mean_within_cycle"
+      unit: "celsius"
+      description: "化学周期代表性水温"
+
+    TMP:
+      source: "tmp_start"
+      selection: "first_physical_cycle"
+      unit: "MPa"
+      description: "化学周期起始跨膜压差"
+      __comment__: >
+        虽然 TMP 在物理周期内会变化,
+        但强化学习只关心化学周期开始时的状态。
+
+    R:
+      source: "R_scaled_start"
+      selection: "first_physical_cycle"
+      unit: "scaled_resistance"
+      description: "化学周期起始膜阻力"
+
+    nuK:
+      source: "cycle_nuK"
+      unit: "scaled"
+      description: "短期污染增长系数"
+      __comment__: >
+        在同一化学周期内视为常数,
+        后续 step (下一化学周期)中根据动作进行经验性更新。
+
+    slope:
+      source: "cycle_long_a"
+      unit: "scaled"
+      description: "长期不可逆污染幂律系数 a"
+
+    power:
+      source: "cycle_long_b"
+      unit: "dimensionless"
+      description: "长期不可逆污染幂律指数 b"
+
+    ceb_removal:
+      source: "cycle_R_removed"
+      unit: "scaled"
+      description: "CEB 可去除的膜阻力"
+
+# ============================================================
+# 状态空间范围提取(reset 使用)
+# ============================================================
+
+StateSpaceExtraction:
+  enabled: true
+  level: chemical_cycle
+  include_only_valid_cycles: true
+
+  __comment__: >
+    用真实工厂数据统计【合理状态空间范围】,
+    用于:
+      - reset 初始状态采样
+      - 状态边界 sanity check
+
+  statistics:
+    method: "empirical"
+    percentiles: [1, 5, 50, 95, 99]
+    __comment__: >
+      使用经验分布统计,避免假设正态分布。
+
+  output:
+    save_csv: true
+    filename: "state_space_bounds.csv"
+
+# ============================================================
+# 物理周期层级的用途声明
+# ============================================================
+
+PhysicalCycleUsage:
+  enabled: true
+
+  purposes:
+    - internal_simulation
+    - parameter_sanity_check
+    - post_training_validation
+
+  variables_available:
+    - R_scaled_start
+    - R_scaled_end
+    - tmp_start
+    - tmp_end
+    - flow_mean
+    - flow_std
+    - temp_mean
+    - temp_std
+
+  __comment__: >
+    物理周期数据:
+      ✔ 可用于环境内部计算
+      ✔ 可用于训练后验证
+      ✘ 不可作为 RL observation
+
+# ============================================================
+# 真实数据在强化学习中的角色(工程边界声明)
+# ============================================================
+
+RealDataUsagePolicy:
+
+  reset_initialization:
+    enabled: true
+    source: "chemical_cycle_state_distribution"
+    __comment__: >
+      reset 时从真实化学周期状态分布中采样,
+      确保训练从“现实可达状态”开始。
+
+  training:
+    use_real_transitions: false
+    __comment__: >
+      真实数据不用于:
+        - 动力学拟合
+        - 监督策略
+      强化学习仍在模拟环境中进行。
+
+  validation:
+    enabled: true
+    compare_with_real_cycles: true
+    __comment__: >
+      训练完成后:
+        - 将策略运行结果
+        - 与真实化学周期统计特性进行对比
+      用于评估策略的现实可行性与安全性。
+
+# ============================================================
+# 方法论与工程风险声明
+# ============================================================
+
+MethodologyNotes:
+  - "强化学习的 step 定义严格等同于一个化学周期"
+  - "所有 RL 状态变量在一个 step 内视为常数"
+  - "物理周期层级仅用于环境内部模拟"
+  - "真实数据用于分布约束与验证,而非动力学建模"

+ 26 - 0
models/uf-rl/anzhen/dqn_config.yaml

@@ -0,0 +1,26 @@
+# ============================================================
+# DQN hyperparameter configuration
+# ============================================================
+# Neural-network parameters
+learning_rate: 0.0001           # learning rate (1e-4)
+
+# Experience-replay parameters
+buffer_size: 100000              # replay buffer size
+learning_starts: 10000           # steps collected before training starts
+batch_size: 32                   # training batch size
+
+# Reinforcement-learning parameters
+gamma: 0.95                      # discount factor
+train_freq: 4                    # training frequency (in steps)
+
+# Target-network parameters
+target_update_interval: 1        # target-network update interval
+tau: 0.005                       # soft-update coefficient
+
+# Exploration-strategy parameters
+exploration_initial_eps: 1.0     # initial exploration rate
+exploration_fraction: 0.3        # exploration-rate decay fraction
+exploration_final_eps: 0.02      # final exploration rate
+
+# Logging parameters
+remark: "uf_dqn_real_reset"      # experiment remark

+ 139 - 0
models/uf-rl/anzhen/env_config.yaml

@@ -0,0 +1,139 @@
+UFState:
+  # ===== Membrane dynamic operating parameters =====
+  q_UF: 150.0  # m3/h per the RLState definition in data_to_rl_config.yaml
+  TMP: 0.02  # MPa per the RLState definition in data_to_rl_config.yaml
+  temp: 20.0  # degrees Celsius per the RLState definition in data_to_rl_config.yaml
+  R: 200.0  # scaled membrane resistance
+
+  # ===== Membrane resistance model parameters =====
+  nuK: 170.0  # short-term fouling growth coefficient
+  slope: 2.0  # long-term irreversible fouling power-law coefficient a
+  power: 1.1  # long-term irreversible fouling power-law exponent b
+  ceb_removal: 200.0  # membrane resistance removable by CEB
+
+
+UFPhysicsParams:
+  # ===== Global TMP constraints (MPa) =====
+  global_TMP_hard_limit: 0.08 # transmembrane pressure hard upper limit
+  global_TMP_soft_limit: 0.06 # transmembrane pressure soft upper limit
+
+  # ===== Physical backwash parameters =====
+  tau_bw_s: 20.0
+  gamma_t: 1.0
+  q_bw_m3ph: 700.0
+
+  # ===== CEB (chemically enhanced backwash) parameters =====
+  T_ceb_interval_h: 48.0
+  v_ceb_m3: 20.0
+  t_ceb_s: 1800.0
+
+  # ===== Membrane module parameters =====
+  A: 3200.0  # NOTE(review): anzhen/uf_analyze_config.yaml gives area_m2 as "128 * 40" (5120) — confirm which membrane area is correct
+
+  # ===== Reference energy-per-ton-of-water lookup table =====
+  # Keys are compared against L_s (nearest key wins) in uf_physics.py; values are kWh/m3.
+  energy_lookup:
+    2700: 0.1088
+    2760: 0.1083
+    2820: 0.1078
+    2880: 0.1074
+    2940: 0.1070
+    3000: 0.1066
+    3060: 0.1062
+    3120: 0.1059
+    3180: 0.1055
+    3240: 0.1052
+    3300: 0.1049
+    3360: 0.1045
+    3420: 0.1042
+    3480: 0.1039
+    3540: 0.1036
+    3600: 0.1034
+    3660: 0.1031
+    3720: 0.1029
+    3780: 0.1026
+    3840: 0.1023
+    3900: 0.1021
+    3960: 0.1019
+    4020: 0.1017
+    4080: 0.1015
+    4140: 0.1012
+    4200: 0.1011
+    4260: 0.1008
+    4320: 0.1007
+    4380: 0.1005
+    4440: 0.1003
+    4500: 0.1001
+    4560: 0.0999
+    4620: 0.0998
+    4680: 0.0996
+    4740: 0.0995
+    4800: 0.0993
+
+  p_feed_kw: 9.0  # multiplied by total feed hours when computing E_total in uf_physics.py
+  p_bw_kw: 30.0  # multiplied by total backwash hours when computing E_total in uf_physics.py
+  dose_min: 0.05  # dose mapped to ceb_removal_min by the linear dose mapping in uf_physics.py
+  dose_max: 0.15  # dose mapped to ceb_removal_max by the linear dose mapping in uf_physics.py
+
+
+UFActionSpec:
+  # ===== Action-space range =====
+  # NOTE(review): uf_env.py in this change builds np.arange(min, max, step) without
+  # adding the step, so L_max_s and t_bw_max_s themselves are excluded — confirm intended.
+  L_min_s: 3600.0
+  L_max_s: 4800.0
+  t_bw_min_s: 50.0
+  t_bw_max_s: 70.0
+
+  # ===== Action discretisation step sizes =====
+  L_step_s: 60.0
+  t_bw_step_s: 5.0
+
+
+UFRewardParams:
+  # ===== TMP safety and penalties (limits must match UFPhysicsParams / UFStateBounds) =====
+  global_TMP_hard_limit: 0.08
+  global_TMP_soft_limit: 0.06
+  w_tmp_hard: 5.0  # fixed penalty applied once TMP exceeds the hard limit
+  w_tmp: 1.5  # weight of the soft-limit penalty
+  p: 3.0  # exponent of the soft-limit penalty
+  w_trend: 1.0  # weight of the TMP rising-trend penalty
+
+  # ===== Economic cost: cost = energy * energy_price + chemical * chemical_price * 100 (uf_env.py) =====
+  k_cost: 3.0  # tanh steepness for the normalised cost
+  chemical_price: 13.0
+  energy_price: 0.667
+  cost_low: 0.08  # lower bound used to normalise cost
+  cost_high: 0.20  # upper bound used to normalise cost
+  w_cost: 1.0  # NOTE(review): the reward sum visible in uf_env.py adds terms unweighted — confirm this is used
+
+  # ===== Residual fouling =====
+  k_res: 3.0  # tanh steepness for the residual-fouling term
+  residual_ref_ratio: null  # the visible reward code uses 1 / max_episode_steps as the reference
+  w_res: 1.0  # NOTE(review): appears unused in the visible reward sum — confirm
+
+
+UFStateBounds:
+  # ===== Flow initialisation bounds =====
+  q_UF_min: 150.0
+  q_UF_max: 200.0
+
+  # ===== Temperature initialisation bounds =====
+  temp_min: 9.0
+  temp_max: 25.0
+
+  # ===== TMP initialisation bounds =====
+  TMP0_min: 0.01
+  TMP0_max: 0.04
+  global_TMP_hard_limit: 0.08
+
+  # ===== Short-term fouling parameter =====
+  nuK_min: 100.0
+  nuK_max: 250.0
+
+  # ===== Long-term fouling parameters =====
+  slope_min: 1.28
+  slope_max: 150.0
+  power_min: 0.25
+  power_max: 2.5
+
+  # ===== CEB removal capacity (also bounds the dose mapping in uf_physics.py) =====
+  ceb_removal_min: 150.0
+  ceb_removal_max: 350.0

+ 49 - 0
models/uf-rl/anzhen/uf_analyze_config.yaml

@@ -0,0 +1,49 @@
+UF:
+  units: [ "1", "2" ]
+  area_m2: 128 * 40  # NOTE(review): YAML loads this as the string "128 * 40", not 5120; env_config.yaml uses A: 3200.0 — confirm
+
+  inlet_codes: [215.0, 301.0]
+  stable_codes: [220.0, 260.0]
+
+  physical_bw_code: [301.0, 340.0]
+  chemical_bw_code: [400.0, 660.0]
+
+  # Column-name templates ({unit} is replaced per unit)
+  column_formats:
+    flow_col: "AR.{unit}#UF_JSFLOW_O"
+    tmp_col: "AR.UF{unit}_SSD_KMYC"
+    press_col: "AR.{unit}#UF_JSPRESS_O"
+    ctrl_col: "AR.UF{unit}_STEP"
+    temp_col: "AR.ZJS_TEMP_O"
+    orp_col: "AR.RO_JSORP_O"
+    ph_col: "AR.RO_JSPH_O"
+
+
+Params:
+  # Stable-segment extraction
+  min_stable_points: 30
+  initial_points: 10
+
+  # Resistance-trend calculation
+  segment_head_n: 5
+  segment_tail_n: 5
+
+  scale_factor: 1.0e+10  # written with a dot and signed exponent so YAML 1.1 loaders (PyYAML) parse a float, not the string "1e10"
+
+
+Plot:
+  figsize: [12, 6]
+  dpi: 120
+  color_inlet: "#1f77b4"
+  color_bw_phys: "#ff7f0e"
+  color_bw_chem: "#d62728"
+
+Paths:
+  project_root: "E:/Greentech" # adjust to the local project root directory
+
+  raw_data_path: "models/uf-rl/datasets/UF_anzhen_data/raw"
+  output_path: "models/uf-rl/datasets/UF_anzhen_data/processed/segments"
+  filter_output_path: "models/uf-rl/datasets/UF_anzhen_data/processed/filter_segments"
+
+  output_format: "csv"
+

+ 1 - 1
models/uf-rl/data_to_rl/run_data_to_rl_pipeline.py

@@ -30,7 +30,7 @@ def main():
     # --------------------------------------------------------
     # --------------------------------------------------------
     # 配置文件路径(统一入口)
     # 配置文件路径(统一入口)
     # --------------------------------------------------------
     # --------------------------------------------------------
-    config_path = UF_RL_ROOT / "longting" / "data_to_rl_config.yaml"
+    config_path = UF_RL_ROOT / "lankao" / "data_to_rl_config.yaml"
 
 
     print(f" 配置文件: {config_path}")
     print(f" 配置文件: {config_path}")
     print("======================================================\n")
     print("======================================================\n")

+ 22 - 15
models/uf-rl/env/env_params.py

@@ -256,7 +256,7 @@ class UFPhysicsParams:
     # 膜有效面积(锡山水厂配置:128组膜,每组40m²)
     # 膜有效面积(锡山水厂配置:128组膜,每组40m²)
     A: float = 5120.0  # [m²]
     A: float = 5120.0  # [m²]
 
 
-    # 吨水电耗查找表
+    # 参考吨水电耗查找表
     energy_lookup: Dict[int, float] = field(default_factory=lambda: {
     energy_lookup: Dict[int, float] = field(default_factory=lambda: {
         2700: 0.1088, 2760: 0.1083, 2820: 0.1078, 2880: 0.1074,
         2700: 0.1088, 2760: 0.1083, 2820: 0.1078, 2880: 0.1074,
         2940: 0.1070, 3000: 0.1066, 3060: 0.1062, 3120: 0.1059,
         2940: 0.1070, 3000: 0.1066, 3060: 0.1062, 3120: 0.1059,
@@ -269,6 +269,16 @@ class UFPhysicsParams:
         4620: 0.0998, 4680: 0.0996, 4740: 0.0995, 4800: 0.0993,
         4620: 0.0998, 4680: 0.0996, 4740: 0.0995, 4800: 0.0993,
     })
     })
 
 
+    # 实际吨水电耗计算指标
+    p_feed_kw: float = 19.0
+    p_bw_kw: float = 15.0
+
+    # 实际吨水药耗计算指标
+    dose_min: float = 0.10
+    dose_max: float = 0.30
+    dose_area: float = 0.56
+
+
 @dataclass(frozen=True)
 @dataclass(frozen=True)
 class UFActionSpec:
 class UFActionSpec:
     """
     """
@@ -297,28 +307,25 @@ class UFRewardParams:
     # TMP 硬上限(MPa)
     # TMP 硬上限(MPa)
     # 说明:超过此值将导致 episode 失败,需立即停机
     # 说明:超过此值将导致 episode 失败,需立即停机
     global_TMP_soft_limit: float = 0.06     # TMP 软上限 (MPa)
     global_TMP_soft_limit: float = 0.06     # TMP 软上限 (MPa)
+
     w_tmp_hard: float = 5.0      # TMP超硬限固定惩罚
     w_tmp_hard: float = 5.0      # TMP超硬限固定惩罚
     w_tmp: float = 1.5            # TMP软限惩罚
     w_tmp: float = 1.5            # TMP软限惩罚
     p: float = 3                  # TMP软限非线性指数
     p: float = 3                  # TMP软限非线性指数
     w_trend: float = 1.0          # TMP趋势惩罚权重
     w_trend: float = 1.0          # TMP趋势惩罚权重
 
 
-    # 回收率
-    k_rec: float = 5.0
-    rec_low: float = 0.92
-    rec_high: float = 0.99
-    w_rec: float = 1.0             # 回收率权重
+    # 经济成本
+    k_cost: float = 3.0
+    chemical_price: float = 13.0
+    energy_price: float = 0.667
+    cost_low: float = 0.08
+    cost_high: float = 0.20
+    w_cost: float = 1.0
 
 
     # 残余污染
     # 残余污染
-    k_res: float = 5.0
+    k_res: float = 3.0
     residual_ref_ratio: float = None   # 动态=1/max_episode_steps
     residual_ref_ratio: float = None   # 动态=1/max_episode_steps
-    w_res: float = 2.0                 # 残余污染权重
-
-    # 吨水电耗
-    k_energy: float = 5.0
-    energy_low: float = 0.0993
-    energy_high: float = 0.1034
-    energy_ref: float = 0.1011
-    w_energy: float = 1.0              # 能耗权重
+    w_res: float = 1.0
+
 
 
 
 
 
 

+ 3 - 3
models/uf-rl/env/env_reset.py

@@ -166,9 +166,9 @@ class ResetSampler:
     def _get_sampling_config(self, progress: float) -> dict:
     def _get_sampling_config(self, progress: float) -> dict:
         progress = np.clip(progress, 0.0, 1.0)
         progress = np.clip(progress, 0.0, 1.0)
 
 
-        # -------------------------
-        # 阶段权重设计(非线性 + 提高虚拟工况)
-        # -------------------------
+        # # -------------------------
+        # # 阶段权重设计(非线性 + 提高虚拟工况)
+        # # -------------------------
         # w_real = (1.0 - progress) ** 1.2  # 历史工况逐渐衰减
         # w_real = (1.0 - progress) ** 1.2  # 历史工况逐渐衰减
         # w_perturb = 0.5 * progress  # 周边扰动按线性增加
         # w_perturb = 0.5 * progress  # 周边扰动按线性增加
         # w_virtual = 0.3 * progress ** 1.5  # 虚拟工况加快增长,后期最大约 0.3
         # w_virtual = 0.3 * progress ** 1.5  # 虚拟工况加快增长,后期最大约 0.3

+ 10 - 7
models/uf-rl/env/env_visual.py

@@ -73,20 +73,23 @@ class UFTrainingCallback(BaseCallback):
                 L_s = step_info["L_s"]
                 L_s = step_info["L_s"]
                 t_bw_s = step_info["t_bw_s"]
                 t_bw_s = step_info["t_bw_s"]
                 initial_tmp = step_info["initial_tmp"]
                 initial_tmp = step_info["initial_tmp"]
-                max_TMP_during_filtration = step_info["max_TMP_during_filtration"]
                 tmp_after_ceb = step_info["tmp_after_ceb"]
                 tmp_after_ceb = step_info["tmp_after_ceb"]
+                max_TMP_during_filtration = step_info["max_TMP_during_filtration"]
+                tmp_penalty = step_info["tmp_penalty"]
+
                 residual_ratio =step_info["residual_ratio"]
                 residual_ratio =step_info["residual_ratio"]
-                rec_reward = step_info["rec_reward"]
-                energy_reward = step_info["energy_reward"]
-                recovery = step_info["recovery"]
                 res_penalty = step_info["res_penalty"]
                 res_penalty = step_info["res_penalty"]
 
 
+                econ_reward = step_info["econ_reward"]
+                recovery = step_info["recovery"]
+
+
                 # 打印当前 step 的信息
                 # 打印当前 step 的信息
                 if self.verbose:
                 if self.verbose:
                     print(f"[Step {self.num_timesteps}] 动作={step_action}, 奖励={step_reward:.3f}, Done={step_done}, L_s={L_s}, t_bw_s={t_bw_s},"
                     print(f"[Step {self.num_timesteps}] 动作={step_action}, 奖励={step_reward:.3f}, Done={step_done}, L_s={L_s}, t_bw_s={t_bw_s},"
-                          f"residual_ratio = {residual_ratio:.4f},recovery = {recovery:.4f},"
-                          f"rec_reward = {rec_reward:.4f}, energy_reward = {energy_reward:.4f}, res_penalty = {res_penalty:.4f},"
-                          f"initial_tmp = {initial_tmp:.4f}, max_TMP_during_filtration ={max_TMP_during_filtration:.4f}, tmp_after_ceb = {tmp_after_ceb:.4f} ")
+                          f"residual_ratio = {residual_ratio:.4f}, res_penalty = {res_penalty:.4f},"
+                          f"recovery = {recovery:.4f},econ_reward  = {econ_reward :.4f},"
+                          f"initial_tmp = {initial_tmp:.4f}, tmp_after_ceb = {tmp_after_ceb:.4f}, max_TMP_during_filtration ={max_TMP_during_filtration:.4f}, tmp_penalty = {tmp_penalty:.4f}")
 
 
                 # 记录数据
                 # 记录数据
                 self.recorder.record_step(
                 self.recorder.record_step(

+ 87 - 84
models/uf-rl/env/uf_env.py

@@ -98,13 +98,13 @@ class UFSuperCycleEnv(gym.Env):
 
 
         self.L_values = np.arange(
         self.L_values = np.arange(
             self.action_spec.L_min_s,
             self.action_spec.L_min_s,
-            self.action_spec.L_max_s + self.action_spec.L_step_s,
+            self.action_spec.L_max_s ,
             self.action_spec.L_step_s,
             self.action_spec.L_step_s,
         )
         )
 
 
         self.t_bw_values = np.arange(
         self.t_bw_values = np.arange(
             self.action_spec.t_bw_min_s,
             self.action_spec.t_bw_min_s,
-            self.action_spec.t_bw_max_s + self.action_spec.t_bw_step_s,
+            self.action_spec.t_bw_max_s,
             self.action_spec.t_bw_step_s,
             self.action_spec.t_bw_step_s,
         )
         )
 
 
@@ -195,7 +195,7 @@ class UFSuperCycleEnv(gym.Env):
         """
         """
         return min(1.0, self.current_step / self.max_episode_steps )
         return min(1.0, self.current_step / self.max_episode_steps )
 
 
-    def reset(self, seed=None, options=None, max_attempts: int = 1000):
+    def reset(self, seed=None, options=None, max_attempts: int = 10000):
         super().reset(seed=seed)
         super().reset(seed=seed)
 
 
         progress = self._get_training_progress()
         progress = self._get_training_progress()
@@ -309,11 +309,9 @@ class UFSuperCycleEnv(gym.Env):
         if info["max_TMP_during_filtration"] > self.reward_params.global_TMP_soft_limit:
         if info["max_TMP_during_filtration"] > self.reward_params.global_TMP_soft_limit:
             info_next, _ = self.physics.simulate_one_supercycle(state=next_state,L_s=L_s,t_bw_s=t_bw_s)
             info_next, _ = self.physics.simulate_one_supercycle(state=next_state,L_s=L_s,t_bw_s=t_bw_s)
 
 
-
-        reward, tmp_penalty, rec_reward, energy_reward, res_penalty = self._calculate_reward(info, info_next)
+        reward,tmp_penalty,econ_reward,res_penalty= self._calculate_reward(info, info_next)
         info["tmp_penalty"] = tmp_penalty
         info["tmp_penalty"] = tmp_penalty
-        info["rec_reward"] = rec_reward
-        info["energy_reward"] = energy_reward
+        info["econ_reward"] = econ_reward
         info["res_penalty"] = res_penalty
         info["res_penalty"] = res_penalty
 
 
         self.state = next_state
         self.state = next_state
@@ -352,119 +350,124 @@ class UFSuperCycleEnv(gym.Env):
 
 
         return next_obs, reward, terminated, truncated, info
         return next_obs, reward, terminated, truncated, info
 
 
-    def _calculate_reward(self, info: dict, info_next) -> float:
+    def _calculate_reward(self, info: dict, info_next=None):
         """
         """
-        计算强化学习奖励函数(扩展版
+        计算强化学习奖励函数(经济性 + 系统稳定性)
 
 
-        功能:
-        - 平衡回收率、残余污染和吨水电耗三个目标
-        - TMP不直接参与奖励计算(通过失败判定间接影响)
-        - 使用 tanh 函数实现平滑的非线性奖励
+        奖励结构:
+            Reward = 经济奖励 + 污染控制奖励 + TMP风险惩罚
 
 
-        参数
-            info (dict): 周期性能指标字典,需包含
-                - recovery: 回收率 [0-1]
-                - R_after_ceb: 本周期结束膜阻力
-                - initial_R: 本周期初始膜阻力
-                - delta_R_allow: 本周期允许最大阻力上升
-                - ton_water_energy_kWh_per_m3: 本周期吨水电耗
+        经济奖励:
+            基于吨水电耗 + 吨水药耗
+
+        稳定性奖励:
+            - 残余污染控制
+            - TMP软限制
+            - TMP增长趋势
 
 
         返回:
         返回:
-            float: 奖励值(通常在 -3 到 +3 之间)
-
-        设计思想:
-        - 高回收率 → 水资源利用率高 → 正奖励
-        - 低残余污染 → 膜长期稳定运行 → 正奖励
-        - 低吨水电耗 → 节能 → 正奖励
-        - 三者需要权衡:过短的过滤时间提高回收率但污染去除不彻底;过长时间污染控制好但回收率下降,过高功率增加耗能
-
-        参考点设计:
-        - 残余污染:
-            - 高污染参考点 = 1 / self.max_episode_steps
-            - 平衡点 = 0.5 / self.max_episode_steps
-        - 吨水电耗:
-            - 高点 = 0.1034 kWh/m³
-            - 平衡点 = 0.1011 kWh/m³
-            - 低点 = 0.0993 kWh/m³
-        - 回收率参考点保持原有设计
+            total_reward,
+            tmp_penalty,
+            econ_reward,
+            res_penalty
         """
         """
 
 
-        #新增:将TMP超限改为持续惩罚
-        # ========== TMP 超标风险惩罚项 ==========
+        # ==============================
+        # TMP 状态惩罚
+        # ==============================
+
         tmp = info["max_TMP_during_filtration"]
         tmp = info["max_TMP_during_filtration"]
         tmp_soft = self.reward_params.global_TMP_soft_limit
         tmp_soft = self.reward_params.global_TMP_soft_limit
         tmp_hard = self.reward_params.global_TMP_hard_limit
         tmp_hard = self.reward_params.global_TMP_hard_limit
 
 
         if self.tmp_over_limit_flag:
         if self.tmp_over_limit_flag:
             tmp_state_penalty = -self.reward_params.w_tmp_hard
             tmp_state_penalty = -self.reward_params.w_tmp_hard
+
         elif tmp <= tmp_soft:
         elif tmp <= tmp_soft:
             tmp_state_penalty = 0.0
             tmp_state_penalty = 0.0
+
         elif tmp < tmp_hard:
         elif tmp < tmp_hard:
             x = (tmp - tmp_soft) / (tmp_hard - tmp_soft)
             x = (tmp - tmp_soft) / (tmp_hard - tmp_soft)
-            tmp_state_penalty = -self.reward_params.w_tmp * x ** self.reward_params.p
 
 
-        # -------- TMP 趋势惩罚 --------
+            tmp_state_penalty = -self.reward_params.w_tmp * (
+                    x ** self.reward_params.p
+            )
+
+        else:
+            tmp_state_penalty = -self.reward_params.w_tmp_hard
+
+        # ==============================
+        # TMP 趋势惩罚
+        # ==============================
+
         tmp_trend_penalty = 0.0
         tmp_trend_penalty = 0.0
+
         if info_next is not None:
         if info_next is not None:
-            delta_tmp = info_next["max_TMP_during_filtration"] - tmp
+            delta_tmp = (
+                    info_next["max_TMP_during_filtration"] - tmp
+            )
+
+            # 只惩罚TMP上升
+            delta_tmp = max(delta_tmp, 0)
+
             tmp_trend_penalty = -self.reward_params.w_trend * delta_tmp
             tmp_trend_penalty = -self.reward_params.w_trend * delta_tmp
 
 
         tmp_penalty = tmp_state_penalty + tmp_trend_penalty
         tmp_penalty = tmp_state_penalty + tmp_trend_penalty
 
 
-        # ========== 提取性能指标 ==========
-        recovery = info["recovery"]  # 回收率 [0-1]
+        # ==============================
+        # 残余污染惩罚
+        # ==============================
 
 
-        # 污染比例:实际上升的阻力 / 允许上升的阻力
-        # 允许上升的阻力值 = 当前阻力值软上限 - 当前阻力
-        residual_ratio = info['residual_ratio']
+        residual_ratio = info["residual_ratio"]
 
 
-        # 吨水电耗指标
-        energy = info["ton_water_energy_kWh_per_m3"]
+        ref_residual = 1 / self.max_episode_steps
 
 
-        # ========== 回收率奖励项 ==========
-        # 将回收率归一化到 [0, 1] 区间(基于预期范围)
-        rec_norm = (recovery - self.reward_params.rec_low) / (self.reward_params.rec_high - self.reward_params.rec_low)
+        res_penalty = -np.tanh(
+            self.reward_params.k_res *
+            (residual_ratio / ref_residual - 1)
+        )
 
 
-        # 使用 tanh 函数构建平滑的 S 型奖励曲线
-        # - rec_norm = 0.5 时(回收率处于中间值),rec_reward = 0
-        # - rec_norm > 0.5 时,rec_reward > 0(鼓励高回收率)
-        # - rec_norm < 0.5 时,rec_reward < 0(惩罚低回收率)
-        # - k_rec 控制曲线陡峭程度,越大变化越陡
-        rec_reward = np.clip(np.tanh(self.reward_params.k_rec * (rec_norm - 0.5)), -1, 1)
+        # ==============================
+        # 经济成本(电耗 + 药耗)
+        # ==============================
 
 
-        # ========== 残余污染惩罚项 ==========
-        # 新参考点:每步允许上升比例 = 1 / max_episode_steps
-        # 平衡点 = 0.8 / max_episode_steps
-        ref_residual = 0.8 / self.max_episode_steps
+        energy = info["ton_water_energy"]
+        chemical = info["ton_water_chem"]
 
 
-        # 使用 tanh 构建惩罚曲线
-        # - residual_ratio < 平衡点时,res_penalty > 0(奖励低污染)
-        # - residual_ratio > 平衡点时,res_penalty < 0(惩罚高污染)
-        # - k_res 控制曲线陡峭程度
-        res_penalty = -np.tanh(self.reward_params.k_res * (residual_ratio / ref_residual - 1))
+        chemical_price = self.reward_params.chemical_price
+        energy_price =self.reward_params.energy_price
 
 
-        # ========== 吨水电耗奖励项 ==========
-        # 设置高/平衡/低点
-        energy_low = 0.0993
-        energy_high = 0.1034
+        cost = energy * energy_price + chemical * chemical_price * 100
 
 
-        # 将能耗归一化到 [0, 1],平衡点对应 energy_norm = 0.5
-        energy_norm = (energy - energy_low) / (energy_high - energy_low)
+        # 成本归一化范围
+        cost_low = self.reward_params.cost_low
+        cost_high = self.reward_params.cost_high
 
 
-        # 使用 tanh 构建平滑奖励
-        # - energy_norm < 0.5 时,energy_reward > 0(节能奖励)
-        # - energy_norm > 0.5 时,energy_reward < 0(高能耗惩罚)
-        # - k_energy 控制曲线陡峭程度
-        energy_reward = -np.tanh(self.reward_params.k_energy * (energy_norm - 0.5))
+        cost_norm = (
+                (cost - cost_low) /
+                (cost_high - cost_low)
+        )
 
 
-        # ========== 组合奖励 ==========
-        # 简单线性组合三项(为污染项加权)
-        total_reward = rec_reward + 2.0 * res_penalty + energy_reward + tmp_penalty
+        econ_reward = -np.tanh(
+            self.reward_params.k_cost *
+            (cost_norm - 0.5)
+        )
 
 
+        # ==============================
+        # 总奖励
+        # ==============================
 
 
-        # 可选:添加平移项使特定点的奖励为零(当前未使用)
-        # total_reward -= offset
+        total_reward = (
+                econ_reward
+                + res_penalty
+                + tmp_penalty
+        )
 
 
-        return total_reward, tmp_penalty, rec_reward, energy_reward, res_penalty
+        return (
+            total_reward,
+            tmp_penalty,
+            econ_reward,
+            res_penalty
+        )
 
 
 
 

+ 46 - 10
models/uf-rl/env/uf_physics.py

@@ -22,7 +22,7 @@ uf_physics_48h.py
 
 
 import numpy as np
 import numpy as np
 import copy
 import copy
-from env.env_params import UFState, UFPhysicsParams
+from env.env_params import UFState, UFPhysicsParams, UFStateBounds
 
 
 
 
 class UFPhysicsModel:
 class UFPhysicsModel:
@@ -38,6 +38,7 @@ class UFPhysicsModel:
     def __init__(
     def __init__(
             self,
             self,
             phys_params: UFPhysicsParams,
             phys_params: UFPhysicsParams,
+            state_bounds: UFStateBounds,
             resistance_model_fp=None,
             resistance_model_fp=None,
             resistance_model_bw=None,
             resistance_model_bw=None,
             IS_TIMES: bool = False,
             IS_TIMES: bool = False,
@@ -50,6 +51,7 @@ class UFPhysicsModel:
             IS_TIMES: CEB是否为固定次数,T为固定次数
             IS_TIMES: CEB是否为固定次数,T为固定次数
         """
         """
         self.p = phys_params
         self.p = phys_params
+        self.state_bounds = state_bounds
         self.model_fp = resistance_model_fp
         self.model_fp = resistance_model_fp
         self.model_bw = resistance_model_bw
         self.model_bw = resistance_model_bw
         self.IS_TIMES = IS_TIMES
         self.IS_TIMES = IS_TIMES
@@ -350,21 +352,53 @@ class UFPhysicsModel:
         # 日均产水时间(24小时内实际产水的时间)
         # 日均产水时间(24小时内实际产水的时间)
         daily_prod_time_h = k_bw_per_ceb * L_h / T_super_h * 24.0  # [小时]
         daily_prod_time_h = k_bw_per_ceb * L_h / T_super_h * 24.0  # [小时]
 
 
-        # 吨水电耗(从查找表获取最接近的值)
+        # 参考吨水电耗(从查找表获取最接近的值)
         # 从物理参数类中获取查找表
         # 从物理参数类中获取查找表
         closest_L = min(self.p.energy_lookup.keys(), key=lambda x: abs(x - L_s))
         closest_L = min(self.p.energy_lookup.keys(), key=lambda x: abs(x - L_s))
-        ton_water_energy = self.p.energy_lookup[closest_L]  # [kWh/m³]
+        refer_ton_water_energy = self.p.energy_lookup[closest_L]  # [kWh/m³]
+
+        # 实际吨水电耗计算
+        # 进水时间(小时)
+        t_feed_total_h = k_bw_per_ceb * L_h
+        # 反洗时间(小时)
+        t_bw_total_h = k_bw_per_ceb * t_bw_s / 3600.0
+
+        # 总能耗 (kWh)
+        E_total = t_feed_total_h * self.p.p_feed_kw+ t_bw_total_h * self.p.p_bw_kw
+
+        # 吨水电耗 (kWh/吨)
+        ton_water_energy = E_total / max(V_net, 1e-12)
+
+        # 吨水药耗计算
+        # ========== 吨水药耗计算 ==========
+        R_removed = state.ceb_removal
+
+        # 参数
+        R_min = self.state_bounds.ceb_removal_min
+        R_max = self.state_bounds.ceb_removal_max
+
+        dose_min = self.p.dose_min
+        dose_max = self.p.dose_max
+
+        # 防止越界
+        R_removed_clip = np.clip(R_removed, R_min, R_max)
+
+        # 线性映射计算加药量
+        dose = dose_min + (R_removed_clip - R_min) / (R_max - R_min) * (dose_max - dose_min)
+
+        # 吨水药耗
+        ton_water_chem = dose / max(V_net, 1e-12)
 
 
         # ===== 新指标:膜阻力允许上升空间 =====
         # ===== 新指标:膜阻力允许上升空间 =====
         # 该指标根据当前最大跨膜压差距离硬约束跨膜压差的距离,动态计算当前周期允许上升的膜阻力值,用于后续清洗效果奖励计算
         # 该指标根据当前最大跨膜压差距离硬约束跨膜压差的距离,动态计算当前周期允许上升的膜阻力值,用于后续清洗效果奖励计算
-
+        delta_R = R_after_ceb - initial_R
         delta_R_allow = max(
         delta_R_allow = max(
-            self.resistance_from_tmp(self.p.global_TMP_soft_limit, state.q_UF, state.temp) -
+            self.resistance_from_tmp(self.p.global_TMP_hard_limit, state.q_UF, state.temp) -
             self.resistance_from_tmp(max_tmp_during_filtration, state.q_UF, state.temp),
             self.resistance_from_tmp(max_tmp_during_filtration, state.q_UF, state.temp),
             1e-6
             1e-6
         )
         )
         if delta_R_allow > 50:
         if delta_R_allow > 50:
-            residual_ratio = (R_after_ceb - initial_R) / delta_R_allow
+            residual_ratio = delta_R / delta_R_allow
         else:
         else:
             residual_ratio = 1.0
             residual_ratio = 1.0
 
 
@@ -399,7 +433,9 @@ class UFPhysicsModel:
             "residual_ratio" : residual_ratio, # 污染上升比例
             "residual_ratio" : residual_ratio, # 污染上升比例
 
 
             # 能耗指标
             # 能耗指标
-            "ton_water_energy_kWh_per_m3": ton_water_energy,  # 吨水电耗
+            "refer_ton_water_energy": refer_ton_water_energy,  # 参考吨水电耗
+            "ton_water_energy": ton_water_energy,  # 吨水电耗
+            "ton_water_chem": ton_water_chem,  # 吨水药耗
         }
         }
 
 
         # 更新 state
         # 更新 state
@@ -442,8 +478,8 @@ class UFPhysicsModel:
                - 原因:回收率太低说明反洗水耗过大,经济性差
                - 原因:回收率太低说明反洗水耗过大,经济性差
                - 阈值:75%(可调整)
                - 阈值:75%(可调整)
 
 
-            3. 残余污染累积过快:(R_after_ceb - R0) / R0 > 0.1
-               - 原因:单个超级周期污染增长超过10%,长期运行不可持续
+            3. 残余污染累积过快:(R_after_ceb - R0) / R0 > 0.05
+               - 原因:单个超级周期污染增长超过5%,长期运行不可持续
                - 阈值:5%(可调整)
                - 阈值:5%(可调整)
             """
             """
         # ========== 获取关键指标 ==========
         # ========== 获取关键指标 ==========
@@ -465,7 +501,7 @@ class UFPhysicsModel:
 
 
         # 条件3:污染增长比例超过容许范围
         # 条件3:污染增长比例超过容许范围
         residual_increase = (R_after_ceb - R0) / delta_R_allow
         residual_increase = (R_after_ceb - R0) / delta_R_allow
-        if residual_increase > 1 / 30:
+        if residual_increase > 1 / 20:
             return False  # 失败
             return False  # 失败
 
 
         # 所有条件通过
         # 所有条件通过

BIN
models/uf-rl/lankao/48h_dqn_model.zip


+ 221 - 0
models/uf-rl/lankao/data_to_rl_config.yaml

@@ -0,0 +1,221 @@
+# ============================================================
+# 项目级路径配置
+# ============================================================
+
+Paths:
+  project_root: "E:/Greentech"
+  __comment__: >
+    项目根目录,所有路径均相对于该目录展开。
+    不同工程师只需修改这一项即可迁移环境。
+
+  # Cleaned, cycle-segmented plant data read by the data_to_rl stage (one CSV per unit).
+  raw_data:
+    filtered_cycles_dir: "models/uf-rl/datasets/UF_lankao_data/processed/filter_segments"
+    cycle_file_pattern: "UF{unit}_filtered_cycles.csv"
+
+    enabled_units: [1, 2]
+    # A bare `None` is the string "None" in YAML (not a null), so it is truthy and
+    # iterable character by character. An explicit empty list states "no unit is disabled".
+    disabled_units: []
+
+    __comment__: >
+      已完成清洗与周期切分的真实工厂数据。
+      每个 CSV 对应一个机组,文件内每一行是一个【物理周期】。
+      data_to_rl 阶段会:
+        - 读取 enabled_units 中所有机组
+        - 合并为统一数据集
+        - 机组编号仅用于筛选,不进入状态空间
+
+  data_to_rl:
+    output_dir: "models/uf-rl/datasets/UF_lankao_data/rl_ready/output"
+    cache_dir: "models/uf-rl/datasets/UF_lankao_data/rl_ready/cache"
+
+    __comment__: >
+      data_to_rl 模块输出内容:
+        - 全机组合并后的状态空间范围
+        - reset 初始状态分布
+        - reward 统计量
+
+
+# ============================================================
+# 数据层级定义(非常重要)
+# ============================================================
+
+DataHierarchy:
+
+  physical_cycle:
+    id_column: "seg_id"
+    time_columns:
+      start: "start_time"
+      end: "end_time"
+    __comment__: >
+      物理周期是最细粒度的数据层级:
+      - 用于物理模型验证
+      - 用于环境内部子步模拟
+      - 不直接暴露给强化学习算法
+
+  chemical_cycle:
+    id_column: "chem_cycle_id"
+    validity_column: "chem_cycle_valid"
+    step_definition: "one_rl_step_per_chemical_cycle"
+    __comment__: >
+      化学周期是强化学习的【唯一 step 单位】。
+      强化学习中:
+        - 一个 step = 一个化学周期
+        - 一个状态 = 一个化学周期的整体状态
+
+# ============================================================
+# 强化学习状态定义(工程师重点关注)
+# ============================================================
+
+RLState:
+  level: chemical_cycle
+  dimension: 8
+  __comment__: >
+    强化学习 observation 的定义。
+    所有变量在一个 step(化学周期)内保持不变。
+
+  variables:
+
+    q_UF:
+      source: "flow_mean"
+      aggregation: "mean_within_cycle"
+      unit: "m3_per_h"
+      description: "化学周期代表性进水流量"
+      __comment__: >
+        原始数据在物理周期层级变化,
+        这里使用化学周期内的平均值作为状态。
+
+    temp:
+      source: "temp_mean"
+      aggregation: "mean_within_cycle"
+      unit: "celsius"
+      description: "化学周期代表性水温"
+
+    TMP:
+      source: "tmp_start"
+      selection: "first_physical_cycle"
+      unit: "MPa"
+      description: "化学周期起始跨膜压差"
+      __comment__: >
+        虽然 TMP 在物理周期内会变化,
+        但强化学习只关心化学周期开始时的状态。
+
+    R:
+      source: "R_scaled_start"
+      selection: "first_physical_cycle"
+      unit: "scaled_resistance"
+      description: "化学周期起始膜阻力"
+
+    nuK:
+      source: "cycle_nuK"
+      unit: "scaled"
+      description: "短期污染增长系数"
+      __comment__: >
+        在同一化学周期内视为常数,
+        后续 step (下一化学周期)中根据动作进行经验性更新。
+
+    slope:
+      source: "cycle_long_a"
+      unit: "scaled"
+      description: "长期不可逆污染幂律系数 a"
+
+    power:
+      source: "cycle_long_b"
+      unit: "dimensionless"
+      description: "长期不可逆污染幂律指数 b"
+
+    ceb_removal:
+      source: "cycle_R_removed"
+      unit: "scaled"
+      description: "CEB 可去除的膜阻力"
+
+# ============================================================
+# 状态空间范围提取(reset 使用)
+# ============================================================
+
+StateSpaceExtraction:
+  enabled: true
+  level: chemical_cycle
+  include_only_valid_cycles: true
+
+  __comment__: >
+    用真实工厂数据统计【合理状态空间范围】,
+    用于:
+      - reset 初始状态采样
+      - 状态边界 sanity check
+
+  statistics:
+    method: "empirical"
+    percentiles: [1, 5, 50, 95, 99]
+    __comment__: >
+      使用经验分布统计,避免假设正态分布。
+
+  output:
+    save_csv: true
+    filename: "state_space_bounds.csv"
+
+# ============================================================
+# 物理周期层级的用途声明
+# ============================================================
+
+PhysicalCycleUsage:
+  enabled: true
+
+  purposes:
+    - internal_simulation
+    - parameter_sanity_check
+    - post_training_validation
+
+  variables_available:
+    - R_scaled_start
+    - R_scaled_end
+    - tmp_start
+    - tmp_end
+    - flow_mean
+    - flow_std
+    - temp_mean
+    - temp_std
+
+  __comment__: >
+    物理周期数据:
+      ✔ 可用于环境内部计算
+      ✔ 可用于训练后验证
+      ✘ 不可作为 RL observation
+
+# ============================================================
+# 真实数据在强化学习中的角色(工程边界声明)
+# ============================================================
+
+RealDataUsagePolicy:
+
+  reset_initialization:
+    enabled: true
+    source: "chemical_cycle_state_distribution"
+    __comment__: >
+      reset 时从真实化学周期状态分布中采样,
+      确保训练从“现实可达状态”开始。
+
+  training:
+    use_real_transitions: false
+    __comment__: >
+      真实数据不用于:
+        - 动力学拟合
+        - 监督策略
+      强化学习仍在模拟环境中进行。
+
+  validation:
+    enabled: true
+    compare_with_real_cycles: true
+    __comment__: >
+      训练完成后:
+        - 将策略运行结果
+        - 与真实化学周期统计特性进行对比
+      用于评估策略的现实可行性与安全性。
+
+# ============================================================
+# 方法论与工程风险声明
+# ============================================================
+
+MethodologyNotes:
+  - "强化学习的 step 定义严格等同于一个化学周期"
+  - "所有 RL 状态变量在一个 step 内视为常数"
+  - "物理周期层级仅用于环境内部模拟"
+  - "真实数据用于分布约束与验证,而非动力学建模"

+ 26 - 0
models/uf-rl/lankao/dqn_config.yaml

@@ -0,0 +1,26 @@
+# ============================================================
+# DQN hyperparameter configuration
+# ============================================================
+# Neural-network parameters
+learning_rate: 0.0001           # learning rate (1e-4)
+
+# Experience-replay parameters
+buffer_size: 100000              # replay buffer size
+learning_starts: 10000           # steps collected before training starts
+batch_size: 32                   # training batch size
+
+# Reinforcement-learning parameters
+gamma: 0.95                      # discount factor
+train_freq: 4                    # training frequency (in steps)
+
+# Target-network parameters
+target_update_interval: 1        # target-network update interval
+tau: 0.005                       # soft-update coefficient
+
+# Exploration-strategy parameters
+exploration_initial_eps: 1.0     # initial exploration rate
+exploration_fraction: 0.3        # exploration-rate decay fraction
+exploration_final_eps: 0.02      # final exploration rate
+
+# Logging parameters
+remark: "uf_dqn_real_reset"      # experiment remark

+ 139 - 0
models/uf-rl/lankao/env_config.yaml

@@ -0,0 +1,139 @@
+# Default UF state values; field meanings follow the annotated longting env_config.yaml.
+UFState:
+  # ===== Membrane dynamic operating parameters =====
+  q_UF: 150.0  # feed flow rate
+  TMP: 0.02  # transmembrane pressure at the start of the CEB cycle
+  temp: 20.0  # water temperature
+  R: 200.0  # membrane resistance (placeholder; in use it is computed from the three values above)
+
+  # ===== Membrane resistance model parameters =====
+  nuK: 170.0  # short-term membrane-resistance rise rate
+  slope: 2.0  # coefficient of the long-term resistance power-law model
+  power: 1.1  # exponent of the long-term resistance power-law model
+  ceb_removal: 200.0  # membrane resistance removed by this CEB
+
+
+# Parameters of the UF physics model (TMP limits, backwash/CEB settings, energy and dosing).
+UFPhysicsParams:
+  # ===== Global TMP constraints =====
+  global_TMP_hard_limit: 0.08 # hard upper limit on transmembrane pressure
+  global_TMP_soft_limit: 0.06 # soft upper limit on transmembrane pressure
+
+  # ===== Physical backwash parameters =====
+  tau_bw_s: 20.0
+  gamma_t: 1.0
+  q_bw_m3ph: 500.0
+
+  # ===== CEB (chemical backwash) parameters =====
+  T_ceb_interval_h: 48.0
+  v_ceb_m3: 20.0
+  t_ceb_s: 1800.0
+
+  # ===== Membrane module parameters =====
+  # NOTE(review): presumably the membrane area in m2 (same value as area_m2 in uf_analyze_config.yaml) - confirm.
+  A: 3200.0
+
+  # ===== Energy-per-ton-of-water lookup table =====
+  # The physics model indexes this table by the closest L value and treats the
+  # result as the reference energy per ton of water in kWh/m3.
+  # NOTE(review): keys are presumably filtration durations in seconds (60 s grid) - confirm.
+  energy_lookup:
+    2700: 0.1088
+    2760: 0.1083
+    2820: 0.1078
+    2880: 0.1074
+    2940: 0.1070
+    3000: 0.1066
+    3060: 0.1062
+    3120: 0.1059
+    3180: 0.1055
+    3240: 0.1052
+    3300: 0.1049
+    3360: 0.1045
+    3420: 0.1042
+    3480: 0.1039
+    3540: 0.1036
+    3600: 0.1034
+    3660: 0.1031
+    3720: 0.1029
+    3780: 0.1026
+    3840: 0.1023
+    3900: 0.1021
+    3960: 0.1019
+    4020: 0.1017
+    4080: 0.1015
+    4140: 0.1012
+    4200: 0.1011
+    4260: 0.1008
+    4320: 0.1007
+    4380: 0.1005
+    4440: 0.1003
+    4500: 0.1001
+    4560: 0.0999
+    4620: 0.0998
+    4680: 0.0996
+    4740: 0.0995
+    4800: 0.0993
+
+  # Pump powers: the physics model multiplies them by the total feed and
+  # backwash hours to obtain the total energy in kWh.
+  p_feed_kw: 15.0
+  p_bw_kw: 25.0
+  # Dose range: the physics model maps the clipped ceb_removal linearly from
+  # [ceb_removal_min, ceb_removal_max] onto [dose_min, dose_max].
+  dose_min: 0.10
+  dose_max: 0.20
+
+
+# Range and discretization of the two action variables (L and t_bw, both in seconds).
+UFActionSpec:
+  # ===== Action-space ranges =====
+  L_min_s: 2400.0
+  L_max_s: 3800.0
+  t_bw_min_s: 40.0
+  t_bw_max_s: 60.0
+
+  # ===== Action discretization step sizes =====
+  L_step_s: 60.0
+  t_bw_step_s: 5.0
+
+
+# Reward parameters, grouped into TMP penalties, economic cost and residual fouling.
+UFRewardParams:
+  # ===== TMP safety and penalties =====
+  global_TMP_hard_limit: 0.08
+  global_TMP_soft_limit: 0.06
+  w_tmp_hard: 5.0
+  w_tmp: 1.5
+  p: 3.0
+  w_trend: 1.0
+
+  # ===== Economic cost =====
+  k_cost: 3.0
+  chemical_price: 13.0
+  energy_price: 0.667
+  cost_low: 0.08
+  cost_high:  0.20
+  w_cost: 1.0
+
+  # ===== Residual fouling =====
+  k_res: 3.0
+  residual_ref_ratio: null
+  w_res: 1.0
+
+
+# Lower/upper bounds for the UF state variables.
+UFStateBounds:
+  # ===== Flow initialization bounds =====
+  q_UF_min: 140.0
+  q_UF_max: 210.0
+
+  # ===== Temperature initialization bounds =====
+  temp_min: 15.0
+  temp_max: 25.0
+
+  # ===== TMP initialization bounds =====
+  TMP0_min: 0.01
+  TMP0_max: 0.04
+  global_TMP_hard_limit: 0.08
+
+  # ===== Short-term fouling parameters =====
+  nuK_min: 100.0
+  nuK_max: 250.0
+
+  # ===== Long-term fouling parameters =====
+  slope_min: 1.28
+  slope_max: 150
+  power_min: 0.25
+  power_max: 2.5
+
+  # ===== CEB removal capacity =====
+  # The physics model clips ceb_removal to this range before mapping it to a chemical dose.
+  ceb_removal_min: 150.0
+  ceb_removal_max: 350.0

+ 49 - 0
models/uf-rl/lankao/uf_analyze_config.yaml

@@ -0,0 +1,49 @@
+UF:
+  units: [ "1", "2" ]
+  area_m2: 3200
+
+  inlet_codes: [220.0, 301.0]
+  stable_codes: [220.0, 260.0]
+
+  physical_bw_code: [301.0, 340.0]
+  chemical_bw_code: [400.0, 660.0]
+
+  # 列名
+  column_formats:
+    flow_col: "ns=3;s={unit}#UF_JSFLOW_O"
+    tmp_col: "ns=3;s=UF{unit}_SSD_KMYC"
+    press_col: "ns=3;s={unit}#UF_JSPRESS_O"
+    ctrl_col: "ns=3;s=UF{unit}_STEP"
+    temp_col: "ns=3;s=ZJS_TEMP_O"
+    orp_col: "ns=3;s=RO_JSORP_O"
+    ph_col: "ns=3;s=RO_JSPH_O"
+
+
+# Numeric parameters of the UF data-analysis pipeline.
+Params:
+  # Stable-segment extraction
+  min_stable_points: 30
+  initial_points: 10
+
+  # Resistance trend calculation
+  segment_head_n: 5
+  segment_tail_n: 5
+
+  # Written with a decimal point and a signed exponent: YAML 1.1 loaders such as
+  # PyYAML resolve a bare `1e10` to the string "1e10" instead of a float.
+  scale_factor: 1.0e+10
+
+
+Plot:
+  figsize: [12, 6]
+  dpi: 120
+  color_inlet: "#1f77b4"
+  color_bw_phys: "#ff7f0e"
+  color_bw_chem: "#d62728"
+
+Paths:
+  project_root: "E:/Greentech" # 请根据项目根目录修改相应路径
+
+  raw_data_path: "models/uf-rl/datasets/UF_lankao_data/raw"
+  output_path: "models/uf-rl/datasets/UF_lankao_data/processed/segments"
+  filter_output_path: "models/uf-rl/datasets/UF_lankao_data/processed/filter_segments"
+
+  output_format: "csv"
+

+ 26 - 23
models/uf-rl/longting/env_config.yaml

@@ -1,15 +1,15 @@
 UFState:
 UFState:
   # ===== 膜动态运行参数 =====
   # ===== 膜动态运行参数 =====
-  q_UF: 150.0
-  TMP: 0.02
-  temp: 20.0
-  R: 200.0
+  q_UF: 150.0 # 流量
+  TMP: 0.02 # CEB周期初始跨膜压差
+  temp: 20.0 # 温度
+  R: 200.0 # 膜阻力(占位,实际上应用时根据前三个变量计算)
 
 
   # ===== 膜阻力模型参数 =====
   # ===== 膜阻力模型参数 =====
-  nuK: 170.0
-  slope: 2.0
-  power: 1.1
-  ceb_removal: 200.0
+  nuK: 170.0 # 短期膜阻力上升速率
+  slope: 2.0 # 长期膜阻力上升幂律模型斜率
+  power: 1.1 # 长期膜阻力上升幂律模型次数
+  ceb_removal: 200.0 # 本次CEB去除膜阻力值
 
 
 
 
 UFPhysicsParams:
 UFPhysicsParams:
@@ -20,10 +20,11 @@ UFPhysicsParams:
   # ===== 物理反洗参数 =====
   # ===== 物理反洗参数 =====
   tau_bw_s: 20.0
   tau_bw_s: 20.0
   gamma_t: 1.0
   gamma_t: 1.0
-  q_bw_m3ph: 1000.0
+  q_bw_m3ph: 500.0
 
 
   # ===== CEB 化学反洗参数 =====
   # ===== CEB 化学反洗参数 =====
   T_ceb_interval_h: 48.0
   T_ceb_interval_h: 48.0
+  T_ceb_interval_times: 48
   v_ceb_m3: 20.0
   v_ceb_m3: 20.0
   t_ceb_s: 2400.0   # 40 * 60
   t_ceb_s: 2400.0   # 40 * 60
 
 
@@ -69,6 +70,12 @@ UFPhysicsParams:
     4740: 0.0995
     4740: 0.0995
     4800: 0.0993
     4800: 0.0993
 
 
+  p_feed_kw:  18.0
+  p_bw_kw:  20.0
+  dose_min:  0.10
+  dose_max:  0.20
+
+
 
 
 UFActionSpec:
 UFActionSpec:
   # ===== 动作空间范围 =====
   # ===== 动作空间范围 =====
@@ -91,23 +98,19 @@ UFRewardParams:
   p: 3.0
   p: 3.0
   w_trend: 1.0
   w_trend: 1.0
 
 
-  # ===== 回收率 =====
-  k_rec: 5.0
-  rec_low: 0.85
-  rec_high: 0.95
-  w_rec: 2.0
+  # ===== 经济成本 =====
+  k_cost: 3.0
+  chemical_price: 13.0
+  energy_price: 0.667
+  cost_low: 0.06
+  cost_high:  0.10
+  w_cost: 1.0
 
 
   # ===== 残余污染 =====
   # ===== 残余污染 =====
-  k_res: 10.0
+  k_res: 3.0
   residual_ref_ratio: null
   residual_ref_ratio: null
-  w_res: 2.0
-
-  # ===== 吨水电耗 =====
-  k_energy: 5.0
-  energy_low: 0.1023
-  energy_high: 0.1066
-  energy_ref: 0.1044
-  w_energy: 1.0
+  w_res: 1.0
+
 
 
 
 
 UFStateBounds:
 UFStateBounds:

+ 8 - 2
models/uf-rl/longting/uf_analyze_config.yaml

@@ -2,8 +2,8 @@ UF:
   units: [ "1", "2" ]
   units: [ "1", "2" ]
   area_m2: 128 * 40
   area_m2: 128 * 40
 
 
-  inlet_codes: [215.0, 301.0]
-  stable_codes: [220.0, 260.0]
+  inlet_codes: [221.0, 300.0]
+  stable_codes: [221.0, 260.0]
 
 
   physical_bw_code: [301.0, 340.0]
   physical_bw_code: [301.0, 340.0]
   chemical_bw_code: [400.0, 660.0]
   chemical_bw_code: [400.0, 660.0]
@@ -17,6 +17,12 @@ UF:
     temp_col: "ns=3;s=ZJS_TEMP_O"
     temp_col: "ns=3;s=ZJS_TEMP_O"
     orp_col: "ns=3;s=RO_JSORP_O"
     orp_col: "ns=3;s=RO_JSORP_O"
     ph_col: "ns=3;s=RO_JSPH_O"
     ph_col: "ns=3;s=RO_JSPH_O"
+    event_col: "event_type"
+    BWB_POWER_col: "ns=3;s=ZZ_{unit}#UFBWB_POWER"
+    GSB_POWER_col: "ns=3;s=ZZ_UFGSB_POWER"
+    NaClO_col: "ns=3;s=CN_LEVEL_O"
+    HCL_col: "ns=3;s=S_LEVEL_O"
+    NaOH_col: "ns=3;s=J_LEVEL_O"
 
 
 
 
 Params:
 Params:

+ 7 - 1
models/uf-rl/rl_model/DQN/dqn_model/dqn_statebuilder.py

@@ -36,6 +36,10 @@ class DQNStateBuilder:
         self.params = self.cfg.params
         self.params = self.cfg.params
 
 
         self.units = self.uf_cfg["units"]
         self.units = self.uf_cfg["units"]
+        self.column_formats = self.uf_cfg.get("column_formats", {})
+        self.flow_format = self.column_formats.get("flow_col", "C.M.{unit}_FT_JS@out")
+        self.ctrl_format = self.column_formats.get("ctrl_col", "C.M.{unit}_DB@word_control")
+
         self.area_m2 = self.uf_cfg["area_m2"]
         self.area_m2 = self.uf_cfg["area_m2"]
 
 
         self.scale_factor = self.params.get("scale_factor", 1e10)
         self.scale_factor = self.params.get("scale_factor", 1e10)
@@ -236,7 +240,7 @@ class DQNStateBuilder:
         根据列名自动识别 UF 单元编号
         根据列名自动识别 UF 单元编号
         """
         """
         for unit in self.units:
         for unit in self.units:
-            key = f"C.M.{unit}_FT_JS@out"
+            key = self.flow_format.format(unit=unit)
             if key in df.columns:
             if key in df.columns:
                 return unit
                 return unit
         raise ValueError("无法从 CSV 列名识别 UF 单元编号")
         raise ValueError("无法从 CSV 列名识别 UF 单元编号")
@@ -245,11 +249,13 @@ class DQNStateBuilder:
         """
         """
         为 DataFrame 标注 event_type
         为 DataFrame 标注 event_type
         """
         """
+        ctrl_col = self.uf_cfg["flow_col_template"].format(unit=unit_id)
         clf = UFEventClassifier(
         clf = UFEventClassifier(
             unit_name=unit_id,
             unit_name=unit_id,
             inlet_codes=self.uf_cfg["inlet_codes"],
             inlet_codes=self.uf_cfg["inlet_codes"],
             physical_code=self.uf_cfg["physical_bw_code"],
             physical_code=self.uf_cfg["physical_bw_code"],
             chemical_code=self.uf_cfg["chemical_bw_code"],
             chemical_code=self.uf_cfg["chemical_bw_code"],
+            ctrl_col
         )
         )
         df = clf.classify(df)
         df = clf.classify(df)
         df = clf.segment(df)
         df = clf.segment(df)

+ 30 - 11
models/uf-rl/rl_model/DQN/uf_decide/run_dqn_deicde_totalstate.py

@@ -21,7 +21,8 @@ from pathlib import Path
 # ========== 参数 / 物理 ==========
 # ========== 参数 / 物理 ==========
 from env.uf_resistance_models_load import load_resistance_models
 from env.uf_resistance_models_load import load_resistance_models
 from env.uf_physics import UFPhysicsModel
 from env.uf_physics import UFPhysicsModel
-from env.env_params import UFState, UFPhysicsParams, UFActionSpec
+from env.env_params import UFState, UFActionSpec
+from env.env_config_loader import EnvConfigLoader, create_env_params_from_yaml
 
 
 
 
 # ========== 决策器 ==========
 # ========== 决策器 ==========
@@ -31,11 +32,10 @@ from rl_model.DQN.dqn_model.dqn_statebuilder import DQNStateBuilder
 
 
 
 
 
 
-def build_physics(IS_TIMES):
+def build_physics(IS_TIMES, phys_params):
     """
     """
     构造与训练一致的物理模型(只做一次)
     构造与训练一致的物理模型(只做一次)
     """
     """
-    phys_params = UFPhysicsParams()
     res_fp, res_bw = load_resistance_models(phys_params)
     res_fp, res_bw = load_resistance_models(phys_params)
 
 
     physics = UFPhysicsModel(
     physics = UFPhysicsModel(
@@ -46,7 +46,7 @@ def build_physics(IS_TIMES):
     )
     )
     return physics
     return physics
 
 
-def generate_plc_instructions(current_L_s, current_t_bw_s, model_prev_L_s, model_prev_t_bw_s, model_L_s, model_t_bw_s):
+def generate_plc_instructions(action_spec,current_L_s, current_t_bw_s, model_prev_L_s, model_prev_t_bw_s, model_L_s, model_t_bw_s):
     """
     """
     根据工厂当前值、模型上一轮决策值和模型当前轮决策值,生成PLC指令。
     根据工厂当前值、模型上一轮决策值和模型当前轮决策值,生成PLC指令。
 
 
@@ -55,7 +55,7 @@ def generate_plc_instructions(current_L_s, current_t_bw_s, model_prev_L_s, model
        如果工厂当前值也为None,则返回None并提示错误。
        如果工厂当前值也为None,则返回None并提示错误。
     """
     """
 
 
-    action_spec = UFActionSpec()
+    action_spec = action_spec
     adjustment_threshold = 1.0
     adjustment_threshold = 1.0
 
 
     # 处理None值情况
     # 处理None值情况
@@ -192,7 +192,10 @@ def calc_uf_cycle_metrics(current_state, max_tmp_during_filtration, min_tmp_duri
 def run_dqn_decide(
 def run_dqn_decide(
     model_path: Path,
     model_path: Path,
     physics,
     physics,
-    # -------- 工厂当前值 --------
+    action_spec,
+    reward_params,
+    state_bounds,
+# -------- 工厂当前值 --------
     current_state: UFState
     current_state: UFState
 ):
 ):
     """
     """
@@ -202,6 +205,9 @@ def run_dqn_decide(
     # 构造决策器
     # 构造决策器
     decider = UFDQNDecider(
     decider = UFDQNDecider(
         physics=physics,
         physics=physics,
+        action_spec=action_spec,
+        reward_params=reward_params,
+        state_bounds=state_bounds,
         model_path=model_path,
         model_path=model_path,
         seed=0,
         seed=0,
     )
     )
@@ -222,11 +228,11 @@ if __name__ == "__main__":
 
 
     THIS_FILE = Path(__file__).resolve()
     THIS_FILE = Path(__file__).resolve()
     UF_RL_ROOT = THIS_FILE.parents[3]
     UF_RL_ROOT = THIS_FILE.parents[3]
-    CONFIG_PATH = UF_RL_ROOT / "config" / "uf_analyze_config.yaml"
-
-    MODEL_PATH = "model/dqn_model.zip"
-    prev_cycle_csv = "online_datasets/UF1_prev_cycle.csv"
-    init_cycle_csv = "online_datasets/UF1_init_cycle.csv"
+    CONFIG_PATH = UF_RL_ROOT / "xishan" / "uf_analyze_config.yaml"
+    ENV_CONFIG_PATH = UF_RL_ROOT / "xishan" / "env_config.yaml"
+    MODEL_PATH = UF_RL_ROOT / "xishan" / "48h_dqn_model.zip"
+    prev_cycle_csv = "test_online_datasets/UF1_prev_cycle.csv"
+    init_cycle_csv = "test_online_datasets/UF1_init_cycle.csv"
     IS_TIMES = False # 新增指定变量,表示CEB间隔为时间控制/次数控制,T表示48次bw一次CEB,F表示48h一次CEB
     IS_TIMES = False # 新增指定变量,表示CEB间隔为时间控制/次数控制,T表示48次bw一次CEB,F表示48h一次CEB
 
 
     # 构建强化学习状态
     # 构建强化学习状态
@@ -236,11 +242,24 @@ if __name__ == "__main__":
         init_cycle_csv=init_cycle_csv,
         init_cycle_csv=init_cycle_csv,
     )
     )
 
 
+    config_loader = EnvConfigLoader(ENV_CONFIG_PATH)
+    config_loader.validate_config()
+    config_loader.print_config_summary()
+    (
+        uf_state_default,  # UFState默认值
+        phys_params,  # UFPhysicsParams
+        action_spec,  # UFActionSpec
+        reward_params,  # UFRewardParams
+        state_bounds  # UFStateBounds
+    ) = create_env_params_from_yaml(ENV_CONFIG_PATH)
     physics = build_physics(IS_TIMES)
     physics = build_physics(IS_TIMES)
 
 
     action_id, model_L_s, model_t_bw_s = run_dqn_decide(
     action_id, model_L_s, model_t_bw_s = run_dqn_decide(
         model_path=MODEL_PATH,
         model_path=MODEL_PATH,
         physics=physics,
         physics=physics,
+        action_spec=action_spec,
+        reward_params=reward_params,
+        state_bounds=state_bounds,
         current_state=current_state,
         current_state=current_state,
     ) # 环境实例化,模型加载等功能放在UFDQNDecider类中
     ) # 环境实例化,模型加载等功能放在UFDQNDecider类中
 
 

+ 1 - 1
models/uf-rl/rl_model/DQN/uf_train/dqn_trainer.py

@@ -54,7 +54,7 @@ class DQNTrainer:
 
 
         # 4️⃣ 固定日志存放位置:PROJECT_ROOT/model_result/uf_dqn_tensorboard
         # 4️⃣ 固定日志存放位置:PROJECT_ROOT/model_result/uf_dqn_tensorboard
         # 假设在 run_dqn_train.py 中定义了 PROJECT_ROOT = "models/uf-rl"
         # 假设在 run_dqn_train.py 中定义了 PROJECT_ROOT = "models/uf-rl"
-        base_dir = os.path.join(self.PROJECT_ROOT, "model_result", "uf_dqn_tensorboard")
+        base_dir = os.path.join(self.PROJECT_ROOT, "model_result", "uf_dqn_tensorboard","anzhen48h")
         os.makedirs(base_dir, exist_ok=True)
         os.makedirs(base_dir, exist_ok=True)
 
 
         # 5️⃣ 完整日志目录路径
         # 5️⃣ 完整日志目录路径

+ 4 - 3
models/uf-rl/rl_model/DQN/uf_train/run_dqn_train.py

@@ -135,6 +135,7 @@ def main():
     # ---------- Physics ----------
     # ---------- Physics ----------
     physics_model = UFPhysicsModel(
     physics_model = UFPhysicsModel(
         phys_params=phys_params,
         phys_params=phys_params,
+        state_bounds=state_bounds,
         resistance_model_fp=res_fp,
         resistance_model_fp=res_fp,
         resistance_model_bw=res_bw,
         resistance_model_bw=res_bw,
         IS_TIMES=IS_TIMES
         IS_TIMES=IS_TIMES
@@ -291,10 +292,10 @@ if __name__ == "__main__":
 
 
     RESET_STATE_CSV = (
     RESET_STATE_CSV = (
             PROJECT_ROOT
             PROJECT_ROOT
-            / "datasets/UF_longting_data/rl_ready/output/reset_state_pool.csv"
+            / "datasets/UF_anzhen_data/rl_ready/output/reset_state_pool.csv"
     )
     )
 
 
-    ENV_CONFIG_PATH = PROJECT_ROOT / "longting" / "env_config.yaml"
-    MODEL_CONFIG_PATH = PROJECT_ROOT / "longting" / "dqn_config.yaml"
+    ENV_CONFIG_PATH = PROJECT_ROOT / "anzhen" / "env_config.yaml"
+    MODEL_CONFIG_PATH = PROJECT_ROOT / "anzhen" / "dqn_config.yaml"
 
 
     main()
     main()

+ 59 - 0
models/uf-rl/uf_data_process/calculate.py

@@ -78,6 +78,65 @@ class UFResistanceCalculator:
         return result_segments
         return result_segments
 
 
 
 
+class PumpPowerCalculator:
+    """Compute the mean feed-pump and backwash-pump power of each segment."""
+
+    def __init__(self, event_col="event_type"):
+        # Name of the column holding the per-row event label ("inlet", "bw_phys", ...).
+        self.event_col = event_col
+
+    def calculate_for_segment(
+        self,
+        seg_df: pd.DataFrame,
+        inlet_power_col=None,
+        bw_power_col=None
+    ):
+        """Compute the mean pump powers for a single segment.
+
+        Args:
+            seg_df: Data of one segment; it is copied, the input is not modified.
+            inlet_power_col: Name of the feed-pump power column.
+            bw_power_col: Name of the backwash-pump power column.
+
+        Returns:
+            A copy of ``seg_df``. When the event column and both power columns
+            are present, the copy gains two constant columns,
+            ``inlet_pump_power_mean`` and ``bw_pump_power_mean``; otherwise a
+            warning is printed and the copy is returned without them.
+        """
+        seg_df = seg_df.copy()
+
+        # With the default None column names this check is expected to fail,
+        # so both names effectively have to be supplied by the caller.
+        required_cols = [self.event_col, inlet_power_col, bw_power_col]
+
+        if not all(col in seg_df.columns for col in required_cols):
+            print("⚠️ segment缺少必要列")
+            return seg_df
+
+        # Convert to numeric; values that cannot be parsed become NaN.
+        seg_df[inlet_power_col] = pd.to_numeric(seg_df[inlet_power_col], errors="coerce")
+        seg_df[bw_power_col] = pd.to_numeric(seg_df[bw_power_col], errors="coerce")
+
+        # Mean feed-pump power over the rows labelled "inlet".
+        inlet_rows = seg_df[seg_df[self.event_col] == "inlet"]
+        inlet_mean = inlet_rows[inlet_power_col].mean()
+
+        # Mean backwash-pump power over the rows labelled "bw_phys".
+        bw_rows = seg_df[seg_df[self.event_col] == "bw_phys"]
+        bw_mean = bw_rows[bw_power_col].mean()
+
+        # Broadcast the two scalars to every row of the segment.
+        seg_df["inlet_pump_power_mean"] = inlet_mean
+        seg_df["bw_pump_power_mean"] = bw_mean
+
+        return seg_df
+
+
+    def calculate_for_segments(
+        self,
+        segments: list,
+        inlet_power_col=None,
+        bw_power_col=None
+    ):
+        """Apply ``calculate_for_segment`` to every segment.
+
+        Args:
+            segments: List of per-segment DataFrames.
+            inlet_power_col: Name of the feed-pump power column.
+            bw_power_col: Name of the backwash-pump power column.
+
+        Returns:
+            A new list with one processed DataFrame per input segment, in the
+            same order.
+        """
+        result_segments = []
+
+        for seg in segments:
+            seg_res = self.calculate_for_segment(
+                seg,
+                inlet_power_col=inlet_power_col,
+                bw_power_col=bw_power_col
+            )
+            result_segments.append(seg_res)
+
+        return result_segments
+
+
 # =============================
 # =============================
 #   单变量稳定性分析(例如 flow/TMP)
 #   单变量稳定性分析(例如 flow/TMP)
 # =============================
 # =============================

+ 178 - 100
models/uf-rl/uf_data_process/data_export.py

@@ -10,8 +10,8 @@ DB_HOST = "222.130.26.206"
 DB_PORT = 4000
 DB_PORT = 4000
 
 
 # 时间配置
 # 时间配置
-START_TIME = datetime(2025, 11, 28, 0, 0, 0)
-END_TIME = datetime(2026, 2, 20, 0, 0, 0)
+START_TIME = datetime(2025, 4, 1, 0, 0, 0)
+END_TIME = datetime(2026, 3, 1, 6, 0, 0)
 BOUNDARY = datetime(2025, 3, 25, 0, 0, 0)
 BOUNDARY = datetime(2025, 3, 25, 0, 0, 0)
 
 
 DELETE_PERIODS = [
 DELETE_PERIODS = [
@@ -23,26 +23,35 @@ DELETE_PERIODS = [
 ]  # 来自张昊师兄的,需要被去除的黑名单区间
 ]  # 来自张昊师兄的,需要被去除的黑名单区间
 DELETE_PERIODS = [(pd.to_datetime(s), pd.to_datetime(e)) for s, e in DELETE_PERIODS]
 DELETE_PERIODS = [(pd.to_datetime(s), pd.to_datetime(e)) for s, e in DELETE_PERIODS]
 
 
-# 新水岛
-# ---------- 传感器配置(新水厂系统) ----------
+# ---------- 传感器配置----------
 # 机组编号
 # 机组编号
 UNITS = [1, 2]
 UNITS = [1, 2]
 
 
-# 新系统变量模板
 BASE_VARIABLES = [
 BASE_VARIABLES = [
-    "ns=3;s={}#UF_JSFLOW_O",  # 进水流量
-    "ns=3;s={}#UF_JSPRESS_O",  # 进水压力(如有)
-    "ns=3;s=UF{}_SSD_KMYC",  # 跨膜压差(如有)
-    "ns=3;s=UF{}_STEP" # 步序
+    "AR.{}#UF_JSFLOW_O",      # 进水流量
+    "AR.{}#UF_JSPRESS_O",     # 进水压力
+    "AR.UF{}_SSD_KMYC",       # 跨膜压差
+    "AR.UF{}_STEP",           # 步序/控制字
+    "AR.ZZ_{}#UFBWB_POWER" # 反洗泵功率
 ]
 ]
-
-# 系统总变量(如果存在)
 SYSTEM_VARIABLES = [
 SYSTEM_VARIABLES = [
-    "ns=3;s=ZJS_TEMP_O", # 进水温度
-    "ns=3;s=RO_JSORP_O",  # 总产水ORP(示例)
-    "ns=3;s=RO_JSPH_O",  # 总产水PH(示例)
+    "AR.ZJS_TEMP_O",          # 进水温度
+    "AR.RO_JSORP_O",          # 总产水ORP
+    "AR.RO_JSPH_O",           # 总产水PH
+    "AR.RO_JSDD_O",           # 总产水电导
+    "AR.CN_LEVEL_O",       # 次钠液位
+    "AR.S_LEVEL_O",           # 酸液位
+    "AR.J_LEVEL_O",           # 碱液位
+    # "AR.ZZ_UFGSB_POWER", # 超滤供水泵功率
 ]
 ]
 
 
+# 输出目录
+BASE_OUTPUT_DIR = "../datasets/UF_anzhen_data"
+PROCESSED_OUTPUT_DIR = os.path.join(BASE_OUTPUT_DIR, "raw")
+
+# 创建目录
+os.makedirs(PROCESSED_OUTPUT_DIR, exist_ok=True)
+
 # 生成所有变量名称
 # 生成所有变量名称
 SENSOR_NAMES = []
 SENSOR_NAMES = []
 
 
@@ -56,13 +65,6 @@ print(f"总共查询 {len(SENSOR_NAMES)} 个变量")
 for i, var in enumerate(SENSOR_NAMES, 1):
 for i, var in enumerate(SENSOR_NAMES, 1):
     print(f"{i:2d}. {var}")
     print(f"{i:2d}. {var}")
 
 
-# 输出目录 - 只需要processed文件夹
-BASE_OUTPUT_DIR = "../datasets/UF_longting_data"
-PROCESSED_OUTPUT_DIR = os.path.join(BASE_OUTPUT_DIR, "raw")
-
-# 创建目录
-os.makedirs(PROCESSED_OUTPUT_DIR, exist_ok=True)
-
 
 
 # ---------- 创建数据库引擎 ----------
 # ---------- 创建数据库引擎 ----------
 def create_db_engines():
 def create_db_engines():
@@ -93,21 +95,20 @@ def create_db_engines():
 # ---------- 一分钟聚合查询函数(子查询方式)----------
 # ---------- 一分钟聚合查询函数(子查询方式)----------
 def fetch_valve_aggregated(name, start, end, engine, interval_minutes=1):
 def fetch_valve_aggregated(name, start, end, engine, interval_minutes=1):
     """
     """
-    从数据库获取传感器数据并直接进行时间聚合
-    使用子查询方式避免 ONLY_FULL_GROUP_BY 问题
+    从数据库获取传感器数据并按分钟聚合,时间戳对齐到分钟开始(00秒)
     """
     """
     interval_seconds = interval_minutes * 60
     interval_seconds = interval_minutes * 60
 
 
     sql = text(f"""
     sql = text(f"""
         SELECT 
         SELECT 
-            MIN(h_time) AS time,
+            FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(MIN(h_time)) / {interval_seconds}) * {interval_seconds}) AS time,
             AVG(val) AS val
             AVG(val) AS val
         FROM (
         FROM (
             SELECT 
             SELECT 
                 h_time,
                 h_time,
                 val,
                 val,
                 FLOOR(UNIX_TIMESTAMP(h_time) / {interval_seconds}) AS time_group
                 FLOOR(UNIX_TIMESTAMP(h_time) / {interval_seconds}) AS time_group
-            FROM dc_item_history_data_1450
+            FROM dc_item_history_data_1181
             WHERE item_name = :name
             WHERE item_name = :name
               AND h_time BETWEEN :st AND :et
               AND h_time BETWEEN :st AND :et
               AND val IS NOT NULL
               AND val IS NOT NULL
@@ -120,77 +121,154 @@ def fetch_valve_aggregated(name, start, end, engine, interval_minutes=1):
         df = pd.read_sql(sql, engine, params={"name": name, "st": start, "et": end})
         df = pd.read_sql(sql, engine, params={"name": name, "st": start, "et": end})
         if not df.empty:
         if not df.empty:
             df['time'] = pd.to_datetime(df['time'])
             df['time'] = pd.to_datetime(df['time'])
+            # 确保时间戳是整分钟(去除秒和微秒)
+            df['time'] = df['time'].dt.floor('1min')
             print(f"  ✓ {name}: {len(df)} 条记录")
             print(f"  ✓ {name}: {len(df)} 条记录")
         return df
         return df
     except Exception as e:
     except Exception as e:
         print(f"  ✗ {name} 查询失败: {str(e)}")
         print(f"  ✗ {name} 查询失败: {str(e)}")
         return pd.DataFrame()
         return pd.DataFrame()
 
 
+def fetch_special_data(sensor, start, end, boundary, engine_test, engine_prod):
+    """
+    专门获取步序数据,保持原始变化点
+    返回原始时间戳和值
+    """
+    sql = text("""
+               SELECT h_time AS time, val
+               FROM dc_item_history_data_1181
+               WHERE item_name = :name
+                 AND h_time BETWEEN :st
+                 AND :et
+                 AND val IS NOT NULL
+               ORDER BY h_time
+               """)
+
+    try:
+        # 根据边界时间选择合适的数据库引擎
+        if end <= boundary:
+            df = pd.read_sql(sql, engine_test, params={"name": sensor, "st": start, "et": end})
+        elif start >= boundary:
+            df = pd.read_sql(sql, engine_prod, params={"name": sensor, "st": start, "et": end})
+        else:
+            df1 = pd.read_sql(sql, engine_test, params={"name": sensor, "st": start, "et": boundary})
+            df2 = pd.read_sql(sql, engine_prod, params={"name": sensor, "st": boundary, "et": end})
+            df = pd.concat([df1, df2], ignore_index=True)
 
 
+        if not df.empty:
+            df['time'] = pd.to_datetime(df['time'])
+
+        return df
+    except Exception as e:
+        print(f"  ✗ {sensor} 查询失败: {str(e)}")
+        return pd.DataFrame()
+
+
+# ---------- 传感器数据查询函数(只获取聚合数据)----------
 # ---------- 传感器数据查询函数(只获取聚合数据)----------
 # ---------- 传感器数据查询函数(只获取聚合数据)----------
 def fetch_sensor_data(sensor_names, start_time, end_time, boundary, engine_test, engine_prod):
 def fetch_sensor_data(sensor_names, start_time, end_time, boundary, engine_test, engine_prod):
     """
     """
     获取多个传感器的分钟级聚合数据并合并为宽表
     获取多个传感器的分钟级聚合数据并合并为宽表
-
-    参数:
-        sensor_names: 传感器名称列表
-        start_time: 开始时间
-        end_time: 结束时间
-        boundary: 数据库切换边界
-        engine_test: 测试数据库引擎
-        engine_prod: 生产数据库引擎
+    步序变量单独处理
     """
     """
-    all_data = []  # 只存储聚合数据
-
-    print("\n开始获取各传感器数据:")
-
-    for sensor in sensor_names:
-        try:
-            # 根据边界时间选择合适的数据库引擎
-            if end_time <= boundary:
-                # 全部在测试数据库
-                df = fetch_valve_aggregated(sensor, start_time, end_time, engine_test)
-            elif start_time >= boundary:
-                # 全部在生产数据库
-                df = fetch_valve_aggregated(sensor, start_time, end_time, engine_prod)
-            else:
-                # 跨越两个数据库
-                df1 = fetch_valve_aggregated(sensor, start_time, boundary, engine_test)
-                df2 = fetch_valve_aggregated(sensor, boundary, end_time, engine_prod)
-                df = pd.concat([df1, df2], ignore_index=True)
-
-            if not df.empty:
-                # 重命名列以区分不同传感器
-                df_renamed = df.rename(columns={'val': sensor})
-                all_data.append(df_renamed[['time', sensor]])
-            else:
-                print(f"  ⚠ {sensor}: 无数据")
-
-        except Exception as e:
-            print(f"  ⚠ {sensor}: 处理失败 - {str(e)}")
-            continue
-
-    if not all_data:
-        print("\n❌ 未获取到任何传感器数据")
-        return pd.DataFrame()
+    # 识别步序变量和功率变量
+    step_vars = [v for v in sensor_names if 'STEP' in v]
+    power_vars = [v for v in sensor_names if 'POWER' in v]
+
+    # 需要特殊处理的变量
+    special_vars = step_vars + power_vars
 
 
-    print(f"\n开始合并 {len(all_data)} 个传感器的数据...")
+    # 其他连续变量
+    continuous_vars = [v for v in sensor_names if v not in special_vars]
 
 
-    # 使用reduce逐步合并DataFrame
-    merged_df = reduce(
-        lambda left, right: pd.merge(left, right, on='time', how='outer'),
-        all_data
+    print(f"\n识别到 {len(special_vars)} 个离散变量, {len(continuous_vars)} 个连续变量")
+
+    # 3. 创建完整的时间网格(整分钟)- 先创建
+    print(f"\n创建完整时间网格...")
+    time_grid = pd.date_range(
+        start=start_time.replace(second=0, microsecond=0),
+        end=end_time.replace(second=0, microsecond=0),
+        freq='1min'
     )
     )
 
 
-    # 按时间排序
-    merged_df = merged_df.sort_values('time').reset_index(drop=True)
+    # 创建以时间为索引的DataFrame
+    merged_df = pd.DataFrame(index=time_grid)
+    print(f"时间网格: {len(time_grid)} 个时间点")
+
+    # 1. 处理连续变量(按分钟聚合,时间对齐到整分钟)
+    if continuous_vars:
+        print("\n获取连续变量数据(分钟平均):")
+        for sensor in continuous_vars:
+            try:
+                # 根据边界时间选择合适的数据库引擎
+                if end_time <= boundary:
+                    df = fetch_valve_aggregated(sensor, start_time, end_time, engine_test)
+                elif start_time >= boundary:
+                    df = fetch_valve_aggregated(sensor, start_time, end_time, engine_prod)
+                else:
+                    df1 = fetch_valve_aggregated(sensor, start_time, boundary, engine_test)
+                    df2 = fetch_valve_aggregated(sensor, boundary, end_time, engine_prod)
+                    df = pd.concat([df1, df2], ignore_index=True)
+
+                if not df.empty:
+                    # 确保时间戳是整分钟并设为索引
+                    df['time'] = pd.to_datetime(df['time']).dt.floor('1min')
+                    df = df.drop_duplicates(subset=['time'], keep='first')
+                    df = df.set_index('time')
+
+                    # 添加到合并DataFrame
+                    merged_df[sensor] = df['val']
+                    print(f"  ✓ {sensor}: {len(df)} 条记录")
+                else:
+                    print(f"  ⚠ {sensor}: 无数据")
+
+            except Exception as e:
+                print(f"  ⚠ {sensor}: 处理失败 - {str(e)}")
+                continue
+
+    # 2. 处理离散变量(保持原始变化点)
+    if special_vars:
+        print("\n获取离散变量数据(原始变化点):")
+        for sensor in special_vars:
+            try:
+                # 获取原始数据(不聚合)
+                df = fetch_special_data(sensor, start_time, end_time, boundary, engine_test, engine_prod)
+
+                if not df.empty:
+                    # 创建分钟级的重采样
+                    df['time'] = pd.to_datetime(df['time'])
+                    df = df.set_index('time')
+
+                    # 重采样到分钟,对于离散变量使用前向填充
+                    df_resampled = df.resample('1min').ffill()
+
+                    # 添加到合并DataFrame
+                    merged_df[sensor] = df_resampled['val']
+                    print(f"  ✓ {sensor}: {len(df)} 个原始点 -> {len(df_resampled)} 个分钟点")
+                else:
+                    print(f"  ⚠ {sensor}: 无数据")
+
+            except Exception as e:
+                print(f"  ⚠ {sensor}: 处理失败 - {str(e)}")
+                continue
+
+    if merged_df.empty or len(merged_df.columns) == 0:
+        print("\n❌ 未获取到任何传感器数据")
+        return pd.DataFrame()
 
 
-    print(f"合并完成,共 {len(merged_df)} 条时间记录")
+    # 重置索引,将时间变为列
+    merged_df = merged_df.reset_index()
+    merged_df = merged_df.rename(columns={'index': 'time'})
 
 
-    # 删除黑名单时段
-    print("删除黑名单时段...")
+    print(f"\n合并完成,共 {len(merged_df)} 条时间记录 × {len(merged_df.columns) - 1} 个传感器")
+    print(f"数据框形状: {merged_df.shape}")
+
+    # 5. 删除黑名单时段
+    print("\n删除黑名单时段...")
     original_len = len(merged_df)
     original_len = len(merged_df)
     for s, e in DELETE_PERIODS:
     for s, e in DELETE_PERIODS:
+        s = pd.to_datetime(s).floor('1min')
+        e = pd.to_datetime(e).floor('1min')
         merged_df = merged_df[(merged_df['time'] < s) | (merged_df['time'] > e)]
         merged_df = merged_df[(merged_df['time'] < s) | (merged_df['time'] > e)]
 
 
     deleted_count = original_len - len(merged_df)
     deleted_count = original_len - len(merged_df)
@@ -201,16 +279,11 @@ def fetch_sensor_data(sensor_names, start_time, end_time, boundary, engine_test,
 
 
 
 
 # ---------- 数据后处理函数(填充空值)----------
 # ---------- 数据后处理函数(填充空值)----------
-def post_process_data(df, method='both'):
+def post_process_data(df, continuous_vars, step_vars):
     """
     """
-    对聚合后的数据进行后处理:填充空值
-
-    参数:
-        df: 输入的宽表DataFrame
-        method: 填充方法
-
-    返回:
-        pd.DataFrame: 处理后的DataFrame
+    对聚合后的数据进行后处理
+    连续变量:线性插值或前后填充
+    步序变量:已经前向填充,只需处理开头可能存在的空值
     """
     """
     if df.empty:
     if df.empty:
         print("警告:输入数据为空")
         print("警告:输入数据为空")
@@ -227,21 +300,22 @@ def post_process_data(df, method='both'):
     missing_before = df_processed.isnull().sum().sum()
     missing_before = df_processed.isnull().sum().sum()
     print(f"填充前空值数量: {missing_before}")
     print(f"填充前空值数量: {missing_before}")
 
 
-    if method == 'bfill':
-        df_processed = df_processed.bfill()
-    elif method == 'ffill':
-        df_processed = df_processed.ffill()
-    else:  # both
-        df_processed = df_processed.ffill().bfill()
+    # 处理连续变量(使用线性插值更适合连续物理量)
+    for var in continuous_vars:
+        if var in df_processed.columns:
+            # 先线性插值,再前后填充边界
+            df_processed[var] = df_processed[var].interpolate(method='linear', limit_direction='both')
+
+    # 处理步序变量(可能开头有NaN,用后向填充)
+    for var in step_vars:
+        if var in df_processed.columns:
+            df_processed[var] = df_processed[var].bfill()  # 开头空值用第一个有效值填充
 
 
     missing_after = df_processed.isnull().sum().sum()
     missing_after = df_processed.isnull().sum().sum()
     print(f"填充后空值数量: {missing_after}")
     print(f"填充后空值数量: {missing_after}")
     print(f"填充了 {missing_before - missing_after} 个空值")
     print(f"填充了 {missing_before - missing_after} 个空值")
 
 
-    # 重置索引
-    df_processed = df_processed.reset_index().rename(columns={'index': 'time'})
-
-    return df_processed
+    return df_processed.reset_index()
 
 
 
 
 # ---------- 数据分块保存函数 ----------
 # ---------- 数据分块保存函数 ----------
@@ -303,12 +377,16 @@ if __name__ == "__main__":
     print(f"\n✅ 聚合数据获取完成!")
     print(f"\n✅ 聚合数据获取完成!")
     print(f"总数据量: {len(agg_df)} 条记录")
     print(f"总数据量: {len(agg_df)} 条记录")
     print(f"时间范围: {agg_df['time'].min()} 到 {agg_df['time'].max()}")
     print(f"时间范围: {agg_df['time'].min()} 到 {agg_df['time'].max()}")
-    print(f"时间间隔: 1分钟")
-    print(f"传感器数量: {len(agg_df.columns) - 1} 个")  # 减1是因为time列
+    print(f"时间间隔: 1分钟(整点)")
+    print(f"传感器数量: {len(agg_df.columns) - 1} 个")
+
+    # 识别步序变量
+    step_vars = [col for col in agg_df.columns if 'STEP' in col]
+    continuous_vars = [col for col in agg_df.columns if col != 'time' and col not in step_vars]
 
 
-    # 3. 后处理聚合数据(填充空值)
-    print(f"\n[3/4] 后处理聚合数据(填充空值)...")
-    processed_df = post_process_data(agg_df, method='both')
+    # 3. 后处理数据
+    print(f"\n[3/4] 后处理聚合数据...")
+    processed_df = post_process_data(agg_df, continuous_vars, step_vars)
 
 
     print(f"\n✅ 后处理完成!")
     print(f"\n✅ 后处理完成!")
     print(f"处理后数据行数: {len(processed_df)}")
     print(f"处理后数据行数: {len(processed_df)}")

+ 43 - 0
models/uf-rl/uf_data_process/filter.py

@@ -112,6 +112,49 @@ class InletSegmentFilter:
         return stable_segments
         return stable_segments
 
 
 
 
+import pandas as pd
+import numpy as np
+
+
+class FlowOutlierFilter:
+    """
+    对稳定段进行异常值剔除
+    规则:flow 不在 mean ± n*std 范围内的行删除
+    """
+
+    def __init__(self, n_sigma=3):
+        self.n_sigma = n_sigma
+
+    def filter_segment(self, seg: pd.DataFrame, flow_col, prefix="flow"):
+
+        seg = seg.copy()
+
+        mean_col = f"{prefix}_mean"
+        std_col = f"{prefix}_std"
+
+        # 如果统计列不存在,直接返回
+        if mean_col not in seg.columns or std_col not in seg.columns:
+            return seg
+
+        mean_val = seg[mean_col].iloc[0]
+        std_val = seg[std_col].iloc[0]
+
+        lower = mean_val - self.n_sigma * std_val
+        upper = mean_val + self.n_sigma * std_val
+
+        seg = seg[(seg[flow_col] >= lower) & (seg[flow_col] <= upper)]
+
+        return seg
+
+    def filter_segments(self, segments, flow_col, prefix="flow"):
+
+        result = []
+
+        for seg in segments:
+            filtered = self.filter_segment(seg, flow_col, prefix)
+            result.append(filtered)
+
+        return result
 
 
 
 
 
 

+ 39 - 0
models/uf-rl/uf_data_process/fit.py

@@ -386,3 +386,42 @@ class ChemicalBackwashCleaner:
             cycles[ids[-1]]["R_removed"] = None
             cycles[ids[-1]]["R_removed"] = None
 
 
         return cycles
         return cycles
+
+    def compute_dose(self, cycles, NaClO_col, HCl_col, NaOH_col):
+        ids = sorted(cycles.keys())
+
+        for i in range(len(ids) - 1):
+            cid1 = ids[i]
+            cid2 = ids[i + 1]
+
+            c1 = cycles[cid1]
+            c2 = cycles[cid2]
+
+            if not (c1["valid"] and c2["valid"]):
+                c1["R_removed"] = None
+                continue
+
+            # 上周期末:最后一段的各药剂列取值
+            seg_last = c1["segments"][-1]
+            NaClO_dose_end = seg_last[NaClO_col]
+            HCl_dose_end = seg_last[HCl_col]
+            NaOH_dose_end = seg_last[NaOH_col]
+
+            # 下周期初:第一段的各药剂列取值(三种药剂均应取自 seg_first)
+            seg_first = c2["segments"][0]
+            NaClO_dose_start = seg_first[NaClO_col]
+            HCl_dose_start = seg_first[HCl_col]
+            NaOH_dose_start = seg_first[NaOH_col]
+
+            c1["NaClO_dose_removed"] = NaClO_dose_end - NaClO_dose_start
+            c1["HCl_dose_removed"] = HCl_dose_end - HCl_dose_start
+            c1["NaOH_dose_removed"] = NaOH_dose_end - NaOH_dose_start
+
+
+        # 最后一个周期无 CEB 去除值
+        if ids:
+            cycles[ids[-1]]["NaClO_dose_removed"] = None
+            cycles[ids[-1]]["HCl_dose_removed"] = None
+            cycles[ids[-1]]["NaOH_dose_removed"] = None
+
+        return cycles

+ 27 - 7
models/uf-rl/uf_data_process/pipeline.py

@@ -6,8 +6,8 @@ import pandas as pd
 from pathlib import Path
 from pathlib import Path
 from load import UFConfigLoader, UFDataLoader
 from load import UFConfigLoader, UFDataLoader
 from label import UFEventClassifier, PostBackwashInletMarker
 from label import UFEventClassifier, PostBackwashInletMarker
-from filter import ConstantFlowFilter, EventQualityFilter, InletSegmentFilter
-from calculate import UFResistanceCalculator, VariableStabilityAnalyzer, UFResistanceAnalyzer, UFPressureAnalyzer
+from filter import ConstantFlowFilter, EventQualityFilter, InletSegmentFilter,FlowOutlierFilter
+from calculate import UFResistanceCalculator, PumpPowerCalculator, VariableStabilityAnalyzer, UFResistanceAnalyzer, UFPressureAnalyzer
 from fit import ChemicalCycleSegmenter, ChemicalBackwashCleaner, ShortTermCycleFoulingFitter,LongTermFoulingFitter
 from fit import ChemicalCycleSegmenter, ChemicalBackwashCleaner, ShortTermCycleFoulingFitter,LongTermFoulingFitter
 
 
 class UFAnalysisPipeline:
 class UFAnalysisPipeline:
@@ -44,14 +44,20 @@ class UFAnalysisPipeline:
         self.ctrl_format = column_formats.get("ctrl_col", "C.M.{unit}_DB@word_control")
         self.ctrl_format = column_formats.get("ctrl_col", "C.M.{unit}_DB@word_control")
         self.flow_format = column_formats.get("flow_col", "C.M.{unit}_FT_JS@out")
         self.flow_format = column_formats.get("flow_col", "C.M.{unit}_FT_JS@out")
         self.tmp_format = column_formats.get("tmp_col", "C.M.{unit}_DB@press_PV")
         self.tmp_format = column_formats.get("tmp_col", "C.M.{unit}_DB@press_PV")
+        self.BWB_POWER_format = column_formats.get("BWB_POWER_col", "ns=3;s=ZZ_{unit}#UFBWB_POWER")
+        self.GSB_POWER_col = column_formats.get("GSB_POWER_col", "ns=3;s=ZZ_UFGSB_POWER")
         self.temp_col = column_formats.get("temp_col", "C.M.RO_TT_ZJS@out")
         self.temp_col = column_formats.get("temp_col", "C.M.RO_TT_ZJS@out")
         self.orp_col = column_formats.get("orp_col", "C.M.UF_ORP_ZCS@out")
         self.orp_col = column_formats.get("orp_col", "C.M.UF_ORP_ZCS@out")
+        self.NaClO_col = column_formats.get("NaClO_col", "ns=3;s=CN_LEVEL_O")
+        self.HCl_col = column_formats.get("HCl_col", "ns=3;s=S_LEVEL_O")
+        self.NaOH_col = column_formats.get("NaOH_col", "ns=3;s=J_LEVEL_O")
 
 
         # 过滤器
         # 过滤器
-        self.min_points = params.get("min_points", 40)
+        self.min_points = params.get("min_points", 20)
         self.initial_points = params.get("initial_points", 10)
         self.initial_points = params.get("initial_points", 10)
 
 
         self.quality_filter = EventQualityFilter(min_points=self.min_points)
         self.quality_filter = EventQualityFilter(min_points=self.min_points)
+        self.flow_filter = FlowOutlierFilter(n_sigma=3)
         self.initial_label = PostBackwashInletMarker(n_points=self.initial_points)
         self.initial_label = PostBackwashInletMarker(n_points=self.initial_points)
 
 
         # 阻力计算器
         # 阻力计算器
@@ -59,6 +65,8 @@ class UFAnalysisPipeline:
         self.segment_head_n = params.get("segment_head_n", 10)
         self.segment_head_n = params.get("segment_head_n", 10)
         self.segment_tail_n = params.get("segment_tail_n", 10)
         self.segment_tail_n = params.get("segment_tail_n", 10)
 
 
+        # 功率计算器
+        self.power_calc = PumpPowerCalculator(event_col=column_formats.get("event_col", "event_type"))
 
 
 
 
     # ----------------------------
     # ----------------------------
@@ -148,13 +156,17 @@ class UFAnalysisPipeline:
         if not np.issubdtype(df["time"].dtype, np.datetime64):
         if not np.issubdtype(df["time"].dtype, np.datetime64):
             df["time"] = pd.to_datetime(df["time"])
             df["time"] = pd.to_datetime(df["time"])
 
 
+        # 只保留 2025-06-10 及之后的数据(与下方 start_date 保持一致)
+        start_date = pd.Timestamp("2025-06-10")
+        df = df[df["time"] >= start_date].reset_index(drop=True)
+
         # 逐机组处理
         # 逐机组处理
         for unit in self.units:
         for unit in self.units:
             print(f"Processing {unit} ...")
             print(f"Processing {unit} ...")
             ctrl_col = self.ctrl_format.format(unit=unit)
             ctrl_col = self.ctrl_format.format(unit=unit)
             flow_col = self.flow_format.format(unit=unit)
             flow_col = self.flow_format.format(unit=unit)
             tmp_col = self.tmp_format.format(unit=unit)
             tmp_col = self.tmp_format.format(unit=unit)
-
+            BWB_POWER_col = self.BWB_POWER_format.format(unit=unit)
 
 
             # 去除无关列
             # 去除无关列
             other_units = [u for u in self.units if u != unit]
             other_units = [u for u in self.units if u != unit]
@@ -174,6 +186,11 @@ class UFAnalysisPipeline:
             segments = const_flow_filter.filter(seg_df) # 去除出现网络错误的进水段
             segments = const_flow_filter.filter(seg_df) # 去除出现网络错误的进水段
             segments = self.quality_filter.filter(segments) # 去除时间过短的进水段
             segments = self.quality_filter.filter(segments) # 去除时间过短的进水段
 
 
+            # -----------------------------
+            # 逐段计算本段的进水泵平均功率与反洗泵功率
+            # -----------------------------
+            segments = self.power_calc.calculate_for_segments(segments,inlet_power_col=self.GSB_POWER_col,bw_power_col=BWB_POWER_col)
+
             # 提取稳定进水段
             # 提取稳定进水段
             stable_extractor = InletSegmentFilter(ctrl_col,stable_codes=self.stable_codes,min_points=self.min_points)
             stable_extractor = InletSegmentFilter(ctrl_col,stable_codes=self.stable_codes,min_points=self.min_points)
             stable_segments = stable_extractor.extract(segments)  # 提取稳定进水数据
             stable_segments = stable_extractor.extract(segments)  # 提取稳定进水数据
@@ -193,6 +210,7 @@ class UFAnalysisPipeline:
                 tmp_col=tmp_col,
                 tmp_col=tmp_col,
             )
             )
 
 
+
             # -----------------------------
             # -----------------------------
             # 逐段计算 起止跨膜压差tmp_start/tmp_end 放缩后的R_start/R_end 进水变量稳定性
             # 逐段计算 起止跨膜压差tmp_start/tmp_end 放缩后的R_start/R_end 进水变量稳定性
             # -----------------------------
             # -----------------------------
@@ -201,6 +219,7 @@ class UFAnalysisPipeline:
             vsa = VariableStabilityAnalyzer()
             vsa = VariableStabilityAnalyzer()
             stable_segments = vsa.analyze_segments(stable_segments, col=flow_col, prefix="flow")
             stable_segments = vsa.analyze_segments(stable_segments, col=flow_col, prefix="flow")
             stable_segments = vsa.analyze_segments(stable_segments, col=self.temp_col, prefix="temp")
             stable_segments = vsa.analyze_segments(stable_segments, col=self.temp_col, prefix="temp")
+            stable_segments = self.flow_filter.filter_segments(stable_segments,flow_col=flow_col,prefix="flow" )
 
 
             # 跨膜压差统计
             # 跨膜压差统计
             upa = UFPressureAnalyzer(
             upa = UFPressureAnalyzer(
@@ -223,7 +242,7 @@ class UFAnalysisPipeline:
             # ----------------------------------
             # ----------------------------------
             # 1. 化学周期划分
             # 1. 化学周期划分
             # ----------------------------------
             # ----------------------------------
-            cycle_segmenter = ChemicalCycleSegmenter(max_hours=60)
+            cycle_segmenter = ChemicalCycleSegmenter(max_hours = 100)
             cycles = cycle_segmenter.assign_cycles(df_unit, stable_segments)
             cycles = cycle_segmenter.assign_cycles(df_unit, stable_segments)
 
 
             # ----------------------------------
             # ----------------------------------
@@ -266,10 +285,11 @@ class UFAnalysisPipeline:
                     seg["cycle_long_r2"] = r2
                     seg["cycle_long_r2"] = r2
 
 
             # ----------------------------------
             # ----------------------------------
-            # 4. 计算化学反冲洗去除膜阻力
+            # 4. 计算化学反冲洗去除膜阻力及耗药量
             # ----------------------------------
             # ----------------------------------
             cbc = ChemicalBackwashCleaner(unit)
             cbc = ChemicalBackwashCleaner(unit)
             cycles = cbc.compute_removal(cycles)
             cycles = cbc.compute_removal(cycles)
+            cycles = cbc.compute_dose(cycles, self.NaClO_col, self.HCl_col, self.NaOH_col)
 
 
             # 回写到稳定段
             # 回写到稳定段
             for cid, cycle in cycles.items():
             for cid, cycle in cycles.items():
@@ -342,7 +362,7 @@ class UFAnalysisPipeline:
             if "chem_cycle_id" not in df_segments.columns:
             if "chem_cycle_id" not in df_segments.columns:
                 print(f"No chem_cycle_id found for unit {unit}, skipping cycle filtering.")
                 print(f"No chem_cycle_id found for unit {unit}, skipping cycle filtering.")
             else:
             else:
-                df_valid = df_segments.dropna(subset=["chem_cycle_id"]).copy()
+                df_valid = df_segments.dropna(subset=["chem_cycle_id", "cycle_nuK_R2", "cycle_long_r2"]).copy()
                 df_valid["chem_cycle_id"] = df_valid["chem_cycle_id"].astype(int)
                 df_valid["chem_cycle_id"] = df_valid["chem_cycle_id"].astype(int)
 
 
                 # 2. 根据 R2 过滤化学周期
                 # 2. 根据 R2 过滤化学周期

+ 1 - 1
models/uf-rl/uf_data_process/run_ufdata_pipeline.py

@@ -30,5 +30,5 @@ def main():
 
 
 
 
 if __name__ == "__main__":
 if __name__ == "__main__":
-    CONFIG_PATH = UF_RL_ROOT / "longting" / "uf_analyze_config.yaml"
+    CONFIG_PATH = UF_RL_ROOT / "anzhen" / "uf_analyze_config.yaml"
     main()
     main()

+ 167 - 0
models/uf-rl/uf_data_process/水厂变量名称记录.md

@@ -0,0 +1,167 @@
+# 各水厂OPC UA变量配置总览
+
+## 当前缺失变量:
+> 龙亭等新水岛供水泵功率命名均为总功率,需确认是否为所有超滤机组的总用电,且需要确认功率单位
+> 
+> 盐城:ns=3;s=ZZ_UFGSB_POWER命名为UF供水泵相电压总功率,需确认是否表示超滤供水泵功率
+> 
+> 
+## 龙亭新水岛
+> **机组数量**:2台
+> 
+> **数据库id**:1450
+> 
+> **可用数据开始时间**:2025-06-11
+
+### UF机组变量
+```python
+UNITS = [1, 2]
+
+BASE_VARIABLES = [
+    "ns=3;s={}#UF_JSFLOW_O",      # 进水流量
+    "ns=3;s={}#UF_JSPRESS_O",     # 进水压力
+    "ns=3;s=UF{}_SSD_KMYC",       # 跨膜压差
+    "ns=3;s=UF{}_STEP",           # 步序/控制字
+    "ns=3;s=ZZ_{}#UFBWB_POWER", # 反洗泵功率
+]
+SYSTEM_VARIABLES = [
+    "ns=3;s=ZJS_TEMP_O",          # 进水温度
+    "ns=3;s=RO_JSORP_O",          # 总产水ORP
+    "ns=3;s=RO_JSPH_O",           # 总产水PH
+    "ns=3;s=RO_JSDD_O",           # 总产水电导
+    "ns=3;s=CN_LEVEL_O", # 次钠液位
+    "ns=3;s=S_LEVEL_O", # 酸液位
+    "ns=3;s=J_LEVEL_O", # 碱液位
+    "ns=3;s=ZZ_UFGSB_POWER", # 超滤供水泵功率
+    
+]
+```
+
+## 兰考新水岛
+> **机组数量**:2台
+> 
+> **数据库id**:1451
+> 
+> **数据开始时间**:2025-09-04
+
+### UF机组变量
+```python
+UNITS = [1, 2]
+
+BASE_VARIABLES = [
+    "ns=3;s={}#UF_JSFLOW_O",      # 进水流量
+    "ns=3;s={}#UF_JSPRESS_O",     # 进水压力
+    "ns=3;s=UF{}_SSD_KMYC",       # 跨膜压差
+    "ns=3;s=UF{}_STEP",           # 步序/控制字
+    "ns=3;s=ZZ_{}#UFBWB_POWER", # 反洗泵功率
+]
+SYSTEM_VARIABLES = [
+    "ns=3;s=ZJS_TEMP_O",          # 进水温度
+    "ns=3;s=RO_JSORP_O",          # 总产水ORP
+    "ns=3;s=RO_JSPH_O",           # 总产水PH
+    "ns=3;s=RO_JSDD_O",           # 总产水电导
+    "ns=3;s=CN_LEVEL_O", # 次钠液位
+    "ns=3;s=S_LEVEL_O", # 酸液位
+    "ns=3;s=J_LEVEL_O", # 碱液位
+    "ns=3;s=ZZ_UFGSB_POWER", # 超滤供水泵功率
+    
+]
+```
+
+
+## 安镇
+> **机组数量**:2台
+> 
+> **数据库id**:1181
+> 
+> **数据开始时间**:2025-06-10
+
+### UF机组变量
+```python
+UNITS = [1, 2]
+
+BASE_VARIABLES = [
+    "AR.{}#UF_JSFLOW_O",      # 进水流量
+    "AR.{}#UF_JSPRESS_O",     # 进水压力
+    "AR.UF{}_SSD_KMYC",       # 跨膜压差
+    "AR.UF{}_STEP",           # 步序/控制字
+    "AR.ZZ_{}#UFBWB_POWER" # 反洗泵功率
+]
+SYSTEM_VARIABLES = [
+    "AR.ZJS_TEMP_O",          # 进水温度
+    "AR.RO_JSORP_O",          # 总产水ORP
+    "AR.RO_JSPH_O",           # 总产水PH
+    "AR.RO_JSDD_O",           # 总产水电导
+    "AR.CN_LEVEL_O",       # 次钠液位
+    "AR.S_LEVEL_O",           # 酸液位
+    "AR.J_LEVEL_O",           # 碱液位
+    "AR.ZZ_UFGSB_POWER", # 超滤供水泵功率
+]
+```
+
+
+## 盐城
+> **机组数量**:2台
+> 
+> **数据库id**:1497
+> 
+> **数据开始时间**:待确认
+
+### UF机组变量
+```python
+UNITS = [1, 2]
+
+BASE_VARIABLES = [
+    "ns=3;s={}#UF_JSFLOW_O",      # 进水流量
+    "ns=3;s={}#UF_JSPRESS_O",     # 进水压力
+    "ns=3;s=UF{}_SSD_KMYC",       # 跨膜压差
+    "ns=3;s=UF{}_STEP",           # 步序/控制字
+    "ns=3;s=ZZ_{}#UFBWB_POWER" # 反洗泵功率
+]
+SYSTEM_VARIABLES = [
+    "ns=3;s=ZJS_TEMP_O",          # 进水温度
+    "ns=3;s=RO_JSORP_O",          # 总产水ORP
+    "ns=3;s=RO_JSPH_O",           # 总产水PH
+    "ns=3;s=RO_JSDD_O",           # 总产水电导
+    "ns=3;s=CN_LEVEL_O", # 次钠液位
+    "ns=3;s=S_LEVEL_O", # 酸液位
+    "ns=3;s=J_LEVEL_O", # 碱液位
+    "ns=3;s=ZZ_UFGSB_POWER", # 超滤供水泵功率
+]
+```
+
+
+## 锡山中荷
+> **系统类型**:超滤+反渗透系统
+> 
+> **机组数量**:4台
+> 
+> **数据库id**:92
+> 
+> **数据开始时间**:2024-03-01
+
+### UF机组变量
+```python
+UNITS = [1, 2, 3, 4]
+
+BASE_VARIABLES = [
+    "C.M.UF{}_FT_JS@out",  # 进水流量
+    "C.M.UF{}_PT_JS@out",  # 进水压力(如有)
+    "C.M.UF{}_DB@press_PV",  # 跨膜压差(如有)
+    "C.M.UF{}_DB@word_control" # 步序/控制字
+]
+
+SYSTEM_VARIABLES = [
+    "C.M.RO_TT_ZJS@out", # 进水温度
+    "C.M.UF_ORP_ZCS@out",  # 总产水ORP
+    "C.M.UF_PH_ZCS@out",  # 总产水PH
+    "C.M.RO_Cond_ZJS@out",  # 总产水电导
+    "C.M.LT_NaClO@out",  # 次钠液位
+    "C.M.LT_HCl@out",  # 酸液位
+    "C.M.LT_NaOH@out",  # 碱液位
+    "WG_A.1AA1-1.A13",  # 超滤供水泵A功率
+    "WG_A.1AA1-2.A13",  # 超滤供水泵B功率
+    "WG_A.1AA2-1.A13",  # 超滤反洗泵A功率
+    "WG_A.1AA2-2.A13"  # 超滤反洗泵B功率(原文与供水泵B重复写为 1AA1-2,点位名待确认)
+]
+```