uf_adapter.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370
"""
Adapter for the UF-RL ultrafiltration control model.

Reproduces the event-driven logic of loop_main.py, using the function
interface of the new run_dqn_decide.py.
"""
import json
import logging
import os
import sys
import threading
import time
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, Optional
from core.config import PipelineConfig
from core.data_provider import DataProvider
from core.shared_state import SharedState

logger = logging.getLogger(__name__)
# Timestamp format for persisted per-unit state and published decisions;
# used both to format (strftime) and to parse back (strptime) those timestamps.
DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"
  19. def _build_device_payload(unit_name: str, project_id: int) -> dict:
  20. """根据机组名构建 PLC payload 配置(对应 config.json 中每个 device 的字段)。"""
  21. return {
  22. "name": unit_name,
  23. "press_pv_item": f"C.M.{unit_name}_DB@press_PV",
  24. "control_payload": {
  25. "deviceId": "1",
  26. "deviceItems": f"C.M.{unit_name}_DB@word_control",
  27. "deviceName": f"{unit_name}_control_word",
  28. "project_id": project_id,
  29. },
  30. "target_payload": {
  31. "deviceId": "1",
  32. "deviceItems": f"{unit_name}_BW_After_TMP",
  33. "deviceName": f"{unit_name}_backwash_pressure_diff",
  34. "project_id": project_id,
  35. },
  36. "production_time_payload": {
  37. "deviceId": "1",
  38. "deviceItems": f"C.M.{unit_name}_DB@time_production",
  39. "deviceName": f"{unit_name}过滤时长",
  40. "project_id": project_id,
  41. },
  42. "backwashing_payload": {
  43. "deviceId": "1",
  44. "deviceItems": f"C.M.{unit_name}_DB@time_BW_SP",
  45. "deviceName": f"{unit_name}反洗时长",
  46. "project_id": project_id,
  47. },
  48. "ceb_payload": {
  49. "deviceId": "1",
  50. "deviceItems": f"C.M.{unit_name}_DB@cycle_sp",
  51. "deviceName": f"{unit_name}CEB次数设定",
  52. "project_id": project_id,
  53. },
  54. }
  55. class UFAdapter:
  56. def __init__(
  57. self,
  58. plant: str,
  59. data_provider: DataProvider,
  60. shared_state: SharedState,
  61. config: PipelineConfig,
  62. ):
  63. self.plant = plant
  64. self.data_provider = data_provider
  65. self.dry_run = config.dry_run
  66. self.shared_state = shared_state
  67. self.config = config
  68. self.project_id = config.project_id
  69. self.poll_interval = config.uf_poll_interval
  70. self.trigger_value = config.uf_trigger_value
  71. # 导入新版 UF 模型(从 core/models/ 加载)
  72. core_models = Path(__file__).resolve().parents[1] / "models"
  73. uf_decide_dir = str(core_models / "uf-rl" / "DQN" / "uf_decide")
  74. env_dir = str(core_models / "uf-rl")
  75. if env_dir not in sys.path:
  76. sys.path.insert(0, env_dir)
  77. if uf_decide_dir not in sys.path:
  78. sys.path.insert(0, uf_decide_dir)
  79. from run_dqn_decide import (
  80. build_physics,
  81. calc_uf_cycle_metrics,
  82. check_state_bounds,
  83. generate_plc_instructions,
  84. run_dqn_decide,
  85. )
  86. from env.env_config_loader import create_env_params_from_yaml
  87. self._build_physics = build_physics
  88. self._run_dqn_decide = run_dqn_decide
  89. self._check_state_bounds = check_state_bounds
  90. self._generate_plc_instructions = generate_plc_instructions
  91. self._calc_uf_cycle_metrics = calc_uf_cycle_metrics
  92. # 加载 env_config.yaml
  93. uf_state_default, phys_params, action_spec, reward_params, state_bounds = (
  94. create_env_params_from_yaml(config.uf_env_config_path)
  95. )
  96. self.physics = build_physics(config.uf_is_times, phys_params, state_bounds)
  97. self.action_spec = action_spec
  98. self.reward_params = reward_params
  99. self.state_bounds = state_bounds
  100. self.model_path = config.uf_model_path
  101. # 机组设备配置
  102. self._device_map: Dict[str, dict] = {
  103. u: _build_device_payload(u, self.project_id) for u in config.uf_units
  104. }
  105. # 状态持久化
  106. self._state_file = str(
  107. Path(__file__).resolve().parents[1] / f"uf_states_{plant}.json"
  108. )
  109. self._state_lock = threading.Lock()
  110. self._unit_states: Dict[str, dict] = self._load_states()
  111. # ==================== 状态持久化 ====================
  112. def _load_states(self) -> dict:
  113. if not os.path.exists(self._state_file):
  114. return {}
  115. try:
  116. with open(self._state_file, "r", encoding="utf-8") as f:
  117. return json.load(f)
  118. except Exception:
  119. return {}
  120. def _save_state(self, unit_name: str, state: dict):
  121. with self._state_lock:
  122. all_states = self._load_states()
  123. all_states[unit_name] = state
  124. with open(self._state_file, "w", encoding="utf-8") as f:
  125. json.dump(all_states, f, indent=2, ensure_ascii=False)
  126. # ==================== 事件驱动主循环 ====================
  127. def run_unit_forever(self, unit_name: str):
  128. """单个 UF 机组的事件驱动循环,复刻 loop_main.py 的 monitor_device 逻辑。"""
  129. threading.current_thread().name = unit_name
  130. device = self._device_map[unit_name]
  131. logger.info(f"[{unit_name}] 监控线程启动")
  132. state = self._unit_states.get(unit_name, {})
  133. model_prev_L_s = state.get("model_prev_L_s")
  134. model_prev_t_bw_s = state.get("model_prev_t_bw_s")
  135. last_cycle_end_time = None
  136. if state.get("last_cycle_end_time"):
  137. try:
  138. last_cycle_end_time = datetime.strptime(
  139. state["last_cycle_end_time"], DATETIME_FORMAT
  140. )
  141. except ValueError:
  142. pass
  143. while True:
  144. try:
  145. self._run_one_cycle(
  146. unit_name, device, model_prev_L_s, model_prev_t_bw_s, last_cycle_end_time
  147. )
  148. # cycle 完成后读取最新状态
  149. st = self._unit_states.get(unit_name, {})
  150. model_prev_L_s = st.get("model_prev_L_s")
  151. model_prev_t_bw_s = st.get("model_prev_t_bw_s")
  152. lct = st.get("last_cycle_end_time")
  153. last_cycle_end_time = (
  154. datetime.strptime(lct, DATETIME_FORMAT) if lct else None
  155. )
  156. except Exception as e:
  157. logger.error(f"[{unit_name}] 循环异常: {e}", exc_info=True)
  158. time.sleep(60)
  159. def _run_one_cycle(
  160. self, unit_name, device, model_prev_L_s, model_prev_t_bw_s, last_cycle_end_time
  161. ):
  162. # --- 阶段 1: 等控制字 == 95 ---
  163. logger.info(f"[{unit_name}] 等待控制字 == {self.trigger_value}")
  164. while True:
  165. val = self.data_provider.query_point_current(device["control_payload"])
  166. if val is not None and int(val) == self.trigger_value:
  167. logger.info(f"[{unit_name}] 触发条件满足")
  168. break
  169. time.sleep(self.poll_interval)
  170. # --- 阶段 1.5: 等控制字 == 26 ---
  171. while True:
  172. val = self.data_provider.query_point_current(device["control_payload"])
  173. if val is not None and int(val) == 26:
  174. logger.info(f"[{unit_name}] 控制字变为 26,开始采集")
  175. break
  176. time.sleep(self.poll_interval)
  177. # --- 阶段 2: 采集 TMP ---
  178. collected = []
  179. start_t = datetime.now()
  180. duration = timedelta(minutes=10)
  181. has_valid = False
  182. while datetime.now() - start_t < duration:
  183. tmp_val = self.data_provider.query_point_current(device["target_payload"])
  184. ctrl_val = self.data_provider.query_point_current(device["control_payload"])
  185. if ctrl_val is not None:
  186. ctrl_int = int(ctrl_val)
  187. if ctrl_int == self.trigger_value:
  188. logger.info(f"[{unit_name}] 控制字变回 95,重置")
  189. return
  190. if 22 <= ctrl_int <= 26:
  191. has_valid = True
  192. if tmp_val is not None:
  193. collected.append(tmp_val)
  194. time.sleep(self.poll_interval)
  195. # 10 分钟无有效数据则延长到 30 分钟
  196. elapsed = datetime.now() - start_t
  197. if elapsed >= timedelta(minutes=10) and not has_valid and duration < timedelta(minutes=30):
  198. logger.info(f"[{unit_name}] 10 分钟无有效数据,延长到 30 分钟")
  199. duration = timedelta(minutes=30)
  200. if not has_valid or not collected:
  201. logger.warning(f"[{unit_name}] 未采集到有效 TMP 数据,跳过本轮")
  202. return
  203. avg_tmp = sum(collected) / len(collected)
  204. logger.info(f"[{unit_name}] 采集 {len(collected)} 点, 平均 TMP={avg_tmp:.4f}")
  205. # --- 阶段 3: 决策 ---
  206. # 读取诊断状态(非阻塞,仅标注)
  207. diag = self.shared_state.read("diagnosis_results", max_age_seconds=2400)
  208. diag_status = diag.get("status", "unknown") if diag else "no_data"
  209. logger.info(f"[{unit_name}] 当前诊断状态: {diag_status}")
  210. # 构建状态对象
  211. from env.env_params import UFState
  212. current_state = UFState(TMP=avg_tmp)
  213. # 异常接口:check_state_bounds
  214. error_result = self._check_state_bounds(current_state, self.state_bounds, unit_name)
  215. if error_result:
  216. self.shared_state.publish("uf_anomalies", {"unit": unit_name, **error_result})
  217. logger.warning(f"[{unit_name}] 输入状态越界: {error_result}")
  218. # DQN 决策
  219. action_id, model_L_s, model_t_bw_s = self._run_dqn_decide(
  220. self.model_path,
  221. self.physics,
  222. self.action_spec,
  223. self.reward_params,
  224. self.state_bounds,
  225. current_state,
  226. )
  227. # 读取当前 PLC 参数
  228. prod_time = self.data_provider.query_point_current(
  229. device["production_time_payload"]
  230. ) or 3800
  231. bw_time = self.data_provider.query_point_current(
  232. device["backwashing_payload"]
  233. ) or 100
  234. bw_per_ceb = self.data_provider.query_point_current(
  235. device["ceb_payload"]
  236. ) or 40
  237. # 生成 PLC 指令
  238. L_s, t_bw_s = self._generate_plc_instructions(
  239. self.action_spec,
  240. prod_time,
  241. bw_time,
  242. model_prev_L_s,
  243. model_prev_t_bw_s,
  244. model_L_s,
  245. model_t_bw_s,
  246. )
  247. # 计算运行指标
  248. metrics = self._calc_uf_cycle_metrics(
  249. current_state, None, None, L_s, t_bw_s, self.physics
  250. )
  251. # --- 回调 + PLC 下发 ---
  252. ceb_freq = int(metrics.get("k_bw_per_ceb", bw_per_ceb))
  253. now_str = datetime.now().strftime(DATETIME_FORMAT)
  254. # dry_run 模式:只记录决策,不下发 PLC
  255. if self.dry_run:
  256. logger.info(
  257. f"[{unit_name}] [DRY-RUN] 决策结果: L_s={int(L_s)}, t_bw_s={int(t_bw_s)}, "
  258. f"ceb_freq={ceb_freq}, avg_tmp={avg_tmp:.4f}, "
  259. f"recovery={metrics.get('recovery', 0):.3f}"
  260. )
  261. else:
  262. use_model = self.data_provider.send_callback(
  263. type_name=unit_name,
  264. water_production_time=int(L_s),
  265. physical_backwash=int(t_bw_s),
  266. ceb_backwash_frequency=ceb_freq,
  267. duration_system=int(prod_time),
  268. tmp_action=avg_tmp,
  269. recovery_rate=metrics.get("recovery", 0),
  270. ton_water_energy_kWh=metrics.get("ton_water_energy_kWh_per_m3", 0),
  271. max_permeability=metrics.get("max_permeability", 0),
  272. daily_prod_time_h=metrics.get("daily_prod_time_h", 0),
  273. ctime=now_str,
  274. )
  275. if use_model == 1:
  276. if int(prod_time) != int(L_s):
  277. self.data_provider.send_plc_update(
  278. unit_name,
  279. device["production_time_payload"]["deviceItems"],
  280. str(prod_time),
  281. str(int(L_s)),
  282. 1,
  283. )
  284. if int(bw_time) != int(t_bw_s):
  285. self.data_provider.send_plc_update(
  286. unit_name,
  287. device["backwashing_payload"]["deviceItems"],
  288. str(bw_time),
  289. str(int(t_bw_s)),
  290. 4,
  291. )
  292. if int(bw_per_ceb) != ceb_freq:
  293. self.data_provider.send_plc_update(
  294. unit_name,
  295. device["ceb_payload"]["deviceItems"],
  296. str(bw_per_ceb),
  297. str(ceb_freq),
  298. 2,
  299. )
  300. # 发布决策到共享状态
  301. self.shared_state.publish("uf_decisions", {
  302. "timestamp": now_str,
  303. "unit": unit_name,
  304. "plant": self.plant,
  305. "L_s": L_s,
  306. "t_bw_s": t_bw_s,
  307. "avg_tmp": avg_tmp,
  308. "metrics": metrics,
  309. "dry_run": self.dry_run,
  310. })
  311. # 保存状态
  312. self._save_state(unit_name, {
  313. "model_prev_L_s": L_s,
  314. "model_prev_t_bw_s": t_bw_s,
  315. "last_cycle_end_time": now_str,
  316. })
  317. self._unit_states[unit_name] = {
  318. "model_prev_L_s": L_s,
  319. "model_prev_t_bw_s": t_bw_s,
  320. "last_cycle_end_time": now_str,
  321. }
  322. # --- 阶段 4: 等控制字回到 95 ---
  323. time.sleep(5)
  324. while True:
  325. val = self.data_provider.query_point_current(device["control_payload"])
  326. if val is not None and int(val) == self.trigger_value:
  327. logger.info(f"[{unit_name}] 周期结束")
  328. break
  329. time.sleep(self.poll_interval)