| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849 |
- import requests
- import time
- def access_plc_query_api(url: str,headers:dict, data: list, max_retries=2):
- """调用bge-m3,embedding"""
- # 类型检查
- time.sleep(0.010) # 方式频繁调用接口
- for attempt in range(max_retries):
- try:
- response = requests.post(url=url, headers=headers, json=data)
- if response.status_code == 200:
- return response.json()
- except Exception as e:
- print('请求PLC点位查询接口失败!', e)
- time.sleep(1)
- return None
- return None
- def sparse_plc_api_response(response):
- data = response.get('data')
- code = response.get('code')
- if int(code) != 200 or data is None:
- print(f"访问plc接口失败,状态码:{code}, 返回数据有效:{bool(data)}")
- return None
- # 提取名称,时间和值
- plc_query_results = [] # 查询结果
- for item in data:
- plc_query_results.append(f"PLC点位名称:{item.get('alias')},记录时间:{item.get('htime')}, 值:{item.get('val')}")
- return plc_query_results
- def post_req_2_plc_api(project_id:int,database_codes:list):
- headers = {"Content-Type": "application/json",
- "JWT-TOKEN": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJJRCI6NywiVXNlcm5hbWUiOiJhZG1pbiIsIkRlcCI6IjEzNSIsImV4cCI6MTc3NjExOTExNCwiaXNzIjoiZ2luLWJsb2cifQ.0HTtzHZjyd2mHo8VCy8icYROxmntRMuQhyoZsAYRL_M"}
- # 构造数据
- data = [{"deviceId":"1","deviceItems":db_code,"deviceName":"外供水PH","project_id":project_id} for db_code in database_codes]
- # 查询结果
- resp = access_plc_query_api(url='http://120.55.44.4:8900/api/v1/jinke-cloud/device/current-data?time=1739859286292',
- headers=headers, data=data)
- # 解析结果
- res = sparse_plc_api_response(resp)
- return res
- if '__main__' == __name__:
- res = post_req_2_plc_api(1420,['ns=3;s=1#RO_CSDD_O','ns=3;s=2#RO_CSDD_O'])
- print('请求结果', res)
|