# -*- coding: utf-8 -*- """ pump_state_monitor.py - 泵状态监控模块 ================================================================================ 使用示例 ================================================================================ # 初始化 from pump_state_monitor import PumpStateMonitor monitor = PumpStateMonitor( scada_url="http://47.96.12.136:8788/api/v1/jinke-cloud/device/current-data", scada_jwt="eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJJRCI6NywiVXNlcm5hbWUiOiJhZG1pbiIsIkRlcCI6IjEzNSIsImV4cCI6MTc3NjExOTExNCwiaXNzIjoiZ2luLWJsb2cifQ.0HTtzHZjyd2mHo8VCy8icYROxmntRMuQhyoZsAYRL_M", project_id=92, transition_window_minutes=15 ) # 单泵查询 is_running, _ = monitor.update_pump_state("pump_1", "C.M.RO1_GYB@run", "RO1高压泵") in_transition = monitor.is_in_transition("pump_1") # 多泵批量查询 pump_configs = [ {"point": "C.M.RO1_GYB@run", "name": "RO1高压泵"}, {"point": "C.M.RO2_GYB@run", "name": "RO2高压泵"} ] in_transition, pumps = monitor.check_pumps_transition(pump_configs) # 业务逻辑 for audio_file in get_audio_files(): is_running, _ = monitor.update_pump_state("pump_1", "C.M.RO1_GYB@run", "RO1高压泵") if not is_running: skip_audio(audio_file) # 泵停机,跳过 continue if monitor.is_in_transition("pump_1"): skip_audio(audio_file) # 过渡期,跳过 continue process_audio(audio_file) # 泵运行且稳定,正常处理 ================================================================================ 时间线案例 (过渡期=15分钟) ================================================================================ 10:00:00 首次调用 -> is_running=True, in_transition=False (初始化) 10:05:00 泵停机 -> is_running=False, in_transition=True (进入过渡期) 10:20:00 过渡期结束 -> is_running=False, in_transition=False (15分钟后) 10:25:00 泵启动 -> is_running=True, in_transition=True (再次进入过渡期) 10:40:00 过渡期结束 -> is_running=True, in_transition=False (可正常处理) ================================================================================ 内部机制 ================================================================================ 1. 查询缓存: 30秒内同一泵只查询一次SCADA,调用方无需控制频率 2. 状态变化: 本地比较前后状态,变化时自动记录时间 3. 过渡期判定: 基于状态变化时间,默认15分钟窗口 4. 首次查询: 不视为过渡期(假设泵已稳定运行) 日志: 泵状态初始化/变化时输出 INFO 级别日志 """ import requests import logging from datetime import datetime, timedelta from collections import defaultdict logger = logging.getLogger(__name__) class PumpStateMonitor: """ 泵状态监控器 功能: - 查询泵运行状态 - 记录状态变化历史 - 判断是否处于启停过渡期 """ def __init__(self, scada_url, scada_jwt, project_id, timeout=10, transition_window_minutes=15): """ 初始化泵状态监控器 参数: scada_url: SCADA API地址 scada_jwt: JWT认证Token project_id: 项目ID (用于API查询) timeout: 请求超时秒数 transition_window_minutes: 启停过渡期窗口(默认 15分钟) """ self.scada_url = scada_url self.scada_jwt = scada_jwt self.project_id = project_id self.timeout = timeout self.transition_window_minutes = transition_window_minutes # 状态缓存: {pump_id: True/False} self.current_states = {} # 最后状态变化时间缓存: {pump_id: datetime} # 注意: 由本地记录,不依赖接口返回的 htime(那是数据采集时间,不是状态变化时间) self.state_change_time = {} # 上次查询时间(避免频繁查询) self.last_query_time = {} self.min_query_interval_seconds = 30 # 30 秒查询一次 def query_pump_status(self, point, pump_name=""): """ 查询单个泵的运行状态(使用实时数据接口) 使用 current-data 接口直接获取最新一条数据,无需时间窗口查询。 参数: point: 点位标识, 如 "C.M.RO1_GYB@run" pump_name: 泵名称, 用于日志 返回: (is_running, last_change_time): is_running: True=运行中, False=停机 last_change_time: 最后一次状态变化时间 (来自 htime 字段) """ headers = { "Content-Type": "application/json", "JWT-TOKEN": self.scada_jwt } # 当前时间戳(毫秒) now_ms = int(datetime.now().timestamp() * 1000) # 请求参数 params = {"time": now_ms} # 请求体:使用实时数据接口格式 request_body = [ { "deviceId": "1", "deviceItems": point, "deviceName": pump_name or point, "project_id": self.project_id } ] try: response = requests.post( self.scada_url, params=params, json=request_body, headers=headers, timeout=self.timeout ) if response.status_code == 200: data = response.json() if data.get("code") == 200 and data.get("data"): # 获取第一条数据(实时接口只返回最新一条) latest = data["data"][0] if "val" in latest: val = int(float(latest["val"])) is_running = val == 1 # 解析 htime 时间字段(实时接口返回的是北京时间字符串) htime_str = latest.get("htime", "") last_change_time = None if htime_str: try: # 直接解析,不做时区转换(按北京时间处理) last_change_time = datetime.strptime(htime_str, "%Y-%m-%d %H:%M:%S") except ValueError: logger.warning(f"无法解析 htime: {htime_str}") logger.debug(f"泵状态查询: {pump_name or point} = {'运行' if is_running else '停机'} (变化时间={htime_str})") return is_running, last_change_time # 接口返回成功但无数据 logger.warning(f"泵状态查询无数据: {pump_name or point}") else: logger.warning(f"泵状态查询HTTP错误: {pump_name or point} | status={response.status_code}") except Exception as e: logger.warning(f"泵状态查询失败: {pump_name or point} | {e}") # 查询失败时默认返回停机状态 logger.warning(f"泵状态查询: {pump_name or point} | 查询失败,默认视为停机") return False, None def update_pump_state(self, pump_id, point, pump_name=""): """ 更新并检测泵状态变化 状态变化时间由本地记录,不依赖接口返回的 htime(那是数据采集时间)。 参数: pump_id: 泵唯一标识 point: SCADA点位 pump_name: 泵名称 返回: (当前状态 True/False, 最后状态变化时间) """ # 限制查询频率 now = datetime.now() last_query = self.last_query_time.get(pump_id) if last_query: elapsed = (now - last_query).total_seconds() if elapsed < self.min_query_interval_seconds: # 使用缓存状态 cached_state = self.current_states.get(pump_id) cached_time = self.state_change_time.get(pump_id) return cached_state, cached_time # 查询状态(实时接口只返回当前状态,不返回状态变化时间) new_state, _ = self.query_pump_status(point, pump_name) self.last_query_time[pump_id] = now # 检测状态变化 old_state = self.current_states.get(pump_id) state_change_time = self.state_change_time.get(pump_id) if old_state is None: # 首次查询:初始化缓存状态 # 假设泵已经稳定运行,不设置过渡期(避免程序启动时所有泵都被认为在过渡期) state_change_time = None # None 表示不在过渡期 logger.info(f"泵状态初始化: {pump_name or pump_id} = {'运行' if new_state else '停机'}") elif new_state != old_state: # 状态变化:记录当前时间为变化时间 state_change_time = now event = "启动" if new_state else "停机" logger.info(f"泵状态变化: {pump_name or pump_id} {event} | 进入过渡期") # 更新缓存 self.current_states[pump_id] = new_state self.state_change_time[pump_id] = state_change_time return new_state, state_change_time def is_in_transition(self, pump_id): """ 检查泵是否处于启停过渡期 基于本地记录的状态变化时间判断。 参数: pump_id: 泵唯一标识 返回: True=过渡期内(最近N分钟有状态变化), False=稳定状态 """ last_change_time = self.state_change_time.get(pump_id) if not last_change_time: # None 表示首次查询或无状态变化记录,视为稳定状态 return False # 使用本地时间比较(变化时间是本地记录的) elapsed_minutes = (datetime.now() - last_change_time).total_seconds() / 60 return elapsed_minutes < self.transition_window_minutes def check_pumps_transition(self, pump_configs): """ 检查多个泵是否有任何处于启停过渡期 参数: pump_configs: 泵配置列表 [{"point": "...", "name": "..."}, ...] 返回: (是否有泵在过渡期, 过渡期泵名称列表) """ in_transition = False transition_pumps = [] for pump_cfg in pump_configs: point = pump_cfg.get("point", "") name = pump_cfg.get("name", point) pump_id = point # 使用点位作为唯一ID # 更新状态 self.update_pump_state(pump_id, point, name) # 检查是否过渡期 if self.is_in_transition(pump_id): in_transition = True transition_pumps.append(name) return in_transition, transition_pumps