| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289 |
- # -*- coding: utf-8 -*-
- """
- pump_state_monitor.py - 泵状态监控模块
- ================================================================================
- 使用示例
- ================================================================================
- # 初始化
- from pump_state_monitor import PumpStateMonitor
- monitor = PumpStateMonitor(
- scada_url="http://47.96.12.136:8788/api/v1/jinke-cloud/device/current-data",
- scada_jwt="eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJJRCI6NywiVXNlcm5hbWUiOiJhZG1pbiIsIkRlcCI6IjEzNSIsImV4cCI6MTc3NjExOTExNCwiaXNzIjoiZ2luLWJsb2cifQ.0HTtzHZjyd2mHo8VCy8icYROxmntRMuQhyoZsAYRL_M",
- project_id=92,
- transition_window_minutes=15
- )
- # 单泵查询
- is_running, _ = monitor.update_pump_state("pump_1", "C.M.RO1_GYB@run", "RO1高压泵")
- in_transition = monitor.is_in_transition("pump_1")
- # 多泵批量查询
- pump_configs = [
- {"point": "C.M.RO1_GYB@run", "name": "RO1高压泵"},
- {"point": "C.M.RO2_GYB@run", "name": "RO2高压泵"}
- ]
- in_transition, pumps = monitor.check_pumps_transition(pump_configs)
- # 业务逻辑
- for audio_file in get_audio_files():
- is_running, _ = monitor.update_pump_state("pump_1", "C.M.RO1_GYB@run", "RO1高压泵")
- if not is_running:
- skip_audio(audio_file) # 泵停机,跳过
- continue
- if monitor.is_in_transition("pump_1"):
- skip_audio(audio_file) # 过渡期,跳过
- continue
- process_audio(audio_file) # 泵运行且稳定,正常处理
- ================================================================================
- 时间线案例 (过渡期=15分钟)
- ================================================================================
- 10:00:00 首次调用 -> is_running=True, in_transition=False (初始化)
- 10:05:00 泵停机 -> is_running=False, in_transition=True (进入过渡期)
- 10:20:00 过渡期结束 -> is_running=False, in_transition=False (15分钟后)
- 10:25:00 泵启动 -> is_running=True, in_transition=True (再次进入过渡期)
- 10:40:00 过渡期结束 -> is_running=True, in_transition=False (可正常处理)
- ================================================================================
- 内部机制
- ================================================================================
- 1. 查询缓存: 30秒内同一泵只查询一次SCADA,调用方无需控制频率
- 2. 状态变化: 本地比较前后状态,变化时自动记录时间
- 3. 过渡期判定: 基于状态变化时间,默认15分钟窗口
- 4. 首次查询: 不视为过渡期(假设泵已稳定运行)
- 日志: 泵状态初始化/变化时输出 INFO 级别日志
- """
- import requests
- import logging
- from datetime import datetime, timedelta
- from collections import defaultdict
- logger = logging.getLogger(__name__)
- class PumpStateMonitor:
- """
- 泵状态监控器
-
- 功能:
- - 查询泵运行状态
- - 记录状态变化历史
- - 判断是否处于启停过渡期
- """
-
- def __init__(self, scada_url, scada_jwt, project_id,
- timeout=10, transition_window_minutes=15):
- """
- 初始化泵状态监控器
-
- 参数:
- scada_url: SCADA API地址
- scada_jwt: JWT认证Token
- project_id: 项目ID (用于API查询)
- timeout: 请求超时秒数
- transition_window_minutes: 启停过渡期窗口(默认 15分钟)
- """
- self.scada_url = scada_url
- self.scada_jwt = scada_jwt
- self.project_id = project_id
- self.timeout = timeout
- self.transition_window_minutes = transition_window_minutes
-
- # 状态缓存: {pump_id: True/False}
- self.current_states = {}
-
- # 最后状态变化时间缓存: {pump_id: datetime}
- # 注意: 由本地记录,不依赖接口返回的 htime(那是数据采集时间,不是状态变化时间)
- self.state_change_time = {}
-
- # 上次查询时间(避免频繁查询)
- self.last_query_time = {}
- self.min_query_interval_seconds = 30 # 30 秒查询一次
-
- def query_pump_status(self, point, pump_name=""):
- """
- 查询单个泵的运行状态(使用实时数据接口)
-
- 使用 current-data 接口直接获取最新一条数据,无需时间窗口查询。
-
- 参数:
- point: 点位标识, 如 "C.M.RO1_GYB@run"
- pump_name: 泵名称, 用于日志
-
- 返回:
- (is_running, last_change_time):
- is_running: True=运行中, False=停机
- last_change_time: 最后一次状态变化时间 (来自 htime 字段)
- """
- headers = {
- "Content-Type": "application/json",
- "JWT-TOKEN": self.scada_jwt
- }
-
- # 当前时间戳(毫秒)
- now_ms = int(datetime.now().timestamp() * 1000)
-
- # 请求参数
- params = {"time": now_ms}
-
- # 请求体:使用实时数据接口格式
- request_body = [
- {
- "deviceId": "1",
- "deviceItems": point,
- "deviceName": pump_name or point,
- "project_id": self.project_id
- }
- ]
-
- try:
- response = requests.post(
- self.scada_url,
- params=params,
- json=request_body,
- headers=headers,
- timeout=self.timeout
- )
-
- if response.status_code == 200:
- data = response.json()
- if data.get("code") == 200 and data.get("data"):
- # 获取第一条数据(实时接口只返回最新一条)
- latest = data["data"][0]
- if "val" in latest:
- val = int(float(latest["val"]))
- is_running = val == 1
-
- # 解析 htime 时间字段(实时接口返回的是北京时间字符串)
- htime_str = latest.get("htime", "")
- last_change_time = None
- if htime_str:
- try:
- # 直接解析,不做时区转换(按北京时间处理)
- last_change_time = datetime.strptime(htime_str, "%Y-%m-%d %H:%M:%S")
- except ValueError:
- logger.warning(f"无法解析 htime: {htime_str}")
-
- logger.debug(f"泵状态查询: {pump_name or point} = {'运行' if is_running else '停机'} (变化时间={htime_str})")
- return is_running, last_change_time
-
- # 接口返回成功但无数据
- logger.warning(f"泵状态查询无数据: {pump_name or point}")
- else:
- logger.warning(f"泵状态查询HTTP错误: {pump_name or point} | status={response.status_code}")
-
- except Exception as e:
- logger.warning(f"泵状态查询失败: {pump_name or point} | {e}")
-
- # 查询失败时默认返回停机状态
- logger.warning(f"泵状态查询: {pump_name or point} | 查询失败,默认视为停机")
- return False, None
-
- def update_pump_state(self, pump_id, point, pump_name=""):
- """
- 更新并检测泵状态变化
-
- 状态变化时间由本地记录,不依赖接口返回的 htime(那是数据采集时间)。
-
- 参数:
- pump_id: 泵唯一标识
- point: SCADA点位
- pump_name: 泵名称
-
- 返回:
- (当前状态 True/False, 最后状态变化时间)
- """
- # 限制查询频率
- now = datetime.now()
- last_query = self.last_query_time.get(pump_id)
- if last_query:
- elapsed = (now - last_query).total_seconds()
- if elapsed < self.min_query_interval_seconds:
- # 使用缓存状态
- cached_state = self.current_states.get(pump_id)
- cached_time = self.state_change_time.get(pump_id)
- return cached_state, cached_time
-
- # 查询状态(实时接口只返回当前状态,不返回状态变化时间)
- new_state, _ = self.query_pump_status(point, pump_name)
-
- self.last_query_time[pump_id] = now
-
- # 检测状态变化
- old_state = self.current_states.get(pump_id)
- state_change_time = self.state_change_time.get(pump_id)
-
- if old_state is None:
- # 首次查询:初始化缓存状态
- # 假设泵已经稳定运行,不设置过渡期(避免程序启动时所有泵都被认为在过渡期)
- state_change_time = None # None 表示不在过渡期
- logger.info(f"泵状态初始化: {pump_name or pump_id} = {'运行' if new_state else '停机'}")
- elif new_state != old_state:
- # 状态变化:记录当前时间为变化时间
- state_change_time = now
- event = "启动" if new_state else "停机"
- logger.info(f"泵状态变化: {pump_name or pump_id} {event} | 进入过渡期")
-
- # 更新缓存
- self.current_states[pump_id] = new_state
- self.state_change_time[pump_id] = state_change_time
-
- return new_state, state_change_time
-
- def is_in_transition(self, pump_id):
- """
- 检查泵是否处于启停过渡期
-
- 基于本地记录的状态变化时间判断。
-
- 参数:
- pump_id: 泵唯一标识
-
- 返回:
- True=过渡期内(最近N分钟有状态变化), False=稳定状态
- """
- last_change_time = self.state_change_time.get(pump_id)
- if not last_change_time:
- # None 表示首次查询或无状态变化记录,视为稳定状态
- return False
-
- # 使用本地时间比较(变化时间是本地记录的)
- elapsed_minutes = (datetime.now() - last_change_time).total_seconds() / 60
-
- return elapsed_minutes < self.transition_window_minutes
-
- def check_pumps_transition(self, pump_configs):
- """
- 检查多个泵是否有任何处于启停过渡期
-
- 参数:
- pump_configs: 泵配置列表 [{"point": "...", "name": "..."}, ...]
-
- 返回:
- (是否有泵在过渡期, 过渡期泵名称列表)
- """
- in_transition = False
- transition_pumps = []
-
- for pump_cfg in pump_configs:
- point = pump_cfg.get("point", "")
- name = pump_cfg.get("name", point)
- pump_id = point # 使用点位作为唯一ID
-
- # 更新状态
- self.update_pump_state(pump_id, point, name)
-
- # 检查是否过渡期
- if self.is_in_transition(pump_id):
- in_transition = True
- transition_pumps.append(name)
-
- return in_transition, transition_pumps
|