# data_preprocessor.py
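"""Preprocessing pipeline for multivariate time-series data: loads CSV shards,
decomposes timestamps into calendar features, applies wavelet denoising,
standardizes features and targets, splits chronologically, and exposes PyTorch
DataLoaders plus a correlation-based adjacency matrix for graph models."""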

import os
import logging

import joblib
import numpy as np
import pandas as pd
import pywt
import torch
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from torch.utils.data import DataLoader, TensorDataset


class DataPreprocessor:
    def __init__(self, args, logger=None):
        self.args = args
        self.data_dir = args.data_dir
        self.num_files = args.num_files
        self.scaler_features = StandardScaler()
        self.scaler_targets = StandardScaler()
        self.logger = logger if logger is not None else self._default_logger()
        self.features = None  # cached feature matrix, used later to build the adjacency matrix
        self.scaler_dir = 'scalers'
        os.makedirs(self.scaler_dir, exist_ok=True)
        self.features_scaler_path = os.path.join(self.scaler_dir, 'features_scaler.joblib')
        self.targets_scaler_path = os.path.join(self.scaler_dir, 'targets_scaler.joblib')

    def _default_logger(self):
        """Create a default console logger."""
        logger = logging.getLogger('DataPreprocessor')
        logger.setLevel(logging.INFO)
        # Guard against attaching duplicate handlers when the class is instantiated more than once
        if not logger.handlers:
            console_handler = logging.StreamHandler()
            console_handler.setLevel(logging.INFO)
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            console_handler.setFormatter(formatter)
            logger.addHandler(console_handler)
        return logger

    def load_data(self):
        """Load all data files and concatenate them into a single DataFrame."""
        all_data = []
        for i in range(1, self.num_files + 1):
            file_path = os.path.join(self.data_dir, f'data_process_{i}.csv')
            try:
                df = pd.read_csv(file_path, index_col=0)
                df = df.reset_index()  # move the original index back into the first column
                all_data.append(df)
                self.logger.info(f"Loaded file {i}/{self.num_files}")
            except Exception as e:
                self.logger.error(f"Error loading file {i}: {e}")
        if not all_data:
            raise FileNotFoundError(f"No data files could be loaded from {self.data_dir}")
        combined_df = pd.concat(all_data, ignore_index=True)
        return combined_df

    def decompose_time(self, df):
        """Split the timestamp column into year, month, day, hour, minute, and second features."""
        time_col = df.columns[0]
        df[time_col] = pd.to_datetime(df[time_col])
        df['year'] = df[time_col].dt.year
        df['month'] = df[time_col].dt.month
        df['day'] = df[time_col].dt.day
        df['hour'] = df[time_col].dt.hour
        df['minute'] = df[time_col].dt.minute
        df['second'] = df[time_col].dt.second
        df = df.drop(columns=[time_col])
        # Reorder columns so the time features come first
        time_features = ['year', 'month', 'day', 'hour', 'minute', 'second']
        other_features = [col for col in df.columns if col not in time_features]
        df = df[time_features + other_features]
        return df

    def wavelet_denoising(self, data, wavelet='db4', level=1):
        """Denoise each column with wavelet soft thresholding, guarding against zero coefficients."""
        denoised_data = np.zeros_like(data, dtype=np.float64)
        epsilon = 1e-10  # tiny offset so the noise estimate never collapses to exactly zero
        for i in range(data.shape[1]):
            original_length = len(data[:, i])
            # Wavelet decomposition
            coeffs = pywt.wavedec(data[:, i], wavelet, level=level)
            # Robust noise estimate (median absolute deviation of the finest detail band);
            # epsilon keeps sigma nonzero for constant columns
            sigma = np.median(np.abs(coeffs[-level] + epsilon)) / 0.6745
            # Universal threshold
            threshold = sigma * np.sqrt(2 * np.log(original_length))
            # Soft-threshold the detail coefficients by hand (avoids library warnings):
            # y = sign(x) * max(|x| - threshold, 0)
            processed_coeffs = []
            for c in coeffs[1:]:
                magnitude = np.abs(c)
                thresholded = np.where(
                    magnitude > threshold,
                    np.sign(c) * (magnitude - threshold),
                    0
                )
                processed_coeffs.append(thresholded)
            coeffs[1:] = processed_coeffs
            # Reconstruct, then align the length with the input
            # (waverec can return one extra sample for odd-length inputs)
            reconstructed = pywt.waverec(coeffs, wavelet)
            if len(reconstructed) > original_length:
                reconstructed = reconstructed[:original_length]
            elif len(reconstructed) < original_length:
                reconstructed = np.pad(reconstructed, (0, original_length - len(reconstructed)), mode='edge')
            denoised_data[:, i] = reconstructed
        return denoised_data

    def normalize_data(self, features, targets):
        """Standardize features and targets, and persist the fitted scalers."""
        features_scaled = self.scaler_features.fit_transform(features)
        targets_scaled = self.scaler_targets.fit_transform(targets)
        # Save the fitted scalers so predictions can be inverse-transformed later
        joblib.dump(self.scaler_features, self.features_scaler_path)
        joblib.dump(self.scaler_targets, self.targets_scaler_path)
        self.logger.info(f"Saved feature scaler to {self.features_scaler_path}")
        self.logger.info(f"Saved target scaler to {self.targets_scaler_path}")
        return features_scaled, targets_scaled

    def inverse_transform_targets(self, targets_scaled):
        """Map standardized targets back to their original scale."""
        return self.scaler_targets.inverse_transform(targets_scaled)

    def split_data(self, features, targets):
        """Split chronologically into train, validation, and test sets (no shuffling)."""
        X_train, X_temp, y_train, y_temp = train_test_split(
            features, targets, test_size=self.args.test_ratio + self.args.val_ratio,
            shuffle=False
        )
        test_size = self.args.test_ratio / (self.args.test_ratio + self.args.val_ratio)
        X_val, X_test, y_val, y_test = train_test_split(
            X_temp, y_temp, test_size=test_size, shuffle=False
        )
        return X_train, X_val, X_test, y_train, y_val, y_test

    def create_dataloaders(self, X_train, X_val, X_test, y_train, y_val, y_test):
        """Wrap the splits in TensorDatasets and build the DataLoaders."""
        X_train = torch.FloatTensor(X_train)
        y_train = torch.FloatTensor(y_train)
        X_val = torch.FloatTensor(X_val)
        y_val = torch.FloatTensor(y_val)
        X_test = torch.FloatTensor(X_test)
        y_test = torch.FloatTensor(y_test)
        train_dataset = TensorDataset(X_train, y_train)
        val_dataset = TensorDataset(X_val, y_val)
        test_dataset = TensorDataset(X_test, y_test)
        train_loader = DataLoader(
            train_dataset, batch_size=self.args.batch_size, shuffle=True
        )
        val_loader = DataLoader(
            val_dataset, batch_size=self.args.batch_size, shuffle=False
        )
        test_loader = DataLoader(
            test_dataset, batch_size=self.args.batch_size, shuffle=False
        )
        return train_loader, val_loader, test_loader

    def preprocess(self):
        """Run the full preprocessing pipeline."""
        df = self.load_data()
        self.logger.info(f"Original data shape: {df.shape}")
        df = self.decompose_time(df)
        self.logger.info(f"Data shape after time decomposition: {df.shape}")
        data = df.values
        data_denoised = self.wavelet_denoising(data)
        self.logger.info(f"Data shape after wavelet denoising: {data_denoised.shape}")
        # Cache the feature columns for building the adjacency matrix later
        self.features = data_denoised[:, :self.args.num_features]
        targets = data_denoised[:, self.args.num_features:self.args.num_features + self.args.num_targets]
        self.logger.info(f"Features shape: {self.features.shape}, Targets shape: {targets.shape}")
        features_scaled, targets_scaled = self.normalize_data(self.features, targets)
        X_train, X_val, X_test, y_train, y_val, y_test = self.split_data(
            features_scaled, targets_scaled
        )
        self.logger.info(f"Train: {X_train.shape}, Val: {X_val.shape}, Test: {X_test.shape}")
        train_loader, val_loader, test_loader = self.create_dataloaders(
            X_train, X_val, X_test, y_train, y_val, y_test
        )
        return train_loader, val_loader, test_loader, self

    def create_adjacency_matrix(self):
        """Build a directed-graph adjacency matrix from pairwise feature correlations."""
        num_nodes = self.args.num_features
        adj = torch.zeros((num_nodes, num_nodes))
        if self.features is None:
            self.logger.warning("Feature data not initialized; falling back to a self-loop-only adjacency matrix")
            for i in range(num_nodes):
                adj[i, i] = 1
            return adj
        # Pairwise Pearson correlation between feature columns
        corr_matrix = np.corrcoef(self.features.T)
        corr_threshold = 0.3  # minimum |correlation| for an edge
        # Add an edge i -> j wherever the absolute correlation exceeds the threshold
        for i in range(num_nodes):
            adj[i, i] = 1  # self-loop
            for j in range(num_nodes):
                if i != j and abs(corr_matrix[i, j]) > corr_threshold:
                    adj[i, j] = 1
        self.logger.info(f"Number of edges in the adjacency matrix: {int(torch.sum(adj))}")
        return adj
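

# --- Usage sketch (illustrative) ---
# A minimal example of driving the pipeline end to end. The flag names below
# mirror the attributes this class reads from `args`; the default values are
# assumptions for illustration, not part of the original module.
if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('--data_dir', default='data')
    parser.add_argument('--num_files', type=int, default=3)
    parser.add_argument('--num_features', type=int, default=10)
    parser.add_argument('--num_targets', type=int, default=1)
    parser.add_argument('--val_ratio', type=float, default=0.1)
    parser.add_argument('--test_ratio', type=float, default=0.1)
    parser.add_argument('--batch_size', type=int, default=64)
    args = parser.parse_args()

    preprocessor = DataPreprocessor(args)
    train_loader, val_loader, test_loader, prep = preprocessor.preprocess()
    adj = prep.create_adjacency_matrix()
    print(f"Adjacency matrix shape: {tuple(adj.shape)}, edges: {int(adj.sum())}")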