@@ -0,0 +1,193 @@
+# data_preprocessor.py
+import os
+import torch
+import joblib
+import numpy as np
+import pandas as pd
+from tqdm import tqdm
+from sklearn.preprocessing import MinMaxScaler
+from torch.utils.data import DataLoader, TensorDataset
+from concurrent.futures import ThreadPoolExecutor
+
+class DataPreprocessor:
+    """Data preprocessing utilities."""
+
+    # Columns that must be kept from the raw CSV files
+    COLUMNS_TO_KEEP = [
+        'index',
+        "AR.1#UF_JSFLOW_O", "AR.2#UF_JSFLOW_O", "AR.1#RO_JSFLOW_O", "AR.2#RO_JSFLOW_O",
+        "AR.1#UF_JSPRESS_O", "AR.2#UF_JSPRESS_O", "AR.1#RO_JSPRESS_O", "AR.2#RO_JSPRESS_O",
+        "AR.1#RO_EDJSPRESS_O", "AR.1#RO_SDJSPRESS_O", "AR.2#RO_EDJSPRESS_O", "AR.2#RO_SDJSPRESS_O",
+        "AR.ZJS_TEMP_O", "AR.ZJS_ZD_O", "AR.RO_JSDD_O", "AR.RO_JSORP_O", "AR.RO_JSPH_O",
+        "AR.1#UF_V_FB_O", "AR.2#UF_V_FB_O", "AR.1#UFBWB_FRE_FB_O", "AR.2#UFBWB_FRE_FB_O",
+        "AR.1#RODJB_FRE_FB_O", "AR.1#ROGYB_FRE_FB_O", "AR.1#RODJB_CZ_O", "AR.1#ROGYB_CZ_O",
+        "AR.2#RODJB_CZ_O", "AR.2#ROGYB_CZ_O", "AR.ROGSB_FRE_FB_O", "AR.UFGSB_FRE_FB_O",
+        "AR.V_UF1_TJV_KD_FB", "AR.V_UF2_TJV_KD_FB", "AR.CS_LEVEL_O", "AR.UF_CSLEVEL_O",
+        "AR.UF1_SSD_KMYC", "AR.UF2_SSD_KMYC", "AR.RO1_2D_YC", "AR.PUBLIC_BY_REAL_1",
+        "1#RO_CSFLOW"
+    ]
+
+    @staticmethod
+    def load_and_process_data(args, data):
+        """Load and process the data, splitting it into train/val/test sets."""
+        # Parse dates
+        data['date'] = pd.to_datetime(data['date'])
+        # One downsampled step spans 4 * resolution seconds (raw rows appear to be 4 s apart)
+        time_interval = pd.Timedelta(minutes=(4 * args.resolution / 60))
+        window_time_span = time_interval * (args.seq_len + 1)
+
+        val_start_date = pd.to_datetime(args.val_start_date)
+        test_start_date = pd.to_datetime(args.test_start_date)
+
+        # Shift the val/test starts back so their first window has full history
+        adjusted_val_start = val_start_date - window_time_span
+        adjusted_test_start = test_start_date - window_time_span
+
+        train_mask = (data['date'] >= pd.to_datetime(args.train_start_date)) & \
+                     (data['date'] <= pd.to_datetime(args.train_end_date))
+        val_mask = (data['date'] >= adjusted_val_start) & \
+                   (data['date'] <= pd.to_datetime(args.val_end_date))
+        test_mask = (data['date'] >= adjusted_test_start) & \
+                    (data['date'] <= pd.to_datetime(args.test_end_date))
+
+        train_data = data[train_mask].reset_index(drop=True)
+        val_data = data[val_mask].reset_index(drop=True)
+        test_data = data[test_mask].reset_index(drop=True)
+
+        train_data = train_data.drop(columns=['date'])
+        val_data = val_data.drop(columns=['date'])
+        test_data = test_data.drop(columns=['date'])
+
+        # Build supervised sliding-window datasets
+        train_supervised = DataPreprocessor.create_supervised_dataset(args, train_data, 1)
+        val_supervised = DataPreprocessor.create_supervised_dataset(args, val_data, 1)
+        test_supervised = DataPreprocessor.create_supervised_dataset(args, test_data, args.step_size)
+
+        # Wrap in DataLoaders
+        train_loader = DataPreprocessor.load_data(args, train_supervised, shuffle=True)
+        val_loader = DataPreprocessor.load_data(args, val_supervised, shuffle=False)
+        test_loader = DataPreprocessor.load_data(args, test_supervised, shuffle=False)
+
+        return train_loader, val_loader, test_loader, data
+
+    @staticmethod
+    def read_and_combine_csv_files(args):
+        """Read the source CSV files, select the required columns, and preprocess."""
+        current_dir = os.path.dirname(__file__)
+        parent_dir = os.path.dirname(current_dir)
+        args.data_dir = os.path.join(parent_dir, args.data_dir)
+
+        def read_file(file_count):
+            # file_pattern is a format string, e.g. "data_{}.csv" (hypothetical name),
+            # so each index in start_files..end_files maps to one file
+            file_name = args.file_pattern.format(file_count)
+            file_path = os.path.join(args.data_dir, file_name)
+            try:
+                df = pd.read_csv(file_path)
+                # Keep only the required columns; a missing column raises a clear error
+                return df[DataPreprocessor.COLUMNS_TO_KEEP]
+            except KeyError as e:
+                print(f"File {file_name} is missing columns: {e}")
+                raise
+
+        file_indices = list(range(args.start_files, args.end_files + 1))
+        max_workers = os.cpu_count()
+
+        with ThreadPoolExecutor(max_workers=max_workers) as executor:
+            results = list(tqdm(executor.map(read_file, file_indices),
+                                total=len(file_indices),
+                                desc="Reading files"))
+
+        all_data = pd.concat(results, ignore_index=True)
+
+        # Ensure a consistent column order
+        all_data = all_data[DataPreprocessor.COLUMNS_TO_KEEP]
+
+        # Downsample: keep every resolution-th row
+        chunk = all_data.iloc[::args.resolution, :].reset_index(drop=True)
+
+        # Feature engineering and scaling
+        chunk = DataPreprocessor.process_date(chunk, args)
+        chunk = DataPreprocessor.scaler_data(chunk, args)
+
+        return chunk
+
+    @staticmethod
+    def process_date(data, args):
+        """Rename the raw index column to date and add cyclical time features."""
+        data = data.rename(columns={'index': 'date'})
+        data['date'] = pd.to_datetime(data['date'])
+
+        time_features = []
+        # Always generate minute-level and day-level features, consistent with the Predictor
+        data['minute_of_day'] = data['date'].dt.hour * 60 + data['date'].dt.minute
+        data['minute_sin'] = np.sin(2 * np.pi * data['minute_of_day'] / 1440)
+        data['minute_cos'] = np.cos(2 * np.pi * data['minute_of_day'] / 1440)
+
+        data['day_of_year'] = data['date'].dt.dayofyear
+        data['day_year_sin'] = np.sin(2 * np.pi * data['day_of_year'] / 366)
+        data['day_year_cos'] = np.cos(2 * np.pi * data['day_of_year'] / 366)
+
+        time_features.extend(['minute_sin', 'minute_cos', 'day_year_sin', 'day_year_cos'])
+        data.drop(columns=['minute_of_day', 'day_of_year'], inplace=True)
+
+        other_columns = [col for col in data.columns if col not in ['date'] and col not in time_features]
+        data = data[['date'] + time_features + other_columns]
+        return data
+
+    @staticmethod
+    def scaler_data(data, args):
+        """Min-max scale every feature column and persist the fitted scaler."""
+        date_col = data[['date']]
+        data_to_scale = data.drop(columns=['date'])
+
+        # Note: the scaler is fit on the full dataset, before the train/val/test split
+        scaler = MinMaxScaler(feature_range=(0, 1))
+        scaled_data = scaler.fit_transform(data_to_scale)
+        joblib.dump(scaler, args.scaler_path)
+
+        scaled_data = pd.DataFrame(scaled_data, columns=data_to_scale.columns)
+        scaled_data = pd.concat([date_col.reset_index(drop=True), scaled_data], axis=1)
+        return scaled_data
+
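+    # Inference-time note (illustrative sketch, not used in this module): the
+    # persisted scaler can be reloaded to map scaled predictions back to
+    # physical units, e.g.
+    #   scaler = joblib.load(args.scaler_path)
+    #   unscaled = scaler.inverse_transform(scaled_rows)
+    # inverse_transform expects the same column order that was used to fit.
+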
+    @staticmethod
+    def create_supervised_dataset(args, data, step_size):
+        """Build a supervised sliding-window dataset (see the layout note below)."""
+        data = pd.DataFrame(data)
+        cols = []
+        col_names = []
+        feature_columns = data.columns.tolist()
+
+        # Input sequence: seq_len lagged copies of every feature, grouped per feature
+        for col in feature_columns:
+            for i in range(args.seq_len - 1, -1, -1):
+                cols.append(data[[col]].shift(i))
+                col_names.append(f"{col}(t-{i})")
+
+        # Target sequence (the last labels_num columns), ordered by horizon
+        target_columns = feature_columns[-args.labels_num:]
+        for i in range(1, args.output_size + 1):
+            for col in target_columns:
+                cols.append(data[[col]].shift(-i))
+                col_names.append(f"{col}(t+{i})")
+
+        dataset = pd.concat(cols, axis=1)
+        dataset.columns = col_names
+        # Downsample by step_size, then drop the NaN rows the shifts introduced;
+        # chaining dropna on the fresh copy avoids a SettingWithCopyWarning
+        dataset = dataset.iloc[::step_size, :].dropna()
+        return dataset
+
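+    # Layout note (illustrative): with features [a, b], seq_len=2, labels_num=1
+    # and output_size=1, create_supervised_dataset produces the columns
+    #   a(t-1), a(t-0), b(t-1), b(t-0), b(t+1)
+    # i.e. input lags grouped per feature, then targets ordered by horizon.
+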
+    @staticmethod
+    def load_data(args, dataset, shuffle):
+        """Convert the supervised frame into a DataLoader."""
+        input_length = args.seq_len
+        n_features = args.feature_num
+        labels_num = args.labels_num
+
+        n_features_total = n_features * input_length
+        n_labels_total = args.output_size * labels_num
+
+        X = dataset.values[:, :n_features_total]
+        y = dataset.values[:, n_features_total:n_features_total + n_labels_total]
+
+        # The input columns are grouped per feature (all lags of feature 1, then
+        # feature 2, ...), so reshape feature-major first and then swap axes to
+        # get (batch, seq_len, n_features); a direct reshape to that shape would
+        # interleave lags and features
+        X = X.reshape(X.shape[0], n_features, input_length).transpose(0, 2, 1)
+        X = torch.tensor(X, dtype=torch.float32).to(args.device)
+        y = torch.tensor(y, dtype=torch.float32).to(args.device)
+
+        dataset_tensor = TensorDataset(X, y)
+        # Seeded generator so shuffling is reproducible across runs
+        generator = torch.Generator()
+        generator.manual_seed(args.random_seed)
+
+        return DataLoader(dataset_tensor, batch_size=args.batch_size, shuffle=shuffle, generator=generator)
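+
+
+# --- Usage sketch (illustrative) ---
+# A minimal driver, assuming an args namespace with the fields used above.
+# Every value below is hypothetical; none are defined by this module.
+if __name__ == "__main__":
+    from types import SimpleNamespace
+
+    args = SimpleNamespace(
+        data_dir="data", file_pattern="data_{}.csv", start_files=1, end_files=3,
+        resolution=15, seq_len=96, output_size=24, step_size=24,
+        labels_num=1, feature_num=42,  # 38 sensor columns + 4 cyclical time features
+        batch_size=64, random_seed=42, scaler_path="scaler.pkl", device="cpu",
+        train_start_date="2023-01-01", train_end_date="2023-06-30",
+        val_start_date="2023-07-01", val_end_date="2023-08-31",
+        test_start_date="2023-09-01", test_end_date="2023-10-31",
+    )
+    data = DataPreprocessor.read_and_combine_csv_files(args)
+    train_loader, val_loader, test_loader, _ = DataPreprocessor.load_and_process_data(args, data)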