# data_preprocessor.py
import os
import torch
import joblib
import numpy as np
import pandas as pd
from tqdm import tqdm
from sklearn.preprocessing import MinMaxScaler
from torch.utils.data import DataLoader, TensorDataset
from concurrent.futures import ThreadPoolExecutor


class DataPreprocessor:

    @staticmethod
    def load_and_process_data(args, data):
        # Parse the date column
        data['date'] = pd.to_datetime(data['date'])
        time_interval = pd.Timedelta(hours=(args.resolution / 60))
        window_time_span = time_interval * (args.seq_len + 314)

        # Split into train/val/test sets; the val/test start dates are shifted
        # back by one full window so the first sliding-window sample has enough history
        val_start_date = pd.to_datetime(args.val_start_date)
        test_start_date = pd.to_datetime(args.test_start_date)
        adjusted_val_start = val_start_date - window_time_span
        adjusted_test_start = test_start_date - window_time_span
        train_mask = (data['date'] >= pd.to_datetime(args.train_start_date)) & \
                     (data['date'] <= pd.to_datetime(args.train_end_date))
        val_mask = (data['date'] >= adjusted_val_start) & \
                   (data['date'] <= pd.to_datetime(args.val_end_date))
        test_mask = (data['date'] >= adjusted_test_start) & \
                    (data['date'] <= pd.to_datetime(args.test_end_date))
        train_data = data[train_mask].reset_index(drop=True)
        val_data = data[val_mask].reset_index(drop=True)
        test_data = data[test_mask].reset_index(drop=True)

        # Drop the date column before modelling
        train_data = train_data.drop(columns=['date'])
        val_data = val_data.drop(columns=['date'])
        test_data = test_data.drop(columns=['date'])

        # Build supervised datasets (input sequence + target sequence)
        train_supervised = DataPreprocessor.create_supervised_dataset(args, train_data, 1)
        val_supervised = DataPreprocessor.create_supervised_dataset(args, val_data, 1)
        test_supervised = DataPreprocessor.create_supervised_dataset(args, test_data, args.step_size)

        # Wrap each split in a DataLoader
        train_loader = DataPreprocessor.load_data(args, train_supervised, shuffle=True)
        val_loader = DataPreprocessor.load_data(args, val_supervised, shuffle=False)
        test_loader = DataPreprocessor.load_data(args, test_supervised, shuffle=False)

        return train_loader, val_loader, test_loader, data  # also return the raw data for downstream use

    @staticmethod
    def read_and_combine_csv_files(args):
        # Read and concatenate multiple CSV files
        current_dir = os.path.dirname(__file__)
        parent_dir = os.path.dirname(current_dir)
        args.data_dir = os.path.join(parent_dir, args.data_dir)

        def read_file(file_count):
            file_name = args.file_pattern.format(file_count)
            file_path = os.path.join(args.data_dir, file_name)
            return pd.read_csv(file_path)

        file_indices = list(range(args.start_files, args.end_files + 1))
        max_workers = os.cpu_count()
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            results = list(tqdm(executor.map(read_file, file_indices),
                                total=len(file_indices),
                                desc="Reading files"))
        all_data = pd.concat(results, ignore_index=True)

        # Downsample to the requested resolution (keep every `resolution`-th row)
        chunk = all_data.iloc[::args.resolution, :].reset_index(drop=True)
        # Parse the date column and derive time features
        chunk = DataPreprocessor.process_date(chunk)
        # Normalise
        chunk = DataPreprocessor.scaler_data(chunk)
        return chunk

    @staticmethod
    def process_date(data):
        # Parse the date column and derive cyclical time features
        data = data.rename(columns={'index': 'date'})
        data['date'] = pd.to_datetime(data['date'])
        # Encode the day of year as sine/cosine so the feature wraps smoothly at year end
        data['day_of_year'] = data['date'].dt.dayofyear
        data['day_year_sin'] = np.sin(2 * np.pi * data['day_of_year'] / 366)
        data['day_year_cos'] = np.cos(2 * np.pi * data['day_of_year'] / 366)
        # Drop the intermediate column, keep only the derived features
        data.drop(columns=['day_of_year'], inplace=True)
        # Reorder columns: date, time features, then everything else
        time_features = ['day_year_sin', 'day_year_cos']
        other_columns = [col for col in data.columns if col not in ['date'] and col not in time_features]
        data = data[['date'] + time_features + other_columns]
        return data

    @staticmethod
    def scaler_data(data):
        # Scale every column except the date to the [0, 1] range
        date_col = data[['date']]
        data_to_scale = data.drop(columns=['date'])
        scaler = MinMaxScaler(feature_range=(0, 1))
        scaled_data = scaler.fit_transform(data_to_scale)
        joblib.dump(scaler, 'scaler.pkl')  # persist the fitted scaler
        scaled_data = pd.DataFrame(scaled_data, columns=data_to_scale.columns)
        scaled_data = pd.concat([date_col.reset_index(drop=True), scaled_data], axis=1)
        return scaled_data
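
    # Note (not part of the original code): the scaler saved above can be reloaded
    # at inference time to map predictions back to original units, e.g.
    # `scaler = joblib.load('scaler.pkl')` followed by `scaler.inverse_transform(...)`
    # on an array with the same column layout that was passed to fit_transform().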

    @staticmethod
    def create_supervised_dataset(args, data, step_size):
        # Build a supervised dataset (input sequence + target sequence)
        data = pd.DataFrame(data)
        cols = []
        col_names = []
        feature_columns = data.columns.tolist()

        # Input sequence: lags t-(seq_len-1) ... t-0 for every feature
        for col in feature_columns:
            for i in range(args.seq_len - 1, -1, -1):
                cols.append(data[[col]].shift(i))
                col_names.append(f"{col}(t-{i})")

        # Target sequence: only the last `labels_num` columns are predicted
        target_columns = feature_columns[-args.labels_num:]
        for i in range(1, args.output_size + 1):
            for col in target_columns:
                cols.append(data[[col]].shift(-i))
                col_names.append(f"{col}(t+{i})")

        # Concatenate and clean up
        dataset = pd.concat(cols, axis=1)
        dataset.columns = col_names
        dataset = dataset.iloc[::step_size, :]  # subsample by step size
        dataset = dataset.dropna()              # drop rows with NaNs introduced by shifting
        return dataset
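
    # Illustration (not part of the original code): with two feature columns
    # ['a', 'b'], seq_len=2, labels_num=1 and output_size=1, the supervised
    # columns come out as
    #   a(t-1), a(t-0), b(t-1), b(t-0), b(t+1)
    # i.e. lags are grouped per feature, and the forecast target is appended last.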

    @staticmethod
    def load_data(args, dataset, shuffle):
        # Convert the supervised dataset into tensors and wrap it in a DataLoader
        input_length = args.seq_len
        n_features = args.feature_num
        labels_num = args.labels_num
        n_features_total = n_features * input_length    # total flattened input width
        n_labels_total = args.output_size * labels_num  # total target width

        # Split inputs and targets
        X = dataset.values[:, :n_features_total]
        y = dataset.values[:, n_features_total:n_features_total + n_labels_total]

        # Reshape inputs to [num_samples, seq_len, num_features]. Because the
        # supervised columns are grouped per feature (all lags of one feature,
        # then the next), reshape to [N, features, seq_len] first and swap the
        # last two axes; a direct reshape to [N, seq_len, features] would
        # scramble timesteps and features.
        X = X.reshape(X.shape[0], n_features, input_length).transpose(0, 2, 1)
        X = torch.tensor(X, dtype=torch.float32).to(args.device)
        y = torch.tensor(y, dtype=torch.float32).to(args.device)

        # Build the TensorDataset and DataLoader
        dataset_tensor = TensorDataset(X, y)
        generator = torch.Generator()
        generator.manual_seed(args.random_seed)  # fixed seed so shuffling is reproducible
        data_loader = DataLoader(
            dataset_tensor,
            batch_size=args.batch_size,
            shuffle=shuffle,
            generator=generator
        )
        return data_loader
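

# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of the original module). It wires the
# two entry points together: read_and_combine_csv_files() builds the combined,
# scaled frame, and load_and_process_data() turns it into DataLoaders. Only
# the attribute names below are taken from how `args` is used above; every
# value, the file pattern, and the 'cuda'/'cpu' choice are assumptions.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    from argparse import Namespace

    args = Namespace(
        data_dir='data',                  # assumed layout: a data/ directory next to this package
        file_pattern='part_{}.csv',       # CSVs are assumed to carry an 'index' timestamp column
        start_files=1, end_files=3,
        resolution=60,                    # keep every 60th row; also read as minutes per step
        seq_len=96, output_size=24, step_size=24,
        feature_num=10, labels_num=1,     # feature_num must match the column count after dropping 'date'
        train_start_date='2020-01-01', train_end_date='2021-12-31',
        val_start_date='2022-01-01', val_end_date='2022-06-30',
        test_start_date='2022-07-01', test_end_date='2022-12-31',
        batch_size=64, random_seed=42,
        device='cuda' if torch.cuda.is_available() else 'cpu',
    )

    combined = DataPreprocessor.read_and_combine_csv_files(args)
    train_loader, val_loader, test_loader, raw = DataPreprocessor.load_and_process_data(args, combined)
    x_batch, y_batch = next(iter(train_loader))
    print(x_batch.shape, y_batch.shape)   # expected: [batch, seq_len, feature_num] and [batch, output_size * labels_num]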