  1. # data_preprocessor.py
  2. import os
  3. import torch
  4. import joblib
  5. import numpy as np
  6. import pandas as pd
  7. from tqdm import tqdm # 进度条显示
  8. from sklearn.preprocessing import MinMaxScaler # 数据归一化工具
  9. from torch.utils.data import DataLoader, TensorDataset # PyTorch数据加载工具
  10. from concurrent.futures import ThreadPoolExecutor # 多线程读取文件
  11. class DataPreprocessor:
  12. """数据预处理类,负责数据加载、划分、转换为模型可输入的格式"""
  13. @staticmethod
  14. def load_and_process_data(args, data):
  15. """
  16. 加载并处理数据,划分训练/验证/测试集,创建数据加载器
  17. 参数:
  18. args: 配置参数(包含数据集划分日期、序列长度等)
  19. data: 预处理后的完整数据(含日期列)
  20. 返回:
  21. train_loader: 训练集数据加载器
  22. val_loader: 验证集数据加载器
  23. test_loader: 测试集数据加载器
  24. data: 原始数据(用于后续处理)
  25. """
  26. # 处理日期列
  27. data['date'] = pd.to_datetime(data['date'])
  28. time_interval = pd.Timedelta(minutes=(4 * args.resolution / 60))
  29. window_time_span = time_interval * (args.seq_len + 1)
  30. # 划分训练/验证/测试集(调整起始日期以适应滑动窗口)
  31. val_start_date = pd.to_datetime(args.val_start_date)
  32. test_start_date = pd.to_datetime(args.test_start_date)
  33. # 调整验证集/测试集起始时间(向前推一个窗口,确保有足够历史数据构建输入序列)
  34. adjusted_val_start = val_start_date - window_time_span
  35. adjusted_test_start = test_start_date - window_time_span
  36. # 构建数据集掩码(按日期筛选)
  37. train_mask = (data['date'] >= pd.to_datetime(args.train_start_date)) & \
  38. (data['date'] <= pd.to_datetime(args.train_end_date))
  39. val_mask = (data['date'] >= adjusted_val_start) & \
  40. (data['date'] <= pd.to_datetime(args.val_end_date))
  41. test_mask = (data['date'] >= adjusted_test_start) & \
  42. (data['date'] <= pd.to_datetime(args.test_end_date))
  43. # 筛选数据并重置索引
  44. train_data = data[train_mask].reset_index(drop=True)
  45. val_data = data[val_mask].reset_index(drop=True)
  46. test_data = data[test_mask].reset_index(drop=True)
  47. # 移除日期列用于建模
  48. train_data = train_data.drop(columns=['date'])
  49. val_data = val_data.drop(columns=['date'])
  50. test_data = test_data.drop(columns=['date'])
  51. # 创建监督学习数据集(输入序列+目标序列)
  52. train_supervised = DataPreprocessor.create_supervised_dataset(
  53. args,
  54. train_data,
  55. 1
  56. )
  57. val_supervised = DataPreprocessor.create_supervised_dataset(
  58. args,
  59. val_data,
  60. 1
  61. )
  62. test_supervised = DataPreprocessor.create_supervised_dataset(
  63. args,
  64. test_data,
  65. args.step_size
  66. )
  67. # 转换为DataLoader
  68. train_loader = DataPreprocessor.load_data(
  69. args,
  70. train_supervised,
  71. shuffle=True
  72. )
  73. val_loader = DataPreprocessor.load_data(
  74. args,
  75. val_supervised,
  76. shuffle=False
  77. )
  78. test_loader = DataPreprocessor.load_data(
  79. args,
  80. test_supervised,
  81. shuffle=False
  82. )
  83. return train_loader, val_loader, test_loader, data # 返回原始数据用于后续处理
  84. @staticmethod
  85. def read_and_combine_csv_files(args):
  86. """
  87. 多线程读取并合并多个CSV文件,进行下采样、日期处理和归一化
  88. 参数:
  89. args: 配置参数(包含数据路径、文件范围等)
  90. 返回:
  91. chunk: 预处理后的合并数据(含日期和归一化特征)
  92. """
  93. current_dir = os.path.dirname(__file__)
  94. parent_dir = os.path.dirname(current_dir)
  95. args.data_dir = os.path.join(parent_dir, args.data_dir)
  96. def read_file(file_count):
  97. """读取单个CSV文件的函数(供多线程调用)"""
  98. file_name = args.file_pattern.format(file_count)
  99. file_path = os.path.join(args.data_dir, file_name)
  100. return pd.read_csv(file_path)
  101. # 生成待读取的文件索引列表
  102. file_indices = list(range(args.start_files, args.end_files + 1))
  103. # 多线程读取文件(加速大文件读取)
  104. max_workers = os.cpu_count()
  105. with ThreadPoolExecutor(max_workers=max_workers) as executor:
  106. results = list(tqdm(executor.map(read_file, file_indices),
  107. total=len(file_indices),
  108. desc="正在读取文件"))
  109. all_data = pd.concat(results, ignore_index=True)
  110. # 按分辨率下采样
  111. chunk = all_data.iloc[::args.resolution, :].reset_index(drop=True)
  112. # 处理日期和时间特征
  113. chunk = DataPreprocessor.process_date(chunk)
  114. # 归一化
  115. chunk = DataPreprocessor.scaler_data(chunk)
  116. return chunk
  117. @staticmethod
  118. def process_date(data):
  119. """
  120. 处理日期列,生成周期性时间特征(与Predictor中的方法一致,保证一致性)
  121. 参数:
  122. data: 含'index'列(原始日期)的DataFrame
  123. 返回:
  124. data: 处理后的DataFrame(含日期列和时间特征)
  125. """
  126. data = data.rename(columns={'index': 'date'})
  127. data['date'] = pd.to_datetime(data['date'])
  128. # 生成周期性时间特征
  129. data['minute_of_day'] = data['date'].dt.hour * 60 + data['date'].dt.minute
  130. data['day_of_year'] = data['date'].dt.dayofyear
  131. # 周期性编码(正弦/余弦转换,确保时间连续性)
  132. data['minute_sin'] = np.sin(2 * np.pi * data['minute_of_day'] / 1440)
  133. data['minute_cos'] = np.cos(2 * np.pi * data['minute_of_day'] / 1440)
  134. data['day_year_sin'] = np.sin(2 * np.pi * data['day_of_year'] / 366)
  135. data['day_year_cos'] = np.cos(2 * np.pi * data['day_of_year'] / 366)
  136. # 移除原始时间列,保留特征列
  137. data.drop(columns=['minute_of_day', 'day_of_year'], inplace=True)
  138. # 调整列顺序(日期+时间特征+其他特征)
  139. time_features = ['minute_sin', 'minute_cos', 'day_year_sin', 'day_year_cos']
  140. other_columns = [col for col in data.columns if col not in ['date'] and col not in time_features]
  141. data = data[['date'] + time_features + other_columns]
  142. return data
  143. @staticmethod
  144. def scaler_data(data):
  145. """
  146. 对数据进行归一化(0-1缩放),并保存归一化器(供预测时反归一化)
  147. 参数:
  148. data: 含'date'列和特征列的DataFrame
  149. 返回:
  150. scaled_data: 归一化后的DataFrame(含日期列)
  151. """
  152. date_col = data[['date']]
  153. data_to_scale = data.drop(columns=['date'])
  154. scaler = MinMaxScaler(feature_range=(0, 1))
  155. scaled_data = scaler.fit_transform(data_to_scale)
  156. joblib.dump(scaler, 'scaler.pkl') # 保存归一化器
  157. # 转换为DataFrame并拼接日期列
  158. scaled_data = pd.DataFrame(scaled_data, columns=data_to_scale.columns)
  159. scaled_data = pd.concat([date_col.reset_index(drop=True), scaled_data], axis=1)
  160. return scaled_data
  161. @staticmethod
  162. def create_supervised_dataset(args, data, step_size):
  163. """
  164. 创建监督学习数据集(输入序列+目标序列)
  165. 输入序列:历史seq_len个时间步的所有特征
  166. 目标序列:未来output_size个时间步的标签特征(最后labels_num列)
  167. 参数:
  168. args: 配置参数(含seq_len、output_size等)
  169. data: 输入数据(不含日期列的特征数据)
  170. step_size: 采样步长(每隔step_size取一个样本)
  171. 返回:
  172. dataset: 监督学习数据集(DataFrame)
  173. """
  174. data = pd.DataFrame(data)
  175. cols = []
  176. col_names = []
  177. feature_columns = data.columns.tolist()
  178. # 输入序列(t-0到t-(seq_len-1))
  179. for col in feature_columns:
  180. for i in range(args.seq_len - 1, -1, -1):
  181. cols.append(data[[col]].shift(i))
  182. col_names.append(f"{col}(t-{i})")
  183. # 目标序列(仅取最后labels_num列作为预测目标)
  184. target_columns = feature_columns[-args.labels_num:]
  185. for i in range(1, args.output_size + 1):
  186. for col in target_columns:
  187. cols.append(data[[col]].shift(-i))
  188. col_names.append(f"{col}(t+{i})")
  189. # 合并并清洗数据
  190. dataset = pd.concat(cols, axis=1)
  191. dataset.columns = col_names
  192. dataset = dataset.iloc[::step_size, :] # 按步长采样
  193. dataset.dropna(inplace=True) # 移除含缺失值的行
  194. return dataset
  195. @staticmethod
  196. def load_data(args, dataset, shuffle):
  197. """
  198. 将监督学习数据集转换为PyTorch张量,并创建DataLoader
  199. 参数:
  200. args: 配置参数(含特征数、批大小等)
  201. dataset: 监督学习数据集(DataFrame)
  202. shuffle: 是否打乱数据(训练集True,验证/测试集False)
  203. 返回:
  204. data_loader: PyTorch DataLoader
  205. """
  206. input_length = args.seq_len
  207. n_features = args.feature_num
  208. labels_num = args.labels_num
  209. n_features_total = n_features * input_length # 输入特征总维度
  210. n_labels_total = args.output_size * labels_num # 目标总维度
  211. # 分割输入和目标
  212. X = dataset.values[:, :n_features_total]
  213. y = dataset.values[:, n_features_total:n_features_total + n_labels_total]
  214. # 重塑输入为[样本数, 序列长度, 特征数]
  215. X = X.reshape(X.shape[0], input_length, n_features)
  216. X = torch.tensor(X, dtype=torch.float32).to(args.device)
  217. y = torch.tensor(y, dtype=torch.float32).to(args.device)
  218. # 创建数据集和数据加载器
  219. dataset_tensor = TensorDataset(X, y)
  220. generator = torch.Generator()
  221. generator.manual_seed(args.random_seed) # 固定随机种子确保可复现
  222. data_loader = DataLoader(
  223. dataset_tensor,
  224. batch_size=args.batch_size,
  225. shuffle=shuffle,
  226. generator=generator
  227. )
  228. return data_loader