pump_state_monitor.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289
  1. # -*- coding: utf-8 -*-
  2. """
  3. pump_state_monitor.py - 泵状态监控模块
  4. ================================================================================
  5. 使用示例
  6. ================================================================================
  7. # 初始化
  8. from pump_state_monitor import PumpStateMonitor
  9. monitor = PumpStateMonitor(
  10. scada_url="http://47.96.12.136:8788/api/v1/jinke-cloud/device/current-data",
  11. scada_jwt="eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJJRCI6NywiVXNlcm5hbWUiOiJhZG1pbiIsIkRlcCI6IjEzNSIsImV4cCI6MTc3NjExOTExNCwiaXNzIjoiZ2luLWJsb2cifQ.0HTtzHZjyd2mHo8VCy8icYROxmntRMuQhyoZsAYRL_M",
  12. project_id=92,
  13. transition_window_minutes=15
  14. )
  15. # 单泵查询
  16. is_running, _ = monitor.update_pump_state("pump_1", "C.M.RO1_GYB@run", "RO1高压泵")
  17. in_transition = monitor.is_in_transition("pump_1")
  18. # 多泵批量查询
  19. pump_configs = [
  20. {"point": "C.M.RO1_GYB@run", "name": "RO1高压泵"},
  21. {"point": "C.M.RO2_GYB@run", "name": "RO2高压泵"}
  22. ]
  23. in_transition, pumps = monitor.check_pumps_transition(pump_configs)
  24. # 业务逻辑
  25. for audio_file in get_audio_files():
  26. is_running, _ = monitor.update_pump_state("pump_1", "C.M.RO1_GYB@run", "RO1高压泵")
  27. if not is_running:
  28. skip_audio(audio_file) # 泵停机,跳过
  29. continue
  30. if monitor.is_in_transition("pump_1"):
  31. skip_audio(audio_file) # 过渡期,跳过
  32. continue
  33. process_audio(audio_file) # 泵运行且稳定,正常处理
  34. ================================================================================
  35. 时间线案例 (过渡期=15分钟)
  36. ================================================================================
  37. 10:00:00 首次调用 -> is_running=True, in_transition=False (初始化)
  38. 10:05:00 泵停机 -> is_running=False, in_transition=True (进入过渡期)
  39. 10:20:00 过渡期结束 -> is_running=False, in_transition=False (15分钟后)
  40. 10:25:00 泵启动 -> is_running=True, in_transition=True (再次进入过渡期)
  41. 10:40:00 过渡期结束 -> is_running=True, in_transition=False (可正常处理)
  42. ================================================================================
  43. 内部机制
  44. ================================================================================
  45. 1. 查询缓存: 30秒内同一泵只查询一次SCADA,调用方无需控制频率
  46. 2. 状态变化: 本地比较前后状态,变化时自动记录时间
  47. 3. 过渡期判定: 基于状态变化时间,默认15分钟窗口
  48. 4. 首次查询: 不视为过渡期(假设泵已稳定运行)
  49. 日志: 泵状态初始化/变化时输出 INFO 级别日志
  50. """
  51. import requests
  52. import logging
  53. from datetime import datetime, timedelta
  54. from collections import defaultdict
  55. logger = logging.getLogger(__name__)
  56. class PumpStateMonitor:
  57. """
  58. 泵状态监控器
  59. 功能:
  60. - 查询泵运行状态
  61. - 记录状态变化历史
  62. - 判断是否处于启停过渡期
  63. """
  64. def __init__(self, scada_url, scada_jwt, project_id,
  65. timeout=10, transition_window_minutes=15):
  66. """
  67. 初始化泵状态监控器
  68. 参数:
  69. scada_url: SCADA API地址
  70. scada_jwt: JWT认证Token
  71. project_id: 项目ID (用于API查询)
  72. timeout: 请求超时秒数
  73. transition_window_minutes: 启停过渡期窗口(默认 15分钟)
  74. """
  75. self.scada_url = scada_url
  76. self.scada_jwt = scada_jwt
  77. self.project_id = project_id
  78. self.timeout = timeout
  79. self.transition_window_minutes = transition_window_minutes
  80. # 状态缓存: {pump_id: True/False}
  81. self.current_states = {}
  82. # 最后状态变化时间缓存: {pump_id: datetime}
  83. # 注意: 由本地记录,不依赖接口返回的 htime(那是数据采集时间,不是状态变化时间)
  84. self.state_change_time = {}
  85. # 上次查询时间(避免频繁查询)
  86. self.last_query_time = {}
  87. self.min_query_interval_seconds = 30 # 30 秒查询一次
  88. def query_pump_status(self, point, pump_name=""):
  89. """
  90. 查询单个泵的运行状态(使用实时数据接口)
  91. 使用 current-data 接口直接获取最新一条数据,无需时间窗口查询。
  92. 参数:
  93. point: 点位标识, 如 "C.M.RO1_GYB@run"
  94. pump_name: 泵名称, 用于日志
  95. 返回:
  96. (is_running, last_change_time):
  97. is_running: True=运行中, False=停机
  98. last_change_time: 最后一次状态变化时间 (来自 htime 字段)
  99. """
  100. headers = {
  101. "Content-Type": "application/json",
  102. "JWT-TOKEN": self.scada_jwt
  103. }
  104. # 当前时间戳(毫秒)
  105. now_ms = int(datetime.now().timestamp() * 1000)
  106. # 请求参数
  107. params = {"time": now_ms}
  108. # 请求体:使用实时数据接口格式
  109. request_body = [
  110. {
  111. "deviceId": "1",
  112. "deviceItems": point,
  113. "deviceName": pump_name or point,
  114. "project_id": self.project_id
  115. }
  116. ]
  117. try:
  118. response = requests.post(
  119. self.scada_url,
  120. params=params,
  121. json=request_body,
  122. headers=headers,
  123. timeout=self.timeout
  124. )
  125. if response.status_code == 200:
  126. data = response.json()
  127. if data.get("code") == 200 and data.get("data"):
  128. # 获取第一条数据(实时接口只返回最新一条)
  129. latest = data["data"][0]
  130. if "val" in latest:
  131. val = int(float(latest["val"]))
  132. is_running = val == 1
  133. # 解析 htime 时间字段(实时接口返回的是北京时间字符串)
  134. htime_str = latest.get("htime", "")
  135. last_change_time = None
  136. if htime_str:
  137. try:
  138. # 直接解析,不做时区转换(按北京时间处理)
  139. last_change_time = datetime.strptime(htime_str, "%Y-%m-%d %H:%M:%S")
  140. except ValueError:
  141. logger.warning(f"无法解析 htime: {htime_str}")
  142. logger.debug(f"泵状态查询: {pump_name or point} = {'运行' if is_running else '停机'} (变化时间={htime_str})")
  143. return is_running, last_change_time
  144. # 接口返回成功但无数据
  145. logger.warning(f"泵状态查询无数据: {pump_name or point}")
  146. else:
  147. logger.warning(f"泵状态查询HTTP错误: {pump_name or point} | status={response.status_code}")
  148. except Exception as e:
  149. logger.warning(f"泵状态查询失败: {pump_name or point} | {e}")
  150. # 查询失败时默认返回停机状态
  151. logger.warning(f"泵状态查询: {pump_name or point} | 查询失败,默认视为停机")
  152. return False, None
  153. def update_pump_state(self, pump_id, point, pump_name=""):
  154. """
  155. 更新并检测泵状态变化
  156. 状态变化时间由本地记录,不依赖接口返回的 htime(那是数据采集时间)。
  157. 参数:
  158. pump_id: 泵唯一标识
  159. point: SCADA点位
  160. pump_name: 泵名称
  161. 返回:
  162. (当前状态 True/False, 最后状态变化时间)
  163. """
  164. # 限制查询频率
  165. now = datetime.now()
  166. last_query = self.last_query_time.get(pump_id)
  167. if last_query:
  168. elapsed = (now - last_query).total_seconds()
  169. if elapsed < self.min_query_interval_seconds:
  170. # 使用缓存状态
  171. cached_state = self.current_states.get(pump_id)
  172. cached_time = self.state_change_time.get(pump_id)
  173. return cached_state, cached_time
  174. # 查询状态(实时接口只返回当前状态,不返回状态变化时间)
  175. new_state, _ = self.query_pump_status(point, pump_name)
  176. self.last_query_time[pump_id] = now
  177. # 检测状态变化
  178. old_state = self.current_states.get(pump_id)
  179. state_change_time = self.state_change_time.get(pump_id)
  180. if old_state is None:
  181. # 首次查询:初始化缓存状态
  182. # 假设泵已经稳定运行,不设置过渡期(避免程序启动时所有泵都被认为在过渡期)
  183. state_change_time = None # None 表示不在过渡期
  184. logger.info(f"泵状态初始化: {pump_name or pump_id} = {'运行' if new_state else '停机'}")
  185. elif new_state != old_state:
  186. # 状态变化:记录当前时间为变化时间
  187. state_change_time = now
  188. event = "启动" if new_state else "停机"
  189. logger.info(f"泵状态变化: {pump_name or pump_id} {event} | 进入过渡期")
  190. # 更新缓存
  191. self.current_states[pump_id] = new_state
  192. self.state_change_time[pump_id] = state_change_time
  193. return new_state, state_change_time
  194. def is_in_transition(self, pump_id):
  195. """
  196. 检查泵是否处于启停过渡期
  197. 基于本地记录的状态变化时间判断。
  198. 参数:
  199. pump_id: 泵唯一标识
  200. 返回:
  201. True=过渡期内(最近N分钟有状态变化), False=稳定状态
  202. """
  203. last_change_time = self.state_change_time.get(pump_id)
  204. if not last_change_time:
  205. # None 表示首次查询或无状态变化记录,视为稳定状态
  206. return False
  207. # 使用本地时间比较(变化时间是本地记录的)
  208. elapsed_minutes = (datetime.now() - last_change_time).total_seconds() / 60
  209. return elapsed_minutes < self.transition_window_minutes
  210. def check_pumps_transition(self, pump_configs):
  211. """
  212. 检查多个泵是否有任何处于启停过渡期
  213. 参数:
  214. pump_configs: 泵配置列表 [{"point": "...", "name": "..."}, ...]
  215. 返回:
  216. (是否有泵在过渡期, 过渡期泵名称列表)
  217. """
  218. in_transition = False
  219. transition_pumps = []
  220. for pump_cfg in pump_configs:
  221. point = pump_cfg.get("point", "")
  222. name = pump_cfg.get("name", point)
  223. pump_id = point # 使用点位作为唯一ID
  224. # 更新状态
  225. self.update_pump_state(pump_id, point, name)
  226. # 检查是否过渡期
  227. if self.is_in_transition(pump_id):
  228. in_transition = True
  229. transition_pumps.append(name)
  230. return in_transition, transition_pumps