| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241 |
- """
- 统一数据获取层。自动登录获取 JWT Token,支持过期刷新。
- """
- import hashlib
- import json
- import logging
- import time
- from datetime import datetime, timedelta
- from typing import Any, Dict, List, Optional
- import numpy as np
- import pandas as pd
- import requests
- from core.config import PipelineConfig
logger = logging.getLogger(__name__)


class DataProvider:
    """Unified HTTP access layer for the platform API.

    The constructor logs in once and stores the JWT token in the shared
    request headers. Requests sent through ``_request`` re-login and retry
    once when the server answers HTTP 401.
    """

    # Retry policy shared by ``send_plc_update`` and ``send_callback``.
    _RETRY_ATTEMPTS = 3
    _RETRY_DELAY_SECONDS = 60

    def __init__(self, config: "PipelineConfig"):
        """Copy connection settings from *config* and log in immediately.

        Raises:
            Exception: whatever ``_login`` raises when the login fails.
        """
        self._config = config
        self._api_base = config.api_base_url
        self._project_id = str(config.project_id)
        self._scada_secret = config.scada_secret

        # Login credentials.
        self._login_user = config.login_user
        self._login_password = config.login_password
        self._login_dep_id = config.login_dep_id

        # Token state; filled in by the login call below.
        self._token: Optional[str] = None
        self._headers: Dict[str, str] = {"Content-Type": "application/json"}
        self._db_engine = None
        self._login()

    # ======================== Token ========================
    @staticmethod
    def _extract_token(data: Dict[str, Any]) -> Optional[str]:
        """Return the token found in a decoded login response, if any.

        ``data["data"]`` may be a dict (keys ``token``, ``Token`` or
        ``access_token`` are tried in that order) or the token string itself;
        otherwise the top-level ``token`` key is used.
        """
        resp_data = data.get("data")
        if isinstance(resp_data, dict):
            return (
                resp_data.get("token")
                or resp_data.get("Token")
                or resp_data.get("access_token")
            )
        if isinstance(resp_data, str):
            return resp_data
        return data.get("token")

    def _login(self) -> None:
        """Call the login endpoint and store the JWT token in the headers.

        Raises:
            RuntimeError: the response code is not 200 or no token was found.
            Exception: any transport/HTTP/JSON error, re-raised after logging.
        """
        url = f"{self._api_base}/api/v2/user/login"
        payload = {
            "UserName": self._login_user,
            "Password": self._login_password,
            "type": "account",
            "DepId": self._login_dep_id,
        }
        try:
            resp = requests.post(url, json=payload, timeout=15)
            resp.raise_for_status()
            data = resp.json()
            if data.get("code") != 200:
                raise RuntimeError(f"登录失败: {data.get('msg', '未知错误')}")
            token = self._extract_token(data)
            if not token:
                raise RuntimeError(
                    f"登录响应中未找到 token, resp keys: {list(data.keys())}, data type: {type(data.get('data'))}"
                )
            # Only replace the stored token once a usable one was obtained, so
            # a failed re-login does not wipe the previous token.
            self._token = token
            self._headers["JWT-TOKEN"] = token
            logger.info(f"登录成功, token={str(token)[:20]}...")
        except Exception as e:
            logger.error(f"登录异常: {e}")
            raise

    def _request(self, method: str, url: str, **kwargs) -> "Optional[requests.Response]":
        """Send a request with the shared token headers and a 60 s timeout.

        Args:
            method: name of a ``requests`` module function, e.g. ``"get"``.
            url: absolute request URL.
            **kwargs: forwarded to ``requests``; ``headers`` and ``timeout``
                are only defaulted when the caller did not supply them.

        Returns:
            The response, or ``None`` when ``requests`` raised a
            ``RequestException``. On HTTP 401 (and only when the shared
            headers are in use) the provider logs in again and retries once;
            if that login fails the 401 response is returned unchanged.
            Expiry signalled only inside a 200 response body is not detected.
        """
        uses_shared_headers = "headers" not in kwargs
        kwargs.setdefault("headers", self._headers)
        kwargs.setdefault("timeout", 60)
        send = getattr(requests, method)
        try:
            resp = send(url, **kwargs)
            if resp.status_code == 401 and uses_shared_headers:
                logger.warning("请求返回 401, 尝试重新登录后重试")
                try:
                    # _login updates self._headers in place, so the retry
                    # below automatically carries the fresh token.
                    self._login()
                except Exception:
                    # _login already logged the failure.
                    return resp
                resp = send(url, **kwargs)
            return resp
        except requests.exceptions.RequestException as e:
            logger.error(f"请求失败: {e}")
            return None

    # ======================== History queries ========================
    @staticmethod
    def _parse_history_items(items: List[Dict[str, Any]]) -> pd.Series:
        """Turn history records into a float Series sorted by timestamp.

        Records lacking ``val`` or ``htime_at``, records whose ``val`` is not
        convertible to float, and records whose timestamp cannot be parsed
        are skipped individually.
        """
        times, vals = [], []
        for item in items:
            raw_val = item.get("val")
            raw_time = item.get("htime_at")
            if raw_val is None or raw_time is None:
                continue
            try:
                val = float(raw_val)
            except (ValueError, TypeError):
                continue
            # Append both only after the conversion succeeded so the two
            # lists can never get out of step.
            times.append(raw_time)
            vals.append(val)
        if not times:
            return pd.Series(dtype=float)
        s = pd.Series(vals, index=pd.to_datetime(times, format="mixed", errors="coerce"))
        return s[~s.index.isna()].sort_index()

    def query_single_point_history(
        self, item_name: str, start_time: datetime, end_time: datetime
    ) -> pd.Series:
        """Query the history of one PLC point between two datetimes.

        The request uses ``interval="s"`` and ``aggregator="new"``; start and
        end are sent as epoch milliseconds.

        Returns:
            A float Series indexed by timestamp, or an empty float Series when
            the request fails, the response code is not 200, there is no data,
            or the response cannot be parsed.
        """
        params = {
            "deviceid": "1",
            "dataitemid": item_name,
            "project_id": self._project_id,
            "stime": int(start_time.timestamp() * 1000),
            "etime": int(end_time.timestamp() * 1000),
            "size": "1",
            "interval": "s",
            "aggregator": "new",
        }
        url = f"{self._api_base}/api/v1/jinke-cloud/db/device/history-data"
        resp = self._request("get", url, params=params)
        if resp is None:
            return pd.Series(dtype=float)
        try:
            data = resp.json()
            if data.get("code") != 200 or not data.get("data"):
                return pd.Series(dtype=float)
            return self._parse_history_items(data["data"])
        except Exception as e:
            logger.warning(f"解析点位 {item_name} 响应失败: {e}")
            return pd.Series(dtype=float)

    def query_points_history(
        self, points: List[str], duration_minutes: int = 40, sample_interval: int = 4
    ) -> pd.DataFrame:
        """Query several PLC points and merge them on a common time grid.

        Args:
            points: point names, queried one by one.
            duration_minutes: window length; one extra minute is queried.
            sample_interval: grid step in seconds.

        Returns:
            A DataFrame indexed by the time grid (index name ``"index"``) with
            one column per point. Each series is resampled by mean, aligned to
            the grid by nearest neighbour within one step, then forward/back
            filled; anything still missing (including points without data)
            is 0.
        """
        query_duration = duration_minutes + 1
        end_time = datetime.now()
        start_time = end_time - timedelta(minutes=query_duration)
        rule = f"{sample_interval}s"
        time_index = pd.date_range(start=start_time, end=end_time, freq=rule)

        columns: Dict[str, Any] = {}
        total = len(points)
        success, empty = 0, 0
        for pos, pt in enumerate(points, start=1):
            if pos % 30 == 0 or pos == total:
                logger.info(f"批量查询进度: [{pos}/{total}]")
            raw = self.query_single_point_history(pt, start_time, end_time)
            if raw.empty:
                columns[pt] = np.nan
                empty += 1
                continue
            resampled = raw.resample(rule).mean().reindex(
                time_index,
                method="nearest",
                tolerance=pd.Timedelta(seconds=sample_interval),
            )
            columns[pt] = resampled.ffill().bfill()
            success += 1
        logger.info(f"批量查询完成: {success} 成功, {empty} 无数据 / 共 {total} 点位")

        # Pass the grid as the index directly instead of routing it through a
        # dict column, which would clash with a point literally named "index".
        df = pd.DataFrame(columns, index=time_index)
        df.index.name = "index"
        return df.fillna(0)

    # ======================== Current value ========================
    def query_point_current(self, payload: dict) -> Optional[float]:
        """Fetch the current value described by *payload*.

        The payload is posted wrapped in a one-element list with a 10 s
        timeout. Returns the first record's ``val`` as a float, or ``None``
        when the request fails, the code is not 200, there is no value, or
        parsing raises.
        """
        url = f"{self._api_base}/api/v1/jinke-cloud/device/current-data"
        resp = self._request("post", url, json=[payload], timeout=10)
        if resp is None:
            return None
        try:
            data = resp.json()
            if data.get("code") == 200 and data.get("data"):
                val_str = data["data"][0].get("val")
                if val_str is not None:
                    return float(val_str)
        except Exception as e:
            logger.error(f"获取当前值失败 {payload.get('deviceItems', '?')}: {e}")
        return None

    # ======================== PLC write ========================
    def _build_plc_url(self, record_data: str, ts: int) -> str:
        """Return the signed PLC write URL for *record_data* at time *ts*.

        The signature is the upper-case MD5 hex digest of the serialized
        records, the SCADA secret and the timestamp concatenated in that
        order; it travels with the timestamp as query parameters.
        """
        from urllib.parse import urlencode

        signature = hashlib.md5(
            f"{record_data}{self._scada_secret}{ts}".encode()
        ).hexdigest().upper()
        query = urlencode({"sign": signature, "timestamp": ts})
        return f"{self._api_base}/api/v1/plc/set-var-values?{query}"

    def send_plc_update(
        self, device_name: str, item: str, old_value, new_value, command_type: int
    ) -> bool:
        """Send a parameter update command to the PLC, retrying on failure.

        Args:
            device_name: used in log messages only.
            item: name of the PLC item to change.
            old_value: previous value, sent as a string.
            new_value: requested value, sent as a string.
            command_type: forwarded unchanged in the request body.

        Returns:
            ``True`` as soon as a response carries code 200, ``False`` once
            all attempts are used up.
        """
        record_obj = {
            "project_id": int(self._project_id),
            "item": item,
            "old_value": str(old_value),
            "new_value": str(new_value),
            "command_type": command_type,
        }
        # Serialize once so the signed string and the request body are the
        # very same text.
        record_data = json.dumps([record_obj])
        for attempt in range(1, self._RETRY_ATTEMPTS + 1):
            # Re-sign on every attempt: retries are spaced far apart and would
            # otherwise reuse an old timestamp.
            url = self._build_plc_url(record_data, int(time.time()))
            try:
                resp = requests.post(
                    url,
                    data=record_data.encode("utf-8"),
                    headers={"Content-Type": "application/json"},
                    timeout=15,
                )
                rj = resp.json()
                if rj.get("code") == 200:
                    logger.info(f"[{device_name}] PLC指令成功 {item}: {old_value} -> {new_value}")
                    return True
                logger.error(f"[{device_name}] PLC指令失败: {rj.get('msg')}")
            except Exception as e:
                logger.error(f"[{device_name}] PLC指令异常: {e}")
            if attempt < self._RETRY_ATTEMPTS:
                time.sleep(self._RETRY_DELAY_SECONDS)
        return False

    # ======================== Callback ========================
    def send_callback(self, type_name: str, **kwargs) -> Optional[int]:
        """Post a decision result to the callback endpoint.

        Args:
            type_name: value of the record's ``type`` field.
            **kwargs: extra fields merged into the record.

        Returns:
            The response's ``data`` field (the ``use_model`` state), or
            ``None`` once all attempts have failed.
        """
        url = f"{self._api_base}/api/dtgateway/v1/decision/data"
        payload = {"list": [{"type": type_name, "project_id": int(self._project_id), **kwargs}]}
        for attempt in range(1, self._RETRY_ATTEMPTS + 1):
            try:
                resp = requests.post(url, headers=self._headers, json=payload, timeout=15)
                resp.raise_for_status()
                rj = resp.json()
                use_model = rj.get("data")
                logger.info(f"[{type_name}] 回调成功, use_model={use_model}")
                return use_model
            except Exception as e:
                logger.error(f"[{type_name}] 回调失败: {e}")
            if attempt < self._RETRY_ATTEMPTS:
                time.sleep(self._RETRY_DELAY_SECONDS)
        return None
|