# data_preprocessor.py
import os
from concurrent.futures import ThreadPoolExecutor

import joblib
import numpy as np
import pandas as pd
import torch
from sklearn.preprocessing import MinMaxScaler
from torch.utils.data import DataLoader, TensorDataset
from tqdm import tqdm

from config import config
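# The `config` object is expected to provide (inferred from usage below):
#   DATA_DIR, FILE_PATTERN, START_FILES, END_FILES, REQUIRED_COLUMNS   -- CSV ingestion
#   RESOLUTION, SEQ_LEN, STEP_SIZE, FEATURE_NUM, LABELS_NUM, OUTPUT_SIZE -- windowing
#   TRAIN_START_DATE, TRAIN_END_DATE, VAL_START_DATE, VAL_END_DATE,
#   TEST_START_DATE, TEST_END_DATE                                     -- split boundaries
#   SCALER_PATH, BATCH_SIZE, DEVICE_ID, RANDOM_SEED                    -- persistence/loading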
class DataPreprocessor:
    """Data preprocessing pipeline: CSV ingestion, cyclic time features,
    min-max scaling, supervised windowing, and DataLoader construction."""

    @staticmethod
    def load_and_process_data(data):
        """Split the scaled frame chronologically and return train/val/test loaders."""
        data['date'] = pd.to_datetime(data['date'])
        # Interval between retained rows: RESOLUTION raw rows at (apparently)
        # 4 seconds each, converted to minutes.
        time_interval = pd.Timedelta(minutes=(4 * config.RESOLUTION / 60))
        # Extend the val/test ranges backwards by one full input window so the
        # first window that *ends* inside each range has complete history.
        window_time_span = time_interval * (config.SEQ_LEN + 1)
        val_start_date = pd.to_datetime(config.VAL_START_DATE)
        test_start_date = pd.to_datetime(config.TEST_START_DATE)
        adjusted_val_start = val_start_date - window_time_span
        adjusted_test_start = test_start_date - window_time_span

        train_mask = (data['date'] >= pd.to_datetime(config.TRAIN_START_DATE)) & \
                     (data['date'] <= pd.to_datetime(config.TRAIN_END_DATE))
        val_mask = (data['date'] >= adjusted_val_start) & \
                   (data['date'] <= pd.to_datetime(config.VAL_END_DATE))
        test_mask = (data['date'] >= adjusted_test_start) & \
                    (data['date'] <= pd.to_datetime(config.TEST_END_DATE))

        train_data = data[train_mask].reset_index(drop=True).drop(columns=['date'])
        val_data = data[val_mask].reset_index(drop=True).drop(columns=['date'])
        test_data = data[test_mask].reset_index(drop=True).drop(columns=['date'])

        # Train/val windows advance one step at a time; test windows advance
        # by STEP_SIZE for strided evaluation.
        train_supervised = DataPreprocessor.create_supervised_dataset(train_data, 1)
        val_supervised = DataPreprocessor.create_supervised_dataset(val_data, 1)
        test_supervised = DataPreprocessor.create_supervised_dataset(test_data, config.STEP_SIZE)

        train_loader = DataPreprocessor.load_data(train_supervised, shuffle=True)
        val_loader = DataPreprocessor.load_data(val_supervised, shuffle=False)
        test_loader = DataPreprocessor.load_data(test_supervised, shuffle=False)
        return train_loader, val_loader, test_loader, data

    @staticmethod
    def read_and_combine_csv_files():
        """Read all raw CSV files in parallel, downsample, and return a scaled frame."""
        def read_file(file_count):
            file_name = config.FILE_PATTERN.format(file_count)
            file_path = os.path.join(config.DATA_DIR, file_name)
            try:
                df = pd.read_csv(file_path)
                return df[config.REQUIRED_COLUMNS]
            except KeyError as e:
                print(f"Missing column(s) in file {file_name}: {e}")
                raise

        file_indices = list(range(config.START_FILES, config.END_FILES + 1))
        # A thread pool is enough here: CSV reading is largely I/O-bound.
        with ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
            results = list(tqdm(executor.map(read_file, file_indices),
                                total=len(file_indices), desc="Reading files"))

        all_data = pd.concat(results, ignore_index=True)
        all_data = all_data[config.REQUIRED_COLUMNS]
        # Downsample by keeping every RESOLUTION-th row.
        chunk = all_data.iloc[::config.RESOLUTION, :].reset_index(drop=True)
        chunk = DataPreprocessor.process_date(chunk)
        chunk = DataPreprocessor.scaler_data(chunk)
        return chunk

    @staticmethod
    def process_date(data):
        """Derive cyclic time-of-day and day-of-year features from the timestamp."""
        # The raw CSVs carry the timestamp in a column named 'index'.
        data = data.rename(columns={'index': 'date'})
        data['date'] = pd.to_datetime(data['date'])
        minute_of_day = data['date'].dt.hour * 60 + data['date'].dt.minute
        day_of_year = data['date'].dt.dayofyear
        time_features = ['minute_sin', 'minute_cos', 'day_year_sin', 'day_year_cos']
        # Sine/cosine encodings keep midnight adjacent to 23:59 and
        # Dec 31 adjacent to Jan 1.
        data['minute_sin'] = np.sin(2 * np.pi * minute_of_day / 1440)
        data['minute_cos'] = np.cos(2 * np.pi * minute_of_day / 1440)
        data['day_year_sin'] = np.sin(2 * np.pi * day_of_year / 366)
        data['day_year_cos'] = np.cos(2 * np.pi * day_of_year / 366)
        other_columns = [col for col in data.columns if col not in ['date'] + time_features]
        return data[['date'] + time_features + other_columns]

    @staticmethod
    def scaler_data(data):
        """Min-max scale all non-date columns to [0, 1] and persist the scaler."""
        # Note: the scaler is fit on the full series before the chronological
        # split, so val/test statistics leak into the scaling; fitting on the
        # training range only would avoid this look-ahead.
        date_col = data[['date']]
        data_to_scale = data.drop(columns=['date'])
        scaler = MinMaxScaler(feature_range=(0, 1))
        scaled_data = scaler.fit_transform(data_to_scale)
        joblib.dump(scaler, config.SCALER_PATH)  # saved for inverse-transforming predictions
        scaled_data = pd.DataFrame(scaled_data, columns=data_to_scale.columns)
        return pd.concat([date_col.reset_index(drop=True), scaled_data], axis=1)

    @staticmethod
    def create_supervised_dataset(data, step_size):
        """Turn a feature frame into lagged inputs and multi-step targets."""
        data = pd.DataFrame(data)
        cols, col_names = [], []
        feature_columns = data.columns.tolist()
        # Inputs: iterate time steps in the outer loop so each row flattens as
        # [t-(SEQ_LEN-1) all features, ..., t-0 all features]. (The original
        # nesting grouped by feature instead, which silently scrambled the
        # (SEQ_LEN, FEATURE_NUM) reshape in load_data.)
        for i in range(config.SEQ_LEN - 1, -1, -1):
            for col in feature_columns:
                cols.append(data[[col]].shift(i))
                col_names.append(f"{col}(t-{i})")
        # Targets: the last LABELS_NUM columns, 1..OUTPUT_SIZE steps ahead.
        target_columns = feature_columns[-config.LABELS_NUM:]
        for i in range(1, config.OUTPUT_SIZE + 1):
            for col in target_columns:
                cols.append(data[[col]].shift(-i))
                col_names.append(f"{col}(t+{i})")
        dataset = pd.concat(cols, axis=1)
        dataset.columns = col_names
        # Stride the windows, then drop rows whose shifts ran off either end.
        dataset = dataset.iloc[::step_size, :].dropna()
        return dataset

    @staticmethod
    def load_data(dataset, shuffle):
        """Wrap a supervised frame in a seeded DataLoader on the target device."""
        n_features_total = config.FEATURE_NUM * config.SEQ_LEN
        n_labels_total = config.OUTPUT_SIZE * config.LABELS_NUM
        X = dataset.values[:, :n_features_total]
        y = dataset.values[:, n_features_total:n_features_total + n_labels_total]
        X = X.reshape(X.shape[0], config.SEQ_LEN, config.FEATURE_NUM)
        device = torch.device(f"cuda:{config.DEVICE_ID}" if torch.cuda.is_available() else "cpu")
        # The full split is moved to the device up front; fine for datasets
        # that fit in memory, and it avoids per-batch host-to-device copies.
        X = torch.tensor(X, dtype=torch.float32).to(device)
        y = torch.tensor(y, dtype=torch.float32).to(device)
        dataset_tensor = TensorDataset(X, y)
        # A seeded generator makes shuffling reproducible across runs.
        generator = torch.Generator()
        generator.manual_seed(config.RANDOM_SEED)
        return DataLoader(dataset_tensor, batch_size=config.BATCH_SIZE,
                          shuffle=shuffle, generator=generator)
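
# Example usage: a minimal sketch, assuming the config values listed at the
# top are set and the raw CSV files exist under config.DATA_DIR.
if __name__ == "__main__":
    # Build the scaled feature frame from the raw CSVs, then split it into loaders.
    combined = DataPreprocessor.read_and_combine_csv_files()
    train_loader, val_loader, test_loader, _ = DataPreprocessor.load_and_process_data(combined)
    # One training batch: X is (batch, SEQ_LEN, FEATURE_NUM),
    # y is (batch, OUTPUT_SIZE * LABELS_NUM).
    X_batch, y_batch = next(iter(train_loader))
    print(X_batch.shape, y_batch.shape)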