20min_predict.py

import os
import torch
import pandas as pd
import numpy as np
import joblib
import pywt
from datetime import datetime, timedelta
from torch.utils.data import DataLoader, TensorDataset
from gat_lstm import GAT_LSTM  # import the custom GAT-LSTM model
from tqdm import tqdm


def set_seed(seed):
    """Set all random seeds so that runs are reproducible."""
    import random
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False


class Predictor:
    """Predictor: loads the data and the model, then runs the prediction pipeline."""

    def __init__(self):
        self.seq_len = 10        # input sequence length (historical time steps)
        self.output_size = 5     # prediction horizon (number of future time steps)
        self.labels_num = 16     # number of prediction targets (16 indicators)
        self.feature_num = 79    # total input feature dimension
        self.step_size = 5       # sampling stride (take one sample every step_size rows)
        self.dropout = 0         # dropout probability (regularization)
        self.lr = 0.01           # learning rate (used in training; kept here only as a record)
        self.num_heads = 8       # number of attention heads (model architecture parameter)
        self.hidden_size = 64    # hidden layer dimension
        self.batch_size = 512    # batch size
        self.num_layers = 1      # number of LSTM layers
        self.resolution = 60     # data resolution (downsampling: keep one of every 60 raw rows)
        self.test_start_date = '2025-07-01'  # test start date (initial value, updated dynamically)
        self.wavelet = 'db4'     # wavelet type (reserved, not used yet)
        self.level = 3           # wavelet decomposition level (reserved)
        self.level_after = 4     # wavelet level for post-processing (reserved)
        self.mode = 'soft'       # wavelet thresholding mode (reserved)
        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")  # compute device (GPU preferred)
        self.model_path = '20min_model.pth'             # path to the saved model weights
        self.output_csv_path = '20min_predictions.csv'  # path for the prediction CSV
        self.random_seed = 1314  # random seed
        self.min_rows = 600      # minimum required number of input rows (600)
        self.uf_threshold = 0.001   # UF indicator threshold (reserved)
        self.ro_threshold = 0.01    # RO indicator threshold (reserved)
        self.flow_threshold = 1.0   # flow threshold (reserved)
        # Original column names of the 16 prediction targets
        self.target_columns = [
            'C.M.UF1_DB@press_PV', 'C.M.UF2_DB@press_PV', 'C.M.UF3_DB@press_PV', 'C.M.UF4_DB@press_PV',
            'UF1Per', 'UF2Per', 'UF3Per', 'UF4Per',
            'C.M.RO1_DB@DPT_1', 'C.M.RO2_DB@DPT_1', 'C.M.RO3_DB@DPT_1', 'C.M.RO4_DB@DPT_1',
            'C.M.RO1_DB@DPT_2', 'C.M.RO2_DB@DPT_2', 'C.M.RO3_DB@DPT_2', 'C.M.RO4_DB@DPT_2',
        ]
        self.raw_input_data = None
        set_seed(self.random_seed)  # initialize random seeds
        self.scaler = joblib.load('20min_scaler.pkl')  # load the scaler saved during training
        self.model = None        # model instance (loaded later)
        self.edge_index = None   # graph edge index (used by the graph model)
        self.test_loader = None  # test data loader (created later)
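
    # Note on the defaults above: one model step corresponds to 4 * resolution / 60 = 4 minutes
    # of plant time (see create_test_loader / save_predictions), so the output_size = 5
    # predicted steps cover the 20-minute horizon that the file name refers to.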

    def ensure_min_rows(self, df):
        """
        Ensure the data has at least 600 rows, padding at both ends if it falls short.
        Forward padding: copies of the earliest row, extended backwards in time.
        Backward padding: copies of the latest row, extended forwards in time.
        """
        current_rows = len(df)
        if current_rows >= self.min_rows:
            return df
        # How many rows need to be added
        need_rows = self.min_rows - current_rows
        print(f"Fewer than {self.min_rows} rows available (currently {current_rows}); padding {need_rows} rows")
        # Infer the sampling interval (assumes uniformly sampled data)
        time_col = 'index'
        df[time_col] = pd.to_datetime(df[time_col])
        time_diff = (df[time_col].iloc[1] - df[time_col].iloc[0]).total_seconds()
        # Forward padding (replicate the earliest row)
        forward_rows = need_rows // 2
        if forward_rows > 0:
            earliest_data = df.iloc[0:1].copy()
            forward_data = []
            for i in range(1, forward_rows + 1):
                new_row = earliest_data.copy()
                new_row[time_col] = earliest_data[time_col] - timedelta(seconds=time_diff * i)
                forward_data.append(new_row)
            # Reverse so the prepended rows stay in chronological order
            forward_df = pd.concat(forward_data[::-1], ignore_index=True)
            df = pd.concat([forward_df, df], ignore_index=True)
        # Check whether backward padding is still needed
        current_rows = len(df)
        if current_rows < self.min_rows:
            backward_rows = self.min_rows - current_rows
            latest_data = df.iloc[-1:].copy()
            backward_data = []
            for i in range(1, backward_rows + 1):
                new_row = latest_data.copy()
                new_row[time_col] = latest_data[time_col] + timedelta(seconds=time_diff * i)
                backward_data.append(new_row)
            backward_df = pd.concat(backward_data, ignore_index=True)
            df = pd.concat([df, backward_df], ignore_index=True)
        print(f"Padding complete; current row count: {len(df)}")
        return df
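
    # Worked example of the padding split (illustration only): with 590 input rows,
    # need_rows = 10, so forward_rows = 10 // 2 = 5 copies of the earliest row are
    # prepended and the remaining 5 copies of the latest row are appended.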

    def reorder_columns(self, df):
        """
        Reorder the columns to match the feature order used during training,
        so the model input features are not misaligned by a different column order.
        """
        desired_order = [
            'index',
            'C.M.FT_ZGJJY1@out', 'C.M.RO1_FT_JS@out', 'C.M.RO2_FT_JS@out', 'C.M.RO3_FT_JS@out',
            'C.M.RO4_FT_JS@out', 'C.M.UF1_FT_JS@out', 'C.M.UF2_FT_JS@out', 'C.M.UF3_FT_JS@out',
            'C.M.UF4_FT_JS@out', 'C.M.UF_FT_ZCS@out', 'C.M.FT_ZGJJY2@out', 'C.M.FT_ZGJJY3@out',
            'C.M.FT_ZGJJY4@out', 'C.M.RO1_PT_JS@out', 'C.M.RO2_PT_JS@out', 'C.M.RO3_PT_JS@out',
            'C.M.UF1_PT_JS@out', 'C.M.UF2_PT_JS@out', 'C.M.UF3_PT_JS@out', 'C.M.UF4_PT_JS@out',
            'C.M.LT_JSC@out', 'C.M.RO1_PT_CS@out', 'C.M.RO1_PT_DJ2@out', 'C.M.RO2_PT_CS@out',
            'C.M.RO2_PT_DJ2@out', 'C.M.RO3_PT_CS@out', 'C.M.RO3_PT_DJ2@out', 'C.M.RO4_PT_CS@out',
            'C.M.RO4_PT_DJ2@out', 'C.M.RO4_PT_JS@out', 'C.M.LT_HCl@out', 'C.M.LT_NaClO@out',
            'C.M.LT_PAC@out', 'C.M.LT_QSC@out', 'C.M.RO_Cond_ZCS@out', 'C.M.RO_TT_ZJS@out',
            'C.M.UF1_JSF_kd@out', 'C.M.UF2_JSF_kd@out', 'C.M.UF_GSB4_fre@out', 'C.M.UF_ORP_ZCS@out',
            'C.M.JYB2_ZGJ1_fre@out', 'C.M.JYB2_ZGJ2_fre@out', 'C.M.JYB2_ZGJ3_fre@out', 'C.M.JYB2_ZGJ4_fre@out',
            'C.M.RO1_GYB_fre@out', 'C.M.RO2_GYB_fre@out', 'C.M.RO3_GYB_fre@out', 'C.M.RO4_GYB_fre@out',
            'C.M.UF3_JSF_kd@out', 'C.M.UF4_JSF_kd@out', 'C.M.UF_FXB2_fre@out', 'C.M.RO1_DJB_fre@out',
            'C.M.RO1_GYBF_kd@out', 'C.M.RO2_DJB_fre@out', 'C.M.RO2_GYBF_kd@out', 'C.M.RO3_DJB_fre@out',
            'C.M.RO3_GYBF_kd@out', 'C.M.RO4_DJB_fre@out', 'C.M.RO4_GYBF_kd@out',
            'C.M.UF1_DB@press_PV', 'C.M.UF2_DB@press_PV', 'C.M.UF3_DB@press_PV', 'C.M.UF4_DB@press_PV',
            'UF1Per', 'UF2Per', 'UF3Per', 'UF4Per',
            'C.M.RO1_DB@DPT_1', 'C.M.RO2_DB@DPT_1', 'C.M.RO3_DB@DPT_1', 'C.M.RO4_DB@DPT_1',
            'C.M.RO1_DB@DPT_2', 'C.M.RO2_DB@DPT_2', 'C.M.RO3_DB@DPT_2', 'C.M.RO4_DB@DPT_2',
        ]
        return df.loc[:, desired_order]

    def process_date(self, data):
        """
        Turn the date column into cyclical time features that capture periodic patterns:
        minute-of-day sine/cosine (daily cycle) and day-of-year sine/cosine (annual cycle).
        """
        if 'index' in data.columns:
            data = data.rename(columns={'index': 'date'})
        data['date'] = pd.to_datetime(data['date'])
        data['minute_of_day'] = data['date'].dt.hour * 60 + data['date'].dt.minute
        data['day_of_year'] = data['date'].dt.dayofyear
        # Cyclical encoding (map time onto sine/cosine so the cycle stays continuous at its boundaries)
        data['minute_sin'] = np.sin(2 * np.pi * data['minute_of_day'] / 1440)  # minute-of-day sine
        data['minute_cos'] = np.cos(2 * np.pi * data['minute_of_day'] / 1440)  # minute-of-day cosine
        data['day_year_sin'] = np.sin(2 * np.pi * data['day_of_year'] / 366)   # day-of-year sine
        data['day_year_cos'] = np.cos(2 * np.pi * data['day_of_year'] / 366)   # day-of-year cosine
        # Drop the raw time columns (keep only the encoded features)
        data.drop(columns=['minute_of_day', 'day_of_year'], inplace=True)
        # Reorder columns: date + time features + remaining features
        time_features = ['minute_sin', 'minute_cos', 'day_year_sin', 'day_year_cos']
        other_columns = [col for col in data.columns if col not in ['date'] + time_features]
        return data[['date'] + time_features + other_columns]
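
    # A minimal illustration of the cyclical minute encoding (not part of the pipeline):
    #   minute_of_day = 0    -> (sin, cos) ≈ ( 0.000,  1.000)   # 00:00
    #   minute_of_day = 720  -> (sin, cos) ≈ ( 0.000, -1.000)   # 12:00
    #   minute_of_day = 1439 -> (sin, cos) ≈ (-0.004,  1.000)   # 23:59
    # 23:59 and 00:00 end up adjacent in feature space instead of 1439 "minutes" apart,
    # which is what lets the model treat the daily cycle as continuous.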

    def scaler_data(self, data):
        """
        Normalize the data with the scaler saved during training,
        keeping the scaling consistent with the training data (0-1 min-max scaling).
        """
        date_col = data[['date']]
        data_to_scale = data.drop(columns=['date'])
        scaled = self.scaler.transform(data_to_scale)
        scaled_df = pd.DataFrame(scaled, columns=data_to_scale.columns)
        # Re-attach the date column to the scaled features
        return pd.concat([date_col.reset_index(drop=True), scaled_df], axis=1)

    def remove_outliers(self, predictions):
        """
        Clean outliers in the predictions with the interquartile-range (IQR) rule.
        Outliers: values below Q1 - 1.5*IQR or above Q3 + 1.5*IQR.
        Outliers are replaced with the mean of the in-range values, so extreme
        values cannot dominate the output.
        """
        cleaned = predictions.copy()
        # Iterate over each feature column (16 labels)
        for col in range(cleaned.shape[1]):
            values = cleaned[:, col]
            # Quartiles
            q1 = np.percentile(values, 25)
            q3 = np.percentile(values, 75)
            iqr = q3 - q1
            # Outlier bounds
            lower_bound = q1 - 1.5 * iqr
            upper_bound = q3 + 1.5 * iqr
            # Keep only the in-range values
            normal_values = values[(values >= lower_bound) & (values <= upper_bound)]
            # Replace outliers with the mean of the in-range values
            if len(normal_values) > 0:
                mean_normal = np.mean(normal_values)
                cleaned[(values < lower_bound) | (values > upper_bound), col] = mean_normal
        return cleaned
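
    # Sketch of the IQR rule on a toy column (illustration only): for values
    # [1, 2, 2, 3, 100], Q1 = 2, Q3 = 3, IQR = 1, so the bounds are [0.5, 4.5]
    # and 100 is replaced by mean([1, 2, 2, 3]) = 2.0.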

    def smooth_predictions(self, predictions):
        """
        Apply weighted smoothing to the predictions to damp fluctuations.
        Sliding-window weighted average: the centre value gets weight 2 and each
        neighbour weight 1 (the edge values are handled separately).
        """
        smoothed = predictions.copy()
        n_timesteps = predictions.shape[0]
        if n_timesteps <= 1:
            return smoothed
        # Iterate over each feature column
        for col in range(predictions.shape[1]):
            values = predictions[:, col]
            # First value: weighted average of the first two values (avoid over-smoothing the edge)
            smoothed[0, col] = (2 * values[0] + values[1]) / 3
            # Middle values: weighted average with both neighbours (core smoothing)
            for i in range(1, n_timesteps - 1):
                smoothed[i, col] = (values[i - 1] + 2 * values[i] + values[i + 1]) / 4
            # Last value: weighted average of the last two values (avoid over-smoothing the edge)
            smoothed[-1, col] = (values[-2] + 2 * values[-1]) / 3
        return smoothed
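
    # Example of the smoothing kernel (illustration only): for a column [0, 10, 0]
    # the result is [(2*0 + 10)/3, (0 + 2*10 + 0)/4, (10 + 2*0)/3] ≈ [3.33, 5.0, 3.33],
    # i.e. the spike at the middle step is damped but not removed.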

    def create_test_loader(self, df):
        """
        Build the test DataLoader (convert the preprocessed data into the model input format).
        Input: preprocessed DataFrame.
        Output: PyTorch DataLoader yielding model input batches.
        """
        df['date'] = pd.to_datetime(df['date'])
        # Time interval between consecutive samples (depends on the resolution, in minutes)
        time_interval = pd.Timedelta(minutes=(4 * self.resolution / 60))
        # Window span (large enough to cover the input sequence plus the prediction horizon)
        window_time_span = time_interval * (self.seq_len + 20)
        # Shift the test start time back so there is enough history to build the input sequence
        adjusted_test_start = pd.to_datetime(self.test_start_date) - window_time_span
        # Keep only the history that is needed
        test_df = df[df['date'] >= adjusted_test_start].reset_index(drop=True)
        test_df = test_df.drop(columns=['date'])
        # Build the supervised-learning frame (input sequence plus placeholder target sequence)
        feature_columns = test_df.columns.tolist()
        cols = []
        # Input sequence: features of the previous seq_len time steps
        for col in feature_columns:
            for i in range(self.seq_len - 1, -1, -1):
                cols.append(test_df[[col]].shift(i))  # feature lagged by i steps (t-(seq_len-1) .. t)
        # Target placeholders: labels of the next output_size steps (true values unused at prediction time)
        for i in range(1, self.output_size + 1):
            for col in feature_columns[-self.labels_num:]:
                cols.append(test_df[[col]].shift(-i))  # label led by i steps (t+1 .. t+output_size)
        # Combine the columns, subsample by the stride, and keep only the last row (the newest history)
        dataset = pd.concat(cols, axis=1).iloc[::self.step_size]
        dataset = dataset.iloc[[-1]]
        # Extract the input features (the first n_features_total columns)
        n_features_total = self.feature_num * self.seq_len
        supervised_data = dataset.iloc[:, :n_features_total]
        # Reshape into the model input format: [samples, sequence length, features]
        X = supervised_data.values.reshape(-1, self.seq_len, self.feature_num)
        X = torch.tensor(X, dtype=torch.float32).to(self.device)
        tensor_dataset = TensorDataset(X)
        loader = DataLoader(tensor_dataset, batch_size=self.batch_size, shuffle=False)
        return loader
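
    # With the defaults (seq_len = 10, feature_num = 79) and the single-row selection
    # above, the loader yields one tensor of shape (1, 10, 79): a single window built
    # from the most recent sampled history.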

    def load_data(self, df):
        """
        Main data-loading pipeline: reorder columns, downsample, process dates, normalize,
        and create the test loader, so the input format matches the one used for training.
        """
        df = self.reorder_columns(df)
        df = df.iloc[::self.resolution, :].reset_index(drop=True)
        df = self.process_date(df)
        df = self.scaler_data(df)
        self.test_loader = self.create_test_loader(df)
        self.edge_index = torch.load('edge_index.pt', map_location=self.device, weights_only=True)

    def load_model(self):
        """Load the model architecture and pretrained weights, then switch to evaluation mode."""
        self.model = GAT_LSTM(self).to(self.device)
        if self.edge_index is not None:
            self.model.set_edge_index(self.edge_index.to(self.device))  # set the graph edge index
        self.model.load_state_dict(torch.load(self.model_path, map_location=self.device, weights_only=True))
        self.model.eval()

    def get_recent_values_as_fallback(self):
        """Use the most recent output_size rows of the raw input data as a fallback output."""
        # The raw input data must have been saved first
        if self.raw_input_data is None:
            raise ValueError("Raw input data has not been saved; cannot build fallback values")
        # Sort by time and take the most recent output_size rows
        recent_data = self.raw_input_data.sort_values('index').tail(self.output_size)
        # If there are not enough rows, repeat the last one
        if len(recent_data) < self.output_size:
            last_row = recent_data.iloc[-1:] if not recent_data.empty else pd.DataFrame(
                {col: [0.0] for col in self.target_columns}, index=[0])
            while len(recent_data) < self.output_size:
                recent_data = pd.concat([recent_data, last_row], ignore_index=True)
        # Extract and return the target-column values
        fallback_values = recent_data[self.target_columns].values
        return fallback_values

    def predict(self, df):
        """
        Main prediction pipeline: update the test start time, load the data and the model,
        run inference, and inverse-transform the result.
        Input: raw data DataFrame.
        Output: inverse-scaled predictions (numpy array).
        """
        # Keep the raw input for the fallback strategy
        self.raw_input_data = df.copy()
        # Make sure there are at least 600 rows
        df = self.ensure_min_rows(df)
        # Set the test start time to the latest input timestamp + 4 minutes (prediction start)
        self.test_start_date = (pd.to_datetime(df['index']).max() + timedelta(minutes=4)).strftime("%Y-%m-%d %H:%M:%S")
        self.load_data(df)
        self.load_model()
        all_predictions = []
        with torch.no_grad():
            for batch in self.test_loader:
                inputs = batch[0].to(self.device)
                outputs = self.model(inputs)
                all_predictions.append(outputs.cpu().numpy())
        # Concatenate all batches and reshape to [time steps, labels]
        predictions = np.concatenate(all_predictions, axis=0).reshape(-1, self.labels_num)
        # Inverse-transform the label columns only, reusing the scaler parameters from training
        from sklearn.preprocessing import MinMaxScaler
        inverse_scaler = MinMaxScaler()
        inverse_scaler.min_ = self.scaler.min_[-self.labels_num:]
        inverse_scaler.scale_ = self.scaler.scale_[-self.labels_num:]
        predictions = inverse_scaler.inverse_transform(predictions)
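        # Note on the inverse transform above: a bare MinMaxScaler is given the min_ and
        # scale_ of the last labels_num columns of the training scaler, which assumes the
        # 16 target columns were also the last 16 columns when '20min_scaler.pkl' was fit
        # (the column order produced by reorder_columns keeps them last here).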
        # Optional: outlier removal and smoothing (disabled here; enable if needed)
        # predictions = self.remove_outliers(predictions)     # clean outliers
        # predictions = self.smooth_predictions(predictions)  # smooth the output
        if np.isnan(predictions).any():
            # Fall back to recent observed values
            predictions = self.get_recent_values_as_fallback()
        return predictions

    def save_predictions(self, predictions):
        """
        Save the predictions to a CSV file with a timestamp column and one column per indicator.
        Input: inverse-scaled predictions (numpy array).
        """
        start_time = datetime.strptime(self.test_start_date, "%Y-%m-%d %H:%M:%S")
        time_interval = timedelta(minutes=(4 * self.resolution / 60))
        timestamps = [start_time + i * time_interval for i in range(len(predictions))]
        pred_columns = [f'{col}_pred' for col in self.target_columns]
        df_result = pd.DataFrame(predictions, columns=pred_columns)
        df_result.insert(0, 'date', timestamps)
        df_result.to_csv(self.output_csv_path, index=False)
        print(f"Predictions saved to: {self.output_csv_path}")


if __name__ == '__main__':
    # Main entry point: initialize the predictor, load the input data, run the prediction, and save the result.
    import json  # for parsing the JSON input
    import os
    import pandas as pd
    from datetime import timedelta

    predictor = Predictor()
    # Read the JSON file used as input data
    json_file_path = 'pp.json'  # path to pp.json; adjust to its actual location
    if not os.path.exists(json_file_path):
        raise FileNotFoundError(f"File not found: {json_file_path}")
    print(f"Reading file: {json_file_path}")
    # Parse the JSON and extract the 'data' field (no try/except; fail fast on bad input)
    with open(json_file_path, 'r', encoding='utf-8') as f:
        json_data = json.load(f)
    # Validate the 'data' field
    if 'data' not in json_data:
        raise ValueError("The JSON file has no 'data' field; please check its structure")
    data_list = json_data['data']
    if not isinstance(data_list, list) or len(data_list) == 0:
        raise ValueError("The 'data' field must be a non-empty list")
    # Convert to a DataFrame
    df = pd.DataFrame(data_list)
    # Validate and handle the datetime column
    if 'datetime' not in df.columns:
        raise ValueError("No 'datetime' field found in the data; please check the key names")
    df = df.rename(columns={'datetime': 'index'})
    # Convert the index column to datetime (raises immediately on malformed values)
    df['index'] = pd.to_datetime(df['index'])
    # Run the prediction and (optionally) save the result
    predictions = predictor.predict(df)
    # predictor.save_predictions(predictions)
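
# Expected layout of pp.json, inferred from the parsing above (all field values, and any
# keys other than 'data' and 'datetime', are illustrative only):
# {
#   "data": [
#     {"datetime": "2025-07-01 00:00:00", "C.M.FT_ZGJJY1@out": 0.0, ..., "C.M.RO4_DB@DPT_2": 0.0},
#     {"datetime": "2025-07-01 00:00:04", "C.M.FT_ZGJJY1@out": 0.0, ..., "C.M.RO4_DB@DPT_2": 0.0}
#   ]
# }
# Every column listed in reorder_columns() (other than 'index') must be present in each
# record, otherwise df.loc[:, desired_order] raises a KeyError.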