| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265 |
- import numpy as np
- from stable_baselines3 import DQN
- from DQN_env import UFSuperCycleEnv
- from DQN_env import UFParams
- # 模型路径
- MODEL_PATH = "/Users/wmy/data/Pycharm_Project/jkhj/jkhj_test1/shuangmo/wmy/ultrafiltration_超滤/v2/dqn_model.zip"
- # 加载模型(只加载一次,提高效率)
- model = DQN.load(MODEL_PATH)
- def run_uf_DQN_decide(uf_params, TMP0_value: float):
- """
- 单步决策函数:输入原始 TMP0,预测并执行动作
- 参数:
- TMP0_value (float): 当前 TMP0 值(单位与环境一致)
- 返回:
- dict: 包含模型选择的动作、动作参数、新状态、奖励等
- """
- # 1. 实例化环境
- base_params = uf_params
- env = UFSuperCycleEnv(base_params)
- # 2. 将输入的 TMP0 写入环境
- env.current_params.TMP0 = TMP0_value
- # 3. 获取归一化状态
- obs = env._get_obs().reshape(1, -1)
- # 4. 模型预测动作
- action, _ = model.predict(obs, deterministic=True)
- # 5. 解析动作对应的 L_s 和 t_bw_s 对应过滤时长和物洗时长
- L_s, t_bw_s = env._get_action_values(action[0])
- # 6. 在环境中执行该动作 执行一个超级周期
- next_obs, reward, terminated, truncated, info = env.step(action[0])
- # 7. 整理结果
- result = {
- "action": int(action[0]), # 动作
- "L_s": float(L_s), # 过滤时长
- "t_bw_s": float(t_bw_s), # 物洗时长
- "next_obs": next_obs, # 新状态
- "reward": reward, # 奖励
- "terminated": terminated, # 是否终止
- "truncated": truncated, # 是否截断
- "info": info # 信息 可行性、步数、奖励等
- }
- # 8. 关闭环境
- env.close()
- return result
- def generate_plc_instructions(current_L_s, current_t_bw_s, model_prev_L_s, model_prev_t_bw_s, model_L_s, model_t_bw_s):
- """
- 根据工厂当前值、模型上一轮决策值和模型当前轮决策值,生成PLC指令。
- 新增功能:
- 1. 处理None值情况:如果模型上一轮值为None,则使用工厂当前值;
- 如果工厂当前值也为None,则返回None并提示错误。
- """
- # 参数配置保持不变
- params = UFParams(
- L_min_s=3600.0, L_max_s=4200.0, L_step_s=60.0,
- t_bw_min_s=90.0, t_bw_max_s=100.0, t_bw_step_s=2.0,
- )
- # 参数解包
- L_step_s = params.L_step_s # 过滤时长步长
- t_bw_step_s = params.t_bw_step_s # 物洗时长步长
- L_min_s = params.L_min_s # 过滤时长下限
- L_max_s = params.L_max_s # 过滤时长上限
- t_bw_min_s = params.t_bw_min_s # 物洗时长下限
- t_bw_max_s = params.t_bw_max_s # 物洗时长上限
- adjustment_threshold = 1.0 # 调整阈值
- # 处理None值情况
- if model_prev_L_s is None:
- if current_L_s is None:
- print("错误: 过滤时长的工厂当前值和模型上一轮值均为None")
- return None, None
- else:
- # 使用工厂当前值作为基准
- effective_current_L = current_L_s
- source_L = "工厂当前值(模型上一轮值为None)"
- else:
- # 模型上一轮值不为None,继续检查工厂当前值
- if current_L_s is None:
- effective_current_L = model_prev_L_s
- source_L = "模型上一轮值(工厂当前值为None)"
- else:
- # 两个值都不为None,比较哪个更接近模型当前建议值
- current_to_model_diff = abs(current_L_s - model_L_s)
- prev_to_model_diff = abs(model_prev_L_s - model_L_s)
- if current_to_model_diff <= prev_to_model_diff:
- effective_current_L = current_L_s
- source_L = "工厂当前值"
- else:
- effective_current_L = model_prev_L_s
- source_L = "模型上一轮值"
- # 对反洗时长进行同样的处理
- if model_prev_t_bw_s is None:
- if current_t_bw_s is None:
- print("错误: 反洗时长的工厂当前值和模型上一轮值均为None")
- return None, None
- else:
- effective_current_t_bw = current_t_bw_s
- source_t_bw = "工厂当前值(模型上一轮值为None)"
- else:
- if current_t_bw_s is None:
- effective_current_t_bw = model_prev_t_bw_s
- source_t_bw = "模型上一轮值(工厂当前值为None)"
- else:
- current_to_model_t_bw_diff = abs(current_t_bw_s - model_t_bw_s)
- prev_to_model_t_bw_diff = abs(model_prev_t_bw_s - model_t_bw_s)
- if current_to_model_t_bw_diff <= prev_to_model_t_bw_diff:
- effective_current_t_bw = current_t_bw_s
- source_t_bw = "工厂当前值"
- else:
- effective_current_t_bw = model_prev_t_bw_s
- source_t_bw = "模型上一轮值"
- # 检测所有输入值是否在规定范围内(只对非None值进行检查)
- # 工厂当前值检查(警告)
- if current_L_s is not None and not (L_min_s <= current_L_s <= L_max_s):
- print(f"警告: 当前过滤时长 {current_L_s} 秒不在允许范围内 [{L_min_s}, {L_max_s}]")
- if current_t_bw_s is not None and not (t_bw_min_s <= current_t_bw_s <= t_bw_max_s):
- print(f"警告: 当前反洗时长 {current_t_bw_s} 秒不在允许范围内 [{t_bw_min_s}, {t_bw_max_s}]")
- # 模型上一轮决策值检查(警告)
- if model_prev_L_s is not None and not (L_min_s <= model_prev_L_s <= L_max_s):
- print(f"警告: 模型上一轮过滤时长 {model_prev_L_s} 秒不在允许范围内 [{L_min_s}, {L_max_s}]")
- if model_prev_t_bw_s is not None and not (t_bw_min_s <= model_prev_t_bw_s <= t_bw_max_s):
- print(f"警告: 模型上一轮反洗时长 {model_prev_t_bw_s} 秒不在允许范围内 [{t_bw_min_s}, {t_bw_max_s}]")
- # 模型当前轮决策值检查(错误)
- if model_L_s is None:
- raise ValueError("错误: 决策模型建议的过滤时长不能为None")
- elif not (L_min_s <= model_L_s <= L_max_s):
- raise ValueError(f"错误: 决策模型建议的过滤时长 {model_L_s} 秒不在允许范围内 [{L_min_s}, {L_max_s}]")
- if model_t_bw_s is None:
- raise ValueError("错误: 决策模型建议的反洗时长不能为None")
- elif not (t_bw_min_s <= model_t_bw_s <= t_bw_max_s):
- raise ValueError(f"错误: 决策模型建议的反洗时长 {model_t_bw_s} 秒不在允许范围内 [{t_bw_min_s}, {t_bw_max_s}]")
- print(f"过滤时长基准: {source_L}, 值: {effective_current_L}")
- print(f"反洗时长基准: {source_t_bw}, 值: {effective_current_t_bw}")
- # 使用选定的基准值进行计算调整
- L_diff = model_L_s - effective_current_L # 过滤时长差值
- L_adjustment = 0
- if abs(L_diff) > adjustment_threshold * L_step_s: # 如果过滤时长差值超过调整阈值
- if L_diff > 0:
- L_adjustment = L_step_s # 增加过滤时长步长
- else:
- L_adjustment = -L_step_s # 减少过滤时长步长
- next_L_s = effective_current_L + L_adjustment # 调整后的过滤时长
- t_bw_diff = model_t_bw_s - effective_current_t_bw # 反洗时长差值
- t_bw_adjustment = 0
- if abs(t_bw_diff) > adjustment_threshold * t_bw_step_s: # 如果反洗时长差值超过调整阈值 调整阈值 * 反洗时长步长
- if t_bw_diff > 0:
- t_bw_adjustment = t_bw_step_s # 增加反洗时长步长
- else:
- t_bw_adjustment = -t_bw_step_s # 减少反洗时长步长
- next_t_bw_s = effective_current_t_bw + t_bw_adjustment # 调整后的反洗时长
- return next_L_s, next_t_bw_s
- from DQN_env import simulate_one_supercycle
- def calc_uf_cycle_metrics(p, TMP0, max_tmp_during_filtration, min_tmp_during_filtration, L_s: float, t_bw_s: float):
- """
- 计算 UF 超滤系统的核心性能指标
- 参数:
- p (UFParams): UF 系统参数
- L_s (float): 单次过滤时间(秒)
- t_bw_s (float): 单次反洗时间(秒)
- 返回:
- dict: {
- "k_bw_per_ceb": 小周期次数,
- "ton_water_energy_kWh_per_m3": 吨水电耗,
- "recovery": 回收率,
- "net_delivery_rate_m3ph": 净供水率 (m³/h),
- "daily_prod_time_h": 日均产水时间 (小时/天)
- "max_permeability": 全周期最高渗透率(lmh/bar)
- }
- """
- # 将跨膜压差写入参数
- p.TMP0 = TMP0
- # 模拟该参数下的超级周期
- feasible, info = simulate_one_supercycle(p, L_s, t_bw_s)
- # 获得模型模拟周期信息
- k_bw_per_ceb = info["k_bw_per_ceb"] # 超级周期内CEB次数
- ton_water_energy_kWh_per_m3 = info["ton_water_energy_kWh_per_m3"] # 吨水电耗
- recovery = info["recovery"] # 回收率
- net_delivery_rate_m3ph = info["net_delivery_rate_m3ph"] # 净供水率
- daily_prod_time_h = info["daily_prod_time_h"] # 日均产水时间
- # # 获得模型模拟周期内最高跨膜压差/最低跨膜压差
- # if max_tmp_during_filtration is None:
- # max_tmp_during_filtration = info["max_TMP_during_filtration"]
- # if min_tmp_during_filtration is None:
- # min_tmp_during_filtration = info["min_TMP_during_filtration"]
- # 获取模拟周期内最大/最小跨膜压差(如果未传入,则从 info 中获取)
- max_tmp_during_filtration = max_tmp_during_filtration if max_tmp_during_filtration is not None else info.get(
- "max_TMP_during_filtration", None) # 过滤时段TMP峰值
- min_tmp_during_filtration = min_tmp_during_filtration if min_tmp_during_filtration is not None else info.get(
- "min_TMP_during_filtration", None) # 过滤时段TMP最低值
- # 计算最高渗透率 最高渗透率 = 100 * 过滤流量 / (128*40) / 过滤时段TMP最低值
- max_permeability = 100 * p.q_UF / (128*40) / min_tmp_during_filtration
- return {
- "k_bw_per_ceb": k_bw_per_ceb, # 超级周期内CEB次数
- "ton_water_energy_kWh_per_m3": ton_water_energy_kWh_per_m3, # 吨水电耗
- "recovery": recovery, # 回收率
- "net_delivery_rate_m3ph": net_delivery_rate_m3ph, # 净供水率
- "daily_prod_time_h": daily_prod_time_h, # 日均产水时间
- "max_permeability": max_permeability # 最高渗透率
- }
- # ==============================
- # 示例调用
- # ==============================
- if __name__ == "__main__":
- uf_params = UFParams()
- TMP0 = 0.03 # 原始 TMP0
- model_decide_result = run_uf_DQN_decide(uf_params, TMP0) # 调用模型获得动作
- model_L_s = model_decide_result['L_s'] # 获得模型决策产水时长
- model_t_bw_s = model_decide_result['t_bw_s'] # 获得模型决策反洗时长
- current_L_s = 3800
- current_t_bw_s = 100
- model_prev_L_s = None
- model_prev_t_bw_s = None
- L_s, t_bw_s = generate_plc_instructions(current_L_s, current_t_bw_s, model_prev_L_s, model_prev_t_bw_s, model_L_s, model_t_bw_s) # 获取模型下发指令
- max_tmp_during_filtration = None # 新增工厂数据接口:周期最高/最低跨膜压差,无工厂数据接入时传入None,calc_uf_cycle_metrics()自动获取模拟周期中的跨膜压差最值
- min_tmp_during_filtration = None
- execution_result = calc_uf_cycle_metrics(uf_params, TMP0, max_tmp_during_filtration, min_tmp_during_filtration, model_L_s, model_t_bw_s)
- print("\n===== 单步决策结果 =====")
- print(f"模型选择的动作: {model_decide_result['action']}")
- print(f"模型选择的L_s: {model_decide_result['L_s']} 秒, 模型选择的t_bw_s: {model_decide_result['t_bw_s']} 秒")
- print(f"指令下发的L_s: {L_s} 秒, 指令下发的t_bw_s: {t_bw_s} 秒")
- print(f"指令对应的反洗次数: {execution_result['k_bw_per_ceb']}")
- print(f"指令对应的吨水电耗: {execution_result['ton_water_energy_kWh_per_m3']}")
- print(f"指令对应的回收率: {execution_result['recovery']}")
- print(f"指令对应的日均产水时间: {execution_result['daily_prod_time_h']}")
- print(f"指令对应的最高渗透率: {execution_result['max_permeability']}")
|