Explorar o código

1:修正CEB结束后的初始跨膜压差逻辑,改为 95变为 26后的 10分钟内的平均跨膜压差
2:PLC历史数据从数据库直查改成调用数据中心接口
3:调整模型加载逻辑

wmy hai 1 mes
pai
achega
02819fc7c0
Modificáronse 6 ficheiros con 480 adicións e 394 borrados
  1. 91 92
      DQN_decide.py
  2. 211 222
      DQN_env.py
  3. 4 4
      device_states.json
  4. BIN=BIN
      dqn_model.zip
  5. 172 74
      loop_main.py
  6. 2 2
      requirements.txt

+ 91 - 92
DQN_decide.py

@@ -4,10 +4,28 @@ from DQN_env import UFSuperCycleEnv
 from DQN_env import UFParams
 
 # 模型路径
-MODEL_PATH = "/Users/wmy/data/Pycharm_Project/jkhj/jkhj_test1/shuangmo/wmy/ultrafiltration_超滤/v2/dqn_model.zip"
+MODEL_PATH = "dqn_model.zip"
+
+# 创建环境实例以获取观察空间和动作空间
+def _get_model_spaces():
+    """获取模型的观察空间和动作空间"""
+    env = UFSuperCycleEnv(UFParams())
+    obs_space = env.observation_space
+    action_space = env.action_space
+    env.close()
+    return obs_space, action_space
 
 # 加载模型(只加载一次,提高效率)
-model = DQN.load(MODEL_PATH)
+try:
+    # 尝试直接加载
+    model = DQN.load(MODEL_PATH)
+except KeyError:
+    # 如果失败,则提供观察空间和动作空间
+    obs_space, action_space = _get_model_spaces()
+    model = DQN.load(MODEL_PATH, custom_objects={
+        'observation_space': obs_space,
+        'action_space': action_space
+    })
 
 def run_uf_DQN_decide(uf_params, TMP0_value: float):
     """
@@ -29,25 +47,25 @@ def run_uf_DQN_decide(uf_params, TMP0_value: float):
     # 3. 获取归一化状态
     obs = env._get_obs().reshape(1, -1)
 
-    # 4. 模型预测动作 
+    # 4. 模型预测动作
     action, _ = model.predict(obs, deterministic=True)
 
-    # 5. 解析动作对应的 L_s 和 t_bw_s 对应过滤时长和物洗时长
-    L_s, t_bw_s = env._get_action_values(action[0]) 
+    # 5. 解析动作对应的 L_s 和 t_bw_s
+    L_s, t_bw_s = env._get_action_values(action[0])
 
-    # 6. 在环境中执行该动作 执行一个超级周期 
-    next_obs, reward, terminated, truncated, info = env.step(action[0]) 
+    # 6. 在环境中执行该动作
+    next_obs, reward, terminated, truncated, info = env.step(action[0])
 
     # 7. 整理结果
     result = {
-        "action": int(action[0]),  # 动作       
-        "L_s": float(L_s), # 过滤时长
-        "t_bw_s": float(t_bw_s), # 物洗时长
-        "next_obs": next_obs, # 新状态
-        "reward": reward, # 奖励
-        "terminated": terminated, # 是否终止
-        "truncated": truncated, # 是否截断
-        "info": info # 信息 可行性、步数、奖励等
+        "action": int(action[0]),
+        "L_s": float(L_s),
+        "t_bw_s": float(t_bw_s),
+        "next_obs": next_obs,
+        "reward": reward,
+        "terminated": terminated,
+        "truncated": truncated,
+        "info": info
     }
 
     # 8. 关闭环境
@@ -65,18 +83,18 @@ def generate_plc_instructions(current_L_s, current_t_bw_s, model_prev_L_s, model
     """
     # 参数配置保持不变
     params = UFParams(
-        L_min_s=3600.0, L_max_s=4200.0, L_step_s=60.0,
-        t_bw_min_s=90.0, t_bw_max_s=100.0, t_bw_step_s=2.0,
+        L_min_s=3600.0, L_max_s=6000.0, L_step_s=60.0,
+        t_bw_min_s=40.0, t_bw_max_s=60.0, t_bw_step_s=2.0,
     )
 
     # 参数解包
-    L_step_s = params.L_step_s # 过滤时长步长
-    t_bw_step_s = params.t_bw_step_s # 物洗时长步长
-    L_min_s = params.L_min_s # 过滤时长下限
-    L_max_s = params.L_max_s # 过滤时长上限
-    t_bw_min_s = params.t_bw_min_s # 物洗时长下限
-    t_bw_max_s = params.t_bw_max_s # 物洗时长上限
-    adjustment_threshold = 1.0 # 调整阈值
+    L_step_s = params.L_step_s
+    t_bw_step_s = params.t_bw_step_s
+    L_min_s = params.L_min_s
+    L_max_s = params.L_max_s
+    t_bw_min_s = params.t_bw_min_s
+    t_bw_max_s = params.t_bw_max_s
+    adjustment_threshold = 1.0
 
     # 处理None值情况
     if model_prev_L_s is None:
@@ -85,7 +103,7 @@ def generate_plc_instructions(current_L_s, current_t_bw_s, model_prev_L_s, model
             return None, None
         else:
             # 使用工厂当前值作为基准
-            effective_current_L = current_L_s 
+            effective_current_L = current_L_s
             source_L = "工厂当前值(模型上一轮值为None)"
     else:
         # 模型上一轮值不为None,继续检查工厂当前值
@@ -93,16 +111,8 @@ def generate_plc_instructions(current_L_s, current_t_bw_s, model_prev_L_s, model
             effective_current_L = model_prev_L_s
             source_L = "模型上一轮值(工厂当前值为None)"
         else:
-            # 两个值都不为None,比较哪个更接近模型当前建议值
-            current_to_model_diff = abs(current_L_s - model_L_s)
-            prev_to_model_diff = abs(model_prev_L_s - model_L_s)
-
-            if current_to_model_diff <= prev_to_model_diff:
-                effective_current_L = current_L_s
-                source_L = "工厂当前值"
-            else:
-                effective_current_L = model_prev_L_s
-                source_L = "模型上一轮值"
+            effective_current_L = model_prev_L_s
+            source_L = "模型上一轮值"
 
     # 对反洗时长进行同样的处理
     if model_prev_t_bw_s is None:
@@ -117,15 +127,8 @@ def generate_plc_instructions(current_L_s, current_t_bw_s, model_prev_L_s, model
             effective_current_t_bw = model_prev_t_bw_s
             source_t_bw = "模型上一轮值(工厂当前值为None)"
         else:
-            current_to_model_t_bw_diff = abs(current_t_bw_s - model_t_bw_s)
-            prev_to_model_t_bw_diff = abs(model_prev_t_bw_s - model_t_bw_s)
-
-            if current_to_model_t_bw_diff <= prev_to_model_t_bw_diff:
-                effective_current_t_bw = current_t_bw_s
-                source_t_bw = "工厂当前值"
-            else:
-                effective_current_t_bw = model_prev_t_bw_s
-                source_t_bw = "模型上一轮值"
+            effective_current_t_bw = model_prev_t_bw_s
+            source_t_bw = "模型上一轮值"
 
     # 检测所有输入值是否在规定范围内(只对非None值进行检查)
     # 工厂当前值检查(警告)
@@ -155,25 +158,25 @@ def generate_plc_instructions(current_L_s, current_t_bw_s, model_prev_L_s, model
     print(f"反洗时长基准: {source_t_bw}, 值: {effective_current_t_bw}")
 
     # 使用选定的基准值进行计算调整
-    L_diff = model_L_s - effective_current_L # 过滤时长差值
+    L_diff = model_L_s - effective_current_L
     L_adjustment = 0
-    if abs(L_diff) > adjustment_threshold * L_step_s: # 如果过滤时长差值超过调整阈值
-        if L_diff > 0: 
-            L_adjustment = L_step_s # 增加过滤时长步长
+    if abs(L_diff) >= adjustment_threshold * L_step_s:
+        if L_diff >= 0:
+            L_adjustment = L_step_s
         else:
-            L_adjustment = -L_step_s # 减少过滤时长步长
-    next_L_s = effective_current_L + L_adjustment # 调整后的过滤时长
+            L_adjustment = -L_step_s
+    next_L_s = effective_current_L + L_adjustment
 
-    t_bw_diff = model_t_bw_s - effective_current_t_bw # 反洗时长差值
+    t_bw_diff = model_t_bw_s - effective_current_t_bw
     t_bw_adjustment = 0
-    if abs(t_bw_diff) > adjustment_threshold * t_bw_step_s: # 如果反洗时长差值超过调整阈值 调整阈值 * 反洗时长步长
-        if t_bw_diff > 0:
-            t_bw_adjustment = t_bw_step_s # 增加反洗时长步长
+    if abs(t_bw_diff) >= adjustment_threshold * t_bw_step_s:
+        if t_bw_diff >= 0:
+            t_bw_adjustment = t_bw_step_s
         else:
-            t_bw_adjustment = -t_bw_step_s # 减少反洗时长步长
-    next_t_bw_s = effective_current_t_bw + t_bw_adjustment # 调整后的反洗时长
+            t_bw_adjustment = -t_bw_step_s
+    next_t_bw_s = effective_current_t_bw + t_bw_adjustment
 
-    return next_L_s, next_t_bw_s 
+    return next_L_s, next_t_bw_s
 
 
 from DQN_env import simulate_one_supercycle
@@ -196,42 +199,36 @@ def calc_uf_cycle_metrics(p, TMP0, max_tmp_during_filtration, min_tmp_during_fil
             "max_permeability": 全周期最高渗透率(lmh/bar)
         }
     """
-    # 将跨膜压差写入参数 
+    # 将跨膜压差写入参数
     p.TMP0 = TMP0
 
-    # 模拟该参数下的超级周期 
+    # 模拟该参数下的超级周期
     feasible, info = simulate_one_supercycle(p, L_s, t_bw_s)
 
-    # 获得模型模拟周期信息  
-    k_bw_per_ceb = info["k_bw_per_ceb"] # 超级周期内CEB次数 
-    ton_water_energy_kWh_per_m3 = info["ton_water_energy_kWh_per_m3"] # 吨水电耗
-    recovery = info["recovery"] # 回收率
-    net_delivery_rate_m3ph = info["net_delivery_rate_m3ph"] # 净供水率
-    daily_prod_time_h = info["daily_prod_time_h"] # 日均产水时间
-
-    # # 获得模型模拟周期内最高跨膜压差/最低跨膜压差
-    # if max_tmp_during_filtration is None:
-    #     max_tmp_during_filtration = info["max_TMP_during_filtration"]
-    # if min_tmp_during_filtration is None:
-    #     min_tmp_during_filtration = info["min_TMP_during_filtration"]
+    # 获得模型模拟周期信息
+    k_bw_per_ceb = info["k_bw_per_ceb"]
+    ton_water_energy_kWh_per_m3 = info["ton_water_energy_kWh_per_m3"]
+    recovery = info["recovery"]
+    net_delivery_rate_m3ph = info["net_delivery_rate_m3ph"]
+    daily_prod_time_h = info["daily_prod_time_h"]
 
-    # 获取模拟周期内最大/最小跨膜压差(如果未传入,则从 info 中获取)
-    max_tmp_during_filtration = max_tmp_during_filtration if max_tmp_during_filtration is not None else info.get(
-        "max_TMP_during_filtration", None) # 过滤时段TMP峰值
-    min_tmp_during_filtration = min_tmp_during_filtration if min_tmp_during_filtration is not None else info.get(
-        "min_TMP_during_filtration", None) # 过滤时段TMP最低值
+    # 获得模型模拟周期内最高跨膜压差/最低跨膜压差
+    if max_tmp_during_filtration is None:
+        max_tmp_during_filtration = info["max_TMP_during_filtration"]
+    if min_tmp_during_filtration is None:
+        min_tmp_during_filtration = info["min_TMP_during_filtration"]
 
-    # 计算最高渗透率 最高渗透率 = 100 * 过滤流量 / (128*40) / 过滤时段TMP最低值
-    max_permeability = 100 * p.q_UF / (128*40) / min_tmp_during_filtration 
+    # 计算最高渗透率
+    max_permeability = 100 * p.q_UF / (128*40) / min_tmp_during_filtration
 
 
     return {
-        "k_bw_per_ceb": k_bw_per_ceb, # 超级周期内CEB次数       
-        "ton_water_energy_kWh_per_m3": ton_water_energy_kWh_per_m3, # 吨水电耗
-        "recovery": recovery, # 回收率
-        "net_delivery_rate_m3ph": net_delivery_rate_m3ph, # 净供水率
-        "daily_prod_time_h": daily_prod_time_h, # 日均产水时间
-        "max_permeability": max_permeability # 最高渗透率
+        "k_bw_per_ceb": k_bw_per_ceb,
+        "ton_water_energy_kWh_per_m3": ton_water_energy_kWh_per_m3,
+        "recovery": recovery,
+        "net_delivery_rate_m3ph": net_delivery_rate_m3ph,
+        "daily_prod_time_h": daily_prod_time_h,
+        "max_permeability": max_permeability
     }
 
 
@@ -240,23 +237,25 @@ def calc_uf_cycle_metrics(p, TMP0, max_tmp_during_filtration, min_tmp_during_fil
 # ==============================
 if __name__ == "__main__":
     uf_params = UFParams()
-    TMP0 = 0.03  # 原始 TMP0
+    TMP0 = 0.03 # 原始 TMP0
     model_decide_result = run_uf_DQN_decide(uf_params, TMP0) # 调用模型获得动作
     model_L_s = model_decide_result['L_s'] # 获得模型决策产水时长
     model_t_bw_s = model_decide_result['t_bw_s'] # 获得模型决策反洗时长
 
     current_L_s = 3800
-    current_t_bw_s = 100
-    model_prev_L_s = None
-    model_prev_t_bw_s = None
+    current_t_bw_s = 40
+    model_prev_L_s = 4040
+    model_prev_t_bw_s = 60
     L_s, t_bw_s = generate_plc_instructions(current_L_s, current_t_bw_s, model_prev_L_s, model_prev_t_bw_s, model_L_s, model_t_bw_s) # 获取模型下发指令
 
-    max_tmp_during_filtration = None # 新增工厂数据接口:周期最高/最低跨膜压差,无工厂数据接入时传入None,calc_uf_cycle_metrics()自动获取模拟周期中的跨膜压差最值
-    min_tmp_during_filtration = None
-    execution_result = calc_uf_cycle_metrics(uf_params, TMP0, max_tmp_during_filtration, min_tmp_during_filtration, model_L_s, model_t_bw_s)
+    L_s = 4100
+    t_bw_s = 96
+    max_tmp_during_filtration = 0.050176 # 新增工厂数据接口:周期最高/最低跨膜压差,无工厂数据接入时传入None,calc_uf_cycle_metrics()自动获取模拟周期中的跨膜压差最值
+    min_tmp_during_filtration = 0.012496
+    execution_result = calc_uf_cycle_metrics(uf_params, TMP0, max_tmp_during_filtration, min_tmp_during_filtration, L_s, t_bw_s)
     print("\n===== 单步决策结果 =====")
     print(f"模型选择的动作: {model_decide_result['action']}")
-    print(f"模型选择的L_s: {model_decide_result['L_s']} 秒, 模型选择的t_bw_s: {model_decide_result['t_bw_s']} 秒")
+    print(f"模型选择的L_s: {model_L_s} 秒, 模型选择的t_bw_s: {model_t_bw_s} 秒")
     print(f"指令下发的L_s: {L_s} 秒, 指令下发的t_bw_s: {t_bw_s} 秒")
     print(f"指令对应的反洗次数: {execution_result['k_bw_per_ceb']}")
     print(f"指令对应的吨水电耗: {execution_result['ton_water_energy_kWh_per_m3']}")

+ 211 - 222
DQN_env.py

@@ -42,25 +42,25 @@ class UFParams:
 
     # —— 搜索范围(秒) ——
     L_min_s: float = 3800.0  # 过滤时长下限(s)
-    L_max_s: float = 4200.0  # 过滤时长上限(s)
-    t_bw_min_s: float = 90.0  # 物洗时长下限(s)
-    t_bw_max_s: float = 100.0  # 物洗时长上限(s)
+    L_max_s: float = 6000.0  # 过滤时长上限(s)
+    t_bw_min_s: float = 40.0  # 物洗时长下限(s)
+    t_bw_max_s: float = 60.0  # 物洗时长上限(s)
 
     # —— 物理反洗恢复函数参数 ——
     phi_bw_min: float = 0.7  # 物洗去除比例最小值
     phi_bw_max: float = 1.0  # 物洗去除比例最大值
     L_ref_s: float = 4000.0  # 过滤时长影响时间尺度
-    tau_bw_s: float = 30.0  # 物洗时长影响时间尺度
+    tau_bw_s: float = 20.0  # 物洗时长影响时间尺度
     gamma_t: float = 1.0  # 物洗时长作用指数
 
     # —— 网格 ——
     L_step_s: float = 60.0  # 过滤时长步长(s)
-    t_bw_step_s: float = 2.0  # 物洗时长步长(s)
+    t_bw_step_s: float = 5.0  # 物洗时长步长(s)
 
     # 多目标加权及高TMP惩罚
     w_rec: float = 0.8  # 回收率权重
     w_rate: float = 0.2  # 净供水率权重
-    w_headroom: float = 0.3  # 贴边惩罚权重
+    w_headroom: float = 0.2  # 贴边惩罚权重
     r_headroom: float = 2.0  # 贴边惩罚幂次
     headroom_hardcap: float = 0.98  # 超过此比例直接视为不可取
 
@@ -75,28 +75,28 @@ class DQNParams:
     learning_rate: float = 1e-4
 
     # 经验回放缓冲区大小(步数)
-    buffer_size: int = 2000
+    buffer_size: int = 10000
 
     # 学习开始前需要收集的步数
     learning_starts: int = 200
 
     # 每次从经验池中采样的样本数量
-    batch_size: int = 16
+    batch_size: int = 32
 
     # 折扣因子,越接近1越重视长期奖励
     gamma: float = 0.95
 
     # 每隔多少步训练一次
-    train_freq: int = 1
+    train_freq: int = 4
 
     # 目标网络更新间隔
-    target_update_interval: int = 1000
+    target_update_interval: int = 2000
 
     # 初始探索率 ε
     exploration_initial_eps: float = 1.0
 
     # 从初始ε衰减到最终ε所占的训练比例
-    exploration_fraction: float = 0.6
+    exploration_fraction: float = 0.3
 
     # 最终探索率 ε
     exploration_final_eps: float = 0.02
@@ -147,17 +147,19 @@ def simulate_one_supercycle(p: UFParams, L_s: float, t_bw_s: float):
     返回 (是否可行, 指标字典)
     - 支持动态CEB次数:48h固定间隔
     - 增加日均产水时间和吨水电耗
+    - 增加最小TMP记录
     """
     L_h = float(L_s) / 3600.0  # 小周期过滤时间(h)
 
     tmp = p.TMP0
     max_tmp_during_filtration = tmp
+    min_tmp_during_filtration = tmp  # 新增:初始化最小TMP
     max_residual_increase = 0.0
 
-    # 小周期总时长(h) 小周期总时长 = 过滤时长 + 物洗时长
+    # 小周期总时长(h)
     t_small_cycle_h = (L_s + t_bw_s) / 3600.0
 
-    # 计算超级周期内CEB次数 超级周期内CEB次数 = 48h / 小周期总时长
+    # 计算超级周期内CEB次数
     k_bw_per_ceb = int(np.floor(p.T_ceb_interval_h / t_small_cycle_h))
     if k_bw_per_ceb < 1:
         k_bw_per_ceb = 1  # 至少一个小周期
@@ -172,192 +174,209 @@ def simulate_one_supercycle(p: UFParams, L_s: float, t_bw_s: float):
     for _ in range(k_bw_per_ceb):
         tmp_run_start = tmp
 
-        # 过滤阶段TMP增长 
+        # 过滤阶段TMP增长
         dtmp = _delta_tmp(p, L_h)
-        tmp_peak = tmp_run_start + dtmp # 过滤阶段TMP峰值 = 过滤阶段TMP开始值 + 过滤阶段TMP上升量
+        tmp_peak = tmp_run_start + dtmp
 
         # 约束1:峰值不得超过硬上限
-        if tmp_peak > p.TMP_max + 1e-12: 
+        if tmp_peak > p.TMP_max + 1e-12:
             return False, {"reason": "TMP_max violated during filtration", "TMP_peak": tmp_peak}
 
-        if tmp_peak > max_tmp_during_filtration: # 如果过滤阶段TMP峰值超过当前最大值
-            max_tmp_during_filtration = tmp_peak # 更新最大值
+        # 更新最大和最小TMP
+        if tmp_peak > max_tmp_during_filtration:
+            max_tmp_during_filtration = tmp_peak
+        if tmp_run_start < min_tmp_during_filtration:  # 新增:记录运行开始时的最小TMP
+            min_tmp_during_filtration = tmp_run_start
 
         # 物理反洗
-        phi = phi_bw_of(p, L_s, t_bw_s) # 物洗去除比例
-        tmp_after_bw = tmp_peak - phi * (tmp_peak - tmp_run_start) # 物理反洗后TMP = 过滤阶段TMP峰值 - 物洗去除比例 * (过滤阶段TMP峰值 - 过滤阶段TMP开始值)
+        phi = phi_bw_of(p, L_s, t_bw_s)
+        tmp_after_bw = tmp_peak - phi * (tmp_peak - tmp_run_start)
 
         # 约束2:单次残余增量控制
-        residual_inc = tmp_after_bw - tmp_run_start # 单次残余增量 = 物理反洗后TMP - 过滤阶段TMP开始值
-        if residual_inc > p.dTMP + 1e-12: # 如果单次残余增量超过单次残余增量上限
+        residual_inc = tmp_after_bw - tmp_run_start
+        if residual_inc > p.dTMP + 1e-12:
             return False, {
-                "reason": "residual TMP increase after BW exceeded dTMP", # 返回不可行
-                "residual_increase": residual_inc, # 单次残余增量
+                "reason": "residual TMP increase after BW exceeded dTMP",
+                "residual_increase": residual_inc,
                 "limit_dTMP": p.dTMP
             }
-        if residual_inc > max_residual_increase: # 如果单次残余增量超过当前最大值   
-            max_residual_increase = residual_inc # 更新最大值
+        if residual_inc > max_residual_increase:
+            max_residual_increase = residual_inc
 
-        tmp = tmp_after_bw # 更新TMP
+        tmp = tmp_after_bw
 
     # CEB
-    tmp_after_ceb = p.TMP0 # 化学反洗后TMP
+    tmp_after_ceb = p.TMP0
 
     # 体积与回收率
-    V_feed_super = k_bw_per_ceb * p.q_UF * L_h # 进水体积 进水体积 = 超级周期内CEB次数 * 过滤流量 * 过滤时长 
-    V_loss_super = k_bw_per_ceb * _v_bw_m3(p, t_bw_s) + p.v_ceb_m3 # 损失体积 损失体积 = 物洗体积 + CEB用水体积
-    V_net = max(0.0, V_feed_super - V_loss_super) # 净产水体积 净产水体积 = 进水体积 - 损失体积
-    recovery = max(0.0, V_net / max(V_feed_super, 1e-12)) # 回收率 净产水体积 / 进水体积
+    V_feed_super = k_bw_per_ceb * p.q_UF * L_h
+    V_loss_super = k_bw_per_ceb * _v_bw_m3(p, t_bw_s) + p.v_ceb_m3
+    V_net = max(0.0, V_feed_super - V_loss_super)
+    recovery = max(0.0, V_net / max(V_feed_super, 1e-12))
 
     # 时间与净供水率
-    T_super_h = k_bw_per_ceb * (L_s + t_bw_s) / 3600.0 + p.t_ceb_s / 3600.0 # 超循环时间 超循环时间 = 超级周期内CEB次数 * (过滤时长 + 物洗时长) / 3600 + CEB时长 / 3600
-    net_delivery_rate_m3ph = V_net / max(T_super_h, 1e-12) # 净供水率 净产水体积 / 超循环时间 
+    T_super_h = k_bw_per_ceb * (L_s + t_bw_s) / 3600.0 + p.t_ceb_s / 3600.0
+    net_delivery_rate_m3ph = V_net / max(T_super_h, 1e-12)
 
     # 贴边比例与硬限
-    headroom_ratio = max_tmp_during_filtration / max(p.TMP_max, 1e-12) # 贴边比例 过滤时段TMP峰值 / 硬上限 
-    if headroom_ratio > p.headroom_hardcap + 1e-12: # 如果贴边比例超过硬上限 
-        return False, {"reason": "headroom hardcap exceeded", "headroom_ratio": headroom_ratio} # 返回不可行
+    headroom_ratio = max_tmp_during_filtration / max(p.TMP_max, 1e-12)
+    if headroom_ratio > p.headroom_hardcap + 1e-12:
+        return False, {"reason": "headroom hardcap exceeded", "headroom_ratio": headroom_ratio}
 
     # —— 新增指标 1:日均产水时间(h/d) ——
-    daily_prod_time_h = k_bw_per_ceb * L_h / T_super_h * 24.0 # 日均产水时间 日均产水时间 = 超级周期内CEB次数 * 过滤时长 / 超循环时间 * 24
+    daily_prod_time_h = k_bw_per_ceb * L_h / T_super_h * 24.0
 
     # —— 新增指标 2:吨水电耗(kWh/m³) ——
-    closest_L = min(energy_lookup.keys(), key=lambda x: abs(x - L_s)) # 最接近的过滤时长
-    ton_water_energy = energy_lookup[closest_L] # 吨水电耗 最接近的过滤时长对应的吨水电耗 
+    closest_L = min(energy_lookup.keys(), key=lambda x: abs(x - L_s))
+    ton_water_energy = energy_lookup[closest_L]
 
     info = {
-        "recovery": recovery, # 回收率 
-        "V_feed_super_m3": V_feed_super, # 进水体积 
-        "V_loss_super_m3": V_loss_super, # 损失体积 
-        "V_net_super_m3": V_net, # 净产水体积 
-        "supercycle_time_h": T_super_h, # 超循环时间 
-        "net_delivery_rate_m3ph": net_delivery_rate_m3ph, # 净供水率 
-        "max_TMP_during_filtration": max_tmp_during_filtration, # 过滤时段TMP峰值 
-        "max_residual_increase_per_run": max_residual_increase, # 单次残余增量最大值 
-        "phi_bw_effective": phi, # 物洗去除比例 
-        "TMP_after_ceb": tmp_after_ceb, # 物理反洗后TMP 
-        "headroom_ratio": headroom_ratio, # 贴边比例 
-        "daily_prod_time_h": daily_prod_time_h, # 日均产水时间 
-        "ton_water_energy_kWh_per_m3": ton_water_energy, # 吨水电耗 
-        "k_bw_per_ceb": k_bw_per_ceb # 超级周期内CEB次数 
+        "recovery": recovery,
+        "V_feed_super_m3": V_feed_super,
+        "V_loss_super_m3": V_loss_super,
+        "V_net_super_m3": V_net,
+        "supercycle_time_h": T_super_h,
+        "net_delivery_rate_m3ph": net_delivery_rate_m3ph,
+        "max_TMP_during_filtration": max_tmp_during_filtration,
+        "min_TMP_during_filtration": min_tmp_during_filtration,  # 新增:最小TMP
+        "max_residual_increase_per_run": max_residual_increase,
+        "phi_bw_effective": phi,
+        "TMP_after_ceb": tmp_after_ceb,
+        "headroom_ratio": headroom_ratio,
+        "daily_prod_time_h": daily_prod_time_h,
+        "ton_water_energy_kWh_per_m3": ton_water_energy,
+        "k_bw_per_ceb": k_bw_per_ceb
     }
 
     return True, info
 
 def _score(p: UFParams, rec: dict) -> float:
-    """综合评分:越大越好。不同TMP0会改变max_TMP→改变惩罚→得到不同解。"""
-    # 无量纲化净供水率
-    rate_norm = rec["net_delivery_rate_m3ph"] / max(p.q_UF, 1e-12) # 无量纲化净供水率 净供水率 / 过滤流量 1000m3/h / 360m3/h = 2.7778
-    headroom_penalty = (rec["max_TMP_during_filtration"] / max(p.TMP_max, 1e-12)) ** p.r_headroom # 贴边惩罚
-    reward = (p.w_rec * rec["recovery"] + p.w_rate * rate_norm - p.w_headroom * headroom_penalty) # 奖励
-    return reward
+    """综合评分:越大越好。通过非线性放大奖励差异,强化区分好坏动作"""
+
+    # —— 无量纲化净供水率 ——
+    rate_norm = rec["net_delivery_rate_m3ph"] / max(p.q_UF, 1e-12)
+
+    # —— TMP soft penalty (sigmoid) ——
+    tmp_ratio = rec["max_TMP_during_filtration"] / max(p.TMP_max, 1e-12)
+    k = 10.0
+    headroom_penalty = 1.0 / (1.0 + np.exp(-k * (tmp_ratio - 1.0)))
+
+    # —— 基础 reward(0.6~0.9左右)——
+    base_reward = (
+        p.w_rec * rec["recovery"]
+        + p.w_rate * rate_norm
+        - p.w_headroom * headroom_penalty
+    )
+
+    # —— 非线性放大:平方映射 + 缩放 ——
+    # 目的是放大好坏动作差异,同时限制最大值,避免 TD-error 过大
+    amplified_reward = (base_reward - 0.5) ** 2 * 5.0
+
+    # —— 可选:保留符号,区分负奖励
+    if base_reward < 0.5:
+        amplified_reward = -amplified_reward
+
+    return amplified_reward
+
 
 def set_global_seed(seed: int):
     """固定全局随机种子,保证训练可复现"""
-    random.seed(seed) # 随机种子  
-    np.random.seed(seed) # 随机种子
-    torch.manual_seed(seed) # 随机种子
+    random.seed(seed)
+    np.random.seed(seed)
+    torch.manual_seed(seed)
     torch.cuda.manual_seed_all(seed)  # 如果使用GPU
-    torch.backends.cudnn.deterministic = True # 确定性
-    torch.backends.cudnn.benchmark = False # 不使用GPU
+    torch.backends.cudnn.deterministic = True
+    torch.backends.cudnn.benchmark = False
 
 class UFSuperCycleEnv(gym.Env):
     """超滤系统环境(超级周期级别决策)"""
 
     metadata = {"render_modes": ["human"]}
 
-    def __init__(self, base_params, max_episode_steps: int = 10):
-        super(UFSuperCycleEnv, self).__init__() # 初始化环境
+    def __init__(self, base_params, max_episode_steps: int = 20):
+        super(UFSuperCycleEnv, self).__init__()
 
-        self.base_params = base_params # UFParams 实例
-        self.current_params = copy.deepcopy(base_params) # UFParams 实例
-        self.max_episode_steps = max_episode_steps # 最大步数
-        self.current_step = 0 # 当前步数
+        self.base_params = base_params
+        self.current_params = copy.deepcopy(base_params)
+        self.max_episode_steps = max_episode_steps
+        self.current_step = 0
 
         # 计算离散动作空间
         self.L_values = np.arange(
-            self.base_params.L_min_s, # 过滤时长下限
-            self.base_params.L_max_s + self.base_params.L_step_s, # 过滤时长上限
-            self.base_params.L_step_s # 过滤时长步长
+            self.base_params.L_min_s,
+            self.base_params.L_max_s + self.base_params.L_step_s,
+            self.base_params.L_step_s
         )
         self.t_bw_values = np.arange(
-            self.base_params.t_bw_min_s, # 物洗时长下限
-            self.base_params.t_bw_max_s + self.base_params.t_bw_step_s, # 物洗时长上限
-            self.base_params.t_bw_step_s # 物洗时长步长
+            self.base_params.t_bw_min_s,
+            self.base_params.t_bw_max_s + self.base_params.t_bw_step_s,
+            self.base_params.t_bw_step_s
         )
 
-        self.num_L = len(self.L_values) # 过滤时长步数
-        self.num_bw = len(self.t_bw_values) # 物洗时长步数
+        self.num_L = len(self.L_values)
+        self.num_bw = len(self.t_bw_values)
 
-        # 单一离散动作空间,spaces.Discrete(n) 定义了一个包含 n 个离散动作或观测值的空间。这个空间包含从 0 到 n-1 的整数值
-        self.action_space = spaces.Discrete(self.num_L * self.num_bw) # 动作空间,离散动作空间
+        # 单一离散动作空间
+        self.action_space = spaces.Discrete(self.num_L * self.num_bw)
 
-        # 状态空间:归一化的[TMP0], 用于定义 连续的空间,通常用于表示那些具有连续值的观测空间或动作空间
+        # 状态空间增加 TMP0, 上一次动作(L_s, t_bw_s), 本周期最高 TMP
+        # 状态归一化均在 _get_obs 内处理
         self.observation_space = spaces.Box(
-            low=np.array([0.0], dtype=np.float32),  # 单一维度,只有TMP0
-            high=np.array([1.0], dtype=np.float32),  # 单一维度,只有TMP0
-            dtype=np.float32,
-            shape=(1,)  # 明确指定形状为1维
+            low=np.zeros(4, dtype=np.float32),
+            high=np.ones(4, dtype=np.float32),
+            dtype=np.float32
         )
 
         # 初始化状态
-        self.reset(seed=None) # 重置环境
+        self.last_action = (self.base_params.L_min_s, self.base_params.t_bw_min_s)
+        self.max_TMP_during_filtration = self.current_params.TMP0
+        self.reset(seed=None)
 
     def _get_obs(self):
-        # 原始状态
-        TMP0 = self.current_params.TMP0 
-        # 状态归一化
-        TMP0_norm = (TMP0 - 0.01) / (0.05 - 0.01) 
+        TMP0 = self.current_params.TMP0
+        TMP0_norm = (TMP0 - 0.01) / (0.05 - 0.01)
+
+        L_s, t_bw_s = self.last_action
+        L_norm = (L_s - self.base_params.L_min_s) / (self.base_params.L_max_s - self.base_params.L_min_s)
+        t_bw_norm = (t_bw_s - self.base_params.t_bw_min_s) / (self.base_params.t_bw_max_s - self.base_params.t_bw_min_s)
 
-        return np.array([TMP0_norm], dtype=np.float32) # 状态
+        max_TMP_norm = (self.max_TMP_during_filtration - 0.01) / (0.05 - 0.01)
+
+        return np.array([TMP0_norm, L_norm, t_bw_norm, max_TMP_norm], dtype=np.float32)
 
     def _get_action_values(self, action):
-        """解析离散动作"""
-        L_idx = action // self.num_bw # 过滤时长索引
-        t_bw_idx = action % self.num_bw # 物洗时长索引
-        return self.L_values[L_idx], self.t_bw_values[t_bw_idx] # 动作
+        L_idx = action // self.num_bw
+        t_bw_idx = action % self.num_bw
+        return self.L_values[L_idx], self.t_bw_values[t_bw_idx]
 
     def reset(self, seed=None, options=None):
-        """重置环境"""
         super().reset(seed=seed)
-
-        # 随机初始化 TMP0
-        self.current_params.TMP0 = np.random.uniform(0.01, 0.05) 
-        # 初始化步数
+        self.current_params.TMP0 = np.random.uniform(0.01, 0.03)
         self.current_step = 0
-
-        return self._get_obs(), {}  # Gymnasium要求返回(obs, info)
+        self.last_action = (self.base_params.L_min_s, self.base_params.t_bw_min_s)
+        self.max_TMP_during_filtration = self.current_params.TMP0
+        return self._get_obs(), {}
 
     def step(self, action):
-        """执行一个超级周期"""
         self.current_step += 1
-
-        # 解析动作 对应过滤时长和物洗时长
-        L_s, t_bw_s = self._get_action_values(action) 
-
-        #  确保过滤时长和物洗时长在范围内  np.clip:限制在范围内
-        L_s = np.clip(L_s, self.base_params.L_min_s, self.base_params.L_max_s)  
-        t_bw_s = np.clip(t_bw_s, self.base_params.t_bw_min_s, self.base_params.t_bw_max_s) 
-
-        # 记录当前状态 归一化状态
-        current_obs = self._get_obs()
+        L_s, t_bw_s = self._get_action_values(action)
+        L_s = np.clip(L_s, self.base_params.L_min_s, self.base_params.L_max_s)
+        t_bw_s = np.clip(t_bw_s, self.base_params.t_bw_min_s, self.base_params.t_bw_max_s)
 
         # 模拟超级周期
         feasible, info = simulate_one_supercycle(self.current_params, L_s, t_bw_s)
 
-        # 计算奖励
         if feasible:
-            reward = _score(self.current_params, info) 
+            reward = _score(self.current_params, info)
             self.current_params.TMP0 = info["TMP_after_ceb"]
+            self.max_TMP_during_filtration = info["max_TMP_during_filtration"]
             terminated = False
         else:
             reward = -20
             terminated = True
 
-        # 检查是否达到最大步数
         truncated = self.current_step >= self.max_episode_steps
-
-        # 获取新状态
+        self.last_action = (L_s, t_bw_s)
         next_obs = self._get_obs()
 
         info["feasible"] = feasible
@@ -370,39 +389,39 @@ class UFEpisodeRecorder:
     """记录episode中的决策和结果"""
 
     def __init__(self):
-        self.episode_data = [] # 记录episode中的决策和结果
+        self.episode_data = []
         self.current_episode = []
 
     def record_step(self, obs, action, reward, done, info):
         """记录一步"""
         step_data = {
-            "obs": obs.copy(), # 新状态         
-            "action": action.copy(), # 动作
-            "reward": reward, # 奖励
-            "done": done, # 是否终止
-            "info": info.copy() if info else {} # 信息
+            "obs": obs.copy(),
+            "action": action.copy(),
+            "reward": reward,
+            "done": done,
+            "info": info.copy() if info else {}
         }
-        self.current_episode.append(step_data) # 记录episode中的决策和结果
+        self.current_episode.append(step_data)
 
         if done:
-            self.episode_data.append(self.current_episode) # 记录episode中的决策和结果
-            self.current_episode = [] 
+            self.episode_data.append(self.current_episode)
+            self.current_episode = []
 
     def get_episode_stats(self, episode_idx=-1):
         """获取episode统计信息"""
         if not self.episode_data:
             return {}
 
-        episode = self.episode_data[episode_idx] # 记录episode中的决策和结果
-        total_reward = sum(step["reward"] for step in episode) # 总奖励         
-        avg_recovery = np.mean([step["info"].get("recovery", 0) for step in episode if "recovery" in step["info"]]) # 平均回收率
-        feasible_steps = sum(1 for step in episode if step["info"].get("feasible", False)) # 可行的步数
+        episode = self.episode_data[episode_idx]
+        total_reward = sum(step["reward"] for step in episode)
+        avg_recovery = np.mean([step["info"].get("recovery", 0) for step in episode if "recovery" in step["info"]])
+        feasible_steps = sum(1 for step in episode if step["info"].get("feasible", False))
 
         return {
-            "total_reward": total_reward, # 总奖励
-            "avg_recovery": avg_recovery, # 平均回收率
-            "feasible_steps": feasible_steps, # 可行的步数
-            "total_steps": len(episode) # 总步数
+            "total_reward": total_reward,
+            "avg_recovery": avg_recovery,
+            "feasible_steps": feasible_steps,
+            "total_steps": len(episode)
         }
 
 
@@ -416,23 +435,23 @@ class UFTrainingCallback(BaseCallback):
     """
 
     def __init__(self, recorder, verbose=0):
-        super(UFTrainingCallback, self).__init__(verbose) 
-        self.recorder = recorder 
+        super(UFTrainingCallback, self).__init__(verbose)
+        self.recorder = recorder
 
     def _on_step(self) -> bool:
         try:
-            new_obs = self.locals.get("new_obs") # 新状态
-            actions = self.locals.get("actions") # 动作
-            rewards = self.locals.get("rewards") # 奖励
-            dones = self.locals.get("dones") # 是否终止
-            infos = self.locals.get("infos") # 信息
+            new_obs = self.locals.get("new_obs")
+            actions = self.locals.get("actions")
+            rewards = self.locals.get("rewards")
+            dones = self.locals.get("dones")
+            infos = self.locals.get("infos")
 
             if len(new_obs) > 0:
-                step_obs = new_obs[0] # 新状态
-                step_action = actions[0] if actions is not None else None # 动作
-                step_reward = rewards[0] if rewards is not None else 0.0 # 奖励
-                step_done = dones[0] if dones is not None else False # 是否终止
-                step_info = infos[0] if infos is not None else {} # 信息
+                step_obs = new_obs[0]
+                step_action = actions[0] if actions is not None else None
+                step_reward = rewards[0] if rewards is not None else 0.0
+                step_done = dones[0] if dones is not None else False
+                step_info = infos[0] if infos is not None else {}
 
                 # 打印当前 step 的信息
                 if self.verbose:
@@ -440,11 +459,11 @@ class UFTrainingCallback(BaseCallback):
 
                 # 记录数据
                 self.recorder.record_step(
-                    obs=step_obs, # 新状态
-                    action=step_action, # 动作
-                    reward=step_reward, # 奖励
-                    done=step_done, # 是否终止
-                    info=step_info, # 信息
+                    obs=step_obs,
+                    action=step_action,
+                    reward=step_reward,
+                    done=step_done,
+                    info=step_info,
                 )
 
         except Exception as e:
@@ -454,112 +473,82 @@ class UFTrainingCallback(BaseCallback):
         return True
 
 
+
+
 class DQNTrainer:
     def __init__(self, env, params, callback=None):
-        """
-        初始化 DQN 训练器
-        :param env: 强化学习环境
-        :param params: DQNParams 实例
-        :param callback: 可选,训练回调器
-        """
-        self.env = env # 环境   
-        self.params = params # DQNParams 实例
-        self.callback = callback # 训练回调器
-        self.log_dir = self._create_log_dir() # 日志文件夹
-        self.model = self._create_model() # 模型
+        self.env = env
+        self.params = params
+        self.callback = callback
+        self.log_dir = self._create_log_dir()
+        self.model = self._create_model()
 
     def _create_log_dir(self):
-        """
-        自动生成日志文件夹名:包含核心超参数 + 时间戳
-        """
-        timestamp = time.strftime("%Y%m%d-%H%M%S") # 时间戳
+        timestamp = time.strftime("%Y%m%d-%H%M%S")
         log_name = (
-            f"DQN_lr{self.params.learning_rate}_buf{self.params.buffer_size}_bs{self.params.batch_size}" # 日志文件夹名
-            f"_gamma{self.params.gamma}_exp{self.params.exploration_fraction}" # 日志文件夹名
-            f"_{self.params.remark}_{timestamp}" # 日志文件夹名
+            f"DQN_lr{self.params.learning_rate}_buf{self.params.buffer_size}_bs{self.params.batch_size}"
+            f"_gamma{self.params.gamma}_exp{self.params.exploration_fraction}"
+            f"_{self.params.remark}_{timestamp}"
         )
-        log_dir = os.path.join("./uf_dqn_tensorboard", log_name) # 日志文件夹
-        os.makedirs(log_dir, exist_ok=True) # 创建日志文件夹
+        log_dir = os.path.join("./uf_dqn_tensorboard", log_name)
+        os.makedirs(log_dir, exist_ok=True)
         return log_dir
 
     def _create_model(self):
-        """
-        根据参数创建 DQN 模型
-        """
         return DQN(
-            policy="MlpPolicy", # 策略网络
-            env=self.env, # 环境
-            learning_rate=self.params.learning_rate, # 学习率
-            buffer_size=self.params.buffer_size, # 经验回放缓冲区大小
-            learning_starts=self.params.learning_starts, # 学习开始前需要收集的步数
-            batch_size=self.params.batch_size, # 每次从经验池中采样的样本数量
-            gamma=self.params.gamma, # 折扣因子,越接近1越重视长期奖励
-            train_freq=self.params.train_freq, # 每隔多少步训练一次
-            target_update_interval=self.params.target_update_interval, # 目标网络更新间隔
-            exploration_initial_eps=self.params.exploration_initial_eps, # 初始探索率 ε
-            exploration_fraction=self.params.exploration_fraction, # 从初始ε衰减到最终ε所占的训练比例
-            exploration_final_eps=self.params.exploration_final_eps, # 最终探索率 ε
+            policy="MlpPolicy",
+            env=self.env,
+            learning_rate=self.params.learning_rate,
+            buffer_size=self.params.buffer_size,  # 大缓冲保证经验多样性
+            learning_starts=self.params.learning_starts,
+            batch_size=self.params.batch_size,
+            gamma=self.params.gamma,
+            train_freq=self.params.train_freq,
+            target_update_interval=1,
+            tau=0.005,
+            exploration_initial_eps=self.params.exploration_initial_eps,
+            exploration_fraction=self.params.exploration_fraction,
+            exploration_final_eps=self.params.exploration_final_eps,
             verbose=1,
             tensorboard_log=self.log_dir
+            # 不再指定 replay_buffer_class,默认使用 ReplayBuffer
         )
 
     def train(self, total_timesteps: int):
-        """
-        训练 DQN 模型,支持自定义回调器
-        """
         if self.callback:
-            self.model.learn(total_timesteps=total_timesteps, callback=self.callback) # 支持自定义回调器
+            self.model.learn(total_timesteps=total_timesteps, callback=self.callback)
         else:
-            self.model.learn(total_timesteps=total_timesteps) # 不支持自定义回调器
+            self.model.learn(total_timesteps=total_timesteps)
         print(f"模型训练完成,日志保存在:{self.log_dir}")
 
     def save(self, path=None):
-        """
-        保存模型到指定路径
-        """
         if path is None:
-            path = os.path.join(self.log_dir, "dqn_model.zip") # 模型文件名
+            path = os.path.join(self.log_dir, "dqn_model.zip")
         self.model.save(path)
         print(f"模型已保存到:{path}")
 
     def load(self, path):
-        """
-        从指定路径加载模型
-        """
-        self.model = DQN.load(path, env=self.env) # 加载模型
+        self.model = DQN.load(path, env=self.env)
         print(f"模型已从 {path} 加载")
 
-def train_uf_rl_agent(params: UFParams, total_timesteps: int = 10000, seed: int = 2025):
-    """训练超滤系统RL代理(固定随机种子)"""
 
-    # === 1. 固定全局随机种子 ===
+def train_uf_rl_agent(params: UFParams, total_timesteps: int = 10000, seed: int = 2025):
     set_global_seed(seed)
+    recorder = UFEpisodeRecorder()
+    callback = UFTrainingCallback(recorder, verbose=1)
 
-    # === 2. 创建回调器 ===
-    recorder = UFEpisodeRecorder() # 记录每一步的数据
-    callback = UFTrainingCallback(recorder, verbose=1) # 训练回调器
-
-    # === 3. 创建环境并固定种子 ===
     def make_env():
-        env = UFSuperCycleEnv(params) # 创建环境
-        env = Monitor(env) # 监控环境
+        env = UFSuperCycleEnv(params)
+        env = Monitor(env)
         return env
 
-    env = DummyVecEnv([make_env]) # 创建环境 多进程 
+    env = DummyVecEnv([make_env])
 
-    # === 4. 定义DQN参数 ===
     dqn_params = DQNParams()
-
-    # === 5. 创建训练器 ===
-    trainer = DQNTrainer(env, dqn_params, callback=callback) 
-
-    # === 6. 训练模型 ===
+    trainer = DQNTrainer(env, dqn_params, callback=callback)
     trainer.train(total_timesteps)
-
-    # === 7. 保存模型 ===
     trainer.save()
 
-    # === 8. 输出训练统计信息 ===
     stats = callback.recorder.get_episode_stats()
     print(f"训练完成 - 总奖励: {stats.get('total_reward', 0):.2f}, 平均回收率: {stats.get('avg_recovery', 0):.3f}")
 
@@ -573,5 +562,5 @@ if __name__ == "__main__":
 
     # 训练RL代理
     print("开始训练RL代理...")
-    train_uf_rl_agent(params, total_timesteps=8000)
+    train_uf_rl_agent(params, total_timesteps=50000)
 

+ 4 - 4
device_states.json

@@ -3,21 +3,21 @@
     "UF1": {
         "model_prev_L_s": 4100,
         "model_prev_t_bw_s": 94.0,
-        "last_cycle_end_time": "2025-10-11 02:35:57"
+        "last_cycle_end_time": "2025-10-17 10:16:35"
     },
     "UF2": {
         "model_prev_L_s": 4100,
         "model_prev_t_bw_s": 94.0,
-        "last_cycle_end_time": "2025-10-11 16:18:23"
+        "last_cycle_end_time": "2025-10-14 18:20:14"
     },
     "UF3": {
         "model_prev_L_s": 4100.0,
         "model_prev_t_bw_s": 94.0,
-        "last_cycle_end_time": "2025-10-11 16:01:39"
+        "last_cycle_end_time": "2025-10-14 15:11:26"
     },
     "UF4": {
         "model_prev_L_s": 4100,
         "model_prev_t_bw_s": 94.0,
-        "last_cycle_end_time": "2025-10-11 15:47:43"
+        "last_cycle_end_time": "2025-10-14 19:33:40"
     }
 }

BIN=BIN
dqn_model.zip


+ 172 - 74
loop_main.py

@@ -2,7 +2,6 @@
 import time
 import json
 import os
-import statistics
 import threading
 import hashlib
 from datetime import datetime, timedelta
@@ -199,7 +198,7 @@ def create_db_connection():
 
 def get_tmp_extremes(item_name, start_time, end_time, word_control):
     """
-    查询历史数据中指定时间范围内的跨膜压差极值
+    通过API查询历史数据中指定时间范围内的跨膜压差极值
     
     参数:
         item_name: 数据项名称
@@ -210,51 +209,101 @@ def get_tmp_extremes(item_name, start_time, end_time, word_control):
     返回:
         (最大值, 最小值) 或 (None, None)
     """
-    start_time_str = start_time.strftime(DATETIME_FORMAT)
-    end_time_str = end_time.strftime(DATETIME_FORMAT)
-
-    query = f"""
-            SELECT
-                MAX(val) AS max_val,
-                MIN(val) AS min_val
-            FROM {HISTORY_TABLE_NAME}
-            WHERE project_id = %s
-              AND item_name = %s
-              AND h_time IN (
-                  SELECT h_time
-                  FROM {HISTORY_TABLE_NAME}
-                  WHERE project_id = %s
-                    AND item_name = %s
-                    AND val = 26
-                    AND h_time BETWEEN %s AND %s
-              )
-        """
-    logger.info(f"查询历史极值 {item_name} 从 {start_time_str} 到 {end_time_str}")
-    logger.debug(query)
+    # 转换时间为毫秒级时间戳
+    start_timestamp = int(start_time.timestamp() * 1000)
+    end_timestamp = int(end_time.timestamp() * 1000)
+    
+    logger.info(f"查询历史极值 {item_name} 从 {start_time.strftime(DATETIME_FORMAT)} 到 {end_time.strftime(DATETIME_FORMAT)}")
+    
+    # API基础URL
+    api_base_url = "http://120.55.44.4:8900/api/v1/jinke-cloud/db/device/history-data"
     
-    db_connection = create_db_connection()
-    if not db_connection:
-        return None, None
-
     try:
-        with db_connection.cursor() as cursor:
-            cursor.execute(query, (PROJECT_ID_FOR_CALLBACK, item_name, PROJECT_ID_FOR_CALLBACK, word_control, start_time_str, end_time_str))
-            result = cursor.fetchone()
-            logger.debug(f"查询结果: {result}")
-            if result and result['max_val'] is not None and result['min_val'] is not None:
-                max_val = float(result['max_val'])
-                min_val = float(result['min_val'])
-                logger.info(f"查询成功 最大值={max_val} 最小值={min_val}")
-                return max_val, min_val
-            else:
-                logger.warning("查询未返回有效数据")
-                return None, None
-    except pymysql.MySQLError as e:
-        logger.error(f"数据库查询错误: {e}")
+        # 第一次调用:查询item_name的极值
+        params1 = {
+            "deviceid": "1",
+            "dataitemid": item_name,
+            "project_id": "92",
+            "stime": start_timestamp,
+            "etime": end_timestamp,
+            "size": "1",
+            "interval": "minute",
+            "aggregator": "new"
+        }
+        
+        logger.info(f"第一次API调用: {api_base_url} 参数: {params1}")
+        response1 = requests.get(api_base_url, params=params1, headers=HEADERS, timeout=30)
+        response1.raise_for_status()
+        data1 = response1.json()
+        logger.debug(f"第一次API响应: {data1}")
+        
+        # 第二次调用:查询word_control的极值
+        params2 = {
+            "deviceid": "1", 
+            "dataitemid": word_control,
+            "project_id": "92",
+            "stime": start_timestamp,
+            "etime": end_timestamp,
+            "size": "1",
+            "interval": "minute",
+            "aggregator": "new"
+        }
+        
+        logger.info(f"第二次API调用: {api_base_url} 参数: {params2}")
+        response2 = requests.get(api_base_url, params=params2, headers=HEADERS, timeout=30)
+        response2.raise_for_status()
+        data2 = response2.json()
+        logger.debug(f"第二次API响应: {data2}")
+
+        # 处理两次API调用的结果
+        max_val = None
+        min_val = None
+
+        # 从第一次调用结果中提取'UF1跨膜压差'的值,并存储在字典中,以时间为键
+        uf1_diff_values = {}
+        if data1.get("code") == 200 and data1.get("data"):
+            for item in data1["data"]:
+                if item.get("name") == "UF1跨膜压差" and item.get("val") is not None:
+                    time = item.get("htime_at")
+                    uf1_diff_values[time] = float(item.get("val"))
+            if uf1_diff_values:
+                logger.info(f"第一次API查询成功,提取到跨膜压差数据:{uf1_diff_values}")
+
+        # 从第二次调用结果中提取'UF1控制字'为26的数据点,并进行时间匹配
+        if data2.get("code") == 200 and data2.get("data"):
+            control_26_values = []
+            for item in data2["data"]:
+                if item.get("name") == "UF1控制字" and item.get("val") == '26':
+                    time = item.get("htime_at")
+                    # 如果在第一次数据中找到了对应的跨膜压差值
+                    if time in uf1_diff_values:
+                        control_26_values.append(uf1_diff_values[time])
+
+            if control_26_values:
+                logger.info(f"找到控制字为26的数据点,合并跨膜压差数据")
+                max_val = max(control_26_values)
+                min_val = min(control_26_values)
+                # 增加最小跨膜压差的下限值
+                if min_val < 0.01:
+                    min_val = 0.01
+                logger.info(f"控制字为26时的最大跨膜压差值={max_val},最小跨膜压差值={min_val}")
+
+        if max_val is not None and min_val is not None:
+            logger.info(f"API查询成功 最大跨膜压差值={max_val} 最小跨膜压差值={min_val}")
+            return max_val, min_val
+        else:
+            logger.warning("未找到有效的控制字为26时的跨膜压差数据")
+            return None, None
+            
+    except requests.exceptions.RequestException as e:
+        logger.error(f"API请求错误: {e}")
+        return None, None
+    except (json.JSONDecodeError, ValueError, KeyError) as e:
+        logger.error(f"API响应解析错误: {e}")
+        return None, None
+    except Exception as e:
+        logger.error(f"API查询未知错误: {e}")
         return None, None
-    finally:
-        if db_connection and db_connection.open:
-            db_connection.close()
 
 
 def generate_md5_signature(record_data, secret, timestamp):
@@ -424,45 +473,47 @@ def monitor_device(device):
     # 主循环
     while True:
         try:
-            # 阶段1: 等待触发条件
+            # 阶段1: 等待触发条件 (控制字=95)
             logger.info(f"等待触发 控制字需等于 {TRIGGER_VALUE}")
             while True:
                 control_value = get_device_value(device["control_payload"], name)  # 控制字
-                if control_value is not None and int(control_value) == TRIGGER_VALUE:  # 控制字 等于 触发值
-                    logger.info("触发条件满足")
+                if control_value is not None and int(control_value) == TRIGGER_VALUE:  # 控制字 等于 触发值 95
+                    logger.info("触发条件满足,开始等待控制字变为26")
+                    break
+                time.sleep(POLL_INTERVAL)
+
+            # 阶段1.5: 等待控制字变为26
+            logger.info("等待控制字变为26")
+            while True:
+                control_value = get_device_value(device["control_payload"], name)  # 控制字
+                if control_value is not None and int(control_value) == 26:  # 控制字 等于 26
+                    logger.info("控制字变为26,开始收集10分钟数据")
                     break
                 time.sleep(POLL_INTERVAL)
 
-            # 阶段2: 收集数据
-            logger.info(f"开始收集TMP数据 需要 {NUM_VALUES_TO_COLLECT} 个有效数据点")
+            # 阶段2: 收集10分钟数据并计算平均值
+            logger.info("开始收集10分钟TMP数据")
             collected_values = []
-            last_known_value = get_device_value(device["target_payload"], name)  # 上次已知值
+            start_collection_time = datetime.now()
+            collection_duration = timedelta(minutes=10)  # 10分钟
             
-            if last_known_value is not None:
-                logger.info(f"TMP基准值 {last_known_value}")
-                # 循环收集数据点,直到达到目标数量
-                while len(collected_values) < NUM_VALUES_TO_COLLECT:  # 收集数据点 直到达到目标数量 
-                    current_value = get_device_value(device["target_payload"], name)  # 当前值
-                    if current_value is None:
-                        time.sleep(POLL_INTERVAL)
-                        continue
-
-                    # 只有当数值发生变化时才记录
-                    if current_value != last_known_value:  # 当前值 不等于 上次已知值
-                        collected_values.append(current_value)
-                        logger.info(f"TMP变化 {last_known_value:.4f} 到 {current_value:.4f} 已收集 {len(collected_values)}/{NUM_VALUES_TO_COLLECT}")
-                        last_known_value = current_value
-                    time.sleep(POLL_INTERVAL)
-            else:
-                logger.warning("无法获取TMP基准值,跳过本轮")
+            while datetime.now() - start_collection_time < collection_duration:
+                current_value = get_device_value(device["target_payload"], name)  # 当前值
+                if current_value is not None:
+                    collected_values.append(current_value)
+                    logger.info(f"收集TMP值 {current_value:.4f} 已收集 {len(collected_values)} 个数据点")
+                time.sleep(POLL_INTERVAL)
+            
+            if not collected_values:
+                logger.warning("10分钟内未收集到有效数据,跳过本轮")
                 continue
 
             # 阶段3: 决策计算
             logger.info("数据收集完成,开始决策计算")
             if collected_values:
-                # 计算中位数作为代表值
-                median_value = statistics.median(sorted(collected_values))
-                logger.info(f"TMP中位数 {median_value:.4f}")
+                # 计算平均值作为代表值
+                average_value = sum(collected_values) / len(collected_values)
+                logger.info(f"TMP平均值 {average_value:.4f}")
 
                 # 确定历史数据查询时间范围
                 current_decision_time = datetime.now()
@@ -474,7 +525,7 @@ def monitor_device(device):
 
                 # 调用DQN模型获取决策建议
                 logger.info("调用DQN决策模型")
-                uf_bw_dict = run_uf_DQN_decide(uf_params, median_value)
+                uf_bw_dict = run_uf_DQN_decide(uf_params, average_value)
                 logger.info(f"模型决策结果 {uf_bw_dict}")
 
                 # 获取当前PLC参数
@@ -489,8 +540,8 @@ def monitor_device(device):
                 )
                 
                 # 计算运行指标
-                logger.info(f"计算运行指标 TMP={median_value} L_s={L_s} t_bw_s={t_bw_s}")
-                metrics = calc_uf_cycle_metrics(uf_params, median_value, max_tmp, min_tmp, L_s, t_bw_s)  # 计算运行指标
+                logger.info(f"计算运行指标 TMP={average_value} L_s={L_s} t_bw_s={t_bw_s}")
+                metrics = calc_uf_cycle_metrics(uf_params, average_value, max_tmp, min_tmp, L_s, t_bw_s)  # 计算运行指标
 
                 # 发送决策结果
                 send_decision_to_callback(
@@ -499,7 +550,7 @@ def monitor_device(device):
                     physical_backwash=int(t_bw_s),  # 反洗时间
                     ceb_backwash_frequency=int(metrics["k_bw_per_ceb"]),  # 化学反洗频率
                     duration_system=int(prod_time),  # 系统运行时间
-                    tmp_action=median_value,  # TMP动作
+                    tmp_action=average_value,  # TMP动作
                     recovery_rate=metrics["recovery"],  # 回收率
                     ton_water_energy_kWh=metrics['ton_water_energy_kWh_per_m3'],  # 吨水电耗
                     max_permeability=metrics['max_permeability'],  # 最高渗透率
@@ -579,5 +630,52 @@ def main():
         logger.info("检测到中断信号,程序退出")
 
 
+def test_get_tmp_extremes():
+    """
+    测试get_tmp_extremes函数的API调用
+    """
+    print("=" * 50)
+    print("测试get_tmp_extremes API调用")
+    print("=" * 50)
+    
+    # 设置测试参数
+    test_item_name = "C.M.UF1_DB@press_PV"  # 测试数据项
+    test_word_control = "C.M.UF1_DB@word_control"  # 测试控制字段
+    
+    # 设置测试时间范围(最近24小时)
+    end_time = datetime.now()
+    start_time = end_time - timedelta(hours=24)
+    
+    print(f"测试参数:")
+    print(f"  数据项: {test_item_name}")
+    print(f"  控制字段: {test_word_control}")
+    print(f"  开始时间: {start_time.strftime(DATETIME_FORMAT)}")
+    print(f"  结束时间: {end_time.strftime(DATETIME_FORMAT)}")
+    print()
+    
+    try:
+        # 调用函数
+        max_val, min_val = get_tmp_extremes(test_item_name, start_time, end_time, test_word_control)
+        
+        print("测试结果:")
+        if max_val is not None and min_val is not None:
+            print(f"  API调用成功")
+            print(f"  最大值: {max_val}")
+            print(f"  最小值: {min_val}")
+        else:
+            print(f"  API调用失败或未返回有效数据")
+            print(f"  最大值: {max_val}")
+            print(f"  最小值: {min_val}")
+            
+    except Exception as e:
+        print(f" 测试过程中发生异常: {e}")
+    
+    print("=" * 50)
+
+
 if __name__ == "__main__":
+    # 运行测试用例
+    # test_get_tmp_extremes()
+    
+    # 运行主程序
     main()

+ 2 - 2
requirements.txt

@@ -7,8 +7,8 @@ numpy>=1.23.0
 torch>=2.0.0
 
 # 强化学习框架
-gymnasium>=0.29.0
-stable-baselines3>=2.0.0
+gymnasium>=1.2.0
+stable-baselines3>=2.6.0
 
 # 数据库连接
 pymysql>=1.0.0