# data_preprocessor.py
import os
from concurrent.futures import ThreadPoolExecutor

import joblib
import numpy as np
import pandas as pd
import torch
from sklearn.preprocessing import MinMaxScaler
from torch.utils.data import DataLoader, TensorDataset
from tqdm import tqdm


class DataPreprocessor:
    """Data preprocessing class."""

    # Columns that must be kept from the raw CSV files
    COLUMNS_TO_KEEP = [
        'index',
        "AR.1#UF_JSFLOW_O", "AR.2#UF_JSFLOW_O", "AR.1#RO_JSFLOW_O", "AR.2#RO_JSFLOW_O",
        "AR.1#UF_JSPRESS_O", "AR.2#UF_JSPRESS_O", "AR.1#RO_JSPRESS_O", "AR.2#RO_JSPRESS_O",
        "AR.1#RO_EDJSPRESS_O", "AR.1#RO_SDJSPRESS_O", "AR.2#RO_EDJSPRESS_O", "AR.2#RO_SDJSPRESS_O",
        "AR.ZJS_TEMP_O", "AR.ZJS_ZD_O", "AR.RO_JSDD_O", "AR.RO_JSORP_O", "AR.RO_JSPH_O",
        "AR.1#UF_V_FB_O", "AR.2#UF_V_FB_O", "AR.1#UFBWB_FRE_FB_O", "AR.2#UFBWB_FRE_FB_O",
        "AR.1#RODJB_FRE_FB_O", "AR.1#ROGYB_FRE_FB_O", "AR.1#RODJB_CZ_O", "AR.1#ROGYB_CZ_O",
        "AR.2#RODJB_CZ_O", "AR.2#ROGYB_CZ_O", "AR.ROGSB_FRE_FB_O", "AR.UFGSB_FRE_FB_O",
        "AR.V_UF1_TJV_KD_FB", "AR.V_UF2_TJV_KD_FB", "AR.CS_LEVEL_O", "AR.UF_CSLEVEL_O",
        "AR.UF1_SSD_KMYC", "AR.UF2_SSD_KMYC", "AR.RO1_2D_YC", "AR.PUBLIC_BY_REAL_1",
        "1#RO_CSFLOW"
    ]
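
    # Note: 'index' holds the raw timestamp and is renamed to 'date' in
    # process_date(). Because create_supervised_dataset() targets the *last*
    # labels_num columns and the column order above is preserved, "1#RO_CSFLOW"
    # is presumably the prediction target when labels_num == 1. The "AR.*"
    # entries look like SCADA sensor tags for the UF/RO process lines; that
    # reading is an assumption, not something this file documents.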

    @staticmethod
    def load_and_process_data(args, data):
        """Load and process the data; split it into train/validation/test sets."""
        # Parse timestamps
        data['date'] = pd.to_datetime(data['date'])
        # 4 * resolution / 60 minutes == 4 * resolution seconds; the raw feed is
        # presumably sampled every 4 s, so after downsampling by `resolution`
        # one row spans 4 * resolution seconds.
        time_interval = pd.Timedelta(minutes=(4 * args.resolution / 60))
        window_time_span = time_interval * (args.seq_len + 1)
        val_start_date = pd.to_datetime(args.val_start_date)
        test_start_date = pd.to_datetime(args.test_start_date)

        # Pull the val/test start dates back by one full input window so the
        # first sample whose target falls at the split date has a complete history.
        adjusted_val_start = val_start_date - window_time_span
        adjusted_test_start = test_start_date - window_time_span
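        # Worked example (hypothetical values): with args.resolution == 15 each
        # row spans 4 * 15 == 60 s, i.e. one minute, and with args.seq_len == 96
        # the window spans (96 + 1) minutes. A val_start_date of
        # 2023-06-01 00:00 would then be pulled back to 2023-05-31 22:23.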

        train_mask = (data['date'] >= pd.to_datetime(args.train_start_date)) & \
                     (data['date'] <= pd.to_datetime(args.train_end_date))
        val_mask = (data['date'] >= adjusted_val_start) & \
                   (data['date'] <= pd.to_datetime(args.val_end_date))
        test_mask = (data['date'] >= adjusted_test_start) & \
                    (data['date'] <= pd.to_datetime(args.test_end_date))
        train_data = data[train_mask].reset_index(drop=True)
        val_data = data[val_mask].reset_index(drop=True)
        test_data = data[test_mask].reset_index(drop=True)

        train_data = train_data.drop(columns=['date'])
        val_data = val_data.drop(columns=['date'])
        test_data = test_data.drop(columns=['date'])

        # Build supervised windows. Train/val slide one step at a time; the test
        # set strides by args.step_size (e.g. to score non-overlapping windows).
        train_supervised = DataPreprocessor.create_supervised_dataset(args, train_data, 1)
        val_supervised = DataPreprocessor.create_supervised_dataset(args, val_data, 1)
        test_supervised = DataPreprocessor.create_supervised_dataset(args, test_data, args.step_size)

        # Wrap in DataLoaders; only the training set is shuffled.
        train_loader = DataPreprocessor.load_data(args, train_supervised, shuffle=True)
        val_loader = DataPreprocessor.load_data(args, val_supervised, shuffle=False)
        test_loader = DataPreprocessor.load_data(args, test_supervised, shuffle=False)

        return train_loader, val_loader, test_loader, data

    @staticmethod
    def read_and_combine_csv_files(args):
        """Read the CSV files, select the required features, and preprocess."""
        # Resolve args.data_dir relative to the project root (this file's parent directory).
        current_dir = os.path.dirname(__file__)
        parent_dir = os.path.dirname(current_dir)
        args.data_dir = os.path.join(parent_dir, args.data_dir)

        def read_file(file_count):
            file_name = args.file_pattern.format(file_count)
            file_path = os.path.join(args.data_dir, file_name)
            try:
                df = pd.read_csv(file_path)
                # Keep only the required columns; a missing column raises KeyError.
                return df[DataPreprocessor.COLUMNS_TO_KEEP]
            except KeyError as e:
                print(f"File {file_name} is missing column(s): {e}")
                raise

        file_indices = list(range(args.start_files, args.end_files + 1))
        max_workers = os.cpu_count()

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            results = list(tqdm(executor.map(read_file, file_indices),
                                total=len(file_indices),
                                desc="Reading files"))

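        # executor.map yields results in the order of file_indices regardless of
        # completion order, so the concatenation below stays chronological.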
        all_data = pd.concat(results, ignore_index=True)

        # Enforce a consistent column order
        all_data = all_data[DataPreprocessor.COLUMNS_TO_KEEP]

        # Downsample: keep every `resolution`-th row (e.g. resolution == 15
        # keeps rows 0, 15, 30, ...).
        chunk = all_data.iloc[::args.resolution, :].reset_index(drop=True)

        # Feature engineering and scaling
        chunk = DataPreprocessor.process_date(chunk, args)
        chunk = DataPreprocessor.scaler_data(chunk, args)

        return chunk

    @staticmethod
    def process_date(data, args):
        data = data.rename(columns={'index': 'date'})
        data['date'] = pd.to_datetime(data['date'])

        time_features = []
        # Always generate minute-of-day and day-of-year features so the
        # encoding stays consistent with the Predictor.
        data['minute_of_day'] = data['date'].dt.hour * 60 + data['date'].dt.minute
        data['minute_sin'] = np.sin(2 * np.pi * data['minute_of_day'] / 1440)
        data['minute_cos'] = np.cos(2 * np.pi * data['minute_of_day'] / 1440)

        data['day_of_year'] = data['date'].dt.dayofyear
        data['day_year_sin'] = np.sin(2 * np.pi * data['day_of_year'] / 366)
        data['day_year_cos'] = np.cos(2 * np.pi * data['day_of_year'] / 366)

        time_features.extend(['minute_sin', 'minute_cos', 'day_year_sin', 'day_year_cos'])
        data.drop(columns=['minute_of_day', 'day_of_year'], inplace=True)

        # Reorder: date first, then the cyclical time features, then everything else.
        other_columns = [col for col in data.columns if col not in ['date'] and col not in time_features]
        data = data[['date'] + time_features + other_columns]
        return data
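
    # The sin/cos pair places each timestamp on the unit circle, so values on
    # either side of a wrap-around stay adjacent. A quick sanity check
    # (illustrative only):
    #
    #   m = np.array([0, 1439])                     # 00:00 and 23:59
    #   pts = np.stack([np.sin(2 * np.pi * m / 1440),
    #                   np.cos(2 * np.pi * m / 1440)], axis=1)
    #   np.linalg.norm(pts[0] - pts[1])             # ~0.0044, nearly identical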

    @staticmethod
    def scaler_data(data, args):
        date_col = data[['date']]
        data_to_scale = data.drop(columns=['date'])
        # Fit a single MinMaxScaler over the full dataset and persist it so the
        # same transform can be reused (and inverted) at inference time.
        scaler = MinMaxScaler(feature_range=(0, 1))
        scaled_data = scaler.fit_transform(data_to_scale)
        joblib.dump(scaler, args.scaler_path)
        scaled_data = pd.DataFrame(scaled_data, columns=data_to_scale.columns)
        scaled_data = pd.concat([date_col.reset_index(drop=True), scaled_data], axis=1)
        return scaled_data
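
    # Recovering physical units from model output, sketched under the
    # assumption that labels_num == 1 and the target is the last scaled column
    # (the names y_pred/y_real are illustrative, not part of this module):
    #
    #   scaler = joblib.load(args.scaler_path)
    #   pad = np.zeros((len(y_pred), scaler.n_features_in_))
    #   pad[:, -1] = y_pred                      # place predictions in the target slot
    #   y_real = scaler.inverse_transform(pad)[:, -1]
    #
    # inverse_transform expects all n_features_in_ columns, hence the zero padding.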

    @staticmethod
    def create_supervised_dataset(args, data, step_size):
        data = pd.DataFrame(data)
        cols = []
        col_names = []
        feature_columns = data.columns.tolist()
        # Input sequence. The time loop is the outer loop so each flattened row
        # is time-major ([all features at t-(seq_len-1), ..., all features at t]),
        # matching the reshape to (batch, seq_len, n_features) in load_data.
        for i in range(args.seq_len - 1, -1, -1):
            for col in feature_columns:
                cols.append(data[[col]].shift(i))
                col_names.append(f"{col}(t-{i})")

        # Target sequence (the last labels_num columns), also time-major
        target_columns = feature_columns[-args.labels_num:]
        for i in range(1, args.output_size + 1):
            for col in target_columns:
                cols.append(data[[col]].shift(-i))
                col_names.append(f"{col}(t+{i})")
        dataset = pd.concat(cols, axis=1)
        dataset.columns = col_names
        # Stride, then drop rows whose shifts ran off either end of the frame.
        dataset = dataset.iloc[::step_size, :].dropna()
        return dataset
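
    # Column layout example (hypothetical sizes: seq_len == 2, one feature 'f'
    # plus the target 'y', labels_num == 1, output_size == 2):
    #
    #   f(t-1), y(t-1), f(t-0), y(t-0), y(t+1), y(t+2)
    #
    # i.e. seq_len * n_features input columns followed by
    # output_size * labels_num target columns.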

    @staticmethod
    def load_data(args, dataset, shuffle):
        input_length = args.seq_len
        n_features = args.feature_num
        labels_num = args.labels_num

        n_features_total = n_features * input_length
        n_labels_total = args.output_size * labels_num
        X = dataset.values[:, :n_features_total]
        y = dataset.values[:, n_features_total:n_features_total + n_labels_total]

        # Valid because create_supervised_dataset emits time-major columns.
        X = X.reshape(X.shape[0], input_length, n_features)
        X = torch.tensor(X, dtype=torch.float32).to(args.device)
        y = torch.tensor(y, dtype=torch.float32).to(args.device)
        dataset_tensor = TensorDataset(X, y)
        # Seed the shuffling generator for a reproducible batch order.
        generator = torch.Generator()
        generator.manual_seed(args.random_seed)

        return DataLoader(dataset_tensor, batch_size=args.batch_size, shuffle=shuffle, generator=generator)
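

# A minimal, end-to-end usage sketch. The attribute names mirror those consumed
# above, but every value here is hypothetical (the real project presumably
# builds `args` with argparse); adjust paths and dates to your data.
if __name__ == "__main__":
    from argparse import Namespace

    args = Namespace(
        data_dir="data",                      # hypothetical: resolved relative to the project root
        file_pattern="export_{}.csv",         # hypothetical file naming scheme
        start_files=1, end_files=10,
        resolution=15,                        # keep every 15th row (~1 row/minute at a 4 s base rate)
        seq_len=96, output_size=6, step_size=6,
        feature_num=42,                       # 38 sensor columns + 4 cyclical time features
        labels_num=1,
        train_start_date="2023-01-01", train_end_date="2023-04-30",
        val_start_date="2023-05-01", val_end_date="2023-05-15",
        test_start_date="2023-05-16", test_end_date="2023-05-31",
        scaler_path="scaler.gz",
        batch_size=64, random_seed=42,
        device="cuda" if torch.cuda.is_available() else "cpu",
    )

    processed = DataPreprocessor.read_and_combine_csv_files(args)
    train_loader, val_loader, test_loader, _ = DataPreprocessor.load_and_process_data(args, processed)
    print(f"Batches - train: {len(train_loader)}, val: {len(val_loader)}, test: {len(test_loader)}")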