| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145 |
- #sys.path.append("..")
- import json
- import os
- import csv
- from temp import config_analysis
- from temp.utils_analysis import create_custom_heatmap, set_chinese_font, cross_corr, group_list
- from Database.database_ import Database, DatabaseParam
def read_json(json_file, key: str = 'data'):
    """Load a JSON file and return the value stored under *key*.

    Also prints the record count found under the file's 'len' entry as a
    progress message.
    """
    with open(json_file, 'r', encoding='utf-8') as fp:
        payload = json.load(fp)
    print('数据加载成功,总数量:', payload.get('len'))
    return payload.get(key)
def select(d_list: list, l_list: list) -> list:
    """Drop entries that are *only* self-correlated.

    An entry pairs field A with field B.  An entry is removed when A and B
    are the same field AND that field appears exactly once as an A and
    exactly once as a B across the whole list — i.e. it correlates with
    nothing but itself.  All other entries are kept in their original order.

    :param d_list: list of dicts shaped like
        {'A': {'name': ...}, 'B': {'name': ...}, ...}
    :param l_list: unused; kept for backward compatibility with callers.
    :return: filtered copy of ``d_list``.
    """
    # Single pass to count how often each name appears on the A (row)
    # and B (column) side.  (Replaces the original init-then-recount loops.)
    row_counts: dict = {}
    col_counts: dict = {}
    for d in d_list:
        a = d['A']['name']
        b = d['B']['name']
        row_counts[a] = row_counts.get(a, 0) + 1
        col_counts[b] = col_counts.get(b, 0) + 1
    # Keep everything except purely self-correlated singletons.
    kept = []
    for d in d_list:
        a = d['A']['name']
        b = d['B']['name']
        if a == b and row_counts[a] == 1 and col_counts[b] == 1:
            continue
        kept.append(d)
    return kept
if __name__ == '__main__':
    # Fields to force-include in the second-round analysis.
    added_list = ['超滤总产水浊度', '超滤总产水余氯']
    # Substrings marking fields to exclude (currents, control words, alarms, ...).
    not_selected = ['电流', '控制字', '步序', '时间设定', '开关', '报警', '噪音']
    # Configure plotting to render Chinese labels.
    set_chinese_font()

    # Load the name<->code mapping produced by the GetItem step.
    with open('../GetItem/92_all_items_name_code_transfer.json', 'r', encoding='utf-8') as f:
        name_code_transfer = json.load(f)
    # FIX: outer double quotes — the original nested the same single quote
    # inside the f-string, a syntax error on Python < 3.12 (PEP 701).
    print(f"加载name与code映射字典,共{name_code_transfer.get('len')}条")
    name_2_code = name_code_transfer.get('name_2_code')
    code_2_name = name_code_transfer.get('code_2_name')
    del name_code_transfer  # only the two directional views are needed

    # Tabulate results next to the analysis output JSON ('.json' stripped).
    out_name = 'result_' + os.path.basename(config_analysis.OUTPUT_JSON_FILE)[:-5] + '.csv'
    data_list = read_json(config_analysis.OUTPUT_JSON_FILE)
    # All fields covered by the statistics (companion '<stem>.json' file).
    total_name = read_json(config_analysis.OUTPUT_JSON_FILE[:-9] + '.json', 'total_name_list')
    total_code = read_json(config_analysis.OUTPUT_JSON_FILE[:-9] + '.json', 'total_code_list')
    data_list = select(data_list, total_name)

    # Write the CSV.
    # FIX: mode 'w' truncates (replaces the remove-then-append dance),
    # newline='' is required by the csv module to avoid blank rows on
    # Windows, and the writer is built once instead of once per row.
    with open(out_name, 'w', encoding='utf-8', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(['A序列', 'B序列', 'k1', 'p1', 'r1', 'k2', 'p2', 'r2', 'k3', 'p3', 'r3'])
        for item_dict in data_list:
            row = [
                f"{item_dict.get('A').get('name')}({item_dict.get('A').get('code')})",
                f"{item_dict.get('B').get('name')}({item_dict.get('B').get('code')})",
            ]
            for it in item_dict.get('res'):
                row.append(f"{it.get('k')}")
                row.append(f"{it.get('p'):.4f}")
                row.append(f"{it.get('r'):.4f}")
            writer.writerow(row)

    # Distinct field names that survived the first filter.
    new_label_name = set()
    for d in data_list:
        new_label_name.add(d['A']['name'])
        new_label_name.add(d['B']['name'])
    # Force-include the add list ...
    new_label_name.update(added_list)
    # ... then drop every name containing an excluded substring.
    new_label_name = [name for name in new_label_name
                      if not any(bad in name for bad in not_selected)]
    new_label_code = [name_2_code.get(name) for name in new_label_name]
    del data_list  # raw result list no longer needed
    print('筛选后还剩下的字段数:', len(new_label_name))

    # Second-round computation: fetch the grouped time series from the DB.
    # SECURITY NOTE(review): credentials are hard-coded in source — move
    # them to a config file or environment variables.
    db_param = DatabaseParam(
        db_host='192.168.50.4',
        db_user='root',
        db_password='*B-@p2b+97D5xAF1e6',
        db_name='ws_data',
        db_port=4000)
    with Database(db_param) as db:
        group_df = db.query_sql_time_series_group2data_frame(
            code_name_dict=code_2_name,
            project_id=config_analysis.PROJECT_ID,
            sheet_name=config_analysis.DB_SHEET_NAME,
            data_codes=new_label_code,
            start_year=config_analysis.CHECK_YEAR_START,
            end_year=config_analysis.CHECK_YEAR_END,
            start_month=config_analysis.CHECK_MONTH_START,
            end_month=config_analysis.CHECK_MONTH_END,
            start_day=config_analysis.CHECK_DAY_START,
            end_day=config_analysis.CHECK_DAY_END)
    del new_label_code, new_label_name
    # Re-derive the code list from the columns actually returned ('time' excluded).
    new_label_code = sorted(i for i in group_df.columns.tolist() if i != 'time')

    # Split codes into row/column groups and draw one heatmap per pair.
    row_group_elements_num = 25   # codes per row group
    row_group_code = group_list(new_label_code, row_group_elements_num)
    col_group_elements_num = 50   # codes per column group
    col_group_code = group_list(new_label_code, col_group_elements_num)
    for i, row_code in enumerate(row_group_code):
        for j, col_code in enumerate(col_group_code):
            corr = cross_corr(row_code, col_code, group_df, code_2_name)
            create_custom_heatmap(corr_matrix=corr, title=f'中荷水厂数据相关系数热力图{i}-{j}')

    # Interactive lookup: which heatmap file contains a given field name.
    query_name = input('是否继续查询结果所在位置(y/n):')
    if query_name in ('y', 'Y'):
        # NOTE(review): typing '退出' is still looked up once (reported as out
        # of range) before the loop exits — kept to preserve original behavior.
        while query_name != '退出':
            query_name = input('查询:')
            flag = False
            for i, row_group in enumerate(row_group_code):
                if name_2_code.get(query_name) in row_group:
                    flag = True
                    print(f'位置:{i}-*.png')
                    break
            if not flag:
                print(f'位置:{query_name}不在统计范围')
|