浏览代码

1:简化展示和优化数据传入记录

wmy 5 月之前
父节点
当前提交
d1a7231203

+ 152 - 313
models/pressure-predictor/gat-lstm_model/20min/predict.py

@@ -44,6 +44,10 @@ except ImportError:
         logger = logging.getLogger(name)
         logger.setLevel(getattr(logging, level))
         
+        # 避免重复添加handler
+        if logger.handlers:
+            return logger
+        
         formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
         
         # 控制台处理器
@@ -58,6 +62,9 @@ except ImportError:
             file_handler.setFormatter(formatter)
             logger.addHandler(file_handler)
         
+        # 防止日志传播到root logger
+        logger.propagate = False
+        
         return logger
     
     def log_execution_time(func):
@@ -162,9 +169,7 @@ class Predictor:
             backup_count=self.config.get('logging.backup_count', 5)
         )
         
-        self.logger.info("=" * 80)
         self.logger.info("初始化20分钟TMP预测器")
-        self.logger.info("=" * 80)
         
         # 模型参数(从配置文件加载)
         self.seq_len = self.config.get('model.seq_len', 10)
@@ -209,13 +214,11 @@ class Predictor:
         
         if use_cuda and torch.cuda.is_available():
             self.device = torch.device(f"cuda:{cuda_device}")
-            self.logger.info(f"使用GPU设备: {self.device} ({torch.cuda.get_device_name(cuda_device)})")
+            self.logger.info(f"使用设备: GPU-{torch.cuda.get_device_name(cuda_device)}")
         else:
             self.device = torch.device("cpu")
-            self.logger.info("使用CPU设备")
+            self.logger.info("使用设备: CPU")
         
-        # 设置随机种子
-        self.logger.info(f"设置随机种子: {self.random_seed}")
         set_seed(self.random_seed)
         
         # 加载数据归一化器
@@ -223,7 +226,6 @@ class Predictor:
             self.logger.error(f"归一化器文件不存在: {self.scaler_path}")
             raise FileNotFoundError(f"归一化器文件不存在: {self.scaler_path}")
         
-        self.logger.info(f"加载数据归一化器: {self.scaler_path}")
         self.scaler = joblib.load(self.scaler_path)
         
         # 初始化模型和数据加载器(后续加载)
@@ -232,25 +234,9 @@ class Predictor:
         self.test_loader = None
         
         self.logger.info("预测器初始化完成")
-        self.logger.info(f"模型参数: seq_len={self.seq_len}, output_size={self.output_size}, "
-                        f"labels_num={self.labels_num}, feature_num={self.feature_num}")
-        self.logger.info(f"数据参数: resolution={self.resolution}, batch_size={self.batch_size}")
         
     def reorder_columns(self, df):
-        """
-        调整数据列顺序,确保与训练时的特征顺序一致
-        
-        Args:
-            df: 输入的DataFrame
-            
-        Returns:
-            DataFrame: 列顺序调整后的DataFrame
-            
-        Note:
-            - 避免因列顺序不一致导致模型输入特征错位
-            - 必须包含所有必需的特征列
-        """
-        self.logger.debug("开始重排数据列顺序")
+        """调整数据列顺序,确保与训练时的特征顺序一致"""
         desired_order = [
             'index',
             'C.M.FT_ZGJJY1@out','C.M.RO1_FT_JS@out','C.M.RO2_FT_JS@out','C.M.RO3_FT_JS@out',
@@ -273,302 +259,182 @@ class Predictor:
             'C.M.RO1_DB@DPT_1','C.M.RO2_DB@DPT_1','C.M.RO3_DB@DPT_1','C.M.RO4_DB@DPT_1',
             'C.M.RO1_DB@DPT_2','C.M.RO2_DB@DPT_2','C.M.RO3_DB@DPT_2','C.M.RO4_DB@DPT_2',
         ]
-        self.logger.debug(f"原始列: {list(df.columns)}")
-        self.logger.debug(f"目标列顺序: {desired_order}")
         return df.loc[:, desired_order]
 
     def process_date(self, data):
-        """
-        处理日期列,生成周期性时间特征
-        
-        Args:
-            data: 输入DataFrame,必须包含'index'或'date'列
-            
-        Returns:
-            DataFrame: 包含时间特征的DataFrame
-            
-        Note:
-            - 生成分钟级正弦/余弦特征(捕捉每日周期性模式)
-            - 生成年中日正弦/余弦特征(捕捉年度周期性模式)
-            - 使用三角函数编码确保时间连续性(避免边界突变)
-        """
-        self.logger.debug("开始处理日期特征")
+        """处理日期列,生成周期性时间特征"""
         if 'index' in data.columns:
             data = data.rename(columns={'index': 'date'})
         data['date'] = pd.to_datetime(data['date'])
         data['minute_of_day'] = data['date'].dt.hour * 60 + data['date'].dt.minute
         data['day_of_year'] = data['date'].dt.dayofyear
         
-        # 周期性编码(将时间转换为正弦/余弦值,确保周期性连续)
-        data['minute_sin'] = np.sin(2 * np.pi * data['minute_of_day'] / 1440)  # 分钟正弦特征
-        data['minute_cos'] = np.cos(2 * np.pi * data['minute_of_day'] / 1440)  # 分钟余弦特征
-        data['day_year_sin'] = np.sin(2 * np.pi * data['day_of_year'] / 366)   # 年中日正弦特征
-        data['day_year_cos'] = np.cos(2 * np.pi * data['day_of_year'] / 366)   # 年中日余弦特征
-        # 移除原始时间列(仅保留编码后的特征)
+        # 周期性编码
+        data['minute_sin'] = np.sin(2 * np.pi * data['minute_of_day'] / 1440)
+        data['minute_cos'] = np.cos(2 * np.pi * data['minute_of_day'] / 1440)
+        data['day_year_sin'] = np.sin(2 * np.pi * data['day_of_year'] / 366)
+        data['day_year_cos'] = np.cos(2 * np.pi * data['day_of_year'] / 366)
         data.drop(columns=['minute_of_day', 'day_of_year'], inplace=True)
         
-        # 调整列顺序:日期 + 时间特征 + 其他特征
+        # 调整列顺序
         time_features = ['minute_sin', 'minute_cos', 'day_year_sin', 'day_year_cos']
         other_columns = [col for col in data.columns if col not in ['date'] + time_features]
-        self.logger.debug(f"生成时间特征: {time_features}")
         return data[['date'] + time_features + other_columns]
 
     def scaler_data(self, data):
-        """
-        对数据进行归一化处理
-        
-        Args:
-            data: 输入DataFrame
-            
-        Returns:
-            DataFrame: 归一化后的DataFrame
-            
-        Note:
-            - 使用训练时保存的scaler进行归一化
-            - 保持与训练数据的归一化方式一致(MinMax 0-1缩放)
-            - 日期列不参与归一化
-        """
-        self.logger.debug("开始数据归一化")
+        """对数据进行归一化处理"""
         date_col = data[['date']]
         data_to_scale = data.drop(columns=['date'])
         scaled = self.scaler.transform(data_to_scale)
         scaled_df = pd.DataFrame(scaled, columns=data_to_scale.columns)
-        # 拼接日期列和归一化后的特征列
         result = pd.concat([date_col.reset_index(drop=True), scaled_df], axis=1)
-        self.logger.debug(f"归一化完成,数据形状: {result.shape}")
         return result
     
     def remove_outliers(self, predictions):
-        """
-        使用四分位法处理预测结果中的异常值
-        
-        Args:
-            predictions: numpy数组,形状为[时间步, 标签数]
-            
-        Returns:
-            numpy数组: 处理异常值后的预测结果
-            
-        Note:
-            - 异常值定义:小于Q1-1.5*IQR或大于Q3+1.5*IQR的值
-            - 异常值替换为正常值的平均值(避免极端值影响结果)
-            - 按列(每个指标)独立处理
-        """
-        self.logger.info("开始移除异常值(四分位法)")
+        """使用四分位法处理预测结果中的异常值"""
         cleaned = predictions.copy()
-        # 遍历每个特征列(16个标签)
         for col in range(cleaned.shape[1]):
             values = cleaned[:, col]
-            # 计算四分位数
             q1 = np.percentile(values, 25)
             q3 = np.percentile(values, 75)
             iqr = q3 - q1
-            # 异常值边界
             lower_bound = q1 - 1.5 * iqr
             upper_bound = q3 + 1.5 * iqr
-            # 筛选正常值
             normal_values = values[(values >= lower_bound) & (values <= upper_bound)]
-            # 用正常值的平均值替换异常值
             if len(normal_values) > 0:
                 mean_normal = np.mean(normal_values)
-                outlier_count = np.sum((values < lower_bound) | (values > upper_bound))
-                if outlier_count > 0:
-                    self.logger.debug(f"列{col}: 检测到{outlier_count}个异常值,替换为均值{mean_normal:.4f}")
                 cleaned[(values < lower_bound) | (values > upper_bound), col] = mean_normal
-        self.logger.info("异常值处理完成")
         return cleaned
     
     def smooth_predictions(self, predictions):
-        """
-        对预测结果进行加权平滑处理
-        
-        Args:
-            predictions: numpy数组,形状为[时间步, 标签数]
-            
-        Returns:
-            numpy数组: 平滑后的预测结果
-            
-        Note:
-            - 采用滑动窗口加权平均减少预测波动
-            - 中间值权重为2,前后邻居权重为1
-            - 边缘值特殊处理(避免过度平滑)
-        """
-        self.logger.info("开始平滑预测结果")
+        """对预测结果进行加权平滑处理"""
         smoothed = predictions.copy()
         n_timesteps = predictions.shape[0]
         if n_timesteps <= 1:
             return smoothed
         
-        # 遍历每个特征列
         for col in range(predictions.shape[1]):
             values = predictions[:, col]
-            # 第一个值:加权前两个值(避免边缘过度平滑)
             smoothed[0, col] = (2 * values[0] + values[1]) / 3
-            # 中间值:加权前后邻居(核心平滑)
             for i in range(1, n_timesteps - 1):
                 smoothed[i, col] = (values[i-1] + 2 * values[i] + values[i+1]) / 4
-            # 最后一个值:加权最后两个值(避免边缘过度平滑)
             smoothed[-1, col] = (values[-2] + 2 * values[-1]) / 3
-        self.logger.info("预测结果平滑完成")
         return smoothed
 
     def create_test_loader(self, df):
-        """
-        构建测试数据加载器
-        
-        Args:
-            df: 预处理后的DataFrame
-            
-        Returns:
-            DataLoader: PyTorch数据加载器
-            
-        Note:
-            - 将原始时间序列数据转换为模型输入格式
-            - 构建滑动窗口序列:[样本数, 序列长度, 特征数]
-            - 确保有足够的历史数据构建输入序列
-        """
-        self.logger.info("创建测试数据加载器")
+        """构建测试数据加载器"""
         df['date'] = pd.to_datetime(df['date'])
-        # 计算时间间隔(根据分辨率,单位:分钟)
         time_interval = pd.Timedelta(minutes=(4 * self.resolution / 60))
-        # 计算窗口时间跨度(确保能覆盖输入序列长度+预测步长)
         window_time_span = time_interval * (self.seq_len + 20)
-        # 调整测试集起始时间(确保有足够的历史数据构建输入序列)
         adjusted_test_start = pd.to_datetime(self.test_start_date) - window_time_span
-        # 筛选所需的历史数据
         test_df = df[df['date'] >= adjusted_test_start].reset_index(drop=True)
-
         test_df = test_df.drop(columns=['date'])
 
-        # 构建监督学习数据集(输入序列+目标序列的占位)
+        # 构建监督学习数据集
         feature_columns = test_df.columns.tolist()
         cols = []
         
-        # 构建输入序列(历史seq_len个时间步的特征)
         for col in feature_columns:
             for i in range(self.seq_len - 1, -1, -1):
-                cols.append(test_df[[col]].shift(i))   # 滞后i步的特征(t-0到t-(seq_len-1))
+                cols.append(test_df[[col]].shift(i))
                 
-        # 构建目标序列占位(未来output_size个时间步的标签,预测时不使用真实值)
         for i in range(1, self.output_size + 1):
             for col in feature_columns[-self.labels_num:]:
-                cols.append(test_df[[col]].shift(-i))    # 超前i步的标签(t+1到t+output_size)
+                cols.append(test_df[[col]].shift(-i))
                 
-        # 合并列并按步长采样,最后取最后一行作为预测输入(最新的历史数据)
         dataset = pd.concat(cols, axis=1).iloc[::self.step_size]
         dataset = dataset.iloc[[-1]]
     
-        # 提取输入特征(前n_features_total列)
         n_features_total = self.feature_num * self.seq_len
         supervised_data = dataset.iloc[:, :n_features_total]
 
-        # 转换为模型输入格式:[样本数, 序列长度, 特征数]
         X = supervised_data.values.reshape(-1, self.seq_len, self.feature_num)
         X = torch.tensor(X, dtype=torch.float32).to(self.device)
         tensor_dataset = TensorDataset(X)
         loader = DataLoader(tensor_dataset, batch_size=self.batch_size, shuffle=False)
         
-        self.logger.info(f"测试数据加载器创建完成,输入形状: {X.shape}")
         return loader
 
     @log_execution_time
     def load_data(self, df):
-        """
-        数据加载和预处理主流程
+        """数据加载和预处理"""
+        self.logger.info(f"[数据加载] 原始形状: {df.shape}, 列数: {len(df.columns)}")
         
-        Args:
-            df: 原始输入DataFrame
-            
-        Note:
-            - 重排列特征列顺序
-            - 下采样(根据resolution参数)
-            - 日期特征工程
-            - 数据归一化
-            - 创建测试数据加载器
-            - 加载图结构边索引
-        """
-        self.logger.info("开始加载和预处理数据")
-        self.logger.info(f"原始数据形状: {df.shape}")
+        try:
+            df = self.reorder_columns(df)
+            self.logger.info(f"[列重排] 完成")
+        except Exception as e:
+            self.logger.error(f"[列重排] 失败: {e}")
+            raise
         
-        df = self.reorder_columns(df)
-        self.logger.info(f"下采样率: {self.resolution}")
         df = df.iloc[::self.resolution, :].reset_index(drop=True)
-        self.logger.info(f"下采样后数据形状: {df.shape}")
-        
-        df = self.process_date(df)
-        df = self.scaler_data(df)
-        self.test_loader = self.create_test_loader(df)
+        self.logger.info(f"[下采样] 采样率={self.resolution}, 采样后形状: {df.shape}")
+        
+        try:
+            df = self.process_date(df)
+            self.logger.info(f"[时间特征] 生成完成")
+        except Exception as e:
+            self.logger.error(f"[时间特征] 生成失败: {e}")
+            raise
+        
+        try:
+            df = self.scaler_data(df)
+            self.logger.info(f"[归一化] 完成")
+        except Exception as e:
+            self.logger.error(f"[归一化] 失败: {e}")
+            raise
+        
+        try:
+            self.test_loader = self.create_test_loader(df)
+            self.logger.info(f"[数据加载器] 创建完成")
+        except Exception as e:
+            self.logger.error(f"[数据加载器] 创建失败: {e}")
+            raise
         
         if not os.path.exists(self.edge_index_path):
-            self.logger.error(f"图边索引文件不存在: {self.edge_index_path}")
+            self.logger.error(f"[结构] 边索引文件不存在: {self.edge_index_path}")
             raise FileNotFoundError(f"图边索引文件不存在: {self.edge_index_path}")
         
-        self.logger.info(f"加载图边索引: {self.edge_index_path}")
         self.edge_index = torch.load(self.edge_index_path, map_location=self.device, weights_only=True)
-        self.logger.info("数据加载和预处理完成")
+        self.logger.info(f"[图结构] 边索引加载完成, shape: {self.edge_index.shape}")
 
     @log_execution_time
     def load_model(self):
-        """
-        加载模型结构和预训练权重
-        
-        Raises:
-            FileNotFoundError: 模型文件不存在
-            
-        Note:
-            - 实例化GAT-LSTM模型
-            - 加载预训练权重
-            - 设置为评估模式(关闭dropout和batch normalization)
-            - 设置图结构边索引
-        """
+        """加载模型和预训练权重"""
         if not os.path.exists(self.model_path):
-            self.logger.error(f"模型文件不存在: {self.model_path}")
+            self.logger.error(f"[模型加载] 文件不存在: {self.model_path}")
             raise FileNotFoundError(f"模型文件不存在: {self.model_path}")
         
-        self.logger.info("开始加载模型")
-        self.logger.info(f"模型路径: {self.model_path}")
-        
-        self.model = GAT_LSTM(self).to(self.device)
-        
-        if self.edge_index is not None:
-            self.logger.debug(f"设置图边索引,形状: {self.edge_index.shape}")
-            self.model.set_edge_index(self.edge_index.to(self.device))
-        
-        self.model.load_state_dict(torch.load(self.model_path, map_location=self.device, weights_only=True))
-        self.model.eval()
-        
-        # 统计模型参数量
-        total_params = sum(p.numel() for p in self.model.parameters())
-        trainable_params = sum(p.numel() for p in self.model.parameters() if p.requires_grad)
-        self.logger.info(f"模型加载完成 - 总参数量: {total_params:,}, 可训练参数量: {trainable_params:,}")
+        try:
+            self.logger.info("[模型加载] 初始化模型结构")
+            self.model = GAT_LSTM(self).to(self.device)
+            
+            if self.edge_index is not None:
+                self.model.set_edge_index(self.edge_index.to(self.device))
+                self.logger.info("[模型加载] 图结构边索引设置完成")
+            
+            self.model.load_state_dict(torch.load(self.model_path, map_location=self.device, weights_only=True))
+            self.model.eval()
+            
+            total_params = sum(p.numel() for p in self.model.parameters())
+            self.logger.info(f"[模型加载] 完成 - 参数量: {total_params:,}")
+        except Exception as e:
+            self.logger.error(f"[模型加载] 失败: {e}")
+            raise
 
     @log_execution_time
     def predict(self, df):
-        """
-        执行预测主流程
-        
-        Args:
-            df: 原始输入DataFrame,必须包含'index'列(时间戳)
-            
-        Returns:
-            numpy数组: 反归一化后的预测结果,形状为[output_size, labels_num]
-            
-        Note:
-            - 自动更新测试起始时间为输入数据最新时间+4分钟
-            - 执行数据预处理
-            - 加载模型
-            - 执行批量预测
-            - 反归一化预测结果
-            - 可选的异常值处理和平滑
-        """
-        self.logger.info("=" * 80)
-        self.logger.info("开始预测流程")
-        self.logger.info("=" * 80)
-        
-        # 更新测试起始时间为输入数据最新时间+4分钟(预测起始点)
-        latest_time = pd.to_datetime(df['index']).max()
-        self.test_start_date = (latest_time + timedelta(minutes=4)).strftime("%Y-%m-%d %H:%M:%S")
-        self.logger.info(f"输入数据最新时间: {latest_time}")
-        self.logger.info(f"预测起始时间: {self.test_start_date}")
+        """执行预测"""
+        self.logger.info("[预测流程] 开始")
+        
+        try:
+            # 更新测试起始时间
+            latest_time = pd.to_datetime(df['index']).max()
+            self.test_start_date = (latest_time + timedelta(minutes=4)).strftime("%Y-%m-%d %H:%M:%S")
+            self.logger.info(f"[预测时间] 输入数据最新时间: {latest_time}, 预测起始时间: {self.test_start_date}")
+        except Exception as e:
+            self.logger.error(f"[预测时间] 计算失败: {e}")
+            raise
         
         # 加载和预处理数据
         self.load_data(df)
@@ -576,139 +442,112 @@ class Predictor:
         # 加载模型
         self.load_model()
 
-        # 执行预测
-        self.logger.info("开始模型推理")
-        all_predictions = []
-        with torch.no_grad():
-            for batch_idx, batch in enumerate(self.test_loader):
-                inputs = batch[0].to(self.device)
-                outputs = self.model(inputs)
-                all_predictions.append(outputs.cpu().numpy())
-                self.logger.debug(f"批次 {batch_idx + 1} 推理完成,输入形状: {inputs.shape}, 输出形状: {outputs.shape}")
-        
-        # 拼接所有批次的预测结果,并重塑为[时间步, 标签数]
-        predictions = np.concatenate(all_predictions, axis=0).reshape(-1, self.labels_num)
-        self.logger.info(f"模型推理完成,预测结果形状: {predictions.shape}")
-        
-        # 反归一化(仅对标签列,使用训练时的scaler参数)
-        self.logger.info("开始反归一化预测结果")
-        from sklearn.preprocessing import MinMaxScaler
-        inverse_scaler = MinMaxScaler()
-        inverse_scaler.min_ = self.scaler.min_[-self.labels_num:]
-        inverse_scaler.scale_ = self.scaler.scale_[-self.labels_num:]
-        predictions = inverse_scaler.inverse_transform(predictions)
-        self.logger.info("反归一化完成")
-        
-        # 可选:异常值处理和平滑(根据配置文件决定是否启用)
+        # 执行推理
+        try:
+            self.logger.info("[模型推理] 开始")
+            all_predictions = []
+            batch_count = 0
+            with torch.no_grad():
+                for batch in self.test_loader:
+                    inputs = batch[0].to(self.device)
+                    outputs = self.model(inputs)
+                    all_predictions.append(outputs.cpu().numpy())
+                    batch_count += 1
+            
+            self.logger.info(f"[模型推理] 完成 - 批次数: {batch_count}")
+            predictions = np.concatenate(all_predictions, axis=0).reshape(-1, self.labels_num)
+            self.logger.info(f"[模型推理] 原始预测形状: {predictions.shape}")
+        except Exception as e:
+            self.logger.error(f"[模型推理] 失败: {e}")
+            raise
+        
+        # 反归一化
+        try:
+            self.logger.info("[反归一化] 开始")
+            from sklearn.preprocessing import MinMaxScaler
+            inverse_scaler = MinMaxScaler()
+            inverse_scaler.min_ = self.scaler.min_[-self.labels_num:]
+            inverse_scaler.scale_ = self.scaler.scale_[-self.labels_num:]
+            predictions = inverse_scaler.inverse_transform(predictions)
+            self.logger.info(f"[反归一化] 完成 - 值域: [{predictions.min():.2f}, {predictions.max():.2f}]")
+        except Exception as e:
+            self.logger.error(f"[反归一化] 失败: {e}")
+            raise
+        
+        # 可选后处理
         if self.remove_outliers_flag:
+            self.logger.info("[后处理] 执行异常值移除")
             predictions = self.remove_outliers(predictions)
-        
         if self.smooth_flag:
+            self.logger.info("[后处理] 执行平滑处理")
             predictions = self.smooth_predictions(predictions)
         
-        self.logger.info(f"预测流程完成,最终预测结果形状: {predictions.shape}")
-        self.logger.info(f"预测值范围: min={predictions.min():.4f}, max={predictions.max():.4f}, mean={predictions.mean():.4f}")
+        self.logger.info(f"[预测流程] 完成 - 最终形状: {predictions.shape}, 值域: [{predictions.min():.2f}, {predictions.max():.2f}]")
         
         return predictions
 
     def save_predictions(self, predictions, start_date=None, output_path=None):
-        """
-        保存预测结果为CSV文件并返回DataFrame
-        
-        Args:
-            predictions: 反归一化后的预测结果(numpy数组)
-            start_date: 预测起始时间字符串,格式:'YYYY-MM-DD HH:MM:SS',如果为None则使用test_start_date
-            output_path: 输出CSV路径,如果为None则使用默认路径
-            
-        Returns:
-            DataFrame: 包含日期和预测结果的DataFrame
+        """保存预测结果为CSV并返回DataFrame"""
+        try:
+            if start_date is None:
+                start_date = self.test_start_date
             
-        Note:
-            - 生成时间戳序列
-            - 添加列名
-            - 保存为CSV格式
-        """
-        self.logger.info("开始保存预测结果")
-        
-        if start_date is None:
-            start_date = self.test_start_date
+            self.logger.info(f"[保存结果] 预测起始时间: {start_date}, 预测点数: {len(predictions)}")
             
-        start_time = datetime.strptime(start_date, "%Y-%m-%d %H:%M:%S")
-        time_interval = timedelta(minutes=(4 * self.resolution / 60))
-        timestamps = [start_time + i * time_interval for i in range(len(predictions))]
+            start_time = datetime.strptime(start_date, "%Y-%m-%d %H:%M:%S")
+            time_interval = timedelta(minutes=(4 * self.resolution / 60))
+            timestamps = [start_time + i * time_interval for i in range(len(predictions))]
 
-        # 定义16个预测目标的原始列名
-        base_columns = [
-            'C.M.UF1_DB@press_PV', 'C.M.UF2_DB@press_PV', 'C.M.UF3_DB@press_PV', 'C.M.UF4_DB@press_PV',
-            'UF1Per','UF2Per','UF3Per','UF4Per',
-            'C.M.RO1_DB@DPT_1', 'C.M.RO2_DB@DPT_1', 'C.M.RO3_DB@DPT_1', 'C.M.RO4_DB@DPT_1',
-            'C.M.RO1_DB@DPT_2', 'C.M.RO2_DB@DPT_2', 'C.M.RO3_DB@DPT_2', 'C.M.RO4_DB@DPT_2',
-        ]
+            base_columns = [
+                'C.M.UF1_DB@press_PV', 'C.M.UF2_DB@press_PV', 'C.M.UF3_DB@press_PV', 'C.M.UF4_DB@press_PV',
+                'UF1Per','UF2Per','UF3Per','UF4Per',
+                'C.M.RO1_DB@DPT_1', 'C.M.RO2_DB@DPT_1', 'C.M.RO3_DB@DPT_1', 'C.M.RO4_DB@DPT_1',
+                'C.M.RO1_DB@DPT_2', 'C.M.RO2_DB@DPT_2', 'C.M.RO3_DB@DPT_2', 'C.M.RO4_DB@DPT_2',
+            ]
 
-        pred_columns = [f'{col}_Predicted' for col in base_columns]
-        df_result = pd.DataFrame(predictions, columns=pred_columns)
-        df_result.insert(0, 'index', timestamps)
-        
-        # 如果指定了输出路径则使用,否则使用默认路径
-        save_path = output_path if output_path else self.output_csv_path
-        df_result.to_csv(save_path, index=False)
-        
-        self.logger.info(f"预测结果已保存至: {save_path}")
-        self.logger.info(f"预测时间范围: {timestamps[0]} 至 {timestamps[-1]}")
-        self.logger.info(f"预测记录数: {len(predictions)}")
-        
-        return df_result
+            pred_columns = [f'{col}_Predicted' for col in base_columns]
+            df_result = pd.DataFrame(predictions, columns=pred_columns)
+            df_result.insert(0, 'index', timestamps)
+            
+            save_path = output_path if output_path else self.output_csv_path
+            df_result.to_csv(save_path, index=False)
+            self.logger.info(f"[保存结果] 完成 - 文件: {save_path}, 时间范围: {timestamps[0]} 至 {timestamps[-1]}")
+            
+            return df_result
+        except Exception as e:
+            self.logger.error(f"[保存结果] 失败: {e}")
+            raise
 
 
 if __name__ == '__main__':
-    """
-    主函数:执行20分钟TMP预测
-    
-    使用方法:
-        1. 准备输入数据(JSON格式)
-        2. 运行此脚本
-        3. 查看预测结果(保存在20min_predictions.csv)
-        
-    输入数据格式:
-        - JSON文件,包含历史时间序列数据
-        - 必须包含'index'列(时间戳)和所有必需的特征列
-    """
+    """主函数:执行20分钟TMP预测"""
     import json
     import os
     import pandas as pd
-    from datetime import timedelta
     
     try:
-        # 初始化预测器(自动加载配置文件)
         predictor = Predictor()
         
-        # 读取JSON文件作为输入数据
-        json_file_path = '/Users/wmy/Downloads/pp.json'  # pp.json文件路径,可根据实际位置修改
-        
+        json_file_path = '/Users/wmy/Downloads/pp.json'
         if not os.path.exists(json_file_path):
             predictor.logger.error(f"输入文件不存在: {json_file_path}")
             raise FileNotFoundError(f"未找到文件: {json_file_path}")
         
         predictor.logger.info(f"读取输入文件: {json_file_path}")
         
-        # 解析JSON并转换为DataFrame
         with open(json_file_path, 'r', encoding='utf-8') as f:
             json_data = json.load(f)
             df = pd.DataFrame(json_data)
-            predictor.logger.info(f"成功读取输入数据,数据形状: {df.shape}")
 
-        # 执行预测并保存结果
         predictions = predictor.predict(df)
         predictor.save_predictions(predictions)
         
-        predictor.logger.info("=" * 80)
-        predictor.logger.info("预测任务全部完成!")
-        predictor.logger.info("=" * 80)
+        predictor.logger.info("预测任务完成")
         
     except Exception as e:
         if 'predictor' in locals():
-            predictor.logger.error(f"预测过程发生错误: {str(e)}", exc_info=True)
+            predictor.logger.error(f"预测失败: {str(e)}", exc_info=True)
         else:
-            print(f"初始化预测器时发生错误: {str(e)}")
+            print(f"初始化失败: {str(e)}")
         raise
 

+ 3 - 0
models/pressure-predictor/gat-lstm_model/README.md

@@ -33,6 +33,9 @@ gat-lstm_model/
 │   ├── api.log            # API服务日志
 │   └── 20min_predict.log  # 预测日志
+├── received_data/          # 接口接收数据存档
+│   └── request_*.json     # 请求数据文件
+│
 ├── api_main.py            # FastAPI主程序
 ├── config.yaml            # 配置文件
 ├── start.sh               # 启动脚本

+ 99 - 164
models/pressure-predictor/gat-lstm_model/api_main.py

@@ -1,9 +1,9 @@
 """
 GAT-LSTM TMP预测模型 - FastAPI服务
-版本:1.1.0
-最后更新:2025-10-29
+版本:2.0.0
+最后更新:2025-10-30
 
-提供20分钟短期TMP预测的API服务
+20分钟短期TMP预测的API服务
 """
 
 import os
@@ -11,7 +11,6 @@ import sys
 import logging
 from logging.handlers import RotatingFileHandler
 import datetime
-import json
 import pandas as pd
 import uvicorn
 from fastapi import FastAPI, HTTPException
@@ -19,32 +18,28 @@ from fastapi.middleware.cors import CORSMiddleware
 from pydantic import BaseModel
 from typing import List, Dict, Any
 
-# --- 日志配置 ---
-# 日志保存在logs目录下
-log_dir = os.path.join(os.path.dirname(__file__), 'logs')
+# --- 日志和数据目录配置 ---
+base_dir = os.path.dirname(__file__)
+log_dir = os.path.join(base_dir, 'logs')
 os.makedirs(log_dir, exist_ok=True)
 
-log_handler = RotatingFileHandler(
-    os.path.join(log_dir, "api.log"),
-    maxBytes=2 * 1024 * 1024,  # 2 MB
-    backupCount=5,
-    encoding='utf-8'
+# 数据保存目录
+data_save_dir = os.path.join(base_dir, 'received_data')
+os.makedirs(data_save_dir, exist_ok=True)
+
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(levelname)s - %(message)s',
+    handlers=[
+        RotatingFileHandler(
+            os.path.join(log_dir, "api.log"),
+            maxBytes=2 * 1024 * 1024,
+            backupCount=5,
+            encoding='utf-8'
+        ),
+        logging.StreamHandler()
+    ]
 )
-
-# 支持环境变量控制日志详细程度
-LOG_LEVEL = os.getenv('LOG_LEVEL', 'INFO').upper()
-DETAILED_LOGS = os.getenv('DETAILED_LOGS', 'false').lower() == 'true'
-
-# 避免重复配置日志处理器
-if not logging.getLogger().handlers:
-    logging.basicConfig(
-        level=getattr(logging, LOG_LEVEL),
-        format='%(asctime)s - %(levelname)s - %(message)s',
-        handlers=[
-            log_handler,
-            logging.StreamHandler()
-        ]
-    )
 logger = logging.getLogger(__name__)
 
 # --- 添加当前目录到Python路径 ---
@@ -52,55 +47,22 @@ current_dir = os.path.dirname(os.path.abspath(__file__))
 if current_dir not in sys.path:
     sys.path.insert(0, current_dir)
 
-# --- 模型导入与模拟 ---
-# 优先尝试导入真实模型,如果失败则使用模拟类(Mock Class)替代
-try:
-    # 使用importlib动态导入(因为模块名以数字开头)
-    import importlib.util
-    predict_module_path = os.path.join(current_dir, '20min', 'predict.py')
-    spec = importlib.util.spec_from_file_location("predict_20min", predict_module_path)
-    predict_module = importlib.util.module_from_spec(spec)
-    spec.loader.exec_module(predict_module)
-    Predictor = predict_module.Predictor
-    logger.info("成功加载20分钟TMP预测模型模块。")
-except Exception as e:
-    logger.warning(f"未能找到20分钟模型模块: {e},将使用模拟类进行替代。")
-    logger.warning("请确保模型模块路径正确。")
-    
-    class Predictor:
-        """模拟预测器"""
-        def predict(self, df: pd.DataFrame):
-            logger.info("正在使用模拟的 Predictor.predict 方法...")
-            import numpy as np
-            # 模拟返回5个时间步,16个特征的预测结果
-            return np.random.rand(5, 16) * 100
-        
-        def save_predictions(self, res, start_date: str, output_path: str = None) -> pd.DataFrame:
-            logger.info("正在使用模拟的 Predictor.save_predictions 方法...")
-            start_dt = datetime.datetime.strptime(start_date, "%Y-%m-%d %H:%M:%S")
-            time_index = [start_dt + datetime.timedelta(minutes=4 * i) for i in range(len(res))]
-            # 创建包含16个预测列的DataFrame
-            columns = [
-                'C.M.UF1_DB@press_PV_Predicted', 'C.M.UF2_DB@press_PV_Predicted',
-                'C.M.UF3_DB@press_PV_Predicted', 'C.M.UF4_DB@press_PV_Predicted',
-                'UF1Per_Predicted', 'UF2Per_Predicted', 'UF3Per_Predicted', 'UF4Per_Predicted',
-                'C.M.RO1_DB@DPT_1_Predicted', 'C.M.RO2_DB@DPT_1_Predicted',
-                'C.M.RO3_DB@DPT_1_Predicted', 'C.M.RO4_DB@DPT_1_Predicted',
-                'C.M.RO1_DB@DPT_2_Predicted', 'C.M.RO2_DB@DPT_2_Predicted',
-                'C.M.RO3_DB@DPT_2_Predicted', 'C.M.RO4_DB@DPT_2_Predicted'
-            ]
-            result_df = pd.DataFrame(res, columns=columns)
-            result_df.insert(0, 'index', time_index)
-            return result_df
+# --- 导入20分钟预测模型 ---
+import importlib.util
+predict_module_path = os.path.join(current_dir, '20min', 'predict.py')
+spec = importlib.util.spec_from_file_location("predict_20min", predict_module_path)
+predict_module = importlib.util.module_from_spec(spec)
+spec.loader.exec_module(predict_module)
+Predictor = predict_module.Predictor
+logger.info("成功加载20分钟TMP预测模型")
 
 # --- FastAPI 应用初始化 ---
 app = FastAPI(
-    title="智能决策与预测 API",
-    description="一个集成了GAT-LSTM TMP预测模型的 FastAPI 服务。",
-    version="1.1.0"
+    title="GAT-LSTM TMP预测 API",
+    description="20分钟短期TMP预测服务",
+    version="2.0.0"
 )
 
-# 配置CORS中间件
 app.add_middleware(
     CORSMiddleware,
     allow_origins=["*"],
@@ -110,164 +72,137 @@ app.add_middleware(
 )
 
 # --- 全局模型实例 ---
-xishan_predict = Predictor()
-logger.info("预测模型实例初始化完成")
+predictor = Predictor()
+logger.info("预测初始化完成")
 
 
-# --- Pydantic 数据校验模型 ---
+# --- 数据模型 ---
 class TimeSeriesDataPoint(BaseModel):
-    """定义单个时间序列数据点的结构,允许包含除datetime外的其他任意字段"""
     datetime: str
-    
     class Config:
         extra = "allow"
 
 
-class DoubleMembranceRequest(BaseModel):
-    """双膜模型预测的请求体结构"""
+class PredictRequest(BaseModel):
     data: List[TimeSeriesDataPoint]
 
 
-class SuccessResponse(BaseModel):
-    """定义标准的成功响应结构"""
+class PredictResponse(BaseModel):
     success: bool = True
     predict_result: List[Dict[str, Any]]
 
 
-# --- API 端点定义 ---
+# --- API 端点 ---
 @app.post(
     "/api/v1/process_model/double_membrance",
-    response_model=SuccessResponse,
-    summary="双膜环境体模型预测",
-    tags=["模型处理"]
+    response_model=PredictResponse,
+    summary="20分钟TMP预测",
+    tags=["预测"]
 )
-def get_double_membrance_model(request: DoubleMembranceRequest):
-    """接收历史时序数据,使用GAT-LSTM模型进行未来趋势预测。"""
+def get_double_membrance_model(request: PredictRequest):
+    """接收历史时序数据,返回未来20分钟TMP预测结果"""
     try:
-        # 精简的请求开始日志
-        logger.info(f"开始双膜环境体模型预测 - 数据点: {len(request.data)}")
+        logger.info(f"收到预测请求 - 数据点数: {len(request.data)}")
         
         if not request.data:
-            raise HTTPException(status_code=400, detail="输入数据 'data' 不能为空")
+            raise HTTPException(status_code=400, detail="输入数据不能为空")
         
-        # 详细日志仅在调试模式下记录
-        if DETAILED_LOGS:
-            logger.info("输入数据结构分析:")
-            logger.info(f"  - 数据点数量: {len(request.data)}")
-            logger.info(f"  - 时间范围: {request.data[0].datetime} 到 {request.data[-1].datetime}")
+        # 转换为DataFrame
+        df = pd.DataFrame([item.dict() for item in request.data])
+        logger.info(f"数据转换完成 - shape: {df.shape}, 列数: {len(df.columns)}")
+        
+        # 保存接收到的数据
+        try:
+            import json
+            timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")
+            save_filename = f"request_{timestamp}.json"
+            save_path = os.path.join(data_save_dir, save_filename)
             
-            sample_data = request.data[0].dict()
-            feature_count = len([k for k in sample_data.keys() if k != 'datetime'])
-            logger.info(f"  - 特征数量: {feature_count}")
+            with open(save_path, 'w', encoding='utf-8') as f:
+                json.dump({
+                    "timestamp": datetime.datetime.now().isoformat(),
+                    "data_shape": {"rows": len(df), "columns": len(df.columns)},
+                    "data": [item.dict() for item in request.data]
+                }, f, ensure_ascii=False, indent=2)
+            logger.info(f"请求数据已保存: {save_filename}")
+        except Exception as e:
+            logger.warning(f"保存请求数据失败: {e}")
         
-        # 将输入数据转换为DataFrame并进行预处理
-        df = pd.DataFrame([item.dict() for item in request.data])
         df["datetime"] = pd.to_datetime(df["datetime"])
         df = df.sort_values(by="datetime").rename(columns={"datetime": "index"})
+        logger.info(f"数据预处理完成 - 时间范围: {df['index'].min()} 到 {df['index'].max()}")
         
-        if DETAILED_LOGS:
-            logger.info(f"数据预处理完成 - 形状: {df.shape}")
+        # 执行预测(predict内部会计算predict_start_time)
+        predictions = predictor.predict(df)
         
-        # 调用模型进行预测
-        logger.info("开始模型预测...")
-        res = xishan_predict.predict(df)
-        logger.info(f"模型预测完成 - 结果形状: {res.shape}")
+        # 保存预测结果(使用predict内部计算的test_start_date)
+        result_df = predictor.save_predictions(predictions)
         
-        predict_start_time = (df['index'].max() + datetime.timedelta(minutes=4)).strftime("%Y-%m-%d %H:%M:%S")
-        predict_result_df = xishan_predict.save_predictions(res, start_date=predict_start_time)
+        # 格式化输出
+        result_df["index"] = result_df["index"].apply(lambda x: x.strftime("%Y-%m-%d %H:%M:%S"))
+        result_df = result_df.rename(columns={"index": "datetime"})
+        predict_result = result_df.to_dict(orient="records")
         
-        # 格式化预测结果以符合API输出
-        predict_result_df["index"] = predict_result_df["index"].apply(lambda x: x.strftime("%Y-%m-%d %H:%M:%S"))
-        predict_result_df = predict_result_df.rename(columns={"index": "datetime"})
-        predict_result = predict_result_df.to_dict(orient="records")
-        
-        # 精简的输出日志
-        logger.info(
-            f"预测完成 - 预测点: {len(predict_result)}, 时间范围: {predict_result[0]['datetime']} 到 {predict_result[-1]['datetime']}")
+        logger.info(f"预测完成 - 预测点数: {len(predict_result)}, 时间范围: {predict_result[0]['datetime']} 到 {predict_result[-1]['datetime']}")
         
         return {"success": True, "predict_result": predict_result}
+    
     except Exception as e:
-        logger.error("处理 'double_membrance' 请求时发生错误:", exc_info=True)
+        logger.error(f"预测失败: {str(e)}", exc_info=True)
         raise HTTPException(status_code=500, detail=str(e))
 
 
 @app.get(
     "/api/v1/process_model/test_double_membrance_from_file",
-    response_model=SuccessResponse,
-    summary="从本地文件测试双膜环境体模型预测",
-    tags=["模型处理-测试"]
+    response_model=PredictResponse,
+    summary="测试预测(从本地文件)",
+    tags=["测试"]
 )
 def test_double_membrance_from_file():
-    """
-    从本地JSON文件加载模拟数据,用于测试环境体模型预测,无需调用接口传递数据。
-    """
+    """从本地JSON文件加载测试数据进行预测"""
     try:
-        base_dir = os.path.dirname(os.path.abspath(__file__))
+        import json
         file_path = os.path.join(base_dir, "test_files", "pp.json")
-        
-        logger.info(f"开始本地文件测试 - 文件: {file_path}")
+        logger.info(f"开始测试 - 文件: {file_path}")
         
         with open(file_path, 'r', encoding='utf-8') as f:
             request_data = json.load(f)
         
-        if "data" not in request_data:
-            raise HTTPException(status_code=400, detail=f"JSON文件 {file_path} 中缺少 'data' 键")
+        if "data" not in request_data or not request_data["data"]:
+            raise HTTPException(status_code=400, detail="测试文件数据无效")
         
-        input_data = request_data["data"]
-        if not input_data:
-            raise HTTPException(status_code=400, detail="JSON文件中的 'data' 列表不能为空")
+        # 转换为DataFrame
+        df = pd.DataFrame(request_data["data"])
+        logger.info(f"测试数据加载完成 - shape: {df.shape}")
         
-        # 详细日志仅在调试模式下记录
-        if DETAILED_LOGS:
-            logger.info("测试数据结构分析:")
-            logger.info(f"  - 数据点数量: {len(input_data)}")
-            logger.info(f"  - 时间范围: {input_data[0]['datetime']} 到 {input_data[-1]['datetime']}")
-            
-            sample_data = input_data[0]
-            feature_count = len([k for k in sample_data.keys() if k != 'datetime'])
-            logger.info(f"  - 特征数量: {feature_count}")
-        
-        # 后续逻辑与 get_double_membrance_model 相同
-        df = pd.DataFrame(input_data)
         df["datetime"] = pd.to_datetime(df["datetime"])
         df = df.sort_values(by="datetime").rename(columns={"datetime": "index"})
+        logger.info(f"测试数据预处理完成 - 时间范围: {df['index'].min()} 到 {df['index'].max()}")
         
-        if DETAILED_LOGS:
-            logger.info(f"测试数据预处理完成 - 形状: {df.shape}")
-        
-        logger.info("开始测试模型预测...")
-        res = xishan_predict.predict(df)
-        logger.info(f"测试预测完成 - 结果形状: {res.shape}")
-        
-        predict_start_time = (df['index'].max() + datetime.timedelta(minutes=4)).strftime("%Y-%m-%d %H:%M:%S")
-        predict_result_df = xishan_predict.save_predictions(res, start_date=predict_start_time)
+        # 执行预测
+        predictions = predictor.predict(df)
+        result_df = predictor.save_predictions(predictions)
         
-        predict_result_df["index"] = predict_result_df["index"].apply(lambda x: x.strftime("%Y-%m-%d %H:%M:%S"))
-        predict_result_df = predict_result_df.rename(columns={"index": "datetime"})
-        predict_result = predict_result_df.to_dict(orient="records")
+        # 格式化输出
+        result_df["index"] = result_df["index"].apply(lambda x: x.strftime("%Y-%m-%d %H:%M:%S"))
+        result_df = result_df.rename(columns={"index": "datetime"})
+        predict_result = result_df.to_dict(orient="records")
         
-        logger.info(
-            f"测试完成 - 预测点: {len(predict_result)}, 时间范围: {predict_result[0]['datetime']} 到 {predict_result[-1]['datetime']}")
+        logger.info(f"测试完成 - 预测点数: {len(predict_result)}")
         return {"success": True, "predict_result": predict_result}
     
     except FileNotFoundError:
-        logger.error(f"测试文件未找到: {file_path}")
         raise HTTPException(status_code=404, detail=f"测试文件未找到: {file_path}")
-    except json.JSONDecodeError:
-        logger.error(f"无法解析JSON文件: {file_path}")
-        raise HTTPException(status_code=400, detail=f"无法解析JSON文件,请检查格式: {file_path}")
     except Exception as e:
-        logger.error("处理本地文件测试请求时发生未知错误:", exc_info=True)
+        logger.error(f"测试失败: {str(e)}", exc_info=True)
         raise HTTPException(status_code=500, detail=str(e))
 
 
 @app.get("/", include_in_schema=False)
 def root():
-    """根路径,提供API文档链接。"""
-    return {"message": "欢迎使用GAT-LSTM TMP预测 API. 请访问 /docs 查看 API 文档."}
+    return {"message": "GAT-LSTM TMP预测服务运行中,访问 /docs 查看API文档"}
 
 
-# --- 服务启动入口 ---
 if __name__ == "__main__":
     uvicorn.run("api_main:app", host="0.0.0.0", port=7980, reload=False)
 

+ 2 - 0
models/pressure-predictor/gat-lstm_model/requirements.txt

@@ -29,3 +29,5 @@ pyyaml>=6.0
 # torch-cu117  # CUDA 11.7
 # torch-cu118  # CUDA 11.8
 
+
+