|
|
@@ -0,0 +1,254 @@
|
|
|
+import os
|
|
|
+import torch
|
|
|
+import pandas as pd
|
|
|
+import numpy as np
|
|
|
+import joblib
|
|
|
+from datetime import datetime, timedelta
|
|
|
+from torch.utils.data import DataLoader, TensorDataset
|
|
|
+from gat_lstm import GAT_LSTM # 导入自定义的GAT-LSTM模型
|
|
|
+from scipy.signal import savgol_filter # Savitzky-Golay滤波工具
|
|
|
+from sklearn.preprocessing import MinMaxScaler # 数据标准化工具
|
|
|
+
|
|
|
def set_seed(seed):
    """Make runs reproducible by seeding every RNG the pipeline touches.

    Seeds Python's ``random``, NumPy and all PyTorch generators (CPU and
    CUDA), pins the hash seed, and forces cuDNN into deterministic mode
    (trading some speed for repeatability).
    """
    import random

    os.environ['PYTHONHASHSEED'] = str(seed)
    for seed_fn in (random.seed,
                    np.random.seed,
                    torch.manual_seed,
                    torch.cuda.manual_seed,
                    torch.cuda.manual_seed_all):
        seed_fn(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
|
|
|
+
|
|
|
class Predictor:
    """End-to-end inference pipeline for the 90-day GAT-LSTM forecaster.

    Encapsulates the full flow: column reordering, cyclic date encoding,
    min-max scaling, model loading, batched prediction, inverse scaling,
    multi-stage smoothing, and CSV export of the results.
    """

    def __init__(self):
        # --- model / data hyper-parameters (must match training) ---
        self.seq_len = 360            # input sequence length (time steps)
        self.output_size = 180        # forecast horizon per window (time steps)
        self.labels_num = 8           # number of predicted target features
        self.feature_num = 16         # total number of input features
        self.step_size = 180          # sliding-window stride
        self.dropout = 0              # dropout used when the model is built
        self.lr = 0.01                # learning rate (kept for model-constructor compatibility)
        self.hidden_size = 64         # LSTM hidden size
        self.batch_size = 128         # inference batch size
        self.num_layers = 1           # number of LSTM layers
        self.resolution = 5400        # data sampling period, in seconds
        self.test_start_date = '2025-09-24'  # forecast start; overwritten by predict()
        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        self.model_path = '90day_model.pth'             # model weights path (overridable)
        self.output_csv_path = '90day_predictions.csv'  # result file path (overridable)
        self.random_seed = 1314       # RNG seed for reproducibility

        # --- smoothing parameters applied to the raw predictions ---
        self.smooth_window = 30       # moving-average window size
        self.ema_alpha = 0.1          # exponential-moving-average weight of the newest sample
        self.use_savitzky = True      # apply Savitzky-Golay filtering as a final pass
        self.sg_window = 25           # Savitzky-Golay window length
        self.sg_polyorder = 2         # Savitzky-Golay polynomial order

        # --- initialisation ---
        set_seed(self.random_seed)
        # Scaler fitted at training time; the pickle must exist beside the script.
        self.scaler = joblib.load('90day_scaler.pkl')
        self.model = None
        self.edge_index = None
        self.test_loader = None

    def reorder_columns(self, df):
        """Return *df* with columns in the exact order used at training time.

        Feature order is part of the model's input contract and must not drift.
        """
        desired_order = [
            'index',  # timestamp column
            'C.M.RO1_FT_JS@out','C.M.RO2_FT_JS@out','C.M.RO3_FT_JS@out','C.M.RO4_FT_JS@out',
            'C.M.RO_TT_ZJS@out','C.M.RO_Cond_ZJS@out',
            'C.M.RO1_DB@DPT_1','C.M.RO1_DB@DPT_2',
            'C.M.RO2_DB@DPT_1','C.M.RO2_DB@DPT_2',
            'C.M.RO3_DB@DPT_1','C.M.RO3_DB@DPT_2',
            'C.M.RO4_DB@DPT_1','C.M.RO4_DB@DPT_2',
        ]
        return df.loc[:, desired_order]

    def process_date(self, data):
        """Encode the timestamp as a cyclic (sin/cos) day-of-year feature.

        A 366-day period is used so leap years map consistently. Returns the
        frame reordered as: date, time features, then the remaining features.
        """
        if 'index' in data.columns:
            data = data.rename(columns={'index': 'date'})
        data['date'] = pd.to_datetime(data['date'])
        data['day_of_year'] = data['date'].dt.dayofyear
        data['day_year_sin'] = np.sin(2 * np.pi * data['day_of_year'] / 366)
        data['day_year_cos'] = np.cos(2 * np.pi * data['day_of_year'] / 366)
        data.drop(columns=['day_of_year'], inplace=True)

        time_features = ['day_year_sin', 'day_year_cos']
        other_columns = [col for col in data.columns if col not in ['date'] + time_features]
        return data[['date'] + time_features + other_columns]

    def scaler_data(self, data):
        """Min-max scale every column except 'date' with the training-time scaler."""
        date_col = data[['date']]  # kept aside, not scaled
        data_to_scale = data.drop(columns=['date'])
        scaled = self.scaler.transform(data_to_scale)
        scaled_df = pd.DataFrame(scaled, columns=data_to_scale.columns)
        return pd.concat([date_col.reset_index(drop=True), scaled_df], axis=1)

    def create_test_loader(self, df):
        """Build a DataLoader of [samples, seq_len, feature_num] float32 tensors."""
        if 'date' in df.columns:
            test_data = df.drop(columns=['date']).values
        else:
            test_data = df.values

        X = test_data.reshape(-1, self.seq_len, self.feature_num)
        X = torch.tensor(X, dtype=torch.float32).to(self.device)
        tensor_dataset = TensorDataset(X)  # inputs only, no labels
        # Order matters for a time series, so never shuffle.
        return DataLoader(tensor_dataset, batch_size=self.batch_size, shuffle=False)

    def load_data(self, df):
        """Run the full preprocessing chain and store the resulting loader."""
        df = self.reorder_columns(df)
        df = self.process_date(df)
        df = self.scaler_data(df)
        self.test_loader = self.create_test_loader(df)

    def load_model(self):
        """Instantiate the GAT-LSTM, load its weights and switch to eval mode."""
        self.model = GAT_LSTM(self).to(self.device)
        # weights_only=True avoids executing arbitrary pickled code in the checkpoint.
        self.model.load_state_dict(torch.load(self.model_path, map_location=self.device, weights_only=True))
        self.model.eval()

    def moving_average_smooth(self, data):
        """Moving-average each column of *data* with edge padding.

        Padding is split as (w//2, w-1-w//2) so the smoothed output always
        has the same length as the input. (The previous symmetric w//2
        padding produced one extra row whenever the window size was even,
        e.g. with the default smooth_window=30.)
        """
        left = self.smooth_window // 2
        right = self.smooth_window - 1 - left
        window = np.ones(self.smooth_window) / self.smooth_window  # uniform weights
        smoothed = []
        for i in range(data.shape[1]):
            padded = np.pad(data[:, i], (left, right), mode='edge')
            smoothed.append(np.convolve(padded, window, mode='valid').reshape(-1, 1))
        return np.concatenate(smoothed, axis=1)

    def exponential_smooth(self, data):
        """Exponential moving average per column; recent samples weigh more."""
        smoothed = []
        for i in range(data.shape[1]):
            feature = data[:, i]
            smoothed_feature = np.zeros_like(feature)
            smoothed_feature[0] = feature[0]
            for t in range(1, len(feature)):
                smoothed_feature[t] = self.ema_alpha * feature[t] + (1 - self.ema_alpha) * smoothed_feature[t - 1]
            smoothed.append(smoothed_feature.reshape(-1, 1))
        return np.concatenate(smoothed, axis=1)

    def savitzky_golay_smooth(self, data):
        """Savitzky-Golay filter per column.

        The window is shrunk to an odd length no longer than the data; columns
        shorter than 3 points are passed through unfiltered (a 2nd-order
        polynomial needs at least 3 points).
        """
        smoothed = []
        for i in range(data.shape[1]):
            feature = data[:, i]
            window = min(self.sg_window, len(feature) if len(feature) % 2 == 1 else len(feature) - 1)
            if window < 3:
                smoothed.append(feature.reshape(-1, 1))
                continue
            smoothed_feature = savgol_filter(feature, window_length=window, polyorder=self.sg_polyorder)
            smoothed.append(smoothed_feature.reshape(-1, 1))
        return np.concatenate(smoothed, axis=1)

    def smooth_predictions(self, predictions):
        """Apply the smoothing cascade: moving average -> EMA -> optional S-G."""
        smoothed = self.moving_average_smooth(predictions)
        smoothed = self.exponential_smooth(smoothed)
        if self.use_savitzky and len(predictions) >= self.sg_window:
            smoothed = self.savitzky_golay_smooth(smoothed)
        return smoothed

    def predict(self, df):
        """Predict from raw input *df* and return smoothed, de-scaled values.

        Pipeline: set the forecast start time, preprocess, load the model,
        run batched inference, inverse-transform, clip negatives, smooth.
        """
        # Forecast starts 3 hours after the last input timestamp (business rule).
        self.test_start_date = (pd.to_datetime(df['index']).max() + timedelta(hours=3)).strftime("%Y-%m-%d %H:%M:%S")
        self.load_data(df)
        self.load_model()

        all_predictions = []
        with torch.no_grad():
            for batch in self.test_loader:
                inputs = batch[0].to(self.device)
                outputs = self.model(inputs)
                all_predictions.append(outputs.cpu().numpy())

        # Stack all batches as [total_steps, labels_num].
        predictions = np.concatenate(all_predictions, axis=0).reshape(-1, self.labels_num)

        # Inverse min-max scaling with the training scaler's parameters.
        # NOTE(review): assumes the target features are the LAST labels_num
        # columns fitted by the scaler — confirm against the training script.
        inverse_scaler = MinMaxScaler()
        inverse_scaler.min_ = self.scaler.min_[-self.labels_num:]
        inverse_scaler.scale_ = self.scaler.scale_[-self.labels_num:]
        predictions = inverse_scaler.inverse_transform(predictions)
        predictions = np.clip(predictions, 0, None)  # physical quantities cannot be negative

        return self.smooth_predictions(predictions)

    def save_predictions(self, predictions):
        """Write *predictions* to CSV with one timestamp per forecast row.

        Timestamps start at ``test_start_date`` and advance by the data
        resolution. ``pd.to_datetime`` accepts both the date-only default and
        the full timestamp set by predict() (the old strptime with a fixed
        "%Y-%m-%d %H:%M:%S" format crashed on the date-only default).
        """
        start_time = pd.to_datetime(self.test_start_date)
        # resolution is in seconds; the previous code divided by 60 and fed
        # the result to Timedelta(hours=...), inflating each step 60-fold.
        time_interval = pd.Timedelta(seconds=self.resolution)
        timestamps = [start_time + i * time_interval for i in range(len(predictions))]

        # Target column names, matching the training-time output order.
        base_columns = [
            'C.M.RO1_DB@DPT_1', 'C.M.RO2_DB@DPT_1', 'C.M.RO3_DB@DPT_1', 'C.M.RO4_DB@DPT_1',
            'C.M.RO1_DB@DPT_2', 'C.M.RO2_DB@DPT_2', 'C.M.RO3_DB@DPT_2', 'C.M.RO4_DB@DPT_2',
        ]
        pred_columns = [f'{col}_pred' for col in base_columns]
        df_result = pd.DataFrame(predictions, columns=pred_columns)
        df_result.insert(0, 'date', timestamps)
        df_result.to_csv(self.output_csv_path, index=False)
        print(f"预测结果保存至:{self.output_csv_path}")
|
|
|
+
|
|
|
+
|