import sys
# Presumably makes the parent directory's packages (Analysis, Database, utils)
# importable; it has to stay ahead of the project-local imports below.
sys.path.append("..")
from Analysis.pearsonr import DFMat, PearsonrMat
from Database.database_ import DatabaseParam
import config
import os
import json
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import Lasso, LassoCV, LinearRegression
from sklearn.model_selection import TimeSeriesSplit
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import r2_score
import scipy.stats as stats
from utils.tools import set_chinese_font
from sklearn.metrics import mean_squared_error, mean_absolute_error
from sklearn.model_selection import cross_val_score
from statsmodels.stats.outliers_influence import OLSInfluence
import statsmodels.api as sm


class RegressionBox(PearsonrMat):
    """Lasso regression for feature selection followed by an OLS refit.

    Results of the most recent fits are kept in ``self.lasso_info`` and
    ``self.ols_info``; the fitted OLS estimator is kept in ``self.ols_model``.
    """

    def __init__(self, keys_file_dir: str, min_records: int, db_param: DatabaseParam,
                 transfer_file_dir: str, is_from_local: bool = True):
        """Forward all arguments to the parent class and create empty result stores."""
        super().__init__(keys_file_dir=keys_file_dir,
                         min_records=min_records,
                         db_param=db_param,
                         transfer_file_dir=transfer_file_dir,
                         is_from_local=is_from_local)
        # The "help" entry describes the keys later written into each dict.
        self.lasso_info = {"help": "x,自变量名;y,因变量名;alpha,最佳参数;coef,自变量权重;intercept,截距;n_iter,迭代次数;dual_gap,对偶间隙;tol,对偶容忍"}
        self.ols_info = {"help": "x,自变量名;y,因变量名;最佳参数;coef,自变量权重;intercept,截距;n_iter,迭代次数;score,R2决定系数;"}
        self.ols_model = None  # final linear OLS regression model

    def read_features_file(self):
        """Load the feature file naming the dependent (Y) and independent (X) variables.

        Returns:
            A ``(targets, features)`` pair read from the JSON keys ``'targets'``
            and ``'features'``; an entry is ``None`` when its key is absent.

        Raises:
            FileNotFoundError: If ``config.LASSO_FEATURE_FILE_PATH`` does not exist.
        """
        path = config.LASSO_FEATURE_FILE_PATH
        if not os.path.exists(path):
            # A single message argument: the two-argument OSError form would be
            # interpreted as (errno, strerror) and render as "[Errno ...] path".
            raise FileNotFoundError(f'文件未发现:{path}')
        with open(path, "r", encoding="utf-8") as f:
            json_data = json.load(f)
        return json_data.get('targets'), json_data.get('features')

    def load_features(self) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
        """Return the target, feature and time columns selected by the feature file.

        Names from the feature file are translated to column codes through
        ``self.name_2code_dict``; names without a matching column in
        ``self.df_merge`` are dropped.

        Returns:
            ``(targets, features, time)`` as copies taken from ``self.df_merge``.

        Raises:
            ValueError: If no target column or no feature column remains.
        """
        y_label_name, x_label_name = self.read_features_file()
        # Build the column lookup once instead of once per name.
        available = set(self.df_merge.columns)

        def to_codes(names):
            # ``names`` is None when the JSON key is missing; treat it as empty
            # so the ValueError below is raised instead of a TypeError.
            codes = (self.name_2code_dict.get(name) for name in names or [])
            return [code for code in codes if code in available]

        y_label_code = to_codes(y_label_name)
        x_label_code = to_codes(x_label_name)
        if not y_label_code or not x_label_code:
            raise ValueError('需要拟合的特征为空,请检查建模字段是否存在', (y_label_code, x_label_code))
        targets = self.df_merge.loc[:, y_label_code].copy()
        features = self.df_merge.loc[:, x_label_code].copy()
        time = self.df_merge.loc[:, ['time']].copy()
        return targets, features, time

    def select_features(self):
        """Placeholder; not implemented."""
        pass

    def lasso_(self, y_value: np.ndarray, scaler_x_value: np.ndarray,
               n_splits: int = 5, max_iter: int = 10000):
        """Fit a cross-validated Lasso and record the result in ``self.lasso_info``.

        Args:
            y_value: Dependent variable values.
            scaler_x_value: Feature matrix (the caller in this file passes
                standardized features).
            n_splits: Number of ``TimeSeriesSplit`` folds.
            max_iter: Iteration cap passed to ``LassoCV``.
        """
        tscv = TimeSeriesSplit(n_splits=n_splits)
        # Search for the best alpha.
        # NOTE(review): an integer ``alphas`` is only accepted by recent
        # scikit-learn releases (older ones use ``n_alphas``) - confirm the
        # installed version.
        lasso_model = LassoCV(alphas=100, cv=tscv, max_iter=max_iter, random_state=42, n_jobs=-1)
        lasso_model.fit(scaler_x_value, y_value)
        self.lasso_info['alpha'] = lasso_model.alpha_        # selected alpha
        self.lasso_info['intercept'] = lasso_model.intercept_
        self.lasso_info['n_iter'] = lasso_model.n_iter_      # iteration count
        self.lasso_info['dual_gap'] = lasso_model.dual_gap_  # dual gap
        self.lasso_info['tol'] = lasso_model.tol             # tolerance setting
        self.lasso_info['coef'] = lasso_model.coef_          # feature weights

    def ols_(self, y_value: np.ndarray, scaler_x_value: np.ndarray) -> LinearRegression:
        """Fit an OLS model, record it in ``self.ols_info`` and return it.

        Args:
            y_value: Dependent variable values.
            scaler_x_value: Feature matrix.

        Returns:
            The fitted ``LinearRegression`` estimator.
        """
        model = LinearRegression()
        model.fit(scaler_x_value, y_value)
        self.ols_info['intercept'] = model.intercept_
        self.ols_info['coef'] = model.coef_
        # R^2 on the same data the model was fitted on.
        self.ols_info['score'] = model.score(scaler_x_value, y_value)
        return model

    def any_regression_full(self, target_name: str):
        """Regress an arbitrary target on all fields (not implemented)."""
        pass

    def any_regression_r_rank(self, target_name: str):
        """Model ``target_name`` from the fields returned by ``query_r_rank_n``.

        Steps: standardize the candidate features, run ``lasso_`` to select
        features, refit ``ols_`` on the unscaled columns whose Lasso
        coefficient is non-zero, then print metrics and show diagnostic plots.

        Args:
            target_name: Name of the dependent variable; must be a key of
                ``self.name_2code_dict``.

        Raises:
            KeyError: If ``target_name`` is not in ``self.name_2code_dict``.
            ValueError: If Lasso leaves no feature with a non-zero coefficient.
        """
        y_label_name = target_name
        # Candidate features; presumably ranked by Pearson correlation with the
        # target (query_r_rank_n is defined outside this file). Work on a copy
        # so removing the target below never mutates the helper's own list.
        x_label_name = list(self.query_r_rank_n(y_label_name))
        if y_label_name in x_label_name:
            x_label_name.remove(y_label_name)

        y_label_code = self.name_2code_dict[y_label_name]
        x_label_code = [self.name_2code_dict.get(name) for name in x_label_name]
        y = self.df_merge.loc[:, y_label_code].copy().to_numpy()  # observed values
        x = self.df_merge.loc[:, x_label_code].copy()             # predictors

        # Standardize the predictors for the Lasso step.
        x = StandardScaler().fit_transform(x)

        # Lasso regression: feature selection.
        self.lasso_(y_value=y, scaler_x_value=x)
        self.lasso_info['x'] = x_label_name
        self.lasso_info['y'] = y_label_name
        self._print_lasso_summary(x_label_name)

        # Keep only the features whose Lasso coefficient is non-zero.
        mask = np.asarray(self.lasso_info.get('coef')) != 0
        if not mask.any():
            raise ValueError('Lasso未选出任何非零系数的特征,无法进行OLS回归')
        x_label_name = list(np.array(x_label_name)[mask])
        x_label_code = list(np.array(x_label_code)[mask])
        x = self.df_merge.loc[:, x_label_code]  # deliberately not normalized/standardized

        # OLS regression on the selected features.
        self.ols_model = self.ols_(y_value=y, scaler_x_value=x)
        self.ols_info['x'] = x_label_name
        self.ols_info['y'] = y_label_name
        self._print_ols_summary(x_label_name)

        # Basic evaluation metrics and diagnostic plots.
        y_pred = self.ols_model.predict(x)
        residuals = y - y_pred
        r2 = self._print_performance_metrics(y, y_pred, n_features=x.shape[1])
        self._plot_regression_diagnostics(y, y_pred, residuals, r2)

    def _print_lasso_summary(self, x_label_name):
        """Print the Lasso results stored in ``self.lasso_info``."""
        print('\n===========Lasso训练结果==================')
        print(f"最优lambda:{self.lasso_info.get('alpha')}")
        print(f"Y:{self.lasso_info.get('y')}")
        print("Lasso系数:")
        for feat, coef in zip(x_label_name, self.lasso_info.get('coef')):
            print(f" {feat}: {coef}")
        print(f"截距:{self.lasso_info.get('intercept')}")
        print(f"迭代次数:{self.lasso_info.get('n_iter')}")
        print(f"对偶间隙:{self.lasso_info.get('dual_gap')}")
        print(f"对偶间隙容忍:{self.lasso_info.get('tol')}")

    def _print_ols_summary(self, x_label_name):
        """Print the OLS results stored in ``self.ols_info``."""
        print('\n===========OLS训练结果==================')
        print(f"OLS 截距: {self.ols_info.get('intercept')}")
        print("OLS 系数:")
        for feat, coef in zip(x_label_name, self.ols_info.get('coef')):
            print(f" {feat}: {coef}")
        print(f"OLS R² (训练集): {self.ols_info.get('score'):.4f}")

    @staticmethod
    def _print_performance_metrics(y, y_pred, n_features: int) -> float:
        """Print MSE, RMSE, MAE, R^2 and adjusted R^2; return R^2."""
        mse = mean_squared_error(y, y_pred)
        rmse = np.sqrt(mse)
        mae = mean_absolute_error(y, y_pred)
        r2 = r2_score(y, y_pred)
        # Adjusted R^2 is undefined without residual degrees of freedom.
        n_samples = len(y)
        dof = n_samples - n_features - 1
        adj_r2 = 1 - (1 - r2) * (n_samples - 1) / dof if dof > 0 else float('nan')
        print("\n===========模型性能指标==================:")
        print(f"均方误差 (MSE): {mse:.4f}")
        print(f"均方根误差 (RMSE): {rmse:.4f}")
        print(f"平均绝对误差 (MAE): {mae:.4f}")
        print(f"决定系数 (R²): {r2:.4f}")
        print(f"调整R²: {adj_r2:.4f}")
        return r2

    @staticmethod
    def _plot_regression_diagnostics(y, y_pred, residuals, r2: float):
        """Show a 2x3 grid of residual diagnostic plots for a fitted model."""
        set_chinese_font()
        fig, axes = plt.subplots(2, 3, figsize=(15, 10))

        # 1. Residuals vs fitted values (homoscedasticity and linearity check).
        axes[0, 0].scatter(y_pred, residuals, alpha=0.6)
        axes[0, 0].axhline(y=0, color='red', linestyle='--')
        axes[0, 0].set_xlabel('拟合值')
        axes[0, 0].set_ylabel('残差')
        axes[0, 0].set_title('残差 vs 拟合值')

        # 2. Normal Q-Q plot (residual normality check).
        stats.probplot(residuals, dist="norm", plot=axes[0, 1])
        axes[0, 1].set_title('Q-Q图(检查正态性)')

        # 3. Residual histogram.
        axes[0, 2].hist(residuals, bins=30, density=True, alpha=0.7)
        axes[0, 2].set_xlabel('残差')
        axes[0, 2].set_ylabel('密度')
        axes[0, 2].set_title('残差分布')

        # 4. Observed vs fitted values, with the y = x reference line.
        axes[1, 0].scatter(y, y_pred, alpha=0.6)
        min_val = min(y.min(), y_pred.min())
        max_val = max(y.max(), y_pred.max())
        axes[1, 0].plot([min_val, max_val], [min_val, max_val], 'red', linestyle='--')
        axes[1, 0].set_xlabel('实际值')
        axes[1, 0].set_ylabel('预测值')
        axes[1, 0].set_title('实际值 vs 预测值')
        axes[1, 0].text(0.05, 0.95, f'R² = {r2:.3f}', transform=axes[1, 0].transAxes)

        # 5. Residuals in observation order (relevant for time-series data).
        axes[1, 1].plot(residuals)
        axes[1, 1].axhline(y=0, color='red', linestyle='--')
        axes[1, 1].set_xlabel('时间/观测序号')
        axes[1, 1].set_ylabel('残差')
        axes[1, 1].set_title('残差时间序列')

        # 6. Scale-location plot (homoscedasticity check). A perfect fit has
        # zero residual spread, so avoid dividing by zero in that case.
        resid_std = np.std(residuals)
        if resid_std > 0:
            standardized_residuals = residuals / resid_std
        else:
            standardized_residuals = np.zeros_like(residuals, dtype=float)
        axes[1, 2].scatter(y_pred, np.sqrt(np.abs(standardized_residuals)), alpha=0.6)
        axes[1, 2].set_xlabel('拟合值')
        axes[1, 2].set_ylabel('√|标准化残差|')
        axes[1, 2].set_title('尺度-位置图')

        plt.tight_layout()
        plt.show()

    def any_regression_custom(self, target_name: str, path: str):
        """Model a target from custom fields read from a file (not implemented)."""

    def auto_fit(self, x_label_name: str, y_label_name: str, is_use_lasso: bool = True):
        """Regression analysis entry point (not implemented)."""


if __name__ == '__main__':
    # Database parameters.
    db_param = DatabaseParam(
        db_host=config.DB_HOST,
        db_user=config.DB_USER,
        db_password=config.DB_PASSWORD,
        db_name=config.DB_NAME,
        db_port=config.DB_PORT)
    my_box = RegressionBox(
        keys_file_dir=os.path.join(config.STATISTICS_FILE_DIR, config.STATISTICS_FILE_NAME),
        min_records=config.MIN_RECORDS,
        db_param=db_param,
        transfer_file_dir=os.path.join(config.ALL_ITEMS_FILE_DIR, config.TRANSFER_JSON_NAME))
    # Compute the Pearson correlation matrix.
    my_box.calculate_pearsonr_mat()
    # Run the regression analysis.
    my_box.any_regression_r_rank("RO1脱盐率")