""" 20分钟TMP预测模型 版本:1.0 最后更新:2025-10-28 """ import os import sys import torch import pandas as pd import numpy as np import joblib import pywt from datetime import datetime, timedelta from torch.utils.data import DataLoader, TensorDataset from gat_lstm import GAT_LSTM # 导入自定义的GAT-LSTM模型 from tqdm import tqdm # 添加项目根目录到系统路径,以便导入common模块 project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '../../..')) if project_root not in sys.path: sys.path.insert(0, project_root) from common.utils.logger import setup_logger, log_execution_time from common.utils.config import Config def set_seed(seed): """ 设置全局随机种子,保证实验可重复性 Args: seed: 随机种子值 Note: - 设置Python、NumPy、PyTorch的随机种子 - 确保CUDA操作的确定性 - 关闭CUDA的性能优化(以确保可重复性) """ import random random.seed(seed) # Python随机数生成器 os.environ['PYTHONHASHSEED'] = str(seed) # Python哈希种子 np.random.seed(seed) # NumPy随机数生成器 torch.manual_seed(seed) # PyTorch CPU随机数生成器 torch.cuda.manual_seed(seed) # 当前GPU随机数生成器 torch.cuda.manual_seed_all(seed) # 所有GPU随机数生成器 torch.backends.cudnn.deterministic = True # 确保CUDA操作确定性 torch.backends.cudnn.benchmark = False # 关闭CUDA性能优化 class Predictor: """ TMP预测器类 功能: - 加载并预处理输入数据 - 加载训练好的GAT-LSTM模型 - 执行预测并保存结果 使用示例: predictor = Predictor() predictions = predictor.predict(df) predictor.save_predictions(predictions) """ def __init__(self, config_path='config.yaml'): """ 初始化预测器 Args: config_path: 配置文件路径,默认为当前目录下的config.yaml Raises: FileNotFoundError: 配置文件或模型文件不存在 Note: - 从配置文件加载所有参数 - 自动检测并使用GPU(如果可用) - 加载训练时保存的数据归一化器 """ # 加载配置文件 config_file = os.path.join(os.path.dirname(__file__), config_path) self.config = Config(config_file) # 设置日志 log_dir = os.path.join(os.path.dirname(__file__), os.path.dirname(self.config.get('logging.log_file', 'logs/20min_predict.log'))) os.makedirs(log_dir, exist_ok=True) log_file = os.path.join(os.path.dirname(__file__), self.config.get('logging.log_file', 'logs/20min_predict.log')) self.logger = setup_logger( name='20min_predict', level=self.config.get('logging.level', 'INFO'), log_file=log_file, format_type=self.config.get('logging.format', 'colored'), max_bytes=self.config.get('logging.max_bytes', 10*1024*1024), backup_count=self.config.get('logging.backup_count', 5) ) self.logger.info("=" * 80) self.logger.info("初始化20分钟TMP预测器") self.logger.info("=" * 80) # 模型参数(从配置文件加载) self.seq_len = self.config.get('model.seq_len', 10) self.output_size = self.config.get('model.output_size', 5) self.labels_num = self.config.get('model.labels_num', 16) self.feature_num = self.config.get('model.feature_num', 79) self.step_size = self.config.get('model.step_size', 5) self.dropout = self.config.get('model.dropout', 0) self.lr = self.config.get('model.lr', 0.01) self.num_heads = self.config.get('model.num_heads', 8) self.hidden_size = self.config.get('model.hidden_size', 64) self.batch_size = self.config.get('model.batch_size', 512) self.num_layers = self.config.get('model.num_layers', 1) self.random_seed = self.config.get('model.random_seed', 1314) # 数据处理参数 self.resolution = self.config.get('data.resolution', 60) self.test_start_date = self.config.get('data.test_start_date', '2025-07-01') self.wavelet = self.config.get('data.wavelet.type', 'db4') self.level = self.config.get('data.wavelet.level', 3) self.level_after = self.config.get('data.wavelet.level_after', 4) self.mode = self.config.get('data.wavelet.mode', 'soft') # 阈值参数 self.uf_threshold = self.config.get('data.threshold.uf', 0.001) self.ro_threshold = self.config.get('data.threshold.ro', 0.01) self.flow_threshold = self.config.get('data.threshold.flow', 1.0) # 文件路径(相对于当前脚本目录) script_dir = os.path.dirname(__file__) self.model_path = os.path.join(script_dir, self.config.get('paths.model_path', '20min_model.pth')) self.scaler_path = os.path.join(script_dir, self.config.get('paths.scaler_path', '20min_scaler.pkl')) self.edge_index_path = os.path.join(script_dir, self.config.get('paths.edge_index_path', 'edge_index.pt')) self.output_csv_path = os.path.join(script_dir, self.config.get('paths.output_csv_path', '20min_predictions.csv')) # 后处理参数 self.remove_outliers_flag = self.config.get('postprocess.remove_outliers', False) self.smooth_flag = self.config.get('postprocess.smooth', False) # 设备配置 use_cuda = self.config.get('device.use_cuda', True) cuda_device = self.config.get('device.cuda_device', 0) if use_cuda and torch.cuda.is_available(): self.device = torch.device(f"cuda:{cuda_device}") self.logger.info(f"使用GPU设备: {self.device} ({torch.cuda.get_device_name(cuda_device)})") else: self.device = torch.device("cpu") self.logger.info("使用CPU设备") # 设置随机种子 self.logger.info(f"设置随机种子: {self.random_seed}") set_seed(self.random_seed) # 加载数据归一化器 if not os.path.exists(self.scaler_path): self.logger.error(f"归一化器文件不存在: {self.scaler_path}") raise FileNotFoundError(f"归一化器文件不存在: {self.scaler_path}") self.logger.info(f"加载数据归一化器: {self.scaler_path}") self.scaler = joblib.load(self.scaler_path) # 初始化模型和数据加载器(后续加载) self.model = None self.edge_index = None self.test_loader = None self.logger.info("预测器初始化完成") self.logger.info(f"模型参数: seq_len={self.seq_len}, output_size={self.output_size}, " f"labels_num={self.labels_num}, feature_num={self.feature_num}") self.logger.info(f"数据参数: resolution={self.resolution}, batch_size={self.batch_size}") def reorder_columns(self, df): """ 调整数据列顺序,确保与训练时的特征顺序一致 Args: df: 输入的DataFrame Returns: DataFrame: 列顺序调整后的DataFrame Note: - 避免因列顺序不一致导致模型输入特征错位 - 必须包含所有必需的特征列 """ self.logger.debug("开始重排数据列顺序") desired_order = [ 'index', 'C.M.FT_ZGJJY1@out','C.M.RO1_FT_JS@out','C.M.RO2_FT_JS@out','C.M.RO3_FT_JS@out', 'C.M.RO4_FT_JS@out','C.M.UF1_FT_JS@out','C.M.UF2_FT_JS@out','C.M.UF3_FT_JS@out', 'C.M.UF4_FT_JS@out','C.M.UF_FT_ZCS@out','C.M.FT_ZGJJY2@out','C.M.FT_ZGJJY3@out', 'C.M.FT_ZGJJY4@out','C.M.RO1_PT_JS@out','C.M.RO2_PT_JS@out','C.M.RO3_PT_JS@out', 'C.M.UF1_PT_JS@out','C.M.UF2_PT_JS@out','C.M.UF3_PT_JS@out','C.M.UF4_PT_JS@out', 'C.M.LT_JSC@out','C.M.RO1_PT_CS@out','C.M.RO1_PT_DJ2@out','C.M.RO2_PT_CS@out', 'C.M.RO2_PT_DJ2@out','C.M.RO3_PT_CS@out','C.M.RO3_PT_DJ2@out','C.M.RO4_PT_CS@out', 'C.M.RO4_PT_DJ2@out','C.M.RO4_PT_JS@out','C.M.LT_HCl@out','C.M.LT_NaClO@out', 'C.M.LT_PAC@out','C.M.LT_QSC@out','C.M.RO_Cond_ZCS@out','C.M.RO_TT_ZJS@out', 'C.M.UF1_JSF_kd@out','C.M.UF2_JSF_kd@out','C.M.UF_GSB4_fre@out','C.M.UF_ORP_ZCS@out', 'C.M.JYB2_ZGJ1_fre@out','C.M.JYB2_ZGJ2_fre@out','C.M.JYB2_ZGJ3_fre@out','C.M.JYB2_ZGJ4_fre@out', 'C.M.RO1_GYB_fre@out','C.M.RO2_GYB_fre@out','C.M.RO3_GYB_fre@out','C.M.RO4_GYB_fre@out', 'C.M.UF3_JSF_kd@out','C.M.UF4_JSF_kd@out','C.M.UF_FXB2_fre@out','C.M.RO1_DJB_fre@out', 'C.M.RO1_GYBF_kd@out','C.M.RO2_DJB_fre@out','C.M.RO2_GYBF_kd@out','C.M.RO3_DJB_fre@out', 'C.M.RO3_GYBF_kd@out','C.M.RO4_DJB_fre@out','C.M.RO4_GYBF_kd@out', 'C.M.UF1_DB@press_PV','C.M.UF2_DB@press_PV','C.M.UF3_DB@press_PV','C.M.UF4_DB@press_PV', 'UF1Per','UF2Per','UF3Per','UF4Per', 'C.M.RO1_DB@DPT_1','C.M.RO2_DB@DPT_1','C.M.RO3_DB@DPT_1','C.M.RO4_DB@DPT_1', 'C.M.RO1_DB@DPT_2','C.M.RO2_DB@DPT_2','C.M.RO3_DB@DPT_2','C.M.RO4_DB@DPT_2', ] self.logger.debug(f"原始列: {list(df.columns)}") self.logger.debug(f"目标列顺序: {desired_order}") return df.loc[:, desired_order] def process_date(self, data): """ 处理日期列,生成周期性时间特征 Args: data: 输入DataFrame,必须包含'index'或'date'列 Returns: DataFrame: 包含时间特征的DataFrame Note: - 生成分钟级正弦/余弦特征(捕捉每日周期性模式) - 生成年中日正弦/余弦特征(捕捉年度周期性模式) - 使用三角函数编码确保时间连续性(避免边界突变) """ self.logger.debug("开始处理日期特征") if 'index' in data.columns: data = data.rename(columns={'index': 'date'}) data['date'] = pd.to_datetime(data['date']) data['minute_of_day'] = data['date'].dt.hour * 60 + data['date'].dt.minute data['day_of_year'] = data['date'].dt.dayofyear # 周期性编码(将时间转换为正弦/余弦值,确保周期性连续) data['minute_sin'] = np.sin(2 * np.pi * data['minute_of_day'] / 1440) # 分钟正弦特征 data['minute_cos'] = np.cos(2 * np.pi * data['minute_of_day'] / 1440) # 分钟余弦特征 data['day_year_sin'] = np.sin(2 * np.pi * data['day_of_year'] / 366) # 年中日正弦特征 data['day_year_cos'] = np.cos(2 * np.pi * data['day_of_year'] / 366) # 年中日余弦特征 # 移除原始时间列(仅保留编码后的特征) data.drop(columns=['minute_of_day', 'day_of_year'], inplace=True) # 调整列顺序:日期 + 时间特征 + 其他特征 time_features = ['minute_sin', 'minute_cos', 'day_year_sin', 'day_year_cos'] other_columns = [col for col in data.columns if col not in ['date'] + time_features] self.logger.debug(f"生成时间特征: {time_features}") return data[['date'] + time_features + other_columns] def scaler_data(self, data): """ 对数据进行归一化处理 Args: data: 输入DataFrame Returns: DataFrame: 归一化后的DataFrame Note: - 使用训练时保存的scaler进行归一化 - 保持与训练数据的归一化方式一致(MinMax 0-1缩放) - 日期列不参与归一化 """ self.logger.debug("开始数据归一化") date_col = data[['date']] data_to_scale = data.drop(columns=['date']) scaled = self.scaler.transform(data_to_scale) scaled_df = pd.DataFrame(scaled, columns=data_to_scale.columns) # 拼接日期列和归一化后的特征列 result = pd.concat([date_col.reset_index(drop=True), scaled_df], axis=1) self.logger.debug(f"归一化完成,数据形状: {result.shape}") return result def remove_outliers(self, predictions): """ 使用四分位法处理预测结果中的异常值 Args: predictions: numpy数组,形状为[时间步, 标签数] Returns: numpy数组: 处理异常值后的预测结果 Note: - 异常值定义:小于Q1-1.5*IQR或大于Q3+1.5*IQR的值 - 异常值替换为正常值的平均值(避免极端值影响结果) - 按列(每个指标)独立处理 """ self.logger.info("开始移除异常值(四分位法)") cleaned = predictions.copy() # 遍历每个特征列(16个标签) for col in range(cleaned.shape[1]): values = cleaned[:, col] # 计算四分位数 q1 = np.percentile(values, 25) q3 = np.percentile(values, 75) iqr = q3 - q1 # 异常值边界 lower_bound = q1 - 1.5 * iqr upper_bound = q3 + 1.5 * iqr # 筛选正常值 normal_values = values[(values >= lower_bound) & (values <= upper_bound)] # 用正常值的平均值替换异常值 if len(normal_values) > 0: mean_normal = np.mean(normal_values) outlier_count = np.sum((values < lower_bound) | (values > upper_bound)) if outlier_count > 0: self.logger.debug(f"列{col}: 检测到{outlier_count}个异常值,替换为均值{mean_normal:.4f}") cleaned[(values < lower_bound) | (values > upper_bound), col] = mean_normal self.logger.info("异常值处理完成") return cleaned def smooth_predictions(self, predictions): """ 对预测结果进行加权平滑处理 Args: predictions: numpy数组,形状为[时间步, 标签数] Returns: numpy数组: 平滑后的预测结果 Note: - 采用滑动窗口加权平均减少预测波动 - 中间值权重为2,前后邻居权重为1 - 边缘值特殊处理(避免过度平滑) """ self.logger.info("开始平滑预测结果") smoothed = predictions.copy() n_timesteps = predictions.shape[0] if n_timesteps <= 1: return smoothed # 遍历每个特征列 for col in range(predictions.shape[1]): values = predictions[:, col] # 第一个值:加权前两个值(避免边缘过度平滑) smoothed[0, col] = (2 * values[0] + values[1]) / 3 # 中间值:加权前后邻居(核心平滑) for i in range(1, n_timesteps - 1): smoothed[i, col] = (values[i-1] + 2 * values[i] + values[i+1]) / 4 # 最后一个值:加权最后两个值(避免边缘过度平滑) smoothed[-1, col] = (values[-2] + 2 * values[-1]) / 3 self.logger.info("预测结果平滑完成") return smoothed def create_test_loader(self, df): """ 构建测试数据加载器 Args: df: 预处理后的DataFrame Returns: DataLoader: PyTorch数据加载器 Note: - 将原始时间序列数据转换为模型输入格式 - 构建滑动窗口序列:[样本数, 序列长度, 特征数] - 确保有足够的历史数据构建输入序列 """ self.logger.info("创建测试数据加载器") df['date'] = pd.to_datetime(df['date']) # 计算时间间隔(根据分辨率,单位:分钟) time_interval = pd.Timedelta(minutes=(4 * self.resolution / 60)) # 计算窗口时间跨度(确保能覆盖输入序列长度+预测步长) window_time_span = time_interval * (self.seq_len + 20) # 调整测试集起始时间(确保有足够的历史数据构建输入序列) adjusted_test_start = pd.to_datetime(self.test_start_date) - window_time_span # 筛选所需的历史数据 test_df = df[df['date'] >= adjusted_test_start].reset_index(drop=True) test_df = test_df.drop(columns=['date']) # 构建监督学习数据集(输入序列+目标序列的占位) feature_columns = test_df.columns.tolist() cols = [] # 构建输入序列(历史seq_len个时间步的特征) for col in feature_columns: for i in range(self.seq_len - 1, -1, -1): cols.append(test_df[[col]].shift(i)) # 滞后i步的特征(t-0到t-(seq_len-1)) # 构建目标序列占位(未来output_size个时间步的标签,预测时不使用真实值) for i in range(1, self.output_size + 1): for col in feature_columns[-self.labels_num:]: cols.append(test_df[[col]].shift(-i)) # 超前i步的标签(t+1到t+output_size) # 合并列并按步长采样,最后取最后一行作为预测输入(最新的历史数据) dataset = pd.concat(cols, axis=1).iloc[::self.step_size] dataset = dataset.iloc[[-1]] # 提取输入特征(前n_features_total列) n_features_total = self.feature_num * self.seq_len supervised_data = dataset.iloc[:, :n_features_total] # 转换为模型输入格式:[样本数, 序列长度, 特征数] X = supervised_data.values.reshape(-1, self.seq_len, self.feature_num) X = torch.tensor(X, dtype=torch.float32).to(self.device) tensor_dataset = TensorDataset(X) loader = DataLoader(tensor_dataset, batch_size=self.batch_size, shuffle=False) self.logger.info(f"测试数据加载器创建完成,输入形状: {X.shape}") return loader @log_execution_time def load_data(self, df): """ 数据加载和预处理主流程 Args: df: 原始输入DataFrame Note: - 重排列特征列顺序 - 下采样(根据resolution参数) - 日期特征工程 - 数据归一化 - 创建测试数据加载器 - 加载图结构边索引 """ self.logger.info("开始加载和预处理数据") self.logger.info(f"原始数据形状: {df.shape}") df = self.reorder_columns(df) self.logger.info(f"下采样率: {self.resolution}") df = df.iloc[::self.resolution, :].reset_index(drop=True) self.logger.info(f"下采样后数据形状: {df.shape}") df = self.process_date(df) df = self.scaler_data(df) self.test_loader = self.create_test_loader(df) if not os.path.exists(self.edge_index_path): self.logger.error(f"图边索引文件不存在: {self.edge_index_path}") raise FileNotFoundError(f"图边索引文件不存在: {self.edge_index_path}") self.logger.info(f"加载图边索引: {self.edge_index_path}") self.edge_index = torch.load(self.edge_index_path, map_location=self.device, weights_only=True) self.logger.info("数据加载和预处理完成") @log_execution_time def load_model(self): """ 加载模型结构和预训练权重 Raises: FileNotFoundError: 模型文件不存在 Note: - 实例化GAT-LSTM模型 - 加载预训练权重 - 设置为评估模式(关闭dropout和batch normalization) - 设置图结构边索引 """ if not os.path.exists(self.model_path): self.logger.error(f"模型文件不存在: {self.model_path}") raise FileNotFoundError(f"模型文件不存在: {self.model_path}") self.logger.info("开始加载模型") self.logger.info(f"模型路径: {self.model_path}") self.model = GAT_LSTM(self).to(self.device) if self.edge_index is not None: self.logger.debug(f"设置图边索引,形状: {self.edge_index.shape}") self.model.set_edge_index(self.edge_index.to(self.device)) self.model.load_state_dict(torch.load(self.model_path, map_location=self.device, weights_only=True)) self.model.eval() # 统计模型参数量 total_params = sum(p.numel() for p in self.model.parameters()) trainable_params = sum(p.numel() for p in self.model.parameters() if p.requires_grad) self.logger.info(f"模型加载完成 - 总参数量: {total_params:,}, 可训练参数量: {trainable_params:,}") @log_execution_time def predict(self, df): """ 执行预测主流程 Args: df: 原始输入DataFrame,必须包含'index'列(时间戳) Returns: numpy数组: 反归一化后的预测结果,形状为[output_size, labels_num] Note: - 自动更新测试起始时间为输入数据最新时间+4分钟 - 执行数据预处理 - 加载模型 - 执行批量预测 - 反归一化预测结果 - 可选的异常值处理和平滑 """ self.logger.info("=" * 80) self.logger.info("开始预测流程") self.logger.info("=" * 80) # 更新测试起始时间为输入数据最新时间+4分钟(预测起始点) latest_time = pd.to_datetime(df['index']).max() self.test_start_date = (latest_time + timedelta(minutes=4)).strftime("%Y-%m-%d %H:%M:%S") self.logger.info(f"输入数据最新时间: {latest_time}") self.logger.info(f"预测起始时间: {self.test_start_date}") # 加载和预处理数据 self.load_data(df) # 加载模型 self.load_model() # 执行预测 self.logger.info("开始模型推理") all_predictions = [] with torch.no_grad(): for batch_idx, batch in enumerate(self.test_loader): inputs = batch[0].to(self.device) outputs = self.model(inputs) all_predictions.append(outputs.cpu().numpy()) self.logger.debug(f"批次 {batch_idx + 1} 推理完成,输入形状: {inputs.shape}, 输出形状: {outputs.shape}") # 拼接所有批次的预测结果,并重塑为[时间步, 标签数] predictions = np.concatenate(all_predictions, axis=0).reshape(-1, self.labels_num) self.logger.info(f"模型推理完成,预测结果形状: {predictions.shape}") # 反归一化(仅对标签列,使用训练时的scaler参数) self.logger.info("开始反归一化预测结果") from sklearn.preprocessing import MinMaxScaler inverse_scaler = MinMaxScaler() inverse_scaler.min_ = self.scaler.min_[-self.labels_num:] inverse_scaler.scale_ = self.scaler.scale_[-self.labels_num:] predictions = inverse_scaler.inverse_transform(predictions) self.logger.info("反归一化完成") # 可选:异常值处理和平滑(根据配置文件决定是否启用) if self.remove_outliers_flag: predictions = self.remove_outliers(predictions) if self.smooth_flag: predictions = self.smooth_predictions(predictions) self.logger.info(f"预测流程完成,最终预测结果形状: {predictions.shape}") self.logger.info(f"预测值范围: min={predictions.min():.4f}, max={predictions.max():.4f}, mean={predictions.mean():.4f}") return predictions def save_predictions(self, predictions): """ 保存预测结果为CSV文件 Args: predictions: 反归一化后的预测结果(numpy数组) Note: - 生成时间戳序列 - 添加列名 - 保存为CSV格式 """ self.logger.info("开始保存预测结果") start_time = datetime.strptime(self.test_start_date, "%Y-%m-%d %H:%M:%S") time_interval = timedelta(minutes=(4 * self.resolution / 60)) timestamps = [start_time + i * time_interval for i in range(len(predictions))] # 定义16个预测目标的原始列名 base_columns = [ 'C.M.UF1_DB@press_PV', 'C.M.UF2_DB@press_PV', 'C.M.UF3_DB@press_PV', 'C.M.UF4_DB@press_PV', 'UF1Per','UF2Per','UF3Per','UF4Per', 'C.M.RO1_DB@DPT_1', 'C.M.RO2_DB@DPT_1', 'C.M.RO3_DB@DPT_1', 'C.M.RO4_DB@DPT_1', 'C.M.RO1_DB@DPT_2', 'C.M.RO2_DB@DPT_2', 'C.M.RO3_DB@DPT_2', 'C.M.RO4_DB@DPT_2', ] column_names = [ 'C.M.UF1_DB@press_PV', 'C.M.UF2_DB@press_PV', 'C.M.UF3_DB@press_PV', 'C.M.UF4_DB@press_PV', 'C.M.RO1_DB@DPT_1', 'C.M.RO2_DB@DPT_1', 'C.M.RO3_DB@DPT_1', 'C.M.RO4_DB@DPT_1', 'C.M.RO1_DB@DPT_2', 'C.M.RO2_DB@DPT_2', 'C.M.RO3_DB@DPT_2', 'C.M.RO4_DB@DPT_2', 'RO1_CSFlow', 'RO2_CSFlow', 'RO3_CSFlow', 'RO4_CSFlow'] pred_columns = [f'{col}_Predicted' for col in base_columns] df_result = pd.DataFrame(predictions, columns=pred_columns) df_result.insert(0, 'date', timestamps) df_result.to_csv(self.output_csv_path, index=False) self.logger.info(f"预测结果已保存至: {self.output_csv_path}") self.logger.info(f"预测时间范围: {timestamps[0]} 至 {timestamps[-1]}") self.logger.info(f"预测记录数: {len(predictions)}") if __name__ == '__main__': """ 主函数:执行20分钟TMP预测 使用方法: 1. 准备输入数据(JSON格式) 2. 运行此脚本 3. 查看预测结果(保存在20min_predictions.csv) 输入数据格式: - JSON文件,包含历史时间序列数据 - 必须包含'index'列(时间戳)和所有必需的特征列 """ import json import os import pandas as pd from datetime import timedelta try: # 初始化预测器(自动加载配置文件) predictor = Predictor() # 读取JSON文件作为输入数据 json_file_path = '/Users/wmy/Downloads/pp.json' # pp.json文件路径,可根据实际位置修改 if not os.path.exists(json_file_path): predictor.logger.error(f"输入文件不存在: {json_file_path}") raise FileNotFoundError(f"未找到文件: {json_file_path}") predictor.logger.info(f"读取输入文件: {json_file_path}") # 解析JSON并转换为DataFrame with open(json_file_path, 'r', encoding='utf-8') as f: json_data = json.load(f) df = pd.DataFrame(json_data) predictor.logger.info(f"成功读取输入数据,数据形状: {df.shape}") # 执行预测并保存结果 predictions = predictor.predict(df) predictor.save_predictions(predictions) predictor.logger.info("=" * 80) predictor.logger.info("预测任务全部完成!") predictor.logger.info("=" * 80) except Exception as e: if 'predictor' in locals(): predictor.logger.error(f"预测过程发生错误: {str(e)}", exc_info=True) else: print(f"初始化预测器时发生错误: {str(e)}") raise