| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391 |
- # -*- coding: utf-8 -*-
- """
- pump_state_monitor.py - 泵状态监控模块
- ================================================================================
- 使用示例
- ================================================================================
- # 初始化
- from pump_state_monitor import PumpStateMonitor
- monitor = PumpStateMonitor(
- scada_url="http://120.55.44.4:8900/api/v1/jinke-cloud/device/current-data",
- scada_jwt="eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJJRCI6NywiVXNlcm5hbWUiOiJhZG1pbiIsIkRlcCI6IjEzNSIsImV4cCI6MTc3NjExOTExNCwiaXNzIjoiZ2luLWJsb2cifQ.0HTtzHZjyd2mHo8VCy8icYROxmntRMuQhyoZsAYRL_M",
- project_id=92,
- transition_window_minutes=15
- )
- # 单泵查询
- is_running, _ = monitor.update_pump_state("pump_1", "C.M.RO1_GYB@run", "RO1高压泵")
- in_transition = monitor.is_in_transition("pump_1")
- # 多泵批量查询
- pump_configs = [
- {"point": "C.M.RO1_GYB@run", "name": "RO1高压泵"},
- {"point": "C.M.RO2_GYB@run", "name": "RO2高压泵"}
- ]
- in_transition, pumps = monitor.check_pumps_transition(pump_configs)
- # 业务逻辑
- for audio_file in get_audio_files():
- is_running, _ = monitor.update_pump_state("pump_1", "C.M.RO1_GYB@run", "RO1高压泵")
- if not is_running:
- skip_audio(audio_file) # 泵停机,跳过
- continue
- if monitor.is_in_transition("pump_1"):
- skip_audio(audio_file) # 过渡期,跳过
- continue
- process_audio(audio_file) # 泵运行且稳定,正常处理
- ================================================================================
- 时间线案例 (过渡期=15分钟)
- ================================================================================
- 10:00:00 首次调用 -> is_running=True, in_transition=False (初始化)
- 10:05:00 泵停机 -> is_running=False, in_transition=True (进入过渡期)
- 10:20:00 过渡期结束 -> is_running=False, in_transition=False (15分钟后)
- 10:25:00 泵启动 -> is_running=True, in_transition=True (再次进入过渡期)
- 10:40:00 过渡期结束 -> is_running=True, in_transition=False (可正常处理)
- ================================================================================
- 内部机制
- ================================================================================
- 1. 查询缓存: 30秒内同一泵只查询一次SCADA,调用方无需控制频率
- 2. 状态变化: 本地比较前后状态,变化时自动记录时间
- 3. 过渡期判定: 基于状态变化时间,默认15分钟窗口
- 4. 首次查询: 不视为过渡期(假设泵已稳定运行)
- 日志: 泵状态初始化/变化时输出 INFO 级别日志
- """
- import json
- import requests
- import logging
- import threading
- from datetime import datetime, timedelta
- from typing import List, Optional
- from collections import defaultdict
- from urllib import request as urllib_request, error as urllib_error
- logger = logging.getLogger(__name__)
- class PumpStateMonitor:
- """
- 泵状态监控器
-
- 功能:
- - 查询泵运行状态
- - 记录状态变化历史
- - 判断是否处于启停过渡期
- """
-
- def __init__(self, scada_url, scada_jwt, project_id,
- timeout=10, transition_window_minutes=15,
- login_url="", login_username="", login_password="",
- login_type="account", login_dep_id=""):
- """
- 初始化泵状态监控器
- 参数:
- scada_url: SCADA API地址
- scada_jwt: JWT认证Token(静态,可为空)
- project_id: 项目ID (用于API查询)
- timeout: 请求超时秒数
- transition_window_minutes: 启停过渡期窗口(默认 15分钟)
- login_url: 登录接口地址(启用后自动获取 JWT)
- login_username: 登录用户名
- login_password: 登录密码
- login_type: 登录类型
- login_dep_id: 部门ID
- """
- self.scada_url = scada_url
- self.scada_jwt = str(scada_jwt or "").strip()
- self.project_id = project_id
- self.timeout = timeout
- self.transition_window_minutes = transition_window_minutes
- # 自动登录配置
- self.login_url = str(login_url or "").strip()
- self.login_username = str(login_username or "").strip()
- self.login_password = str(login_password or "")
- self.login_type = str(login_type or "account")
- self.login_dep_id = str(login_dep_id or "")
- self._auth_lock = threading.Lock()
-
- # 状态缓存: {pump_id: True/False}
- self.current_states = {}
-
- # 最后状态变化时间缓存: {pump_id: datetime}
- # 注意: 由本地记录,不依赖接口返回的 htime(那是数据采集时间,不是状态变化时间)
- self.state_change_time = {}
-
- # 上次查询时间(避免频繁查询)
- self.last_query_time = {}
- self.min_query_interval_seconds = 30 # 30 秒查询一次
- # ------------------------------------------------------------------ #
- # Token 管理 #
- # ------------------------------------------------------------------ #
- def _can_auto_login(self) -> bool:
- return bool(self.login_url and self.login_username and self.login_password)
- @staticmethod
- def _extract_token(payload: dict) -> str:
- if not isinstance(payload, dict):
- return ""
- candidates: List[object] = []
- data = payload.get("data")
- if isinstance(data, dict):
- candidates.extend([
- data.get("token"), data.get("jwt"), data.get("jwtToken"),
- data.get("accessToken"), data.get("access_token"),
- ])
- elif isinstance(data, str):
- candidates.append(data)
- candidates.extend([
- payload.get("token"), payload.get("jwt"), payload.get("jwtToken"),
- payload.get("accessToken"), payload.get("access_token"),
- ])
- for value in candidates:
- if isinstance(value, str) and value.strip():
- return value.strip()
- return ""
- def _login_and_get_jwt(self) -> bool:
- if not self._can_auto_login():
- return False
- body = json.dumps({
- "UserName": self.login_username,
- "Password": self.login_password,
- "type": self.login_type,
- "DepId": self.login_dep_id,
- }).encode("utf-8")
- req = urllib_request.Request(
- self.login_url, data=body,
- headers={"Content-Type": "application/json"}, method="POST",
- )
- try:
- with urllib_request.urlopen(req, timeout=self.timeout) as resp:
- data = json.loads(resp.read().decode("utf-8"))
- token = self._extract_token(data)
- if token:
- self.scada_jwt = token
- logger.info("SCADA 登录成功,JWT 已刷新")
- return True
- logger.warning("SCADA 登录成功但响应内无 JWT 字段")
- except (urllib_error.URLError, TimeoutError, json.JSONDecodeError, Exception) as e:
- logger.warning("SCADA 登录失败: %s", e)
- return False
- def _ensure_jwt(self) -> bool:
- if self.scada_jwt:
- return True
- with self._auth_lock:
- if self.scada_jwt:
- return True
- return self._login_and_get_jwt()
- def _clear_jwt(self) -> None:
- with self._auth_lock:
- self.scada_jwt = ""
-
- def query_pump_status(self, point, pump_name=""):
- """
- 查询单个泵的运行状态(使用实时数据接口)
- 使用 current-data 接口直接获取最新一条数据,无需时间窗口查询。
- 支持自动登录获取 JWT,401/403 时自动刷新重试。
- 参数:
- point: 点位标识, 如 "C.M.RO1_GYB@run"
- pump_name: 泵名称, 用于日志
- 返回:
- (is_running, last_change_time):
- is_running: True=运行中, False=停机
- last_change_time: 最后一次状态变化时间 (来自 htime 字段)
- """
- # 当前时间戳(毫秒)
- now_ms = int(datetime.now().timestamp() * 1000)
- # 请求参数
- params = {"time": now_ms}
- # 请求体:使用实时数据接口格式
- request_body = [
- {
- "deviceId": "1",
- "deviceItems": point,
- "deviceName": pump_name or point,
- "project_id": self.project_id
- }
- ]
- for attempt in range(2):
- if not self._ensure_jwt():
- logger.warning("泵状态查询失败: JWT 不可用")
- break
- headers = {
- "Content-Type": "application/json",
- "JWT-TOKEN": self.scada_jwt
- }
- try:
- response = requests.post(
- self.scada_url,
- params=params,
- json=request_body,
- headers=headers,
- timeout=self.timeout
- )
- if response.status_code == 200:
- data = response.json()
- code = data.get("code")
- if code in (401, 403) and self._can_auto_login() and attempt == 0:
- self._clear_jwt()
- continue
- if code == 200 and data.get("data"):
- latest = data["data"][0]
- if "val" in latest:
- val = int(float(latest["val"]))
- is_running = val == 1
- htime_str = latest.get("htime", "")
- last_change_time = None
- if htime_str:
- try:
- last_change_time = datetime.strptime(htime_str, "%Y-%m-%d %H:%M:%S")
- except ValueError:
- logger.warning(f"无法解析 htime: {htime_str}")
- logger.debug(f"泵状态查询: {pump_name or point} = {'运行' if is_running else '停机'} (变化时间={htime_str})")
- return is_running, last_change_time
- logger.warning(f"泵状态查询无数据: {pump_name or point}")
- elif response.status_code in (401, 403) and self._can_auto_login() and attempt == 0:
- self._clear_jwt()
- if self._ensure_jwt():
- continue
- break
- else:
- logger.warning(f"泵状态查询HTTP错误: {pump_name or point} | status={response.status_code}")
- except Exception as e:
- logger.warning(f"泵状态查询失败: {pump_name or point} | {e}")
- break
- logger.warning(f"泵状态查询: {pump_name or point} | 查询失败,保持上次状态")
- # 查询失败时返回上次已知状态,避免网络抖动误判停机触发15分钟过渡期
- last_state = self.current_states.get(pump_id, True)
- return last_state, self.state_change_time.get(pump_id)
-
- def update_pump_state(self, pump_id, point, pump_name=""):
- """
- 更新并检测泵状态变化
-
- 状态变化时间由本地记录,不依赖接口返回的 htime(那是数据采集时间)。
-
- 参数:
- pump_id: 泵唯一标识
- point: SCADA点位
- pump_name: 泵名称
-
- 返回:
- (当前状态 True/False, 最后状态变化时间)
- """
- # 限制查询频率
- now = datetime.now()
- last_query = self.last_query_time.get(pump_id)
- if last_query:
- elapsed = (now - last_query).total_seconds()
- if elapsed < self.min_query_interval_seconds:
- # 使用缓存状态
- cached_state = self.current_states.get(pump_id)
- cached_time = self.state_change_time.get(pump_id)
- return cached_state, cached_time
-
- # 查询状态(实时接口只返回当前状态,不返回状态变化时间)
- new_state, _ = self.query_pump_status(point, pump_name)
-
- self.last_query_time[pump_id] = now
-
- # 检测状态变化
- old_state = self.current_states.get(pump_id)
- state_change_time = self.state_change_time.get(pump_id)
-
- if old_state is None:
- # 首次查询:初始化缓存状态
- # 假设泵已经稳定运行,不设置过渡期(避免程序启动时所有泵都被认为在过渡期)
- state_change_time = None # None 表示不在过渡期
- logger.info(f"泵状态初始化: {pump_name or pump_id} = {'运行' if new_state else '停机'}")
- elif new_state != old_state:
- # 状态变化:记录当前时间为变化时间
- state_change_time = now
- event = "启动" if new_state else "停机"
- logger.info(f"泵状态变化: {pump_name or pump_id} {event} | 进入过渡期")
-
- # 更新缓存
- self.current_states[pump_id] = new_state
- self.state_change_time[pump_id] = state_change_time
-
- return new_state, state_change_time
-
- def is_in_transition(self, pump_id):
- """
- 检查泵是否处于启停过渡期
-
- 基于本地记录的状态变化时间判断。
-
- 参数:
- pump_id: 泵唯一标识
-
- 返回:
- True=过渡期内(最近N分钟有状态变化), False=稳定状态
- """
- last_change_time = self.state_change_time.get(pump_id)
- if not last_change_time:
- # None 表示首次查询或无状态变化记录,视为稳定状态
- return False
-
- # 使用本地时间比较(变化时间是本地记录的)
- elapsed_minutes = (datetime.now() - last_change_time).total_seconds() / 60
-
- return elapsed_minutes < self.transition_window_minutes
-
- def check_pumps_transition(self, pump_configs):
- """
- 检查多个泵是否有任何处于启停过渡期
-
- 参数:
- pump_configs: 泵配置列表 [{"point": "...", "name": "..."}, ...]
-
- 返回:
- (是否有泵在过渡期, 过渡期泵名称列表)
- """
- in_transition = False
- transition_pumps = []
-
- for pump_cfg in pump_configs:
- point = pump_cfg.get("point", "")
- name = pump_cfg.get("name", point)
- pump_id = point # 使用点位作为唯一ID
-
- # 更新状态
- self.update_pump_state(pump_id, point, name)
-
- # 检查是否过渡期
- if self.is_in_transition(pump_id):
- in_transition = True
- transition_pumps.append(name)
-
- return in_transition, transition_pumps
|