| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630 |
- """
- 20分钟TMP预测模型
- 版本:1.0
- 最后更新:2025-10-28
- """
- import os
- import sys
- import torch
- import pandas as pd
- import numpy as np
- import joblib
- import pywt
- from datetime import datetime, timedelta
- from torch.utils.data import DataLoader, TensorDataset
- from tqdm import tqdm
# Make the parent package importable so sibling modules resolve at runtime.
current_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(current_dir)
if parent_dir not in sys.path:
    sys.path.insert(0, parent_dir)
# The GAT-LSTM architecture lives in the sibling 'shared' directory.
sys.path.insert(0, os.path.join(parent_dir, 'shared'))
from gat_lstm import GAT_LSTM
- # 尝试导入common模块,如果失败则使用标准库
- try:
- project_root = os.path.abspath(os.path.join(parent_dir, '../..'))
- if project_root not in sys.path:
- sys.path.insert(0, project_root)
- from common.utils.logger import setup_logger, log_execution_time
- from common.utils.config import Config
- except ImportError:
- # 使用标准库作为fallback
- import logging
- import yaml
- from functools import wraps
- import time
-
- def setup_logger(name, level='INFO', log_file=None, format_type='colored', max_bytes=10485760, backup_count=5):
- """logger设置"""
- logger = logging.getLogger(name)
- logger.setLevel(getattr(logging, level))
-
- # 避免重复添加handler
- if logger.handlers:
- return logger
-
- formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
-
- # 控制台处理器
- console_handler = logging.StreamHandler()
- console_handler.setFormatter(formatter)
- logger.addHandler(console_handler)
-
- # 文件处理器
- if log_file:
- from logging.handlers import RotatingFileHandler
- file_handler = RotatingFileHandler(log_file, maxBytes=max_bytes, backupCount=backup_count)
- file_handler.setFormatter(formatter)
- logger.addHandler(file_handler)
-
- # 防止日志传播到root logger
- logger.propagate = False
-
- return logger
-
- def log_execution_time(func):
- """简化版执行时间装饰器"""
- @wraps(func)
- def wrapper(*args, **kwargs):
- start_time = time.time()
- result = func(*args, **kwargs)
- end_time = time.time()
- if hasattr(args[0], 'logger'):
- args[0].logger.info(f"{func.__name__} 执行时间: {end_time - start_time:.2f}秒")
- return result
- return wrapper
-
- class Config:
- """配置类"""
- def __init__(self, config_file):
- with open(config_file, 'r', encoding='utf-8') as f:
- self.config = yaml.safe_load(f)
-
- def get(self, key, default=None):
- keys = key.split('.')
- value = self.config
- for k in keys:
- if isinstance(value, dict):
- value = value.get(k)
- else:
- return default
- if value is None:
- return default
- return value
def set_seed(seed):
    """Seed every RNG the pipeline touches so runs are reproducible.

    Args:
        seed: integer seed applied to Python, NumPy and PyTorch (CPU + CUDA).

    Note:
        cuDNN is switched to deterministic mode and its auto-tuner disabled,
        trading a little speed for repeatability.
    """
    import random

    random.seed(seed)                           # Python RNG
    os.environ['PYTHONHASHSEED'] = str(seed)    # hash randomisation
    np.random.seed(seed)                        # NumPy RNG
    torch.manual_seed(seed)                     # PyTorch CPU RNG
    # CUDA seeding calls are safe no-ops on CPU-only builds.
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
class Predictor:
    """TMP predictor.

    Responsibilities:
    - load and preprocess the input data
    - load the trained GAT-LSTM model
    - run inference and persist the results

    Example:
        predictor = Predictor()
        predictions = predictor.predict(df)
        result_df = predictor.save_predictions(predictions, start_date)
    """
- def __init__(self, config_path='../config.yaml'):
- """
- 初始化预测器
-
- Args:
- config_path: 配置文件路径,相对于gat-lstm_model根目录
-
- Raises:
- FileNotFoundError: 配置文件或模型文件不存在
-
- Note:
- - 从配置文件加载所有参数
- - 自动检测并使用GPU(如果可用)
- - 加载训练时保存的数据归一化器
- """
- # 加载配置文件(指向父目录的config.yaml)
- current_dir = os.path.dirname(__file__)
- parent_dir = os.path.dirname(current_dir)
- config_file = os.path.join(parent_dir, 'config.yaml')
- self.config = Config(config_file)
-
- # 设置日志目录(在gat-lstm_model根目录的logs下)
- log_dir = os.path.join(parent_dir, 'logs')
- os.makedirs(log_dir, exist_ok=True)
-
- log_file = os.path.join(log_dir, '20min_predict.log')
- self.logger = setup_logger(
- name='20min_predict',
- level=self.config.get('logging.level', 'INFO'),
- log_file=log_file,
- format_type=self.config.get('logging.format', 'colored'),
- max_bytes=self.config.get('logging.max_bytes', 10*1024*1024),
- backup_count=self.config.get('logging.backup_count', 5)
- )
-
- self.logger.info("初始化20分钟TMP预测器")
-
- # 模型参数(从配置文件加载)
- self.seq_len = self.config.get('model.seq_len', 10)
- self.output_size = self.config.get('model.output_size', 5)
- self.labels_num = self.config.get('model.labels_num', 16)
- self.feature_num = self.config.get('model.feature_num', 79)
- self.step_size = self.config.get('model.step_size', 5)
- self.dropout = self.config.get('model.dropout', 0)
- self.lr = self.config.get('model.lr', 0.01)
- self.num_heads = self.config.get('model.num_heads', 8)
- self.hidden_size = self.config.get('model.hidden_size', 64)
- self.batch_size = self.config.get('model.batch_size', 512)
- self.num_layers = self.config.get('model.num_layers', 1)
- self.random_seed = self.config.get('model.random_seed', 1314)
-
- # 数据处理参数
- self.resolution = self.config.get('data.resolution', 60)
- self.test_start_date = self.config.get('data.test_start_date', '2025-07-01')
- self.wavelet = self.config.get('data.wavelet.type', 'db4')
- self.level = self.config.get('data.wavelet.level', 3)
- self.level_after = self.config.get('data.wavelet.level_after', 4)
- self.mode = self.config.get('data.wavelet.mode', 'soft')
- self.min_rows = self.config.get('data.min_rows', 600)
-
- # 阈值参数
- self.uf_threshold = self.config.get('data.threshold.uf', 0.001)
- self.ro_threshold = self.config.get('data.threshold.ro', 0.01)
- self.flow_threshold = self.config.get('data.threshold.flow', 1.0)
-
- # 文件路径(相对于20min目录)
- self.model_path = os.path.join(current_dir, '20min_model.pth')
- self.scaler_path = os.path.join(current_dir, '20min_scaler.pkl')
- self.edge_index_path = os.path.join(parent_dir, 'shared', 'edge_index.pt')
- self.output_csv_path = os.path.join(current_dir, '20min_predictions.csv')
-
- # 后处理参数
- self.remove_outliers_flag = self.config.get('postprocess.remove_outliers', False)
- self.smooth_flag = self.config.get('postprocess.smooth', False)
-
- # 预测目标列名
- self.target_columns = self.config.get('target_columns', [])
-
- # 设备配置
- use_cuda = self.config.get('device.use_cuda', True)
- cuda_device = self.config.get('device.cuda_device', 0)
-
- if use_cuda and torch.cuda.is_available():
- self.device = torch.device(f"cuda:{cuda_device}")
- self.logger.info(f"使用设备: GPU-{torch.cuda.get_device_name(cuda_device)}")
- else:
- self.device = torch.device("cpu")
- self.logger.info("使用设备: CPU")
-
- set_seed(self.random_seed)
-
- # 加载数据归一化器
- if not os.path.exists(self.scaler_path):
- self.logger.error(f"归一化器文件不存在: {self.scaler_path}")
- raise FileNotFoundError(f"归一化器文件不存在: {self.scaler_path}")
-
- self.scaler = joblib.load(self.scaler_path)
-
- # 初始化模型和数据加载器(后续加载)
- self.model = None
- self.edge_index = None
- self.test_loader = None
- self.raw_input_data = None
-
- self.logger.info("预测器初始化完成")
-
- def ensure_min_rows(self, df):
- """
- 确保数据至少有指定行数,不足则进行前后补充
- 向前补充:使用最早的数据向前扩展
- 向后补充:使用最新的数据向后扩展
- """
- current_rows = len(df)
- if current_rows >= self.min_rows:
- return df
-
- # 计算需要补充的行数
- need_rows = self.min_rows - current_rows
- self.logger.info(f"数据行数不足{self.min_rows}行(当前{current_rows}行),需要补充{need_rows}行")
-
- # 计算时间间隔(假设数据是均匀采样的)
- time_col = 'index'
- df[time_col] = pd.to_datetime(df[time_col])
- time_diff = (df[time_col].iloc[1] - df[time_col].iloc[0]).total_seconds()
-
- # 向前补充(使用最早的数据)
- forward_rows = need_rows // 2
- if forward_rows > 0:
- earliest_data = df.iloc[0:1].copy()
- forward_data = []
- for i in range(1, forward_rows + 1):
- new_row = earliest_data.copy()
- new_row[time_col] = earliest_data[time_col] - timedelta(seconds=time_diff * i)
- forward_data.append(new_row)
- forward_df = pd.concat(forward_data, ignore_index=True)
- df = pd.concat([forward_df, df], ignore_index=True)
-
- # 检查是否还需要向后补充
- current_rows = len(df)
- if current_rows < self.min_rows:
- backward_rows = self.min_rows - current_rows
- latest_data = df.iloc[-1:].copy()
- backward_data = []
- for i in range(1, backward_rows + 1):
- new_row = latest_data.copy()
- new_row[time_col] = latest_data[time_col] + timedelta(seconds=time_diff * i)
- backward_data.append(new_row)
- backward_df = pd.concat(backward_data, ignore_index=True)
- df = pd.concat([df, backward_df], ignore_index=True)
-
- self.logger.info(f"数据补充完成,当前行数:{len(df)}行")
- return df
-
    def reorder_columns(self, df):
        """Project ``df`` onto the exact column order used at training time.

        The order below must match the feature layout the scaler and model
        were fitted with; a KeyError is raised if any column is missing.
        """
        desired_order = [
            'index',
            # Flow sensors (FT).
            'C.M.FT_ZGJJY1@out','C.M.RO1_FT_JS@out','C.M.RO2_FT_JS@out','C.M.RO3_FT_JS@out',
            'C.M.RO4_FT_JS@out','C.M.UF1_FT_JS@out','C.M.UF2_FT_JS@out','C.M.UF3_FT_JS@out',
            'C.M.UF4_FT_JS@out','C.M.UF_FT_ZCS@out','C.M.FT_ZGJJY2@out','C.M.FT_ZGJJY3@out',
            'C.M.FT_ZGJJY4@out','C.M.RO1_PT_JS@out','C.M.RO2_PT_JS@out','C.M.RO3_PT_JS@out',
            'C.M.UF1_PT_JS@out','C.M.UF2_PT_JS@out','C.M.UF3_PT_JS@out','C.M.UF4_PT_JS@out',
            'C.M.LT_JSC@out','C.M.RO1_PT_CS@out','C.M.RO1_PT_DJ2@out','C.M.RO2_PT_CS@out',
            'C.M.RO2_PT_DJ2@out','C.M.RO3_PT_CS@out','C.M.RO3_PT_DJ2@out','C.M.RO4_PT_CS@out',
            'C.M.RO4_PT_DJ2@out','C.M.RO4_PT_JS@out','C.M.LT_HCl@out','C.M.LT_NaClO@out',
            'C.M.LT_PAC@out','C.M.LT_QSC@out','C.M.RO_Cond_ZCS@out','C.M.RO_TT_ZJS@out',
            'C.M.UF1_JSF_kd@out','C.M.UF2_JSF_kd@out','C.M.UF_GSB4_fre@out','C.M.UF_ORP_ZCS@out',
            'C.M.JYB2_ZGJ1_fre@out','C.M.JYB2_ZGJ2_fre@out','C.M.JYB2_ZGJ3_fre@out','C.M.JYB2_ZGJ4_fre@out',
            'C.M.RO1_GYB_fre@out','C.M.RO2_GYB_fre@out','C.M.RO3_GYB_fre@out','C.M.RO4_GYB_fre@out',
            'C.M.UF3_JSF_kd@out','C.M.UF4_JSF_kd@out','C.M.UF_FXB2_fre@out','C.M.RO1_DJB_fre@out',
            'C.M.RO1_GYBF_kd@out','C.M.RO2_DJB_fre@out','C.M.RO2_GYBF_kd@out','C.M.RO3_DJB_fre@out',
            'C.M.RO3_GYBF_kd@out','C.M.RO4_DJB_fre@out','C.M.RO4_GYBF_kd@out',
            # Membrane pressures and the prediction targets (last labels_num
            # columns) — presumably the 16 DB/DPT columns below; TODO confirm
            # against the training pipeline's label layout.
            'C.M.UF1_DB@press_PV','C.M.UF2_DB@press_PV','C.M.UF3_DB@press_PV','C.M.UF4_DB@press_PV',
            'UF1Per','UF2Per','UF3Per','UF4Per',
            'C.M.RO1_DB@DPT_1','C.M.RO2_DB@DPT_1','C.M.RO3_DB@DPT_1','C.M.RO4_DB@DPT_1',
            'C.M.RO1_DB@DPT_2','C.M.RO2_DB@DPT_2','C.M.RO3_DB@DPT_2','C.M.RO4_DB@DPT_2',
        ]
        return df.loc[:, desired_order]
- def process_date(self, data):
- """处理日期列,生成周期性时间特征"""
- if 'index' in data.columns:
- data = data.rename(columns={'index': 'date'})
- data['date'] = pd.to_datetime(data['date'])
- data['minute_of_day'] = data['date'].dt.hour * 60 + data['date'].dt.minute
- data['day_of_year'] = data['date'].dt.dayofyear
-
- # 周期性编码
- data['minute_sin'] = np.sin(2 * np.pi * data['minute_of_day'] / 1440)
- data['minute_cos'] = np.cos(2 * np.pi * data['minute_of_day'] / 1440)
- data['day_year_sin'] = np.sin(2 * np.pi * data['day_of_year'] / 366)
- data['day_year_cos'] = np.cos(2 * np.pi * data['day_of_year'] / 366)
- data.drop(columns=['minute_of_day', 'day_of_year'], inplace=True)
-
- # 调整列顺序
- time_features = ['minute_sin', 'minute_cos', 'day_year_sin', 'day_year_cos']
- other_columns = [col for col in data.columns if col not in ['date'] + time_features]
- return data[['date'] + time_features + other_columns]
- def scaler_data(self, data):
- """对数据进行归一化处理"""
- date_col = data[['date']]
- data_to_scale = data.drop(columns=['date'])
- scaled = self.scaler.transform(data_to_scale)
- scaled_df = pd.DataFrame(scaled, columns=data_to_scale.columns)
- result = pd.concat([date_col.reset_index(drop=True), scaled_df], axis=1)
- return result
-
- def remove_outliers(self, predictions):
- """使用四分位法处理预测结果中的异常值"""
- cleaned = predictions.copy()
- for col in range(cleaned.shape[1]):
- values = cleaned[:, col]
- q1 = np.percentile(values, 25)
- q3 = np.percentile(values, 75)
- iqr = q3 - q1
- lower_bound = q1 - 1.5 * iqr
- upper_bound = q3 + 1.5 * iqr
- normal_values = values[(values >= lower_bound) & (values <= upper_bound)]
- if len(normal_values) > 0:
- mean_normal = np.mean(normal_values)
- cleaned[(values < lower_bound) | (values > upper_bound), col] = mean_normal
- return cleaned
-
- def smooth_predictions(self, predictions):
- """对预测结果进行加权平滑处理"""
- smoothed = predictions.copy()
- n_timesteps = predictions.shape[0]
- if n_timesteps <= 1:
- return smoothed
-
- for col in range(predictions.shape[1]):
- values = predictions[:, col]
- smoothed[0, col] = (2 * values[0] + values[1]) / 3
- for i in range(1, n_timesteps - 1):
- smoothed[i, col] = (values[i-1] + 2 * values[i] + values[i+1]) / 4
- smoothed[-1, col] = (values[-2] + 2 * values[-1]) / 3
- return smoothed
    def create_test_loader(self, df):
        """Build a single-sample DataLoader for the most recent input window.

        Turns the (already scaled) frame into a supervised layout of
        ``seq_len`` lagged copies per feature, keeps only the newest window,
        and wraps it as a one-batch ``DataLoader``.
        """
        df['date'] = pd.to_datetime(df['date'])
        # One model step spans 4*resolution/60 minutes; widen the cut-off so
        # the final window has enough history. NOTE(review): the +20 margin
        # looks empirical — confirm against the training pipeline.
        time_interval = pd.Timedelta(minutes=(4 * self.resolution / 60))
        window_time_span = time_interval * (self.seq_len + 20)
        adjusted_test_start = pd.to_datetime(self.test_start_date) - window_time_span
        test_df = df[df['date'] >= adjusted_test_start].reset_index(drop=True)
        test_df = test_df.drop(columns=['date'])
        # Supervised layout: for each feature, seq_len lagged columns
        # (shift seq_len-1 .. 0), then output_size forward shifts of the last
        # labels_num columns (label slots; unused at inference but kept so
        # the column layout matches training).
        feature_columns = test_df.columns.tolist()
        cols = []

        for col in feature_columns:
            for i in range(self.seq_len - 1, -1, -1):
                cols.append(test_df[[col]].shift(i))

        for i in range(1, self.output_size + 1):
            for col in feature_columns[-self.labels_num:]:
                cols.append(test_df[[col]].shift(-i))

        # Down-sample the windows by step_size, then keep only the newest one.
        dataset = pd.concat(cols, axis=1).iloc[::self.step_size]
        dataset = dataset.iloc[[-1]]

        # Reshape the flat lag columns back to (batch, seq_len, feature_num).
        n_features_total = self.feature_num * self.seq_len
        supervised_data = dataset.iloc[:, :n_features_total]
        X = supervised_data.values.reshape(-1, self.seq_len, self.feature_num)
        X = torch.tensor(X, dtype=torch.float32).to(self.device)
        tensor_dataset = TensorDataset(X)
        loader = DataLoader(tensor_dataset, batch_size=self.batch_size, shuffle=False)

        return loader
-
- def get_recent_values_as_fallback(self):
- """从原始输入数据中获取最近的output_size条记录作为备用输出"""
- # 确保原始数据已保存
- if self.raw_input_data is None:
- raise ValueError("原始输入数据未保存,无法获取备用值")
-
- # 按时间排序并取最近的output_size条
- recent_data = self.raw_input_data.sort_values('index').tail(self.output_size)
-
- # 若数据不足,用最后一条补充
- if len(recent_data) < self.output_size:
- last_row = recent_data.iloc[-1:] if not recent_data.empty else pd.DataFrame(
- {col: [0.0] for col in self.target_columns}, index=[0])
- while len(recent_data) < self.output_size:
- recent_data = pd.concat([recent_data, last_row], ignore_index=True)
-
- # 提取目标列值并返回
- fallback_values = recent_data[self.target_columns].values
- return fallback_values
-
    @log_execution_time
    def load_data(self, df):
        """Run the full preprocessing chain and build the test loader.

        Steps: column reordering -> down-sampling -> cyclic time features ->
        normalisation -> DataLoader construction -> graph edge-index loading.
        Each stage logs its outcome and re-raises on failure.
        """
        self.logger.info(f"[数据加载] 原始形状: {df.shape}, 列数: {len(df.columns)}")

        try:
            df = self.reorder_columns(df)
            self.logger.info(f"[列重排] 完成")
        except Exception as e:
            self.logger.error(f"[列重排] 失败: {e}")
            raise

        # Keep every resolution-th row (raw data is sampled more densely
        # than the model step).
        df = df.iloc[::self.resolution, :].reset_index(drop=True)
        self.logger.info(f"[下采样] 采样率={self.resolution}, 采样后形状: {df.shape}")

        try:
            df = self.process_date(df)
            self.logger.info(f"[时间特征] 生成完成")
        except Exception as e:
            self.logger.error(f"[时间特征] 生成失败: {e}")
            raise

        try:
            df = self.scaler_data(df)
            self.logger.info(f"[归一化] 完成")
        except Exception as e:
            self.logger.error(f"[归一化] 失败: {e}")
            raise

        try:
            self.test_loader = self.create_test_loader(df)
            self.logger.info(f"[数据加载器] 创建完成")
        except Exception as e:
            self.logger.error(f"[数据加载器] 创建失败: {e}")
            raise

        if not os.path.exists(self.edge_index_path):
            self.logger.error(f"[图结构] 边索引文件不存在: {self.edge_index_path}")
            raise FileNotFoundError(f"图边索引文件不存在: {self.edge_index_path}")

        # weights_only=True guards against arbitrary pickled code in the .pt file.
        self.edge_index = torch.load(self.edge_index_path, map_location=self.device, weights_only=True)
        self.logger.info(f"[图结构] 边索引加载完成, shape: {self.edge_index.shape}")
    @log_execution_time
    def load_model(self):
        """Instantiate GAT_LSTM, attach the graph, and load trained weights.

        Raises:
            FileNotFoundError: when the checkpoint file is missing.
        """
        if not os.path.exists(self.model_path):
            self.logger.error(f"[模型加载] 文件不存在: {self.model_path}")
            raise FileNotFoundError(f"模型文件不存在: {self.model_path}")

        try:
            self.logger.info("[模型加载] 初始化模型结构")
            # GAT_LSTM presumably reads its hyper-parameters (seq_len,
            # hidden_size, ...) off this predictor instance — confirm in
            # shared/gat_lstm.py.
            self.model = GAT_LSTM(self).to(self.device)

            if self.edge_index is not None:
                self.model.set_edge_index(self.edge_index.to(self.device))
                self.logger.info("[模型加载] 图结构边索引设置完成")

            self.model.load_state_dict(torch.load(self.model_path, map_location=self.device, weights_only=True))
            self.model.eval()  # inference mode: disables dropout etc.

            total_params = sum(p.numel() for p in self.model.parameters())
            self.logger.info(f"[模型加载] 完成 - 参数量: {total_params:,}")
        except Exception as e:
            self.logger.error(f"[模型加载] 失败: {e}")
            raise
    @log_execution_time
    def predict(self, df):
        """Run the end-to-end forecast.

        Args:
            df: raw input frame with an 'index' timestamp column plus the
                sensor columns expected by reorder_columns().

        Returns:
            ndarray with ``labels_num`` columns, one row per predicted step,
            in original (de-normalised) units.
        """
        self.logger.info("[预测流程] 开始")

        # Keep an untouched copy for the NaN fallback path below.
        self.raw_input_data = df.copy()

        # Pad short inputs up to min_rows so windowing never underflows.
        df = self.ensure_min_rows(df)

        try:
            # The forecast starts just after the newest observed timestamp.
            latest_time = pd.to_datetime(df['index']).max()
            self.test_start_date = (latest_time + timedelta(minutes=4)).strftime("%Y-%m-%d %H:%M:%S")
            self.logger.info(f"[预测时间] 输入数据最新时间: {latest_time}, 预测起始时间: {self.test_start_date}")
        except Exception as e:
            self.logger.error(f"[预测时间] 计算失败: {e}")
            raise

        # Preprocess, build the single-window loader, then load the model.
        self.load_data(df)
        self.load_model()

        # Inference over the (single-batch) loader.
        try:
            self.logger.info("[模型推理] 开始")
            all_predictions = []
            batch_count = 0
            with torch.no_grad():
                for batch in self.test_loader:
                    inputs = batch[0].to(self.device)
                    outputs = self.model(inputs)
                    all_predictions.append(outputs.cpu().numpy())
                    batch_count += 1

            self.logger.info(f"[模型推理] 完成 - 批次数: {batch_count}")
            # Flatten the model output into rows of labels_num columns.
            predictions = np.concatenate(all_predictions, axis=0).reshape(-1, self.labels_num)
            self.logger.info(f"[模型推理] 原始预测形状: {predictions.shape}")
        except Exception as e:
            self.logger.error(f"[模型推理] 失败: {e}")
            raise

        # Invert the normalisation for the label columns only. Assumes the
        # training scaler was fitted with the labels as the LAST labels_num
        # columns — TODO confirm this ordering against the training pipeline.
        try:
            self.logger.info("[反归一化] 开始")
            from sklearn.preprocessing import MinMaxScaler
            inverse_scaler = MinMaxScaler()
            inverse_scaler.min_ = self.scaler.min_[-self.labels_num:]
            inverse_scaler.scale_ = self.scaler.scale_[-self.labels_num:]
            predictions = inverse_scaler.inverse_transform(predictions)
            self.logger.info(f"[反归一化] 完成 - 值域: [{predictions.min():.2f}, {predictions.max():.2f}]")
        except Exception as e:
            self.logger.error(f"[反归一化] 失败: {e}")
            raise

        # Degrade gracefully: replace a NaN-bearing forecast with the most
        # recent observed target values.
        if np.isnan(predictions).any():
            self.logger.warning("[预测结果] 发现NaN值,使用最近值作为备用")
            predictions = self.get_recent_values_as_fallback()

        # Optional post-processing stages (config switches).
        if self.remove_outliers_flag:
            self.logger.info("[后处理] 执行异常值移除")
            predictions = self.remove_outliers(predictions)
        if self.smooth_flag:
            self.logger.info("[后处理] 执行平滑处理")
            predictions = self.smooth_predictions(predictions)

        self.logger.info(f"[预测流程] 完成 - 最终形状: {predictions.shape}, 值域: [{predictions.min():.2f}, {predictions.max():.2f}]")

        return predictions
- def save_predictions(self, predictions, start_date=None, output_path=None):
- """保存预测结果为CSV并返回DataFrame"""
- try:
- if start_date is None:
- start_date = self.test_start_date
-
- self.logger.info(f"[保存结果] 预测起始时间: {start_date}, 预测点数: {len(predictions)}")
-
- start_time = datetime.strptime(start_date, "%Y-%m-%d %H:%M:%S")
- time_interval = timedelta(minutes=(4 * self.resolution / 60))
- timestamps = [start_time + i * time_interval for i in range(len(predictions))]
- base_columns = self.target_columns
- pred_columns = [f'{col}_Predicted' for col in base_columns]
- df_result = pd.DataFrame(predictions, columns=pred_columns)
- df_result.insert(0, 'index', timestamps)
-
- save_path = output_path if output_path else self.output_csv_path
- df_result.to_csv(save_path, index=False)
- self.logger.info(f"[保存结果] 完成 - 文件: {save_path}, 时间范围: {timestamps[0]} 至 {timestamps[-1]}")
-
- return df_result
- except Exception as e:
- self.logger.error(f"[保存结果] 失败: {e}")
- raise
if __name__ == '__main__':
    """Entry point: run the 20-minute TMP prediction end-to-end."""
    import json
    import os
    import pandas as pd

    try:
        predictor = Predictor()

        # NOTE(review): hard-coded developer path — parameterise (CLI arg or
        # config) before deploying.
        json_file_path = '/Users/wmy/Downloads/pp.json'
        if not os.path.exists(json_file_path):
            predictor.logger.error(f"输入文件不存在: {json_file_path}")
            raise FileNotFoundError(f"未找到文件: {json_file_path}")

        predictor.logger.info(f"读取输入文件: {json_file_path}")

        # Input is a JSON document convertible to a flat DataFrame with an
        # 'index' timestamp column plus the sensor columns.
        with open(json_file_path, 'r', encoding='utf-8') as f:
            json_data = json.load(f)
        df = pd.DataFrame(json_data)
        predictions = predictor.predict(df)
        predictor.save_predictions(predictions)

        predictor.logger.info("预测任务完成")

    except Exception as e:
        # Prefer the predictor's logger when it exists; fall back to stdout
        # for failures during Predictor() construction itself.
        if 'predictor' in locals():
            predictor.logger.error(f"预测失败: {str(e)}", exc_info=True)
        else:
            print(f"初始化失败: {str(e)}")
        raise
|