| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210 |
import os.path
import sys
sys.path.append("..")  # make sibling packages (config, utils) importable from the parent dir
import pandas as pd
import config
import pickle
from utils.tools import create_custom_heatmap, set_chinese_font, group_list, quick_sort, load_transfer_file_name_code
import csv
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error, mean_absolute_error
from sklearn.metrics import r2_score
import numpy as np
from openpyxl import load_workbook

# Module-level side effect: register a Chinese-capable font (project helper)
# so heatmap titles with Chinese text render correctly.
set_chinese_font()
def load_pearsonr_mat():
    """Load the cached Pearson correlation matrix from its pickle file.

    Returns the unpickled object (a pandas DataFrame indexed by field labels,
    judging by how callers use ``.columns`` and ``.loc``).
    """
    mat_path = os.path.join(config.R_MAT_JSON_FILE_DIR, config.R_MAT_JSON_FILE_NAME)
    with open(mat_path, 'rb') as fp:
        return pickle.load(fp)
def show_all_results():
    """Render the full correlation matrix as grouped heatmaps, then offer an
    interactive lookup of which heatmap image contains a given label.

    The matrix is split into row groups of 25 and column groups of 50 labels
    because the full matrix is too large to render legibly in one figure.
    """
    # Load the cached Pearson correlation matrix.
    with open(os.path.join(config.R_MAT_JSON_FILE_DIR, config.R_MAT_JSON_FILE_NAME), 'rb') as f:
        results = pickle.load(f)
    label_list = results.columns.tolist()
    # Partition labels and draw one heatmap per (row group, column group) pair.
    row_group_elements_num = 25  # labels per row group
    row_group_name = group_list(label_list, row_group_elements_num)
    col_group_elements_num = 50  # labels per column group
    col_group_name = group_list(label_list, col_group_elements_num)
    for i, row_group in enumerate(row_group_name):
        for j, col_group in enumerate(col_group_name):
            corr_matrix = results.loc[row_group, col_group]
            create_custom_heatmap(corr_matrix, title=f'{config.PROJECT_ID}_水厂数据相关系数热力图{i}-{j}')
    # Optional interactive lookup: map a label back to its heatmap file index.
    query_name = input('是否继续查询结果所在位置(y/n):')
    if query_name in ('y', 'Y'):
        while True:
            query_name = input('查询:')
            if query_name == '退出':
                # BUG FIX: exit immediately. The original only tested the
                # sentinel at the top of the loop, so typing '退出' still ran
                # a search and printed a spurious "not in range" message.
                break
            flag = False
            for i, row_group in enumerate(row_group_name):
                if query_name in row_group:
                    flag = True
                    print(f'位置:{i}-*.png')
                    break
            if not flag:
                print(f'位置:{query_name}不在统计范围')
def save_txt(path):
    """Write notable label-pair correlations to *path* as ``A-B:r;`` entries.

    Self-correlations (r ~= 1, i.e. the diagonal) and weak correlations
    (|r| < 0.2) are skipped. Note both (i, j) and (j, i) are written since
    the matrix is symmetric.
    """
    with open(os.path.join(config.R_MAT_JSON_FILE_DIR, config.R_MAT_JSON_FILE_NAME), 'rb') as f:
        results = pickle.load(f)
    label_list = results.columns.tolist()
    with open(path, 'w', encoding='utf-8') as f:
        for i in range(len(label_list)):
            for j in range(len(label_list)):
                r = results.iloc[i, j]
                # BUG FIX: filter on |r| so strong NEGATIVE correlations are
                # kept — the original ``r < 0.2`` silently discarded e.g. r=-0.9.
                if abs(r - 1) < 1e-6 or abs(r) < 0.2:
                    continue
                f.write(f'{label_list[i]}-{label_list[j]}:{r:.2f};')
def save_csv(path):
    # NOTE(review): unfinished stub — it loads the correlation matrix and its
    # labels but never writes anything to *path*. TODO: implement the CSV
    # export or remove this function (``rank`` below already writes a CSV).
    with open(os.path.join(config.R_MAT_JSON_FILE_DIR, config.R_MAT_JSON_FILE_NAME), 'rb') as f:
        results = pickle.load(f)
    label_list = results.columns.tolist()
def rank(path):
    """For every field, rank all other fields by |Pearson r| (descending) and
    dump one CSV row per field to *path*. Any existing file is replaced.
    """
    if os.path.exists(path):
        os.remove(path)
    rmat = load_pearsonr_mat()  # symmetric correlation matrix
    label_list = rmat.columns.tolist()
    with open(path, 'w', newline='', encoding='utf-8') as out_file:
        writer = csv.writer(out_file)
        for col_label in label_list:
            # Extract one matrix column as (row label, r) pairs.
            elements = [(row_label, rmat.loc[row_label, col_label]) for row_label in label_list]
            # Project helper sorts ascending by |r|; reverse for descending order.
            quick_sort(elements, 0, len(elements) - 1)
            elements.reverse()
            # One "label | r" cell per non-zero correlation.
            row_cells = [f'{name} | {value:.2f}' for name, value in elements if abs(value) > 0]
            writer.writerow([col_label] + row_cells)
def directed_heatmap(series_a_name, series_b_name):
    """Plot the Pearson sub-matrix of rows *series_a_name* vs columns *series_b_name*.

    Labels absent from the correlation matrix are silently dropped first.
    """
    rmat = load_pearsonr_mat()
    # Keep only labels that actually exist in the matrix.
    series_a_name = [i for i in series_a_name if i in rmat.columns.tolist()]
    series_b_name = [i for i in series_b_name if i in rmat.columns.tolist()]
    corr_matrix = rmat.loc[series_a_name, series_b_name]
    # BUG FIX: the original nested single quotes inside a single-quoted
    # f-string (f'...{'_'.join(...)}...'), which is a SyntaxError on every
    # Python before 3.12 (PEP 701). Build the tags first instead.
    a_tag = '_'.join(series_a_name[:3])
    b_tag = '_'.join(series_b_name[:3])
    create_custom_heatmap(corr_matrix, title=f'{config.PROJECT_ID}_PearsonMat-{a_tag}等-VS-{b_tag}等')
def free_ols(target_name, x_name):
    """Fit an OLS regression of *target_name* on the *x_name* fields and print diagnostics.

    Features are z-score standardized before fitting so the printed
    coefficients are comparable in magnitude.

    Raises:
        RuntimeError: if the target name, or every x name, cannot be mapped
            onto the merged data frame.
    Returns silently (None) when the target's code column is absent from the data.
    """
    # Never regress the target on itself.
    if target_name in x_name:
        x_name.remove(target_name)
    # Load the merged data matrix and the name<->code mapping.
    with open(config.DF_MERGE_FILE_PATH, 'rb') as f:
        df_merge_mat = pickle.load(f)
    name_2_code_dict, code_2_name_dict = load_transfer_file_name_code(
        os.path.join(config.ALL_ITEMS_FILE_DIR, config.TRANSFER_JSON_NAME))
    if target_name not in name_2_code_dict.keys():
        raise RuntimeError('输入的target字段与数据不匹配', target_name)
    x_name = [i for i in x_name if i in name_2_code_dict.keys()]
    target_code = name_2_code_dict.get(target_name)
    if target_code not in df_merge_mat.columns.tolist():
        return
    # BUG FIX: keep names and codes aligned. The original filtered x_code by
    # column availability but later zipped coefficients with the UNfiltered
    # x_name, mislabeling every coefficient after a dropped column.
    pairs = [(name, name_2_code_dict.get(name)) for name in x_name
             if name_2_code_dict.get(name) in df_merge_mat.columns.tolist()]
    x_name = [name for name, _ in pairs]
    x_code = [code for _, code in pairs]
    if len(x_name) == 0 or len(x_code) == 0:
        raise RuntimeError('输入的x字段与数据不匹配', x_name)
    # Standardize features, then fit ordinary least squares.
    x = df_merge_mat.loc[:, x_code].copy()
    y = df_merge_mat.loc[:, target_code]
    scaler = StandardScaler()
    x = scaler.fit_transform(x)
    ols_model = LinearRegression()
    ols_model.fit(x, y)
    # Report the fitted equation.
    print_info = []
    print('\n===========OLS训练结果==================')
    print(f'Y:{target_name}')
    print(f"OLS 截距: {ols_model.intercept_}")
    print(f"OLS 系数:")
    for feat, coef in zip(x_name, ols_model.coef_):
        print(f" {feat}: {coef:.4f}")
        print_info.append(f'{coef:.4f}*{feat}')
    print(f"OLS R² (训练集): {ols_model.score(x, y):.4f}")
    # Prefix positive terms with '+' so the equation concatenates cleanly.
    print_info = ['+' + i if i[0] != '-' else i for i in print_info]
    # BUG FIX: in the original, the conditional expression bound over the
    # WHOLE string concatenation (`a + b if cond else c` parses as
    # `(a + b) if cond else c`), so a negative intercept printed only the
    # intercept and dropped the entire equation. Build the signed intercept
    # term first, then print unconditionally.
    intercept_term = (f'+{ols_model.intercept_:.4}' if ols_model.intercept_ >= 0
                      else f'{ols_model.intercept_:.4}')
    print(f"{target_name}=" + ''.join(print_info) + intercept_term)
    # Basic goodness-of-fit metrics (computed on the training data).
    y_pred = ols_model.predict(x)
    mse = mean_squared_error(y, y_pred)
    rmse = np.sqrt(mse)
    mae = mean_absolute_error(y, y_pred)
    r2 = r2_score(y, y_pred)
    # Adjusted R² penalizes for the number of predictors.
    n = len(y)
    p = x.shape[1]
    adj_r2 = 1 - (1 - r2) * (n - 1) / (n - p - 1)
    print("\n===========模型性能指标==================:")
    print(f"均方误差 (MSE): {mse:.4f}")
    print(f"均方根误差 (RMSE): {rmse:.4f}")
    print(f"平均绝对误差 (MAE): {mae:.4f}")
    print(f"决定系数 (R²): {r2:.4f}")
    print(f"调整R²: {adj_r2:.4f}")
if __name__ == '__main__':
    # Show all heatmaps interactively:
    # show_all_results()
    # Dump correlations as formatted txt:
    # save_txt('./tem.txt')
    # Rank fields by Pearson coefficient:
    # rank(f'./{config.PROJECT_ID}_rank.csv')
    # Targeted Pearson heatmaps driven by an Excel workbook
    # (column A = row labels, column B = column labels, one sheet per plot):
    # workbook = load_workbook(f'./{config.PROJECT_ID}_field_combination.xlsx')
    # sheet_names = workbook.sheetnames
    # print("文件中包含的sheet有:", sheet_names)
    # for sheet_name in sheet_names:
    #     sheet = workbook[sheet_name]
    #     series_a_name = [cell.value for cell in sheet['A'] if cell.value is not None]
    #     series_b_name = [cell.value for cell in sheet['B'] if cell.value is not None]
    #     print(f"Sheet名称: {sheet_name}")
    #     print(f" A列 {series_a_name} ")
    #     print(f" B列 {series_b_name} ")
    #     directed_heatmap(series_a_name, series_b_name)
    # Targeted free OLS regression, one sheet per model:
    # column A holds the feature names, column B row 1 holds the target name.
    workbook = load_workbook(f'./{config.PROJECT_ID}_field_ols.xlsx')
    sheet_names = workbook.sheetnames
    print("文件中包含的sheet有:", sheet_names)
    for sheet_name in sheet_names:
        sheet = workbook[sheet_name]
        # Column A (features) and column B (target), skipping blank cells.
        series_a_name = [cell.value for cell in sheet['A'] if cell.value is not None]
        series_b_name = [cell.value for cell in sheet['B'] if cell.value is not None]
        # BUG FIX: guard against an empty A or B column, which previously
        # raised an unhandled IndexError on series_b_name[0] (or a confusing
        # RuntimeError deeper inside free_ols).
        if not series_a_name or not series_b_name:
            print(f'跳过sheet {sheet_name}:A列或B列为空')
            continue
        free_ols(series_b_name[0], series_a_name)
|