import os
from datetime import datetime, timedelta

import joblib
import numpy as np
import pandas as pd
import torch
from scipy.signal import savgol_filter
from sklearn.preprocessing import MinMaxScaler
from torch.utils.data import DataLoader, TensorDataset

from .data_mysql import get_sensor_data
from .gat_lstm import GAT_LSTM
def set_seed(seed):
    """Seed every RNG source (Python, NumPy, PyTorch CPU/CUDA) for reproducibility."""
    import random

    os.environ['PYTHONHASHSEED'] = str(seed)
    for seeder in (random.seed, np.random.seed, torch.manual_seed,
                   torch.cuda.manual_seed, torch.cuda.manual_seed_all):
        seeder(seed)
    # Disable cuDNN autotuning so convolution algorithm choice is deterministic.
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
class Predictor:
    """End-to-end prediction pipeline for the RO differential-pressure model.

    Wraps data preprocessing, model loading, batched inference, smoothing of
    the predicted series, and CSV export.
    """

    def __init__(self):
        # Model / data hyper-parameters (must match the trained checkpoint).
        self.seq_len = 4320          # input sequence length
        self.output_size = 2160      # prediction horizon length
        self.labels_num = 8          # number of predicted target features
        self.feature_num = 16        # total number of input features
        self.step_size = 2160        # sliding-window stride
        self.dropout = 0             # model dropout rate
        self.lr = 0.01               # learning rate (consumed by GAT_LSTM)
        self.hidden_size = 64        # LSTM hidden size
        self.batch_size = 128        # inference batch size
        self.num_layers = 1          # number of LSTM layers
        # Data time resolution; one prediction step spans resolution/60 hours
        # (60 -> 1 hour).  NOTE(review): the original comment said "seconds",
        # which contradicts that usage -- confirm intended unit.
        self.resolution = 60
        self.test_start_date = '2025-09-10'  # prediction start; updated in predict()
        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        self.model_path = 'model.pth'             # model weight file name (overridable)
        self.output_csv_path = 'predictions.csv'  # CSV output path (overridable)
        self.random_seed = 1314

        # Smoothing parameters for the predicted series.
        self.smooth_window = 30   # moving-average window length
        self.ema_alpha = 0.1      # exponential-moving-average factor
        self.use_savitzky = True  # apply Savitzky-Golay as a final pass
        self.sg_window = 25       # Savitzky-Golay window length (odd)
        self.sg_polyorder = 2     # Savitzky-Golay polynomial order

        set_seed(self.random_seed)
        # Scaler fitted at training time; the file must sit next to this module.
        scaler_path = os.path.join(os.path.dirname(__file__), 'scaler.pkl')
        self.scaler = joblib.load(scaler_path)
        self.model = None
        self.edge_index = None
        self.test_loader = None

    def reorder_columns(self, df):
        """Reorder *df*'s columns to the feature order used at training time."""
        desired_order = [
            'index',  # timestamp column
            'C.M.RO1_FT_JS@out', 'C.M.RO2_FT_JS@out', 'C.M.RO3_FT_JS@out', 'C.M.RO4_FT_JS@out',
            'C.M.RO_TT_ZJS@out', 'C.M.RO_Cond_ZJS@out',
            'C.M.RO1_DB@DPT_1', 'C.M.RO1_DB@DPT_2',
            'C.M.RO2_DB@DPT_1', 'C.M.RO2_DB@DPT_2',
            'C.M.RO3_DB@DPT_1', 'C.M.RO3_DB@DPT_2',
            'C.M.RO4_DB@DPT_1', 'C.M.RO4_DB@DPT_2',
        ]
        return df.loc[:, desired_order]

    def process_date(self, data):
        """Rename 'index' to 'date' and add sin/cos encodings of the day of year."""
        if 'index' in data.columns:
            data = data.rename(columns={'index': 'date'})
        data['date'] = pd.to_datetime(data['date'])
        data['day_of_year'] = data['date'].dt.dayofyear
        # 366 keeps leap-year day numbers inside a single full period.
        data['day_year_sin'] = np.sin(2 * np.pi * data['day_of_year'] / 366)
        data['day_year_cos'] = np.cos(2 * np.pi * data['day_of_year'] / 366)
        data.drop(columns=['day_of_year'], inplace=True)

        time_features = ['day_year_sin', 'day_year_cos']
        other_columns = [col for col in data.columns if col not in ['date'] + time_features]
        return data[['date'] + time_features + other_columns]

    def scaler_data(self, data):
        """Scale all feature columns with the pre-fitted scaler, keeping 'date' intact."""
        date_col = data[['date']]
        data_to_scale = data.drop(columns=['date'])
        scaled = self.scaler.transform(data_to_scale)
        scaled_df = pd.DataFrame(scaled, columns=data_to_scale.columns)
        return pd.concat([date_col.reset_index(drop=True), scaled_df], axis=1)

    def create_test_loader(self, df):
        """Build a DataLoader of (seq_len, feature_num) input windows from *df*."""
        if 'date' in df.columns:
            test_data = df.drop(columns=['date']).values
        else:
            test_data = df.values
        X = test_data.reshape(-1, self.seq_len, self.feature_num)
        X = torch.tensor(X, dtype=torch.float32).to(self.device)
        return DataLoader(TensorDataset(X), batch_size=self.batch_size, shuffle=False)

    def load_data(self, df):
        """Run the full preprocessing chain and store the resulting test loader."""
        df = self.reorder_columns(df)
        df = self.process_date(df)
        df = self.scaler_data(df)
        self.test_loader = self.create_test_loader(df)

    def load_model(self):
        """Instantiate GAT_LSTM, load the checkpoint, and switch to eval mode."""
        self.model = GAT_LSTM(self).to(self.device)
        # Honor the configurable self.model_path instead of a hard-coded name.
        model_path = os.path.join(os.path.dirname(__file__), self.model_path)
        self.model.load_state_dict(
            torch.load(model_path, map_location=self.device, weights_only=True))
        self.model.eval()

    def moving_average_smooth(self, data):
        """Column-wise moving average with edge padding.

        Returns an array with the same shape as *data*.  The padding is split
        as (w//2, (w-1)//2) so the 'valid' convolution yields exactly
        len(feature) samples for both odd and even window sizes (symmetric
        w//2 padding produced one extra sample for even windows).
        """
        window = np.ones(self.smooth_window) / self.smooth_window
        left = self.smooth_window // 2
        right = self.smooth_window - 1 - left
        smoothed = []
        for i in range(data.shape[1]):
            padded = np.pad(data[:, i], (left, right), mode='edge')
            smoothed.append(np.convolve(padded, window, mode='valid').reshape(-1, 1))
        return np.concatenate(smoothed, axis=1)

    def exponential_smooth(self, data):
        """Column-wise exponential moving average with factor ema_alpha."""
        smoothed = []
        for i in range(data.shape[1]):
            feature = data[:, i]
            smoothed_feature = np.zeros_like(feature)
            smoothed_feature[0] = feature[0]
            for t in range(1, len(feature)):
                smoothed_feature[t] = (self.ema_alpha * feature[t]
                                       + (1 - self.ema_alpha) * smoothed_feature[t - 1])
            smoothed.append(smoothed_feature.reshape(-1, 1))
        return np.concatenate(smoothed, axis=1)

    def savitzky_golay_smooth(self, data):
        """Column-wise Savitzky-Golay filtering; series too short are passed through."""
        smoothed = []
        for i in range(data.shape[1]):
            feature = data[:, i]
            # savgol_filter needs an odd window no longer than the series.
            window = min(self.sg_window,
                         len(feature) if len(feature) % 2 == 1 else len(feature) - 1)
            if window < 3:
                smoothed.append(feature.reshape(-1, 1))
                continue
            filtered = savgol_filter(feature, window_length=window,
                                     polyorder=self.sg_polyorder)
            smoothed.append(filtered.reshape(-1, 1))
        return np.concatenate(smoothed, axis=1)

    def smooth_predictions(self, predictions):
        """Apply moving-average, EMA and (optionally) Savitzky-Golay smoothing."""
        smoothed = self.moving_average_smooth(predictions)
        smoothed = self.exponential_smooth(smoothed)
        if self.use_savitzky and len(predictions) >= self.sg_window:
            smoothed = self.savitzky_golay_smooth(smoothed)
        return smoothed

    def _build_result_df(self, predictions):
        """Attach timestamps (from test_start_date, resolution/60-hour steps)
        and prediction column names; shared by predict() and save_predictions()."""
        start_time = datetime.strptime(self.test_start_date, "%Y-%m-%d %H:%M:%S")
        time_interval = pd.Timedelta(hours=(self.resolution / 60))
        timestamps = [start_time + i * time_interval for i in range(len(predictions))]
        base_columns = [
            'C.M.RO1_DB@DPT_1', 'C.M.RO2_DB@DPT_1', 'C.M.RO3_DB@DPT_1', 'C.M.RO4_DB@DPT_1',
            'C.M.RO1_DB@DPT_2', 'C.M.RO2_DB@DPT_2', 'C.M.RO3_DB@DPT_2', 'C.M.RO4_DB@DPT_2',
        ]
        pred_columns = [f'{col}_pred' for col in base_columns]
        df_result = pd.DataFrame(predictions, columns=pred_columns)
        df_result.insert(0, 'date', timestamps)
        return df_result

    def predict(self, df):
        """Core prediction entry point.

        Takes the raw sensor history *df* (must contain an 'index' timestamp
        column) and returns a DataFrame with a 'date' column followed by one
        '*_pred' column per target.
        """
        self.load_data(df)
        self.load_model()
        all_predictions = []
        with torch.no_grad():
            for batch in self.test_loader:
                inputs = batch[0].to(self.device)
                outputs = self.model(inputs)
                all_predictions.append(outputs.cpu().numpy())
        predictions = np.concatenate(all_predictions, axis=0).reshape(-1, self.labels_num)

        # Inverse-transform only the label columns (the last labels_num slots
        # of the training scaler).
        inverse_scaler = MinMaxScaler()
        inverse_scaler.min_ = self.scaler.min_[-self.labels_num:]
        inverse_scaler.scale_ = self.scaler.scale_[-self.labels_num:]
        predictions = inverse_scaler.inverse_transform(predictions)
        predictions = np.clip(predictions, 0, None)  # targets cannot be negative

        predictions = self.smooth_predictions(predictions)
        # Forecast starts one hour after the last observed timestamp.  (An
        # earlier, immediately-overwritten "+3 hours" assignment was dead
        # code and has been removed.)
        self.test_start_date = (pd.to_datetime(df['index']).max()
                                + timedelta(hours=1)).strftime("%Y-%m-%d %H:%M:%S")
        return self._build_result_df(predictions)

    def save_predictions(self, predictions):
        """Save predictions (ndarray, or the DataFrame from predict()) to CSV."""
        df_result = self._build_result_df(predictions)
        df_result.to_csv(self.output_csv_path, index=False)
        print(f"预测结果保存至:{self.output_csv_path}")
-
if __name__ == '__main__':
    # Fetch the recent sensor history, run the prediction, and persist it.
    predictor = Predictor()
    # The original code called predict(start_date=None), which does not match
    # predict(df) and raised TypeError; fetch the data first as the in-file
    # comments indicate.
    df = get_sensor_data(start_date=None)
    predictions = predictor.predict(df)
    predictor.save_predictions(predictions)