| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244 |
- import sys
- sys.path.append("..")
- from Analysis.pearsonr import DFMat, PearsonrMat
- from Database.database_ import DatabaseParam
- import config
- import os
- import json
- import pandas as pd
- from sklearn.preprocessing import StandardScaler
- from sklearn.linear_model import Lasso, LassoCV, LinearRegression
- from sklearn.model_selection import TimeSeriesSplit
- import numpy as np
- import matplotlib.pyplot as plt
- from sklearn.metrics import r2_score
- import scipy.stats as stats
- from utils.tools import set_chinese_font
- from sklearn.metrics import mean_squared_error, mean_absolute_error
- from sklearn.model_selection import cross_val_score
- from statsmodels.stats.outliers_influence import OLSInfluence
- import statsmodels.api as sm
class RegressionBox(PearsonrMat):
    """Lasso feature selection followed by an ordinary least squares (OLS) refit.

    The class extends ``PearsonrMat`` and reads ``self.df_merge``,
    ``self.name_2code_dict`` and ``self.query_r_rank_n`` without defining them;
    they are presumably provided by the parent class (not visible in this
    file -- verify against ``Analysis.pearsonr``).

    Attributes:
        lasso_info: Dict describing the last Lasso fit (keys documented in its
            ``help`` entry).
        ols_info: Dict describing the last OLS fit (keys documented in its
            ``help`` entry).
        ols_model: The last fitted ``LinearRegression`` model, or ``None``
            before any fit.
    """

    def __init__(self, keys_file_dir: str, min_records: int, db_param: DatabaseParam,
                 transfer_file_dir: str, is_from_local: bool = True):
        """Forward all arguments to ``PearsonrMat`` and reset the fit records."""
        super().__init__(
            keys_file_dir=keys_file_dir,
            min_records=min_records,
            db_param=db_param,
            transfer_file_dir=transfer_file_dir,
            is_from_local=is_from_local,
        )
        self.lasso_info = {"help":"x,自变量名;y,因变量名;alpha,最佳参数;coef,自变量权重;intercept,截距;n_iter,迭代次数;dual_gap,对偶间隙;tol,对偶容忍"}
        self.ols_info = {"help":"x,自变量名;y,因变量名;最佳参数;coef,自变量权重;intercept,截距;n_iter,迭代次数;score,R2决定系数;"}
        self.ols_model = None  # final linear OLS regression model

    def read_features_file(self):
        """Load the feature file that names the dependent and independent variables.

        Reads the JSON document at ``config.LASSO_FEATURE_FILE_PATH``.

        Returns:
            tuple: ``(targets, features)`` -- the values stored under the
            ``'targets'`` and ``'features'`` keys; a missing key yields ``None``.

        Raises:
            FileNotFoundError: If the configured path does not exist.
        """
        path = config.LASSO_FEATURE_FILE_PATH
        if not os.path.exists(path):
            # A single formatted message: passing (message, path) as two
            # positional arguments would be interpreted as (errno, strerror).
            raise FileNotFoundError(f'文件未发现: {path}')
        with open(path, "r", encoding="utf-8") as f:
            json_data = json.load(f)
        return json_data.get('targets'), json_data.get('features')

    def _codes_in_frame(self, names, available) -> list:
        """Translate names to codes, keeping only codes present in ``available``."""
        codes = []
        for name in names or []:  # a missing JSON key arrives here as None
            code = self.name_2code_dict.get(name)
            if code in available:
                codes.append(code)
        return codes

    def load_features(self) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
        """Slice ``self.df_merge`` into target, feature and time frames.

        Names from the feature file are mapped to column codes through
        ``self.name_2code_dict``; names without a matching column are dropped.

        Returns:
            tuple: ``(targets, features, time)`` as independent copies.

        Raises:
            ValueError: If no target column or no feature column survives the
                name-to-code mapping.
        """
        y_label_name, x_label_name = self.read_features_file()
        # Build the column lookup once instead of once per candidate name.
        available = set(self.df_merge.columns)
        y_label_code = self._codes_in_frame(y_label_name, available)
        x_label_code = self._codes_in_frame(x_label_name, available)
        if not y_label_code or not x_label_code:
            raise ValueError('需要拟合的特征为空,请检查建模字段是否存在', (y_label_code, x_label_code))
        targets = self.df_merge.loc[:, y_label_code].copy()
        features = self.df_merge.loc[:, x_label_code].copy()
        time = self.df_merge.loc[:, ['time']].copy()
        return targets, features, time

    def select_features(self):
        """Placeholder; not implemented."""
        pass

    def lasso_(self, y_value: np.ndarray, scaler_x_value: np.ndarray, n_splits: int = 5, max_iter: int = 10000):
        """Fit a cross-validated Lasso and record the result in ``self.lasso_info``.

        Cross-validation uses ``TimeSeriesSplit(n_splits)``.

        Args:
            y_value: Dependent variable values.
            scaler_x_value: Independent variable matrix (the caller in this
                class passes standardized values).
            n_splits: Number of time-series CV splits.
            max_iter: Maximum number of solver iterations.

        Side effects:
            Stores ``alpha``, ``intercept``, ``n_iter``, ``dual_gap``, ``tol``
            and ``coef`` in ``self.lasso_info``.
        """
        tscv = TimeSeriesSplit(n_splits=n_splits)
        # Search for the best alpha.
        # NOTE(review): an integer ``alphas`` appears to be accepted only by
        # recent scikit-learn releases (older ones used ``n_alphas``) --
        # confirm against the installed version.
        lasso_model = LassoCV(alphas=100,
                              cv=tscv,
                              max_iter=max_iter,
                              random_state=42,
                              n_jobs=-1)
        lasso_model.fit(scaler_x_value, y_value)
        self.lasso_info['alpha'] = lasso_model.alpha_          # selected regularization strength
        self.lasso_info['intercept'] = lasso_model.intercept_  # intercept
        self.lasso_info['n_iter'] = lasso_model.n_iter_        # solver iterations
        self.lasso_info['dual_gap'] = lasso_model.dual_gap_    # dual gap at the selected alpha
        self.lasso_info['tol'] = lasso_model.tol               # tolerance setting of the estimator
        self.lasso_info['coef'] = lasso_model.coef_            # feature weights

    def ols_(self, y_value: np.ndarray, scaler_x_value: np.ndarray) -> LinearRegression:
        """Fit an OLS model and record intercept, coefficients and R² in ``self.ols_info``.

        Args:
            y_value: Dependent variable values.
            scaler_x_value: Independent variable matrix. Despite the name, the
                caller in this class passes the unscaled columns.

        Returns:
            The fitted ``LinearRegression`` model.
        """
        model = LinearRegression()
        model.fit(scaler_x_value, y_value)
        self.ols_info['intercept'] = model.intercept_
        self.ols_info['coef'] = model.coef_
        # R² is computed on the same data the model was fitted on.
        self.ols_info['score'] = model.score(scaler_x_value, y_value)
        return model

    def any_regression_full(self, target_name: str):
        """Regress an arbitrary target on all fields (not implemented)."""
        pass

    def any_regression_r_rank(self, target_name: str):
        """Model ``target_name`` from the fields chosen by Pearson ranking.

        Steps: pick candidate fields with ``self.query_r_rank_n``, standardize
        them, run ``lasso_`` to select fields, refit ``ols_`` on the unscaled
        columns whose Lasso coefficient is non-zero, print fit metrics and show
        six diagnostic plots.

        Args:
            target_name: Display name of the dependent variable; it must be a
                key of ``self.name_2code_dict``.

        Raises:
            KeyError: If ``target_name`` is not in ``self.name_2code_dict``.
            ValueError: If Lasso shrinks every coefficient to zero, leaving
                nothing for the OLS refit.
        """
        y_label_name = target_name
        # Candidate fields chosen by Pearson ranking, excluding the target
        # itself. A new list is built so the object returned by
        # query_r_rank_n is never mutated.
        x_label_name = [name for name in self.query_r_rank_n(y_label_name) if name != y_label_name]

        y_label_code = self.name_2code_dict[y_label_name]
        x_label_code = [self.name_2code_dict.get(name) for name in x_label_name]
        y = self.df_merge.loc[:, y_label_code].copy().to_numpy()  # observed values
        x_scaled = StandardScaler().fit_transform(self.df_merge.loc[:, x_label_code])

        # Lasso regression: field selection.
        self.lasso_(y_value=y, scaler_x_value=x_scaled)
        self.lasso_info['x'] = x_label_name
        self.lasso_info['y'] = y_label_name
        self._print_lasso_report(x_label_name)

        # OLS regression on the fields whose Lasso coefficient is non-zero.
        mask = np.asarray(self.lasso_info['coef']) != 0
        if not mask.any():
            raise ValueError('Lasso未选出任何非零系数的特征,无法进行OLS回归', x_label_name)
        x_label_name = [name for name, keep in zip(x_label_name, mask) if keep]
        x_label_code = [code for code, keep in zip(x_label_code, mask) if keep]
        x = self.df_merge.loc[:, x_label_code]  # deliberately not normalized/standardized
        self.ols_model = self.ols_(y_value=y, scaler_x_value=x)
        self.ols_info['x'] = x_label_name
        self.ols_info['y'] = y_label_name
        self._print_ols_report(x_label_name)

        # Basic evaluation metrics (all computed on the training data).
        y_pred = self.ols_model.predict(x)
        residuals = y - y_pred
        mse = mean_squared_error(y, y_pred)
        rmse = np.sqrt(mse)
        mae = mean_absolute_error(y, y_pred)
        r2 = r2_score(y, y_pred)
        # Adjusted R² is undefined without residual degrees of freedom.
        n = len(y)
        p = x.shape[1]
        dof = n - p - 1
        adj_r2 = 1 - (1 - r2) * (n - 1) / dof if dof > 0 else float('nan')
        print("\n===========模型性能指标==================:")
        print(f"均方误差 (MSE): {mse:.4f}")
        print(f"均方根误差 (RMSE): {rmse:.4f}")
        print(f"平均绝对误差 (MAE): {mae:.4f}")
        print(f"决定系数 (R²): {r2:.4f}")
        print(f"调整R²: {adj_r2:.4f}")

        self._plot_diagnostics(y, y_pred, residuals, r2)

    def _print_lasso_report(self, x_label_name):
        """Print the Lasso fit stored in ``self.lasso_info``."""
        info = self.lasso_info
        print('\n===========Lasso训练结果==================')
        print(f"最优lambda:{info.get('alpha')}")
        print(f"Y:{info.get('y')}")
        print("Lasso系数:")
        for feat, coef in zip(x_label_name, info.get('coef')):
            print(f" {feat}: {coef}")
        print(f"截距:{info.get('intercept')}")
        print(f"迭代次数:{info.get('n_iter')}")
        print(f"对偶间隙:{info.get('dual_gap')}")
        print(f"对偶间隙容忍:{info.get('tol')}")

    def _print_ols_report(self, x_label_name):
        """Print the OLS fit stored in ``self.ols_info``."""
        info = self.ols_info
        print('\n===========OLS训练结果==================')
        print(f"OLS 截距: {info.get('intercept')}")
        print("OLS 系数:")
        for feat, coef in zip(x_label_name, info.get('coef')):
            print(f" {feat}: {coef}")
        print(f"OLS R² (训练集): {info.get('score'):.4f}")

    def _plot_diagnostics(self, y, y_pred, residuals, r2):
        """Draw the 2x3 grid of residual diagnostic plots and show it."""
        set_chinese_font()
        fig, axes = plt.subplots(2, 3, figsize=(15, 10))

        # 1. Residuals vs fitted values (homoscedasticity and linearity).
        axes[0, 0].scatter(y_pred, residuals, alpha=0.6)
        axes[0, 0].axhline(y=0, color='red', linestyle='--')
        axes[0, 0].set_xlabel('拟合值')
        axes[0, 0].set_ylabel('残差')
        axes[0, 0].set_title('残差 vs 拟合值')

        # 2. Normal Q-Q plot (residual normality).
        stats.probplot(residuals, dist="norm", plot=axes[0, 1])
        axes[0, 1].set_title('Q-Q图(检查正态性)')

        # 3. Residual histogram.
        axes[0, 2].hist(residuals, bins=30, density=True, alpha=0.7)
        axes[0, 2].set_xlabel('残差')
        axes[0, 2].set_ylabel('密度')
        axes[0, 2].set_title('残差分布')

        # 4. Observed vs fitted values with the identity line.
        axes[1, 0].scatter(y, y_pred, alpha=0.6)
        min_val = min(y.min(), y_pred.min())
        max_val = max(y.max(), y_pred.max())
        axes[1, 0].plot([min_val, max_val], [min_val, max_val], 'red', linestyle='--')
        axes[1, 0].set_xlabel('实际值')
        axes[1, 0].set_ylabel('预测值')
        axes[1, 0].set_title('实际值 vs 预测值')
        axes[1, 0].text(0.05, 0.95, f'R² = {r2:.3f}', transform=axes[1, 0].transAxes)

        # 5. Residuals in observation order.
        axes[1, 1].plot(residuals)
        axes[1, 1].axhline(y=0, color='red', linestyle='--')
        axes[1, 1].set_xlabel('时间/观测序号')
        axes[1, 1].set_ylabel('残差')
        axes[1, 1].set_title('残差时间序列')

        # 6. Scale-location plot (homoscedasticity). A perfect fit has zero
        # residual spread; plot zeros instead of dividing by zero.
        resid_std = np.std(residuals)
        if resid_std > 0:
            standardized_residuals = residuals / resid_std
        else:
            standardized_residuals = np.zeros_like(residuals)
        axes[1, 2].scatter(y_pred, np.sqrt(np.abs(standardized_residuals)), alpha=0.6)
        axes[1, 2].set_xlabel('拟合值')
        axes[1, 2].set_ylabel('√|标准化残差|')
        axes[1, 2].set_title('尺度-位置图')

        plt.tight_layout()
        plt.show()

    def any_regression_custom(self, target_name: str, path: str):
        """Model a target from custom fields read from a file (not implemented)."""

    def auto_fit(self, x_label_name: str, y_label_name: str, is_use_lasso: bool = True):
        """Regression analysis (not implemented)."""
if __name__ == '__main__':
    # Database connection settings, all taken from the project config module.
    connection = DatabaseParam(
        db_host=config.DB_HOST,
        db_user=config.DB_USER,
        db_password=config.DB_PASSWORD,
        db_name=config.DB_NAME,
        db_port=config.DB_PORT,
    )

    # Input files: the statistics keys file and the name/code transfer JSON.
    keys_path = os.path.join(config.STATISTICS_FILE_DIR, config.STATISTICS_FILE_NAME)
    transfer_path = os.path.join(config.ALL_ITEMS_FILE_DIR, config.TRANSFER_JSON_NAME)

    box = RegressionBox(
        keys_file_dir=keys_path,
        min_records=config.MIN_RECORDS,
        db_param=connection,
        transfer_file_dir=transfer_path,
    )

    # Compute the Pearson correlation matrix first, then run the ranked regression.
    box.calculate_pearsonr_mat()
    box.any_regression_r_rank("RO1脱盐率")
|