"""Pearson-correlation analysis utilities for water-plant monitoring data.

Loads a pickled Pearson correlation matrix, renders grouped heatmaps,
exports rankings, and fits OLS regressions over fields selected via an
Excel workbook.
"""
import os.path
import sys

sys.path.append("..")

import csv
import pickle

import numpy as np
import pandas as pd
from openpyxl import load_workbook
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.preprocessing import StandardScaler

import config
from utils.tools import (create_custom_heatmap, group_list,
                         load_transfer_file_name_code, quick_sort,
                         set_chinese_font)

# Configure plotting so Chinese labels render correctly.
set_chinese_font()


def load_pearsonr_mat():
    """Load and return the pickled Pearson correlation matrix (a pandas DataFrame)."""
    with open(os.path.join(config.R_MAT_JSON_FILE_DIR, config.R_MAT_JSON_FILE_NAME), 'rb') as f:
        results = pickle.load(f)
    return results


def show_all_results():
    """Render the full correlation matrix as a grid of heatmap tiles.

    Labels are split into row groups of 25 and column groups of 50 so each
    tile stays readable; afterwards the user can interactively look up which
    row-group tile a given label landed in.
    """
    results = load_pearsonr_mat()
    label_list = results.columns.tolist()
    # Split the label axis into groups so each heatmap tile stays readable.
    row_group_elements_num = 25   # rows per tile
    row_group_name = group_list(label_list, row_group_elements_num)
    col_group_elements_num = 50   # columns per tile
    col_group_name = group_list(label_list, col_group_elements_num)
    for i, row_group in enumerate(row_group_name):
        for j, col_group in enumerate(col_group_name):
            corr_matrix = results.loc[row_group, col_group]
            create_custom_heatmap(corr_matrix, title=f'{config.PROJECT_ID}_水厂数据相关系数热力图{i}-{j}')
    # Interactive lookup: report which tile (row-group index) contains a label.
    query_name = input('是否继续查询结果所在位置(y/n):')
    if query_name == 'y' or query_name == 'Y':
        while query_name != '退出':
            query_name = input('查询:')
            flag = False
            for i, row_group in enumerate(row_group_name):
                if query_name in row_group:
                    flag = True
                    print(f'位置:{i}-*.png')
                    break
            if not flag:
                print(f'位置:{query_name}不在统计范围')


def save_txt(path):
    """Write correlation pairs to a txt file as '<A>-<B>:<r>;' entries.

    Self-correlations (r == 1) and weak correlations are skipped.
    """
    results = load_pearsonr_mat()
    label_list = results.columns.tolist()
    with open(path, 'w', encoding='utf-8') as f:
        for i in range(len(label_list)):
            for j in range(len(label_list)):
                r = results.iloc[i, j]
                # Skip self-correlation and weak values.
                # NOTE(review): `r < 0.2` also drops strong *negative*
                # correlations — confirm whether `abs(r) < 0.2` was intended.
                if abs(r - 1) < 1e-6 or r < 0.2:
                    continue
                f.write(f'{label_list[i]}-{label_list[j]}:{r:.2f};')


def save_csv(path):
    """Export the correlation matrix to CSV.

    TODO(review): unfinished — the matrix is loaded but nothing is ever
    written to `path`.
    """
    results = load_pearsonr_mat()
    label_list = results.columns.tolist()


def rank(path):
    """Sort each column's correlations and write one CSV row per column.

    Each row starts with the column label, followed by '<label> | <r>'
    cells ordered from strongest to weakest correlation.
    """
    if os.path.exists(path):
        os.remove(path)
    rmat = load_pearsonr_mat()  # symmetric Pearson matrix
    label_list = rmat.columns.tolist()
    with open(path, 'w', newline='', encoding='utf-8') as f:
        csv_writer = csv.writer(f)
        for col_label in label_list:
            # Pull one column of the Pearson matrix as (label, r) pairs.
            elements = []
            for row_label in label_list:
                elements.append((row_label, rmat.loc[row_label, col_label]))
            # Sort ascending by |r| (project helper), then reverse so the
            # strongest correlations come first.
            quick_sort(elements, 0, len(elements) - 1)
            elements = elements[::-1]
            # Drop exact-zero correlations, then write the CSV row.
            csv_line_content = [f'{tup[0]} | {tup[1]:.2f}' for tup in elements if abs(tup[1]) > 0]
            csv_writer.writerow([col_label] + csv_line_content)


def directed_heatmap(series_a_name, series_b_name):
    """Plot the Pearson sub-matrix for two specific lists of field names.

    Names absent from the matrix are silently dropped.
    """
    rmat = load_pearsonr_mat()
    series_a_name = [i for i in series_a_name if i in rmat.columns.tolist()]
    series_b_name = [i for i in series_b_name if i in rmat.columns.tolist()]
    corr_matrix = rmat.loc[series_a_name, series_b_name]
    # BUGFIX: the original nested single quotes inside a single-quoted
    # f-string, which is a SyntaxError before Python 3.12 (PEP 701).
    a_part = '_'.join(series_a_name[:3])
    b_part = '_'.join(series_b_name[:3])
    create_custom_heatmap(corr_matrix, title=f'{config.PROJECT_ID}_PearsonMat-{a_part}等-VS-{b_part}等')


def free_ols(target_name, x_name):
    """Fit OLS of `target_name` on `x_name` and print fit diagnostics.

    Features are standardized before fitting. Prints the fitted equation,
    training R², MSE/RMSE/MAE and adjusted R².

    Raises:
        RuntimeError: if the target field, or all of the x fields, cannot
            be matched against the data / name-to-code mapping.
    """
    # Never regress the target on itself.
    if target_name in x_name:
        x_name.remove(target_name)
    # Load the merged data matrix and the name<->code mapping.
    with open(config.DF_MERGE_FILE_PATH, 'rb') as f:
        df_merge_mat = pickle.load(f)
    name_2_code_dict, code_2_name_dict = load_transfer_file_name_code(
        os.path.join(config.ALL_ITEMS_FILE_DIR, config.TRANSFER_JSON_NAME))
    if target_name not in name_2_code_dict.keys():
        raise RuntimeError('输入的target字段与数据不匹配', target_name)
    x_name = [i for i in x_name if i in name_2_code_dict.keys()]
    target_code = name_2_code_dict.get(target_name)
    if target_code not in df_merge_mat.columns.tolist():
        return
    # BUGFIX: keep x_name and x_code aligned. The original filtered only
    # x_code, so when a code column was missing the later
    # zip(x_name, coef_) mislabeled the coefficients.
    x_name = [i for i in x_name if name_2_code_dict.get(i) in df_merge_mat.columns.tolist()]
    x_code = [name_2_code_dict.get(i) for i in x_name]
    if len(x_name) == 0 or len(x_code) == 0:
        raise RuntimeError('输入的x字段与数据不匹配', x_name)
    # OLS on standardized features.
    x = df_merge_mat.loc[:, x_code].copy()
    y = df_merge_mat.loc[:, target_code]
    scaler = StandardScaler()
    x = scaler.fit_transform(x)
    ols_model = LinearRegression()
    ols_model.fit(x, y)
    # --- training summary --------------------------------------------------
    print_info = []
    print('\n===========OLS训练结果==================')
    print(f'Y:{target_name}')
    print(f"OLS 截距: {ols_model.intercept_}")
    print(f"OLS 系数:")
    for feat, coef in zip(x_name, ols_model.coef_):
        print(f" {feat}: {coef:.4f}")
        print_info.append(f'{coef:.4f}*{feat}')
    print(f"OLS R² (训练集): {ols_model.score(x,y):.4f}")
    print_info = ['+' + i if i[0] != '-' else i for i in print_info]
    # BUGFIX: parenthesize the conditional expression. The original ternary
    # bound to the whole concatenation, so a negative intercept printed only
    # the intercept and dropped the rest of the equation.
    intercept_str = (f'+{ols_model.intercept_:.4}'
                     if str(ols_model.intercept_)[0] != '-'
                     else f'{ols_model.intercept_:.4}')
    print(f"{target_name}=" + ''.join(print_info) + intercept_str)
    # --- basic performance metrics -----------------------------------------
    y_pred = ols_model.predict(x)
    mse = mean_squared_error(y, y_pred)
    rmse = np.sqrt(mse)
    mae = mean_absolute_error(y, y_pred)
    r2 = r2_score(y, y_pred)
    # Adjusted R² penalizes for the number of predictors.
    n = len(y)
    p = x.shape[1]
    adj_r2 = 1 - (1 - r2) * (n - 1) / (n - p - 1)
    print("\n===========模型性能指标==================:")
    print(f"均方误差 (MSE): {mse:.4f}")
    print(f"均方根误差 (RMSE): {rmse:.4f}")
    print(f"决定系数 (R²): {r2:.4f}")
    print(f"平均绝对误差 (MAE): {mae:.4f}")
    print(f"调整R²: {adj_r2:.4f}")


if __name__ == '__main__':
    # Show every heatmap tile:
    # show_all_results()

    # Export pairs in txt format:
    # save_txt('./tem.txt')

    # Ranked Pearson correlations per column:
    # rank(f'./{config.PROJECT_ID}_rank.csv')

    # Targeted Pearson heatmaps driven by an Excel workbook
    # (column A fields vs column B fields, one plot per sheet):
    # workbook = load_workbook(f'./{config.PROJECT_ID}_field_combination.xlsx')
    # sheet_names = workbook.sheetnames
    # print("文件中包含的sheet有:", sheet_names)
    # for sheet_name in sheet_names:
    #     sheet = workbook[sheet_name]
    #     series_a_name = [cell.value for cell in sheet['A'] if cell.value is not None]
    #     series_b_name = [cell.value for cell in sheet['B'] if cell.value is not None]
    #     print(f"Sheet名称: {sheet_name}")
    #     print(f" A列 {series_a_name} ")
    #     print(f" B列 {series_b_name} ")
    #     directed_heatmap(series_a_name, series_b_name)

    # Targeted OLS regression driven by an Excel workbook:
    # column A = feature names, column B (first cell) = target name.
    workbook = load_workbook(f'./{config.PROJECT_ID}_field_ols.xlsx')
    sheet_names = workbook.sheetnames
    print("文件中包含的sheet有:", sheet_names)
    for sheet_name in sheet_names:
        sheet = workbook[sheet_name]
        # Column A: non-empty cells, read from the first row down.
        series_a_name = [cell.value for cell in sheet['A'] if cell.value is not None]
        # Column B: non-empty cells; the first one is the regression target.
        series_b_name = [cell.value for cell in sheet['B'] if cell.value is not None]
        free_ols(series_b_name[0], series_a_name)