#sys.path.append("..") import json import os import csv from temp import config_analysis from temp.utils_analysis import create_custom_heatmap, set_chinese_font, cross_corr, group_list from Database.database_ import Database, DatabaseParam def read_json(json_file, key:str='data'): """加载json""" with open(json_file, 'r', encoding='utf-8') as f: data_ = json.load(f) print('数据加载成功,总数量:', data_.get('len')) return data_.get(key) def select(d_list: list, l_list:list) -> list: counter_dict_row = {} counter_dict_col = {} # 行计数 # for l in l_list: # counter_dict_row.update({l: 0}) # counter_dict_col.update({l: 0}) for d in d_list: counter_dict_row.update({d.get('A').get('name'): 0}) counter_dict_col.update({d.get('B').get('name'): 0}) # 列计数 for d in d_list: counter_dict_row[d['A']['name']] += 1 counter_dict_col[d['B']['name']] += 1 # 剔除只自相关的数据字段 new_d_list_idx = [] for idx, d in enumerate(d_list): if d['A']['name'] == d['B']['name']: if counter_dict_row[d['A']['name']] == 1 and counter_dict_col[d['B']['name']] == 1: continue new_d_list_idx.append(idx) new_d_list = [d_list[i] for i in new_d_list_idx] return new_d_list if __name__ == '__main__': # 添加列表 added_list = ['超滤总产水浊度','超滤总产水余氯'] # 排除列表 not_selected = ['电流', '控制字', '步序', '时间设定', '开关', '报警', '噪音'] # 设置中文字体 set_chinese_font() # 获取name和code的映射关系 with open('../GetItem/92_all_items_name_code_transfer.json', 'r', encoding='utf-8') as f: # 总字段加载文件 name_code_transfer = json.load(f) print(f'加载name与code映射字典,共{name_code_transfer.get('len')}条') name_2_code = name_code_transfer.get('name_2_code') code_2_name = name_code_transfer.get('code_2_name') del name_code_transfer # 结果表格化 out_name = 'result_' + os.path.basename(config_analysis.OUTPUT_JSON_FILE)[:-5] + '.csv' data_list = read_json(config_analysis.OUTPUT_JSON_FILE) # 需要统计的所有字段 total_name = read_json(config_analysis.OUTPUT_JSON_FILE[:-9] + '.json', 'total_name_list') total_code = read_json(config_analysis.OUTPUT_JSON_FILE[:-9] + '.json', 'total_code_list') data_list = select(data_list, total_name) # 写入csv if os.path.exists(out_name): os.remove(out_name) with open(out_name, 'a', encoding='utf-8') as f: csv.writer(f).writerow(['A序列', 'B序列', 'k1', 'p1', 'r1', 'k2', 'p2', 'r2', 'k3', 'p3', 'r3']) for item_dict in data_list: txt_content_A = [f'{item_dict.get('A').get('name')}({item_dict.get('A').get('code')})'] txt_content_B = [f'{item_dict.get('B').get('name')}({item_dict.get('B').get('code')})'] txt_content_res = [] for it in item_dict.get('res'): txt_content_res.append(f'{it.get('k')}') txt_content_res.append(f'{it.get('p'):.4f}') txt_content_res.append(f'{it.get('r'):.4f}') csv.writer(f).writerow(txt_content_A + txt_content_B + txt_content_res) new_label_name = set() for d in data_list: new_label_name.add(d['A']['name']) new_label_name.add(d['B']['name']) # 增加添加列表 for ele in added_list: new_label_name.add(ele) # 剔除排除列表 new_label_name_temp = [] for ele in new_label_name: flag = True for not_selected_ele in not_selected: if not_selected_ele in ele: flag = False break if flag: new_label_name_temp.append(ele) new_label_name = new_label_name_temp del new_label_name_temp new_label_code = [name_2_code.get(ele) for ele in new_label_name] # 二次筛选后的统计字段 del data_list print('筛选后还剩下的字段数:', len(new_label_name)) # 逐组进行二次计算 db_param = DatabaseParam( db_host='192.168.50.4', db_user='root', db_password='*B-@p2b+97D5xAF1e6', db_name='ws_data', db_port=4000) # 按组计算 with Database(db_param) as db: group_df = db.query_sql_time_series_group2data_frame( code_name_dict=code_2_name, project_id=config_analysis.PROJECT_ID, sheet_name=config_analysis.DB_SHEET_NAME, data_codes=new_label_code, start_year=config_analysis.CHECK_YEAR_START, end_year=config_analysis.CHECK_YEAR_END, start_month=config_analysis.CHECK_MONTH_START, end_month=config_analysis.CHECK_MONTH_END, start_day=config_analysis.CHECK_DAY_START, end_day=config_analysis.CHECK_DAY_END) del new_label_code, new_label_name new_label_code = sorted([i for i in group_df.columns.tolist() if i != 'time']) # 对列表进行分组 row_group_elements_num = 25 # 行分组 row_group_code = group_list(new_label_code, row_group_elements_num) col_group_elements_num = 50 # 列分组 col_group_code = group_list(new_label_code, col_group_elements_num) for i, row_code in enumerate(row_group_code): for j, col_code in enumerate(col_group_code): corr = cross_corr(row_code, col_code, group_df, code_2_name) create_custom_heatmap(corr_matrix=corr, title=f'中荷水厂数据相关系数热力图{i}-{j}') query_name = input('是否继续查询结果所在位置(y/n):') if query_name == 'y' or query_name == 'Y': while query_name != '退出': query_name = input('查询:') flag = False for i, row_group in enumerate(row_group_code): if name_2_code.get(query_name) in row_group: flag = True print(f'位置:{i}-*.png') break if not flag: print(f'位置:{query_name}不在统计范围')