data_provider.py 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242
  1. """
  2. 统一数据获取层。自动登录获取 JWT Token,支持过期刷新。
  3. """
  4. import hashlib
  5. import json
  6. import logging
  7. import time
  8. from datetime import datetime, timedelta
  9. from typing import Any, Dict, List, Optional
  10. import numpy as np
  11. import pandas as pd
  12. import requests
  13. from core.config import PipelineConfig
  14. logger = logging.getLogger(__name__)
  15. class DataProvider:
  16. """统一 API / DB 数据访问层,启动时登录一次获取 Token。"""
  def __init__(self, config: PipelineConfig):
      """Build the provider from *config* and log in immediately.

      Args:
          config: pipeline configuration supplying the API base URL, project id,
              SCADA signing secret and login credentials.

      Raises:
          Whatever ``_login`` raises when the initial login fails (it logs and
          re-raises), so a bad account or unreachable API fails construction.
      """
      self._config = config

      # Endpoint / project settings used by every request.
      self._api_base = config.api_base_url
      self._project_id = str(config.project_id)  # kept as text; int(...) is applied where payloads need a number
      self._scada_secret = config.scada_secret  # mixed into the MD5 signature of PLC write requests

      # Credentials for the login endpoint.
      self._login_user, self._login_password, self._login_dep_id = (
          config.login_user,
          config.login_password,
          config.login_dep_id,
      )

      # Auth state: _login() stores the token and adds it to these headers.
      self._headers: Dict[str, str] = {"Content-Type": "application/json"}
      self._token: Optional[str] = None
      self._db_engine = None  # NOTE(review): not used by the methods visible here — presumably set elsewhere

      self._login()
  31. # ======================== Token ========================
  def _login(self) -> None:
      """Log in and install the returned JWT token on the shared headers.

      POSTs the configured credentials to ``/api/v2/user/login``. On success the
      token is stored in ``self._token`` and set as the ``JWT-TOKEN`` entry of
      ``self._headers`` (the dict that ``_request`` sends by default).

      Raises:
          requests.exceptions.RequestException: network error or non-2xx status.
          RuntimeError: response ``code`` is not 200, or no token was found.
          Any other parsing error (e.g. a non-JSON body) propagates as-is.
          Every failure is logged before it is re-raised.
      """
      login_url = f"{self._api_base}/api/v2/user/login"
      credentials = {
          "UserName": self._login_user,
          "Password": self._login_password,
          "type": "account",
          "DepId": self._login_dep_id,
      }
      try:
          response = requests.post(login_url, json=credentials, timeout=15)
          response.raise_for_status()
          envelope = response.json()
          if envelope.get("code") != 200:
              raise RuntimeError(f"登录失败: {envelope.get('msg', '未知错误')}")

          # Accepted token locations, in order: data.token / data.Token /
          # data.access_token when "data" is an object, "data" itself when it is
          # a string, otherwise a top-level "token" field.
          body = envelope.get("data")
          if isinstance(body, dict):
              token = body.get("token") or body.get("Token") or body.get("access_token")
          elif isinstance(body, str):
              token = body
          else:
              token = envelope.get("token")
          self._token = token

          if not self._token:
              raise RuntimeError(
                  f"登录响应中未找到 token, resp keys: {list(envelope.keys())}, data type: {type(body)}"
              )
          self._headers["JWT-TOKEN"] = self._token
          # Log only the first 20 characters of the token.
          logger.info(f"登录成功, token={str(self._token)[:20]}...")
      except Exception as exc:
          logger.error(f"登录异常: {exc}")
          raise
  def _request(self, method: str, url: str, **kwargs) -> Optional[requests.Response]:
      """Send an HTTP request carrying the current JWT token.

      Args:
          method: name of the ``requests`` function to call ("get", "post", ...).
          url: absolute request URL.
          **kwargs: forwarded to ``requests``; ``headers`` defaults to the shared
              token headers (``self._headers``) and ``timeout`` to 60 seconds.

      Returns:
          The response, whatever its HTTP status, or None when the request itself
          failed (connection error, timeout, ...); such failures are logged.

      Token refresh: when the shared headers were used and the server answers
      HTTP 401, the token is treated as expired. ``_login()`` is called once and
      the request is repeated. If re-login fails, the 401 response is returned.
      NOTE(review): expiry reported only inside the JSON body (with HTTP 200) is
      not detected here — confirm how the gateway signals an expired token.
      """
      kwargs.setdefault("headers", self._headers)
      kwargs.setdefault("timeout", 60)

      def send() -> Optional[requests.Response]:
          # One attempt; network-level failures are logged and mapped to None.
          try:
              return getattr(requests, method)(url, **kwargs)
          except requests.exceptions.RequestException as e:
              logger.error(f"请求失败: {e}")
              return None

      resp = send()
      # Only the shared header dict can be refreshed: _login() updates it in
      # place, so the retry below automatically carries the new token.
      if resp is None or resp.status_code != 401 or kwargs["headers"] is not self._headers:
          return resp
      logger.warning(f"请求返回 401, 重新登录后重试: {url}")
      try:
          self._login()
      except Exception:
          # _login() has already logged the cause; keep the original response.
          return resp
      return send()
  71. # ======================== 历史数据查询 ========================
  def query_single_point_history(
      self, item_name: str, start_time: datetime, end_time: datetime,
      interval: str = "s",
  ) -> pd.Series:
      """Fetch the history of one PLC data item as a time-indexed Series.

      Args:
          item_name: data item id, sent as ``dataitemid``.
          start_time: window start, sent as epoch milliseconds (a naive
              datetime is interpreted as local time by ``timestamp()``).
          end_time: window end, same encoding as *start_time*.
          interval: aggregation interval passed through to the API
              ("s", "minute", "hour", ...).

      Returns:
          Float values indexed by the parsed ``htime_at`` timestamps, sorted
          ascending. Records with a missing value or timestamp, a non-numeric
          value, or an unparseable timestamp are dropped individually. An empty
          Series is returned when the request fails, the response code is not
          200, there is no data, or the response cannot be parsed (logged).
      """
      params = {
          "deviceid": "1",
          "dataitemid": item_name,
          "project_id": self._project_id,
          "stime": int(start_time.timestamp() * 1000),
          "etime": int(end_time.timestamp() * 1000),
          "size": "1",
          "interval": interval,
          "aggregator": "new",
      }
      url = f"{self._api_base}/api/v1/jinke-cloud/db/device/history-data"
      resp = self._request("get", url, params=params)
      if resp is None:
          return pd.Series(dtype=float)
      try:
          data = resp.json()
          if data.get("code") != 200 or not data.get("data"):
              return pd.Series(dtype=float)
          times, vals = [], []
          for item in data["data"]:
              raw_time, raw_val = item.get("htime_at"), item.get("val")
              if raw_time is None or raw_val is None:
                  continue
              try:
                  value = float(raw_val)
              except (ValueError, TypeError):
                  # Convert BEFORE appending so `times` and `vals` stay the same
                  # length: one bad value must skip only its own record, not make
                  # the Series constructor fail and discard the whole point.
                  continue
              times.append(raw_time)
              vals.append(value)
          if not times:
              return pd.Series(dtype=float)
          s = pd.Series(vals, index=pd.to_datetime(times, format="mixed", errors="coerce"))
          # errors="coerce" turns unparseable timestamps into NaT; drop those rows.
          return s[~s.index.isna()].sort_index()
      except Exception as e:
          logger.warning(f"解析点位 {item_name} 响应失败: {e}")
          return pd.Series(dtype=float)
  def query_points_history(
      self, points: List[str], duration_minutes: int = 40, sample_interval: int = 4
  ) -> pd.DataFrame:
      """Fetch several PLC points and align them on one regular time grid.

      The window ends now and spans ``duration_minutes + 1`` minutes. Each
      point's raw history is averaged into ``sample_interval``-second bins,
      snapped to the nearest grid timestamp within ``sample_interval`` seconds,
      then gaps are filled forward and then backward.

      Args:
          points: data item ids to query (one request per point, sequentially).
          duration_minutes: requested window length in minutes.
          sample_interval: grid spacing in seconds.

      Returns:
          DataFrame indexed by the grid timestamps with one column per point.
          Any value still missing, including every value of a point that
          returned no data, is filled with 0. A silent sensor is therefore
          indistinguishable from a genuine 0 reading.
      """
      step = f"{sample_interval}s"
      snap_tolerance = pd.Timedelta(seconds=sample_interval)
      total = len(points)

      window_end = datetime.now()
      # Queries one minute more than requested.
      window_start = window_end - timedelta(minutes=duration_minutes + 1)
      grid = pd.date_range(start=window_start, end=window_end, freq=step)

      columns: Dict[str, Any] = {"index": grid}
      n_ok = n_empty = 0
      for pos, point in enumerate(points, start=1):
          # Progress line every 30 points and for the last one.
          if pos % 30 == 0 or pos == total:
              logger.info(f"批量查询进度: [{pos}/{total}]")
          history = self.query_single_point_history(point, window_start, window_end)
          if history.empty:
              columns[point] = np.nan  # scalar; broadcast to the grid length by DataFrame()
              n_empty += 1
              continue
          binned = history.resample(step).mean()
          aligned = binned.reindex(grid, method="nearest", tolerance=snap_tolerance)
          columns[point] = aligned.ffill().bfill()
          n_ok += 1

      logger.info(f"批量查询完成: {n_ok} 成功, {n_empty} 无数据 / 共 {total} 点位")
      frame = pd.DataFrame(columns).set_index("index")
      return frame.fillna(0)
  141. # ======================== 当前值查询 ========================
  def query_point_current(self, payload: dict) -> Optional[float]:
      """Return the current value of one device data item, or None.

      *payload* is sent as the single element of the JSON list body. Its
      ``deviceItems`` entry, if present, is only used in the error log.

      Returns:
          ``float`` of the first record's ``val`` when the response has
          ``code == 200`` and non-empty ``data``; otherwise None. Request
          failures and unparseable responses also yield None (parse errors are
          logged).
      """
      endpoint = f"{self._api_base}/api/v1/jinke-cloud/device/current-data"
      response = self._request("post", endpoint, json=[payload], timeout=10)
      if response is not None:
          try:
              body = response.json()
              records = body.get("data") if body.get("code") == 200 else None
              if records:
                  first_val = records[0].get("val")
                  if first_val is not None:
                      return float(first_val)
          except Exception as e:
              logger.error(f"获取当前值失败 {payload.get('deviceItems', '?')}: {e}")
      return None
  157. # ======================== PLC 写入 ========================
  def send_plc_update(
      self, device_name: str, item: str, old_value, new_value, command_type: int,
      *, max_attempts: int = 3, retry_delay: float = 60,
  ) -> bool:
      """Send a signed set-value command for one PLC variable.

      The body is a one-element JSON list describing the change. It is
      authenticated by an uppercase MD5 hex digest of
      ``body + scada_secret + timestamp``, passed as the ``sign`` and
      ``timestamp`` query parameters. No JWT header is sent.

      Args:
          device_name: label used only in log messages.
          item: PLC variable name.
          old_value: previous value, sent as ``str(old_value)``.
          new_value: target value, sent as ``str(new_value)``.
          command_type: command type code, forwarded unchanged.
          max_attempts: total number of tries (default 3).
          retry_delay: seconds to sleep between failed tries (default 60).

      Returns:
          True as soon as the API answers ``code == 200``; False once every
          attempt has failed. Request and parse failures are logged, not raised.
      """
      record_obj = {
          "project_id": int(self._project_id),
          "item": item,
          "old_value": str(old_value),
          "new_value": str(new_value),
          "command_type": command_type,
      }
      # Serialize once and send these exact bytes, so the signed text and the
      # request body cannot diverge.
      record_data = json.dumps([record_obj])
      for attempt in range(1, max_attempts + 1):
          # The timestamp is part of the signature. Re-sign on every attempt so a
          # retry made retry_delay seconds later does not carry a stale timestamp.
          # NOTE(review): assumes the server enforces a freshness window on it.
          ts = int(time.time())
          signature = hashlib.md5(
              f"{record_data}{self._scada_secret}{ts}".encode()
          ).hexdigest().upper()
          url = f"{self._api_base}/api/v1/plc/set-var-values?sign={signature}&timestamp={ts}"
          try:
              resp = requests.post(
                  url,
                  data=record_data.encode("utf-8"),
                  headers={"Content-Type": "application/json"},
                  timeout=15,
              )
              rj = resp.json()
              if rj.get("code") == 200:
                  logger.info(f"[{device_name}] PLC指令成功 {item}: {old_value} -> {new_value}")
                  return True
              logger.error(f"[{device_name}] PLC指令失败: {rj.get('msg')}")
          except Exception as e:
              logger.error(f"[{device_name}] PLC指令异常: {e}")
          if attempt < max_attempts:
              time.sleep(retry_delay)
      return False
  188. # ======================== 回调 ========================
  def send_callback(self, type_name: str, **kwargs) -> Optional[int]:
      """Report a decision result to the callback endpoint.

      Posts ``{"list": [{"type": type_name, "project_id": <int>, **kwargs}]}``
      with the shared JWT headers. Extra keyword arguments become fields of the
      single list entry and may override ``type`` / ``project_id``.

      Up to 3 attempts are made, 60 s apart. Any exception (network error, HTTP
      error status, invalid JSON) counts as a failed attempt and is logged.

      Returns:
          The response's ``data`` field, reported as ``use_model`` (annotated as
          int), or None if every attempt failed. The response ``code`` field is
          not checked.
      """
      endpoint = f"{self._api_base}/api/dtgateway/v1/decision/data"
      entry = {"type": type_name, "project_id": int(self._project_id)}
      entry.update(kwargs)
      body = {"list": [entry]}
      attempts = 3
      for n in range(1, attempts + 1):
          try:
              response = requests.post(endpoint, headers=self._headers, json=body, timeout=15)
              response.raise_for_status()
              use_model = response.json().get("data")
          except Exception as e:
              logger.error(f"[{type_name}] 回调失败: {e}")
              if n < attempts:
                  time.sleep(60)
              continue
          logger.info(f"[{type_name}] 回调成功, use_model={use_model}")
          return use_model
      return None