import numpy as np from stable_baselines3 import DQN from fixed_DQN_env import UFSuperCycleEnv, UFParams from fixed_DQN_env import simulate_one_supercycle # 模型路径 MODEL_PATH = "model/dqn_model.zip" # 加载模型(只加载一次,提高效率) model = DQN.load(MODEL_PATH) def run_uf_DQN_decide(uf_params, TMP0_value: float): """ 单步决策函数(新版): 当前模型只输出进水时间 L_s,不输出反洗时间。 """ # 1. 初始化环境 env = UFSuperCycleEnv(uf_params) # 2. 设置 TMP0 env.current_params.TMP0 = TMP0_value # 3. 获取观察(归一化) obs = env._get_obs().reshape(1, -1) # 4. 模型预测动作 action, _ = model.predict(obs, deterministic=True) # 5. 新模型动作只返回 L_s L_s = env._get_action_values(action[0]) # 单值 # 6. 在环境中执行动作 next_obs, reward, terminated, truncated, info = env.step(action[0]) # 7. 返回结构化结果 return { "action": int(action[0]), "L_s": float(L_s), "t_bw_s": None, # 保留字段但固定为 None "next_obs": next_obs, "reward": reward, "terminated": terminated, "truncated": truncated, "info": info } def generate_plc_instructions(current_L_s, model_prev_L_s, model_L_s): """ 根据工厂当前值、模型上一轮决策值和模型当前轮决策值,生成PLC指令。 新增功能: 1. 处理None值情况:如果模型上一轮值为None,则使用工厂当前值; 如果工厂当前值也为None,则返回None并提示错误。 """ # 参数配置保持不变 params = UFParams( L_min_s=3600.0, L_max_s=4800.0, L_step_s=60.0, ) # 参数解包 L_step_s = params.L_step_s L_min_s = params.L_min_s L_max_s = params.L_max_s adjustment_threshold = 1.0 # 处理None值情况 if model_prev_L_s is None: if current_L_s is None: print("错误: 过滤时长的工厂当前值和模型上一轮值均为None") return None, None else: # 使用工厂当前值作为基准 effective_current_L = current_L_s source_L = "工厂当前值(模型上一轮值为None)" else: # 模型上一轮值不为None,继续检查工厂当前值 if current_L_s is None: effective_current_L = model_prev_L_s source_L = "模型上一轮值(工厂当前值为None)" else: effective_current_L = model_prev_L_s source_L = "模型上一轮值" # 检测所有输入值是否在规定范围内(只对非None值进行检查) # 工厂当前值检查(警告) if current_L_s is not None and not (L_min_s <= current_L_s <= L_max_s): print(f"警告: 当前过滤时长 {current_L_s} 秒不在允许范围内 [{L_min_s}, {L_max_s}]") # 模型上一轮决策值检查(警告) if model_prev_L_s is not None and not (L_min_s <= model_prev_L_s <= L_max_s): print(f"警告: 模型上一轮过滤时长 {model_prev_L_s} 秒不在允许范围内 [{L_min_s}, {L_max_s}]") # 模型当前轮决策值检查(错误) if model_L_s is None: raise ValueError("错误: 决策模型建议的过滤时长不能为None") elif not (L_min_s <= model_L_s <= L_max_s): raise ValueError(f"错误: 决策模型建议的过滤时长 {model_L_s} 秒不在允许范围内 [{L_min_s}, {L_max_s}]") print(f"过滤时长基准: {source_L}, 值: {effective_current_L}") # 使用选定的基准值进行计算调整 L_diff = model_L_s - effective_current_L L_adjustment = 0 if abs(L_diff) >= adjustment_threshold * L_step_s: if L_diff >= 0: L_adjustment = L_step_s else: L_adjustment = -L_step_s next_L_s = effective_current_L + L_adjustment return next_L_s def calc_uf_cycle_metrics(p, TMP0, max_tmp_during_filtration, min_tmp_during_filtration, L_s: float, t_bw_s): """ 计算 UF 超滤系统的核心性能指标 参数: p (UFParams): UF 系统参数 L_s (float): 单次过滤时间(秒) t_bw_s (float): 单次反洗时间(秒) 返回: dict: { "k_bw_per_ceb": 小周期次数, "ton_water_energy_kWh_per_m3": 吨水电耗, "recovery": 回收率, "net_delivery_rate_m3ph": 净供水率 (m³/h), "daily_prod_time_h": 日均产水时间 (小时/天) "max_permeability": 全周期最高渗透率(lmh/bar) } """ # 将跨膜压差写入参数 p.TMP0 = TMP0 # 模拟该参数下的超级周期 info, next_params = simulate_one_supercycle(p, L_s, t_bw_s) # 获得模型模拟周期信息 k_bw_per_ceb = info["k_bw_per_ceb"] ton_water_energy_kWh_per_m3 = info["ton_water_energy_kWh_per_m3"] recovery = info["recovery"] daily_prod_time_h = info["daily_prod_time_h"] # 获得模型模拟周期内最高跨膜压差/最低跨膜压差 if max_tmp_during_filtration is None: max_tmp_during_filtration = info["max_TMP_during_filtration"] if min_tmp_during_filtration is None: min_tmp_during_filtration = info["min_TMP_during_filtration"] # 计算最高渗透率 max_permeability = 100 * p.q_UF / (128*40) / min_tmp_during_filtration return { "k_bw_per_ceb": k_bw_per_ceb, "ton_water_energy_kWh_per_m3": ton_water_energy_kWh_per_m3, "recovery": recovery, "daily_prod_time_h": daily_prod_time_h, "max_permeability": max_permeability } # ============================== # 示例调用 # ============================== if __name__ == "__main__": # ------------------------- # 1. 初始化参数 # ------------------------- uf_params = UFParams() TMP0 = 0.01 # 原始跨膜压差 # ------------------------- # 2. 调用模型做一次决策(只输出 L_s) # ------------------------- model_decide_result = run_uf_DQN_decide(uf_params, TMP0) model_L_s = model_decide_result["L_s"] # 只输出 L_s print(f"模型决策进水时长 L_s = {model_L_s}") # ------------------------- # 3. 工厂当前值 + 模型上一轮值(示例值) # ------------------------- current_L_s = 3800 model_prev_L_s = 4040 # ------------------------- # 4. 生成 PLC 指令(新版仅 L_s) # ------------------------- plc_L_s = generate_plc_instructions(current_L_s,model_prev_L_s,model_L_s) print(f"PLC 指令 L_s = {plc_L_s}") # ------------------------- # 5. 反洗时长由工厂参数决定/固定值 # (新模型不输出 t_bw_s) # ------------------------- plc_t_bw_s = uf_params.fixed_t_bw_s # ------------------------- # 6. 工厂 TMP 最大/最小(可为空 None) # ------------------------- max_tmp_during_filtration = 0.050176 min_tmp_during_filtration = 0.012496 # ------------------------- # 7. 计算周期指标(模型动作实际效果) # ------------------------- execution_result = calc_uf_cycle_metrics( p=uf_params, TMP0=TMP0, max_tmp_during_filtration=max_tmp_during_filtration, min_tmp_during_filtration=min_tmp_during_filtration, L_s=plc_L_s, t_bw_s=plc_t_bw_s # 仍需要反洗时长参数 ) # ------------------------- # 8. 打印结果 # ------------------------- print("\n===== 单步决策结果 =====") print(f"模型动作编号: {model_decide_result['action']}") print(f"模型选择的 L_s: {model_L_s} 秒") print(f"PLC 下发 L_s: {plc_L_s} 秒") print(f"PLC 下发反洗时长(固定) t_bw_s: {plc_t_bw_s} 秒") print(f"周期对应的反洗次数: {execution_result['k_bw_per_ceb']}") print(f"吨水电耗: {execution_result['ton_water_energy_kWh_per_m3']}") print(f"回收率: {execution_result['recovery']}") print(f"日均产水时间: {execution_result['daily_prod_time_h']}") print(f"最高渗透率: {execution_result['max_permeability']}")