# predict.py
import os
from datetime import datetime, timedelta

import joblib
import numpy as np
import pandas as pd
import torch

from gat_lstm import GAT_LSTM
class RealTimePredictor:
    """Real-time multi-step forecaster for the water-treatment targets.

    Loads a trained GAT-LSTM model plus its fitted scaler and, given a
    recent window of sensor readings, predicts the next ``output_size``
    steps for the last ``labels_num`` business columns.
    """

    def __init__(self, model_path='model.pth', scaler_path='scaler.pkl', device=None):
        """Initialize the predictor.

        Args:
            model_path: path to the trained model weights (state_dict).
            scaler_path: path to the scaler saved with joblib at training time.
            device: optional torch.device; defaults to CUDA 0 when available.

        Raises:
            FileNotFoundError: if the scaler or the model weights are missing.
        """
        # 1. Hyper-parameters (must stay in sync with the training-time args.py).
        self.seq_len = 10          # input sequence length (time steps)
        self.feature_num = 32      # input features: 4 time encodings + 28 process signals
        self.labels_num = 4        # number of predicted target variables
        self.hidden_size = 64
        self.num_layers = 1
        self.output_size = 5       # forecast horizon: 5 future steps
        self.dropout = 0

        # 2. Device selection and resource loading.
        self.device = device if device else torch.device(
            "cuda:0" if torch.cuda.is_available() else "cpu")
        self.model_path = model_path
        self.scaler_path = scaler_path

        # Load the fitted scaler (assumed MinMaxScaler-compatible: exposes
        # transform(), min_ and scale_ — see the inverse transform in predict()).
        if not os.path.exists(self.scaler_path):
            raise FileNotFoundError(f"未找到归一化文件: {self.scaler_path},请确保已完成训练。")
        self.scaler = joblib.load(self.scaler_path)
        # Build the network and load its weights.
        self._load_model()
        # Required columns (29 incl. 'index'). The order is fixed: it must
        # match the column order the scaler was fitted with.
        self.required_columns = [
            "index",
            "water_out",                 # external supply water flow
            "ns=3;s=AI_ROJSLL_OUT",      # feed water flow feedback
            "ns=3;s=AI_UFCSLL_OUT",      # UF product water flow feedback
            "ns=3;s=RO_1DJSLL_SSD",      # SSD_Flow_1djs
            "ns=3;s=RO_2DJSLL_SSD",      # SSD_Flow_2djs
            "ns=3;s=RO_NS_SSD",          # SSD_Flow_ns
            "ns=3;s=AI_JYCSLL1_OUT",     # product flow meter 1 feedback
            "ns=3;s=AI_RODJYL_OUT",      # inter-stage pressure feedback
            "ns=3;s=AI_ROJSYL_OUT",      # feed pressure feedback
            "ns=3;s=AI_UFCSYL_OUT",      # UF product water pressure feedback
            "ns=3;s=AI_JYCIPPH_OUT",     # CIP pH feedback
            "ns=3;s=AI_JYCSDD_OUT",      # supply water conductivity feedback
            "ns=3;s=AI_UFCSZD_OUT",      # UF product water turbidity feedback
            "ns=3;s=AI_ROCSDD_OUT",      # product water conductivity feedback
            "ns=3;s=AI_UFJSORP_OUT",     # UF feed ORP feedback
            "ns=3;s=AI_UFJSPH_OUT",      # UF feed pH feedback
            "ns=3;s=AI_UFJSYW_OUT",      # UF feed temperature feedback
            "ns=3;s=AI_JYROCSYW_OUT",    # RO product water level gauge feedback
            "ns=3;s=AI_JYSYW_OUT",       # acid tank level feedback
            "ns=3;s=AI_RODJB_FR_OUT",    # RO inter-stage pump frequency feedback
            "ns=3;s=AI_ROGSB_FR_OUT",    # RO supply pump frequency feedback
            "ns=3;s=AI_ROGYB_FR_OUT",    # RO high-pressure pump frequency feedback
            "ns=3;s=AI_UFFXB_FR_OUT",    # UF backwash pump frequency feedback
            "ns=3;s=AI_UFCSB_FR_OUT",    # UF product pump frequency feedback
            "ns=3;s=UF_TMP",             # SSD trans-membrane pressure
            "ns=3;s=RO_CHA1YL_SSD",      # SSD_PressCha1
            "ns=3;s=RO_CHA2YL_SSD",      # SSD_PressCha2
            "ns=3;s=RO_ZCS_SSD",         # SSD_Flow_zcs
        ]
        # Kept so the NaN-fallback mechanism can reuse the latest raw input.
        self.raw_input_data = None
        # The last `labels_num` business columns are the prediction targets.
        self.target_columns = self.required_columns[-self.labels_num:]

    def _load_model(self):
        """Build the GAT_LSTM network, load the edge index (when present)
        and the trained weights, then switch to eval mode."""
        class ModelArgs:
            pass
        args = ModelArgs()
        args.feature_num = self.feature_num
        args.hidden_size = self.hidden_size
        args.num_layers = self.num_layers
        args.output_size = self.output_size
        args.labels_num = self.labels_num
        args.dropout = self.dropout
        self.model = GAT_LSTM(args).to(self.device)

        # Optional graph structure exported at training time.
        if os.path.exists('edge_index.pt'):
            edge_index = torch.load('edge_index.pt', map_location=self.device, weights_only=True)
            self.model.set_edge_index(edge_index)

        if not os.path.exists(self.model_path):
            raise FileNotFoundError(f"未找到模型权重文件: {self.model_path}")

        state_dict = torch.load(self.model_path, map_location=self.device, weights_only=True)
        self.model.load_state_dict(state_dict)
        self.model.eval()

    def _preprocess(self, df):
        """Preprocess raw input: normalise the time column, sort
        chronologically, pad to ``seq_len``, build cyclic time features and
        apply the fitted scaler.

        Returns:
            numpy array of shape (len, feature_num) in scaled space.

        Raises:
            ValueError: on empty input or missing business columns.
        """
        data = df.copy()
        # Guard: iloc[0:1] padding below would silently break on empty input.
        if data.empty:
            raise ValueError("输入数据为空,无法进行预处理。")

        # 1. Normalise the timestamp column name.
        if 'datetime' in data.columns:
            data = data.rename(columns={'datetime': 'index'})
        if 'index' not in data.columns:
            # No timestamps supplied: assume a 1-minute cadence ending now.
            data['index'] = pd.date_range(end=datetime.now(), periods=len(data), freq='min')
        data['index'] = pd.to_datetime(data['index'])
        # Ensure chronological order — the model consumes the LAST seq_len
        # rows, so unsorted input would otherwise feed the wrong window.
        data = data.sort_values('index').reset_index(drop=True)

        # 2. Pad to seq_len by repeating the first row backwards in time.
        if len(data) < self.seq_len:
            pad_len = self.seq_len - len(data)
            first_row = data.iloc[0:1]
            pads = pd.concat([first_row] * pad_len, ignore_index=True)
            start_time = data['index'].iloc[0]
            for i in range(pad_len):
                pads.at[i, 'index'] = start_time - timedelta(minutes=(pad_len - i))
            data = pd.concat([pads, data], ignore_index=True)

        # 3. Select and order the business columns (everything after 'index').
        try:
            business_cols = self.required_columns[1:]
            data_business = data[business_cols].copy()
            # Gap filling: forward fill -> backward fill -> zeros.
            data_business = data_business.ffill().bfill().fillna(0.0)
        except KeyError as err:
            missing = list(set(self.required_columns) - set(data.columns))
            raise ValueError(f"缺少列: {missing}") from err

        # 4. Cyclic time-of-day and day-of-year encodings.
        date_col = data['index']
        minute_of_day = date_col.dt.hour * 60 + date_col.dt.minute
        day_of_year = date_col.dt.dayofyear
        time_features = pd.DataFrame({
            'minute_sin': np.sin(2 * np.pi * minute_of_day / 1440),
            'minute_cos': np.cos(2 * np.pi * minute_of_day / 1440),
            'day_year_sin': np.sin(2 * np.pi * day_of_year / 366),
            'day_year_cos': np.cos(2 * np.pi * day_of_year / 366),
        })

        # 5. Concatenate [time features + business features] — the order used
        # at training time. Indices are reset to avoid misaligned rows.
        data_to_scale = pd.concat([
            time_features.reset_index(drop=True),
            data_business.reset_index(drop=True),
        ], axis=1)

        # 6. Scale every column with the training-time scaler.
        return self.scaler.transform(data_to_scale)

    # --- fallback used when the model emits NaN/inf ---
    def get_recent_values_as_fallback(self):
        """Return the most recent ``output_size`` observed target rows from
        the raw input, so callers never receive NaNs.

        Returns:
            (output_size, labels_num) numpy array; zeros when no usable data.
        """
        if self.raw_input_data is None or self.raw_input_data.empty:
            return np.zeros((self.output_size, self.labels_num))
        df_copy = self.raw_input_data.copy()

        # Normalise the timestamp column so the sort below cannot fail.
        if 'datetime' in df_copy.columns:
            df_copy = df_copy.rename(columns={'datetime': 'index'})
        if 'index' not in df_copy.columns:
            df_copy['index'] = pd.date_range(end=datetime.now(), periods=len(df_copy), freq='min')
        df_copy['index'] = pd.to_datetime(df_copy['index'])
        # Most recent output_size rows in chronological order.
        recent_data = df_copy.sort_values('index').tail(self.output_size)

        # Not enough history: repeat the last row until the horizon is filled.
        if len(recent_data) < self.output_size:
            last_row = recent_data.iloc[-1:] if not recent_data.empty else pd.DataFrame(
                {col: [0.0] for col in self.target_columns}, index=[0])
            while len(recent_data) < self.output_size:
                recent_data = pd.concat([recent_data, last_row], ignore_index=True)

        # Extract the target columns. reindex() never raises on missing
        # columns (they come back as NaN and are zero-filled), which fixes the
        # original flow where a plain column selection raised KeyError BEFORE
        # the guard that was supposed to catch it.
        targets = recent_data.reindex(columns=self.target_columns)
        return targets.ffill().bfill().fillna(0.0).values

    def predict(self, df):
        """Run one forecast.

        Args:
            df: DataFrame with a time column plus the required business columns.

        Returns:
            List[List[float]]: ``output_size`` rows of ``labels_num`` values,
            i.e. [[t+1 values], [t+2 values], ..., [t+5 values]].
        """
        # Keep the raw input for the NaN-fallback path.
        self.raw_input_data = df.copy()

        # 1. Preprocess -> scaled numpy array.
        processed_data = self._preprocess(df)

        # 2. Last seq_len steps -> (1, seq_len, feature_num) tensor.
        input_seq = processed_data[-self.seq_len:]
        input_tensor = torch.tensor(input_seq, dtype=torch.float32).unsqueeze(0).to(self.device)

        # 3. Inference.
        with torch.no_grad():
            output = self.model(input_tensor)

        # 4. De-normalise: reshape to (output_size, labels_num) and invert the
        # MinMax transform (x = (x_scaled - min_) / scale_) using the target
        # columns' parameters — the targets are the LAST labels_num columns
        # of the scaler, matching required_columns / target_columns order.
        preds = output.cpu().numpy().reshape(self.output_size, self.labels_num)
        target_min = self.scaler.min_[-self.labels_num:]
        target_scale = self.scaler.scale_[-self.labels_num:]
        real_preds = (preds - target_min) / target_scale
        # Physical quantities cannot be negative; clamp via abs.
        real_preds = np.abs(real_preds)

        # NaN/inf guard: substitute recent observed values for bad output.
        if not np.isfinite(real_preds).all():
            real_preds = self.get_recent_values_as_fallback()

        # 5. Plain nested list for the caller.
        return real_preds.tolist()
- if __name__ == "__main__":
- # 测试代码
- try:
- # 初始化
- predictor = RealTimePredictor()
-
- # 生成模拟数据
- mock_data = pd.DataFrame()
- mock_data['index'] = pd.date_range(end=datetime.now(), periods=15, freq='min')
- for col in predictor.required_columns[1:]:
- mock_data[col] = np.random.rand(15) * 10
-
- # 人为制造空值测试鲁棒性
- mock_data.loc[3:6, "water_out"] = np.nan
- mock_data.loc[12, predictor.target_columns[0]] = np.nan
-
- # 预测
- result = predictor.predict(mock_data)
-
- print("预测结果 (5x4 数组):")
- print(result)
-
- except Exception as e:
- print(f"Error: {e}")
- import traceback
- traceback.print_exc()
|