Просмотр исходного кода

feat:增加了数据处理与物理模型中吨水电耗与药耗的计算逻辑

junc_WHU 1 месяц назад
Родитель
Сommit
d86a3e990d

+ 10 - 10
models/uf-rl/env/env_reset.py

@@ -169,18 +169,18 @@ class ResetSampler:
         # -------------------------
         # -------------------------
         # 阶段权重设计(非线性 + 提高虚拟工况)
         # 阶段权重设计(非线性 + 提高虚拟工况)
         # -------------------------
         # -------------------------
-        # w_real = (1.0 - progress) ** 1.2  # 历史工况逐渐衰减
-        # w_perturb = 0.5 * progress  # 周边扰动按线性增加
-        # w_virtual = 0.3 * progress ** 1.5  # 虚拟工况加快增长,后期最大约 0.3
-        #
-        # # perturb 扰动幅度
-        # perturb_scale = 0.02 + 0.04 * progress
-        w_real = 0.0
-        w_perturb = 0.0
-        w_virtual = 1.0
+        w_real = (1.0 - progress) ** 1.2  # 历史工况逐渐衰减
+        w_perturb = 0.5 * progress  # 周边扰动按线性增加
+        w_virtual = 0.3 * progress ** 1.5  # 虚拟工况加快增长,后期最大约 0.3
 
 
         # perturb 扰动幅度
         # perturb 扰动幅度
-        perturb_scale = 0.0
+        perturb_scale = 0.02 + 0.04 * progress
+        # w_real = 0.0
+        # w_perturb = 0.0
+        # w_virtual = 1.0
+        #
+        # # perturb 扰动幅度
+        # perturb_scale = 0.0
         return dict(
         return dict(
             w_real=w_real,
             w_real=w_real,
             w_perturb=w_perturb,
             w_perturb=w_perturb,

+ 1 - 1
models/uf-rl/env/uf_env.py

@@ -419,7 +419,7 @@ class UFSuperCycleEnv(gym.Env):
         residual_ratio = info['residual_ratio']
         residual_ratio = info['residual_ratio']
 
 
         # 吨水电耗指标
         # 吨水电耗指标
-        energy = info["ton_water_energy_kWh_per_m3"]
+        energy = info["ton_water_energy"]
 
 
         # ========== 回收率奖励项 ==========
         # ========== 回收率奖励项 ==========
         # 将回收率归一化到 [0, 1] 区间(基于预期范围)
         # 将回收率归一化到 [0, 1] 区间(基于预期范围)

+ 43 - 4
models/uf-rl/env/uf_physics.py

@@ -22,7 +22,7 @@ uf_physics_48h.py
 
 
 import numpy as np
 import numpy as np
 import copy
 import copy
-from env.env_params import UFState, UFPhysicsParams
+from env.env_params import UFState, UFPhysicsParams, UFStateBounds
 
 
 
 
 class UFPhysicsModel:
 class UFPhysicsModel:
@@ -38,6 +38,7 @@ class UFPhysicsModel:
     def __init__(
     def __init__(
             self,
             self,
             phys_params: UFPhysicsParams,
             phys_params: UFPhysicsParams,
+            state_bounds: UFStateBounds,
             resistance_model_fp=None,
             resistance_model_fp=None,
             resistance_model_bw=None,
             resistance_model_bw=None,
             IS_TIMES: bool = False,
             IS_TIMES: bool = False,
@@ -50,6 +51,7 @@ class UFPhysicsModel:
             IS_TIMES: CEB是否为固定次数,T为固定次数
             IS_TIMES: CEB是否为固定次数,T为固定次数
         """
         """
         self.p = phys_params
         self.p = phys_params
+        self.state_bounds = state_bounds
         self.model_fp = resistance_model_fp
         self.model_fp = resistance_model_fp
         self.model_bw = resistance_model_bw
         self.model_bw = resistance_model_bw
         self.IS_TIMES = IS_TIMES
         self.IS_TIMES = IS_TIMES
@@ -350,10 +352,45 @@ class UFPhysicsModel:
         # 日均产水时间(24小时内实际产水的时间)
         # 日均产水时间(24小时内实际产水的时间)
         daily_prod_time_h = k_bw_per_ceb * L_h / T_super_h * 24.0  # [小时]
         daily_prod_time_h = k_bw_per_ceb * L_h / T_super_h * 24.0  # [小时]
 
 
-        # 吨水电耗(从查找表获取最接近的值)
+        # 参考吨水电耗(从查找表获取最接近的值)
         # 从物理参数类中获取查找表
         # 从物理参数类中获取查找表
         closest_L = min(self.p.energy_lookup.keys(), key=lambda x: abs(x - L_s))
         closest_L = min(self.p.energy_lookup.keys(), key=lambda x: abs(x - L_s))
-        ton_water_energy = self.p.energy_lookup[closest_L]  # [kWh/m³]
+        refer_ton_water_energy = self.p.energy_lookup[closest_L]  # [kWh/m³]
+
+        # 实际吨水电耗计算
+        # 进水时间(小时)
+        t_feed_total_h = k_bw_per_ceb * L_h
+        # 反洗时间(小时)
+        t_bw_total_h = k_bw_per_ceb * t_bw_s / 3600.0
+
+        # 总能耗 (kWh)
+        E_total = (
+                t_feed_total_h * self.p.p_feed_kw
+                + t_bw_total_h * self.p.p_bw_kw
+        )
+
+        # 吨水电耗 (kWh/吨)
+        ton_water_energy = E_total / max(V_net, 1e-12)
+
+        # 吨水药耗计算
+        # ========== 吨水药耗计算 ==========
+        R_removed = state.ceb_removal
+
+        # 参数
+        R_min = self.state_bounds.ceb_removal_min
+        R_max = self.state_bounds.ceb_removal_max
+
+        dose_min = self.p.dose_min
+        dose_max = self.p.dose_max
+
+        # 防止越界
+        R_removed_clip = np.clip(R_removed, R_min, R_max)
+
+        # 线性映射计算加药量
+        dose = dose_min + (R_removed_clip - R_min) / (R_max - R_min) * (dose_max - dose_min)
+
+        # 吨水药耗
+        ton_water_chem = dose / max(V_net, 1e-12)
 
 
         # ===== 新指标:膜阻力允许上升空间 =====
         # ===== 新指标:膜阻力允许上升空间 =====
         # 该指标根据当前最大跨膜压差距离软约束跨膜压差的距离,动态计算当前周期允许上升的膜阻力值,用于后续清洗效果奖励计算
         # 该指标根据当前最大跨膜压差距离软约束跨膜压差的距离,动态计算当前周期允许上升的膜阻力值,用于后续清洗效果奖励计算
@@ -399,7 +436,9 @@ class UFPhysicsModel:
             "residual_ratio" : residual_ratio, # 污染上升比例
             "residual_ratio" : residual_ratio, # 污染上升比例
 
 
             # 能耗指标
             # 能耗指标
-            "ton_water_energy_kWh_per_m3": ton_water_energy,  # 吨水电耗
+            "refer_ton_water_energy": refer_ton_water_energy,  # 参考吨水电耗
+            "ton_water_energy": ton_water_energy,  # 吨水电耗
+            "ton_water_chem": ton_water_chem,  # 吨水药耗
         }
         }
 
 
         # 更新 state
         # 更新 state

+ 221 - 0
models/uf-rl/lankao/data_to_rl_config.yaml

@@ -0,0 +1,221 @@
+# ============================================================
+# 项目级路径配置
+# ============================================================
+
+Paths:
+  project_root: "E:/Greentech"
+  __comment__: >
+    项目根目录,所有路径均相对于该目录展开。
+    不同工程师只需修改这一项即可迁移环境。
+
+  raw_data:
+    filtered_cycles_dir: "models/uf-rl/datasets/UF_longting_data/processed/filter_segments"
+    cycle_file_pattern: "UF{unit}_filtered_cycles.csv"
+
+    enabled_units: [1, 2]
+    disabled_units: None
+
+    __comment__: >
+      已完成清洗与周期切分的真实工厂数据。
+      每个 CSV 对应一个机组,文件内每一行是一个【物理周期】。
+      data_to_rl 阶段会:
+        - 读取 enabled_units 中所有机组
+        - 合并为统一数据集
+        - 机组编号仅用于筛选,不进入状态空间
+
+  data_to_rl:
+    output_dir: "models/uf-rl/datasets/UF_longting_data/rl_ready/output"
+    cache_dir: "models/uf-rl/datasets/UF_longting_data/rl_ready/cache"
+
+    __comment__: >
+      data_to_rl 模块输出内容:
+        - 全机组合并后的状态空间范围
+        - reset 初始状态分布
+        - reward 统计量
+
+
+# ============================================================
+# 数据层级定义(非常重要)
+# ============================================================
+
+DataHierarchy:
+
+  physical_cycle:
+    id_column: "seg_id"
+    time_columns:
+      start: "start_time"
+      end: "end_time"
+    __comment__: >
+      物理周期是最细粒度的数据层级:
+      - 用于物理模型验证
+      - 用于环境内部子步模拟
+      - 不直接暴露给强化学习算法
+
+  chemical_cycle:
+    id_column: "chem_cycle_id"
+    validity_column: "chem_cycle_valid"
+    step_definition: "one_rl_step_per_chemical_cycle"
+    __comment__: >
+      化学周期是强化学习的【唯一 step 单位】。
+      强化学习中:
+        - 一个 step = 一个化学周期
+        - 一个状态 = 一个化学周期的整体状态
+
+# ============================================================
+# 强化学习状态定义(工程师重点关注)
+# ============================================================
+
+RLState:
+  level: chemical_cycle
+  dimension: 8
+  __comment__: >
+    强化学习 observation 的定义。
+    所有变量在一个 step(化学周期)内保持不变。
+
+  variables:
+
+    q_UF:
+      source: "flow_mean"
+      aggregation: "mean_within_cycle"
+      unit: "m3_per_h"
+      description: "化学周期代表性进水流量"
+      __comment__: >
+        原始数据在物理周期层级变化,
+        这里使用化学周期内的平均值作为状态。
+
+    temp:
+      source: "temp_mean"
+      aggregation: "mean_within_cycle"
+      unit: "celsius"
+      description: "化学周期代表性水温"
+
+    TMP:
+      source: "tmp_start"
+      selection: "first_physical_cycle"
+      unit: "MPa"
+      description: "化学周期起始跨膜压差"
+      __comment__: >
+        虽然 TMP 在物理周期内会变化,
+        但强化学习只关心化学周期开始时的状态。
+
+    R:
+      source: "R_scaled_start"
+      selection: "first_physical_cycle"
+      unit: "scaled_resistance"
+      description: "化学周期起始膜阻力"
+
+    nuK:
+      source: "cycle_nuK"
+      unit: "scaled"
+      description: "短期污染增长系数"
+      __comment__: >
+        在同一化学周期内视为常数,
+        后续 step (下一化学周期)中根据动作进行经验性更新。
+
+    slope:
+      source: "cycle_long_a"
+      unit: "scaled"
+      description: "长期不可逆污染幂律系数 a"
+
+    power:
+      source: "cycle_long_b"
+      unit: "dimensionless"
+      description: "长期不可逆污染幂律指数 b"
+
+    ceb_removal:
+      source: "cycle_R_removed"
+      unit: "scaled"
+      description: "CEB 可去除的膜阻力"
+
+# ============================================================
+# 状态空间范围提取(reset 使用)
+# ============================================================
+
+StateSpaceExtraction:
+  enabled: true
+  level: chemical_cycle
+  include_only_valid_cycles: true
+
+  __comment__: >
+    用真实工厂数据统计【合理状态空间范围】,
+    用于:
+      - reset 初始状态采样
+      - 状态边界 sanity check
+
+  statistics:
+    method: "empirical"
+    percentiles: [1, 5, 50, 95, 99]
+    __comment__: >
+      使用经验分布统计,避免假设正态分布。
+
+  output:
+    save_csv: true
+    filename: "state_space_bounds.csv"
+
+# ============================================================
+# 物理周期层级的用途声明
+# ============================================================
+
+PhysicalCycleUsage:
+  enabled: true
+
+  purposes:
+    - internal_simulation
+    - parameter_sanity_check
+    - post_training_validation
+
+  variables_available:
+    - R_scaled_start
+    - R_scaled_end
+    - tmp_start
+    - tmp_end
+    - flow_mean
+    - flow_std
+    - temp_mean
+    - temp_std
+
+  __comment__: >
+    物理周期数据:
+      ✔ 可用于环境内部计算
+      ✔ 可用于训练后验证
+      ✘ 不可作为 RL observation
+
+# ============================================================
+# 真实数据在强化学习中的角色(工程边界声明)
+# ============================================================
+
+RealDataUsagePolicy:
+
+  reset_initialization:
+    enabled: true
+    source: "chemical_cycle_state_distribution"
+    __comment__: >
+      reset 时从真实化学周期状态分布中采样,
+      确保训练从“现实可达状态”开始。
+
+  training:
+    use_real_transitions: false
+    __comment__: >
+      真实数据不用于:
+        - 动力学拟合
+        - 监督策略
+      强化学习仍在模拟环境中进行。
+
+  validation:
+    enabled: true
+    compare_with_real_cycles: true
+    __comment__: >
+      训练完成后:
+        - 将策略运行结果
+        - 与真实化学周期统计特性进行对比
+      用于评估策略的现实可行性与安全性。
+
+# ============================================================
+# 方法论与工程风险声明
+# ============================================================
+
+MethodologyNotes:
+  - "强化学习的 step 定义严格等同于一个化学周期"
+  - "所有 RL 状态变量在一个 step 内视为常数"
+  - "物理周期层级仅用于环境内部模拟"
+  - "真实数据用于分布约束与验证,而非动力学建模"

+ 26 - 0
models/uf-rl/lankao/dqn_config.yaml

@@ -0,0 +1,26 @@
+# ============================================================
+# DQN 超参数配置
+# ============================================================
+# 神经网络参数
+learning_rate: 0.0001           # 学习率 1e-4
+
+# 经验回放参数
+buffer_size: 100000              # 经验回放缓冲区大小
+learning_starts: 10000           # 开始训练前收集的步数
+batch_size: 32                   # 训练批次大小
+
+# 强化学习参数
+gamma: 0.95                      # 折扣因子
+train_freq: 4                    # 训练频率(步数)
+
+# 目标网络参数
+target_update_interval: 1        # 目标网络更新间隔
+tau: 0.005                       # 软更新系数
+
+# 探索策略参数
+exploration_initial_eps: 1.0     # 初始探索率
+exploration_fraction: 0.3        # 探索率衰减比例
+exploration_final_eps: 0.02      # 最终探索率
+
+# 日志参数
+remark: "uf_dqn_real_reset"      # 实验备注

+ 139 - 0
models/uf-rl/lankao/env_config.yaml

@@ -0,0 +1,139 @@
+UFState:
+  # ===== 膜动态运行参数 =====
+  q_UF: 150.0
+  TMP: 0.02
+  temp: 20.0
+  R: 200.0
+
+  # ===== 膜阻力模型参数 =====
+  nuK: 170.0
+  slope: 2.0
+  power: 1.1
+  ceb_removal: 200.0
+
+
+UFPhysicsParams:
+  # ===== TMP 全局约束 =====
+  global_TMP_hard_limit: 0.08 # 跨膜压差硬上限
+  global_TMP_soft_limit: 0.06 # 跨膜压差软上限
+
+  # ===== 物理反洗参数 =====
+  tau_bw_s: 20.0
+  gamma_t: 1.0
+  q_bw_m3ph: 1000.0
+
+  # ===== CEB 化学反洗参数 =====
+  T_ceb_interval_h: 48.0
+  v_ceb_m3: 20.0
+  t_ceb_s: 2400.0   # 40 * 60
+
+  # ===== 膜组件参数 =====
+  A: 5120.0   # 128 * 40
+
+  # ===== 吨水电耗查找表 =====
+  energy_lookup:
+    2700: 0.1088
+    2760: 0.1083
+    2820: 0.1078
+    2880: 0.1074
+    2940: 0.1070
+    3000: 0.1066
+    3060: 0.1062
+    3120: 0.1059
+    3180: 0.1055
+    3240: 0.1052
+    3300: 0.1049
+    3360: 0.1045
+    3420: 0.1042
+    3480: 0.1039
+    3540: 0.1036
+    3600: 0.1034
+    3660: 0.1031
+    3720: 0.1029
+    3780: 0.1026
+    3840: 0.1023
+    3900: 0.1021
+    3960: 0.1019
+    4020: 0.1017
+    4080: 0.1015
+    4140: 0.1012
+    4200: 0.1011
+    4260: 0.1008
+    4320: 0.1007
+    4380: 0.1005
+    4440: 0.1003
+    4500: 0.1001
+    4560: 0.0999
+    4620: 0.0998
+    4680: 0.0996
+    4740: 0.0995
+    4800: 0.0993
+
+
+UFActionSpec:
+  # ===== 动作空间范围 =====
+  L_min_s: 3000.0
+  L_max_s: 3800.0
+  t_bw_min_s: 50.0
+  t_bw_max_s: 70.0
+
+  # ===== 动作离散化步长 =====
+  L_step_s: 60.0
+  t_bw_step_s: 5.0
+
+
+UFRewardParams:
+  # ===== TMP 安全与惩罚 =====
+  global_TMP_hard_limit: 0.08
+  global_TMP_soft_limit: 0.06
+  w_tmp_hard: 5.0
+  w_tmp: 1.5
+  p: 3.0
+  w_trend: 1.0
+
+  # ===== 回收率 =====
+  k_rec: 1.0
+  rec_low: 0.85
+  rec_high: 0.95
+  w_rec: 2.0
+
+  # ===== 残余污染 =====
+  k_res: 2.0
+  residual_ref_ratio: null
+  w_res: 2.0
+
+  # ===== 吨水电耗 =====
+  k_energy: 1.0
+  energy_low: 0.1023
+  energy_high: 0.1066
+  energy_ref: 0.1044
+  w_energy: 1.0
+
+
+UFStateBounds:
+  # ===== 流量初始化约束 =====
+  q_UF_min: 130.0
+  q_UF_max: 170.0
+
+  # ===== 温度初始化约束 =====
+  temp_min: 14.0
+  temp_max: 32.0
+
+  # ===== TMP 初始化约束 =====
+  TMP0_min: 0.01
+  TMP0_max: 0.04
+  global_TMP_hard_limit: 0.08
+
+  # ===== 短期污染参数 =====
+  nuK_min: 100.0
+  nuK_max: 250.0
+
+  # ===== 长期污染参数 =====
+  slope_min: 1.28
+  slope_max: 150
+  power_min: 0.25
+  power_max: 2.5
+
+  # ===== CEB 去除能力 =====
+  ceb_removal_min: 150.0
+  ceb_removal_max: 300.0

+ 49 - 0
models/uf-rl/lankao/uf_analyze_config.yaml

@@ -0,0 +1,49 @@
+UF:
+  units: [ "1", "2" ]
+  area_m2: 128 * 40
+
+  inlet_codes: [215.0, 301.0]
+  stable_codes: [220.0, 260.0]
+
+  physical_bw_code: [301.0, 340.0]
+  chemical_bw_code: [400.0, 660.0]
+
+  # 列名
+  column_formats:
+    flow_col: "ns=3;s={unit}#UF_JSFLOW_O"
+    tmp_col: "ns=3;s=UF{unit}_SSD_KMYC"
+    press_col: "ns=3;s={unit}#UF_JSPRESS_O"
+    ctrl_col: "ns=3;s=UF{unit}_STEP"
+    temp_col: "ns=3;s=ZJS_TEMP_O"
+    orp_col: "ns=3;s=RO_JSORP_O"
+    ph_col: "ns=3;s=RO_JSPH_O"
+
+
+Params:
+  # 稳定段提取
+  min_stable_points: 30
+  initial_points: 10
+
+  # 阻力趋势计算
+  segment_head_n: 5
+  segment_tail_n: 5
+
+  scale_factor: 1e10
+
+
+Plot:
+  figsize: [12, 6]
+  dpi: 120
+  color_inlet: "#1f77b4"
+  color_bw_phys: "#ff7f0e"
+  color_bw_chem: "#d62728"
+
+Paths:
+  project_root: "E:/Greentech" # 请根据项目根目录修改相应路径
+
+  raw_data_path: "models/uf-rl/datasets/UF_lankao_data/raw"
+  output_path: "models/uf-rl/datasets/UF_lankao_data/processed/segments"
+  filter_output_path: "models/uf-rl/datasets/UF_lankao_data/processed/filter_segments"
+
+  output_format: "csv"
+

+ 14 - 9
models/uf-rl/longting/env_config.yaml

@@ -1,15 +1,15 @@
 UFState:
 UFState:
   # ===== 膜动态运行参数 =====
   # ===== 膜动态运行参数 =====
-  q_UF: 150.0
-  TMP: 0.02
-  temp: 20.0
-  R: 200.0
+  q_UF: 150.0 # 流量
+  TMP: 0.02 # CEB周期初始跨膜压差
+  temp: 20.0 # 温度
+  R: 200.0 # 膜阻力(占位,实际上应用时根据前三个变量计算)
 
 
   # ===== 膜阻力模型参数 =====
   # ===== 膜阻力模型参数 =====
-  nuK: 170.0
-  slope: 2.0
-  power: 1.1
-  ceb_removal: 200.0
+  nuK: 170.0 # 短期膜阻力上升速率
+  slope: 2.0 # 长期膜阻力上升幂律模型斜率
+  power: 1.1 # 长期膜阻力上升幂律模型次数
+  ceb_removal: 200.0 # 本次CEB去除膜阻力值
 
 
 
 
 UFPhysicsParams:
 UFPhysicsParams:
@@ -20,10 +20,11 @@ UFPhysicsParams:
   # ===== 物理反洗参数 =====
   # ===== 物理反洗参数 =====
   tau_bw_s: 20.0
   tau_bw_s: 20.0
   gamma_t: 1.0
   gamma_t: 1.0
-  q_bw_m3ph: 1000.0
+  q_bw_m3ph: 500.0
 
 
   # ===== CEB 化学反洗参数 =====
   # ===== CEB 化学反洗参数 =====
   T_ceb_interval_h: 48.0
   T_ceb_interval_h: 48.0
+  T_ceb_interval_times: 48
   v_ceb_m3: 20.0
   v_ceb_m3: 20.0
   t_ceb_s: 2400.0   # 40 * 60
   t_ceb_s: 2400.0   # 40 * 60
 
 
@@ -69,6 +70,10 @@ UFPhysicsParams:
     4740: 0.0995
     4740: 0.0995
     4800: 0.0993
     4800: 0.0993
 
 
+  # ===== 吨水电耗功率参考值 =====
+
+  # ===== 吨水药耗功率参考值 =====
+
 
 
 UFActionSpec:
 UFActionSpec:
   # ===== 动作空间范围 =====
   # ===== 动作空间范围 =====

+ 8 - 2
models/uf-rl/longting/uf_analyze_config.yaml

@@ -2,8 +2,8 @@ UF:
   units: [ "1", "2" ]
   units: [ "1", "2" ]
   area_m2: 128 * 40
   area_m2: 128 * 40
 
 
-  inlet_codes: [215.0, 301.0]
-  stable_codes: [220.0, 260.0]
+  inlet_codes: [221.0, 300.0]
+  stable_codes: [221.0, 260.0]
 
 
   physical_bw_code: [301.0, 340.0]
   physical_bw_code: [301.0, 340.0]
   chemical_bw_code: [400.0, 660.0]
   chemical_bw_code: [400.0, 660.0]
@@ -17,6 +17,12 @@ UF:
     temp_col: "ns=3;s=ZJS_TEMP_O"
     temp_col: "ns=3;s=ZJS_TEMP_O"
     orp_col: "ns=3;s=RO_JSORP_O"
     orp_col: "ns=3;s=RO_JSORP_O"
     ph_col: "ns=3;s=RO_JSPH_O"
     ph_col: "ns=3;s=RO_JSPH_O"
+    event_col: "event_type"
+    BWB_POWER_col: "ns=3;s=ZZ_{unit}#UFBWB_POWER"
+    GSB_POWER_col: "ns=3;s=ZZ_UFGSB_POWER"
+    NaClO_col: "ns=3;s=CN_LEVEL_O"
+    HCL_col: "ns=3;s=S_LEVEL_O"
+    NaOH_col: "ns=3;s=J_LEVEL_O"
 
 
 
 
 Params:
 Params:

+ 7 - 1
models/uf-rl/rl_model/DQN/dqn_model/dqn_statebuilder.py

@@ -36,6 +36,10 @@ class DQNStateBuilder:
         self.params = self.cfg.params
         self.params = self.cfg.params
 
 
         self.units = self.uf_cfg["units"]
         self.units = self.uf_cfg["units"]
+        self.column_formats = self.uf_cfg.get("column_formats", {})
+        self.flow_format = self.column_formats.get("flow_col", "C.M.{unit}_FT_JS@out")
+        self.ctrl_format = self.column_formats.get("ctrl_col", "C.M.{unit}_DB@word_control")
+
         self.area_m2 = self.uf_cfg["area_m2"]
         self.area_m2 = self.uf_cfg["area_m2"]
 
 
         self.scale_factor = self.params.get("scale_factor", 1e10)
         self.scale_factor = self.params.get("scale_factor", 1e10)
@@ -236,7 +240,7 @@ class DQNStateBuilder:
         根据列名自动识别 UF 单元编号
         根据列名自动识别 UF 单元编号
         """
         """
         for unit in self.units:
         for unit in self.units:
-            key = f"C.M.{unit}_FT_JS@out"
+            key = self.flow_format.format(unit=unit)
             if key in df.columns:
             if key in df.columns:
                 return unit
                 return unit
         raise ValueError("无法从 CSV 列名识别 UF 单元编号")
         raise ValueError("无法从 CSV 列名识别 UF 单元编号")
@@ -245,11 +249,13 @@ class DQNStateBuilder:
         """
         """
         为 DataFrame 标注 event_type
         为 DataFrame 标注 event_type
         """
         """
+        ctrl_col = self.uf_cfg["flow_col_template"].format(unit=unit_id)
         clf = UFEventClassifier(
         clf = UFEventClassifier(
             unit_name=unit_id,
             unit_name=unit_id,
             inlet_codes=self.uf_cfg["inlet_codes"],
             inlet_codes=self.uf_cfg["inlet_codes"],
             physical_code=self.uf_cfg["physical_bw_code"],
             physical_code=self.uf_cfg["physical_bw_code"],
             chemical_code=self.uf_cfg["chemical_bw_code"],
             chemical_code=self.uf_cfg["chemical_bw_code"],
+            ctrl_col
         )
         )
         df = clf.classify(df)
         df = clf.classify(df)
         df = clf.segment(df)
         df = clf.segment(df)

+ 30 - 11
models/uf-rl/rl_model/DQN/uf_decide/run_dqn_deicde_totalstate.py

@@ -21,7 +21,8 @@ from pathlib import Path
 # ========== 参数 / 物理 ==========
 # ========== 参数 / 物理 ==========
 from env.uf_resistance_models_load import load_resistance_models
 from env.uf_resistance_models_load import load_resistance_models
 from env.uf_physics import UFPhysicsModel
 from env.uf_physics import UFPhysicsModel
-from env.env_params import UFState, UFPhysicsParams, UFActionSpec
+from env.env_params import UFState, UFActionSpec
+from env.env_config_loader import EnvConfigLoader, create_env_params_from_yaml
 
 
 
 
 # ========== 决策器 ==========
 # ========== 决策器 ==========
@@ -31,11 +32,10 @@ from rl_model.DQN.dqn_model.dqn_statebuilder import DQNStateBuilder
 
 
 
 
 
 
-def build_physics(IS_TIMES):
+def build_physics(IS_TIMES, phys_params):
     """
     """
     构造与训练一致的物理模型(只做一次)
     构造与训练一致的物理模型(只做一次)
     """
     """
-    phys_params = UFPhysicsParams()
     res_fp, res_bw = load_resistance_models(phys_params)
     res_fp, res_bw = load_resistance_models(phys_params)
 
 
     physics = UFPhysicsModel(
     physics = UFPhysicsModel(
@@ -46,7 +46,7 @@ def build_physics(IS_TIMES):
     )
     )
     return physics
     return physics
 
 
-def generate_plc_instructions(current_L_s, current_t_bw_s, model_prev_L_s, model_prev_t_bw_s, model_L_s, model_t_bw_s):
+def generate_plc_instructions(action_spec,current_L_s, current_t_bw_s, model_prev_L_s, model_prev_t_bw_s, model_L_s, model_t_bw_s):
     """
     """
     根据工厂当前值、模型上一轮决策值和模型当前轮决策值,生成PLC指令。
     根据工厂当前值、模型上一轮决策值和模型当前轮决策值,生成PLC指令。
 
 
@@ -55,7 +55,7 @@ def generate_plc_instructions(current_L_s, current_t_bw_s, model_prev_L_s, model
        如果工厂当前值也为None,则返回None并提示错误。
        如果工厂当前值也为None,则返回None并提示错误。
     """
     """
 
 
-    action_spec = UFActionSpec()
+    action_spec = action_spec
     adjustment_threshold = 1.0
     adjustment_threshold = 1.0
 
 
     # 处理None值情况
     # 处理None值情况
@@ -192,7 +192,10 @@ def calc_uf_cycle_metrics(current_state, max_tmp_during_filtration, min_tmp_duri
 def run_dqn_decide(
 def run_dqn_decide(
     model_path: Path,
     model_path: Path,
     physics,
     physics,
-    # -------- 工厂当前值 --------
+    action_spec,
+    reward_params,
+    state_bounds,
+# -------- 工厂当前值 --------
     current_state: UFState
     current_state: UFState
 ):
 ):
     """
     """
@@ -202,6 +205,9 @@ def run_dqn_decide(
     # 构造决策器
     # 构造决策器
     decider = UFDQNDecider(
     decider = UFDQNDecider(
         physics=physics,
         physics=physics,
+        action_spec=action_spec,
+        reward_params=reward_params,
+        state_bounds=state_bounds,
         model_path=model_path,
         model_path=model_path,
         seed=0,
         seed=0,
     )
     )
@@ -222,11 +228,11 @@ if __name__ == "__main__":
 
 
     THIS_FILE = Path(__file__).resolve()
     THIS_FILE = Path(__file__).resolve()
     UF_RL_ROOT = THIS_FILE.parents[3]
     UF_RL_ROOT = THIS_FILE.parents[3]
-    CONFIG_PATH = UF_RL_ROOT / "config" / "uf_analyze_config.yaml"
-
-    MODEL_PATH = "model/dqn_model.zip"
-    prev_cycle_csv = "online_datasets/UF1_prev_cycle.csv"
-    init_cycle_csv = "online_datasets/UF1_init_cycle.csv"
+    CONFIG_PATH = UF_RL_ROOT / "xishan" / "uf_analyze_config.yaml"
+    ENV_CONFIG_PATH = UF_RL_ROOT / "xishan" / "env_config.yaml"
+    MODEL_PATH = UF_RL_ROOT / "xishan" / "48h_dqn_model.zip"
+    prev_cycle_csv = "test_online_datasets/UF1_prev_cycle.csv"
+    init_cycle_csv = "test_online_datasets/UF1_init_cycle.csv"
     IS_TIMES = False # 新增指定变量,表示CEB间隔为时间控制/次数控制,T表示48次bw一次CEB,F表示48h一次CEB
     IS_TIMES = False # 新增指定变量,表示CEB间隔为时间控制/次数控制,T表示48次bw一次CEB,F表示48h一次CEB
 
 
     # 构建强化学习状态
     # 构建强化学习状态
@@ -236,11 +242,24 @@ if __name__ == "__main__":
         init_cycle_csv=init_cycle_csv,
         init_cycle_csv=init_cycle_csv,
     )
     )
 
 
+    config_loader = EnvConfigLoader(ENV_CONFIG_PATH)
+    config_loader.validate_config()
+    config_loader.print_config_summary()
+    (
+        uf_state_default,  # UFState默认值
+        phys_params,  # UFPhysicsParams
+        action_spec,  # UFActionSpec
+        reward_params,  # UFRewardParams
+        state_bounds  # UFStateBounds
+    ) = create_env_params_from_yaml(ENV_CONFIG_PATH)
     physics = build_physics(IS_TIMES)
     physics = build_physics(IS_TIMES)
 
 
     action_id, model_L_s, model_t_bw_s = run_dqn_decide(
     action_id, model_L_s, model_t_bw_s = run_dqn_decide(
         model_path=MODEL_PATH,
         model_path=MODEL_PATH,
         physics=physics,
         physics=physics,
+        action_spec=action_spec,
+        reward_params=reward_params,
+        state_bounds=state_bounds,
         current_state=current_state,
         current_state=current_state,
     ) # 环境实例化,模型加载等功能放在UFDQNDecider类中
     ) # 环境实例化,模型加载等功能放在UFDQNDecider类中
 
 

+ 59 - 0
models/uf-rl/uf_data_process/calculate.py

@@ -78,6 +78,65 @@ class UFResistanceCalculator:
         return result_segments
         return result_segments
 
 
 
 
+class PumpPowerCalculator:
+    """计算每个segment的供水泵和反洗泵平均功率"""
+
+    def __init__(self, event_col="event_type"):
+        self.event_col = event_col
+
+    def calculate_for_segment(
+        self,
+        seg_df: pd.DataFrame,
+        inlet_power_col=None,
+        bw_power_col=None
+    ):
+        """计算单个segment平均功率"""
+        seg_df = seg_df.copy()
+
+        required_cols = [self.event_col, inlet_power_col, bw_power_col]
+
+        if not all(col in seg_df.columns for col in required_cols):
+            print("⚠️ segment缺少必要列")
+            return seg_df
+
+        # 转为数值
+        seg_df[inlet_power_col] = pd.to_numeric(seg_df[inlet_power_col], errors="coerce")
+        seg_df[bw_power_col] = pd.to_numeric(seg_df[bw_power_col], errors="coerce")
+
+        # inlet平均功率
+        inlet_rows = seg_df[seg_df[self.event_col] == "inlet"]
+        inlet_mean = inlet_rows[inlet_power_col].mean()
+
+        # bw_phys平均功率
+        bw_rows = seg_df[seg_df[self.event_col] == "bw_phys"]
+        bw_mean = bw_rows[bw_power_col].mean()
+
+        seg_df["inlet_pump_power_mean"] = inlet_mean
+        seg_df["bw_pump_power_mean"] = bw_mean
+
+        return seg_df
+
+
+    def calculate_for_segments(
+        self,
+        segments: list,
+        inlet_power_col=None,
+        bw_power_col=None
+    ):
+        """批量计算"""
+        result_segments = []
+
+        for seg in segments:
+            seg_res = self.calculate_for_segment(
+                seg,
+                inlet_power_col=inlet_power_col,
+                bw_power_col=bw_power_col
+            )
+            result_segments.append(seg_res)
+
+        return result_segments
+
+
 # =============================
 # =============================
 #   单变量稳定性分析(例如 flow/TMP)
 #   单变量稳定性分析(例如 flow/TMP)
 # =============================
 # =============================

+ 177 - 95
models/uf-rl/uf_data_process/data_export.py

@@ -10,8 +10,8 @@ DB_HOST = "222.130.26.206"
 DB_PORT = 4000
 DB_PORT = 4000
 
 
 # 时间配置
 # 时间配置
-START_TIME = datetime(2025, 11, 28, 0, 0, 0)
-END_TIME = datetime(2026, 2, 20, 0, 0, 0)
+START_TIME = datetime(2025, 6, 11, 0, 0, 0)
+END_TIME = datetime(2026, 3, 1, 6, 0, 0)
 BOUNDARY = datetime(2025, 3, 25, 0, 0, 0)
 BOUNDARY = datetime(2025, 3, 25, 0, 0, 0)
 
 
 DELETE_PERIODS = [
 DELETE_PERIODS = [
@@ -23,26 +23,36 @@ DELETE_PERIODS = [
 ]  # 来自张昊师兄的,需要被去除的黑名单区间
 ]  # 来自张昊师兄的,需要被去除的黑名单区间
 DELETE_PERIODS = [(pd.to_datetime(s), pd.to_datetime(e)) for s, e in DELETE_PERIODS]
 DELETE_PERIODS = [(pd.to_datetime(s), pd.to_datetime(e)) for s, e in DELETE_PERIODS]
 
 
-# 新水岛
-# ---------- 传感器配置(新水厂系统) ----------
+# ---------- 传感器配置----------
 # 机组编号
 # 机组编号
 UNITS = [1, 2]
 UNITS = [1, 2]
 
 
-# 新系统变量模板
 BASE_VARIABLES = [
 BASE_VARIABLES = [
     "ns=3;s={}#UF_JSFLOW_O",  # 进水流量
     "ns=3;s={}#UF_JSFLOW_O",  # 进水流量
-    "ns=3;s={}#UF_JSPRESS_O",  # 进水压力(如有)
-    "ns=3;s=UF{}_SSD_KMYC",  # 跨膜压差(如有)
-    "ns=3;s=UF{}_STEP" # 步序
+    "ns=3;s={}#UF_JSPRESS_O",  # 进水压力
+    "ns=3;s=UF{}_SSD_KMYC",  # 跨膜压差
+    "ns=3;s=UF{}_STEP",  # 步序/控制字
+    "ns=3;s=ZZ_{}#UFBWB_POWER",  # 反洗泵功率
 ]
 ]
-
-# 系统总变量(如果存在)
 SYSTEM_VARIABLES = [
 SYSTEM_VARIABLES = [
-    "ns=3;s=ZJS_TEMP_O", # 进水温度
-    "ns=3;s=RO_JSORP_O",  # 总产水ORP(示例)
-    "ns=3;s=RO_JSPH_O",  # 总产水PH(示例)
+    "ns=3;s=ZJS_TEMP_O",  # 进水温度
+    "ns=3;s=RO_JSORP_O",  # 总产水ORP
+    "ns=3;s=RO_JSPH_O",  # 总产水PH
+    "ns=3;s=RO_JSDD_O",  # 总产水电导
+    "ns=3;s=CN_LEVEL_O",  # 次钠液位
+    "ns=3;s=S_LEVEL_O",  # 酸液位
+    "ns=3;s=J_LEVEL_O",  # 碱液位
+    "ns=3;s=ZZ_UFGSB_POWER",  # 超滤供水泵功率
+
 ]
 ]
 
 
+# 输出目录
+BASE_OUTPUT_DIR = "../datasets/UF_lankao_data"
+PROCESSED_OUTPUT_DIR = os.path.join(BASE_OUTPUT_DIR, "raw")
+
+# 创建目录
+os.makedirs(PROCESSED_OUTPUT_DIR, exist_ok=True)
+
 # 生成所有变量名称
 # 生成所有变量名称
 SENSOR_NAMES = []
 SENSOR_NAMES = []
 
 
@@ -56,13 +66,6 @@ print(f"总共查询 {len(SENSOR_NAMES)} 个变量")
 for i, var in enumerate(SENSOR_NAMES, 1):
 for i, var in enumerate(SENSOR_NAMES, 1):
     print(f"{i:2d}. {var}")
     print(f"{i:2d}. {var}")
 
 
-# 输出目录 - 只需要processed文件夹
-BASE_OUTPUT_DIR = "../datasets/UF_longting_data"
-PROCESSED_OUTPUT_DIR = os.path.join(BASE_OUTPUT_DIR, "raw")
-
-# 创建目录
-os.makedirs(PROCESSED_OUTPUT_DIR, exist_ok=True)
-
 
 
 # ---------- 创建数据库引擎 ----------
 # ---------- 创建数据库引擎 ----------
 def create_db_engines():
 def create_db_engines():
@@ -93,21 +96,20 @@ def create_db_engines():
 # ---------- 一分钟聚合查询函数(子查询方式)----------
 # ---------- 一分钟聚合查询函数(子查询方式)----------
 def fetch_valve_aggregated(name, start, end, engine, interval_minutes=1):
 def fetch_valve_aggregated(name, start, end, engine, interval_minutes=1):
     """
     """
-    从数据库获取传感器数据并直接进行时间聚合
-    使用子查询方式避免 ONLY_FULL_GROUP_BY 问题
+    从数据库获取传感器数据并按分钟聚合,时间戳对齐到分钟开始(00秒)
     """
     """
     interval_seconds = interval_minutes * 60
     interval_seconds = interval_minutes * 60
 
 
     sql = text(f"""
     sql = text(f"""
         SELECT 
         SELECT 
-            MIN(h_time) AS time,
+            FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(MIN(h_time)) / {interval_seconds}) * {interval_seconds}) AS time,
             AVG(val) AS val
             AVG(val) AS val
         FROM (
         FROM (
             SELECT 
             SELECT 
                 h_time,
                 h_time,
                 val,
                 val,
                 FLOOR(UNIX_TIMESTAMP(h_time) / {interval_seconds}) AS time_group
                 FLOOR(UNIX_TIMESTAMP(h_time) / {interval_seconds}) AS time_group
-            FROM dc_item_history_data_1450
+            FROM dc_item_history_data_1451
             WHERE item_name = :name
             WHERE item_name = :name
               AND h_time BETWEEN :st AND :et
               AND h_time BETWEEN :st AND :et
               AND val IS NOT NULL
               AND val IS NOT NULL
@@ -120,77 +122,156 @@ def fetch_valve_aggregated(name, start, end, engine, interval_minutes=1):
         df = pd.read_sql(sql, engine, params={"name": name, "st": start, "et": end})
         df = pd.read_sql(sql, engine, params={"name": name, "st": start, "et": end})
         if not df.empty:
         if not df.empty:
             df['time'] = pd.to_datetime(df['time'])
             df['time'] = pd.to_datetime(df['time'])
+            # 确保时间戳是整分钟(去除秒和微秒)
+            df['time'] = df['time'].dt.floor('1min')
             print(f"  ✓ {name}: {len(df)} 条记录")
             print(f"  ✓ {name}: {len(df)} 条记录")
         return df
         return df
     except Exception as e:
     except Exception as e:
         print(f"  ✗ {name} 查询失败: {str(e)}")
         print(f"  ✗ {name} 查询失败: {str(e)}")
         return pd.DataFrame()
         return pd.DataFrame()
 
 
+def fetch_special_data(sensor, start, end, boundary, engine_test, engine_prod):
+    """
+    专门获取步序数据,保持原始变化点
+    返回原始时间戳和值
+    """
+    sql = text("""
+               SELECT h_time AS time, val
+               FROM dc_item_history_data_1451
+               WHERE item_name = :name
+                 AND h_time BETWEEN :st
+                 AND :et
+                 AND val IS NOT NULL
+               ORDER BY h_time
+               """)
+
+    try:
+        # 根据边界时间选择合适的数据库引擎
+        if end <= boundary:
+            df = pd.read_sql(sql, engine_test, params={"name": sensor, "st": start, "et": end})
+        elif start >= boundary:
+            df = pd.read_sql(sql, engine_prod, params={"name": sensor, "st": start, "et": end})
+        else:
+            df1 = pd.read_sql(sql, engine_test, params={"name": sensor, "st": start, "et": boundary})
+            df2 = pd.read_sql(sql, engine_prod, params={"name": sensor, "st": boundary, "et": end})
+            df = pd.concat([df1, df2], ignore_index=True)
+
+        if not df.empty:
+            df['time'] = pd.to_datetime(df['time'])
+
+        return df
+    except Exception as e:
+        print(f"  ✗ {sensor} 查询失败: {str(e)}")
+        return pd.DataFrame()
+
 
 
 # ---------- 传感器数据查询函数(只获取聚合数据)----------
 # ---------- 传感器数据查询函数(只获取聚合数据)----------
 def fetch_sensor_data(sensor_names, start_time, end_time, boundary, engine_test, engine_prod):
 def fetch_sensor_data(sensor_names, start_time, end_time, boundary, engine_test, engine_prod):
     """
     """
     获取多个传感器的分钟级聚合数据并合并为宽表
     获取多个传感器的分钟级聚合数据并合并为宽表
-
-    参数:
-        sensor_names: 传感器名称列表
-        start_time: 开始时间
-        end_time: 结束时间
-        boundary: 数据库切换边界
-        engine_test: 测试数据库引擎
-        engine_prod: 生产数据库引擎
+    步序变量单独处理
     """
     """
-    all_data = []  # 只存储聚合数据
-
-    print("\n开始获取各传感器数据:")
-
-    for sensor in sensor_names:
-        try:
-            # 根据边界时间选择合适的数据库引擎
-            if end_time <= boundary:
-                # 全部在测试数据库
-                df = fetch_valve_aggregated(sensor, start_time, end_time, engine_test)
-            elif start_time >= boundary:
-                # 全部在生产数据库
-                df = fetch_valve_aggregated(sensor, start_time, end_time, engine_prod)
-            else:
-                # 跨越两个数据库
-                df1 = fetch_valve_aggregated(sensor, start_time, boundary, engine_test)
-                df2 = fetch_valve_aggregated(sensor, boundary, end_time, engine_prod)
-                df = pd.concat([df1, df2], ignore_index=True)
-
-            if not df.empty:
-                # 重命名列以区分不同传感器
-                df_renamed = df.rename(columns={'val': sensor})
-                all_data.append(df_renamed[['time', sensor]])
-            else:
-                print(f"  ⚠ {sensor}: 无数据")
-
-        except Exception as e:
-            print(f"  ⚠ {sensor}: 处理失败 - {str(e)}")
-            continue
+    # 识别步序变量和功率变量
+    step_vars = [v for v in sensor_names if 'STEP' in v]
+    power_vars = [v for v in sensor_names if 'POWER' in v]
+
+    # 需要特殊处理的变量
+    special_vars = step_vars + power_vars
+
+    # 其他连续变量
+    continuous_vars = [v for v in sensor_names if v not in special_vars]
+
+    print(f"\n识别到 {len(special_vars)} 个离散变量, {len(continuous_vars)} 个连续变量")
+
+    all_data = []
+
+    # 1. 处理连续变量(按分钟聚合,时间对齐到整分钟)
+    if continuous_vars:
+        print("\n获取连续变量数据(分钟平均):")
+        for sensor in continuous_vars:
+            try:
+                # 根据边界时间选择合适的数据库引擎
+                if end_time <= boundary:
+                    df = fetch_valve_aggregated(sensor, start_time, end_time, engine_test)
+                elif start_time >= boundary:
+                    df = fetch_valve_aggregated(sensor, start_time, end_time, engine_prod)
+                else:
+                    df1 = fetch_valve_aggregated(sensor, start_time, boundary, engine_test)
+                    df2 = fetch_valve_aggregated(sensor, boundary, end_time, engine_prod)
+                    df = pd.concat([df1, df2], ignore_index=True)
+
+                if not df.empty:
+                    # 确保时间戳是整分钟
+                    df['time'] = pd.to_datetime(df['time']).dt.floor('1min')
+                    # 按时间去重(同一分钟可能有多个聚合结果)
+                    df = df.drop_duplicates(subset=['time'], keep='first')
+                    df_renamed = df.rename(columns={'val': sensor})
+                    all_data.append(df_renamed[['time', sensor]])
+                else:
+                    print(f"  ⚠ {sensor}: 无数据")
+
+            except Exception as e:
+                print(f"  ⚠ {sensor}: 处理失败 - {str(e)}")
+                continue
+
+    # 2. 处理离散变量(保持原始变化点,但标记时间)
+    if special_vars:
+        print("\n获取离散变量数据(原始变化点):")
+        for sensor in special_vars:
+            try:
+                # 获取原始数据(不聚合)
+                df = fetch_special_data(sensor, start_time, end_time, boundary, engine_test, engine_prod)
+
+                if not df.empty:
+                    # 将时间戳对齐到分钟,用于后续扩展
+                    df['time_min'] = df['time'].dt.floor('1min')
+                    # 重命名值列
+                    df_renamed = df.rename(columns={'val': sensor})
+                    all_data.append(df_renamed[['time_min', sensor]].rename(columns={'time_min': 'time'}))
+                    print(f"  ✓ {sensor}: {len(df)} 个变化点")
+                else:
+                    print(f"  ⚠ {sensor}: 无数据")
+
+            except Exception as e:
+                print(f"  ⚠ {sensor}: 处理失败 - {str(e)}")
+                continue
 
 
     if not all_data:
     if not all_data:
         print("\n❌ 未获取到任何传感器数据")
         print("\n❌ 未获取到任何传感器数据")
         return pd.DataFrame()
         return pd.DataFrame()
 
 
-    print(f"\n开始合并 {len(all_data)} 个传感器的数据...")
-
-    # 使用reduce逐步合并DataFrame
-    merged_df = reduce(
-        lambda left, right: pd.merge(left, right, on='time', how='outer'),
-        all_data
+    # 3. 创建完整的时间网格(整分钟)
+    print(f"\n创建完整时间网格...")
+    time_grid = pd.date_range(
+        start=start_time.replace(second=0, microsecond=0),  # 对齐到秒0
+        end=end_time.replace(second=0, microsecond=0),
+        freq='1min'
     )
     )
+    merged_df = pd.DataFrame({'time': time_grid})
+    print(f"时间网格: {len(time_grid)} 个时间点")
 
 
-    # 按时间排序
-    merged_df = merged_df.sort_values('time').reset_index(drop=True)
+    # 4. 逐个合并数据
+    print(f"\n开始合并 {len(all_data)} 个传感器的数据...")
+    for df in all_data:
+        df['time'] = pd.to_datetime(df['time']).dt.floor('1min')
+        # 对于步序变量,需要先扩展填充
+        sensor_name = df.columns[1]  # 获取传感器名称
+        if sensor_name in step_vars:
+            # 步序变量:前向填充
+            merged_df = merged_df.merge(df, on='time', how='left')
+            merged_df[sensor_name] = merged_df[sensor_name].fillna(method='ffill')
+        else:
+            # 连续变量:直接合并
+            merged_df = merged_df.merge(df, on='time', how='left')
 
 
     print(f"合并完成,共 {len(merged_df)} 条时间记录")
     print(f"合并完成,共 {len(merged_df)} 条时间记录")
 
 
-    # 删除黑名单时段
-    print("删除黑名单时段...")
+    # 5. 删除黑名单时段
+    print("\n删除黑名单时段...")
     original_len = len(merged_df)
     original_len = len(merged_df)
     for s, e in DELETE_PERIODS:
     for s, e in DELETE_PERIODS:
+        s = pd.to_datetime(s).floor('1min')
+        e = pd.to_datetime(e).floor('1min')
         merged_df = merged_df[(merged_df['time'] < s) | (merged_df['time'] > e)]
         merged_df = merged_df[(merged_df['time'] < s) | (merged_df['time'] > e)]
 
 
     deleted_count = original_len - len(merged_df)
     deleted_count = original_len - len(merged_df)
@@ -200,17 +281,13 @@ def fetch_sensor_data(sensor_names, start_time, end_time, boundary, engine_test,
     return merged_df
     return merged_df
 
 
 
 
+
 # ---------- 数据后处理函数(填充空值)----------
 # ---------- 数据后处理函数(填充空值)----------
-def post_process_data(df, method='both'):
+def post_process_data(df, continuous_vars, step_vars):
     """
     """
-    对聚合后的数据进行后处理:填充空值
-
-    参数:
-        df: 输入的宽表DataFrame
-        method: 填充方法
-
-    返回:
-        pd.DataFrame: 处理后的DataFrame
+    对聚合后的数据进行后处理
+    连续变量:线性插值或前后填充
+    步序变量:已经前向填充,只需处理开头可能存在的空值
     """
     """
     if df.empty:
     if df.empty:
         print("警告:输入数据为空")
         print("警告:输入数据为空")
@@ -227,21 +304,22 @@ def post_process_data(df, method='both'):
     missing_before = df_processed.isnull().sum().sum()
     missing_before = df_processed.isnull().sum().sum()
     print(f"填充前空值数量: {missing_before}")
     print(f"填充前空值数量: {missing_before}")
 
 
-    if method == 'bfill':
-        df_processed = df_processed.bfill()
-    elif method == 'ffill':
-        df_processed = df_processed.ffill()
-    else:  # both
-        df_processed = df_processed.ffill().bfill()
+    # 处理连续变量(使用线性插值更适合连续物理量)
+    for var in continuous_vars:
+        if var in df_processed.columns:
+            # 先线性插值,再前后填充边界
+            df_processed[var] = df_processed[var].interpolate(method='linear', limit_direction='both')
+
+    # 处理步序变量(可能开头有NaN,用后向填充)
+    for var in step_vars:
+        if var in df_processed.columns:
+            df_processed[var] = df_processed[var].bfill()  # 开头空值用第一个有效值填充
 
 
     missing_after = df_processed.isnull().sum().sum()
     missing_after = df_processed.isnull().sum().sum()
     print(f"填充后空值数量: {missing_after}")
     print(f"填充后空值数量: {missing_after}")
     print(f"填充了 {missing_before - missing_after} 个空值")
     print(f"填充了 {missing_before - missing_after} 个空值")
 
 
-    # 重置索引
-    df_processed = df_processed.reset_index().rename(columns={'index': 'time'})
-
-    return df_processed
+    return df_processed.reset_index()
 
 
 
 
 # ---------- 数据分块保存函数 ----------
 # ---------- 数据分块保存函数 ----------
@@ -303,12 +381,16 @@ if __name__ == "__main__":
     print(f"\n✅ 聚合数据获取完成!")
     print(f"\n✅ 聚合数据获取完成!")
     print(f"总数据量: {len(agg_df)} 条记录")
     print(f"总数据量: {len(agg_df)} 条记录")
     print(f"时间范围: {agg_df['time'].min()} 到 {agg_df['time'].max()}")
     print(f"时间范围: {agg_df['time'].min()} 到 {agg_df['time'].max()}")
-    print(f"时间间隔: 1分钟")
-    print(f"传感器数量: {len(agg_df.columns) - 1} 个")  # 减1是因为time列
+    print(f"时间间隔: 1分钟(整点)")
+    print(f"传感器数量: {len(agg_df.columns) - 1} 个")
+
+    # 识别步序变量
+    step_vars = [col for col in agg_df.columns if 'STEP' in col]
+    continuous_vars = [col for col in agg_df.columns if col != 'time' and col not in step_vars]
 
 
-    # 3. 后处理聚合数据(填充空值)
-    print(f"\n[3/4] 后处理聚合数据(填充空值)...")
-    processed_df = post_process_data(agg_df, method='both')
+    # 3. 后处理数据
+    print(f"\n[3/4] 后处理聚合数据...")
+    processed_df = post_process_data(agg_df, continuous_vars, step_vars)
 
 
     print(f"\n✅ 后处理完成!")
     print(f"\n✅ 后处理完成!")
     print(f"处理后数据行数: {len(processed_df)}")
     print(f"处理后数据行数: {len(processed_df)}")

+ 43 - 0
models/uf-rl/uf_data_process/filter.py

@@ -112,6 +112,49 @@ class InletSegmentFilter:
         return stable_segments
         return stable_segments
 
 
 
 
+import pandas as pd
+import numpy as np
+
+
+class FlowOutlierFilter:
+    """
+    对稳定段进行异常值剔除
+    规则:flow 不在 mean ± n*std 范围内的行删除
+    """
+
+    def __init__(self, n_sigma=3):
+        self.n_sigma = n_sigma
+
+    def filter_segment(self, seg: pd.DataFrame, flow_col, prefix="flow"):
+
+        seg = seg.copy()
+
+        mean_col = f"{prefix}_mean"
+        std_col = f"{prefix}_std"
+
+        # 如果统计列不存在,直接返回
+        if mean_col not in seg.columns or std_col not in seg.columns:
+            return seg
+
+        mean_val = seg[mean_col].iloc[0]
+        std_val = seg[std_col].iloc[0]
+
+        lower = mean_val - self.n_sigma * std_val
+        upper = mean_val + self.n_sigma * std_val
+
+        seg = seg[(seg[flow_col] >= lower) & (seg[flow_col] <= upper)]
+
+        return seg
+
+    def filter_segments(self, segments, flow_col, prefix="flow"):
+
+        result = []
+
+        for seg in segments:
+            filtered = self.filter_segment(seg, flow_col, prefix)
+            result.append(filtered)
+
+        return result
 
 
 
 
 
 

+ 39 - 0
models/uf-rl/uf_data_process/fit.py

@@ -386,3 +386,42 @@ class ChemicalBackwashCleaner:
             cycles[ids[-1]]["R_removed"] = None
             cycles[ids[-1]]["R_removed"] = None
 
 
         return cycles
         return cycles
+
+    def compute_dose(self, cycles, NaClO_col, HCl_col, NaOH_col):
+        ids = sorted(cycles.keys())
+
+        for i in range(len(ids) - 1):
+            cid1 = ids[i]
+            cid2 = ids[i + 1]
+
+            c1 = cycles[cid1]
+            c2 = cycles[cid2]
+
+            if not (c1["valid"] and c2["valid"]):
+                c1["R_removed"] = None
+                continue
+
+            # 上周期末:最后段的 R_end
+            seg_last = c1["segments"][-1]
+            NaClO_dose_end = seg_last[NaClO_col]
+            HCl_dose_end = seg_last[HCl_col]
+            NaOH_dose_end = seg_last[NaOH_col]
+
+            # 下周期初:第一段的 R_start
+            seg_first = c2["segments"][0]
+            NaClO_dose_start = seg_first[NaClO_col]
+            HCl_dose_start = seg_last[HCl_col]
+            NaOH_dose_start = seg_last[NaOH_col]
+
+            c1["NaClO_dose_removed"] = NaClO_dose_end - NaClO_dose_start
+            c1["HCl_dose_removed"] = HCl_dose_end - HCl_dose_start
+            c1["NaOH_dose_removed"] = NaOH_dose_end - NaOH_dose_start
+
+
+        # 最后一个周期无 CEB 去除值
+        if ids:
+            cycles[ids[-1]]["NaClO_dose_removed"] = None
+            cycles[ids[-1]]["HCl_dose_removed"] = None
+            cycles[ids[-1]]["NaOH_dose_removed"] = None
+
+        return cycles

+ 27 - 7
models/uf-rl/uf_data_process/pipeline.py

@@ -6,8 +6,8 @@ import pandas as pd
 from pathlib import Path
 from pathlib import Path
 from load import UFConfigLoader, UFDataLoader
 from load import UFConfigLoader, UFDataLoader
 from label import UFEventClassifier, PostBackwashInletMarker
 from label import UFEventClassifier, PostBackwashInletMarker
-from filter import ConstantFlowFilter, EventQualityFilter, InletSegmentFilter
-from calculate import UFResistanceCalculator, VariableStabilityAnalyzer, UFResistanceAnalyzer, UFPressureAnalyzer
+from filter import ConstantFlowFilter, EventQualityFilter, InletSegmentFilter,FlowOutlierFilter
+from calculate import UFResistanceCalculator, PumpPowerCalculator, VariableStabilityAnalyzer, UFResistanceAnalyzer, UFPressureAnalyzer
 from fit import ChemicalCycleSegmenter, ChemicalBackwashCleaner, ShortTermCycleFoulingFitter,LongTermFoulingFitter
 from fit import ChemicalCycleSegmenter, ChemicalBackwashCleaner, ShortTermCycleFoulingFitter,LongTermFoulingFitter
 
 
 class UFAnalysisPipeline:
 class UFAnalysisPipeline:
@@ -44,14 +44,20 @@ class UFAnalysisPipeline:
         self.ctrl_format = column_formats.get("ctrl_col", "C.M.{unit}_DB@word_control")
         self.ctrl_format = column_formats.get("ctrl_col", "C.M.{unit}_DB@word_control")
         self.flow_format = column_formats.get("flow_col", "C.M.{unit}_FT_JS@out")
         self.flow_format = column_formats.get("flow_col", "C.M.{unit}_FT_JS@out")
         self.tmp_format = column_formats.get("tmp_col", "C.M.{unit}_DB@press_PV")
         self.tmp_format = column_formats.get("tmp_col", "C.M.{unit}_DB@press_PV")
+        self.BWB_POWER_format = column_formats.get("BWB_POWER_col", "ns=3;s=ZZ_{unit}#UFBWB_POWER")
+        self.GSB_POWER_col = column_formats.get("GSB_POWER_col", "ns=3;s=ZZ_UFGSB_POWER")
         self.temp_col = column_formats.get("temp_col", "C.M.RO_TT_ZJS@out")
         self.temp_col = column_formats.get("temp_col", "C.M.RO_TT_ZJS@out")
         self.orp_col = column_formats.get("orp_col", "C.M.UF_ORP_ZCS@out")
         self.orp_col = column_formats.get("orp_col", "C.M.UF_ORP_ZCS@out")
+        self.NaClO_col = column_formats.get("NaClO_col", "ns=3;s=CN_LEVEL_O")
+        self.HCl_col = column_formats.get("HCl_col", "ns=3;s=S_LEVEL_O")
+        self.NaOH_col = column_formats.get("NaOH_col", "ns=3;s=J_LEVEL_O")
 
 
         # 过滤器
         # 过滤器
-        self.min_points = params.get("min_points", 40)
+        self.min_points = params.get("min_points", 20)
         self.initial_points = params.get("initial_points", 10)
         self.initial_points = params.get("initial_points", 10)
 
 
         self.quality_filter = EventQualityFilter(min_points=self.min_points)
         self.quality_filter = EventQualityFilter(min_points=self.min_points)
+        self.flow_filter = FlowOutlierFilter(n_sigma=3)
         self.initial_label = PostBackwashInletMarker(n_points=self.initial_points)
         self.initial_label = PostBackwashInletMarker(n_points=self.initial_points)
 
 
         # 阻力计算器
         # 阻力计算器
@@ -59,6 +65,8 @@ class UFAnalysisPipeline:
         self.segment_head_n = params.get("segment_head_n", 10)
         self.segment_head_n = params.get("segment_head_n", 10)
         self.segment_tail_n = params.get("segment_tail_n", 10)
         self.segment_tail_n = params.get("segment_tail_n", 10)
 
 
+        # 功率计算器
+        self.power_calc = PumpPowerCalculator(event_col=column_formats.get("event_col", "event_type"))
 
 
 
 
     # ----------------------------
     # ----------------------------
@@ -148,13 +156,17 @@ class UFAnalysisPipeline:
         if not np.issubdtype(df["time"].dtype, np.datetime64):
         if not np.issubdtype(df["time"].dtype, np.datetime64):
             df["time"] = pd.to_datetime(df["time"])
             df["time"] = pd.to_datetime(df["time"])
 
 
+        # 只保留 2025-06-11 及之后的数据
+        start_date = pd.Timestamp("2025-06-11")
+        df = df[df["time"] >= start_date].reset_index(drop=True)
+
         # 逐机组处理
         # 逐机组处理
         for unit in self.units:
         for unit in self.units:
             print(f"Processing {unit} ...")
             print(f"Processing {unit} ...")
             ctrl_col = self.ctrl_format.format(unit=unit)
             ctrl_col = self.ctrl_format.format(unit=unit)
             flow_col = self.flow_format.format(unit=unit)
             flow_col = self.flow_format.format(unit=unit)
             tmp_col = self.tmp_format.format(unit=unit)
             tmp_col = self.tmp_format.format(unit=unit)
-
+            BWB_POWER_col = self.BWB_POWER_format.format(unit=unit)
 
 
             # 去除无关列
             # 去除无关列
             other_units = [u for u in self.units if u != unit]
             other_units = [u for u in self.units if u != unit]
@@ -174,6 +186,11 @@ class UFAnalysisPipeline:
             segments = const_flow_filter.filter(seg_df) # 去除出现网络错误的进水段
             segments = const_flow_filter.filter(seg_df) # 去除出现网络错误的进水段
             segments = self.quality_filter.filter(segments) # 去除时间过短的进水段
             segments = self.quality_filter.filter(segments) # 去除时间过短的进水段
 
 
+            # -----------------------------
+            # 逐段计算本段的进水泵平均功率与反洗泵功率
+            # -----------------------------
+            segments = self.power_calc.calculate_for_segments(segments,inlet_power_col=self.GSB_POWER_col,bw_power_col=BWB_POWER_col)
+
             # 提取稳定进水段
             # 提取稳定进水段
             stable_extractor = InletSegmentFilter(ctrl_col,stable_codes=self.stable_codes,min_points=self.min_points)
             stable_extractor = InletSegmentFilter(ctrl_col,stable_codes=self.stable_codes,min_points=self.min_points)
             stable_segments = stable_extractor.extract(segments)  # 提取稳定进水数据
             stable_segments = stable_extractor.extract(segments)  # 提取稳定进水数据
@@ -193,6 +210,7 @@ class UFAnalysisPipeline:
                 tmp_col=tmp_col,
                 tmp_col=tmp_col,
             )
             )
 
 
+
             # -----------------------------
             # -----------------------------
             # 逐段计算 起止跨膜压差tmp_start/tmp_end 放缩后的R_start/R_end 进水变量稳定性
             # 逐段计算 起止跨膜压差tmp_start/tmp_end 放缩后的R_start/R_end 进水变量稳定性
             # -----------------------------
             # -----------------------------
@@ -201,6 +219,7 @@ class UFAnalysisPipeline:
             vsa = VariableStabilityAnalyzer()
             vsa = VariableStabilityAnalyzer()
             stable_segments = vsa.analyze_segments(stable_segments, col=flow_col, prefix="flow")
             stable_segments = vsa.analyze_segments(stable_segments, col=flow_col, prefix="flow")
             stable_segments = vsa.analyze_segments(stable_segments, col=self.temp_col, prefix="temp")
             stable_segments = vsa.analyze_segments(stable_segments, col=self.temp_col, prefix="temp")
+            stable_segments = self.flow_filter.filter_segments(stable_segments,flow_col=flow_col,prefix="flow" )
 
 
             # 跨膜压差统计
             # 跨膜压差统计
             upa = UFPressureAnalyzer(
             upa = UFPressureAnalyzer(
@@ -223,7 +242,7 @@ class UFAnalysisPipeline:
             # ----------------------------------
             # ----------------------------------
             # 1. 化学周期划分
             # 1. 化学周期划分
             # ----------------------------------
             # ----------------------------------
-            cycle_segmenter = ChemicalCycleSegmenter(max_hours=60)
+            cycle_segmenter = ChemicalCycleSegmenter(max_hours = 100)
             cycles = cycle_segmenter.assign_cycles(df_unit, stable_segments)
             cycles = cycle_segmenter.assign_cycles(df_unit, stable_segments)
 
 
             # ----------------------------------
             # ----------------------------------
@@ -266,10 +285,11 @@ class UFAnalysisPipeline:
                     seg["cycle_long_r2"] = r2
                     seg["cycle_long_r2"] = r2
 
 
             # ----------------------------------
             # ----------------------------------
-            # 4. 计算化学反冲洗去除膜阻力
+            # 4. 计算化学反冲洗去除膜阻力及耗药量
             # ----------------------------------
             # ----------------------------------
             cbc = ChemicalBackwashCleaner(unit)
             cbc = ChemicalBackwashCleaner(unit)
             cycles = cbc.compute_removal(cycles)
             cycles = cbc.compute_removal(cycles)
+            cycles = cbc.compute_dose(cycles, self.NaClO_col, self.HCl_col, self.NaOH_col)
 
 
             # 回写到稳定段
             # 回写到稳定段
             for cid, cycle in cycles.items():
             for cid, cycle in cycles.items():
@@ -342,7 +362,7 @@ class UFAnalysisPipeline:
             if "chem_cycle_id" not in df_segments.columns:
             if "chem_cycle_id" not in df_segments.columns:
                 print(f"No chem_cycle_id found for unit {unit}, skipping cycle filtering.")
                 print(f"No chem_cycle_id found for unit {unit}, skipping cycle filtering.")
             else:
             else:
-                df_valid = df_segments.dropna(subset=["chem_cycle_id"]).copy()
+                df_valid = df_segments.dropna(subset=["chem_cycle_id", "cycle_nuK_R2", "cycle_long_r2"]).copy()
                 df_valid["chem_cycle_id"] = df_valid["chem_cycle_id"].astype(int)
                 df_valid["chem_cycle_id"] = df_valid["chem_cycle_id"].astype(int)
 
 
                 # 2. 根据 R2 过滤化学周期
                 # 2. 根据 R2 过滤化学周期

+ 171 - 0
models/uf-rl/uf_data_process/水厂变量名称记录.md

@@ -0,0 +1,171 @@
+# 各水厂OPC UA变量配置总览
+
+## 当前缺失变量:
+> 安镇: 未查询到泵组功率变量,只有功率因数
+>
+> 龙亭等新水岛供水泵功率命名均为总功率,需确认是否为所有超滤机组的总用电,且需要确认功率单位
+> 
+> 盐城:ns=3;s=ZZ_UFGSB_POWER命名为UF供水泵相电压总功率,需确认是否表示超滤供水泵功率
+> 
+> 
+## 龙亭新水岛
+> **机组数量**:2台
+> 
+> **数据库id**:1450
+> 
+> **数据开始时间**:2025-6-11
+
+### UF机组变量
+```python
+UNITS = [1, 2]
+
+BASE_VARIABLES = [
+    "ns=3;s={}#UF_JSFLOW_O",      # 进水流量
+    "ns=3;s={}#UF_JSPRESS_O",     # 进水压力
+    "ns=3;s=UF{}_SSD_KMYC",       # 跨膜压差
+    "ns=3;s=UF{}_STEP",           # 步序/控制字
+    "ns=3;s=ZZ_{}#UFBWB_POWER", # 反洗泵功率
+]
+SYSTEM_VARIABLES = [
+    "ns=3;s=ZJS_TEMP_O",          # 进水温度
+    "ns=3;s=RO_JSORP_O",          # 总产水ORP
+    "ns=3;s=RO_JSPH_O",           # 总产水PH
+    "ns=3;s=RO_JSDD_O",           # 总产水电导
+    "ns=3;s=CN_LEVEL_O", # 次钠液位
+    "ns=3;s=S_LEVEL_O", # 酸液位
+    "ns=3;s=J_LEVEL_O", # 碱液位
+    "ns=3;s=ZZ_UFGSB_POWER", # 超滤供水泵功率
+    
+]
+```
+
+## 兰考新水岛
+> **机组数量**:2台
+> 
+> **数据库id**:1451
+> 
+> **数据开始时间**:
+
+### UF机组变量
+```python
+UNITS = [1, 2]
+
+BASE_VARIABLES = [
+    "ns=3;s={}#UF_JSFLOW_O",      # 进水流量
+    "ns=3;s={}#UF_JSPRESS_O",     # 进水压力
+    "ns=3;s=UF{}_SSD_KMYC",       # 跨膜压差
+    "ns=3;s=UF{}_STEP",           # 步序/控制字
+    "ns=3;s=ZZ_{}#UFBWB_POWER", # 反洗泵功率
+]
+SYSTEM_VARIABLES = [
+    "ns=3;s=ZJS_TEMP_O",          # 进水温度
+    "ns=3;s=RO_JSORP_O",          # 总产水ORP
+    "ns=3;s=RO_JSPH_O",           # 总产水PH
+    "ns=3;s=RO_JSDD_O",           # 总产水电导
+    "ns=3;s=CN_LEVEL_O", # 次钠液位
+    "ns=3;s=S_LEVEL_O", # 酸液位
+    "ns=3;s=J_LEVEL_O", # 碱液位
+    "ns=3;s=ZZ_UFGSB_POWER", # 超滤供水泵功率
+    
+]
+```
+
+
+## 安镇
+> **机组数量**:2台
+> 
+> **数据库id**:1181
+> 
+> **数据开始时间**:
+
+### UF机组变量
+```python
+UNITS = [1, 2]
+
+BASE_VARIABLES = [
+    "AR.{}#UF_JSFLOW_O",      # 进水流量
+    "AR.{}#UF_JSPRESS_O",     # 进水压力
+    "AR.UF{}_SSD_KMYC",       # 跨膜压差
+    "AR.UF{}_STEP",           # 步序/控制字
+]
+SYSTEM_VARIABLES = [
+    "AR.ZJS_TEMP_O",          # 进水温度
+    "AR.RO_JSORP_O",          # 总产水ORP
+    "AR.RO_JSPH_O",           # 总产水PH
+    "AR.RO_JSDD_O",           # 总产水电导
+    "AR.CN_LEVEL_O",       # 次钠液位
+    "AR.S_LEVEL_O",           # 酸液位
+    "AR.J_LEVEL_O",           # 碱液位
+    "ns=3;s=CN_LEVEL_O", # 次钠液位
+    "ns=3;s=S_LEVEL_O", # 酸液位
+    "ns=3;s=J_LEVEL_O", # 碱液位
+    "ns=3;s=ZZ_UFGSB_POWER", # 超滤供水泵功率
+]
+```
+
+
+## 盐城
+> **机组数量**:2台
+> 
+> **数据库id**:1497
+> 
+> **数据开始时间**:
+
+### UF机组变量
+```python
+UNITS = [1, 2]
+
+BASE_VARIABLES = [
+    "ns=3;s={}#UF_JSFLOW_O",      # 进水流量
+    "ns=3;s={}#UF_JSPRESS_O",     # 进水压力
+    "ns=3;s=UF{}_SSD_KMYC",       # 跨膜压差
+    "ns=3;s=UF{}_STEP",           # 步序/控制字
+    "ns=3;s=ZZ_{}#UFBWB_POWER" # 反洗泵功率
+]
+SYSTEM_VARIABLES = [
+    "ns=3;s=ZJS_TEMP_O",          # 进水温度
+    "ns=3;s=RO_JSORP_O",          # 总产水ORP
+    "ns=3;s=RO_JSPH_O",           # 总产水PH
+    "ns=3;s=RO_JSDD_O",           # 总产水电导
+    "ns=3;s=CN_LEVEL_O", # 次钠液位
+    "ns=3;s=S_LEVEL_O", # 酸液位
+    "ns=3;s=J_LEVEL_O", # 碱液位
+    "ns=3;s=ZZ_UFGSB_POWER", # 超滤供水泵功率
+]
+```
+
+
+## 锡山中荷
+> **系统类型**:超滤+反渗透系统
+> 
+> **机组数量**:4台
+> 
+> **数据库id**:92
+> 
+> **数据开始时间**:2024-03-01
+
+### UF机组变量
+```python
+UNITS = [1, 2, 3, 4]
+
+BASE_VARIABLES = [
+    "C.M.UF{}_FT_JS@out",  # 进水流量
+    "C.M.UF{}_PT_JS@out",  # 进水压力(如有)
+    "C.M.UF{}_DB@press_PV",  # 跨膜压差(如有)
+    "C.M.UF{}_DB@word_control" # 步序/控制字
+]
+
+SYSTEM_VARIABLES = [
+    "C.M.RO_TT_ZJS@out", # 进水温度
+    "C.M.UF_ORP_ZCS@out",  # 总产水ORP
+    "C.M.UF_PH_ZCS@out",  # 总产水PH
+    "C.M.RO_Cond_ZJS@out" # 总产水电导
+    "C.M.LT_NaClO@out",
+    "C.M.LT_HCl@out",  # 酸液位
+    "C.M.LT_NaOH@out"
+    "WG_A.1AA1-1.A13"  # 超滤供水泵A功率
+    "WG_A.1AA1-2.A13"  # 超滤供水泵B功率
+    "WG_A.1AA2-1.A13"  # 超滤反洗泵A功率
+    "WG_A.1AA1-2.A13"  # 超滤反洗泵B功率
+]
+```