"""
20-minute TMP prediction model.

Version: 1.0
Last updated: 2025-10-28
"""

import os
import sys
import torch
import pandas as pd
import numpy as np
import joblib
import pywt
from datetime import datetime, timedelta
from torch.utils.data import DataLoader, TensorDataset
from tqdm import tqdm

# Put the parent directory on sys.path so sibling packages can be imported.
current_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(current_dir)
if parent_dir not in sys.path:
    sys.path.insert(0, parent_dir)

# The GAT-LSTM model definition is imported from the `shared` directory.
sys.path.insert(0, os.path.join(parent_dir, 'shared'))
from gat_lstm import GAT_LSTM

# Prefer the project's `common` utilities; fall back to stdlib-based
# replacements when that package cannot be imported.
try:
    project_root = os.path.abspath(os.path.join(parent_dir, '../..'))
    if project_root not in sys.path:
        sys.path.insert(0, project_root)
    from common.utils.logger import setup_logger, log_execution_time
    from common.utils.config import Config
except ImportError:
    import logging
    import yaml
    from functools import wraps
    import time

    def setup_logger(name, level='INFO', log_file=None, format_type='colored',
                     max_bytes=10485760, backup_count=5):
        """Create (or fetch) a named logger with console and optional file output.

        Args:
            name: Logger name passed to ``logging.getLogger``.
            level: Level name (any case) or numeric level.
            log_file: If given, a ``RotatingFileHandler`` is attached to it.
            format_type: Accepted for signature compatibility; unused here.
            max_bytes: Rotation threshold for the file handler.
            backup_count: Number of rotated files to keep.

        Returns:
            The configured ``logging.Logger``.
        """
        logger = logging.getLogger(name)
        # Logger.setLevel accepts upper-case names or ints, so normalise
        # strings instead of doing a case-sensitive getattr on the module.
        logger.setLevel(level.upper() if isinstance(level, str) else level)

        # A logger that already has handlers was configured earlier; adding
        # more would duplicate every message.
        if logger.handlers:
            return logger

        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )

        console_handler = logging.StreamHandler()
        console_handler.setFormatter(formatter)
        logger.addHandler(console_handler)

        if log_file:
            from logging.handlers import RotatingFileHandler
            file_handler = RotatingFileHandler(
                log_file, maxBytes=max_bytes, backupCount=backup_count
            )
            file_handler.setFormatter(formatter)
            logger.addHandler(file_handler)

        # Keep records from also being emitted by the root logger.
        logger.propagate = False
        return logger

    def log_execution_time(func):
        """Decorator that logs a callable's wall-clock duration.

        The duration is written to ``args[0].logger`` when the first
        positional argument has a ``logger`` attribute (i.e. for methods of
        objects that carry a logger); otherwise nothing is logged.
        """
        @wraps(func)
        def wrapper(*args, **kwargs):
            started = time.perf_counter()
            result = func(*args, **kwargs)
            elapsed = time.perf_counter() - started
            # Guard against calls with no positional arguments.
            if args and hasattr(args[0], 'logger'):
                args[0].logger.info(f"{func.__name__} 执行时间: {elapsed:.2f}秒")
            return result
        return wrapper

    class Config:
        """Minimal YAML-backed configuration with dotted-key lookup."""

        def __init__(self, config_file):
            """Load ``config_file`` (UTF-8 YAML) via ``yaml.safe_load``."""
            with open(config_file, 'r', encoding='utf-8') as f:
                self.config = yaml.safe_load(f)

        def get(self, key, default=None):
            """Return the value at dotted ``key`` or ``default``.

            ``default`` is returned when any path segment is missing, maps to
            ``None``, or when an intermediate value is not a dict.
            """
            value = self.config
            for part in key.split('.'):
                if not isinstance(value, dict):
                    return default
                value = value.get(part)
                if value is None:
                    return default
            return value


def set_seed(seed):
    """Seed every random number generator used by this module.

    Seeds Python's ``random``, ``PYTHONHASHSEED``, NumPy and PyTorch (CPU and
    all CUDA devices), and switches cuDNN to deterministic mode with
    benchmarking disabled.

    Args:
        seed: Integer seed value.
    """
    import random

    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False


class Predictor:
    """TMP predictor.

    Responsibilities:
        - load and preprocess the input frame,
        - load the trained GAT-LSTM model,
        - run inference and save the result.

    Example:
        predictor = Predictor()
        predictions = predictor.predict(df)
        result_df = predictor.save_predictions(predictions, start_date)
    """

    # Columns the model predicts. They are also the trailing columns of
    # FEATURE_COLUMNS; create_test_loader and predict both rely on the labels
    # being the last `labels_num` columns.
    TARGET_COLUMNS = (
        'C.M.UF1_DB@press_PV', 'C.M.UF2_DB@press_PV', 'C.M.UF3_DB@press_PV', 'C.M.UF4_DB@press_PV',
        'UF1Per', 'UF2Per', 'UF3Per', 'UF4Per',
        'C.M.RO1_DB@DPT_1', 'C.M.RO2_DB@DPT_1', 'C.M.RO3_DB@DPT_1', 'C.M.RO4_DB@DPT_1',
        'C.M.RO1_DB@DPT_2', 'C.M.RO2_DB@DPT_2', 'C.M.RO3_DB@DPT_2', 'C.M.RO4_DB@DPT_2',
    )

    # Input column order expected by the fitted scaler and the model.
    FEATURE_COLUMNS = (
        'index',
        'C.M.FT_ZGJJY1@out', 'C.M.RO1_FT_JS@out', 'C.M.RO2_FT_JS@out', 'C.M.RO3_FT_JS@out',
        'C.M.RO4_FT_JS@out', 'C.M.UF1_FT_JS@out', 'C.M.UF2_FT_JS@out', 'C.M.UF3_FT_JS@out',
        'C.M.UF4_FT_JS@out', 'C.M.UF_FT_ZCS@out', 'C.M.FT_ZGJJY2@out', 'C.M.FT_ZGJJY3@out',
        'C.M.FT_ZGJJY4@out', 'C.M.RO1_PT_JS@out', 'C.M.RO2_PT_JS@out', 'C.M.RO3_PT_JS@out',
        'C.M.UF1_PT_JS@out', 'C.M.UF2_PT_JS@out', 'C.M.UF3_PT_JS@out', 'C.M.UF4_PT_JS@out',
        'C.M.LT_JSC@out', 'C.M.RO1_PT_CS@out', 'C.M.RO1_PT_DJ2@out', 'C.M.RO2_PT_CS@out',
        'C.M.RO2_PT_DJ2@out', 'C.M.RO3_PT_CS@out', 'C.M.RO3_PT_DJ2@out', 'C.M.RO4_PT_CS@out',
        'C.M.RO4_PT_DJ2@out', 'C.M.RO4_PT_JS@out', 'C.M.LT_HCl@out', 'C.M.LT_NaClO@out',
        'C.M.LT_PAC@out', 'C.M.LT_QSC@out', 'C.M.RO_Cond_ZCS@out', 'C.M.RO_TT_ZJS@out',
        'C.M.UF1_JSF_kd@out', 'C.M.UF2_JSF_kd@out', 'C.M.UF_GSB4_fre@out', 'C.M.UF_ORP_ZCS@out',
        'C.M.JYB2_ZGJ1_fre@out', 'C.M.JYB2_ZGJ2_fre@out', 'C.M.JYB2_ZGJ3_fre@out', 'C.M.JYB2_ZGJ4_fre@out',
        'C.M.RO1_GYB_fre@out', 'C.M.RO2_GYB_fre@out', 'C.M.RO3_GYB_fre@out', 'C.M.RO4_GYB_fre@out',
        'C.M.UF3_JSF_kd@out', 'C.M.UF4_JSF_kd@out', 'C.M.UF_FXB2_fre@out', 'C.M.RO1_DJB_fre@out',
        'C.M.RO1_GYBF_kd@out', 'C.M.RO2_DJB_fre@out', 'C.M.RO2_GYBF_kd@out', 'C.M.RO3_DJB_fre@out',
        'C.M.RO3_GYBF_kd@out', 'C.M.RO4_DJB_fre@out', 'C.M.RO4_GYBF_kd@out',
    ) + TARGET_COLUMNS

    def __init__(self, config_path='../config.yaml'):
        """Read configuration, set up logging and load the fitted scaler.

        Args:
            config_path: Path to the YAML configuration. A relative path is
                resolved against this script's directory, so the default
                points at ``config.yaml`` in the parent directory. If the
                resolved path is not a file, the parent-directory
                ``config.yaml`` is used instead and a warning is logged.

        Raises:
            FileNotFoundError: If the scaler file does not exist (and, with
                the stdlib fallback ``Config``, if no config file is found).
        """
        # abspath keeps every derived path valid even when __file__ is
        # relative to the current working directory.
        current_dir = os.path.dirname(os.path.abspath(__file__))
        parent_dir = os.path.dirname(current_dir)

        legacy_config = os.path.join(parent_dir, 'config.yaml')
        if config_path:
            requested_config = os.path.normpath(os.path.join(current_dir, config_path))
        else:
            requested_config = legacy_config
        config_file = requested_config if os.path.isfile(requested_config) else legacy_config
        self.config = Config(config_file)

        # Log files go to `logs/` under the parent directory.
        log_dir = os.path.join(parent_dir, 'logs')
        os.makedirs(log_dir, exist_ok=True)
        log_file = os.path.join(log_dir, '20min_predict.log')

        self.logger = setup_logger(
            name='20min_predict',
            level=self.config.get('logging.level', 'INFO'),
            log_file=log_file,
            format_type=self.config.get('logging.format', 'colored'),
            max_bytes=self.config.get('logging.max_bytes', 10 * 1024 * 1024),
            backup_count=self.config.get('logging.backup_count', 5),
        )
        self.logger.info("初始化20分钟TMP预测器")
        if config_file != requested_config:
            self.logger.warning(
                f"配置文件不存在: {requested_config}, 回退到默认配置: {legacy_config}"
            )

        # Model hyper-parameters. The whole predictor instance is handed to
        # GAT_LSTM in load_model, so every attribute name is kept as-is.
        self.seq_len = self.config.get('model.seq_len', 10)
        self.output_size = self.config.get('model.output_size', 5)
        self.labels_num = self.config.get('model.labels_num', 16)
        self.feature_num = self.config.get('model.feature_num', 79)
        self.step_size = self.config.get('model.step_size', 5)
        self.dropout = self.config.get('model.dropout', 0)
        self.lr = self.config.get('model.lr', 0.01)
        self.num_heads = self.config.get('model.num_heads', 8)
        self.hidden_size = self.config.get('model.hidden_size', 64)
        self.batch_size = self.config.get('model.batch_size', 512)
        self.num_layers = self.config.get('model.num_layers', 1)
        self.random_seed = self.config.get('model.random_seed', 1314)

        # Data-processing parameters.
        self.resolution = self.config.get('data.resolution', 60)
        self.test_start_date = self.config.get('data.test_start_date', '2025-07-01')
        self.wavelet = self.config.get('data.wavelet.type', 'db4')
        self.level = self.config.get('data.wavelet.level', 3)
        self.level_after = self.config.get('data.wavelet.level_after', 4)
        self.mode = self.config.get('data.wavelet.mode', 'soft')

        # Threshold parameters.
        self.uf_threshold = self.config.get('data.threshold.uf', 0.001)
        self.ro_threshold = self.config.get('data.threshold.ro', 0.01)
        self.flow_threshold = self.config.get('data.threshold.flow', 1.0)

        # Artifact locations: model, scaler and output CSV sit next to this
        # script; the graph edge index sits in `shared/` under the parent.
        self.model_path = os.path.join(current_dir, '20min_model.pth')
        self.scaler_path = os.path.join(current_dir, '20min_scaler.pkl')
        self.edge_index_path = os.path.join(parent_dir, 'shared', 'edge_index.pt')
        self.output_csv_path = os.path.join(current_dir, '20min_predictions.csv')

        # Optional post-processing switches.
        self.remove_outliers_flag = self.config.get('postprocess.remove_outliers', False)
        self.smooth_flag = self.config.get('postprocess.smooth', False)

        # Device selection: CUDA only when requested and available.
        use_cuda = self.config.get('device.use_cuda', True)
        cuda_device = self.config.get('device.cuda_device', 0)
        if use_cuda and torch.cuda.is_available():
            self.device = torch.device(f"cuda:{cuda_device}")
            self.logger.info(f"使用设备: GPU-{torch.cuda.get_device_name(cuda_device)}")
        else:
            self.device = torch.device("cpu")
            self.logger.info("使用设备: CPU")

        set_seed(self.random_seed)

        # Load the fitted scaler.
        # NOTE(review): joblib.load unpickles the file; only load scaler
        # files from a trusted location.
        if not os.path.exists(self.scaler_path):
            self.logger.error(f"归一化器文件不存在: {self.scaler_path}")
            raise FileNotFoundError(f"归一化器文件不存在: {self.scaler_path}")
        self.scaler = joblib.load(self.scaler_path)

        # Populated later by load_data / load_model.
        self.model = None
        self.edge_index = None
        self.test_loader = None

        self.logger.info("预测器初始化完成")

    def _step_minutes(self):
        """Return the spacing, in minutes, between two down-sampled rows.

        Computed as ``4 * resolution / 60``.
        NOTE(review): presumably the raw samples are 4 seconds apart —
        confirm against the data source.
        """
        return 4 * self.resolution / 60

    def reorder_columns(self, df):
        """Select the columns of ``df`` in ``FEATURE_COLUMNS`` order.

        Raises:
            KeyError: If any expected column is missing from ``df``.
        """
        missing = [col for col in self.FEATURE_COLUMNS if col not in df.columns]
        if missing:
            raise KeyError(f"输入数据缺少必要列: {missing}")
        return df.loc[:, list(self.FEATURE_COLUMNS)]

    def process_date(self, data):
        """Add cyclic time features derived from the timestamp column.

        An ``index`` column, if present, is renamed to ``date``. The column
        is parsed with ``pd.to_datetime`` and four features are added: sine
        and cosine of minute-of-day (period 1440) and of day-of-year
        (period 366).

        Returns:
            A new frame ordered as ``date``, the four time features, then the
            remaining columns in their existing order. The input frame is not
            modified.
        """
        if 'index' in data.columns:
            data = data.rename(columns={'index': 'date'})
        else:
            # Work on a copy so the caller's frame is left untouched.
            data = data.copy()
        data['date'] = pd.to_datetime(data['date'])

        minute_of_day = data['date'].dt.hour * 60 + data['date'].dt.minute
        day_of_year = data['date'].dt.dayofyear

        data['minute_sin'] = np.sin(2 * np.pi * minute_of_day / 1440)
        data['minute_cos'] = np.cos(2 * np.pi * minute_of_day / 1440)
        data['day_year_sin'] = np.sin(2 * np.pi * day_of_year / 366)
        data['day_year_cos'] = np.cos(2 * np.pi * day_of_year / 366)

        time_features = ['minute_sin', 'minute_cos', 'day_year_sin', 'day_year_cos']
        leading = ['date'] + time_features
        other_columns = [col for col in data.columns if col not in leading]
        return data[leading + other_columns]

    def scaler_data(self, data):
        """Apply the fitted scaler to every column except ``date``.

        Returns:
            A frame with ``date`` first followed by the scaled columns, with
            a fresh 0..n-1 index.
        """
        date_col = data[['date']]
        data_to_scale = data.drop(columns=['date'])
        scaled = self.scaler.transform(data_to_scale)
        scaled_df = pd.DataFrame(scaled, columns=data_to_scale.columns)
        return pd.concat([date_col.reset_index(drop=True), scaled_df], axis=1)

    def remove_outliers(self, predictions):
        """Replace IQR outliers in each column with the mean of its inliers.

        For each column, values outside ``[Q1 - 1.5*IQR, Q3 + 1.5*IQR]`` are
        overwritten with the mean of the values inside that range.

        Args:
            predictions: 2-D array indexed as ``[row, column]``.

        Returns:
            A cleaned copy; the input array is not modified.
        """
        cleaned = predictions.copy()
        for col in range(cleaned.shape[1]):
            values = cleaned[:, col]
            q1 = np.percentile(values, 25)
            q3 = np.percentile(values, 75)
            iqr = q3 - q1
            lower_bound = q1 - 1.5 * iqr
            upper_bound = q3 + 1.5 * iqr

            inlier_mask = (values >= lower_bound) & (values <= upper_bound)
            if inlier_mask.any():
                # The mask and the mean are both computed before the
                # assignment, so writing through the view is safe.
                cleaned[~inlier_mask, col] = np.mean(values[inlier_mask])
        return cleaned

    def smooth_predictions(self, predictions):
        """Smooth each column along the row axis with a weighted average.

        Interior rows use weights (1, 2, 1) / 4; the first and last rows use
        (2, 1) / 3 and (1, 2) / 3. Inputs with at most one row are returned
        as an unchanged copy.

        Args:
            predictions: 2-D array indexed as ``[row, column]``.

        Returns:
            A smoothed copy; the input array is not modified.
        """
        smoothed = predictions.copy()
        if predictions.shape[0] <= 1:
            return smoothed

        # Every right-hand side reads from `predictions`, never from
        # `smoothed`, so already-smoothed values do not feed later rows.
        smoothed[0] = (2 * predictions[0] + predictions[1]) / 3
        smoothed[1:-1] = (predictions[:-2] + 2 * predictions[1:-1] + predictions[2:]) / 4
        smoothed[-1] = (predictions[-2] + 2 * predictions[-1]) / 3
        return smoothed

    def create_test_loader(self, df):
        """Build a DataLoader holding a single inference sample.

        Rows dated on or after ``test_start_date`` minus ``seq_len + 20``
        steps are kept, lagged copies of every column are concatenated, and
        one row of that table is reshaped to ``(1, seq_len, feature_num)``.

        Args:
            df: Scaled frame with a ``date`` column. The column is converted
                to datetime in place.

        Returns:
            A ``DataLoader`` over a ``TensorDataset`` with one tensor.
        """
        df['date'] = pd.to_datetime(df['date'])

        time_interval = pd.Timedelta(minutes=self._step_minutes())
        window_time_span = time_interval * (self.seq_len + 20)
        adjusted_test_start = pd.to_datetime(self.test_start_date) - window_time_span

        test_df = df[df['date'] >= adjusted_test_start].reset_index(drop=True)
        test_df = test_df.drop(columns=['date'])

        # Supervised layout: for each feature, seq_len lagged copies (oldest
        # first), followed by the shifted label columns.
        feature_columns = test_df.columns.tolist()
        cols = []
        for col in feature_columns:
            for lag in range(self.seq_len - 1, -1, -1):
                cols.append(test_df[[col]].shift(lag))
        for lead in range(1, self.output_size + 1):
            for col in feature_columns[-self.labels_num:]:
                cols.append(test_df[[col]].shift(-lead))

        # NOTE(review): the last row *after* striding by step_size is used,
        # which is not necessarily the most recent row of test_df. Kept
        # unchanged — confirm this matches the training pipeline.
        dataset = pd.concat(cols, axis=1).iloc[::self.step_size]
        dataset = dataset.iloc[[-1]]

        n_features_total = self.feature_num * self.seq_len
        supervised_data = dataset.iloc[:, :n_features_total]
        if supervised_data.isna().values.any():
            # Too few history rows (or gaps in the input) leave NaNs in the
            # window; make that visible instead of silently predicting NaN.
            self.logger.warning("[数据加载器] 输入窗口包含NaN, 历史数据可能不足")

        # NOTE(review): the columns are laid out feature-major while the
        # reshape reads them as (seq_len, feature_num). Kept unchanged —
        # presumably this mirrors training; verify before touching.
        X = supervised_data.values.reshape(-1, self.seq_len, self.feature_num)
        X = torch.tensor(X, dtype=torch.float32).to(self.device)

        tensor_dataset = TensorDataset(X)
        return DataLoader(tensor_dataset, batch_size=self.batch_size, shuffle=False)

    @log_execution_time
    def load_data(self, df):
        """Preprocess ``df`` and prepare everything inference needs.

        Steps: reorder columns, keep every ``resolution``-th row, add time
        features, scale, build the test loader, then load the graph edge
        index. Sets ``self.test_loader`` and ``self.edge_index``.

        Raises:
            FileNotFoundError: If the edge-index file does not exist.
            Exception: Any preprocessing error is logged and re-raised.
        """
        self.logger.info(f"[数据加载] 原始形状: {df.shape}, 列数: {len(df.columns)}")

        try:
            df = self.reorder_columns(df)
            self.logger.info("[列重排] 完成")
        except Exception as e:
            self.logger.error(f"[列重排] 失败: {e}")
            raise

        df = df.iloc[::self.resolution, :].reset_index(drop=True)
        self.logger.info(f"[下采样] 采样率={self.resolution}, 采样后形状: {df.shape}")

        try:
            df = self.process_date(df)
            self.logger.info("[时间特征] 生成完成")
        except Exception as e:
            self.logger.error(f"[时间特征] 生成失败: {e}")
            raise

        try:
            df = self.scaler_data(df)
            self.logger.info("[归一化] 完成")
        except Exception as e:
            self.logger.error(f"[归一化] 失败: {e}")
            raise

        try:
            self.test_loader = self.create_test_loader(df)
            self.logger.info("[数据加载器] 创建完成")
        except Exception as e:
            self.logger.error(f"[数据加载器] 创建失败: {e}")
            raise

        if not os.path.exists(self.edge_index_path):
            self.logger.error(f"[图结构] 边索引文件不存在: {self.edge_index_path}")
            raise FileNotFoundError(f"图边索引文件不存在: {self.edge_index_path}")

        self.edge_index = torch.load(
            self.edge_index_path, map_location=self.device, weights_only=True
        )
        self.logger.info(f"[图结构] 边索引加载完成, shape: {self.edge_index.shape}")

    @log_execution_time
    def load_model(self):
        """Instantiate GAT_LSTM, attach the edge index and load the weights.

        The predictor instance itself is passed to ``GAT_LSTM`` as its
        configuration object. The model is left in eval mode.

        Raises:
            FileNotFoundError: If the model file does not exist.
            Exception: Any construction or loading error is logged and
                re-raised.
        """
        if not os.path.exists(self.model_path):
            self.logger.error(f"[模型加载] 文件不存在: {self.model_path}")
            raise FileNotFoundError(f"模型文件不存在: {self.model_path}")

        try:
            self.logger.info("[模型加载] 初始化模型结构")
            self.model = GAT_LSTM(self).to(self.device)

            if self.edge_index is not None:
                self.model.set_edge_index(self.edge_index.to(self.device))
                self.logger.info("[模型加载] 图结构边索引设置完成")

            state_dict = torch.load(
                self.model_path, map_location=self.device, weights_only=True
            )
            self.model.load_state_dict(state_dict)
            self.model.eval()

            total_params = sum(p.numel() for p in self.model.parameters())
            self.logger.info(f"[模型加载] 完成 - 参数量: {total_params:,}")
        except Exception as e:
            self.logger.error(f"[模型加载] 失败: {e}")
            raise

    @log_execution_time
    def predict(self, df):
        """Run the full pipeline on ``df`` and return de-normalised predictions.

        ``self.test_start_date`` is set to one step after the newest
        timestamp in ``df['index']``. The data and the model are then
        (re)loaded, inference is run, the output is mapped back to the
        original scale, and the optional post-processing steps are applied.

        Args:
            df: Raw input frame with an ``index`` timestamp column and all
                columns listed in ``FEATURE_COLUMNS``.

        Returns:
            A 2-D ``numpy`` array with ``labels_num`` columns.
        """
        self.logger.info("[预测流程] 开始")

        try:
            latest_time = pd.to_datetime(df['index']).max()
            # Use the same step as create_test_loader / save_predictions
            # rather than a hard-coded 4 minutes, so the three stay
            # consistent when `resolution` is not 60.
            forecast_start = latest_time + timedelta(minutes=self._step_minutes())
            self.test_start_date = forecast_start.strftime("%Y-%m-%d %H:%M:%S")
            self.logger.info(
                f"[预测时间] 输入数据最新时间: {latest_time}, 预测起始时间: {self.test_start_date}"
            )
        except Exception as e:
            self.logger.error(f"[预测时间] 计算失败: {e}")
            raise

        self.load_data(df)
        self.load_model()

        try:
            self.logger.info("[模型推理] 开始")
            all_predictions = []
            with torch.no_grad():
                for batch in self.test_loader:
                    inputs = batch[0].to(self.device)
                    outputs = self.model(inputs)
                    all_predictions.append(outputs.cpu().numpy())

            batch_count = len(all_predictions)
            self.logger.info(f"[模型推理] 完成 - 批次数: {batch_count}")
            predictions = np.concatenate(all_predictions, axis=0).reshape(-1, self.labels_num)
            self.logger.info(f"[模型推理] 原始预测形状: {predictions.shape}")
        except Exception as e:
            self.logger.error(f"[模型推理] 失败: {e}")
            raise

        # Undo the scaling for the label columns only. The labels are the
        # last `labels_num` features, so a scaler carrying just those
        # parameters is assembled by hand.
        try:
            self.logger.info("[反归一化] 开始")
            from sklearn.preprocessing import MinMaxScaler
            inverse_scaler = MinMaxScaler()
            inverse_scaler.min_ = self.scaler.min_[-self.labels_num:]
            inverse_scaler.scale_ = self.scaler.scale_[-self.labels_num:]
            predictions = inverse_scaler.inverse_transform(predictions)
            self.logger.info(
                f"[反归一化] 完成 - 值域: [{predictions.min():.2f}, {predictions.max():.2f}]"
            )
        except Exception as e:
            self.logger.error(f"[反归一化] 失败: {e}")
            raise

        if self.remove_outliers_flag:
            self.logger.info("[后处理] 执行异常值移除")
            predictions = self.remove_outliers(predictions)

        if self.smooth_flag:
            self.logger.info("[后处理] 执行平滑处理")
            predictions = self.smooth_predictions(predictions)

        self.logger.info(
            f"[预测流程] 完成 - 最终形状: {predictions.shape}, "
            f"值域: [{predictions.min():.2f}, {predictions.max():.2f}]"
        )
        return predictions

    def save_predictions(self, predictions, start_date=None, output_path=None):
        """Write predictions to CSV and return them as a DataFrame.

        Args:
            predictions: 2-D array with one column per ``TARGET_COLUMNS``
                entry.
            start_date: Timestamp of the first prediction row. Anything
                ``pd.to_datetime`` accepts (including date-only strings and
                ``datetime`` objects); defaults to ``self.test_start_date``.
            output_path: Destination CSV; defaults to
                ``self.output_csv_path``.

        Returns:
            A DataFrame with an ``index`` timestamp column followed by one
            ``<name>_Predicted`` column per target.

        Raises:
            Exception: Any failure is logged and re-raised.
        """
        try:
            if start_date is None:
                start_date = self.test_start_date

            self.logger.info(f"[保存结果] 预测起始时间: {start_date}, 预测点数: {len(predictions)}")

            # pd.to_datetime also accepts the date-only default taken from
            # the config, which a fixed strptime format would reject.
            start_time = pd.to_datetime(start_date).to_pydatetime()
            time_interval = timedelta(minutes=self._step_minutes())
            timestamps = [start_time + i * time_interval for i in range(len(predictions))]

            pred_columns = [f'{col}_Predicted' for col in self.TARGET_COLUMNS]
            df_result = pd.DataFrame(predictions, columns=pred_columns)
            df_result.insert(0, 'index', timestamps)

            save_path = output_path if output_path else self.output_csv_path
            df_result.to_csv(save_path, index=False)

            # With no rows there is no first/last timestamp to report.
            if timestamps:
                time_range = f"{timestamps[0]} 至 {timestamps[-1]}"
            else:
                time_range = "无"
            self.logger.info(f"[保存结果] 完成 - 文件: {save_path}, 时间范围: {time_range}")
            return df_result
        except Exception as e:
            self.logger.error(f"[保存结果] 失败: {e}")
            raise


# Input file used when no path is given on the command line.
DEFAULT_INPUT_JSON = '/Users/wmy/Downloads/pp.json'


def main(json_file_path=None):
    """Run one 20-minute TMP prediction from a JSON input file.

    Args:
        json_file_path: JSON file to load into a DataFrame. Defaults to the
            first command-line argument, or ``DEFAULT_INPUT_JSON`` when none
            is given.

    Raises:
        FileNotFoundError: If the input file does not exist.
        Exception: Any other failure is logged (or printed when the
            predictor could not be constructed) and re-raised.
    """
    import json

    if json_file_path is None:
        json_file_path = sys.argv[1] if len(sys.argv) > 1 else DEFAULT_INPUT_JSON

    predictor = None
    try:
        predictor = Predictor()

        if not os.path.exists(json_file_path):
            predictor.logger.error(f"输入文件不存在: {json_file_path}")
            raise FileNotFoundError(f"未找到文件: {json_file_path}")

        predictor.logger.info(f"读取输入文件: {json_file_path}")
        with open(json_file_path, 'r', encoding='utf-8') as f:
            json_data = json.load(f)

        df = pd.DataFrame(json_data)
        predictions = predictor.predict(df)
        predictor.save_predictions(predictions)
        predictor.logger.info("预测任务完成")
    except Exception as e:
        if predictor is not None:
            predictor.logger.error(f"预测失败: {str(e)}", exc_info=True)
        else:
            print(f"初始化失败: {str(e)}")
        raise


if __name__ == '__main__':
    main()