# data_preprocessor.py
import os

import joblib
import numpy as np
import pandas as pd
import torch
from concurrent.futures import ThreadPoolExecutor
from sklearn.preprocessing import MinMaxScaler
from torch.utils.data import DataLoader, TensorDataset
from tqdm import tqdm


class DataPreprocessor:
    @staticmethod
    def load_and_process_data(args, data):
        # Parse the date column.
        data['date'] = pd.to_datetime(data['date'])
        time_interval = pd.Timedelta(hours=(args.resolution / 60))
        # History required before the first validation/test sample:
        # seq_len input steps plus a fixed 314-step margin.
        window_time_span = time_interval * (args.seq_len + 314)

        # Split into train/val/test sets, shifting the val/test start dates
        # back so the first sliding window has enough history.
        val_start_date = pd.to_datetime(args.val_start_date)
        test_start_date = pd.to_datetime(args.test_start_date)
        adjusted_val_start = val_start_date - window_time_span
        adjusted_test_start = test_start_date - window_time_span

        train_mask = (data['date'] >= pd.to_datetime(args.train_start_date)) & \
                     (data['date'] <= pd.to_datetime(args.train_end_date))
        val_mask = (data['date'] >= adjusted_val_start) & \
                   (data['date'] <= pd.to_datetime(args.val_end_date))
        test_mask = (data['date'] >= adjusted_test_start) & \
                    (data['date'] <= pd.to_datetime(args.test_end_date))

        train_data = data[train_mask].reset_index(drop=True)
        val_data = data[val_mask].reset_index(drop=True)
        test_data = data[test_mask].reset_index(drop=True)

        # Drop the date column before modeling.
        train_data = train_data.drop(columns=['date'])
        val_data = val_data.drop(columns=['date'])
        test_data = test_data.drop(columns=['date'])

        # Build supervised datasets (input sequence + target sequence).
        train_supervised = DataPreprocessor.create_supervised_dataset(
            args, train_data, 1
        )
        val_supervised = DataPreprocessor.create_supervised_dataset(
            args, val_data, 1
        )
        test_supervised = DataPreprocessor.create_supervised_dataset(
            args, test_data, args.step_size
        )

        # Wrap each split in a DataLoader.
        train_loader = DataPreprocessor.load_data(
            args, train_supervised, shuffle=True
        )
        val_loader = DataPreprocessor.load_data(
            args, val_supervised, shuffle=False
        )
        test_loader = DataPreprocessor.load_data(
            args, test_supervised, shuffle=False
        )

        # The raw frame is returned for downstream use (e.g. inverse scaling).
        return train_loader, val_loader, test_loader, data

    @staticmethod
    def read_and_combine_csv_files(args):
        # Read and concatenate multiple CSV files in parallel.
        current_dir = os.path.dirname(__file__)
        parent_dir = os.path.dirname(current_dir)
        args.data_dir = os.path.join(parent_dir, args.data_dir)

        def read_file(file_count):
            file_name = args.file_pattern.format(file_count)
            file_path = os.path.join(args.data_dir, file_name)
            return pd.read_csv(file_path)

        file_indices = list(range(args.start_files, args.end_files + 1))
        max_workers = os.cpu_count()
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            results = list(tqdm(executor.map(read_file, file_indices),
                                total=len(file_indices),
                                desc="Reading files"))
        all_data = pd.concat(results, ignore_index=True)

        # Downsample rows to the requested resolution.
        chunk = all_data.iloc[::args.resolution, :].reset_index(drop=True)
        # Parse dates and derive time features.
        chunk = DataPreprocessor.process_date(chunk)
        # Normalize.
        chunk = DataPreprocessor.scaler_data(chunk)
        return chunk

    @staticmethod
    def process_date(data):
        # Parse the timestamp column (stored as 'index' in the raw CSVs)
        # and derive cyclical time features.
        data = data.rename(columns={'index': 'date'})
        data['date'] = pd.to_datetime(data['date'])

        # Encode the day of year on the unit circle so that 31 December
        # and 1 January end up adjacent.
        data['day_of_year'] = data['date'].dt.dayofyear
        data['day_year_sin'] = np.sin(2 * np.pi * data['day_of_year'] / 366)
        data['day_year_cos'] = np.cos(2 * np.pi * data['day_of_year'] / 366)

        # Drop the raw day-of-year column; keep only the encoded features.
        data.drop(columns=['day_of_year'], inplace=True)

        # Reorder columns: date, then time features, then everything else.
        time_features = ['day_year_sin', 'day_year_cos']
        other_columns = [col for col in data.columns
                         if col not in ['date'] and col not in time_features]
        data = data[['date'] + time_features + other_columns]
        return data
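    # A quick sanity check of the cyclical encoding above (hypothetical
    # values, not part of the pipeline): day 183 of a 366-day cycle maps to
    # an angle of pi, i.e. (sin, cos) = (0, -1), while days 1 and 366 land
    # next to each other near (0, 1) -- the wrap-around that a plain
    # day-of-year integer cannot express. The 'load' column name below is
    # only illustrative.
    #
    #   df = pd.DataFrame({'index': ['2020-07-01'], 'load': [1.0]})
    #   out = DataPreprocessor.process_date(df)   # 2020-07-01 is day 183
    #   out[['day_year_sin', 'day_year_cos']]     # approx. (0.0, -1.0)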
    @staticmethod
    def scaler_data(data):
        # Min-max normalize every column except the date.
        date_col = data[['date']]
        data_to_scale = data.drop(columns=['date'])
        scaler = MinMaxScaler(feature_range=(0, 1))
        scaled_data = scaler.fit_transform(data_to_scale)
        joblib.dump(scaler, 'scaler.pkl')  # persist the scaler for inverse transforms
        scaled_data = pd.DataFrame(scaled_data, columns=data_to_scale.columns)
        scaled_data = pd.concat([date_col.reset_index(drop=True), scaled_data],
                                axis=1)
        return scaled_data

    @staticmethod
    def create_supervised_dataset(args, data, step_size):
        # Build a supervised dataset (input sequence + target sequence).
        data = pd.DataFrame(data)
        cols = []
        col_names = []
        feature_columns = data.columns.tolist()

        # Input sequence, t-(seq_len-1) .. t-0, in time-major order (all
        # features per time step) so that load_data's reshape to
        # [samples, seq_len, features] recovers the correct layout.
        # (The original looped feature-first, which silently scrambled
        # time steps and features after the reshape.)
        for i in range(args.seq_len - 1, -1, -1):
            for col in feature_columns:
                cols.append(data[[col]].shift(i))
                col_names.append(f"{col}(t-{i})")

        # Target sequence: only the last labels_num columns are predicted.
        target_columns = feature_columns[-args.labels_num:]
        for i in range(1, args.output_size + 1):
            for col in target_columns:
                cols.append(data[[col]].shift(-i))
                col_names.append(f"{col}(t+{i})")

        # Concatenate, subsample by step_size, and drop rows with NaNs
        # introduced at the edges by the shifts.
        dataset = pd.concat(cols, axis=1)
        dataset.columns = col_names
        dataset = dataset.iloc[::step_size, :].dropna()
        return dataset

    @staticmethod
    def load_data(args, dataset, shuffle):
        # Convert the supervised dataset to PyTorch tensors and a DataLoader.
        input_length = args.seq_len
        n_features = args.feature_num
        labels_num = args.labels_num
        n_features_total = n_features * input_length    # flattened input width
        n_labels_total = args.output_size * labels_num  # flattened target width

        # Split inputs and targets.
        X = dataset.values[:, :n_features_total]
        y = dataset.values[:, n_features_total:n_features_total + n_labels_total]

        # Reshape inputs to [samples, sequence length, features]; this
        # relies on the time-major column order produced by
        # create_supervised_dataset.
        X = X.reshape(X.shape[0], input_length, n_features)
        X = torch.tensor(X, dtype=torch.float32).to(args.device)
        y = torch.tensor(y, dtype=torch.float32).to(args.device)

        # Build the dataset and loader with a seeded generator so that
        # shuffling is reproducible.
        dataset_tensor = TensorDataset(X, y)
        generator = torch.Generator()
        generator.manual_seed(args.random_seed)
        data_loader = DataLoader(
            dataset_tensor,
            batch_size=args.batch_size,
            shuffle=shuffle,
            generator=generator
        )
        return data_loader
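if __name__ == "__main__":
    # Minimal usage sketch, not part of the original module: the attribute
    # names on `args` are inferred from how they are used above, and every
    # value below (paths, dates, sizes) is a placeholder that must be
    # adapted to the actual dataset.
    from types import SimpleNamespace

    args = SimpleNamespace(
        data_dir="data",                 # assumed directory under the project root
        file_pattern="part_{}.csv",      # hypothetical file-name pattern
        start_files=1, end_files=3,      # inclusive range of file indices
        resolution=60,                   # row stride; also minutes per step
        seq_len=24, output_size=6,       # input window / forecast horizon
        feature_num=3, labels_num=1,     # total input features / target columns
        step_size=6,                     # sampling stride for the test set
        train_start_date="2020-01-01", train_end_date="2021-12-31",
        val_start_date="2022-01-01", val_end_date="2022-06-30",
        test_start_date="2022-07-01", test_end_date="2022-12-31",
        batch_size=64, random_seed=42,
        device="cuda" if torch.cuda.is_available() else "cpu",
    )

    combined = DataPreprocessor.read_and_combine_csv_files(args)
    train_loader, val_loader, test_loader, raw = \
        DataPreprocessor.load_and_process_data(args, combined)
    print(f"train batches: {len(train_loader)}")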