import os
import logging

import joblib
import numpy as np
import pandas as pd
import pywt
import torch
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from torch.utils.data import TensorDataset, DataLoader


class DataPreprocessor:
    def __init__(self, args, logger=None):
        self.args = args
        self.data_dir = args.data_dir
        self.num_files = args.num_files
        self.scaler_features = StandardScaler()
        self.scaler_targets = StandardScaler()
        self.logger = logger if logger is not None else self._default_logger()
        self.features = None  # cached feature matrix, used later to build the adjacency matrix
        self.scaler_dir = 'scalers'
        os.makedirs(self.scaler_dir, exist_ok=True)
        self.features_scaler_path = os.path.join(self.scaler_dir, 'features_scaler.joblib')
        self.targets_scaler_path = os.path.join(self.scaler_dir, 'targets_scaler.joblib')

    def _default_logger(self):
        """Default console logger."""
        logger = logging.getLogger('DataPreprocessor')
        logger.setLevel(logging.INFO)
        if not logger.handlers:  # avoid attaching duplicate handlers on repeated instantiation
            console_handler = logging.StreamHandler()
            console_handler.setLevel(logging.INFO)
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            console_handler.setFormatter(formatter)
            logger.addHandler(console_handler)
        return logger

    def load_data(self):
        """Load all data files and concatenate them."""
        all_data = []
        for i in range(1, self.num_files + 1):
            file_path = os.path.join(self.data_dir, f'data_process_{i}.csv')
            try:
                df = pd.read_csv(file_path, index_col=0)
                df = df.reset_index()  # keep the original index as the first column
                all_data.append(df)
                self.logger.info(f"Loaded file {i}/{self.num_files}")
            except Exception as e:
                self.logger.error(f"Error loading file {i}: {e}")
        if not all_data:
            raise RuntimeError(f"No data files could be loaded from {self.data_dir}")
        combined_df = pd.concat(all_data, ignore_index=True)
        return combined_df

    def decompose_time(self, df):
        """Split the timestamp column into year, month, day, hour, minute and second."""
        time_col = df.columns[0]
        df[time_col] = pd.to_datetime(df[time_col])
        df['year'] = df[time_col].dt.year
        df['month'] = df[time_col].dt.month
        df['day'] = df[time_col].dt.day
        df['hour'] = df[time_col].dt.hour
        df['minute'] = df[time_col].dt.minute
        df['second'] = df[time_col].dt.second
        df = df.drop(columns=[time_col])
        # Reorder columns so the time features come first
        time_features = ['year', 'month', 'day', 'hour', 'minute', 'second']
        other_features = [col for col in df.columns if col not in time_features]
        df = df[time_features + other_features]
        return df

    def wavelet_denoising(self, data, wavelet='db4', level=1):
        """Denoise each column with wavelet soft thresholding, guarding against zero thresholds."""
        denoised_data = np.zeros_like(data, dtype=np.float64)
        epsilon = 1e-10  # keeps the threshold strictly positive when all detail coefficients are zero
        for i in range(data.shape[1]):
            original_length = len(data[:, i])
            # Wavelet decomposition
            coeffs = pywt.wavedec(data[:, i], wavelet, level=level)
            # Noise estimate from the finest detail coefficients (MAD / 0.6745)
            sigma = np.median(np.abs(coeffs[-1])) / 0.6745 + epsilon
            # Universal threshold: sigma * sqrt(2 * ln N)
            threshold = sigma * np.sqrt(2 * np.log(original_length))
            # Soft-threshold the detail coefficients manually (avoids library warnings):
            # y = sign(x) * max(|x| - threshold, 0)
            processed_coeffs = []
            for c in coeffs[1:]:
                magnitude = np.abs(c)
                thresholded = np.where(
                    magnitude > threshold,
                    np.sign(c) * (magnitude - threshold),
                    0
                )
                processed_coeffs.append(thresholded)
            coeffs[1:] = processed_coeffs
            # Wavelet reconstruction, then align the output length with the input
            reconstructed = pywt.waverec(coeffs, wavelet)
            if len(reconstructed) > original_length:
                reconstructed = reconstructed[:original_length]
            elif len(reconstructed) < original_length:
                reconstructed = np.pad(
                    reconstructed, (0, original_length - len(reconstructed)), mode='edge'
                )
            denoised_data[:, i] = reconstructed
        return denoised_data
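    # Worked example of the soft-thresholding rule above (a sketch for
    # illustration, not part of the pipeline): with detail coefficients
    # [0.9, -0.2, 1.5] and threshold 0.5,
    #   sign(x) * max(|x| - 0.5, 0) = [0.4, 0.0, 1.0]
    # i.e. small coefficients are zeroed out and the rest shrink toward zero.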
    def normalize_data(self, features, targets):
        """Standardize features and targets, and persist the fitted scalers."""
        features_scaled = self.scaler_features.fit_transform(features)
        targets_scaled = self.scaler_targets.fit_transform(targets)
        # Persist the scalers so predictions can later be mapped back to the original scale
        joblib.dump(self.scaler_features, self.features_scaler_path)
        joblib.dump(self.scaler_targets, self.targets_scaler_path)
        self.logger.info(f"Saved feature scaler to {self.features_scaler_path}")
        self.logger.info(f"Saved target scaler to {self.targets_scaler_path}")
        return features_scaled, targets_scaled

    def inverse_transform_targets(self, targets_scaled):
        """Map standardized targets back to the original scale."""
        return self.scaler_targets.inverse_transform(targets_scaled)

    def split_data(self, features, targets):
        """Split into train, validation and test sets, preserving temporal order."""
        X_train, X_temp, y_train, y_temp = train_test_split(
            features, targets,
            test_size=self.args.test_ratio + self.args.val_ratio,
            shuffle=False
        )
        test_size = self.args.test_ratio / (self.args.test_ratio + self.args.val_ratio)
        X_val, X_test, y_val, y_test = train_test_split(
            X_temp, y_temp, test_size=test_size, shuffle=False
        )
        return X_train, X_val, X_test, y_train, y_val, y_test

    def create_dataloaders(self, X_train, X_val, X_test, y_train, y_val, y_test):
        """Wrap the splits in PyTorch DataLoaders."""
        X_train = torch.as_tensor(X_train, dtype=torch.float32)
        y_train = torch.as_tensor(y_train, dtype=torch.float32)
        X_val = torch.as_tensor(X_val, dtype=torch.float32)
        y_val = torch.as_tensor(y_val, dtype=torch.float32)
        X_test = torch.as_tensor(X_test, dtype=torch.float32)
        y_test = torch.as_tensor(y_test, dtype=torch.float32)
        train_dataset = TensorDataset(X_train, y_train)
        val_dataset = TensorDataset(X_val, y_val)
        test_dataset = TensorDataset(X_test, y_test)
        train_loader = DataLoader(train_dataset, batch_size=self.args.batch_size, shuffle=True)
        val_loader = DataLoader(val_dataset, batch_size=self.args.batch_size, shuffle=False)
        test_loader = DataLoader(test_dataset, batch_size=self.args.batch_size, shuffle=False)
        return train_loader, val_loader, test_loader

    def preprocess(self):
        """Run the full preprocessing pipeline."""
        df = self.load_data()
        self.logger.info(f"Original data shape: {df.shape}")
        df = self.decompose_time(df)
        self.logger.info(f"Data shape after time decomposition: {df.shape}")
        data = df.values
        data_denoised = self.wavelet_denoising(data)
        self.logger.info(f"Data shape after wavelet denoising: {data_denoised.shape}")
        # Cache the feature matrix for building the adjacency matrix later
        self.features = data_denoised[:, :self.args.num_features]
        targets = data_denoised[:, self.args.num_features:self.args.num_features + self.args.num_targets]
        self.logger.info(f"Features shape: {self.features.shape}, Targets shape: {targets.shape}")
        features_scaled, targets_scaled = self.normalize_data(self.features, targets)
        X_train, X_val, X_test, y_train, y_val, y_test = self.split_data(
            features_scaled, targets_scaled
        )
        self.logger.info(f"Train: {X_train.shape}, Val: {X_val.shape}, Test: {X_test.shape}")
        train_loader, val_loader, test_loader = self.create_dataloaders(
            X_train, X_val, X_test, y_train, y_val, y_test
        )
        return train_loader, val_loader, test_loader, self

    def create_adjacency_matrix(self):
        """Build a directed adjacency matrix from pairwise feature correlations."""
        num_nodes = self.args.num_features
        adj = torch.zeros((num_nodes, num_nodes))
        if self.features is None:
            self.logger.warning("Feature matrix not initialized; falling back to a self-loop adjacency matrix")
            for i in range(num_nodes):
                adj[i, i] = 1
            return adj
        # Pairwise correlations between feature columns
        corr_matrix = np.corrcoef(self.features.T)
        corr_matrix = np.nan_to_num(corr_matrix)  # constant columns produce NaN correlations
        corr_threshold = 0.3  # correlation threshold for adding an edge
        for i in range(num_nodes):
            adj[i, i] = 1  # self-loop
            for j in range(num_nodes):
                if i != j and abs(corr_matrix[i, j]) > corr_threshold:
                    adj[i, j] = 1
        self.logger.info(f"Number of edges in the adjacency matrix: {int(torch.sum(adj))}")
        return adj
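
# Minimal usage sketch (illustrative, not part of the original module): the class
# expects an args object exposing data_dir, num_files, num_features, num_targets,
# val_ratio, test_ratio and batch_size. The values below are placeholder
# assumptions; adjust them to the actual dataset.
if __name__ == "__main__":
    from argparse import Namespace

    args = Namespace(
        data_dir='data',    # directory containing data_process_1.csv ... data_process_N.csv
        num_files=2,
        num_features=10,    # first 10 columns (after time decomposition) are features
        num_targets=1,      # the next column is the prediction target
        val_ratio=0.1,
        test_ratio=0.1,
        batch_size=64,
    )
    preprocessor = DataPreprocessor(args)
    train_loader, val_loader, test_loader, prep = preprocessor.preprocess()
    adj = prep.create_adjacency_matrix()
    print(f"Adjacency matrix shape: {tuple(adj.shape)}")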