# data_preprocessor.py
import os
from concurrent.futures import ThreadPoolExecutor  # multi-threaded CSV reading

import numpy as np
import pandas as pd
import torch
from torch.utils.data import DataLoader, TensorDataset


class DataPreprocessor:
    """Static helpers that turn raw CSV time series into PyTorch DataLoaders.

    Pipeline: read_and_combine_csv_files -> process_date -> scaler_data ->
    load_and_process_data (date-based split + supervised framing + loaders).
    """

    @staticmethod
    def load_and_process_data(args, data):
        """Split `data` into train/val/test by date and build DataLoaders.

        :param args: config namespace; uses resolution (minutes per step),
            seq_len, step_size, the six *_start_date / *_end_date fields,
            plus everything create_supervised_dataset / load_data need.
        :param data: DataFrame with a 'date' column (already scaled).
        :return: (train_loader, val_loader, test_loader, data) — the original
            DataFrame is returned for downstream processing.
        """
        data['date'] = pd.to_datetime(data['date'])

        # One step of the down-sampled series; args.resolution is in minutes.
        time_interval = pd.Timedelta(hours=(args.resolution / 60))
        # BUGFIX: was `time_interval * (args.seq_len + 314)` — 314 was a stray
        # magic constant. Moving the val/test start back by one full input
        # window (seq_len steps) is enough for the first sample to have
        # complete history.
        window_time_span = time_interval * args.seq_len

        val_start_date = pd.to_datetime(args.val_start_date)
        test_start_date = pd.to_datetime(args.test_start_date)
        # Shift split starts back so the sliding window can warm up.
        adjusted_val_start = val_start_date - window_time_span
        adjusted_test_start = test_start_date - window_time_span

        # Boolean masks for the three date ranges.
        train_mask = (data['date'] >= pd.to_datetime(args.train_start_date)) & \
                     (data['date'] <= pd.to_datetime(args.train_end_date))
        val_mask = (data['date'] >= adjusted_val_start) & \
                   (data['date'] <= pd.to_datetime(args.val_end_date))
        test_mask = (data['date'] >= adjusted_test_start) & \
                    (data['date'] <= pd.to_datetime(args.test_end_date))

        train_data = data[train_mask].reset_index(drop=True)
        val_data = data[val_mask].reset_index(drop=True)
        test_data = data[test_mask].reset_index(drop=True)

        # Dates are not model features.
        train_data = train_data.drop(columns=['date'])
        val_data = val_data.drop(columns=['date'])
        test_data = test_data.drop(columns=['date'])

        # Frame as supervised learning (input window + future targets).
        # Train/val slide one step at a time; test uses args.step_size.
        train_supervised = DataPreprocessor.create_supervised_dataset(
            args, train_data, 1
        )
        val_supervised = DataPreprocessor.create_supervised_dataset(
            args, val_data, 1
        )
        test_supervised = DataPreprocessor.create_supervised_dataset(
            args, test_data, args.step_size
        )

        train_loader = DataPreprocessor.load_data(
            args, train_supervised, shuffle=True
        )
        val_loader = DataPreprocessor.load_data(
            args, val_supervised, shuffle=False
        )
        test_loader = DataPreprocessor.load_data(
            args, test_supervised, shuffle=False
        )

        return train_loader, val_loader, test_loader, data

    @staticmethod
    def read_and_combine_csv_files(args):
        """Read and merge multiple CSV files (multi-threaded), then preprocess.

        :param args: config namespace; uses data_dir, file_pattern,
            start_files, end_files, resolution. NOTE: args.data_dir is
            rewritten in place to an absolute path.
        :return: combined, down-sampled, date-featurized, scaled DataFrame.
        """
        from tqdm import tqdm  # lazy import: only this entry point needs it

        current_dir = os.path.dirname(__file__)
        parent_dir = os.path.dirname(current_dir)
        args.data_dir = os.path.join(parent_dir, args.data_dir)

        def read_file(file_count):
            """Read the single CSV named by args.file_pattern.format(count)."""
            file_name = args.file_pattern.format(file_count)
            file_path = os.path.join(args.data_dir, file_name)
            return pd.read_csv(file_path)

        file_indices = list(range(args.start_files, args.end_files + 1))

        # CSV reads are I/O-bound, so threads give a real speed-up.
        max_workers = os.cpu_count()
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            results = list(tqdm(executor.map(read_file, file_indices),
                                total=len(file_indices),
                                desc="正在读取文件"))

        all_data = pd.concat(results, ignore_index=True)

        # Down-sample by taking every args.resolution-th row
        # (presumably the raw files are 1-minute resolution — TODO confirm).
        chunk = all_data.iloc[::args.resolution, :].reset_index(drop=True)
        chunk = DataPreprocessor.process_date(chunk)
        chunk = DataPreprocessor.scaler_data(chunk)
        return chunk

    @staticmethod
    def process_date(data):
        """Rename 'index' -> 'date' and add cyclic day-of-year features.

        :param data: DataFrame whose 'index' column holds date strings.
        :return: DataFrame ordered as [date, day_year_sin, day_year_cos, rest].
        """
        data = data.rename(columns={'index': 'date'})
        data['date'] = pd.to_datetime(data['date'])

        # Encode the yearly cycle so Dec 31 and Jan 1 end up close in
        # feature space.
        data['day_of_year'] = data['date'].dt.dayofyear
        data['day_year_sin'] = np.sin(2 * np.pi * data['day_of_year'] / 366)
        data['day_year_cos'] = np.cos(2 * np.pi * data['day_of_year'] / 366)
        data.drop(columns=['day_of_year'], inplace=True)

        # Column order: date, time features, then everything else.
        time_features = ['day_year_sin', 'day_year_cos']
        other_columns = [col for col in data.columns
                         if col not in ['date'] and col not in time_features]
        data = data[['date'] + time_features + other_columns]
        return data

    @staticmethod
    def scaler_data(data):
        """Min-max scale every column except 'date'; persist the scaler.

        Saves the fitted scaler to 'scaler.pkl' in the current working
        directory so predictions can later be inverse-transformed.

        :param data: DataFrame containing a 'date' column.
        :return: DataFrame with 'date' plus scaled feature columns.
        """
        import joblib  # lazy imports: only needed when (re)fitting the scaler
        from sklearn.preprocessing import MinMaxScaler

        date_col = data[['date']]
        data_to_scale = data.drop(columns=['date'])

        scaler = MinMaxScaler(feature_range=(0, 1))
        scaled_data = scaler.fit_transform(data_to_scale)
        joblib.dump(scaler, 'scaler.pkl')  # persist for inverse transforms

        scaled_data = pd.DataFrame(scaled_data, columns=data_to_scale.columns)
        # Re-attach the date column (indices reset so rows line up).
        scaled_data = pd.concat([date_col.reset_index(drop=True), scaled_data],
                                axis=1)
        return scaled_data

    @staticmethod
    def create_supervised_dataset(args, data, step_size):
        """Frame a time series as supervised-learning rows.

        Each row holds, feature-major, the input window
        [f(t-seq_len+1) .. f(t-0)] for every feature, followed by the targets
        [y(t+1) .. y(t+output_size)] for the last args.labels_num columns.

        :param args: uses seq_len, output_size, labels_num.
        :param data: DataFrame of features (no date column).
        :param step_size: stride between consecutive samples.
        :return: DataFrame with one fully-populated row per sample.
        """
        data = pd.DataFrame(data)
        cols = []
        col_names = []
        feature_columns = data.columns.tolist()

        # Input window: shift(i) looks i steps back; i runs seq_len-1 .. 0.
        for col in feature_columns:
            for i in range(args.seq_len - 1, -1, -1):
                cols.append(data[[col]].shift(i))
                col_names.append(f"{col}(t-{i})")

        # Targets: future values of the last labels_num feature columns.
        target_columns = feature_columns[-args.labels_num:]
        for i in range(1, args.output_size + 1):
            for col in target_columns:
                cols.append(data[[col]].shift(-i))
                col_names.append(f"{col}(t+{i})")

        dataset = pd.concat(cols, axis=1)
        dataset.columns = col_names
        # Stride first, then drop rows whose window/target ran off an edge.
        # (Non-inplace dropna: inplace on an iloc slice triggers pandas
        # SettingWithCopy warnings.)
        dataset = dataset.iloc[::step_size, :].dropna()
        return dataset

    @staticmethod
    def load_data(args, dataset, shuffle):
        """Convert a supervised DataFrame into a PyTorch DataLoader.

        :param args: uses seq_len, feature_num, labels_num, output_size,
            device, batch_size, random_seed.
        :param dataset: output of create_supervised_dataset.
        :param shuffle: whether the loader shuffles batches.
        :return: DataLoader yielding (X, y) with X shaped
            [batch, seq_len, feature_num] and y shaped
            [batch, output_size * labels_num].
        """
        input_length = args.seq_len
        n_features = args.feature_num
        labels_num = args.labels_num
        n_features_total = n_features * input_length    # flat input width
        n_labels_total = args.output_size * labels_num  # flat target width

        # Split flat columns into inputs and targets.
        X = dataset.values[:, :n_features_total]
        y = dataset.values[:, n_features_total:n_features_total + n_labels_total]

        # The flat layout is feature-major: all seq_len steps of feature 1,
        # then feature 2, ...  BUGFIX: the old direct
        # `reshape(N, seq_len, n_features)` scrambled the time and feature
        # axes whenever seq_len != feature_num; reshape to
        # (N, n_features, seq_len) first, then swap the last two axes.
        X = X.reshape(X.shape[0], n_features, input_length).transpose(0, 2, 1)

        X = torch.tensor(X, dtype=torch.float32).to(args.device)
        y = torch.tensor(y, dtype=torch.float32).to(args.device)

        dataset_tensor = TensorDataset(X, y)
        generator = torch.Generator()
        generator.manual_seed(args.random_seed)  # reproducible shuffling
        data_loader = DataLoader(
            dataset_tensor,
            batch_size=args.batch_size,
            shuffle=shuffle,
            generator=generator
        )
        return data_loader