# predict.py
import os
import random
from datetime import datetime, timedelta

import joblib
import numpy as np
import pandas as pd
import pywt
import torch
from torch.utils.data import DataLoader, TensorDataset
from tqdm import tqdm

from gat_lstm import GAT_LSTM  # project-local GAT-LSTM model definition
  11. def set_seed(seed):
  12. """设置随机种子,保证实验可重复性"""
  13. import random
  14. random.seed(seed)
  15. os.environ['PYTHONHASHSEED'] = str(seed)
  16. np.random.seed(seed)
  17. torch.manual_seed(seed)
  18. torch.cuda.manual_seed(seed)
  19. torch.cuda.manual_seed_all(seed)
  20. torch.backends.cudnn.deterministic = True
  21. torch.backends.cudnn.benchmark = False
  22. class Predictor:
  23. """预测器类,用于加载数据、模型并执行预测流程"""
  24. def __init__(self):
  25. self.seq_len = 10 # 输入序列长度(历史时间步)
  26. self.output_size = 5 # 预测步长(未来预测的时间步数)
  27. self.labels_num = 16 # 预测目标数量(16个待预测的指标)
  28. self.feature_num = 79 # 输入特征总维度
  29. self.step_size = 5 # 数据采样步长(每隔step_size取一个样本)
  30. self.dropout = 0 # dropout概率(防止过拟合)
  31. self.lr = 0.01 # 学习率(训练时使用,预测时仅作参数记录)
  32. self.num_heads = 8 # 注意力头数(模型结构参数)
  33. self.hidden_size = 32 # 隐藏层维度
  34. self.batch_size = 512 # 批处理大小
  35. self.num_layers = 1 # LSTM层数
  36. self.resolution = 60 # 数据分辨率(原始数据每隔60条取一条,下采样)
  37. self.test_start_date = '2025-07-01' # 测试集起始日期(初始值,会动态更新)
  38. self.wavelet = 'db4' # 小波变换类型(预留,未实际使用)
  39. self.level = 3 # 小波分解层数(预留)
  40. self.level_after = 4 # 后续小波处理层数(预留)
  41. self.mode = 'soft' # 小波阈值模式(预留)
  42. self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") # 计算设备(GPU优先)
  43. self.model_path = 'model.pth' # 模型权重保存路径
  44. self.output_csv_path = 'predictions.csv' # 预测结果保存路径
  45. self.random_seed = 1314 # 随机种子
  46. self.uf_threshold = 0.001 # UF指标阈值(预留)
  47. self.ro_threshold = 0.01 # RO指标阈值(预留)
  48. self.flow_threshold = 1.0 # 流量阈值(预留)
  49. set_seed(self.random_seed) # 初始化随机种子
  50. self.scaler = joblib.load('scaler.pkl') # 加载数据归一化器(训练时保存)
  51. self.model = None # 模型实例(后续加载)
  52. self.edge_index = None # 图结构边索引(图模型用)
  53. self.test_loader = None # 测试数据加载器(后续创建)
  54. def reorder_columns(self, df):
  55. """
  56. 调整数据列顺序,确保与训练时的特征顺序一致
  57. 避免因列顺序不一致导致模型输入特征错位
  58. """
  59. desired_order = [
  60. 'index',
  61. 'C.M.FT_ZGJJY1@out','C.M.RO1_FT_JS@out','C.M.RO2_FT_JS@out','C.M.RO3_FT_JS@out',
  62. 'C.M.RO4_FT_JS@out','C.M.UF1_FT_JS@out','C.M.UF2_FT_JS@out','C.M.UF3_FT_JS@out',
  63. 'C.M.UF4_FT_JS@out','C.M.UF_FT_ZCS@out','C.M.FT_ZGJJY2@out','C.M.FT_ZGJJY3@out',
  64. 'C.M.FT_ZGJJY4@out','C.M.RO1_PT_JS@out','C.M.RO2_PT_JS@out','C.M.RO3_PT_JS@out',
  65. 'C.M.UF1_PT_JS@out','C.M.UF2_PT_JS@out','C.M.UF3_PT_JS@out','C.M.UF4_PT_JS@out',
  66. 'C.M.LT_JSC@out','C.M.RO1_PT_CS@out','C.M.RO1_PT_DJ2@out','C.M.RO2_PT_CS@out',
  67. 'C.M.RO2_PT_DJ2@out','C.M.RO3_PT_CS@out','C.M.RO3_PT_DJ2@out','C.M.RO4_PT_CS@out',
  68. 'C.M.RO4_PT_DJ2@out','C.M.RO4_PT_JS@out','C.M.LT_HCl@out','C.M.LT_NaClO@out',
  69. 'C.M.LT_PAC@out','C.M.LT_QSC@out','C.M.RO_Cond_ZCS@out','C.M.RO_TT_ZJS@out',
  70. 'C.M.UF1_JSF_kd@out','C.M.UF2_JSF_kd@out','C.M.UF_GSB4_fre@out','C.M.UF_ORP_ZCS@out',
  71. 'C.M.JYB2_ZGJ1_fre@out','C.M.JYB2_ZGJ2_fre@out','C.M.JYB2_ZGJ3_fre@out','C.M.JYB2_ZGJ4_fre@out',
  72. 'C.M.RO1_GYB_fre@out','C.M.RO2_GYB_fre@out','C.M.RO3_GYB_fre@out','C.M.RO4_GYB_fre@out',
  73. 'C.M.UF3_JSF_kd@out','C.M.UF4_JSF_kd@out','C.M.UF_FXB2_fre@out','C.M.RO1_DJB_fre@out',
  74. 'C.M.RO1_GYBF_kd@out','C.M.RO2_DJB_fre@out','C.M.RO2_GYBF_kd@out','C.M.RO3_DJB_fre@out',
  75. 'C.M.RO3_GYBF_kd@out','C.M.RO4_DJB_fre@out','C.M.RO4_GYBF_kd@out',
  76. 'C.M.UF1_DB@press_PV','C.M.UF2_DB@press_PV','C.M.UF3_DB@press_PV','C.M.UF4_DB@press_PV',
  77. 'C.M.RO1_DB@DPT_1','C.M.RO2_DB@DPT_1','C.M.RO3_DB@DPT_1','C.M.RO4_DB@DPT_1',
  78. 'C.M.RO1_DB@DPT_2','C.M.RO2_DB@DPT_2','C.M.RO3_DB@DPT_2','C.M.RO4_DB@DPT_2',
  79. 'RO1_CSFlow','RO2_CSFlow','RO3_CSFlow','RO4_CSFlow'
  80. ]
  81. return df.loc[:, desired_order]
  82. def process_date(self, data):
  83. """
  84. 处理日期列,生成周期性时间特征(捕捉时间周期性模式)
  85. 包括:分钟级正弦/余弦特征(每日周期)、年中日正弦/余弦特征(年度周期)
  86. """
  87. if 'index' in data.columns:
  88. data = data.rename(columns={'index': 'date'})
  89. data['date'] = pd.to_datetime(data['date'])
  90. data['minute_of_day'] = data['date'].dt.hour * 60 + data['date'].dt.minute
  91. data['day_of_year'] = data['date'].dt.dayofyear
  92. # 周期性编码(将时间转换为正弦/余弦值,确保周期性连续)
  93. data['minute_sin'] = np.sin(2 * np.pi * data['minute_of_day'] / 1440) # 分钟正弦特征
  94. data['minute_cos'] = np.cos(2 * np.pi * data['minute_of_day'] / 1440) # 分钟余弦特征
  95. data['day_year_sin'] = np.sin(2 * np.pi * data['day_of_year'] / 366) # 年中日正弦特征
  96. data['day_year_cos'] = np.cos(2 * np.pi * data['day_of_year'] / 366) # 年中日余弦特征
  97. # 移除原始时间列(仅保留编码后的特征)
  98. data.drop(columns=['minute_of_day', 'day_of_year'], inplace=True)
  99. # 调整列顺序:日期 + 时间特征 + 其他特征
  100. time_features = ['minute_sin', 'minute_cos', 'day_year_sin', 'day_year_cos']
  101. other_columns = [col for col in data.columns if col not in ['date'] + time_features]
  102. return data[['date'] + time_features + other_columns]
  103. def scaler_data(self, data):
  104. """
  105. 对数据进行归一化(使用训练时保存的scaler)
  106. 保持与训练数据的归一化方式一致(0-1缩放)
  107. """
  108. date_col = data[['date']]
  109. data_to_scale = data.drop(columns=['date'])
  110. scaled = self.scaler.transform(data_to_scale)
  111. scaled_df = pd.DataFrame(scaled, columns=data_to_scale.columns)
  112. # 拼接日期列和归一化后的特征列
  113. return pd.concat([date_col.reset_index(drop=True), scaled_df], axis=1)
  114. def remove_outliers(self, predictions):
  115. """
  116. 用四分位法处理预测结果中的异常值
  117. 异常值定义:小于Q1-1.5*IQR或大于Q3+1.5*IQR的值
  118. 异常值替换为正常值的平均值(避免极端值影响)
  119. """
  120. cleaned = predictions.copy()
  121. # 遍历每个特征列(16个标签)
  122. for col in range(cleaned.shape[1]):
  123. values = cleaned[:, col]
  124. # 计算四分位数
  125. q1 = np.percentile(values, 25)
  126. q3 = np.percentile(values, 75)
  127. iqr = q3 - q1
  128. # 异常值边界
  129. lower_bound = q1 - 1.5 * iqr
  130. upper_bound = q3 + 1.5 * iqr
  131. # 筛选正常值
  132. normal_values = values[(values >= lower_bound) & (values <= upper_bound)]
  133. # 用正常值的平均值替换异常值
  134. if len(normal_values) > 0:
  135. mean_normal = np.mean(normal_values)
  136. cleaned[(values < lower_bound) | (values > upper_bound), col] = mean_normal
  137. return cleaned
  138. def smooth_predictions(self, predictions):
  139. """
  140. 对预测结果进行加权平滑处理,减少预测波动
  141. 采用滑动窗口加权平均:中间值权重为2,前后邻居权重为1(边缘值特殊处理)
  142. """
  143. smoothed = predictions.copy()
  144. n_timesteps = predictions.shape[0]
  145. if n_timesteps <= 1:
  146. return smoothed
  147. # 遍历每个特征列
  148. for col in range(predictions.shape[1]):
  149. values = predictions[:, col]
  150. # 第一个值:加权前两个值(避免边缘过度平滑)
  151. smoothed[0, col] = (2 * values[0] + values[1]) / 3
  152. # 中间值:加权前后邻居(核心平滑)
  153. for i in range(1, n_timesteps - 1):
  154. smoothed[i, col] = (values[i-1] + 2 * values[i] + values[i+1]) / 4
  155. # 最后一个值:加权最后两个值(避免边缘过度平滑)
  156. smoothed[-1, col] = (values[-2] + 2 * values[-1]) / 3
  157. return smoothed
  158. def create_test_loader(self, df):
  159. """
  160. 构建测试数据加载器(将原始数据转换为模型输入格式)
  161. 输入:预处理后的DataFrame
  162. 输出:PyTorch DataLoader(批量加载模型输入)
  163. """
  164. df['date'] = pd.to_datetime(df['date'])
  165. # 计算时间间隔(根据分辨率,单位:分钟)
  166. time_interval = pd.Timedelta(minutes=(4 * self.resolution / 60))
  167. # 计算窗口时间跨度(确保能覆盖输入序列长度+预测步长)
  168. window_time_span = time_interval * (self.seq_len + 2)
  169. # 调整测试集起始时间(确保有足够的历史数据构建输入序列)
  170. adjusted_test_start = pd.to_datetime(self.test_start_date) - window_time_span
  171. # 筛选所需的历史数据
  172. test_df = df[df['date'] >= adjusted_test_start].reset_index(drop=True)
  173. test_df = test_df.drop(columns=['date'])
  174. # 构建监督学习数据集(输入序列+目标序列的占位)
  175. feature_columns = test_df.columns.tolist()
  176. cols = []
  177. # 构建输入序列(历史seq_len个时间步的特征)
  178. for col in feature_columns:
  179. for i in range(self.seq_len - 1, -1, -1):
  180. cols.append(test_df[[col]].shift(i)) # 滞后i步的特征(t-0到t-(seq_len-1))
  181. # 构建目标序列占位(未来output_size个时间步的标签,预测时不使用真实值)
  182. for i in range(1, self.output_size + 1):
  183. for col in feature_columns[-self.labels_num:]:
  184. cols.append(test_df[[col]].shift(-i)) # 超前i步的标签(t+1到t+output_size)
  185. # 合并列并按步长采样,最后取最后一行作为预测输入(最新的历史数据)
  186. dataset = pd.concat(cols, axis=1).iloc[::self.step_size]
  187. dataset = dataset.iloc[[-1]]
  188. # 提取输入特征(前n_features_total列)
  189. n_features_total = self.feature_num * self.seq_len
  190. supervised_data = dataset.iloc[:, :n_features_total]
  191. # 转换为模型输入格式:[样本数, 序列长度, 特征数]
  192. X = supervised_data.values.reshape(-1, self.seq_len, self.feature_num)
  193. X = torch.tensor(X, dtype=torch.float32).to(self.device)
  194. tensor_dataset = TensorDataset(X)
  195. loader = DataLoader(tensor_dataset, batch_size=self.batch_size, shuffle=False)
  196. return loader
  197. def load_data(self, df):
  198. """
  199. 数据加载主流程:重排列、下采样、日期处理、归一化、创建测试加载器
  200. 确保输入数据格式与训练时一致
  201. """
  202. df = self.reorder_columns(df)
  203. df = df.iloc[::self.resolution, :].reset_index(drop=True)
  204. df = self.process_date(df)
  205. df = self.scaler_data(df)
  206. self.test_loader = self.create_test_loader(df)
  207. self.edge_index = torch.load('edge_index.pt', weights_only=True)
  208. def load_model(self):
  209. """加载模型结构和预训练权重,并设置为评估模式"""
  210. self.model = GAT_LSTM(self).to(self.device)
  211. if self.edge_index is not None:
  212. self.model.set_edge_index(self.edge_index.to(self.device)) # 设置图边索引
  213. self.model.load_state_dict(torch.load(self.model_path, map_location=self.device, weights_only=True))
  214. self.model.eval()
  215. def predict(self, df):
  216. """
  217. 执行预测主流程:更新测试起始时间、加载数据、加载模型、执行预测、反归一化
  218. 输入:原始数据DataFrame
  219. 输出:反归一化后的预测结果(numpy数组)
  220. """
  221. # 更新测试起始时间为输入数据最新时间+4分钟(预测起始点)
  222. self.test_start_date = (pd.to_datetime(df['index']).max() + timedelta(minutes=4)).strftime("%Y-%m-%d %H:%M:%S")
  223. self.load_data(df)
  224. self.load_model()
  225. all_predictions = []
  226. with torch.no_grad():
  227. for batch in self.test_loader:
  228. inputs = batch[0].to(self.device)
  229. outputs = self.model(inputs)
  230. all_predictions.append(outputs.cpu().numpy())
  231. # 拼接所有批次的预测结果,并重塑为[时间步, 标签数]
  232. predictions = np.concatenate(all_predictions, axis=0).reshape(-1, self.labels_num)
  233. # 反归一化(仅对标签列,使用训练时的scaler参数)
  234. from sklearn.preprocessing import MinMaxScaler
  235. inverse_scaler = MinMaxScaler()
  236. inverse_scaler.min_ = self.scaler.min_[-self.labels_num:]
  237. inverse_scaler.scale_ = self.scaler.scale_[-self.labels_num:]
  238. predictions = inverse_scaler.inverse_transform(predictions)
  239. # 可选:异常值处理和平滑(当前注释掉,可根据需求启用)
  240. # predictions = self.remove_outliers(predictions) # 处理异常值
  241. # predictions = self.smooth_predictions(predictions) # 平滑处理
  242. return predictions
  243. def save_predictions(self, predictions):
  244. """
  245. 将预测结果保存为CSV文件,包含时间戳和各指标的预测值
  246. 输入:反归一化后的预测结果(numpy数组)
  247. """
  248. start_time = datetime.strptime(self.test_start_date, "%Y-%m-%d %H:%M:%S")
  249. time_interval = timedelta(minutes=(4 * self.resolution / 60))
  250. timestamps = [start_time + i * time_interval for i in range(len(predictions))]
  251. # 定义16个预测目标的原始列名
  252. base_columns = [
  253. 'C.M.UF1_DB@press_PV', 'C.M.UF2_DB@press_PV', 'C.M.UF3_DB@press_PV', 'C.M.UF4_DB@press_PV',
  254. 'C.M.RO1_DB@DPT_1', 'C.M.RO2_DB@DPT_1', 'C.M.RO3_DB@DPT_1', 'C.M.RO4_DB@DPT_1',
  255. 'C.M.RO1_DB@DPT_2', 'C.M.RO2_DB@DPT_2', 'C.M.RO3_DB@DPT_2', 'C.M.RO4_DB@DPT_2',
  256. 'RO1_CSFlow', 'RO2_CSFlow', 'RO3_CSFlow', 'RO4_CSFlow'
  257. ]
  258. pred_columns = [f'{col}_pred' for col in base_columns]
  259. df_result = pd.DataFrame(predictions, columns=pred_columns)
  260. df_result.insert(0, 'date', timestamps)
  261. df_result.to_csv(self.output_csv_path, index=False)
  262. print(f"预测结果保存至:{self.output_csv_path}")
  263. if __name__ == '__main__':
  264. """主函数:初始化预测器、加载数据、执行预测并保存结果"""
  265. predictor = Predictor()
  266. base_dir = os.path.abspath(os.path.join(os.getcwd(), os.pardir))
  267. data_dir = os.path.join(base_dir, 'datasets_xishan')
  268. file_pattern = 'data_process_{}.csv'
  269. file_indices = range(46, 50)
  270. dfs = []
  271. for i in file_indices:
  272. file_path = os.path.join(data_dir, file_pattern.format(i))
  273. if not os.path.exists(file_path):
  274. raise FileNotFoundError(f"未找到文件: {file_path}")
  275. print(f"读取文件:{file_path}")
  276. df = pd.read_csv(file_path)
  277. dfs.append(df)
  278. df = pd.concat(dfs, ignore_index=True)
  279. predictions = predictor.predict(df)
  280. predictor.save_predictions(predictions)