# 90d_predict.py — 90-day prediction script (GAT-LSTM inference pipeline)
import os
from datetime import datetime, timedelta

import joblib
import numpy as np
import pandas as pd
import torch
from scipy.signal import savgol_filter  # Savitzky-Golay filtering
from sklearn.preprocessing import MinMaxScaler  # feature scaling / inverse transform
from torch.utils.data import DataLoader, TensorDataset

from gat_lstm import GAT_LSTM  # project-local GAT-LSTM model
  11. def set_seed(seed):
  12. """设置随机种子,保证实验可复现性"""
  13. import random
  14. random.seed(seed)
  15. os.environ['PYTHONHASHSEED'] = str(seed)
  16. np.random.seed(seed)
  17. torch.manual_seed(seed)
  18. torch.cuda.manual_seed(seed)
  19. torch.cuda.manual_seed_all(seed)
  20. torch.backends.cudnn.deterministic = True
  21. torch.backends.cudnn.benchmark = False
  22. class Predictor:
  23. """预测器类,封装了数据处理、模型加载、预测和结果保存的完整流程"""
  24. def __init__(self):
  25. # 模型和数据相关参数
  26. self.seq_len = 360 # 输入序列长度
  27. self.output_size = 180 # 预测输出长度
  28. self.labels_num = 8 # 预测目标特征数量
  29. self.feature_num = 16 # 输入特征总数量
  30. self.step_size = 180 # 滑动窗口步长
  31. self.dropout = 0 # 模型dropout参数
  32. self.lr = 0.01 # 学习率
  33. self.hidden_size = 64 # LSTM隐藏层大小
  34. self.batch_size = 128 # 批处理大小
  35. self.num_layers = 1 # LSTM层数
  36. self.resolution = 5400 # 数据时间分辨率(单位:秒)
  37. self.test_start_date = '2025-09-24' # 预测起始日期(动态更新)
  38. self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
  39. self.model_path = '90day_model.pth' # 模型权重路径(可外部修改)
  40. self.output_csv_path = '90day_predictions.csv' # 结果保存路径(可外部修改)
  41. self.random_seed = 1314 # 随机种子
  42. # 预测结果平滑参数
  43. self.smooth_window = 30 # 滑动平均窗口大小
  44. self.ema_alpha = 0.1 # 指数移动平均系数(权重)
  45. self.use_savitzky = True # 是否使用Savitzky-Golay滤波
  46. self.sg_window = 25 # Savitzky-Golay窗口大小
  47. self.sg_polyorder = 2 # Savitzky-Golay多项式阶数
  48. # 初始化设置
  49. set_seed(self.random_seed) # 设置随机种子
  50. self.scaler = joblib.load('90day_scaler.pkl') # 加载标准化器(确保文件存在)
  51. self.model = None
  52. self.edge_index = None
  53. self.test_loader = None
  54. def reorder_columns(self, df):
  55. """
  56. 调整DataFrame列顺序,确保与模型训练时的特征顺序一致
  57. (特征顺序对模型输入至关重要,必须与训练时保持一致)
  58. """
  59. desired_order = [
  60. 'index', # 时间索引列
  61. 'C.M.RO1_FT_JS@out','C.M.RO2_FT_JS@out','C.M.RO3_FT_JS@out','C.M.RO4_FT_JS@out',
  62. 'C.M.RO_TT_ZJS@out','C.M.RO_Cond_ZJS@out',
  63. 'C.M.RO1_DB@DPT_1','C.M.RO1_DB@DPT_2',
  64. 'C.M.RO2_DB@DPT_1','C.M.RO2_DB@DPT_2',
  65. 'C.M.RO3_DB@DPT_1','C.M.RO3_DB@DPT_2',
  66. 'C.M.RO4_DB@DPT_1','C.M.RO4_DB@DPT_2',
  67. ]
  68. return df.loc[:, desired_order]
  69. def process_date(self, data):
  70. """
  71. 处理日期特征,生成周期性时间编码(年周期)
  72. 将时间特征转换为正弦/余弦编码,捕捉周期性规律(如季节变化)
  73. """
  74. if 'index' in data.columns:
  75. data = data.rename(columns={'index': 'date'})
  76. data['date'] = pd.to_datetime(data['date'])
  77. data['day_of_year'] = data['date'].dt.dayofyear
  78. # 生成正弦/余弦编码(周期为366天,适应闰年)
  79. data['day_year_sin'] = np.sin(2 * np.pi * data['day_of_year'] / 366)
  80. data['day_year_cos'] = np.cos(2 * np.pi * data['day_of_year'] / 366)
  81. data.drop(columns=['day_of_year'], inplace=True)
  82. # 调整列顺序:日期 + 时间特征 + 其他特征
  83. time_features = ['day_year_sin', 'day_year_cos']
  84. other_columns = [col for col in data.columns if col not in ['date'] + time_features]
  85. return data[['date'] + time_features + other_columns]
  86. def scaler_data(self, data):
  87. """
  88. 使用预训练的标准化器对数据进行标准化(保留date列不处理)
  89. 标准化是为了让不同量级的特征在模型中权重均衡
  90. """
  91. date_col = data[['date']] # 提取日期列(不参与标准化)
  92. data_to_scale = data.drop(columns=['date'])
  93. scaled = self.scaler.transform(data_to_scale)
  94. scaled_df = pd.DataFrame(scaled, columns=data_to_scale.columns)
  95. return pd.concat([date_col.reset_index(drop=True), scaled_df], axis=1) # 拼接日期列和标准化后的数据
  96. def create_test_loader(self, df):
  97. """
  98. 将预处理后的DataFrame转换为模型输入的测试数据加载器
  99. 生成符合模型要求的张量格式([样本数, 序列长度, 特征数])
  100. """
  101. if 'date' in df.columns:
  102. test_data = df.drop(columns=['date']).values
  103. else:
  104. test_data = df.values
  105. # 重塑为LSTM输入格式:[样本数, 序列长度, 特征数]
  106. X = test_data.reshape(-1, self.seq_len, self.feature_num)
  107. X = torch.tensor(X, dtype=torch.float32).to(self.device)
  108. tensor_dataset = TensorDataset(X) # 创建数据集(仅输入,无标签)
  109. # 创建数据加载器(不打乱顺序,按批次加载)
  110. return DataLoader(tensor_dataset, batch_size=self.batch_size, shuffle=False)
  111. def load_data(self, df):
  112. """数据加载与预处理统一接口,依次执行列重排、日期处理、标准化和生成数据加载器"""
  113. df = self.reorder_columns(df) # 调整列顺序
  114. df = self.process_date(df) # 处理日期特征
  115. df = self.scaler_data(df) # 标准化数据
  116. self.test_loader = self.create_test_loader(df)
  117. def load_model(self):
  118. """加载预训练模型并设置为评估模式(关闭dropout等训练特有层)"""
  119. self.model = GAT_LSTM(self).to(self.device)
  120. # 加载模型权重(map_location确保在指定设备加载,weights_only=True提高安全性)
  121. self.model.load_state_dict(torch.load(self.model_path, map_location=self.device, weights_only=True))
  122. self.model.eval()
  123. def moving_average_smooth(self, data):
  124. """
  125. 滑动平均平滑处理:对每个特征单独做滑动平均,减少高频噪声
  126. 采用边缘填充避免边界效应
  127. """
  128. smoothed = []
  129. for i in range(data.shape[1]):
  130. feature = data[:, i]
  131. # 边缘填充:用边缘值填充窗口外的部分,避免边界数据失真
  132. padded = np.pad(feature, (self.smooth_window//2, self.smooth_window//2), mode='edge')
  133. window = np.ones(self.smooth_window) / self.smooth_window # 平均窗口权重
  134. smoothed_feature = np.convolve(padded, window, mode='valid') # 卷积计算滑动平均
  135. smoothed.append(smoothed_feature.reshape(-1, 1)) # 保留维度并收集结果
  136. return np.concatenate(smoothed, axis=1) # 拼接所有特征
  137. def exponential_smooth(self, data):
  138. """
  139. 指数移动平均平滑:对每个特征做指数加权平均,近期数据权重更高
  140. 相比简单滑动平均更关注近期趋势
  141. """
  142. smoothed = []
  143. for i in range(data.shape[1]): # 遍历每个特征
  144. feature = data[:, i]
  145. smoothed_feature = np.zeros_like(feature)
  146. smoothed_feature[0] = feature[0]
  147. for t in range(1, len(feature)):
  148. smoothed_feature[t] = self.ema_alpha * feature[t] + (1 - self.ema_alpha) * smoothed_feature[t-1]
  149. smoothed.append(smoothed_feature.reshape(-1, 1))
  150. return np.concatenate(smoothed, axis=1)
  151. def savitzky_golay_smooth(self, data):
  152. """
  153. Savitzky-Golay滤波:基于多项式拟合的滑动窗口滤波,保留趋势的同时降噪
  154. 窗口大小需为奇数,若数据长度不足则调整窗口
  155. """
  156. smoothed = []
  157. for i in range(data.shape[1]):
  158. feature = data[:, i]
  159. # 确保窗口为奇数且不超过数据长度
  160. window = min(self.sg_window, len(feature) if len(feature) % 2 == 1 else len(feature)-1)
  161. if window < 3: # 窗口过小则不滤波(至少需要3个点拟合2阶多项式)
  162. smoothed.append(feature.reshape(-1, 1))
  163. continue
  164. # 应用Savitzky-Golay滤波
  165. smoothed_feature = savgol_filter(feature, window_length=window, polyorder=self.sg_polyorder)
  166. smoothed.append(smoothed_feature.reshape(-1, 1))
  167. return np.concatenate(smoothed, axis=1)
  168. def smooth_predictions(self, predictions):
  169. """
  170. 组合多步平滑策略处理预测结果:先滑动平均,再指数平滑,最后可选Savitzky-Golay滤波
  171. 多层平滑进一步降低噪声,使预测曲线更平滑
  172. """
  173. smoothed = self.moving_average_smooth(predictions)
  174. smoothed = self.exponential_smooth(smoothed)
  175. if self.use_savitzky and len(predictions) >= self.sg_window:
  176. smoothed = self.savitzky_golay_smooth(smoothed)
  177. return smoothed
  178. def predict(self, df):
  179. """
  180. 核心预测接口:输入原始数据,返回处理后的预测结果
  181. 流程:更新起始时间 -> 数据预处理 -> 加载模型 -> 批量预测 -> 反标准化 -> 平滑处理
  182. """
  183. # 预测起始时间为输入数据的最大时间+3小时(根据业务需求设定)
  184. self.test_start_date = (pd.to_datetime(df['index']).max() + timedelta(hours=3)).strftime("%Y-%m-%d %H:%M:%S")
  185. self.load_data(df)
  186. self.load_model()
  187. all_predictions = []
  188. with torch.no_grad():
  189. for batch in self.test_loader:
  190. inputs = batch[0].to(self.device)
  191. outputs = self.model(inputs)
  192. all_predictions.append(outputs.cpu().numpy()) # 结果移回CPU并转为numpy
  193. # 拼接所有批次结果并重塑为[样本数, 目标特征数]
  194. predictions = np.concatenate(all_predictions, axis=0).reshape(-1, self.labels_num)
  195. # 反标准化处理
  196. inverse_scaler = MinMaxScaler()
  197. # 复用训练时的标准化参数(仅使用目标特征对应的参数)
  198. inverse_scaler.min_ = self.scaler.min_[-self.labels_num:]
  199. inverse_scaler.scale_ = self.scaler.scale_[-self.labels_num:]
  200. predictions = inverse_scaler.inverse_transform(predictions)
  201. predictions = np.clip(predictions, 0, None)
  202. # 平滑处理
  203. predictions = self.smooth_predictions(predictions)
  204. return predictions
  205. def save_predictions(self, predictions):
  206. """
  207. 保存预测结果到CSV文件,包含时间戳和各目标特征的预测值
  208. 时间戳根据起始时间和数据分辨率生成
  209. """
  210. # 解析预测起始时间
  211. start_time = datetime.strptime(self.test_start_date, "%Y-%m-%d %H:%M:%S")
  212. # 计算时间间隔(根据分辨率转换为小时)
  213. time_interval = pd.Timedelta(hours=(self.resolution / 60))
  214. # 生成所有预测时间戳
  215. timestamps = [start_time + i * time_interval for i in range(len(predictions))]
  216. # 定义目标特征列名(与训练时一致)
  217. base_columns = [
  218. '1', 'C.M.RO2_DB@DPT_1', 'C.M.RO3_DB@DPT_1', 'C.M.RO4_DB@DPT_1',
  219. 'C.M.RO1_DB@DPT_2', 'C.M.RO2_DB@DPT_2', 'C.M.RO3_DB@DPT_2', 'C.M.RO4_DB@DPT_2',
  220. ]
  221. pred_columns = [f'{col}_pred' for col in base_columns]
  222. df_result = pd.DataFrame(predictions, columns=pred_columns)
  223. df_result.insert(0, 'date', timestamps)
  224. df_result.to_csv(self.output_csv_path, index=False)
  225. print(f"预测结果保存至:{self.output_csv_path}")