"""
UF-RL ultrafiltration control model adapter.

Replicates the event-driven logic of loop_main.py, using the function
interface exposed by the newer run_dqn_decide.py.
"""

import json
import logging
import os
import sys
import threading
import time
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, Optional

from core.config import PipelineConfig
from core.data_provider import DataProvider
from core.shared_state import SharedState

logger = logging.getLogger(__name__)

DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"

# Control word that marks the start of TMP collection.
_COLLECT_START_WORD = 26
# Inclusive control-word range in which TMP samples are accepted.
_VALID_WORD_MIN = 22
_VALID_WORD_MAX = 26
# TMP collection window, and the extended window used when the base window
# elapsed without ever seeing a control word in the valid range.
_BASE_WINDOW = timedelta(minutes=10)
_EXTENDED_WINDOW = timedelta(minutes=30)
# Fallbacks used when the corresponding PLC point cannot be read (or reads falsy).
_DEFAULT_PROD_TIME = 3800
_DEFAULT_BW_TIME = 100
_DEFAULT_BW_PER_CEB = 40
# Back-off after an unexpected exception in a unit's monitor loop.
_ERROR_BACKOFF_S = 60


def _point_payload(device_items: str, device_name: str, project_id: int) -> dict:
    """Build one point-query payload; every point uses deviceId "1"."""
    return {
        "deviceId": "1",
        "deviceItems": device_items,
        "deviceName": device_name,
        "project_id": project_id,
    }


def _build_device_payload(unit_name: str, project_id: int) -> dict:
    """Build the PLC payload configuration for one unit.

    Mirrors the per-device fields of config.json.

    Args:
        unit_name: Unit identifier, interpolated into every PLC item name.
        project_id: Project id copied into each point payload.

    Returns:
        A dict with the unit name, the pressure item name, and one payload
        dict each for the control word, the post-backwash TMP target, the
        production time, the backwash time and the CEB cycle setpoint.
    """
    db_prefix = f"C.M.{unit_name}_DB@"
    return {
        "name": unit_name,
        "press_pv_item": f"{db_prefix}press_PV",
        "control_payload": _point_payload(
            f"{db_prefix}word_control", f"{unit_name}_control_word", project_id
        ),
        "target_payload": _point_payload(
            f"{unit_name}_BW_After_TMP",
            f"{unit_name}_backwash_pressure_diff",
            project_id,
        ),
        "production_time_payload": _point_payload(
            f"{db_prefix}time_production", f"{unit_name}过滤时长", project_id
        ),
        "backwashing_payload": _point_payload(
            f"{db_prefix}time_BW_SP", f"{unit_name}反洗时长", project_id
        ),
        "ceb_payload": _point_payload(
            f"{db_prefix}cycle_sp", f"{unit_name}CEB次数设定", project_id
        ),
    }


def _parse_datetime(raw) -> Optional[datetime]:
    """Parse a DATETIME_FORMAT string; return None for empty or malformed input."""
    if not raw:
        return None
    try:
        return datetime.strptime(raw, DATETIME_FORMAT)
    except (TypeError, ValueError):
        return None


class UFAdapter:
    """Event-driven adapter that runs the UF DQN decision model per unit.

    One call to :meth:`run_unit_forever` (typically on its own thread) watches
    a unit's PLC control word, collects TMP samples, asks the model for a
    decision and, unless ``dry_run`` is set, reports it via the data provider.
    """

    def __init__(
        self,
        plant: str,
        data_provider: DataProvider,
        shared_state: SharedState,
        config: PipelineConfig,
    ):
        """Load the UF model modules and environment parameters, then restore state.

        Args:
            plant: Plant name; used in the state file name and published decisions.
            data_provider: Source of PLC point reads and sink for callbacks/updates.
            shared_state: Channel for reading diagnosis results and publishing
                decisions and anomalies.
            config: Pipeline configuration (dry-run flag, polling settings,
                model/env paths, unit list).
        """
        self.plant = plant
        self.data_provider = data_provider
        self.dry_run = config.dry_run
        self.shared_state = shared_state
        self.config = config
        self.project_id = config.project_id
        self.poll_interval = config.uf_poll_interval
        self.trigger_value = config.uf_trigger_value

        # Import the new UF model, loaded from core/models/.
        core_models = Path(__file__).resolve().parents[1] / "models"
        uf_decide_dir = str(core_models / "uf-rl" / "DQN" / "uf_decide")
        env_dir = str(core_models / "uf-rl")
        # Insertion order matters: uf_decide_dir ends up ahead of env_dir.
        for model_dir in (env_dir, uf_decide_dir):
            if model_dir not in sys.path:
                sys.path.insert(0, model_dir)

        from run_dqn_decide import (
            build_physics,
            calc_uf_cycle_metrics,
            check_state_bounds,
            generate_plc_instructions,
            run_dqn_decide,
        )
        from env.env_config_loader import create_env_params_from_yaml

        self._build_physics = build_physics
        self._run_dqn_decide = run_dqn_decide
        self._check_state_bounds = check_state_bounds
        self._generate_plc_instructions = generate_plc_instructions
        self._calc_uf_cycle_metrics = calc_uf_cycle_metrics

        # Load env_config.yaml.
        uf_state_default, phys_params, action_spec, reward_params, state_bounds = (
            create_env_params_from_yaml(config.uf_env_config_path)
        )
        self.physics = build_physics(config.uf_is_times, phys_params, state_bounds)
        self.action_spec = action_spec
        self.reward_params = reward_params
        self.state_bounds = state_bounds
        self.model_path = config.uf_model_path

        # Per-unit device configuration.
        self._device_map: Dict[str, dict] = {
            u: _build_device_payload(u, self.project_id) for u in config.uf_units
        }

        # State persistence.
        self._state_file = str(
            Path(__file__).resolve().parents[1] / f"uf_states_{plant}.json"
        )
        self._state_lock = threading.Lock()
        self._unit_states: Dict[str, dict] = self._load_states()

    # ==================== State persistence ====================

    def _load_states(self) -> dict:
        """Read the per-unit state mapping from the state file.

        Returns:
            The stored mapping, or an empty dict when the file is missing,
            unreadable, not valid JSON, or does not contain a JSON object.
            Failures other than a missing file are logged.
        """
        try:
            with open(self._state_file, "r", encoding="utf-8") as f:
                loaded = json.load(f)
        except FileNotFoundError:
            return {}
        except (OSError, ValueError) as exc:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            logger.warning(
                f"Failed to load UF state file {self._state_file}: {exc}"
            )
            return {}
        if not isinstance(loaded, dict):
            logger.warning(
                f"UF state file {self._state_file} does not contain a JSON object"
            )
            return {}
        return loaded

    def _save_state(self, unit_name: str, state: dict):
        """Persist ``state`` for ``unit_name``, keeping other units' entries.

        The file is re-read under the lock, merged, written to a temporary
        sibling file and then moved into place with ``os.replace`` so that an
        interrupted write cannot leave a truncated state file behind.

        Raises:
            OSError: If the temporary file cannot be written or renamed.
            TypeError: If ``state`` is not JSON-serialisable.
        """
        with self._state_lock:
            all_states = self._load_states()
            all_states[unit_name] = state
            tmp_path = f"{self._state_file}.tmp"
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(all_states, f, indent=2, ensure_ascii=False)
            os.replace(tmp_path, self._state_file)

    # ==================== Event-driven main loop ====================

    def run_unit_forever(self, unit_name: str):
        """Run the event-driven loop for one UF unit; never returns.

        Replicates the ``monitor_device`` logic of loop_main.py. Renames the
        current thread to ``unit_name``. Any exception raised by a cycle is
        logged and followed by a back-off sleep before the next attempt.

        Raises:
            KeyError: If ``unit_name`` is not one of the configured units.
        """
        threading.current_thread().name = unit_name
        device = self._device_map[unit_name]
        logger.info(f"[{unit_name}] 监控线程启动")

        while True:
            # Re-read the latest state on every iteration so that values
            # recorded by a cycle that later failed are still picked up.
            state = self._unit_states.get(unit_name, {})
            try:
                self._run_one_cycle(
                    unit_name,
                    device,
                    state.get("model_prev_L_s"),
                    state.get("model_prev_t_bw_s"),
                    _parse_datetime(state.get("last_cycle_end_time")),
                )
            except Exception as e:
                logger.error(f"[{unit_name}] 循环异常: {e}", exc_info=True)
                time.sleep(_ERROR_BACKOFF_S)

    def _wait_for_control_word(self, device: dict, target: int) -> None:
        """Poll the unit's control word until it equals ``target``."""
        while True:
            val = self.data_provider.query_point_current(device["control_payload"])
            if val is not None and int(val) == target:
                return
            time.sleep(self.poll_interval)

    def _collect_avg_tmp(self, unit_name: str, device: dict) -> Optional[float]:
        """Sample TMP during the collection window and return its mean.

        Samples are kept only while the control word is in the valid range.
        If the base window elapses without the control word ever being in
        that range, the window is extended once.

        Returns:
            The mean of the collected samples, or None when the control word
            returned to the trigger value (cycle reset) or no usable sample
            was collected.
        """
        collected = []
        has_valid = False
        # Monotonic clock: the window must not be distorted by wall-clock jumps.
        started = time.monotonic()
        window_s = _BASE_WINDOW.total_seconds()
        extended_s = _EXTENDED_WINDOW.total_seconds()

        while time.monotonic() - started < window_s:
            tmp_val = self.data_provider.query_point_current(device["target_payload"])
            ctrl_val = self.data_provider.query_point_current(device["control_payload"])
            if ctrl_val is not None:
                ctrl_int = int(ctrl_val)
                if ctrl_int == self.trigger_value:
                    logger.info(
                        f"[{unit_name}] 控制字变回 {self.trigger_value},重置"
                    )
                    return None
                if _VALID_WORD_MIN <= ctrl_int <= _VALID_WORD_MAX:
                    has_valid = True
                    if tmp_val is not None:
                        collected.append(tmp_val)
            time.sleep(self.poll_interval)

            # No valid data within the base window: extend it once.
            elapsed_s = time.monotonic() - started
            if (
                elapsed_s >= _BASE_WINDOW.total_seconds()
                and not has_valid
                and window_s < extended_s
            ):
                logger.info(f"[{unit_name}] 10 分钟无有效数据,延长到 30 分钟")
                window_s = extended_s

        if not has_valid or not collected:
            logger.warning(f"[{unit_name}] 未采集到有效 TMP 数据,跳过本轮")
            return None

        avg_tmp = sum(collected) / len(collected)
        logger.info(f"[{unit_name}] 采集 {len(collected)} 点, 平均 TMP={avg_tmp:.4f}")
        return avg_tmp

    def _dispatch_decision(
        self,
        unit_name: str,
        device: dict,
        L_s,
        t_bw_s,
        ceb_freq: int,
        prod_time,
        bw_time,
        bw_per_ceb,
        avg_tmp,
        metrics: dict,
        now_str: str,
    ) -> None:
        """Report the decision via callback and push changed setpoints to the PLC.

        PLC updates are sent only when the callback returns 1, and only for
        setpoints whose integer value differs from the current PLC value.
        """
        use_model = self.data_provider.send_callback(
            type_name=unit_name,
            water_production_time=int(L_s),
            physical_backwash=int(t_bw_s),
            ceb_backwash_frequency=ceb_freq,
            duration_system=int(prod_time),
            tmp_action=avg_tmp,
            recovery_rate=metrics.get("recovery", 0),
            ton_water_energy_kWh=metrics.get("ton_water_energy_kWh_per_m3", 0),
            max_permeability=metrics.get("max_permeability", 0),
            daily_prod_time_h=metrics.get("daily_prod_time_h", 0),
            ctime=now_str,
        )
        if use_model != 1:
            return

        # (payload key, current PLC value, new value, trailing send_plc_update
        # argument). NOTE(review): the meaning of the codes 1 / 4 / 2 is defined
        # by DataProvider.send_plc_update - confirm there.
        pending_updates = (
            ("production_time_payload", prod_time, int(L_s), 1),
            ("backwashing_payload", bw_time, int(t_bw_s), 4),
            ("ceb_payload", bw_per_ceb, ceb_freq, 2),
        )
        for payload_key, current, target, code in pending_updates:
            if int(current) != target:
                self.data_provider.send_plc_update(
                    unit_name,
                    device[payload_key]["deviceItems"],
                    str(current),
                    str(target),
                    code,
                )

    def _run_one_cycle(
        self,
        unit_name: str,
        device: dict,
        model_prev_L_s,
        model_prev_t_bw_s,
        last_cycle_end_time: Optional[datetime],
    ):
        """Run one trigger -> collect -> decide -> dispatch cycle for a unit.

        Args:
            unit_name: Unit identifier.
            device: Payload configuration built by ``_build_device_payload``.
            model_prev_L_s: Previous model output passed through to
                ``generate_plc_instructions`` (None on the first cycle).
            model_prev_t_bw_s: Previous model output passed through to
                ``generate_plc_instructions`` (None on the first cycle).
            last_cycle_end_time: Accepted for interface compatibility; not
                used by this method.

        Returns:
            None. Returns early when the cycle is reset or no TMP data was
            collected.
        """
        # --- Phase 1: wait for the control word to equal the trigger value ---
        logger.info(f"[{unit_name}] 等待控制字 == {self.trigger_value}")
        self._wait_for_control_word(device, self.trigger_value)
        logger.info(f"[{unit_name}] 触发条件满足")

        # --- Phase 1.5: wait for the control word to equal 26 ---
        self._wait_for_control_word(device, _COLLECT_START_WORD)
        logger.info(f"[{unit_name}] 控制字变为 26,开始采集")

        # --- Phase 2: collect TMP ---
        avg_tmp = self._collect_avg_tmp(unit_name, device)
        if avg_tmp is None:
            return

        # --- Phase 3: decision ---
        # Read the diagnosis status (non-blocking, used for annotation only).
        diag = self.shared_state.read("diagnosis_results", max_age_seconds=2400)
        diag_status = diag.get("status", "unknown") if diag else "no_data"
        logger.info(f"[{unit_name}] 当前诊断状态: {diag_status}")

        # Build the state object.
        from env.env_params import UFState

        current_state = UFState(TMP=avg_tmp)

        # Anomaly interface: an out-of-bounds state is published and logged,
        # but the decision below still runs.
        error_result = self._check_state_bounds(
            current_state, self.state_bounds, unit_name
        )
        if error_result:
            self.shared_state.publish(
                "uf_anomalies", {"unit": unit_name, **error_result}
            )
            logger.warning(f"[{unit_name}] 输入状态越界: {error_result}")

        # DQN decision.
        action_id, model_L_s, model_t_bw_s = self._run_dqn_decide(
            self.model_path,
            self.physics,
            self.action_spec,
            self.reward_params,
            self.state_bounds,
            current_state,
        )

        # Read the current PLC parameters, falling back to defaults.
        query = self.data_provider.query_point_current
        prod_time = query(device["production_time_payload"]) or _DEFAULT_PROD_TIME
        bw_time = query(device["backwashing_payload"]) or _DEFAULT_BW_TIME
        bw_per_ceb = query(device["ceb_payload"]) or _DEFAULT_BW_PER_CEB

        # Generate the PLC instructions.
        L_s, t_bw_s = self._generate_plc_instructions(
            self.action_spec,
            prod_time,
            bw_time,
            model_prev_L_s,
            model_prev_t_bw_s,
            model_L_s,
            model_t_bw_s,
        )

        # Compute the run metrics.
        metrics = self._calc_uf_cycle_metrics(
            current_state, None, None, L_s, t_bw_s, self.physics
        )

        # --- Callback + PLC dispatch ---
        ceb_freq = int(metrics.get("k_bw_per_ceb", bw_per_ceb))
        now_str = datetime.now().strftime(DATETIME_FORMAT)

        if self.dry_run:
            # Dry-run mode: record the decision only, send nothing to the PLC.
            logger.info(
                f"[{unit_name}] [DRY-RUN] 决策结果: L_s={int(L_s)}, t_bw_s={int(t_bw_s)}, "
                f"ceb_freq={ceb_freq}, avg_tmp={avg_tmp:.4f}, "
                f"recovery={metrics.get('recovery', 0):.3f}"
            )
        else:
            self._dispatch_decision(
                unit_name,
                device,
                L_s,
                t_bw_s,
                ceb_freq,
                prod_time,
                bw_time,
                bw_per_ceb,
                avg_tmp,
                metrics,
                now_str,
            )

        # Publish the decision to shared state.
        self.shared_state.publish("uf_decisions", {
            "timestamp": now_str,
            "unit": unit_name,
            "plant": self.plant,
            "L_s": L_s,
            "t_bw_s": t_bw_s,
            "avg_tmp": avg_tmp,
            "metrics": metrics,
            "dry_run": self.dry_run,
        })

        # Record the new state in memory first, so the next cycle sees what
        # was just decided even if writing the state file fails.
        new_state = {
            "model_prev_L_s": L_s,
            "model_prev_t_bw_s": t_bw_s,
            "last_cycle_end_time": now_str,
        }
        self._unit_states[unit_name] = dict(new_state)
        self._save_state(unit_name, new_state)

        # --- Phase 4: wait for the control word to return to the trigger value ---
        time.sleep(5)
        self._wait_for_control_word(device, self.trigger_value)
        logger.info(f"[{unit_name}] 周期结束")