| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227 |
- import os
- import pandas as pd
- import numpy as np
- import pywt
- import logging
- from sklearn.preprocessing import StandardScaler
- from sklearn.model_selection import train_test_split
- import torch
- import joblib
- from torch.utils.data import TensorDataset, DataLoader
class DataPreprocessor:
    """Load CSV data, expand timestamps, denoise, scale, split and batch it.

    The pipeline reads ``data_dir``, ``num_files``, ``num_features``,
    ``num_targets``, ``val_ratio``, ``test_ratio`` and ``batch_size`` from
    ``args``. Fitted scalers are persisted with joblib under ``scalers/``.
    """

    def __init__(self, args, logger=None):
        """Store configuration and prepare the scaler output directory.

        Args:
            args: Configuration object exposing at least ``data_dir`` and
                ``num_files``; other attributes are read by later steps.
            logger: Optional logger. When omitted, a console logger named
                ``DataPreprocessor`` is used.
        """
        self.args = args
        self.data_dir = args.data_dir
        self.num_files = args.num_files
        self.scaler_features = StandardScaler()
        self.scaler_targets = StandardScaler()
        self.logger = logger if logger is not None else self._default_logger()
        # Denoised, unscaled feature columns; kept for create_adjacency_matrix.
        self.features = None
        self.scaler_dir = 'scalers'
        os.makedirs(self.scaler_dir, exist_ok=True)
        self.features_scaler_path = os.path.join(self.scaler_dir, 'features_scaler.joblib')
        self.targets_scaler_path = os.path.join(self.scaler_dir, 'targets_scaler.joblib')

    def _default_logger(self):
        """Return the shared ``DataPreprocessor`` console logger.

        ``logging.getLogger`` returns the same object for the same name, so a
        handler is attached only when none is present; otherwise every new
        instance would add another handler and duplicate each log line.
        """
        logger = logging.getLogger('DataPreprocessor')
        logger.setLevel(logging.INFO)
        if not logger.handlers:
            console_handler = logging.StreamHandler()
            console_handler.setLevel(logging.INFO)
            console_handler.setFormatter(
                logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            )
            logger.addHandler(console_handler)
        return logger

    def load_data(self):
        """Load ``data_process_<i>.csv`` for i in 1..num_files and concatenate.

        Files that fail to load are logged and skipped (best effort).

        Returns:
            A DataFrame with the original index restored as the first column.

        Raises:
            ValueError: If no file could be loaded at all.
        """
        frames = []

        for i in range(1, self.num_files + 1):
            file_path = os.path.join(self.data_dir, f'data_process_{i}.csv')
            try:
                # The index column is read and then reset so it becomes the
                # first regular column of the frame.
                df = pd.read_csv(file_path, index_col=0).reset_index()
            except Exception as e:
                self.logger.error(f"Error loading file {i}: {e}")
            else:
                frames.append(df)
                self.logger.info(f"Loaded file {i}/{self.num_files}")

        if not frames:
            # pd.concat([]) would raise an opaque "No objects to concatenate".
            raise ValueError(
                f"No data files could be loaded from {self.data_dir!r} "
                f"(expected {self.num_files} file(s) named data_process_<i>.csv)"
            )

        return pd.concat(frames, ignore_index=True)

    def decompose_time(self, df):
        """Replace the first column (timestamps) with six calendar columns.

        The input frame is not modified; a new frame is returned whose leading
        columns are year, month, day, hour, minute, second, followed by the
        remaining original columns in their original order.
        """
        time_features = ['year', 'month', 'day', 'hour', 'minute', 'second']
        time_col = df.columns[0]
        timestamps = pd.to_datetime(df[time_col])

        # drop() returns a new frame, so the caller's DataFrame stays intact.
        result = df.drop(columns=[time_col])
        for name in time_features:
            result[name] = getattr(timestamps.dt, name)

        other_features = [col for col in result.columns if col not in time_features]
        return result[time_features + other_features]

    def wavelet_denoising(self, data, wavelet='db4', level=1):
        """Soft-threshold wavelet denoising applied to each column of ``data``.

        Args:
            data: 2-D array indexed as (samples, columns).
            wavelet: Wavelet name passed to ``pywt.wavedec`` / ``pywt.waverec``.
            level: Decomposition level passed to ``pywt.wavedec``.

        Returns:
            Array of the same shape as ``data``. Non-floating input is
            converted to float64 so results are not truncated to integers.
        """
        data = np.asarray(data)
        if not np.issubdtype(data.dtype, np.floating):
            data = data.astype(np.float64)

        denoised_data = np.zeros_like(data)
        epsilon = 1e-10  # tiny offset kept from the original sigma estimate
        original_length = data.shape[0]
        # Universal-threshold factor; depends only on the series length.
        length_factor = np.sqrt(2 * np.log(original_length))

        for i in range(data.shape[1]):
            coeffs = pywt.wavedec(data[:, i], wavelet, level=level)

            # Noise level from the finest detail band (last entry of coeffs).
            # Indexing with -level would select the coarsest detail band
            # whenever level > 1; for level == 1 both are the same band.
            sigma = np.median(np.abs(coeffs[-1] + epsilon)) / 0.6745
            threshold = sigma * length_factor

            # Manual soft threshold: y = sign(x) * max(|x| - threshold, 0).
            # The approximation band coeffs[0] is left untouched.
            processed_coeffs = []
            for c in coeffs[1:]:
                magnitude = np.abs(c)
                processed_coeffs.append(
                    np.where(
                        magnitude > threshold,
                        np.sign(c) * (magnitude - threshold),
                        0,
                    )
                )
            coeffs[1:] = processed_coeffs

            reconstructed = pywt.waverec(coeffs, wavelet)
            # The reconstruction can differ in length from the input; trim or
            # edge-pad so it fits the output column.
            if len(reconstructed) > original_length:
                reconstructed = reconstructed[:original_length]
            elif len(reconstructed) < original_length:
                reconstructed = np.pad(
                    reconstructed,
                    (0, original_length - len(reconstructed)),
                    mode='edge',
                )

            denoised_data[:, i] = reconstructed

        return denoised_data

    def normalize_data(self, features, targets):
        """Fit both scalers on the given arrays, transform them, and save the scalers.

        Returns:
            Tuple ``(features_scaled, targets_scaled)``.
        """
        features_scaled = self.scaler_features.fit_transform(features)
        targets_scaled = self.scaler_targets.fit_transform(targets)

        # Persist the fitted scalers so they can be reloaded later.
        joblib.dump(self.scaler_features, self.features_scaler_path)
        joblib.dump(self.scaler_targets, self.targets_scaler_path)
        self.logger.info(f"已保存特征归一化模型到 {self.features_scaler_path}")
        self.logger.info(f"已保存目标归一化模型到 {self.targets_scaler_path}")

        return features_scaled, targets_scaled

    def inverse_transform_targets(self, targets_scaled):
        """Map scaled targets back to the original scale using the target scaler."""
        return self.scaler_targets.inverse_transform(targets_scaled)

    def split_data(self, features, targets):
        """Split into train / validation / test sets without shuffling.

        The hold-out fraction is ``val_ratio + test_ratio``; it is then divided
        between validation and test in proportion to the two ratios.

        Returns:
            ``X_train, X_val, X_test, y_train, y_val, y_test``.
        """
        holdout_ratio = self.args.test_ratio + self.args.val_ratio
        X_train, X_temp, y_train, y_temp = train_test_split(
            features, targets, test_size=holdout_ratio, shuffle=False
        )

        # Share of the hold-out part that goes to the test set.
        test_share = self.args.test_ratio / holdout_ratio
        X_val, X_test, y_val, y_test = train_test_split(
            X_temp, y_temp, test_size=test_share, shuffle=False
        )

        return X_train, X_val, X_test, y_train, y_val, y_test

    def create_dataloaders(self, X_train, X_val, X_test, y_train, y_val, y_test):
        """Wrap the six arrays in float32 tensors and build three DataLoaders.

        Only the training loader shuffles; validation and test keep their order.

        Returns:
            ``train_loader, val_loader, test_loader``.
        """
        def to_dataset(inputs, labels):
            return TensorDataset(
                torch.tensor(inputs, dtype=torch.float32),
                torch.tensor(labels, dtype=torch.float32),
            )

        batch_size = self.args.batch_size
        train_loader = DataLoader(
            to_dataset(X_train, y_train), batch_size=batch_size, shuffle=True
        )
        val_loader = DataLoader(
            to_dataset(X_val, y_val), batch_size=batch_size, shuffle=False
        )
        test_loader = DataLoader(
            to_dataset(X_test, y_test), batch_size=batch_size, shuffle=False
        )

        return train_loader, val_loader, test_loader

    def preprocess(self):
        """Run the full pipeline and return the loaders plus this preprocessor.

        Steps: load, decompose time, wavelet-denoise, slice features/targets,
        split, scale, build loaders. The scalers are fitted on the training
        split only and then applied to validation and test, so statistics of
        held-out data do not leak into training.

        Returns:
            ``train_loader, val_loader, test_loader, self``.
        """
        df = self.load_data()
        self.logger.info(f"Original data shape: {df.shape}")

        df = self.decompose_time(df)
        self.logger.info(f"Data shape after time decomposition: {df.shape}")

        # NOTE(review): denoising still runs on the whole series before the
        # split; confirm whether that is acceptable for the evaluation setup.
        data_denoised = self.wavelet_denoising(df.values)
        self.logger.info(f"Data shape after wavelet denoising: {data_denoised.shape}")

        num_features = self.args.num_features
        num_targets = self.args.num_targets
        # Keep the unscaled feature columns for create_adjacency_matrix.
        self.features = data_denoised[:, :num_features]
        targets = data_denoised[:, num_features:num_features + num_targets]
        self.logger.info(f"Features shape: {self.features.shape}, Targets shape: {targets.shape}")

        X_train, X_val, X_test, y_train, y_val, y_test = self.split_data(
            self.features, targets
        )

        # Fit (and save) the scalers on the training split, then reuse them.
        X_train, y_train = self.normalize_data(X_train, y_train)
        X_val = self.scaler_features.transform(X_val)
        X_test = self.scaler_features.transform(X_test)
        y_val = self.scaler_targets.transform(y_val)
        y_test = self.scaler_targets.transform(y_test)
        self.logger.info(f"Train: {X_train.shape}, Val: {X_val.shape}, Test: {X_test.shape}")

        train_loader, val_loader, test_loader = self.create_dataloaders(
            X_train, X_val, X_test, y_train, y_val, y_test
        )

        return train_loader, val_loader, test_loader, self

    def create_adjacency_matrix(self, corr_threshold=0.3):
        """Build a 0/1 adjacency matrix from pairwise feature correlation.

        Every node has a self-loop. Nodes i and j are connected when the
        absolute Pearson correlation of their feature columns exceeds
        ``corr_threshold``; the resulting matrix is therefore symmetric.
        When no feature data has been stored yet, the identity matrix is
        returned.

        Args:
            corr_threshold: Minimum absolute correlation for an edge.

        Returns:
            Float tensor of shape ``(num_features, num_features)``.

        Raises:
            ValueError: If the stored features have fewer than
                ``num_features`` columns.
        """
        num_nodes = self.args.num_features

        if self.features is None:
            self.logger.warning("特征数据未初始化,使用默认邻接矩阵")
            # Self-loops only.
            return torch.eye(num_nodes)

        features = np.asarray(self.features, dtype=np.float64)
        if features.ndim != 2 or features.shape[1] < num_nodes:
            raise ValueError(
                f"features must be 2-D with at least {num_nodes} columns, "
                f"got shape {features.shape}"
            )

        # Constant columns have zero variance and yield NaN correlations;
        # silence the resulting 0/0 warning, NaN handling follows below.
        with np.errstate(divide='ignore', invalid='ignore'):
            corr_matrix = np.atleast_2d(
                np.corrcoef(features[:, :num_nodes], rowvar=False)
            )

        # NaN > threshold is False, so undefined correlations add no edge.
        edges = np.abs(corr_matrix) > corr_threshold
        np.fill_diagonal(edges, True)  # self-loops
        adj = torch.from_numpy(edges.astype(np.float32))

        self.logger.info(f"邻接矩阵中边的数量: {int(torch.sum(adj))}")
        return adj
|