| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145 |
- # data_preprocessor.py
- import os
- import torch
- import joblib
- import numpy as np
- import pandas as pd
- from tqdm import tqdm
- from sklearn.preprocessing import MinMaxScaler
- from torch.utils.data import DataLoader, TensorDataset
- from concurrent.futures import ThreadPoolExecutor
- from config import config
class DataPreprocessor:
    """Static helpers that turn raw CSV sensor files into scaled,
    time-featurized, supervised train/val/test DataLoaders."""

    @staticmethod
    def load_and_process_data(data):
        """Split *data* chronologically and build one DataLoader per split.

        Args:
            data: DataFrame with a 'date' column plus scaled feature columns
                (the output of read_and_combine_csv_files).

        Returns:
            (train_loader, val_loader, test_loader, data) — the original
            frame is returned unchanged except that 'date' is parsed.
        """
        data['date'] = pd.to_datetime(data['date'])

        # One downsampled row spans (4 * RESOLUTION / 60) minutes.
        # NOTE(review): this assumes a 4-second raw sampling period — confirm.
        time_interval = pd.Timedelta(minutes=(4 * config.RESOLUTION / 60))
        # Extra lookback so the first val/test window has a full sequence.
        window_time_span = time_interval * (config.SEQ_LEN + 1)

        val_start = pd.to_datetime(config.VAL_START_DATE) - window_time_span
        test_start = pd.to_datetime(config.TEST_START_DATE) - window_time_span

        def _select(start, end):
            # Inclusive chronological slice, with 'date' dropped for modeling.
            mask = (data['date'] >= start) & (data['date'] <= end)
            return data[mask].reset_index(drop=True).drop(columns=['date'])

        train_data = _select(pd.to_datetime(config.TRAIN_START_DATE),
                             pd.to_datetime(config.TRAIN_END_DATE))
        val_data = _select(val_start, pd.to_datetime(config.VAL_END_DATE))
        test_data = _select(test_start, pd.to_datetime(config.TEST_END_DATE))

        train_supervised = DataPreprocessor.create_supervised_dataset(train_data, 1)
        val_supervised = DataPreprocessor.create_supervised_dataset(val_data, 1)
        # Test windows are subsampled with the configured stride.
        test_supervised = DataPreprocessor.create_supervised_dataset(test_data, config.STEP_SIZE)

        train_loader = DataPreprocessor.load_data(train_supervised, shuffle=True)
        val_loader = DataPreprocessor.load_data(val_supervised, shuffle=False)
        test_loader = DataPreprocessor.load_data(test_supervised, shuffle=False)

        return train_loader, val_loader, test_loader, data

    @staticmethod
    def read_and_combine_csv_files():
        """Read the numbered CSV files concurrently, concatenate them,
        downsample to the configured resolution, then add cyclical time
        features and min-max scale the values.

        Returns:
            DataFrame ordered as ['date', time features, scaled columns].

        Raises:
            KeyError: if any file lacks one of config.REQUIRED_COLUMNS
                (reported with the file name, then re-raised).
        """
        def read_file(file_count):
            file_name = config.FILE_PATTERN.format(file_count)
            file_path = os.path.join(config.DATA_DIR, file_name)
            try:
                df = pd.read_csv(file_path)
                return df[config.REQUIRED_COLUMNS]
            except KeyError as e:
                print(f"文件 {file_name} 中缺少列: {e}")
                raise

        file_indices = range(config.START_FILES, config.END_FILES + 1)

        with ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
            results = list(tqdm(executor.map(read_file, file_indices),
                                total=len(file_indices), desc="正在读取文件"))

        all_data = pd.concat(results, ignore_index=True)[config.REQUIRED_COLUMNS]

        # Keep every RESOLUTION-th row to downsample the raw signal.
        chunk = all_data.iloc[::config.RESOLUTION, :].reset_index(drop=True)
        chunk = DataPreprocessor.process_date(chunk)
        return DataPreprocessor.scaler_data(chunk)

    @staticmethod
    def process_date(data):
        """Rename 'index' to 'date' and append cyclical time encodings.

        Adds sin/cos pairs for minute-of-day (period 1440) and day-of-year
        (period 366) so adjacent times are adjacent in feature space.

        Returns:
            DataFrame ordered as ['date', time features, other columns];
            the caller's frame is not mutated (rename returns a copy).
        """
        data = data.rename(columns={'index': 'date'})
        data['date'] = pd.to_datetime(data['date'])

        minute_of_day = data['date'].dt.hour * 60 + data['date'].dt.minute
        day_of_year = data['date'].dt.dayofyear

        time_features = ['minute_sin', 'minute_cos', 'day_year_sin', 'day_year_cos']
        data['minute_sin'] = np.sin(2 * np.pi * minute_of_day / 1440)
        data['minute_cos'] = np.cos(2 * np.pi * minute_of_day / 1440)
        data['day_year_sin'] = np.sin(2 * np.pi * day_of_year / 366)
        data['day_year_cos'] = np.cos(2 * np.pi * day_of_year / 366)

        other_columns = [col for col in data.columns if col not in ['date'] + time_features]
        return data[['date'] + time_features + other_columns]

    @staticmethod
    def scaler_data(data):
        """Min-max scale every non-date column to [0, 1] and persist the
        fitted scaler to config.SCALER_PATH (for inverse-transform later).

        NOTE(review): the scaler is fitted on the FULL dataset before the
        train/val/test split in load_and_process_data, so validation/test
        statistics leak into the scaling. Consider fitting on the training
        date range only — confirm with the model owner before changing.
        """
        date_col = data[['date']]
        data_to_scale = data.drop(columns=['date'])
        scaler = MinMaxScaler(feature_range=(0, 1))
        scaled_data = scaler.fit_transform(data_to_scale)
        joblib.dump(scaler, config.SCALER_PATH)
        scaled_data = pd.DataFrame(scaled_data, columns=data_to_scale.columns)
        return pd.concat([date_col.reset_index(drop=True), scaled_data], axis=1)

    @staticmethod
    def create_supervised_dataset(data, step_size):
        """Build a sliding-window supervised frame.

        Feature columns: for every input column, SEQ_LEN lagged copies named
        'col(t-i)' with i = SEQ_LEN-1..0, grouped per column — i.e. the flat
        row is FEATURE-MAJOR (all lags of column 0, then column 1, ...).
        Target columns: the last LABELS_NUM columns shifted forward by
        1..OUTPUT_SIZE steps, named 'col(t+i)'.

        Args:
            data: feature DataFrame (no 'date' column).
            step_size: keep every step_size-th window (subsampling stride).

        Returns:
            DataFrame with incomplete (NaN-bearing) edge windows removed.
        """
        data = pd.DataFrame(data)
        cols, col_names = [], []
        feature_columns = data.columns.tolist()
        for col in feature_columns:
            for i in range(config.SEQ_LEN - 1, -1, -1):
                cols.append(data[[col]].shift(i))
                col_names.append(f"{col}(t-{i})")

        target_columns = feature_columns[-config.LABELS_NUM:]
        for i in range(1, config.OUTPUT_SIZE + 1):
            for col in target_columns:
                cols.append(data[[col]].shift(-i))
                col_names.append(f"{col}(t+{i})")

        dataset = pd.concat(cols, axis=1)
        dataset.columns = col_names
        dataset = dataset.iloc[::step_size, :]
        # Reassign rather than dropna(inplace=True): the iloc slice above is
        # a view-backed frame, and in-place mutation of it is deprecated
        # under pandas copy-on-write. The result is identical.
        return dataset.dropna()

    @staticmethod
    def load_data(dataset, shuffle):
        """Wrap a supervised frame in a DataLoader of (X, y) tensors.

        X has shape (batch, SEQ_LEN, FEATURE_NUM); y stays flat with
        OUTPUT_SIZE * LABELS_NUM targets per sample. Tensors are moved to
        the configured CUDA device when available.
        """
        n_features_total = config.FEATURE_NUM * config.SEQ_LEN
        n_labels_total = config.OUTPUT_SIZE * config.LABELS_NUM
        X = dataset.values[:, :n_features_total]
        y = dataset.values[:, n_features_total:n_features_total + n_labels_total]

        # BUG FIX: create_supervised_dataset emits columns feature-major
        # (all SEQ_LEN lags of feature 0, then feature 1, ...), so the flat
        # row must be reshaped to (FEATURE_NUM, SEQ_LEN) and transposed to
        # obtain (SEQ_LEN, FEATURE_NUM). Reshaping straight to
        # (SEQ_LEN, FEATURE_NUM), as before, scrambled lags and features
        # within each timestep slice.
        X = X.reshape(X.shape[0], config.FEATURE_NUM, config.SEQ_LEN)
        X = X.transpose(0, 2, 1)

        device = torch.device(f"cuda:{config.DEVICE_ID}" if torch.cuda.is_available() else "cpu")

        X = torch.tensor(X, dtype=torch.float32).to(device)
        y = torch.tensor(y, dtype=torch.float32).to(device)
        dataset_tensor = TensorDataset(X, y)

        # Seeded generator keeps shuffling reproducible across runs.
        generator = torch.Generator()
        generator.manual_seed(config.RANDOM_SEED)

        return DataLoader(dataset_tensor, batch_size=config.BATCH_SIZE,
                          shuffle=shuffle, generator=generator)
|