# UF_decide.py from dataclasses import dataclass import numpy as np @dataclass class UFParams: # —— 膜与运行参数 —— q_UF: float = 360.0 # 过滤进水流量(m^3/h) TMP0: float = 0.03 # 初始TMP(MPa) TMP_max: float = 0.06 # TMP硬上限(MPa) # —— 膜污染动力学 —— alpha: float = 1e-6 # TMP增长系数 belta: float = 1.1 # 幂指数 # —— 反洗参数(固定) —— q_bw_m3ph: float = 1000.0 # 物理反洗流量(m^3/h) # —— CEB参数(固定) —— T_ceb_interval_h: float = 48.0 # 固定每 k 小时做一次CEB v_ceb_m3: float = 30.0 # CEB用水体积(m^3) t_ceb_s: float = 40 * 60.0 # CEB时长(s) phi_ceb: float = 1.0 # CEB去除比例(简化:完全恢复到TMP0) # —— 约束与收敛 —— dTMP: float = 0.0005 # 单次产水结束时,相对TMP0最大升幅(MPa) # —— 搜索范围(秒) —— L_min_s: float = 3600.0 # 过滤时长下限(s) L_max_s: float = 4200.0 # 过滤时长上限(s) t_bw_min_s: float = 40.0 # 物洗时长下限(s) t_bw_max_s: float = 60.0 # 物洗时长上限(s) # —— 物理反洗恢复函数参数 —— phi_bw_min: float = 0.7 # 物洗去除比例最小值 phi_bw_max: float = 1.0 # 物洗去除比例最大值 L_ref_s: float = 4000.0 # 过滤时长影响时间尺度 tau_bw_s: float = 30.0 # 物洗时长影响时间尺度 gamma_t: float = 1.0 # 物洗时长作用指数 # —— 网格 —— L_step_s: float = 60.0 # 过滤时长步长(s) t_bw_step_s: float = 5.0 # 物洗时长步长(s) # 多目标加权及高TMP惩罚 w_rec: float = 0.8 # 回收率权重 w_rate: float = 0.2 # 净供水率权重 w_headroom: float = 0.3 # 贴边惩罚权重 r_headroom: float = 2.0 # 贴边惩罚幂次 headroom_hardcap: float = 0.98 # 超过此比例直接视为不可取 def _delta_tmp(p: UFParams, L_h: float) -> float: # 过滤时段TMP上升量 return float(p.alpha * (p.q_UF ** p.belta) * L_h) def _v_bw_m3(p: UFParams, t_bw_s: float) -> float: # 物理反洗水耗 return float(p.q_bw_m3ph * (float(t_bw_s) / 3600.0)) def phi_bw_of(p: UFParams, L_s: float, t_bw_s: float) -> float: # 物洗去除比例:随过滤时长增长上界收缩,随物洗时长增长趋饱和 L = max(float(L_s), 1.0) t = max(float(t_bw_s), 1e-6) upper_L = p.phi_bw_min + (p.phi_bw_max - p.phi_bw_min) * np.exp(- L / p.L_ref_s) time_gain = 1.0 - np.exp(- (t / p.tau_bw_s) ** p.gamma_t) phi = upper_L * time_gain return float(np.clip(phi, 0.0, 0.999)) def simulate_one_supercycle(p: UFParams, L_s: float, t_bw_s: float): """ 返回 (是否可行, 指标字典) - 支持动态CEB次数:48h固定间隔 - 增加日均产水时间和吨水电耗 """ L_h = float(L_s) / 3600.0 # 小周期过滤时间(h) tmp = p.TMP0 max_tmp_during_filtration = tmp max_residual_increase = 0.0 # 小周期总时长(h) t_small_cycle_h = (L_s + t_bw_s) / 3600.0 # 计算超级周期内CEB次数 k_bw_per_ceb = int(np.floor(p.T_ceb_interval_h / t_small_cycle_h)) if k_bw_per_ceb < 1: k_bw_per_ceb = 1 # 至少一个小周期 # ton水电耗查表 energy_lookup = { 3600: 0.1034, 3660: 0.1031, 3720: 0.1029, 3780: 0.1026, 3840: 0.1023, 3900: 0.1021, 3960: 0.1019, 4020: 0.1017, 4080: 0.1015, 4140: 0.1012, 4200: 0.1011 } for _ in range(k_bw_per_ceb): tmp_run_start = tmp # 过滤阶段TMP增长 dtmp = _delta_tmp(p, L_h) tmp_peak = tmp_run_start + dtmp # 约束1:峰值不得超过硬上限 if tmp_peak > p.TMP_max + 1e-12: return False, {"reason": "TMP_max violated during filtration", "TMP_peak": tmp_peak} if tmp_peak > max_tmp_during_filtration: max_tmp_during_filtration = tmp_peak # 物理反洗 phi = phi_bw_of(p, L_s, t_bw_s) tmp_after_bw = tmp_peak - phi * (tmp_peak - tmp_run_start) # 约束2:单次残余增量控制 residual_inc = tmp_after_bw - tmp_run_start if residual_inc > p.dTMP + 1e-12: return False, { "reason": "residual TMP increase after BW exceeded dTMP", "residual_increase": residual_inc, "limit_dTMP": p.dTMP } if residual_inc > max_residual_increase: max_residual_increase = residual_inc tmp = tmp_after_bw # CEB tmp_after_ceb = p.TMP0 # 体积与回收率 V_feed_super = k_bw_per_ceb * p.q_UF * L_h V_loss_super = k_bw_per_ceb * _v_bw_m3(p, t_bw_s) + p.v_ceb_m3 V_net = max(0.0, V_feed_super - V_loss_super) recovery = max(0.0, V_net / max(V_feed_super, 1e-12)) # 时间与净供水率 T_super_h = k_bw_per_ceb * (L_s + t_bw_s) / 3600.0 + p.t_ceb_s / 3600.0 net_delivery_rate_m3ph = V_net / max(T_super_h, 1e-12) # 贴边比例与硬限 headroom_ratio = max_tmp_during_filtration / max(p.TMP_max, 1e-12) if headroom_ratio > p.headroom_hardcap + 1e-12: return False, {"reason": "headroom hardcap exceeded", "headroom_ratio": headroom_ratio} # —— 新增指标 1:日均产水时间(h/d) —— daily_prod_time_h = k_bw_per_ceb * L_h / T_super_h * 24.0 # —— 新增指标 2:吨水电耗(kWh/m³) —— closest_L = min(energy_lookup.keys(), key=lambda x: abs(x - L_s)) ton_water_energy = energy_lookup[closest_L] info = { "recovery": recovery, "V_feed_super_m3": V_feed_super, "V_loss_super_m3": V_loss_super, "V_net_super_m3": V_net, "supercycle_time_h": T_super_h, "net_delivery_rate_m3ph": net_delivery_rate_m3ph, "max_TMP_during_filtration": max_tmp_during_filtration, "max_residual_increase_per_run": max_residual_increase, "phi_bw_effective": phi, "TMP_after_ceb": tmp_after_ceb, "headroom_ratio": headroom_ratio, "daily_prod_time_h": daily_prod_time_h, "ton_water_energy_kWh_per_m3": ton_water_energy, "k_bw_per_ceb": k_bw_per_ceb } return True, info def _score(p: UFParams, rec: dict) -> float: """综合评分:越大越好。不同TMP0会改变max_TMP→改变惩罚→得到不同解。""" # 无量纲化净供水率 rate_norm = rec["net_delivery_rate_m3ph"] / max(p.q_UF, 1e-12) headroom_penalty = (rec["max_TMP_during_filtration"] / max(p.TMP_max, 1e-12)) ** p.r_headroom return (p.w_rec * rec["recovery"] + p.w_rate * rate_norm - p.w_headroom * headroom_penalty) def optimize_2d(p: UFParams, L_min_s=None, L_max_s=None, L_step_s=None, t_bw_min_s=None, t_bw_max_s=None, t_bw_step_s=None): # 网格生成 L_lo = p.L_min_s if L_min_s is None else float(L_min_s) L_hi = p.L_max_s if L_max_s is None else float(L_max_s) L_st = p.L_step_s if L_step_s is None else float(L_step_s) t_lo = p.t_bw_min_s if t_bw_min_s is None else float(t_bw_min_s) t_hi = p.t_bw_max_s if t_bw_max_s is None else float(t_bw_max_s) t_st = p.t_bw_step_s if t_bw_step_s is None else float(t_bw_step_s) L_vals = np.arange(L_lo, L_hi + 1e-9, L_st) t_vals = np.arange(t_lo, t_hi + 1e-9, t_st) best = None best_score = -np.inf for L_s in L_vals: for t_bw_s in t_vals: feasible, info = simulate_one_supercycle(p, L_s, t_bw_s) if not feasible: continue rec = {"L_s": float(L_s), "t_bw_s": float(t_bw_s)} rec.update(info) score = _score(p, rec) if score > best_score + 1e-14: best_score = score best = rec.copy() best["score"] = float(score) # 若分数相同,偏好回收率更高,再偏好净供水率更高 elif abs(score - best_score) <= 1e-14: if (rec["recovery"] > best["recovery"] + 1e-12) or ( abs(rec["recovery"] - best["recovery"]) <= 1e-12 and rec["net_delivery_rate_m3ph"] > best["net_delivery_rate_m3ph"] + 1e-12 ): best = rec.copy() best["score"] = float(score) if best is None: return {"status": "no-feasible-solution"} best["status"] = "feasible" return best def run_uf_decision(TMP0: float = None) -> dict: if TMP0 is None: rng = np.random.default_rng() TMP0 = rng.uniform(0.03, 0.04) # 初始TMP随机 params = UFParams( q_UF=360.0, TMP_max=0.05, alpha=1.2e-6, belta=1.0, q_bw_m3ph=1000.0, T_ceb_interval_h=48, v_ceb_m3=30.0, t_ceb_s=40*60.0, phi_ceb=1.0, dTMP=0.001, L_min_s=3600.0, L_max_s=4200.0, L_step_s=30.0, t_bw_min_s=90.0, t_bw_max_s=100.0, t_bw_step_s=2.0, phi_bw_min=0.70, phi_bw_max=1.00, L_ref_s=500.0, tau_bw_s=40.0, gamma_t=1.0, TMP0=TMP0, w_rec=0.7, w_rate=0.3, w_headroom=0.3, r_headroom=2.0, headroom_hardcap=0.9 ) result = optimize_2d(params) if result.get("status") == "feasible": return { "L_s": result["L_s"], "t_bw_s": result["t_bw_s"], "recovery": result["recovery"], "k_bw_per_ceb": result["k_bw_per_ceb"], "daily_prod_time_h": result["daily_prod_time_h"], "ton_water_energy_kWh_per_m3": result["ton_water_energy_kWh_per_m3"] } # 若没有可行解,返回最小过滤时间和默认值 return { "L_s": params.L_min_s, "t_bw_s": params.t_bw_min_s, "recovery": 0.0, "k_bw_per_ceb": 1, "daily_prod_time_h": 0.0, "ton_water_energy_kWh_per_m3": 0.0 } def generate_plc_instructions(current_L_s, current_t_bw_s, model_prev_L_s, model_prev_t_bw_s, model_L_s, model_t_bw_s): """ 根据工厂当前值、模型上一轮决策值和模型当前轮决策值,生成PLC指令。 新增功能: 1. 处理None值情况:如果模型上一轮值为None,则使用工厂当前值; 如果工厂当前值也为None,则返回None并提示错误。 """ # 参数配置保持不变 params = UFParams( L_min_s=3600.0, L_max_s=6000.0, L_step_s=60.0, t_bw_min_s=40.0, t_bw_max_s=60.0, t_bw_step_s=5.0, ) # 参数解包 L_step_s = params.L_step_s t_bw_step_s = params.t_bw_step_s L_min_s = params.L_min_s L_max_s = params.L_max_s t_bw_min_s = params.t_bw_min_s t_bw_max_s = params.t_bw_max_s adjustment_threshold = 1.0 # 处理None值情况 if model_prev_L_s is None: if current_L_s is None: print("错误: 过滤时长的工厂当前值和模型上一轮值均为None") return None, None else: # 使用工厂当前值作为基准 effective_current_L = current_L_s source_L = "工厂当前值(模型上一轮值为None)" else: # 模型上一轮值不为None,继续检查工厂当前值 if current_L_s is None: effective_current_L = model_prev_L_s source_L = "模型上一轮值(工厂当前值为None)" else: # 两个值都不为None,比较哪个更接近模型当前建议值 current_to_model_diff = abs(current_L_s - model_L_s) prev_to_model_diff = abs(model_prev_L_s - model_L_s) if current_to_model_diff <= prev_to_model_diff: effective_current_L = current_L_s source_L = "工厂当前值" else: effective_current_L = model_prev_L_s source_L = "模型上一轮值" # 对反洗时长进行同样的处理 if model_prev_t_bw_s is None: if current_t_bw_s is None: print("错误: 反洗时长的工厂当前值和模型上一轮值均为None") return None, None else: effective_current_t_bw = current_t_bw_s source_t_bw = "工厂当前值(模型上一轮值为None)" else: if current_t_bw_s is None: effective_current_t_bw = model_prev_t_bw_s source_t_bw = "模型上一轮值(工厂当前值为None)" else: current_to_model_t_bw_diff = abs(current_t_bw_s - model_t_bw_s) prev_to_model_t_bw_diff = abs(model_prev_t_bw_s - model_t_bw_s) if current_to_model_t_bw_diff <= prev_to_model_t_bw_diff: effective_current_t_bw = current_t_bw_s source_t_bw = "工厂当前值" else: effective_current_t_bw = model_prev_t_bw_s source_t_bw = "模型上一轮值" # 检测所有输入值是否在规定范围内(只对非None值进行检查) # 工厂当前值检查(警告) if current_L_s is not None and not (L_min_s <= current_L_s <= L_max_s): print(f"警告: 当前过滤时长 {current_L_s} 秒不在允许范围内 [{L_min_s}, {L_max_s}]") if current_t_bw_s is not None and not (t_bw_min_s <= current_t_bw_s <= t_bw_max_s): print(f"警告: 当前反洗时长 {current_t_bw_s} 秒不在允许范围内 [{t_bw_min_s}, {t_bw_max_s}]") # 模型上一轮决策值检查(警告) if model_prev_L_s is not None and not (L_min_s <= model_prev_L_s <= L_max_s): print(f"警告: 模型上一轮过滤时长 {model_prev_L_s} 秒不在允许范围内 [{L_min_s}, {L_max_s}]") if model_prev_t_bw_s is not None and not (t_bw_min_s <= model_prev_t_bw_s <= t_bw_max_s): print(f"警告: 模型上一轮反洗时长 {model_prev_t_bw_s} 秒不在允许范围内 [{t_bw_min_s}, {t_bw_max_s}]") # 模型当前轮决策值检查(错误) if model_L_s is None: raise ValueError("错误: 决策模型建议的过滤时长不能为None") elif not (L_min_s <= model_L_s <= L_max_s): raise ValueError(f"错误: 决策模型建议的过滤时长 {model_L_s} 秒不在允许范围内 [{L_min_s}, {L_max_s}]") if model_t_bw_s is None: raise ValueError("错误: 决策模型建议的反洗时长不能为None") elif not (t_bw_min_s <= model_t_bw_s <= t_bw_max_s): raise ValueError(f"错误: 决策模型建议的反洗时长 {model_t_bw_s} 秒不在允许范围内 [{t_bw_min_s}, {t_bw_max_s}]") print(f"过滤时长基准: {source_L}, 值: {effective_current_L}") print(f"反洗时长基准: {source_t_bw}, 值: {effective_current_t_bw}") # 使用选定的基准值进行计算调整 L_diff = model_L_s - effective_current_L L_adjustment = 0 if abs(L_diff) > adjustment_threshold * L_step_s: if L_diff > 0: L_adjustment = L_step_s else: L_adjustment = -L_step_s next_L_s = effective_current_L + L_adjustment t_bw_diff = model_t_bw_s - effective_current_t_bw t_bw_adjustment = 0 if abs(t_bw_diff) > adjustment_threshold * t_bw_step_s: if t_bw_diff > 0: t_bw_adjustment = t_bw_step_s else: t_bw_adjustment = -t_bw_step_s next_t_bw_s = effective_current_t_bw + t_bw_adjustment return next_L_s, next_t_bw_s current_L_s = 3920 current_t_bw_s = 98 model_prev_L_s = None model_prev_t_bw_s = None model_L_s = 4160 model_t_bw_s = 96 next_L_s, next_t_bw_s = generate_plc_instructions(current_L_s, current_t_bw_s, model_prev_L_s, model_prev_t_bw_s, model_L_s, model_t_bw_s) print(f"next_L_s={next_L_s}, next_t_bw_s={next_t_bw_s}")