pump_state_monitor.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391
  1. # -*- coding: utf-8 -*-
  2. """
  3. pump_state_monitor.py - 泵状态监控模块
  4. ================================================================================
  5. 使用示例
  6. ================================================================================
  7. # 初始化
  8. from pump_state_monitor import PumpStateMonitor
  9. monitor = PumpStateMonitor(
  10. scada_url="http://120.55.44.4:8900/api/v1/jinke-cloud/device/current-data",
  11. scada_jwt="eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJJRCI6NywiVXNlcm5hbWUiOiJhZG1pbiIsIkRlcCI6IjEzNSIsImV4cCI6MTc3NjExOTExNCwiaXNzIjoiZ2luLWJsb2cifQ.0HTtzHZjyd2mHo8VCy8icYROxmntRMuQhyoZsAYRL_M",
  12. project_id=92,
  13. transition_window_minutes=15
  14. )
  15. # 单泵查询
  16. is_running, _ = monitor.update_pump_state("pump_1", "C.M.RO1_GYB@run", "RO1高压泵")
  17. in_transition = monitor.is_in_transition("pump_1")
  18. # 多泵批量查询
  19. pump_configs = [
  20. {"point": "C.M.RO1_GYB@run", "name": "RO1高压泵"},
  21. {"point": "C.M.RO2_GYB@run", "name": "RO2高压泵"}
  22. ]
  23. in_transition, pumps = monitor.check_pumps_transition(pump_configs)
  24. # 业务逻辑
  25. for audio_file in get_audio_files():
  26. is_running, _ = monitor.update_pump_state("pump_1", "C.M.RO1_GYB@run", "RO1高压泵")
  27. if not is_running:
  28. skip_audio(audio_file) # 泵停机,跳过
  29. continue
  30. if monitor.is_in_transition("pump_1"):
  31. skip_audio(audio_file) # 过渡期,跳过
  32. continue
  33. process_audio(audio_file) # 泵运行且稳定,正常处理
  34. ================================================================================
  35. 时间线案例 (过渡期=15分钟)
  36. ================================================================================
  37. 10:00:00 首次调用 -> is_running=True, in_transition=False (初始化)
  38. 10:05:00 泵停机 -> is_running=False, in_transition=True (进入过渡期)
  39. 10:20:00 过渡期结束 -> is_running=False, in_transition=False (15分钟后)
  40. 10:25:00 泵启动 -> is_running=True, in_transition=True (再次进入过渡期)
  41. 10:40:00 过渡期结束 -> is_running=True, in_transition=False (可正常处理)
  42. ================================================================================
  43. 内部机制
  44. ================================================================================
  45. 1. 查询缓存: 30秒内同一泵只查询一次SCADA,调用方无需控制频率
  46. 2. 状态变化: 本地比较前后状态,变化时自动记录时间
  47. 3. 过渡期判定: 基于状态变化时间,默认15分钟窗口
  48. 4. 首次查询: 不视为过渡期(假设泵已稳定运行)
  49. 日志: 泵状态初始化/变化时输出 INFO 级别日志
  50. """
  51. import json
  52. import requests
  53. import logging
  54. import threading
  55. from datetime import datetime, timedelta
  56. from typing import List, Optional
  57. from collections import defaultdict
  58. from urllib import request as urllib_request, error as urllib_error
  59. logger = logging.getLogger(__name__)
  60. class PumpStateMonitor:
  61. """
  62. 泵状态监控器
  63. 功能:
  64. - 查询泵运行状态
  65. - 记录状态变化历史
  66. - 判断是否处于启停过渡期
  67. """
  68. def __init__(self, scada_url, scada_jwt, project_id,
  69. timeout=10, transition_window_minutes=15,
  70. login_url="", login_username="", login_password="",
  71. login_type="account", login_dep_id=""):
  72. """
  73. 初始化泵状态监控器
  74. 参数:
  75. scada_url: SCADA API地址
  76. scada_jwt: JWT认证Token(静态,可为空)
  77. project_id: 项目ID (用于API查询)
  78. timeout: 请求超时秒数
  79. transition_window_minutes: 启停过渡期窗口(默认 15分钟)
  80. login_url: 登录接口地址(启用后自动获取 JWT)
  81. login_username: 登录用户名
  82. login_password: 登录密码
  83. login_type: 登录类型
  84. login_dep_id: 部门ID
  85. """
  86. self.scada_url = scada_url
  87. self.scada_jwt = str(scada_jwt or "").strip()
  88. self.project_id = project_id
  89. self.timeout = timeout
  90. self.transition_window_minutes = transition_window_minutes
  91. # 自动登录配置
  92. self.login_url = str(login_url or "").strip()
  93. self.login_username = str(login_username or "").strip()
  94. self.login_password = str(login_password or "")
  95. self.login_type = str(login_type or "account")
  96. self.login_dep_id = str(login_dep_id or "")
  97. self._auth_lock = threading.Lock()
  98. # 状态缓存: {pump_id: True/False}
  99. self.current_states = {}
  100. # 最后状态变化时间缓存: {pump_id: datetime}
  101. # 注意: 由本地记录,不依赖接口返回的 htime(那是数据采集时间,不是状态变化时间)
  102. self.state_change_time = {}
  103. # 上次查询时间(避免频繁查询)
  104. self.last_query_time = {}
  105. self.min_query_interval_seconds = 30 # 30 秒查询一次
  106. # ------------------------------------------------------------------ #
  107. # Token 管理 #
  108. # ------------------------------------------------------------------ #
  109. def _can_auto_login(self) -> bool:
  110. return bool(self.login_url and self.login_username and self.login_password)
  111. @staticmethod
  112. def _extract_token(payload: dict) -> str:
  113. if not isinstance(payload, dict):
  114. return ""
  115. candidates: List[object] = []
  116. data = payload.get("data")
  117. if isinstance(data, dict):
  118. candidates.extend([
  119. data.get("token"), data.get("jwt"), data.get("jwtToken"),
  120. data.get("accessToken"), data.get("access_token"),
  121. ])
  122. elif isinstance(data, str):
  123. candidates.append(data)
  124. candidates.extend([
  125. payload.get("token"), payload.get("jwt"), payload.get("jwtToken"),
  126. payload.get("accessToken"), payload.get("access_token"),
  127. ])
  128. for value in candidates:
  129. if isinstance(value, str) and value.strip():
  130. return value.strip()
  131. return ""
  132. def _login_and_get_jwt(self) -> bool:
  133. if not self._can_auto_login():
  134. return False
  135. body = json.dumps({
  136. "UserName": self.login_username,
  137. "Password": self.login_password,
  138. "type": self.login_type,
  139. "DepId": self.login_dep_id,
  140. }).encode("utf-8")
  141. req = urllib_request.Request(
  142. self.login_url, data=body,
  143. headers={"Content-Type": "application/json"}, method="POST",
  144. )
  145. try:
  146. with urllib_request.urlopen(req, timeout=self.timeout) as resp:
  147. data = json.loads(resp.read().decode("utf-8"))
  148. token = self._extract_token(data)
  149. if token:
  150. self.scada_jwt = token
  151. logger.info("SCADA 登录成功,JWT 已刷新")
  152. return True
  153. logger.warning("SCADA 登录成功但响应内无 JWT 字段")
  154. except (urllib_error.URLError, TimeoutError, json.JSONDecodeError, Exception) as e:
  155. logger.warning("SCADA 登录失败: %s", e)
  156. return False
  157. def _ensure_jwt(self) -> bool:
  158. if self.scada_jwt:
  159. return True
  160. with self._auth_lock:
  161. if self.scada_jwt:
  162. return True
  163. return self._login_and_get_jwt()
  164. def _clear_jwt(self) -> None:
  165. with self._auth_lock:
  166. self.scada_jwt = ""
  167. def query_pump_status(self, point, pump_name=""):
  168. """
  169. 查询单个泵的运行状态(使用实时数据接口)
  170. 使用 current-data 接口直接获取最新一条数据,无需时间窗口查询。
  171. 支持自动登录获取 JWT,401/403 时自动刷新重试。
  172. 参数:
  173. point: 点位标识, 如 "C.M.RO1_GYB@run"
  174. pump_name: 泵名称, 用于日志
  175. 返回:
  176. (is_running, last_change_time):
  177. is_running: True=运行中, False=停机
  178. last_change_time: 最后一次状态变化时间 (来自 htime 字段)
  179. """
  180. # 当前时间戳(毫秒)
  181. now_ms = int(datetime.now().timestamp() * 1000)
  182. # 请求参数
  183. params = {"time": now_ms}
  184. # 请求体:使用实时数据接口格式
  185. request_body = [
  186. {
  187. "deviceId": "1",
  188. "deviceItems": point,
  189. "deviceName": pump_name or point,
  190. "project_id": self.project_id
  191. }
  192. ]
  193. for attempt in range(2):
  194. if not self._ensure_jwt():
  195. logger.warning("泵状态查询失败: JWT 不可用")
  196. break
  197. headers = {
  198. "Content-Type": "application/json",
  199. "JWT-TOKEN": self.scada_jwt
  200. }
  201. try:
  202. response = requests.post(
  203. self.scada_url,
  204. params=params,
  205. json=request_body,
  206. headers=headers,
  207. timeout=self.timeout
  208. )
  209. if response.status_code == 200:
  210. data = response.json()
  211. code = data.get("code")
  212. if code in (401, 403) and self._can_auto_login() and attempt == 0:
  213. self._clear_jwt()
  214. continue
  215. if code == 200 and data.get("data"):
  216. latest = data["data"][0]
  217. if "val" in latest:
  218. val = int(float(latest["val"]))
  219. is_running = val == 1
  220. htime_str = latest.get("htime", "")
  221. last_change_time = None
  222. if htime_str:
  223. try:
  224. last_change_time = datetime.strptime(htime_str, "%Y-%m-%d %H:%M:%S")
  225. except ValueError:
  226. logger.warning(f"无法解析 htime: {htime_str}")
  227. logger.debug(f"泵状态查询: {pump_name or point} = {'运行' if is_running else '停机'} (变化时间={htime_str})")
  228. return is_running, last_change_time
  229. logger.warning(f"泵状态查询无数据: {pump_name or point}")
  230. elif response.status_code in (401, 403) and self._can_auto_login() and attempt == 0:
  231. self._clear_jwt()
  232. if self._ensure_jwt():
  233. continue
  234. break
  235. else:
  236. logger.warning(f"泵状态查询HTTP错误: {pump_name or point} | status={response.status_code}")
  237. except Exception as e:
  238. logger.warning(f"泵状态查询失败: {pump_name or point} | {e}")
  239. break
  240. logger.warning(f"泵状态查询: {pump_name or point} | 查询失败,保持上次状态")
  241. # 查询失败时返回上次已知状态,避免网络抖动误判停机触发15分钟过渡期
  242. last_state = self.current_states.get(pump_id, True)
  243. return last_state, self.state_change_time.get(pump_id)
  244. def update_pump_state(self, pump_id, point, pump_name=""):
  245. """
  246. 更新并检测泵状态变化
  247. 状态变化时间由本地记录,不依赖接口返回的 htime(那是数据采集时间)。
  248. 参数:
  249. pump_id: 泵唯一标识
  250. point: SCADA点位
  251. pump_name: 泵名称
  252. 返回:
  253. (当前状态 True/False, 最后状态变化时间)
  254. """
  255. # 限制查询频率
  256. now = datetime.now()
  257. last_query = self.last_query_time.get(pump_id)
  258. if last_query:
  259. elapsed = (now - last_query).total_seconds()
  260. if elapsed < self.min_query_interval_seconds:
  261. # 使用缓存状态
  262. cached_state = self.current_states.get(pump_id)
  263. cached_time = self.state_change_time.get(pump_id)
  264. return cached_state, cached_time
  265. # 查询状态(实时接口只返回当前状态,不返回状态变化时间)
  266. new_state, _ = self.query_pump_status(point, pump_name)
  267. self.last_query_time[pump_id] = now
  268. # 检测状态变化
  269. old_state = self.current_states.get(pump_id)
  270. state_change_time = self.state_change_time.get(pump_id)
  271. if old_state is None:
  272. # 首次查询:初始化缓存状态
  273. # 假设泵已经稳定运行,不设置过渡期(避免程序启动时所有泵都被认为在过渡期)
  274. state_change_time = None # None 表示不在过渡期
  275. logger.info(f"泵状态初始化: {pump_name or pump_id} = {'运行' if new_state else '停机'}")
  276. elif new_state != old_state:
  277. # 状态变化:记录当前时间为变化时间
  278. state_change_time = now
  279. event = "启动" if new_state else "停机"
  280. logger.info(f"泵状态变化: {pump_name or pump_id} {event} | 进入过渡期")
  281. # 更新缓存
  282. self.current_states[pump_id] = new_state
  283. self.state_change_time[pump_id] = state_change_time
  284. return new_state, state_change_time
  285. def is_in_transition(self, pump_id):
  286. """
  287. 检查泵是否处于启停过渡期
  288. 基于本地记录的状态变化时间判断。
  289. 参数:
  290. pump_id: 泵唯一标识
  291. 返回:
  292. True=过渡期内(最近N分钟有状态变化), False=稳定状态
  293. """
  294. last_change_time = self.state_change_time.get(pump_id)
  295. if not last_change_time:
  296. # None 表示首次查询或无状态变化记录,视为稳定状态
  297. return False
  298. # 使用本地时间比较(变化时间是本地记录的)
  299. elapsed_minutes = (datetime.now() - last_change_time).total_seconds() / 60
  300. return elapsed_minutes < self.transition_window_minutes
  301. def check_pumps_transition(self, pump_configs):
  302. """
  303. 检查多个泵是否有任何处于启停过渡期
  304. 参数:
  305. pump_configs: 泵配置列表 [{"point": "...", "name": "..."}, ...]
  306. 返回:
  307. (是否有泵在过渡期, 过渡期泵名称列表)
  308. """
  309. in_transition = False
  310. transition_pumps = []
  311. for pump_cfg in pump_configs:
  312. point = pump_cfg.get("point", "")
  313. name = pump_cfg.get("name", point)
  314. pump_id = point # 使用点位作为唯一ID
  315. # 更新状态
  316. self.update_pump_state(pump_id, point, name)
  317. # 检查是否过渡期
  318. if self.is_in_transition(pump_id):
  319. in_transition = True
  320. transition_pumps.append(name)
  321. return in_transition, transition_pumps