import sys
sys.path.append("..")
import config
import os
import requests
import time
import csv
from datetime import datetime
import shutil
import json
from urllib.parse import urlsplit


class DataHelper:
    """Scrape the smart-water website for one project's sensor item list.

    The crawler logs in, pages through the project's realtime PLC item
    endpoint, and stores each item's Chinese name, code, unit, precision,
    boolean flag and device code in a CSV file. A second step turns that CSV
    into a JSON name <-> code lookup table.

    Original author's note: project code 92 is the Xishan Sino-Dutch
    wastewater reclaimed-water project.
    """

    # Seconds to wait on any single HTTP request. Without a timeout, requests
    # blocks forever on a stalled server and the Timeout handlers never run.
    REQUEST_TIMEOUT = 30
    # Number of failed page requests after which the crawl gives up.
    MAX_FAILURES = 3

    def __init__(self,
                 project_id=config.PROJECT_ID,
                 username=config.USERNAME,
                 password=config.PASSWORD,
                 dep_id=config.DEP_ID,
                 base_url=config.BASE_URL,
                 out_path=config.ALL_ITEMS_FILE_DIR,
                 out_file_name=config.ALL_ITEMS_FILE_NAME,
                 save_path_final=config.ALL_ITEMS_FILE_PATH,
                 max_pages=config.MAX_PAGES,
                 page_size=config.PAGE_SIZE,
                 include_head=config.INCLUDE_HEAD
                 ):
        """Store the crawl settings and delete output left by a previous run.

        Args:
            project_id: Project identifier used in the item-list URL and query.
            username: Account name sent to the login endpoint.
            password: Account password sent to the login endpoint.
            dep_id: Department id sent as ``DepId`` on login.
            base_url: Root URL of the smart-water site.
            out_path: Directory in which the temporary CSV is created.
            out_file_name: File name used to derive the temporary CSV name.
            save_path_final: Full path of the final CSV file.
            max_pages: Highest page number to request (converted with int()).
            page_size: Records per page (converted with int()).
            include_head: When truthy, a ``#`` statistics line is written as
                the first line of the final CSV.
        """
        print('开始获取项目所有的数据编号...')
        self.username = username
        self.password = password
        self.dep_id = dep_id
        self.project_id = project_id
        self.BASE_URL = base_url  # smart-water site root
        self.out_path = out_path
        self.out_file_name = out_file_name
        self.max_pages = int(max_pages)
        self.page_size = int(page_size)
        self.token = None
        self.include_head = include_head
        self.save_path_tem = os.path.join(self.out_path, 'tem_' + self.out_file_name)
        self.save_path_final = save_path_final
        self.start_time = time.time()
        self.end_time = self.start_time

        # Remove the result files of the previous run.
        if os.path.exists(self.save_path_tem) or os.path.exists(self.save_path_final):
            print('清理缓存文件...')
            if os.path.exists(self.save_path_final):
                os.remove(self.save_path_final)
                print(f'清理 {self.save_path_final}')
            if os.path.exists(self.save_path_tem):
                os.remove(self.save_path_tem)
                print(f'清理 {self.save_path_tem}')

    def login_smart_water(self):
        """Log in and store the returned token in ``self.token``.

        Request errors are printed rather than raised; in that case
        ``self.token`` keeps its previous value. When the server answers but
        the payload carries no usable token, ``self.token`` is set to None.

        Returns:
            None. The result of the login is exposed through ``self.token``.
        """
        login_url = f"{self.BASE_URL}/api/v2/user/login"  # login endpoint
        login_headers = {
            "Accept": "application/json",
            "Content-Type": "application/json;charset=utf-8",
            "Cookie": "lang=zh-CN",
            "Origin": self.BASE_URL,
            "Referer": f"{self.BASE_URL}/",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36"
        }
        login_params = {
            "username": self.username,
            "password": self.password,
            "type": "account",
            "DepId": self.dep_id  # department id
        }

        try:
            response = requests.post(login_url, json=login_params,
                                     headers=login_headers,
                                     timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()  # surface HTTP error statuses
            data = response.json()
        except requests.exceptions.HTTPError as errh:
            print("HTTP Error:", errh)
            return None
        except requests.exceptions.ConnectionError as errc:
            print("Error Connecting:", errc)
            return None
        except requests.exceptions.Timeout as errt:
            print("Timeout Error:", errt)
            return None
        except (requests.exceptions.RequestException, ValueError) as err:
            # ValueError covers a non-JSON body on requests versions whose
            # JSON error is not a RequestException subclass.
            print("OOps: Something Else", err)
            return None

        # A rejected login may omit 'data' or 'token'; treat that as a failed
        # login instead of crashing with KeyError/TypeError.
        payload = data.get('data') if isinstance(data, dict) else None
        token = payload.get('token') if isinstance(payload, dict) else None
        self.token = token if token else None

        if self.token is not None:
            print(f'{self.username} 登陆成功! \n获取token {self.token}')
        else:
            print(f'{self.username} 登陆失败!')
        return None

    @staticmethod
    def write_file(handler, data: list):
        """Append one CSV row per item to ``handler``.

        Args:
            handler: Writable text file object that receives the CSV rows.
            data: Item dicts providing the keys ``ItemAlias``, ``ItemName``,
                ``ItemUnit``, ``ItemPrecise``, ``IsBool`` and ``DeviceCode``.

        Returns:
            The number of rows written.
        """
        writer = csv.writer(handler)  # one writer for the whole batch
        write_cnt = 0
        for label in data:
            # name, code, unit, precision, is-enum flag, device code
            writer.writerow([label['ItemAlias'], label['ItemName'],
                             label['ItemUnit'], label['ItemPrecise'],
                             int(label['IsBool']), label['DeviceCode']])
            write_cnt += 1
        return write_cnt

    @staticmethod
    def format_chinese_datetime(dt=None):
        """Format a datetime as ``YYYY年M月D日 HH:MM``.

        Args:
            dt: The datetime to format; defaults to ``datetime.now()``.

        Returns:
            The formatted string; hour and minute are zero-padded to two
            digits, month and day are not padded.
        """
        if dt is None:
            dt = datetime.now()
        return f"{dt.year}年{dt.month}月{dt.day}日 {dt.hour:02d}:{dt.minute:02d}"

    def _label_headers(self):
        """Build the request headers for the item-list endpoint."""
        headers = {
            'Accept': '*/*',
            'Accept-Encoding': 'gzip, deflate',
            'Accept-Language': 'zh-CN,zh;q=0.9',
            'Connection': 'keep-alive',
            'Cookie': 'lang=zh-CN',
            # Derived from BASE_URL so the header always matches the host the
            # request is actually sent to.
            'Host': urlsplit(self.BASE_URL).netloc,
            'JWT-TOKEN': self.token,
            'Referer': f'{self.BASE_URL}/',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36'
        }
        if not headers['Host']:
            # BASE_URL had no parsable host; let requests fill in Host itself.
            del headers['Host']
        return headers

    def _write_final_file(self, total_write_cnt):
        """Copy the temporary CSV into the final file and delete the temp file.

        When ``self.include_head`` is truthy, a ``#`` statistics line is
        written before the copied content.
        """
        self.end_time = time.time()
        total_time = round(self.end_time - self.start_time, 2)
        current_date = self.format_chinese_datetime()
        stat_info = (f"# 项目编号: {self.project_id}, 获取日期: {current_date}, "
                     f"总记录数量: {total_write_cnt}, 总耗时: {total_time}s")

        with open(self.save_path_tem, mode='r', encoding='utf-8') as file_handler:
            with open(self.save_path_final, mode='w', encoding='utf-8', newline='') as final_file_handler:
                if self.include_head:
                    final_file_handler.write(stat_info + '\n')
                # Copy the temporary file's content into the final file.
                shutil.copyfileobj(file_handler, final_file_handler)

        os.unlink(self.save_path_tem)
        print('all-items文件写入成功:', self.save_path_final)

    def get_all_label(self):
        """Download every page of the project's item list into the final CSV.

        Logs in first when no token is stored. Pages 1..``max_pages`` are
        requested in order. A response code of 603 triggers a re-login and a
        retry of the same page. Request errors, unexpected response codes and
        a 603 that persists after re-login count as failures; after
        ``MAX_FAILURES`` failures the crawl stops and whatever was collected
        so far is still written to the final file.
        """
        if self.token is None:
            self.login_smart_water()

        # item-list endpoint
        label_url = f"{self.BASE_URL}/api/v1/config/device-realtime-plc-item/list/{self.project_id}"

        pages = 1
        total_write_cnt = 0
        failed_cnt = 0
        relogin_tried = False  # True while the current page already had a re-login

        with requests.Session() as session:
            session.headers.update(self._label_headers())

            # 'w' rather than 'a': a stale temp file must not end up with a
            # second header row.
            with open(self.save_path_tem, mode='w', encoding='utf-8', newline='') as file_handler:
                print('准备写入数据...')
                csv.writer(file_handler).writerow(['名称', '编码', '单位', '精度', '是否枚举', '设备号'])

                while pages <= self.max_pages:
                    params = {
                        'currentPage': f'{pages}',
                        'pageSize': f'{self.page_size}',
                        'ProjectId': self.project_id,
                        'time': int(time.time() * 1000)
                    }
                    result = None
                    try:
                        response = session.get(label_url, params=params,
                                               timeout=self.REQUEST_TIMEOUT)
                        response.raise_for_status()
                        result = response.json()
                    except requests.exceptions.HTTPError as errh:
                        print("HTTP Error:", errh)
                        failed_cnt += 1
                    except requests.exceptions.ConnectionError as errc:
                        print("Error Connecting:", errc)
                        failed_cnt += 1
                    except requests.exceptions.Timeout as errt:
                        print("Timeout Error:", errt)
                        failed_cnt += 1
                    except (requests.exceptions.RequestException, ValueError) as err:
                        print("OOps: Something Else", err)
                        failed_cnt += 1
                    else:
                        code = result.get('code') if isinstance(result, dict) else None
                        if code == 200:
                            print(f"时间:{params['time']} 页码:{params['currentPage']}, 网页数据获取成功, 写入文件")
                            page_data = result.get('data')
                            label_list = page_data.get('list') if isinstance(page_data, dict) else None
                            total_write_cnt += self.write_file(file_handler, label_list or [])
                            pages += 1
                            relogin_tried = False
                        elif code == 603:
                            # Token expired: log in again and retry this page.
                            # A second 603 in a row means the re-login did not
                            # help, so it counts as a failure.
                            if relogin_tried:
                                failed_cnt += 1
                            relogin_tried = True
                            self.login_smart_water()
                            session.headers['JWT-TOKEN'] = self.token
                        else:
                            # Any other code would otherwise repeat forever,
                            # because the page number never advances.
                            print('Unexpected response code:', code)
                            failed_cnt += 1

                    if failed_cnt >= self.MAX_FAILURES:
                        print(f'失败次数达到{self.MAX_FAILURES}次, 自动退出!')
                        break

        print(f'数据写入完成,写入网页数量为{pages - 1}页,{total_write_cnt}条数据记录!')

        # Write the final file.
        self._write_final_file(total_write_cnt)

    def get_name_code_transfer(self):
        """Generate the JSON lookup file between item names and item codes.

        Reads the final CSV, skipping the statistics line (when
        ``include_head`` is truthy) and the column header row, then writes a
        JSON object with ``name_2_code``, ``code_2_name`` and ``len`` (the
        number of data rows read) to ``config.TRANSFER_JSON_NAME``.

        Raises:
            RuntimeError: If the final CSV file does not exist.
        """
        total_name_to_code = {'name_2_code': {}, 'code_2_name': {}, 'len': 0}
        if not os.path.exists(self.save_path_final):
            raise RuntimeError('文件不存在:', self.save_path_final)

        file_path_out = config.TRANSFER_JSON_NAME
        if os.path.exists(file_path_out):
            print('清理历史文件:', file_path_out)
            os.remove(file_path_out)

        name_2_code = total_name_to_code['name_2_code']
        code_2_name = total_name_to_code['code_2_name']

        with open(self.save_path_final, 'r', encoding='utf-8') as file_handler:
            csv_reader = csv.reader(file_handler)
            if self.include_head:
                next(csv_reader, None)  # statistics line
            next(csv_reader, None)  # column header row
            for row in csv_reader:
                if len(row) < 2:
                    # Blank or truncated line: no name/code pair to record.
                    continue
                name, code = row[0].strip(), row[1].strip()
                name_2_code[name] = code
                code_2_name[code] = name
                total_name_to_code['len'] += 1

        # One final pass so code_2_name agrees with the de-duplicated
        # name_2_code, instead of rebuilding the reverse map on every row.
        code_2_name.update({code: name for name, code in name_2_code.items()})

        with open(file_path_out, 'w', encoding="utf-8", newline='') as f:
            json.dump(total_name_to_code, f, ensure_ascii=False, indent=4)
        print('name-code字典文件写入成功:', file_path_out)


if __name__ == '__main__':
    # Fetch the database field codes and their Chinese names from the
    # smart-water website.
    dh = DataHelper()
    dh.get_all_label()
    # Generate the code-name dictionary file.
    dh.get_name_code_transfer()