import os
import torch
import pandas as pd
import numpy as np
import joblib
from datetime import datetime, timedelta
from torch.utils.data import DataLoader, TensorDataset
from .gat_lstm import GAT_LSTM
from scipy.signal import savgol_filter
from sklearn.preprocessing import MinMaxScaler
from .data_mysql import get_sensor_data


def set_seed(seed):
    """Seed every RNG source (Python, NumPy, PyTorch CPU/GPU) and force
    deterministic cuDNN behaviour so runs are reproducible."""
    import random
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False


class Predictor:
    """Prediction pipeline: data preprocessing, model loading, inference,
    post-processing (inverse scaling + smoothing) and CSV export."""

    # Prediction target columns, in the order the model emits them.
    _BASE_COLUMNS = [
        'C.M.RO1_DB@DPT_1', 'C.M.RO2_DB@DPT_1', 'C.M.RO3_DB@DPT_1', 'C.M.RO4_DB@DPT_1',
        'C.M.RO1_DB@DPT_2', 'C.M.RO2_DB@DPT_2', 'C.M.RO3_DB@DPT_2', 'C.M.RO4_DB@DPT_2',
    ]

    def __init__(self):
        # Model / data hyper-parameters (also read by GAT_LSTM via `self`)
        self.seq_len = 4320          # input sequence length
        self.output_size = 2160      # prediction horizon (steps)
        self.labels_num = 8          # number of predicted target features
        self.feature_num = 16        # total number of input features
        self.step_size = 2160        # sliding-window stride
        self.dropout = 0             # model dropout
        self.lr = 0.01               # learning rate (kept for model config)
        self.hidden_size = 64        # LSTM hidden size
        self.batch_size = 128        # batch size
        self.num_layers = 1          # number of LSTM layers
        # NOTE(review): the original comment said "seconds", but the timestamp
        # step is computed as resolution/60 hours (= 1 hour for 60), i.e. the
        # code treats it as minutes — confirm against upstream data cadence.
        self.resolution = 60
        self.test_start_date = '2025-09-10'  # forecast start (updated in predict())
        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        self.model_path = 'model.pth'             # model weights file name (overridable)
        self.output_csv_path = 'predictions.csv'  # CSV output path (overridable)
        self.random_seed = 1314

        # Smoothing parameters applied to the predictions
        self.smooth_window = 30   # moving-average window
        self.ema_alpha = 0.1      # exponential-moving-average factor
        self.use_savitzky = True  # apply Savitzky-Golay as final pass
        self.sg_window = 25       # S-G window length (odd)
        self.sg_polyorder = 2     # S-G polynomial order

        # One-time setup
        set_seed(self.random_seed)
        scaler_path = os.path.join(os.path.dirname(__file__), 'scaler.pkl')
        self.scaler = joblib.load(scaler_path)  # fitted scaler; file must exist
        self.model = None
        self.edge_index = None
        self.test_loader = None

    def reorder_columns(self, df):
        """Reorder DataFrame columns to match the feature order used at training time."""
        desired_order = [
            'index',  # timestamp column
            'C.M.RO1_FT_JS@out', 'C.M.RO2_FT_JS@out', 'C.M.RO3_FT_JS@out', 'C.M.RO4_FT_JS@out',
            'C.M.RO_TT_ZJS@out', 'C.M.RO_Cond_ZJS@out',
            'C.M.RO1_DB@DPT_1', 'C.M.RO1_DB@DPT_2',
            'C.M.RO2_DB@DPT_1', 'C.M.RO2_DB@DPT_2',
            'C.M.RO3_DB@DPT_1', 'C.M.RO3_DB@DPT_2',
            'C.M.RO4_DB@DPT_1', 'C.M.RO4_DB@DPT_2',
        ]
        return df.loc[:, desired_order]

    def process_date(self, data):
        """Add yearly-cycle sin/cos encodings derived from the 'index' timestamp
        column; returns columns ordered as [date, time features, sensors]."""
        if 'index' in data.columns:
            data = data.rename(columns={'index': 'date'})
        data['date'] = pd.to_datetime(data['date'])
        data['day_of_year'] = data['date'].dt.dayofyear
        # 366 so leap-year day numbers stay inside one full cycle
        data['day_year_sin'] = np.sin(2 * np.pi * data['day_of_year'] / 366)
        data['day_year_cos'] = np.cos(2 * np.pi * data['day_of_year'] / 366)
        data.drop(columns=['day_of_year'], inplace=True)
        time_features = ['day_year_sin', 'day_year_cos']
        other_columns = [col for col in data.columns if col not in ['date'] + time_features]
        return data[['date'] + time_features + other_columns]

    def scaler_data(self, data):
        """Apply the pre-fitted scaler to all columns except 'date', which is
        re-attached unscaled."""
        date_col = data[['date']]
        data_to_scale = data.drop(columns=['date'])
        scaled = self.scaler.transform(data_to_scale)
        scaled_df = pd.DataFrame(scaled, columns=data_to_scale.columns)
        return pd.concat([date_col.reset_index(drop=True), scaled_df], axis=1)

    def create_test_loader(self, df):
        """Build a DataLoader of (batch, seq_len, feature_num) input tensors."""
        if 'date' in df.columns:
            test_data = df.drop(columns=['date']).values
        else:
            test_data = df.values
        # Row count must be a multiple of seq_len for this reshape to succeed.
        X = test_data.reshape(-1, self.seq_len, self.feature_num)
        X = torch.tensor(X, dtype=torch.float32).to(self.device)
        tensor_dataset = TensorDataset(X)
        return DataLoader(tensor_dataset, batch_size=self.batch_size, shuffle=False)

    def load_data(self, df):
        """Unified preprocessing entry point: reorder, encode dates, scale,
        and build the test DataLoader."""
        df = self.reorder_columns(df)
        df = self.process_date(df)
        df = self.scaler_data(df)
        self.test_loader = self.create_test_loader(df)

    def load_model(self):
        """Load pretrained weights into a fresh GAT_LSTM and switch to eval mode."""
        self.model = GAT_LSTM(self).to(self.device)
        # BUGFIX: honour self.model_path instead of a hard-coded 'model.pth'.
        model_path = os.path.join(os.path.dirname(__file__), self.model_path)
        self.model.load_state_dict(
            torch.load(model_path, map_location=self.device, weights_only=True))
        self.model.eval()

    def moving_average_smooth(self, data):
        """Per-column moving-average smoothing with edge padding; preserves length."""
        smoothed = []
        for i in range(data.shape[1]):
            feature = data[:, i]
            padded = np.pad(feature, (self.smooth_window // 2, self.smooth_window // 2),
                            mode='edge')
            window = np.ones(self.smooth_window) / self.smooth_window
            smoothed_feature = np.convolve(padded, window, mode='valid')
            smoothed.append(smoothed_feature.reshape(-1, 1))
        return np.concatenate(smoothed, axis=1)

    def exponential_smooth(self, data):
        """Per-column exponential moving average (alpha = self.ema_alpha)."""
        smoothed = []
        for i in range(data.shape[1]):
            feature = data[:, i]
            smoothed_feature = np.zeros_like(feature)
            smoothed_feature[0] = feature[0]
            for t in range(1, len(feature)):
                smoothed_feature[t] = (self.ema_alpha * feature[t]
                                       + (1 - self.ema_alpha) * smoothed_feature[t - 1])
            smoothed.append(smoothed_feature.reshape(-1, 1))
        return np.concatenate(smoothed, axis=1)

    def savitzky_golay_smooth(self, data):
        """Per-column Savitzky-Golay filtering; columns too short to filter are
        passed through unchanged."""
        smoothed = []
        for i in range(data.shape[1]):
            feature = data[:, i]
            # window_length must be odd and <= len(feature)
            window = min(self.sg_window,
                         len(feature) if len(feature) % 2 == 1 else len(feature) - 1)
            if window < 3:
                smoothed.append(feature.reshape(-1, 1))
                continue
            smoothed_feature = savgol_filter(feature, window_length=window,
                                             polyorder=self.sg_polyorder)
            smoothed.append(smoothed_feature.reshape(-1, 1))
        return np.concatenate(smoothed, axis=1)

    def smooth_predictions(self, predictions):
        """Chain the smoothing passes: moving average -> EMA -> (optional) S-G."""
        smoothed = self.moving_average_smooth(predictions)
        smoothed = self.exponential_smooth(smoothed)
        if self.use_savitzky and len(predictions) >= self.sg_window:
            smoothed = self.savitzky_golay_smooth(smoothed)
        return smoothed

    def _build_result_frame(self, predictions):
        """Attach forecast timestamps (starting at self.test_start_date, stepping
        resolution/60 hours) to a (steps, labels_num) array and return a
        DataFrame with a 'date' column plus one '<col>_pred' column per target."""
        start_time = datetime.strptime(self.test_start_date, "%Y-%m-%d %H:%M:%S")
        time_interval = pd.Timedelta(hours=(self.resolution / 60))
        timestamps = [start_time + i * time_interval for i in range(len(predictions))]
        pred_columns = [f'{col}_pred' for col in self._BASE_COLUMNS]
        df_result = pd.DataFrame(predictions, columns=pred_columns)
        df_result.insert(0, 'date', timestamps)
        return df_result

    def predict(self, df):
        """Core prediction entry point.

        df: raw sensor DataFrame with an 'index' timestamp column plus the
        sensor columns listed in reorder_columns().
        Returns a DataFrame with 'date' and '<col>_pred' columns.
        """
        # NOTE(review): this +3h value is overwritten with +1h after inference;
        # kept because GAT_LSTM(self) is constructed in between and may read it.
        self.test_start_date = (pd.to_datetime(df['index']).max()
                                + timedelta(hours=3)).strftime("%Y-%m-%d %H:%M:%S")
        self.load_data(df)
        self.load_model()

        all_predictions = []
        with torch.no_grad():
            for batch in self.test_loader:
                inputs = batch[0].to(self.device)
                outputs = self.model(inputs)
                all_predictions.append(outputs.cpu().numpy())

        if not all_predictions:
            print("警告:模型未产生任何预测!")
            predictions = np.zeros((self.output_size, self.labels_num))
        else:
            predictions = np.concatenate(all_predictions, axis=0).reshape(-1, self.labels_num)

        # Diagnostics
        print(f"预测数据形状: {predictions.shape}")
        print(f"预测数据范围: [{np.nanmin(predictions):.4f}, {np.nanmax(predictions):.4f}]")
        print(f"NaN数量: {np.isnan(predictions).sum()}")

        # Inverse-transform using only the target slice of the fitted scaler
        # (targets are assumed to be its last labels_num columns — matches
        # reorder_columns, where the DPT columns come last).
        inverse_scaler = MinMaxScaler()
        inverse_scaler.min_ = self.scaler.min_[-self.labels_num:]
        inverse_scaler.scale_ = self.scaler.scale_[-self.labels_num:]
        predictions = inverse_scaler.inverse_transform(predictions)
        predictions = np.clip(predictions, 0, None)  # physical readings cannot be negative

        # Diagnostics (after inverse scaling)
        print(f"反标准化后范围: [{np.nanmin(predictions):.4f}, {np.nanmax(predictions):.4f}]")
        print(f"反标准化后NaN数量: {np.isnan(predictions).sum()}")

        predictions = self.smooth_predictions(predictions)

        # Forecast timestamps start one hour after the last observation.
        self.test_start_date = (pd.to_datetime(df['index']).max()
                                + timedelta(hours=1)).strftime("%Y-%m-%d %H:%M:%S")
        return self._build_result_frame(predictions)

    def save_predictions(self, predictions):
        """Save predictions to CSV.

        Accepts either the DataFrame returned by predict() (saved as-is) or a
        raw (steps, labels_num) array (timestamps and column names are built
        from the current self.test_start_date).
        """
        if isinstance(predictions, pd.DataFrame):
            df_result = predictions
        else:
            df_result = self._build_result_frame(predictions)
        df_result.to_csv(self.output_csv_path, index=False)
        print(f"预测结果保存至:{self.output_csv_path}")


if __name__ == '__main__':
    predictor = Predictor()
    # BUGFIX: the original called predictor.predict(start_date=None), which does
    # not match predict(self, df). Fetch the sensor history first (as the
    # commented-out code inside predict() intended), then predict and save.
    df = get_sensor_data(start_date=None)
    predictions = predictor.predict(df)
    predictor.save_predictions(predictions)