import os import torch import pandas as pd import numpy as np import joblib from datetime import datetime, timedelta from torch.utils.data import DataLoader, TensorDataset from gat_lstm import GAT_LSTM # 导入自定义的GAT-LSTM模型 from scipy.signal import savgol_filter # Savitzky-Golay滤波工具 from sklearn.preprocessing import MinMaxScaler # 数据标准化工具 def set_seed(seed): """设置随机种子,保证实验可复现性""" import random random.seed(seed) os.environ['PYTHONHASHSEED'] = str(seed) np.random.seed(seed) torch.manual_seed(seed) torch.cuda.manual_seed(seed) torch.cuda.manual_seed_all(seed) torch.backends.cudnn.deterministic = True torch.backends.cudnn.benchmark = False class Predictor: """预测器类,封装了数据处理、模型加载、预测和结果保存的完整流程""" def __init__(self): # 模型和数据相关参数 self.seq_len = 360 # 输入序列长度 self.output_size = 180 # 预测输出长度 self.labels_num = 8 # 预测目标特征数量 self.feature_num = 16 # 输入特征总数量 self.step_size = 180 # 滑动窗口步长 self.dropout = 0 # 模型dropout参数 self.lr = 0.01 # 学习率 self.hidden_size = 64 # LSTM隐藏层大小 self.batch_size = 128 # 批处理大小 self.num_layers = 1 # LSTM层数 self.resolution = 5400 # 数据时间分辨率(单位:秒) self.test_start_date = '2025-09-24' # 预测起始日期(动态更新) self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") self.model_path = '90day_model.pth' # 模型权重路径(可外部修改) self.output_csv_path = '90day_predictions.csv' # 结果保存路径(可外部修改) self.random_seed = 1314 # 随机种子 # 预测结果平滑参数 self.smooth_window = 30 # 滑动平均窗口大小 self.ema_alpha = 0.1 # 指数移动平均系数(权重) self.use_savitzky = True # 是否使用Savitzky-Golay滤波 self.sg_window = 25 # Savitzky-Golay窗口大小 self.sg_polyorder = 2 # Savitzky-Golay多项式阶数 # 初始化设置 set_seed(self.random_seed) # 设置随机种子 self.scaler = joblib.load('90day_scaler.pkl') # 加载标准化器(确保文件存在) self.model = None self.edge_index = None self.test_loader = None def reorder_columns(self, df): """ 调整DataFrame列顺序,确保与模型训练时的特征顺序一致 (特征顺序对模型输入至关重要,必须与训练时保持一致) """ desired_order = [ 'index', # 时间索引列 'C.M.RO1_FT_JS@out','C.M.RO2_FT_JS@out','C.M.RO3_FT_JS@out','C.M.RO4_FT_JS@out', 'C.M.RO_TT_ZJS@out','C.M.RO_Cond_ZJS@out', 'C.M.RO1_DB@DPT_1','C.M.RO1_DB@DPT_2', 'C.M.RO2_DB@DPT_1','C.M.RO2_DB@DPT_2', 'C.M.RO3_DB@DPT_1','C.M.RO3_DB@DPT_2', 'C.M.RO4_DB@DPT_1','C.M.RO4_DB@DPT_2', ] return df.loc[:, desired_order] def process_date(self, data): """ 处理日期特征,生成周期性时间编码(年周期) 将时间特征转换为正弦/余弦编码,捕捉周期性规律(如季节变化) """ if 'index' in data.columns: data = data.rename(columns={'index': 'date'}) data['date'] = pd.to_datetime(data['date']) data['day_of_year'] = data['date'].dt.dayofyear # 生成正弦/余弦编码(周期为366天,适应闰年) data['day_year_sin'] = np.sin(2 * np.pi * data['day_of_year'] / 366) data['day_year_cos'] = np.cos(2 * np.pi * data['day_of_year'] / 366) data.drop(columns=['day_of_year'], inplace=True) # 调整列顺序:日期 + 时间特征 + 其他特征 time_features = ['day_year_sin', 'day_year_cos'] other_columns = [col for col in data.columns if col not in ['date'] + time_features] return data[['date'] + time_features + other_columns] def scaler_data(self, data): """ 使用预训练的标准化器对数据进行标准化(保留date列不处理) 标准化是为了让不同量级的特征在模型中权重均衡 """ date_col = data[['date']] # 提取日期列(不参与标准化) data_to_scale = data.drop(columns=['date']) scaled = self.scaler.transform(data_to_scale) scaled_df = pd.DataFrame(scaled, columns=data_to_scale.columns) return pd.concat([date_col.reset_index(drop=True), scaled_df], axis=1) # 拼接日期列和标准化后的数据 def create_test_loader(self, df): """ 将预处理后的DataFrame转换为模型输入的测试数据加载器 生成符合模型要求的张量格式([样本数, 序列长度, 特征数]) """ if 'date' in df.columns: test_data = df.drop(columns=['date']).values else: test_data = df.values # 重塑为LSTM输入格式:[样本数, 序列长度, 特征数] X = test_data.reshape(-1, self.seq_len, self.feature_num) X = torch.tensor(X, dtype=torch.float32).to(self.device) tensor_dataset = TensorDataset(X) # 创建数据集(仅输入,无标签) # 创建数据加载器(不打乱顺序,按批次加载) return DataLoader(tensor_dataset, batch_size=self.batch_size, shuffle=False) def load_data(self, df): """数据加载与预处理统一接口,依次执行列重排、日期处理、标准化和生成数据加载器""" df = self.reorder_columns(df) # 调整列顺序 df = self.process_date(df) # 处理日期特征 df = self.scaler_data(df) # 标准化数据 self.test_loader = self.create_test_loader(df) def load_model(self): """加载预训练模型并设置为评估模式(关闭dropout等训练特有层)""" self.model = GAT_LSTM(self).to(self.device) # 加载模型权重(map_location确保在指定设备加载,weights_only=True提高安全性) self.model.load_state_dict(torch.load(self.model_path, map_location=self.device, weights_only=True)) self.model.eval() def moving_average_smooth(self, data): """ 滑动平均平滑处理:对每个特征单独做滑动平均,减少高频噪声 采用边缘填充避免边界效应 """ smoothed = [] for i in range(data.shape[1]): feature = data[:, i] # 边缘填充:用边缘值填充窗口外的部分,避免边界数据失真 padded = np.pad(feature, (self.smooth_window//2, self.smooth_window//2), mode='edge') window = np.ones(self.smooth_window) / self.smooth_window # 平均窗口权重 smoothed_feature = np.convolve(padded, window, mode='valid') # 卷积计算滑动平均 smoothed.append(smoothed_feature.reshape(-1, 1)) # 保留维度并收集结果 return np.concatenate(smoothed, axis=1) # 拼接所有特征 def exponential_smooth(self, data): """ 指数移动平均平滑:对每个特征做指数加权平均,近期数据权重更高 相比简单滑动平均更关注近期趋势 """ smoothed = [] for i in range(data.shape[1]): # 遍历每个特征 feature = data[:, i] smoothed_feature = np.zeros_like(feature) smoothed_feature[0] = feature[0] for t in range(1, len(feature)): smoothed_feature[t] = self.ema_alpha * feature[t] + (1 - self.ema_alpha) * smoothed_feature[t-1] smoothed.append(smoothed_feature.reshape(-1, 1)) return np.concatenate(smoothed, axis=1) def savitzky_golay_smooth(self, data): """ Savitzky-Golay滤波:基于多项式拟合的滑动窗口滤波,保留趋势的同时降噪 窗口大小需为奇数,若数据长度不足则调整窗口 """ smoothed = [] for i in range(data.shape[1]): feature = data[:, i] # 确保窗口为奇数且不超过数据长度 window = min(self.sg_window, len(feature) if len(feature) % 2 == 1 else len(feature)-1) if window < 3: # 窗口过小则不滤波(至少需要3个点拟合2阶多项式) smoothed.append(feature.reshape(-1, 1)) continue # 应用Savitzky-Golay滤波 smoothed_feature = savgol_filter(feature, window_length=window, polyorder=self.sg_polyorder) smoothed.append(smoothed_feature.reshape(-1, 1)) return np.concatenate(smoothed, axis=1) def smooth_predictions(self, predictions): """ 组合多步平滑策略处理预测结果:先滑动平均,再指数平滑,最后可选Savitzky-Golay滤波 多层平滑进一步降低噪声,使预测曲线更平滑 """ smoothed = self.moving_average_smooth(predictions) smoothed = self.exponential_smooth(smoothed) if self.use_savitzky and len(predictions) >= self.sg_window: smoothed = self.savitzky_golay_smooth(smoothed) return smoothed def predict(self, df): """ 核心预测接口:输入原始数据,返回处理后的预测结果 流程:更新起始时间 -> 数据预处理 -> 加载模型 -> 批量预测 -> 反标准化 -> 平滑处理 """ # 预测起始时间为输入数据的最大时间+3小时(根据业务需求设定) self.test_start_date = (pd.to_datetime(df['index']).max() + timedelta(hours=3)).strftime("%Y-%m-%d %H:%M:%S") self.load_data(df) self.load_model() all_predictions = [] with torch.no_grad(): for batch in self.test_loader: inputs = batch[0].to(self.device) outputs = self.model(inputs) all_predictions.append(outputs.cpu().numpy()) # 结果移回CPU并转为numpy # 拼接所有批次结果并重塑为[样本数, 目标特征数] predictions = np.concatenate(all_predictions, axis=0).reshape(-1, self.labels_num) # 反标准化处理 inverse_scaler = MinMaxScaler() # 复用训练时的标准化参数(仅使用目标特征对应的参数) inverse_scaler.min_ = self.scaler.min_[-self.labels_num:] inverse_scaler.scale_ = self.scaler.scale_[-self.labels_num:] predictions = inverse_scaler.inverse_transform(predictions) predictions = np.clip(predictions, 0, None) # 平滑处理 predictions = self.smooth_predictions(predictions) return predictions def save_predictions(self, predictions): """ 保存预测结果到CSV文件,包含时间戳和各目标特征的预测值 时间戳根据起始时间和数据分辨率生成 """ # 解析预测起始时间 start_time = datetime.strptime(self.test_start_date, "%Y-%m-%d %H:%M:%S") # 计算时间间隔(根据分辨率转换为小时) time_interval = pd.Timedelta(hours=(self.resolution / 60)) # 生成所有预测时间戳 timestamps = [start_time + i * time_interval for i in range(len(predictions))] # 定义目标特征列名(与训练时一致) base_columns = [ '1', 'C.M.RO2_DB@DPT_1', 'C.M.RO3_DB@DPT_1', 'C.M.RO4_DB@DPT_1', 'C.M.RO1_DB@DPT_2', 'C.M.RO2_DB@DPT_2', 'C.M.RO3_DB@DPT_2', 'C.M.RO4_DB@DPT_2', ] pred_columns = [f'{col}_pred' for col in base_columns] df_result = pd.DataFrame(predictions, columns=pred_columns) df_result.insert(0, 'date', timestamps) df_result.to_csv(self.output_csv_path, index=False) print(f"预测结果保存至:{self.output_csv_path}")