Browse Source

feat:增加了环境体中吨水电耗与药耗相关奖励的计算逻辑
debug:修复了数据读取文件中各传感器时间戳不一致导致合并数据过大的问题

junc_WHU 3 weeks ago
parent
commit
406baf4010

+ 20 - 15
models/uf-rl/env/env_params.py

@@ -256,7 +256,7 @@ class UFPhysicsParams:
     # 膜有效面积(锡山水厂配置:128组膜,每组40m²)
     A: float = 5120.0  # [m²]
 
-    # 吨水电耗查找表
+    # 参考吨水电耗查找表
     energy_lookup: Dict[int, float] = field(default_factory=lambda: {
         2700: 0.1088, 2760: 0.1083, 2820: 0.1078, 2880: 0.1074,
         2940: 0.1070, 3000: 0.1066, 3060: 0.1062, 3120: 0.1059,
@@ -269,6 +269,15 @@ class UFPhysicsParams:
         4620: 0.0998, 4680: 0.0996, 4740: 0.0995, 4800: 0.0993,
     })
 
+    # 实际吨水电耗计算指标
+    p_feed_kw: float = 18.0
+    p_bw_kw: float = 20.0
+
+    # 实际吨水药耗计算指标
+    dose_min: float = 0.10
+    dose_max: float = 0.20
+
+
 @dataclass(frozen=True)
 class UFActionSpec:
     """
@@ -297,28 +306,24 @@ class UFRewardParams:
     # TMP 硬上限(MPa)
     # 说明:超过此值将导致 episode 失败,需立即停机
     global_TMP_soft_limit: float = 0.06     # TMP 软上限 (MPa)
+
     w_tmp_hard: float = 5.0      # TMP超硬限固定惩罚
     w_tmp: float = 1.5            # TMP软限惩罚
     p: float = 3                  # TMP软限非线性指数
     w_trend: float = 1.0          # TMP趋势惩罚权重
 
-    # 回收率
-    k_rec: float = 5.0
-    rec_low: float = 0.92
-    rec_high: float = 0.99
-    w_rec: float = 1.0             # 回收率权重
+    # 经济成本
+    k_cost: float = 3.0
+    cost_low: float = 0.10
+    cost_high: float = 0.16
+    w_cost: float = 1.0
+    alpha_chemical: float = 5.0
 
     # 残余污染
-    k_res: float = 5.0
+    k_res: float = 3.0
     residual_ref_ratio: float = None   # 动态=1/max_episode_steps
-    w_res: float = 2.0                 # 残余污染权重
-
-    # 吨水电耗
-    k_energy: float = 5.0
-    energy_low: float = 0.0993
-    energy_high: float = 0.1034
-    energy_ref: float = 0.1011
-    w_energy: float = 1.0              # 能耗权重
+    w_res: float = 1.0
+
 
 
 

+ 13 - 13
models/uf-rl/env/env_reset.py

@@ -166,21 +166,21 @@ class ResetSampler:
     def _get_sampling_config(self, progress: float) -> dict:
         progress = np.clip(progress, 0.0, 1.0)
 
-        # -------------------------
-        # 阶段权重设计(非线性 + 提高虚拟工况)
-        # -------------------------
-        w_real = (1.0 - progress) ** 1.2  # 历史工况逐渐衰减
-        w_perturb = 0.5 * progress  # 周边扰动按线性增加
-        w_virtual = 0.3 * progress ** 1.5  # 虚拟工况加快增长,后期最大约 0.3
-
-        # perturb 扰动幅度
-        perturb_scale = 0.02 + 0.04 * progress
-        # w_real = 0.0
-        # w_perturb = 0.0
-        # w_virtual = 1.0
+        # # -------------------------
+        # # 阶段权重设计(非线性 + 提高虚拟工况)
+        # # -------------------------
+        # w_real = (1.0 - progress) ** 1.2  # 历史工况逐渐衰减
+        # w_perturb = 0.5 * progress  # 周边扰动按线性增加
+        # w_virtual = 0.3 * progress ** 1.5  # 虚拟工况加快增长,后期最大约 0.3
         #
         # # perturb 扰动幅度
-        # perturb_scale = 0.0
+        # perturb_scale = 0.02 + 0.04 * progress
+        w_real = 0.0
+        w_perturb = 0.0
+        w_virtual = 1.0
+
+        # perturb 扰动幅度
+        perturb_scale = 0.0
         return dict(
             w_real=w_real,
             w_perturb=w_perturb,

+ 10 - 7
models/uf-rl/env/env_visual.py

@@ -73,20 +73,23 @@ class UFTrainingCallback(BaseCallback):
                 L_s = step_info["L_s"]
                 t_bw_s = step_info["t_bw_s"]
                 initial_tmp = step_info["initial_tmp"]
-                max_TMP_during_filtration = step_info["max_TMP_during_filtration"]
                 tmp_after_ceb = step_info["tmp_after_ceb"]
+                max_TMP_during_filtration = step_info["max_TMP_during_filtration"]
+                tmp_penalty = step_info["tmp_penalty"]
+
                 residual_ratio =step_info["residual_ratio"]
-                rec_reward = step_info["rec_reward"]
-                energy_reward = step_info["energy_reward"]
-                recovery = step_info["recovery"]
                 res_penalty = step_info["res_penalty"]
 
+                econ_reward = step_info["econ_reward"]
+                recovery = step_info["recovery"]
+
+
                 # 打印当前 step 的信息
                 if self.verbose:
                     print(f"[Step {self.num_timesteps}] 动作={step_action}, 奖励={step_reward:.3f}, Done={step_done}, L_s={L_s}, t_bw_s={t_bw_s},"
-                          f"residual_ratio = {residual_ratio:.4f},recovery = {recovery:.4f},"
-                          f"rec_reward = {rec_reward:.4f}, energy_reward = {energy_reward:.4f}, res_penalty = {res_penalty:.4f},"
-                          f"initial_tmp = {initial_tmp:.4f}, max_TMP_during_filtration ={max_TMP_during_filtration:.4f}, tmp_after_ceb = {tmp_after_ceb:.4f} ")
+                          f"residual_ratio = {residual_ratio:.4f}, res_penalty = {res_penalty:.4f},"
+                          f",recovery = {recovery:.4f},econ_reward  = {econ_reward :.4f}"
+                          f"initial_tmp = {initial_tmp:.4f}, tmp_after_ceb = {tmp_after_ceb:.4f}, max_TMP_during_filtration ={max_TMP_during_filtration:.4f}, tmp_penalty = {tmp_penalty:.4f}")
 
                 # 记录数据
                 self.recorder.record_step(

+ 83 - 81
models/uf-rl/env/uf_env.py

@@ -309,11 +309,9 @@ class UFSuperCycleEnv(gym.Env):
         if info["max_TMP_during_filtration"] > self.reward_params.global_TMP_soft_limit:
             info_next, _ = self.physics.simulate_one_supercycle(state=next_state,L_s=L_s,t_bw_s=t_bw_s)
 
-
-        reward, tmp_penalty, rec_reward, energy_reward, res_penalty = self._calculate_reward(info, info_next)
+        reward,tmp_penalty,econ_reward,res_penalty= self._calculate_reward(info, info_next)
         info["tmp_penalty"] = tmp_penalty
-        info["rec_reward"] = rec_reward
-        info["energy_reward"] = energy_reward
+        info["econ_reward"] = econ_reward
         info["res_penalty"] = res_penalty
 
         self.state = next_state
@@ -352,119 +350,123 @@ class UFSuperCycleEnv(gym.Env):
 
         return next_obs, reward, terminated, truncated, info
 
-    def _calculate_reward(self, info: dict, info_next) -> float:
+    def _calculate_reward(self, info: dict, info_next=None):
         """
-        计算强化学习奖励函数(扩展版
+        计算强化学习奖励函数(经济性 + 系统稳定性
 
-        功能:
-        - 平衡回收率、残余污染和吨水电耗三个目标
-        - TMP不直接参与奖励计算(通过失败判定间接影响)
-        - 使用 tanh 函数实现平滑的非线性奖励
+        奖励结构:
+            Reward = 经济奖励 + 污染控制奖励 + TMP风险惩罚
 
-        参数
-            info (dict): 周期性能指标字典,需包含
-                - recovery: 回收率 [0-1]
-                - R_after_ceb: 本周期结束膜阻力
-                - initial_R: 本周期初始膜阻力
-                - delta_R_allow: 本周期允许最大阻力上升
-                - ton_water_energy_kWh_per_m3: 本周期吨水电耗
+        经济奖励:
+            基于吨水电耗 + 吨水药耗
+
+        稳定性奖励:
+            - 残余污染控制
+            - TMP软限制
+            - TMP增长趋势
 
         返回:
-            float: 奖励值(通常在 -3 到 +3 之间)
-
-        设计思想:
-        - 高回收率 → 水资源利用率高 → 正奖励
-        - 低残余污染 → 膜长期稳定运行 → 正奖励
-        - 低吨水电耗 → 节能 → 正奖励
-        - 三者需要权衡:过短的过滤时间提高回收率但污染去除不彻底;过长时间污染控制好但回收率下降,过高功率增加耗能
-
-        参考点设计:
-        - 残余污染:
-            - 高污染参考点 = 1 / self.max_episode_steps
-            - 平衡点 = 0.5 / self.max_episode_steps
-        - 吨水电耗:
-            - 高点 = 0.1034 kWh/m³
-            - 平衡点 = 0.1011 kWh/m³
-            - 低点 = 0.0993 kWh/m³
-        - 回收率参考点保持原有设计
+            total_reward,
+            tmp_penalty,
+            econ_reward,
+            res_penalty
         """
 
-        #新增:将TMP超限改为持续惩罚
-        # ========== TMP 超标风险惩罚项 ==========
+        # ==============================
+        # TMP 状态惩罚
+        # ==============================
+
         tmp = info["max_TMP_during_filtration"]
         tmp_soft = self.reward_params.global_TMP_soft_limit
         tmp_hard = self.reward_params.global_TMP_hard_limit
 
         if self.tmp_over_limit_flag:
             tmp_state_penalty = -self.reward_params.w_tmp_hard
+
         elif tmp <= tmp_soft:
             tmp_state_penalty = 0.0
+
         elif tmp < tmp_hard:
             x = (tmp - tmp_soft) / (tmp_hard - tmp_soft)
-            tmp_state_penalty = -self.reward_params.w_tmp * x ** self.reward_params.p
 
-        # -------- TMP 趋势惩罚 --------
+            tmp_state_penalty = -self.reward_params.w_tmp * (
+                    x ** self.reward_params.p
+            )
+
+        else:
+            tmp_state_penalty = -self.reward_params.w_tmp_hard
+
+        # ==============================
+        # TMP 趋势惩罚
+        # ==============================
+
         tmp_trend_penalty = 0.0
+
         if info_next is not None:
-            delta_tmp = info_next["max_TMP_during_filtration"] - tmp
+            delta_tmp = (
+                    info_next["max_TMP_during_filtration"] - tmp
+            )
+
+            # 只惩罚TMP上升
+            delta_tmp = max(delta_tmp, 0)
+
             tmp_trend_penalty = -self.reward_params.w_trend * delta_tmp
 
         tmp_penalty = tmp_state_penalty + tmp_trend_penalty
 
-        # ========== 提取性能指标 ==========
-        recovery = info["recovery"]  # 回收率 [0-1]
+        # ==============================
+        # 残余污染惩罚
+        # ==============================
 
-        # 污染比例:实际上升的阻力 / 允许上升的阻力
-        # 允许上升的阻力值 = 当前阻力值软上限 - 当前阻力
-        residual_ratio = info['residual_ratio']
+        residual_ratio = info["residual_ratio"]
 
-        # 吨水电耗指标
-        energy = info["ton_water_energy"]
+        ref_residual = 1 / self.max_episode_steps
 
-        # ========== 回收率奖励项 ==========
-        # 将回收率归一化到 [0, 1] 区间(基于预期范围)
-        rec_norm = (recovery - self.reward_params.rec_low) / (self.reward_params.rec_high - self.reward_params.rec_low)
+        res_penalty = -np.tanh(
+            self.reward_params.k_res *
+            (residual_ratio / ref_residual - 1)
+        )
 
-        # 使用 tanh 函数构建平滑的 S 型奖励曲线
-        # - rec_norm = 0.5 时(回收率处于中间值),rec_reward = 0
-        # - rec_norm > 0.5 时,rec_reward > 0(鼓励高回收率)
-        # - rec_norm < 0.5 时,rec_reward < 0(惩罚低回收率)
-        # - k_rec 控制曲线陡峭程度,越大变化越陡
-        rec_reward = np.clip(np.tanh(self.reward_params.k_rec * (rec_norm - 0.5)), -1, 1)
+        # ==============================
+        # 经济成本(电耗 + 药耗)
+        # ==============================
 
-        # ========== 残余污染惩罚项 ==========
-        # 新参考点:每步允许上升比例 = 1 / max_episode_steps
-        # 平衡点 = 0.8 / max_episode_steps
-        ref_residual = 0.8 / self.max_episode_steps
+        energy = info["ton_water_energy"]
+        chemical = info["ton_water_chem"]
 
-        # 使用 tanh 构建惩罚曲线
-        # - residual_ratio < 平衡点时,res_penalty > 0(奖励低污染)
-        # - residual_ratio > 平衡点时,res_penalty < 0(惩罚高污染)
-        # - k_res 控制曲线陡峭程度
-        res_penalty = -np.tanh(self.reward_params.k_res * (residual_ratio / ref_residual - 1))
+        alpha = self.reward_params.alpha_chemical
 
-        # ========== 吨水电耗奖励项 ==========
-        # 设置高/平衡/低点
-        energy_low = 0.0993
-        energy_high = 0.1034
+        cost = energy + alpha * chemical
 
-        # 将能耗归一化到 [0, 1],平衡点对应 energy_norm = 0.5
-        energy_norm = (energy - energy_low) / (energy_high - energy_low)
+        # 成本归一化范围
+        cost_low = self.reward_params.cost_low
+        cost_high = self.reward_params.cost_high
 
-        # 使用 tanh 构建平滑奖励
-        # - energy_norm < 0.5 时,energy_reward > 0(节能奖励)
-        # - energy_norm > 0.5 时,energy_reward < 0(高能耗惩罚)
-        # - k_energy 控制曲线陡峭程度
-        energy_reward = -np.tanh(self.reward_params.k_energy * (energy_norm - 0.5))
+        cost_norm = (
+                (cost - cost_low) /
+                (cost_high - cost_low)
+        )
 
-        # ========== 组合奖励 ==========
-        # 简单线性组合三项(为污染项加权)
-        total_reward = rec_reward + 2.0 * res_penalty + energy_reward + tmp_penalty
+        econ_reward = -np.tanh(
+            self.reward_params.k_cost *
+            (cost_norm - 0.5)
+        )
 
+        # ==============================
+        # 总奖励
+        # ==============================
 
-        # 可选:添加平移项使特定点的奖励为零(当前未使用)
-        # total_reward -= offset
+        total_reward = (
+                econ_reward
+                + res_penalty
+                + tmp_penalty
+        )
 
-        return total_reward, tmp_penalty, rec_reward, energy_reward, res_penalty
+        return (
+            total_reward,
+            tmp_penalty,
+            econ_reward,
+            res_penalty
+        )
 
 

+ 7 - 10
models/uf-rl/env/uf_physics.py

@@ -364,10 +364,7 @@ class UFPhysicsModel:
         t_bw_total_h = k_bw_per_ceb * t_bw_s / 3600.0
 
         # 总能耗 (kWh)
-        E_total = (
-                t_feed_total_h * self.p.p_feed_kw
-                + t_bw_total_h * self.p.p_bw_kw
-        )
+        E_total = t_feed_total_h * self.p.p_feed_kw+ t_bw_total_h * self.p.p_bw_kw
 
         # 吨水电耗 (kWh/吨)
         ton_water_energy = E_total / max(V_net, 1e-12)
@@ -394,14 +391,14 @@ class UFPhysicsModel:
 
         # ===== 新指标:膜阻力允许上升空间 =====
         # 该指标根据当前最大跨膜压差距离软约束跨膜压差的距离,动态计算当前周期允许上升的膜阻力值,用于后续清洗效果奖励计算
-
+        delta_R = R_after_ceb - initial_R
         delta_R_allow = max(
-            self.resistance_from_tmp(self.p.global_TMP_soft_limit, state.q_UF, state.temp) -
+            self.resistance_from_tmp(self.p.global_TMP_hard_limit, state.q_UF, state.temp) -
             self.resistance_from_tmp(max_tmp_during_filtration, state.q_UF, state.temp),
             1e-6
         )
         if delta_R_allow > 50:
-            residual_ratio = (R_after_ceb - initial_R) / delta_R_allow
+            residual_ratio = delta_R / delta_R_allow
         else:
             residual_ratio = 1.0
 
@@ -481,8 +478,8 @@ class UFPhysicsModel:
                - 原因:回收率太低说明反洗水耗过大,经济性差
                - 阈值:75%(可调整)
 
-            3. 残余污染累积过快:(R_after_ceb - R0) / R0 > 0.1
-               - 原因:单个超级周期污染增长超过10%,长期运行不可持续
+            3. 残余污染累积过快:(R_after_ceb - R0) / R0 > 0.05
+               - 原因:单个超级周期污染增长超过5%,长期运行不可持续
                - 阈值:10%(可调整)
             """
         # ========== 获取关键指标 ==========
@@ -504,7 +501,7 @@ class UFPhysicsModel:
 
         # 条件3:污染增长比例超过容许范围
         residual_increase = (R_after_ceb - R0) / delta_R_allow
-        if residual_increase > 1 / 30:
+        if residual_increase > 1 / 20:
             return False  # 失败
 
         # 所有条件通过

+ 13 - 16
models/uf-rl/longting/env_config.yaml

@@ -70,9 +70,11 @@ UFPhysicsParams:
     4740: 0.0995
     4800: 0.0993
 
-  # ===== 吨水电耗功率参考值 =====
+  p_feed_kw:  18.0
+  p_bw_kw:  20.0
+  dose_min:  0.10
+  dose_max:  0.20
 
-  # ===== 吨水药耗功率参考值 =====
 
 
 UFActionSpec:
@@ -96,23 +98,18 @@ UFRewardParams:
   p: 3.0
   w_trend: 1.0
 
-  # ===== 回收率 =====
-  k_rec: 5.0
-  rec_low: 0.85
-  rec_high: 0.95
-  w_rec: 2.0
+  # ===== 经济成本 =====
+  k_cost: 3.0
+  cost_low: 0.10
+  cost_high:  0.16
+  w_cost: 1.0
+  alpha_chemical: 5.0
 
   # ===== 残余污染 =====
-  k_res: 10.0
+  k_res: 3.0
   residual_ref_ratio: null
-  w_res: 2.0
-
-  # ===== 吨水电耗 =====
-  k_energy: 5.0
-  energy_low: 0.1023
-  energy_high: 0.1066
-  energy_ref: 0.1044
-  w_energy: 1.0
+  w_res: 1.0
+
 
 
 UFStateBounds:

+ 1 - 0
models/uf-rl/rl_model/DQN/uf_train/run_dqn_train.py

@@ -135,6 +135,7 @@ def main():
     # ---------- Physics ----------
     physics_model = UFPhysicsModel(
         phys_params=phys_params,
+        state_bounds=state_bounds,
         resistance_model_fp=res_fp,
         resistance_model_bw=res_bw,
         IS_TIMES=IS_TIMES

+ 52 - 56
models/uf-rl/uf_data_process/data_export.py

@@ -10,7 +10,7 @@ DB_HOST = "222.130.26.206"
 DB_PORT = 4000
 
 # 时间配置
-START_TIME = datetime(2025, 6, 11, 0, 0, 0)
+START_TIME = datetime(2025, 4, 1, 0, 0, 0)
 END_TIME = datetime(2026, 3, 1, 6, 0, 0)
 BOUNDARY = datetime(2025, 3, 25, 0, 0, 0)
 
@@ -28,26 +28,25 @@ DELETE_PERIODS = [(pd.to_datetime(s), pd.to_datetime(e)) for s, e in DELETE_PERI
 UNITS = [1, 2]
 
 BASE_VARIABLES = [
-    "ns=3;s={}#UF_JSFLOW_O",  # 进水流量
-    "ns=3;s={}#UF_JSPRESS_O",  # 进水压力
-    "ns=3;s=UF{}_SSD_KMYC",  # 跨膜压差
-    "ns=3;s=UF{}_STEP",  # 步序/控制字
-    "ns=3;s=ZZ_{}#UFBWB_POWER",  # 反洗泵功率
+    "AR.{}#UF_JSFLOW_O",      # 进水流量
+    "AR.{}#UF_JSPRESS_O",     # 进水压力
+    "AR.UF{}_SSD_KMYC",       # 跨膜压差
+    "AR.UF{}_STEP",           # 步序/控制字
+    "AR.ZZ_{}#UFBWB_POWER" # 反洗泵功率
 ]
 SYSTEM_VARIABLES = [
-    "ns=3;s=ZJS_TEMP_O",  # 进水温度
-    "ns=3;s=RO_JSORP_O",  # 总产水ORP
-    "ns=3;s=RO_JSPH_O",  # 总产水PH
-    "ns=3;s=RO_JSDD_O",  # 总产水电导
-    "ns=3;s=CN_LEVEL_O",  # 次钠液位
-    "ns=3;s=S_LEVEL_O",  # 酸液位
-    "ns=3;s=J_LEVEL_O",  # 碱液位
-    "ns=3;s=ZZ_UFGSB_POWER",  # 超滤供水泵功率
-
+    "AR.ZJS_TEMP_O",          # 进水温度
+    "AR.RO_JSORP_O",          # 总产水ORP
+    "AR.RO_JSPH_O",           # 总产水PH
+    "AR.RO_JSDD_O",           # 总产水电导
+    "AR.CN_LEVEL_O",       # 次钠液位
+    "AR.S_LEVEL_O",           # 酸液位
+    "AR.J_LEVEL_O",           # 碱液位
+    # "AR.ZZ_UFGSB_POWER", # 超滤供水泵功率
 ]
 
 # 输出目录
-BASE_OUTPUT_DIR = "../datasets/UF_lankao_data"
+BASE_OUTPUT_DIR = "../datasets/UF_anzhen_data"
 PROCESSED_OUTPUT_DIR = os.path.join(BASE_OUTPUT_DIR, "raw")
 
 # 创建目录
@@ -109,7 +108,7 @@ def fetch_valve_aggregated(name, start, end, engine, interval_minutes=1):
                 h_time,
                 val,
                 FLOOR(UNIX_TIMESTAMP(h_time) / {interval_seconds}) AS time_group
-            FROM dc_item_history_data_1451
+            FROM dc_item_history_data_1181
             WHERE item_name = :name
               AND h_time BETWEEN :st AND :et
               AND val IS NOT NULL
@@ -137,7 +136,7 @@ def fetch_special_data(sensor, start, end, boundary, engine_test, engine_prod):
     """
     sql = text("""
                SELECT h_time AS time, val
-               FROM dc_item_history_data_1451
+               FROM dc_item_history_data_1181
                WHERE item_name = :name
                  AND h_time BETWEEN :st
                  AND :et
@@ -165,6 +164,7 @@ def fetch_special_data(sensor, start, end, boundary, engine_test, engine_prod):
         return pd.DataFrame()
 
 
+# ---------- 传感器数据查询函数(只获取聚合数据)----------
 # ---------- 传感器数据查询函数(只获取聚合数据)----------
 def fetch_sensor_data(sensor_names, start_time, end_time, boundary, engine_test, engine_prod):
     """
@@ -183,7 +183,17 @@ def fetch_sensor_data(sensor_names, start_time, end_time, boundary, engine_test,
 
     print(f"\n识别到 {len(special_vars)} 个离散变量, {len(continuous_vars)} 个连续变量")
 
-    all_data = []
+    # 3. 创建完整的时间网格(整分钟)- 先创建
+    print(f"\n创建完整时间网格...")
+    time_grid = pd.date_range(
+        start=start_time.replace(second=0, microsecond=0),
+        end=end_time.replace(second=0, microsecond=0),
+        freq='1min'
+    )
+
+    # 创建以时间为索引的DataFrame
+    merged_df = pd.DataFrame(index=time_grid)
+    print(f"时间网格: {len(time_grid)} 个时间点")
 
     # 1. 处理连续变量(按分钟聚合,时间对齐到整分钟)
     if continuous_vars:
@@ -201,12 +211,14 @@ def fetch_sensor_data(sensor_names, start_time, end_time, boundary, engine_test,
                     df = pd.concat([df1, df2], ignore_index=True)
 
                 if not df.empty:
-                    # 确保时间戳是整分钟
+                    # 确保时间戳是整分钟并设为索引
                     df['time'] = pd.to_datetime(df['time']).dt.floor('1min')
-                    # 按时间去重(同一分钟可能有多个聚合结果)
                     df = df.drop_duplicates(subset=['time'], keep='first')
-                    df_renamed = df.rename(columns={'val': sensor})
-                    all_data.append(df_renamed[['time', sensor]])
+                    df = df.set_index('time')
+
+                    # 添加到合并DataFrame
+                    merged_df[sensor] = df['val']
+                    print(f"  ✓ {sensor}: {len(df)} 条记录")
                 else:
                     print(f"  ⚠ {sensor}: 无数据")
 
@@ -214,7 +226,7 @@ def fetch_sensor_data(sensor_names, start_time, end_time, boundary, engine_test,
                 print(f"  ⚠ {sensor}: 处理失败 - {str(e)}")
                 continue
 
-    # 2. 处理离散变量(保持原始变化点,但标记时间
+    # 2. 处理离散变量(保持原始变化点)
     if special_vars:
         print("\n获取离散变量数据(原始变化点):")
         for sensor in special_vars:
@@ -223,12 +235,16 @@ def fetch_sensor_data(sensor_names, start_time, end_time, boundary, engine_test,
                 df = fetch_special_data(sensor, start_time, end_time, boundary, engine_test, engine_prod)
 
                 if not df.empty:
-                    # 将时间戳对齐到分钟,用于后续扩展
-                    df['time_min'] = df['time'].dt.floor('1min')
-                    # 重命名值列
-                    df_renamed = df.rename(columns={'val': sensor})
-                    all_data.append(df_renamed[['time_min', sensor]].rename(columns={'time_min': 'time'}))
-                    print(f"  ✓ {sensor}: {len(df)} 个变化点")
+                    # 创建分钟级的重采样
+                    df['time'] = pd.to_datetime(df['time'])
+                    df = df.set_index('time')
+
+                    # 重采样到分钟,对于离散变量使用前向填充
+                    df_resampled = df.resample('1min').ffill()
+
+                    # 添加到合并DataFrame
+                    merged_df[sensor] = df_resampled['val']
+                    print(f"  ✓ {sensor}: {len(df)} 个原始点 -> {len(df_resampled)} 个分钟点")
                 else:
                     print(f"  ⚠ {sensor}: 无数据")
 
@@ -236,35 +252,16 @@ def fetch_sensor_data(sensor_names, start_time, end_time, boundary, engine_test,
                 print(f"  ⚠ {sensor}: 处理失败 - {str(e)}")
                 continue
 
-    if not all_data:
+    if merged_df.empty or len(merged_df.columns) == 0:
         print("\n❌ 未获取到任何传感器数据")
         return pd.DataFrame()
 
-    # 3. 创建完整的时间网格(整分钟)
-    print(f"\n创建完整时间网格...")
-    time_grid = pd.date_range(
-        start=start_time.replace(second=0, microsecond=0),  # 对齐到秒0
-        end=end_time.replace(second=0, microsecond=0),
-        freq='1min'
-    )
-    merged_df = pd.DataFrame({'time': time_grid})
-    print(f"时间网格: {len(time_grid)} 个时间点")
-
-    # 4. 逐个合并数据
-    print(f"\n开始合并 {len(all_data)} 个传感器的数据...")
-    for df in all_data:
-        df['time'] = pd.to_datetime(df['time']).dt.floor('1min')
-        # 对于步序变量,需要先扩展填充
-        sensor_name = df.columns[1]  # 获取传感器名称
-        if sensor_name in step_vars:
-            # 步序变量:前向填充
-            merged_df = merged_df.merge(df, on='time', how='left')
-            merged_df[sensor_name] = merged_df[sensor_name].fillna(method='ffill')
-        else:
-            # 连续变量:直接合并
-            merged_df = merged_df.merge(df, on='time', how='left')
+    # 重置索引,将时间变为列
+    merged_df = merged_df.reset_index()
+    merged_df = merged_df.rename(columns={'index': 'time'})
 
-    print(f"合并完成,共 {len(merged_df)} 条时间记录")
+    print(f"\n合并完成,共 {len(merged_df)} 条时间记录 × {len(merged_df.columns) - 1} 个传感器")
+    print(f"数据框形状: {merged_df.shape}")
 
     # 5. 删除黑名单时段
     print("\n删除黑名单时段...")
@@ -281,7 +278,6 @@ def fetch_sensor_data(sensor_names, start_time, end_time, boundary, engine_test,
     return merged_df
 
 
-
 # ---------- 数据后处理函数(填充空值)----------
 def post_process_data(df, continuous_vars, step_vars):
     """

+ 4 - 8
models/uf-rl/uf_data_process/水厂变量名称记录.md

@@ -1,8 +1,6 @@
 # 各水厂OPC UA变量配置总览
 
 ## 当前缺失变量:
-> 安镇: 未查询到泵组功率变量,只有功率因数
->
 > 龙亭等新水岛供水泵功率命名均为总功率,需确认是否为所有超滤机组的总用电,且需要确认功率单位
 > 
 > 盐城:ns=3;s=ZZ_UFGSB_POWER命名为UF供水泵相电压总功率,需确认是否表示超滤供水泵功率
@@ -13,7 +11,7 @@
 > 
 > **数据库id**:1450
 > 
-> **数据开始时间**:2025-6-11
+> **可用数据开始时间**:2025-6-11
 
 ### UF机组变量
 ```python
@@ -44,7 +42,7 @@ SYSTEM_VARIABLES = [
 > 
 > **数据库id**:1451
 > 
-> **数据开始时间**:
+> **数据开始时间**:2025-08-17
 
 ### UF机组变量
 ```python
@@ -87,6 +85,7 @@ BASE_VARIABLES = [
     "AR.{}#UF_JSPRESS_O",     # 进水压力
     "AR.UF{}_SSD_KMYC",       # 跨膜压差
     "AR.UF{}_STEP",           # 步序/控制字
+    "AR.ZZ_{}#UFBWB_POWER" # 反洗泵功率
 ]
 SYSTEM_VARIABLES = [
     "AR.ZJS_TEMP_O",          # 进水温度
@@ -96,10 +95,7 @@ SYSTEM_VARIABLES = [
     "AR.CN_LEVEL_O",       # 次钠液位
     "AR.S_LEVEL_O",           # 酸液位
     "AR.J_LEVEL_O",           # 碱液位
-    "ns=3;s=CN_LEVEL_O", # 次钠液位
-    "ns=3;s=S_LEVEL_O", # 酸液位
-    "ns=3;s=J_LEVEL_O", # 碱液位
-    "ns=3;s=ZZ_UFGSB_POWER", # 超滤供水泵功率
+    "AR.ZZ_UFGSB_POWER", # 超滤供水泵功率
 ]
 ```