| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231 |
- import sys
- sys.path.append("..")
- import config
- import os
- import requests
- import time
- import csv
- from datetime import datetime
- import shutil
- import json
class DataHelper:
    """Crawl the smart-water site for one project's sensor item catalogue.

    The helper logs in, pages through the project's realtime PLC item list and
    stores every item's database tag (code) together with its Chinese display
    name in a CSV file.  A name <-> code JSON lookup can then be derived from
    that CSV with :meth:`get_name_code_transfer`.

    Project code 92 is the Xishan Zhonghe wastewater reclaimed-water project.
    """

    # Seconds to wait on the server before a request is abandoned; without a
    # timeout a stalled connection would block the crawl forever.
    REQUEST_TIMEOUT = 30
    # Failed page requests tolerated (cumulative) before the crawl gives up.
    MAX_FAILURES = 3
    # Browser identity sent with every request.
    _USER_AGENT = (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36"
    )

    def __init__(self,
                 project_id=config.PROJECT_ID,
                 username=config.USERNAME,
                 password=config.PASSWORD,
                 dep_id=config.DEP_ID,
                 base_url=config.BASE_URL,
                 out_path=config.ALL_ITEMS_FILE_DIR,
                 out_file_name=config.ALL_ITEMS_FILE_NAME,
                 save_path_final=config.ALL_ITEMS_FILE_PATH,
                 max_pages=config.MAX_PAGES,
                 page_size=config.PAGE_SIZE,
                 include_head=config.INCLUDE_HEAD
                 ):
        """Store the crawl settings and delete output left by a previous run.

        Args:
            project_id: Project whose item list is fetched.
            username: Account name used to log in.
            password: Account password used to log in.
            dep_id: Department id sent as ``DepId`` in the login request.
            base_url: Root URL of the smart-water site.
            out_path: Directory that holds the temporary CSV file.
            out_file_name: File name; the temporary file is ``tem_`` + name.
            save_path_final: Path of the final CSV file.
            max_pages: Upper bound on the number of pages requested.
            page_size: Number of items requested per page.
            include_head: When true, a statistics line is written as the first
                line of the final CSV (and skipped again when it is read).
        """
        print('开始获取项目所有的数据编号...')
        self.username = username
        self.password = password
        self.dep_id = dep_id
        self.project_id = project_id
        self.BASE_URL = base_url  # smart-water site root
        self.out_path = out_path
        self.out_file_name = out_file_name
        self.max_pages = int(max_pages)
        self.page_size = int(page_size)
        self.token = None
        self.include_head = include_head
        self.save_path_tem = os.path.join(self.out_path, 'tem_' + self.out_file_name)
        self.save_path_final = save_path_final
        self.start_time = time.time()
        self.end_time = self.start_time

        # Remove the result files of the previous run.
        outputs = (self.save_path_final, self.save_path_tem)
        if any(os.path.exists(path) for path in outputs):
            print('清理缓存文件...')
            for path in outputs:
                # Re-check per file: only one of them may exist.
                if os.path.exists(path):
                    os.remove(path)
                    print(f'清理 {path}')

    @staticmethod
    def _report_request_error(err):
        """Print the message that matches the kind of ``requests`` failure."""
        # The order matters: a connect timeout is both a ConnectionError and a
        # Timeout, and is reported as a connection error.
        if isinstance(err, requests.exceptions.HTTPError):
            print("HTTP Error:", err)
        elif isinstance(err, requests.exceptions.ConnectionError):
            print("Error Connecting:", err)
        elif isinstance(err, requests.exceptions.Timeout):
            print("Timeout Error:", err)
        else:
            print("OOps: Something Else", err)

    def login_smart_water(self):
        """Log in and store the session token in ``self.token``.

        ``self.token`` is ``None`` afterwards whenever the login did not yield
        a non-empty token (network error, HTTP error, unexpected payload), so
        a stale token never survives a failed re-login.  Errors are reported
        on stdout instead of being raised.

        Returns:
            None.
        """
        login_url = f"{self.BASE_URL}/api/v2/user/login"  # login endpoint
        login_headers = {
            "Accept": "application/json",
            "Content-Type": "application/json;charset=utf-8",
            "Cookie": "lang=zh-CN",
            "Origin": self.BASE_URL,
            "Referer": f"{self.BASE_URL}/",
            "User-Agent": self._USER_AGENT,
        }
        login_params = {
            "username": self.username,
            "password": self.password,
            "type": "account",
            "DepId": self.dep_id,  # department id
        }

        self.token = None
        try:
            response = requests.post(login_url, json=login_params,
                                     headers=login_headers,
                                     timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()  # surface HTTP error statuses
            data = response.json()
        except requests.exceptions.RequestException as err:
            self._report_request_error(err)
            return None
        except ValueError as err:
            # Older requests versions raise a plain ValueError for a body
            # that is not valid JSON.
            print("Invalid JSON response:", err)
            return None

        try:
            token = data['data']['token']
        except (KeyError, TypeError):
            # A rejected login may come back without a data/token section.
            token = None
        self.token = token or None

        if self.token is not None:
            # The token is a credential, so it is deliberately not echoed.
            print(f'{self.username} 登陆成功!')
        else:
            print(f'{self.username} 登陆失败!')
        return None

    @staticmethod
    def write_file(handler, data: list):
        """Append one CSV row per item to ``handler``.

        Columns: name, code, unit, precision, is-enum flag, device code.

        Args:
            handler: Writable text file object.
            data: Item dicts as returned by the list API; ``None`` or an empty
                list writes nothing.

        Returns:
            Number of rows written.
        """
        if not data:
            return 0
        writer = csv.writer(handler)  # one writer for the whole batch
        write_cnt = 0
        for label in data:
            writer.writerow([
                label['ItemAlias'],
                label['ItemName'],
                label['ItemUnit'],
                label['ItemPrecise'],
                int(label['IsBool']),
                label['DeviceCode'],
            ])
            write_cnt += 1
        return write_cnt

    @staticmethod
    def format_chinese_datetime(dt=None):
        """Format ``dt`` (default: now) as a Chinese-style date and time.

        Month, day and year are not zero-padded; hour and minute are.
        """
        if dt is None:
            dt = datetime.now()
        return f"{dt.year}年{dt.month}月{dt.day}日 {dt.hour:02d}:{dt.minute:02d}"

    def get_all_label(self):
        """Download every item of the project and write the final CSV file.

        Pages are requested one after another up to ``self.max_pages`` and
        collected in a temporary CSV, which is then copied into
        ``self.save_path_final`` (optionally preceded by a statistics line)
        and deleted.  The crawl stops early after ``MAX_FAILURES`` failed
        requests; whatever was collected until then is still written.

        Raises:
            RuntimeError: If no token could be obtained by logging in.
        """
        if self.token is None:
            self.login_smart_water()
        if self.token is None:
            # Without a token every page request would be rejected.
            raise RuntimeError('登陆失败, 无法获取数据编号!')

        label_url = f"{self.BASE_URL}/api/v1/config/device-realtime-plc-item/list/{self.project_id}"
        # Referer follows BASE_URL like the login request does.  Host is not
        # set here: the HTTP layer derives it from the request URL, which
        # keeps it correct when BASE_URL changes.
        headers = {
            'Accept': '*/*',
            'Accept-Encoding': 'gzip, deflate',
            'Accept-Language': 'zh-CN,zh;q=0.9',
            'Connection': 'keep-alive',
            'Cookie': 'lang=zh-CN',
            'JWT-TOKEN': self.token,
            'Referer': f"{self.BASE_URL}/",
            'User-Agent': self._USER_AGENT,
        }

        pages = 1
        total_write_cnt = 0
        failed_cnt = 0
        # True while the token in use comes from a re-login that has not yet
        # produced a successful page; a second 603 then counts as a failure.
        fresh_login = False

        with requests.Session() as session:
            session.headers.update(headers)
            # 'w' rather than 'a': a leftover temp file from a crashed run
            # must not end up in front of the new data.
            with open(self.save_path_tem, mode='w', encoding='utf-8', newline='') as file_handler:
                print('准备写入数据...')
                csv.writer(file_handler).writerow(['名称', '编码', '单位', '精度', '是否枚举', '设备号'])

                while pages <= self.max_pages:
                    if failed_cnt >= self.MAX_FAILURES:
                        print(f'失败次数达到{self.MAX_FAILURES}次, 自动退出!')
                        break

                    request_ts = int(time.time() * 1000)
                    params = {
                        'currentPage': f'{pages}',
                        'pageSize': f'{self.page_size}',
                        'ProjectId': self.project_id,
                        'time': request_ts,
                    }
                    try:
                        response = session.get(label_url, params=params,
                                               timeout=self.REQUEST_TIMEOUT)
                        response.raise_for_status()
                        result = response.json()
                    except requests.exceptions.RequestException as err:
                        self._report_request_error(err)
                        failed_cnt += 1
                        continue
                    except ValueError as err:
                        print("Invalid JSON response:", err)
                        failed_cnt += 1
                        continue

                    code = result.get('code') if isinstance(result, dict) else None

                    if code == 200:
                        print(f'时间:{request_ts} 页码:{pages}, 网页数据获取成功, 写入文件')
                        payload = result.get('data')
                        label_list = payload.get('list') if isinstance(payload, dict) else None
                        total_write_cnt += self.write_file(file_handler, label_list)
                        pages += 1
                        fresh_login = False
                        continue

                    if code == 603:  # token expired: log in again
                        self.login_smart_water()
                        if self.token is not None:
                            session.headers['JWT-TOKEN'] = self.token
                            if not fresh_login:
                                # First expiry since the last good page:
                                # retry the same page without a penalty.
                                fresh_login = True
                                continue

                    # Any other answer must advance the failure counter,
                    # otherwise the same page would be requested forever.
                    failed_cnt += 1
                    print(f'页码:{pages} 获取失败, 返回码:{code}')

                print(f'数据写入完成,写入网页数量为{pages - 1}页,{total_write_cnt}条数据记录!')

        # Build the final file: optional statistics line + temporary CSV.
        self.end_time = time.time()
        total_time = round(self.end_time - self.start_time, 2)
        current_date = self.format_chinese_datetime()
        stat_info = f"# 项目编号: {self.project_id}, 获取日期: {current_date}, 总记录数量: {total_write_cnt}, 总耗时: {total_time}s"
        with open(self.save_path_tem, mode='r', encoding='utf-8') as file_handler:
            with open(self.save_path_final, mode='w', encoding='utf-8', newline='') as final_file_handler:
                if self.include_head:
                    final_file_handler.write(stat_info + '\n')
                shutil.copyfileobj(file_handler, final_file_handler)
        # Delete only after both files are closed.
        os.unlink(self.save_path_tem)
        print('all-items文件写入成功:', self.save_path_final)

    def get_name_code_transfer(self):
        """Write the name <-> code lookup JSON derived from the final CSV.

        The JSON object has three keys: ``name_2_code`` (stripped name ->
        stripped code), ``code_2_name`` (the inverse mapping) and ``len``
        (number of CSV rows used).  The output path is
        ``config.TRANSFER_JSON_NAME``; an existing file there is replaced.

        Raises:
            RuntimeError: If the final CSV file does not exist.
        """
        if not os.path.exists(self.save_path_final):
            raise RuntimeError(f'文件不存在: {self.save_path_final}')

        file_path_out = config.TRANSFER_JSON_NAME
        if os.path.exists(file_path_out):
            print('清理历史文件:', file_path_out)
            os.remove(file_path_out)

        name_2_code = {}
        record_cnt = 0
        with open(self.save_path_final, 'r', encoding='utf-8', newline='') as file_handler:
            csv_reader = csv.reader(file_handler)
            if self.include_head:
                next(csv_reader, None)  # statistics line
            next(csv_reader, None)  # column header line
            for row in csv_reader:
                if len(row) < 2:
                    # Blank or truncated line: there is no name/code pair.
                    continue
                name_2_code[row[0].strip()] = row[1].strip()
                record_cnt += 1

        total_name_to_code = {
            'name_2_code': name_2_code,
            'code_2_name': {code: name for name, code in name_2_code.items()},
            'len': record_cnt,
        }
        with open(file_path_out, 'w', encoding="utf-8", newline='') as f:
            json.dump(total_name_to_code, f, ensure_ascii=False, indent=4)
        print('name-code字典文件写入成功:', file_path_out)
if __name__ == '__main__':
    # Step 1: pull the database field codes and their Chinese names from the
    # smart-water site into the all-items CSV file.
    helper = DataHelper()
    helper.get_all_label()
    # Step 2: derive the code <-> name dictionary file from that CSV.
    helper.get_name_code_transfer()
|