predict.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325
"""
90-day TMP prediction model.

Version: 1.0
Last updated: 2025-10-28
"""
  6. import os
  7. import sys
  8. import torch
  9. import pandas as pd
  10. import numpy as np
  11. import joblib
  12. from datetime import datetime, timedelta
  13. from torch.utils.data import DataLoader, TensorDataset
  14. from scipy.signal import savgol_filter # Savitzky-Golay滤波工具
  15. from sklearn.preprocessing import MinMaxScaler # 数据标准化工具
  16. # 添加父目录到系统路径以导入shared模块
  17. current_dir = os.path.dirname(os.path.abspath(__file__))
  18. parent_dir = os.path.dirname(current_dir)
  19. if parent_dir not in sys.path:
  20. sys.path.insert(0, parent_dir)
  21. # 从shared目录导入GAT-LSTM模型
  22. sys.path.insert(0, os.path.join(parent_dir, 'shared'))
  23. from gat_lstm import GAT_LSTM
  24. def set_seed(seed):
  25. """设置随机种子,保证实验可复现性"""
  26. import random
  27. random.seed(seed)
  28. os.environ['PYTHONHASHSEED'] = str(seed)
  29. np.random.seed(seed)
  30. torch.manual_seed(seed)
  31. torch.cuda.manual_seed(seed)
  32. torch.cuda.manual_seed_all(seed)
  33. torch.backends.cudnn.deterministic = True
  34. torch.backends.cudnn.benchmark = False
  35. class Predictor:
  36. """预测器类,封装了数据处理、模型加载、预测和结果保存的完整流程"""
  37. def __init__(self):
  38. # 模型和数据相关参数
  39. self.seq_len = 360 # 输入序列长度
  40. self.output_size = 180 # 预测输出长度
  41. self.labels_num = 8 # 预测目标特征数量
  42. self.feature_num = 16 # 输入特征总数量
  43. self.step_size = 180 # 滑动窗口步长
  44. self.dropout = 0 # 模型dropout参数
  45. self.lr = 0.01 # 学习率
  46. self.hidden_size = 64 # LSTM隐藏层大小
  47. self.batch_size = 128 # 批处理大小
  48. self.num_layers = 1 # LSTM层数
  49. self.resolution = 5400 # 数据时间分辨率(单位:秒)
  50. self.test_start_date = '2025-09-24' # 预测起始日期(动态更新)
  51. self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
  52. # 文件路径(相对于90day目录)
  53. current_dir = os.path.dirname(__file__)
  54. self.model_path = os.path.join(current_dir, '90day_model.pth')
  55. self.scaler_path = os.path.join(current_dir, '90day_scaler.pkl')
  56. self.output_csv_path = os.path.join(current_dir, '90day_predictions.csv')
  57. self.random_seed = 1314 # 随机种子
  58. # 预测结果平滑参数
  59. self.smooth_window = 30 # 滑动平均窗口大小
  60. self.ema_alpha = 0.1 # 指数移动平均系数(权重)
  61. self.use_savitzky = True # 是否使用Savitzky-Golay滤波
  62. self.sg_window = 25 # Savitzky-Golay窗口大小
  63. self.sg_polyorder = 2 # Savitzky-Golay多项式阶数
  64. # 初始化设置
  65. set_seed(self.random_seed) # 设置随机种子
  66. self.scaler = joblib.load(self.scaler_path) # 加载标准化器
  67. self.model = None
  68. self.edge_index = None
  69. self.test_loader = None
  70. def reorder_columns(self, df):
  71. """
  72. 调整DataFrame列顺序,确保与模型训练时的特征顺序一致
  73. (特征顺序对模型输入至关重要,必须与训练时保持一致)
  74. """
  75. desired_order = [
  76. 'index', # 时间索引列
  77. 'C.M.RO1_FT_JS@out','C.M.RO2_FT_JS@out','C.M.RO3_FT_JS@out','C.M.RO4_FT_JS@out',
  78. 'C.M.RO_TT_ZJS@out','C.M.RO_Cond_ZJS@out',
  79. 'C.M.RO1_DB@DPT_1','C.M.RO1_DB@DPT_2',
  80. 'C.M.RO2_DB@DPT_1','C.M.RO2_DB@DPT_2',
  81. 'C.M.RO3_DB@DPT_1','C.M.RO3_DB@DPT_2',
  82. 'C.M.RO4_DB@DPT_1','C.M.RO4_DB@DPT_2',
  83. ]
  84. return df.loc[:, desired_order]
  85. def process_date(self, data):
  86. """
  87. 处理日期特征,生成周期性时间编码(年周期)
  88. 将时间特征转换为正弦/余弦编码,捕捉周期性规律(如季节变化)
  89. """
  90. if 'index' in data.columns:
  91. data = data.rename(columns={'index': 'date'})
  92. data['date'] = pd.to_datetime(data['date'])
  93. data['day_of_year'] = data['date'].dt.dayofyear
  94. # 生成正弦/余弦编码(周期为366天,适应闰年)
  95. data['day_year_sin'] = np.sin(2 * np.pi * data['day_of_year'] / 366)
  96. data['day_year_cos'] = np.cos(2 * np.pi * data['day_of_year'] / 366)
  97. data.drop(columns=['day_of_year'], inplace=True)
  98. # 调整列顺序:日期 + 时间特征 + 其他特征
  99. time_features = ['day_year_sin', 'day_year_cos']
  100. other_columns = [col for col in data.columns if col not in ['date'] + time_features]
  101. return data[['date'] + time_features + other_columns]
  102. def scaler_data(self, data):
  103. """
  104. 使用预训练的标准化器对数据进行标准化(保留date列不处理)
  105. 标准化是为了让不同量级的特征在模型中权重均衡
  106. """
  107. date_col = data[['date']] # 提取日期列(不参与标准化)
  108. data_to_scale = data.drop(columns=['date'])
  109. scaled = self.scaler.transform(data_to_scale)
  110. scaled_df = pd.DataFrame(scaled, columns=data_to_scale.columns)
  111. return pd.concat([date_col.reset_index(drop=True), scaled_df], axis=1) # 拼接日期列和标准化后的数据
  112. def create_test_loader(self, df):
  113. """
  114. 将预处理后的DataFrame转换为模型输入的测试数据加载器
  115. 生成符合模型要求的张量格式([样本数, 序列长度, 特征数])
  116. """
  117. if 'date' in df.columns:
  118. test_data = df.drop(columns=['date']).values
  119. else:
  120. test_data = df.values
  121. # 重塑为LSTM输入格式:[样本数, 序列长度, 特征数]
  122. X = test_data.reshape(-1, self.seq_len, self.feature_num)
  123. X = torch.tensor(X, dtype=torch.float32).to(self.device)
  124. tensor_dataset = TensorDataset(X) # 创建数据集(仅输入,无标签)
  125. # 创建数据加载器(不打乱顺序,按批次加载)
  126. return DataLoader(tensor_dataset, batch_size=self.batch_size, shuffle=False)
  127. def load_data(self, df):
  128. """数据加载与预处理统一接口,依次执行列重排、日期处理、标准化和生成数据加载器"""
  129. df = self.reorder_columns(df) # 调整列顺序
  130. df = self.process_date(df) # 处理日期特征
  131. df = self.scaler_data(df) # 标准化数据
  132. self.test_loader = self.create_test_loader(df)
  133. def load_model(self):
  134. """加载预训练模型并设置为评估模式(关闭dropout等训练特有层)"""
  135. self.model = GAT_LSTM(self).to(self.device)
  136. # 加载模型权重(map_location确保在指定设备加载,weights_only=True提高安全性)
  137. self.model.load_state_dict(torch.load(self.model_path, map_location=self.device, weights_only=True))
  138. self.model.eval()
  139. def moving_average_smooth(self, data):
  140. """
  141. 滑动平均平滑处理:对每个特征单独做滑动平均,减少高频噪声
  142. 采用边缘填充避免边界效应
  143. """
  144. smoothed = []
  145. for i in range(data.shape[1]):
  146. feature = data[:, i]
  147. # 边缘填充:用边缘值填充窗口外的部分,避免边界数据失真
  148. padded = np.pad(feature, (self.smooth_window//2, self.smooth_window//2), mode='edge')
  149. window = np.ones(self.smooth_window) / self.smooth_window # 平均窗口权重
  150. smoothed_feature = np.convolve(padded, window, mode='valid') # 卷积计算滑动平均
  151. smoothed.append(smoothed_feature.reshape(-1, 1)) # 保留维度并收集结果
  152. return np.concatenate(smoothed, axis=1) # 拼接所有特征
  153. def exponential_smooth(self, data):
  154. """
  155. 指数移动平均平滑:对每个特征做指数加权平均,近期数据权重更高
  156. 相比简单滑动平均更关注近期趋势
  157. """
  158. smoothed = []
  159. for i in range(data.shape[1]): # 遍历每个特征
  160. feature = data[:, i]
  161. smoothed_feature = np.zeros_like(feature)
  162. smoothed_feature[0] = feature[0]
  163. for t in range(1, len(feature)):
  164. smoothed_feature[t] = self.ema_alpha * feature[t] + (1 - self.ema_alpha) * smoothed_feature[t-1]
  165. smoothed.append(smoothed_feature.reshape(-1, 1))
  166. return np.concatenate(smoothed, axis=1)
  167. def savitzky_golay_smooth(self, data):
  168. """
  169. Savitzky-Golay滤波:基于多项式拟合的滑动窗口滤波,保留趋势的同时降噪
  170. 窗口大小需为奇数,若数据长度不足则调整窗口
  171. """
  172. smoothed = []
  173. for i in range(data.shape[1]):
  174. feature = data[:, i]
  175. # 确保窗口为奇数且不超过数据长度
  176. window = min(self.sg_window, len(feature) if len(feature) % 2 == 1 else len(feature)-1)
  177. if window < 3: # 窗口过小则不滤波(至少需要3个点拟合2阶多项式)
  178. smoothed.append(feature.reshape(-1, 1))
  179. continue
  180. # 应用Savitzky-Golay滤波
  181. smoothed_feature = savgol_filter(feature, window_length=window, polyorder=self.sg_polyorder)
  182. smoothed.append(smoothed_feature.reshape(-1, 1))
  183. return np.concatenate(smoothed, axis=1)
  184. def smooth_predictions(self, predictions):
  185. """
  186. 组合多步平滑策略处理预测结果:先滑动平均,再指数平滑,最后可选Savitzky-Golay滤波
  187. 多层平滑进一步降低噪声,使预测曲线更平滑
  188. """
  189. smoothed = self.moving_average_smooth(predictions)
  190. smoothed = self.exponential_smooth(smoothed)
  191. if self.use_savitzky and len(predictions) >= self.sg_window:
  192. smoothed = self.savitzky_golay_smooth(smoothed)
  193. return smoothed
  194. def predict(self, df):
  195. """
  196. 核心预测接口:输入原始数据,返回处理后的预测结果
  197. 流程:更新起始时间 -> 数据预处理 -> 加载模型 -> 批量预测 -> 反标准化 -> 平滑处理
  198. """
  199. # 预测起始时间为输入数据的最大时间+3小时(根据业务需求设定)
  200. self.test_start_date = (pd.to_datetime(df['index']).max() + timedelta(hours=3)).strftime("%Y-%m-%d %H:%M:%S")
  201. self.load_data(df)
  202. self.load_model()
  203. all_predictions = []
  204. with torch.no_grad():
  205. for batch in self.test_loader:
  206. inputs = batch[0].to(self.device)
  207. outputs = self.model(inputs)
  208. all_predictions.append(outputs.cpu().numpy()) # 结果移回CPU并转为numpy
  209. # 拼接所有批次结果并重塑为[样本数, 目标特征数]
  210. predictions = np.concatenate(all_predictions, axis=0).reshape(-1, self.labels_num)
  211. # 反标准化处理
  212. inverse_scaler = MinMaxScaler()
  213. # 复用训练时的标准化参数(仅使用目标特征对应的参数)
  214. inverse_scaler.min_ = self.scaler.min_[-self.labels_num:]
  215. inverse_scaler.scale_ = self.scaler.scale_[-self.labels_num:]
  216. predictions = inverse_scaler.inverse_transform(predictions)
  217. predictions = np.clip(predictions, 0, None)
  218. # 平滑处理
  219. predictions = self.smooth_predictions(predictions)
  220. return predictions
  221. def save_predictions(self, predictions, start_date=None, output_path=None):
  222. """
  223. 保存预测结果到CSV文件并返回DataFrame(适配API使用)
  224. Args:
  225. predictions: 预测结果数组
  226. start_date: 预测起始时间字符串,格式:'YYYY-MM-DD HH:MM:SS',如果为None则使用test_start_date
  227. output_path: 输出CSV路径,如果为None则使用默认路径
  228. Returns:
  229. DataFrame: 包含日期和预测结果的DataFrame
  230. """
  231. if start_date is None:
  232. start_date = self.test_start_date
  233. # 解析预测起始时间
  234. start_time = datetime.strptime(start_date, "%Y-%m-%d %H:%M:%S")
  235. # 计算时间间隔(根据分辨率转换为小时)
  236. time_interval = pd.Timedelta(hours=(self.resolution / 60))
  237. # 生成所有预测时间戳
  238. timestamps = [start_time + i * time_interval for i in range(len(predictions))]
  239. # 定义目标特征列名(与训练时一致)
  240. base_columns = [
  241. 'C.M.RO1_DB@DPT_1', 'C.M.RO2_DB@DPT_1', 'C.M.RO3_DB@DPT_1', 'C.M.RO4_DB@DPT_1',
  242. 'C.M.RO1_DB@DPT_2', 'C.M.RO2_DB@DPT_2', 'C.M.RO3_DB@DPT_2', 'C.M.RO4_DB@DPT_2',
  243. ]
  244. pred_columns = [f'{col}_Predicted' for col in base_columns]
  245. df_result = pd.DataFrame(predictions, columns=pred_columns)
  246. df_result.insert(0, 'index', timestamps)
  247. # 如果指定了输出路径则使用,否则使用默认路径
  248. save_path = output_path if output_path else self.output_csv_path
  249. df_result.to_csv(save_path, index=False)
  250. print(f"预测结果保存至:{save_path}")
  251. return df_result
  252. if __name__ == '__main__':
  253. """
  254. 主函数:执行90天TMP预测
  255. 使用方法:
  256. 1. 准备输入数据(CSV或JSON格式)
  257. 2. 运行此脚本
  258. 3. 查看预测结果(保存在90day_predictions.csv)
  259. """
  260. import json
  261. try:
  262. # 初始化预测器
  263. predictor = Predictor()
  264. # 读取测试数据(根据实际情况修改路径和格式)
  265. # 示例:从JSON文件读取
  266. # with open('test_data.json', 'r', encoding='utf-8') as f:
  267. # json_data = json.load(f)
  268. # df = pd.DataFrame(json_data)
  269. # 示例:从CSV文件读取
  270. # df = pd.read_csv('test_data.csv')
  271. print("请准备输入数据并取消注释相应的加载代码")
  272. # 执行预测并保存结果
  273. # predictions = predictor.predict(df)
  274. # predictor.save_predictions(predictions)
  275. # print("预测任务完成!")
  276. except Exception as e:
  277. print(f"预测过程发生错误: {str(e)}")
  278. raise