| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370 |
- """
- UF-RL 超滤控制模型适配器。复刻 loop_main.py 的事件驱动逻辑,
- 使用新版 run_dqn_decide.py 的函数接口。
- """
- import json
- import logging
- import os
- import sys
- import threading
- import time
- from datetime import datetime, timedelta
- from pathlib import Path
- from typing import Dict, Optional
- from core.config import PipelineConfig
- from core.data_provider import DataProvider
- from core.shared_state import SharedState
# Timestamp format used when writing/reading per-unit cycle times and
# when stamping decisions and callbacks.
DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"

# Logger named after this module.
logger = logging.getLogger(name=__name__)
- def _build_device_payload(unit_name: str, project_id: int) -> dict:
- """根据机组名构建 PLC payload 配置(对应 config.json 中每个 device 的字段)。"""
- return {
- "name": unit_name,
- "press_pv_item": f"C.M.{unit_name}_DB@press_PV",
- "control_payload": {
- "deviceId": "1",
- "deviceItems": f"C.M.{unit_name}_DB@word_control",
- "deviceName": f"{unit_name}_control_word",
- "project_id": project_id,
- },
- "target_payload": {
- "deviceId": "1",
- "deviceItems": f"{unit_name}_BW_After_TMP",
- "deviceName": f"{unit_name}_backwash_pressure_diff",
- "project_id": project_id,
- },
- "production_time_payload": {
- "deviceId": "1",
- "deviceItems": f"C.M.{unit_name}_DB@time_production",
- "deviceName": f"{unit_name}过滤时长",
- "project_id": project_id,
- },
- "backwashing_payload": {
- "deviceId": "1",
- "deviceItems": f"C.M.{unit_name}_DB@time_BW_SP",
- "deviceName": f"{unit_name}反洗时长",
- "project_id": project_id,
- },
- "ceb_payload": {
- "deviceId": "1",
- "deviceItems": f"C.M.{unit_name}_DB@cycle_sp",
- "deviceName": f"{unit_name}CEB次数设定",
- "project_id": project_id,
- },
- }
class UFAdapter:
    """Event-driven UF (ultrafiltration) control adapter for one plant.

    Reproduces the ``monitor_device`` loop of loop_main.py on top of the
    function interface exposed by run_dqn_decide.py. Each unit is driven by
    :meth:`run_unit_forever` (which names the current thread after the unit,
    so presumably one thread per unit — confirm with the caller).

    One cycle per unit:
      1. wait until the control word equals ``trigger_value``;
      2. wait until the control word equals ``COLLECT_START_WORD``;
      3. sample TMP while the control word is within ``VALID_WORD_RANGE``;
      4. run the DQN decision, then either log it (dry-run) or send the
         callback and, if it returns 1, write changed setpoints to the PLC;
      5. publish and persist the decision, then wait for ``trigger_value``.
    """

    # Control word that starts TMP sampling (phase 1.5).
    COLLECT_START_WORD = 26
    # Inclusive control-word range during which TMP samples are accepted.
    VALID_WORD_RANGE = (22, 26)
    # Initial sampling window, and the single extension applied when no
    # control word in VALID_WORD_RANGE was seen during the initial window.
    COLLECT_WINDOW = timedelta(minutes=10)
    COLLECT_WINDOW_EXTENDED = timedelta(minutes=30)
    # Fallbacks used when a PLC setpoint read returns a falsy value.
    DEFAULT_PROD_TIME = 3800
    DEFAULT_BW_TIME = 100
    DEFAULT_BW_PER_CEB = 40
    # Seconds to wait after a decision before polling for the cycle end.
    CYCLE_END_DELAY_S = 5
    # Seconds to back off after an unexpected exception in a cycle.
    ERROR_BACKOFF_S = 60

    def __init__(
        self,
        plant: str,
        data_provider: "DataProvider",
        shared_state: "SharedState",
        config: "PipelineConfig",
    ):
        """Load the UF model functions, environment parameters and saved state.

        Args:
            plant: Plant identifier; used in the state file name and in
                published decisions.
            data_provider: Source of PLC point reads, callbacks and writes.
            shared_state: Store used to read diagnosis results and publish
                anomalies/decisions.
            config: Pipeline configuration (dry-run flag, project id, UF poll
                interval in seconds, trigger control word, model/env paths,
                unit list).
        """
        self.plant = plant
        self.data_provider = data_provider
        self.dry_run = config.dry_run
        self.shared_state = shared_state
        self.config = config
        self.project_id = config.project_id
        self.poll_interval = config.uf_poll_interval
        self.trigger_value = config.uf_trigger_value

        # The UF model code under core/models/uf-rl is imported by bare module
        # name, so its directories must be on sys.path first. The decide dir is
        # inserted last and therefore ends up searched first.
        core_models = Path(__file__).resolve().parents[1] / "models"
        uf_decide_dir = str(core_models / "uf-rl" / "DQN" / "uf_decide")
        env_dir = str(core_models / "uf-rl")
        for extra_path in (env_dir, uf_decide_dir):
            if extra_path not in sys.path:
                sys.path.insert(0, extra_path)
        from run_dqn_decide import (
            build_physics,
            calc_uf_cycle_metrics,
            check_state_bounds,
            generate_plc_instructions,
            run_dqn_decide,
        )
        from env.env_config_loader import create_env_params_from_yaml
        from env.env_params import UFState

        self._build_physics = build_physics
        self._run_dqn_decide = run_dqn_decide
        self._check_state_bounds = check_state_bounds
        self._generate_plc_instructions = generate_plc_instructions
        self._calc_uf_cycle_metrics = calc_uf_cycle_metrics
        self._uf_state_cls = UFState

        # Environment parameters from env_config.yaml.
        _uf_state_default, phys_params, action_spec, reward_params, state_bounds = (
            create_env_params_from_yaml(config.uf_env_config_path)
        )
        self.physics = build_physics(config.uf_is_times, phys_params, state_bounds)
        self.action_spec = action_spec
        self.reward_params = reward_params
        self.state_bounds = state_bounds
        self.model_path = config.uf_model_path

        # Per-unit PLC point configuration.
        self._device_map: Dict[str, dict] = {
            u: _build_device_payload(u, self.project_id) for u in config.uf_units
        }

        # Persisted per-unit state, stored next to this module's package.
        self._state_file = str(
            Path(__file__).resolve().parents[1] / f"uf_states_{plant}.json"
        )
        self._state_lock = threading.Lock()
        self._unit_states: Dict[str, dict] = self._load_states()

    # ==================== State persistence ====================

    def _load_states(self) -> dict:
        """Read the persisted per-unit state file.

        Returns an empty dict when the file is missing, unreadable, not valid
        JSON, or its top level is not a JSON object. Read/parse problems are
        logged so an unexpected state reset is visible.
        """
        if not os.path.exists(self._state_file):
            return {}
        try:
            with open(self._state_file, "r", encoding="utf-8") as f:
                states = json.load(f)
        except (OSError, ValueError) as e:
            logger.warning(f"状态文件读取失败 {self._state_file}: {e}")
            return {}
        if not isinstance(states, dict):
            logger.warning(f"状态文件格式异常(非 JSON 对象) {self._state_file}")
            return {}
        return states

    def _save_state(self, unit_name: str, state: dict):
        """Record *state* for *unit_name* in memory and in the state file.

        The in-memory cache is updated first so the next cycle sees the new
        values even if writing the file fails. Entries of other units are
        re-read from disk and preserved. The file is written to a sibling
        ``.tmp`` file and moved into place with ``os.replace`` so an
        interrupted write cannot leave a truncated JSON file behind (which
        ``_load_states`` would treat as "no state" for every unit). Write
        failures are logged, not raised.
        """
        with self._state_lock:
            self._unit_states[unit_name] = dict(state)
            all_states = self._load_states()
            all_states[unit_name] = state
            tmp_path = f"{self._state_file}.tmp"
            try:
                with open(tmp_path, "w", encoding="utf-8") as f:
                    json.dump(all_states, f, indent=2, ensure_ascii=False)
                os.replace(tmp_path, self._state_file)
            except (OSError, TypeError, ValueError) as e:
                logger.error(f"[{unit_name}] 状态持久化失败: {e}", exc_info=True)

    @staticmethod
    def _parse_cycle_time(value) -> Optional[datetime]:
        """Parse a persisted ``last_cycle_end_time``; None if absent or malformed."""
        if not value:
            return None
        try:
            return datetime.strptime(value, DATETIME_FORMAT)
        except (TypeError, ValueError):
            return None

    # ==================== Event-driven main loop ====================

    def run_unit_forever(self, unit_name: str):
        """Run control cycles for one UF unit forever (mirrors monitor_device).

        Renames the current thread to *unit_name*. Before every cycle the
        previous decision is taken from the in-memory state cache. Any
        exception from a cycle is logged and followed by an
        ``ERROR_BACKOFF_S`` second pause before the next cycle starts.
        """
        threading.current_thread().name = unit_name
        device = self._device_map[unit_name]
        logger.info(f"[{unit_name}] 监控线程启动")
        while True:
            try:
                state = self._unit_states.get(unit_name) or {}
                if not isinstance(state, dict):
                    state = {}
                self._run_one_cycle(
                    unit_name,
                    device,
                    state.get("model_prev_L_s"),
                    state.get("model_prev_t_bw_s"),
                    self._parse_cycle_time(state.get("last_cycle_end_time")),
                )
            except Exception as e:
                logger.error(f"[{unit_name}] 循环异常: {e}", exc_info=True)
                time.sleep(self.ERROR_BACKOFF_S)

    def _run_one_cycle(
        self, unit_name, device, model_prev_L_s, model_prev_t_bw_s, last_cycle_end_time
    ):
        """Run one trigger -> sample -> decide -> apply -> close cycle.

        Args:
            unit_name: Unit being controlled.
            device: Point configuration from ``_build_device_payload``.
            model_prev_L_s / model_prev_t_bw_s: Values saved by the previous
                cycle (None on the first cycle); forwarded to
                ``generate_plc_instructions``.
            last_cycle_end_time: End time of the previous cycle; currently not
                used by this method.

        Returns early (without deciding) when the control word returns to
        ``trigger_value`` during sampling or no valid TMP sample was taken.
        """
        # --- Phase 1: wait for the trigger control word ---
        logger.info(f"[{unit_name}] 等待控制字 == {self.trigger_value}")
        self._wait_for_control_word(device, self.trigger_value)
        logger.info(f"[{unit_name}] 触发条件满足")

        # --- Phase 1.5: wait for the control word that starts sampling ---
        self._wait_for_control_word(device, self.COLLECT_START_WORD)
        logger.info(f"[{unit_name}] 控制字变为 {self.COLLECT_START_WORD},开始采集")

        # --- Phase 2: sample TMP ---
        avg_tmp = self._collect_avg_tmp(unit_name, device)
        if avg_tmp is None:
            return

        # --- Phase 3: decide ---
        # Diagnosis status is only logged; it does not gate the decision.
        diag = self.shared_state.read("diagnosis_results", max_age_seconds=2400)
        diag_status = diag.get("status", "unknown") if diag else "no_data"
        logger.info(f"[{unit_name}] 当前诊断状态: {diag_status}")

        current_state = self._uf_state_cls(TMP=avg_tmp)

        # Out-of-bounds input is reported but the decision still proceeds.
        error_result = self._check_state_bounds(current_state, self.state_bounds, unit_name)
        if error_result:
            self.shared_state.publish("uf_anomalies", {"unit": unit_name, **error_result})
            logger.warning(f"[{unit_name}] 输入状态越界: {error_result}")

        _action_id, model_L_s, model_t_bw_s = self._run_dqn_decide(
            self.model_path,
            self.physics,
            self.action_spec,
            self.reward_params,
            self.state_bounds,
            current_state,
        )

        # Current PLC setpoints.
        prod_time = self._read_setpoint(
            device, "production_time_payload", self.DEFAULT_PROD_TIME
        )
        bw_time = self._read_setpoint(device, "backwashing_payload", self.DEFAULT_BW_TIME)
        bw_per_ceb = self._read_setpoint(device, "ceb_payload", self.DEFAULT_BW_PER_CEB)

        L_s, t_bw_s = self._generate_plc_instructions(
            self.action_spec,
            prod_time,
            bw_time,
            model_prev_L_s,
            model_prev_t_bw_s,
            model_L_s,
            model_t_bw_s,
        )
        metrics = self._calc_uf_cycle_metrics(
            current_state, None, None, L_s, t_bw_s, self.physics
        )

        # --- Callback + PLC write ---
        ceb_freq = int(metrics.get("k_bw_per_ceb", bw_per_ceb))
        now_str = datetime.now().strftime(DATETIME_FORMAT)
        if self.dry_run:
            # Dry-run: log the decision only; nothing is sent to the PLC.
            logger.info(
                f"[{unit_name}] [DRY-RUN] 决策结果: L_s={int(L_s)}, t_bw_s={int(t_bw_s)}, "
                f"ceb_freq={ceb_freq}, avg_tmp={avg_tmp:.4f}, "
                f"recovery={metrics.get('recovery', 0):.3f}"
            )
        else:
            self._send_callback_and_apply(
                unit_name,
                device,
                L_s=L_s,
                t_bw_s=t_bw_s,
                ceb_freq=ceb_freq,
                prod_time=prod_time,
                bw_time=bw_time,
                bw_per_ceb=bw_per_ceb,
                avg_tmp=avg_tmp,
                metrics=metrics,
                now_str=now_str,
            )

        self.shared_state.publish("uf_decisions", {
            "timestamp": now_str,
            "unit": unit_name,
            "plant": self.plant,
            "L_s": L_s,
            "t_bw_s": t_bw_s,
            "avg_tmp": avg_tmp,
            "metrics": metrics,
            "dry_run": self.dry_run,
        })

        # NOTE(review): the "model_prev_*" keys store the PLC instruction values
        # (L_s / t_bw_s), not the raw model outputs (model_L_s / model_t_bw_s).
        # Confirm this is what generate_plc_instructions expects as "previous".
        self._save_state(unit_name, {
            "model_prev_L_s": L_s,
            "model_prev_t_bw_s": t_bw_s,
            "last_cycle_end_time": now_str,
        })

        # --- Phase 4: wait for the control word to return to the trigger ---
        time.sleep(self.CYCLE_END_DELAY_S)
        self._wait_for_control_word(device, self.trigger_value)
        logger.info(f"[{unit_name}] 周期结束")

    # ==================== Cycle helpers ====================

    def _read_control_word(self, device: dict) -> Optional[int]:
        """Return the unit's current control word as int, or None if no value."""
        val = self.data_provider.query_point_current(device["control_payload"])
        return None if val is None else int(val)

    def _wait_for_control_word(self, device: dict, target) -> None:
        """Poll every ``poll_interval`` seconds until the control word == *target*."""
        while self._read_control_word(device) != target:
            time.sleep(self.poll_interval)

    def _collect_avg_tmp(self, unit_name: str, device: dict) -> Optional[float]:
        """Sample TMP and return the mean of the accepted samples.

        A TMP reading is kept only when it is not None and the control word
        read in the same poll is within ``VALID_WORD_RANGE``. Sampling runs for
        ``COLLECT_WINDOW``; if no in-range control word was seen by then, the
        window is extended once to ``COLLECT_WINDOW_EXTENDED``.

        Returns None when the control word returns to ``trigger_value`` during
        sampling, or when no sample was accepted.
        """
        low, high = self.VALID_WORD_RANGE
        samples = []
        seen_valid_word = False
        start = datetime.now()
        window = self.COLLECT_WINDOW
        while datetime.now() - start < window:
            tmp_val = self.data_provider.query_point_current(device["target_payload"])
            ctrl = self._read_control_word(device)
            if ctrl is not None:
                if ctrl == self.trigger_value:
                    logger.info(f"[{unit_name}] 控制字变回 {self.trigger_value},重置")
                    return None
                if low <= ctrl <= high:
                    seen_valid_word = True
                    if tmp_val is not None:
                        samples.append(tmp_val)
            time.sleep(self.poll_interval)
            # Extend once if the whole initial window passed without an
            # in-range control word.
            if (
                not seen_valid_word
                and window < self.COLLECT_WINDOW_EXTENDED
                and datetime.now() - start >= self.COLLECT_WINDOW
            ):
                initial_min = int(self.COLLECT_WINDOW.total_seconds() // 60)
                extended_min = int(self.COLLECT_WINDOW_EXTENDED.total_seconds() // 60)
                logger.info(
                    f"[{unit_name}] {initial_min} 分钟无有效数据,延长到 {extended_min} 分钟"
                )
                window = self.COLLECT_WINDOW_EXTENDED
        # Samples are only accepted after an in-range control word, so an
        # empty list covers both "no valid word" and "no TMP value".
        if not samples:
            logger.warning(f"[{unit_name}] 未采集到有效 TMP 数据,跳过本轮")
            return None
        avg_tmp = sum(samples) / len(samples)
        logger.info(f"[{unit_name}] 采集 {len(samples)} 点, 平均 TMP={avg_tmp:.4f}")
        return avg_tmp

    def _read_setpoint(self, device: dict, payload_key: str, default):
        """Read one PLC setpoint, using *default* when the read is falsy.

        Note that a reading of 0 is treated the same as a missing value.
        """
        return self.data_provider.query_point_current(device[payload_key]) or default

    def _send_callback_and_apply(
        self,
        unit_name,
        device,
        *,
        L_s,
        t_bw_s,
        ceb_freq,
        prod_time,
        bw_time,
        bw_per_ceb,
        avg_tmp,
        metrics,
        now_str,
    ):
        """Report the decision via ``send_callback`` and, if it returns 1,
        write each setpoint whose integer value differs to the PLC.

        The last positional argument of ``send_plc_update`` (1, 4, 2) is passed
        through unchanged; its meaning is defined by the DataProvider.
        """
        use_model = self.data_provider.send_callback(
            type_name=unit_name,
            water_production_time=int(L_s),
            physical_backwash=int(t_bw_s),
            ceb_backwash_frequency=ceb_freq,
            duration_system=int(prod_time),
            tmp_action=avg_tmp,
            recovery_rate=metrics.get("recovery", 0),
            ton_water_energy_kWh=metrics.get("ton_water_energy_kWh_per_m3", 0),
            max_permeability=metrics.get("max_permeability", 0),
            daily_prod_time_h=metrics.get("daily_prod_time_h", 0),
            ctime=now_str,
        )
        if use_model != 1:
            return
        # (payload key, current PLC value, new value, update code)
        updates = (
            ("production_time_payload", prod_time, int(L_s), 1),
            ("backwashing_payload", bw_time, int(t_bw_s), 4),
            ("ceb_payload", bw_per_ceb, ceb_freq, 2),
        )
        for payload_key, current, target, code in updates:
            if int(current) != target:
                self.data_provider.send_plc_update(
                    unit_name,
                    device[payload_key]["deviceItems"],
                    str(current),
                    str(target),
                    code,
                )
|