
Data preprocessing file

zhanghao, 4 months ago
commit 868b216ef5
1 changed file with 226 additions and 226 deletions
models/causal-inference/data_preprocessor.py

@@ -1,227 +1,227 @@
+import os
+import pandas as pd
+import numpy as np
+import pywt
+import logging
+from sklearn.preprocessing import StandardScaler
+from sklearn.model_selection import train_test_split
+import torch
+import joblib
+from torch.utils.data import TensorDataset, DataLoader
+
+class DataPreprocessor:
+    def __init__(self, args, logger=None):
+        self.args = args
+        self.data_dir = args.data_dir
+        self.num_files = args.num_files
+        self.scaler_features = StandardScaler()
+        self.scaler_targets = StandardScaler()
+        self.logger = logger if logger is not None else self._default_logger()
+        self.features = None  # cache the feature data for building the adjacency matrix later
+        self.scaler_dir = 'scalers'
+        os.makedirs(self.scaler_dir, exist_ok=True)
+        self.features_scaler_path = os.path.join(self.scaler_dir, 'features_scaler.joblib')
+        self.targets_scaler_path = os.path.join(self.scaler_dir, 'targets_scaler.joblib')
+        
+    def _default_logger(self):
+        """默认日志记录器"""
+        logger = logging.getLogger('DataPreprocessor')
+        logger.setLevel(logging.INFO)
+        if not logger.handlers:  # avoid stacking duplicate handlers on repeated instantiation
+            console_handler = logging.StreamHandler()
+            console_handler.setLevel(logging.INFO)
+            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+            console_handler.setFormatter(formatter)
+            logger.addHandler(console_handler)
+        return logger
+    
+    def load_data(self):
+        """加载所有数据文件并合并"""
+        all_data = []
+        
+        for i in range(1, self.num_files + 1):
+            file_path = os.path.join(self.data_dir, f'data_process_{i}.csv')
+            try:
+                df = pd.read_csv(file_path, index_col=0)
+                df = df.reset_index()  # turn the original index back into the first column
+                all_data.append(df)
+                self.logger.info(f"Loaded file {i}/{self.num_files}")
+            except Exception as e:
+                self.logger.error(f"Error loading file {i}: {e}")
+        
+        if not all_data:
+            raise FileNotFoundError(f'No data files could be loaded from {self.data_dir}')
+        combined_df = pd.concat(all_data, ignore_index=True)
+        return combined_df
+    
+    def decompose_time(self, df):
+        """将时间列分解为年、月、日、时、分、秒"""
+        time_col = df.columns[0]
+        df[time_col] = pd.to_datetime(df[time_col])
+        
+        df['year'] = df[time_col].dt.year
+        df['month'] = df[time_col].dt.month
+        df['day'] = df[time_col].dt.day
+        df['hour'] = df[time_col].dt.hour
+        df['minute'] = df[time_col].dt.minute
+        df['second'] = df[time_col].dt.second
+        
+        df = df.drop(columns=[time_col])
+        
+        # reorder columns so the time features come first
+        time_features = ['year', 'month', 'day', 'hour', 'minute', 'second']
+        other_features = [col for col in df.columns if col not in time_features]
+        df = df[time_features + other_features]
+        
+        return df
+    
+    def wavelet_denoising(self, data, wavelet='db4', level=1):
+        """对数据进行小波降噪,避免除以零警告"""
+        denoised_data = np.zeros_like(data)
+        epsilon = 1e-10  # tiny constant guarding against a zero noise estimate
+        
+        for i in range(data.shape[1]):
+            # multilevel wavelet decomposition
+            coeffs = pywt.wavedec(data[:, i], wavelet, level=level)
+            
+            # noise sigma from the median absolute deviation (MAD) of the detail
+            # coefficients: 0.6745 converts MAD to sigma under Gaussian noise, and
+            # epsilon keeps sigma, and hence the threshold, strictly positive
+            sigma = (np.median(np.abs(coeffs[-level])) + epsilon) / 0.6745
+            original_length = len(data[:, i])
+            threshold = sigma * np.sqrt(2 * np.log(original_length))  # universal (VisuShrink) threshold
+            
+            # soft-threshold the detail coefficients manually (avoids library warnings)
+            processed_coeffs = []
+            for c in coeffs[1:]:
+                # soft threshold: y = sign(x) * max(|x| - threshold, 0)
+                magnitude = np.abs(c)
+                # coefficients at or below the threshold are zeroed outright
+                thresholded = np.where(
+                    magnitude > threshold,
+                    np.sign(c) * (magnitude - threshold),
+                    0
+                )
+                processed_coeffs.append(thresholded)
+            
+            coeffs[1:] = processed_coeffs
+            
+            # wavelet reconstruction
+            reconstructed = pywt.waverec(coeffs, wavelet)
+            # waverec can return one sample more or fewer than the input,
+            # so trim or edge-pad back to the original signal length
+            if len(reconstructed) > original_length:
+                reconstructed = reconstructed[:original_length]
+            elif len(reconstructed) < original_length:
+                reconstructed = np.pad(reconstructed, (0, original_length - len(reconstructed)), mode='edge')
+            
+            denoised_data[:, i] = reconstructed
+        
+        return denoised_data
+    
+    def normalize_data(self, features, targets):
+        """归一化数据并保存scaler"""
+        features_scaled = self.scaler_features.fit_transform(features)
+        targets_scaled = self.scaler_targets.fit_transform(targets)
+        
+        # persist the scalers so predictions can be inverse-transformed later
+        joblib.dump(self.scaler_features, self.features_scaler_path)
+        joblib.dump(self.scaler_targets, self.targets_scaler_path)
+        self.logger.info(f"已保存特征归一化模型到 {self.features_scaler_path}")
+        self.logger.info(f"已保存目标归一化模型到 {self.targets_scaler_path}")
+        
+        return features_scaled, targets_scaled
+    
+    def inverse_transform_targets(self, targets_scaled):
+        """将归一化的目标变量反变换回原始尺度"""
+        return self.scaler_targets.inverse_transform(targets_scaled)
+    
+    def split_data(self, features, targets):
+        """划分训练集、验证集和测试集"""
+        X_train, X_temp, y_train, y_temp = train_test_split(
+            features, targets, test_size=self.args.test_ratio + self.args.val_ratio, 
+            shuffle=False
+        )
+        
+        test_size = self.args.test_ratio / (self.args.test_ratio + self.args.val_ratio)
+        X_val, X_test, y_val, y_test = train_test_split(
+            X_temp, y_temp, test_size=test_size, shuffle=False
+        )
+        
+        return X_train, X_val, X_test, y_train, y_val, y_test
+    
+    def create_dataloaders(self, X_train, X_val, X_test, y_train, y_val, y_test):
+        """创建DataLoader"""
+        X_train = torch.FloatTensor(X_train)
+        y_train = torch.FloatTensor(y_train)
+        X_val = torch.FloatTensor(X_val)
+        y_val = torch.FloatTensor(y_val)
+        X_test = torch.FloatTensor(X_test)
+        y_test = torch.FloatTensor(y_test)
+        
+        train_dataset = TensorDataset(X_train, y_train)
+        val_dataset = TensorDataset(X_val, y_val)
+        test_dataset = TensorDataset(X_test, y_test)
+        
+        train_loader = DataLoader(
+            train_dataset, batch_size=self.args.batch_size, shuffle=True
+        )
+        val_loader = DataLoader(
+            val_dataset, batch_size=self.args.batch_size, shuffle=False
+        )
+        test_loader = DataLoader(
+            test_dataset, batch_size=self.args.batch_size, shuffle=False
+        )
+        
+        return train_loader, val_loader, test_loader
+    
+    def preprocess(self):
+        """完整的预处理流程"""
+        df = self.load_data()
+        self.logger.info(f"Original data shape: {df.shape}")
+        
+        df = self.decompose_time(df)
+        self.logger.info(f"Data shape after time decomposition: {df.shape}")
+        
+        data = df.values
+        data_denoised = self.wavelet_denoising(data)
+        self.logger.info(f"Data shape after wavelet denoising: {data_denoised.shape}")
+        
+        # cache the denoised, unscaled features for adjacency-matrix construction
+        self.features = data_denoised[:, :self.args.num_features]
+        targets = data_denoised[:, self.args.num_features:self.args.num_features+self.args.num_targets]
+        self.logger.info(f"Features shape: {self.features.shape}, Targets shape: {targets.shape}")
+        
+        features_scaled, targets_scaled = self.normalize_data(self.features, targets)
+        
+        X_train, X_val, X_test, y_train, y_val, y_test = self.split_data(
+            features_scaled, targets_scaled
+        )
+        self.logger.info(f"Train: {X_train.shape}, Val: {X_val.shape}, Test: {X_test.shape}")
+        
+        train_loader, val_loader, test_loader = self.create_dataloaders(
+            X_train, X_val, X_test, y_train, y_val, y_test
+        )
+        
+        return train_loader, val_loader, test_loader, self
+    
+    def create_adjacency_matrix(self):
+        """创建有向图的邻接矩阵(基于特征相关性)"""
+        num_nodes = self.args.num_features
+        adj = torch.zeros((num_nodes, num_nodes))
+        
+        if self.features is None:
+            self.logger.warning("Feature data not initialized; falling back to the default adjacency matrix")
+            # default: self-loops only (identity)
+            for i in range(num_nodes):
+                adj[i, i] = 1
+            return adj
+        
+        # pairwise Pearson correlations between feature columns
+        corr_matrix = np.corrcoef(self.features.T)
+        corr_threshold = 0.3  # minimum |correlation| for an edge
+        
+        # add an edge wherever |correlation| exceeds the threshold
+        for i in range(num_nodes):
+            adj[i, i] = 1  # self-loop
+            for j in range(num_nodes):
+                if i != j and abs(corr_matrix[i, j]) > corr_threshold:
+                    adj[i, j] = 1
+        
+        self.logger.info(f"邻接矩阵中边的数量: {int(torch.sum(adj))}")
         return adj
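
The manual soft threshold in wavelet_denoising matches PyWavelets' built-in pywt.threshold with mode='soft'. A standalone sanity check with hand-checkable values, assuming only numpy and pywt:

import numpy as np
import pywt

c = np.array([-3.0, -0.5, 0.0, 0.4, 2.0])
threshold = 1.0

# manual soft threshold, as implemented in wavelet_denoising
manual = np.where(np.abs(c) > threshold, np.sign(c) * (np.abs(c) - threshold), 0)

# PyWavelets' built-in equivalent
builtin = pywt.threshold(c, threshold, mode='soft')

assert np.allclose(manual, builtin)  # both yield [-2.0, 0.0, 0.0, 0.0, 1.0]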
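
For context, a minimal sketch of how this preprocessor might be driven end to end. The attribute names read off args (data_dir, num_files, num_features, num_targets, val_ratio, test_ratio, batch_size) come from the class itself; the defaults and the flat import are illustrative assumptions, not part of this commit.

import argparse

from data_preprocessor import DataPreprocessor  # assumes running next to the module

# hypothetical driver; every default below is an assumption, not a project value
parser = argparse.ArgumentParser()
parser.add_argument('--data_dir', default='data')            # holds data_process_<i>.csv
parser.add_argument('--num_files', type=int, default=3)
parser.add_argument('--num_features', type=int, default=12)
parser.add_argument('--num_targets', type=int, default=2)
parser.add_argument('--val_ratio', type=float, default=0.1)
parser.add_argument('--test_ratio', type=float, default=0.1)
parser.add_argument('--batch_size', type=int, default=64)
args = parser.parse_args()

preprocessor = DataPreprocessor(args)
train_loader, val_loader, test_loader, preprocessor = preprocessor.preprocess()

# only meaningful after preprocess() has cached the denoised features
adj = preprocessor.create_adjacency_matrix()

for X_batch, y_batch in train_loader:
    pass  # feed batches to a model; predictions can be mapped back to the
          # original scale with preprocessor.inverse_transform_targets(...)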