data_provider.py 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253
  1. """
  2. 统一数据获取层。自动登录获取 JWT Token,支持过期刷新。
  3. """
  4. import hashlib
  5. import json
  6. import logging
  7. import threading
  8. import time
  9. from datetime import datetime, timedelta
  10. from typing import Any, Dict, List, Optional
  11. import numpy as np
  12. import pandas as pd
  13. import requests
  14. from core.config import PipelineConfig
  15. logger = logging.getLogger(__name__)
  16. class DataProvider:
  17. """统一 API / DB 数据访问层,启动时登录一次获取 Token。"""
  18. def __init__(self, config: PipelineConfig):
  19. self._config = config
  20. self._api_base = config.api_base_url
  21. self._project_id = str(config.project_id)
  22. self._scada_secret = config.scada_secret
  23. # 登录凭证
  24. self._login_user = config.login_user
  25. self._login_password = config.login_password
  26. self._login_dep_id = config.login_dep_id
  27. # Token 状态(启动时登录)
  28. self._token: Optional[str] = None
  29. self._headers: Dict[str, str] = {"Content-Type": "application/json"}
  30. self._db_engine = None
  31. # API 限速:防止多线程并发请求导致竞争
  32. self._api_lock = threading.Lock()
  33. self._last_request_time = 0.0
  34. self._login()
  35. # ======================== Token ========================
  36. def _login(self) -> None:
  37. """调用登录接口获取 JWT Token。启动时调用一次。"""
  38. url = f"{self._api_base}/api/v2/user/login"
  39. payload = {
  40. "UserName": self._login_user,
  41. "Password": self._login_password,
  42. "type": "account",
  43. "DepId": self._login_dep_id,
  44. }
  45. try:
  46. resp = requests.post(url, json=payload, timeout=15)
  47. resp.raise_for_status()
  48. data = resp.json()
  49. if data.get("code") != 200:
  50. raise RuntimeError(f"登录失败: {data.get('msg', '未知错误')}")
  51. # 响应 data 可能是 dict 或直接的 token 字符串
  52. resp_data = data.get("data")
  53. if isinstance(resp_data, dict):
  54. self._token = resp_data.get("token") or resp_data.get("Token") or resp_data.get("access_token")
  55. elif isinstance(resp_data, str):
  56. self._token = resp_data
  57. else:
  58. self._token = data.get("token")
  59. if not self._token:
  60. raise RuntimeError(f"登录响应中未找到 token, resp keys: {list(data.keys())}, data type: {type(resp_data)}")
  61. self._headers["JWT-TOKEN"] = self._token
  62. logger.info(f"登录成功, token={str(self._token)[:20]}...")
  63. except Exception as e:
  64. logger.error(f"登录异常: {e}")
  65. raise
  66. def _request(self, method: str, url: str, **kwargs) -> Optional[requests.Response]:
  67. """统一请求封装,自带 Token,限速防并发竞争。"""
  68. with self._api_lock:
  69. elapsed = time.time() - self._last_request_time
  70. if elapsed < 0.5:
  71. time.sleep(0.5 - elapsed)
  72. self._last_request_time = time.time()
  73. kwargs.setdefault("headers", self._headers)
  74. kwargs.setdefault("timeout", 60)
  75. try:
  76. return getattr(requests, method)(url, **kwargs)
  77. except requests.exceptions.RequestException as e:
  78. logger.error(f"请求失败: {e}")
  79. return None
  80. # ======================== 历史数据查询 ========================
  81. def query_single_point_history(
  82. self, item_name: str, start_time: datetime, end_time: datetime,
  83. interval: str = "s",
  84. ) -> pd.Series:
  85. """查询单个 PLC 点位的历史数据。interval: s/minute/hour 等。"""
  86. params = {
  87. "deviceid": "1",
  88. "dataitemid": item_name,
  89. "project_id": self._project_id,
  90. "stime": int(start_time.timestamp() * 1000),
  91. "etime": int(end_time.timestamp() * 1000),
  92. "size": "1",
  93. "interval": interval,
  94. "aggregator": "new",
  95. }
  96. url = f"{self._api_base}/api/v1/jinke-cloud/db/device/history-data"
  97. resp = self._request("get", url, params=params)
  98. if resp is None:
  99. return pd.Series(dtype=float)
  100. try:
  101. data = resp.json()
  102. if data.get("code") != 200 or not data.get("data"):
  103. return pd.Series(dtype=float)
  104. times, vals = [], []
  105. for item in data["data"]:
  106. if item.get("val") is not None and item.get("htime_at") is not None:
  107. try:
  108. times.append(item["htime_at"])
  109. vals.append(float(item["val"]))
  110. except (ValueError, TypeError):
  111. pass
  112. if not times:
  113. return pd.Series(dtype=float)
  114. s = pd.Series(vals, index=pd.to_datetime(times, format="mixed", errors="coerce"))
  115. return s[~s.index.isna()].sort_index()
  116. except Exception as e:
  117. logger.warning(f"解析点位 {item_name} 响应失败: {e}")
  118. return pd.Series(dtype=float)
  119. def query_points_history(
  120. self, points: List[str], duration_minutes: int = 40, sample_interval: int = 4
  121. ) -> pd.DataFrame:
  122. """批量查询多个 PLC 点位的历史数据,降采样后合并为 DataFrame。"""
  123. query_duration = duration_minutes + 1
  124. end_time = datetime.now()
  125. start_time = end_time - timedelta(minutes=query_duration)
  126. time_index = pd.date_range(start=start_time, end=end_time, freq=f"{sample_interval}s")
  127. result = {"index": time_index}
  128. success, empty = 0, 0
  129. for i, pt in enumerate(points):
  130. if (i + 1) % 30 == 0 or (i + 1) == len(points):
  131. logger.info(f"批量查询进度: [{i+1}/{len(points)}]")
  132. raw = self.query_single_point_history(pt, start_time, end_time)
  133. if raw.empty:
  134. result[pt] = np.nan
  135. empty += 1
  136. else:
  137. resampled = raw.resample(f"{sample_interval}s").mean()
  138. resampled = resampled.reindex(
  139. time_index,
  140. method="nearest",
  141. tolerance=pd.Timedelta(seconds=sample_interval),
  142. )
  143. result[pt] = resampled.ffill().bfill()
  144. success += 1
  145. logger.info(f"批量查询完成: {success} 成功, {empty} 无数据 / 共 {len(points)} 点位")
  146. df = pd.DataFrame(result)
  147. df.set_index("index", inplace=True)
  148. df = df.fillna(0)
  149. return df
  150. # ======================== 当前值查询 ========================
  151. def query_point_current(self, payload: dict) -> Optional[float]:
  152. """获取单个设备数据项的当前值。"""
  153. url = f"{self._api_base}/api/v1/jinke-cloud/device/current-data"
  154. resp = self._request("post", url, json=[payload], timeout=10)
  155. if resp is None:
  156. return None
  157. try:
  158. data = resp.json()
  159. if data.get("code") == 200 and data.get("data"):
  160. val_str = data["data"][0].get("val")
  161. if val_str is not None:
  162. return float(val_str)
  163. except Exception as e:
  164. logger.error(f"获取当前值失败 {payload.get('deviceItems', '?')}: {e}")
  165. return None
  166. # ======================== PLC 写入 ========================
  167. def send_plc_update(
  168. self, device_name: str, item: str, old_value, new_value, command_type: int
  169. ) -> bool:
  170. """向 PLC 发送参数更新指令。"""
  171. record_obj = {
  172. "project_id": int(self._project_id),
  173. "item": item,
  174. "old_value": str(old_value),
  175. "new_value": str(new_value),
  176. "command_type": command_type,
  177. }
  178. record_data = json.dumps([record_obj])
  179. ts = int(time.time())
  180. signature = hashlib.md5(
  181. f"{record_data}{self._scada_secret}{ts}".encode()
  182. ).hexdigest().upper()
  183. url = f"{self._api_base}/api/v1/plc/set-var-values?sign={signature}&timestamp={ts}"
  184. for attempt in range(1, 4):
  185. try:
  186. resp = requests.post(url, json=[record_obj], timeout=15)
  187. rj = resp.json()
  188. if rj.get("code") == 200:
  189. logger.info(f"[{device_name}] PLC指令成功 {item}: {old_value} -> {new_value}")
  190. return True
  191. logger.error(f"[{device_name}] PLC指令失败: {rj.get('msg')}")
  192. except Exception as e:
  193. logger.error(f"[{device_name}] PLC指令异常: {e}")
  194. if attempt < 3:
  195. time.sleep(60)
  196. return False
  197. # ======================== 回调 ========================
  198. def send_callback(self, type_name: str, **kwargs) -> Optional[int]:
  199. """发送决策结果到回调接口,返回 use_model 状态。"""
  200. url = f"{self._api_base}/api/dtgateway/v1/decision/data"
  201. payload = {"list": [{"type": type_name, "project_id": int(self._project_id), **kwargs}]}
  202. for attempt in range(1, 4):
  203. try:
  204. resp = requests.post(url, headers=self._headers, json=payload, timeout=15)
  205. resp.raise_for_status()
  206. rj = resp.json()
  207. use_model = rj.get("data")
  208. logger.info(f"[{type_name}] 回调成功, use_model={use_model}")
  209. return use_model
  210. except Exception as e:
  211. logger.error(f"[{type_name}] 回调失败: {e}")
  212. if attempt < 3:
  213. time.sleep(60)
  214. return None