- """
- 90天TMP预测模型
- 版本:1.0
- 最后更新:2025-10-28
- """
- import os
- import sys
- import torch
- import pandas as pd
- import numpy as np
- import joblib
- from datetime import datetime, timedelta
- from torch.utils.data import DataLoader, TensorDataset
- from scipy.signal import savgol_filter # Savitzky-Golay滤波工具
- from sklearn.preprocessing import MinMaxScaler # 数据标准化工具
- # 添加父目录到系统路径以导入shared模块
- current_dir = os.path.dirname(os.path.abspath(__file__))
- parent_dir = os.path.dirname(current_dir)
- if parent_dir not in sys.path:
- sys.path.insert(0, parent_dir)
- # 从shared目录导入GAT-LSTM模型
- sys.path.insert(0, os.path.join(parent_dir, 'shared'))
- from gat_lstm import GAT_LSTM
def set_seed(seed):
    """Seed every RNG source so experiments are reproducible.

    Covers Python's ``random``, the hash seed, NumPy, and PyTorch
    (CPU and all CUDA devices), and forces deterministic cuDNN
    behaviour at the cost of disabling its auto-tuner.
    """
    import random

    os.environ['PYTHONHASHSEED'] = str(seed)
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for seeder in seeders:
        seeder(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
class Predictor:
    """End-to-end 90-day TMP predictor.

    Wraps the complete inference pipeline: column reordering, cyclic
    date encoding, normalisation, batched GAT-LSTM inference, inverse
    scaling, multi-stage smoothing, and CSV export of the results.
    """

    def __init__(self):
        # Model / data hyper-parameters (must match the training setup)
        self.seq_len = 360        # input sequence length
        self.output_size = 180    # number of predicted steps
        self.labels_num = 8       # number of predicted target features
        self.feature_num = 16     # total number of input features
        self.step_size = 180      # sliding-window stride
        self.dropout = 0          # model dropout rate
        self.lr = 0.01            # learning rate (consumed by the model config)
        self.hidden_size = 64     # LSTM hidden size
        self.batch_size = 128     # inference batch size
        self.num_layers = 1       # number of LSTM layers
        self.resolution = 5400    # data time resolution (seconds)
        self.test_start_date = '2025-09-24'  # prediction start date (updated dynamically)
        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

        # File paths (relative to the 90day directory)
        current_dir = os.path.dirname(__file__)
        self.model_path = os.path.join(current_dir, '90day_model.pth')
        self.scaler_path = os.path.join(current_dir, '90day_scaler.pkl')
        self.output_csv_path = os.path.join(current_dir, '90day_predictions.csv')
        self.random_seed = 1314   # random seed

        # Smoothing parameters for the predicted curves
        self.smooth_window = 30   # moving-average window size
        self.ema_alpha = 0.1      # exponential moving-average weight
        self.use_savitzky = True  # whether to apply Savitzky-Golay filtering
        self.sg_window = 25       # Savitzky-Golay window size (should be odd)
        self.sg_polyorder = 2     # Savitzky-Golay polynomial order

        # Initialisation
        set_seed(self.random_seed)                   # make runs reproducible
        self.scaler = joblib.load(self.scaler_path)  # load the fitted scaler
        self.model = None
        self.edge_index = None
        self.test_loader = None

    def reorder_columns(self, df):
        """Reorder the DataFrame columns to match the feature order used
        at training time (the input order is critical for the model).
        """
        desired_order = [
            'index',  # time-index column
            'C.M.RO1_FT_JS@out', 'C.M.RO2_FT_JS@out', 'C.M.RO3_FT_JS@out', 'C.M.RO4_FT_JS@out',
            'C.M.RO_TT_ZJS@out', 'C.M.RO_Cond_ZJS@out',
            'C.M.RO1_DB@DPT_1', 'C.M.RO1_DB@DPT_2',
            'C.M.RO2_DB@DPT_1', 'C.M.RO2_DB@DPT_2',
            'C.M.RO3_DB@DPT_1', 'C.M.RO3_DB@DPT_2',
            'C.M.RO4_DB@DPT_1', 'C.M.RO4_DB@DPT_2',
        ]
        return df.loc[:, desired_order]

    def process_date(self, data):
        """Encode the date as cyclic sin/cos features with a yearly period.

        The period is 366 days so leap years are covered.
        NOTE(review): if neither an 'index' nor a 'date' column is present
        this raises a KeyError — confirm upstream data always carries one.
        """
        if 'index' in data.columns:
            data = data.rename(columns={'index': 'date'})
        data['date'] = pd.to_datetime(data['date'])
        data['day_of_year'] = data['date'].dt.dayofyear
        # sin/cos encoding captures seasonal periodicity
        data['day_year_sin'] = np.sin(2 * np.pi * data['day_of_year'] / 366)
        data['day_year_cos'] = np.cos(2 * np.pi * data['day_of_year'] / 366)
        data.drop(columns=['day_of_year'], inplace=True)

        # Column order: date + time features + remaining features
        time_features = ['day_year_sin', 'day_year_cos']
        other_columns = [col for col in data.columns if col not in ['date'] + time_features]
        return data[['date'] + time_features + other_columns]

    def scaler_data(self, data):
        """Normalise all feature columns with the pre-fitted scaler,
        leaving the 'date' column untouched.
        """
        date_col = data[['date']]  # date column is excluded from scaling
        data_to_scale = data.drop(columns=['date'])
        scaled = self.scaler.transform(data_to_scale)
        scaled_df = pd.DataFrame(scaled, columns=data_to_scale.columns)
        # Re-attach the date column next to the scaled features
        return pd.concat([date_col.reset_index(drop=True), scaled_df], axis=1)

    def create_test_loader(self, df):
        """Build a DataLoader from the preprocessed DataFrame, reshaped to
        the model's expected tensor layout [samples, seq_len, features].
        """
        if 'date' in df.columns:
            test_data = df.drop(columns=['date']).values
        else:
            test_data = df.values
        # Reshape to the LSTM input layout: [samples, seq_len, features]
        X = test_data.reshape(-1, self.seq_len, self.feature_num)
        X = torch.tensor(X, dtype=torch.float32).to(self.device)
        tensor_dataset = TensorDataset(X)  # inputs only, no labels

        # Keep the original order: shuffle disabled for inference
        return DataLoader(tensor_dataset, batch_size=self.batch_size, shuffle=False)

    def load_data(self, df):
        """Unified preprocessing entry point: reorder columns, encode dates,
        normalise, and build the test DataLoader.
        """
        df = self.reorder_columns(df)
        df = self.process_date(df)
        df = self.scaler_data(df)
        self.test_loader = self.create_test_loader(df)

    def load_model(self):
        """Load the pretrained model and switch it to evaluation mode
        (disables dropout and other training-only layers).
        """
        self.model = GAT_LSTM(self).to(self.device)
        # map_location pins the load to the chosen device;
        # weights_only=True avoids unpickling arbitrary objects
        self.model.load_state_dict(torch.load(self.model_path, map_location=self.device, weights_only=True))
        self.model.eval()

    def moving_average_smooth(self, data):
        """Moving-average smoothing applied per feature, with edge padding
        to avoid boundary distortion.

        The padding is split asymmetrically as (w//2, w-1-w//2) so the
        'valid' convolution returns exactly len(data) rows. (Padding
        w//2 on both sides — as before — yields one extra row whenever
        the window size is even, e.g. the default of 30.)
        """
        left = self.smooth_window // 2
        right = self.smooth_window - 1 - left
        kernel = np.ones(self.smooth_window) / self.smooth_window  # uniform weights
        smoothed = []
        for i in range(data.shape[1]):
            feature = data[:, i]
            padded = np.pad(feature, (left, right), mode='edge')
            smoothed_feature = np.convolve(padded, kernel, mode='valid')
            smoothed.append(smoothed_feature.reshape(-1, 1))
        return np.concatenate(smoothed, axis=1)

    def exponential_smooth(self, data):
        """Exponential moving average per feature: recent samples carry
        more weight than with a plain moving average.
        """
        smoothed = []
        for i in range(data.shape[1]):
            feature = data[:, i]
            smoothed_feature = np.zeros_like(feature)
            smoothed_feature[0] = feature[0]
            for t in range(1, len(feature)):
                smoothed_feature[t] = self.ema_alpha * feature[t] + (1 - self.ema_alpha) * smoothed_feature[t - 1]
            smoothed.append(smoothed_feature.reshape(-1, 1))
        return np.concatenate(smoothed, axis=1)

    def savitzky_golay_smooth(self, data):
        """Savitzky-Golay filtering per feature: polynomial-fit smoothing
        that preserves the trend while removing noise.

        The window is clamped to the data length and forced odd (an even
        window would make savgol_filter raise); features too short to
        fit the polynomial are passed through unchanged.
        """
        smoothed = []
        for i in range(data.shape[1]):
            feature = data[:, i]
            window = min(self.sg_window, len(feature))
            if window % 2 == 0:
                window -= 1  # savgol_filter requires an odd window length
            if window < 3:
                # Too few points to fit a 2nd-order polynomial: skip filtering
                smoothed.append(feature.reshape(-1, 1))
                continue
            smoothed_feature = savgol_filter(feature, window_length=window, polyorder=self.sg_polyorder)
            smoothed.append(smoothed_feature.reshape(-1, 1))
        return np.concatenate(smoothed, axis=1)

    def smooth_predictions(self, predictions):
        """Combined smoothing strategy: moving average, then exponential
        smoothing, then optional Savitzky-Golay filtering.
        """
        smoothed = self.moving_average_smooth(predictions)
        smoothed = self.exponential_smooth(smoothed)
        if self.use_savitzky and len(predictions) >= self.sg_window:
            smoothed = self.savitzky_golay_smooth(smoothed)
        return smoothed

    def predict(self, df):
        """Core prediction entry point: raw input DataFrame in, smoothed
        de-normalised predictions out.

        Flow: update start time -> preprocess -> load model -> batched
        inference -> inverse scaling -> clipping -> smoothing.
        """
        # Prediction starts 3 hours after the newest input timestamp
        # (business requirement)
        self.test_start_date = (pd.to_datetime(df['index']).max() + timedelta(hours=3)).strftime("%Y-%m-%d %H:%M:%S")
        self.load_data(df)
        self.load_model()
        all_predictions = []
        with torch.no_grad():
            for batch in self.test_loader:
                inputs = batch[0].to(self.device)
                outputs = self.model(inputs)
                all_predictions.append(outputs.cpu().numpy())  # back to CPU as numpy

        # Concatenate all batches and reshape to [samples, targets]
        predictions = np.concatenate(all_predictions, axis=0).reshape(-1, self.labels_num)

        # Inverse scaling: reuse the training scaler's parameters.
        # NOTE(review): assumes the target features occupy the LAST
        # labels_num columns of the fitted scaler — confirm against the
        # training feature layout.
        inverse_scaler = MinMaxScaler()
        inverse_scaler.min_ = self.scaler.min_[-self.labels_num:]
        inverse_scaler.scale_ = self.scaler.scale_[-self.labels_num:]
        predictions = inverse_scaler.inverse_transform(predictions)
        predictions = np.clip(predictions, 0, None)  # physical values cannot be negative

        # Smooth the final curves
        predictions = self.smooth_predictions(predictions)

        return predictions

    def save_predictions(self, predictions, start_date=None, output_path=None):
        """Save predictions to CSV and return them as a DataFrame (API use).

        Args:
            predictions: prediction array of shape [steps, labels_num].
            start_date: start time string 'YYYY-MM-DD HH:MM:SS';
                defaults to self.test_start_date.
            output_path: output CSV path; defaults to the configured path.

        Returns:
            DataFrame with a timestamp column plus one column per target.
        """
        if start_date is None:
            start_date = self.test_start_date

        # Parse the prediction start time (must include HH:MM:SS)
        start_time = datetime.strptime(start_date, "%Y-%m-%d %H:%M:%S")
        # NOTE(review): resolution is documented as seconds, so
        # hours=(5400/60) gives a 90-hour step — if a 1.5-hour step was
        # intended this should be hours=(self.resolution / 3600). Confirm
        # the intended unit before changing.
        time_interval = pd.Timedelta(hours=(self.resolution / 60))
        # Generate one timestamp per prediction row
        timestamps = [start_time + i * time_interval for i in range(len(predictions))]

        # Target column names (must match training)
        base_columns = [
            'C.M.RO1_DB@DPT_1', 'C.M.RO2_DB@DPT_1', 'C.M.RO3_DB@DPT_1', 'C.M.RO4_DB@DPT_1',
            'C.M.RO1_DB@DPT_2', 'C.M.RO2_DB@DPT_2', 'C.M.RO3_DB@DPT_2', 'C.M.RO4_DB@DPT_2',
        ]
        pred_columns = [f'{col}_Predicted' for col in base_columns]
        df_result = pd.DataFrame(predictions, columns=pred_columns)
        df_result.insert(0, 'index', timestamps)

        # Use the explicit output path when given, otherwise the default
        save_path = output_path if output_path else self.output_csv_path
        df_result.to_csv(save_path, index=False)
        print(f"预测结果保存至:{save_path}")

        return df_result
if __name__ == '__main__':
    """
    Main entry point: run the 90-day TMP prediction.

    Usage:
        1. Prepare the input data (CSV or JSON format)
        2. Run this script
        3. Inspect the results (saved to 90day_predictions.csv)
    """
    import json

    try:
        # Initialise the predictor
        predictor = Predictor()

        # Load the test data (adjust path/format to your setup)
        # Example: load from a JSON file
        # with open('test_data.json', 'r', encoding='utf-8') as f:
        #     json_data = json.load(f)
        # df = pd.DataFrame(json_data)

        # Example: load from a CSV file
        # df = pd.read_csv('test_data.csv')

        print("请准备输入数据并取消注释相应的加载代码")

        # Run the prediction and save the results
        # predictions = predictor.predict(df)
        # predictor.save_predictions(predictions)

        # print("预测任务完成!")

    except Exception as e:
        print(f"预测过程发生错误: {str(e)}")
        raise