瀏覽代碼

1.更新锡山20min预测模型

zhanghao 3 周之前
父節點
當前提交
01d92c36bc

二進制
models/prediction_models/xishan/20min_model.pth


+ 413 - 0
models/prediction_models/xishan/20min_predict.py

@@ -0,0 +1,413 @@
+import os
+import torch
+import pandas as pd
+import numpy as np
+import joblib
+import pywt
+from datetime import datetime, timedelta
+from torch.utils.data import DataLoader, TensorDataset
+from gat_lstm import GAT_LSTM    # 导入自定义的GAT-LSTM模型
+from tqdm import tqdm
+
def set_seed(seed):
    """Make runs reproducible by seeding every RNG source the pipeline touches."""
    import random

    os.environ['PYTHONHASHSEED'] = str(seed)  # hash randomization seed
    random.seed(seed)                         # python stdlib RNG
    np.random.seed(seed)                      # numpy RNG
    torch.manual_seed(seed)                   # torch CPU RNG
    torch.cuda.manual_seed(seed)              # current CUDA device (no-op without CUDA)
    torch.cuda.manual_seed_all(seed)          # all CUDA devices
    # trade cuDNN autotuning for deterministic kernel selection
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True
+
class Predictor:
    """Inference pipeline for the Xishan 20-minute-ahead GAT-LSTM model.

    Reproduces the training-time preprocessing (column ordering, downsampling,
    cyclic time features, min-max scaling), runs the pretrained model, and
    inverse-scales the predictions back to engineering units.
    """
    def __init__(self):
        self.seq_len = 10    # input sequence length (number of history steps)
        self.output_size = 5    # forecast horizon (number of future steps predicted)
        self.labels_num = 16    # number of prediction targets (16 indicators)
        self.feature_num = 79   # total input feature dimension
        self.step_size = 5      # sampling stride (take one sample every step_size rows)
        self.dropout = 0        # dropout probability (regularization)
        self.lr = 0.01          # learning rate (training-time value; only recorded here)
        self.num_heads = 8      # number of attention heads (model hyperparameter)
        self.hidden_size = 64   # hidden layer width
        self.batch_size = 512   # batch size
        self.num_layers = 1     # number of LSTM layers
        self.resolution = 60    # downsampling ratio (keep one of every 60 raw rows)
        self.test_start_date = '2025-07-01'  # test-set start date (placeholder; updated in predict())
        self.wavelet = 'db4'    # wavelet type (reserved, not used in this class)
        self.level = 3          # wavelet decomposition levels (reserved)
        self.level_after = 4    # post-processing wavelet levels (reserved)
        self.mode = 'soft'      # wavelet thresholding mode (reserved)
        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")  # prefer GPU
        self.model_path = '20min_model.pth'   # model weights path
        self.output_csv_path = '20min_predictions.csv'   # prediction output path
        self.random_seed = 1314    # random seed
        self.min_rows = 600         # minimum required number of input rows (600)
        self.uf_threshold = 0.001   # UF threshold (reserved)
        self.ro_threshold = 0.01    # RO threshold (reserved)
        self.flow_threshold = 1.0   # flow threshold (reserved)
        
        # Original column names of the 16 prediction targets
        self.target_columns = [
            'C.M.UF1_DB@press_PV', 'C.M.UF2_DB@press_PV', 'C.M.UF3_DB@press_PV', 'C.M.UF4_DB@press_PV',
            'UF1Per','UF2Per','UF3Per','UF4Per',
            'C.M.RO1_DB@DPT_1', 'C.M.RO2_DB@DPT_1', 'C.M.RO3_DB@DPT_1', 'C.M.RO4_DB@DPT_1',
            'C.M.RO1_DB@DPT_2', 'C.M.RO2_DB@DPT_2', 'C.M.RO3_DB@DPT_2', 'C.M.RO4_DB@DPT_2',
        ]
        
        self.raw_input_data = None  # raw input kept for the fallback path in predict()
        set_seed(self.random_seed)    # seed all RNGs for reproducibility
        self.scaler = joblib.load('20min_scaler.pkl')    # scaler fitted at training time
        self.model = None         # model instance (loaded later)
        self.edge_index = None    # graph edge index (consumed by the GAT part of the model)
        self.test_loader = None   # test DataLoader (built later)
        
    def ensure_min_rows(self, df):
        """
        Ensure the frame has at least ``min_rows`` (600) rows; pad if short.
        Forward padding: replicate the earliest row backwards in time.
        Backward padding: replicate the latest row forwards in time.
        """
        current_rows = len(df)
        if current_rows >= self.min_rows:
            return df
        
        # number of rows that still need to be synthesized
        need_rows = self.min_rows - current_rows
        print(f"数据行数不足{self.min_rows}行(当前{current_rows}行),需要补充{need_rows}行")
        
        # infer the sampling interval (assumes uniformly sampled data)
        # NOTE(review): needs at least 2 rows, otherwise iloc[1] raises IndexError — confirm callers guarantee this
        time_col = 'index'
        df[time_col] = pd.to_datetime(df[time_col])
        time_diff = (df[time_col].iloc[1] - df[time_col].iloc[0]).total_seconds()
        
        # pad forwards (clone the earliest row into the past)
        forward_rows = need_rows // 2
        if forward_rows > 0:
            earliest_data = df.iloc[0:1].copy()
            forward_data = []
            for i in range(1, forward_rows + 1):
                new_row = earliest_data.copy()
                new_row[time_col] = earliest_data[time_col] - timedelta(seconds=time_diff * i)
                forward_data.append(new_row)
            forward_df = pd.concat(forward_data, ignore_index=True)
            df = pd.concat([forward_df, df], ignore_index=True)
        
        # pad backwards if still short (clone the latest row into the future)
        current_rows = len(df)
        if current_rows < self.min_rows:
            backward_rows = self.min_rows - current_rows
            latest_data = df.iloc[-1:].copy()
            backward_data = []
            for i in range(1, backward_rows + 1):
                new_row = latest_data.copy()
                new_row[time_col] = latest_data[time_col] + timedelta(seconds=time_diff * i)
                backward_data.append(new_row)
            backward_df = pd.concat(backward_data, ignore_index=True)
            df = pd.concat([df, backward_df], ignore_index=True)
        
        print(f"数据补充完成,当前行数:{len(df)}行")
        return df

    def reorder_columns(self, df):
        """
        Reorder columns to match the feature order used at training time,
        so model inputs are not silently misaligned.
        """
        desired_order = [
            'index',
            'C.M.FT_ZGJJY1@out','C.M.RO1_FT_JS@out','C.M.RO2_FT_JS@out','C.M.RO3_FT_JS@out',
            'C.M.RO4_FT_JS@out','C.M.UF1_FT_JS@out','C.M.UF2_FT_JS@out','C.M.UF3_FT_JS@out',
            'C.M.UF4_FT_JS@out','C.M.UF_FT_ZCS@out','C.M.FT_ZGJJY2@out','C.M.FT_ZGJJY3@out',
            'C.M.FT_ZGJJY4@out','C.M.RO1_PT_JS@out','C.M.RO2_PT_JS@out','C.M.RO3_PT_JS@out',
            'C.M.UF1_PT_JS@out','C.M.UF2_PT_JS@out','C.M.UF3_PT_JS@out','C.M.UF4_PT_JS@out',
            'C.M.LT_JSC@out','C.M.RO1_PT_CS@out','C.M.RO1_PT_DJ2@out','C.M.RO2_PT_CS@out',
            'C.M.RO2_PT_DJ2@out','C.M.RO3_PT_CS@out','C.M.RO3_PT_DJ2@out','C.M.RO4_PT_CS@out',
            'C.M.RO4_PT_DJ2@out','C.M.RO4_PT_JS@out','C.M.LT_HCl@out','C.M.LT_NaClO@out',
            'C.M.LT_PAC@out','C.M.LT_QSC@out','C.M.RO_Cond_ZCS@out','C.M.RO_TT_ZJS@out',
            'C.M.UF1_JSF_kd@out','C.M.UF2_JSF_kd@out','C.M.UF_GSB4_fre@out','C.M.UF_ORP_ZCS@out',
            'C.M.JYB2_ZGJ1_fre@out','C.M.JYB2_ZGJ2_fre@out','C.M.JYB2_ZGJ3_fre@out','C.M.JYB2_ZGJ4_fre@out',
            'C.M.RO1_GYB_fre@out','C.M.RO2_GYB_fre@out','C.M.RO3_GYB_fre@out','C.M.RO4_GYB_fre@out',
            'C.M.UF3_JSF_kd@out','C.M.UF4_JSF_kd@out','C.M.UF_FXB2_fre@out','C.M.RO1_DJB_fre@out',
            'C.M.RO1_GYBF_kd@out','C.M.RO2_DJB_fre@out','C.M.RO2_GYBF_kd@out','C.M.RO3_DJB_fre@out',
            'C.M.RO3_GYBF_kd@out','C.M.RO4_DJB_fre@out','C.M.RO4_GYBF_kd@out',
            'C.M.UF1_DB@press_PV','C.M.UF2_DB@press_PV','C.M.UF3_DB@press_PV','C.M.UF4_DB@press_PV',
            'UF1Per','UF2Per','UF3Per','UF4Per',
            'C.M.RO1_DB@DPT_1','C.M.RO2_DB@DPT_1','C.M.RO3_DB@DPT_1','C.M.RO4_DB@DPT_1',
            'C.M.RO1_DB@DPT_2','C.M.RO2_DB@DPT_2','C.M.RO3_DB@DPT_2','C.M.RO4_DB@DPT_2',
        ]
        return df.loc[:, desired_order]

    def process_date(self, data):
        """
        Derive cyclic time features from the date column so the model can
        pick up periodic patterns: minute-of-day sin/cos (daily cycle) and
        day-of-year sin/cos (annual cycle).
        """
        if 'index' in data.columns:
            data = data.rename(columns={'index': 'date'})
        data['date'] = pd.to_datetime(data['date'])
        data['minute_of_day'] = data['date'].dt.hour * 60 + data['date'].dt.minute
        data['day_of_year'] = data['date'].dt.dayofyear
        
        # cyclic encoding (sin/cos keeps the feature continuous across wrap-around)
        data['minute_sin'] = np.sin(2 * np.pi * data['minute_of_day'] / 1440)  # minute-of-day sine
        data['minute_cos'] = np.cos(2 * np.pi * data['minute_of_day'] / 1440)  # minute-of-day cosine
        data['day_year_sin'] = np.sin(2 * np.pi * data['day_of_year'] / 366)   # day-of-year sine
        data['day_year_cos'] = np.cos(2 * np.pi * data['day_of_year'] / 366)   # day-of-year cosine
        # drop the raw counters, keep only the encoded features
        data.drop(columns=['minute_of_day', 'day_of_year'], inplace=True)
        
        # column order: date + time features + everything else
        time_features = ['minute_sin', 'minute_cos', 'day_year_sin', 'day_year_cos']
        other_columns = [col for col in data.columns if col not in ['date'] + time_features]
        return data[['date'] + time_features + other_columns]

    def scaler_data(self, data):
        """
        Normalize the data with the scaler saved at training time, so the
        scaling (0-1 min-max) matches what the model was trained on.
        """
        date_col = data[['date']]
        data_to_scale = data.drop(columns=['date'])
        scaled = self.scaler.transform(data_to_scale)
        scaled_df = pd.DataFrame(scaled, columns=data_to_scale.columns)
        # re-attach the date column to the scaled features
        return pd.concat([date_col.reset_index(drop=True), scaled_df], axis=1)
    
    def remove_outliers(self, predictions):
        """
        Clean outliers in the predictions using the IQR rule.
        Outlier: value below Q1 - 1.5*IQR or above Q3 + 1.5*IQR.
        Outliers are replaced by the mean of the remaining (normal) values.
        """
        cleaned = predictions.copy()
        # iterate over each target column (16 labels)
        for col in range(cleaned.shape[1]):
            values = cleaned[:, col]
            # quartiles
            q1 = np.percentile(values, 25)
            q3 = np.percentile(values, 75)
            iqr = q3 - q1
            # outlier bounds
            lower_bound = q1 - 1.5 * iqr
            upper_bound = q3 + 1.5 * iqr
            # values inside the bounds
            normal_values = values[(values >= lower_bound) & (values <= upper_bound)]
            # replace outliers by the mean of the normal values
            if len(normal_values) > 0:
                mean_normal = np.mean(normal_values)
                cleaned[(values < lower_bound) | (values > upper_bound), col] = mean_normal
        return cleaned
    
    def smooth_predictions(self, predictions):
        """
        Weighted smoothing of the predictions to damp step-to-step jitter.
        Sliding window weighted average: center weight 2, neighbors weight 1
        (edges handled specially with a 2-point window).
        """
        smoothed = predictions.copy()
        n_timesteps = predictions.shape[0]
        if n_timesteps <= 1:
            return smoothed
        
        # iterate over each target column
        for col in range(predictions.shape[1]):
            values = predictions[:, col]
            # first value: weighted pair (avoid over-smoothing the edge)
            smoothed[0, col] = (2 * values[0] + values[1]) / 3
            # interior values: weighted neighbors (core smoothing)
            for i in range(1, n_timesteps - 1):
                smoothed[i, col] = (values[i-1] + 2 * values[i] + values[i+1]) / 4
            # last value: weighted pair (avoid over-smoothing the edge)
            smoothed[-1, col] = (values[-2] + 2 * values[-1]) / 3
        return smoothed

    def create_test_loader(self, df):
        """
        Build the test DataLoader (convert the frame into model input format).
        Input: preprocessed DataFrame.
        Output: PyTorch DataLoader that batches the model inputs.
        """
        df['date'] = pd.to_datetime(df['date'])
        # sampling interval implied by the resolution (in minutes)
        time_interval = pd.Timedelta(minutes=(4 * self.resolution / 60))
        # window span that covers the input sequence plus headroom
        window_time_span = time_interval * (self.seq_len + 20)
        # move the test start back so there is enough history for one input window
        adjusted_test_start = pd.to_datetime(self.test_start_date) - window_time_span
        # keep only the history needed
        test_df = df[df['date'] >= adjusted_test_start].reset_index(drop=True)

        test_df = test_df.drop(columns=['date'])

        # build the supervised dataset (input sequence + target placeholders)
        feature_columns = test_df.columns.tolist()
        cols = []
        
        # input sequence: the last seq_len timesteps of every feature
        for col in feature_columns:
            for i in range(self.seq_len - 1, -1, -1):
                cols.append(test_df[[col]].shift(i))   # feature lagged by i steps (t-0 .. t-(seq_len-1))
                
        # target placeholders: future output_size steps of the labels (true values unused at inference)
        for i in range(1, self.output_size + 1):
            for col in feature_columns[-self.labels_num:]:
                cols.append(test_df[[col]].shift(-i))    # label led by i steps (t+1 .. t+output_size)
                
        # merge, subsample by stride, and keep only the last row (the freshest history)
        dataset = pd.concat(cols, axis=1).iloc[::self.step_size]
        dataset = dataset.iloc[[-1]]
    
        # input features are the first n_features_total columns
        n_features_total = self.feature_num * self.seq_len
        supervised_data = dataset.iloc[:, :n_features_total]

        # reshape to model input format: [samples, seq_len, features]
        X = supervised_data.values.reshape(-1, self.seq_len, self.feature_num)
        X = torch.tensor(X, dtype=torch.float32).to(self.device)
        tensor_dataset = TensorDataset(X)
        loader = DataLoader(tensor_dataset, batch_size=self.batch_size, shuffle=False)
        return loader

    def load_data(self, df):
        """
        Main data-loading flow: reorder columns, downsample, derive time
        features, scale, and build the test loader — matching training-time
        preprocessing exactly.
        """
        df = self.reorder_columns(df)
        df = df.iloc[::self.resolution, :].reset_index(drop=True)
        df = self.process_date(df)
        df = self.scaler_data(df)
        self.test_loader = self.create_test_loader(df)
        self.edge_index = torch.load('edge_index.pt', map_location=self.device, weights_only=True)

    def load_model(self):
        """Instantiate the model, load pretrained weights, switch to eval mode."""
        self.model = GAT_LSTM(self).to(self.device)
        if self.edge_index is not None:
            self.model.set_edge_index(self.edge_index.to(self.device))   # attach the graph edge index
        self.model.load_state_dict(torch.load(self.model_path, map_location=self.device, weights_only=True))
        self.model.eval()

    def get_recent_values_as_fallback(self):
        """Return the most recent output_size rows of the raw input as a fallback output."""
        # the raw input must have been stashed by predict()
        if self.raw_input_data is None:
            raise ValueError("原始输入数据未保存,无法获取备用值")
            
        # sort by time and take the latest output_size rows
        recent_data = self.raw_input_data.sort_values('index').tail(self.output_size)
        
        # if there are too few rows, repeat the last one
        if len(recent_data) < self.output_size:
            last_row = recent_data.iloc[-1:] if not recent_data.empty else pd.DataFrame(
                {col: [0.0] for col in self.target_columns}, index=[0])
            while len(recent_data) < self.output_size:
                recent_data = pd.concat([recent_data, last_row], ignore_index=True)
        
        # extract and return the target-column values
        fallback_values = recent_data[self.target_columns].values
        return fallback_values
    
    def predict(self, df):
        """
        Main prediction flow: update the forecast start time, load data and
        model, run inference, and inverse-scale the outputs.
        Input: raw data DataFrame.
        Output: inverse-scaled predictions (numpy array).
        """
        # stash the raw input for the fallback path
        self.raw_input_data = df.copy()
        
        # guarantee at least 600 rows of input
        df = self.ensure_min_rows(df)
        
        # forecast start = newest input timestamp + 4 minutes
        self.test_start_date = (pd.to_datetime(df['index']).max() + timedelta(minutes=4)).strftime("%Y-%m-%d %H:%M:%S")
        self.load_data(df)
        self.load_model()

        all_predictions = []
        with torch.no_grad():
            for batch in self.test_loader:
                inputs = batch[0].to(self.device)
                outputs = self.model(inputs)
                all_predictions.append(outputs.cpu().numpy())
        
        # concatenate all batches and reshape to [timesteps, labels]
        predictions = np.concatenate(all_predictions, axis=0).reshape(-1, self.labels_num)
        
        # inverse-scale (label columns only, using the training-time scaler parameters)
        from sklearn.preprocessing import MinMaxScaler
        inverse_scaler = MinMaxScaler()
        inverse_scaler.min_ = self.scaler.min_[-self.labels_num:]
        inverse_scaler.scale_ = self.scaler.scale_[-self.labels_num:]
        predictions = inverse_scaler.inverse_transform(predictions)
        
        # optional: outlier removal and smoothing (disabled; enable as needed)
        # predictions = self.remove_outliers(predictions)  # clean outliers
        # predictions = self.smooth_predictions(predictions)  # smoothing
        if np.isnan(predictions).any():
            # fall back to recent observed values
            predictions = self.get_recent_values_as_fallback()
            
        return predictions

    def save_predictions(self, predictions):
        """
        Save the predictions to CSV with a timestamp column plus one column
        per predicted indicator.
        Input: inverse-scaled predictions (numpy array).
        """
        start_time = datetime.strptime(self.test_start_date, "%Y-%m-%d %H:%M:%S")
        time_interval = timedelta(minutes=(4 * self.resolution / 60))
        timestamps = [start_time + i * time_interval for i in range(len(predictions))]

        pred_columns = [f'{col}_pred' for col in self.target_columns]
        df_result = pd.DataFrame(predictions, columns=pred_columns)
        df_result.insert(0, 'date', timestamps)
        df_result.to_csv(self.output_csv_path, index=False)
        print(f"预测结果保存至:{self.output_csv_path}")
+
if __name__ == '__main__':
    """主函数:初始化预测器、加载数据、执行预测并保存结果"""
    # Entry point: build the predictor, load the input JSON, run prediction.
    import json  # JSON parsing
    import os
    import pandas as pd
    from datetime import timedelta

    predictor = Predictor()
    
    # Read the JSON file that carries the input data
    json_file_path = 'pp.json'  # path to pp.json; adjust to the actual location
    if not os.path.exists(json_file_path):
        raise FileNotFoundError(f"未找到文件: {json_file_path}")
    print(f"读取文件:{json_file_path}")
    
    # Parse the JSON and extract the 'data' field (explicit checks, no try/except)
    with open(json_file_path, 'r', encoding='utf-8') as f:
        json_data = json.load(f)
    
    # Validate presence and shape of the 'data' field
    if 'data' not in json_data:
        raise ValueError("JSON文件中未找到'data'字段,请检查结构")
    data_list = json_data['data']
    if not isinstance(data_list, list) or len(data_list) == 0:
        raise ValueError("'data'字段必须是非空列表")
    
    # Convert the records to a DataFrame
    df = pd.DataFrame(data_list)
    
    # The 'datetime' key is required; rename it to the 'index' column the pipeline expects
    if 'datetime' not in df.columns:
        raise ValueError("数据中未找到'datetime'字段,请检查键名")
    df = df.rename(columns={'datetime': 'index'})
    
    # Parse timestamps (malformed values raise immediately)
    df['index'] = pd.to_datetime(df['index'])
    
    # Run the prediction; persisting the result is currently disabled
    predictions = predictor.predict(df)
    # predictor.save_predictions(predictions)
+    

二進制
models/prediction_models/xishan/20min_scaler.pkl


+ 59 - 0
models/prediction_models/xishan/args.py

@@ -0,0 +1,59 @@
+# args.py
+import argparse
+
def lstm_args_parser(argv=None):
    """Build and parse the LSTM training configuration.

    Fix/generalization: ``parse_args()`` previously always read ``sys.argv``,
    so calling this function from another program or a test runner crashed on
    unrelated CLI flags. The optional ``argv`` parameter is backward
    compatible: ``None`` (the default) still parses ``sys.argv[1:]``.

    Args:
        argv: optional explicit list of argument strings; defaults to
            ``sys.argv[1:]``.

    Returns:
        argparse.Namespace with all training hyper-parameters.
    """
    parser = argparse.ArgumentParser(description="LSTM模型训练参数")

    # dataset split dates
    parser.add_argument('--train_start_date', type=str, default='2024-02-23', help='训练集开始日期')
    parser.add_argument('--train_end_date', type=str, default='2026-03-10', help='训练集结束日期')
    parser.add_argument('--val_start_date', type=str, default='2024-02-23', help='验证集开始日期')
    parser.add_argument('--val_end_date', type=str, default='2026-03-10', help='验证集结束日期')
    parser.add_argument('--test_start_date', type=str, default='2024-02-23', help='测试集开始日期')
    parser.add_argument('--test_end_date', type=str, default='2026-03-10', help='测试集结束日期')

    # model parameters
    # values that differ between the 20-minute model (model 1) and the 90-day model (model 2)
    parser.add_argument('--seq_len', type=int, default=10, help='输入序列的长度(输入步长), 模型一为10, 模型二为4320')
    parser.add_argument('--output_size', type=int, default=5, help='输出数据的维度(预测步长), 模型一为5, 模型二为2160')
    parser.add_argument('--step_size', type=int, default=5, help='输入数据间隔,模型一为5, 模型二为2160')
    parser.add_argument('--resolution', type=int, default=60, help='输入数据分辨率(每多少个数据取一次), 模型一为60, 模型二为900')
    parser.add_argument('--feature_num', type=int, default=79, help='特征维度, 模型一为79,模型二为16')
    parser.add_argument('--labels_num', type=int, default=16, help='标签维度(子模型数量), 模型一为16,模型二为8')

    # shared parameters
    parser.add_argument('--epochs', type=int, default=500, help='训练轮数')
    parser.add_argument('--hidden_size', type=int, default=64, help='隐藏层大小')
    parser.add_argument('--num_layers', type=int, default=1, help='LSTM层数')
    parser.add_argument('--dropout', type=float, default=0, help='dropout的概率')
    parser.add_argument('--lr', type=float, default=0.01, help='学习率')
    parser.add_argument('--batch_size', type=int, default=1024, help='批次大小')

    # learning-rate scheduler
    parser.add_argument('--scheduler_step_size', type=int, default=100, help='学习率调整步长')
    parser.add_argument('--scheduler_gamma', type=float, default=0.9, help='学习率衰减率')

    # early stopping
    parser.add_argument('--patience', type=int, default=500, help='早停耐心值')
    parser.add_argument('--min_delta', type=float, default=1e-10, help='最小改善阈值')

    # device selection
    parser.add_argument('--device', type=int, default=1, help='选择使用的GPU设备')

    # data-handling parameters
    parser.add_argument('--start_files', type=int, default=1, help='开始文件索引')
    parser.add_argument('--end_files', type=int, default=69, help='结束文件索引')
    parser.add_argument('--data_dir', type=str, default='datasets_xishan', help='数据文件夹路径')
    parser.add_argument('--file_pattern', type=str, default='data_process_{}.csv', help='数据文件命名模式')

    # artifact paths
    parser.add_argument('--model_path', type=str, default='20min_model.pth', help='模型保存路径')
    parser.add_argument('--scaler_path', type=str, default='20min_scaler.pkl', help='归一化器路径')
    parser.add_argument('--output_csv_path', type=str, default='20min_predictions.csv', help='预测文件保存路径')

    # random seed
    parser.add_argument('--random_seed', type=int, default=1314, help='随机种子')

    args = parser.parse_args(argv)

    return args

+ 327 - 0
models/prediction_models/xishan/data_preprocessor.py

@@ -0,0 +1,327 @@
+# data_preprocessor.py
+import os
+import torch
+import joblib
+import numpy as np
+import pandas as pd
+from tqdm import tqdm    # 进度条显示
+from sklearn.preprocessing import MinMaxScaler    # 数据归一化工具
+from torch.utils.data import DataLoader, TensorDataset    # PyTorch数据加载工具
+from concurrent.futures import ThreadPoolExecutor    # 多线程读取文件
+
class DataPreprocessor:
    """Data preprocessing: loading, train/val/test splitting, and conversion
    into PyTorch-ready model inputs for the GAT-LSTM training pipeline."""
    
    @staticmethod
    def load_and_process_data(args, data):
        
        """
        Split the preprocessed data into train/val/test sets and build loaders.
        Args:
            args: configuration (split dates, sequence length, etc.)
            data: complete preprocessed data (including the date column)
        Returns:
            train_loader: training DataLoader
            val_loader: validation DataLoader
            test_loader: test DataLoader
            data: the original data (for downstream processing)
        """
        
        # parse the date column
        data['date'] = pd.to_datetime(data['date'])
        time_interval = pd.Timedelta(minutes=(4 * args.resolution / 60))
        window_time_span = time_interval * (args.seq_len + 1)

        # split dates (start dates are shifted to accommodate the sliding window)
        val_start_date = pd.to_datetime(args.val_start_date)
        test_start_date = pd.to_datetime(args.test_start_date)
        
        # move val/test starts back one window so the first sample has full history
        adjusted_val_start = val_start_date - window_time_span
        adjusted_test_start = test_start_date - window_time_span
        
        # date-range masks for each split
        train_mask = (data['date'] >= pd.to_datetime(args.train_start_date)) & \
                     (data['date'] <= pd.to_datetime(args.train_end_date))

        val_mask = (data['date'] >= adjusted_val_start) & \
                   (data['date'] <= pd.to_datetime(args.val_end_date))

        test_mask = (data['date'] >= adjusted_test_start) & \
                    (data['date'] <= pd.to_datetime(args.test_end_date))

        # filter and reset indices
        train_data = data[train_mask].reset_index(drop=True)
        val_data = data[val_mask].reset_index(drop=True)
        test_data = data[test_mask].reset_index(drop=True)
        
        # drop the date column before modeling
        train_data = train_data.drop(columns=['date'])
        val_data = val_data.drop(columns=['date'])
        test_data = test_data.drop(columns=['date'])
    
        # build supervised datasets (input sequences + target sequences)
        train_supervised = DataPreprocessor.create_supervised_dataset(
            args,
            train_data,
            1
        )
        
        val_supervised = DataPreprocessor.create_supervised_dataset(
            args,
            val_data,
            1
        )
        
        test_supervised = DataPreprocessor.create_supervised_dataset(
            args,
            test_data,
            args.step_size
        )
        
        # wrap in DataLoaders
        train_loader = DataPreprocessor.load_data(
            args, 
            train_supervised,
            shuffle=True
        )
        
        val_loader = DataPreprocessor.load_data(
            args, 
            val_supervised,
            shuffle=False
        )
        
        test_loader = DataPreprocessor.load_data(
            args, 
            test_supervised,
            shuffle=False
        )
        
        return train_loader, val_loader, test_loader, data  # original data returned for downstream use
    
    @staticmethod
    def read_and_combine_csv_files(args):
        """
        Read and merge multiple CSV files with a thread pool, then downsample,
        derive time features, and normalize.
        Args:
            args: configuration (data directory, file index range, etc.)
        Returns:
            chunk: preprocessed merged data (date column + scaled features)
        """
        current_dir = os.path.dirname(__file__)
        parent_dir = os.path.dirname(current_dir)
        args.data_dir = os.path.join(parent_dir, args.data_dir)
        
        def read_file(file_count):
            """Read one CSV file (worker function for the thread pool)."""
            file_name = args.file_pattern.format(file_count)
            file_path = os.path.join(args.data_dir, file_name)
            return pd.read_csv(file_path)
        
        # indices of the files to read
        file_indices = list(range(args.start_files, args.end_files + 1))
        
        # read the files concurrently (speeds up large reads)
        max_workers = os.cpu_count()
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            results = list(tqdm(executor.map(read_file, file_indices),
                                total=len(file_indices),
                                desc="正在读取文件"))
        
        all_data = pd.concat(results, ignore_index=True)
        
        # select feature columns according to feature_num
        if args.feature_num == 16:
            # the 16 features to keep ('index' is added below for date handling)
            '''
            specified_features = [
                "C.M.RO1_FT_JS@out",
                "C.M.RO2_FT_JS@out",
                "C.M.RO3_FT_JS@out",
                "C.M.RO4_FT_JS@out",
                "C.M.RO_TT_ZJS@out",
                "C.M.RO_Cond_ZCS@out",
                "C.M.RO1_DB@DPT_1",
                "C.M.RO1_DB@DPT_2",
                "C.M.RO2_DB@DPT_1",
                "C.M.RO2_DB@DPT_2",
                "C.M.RO3_DB@DPT_1",
                "C.M.RO3_DB@DPT_2",
                "C.M.RO4_DB@DPT_1",
                "C.M.RO4_DB@DPT_2"
            ]
            '''
            specified_features = [
                "C.M.UF1_FT_JS@out",        # UF1 feed flow
                "C.M.UF2_FT_JS@out",        # UF2 feed flow
                "C.M.UF3_FT_JS@out",        # UF3 feed flow
                "C.M.UF4_FT_JS@out",        # UF4 feed flow
                "C.M.RO_TT_ZJS@out",        # RO total feed temperature
                "C.M.UF_ORP_ZCS@out",       # UF total product-water ORP
                "C.M.UF1_DB@press_PV",      # UF1 transmembrane pressure
                "C.M.UF2_DB@press_PV",      # UF2 transmembrane pressure
                "C.M.UF3_DB@press_PV",      # UF3 transmembrane pressure
                "C.M.UF4_DB@press_PV",      # UF4 transmembrane pressure
                "UF1Per",                   # UF1 membrane permeability
                "UF2Per",                   # UF2 membrane permeability
                "UF3Per",                   # UF3 membrane permeability
                "UF4Per",                   # UF4 membrane permeability
                ]

            # 'index' must be kept for date processing; prepend it
            columns_to_keep = ['index'] + specified_features
            # keep the columns in the specified order
            all_data = all_data[columns_to_keep]
        # when feature_num == 79, all original features are kept
        
        # downsample by resolution
        chunk = all_data.iloc[::args.resolution, :].reset_index(drop=True)
        
        # derive date/time features
        chunk = DataPreprocessor.process_date(chunk, args)
        # normalize
        chunk = DataPreprocessor.scaler_data(chunk, args)
        
        return chunk
    
    @staticmethod
    def process_date(data, args):
        """
        Derive cyclic time features from the date column; which features are
        generated depends on the data resolution.
        Args:
            data: DataFrame with an 'index' column (raw timestamps)
            args: configuration (args.resolution selects the features)
        Returns:
            data: processed DataFrame (date column + time features)
        """
        data = data.rename(columns={'index': 'date'})
        data['date'] = pd.to_datetime(data['date'])
    
        # cyclic time features
        time_features = []
        
        if args.resolution == 60:
            # at resolution 60, also generate minute-level (daily-cycle) features
            data['minute_of_day'] = data['date'].dt.hour * 60 + data['date'].dt.minute
            data['minute_sin'] = np.sin(2 * np.pi * data['minute_of_day'] / 1440)
            data['minute_cos'] = np.cos(2 * np.pi * data['minute_of_day'] / 1440)
            time_features.extend(['minute_sin', 'minute_cos'])
            data.drop(columns=['minute_of_day'], inplace=True)
        
        # day-of-year (annual-cycle) features are generated at every resolution
        data['day_of_year'] = data['date'].dt.dayofyear
        data['day_year_sin'] = np.sin(2 * np.pi * data['day_of_year'] / 366)
        data['day_year_cos'] = np.cos(2 * np.pi * data['day_of_year'] / 366)
        time_features.extend(['day_year_sin', 'day_year_cos'])
        data.drop(columns=['day_of_year'], inplace=True)
    
        # column order: date + time features + everything else
        other_columns = [col for col in data.columns if col not in ['date'] and col not in time_features]
        data = data[['date'] + time_features + other_columns]
    
        return data
    
    @staticmethod
    def scaler_data(data, args):
        """
        Normalize the features (0-1 min-max scaling) and persist the scaler
        so prediction code can inverse-transform later.
        Args:
            data: DataFrame with a 'date' column plus feature columns
            args: configuration (provides scaler_path)
        Returns:
            scaled_data: normalized DataFrame (date column re-attached)
        """
        date_col = data[['date']]
        data_to_scale = data.drop(columns=['date'])

        scaler = MinMaxScaler(feature_range=(0, 1))
        scaled_data = scaler.fit_transform(data_to_scale)
        joblib.dump(scaler, args.scaler_path)  # persist the fitted scaler

        # back to a DataFrame, with the date column re-attached
        scaled_data = pd.DataFrame(scaled_data, columns=data_to_scale.columns)
        scaled_data = pd.concat([date_col.reset_index(drop=True), scaled_data], axis=1)
        
        return scaled_data
    
    @staticmethod
    def create_supervised_dataset(args, data, step_size):
        """
        Build a supervised dataset (input sequences + target sequences).
        Input sequence: the last seq_len timesteps of every feature.
        Target sequence: the next output_size timesteps of the label features
        (the last labels_num columns).
        Args:
            args: configuration (seq_len, output_size, labels_num, ...)
            data: feature data without the date column
            step_size: sampling stride (take one sample every step_size rows)
        Returns:
            dataset: the supervised dataset (DataFrame)
        """
        data = pd.DataFrame(data)
        cols = []
        col_names = []
        
        feature_columns = data.columns.tolist()

        # input sequence (t-0 .. t-(seq_len-1))
        for col in feature_columns:
            for i in range(args.seq_len - 1, -1, -1):
                cols.append(data[[col]].shift(i))
                col_names.append(f"{col}(t-{i})")
        
        # target sequence (only the last labels_num columns are targets)
        target_columns = feature_columns[-args.labels_num:]
        for i in range(1, args.output_size + 1):
            for col in target_columns:
                cols.append(data[[col]].shift(-i))
                col_names.append(f"{col}(t+{i})")

        # merge and clean
        dataset = pd.concat(cols, axis=1)
        dataset.columns = col_names
        dataset = dataset.iloc[::step_size, :]  # subsample by stride
        dataset.dropna(inplace=True)  # drop rows with NaNs introduced by shifting
        
        return dataset

    @staticmethod
    def load_data(args, dataset, shuffle):
        """
        Convert a supervised dataset into tensors and wrap it in a DataLoader.
        Args:
            args: configuration (feature count, batch size, ...)
            dataset: supervised dataset (DataFrame)
            shuffle: whether to shuffle (True for training, False otherwise)
        Returns:
            data_loader: PyTorch DataLoader
        """
        input_length = args.seq_len
        n_features = args.feature_num
        labels_num = args.labels_num
    
        n_features_total = n_features * input_length  # total input width
        n_labels_total = args.output_size * labels_num  # total target width

        # split inputs and targets
        X = dataset.values[:, :n_features_total]
        y = dataset.values[:, n_features_total:n_features_total + n_labels_total]
    
        # reshape inputs to [samples, seq_len, features]
        X = X.reshape(X.shape[0], input_length, n_features)
        X = torch.tensor(X, dtype=torch.float32).to(args.device)
        y = torch.tensor(y, dtype=torch.float32).to(args.device)

        # build the dataset and loader
        dataset_tensor = TensorDataset(X, y)
        generator = torch.Generator()
        generator.manual_seed(args.random_seed)  # fixed seed for reproducible shuffling
        
        data_loader = DataLoader(
            dataset_tensor, 
            batch_size=args.batch_size, 
            shuffle=shuffle,
            generator=generator
        )
    
        return data_loader

+ 267 - 0
models/prediction_models/xishan/data_trainer.py

@@ -0,0 +1,267 @@
+# data_trainer.py
+import torch
+import joblib
+import numpy as np
+import pandas as pd
+from sklearn.metrics import r2_score
+from datetime import datetime, timedelta
+from sklearn.preprocessing import MinMaxScaler
+
+class Trainer:
+    def __init__(self, model, args, data):
+        """
+        Model trainer: handles training, validation, model saving and evaluation.
+        Args:
+            model: model instance to train
+            args: configuration parameters
+            data: raw data (used when generating evaluation timestamps)
+        """
+        self.args = args
+        self.model = model
+        self.data = data
+        
+        # Early-stopping state
+        self.patience = args.patience
+        self.min_delta = args.min_delta
+        self.counter = 0
+        self.early_stop = False
+        self.best_val_loss = float('inf')
+        self.best_model_state = None
+        self.best_epoch = 0
+
+    def train_full_model(self, train_loader, val_loader, optimizer, criterion, scheduler):
+        """
+        Jointly train all 8/16 sub-models end-to-end.
+        Args:
+            train_loader: training-set DataLoader
+            val_loader: validation-set DataLoader
+            optimizer: optimizer (e.g. Adam)
+            criterion: loss function (e.g. MSE)
+            scheduler: learning-rate scheduler
+        Returns:
+            the trained model (with the best weights loaded)
+        """
+        self.counter = 0
+        self.best_val_loss = float('inf')
+        self.early_stop = False
+        self.best_model_state = None
+        self.best_epoch = 0
+        max_epochs = self.args.epochs
+
+        for epoch in range(max_epochs):
+            self.model.train()
+            running_loss = 0.0
+            
+            for inputs, targets in train_loader:
+                inputs = inputs.to(self.args.device)
+                targets = targets.to(self.args.device)  # joint target (all 8/16 dependent variables)
+                
+                optimizer.zero_grad()
+                outputs = self.model(inputs)  # joint model output
+                
+                loss = criterion(outputs, targets)  # overall loss
+                loss.backward()
+                optimizer.step()
+                running_loss += loss.item()
+            
+            train_loss = running_loss / len(train_loader)
+            val_loss = self.validate_full(val_loader, criterion) if val_loader else 0.0
+
+            print(f'Epoch {epoch+1}/{max_epochs}, '
+                  f'Train Loss: {train_loss:.6f}, '
+                  f'Val Loss: {val_loss:.6f}, '
+                  f'LR: {optimizer.param_groups[0]["lr"]:.6f}')
+
+            # Early stopping based on the overall validation loss
+            if val_loader:
+                improved = val_loss < (self.best_val_loss - self.min_delta)
+                if improved:
+                    self.best_val_loss = val_loss
+                    self.counter = 0
+                    # NOTE(review): state_dict() returns live tensor references that later
+                    # epochs mutate in place; consider a deep copy here — TODO confirm.
+                    self.best_model_state = self.model.state_dict()
+                    self.best_epoch = epoch
+                else:
+                    self.counter += 1
+                    if self.counter >= self.patience:
+                        self.early_stop = True
+                        print(f"早停触发")
+                        
+            scheduler.step()
+            torch.cuda.empty_cache()
+            if self.early_stop:
+                break
+
+        # Restore the best recorded state, if any
+        if self.best_model_state is not None:
+            self.model.load_state_dict(self.best_model_state)
+        print(f"最佳迭代: {self.best_epoch+1}, 最佳验证损失: {self.best_val_loss:.6f}")
+        return self.model
+
+    def validate_full(self, val_loader, criterion):
+        """
+        Validate the whole model (compute the validation-set loss).
+        Args:
+            val_loader: validation-set DataLoader
+            criterion: loss function
+        Returns:
+            mean validation loss
+        """
+        self.model.eval()
+        total_loss = 0.0
+        with torch.no_grad():
+            for inputs, targets in val_loader:
+                inputs = inputs.to(self.args.device)
+                targets = targets.to(self.args.device)  # joint target
+                
+                outputs = self.model(inputs)  # joint model output
+                loss = criterion(outputs, targets)  # overall loss
+                total_loss += loss.item()
+        return total_loss / len(val_loader)
+
+    def save_model(self):
+        """Save the model's best weights to the configured path."""
+        torch.save(self.model.state_dict(), self.args.model_path)
+        print(f"模型已保存到:{self.args.model_path}")
+            
+    def evaluate_model(self, test_loader, criterion):
+        """
+        Evaluate the model on the test set: compute R², RMSE and MAPE per variable,
+        then save predictions (CSV) and a metrics report (TXT).
+        Args:
+            test_loader: test-set DataLoader
+            criterion: loss function (used for the test loss)
+        Returns:
+            dicts of per-variable metrics (R², RMSE, MAPE)
+        """
+        self.model.eval()
+        scaler_path = self.args.scaler_path
+        scaler = joblib.load(scaler_path)
+        predictions = []
+        true_values = []
+        device = self.args.device
+        
+        with torch.no_grad():
+            for inputs, targets in test_loader:
+                inputs = inputs.to(device)
+                targets = targets.to(device)
+                outputs = self.model(inputs)
+                predictions.append(outputs.cpu().numpy())
+                true_values.append(targets.cpu().numpy())
+    
+        predictions = np.concatenate(predictions, axis=0)
+        true_values = np.concatenate(true_values, axis=0)
+    
+        # Reshape predictions and targets so each row is one time step of labels_num values
+        reshaped_predictions = predictions.reshape(predictions.shape[0], 
+                                                   self.args.output_size, 
+                                                   self.args.labels_num)
+        predictions = reshaped_predictions.reshape(-1, self.args.labels_num)
+        
+        reshaped_true_values = true_values.reshape(true_values.shape[0], 
+                                                   self.args.output_size, 
+                                                   self.args.labels_num)
+        true_values = reshaped_true_values.reshape(-1, self.args.labels_num)
+    
+        # Inverse-transform using a scaler restricted to the label columns only
+        column_scaler = MinMaxScaler(feature_range=(0, 1))
+        column_scaler.min_ = scaler.min_[-self.args.labels_num:] 
+        column_scaler.scale_ = scaler.scale_[-self.args.labels_num:] 
+        
+        true_values = column_scaler.inverse_transform(true_values)
+        predictions = column_scaler.inverse_transform(predictions)
+    
+        # Column names for the 8/16 dependent variables
+        full_column_names = [
+            'C.M.UF1_DB@press_PV', 'C.M.UF2_DB@press_PV', 'C.M.UF3_DB@press_PV', 'C.M.UF4_DB@press_PV',
+            'UF1Per','UF2Per','UF3Per','UF4Per',
+            'C.M.RO1_DB@DPT_1', 'C.M.RO2_DB@DPT_1', 'C.M.RO3_DB@DPT_1', 'C.M.RO4_DB@DPT_1',
+            'C.M.RO1_DB@DPT_2', 'C.M.RO2_DB@DPT_2', 'C.M.RO3_DB@DPT_2', 'C.M.RO4_DB@DPT_2'
+        ]
+        
+        # Select the column-name subset matching labels_num
+        if self.args.labels_num == 16:
+            column_names = full_column_names
+        elif self.args.labels_num == 8:
+            # Currently the first 8 (UF) columns; the RO-only variant is kept for reference:
+            # column_names = full_column_names[-8:]
+            column_names = full_column_names[:8]
+        else:
+            # Unsupported labels_num value (adjust here if more configurations are needed)
+            raise ValueError(f"不支持的labels_num值: {self.args.labels_num},仅支持16或8")
+    
+        # Build the timestamp axis for the flattened prediction rows
+        start_datetime = datetime.strptime(self.args.test_start_date, "%Y-%m-%d")
+        # With resolution=60 this yields a 4-minute step; presumably matches the
+        # downsampling/step_size of the test data — TODO confirm against preprocessing.
+        time_interval = timedelta(minutes=(4 * self.args.resolution / 60))
+        total_points = len(predictions)
+        date_times = [start_datetime + i * time_interval for i in range(total_points)]
+        
+        # Collect results into a DataFrame
+        results = pd.DataFrame({'date': date_times})
+
+        # Per-variable evaluation metrics
+        r2_scores = {}
+        rmse_scores = {}
+        mape_scores = {}
+        metrics_details = []
+        
+        for i, col_name in enumerate(column_names):
+            results[f'{col_name}_True'] = true_values[:, i]
+            results[f'{col_name}_Predicted'] = predictions[:, i]
+
+            var_true = true_values[:, i]
+            var_pred = predictions[:, i]
+
+            # Drop zero targets (avoids division by zero in MAPE)
+            non_zero_mask = var_true != 0
+            var_true_nonzero = var_true[non_zero_mask]
+            var_pred_nonzero = var_pred[non_zero_mask]
+
+            r2 = float('nan')
+            rmse = float('nan')
+            mape = float('nan')
+            
+            if len(var_true_nonzero) > 0:
+                r2 = r2_score(var_true_nonzero, var_pred_nonzero)
+                rmse = np.sqrt(np.mean((var_true_nonzero - var_pred_nonzero) ** 2))
+                mape = np.mean(np.abs((var_true_nonzero - var_pred_nonzero) / np.abs(var_true_nonzero))) * 100
+                
+                r2_scores[col_name] = r2
+                rmse_scores[col_name] = rmse
+                mape_scores[col_name] = mape
+                
+                detail = f"{col_name}:\n  R方 = {r2:.6f}\n  RMSE = {rmse:.6f}\n  MAPE = {mape:.6f}%"
+                metrics_details.append(detail)
+                print(f"{col_name} R方: {r2:.6f}")
+            else:
+                metrics_details.append(f"{col_name}: 没有有效数据用于计算指标")
+                print(f"{col_name} 没有有效数据用于计算R方")
+
+        # Averages over the variables that produced valid metrics
+        valid_r2 = [score for score in r2_scores.values() if not np.isnan(score)]
+        valid_rmse = [score for score in rmse_scores.values() if not np.isnan(score)]
+        valid_mape = [score for score in mape_scores.values() if not np.isnan(score)]
+        
+        avg_r2 = np.mean(valid_r2) if valid_r2 else float('nan')
+        avg_rmse = np.mean(valid_rmse) if valid_rmse else float('nan')
+        avg_mape = np.mean(valid_mape) if valid_mape else float('nan')
+
+        avg_detail = f"\n平均指标:\n  R方 = {avg_r2:.6f}\n  RMSE = {avg_rmse:.6f}\n  MAPE = {avg_mape:.6f}%"
+        if np.isnan(avg_r2):
+            avg_detail = "\n平均指标: 没有有效的指标可用于计算平均值"
+        
+        metrics_details.append(avg_detail)
+        print(avg_detail)
+
+        # Persist CSV predictions and a plain-text metrics report
+        results.to_csv(self.args.output_csv_path, index=False)
+        print(f"预测结果已保存到:{self.args.output_csv_path}")
+
+        txt_path = self.args.output_csv_path.replace('.csv', '_metrics_results.txt')
+        with open(txt_path, 'w') as f:
+            f.write("各变量预测指标结果:\n")
+            f.write("===================\n\n")
+            for detail in metrics_details:
+                f.write(detail + '\n')
+        
+        print(f"预测指标结果已保存到:{txt_path}")
+        
+        return r2_scores, rmse_scores, mape_scores

二進制
models/prediction_models/xishan/edge_index.pt


+ 102 - 0
models/prediction_models/xishan/gat_lstm.py

@@ -0,0 +1,102 @@
+# gat_lstm.py
+import torch
+import torch.nn as nn    # PyTorch神经网络模块
+
+# A single independent sub-model (one per dependent variable)
+class SingleGATLSTM(nn.Module):
+    def __init__(self, args):
+        """
+        Single sub-model: an LSTM plus an output head predicting one target indicator.
+        Args:
+            args: configuration (feature count, hidden size, dropout, ...)
+        """
+        super(SingleGATLSTM, self).__init__()
+        self.args = args
+        
+        # Dedicated LSTM layer
+        self.lstm = nn.LSTM(
+            input_size=args.feature_num,
+            hidden_size=args.hidden_size,
+            num_layers=args.num_layers,
+            batch_first=True
+        )
+        
+        # Dedicated output head
+        self.final_linear = nn.Sequential(
+            nn.Linear(args.hidden_size, args.hidden_size),
+            nn.LeakyReLU(0.01),
+            nn.Dropout(args.dropout * 0.4),
+            nn.Linear(args.hidden_size, args.output_size)
+        )
+        
+        self._init_weights()
+        
+    def _init_weights(self):
+        """Initialize network weights to speed up convergence."""
+        for m in self.modules():
+            if isinstance(m, nn.Linear):
+                nn.init.xavier_uniform_(m.weight)
+                if m.bias is not None:
+                    nn.init.zeros_(m.bias)
+            elif isinstance(m, nn.BatchNorm1d):
+                nn.init.constant_(m.weight, 1)
+                nn.init.constant_(m.bias, 0)
+
+        # Initialize LSTM weights: Xavier for input weights, orthogonal for
+        # recurrent weights, zero biases with the forget-gate slice set to 1
+        for name, param in self.lstm.named_parameters():
+            if 'weight_ih' in name:
+                nn.init.xavier_uniform_(param.data)
+            elif 'weight_hh' in name:
+                nn.init.orthogonal_(param.data)
+            elif 'bias' in name:
+                param.data.fill_(0)
+                n = param.size(0)
+                start, end = n // 4, n // 2
+                param.data[start:end].fill_(1)
+        
+    def forward(self, x):
+        """
+        Forward pass: run the input sequence through the LSTM and the output head.
+        Args:
+            x: input sequence of shape [batch_size, seq_len, feature_num]
+        Returns:
+            output: prediction of shape [batch_size, output_size]
+        """
+        batch_size, seq_len, feature_num = x.size()
+        lstm_out, _ = self.lstm(x)
+        # Use only the hidden output of the last time step
+        last_out = lstm_out[:, -1, :]
+        
+        # Output head prediction
+        output = self.final_linear(last_out)
+        return output  # [batch_size, output_size]
+
+
+# Container of 16 independent sub-models (the full model)
+class GAT_LSTM(nn.Module):
+    def __init__(self, args):
+        """
+        Full model: holds multiple SingleGATLSTM sub-models, one per prediction target.
+        Args:
+            args: configuration (labels_num = number of sub-models)
+        """
+        super(GAT_LSTM, self).__init__()
+        self.args = args
+        # One independent sub-model per target (count given by labels_num)
+        self.models = nn.ModuleList([SingleGATLSTM(args) for _ in range(args.labels_num)])
+    
+    def set_edge_index(self, edge_index):
+        # Stash the passed-in graph edge index on the model (not read by forward here)
+        self.edge_index = edge_index
+        
+    def forward(self, x):
+        """
+        Forward pass: every sub-model processes the same input; outputs are concatenated.
+        Args:
+            x: input sequence of shape [batch_size, seq_len, feature_num]
+        Returns:
+            concatenated predictions of shape [batch_size, output_size * labels_num]
+        """
+        outputs = []
+        for model in self.models:
+            outputs.append(model(x))  # each output: [batch, output_size]
+        return torch.cat(outputs, dim=1)  # concatenated: [batch, output_size * labels_num]

+ 70 - 0
models/prediction_models/xishan/main.py

@@ -0,0 +1,70 @@
+# main.py
+import os
+import torch
+import numpy as np
+import random
+from gat_lstm import GAT_LSTM
+from data_trainer import Trainer
+from args import lstm_args_parser
+from torch.nn import MSELoss
+from data_preprocessor import DataPreprocessor
+
+def set_seed(seed):
+    """Seed Python, NumPy and PyTorch (CPU & CUDA) RNGs and force deterministic cuDNN."""
+    random.seed(seed)
+    os.environ['PYTHONHASHSEED'] = str(seed)
+    np.random.seed(seed)
+    torch.manual_seed(seed)
+    torch.cuda.manual_seed(seed)
+    torch.cuda.manual_seed_all(seed)
+    torch.backends.cudnn.deterministic = True
+    torch.backends.cudnn.benchmark = False
+
+
+def main():
+    """Entry point: preprocess data, build the multi-head model, train, save, evaluate."""
+    args = lstm_args_parser()
+    set_seed(args.random_seed)
+    
+    device = torch.device(f"cuda:{args.device}" if torch.cuda.is_available() else "cpu")
+    args.device = device  # store the resolved device on args for downstream use
+
+    # Data preprocessing
+    data = DataPreprocessor.read_and_combine_csv_files(args)
+    train_loader, val_loader, test_loader, _ = DataPreprocessor.load_and_process_data(args, data)
+    
+    # Build the full model containing the 16 sub-models
+    model = GAT_LSTM(args).to(device)
+
+    # Trainer and MSE loss
+    trainer = Trainer(model, args, data)
+    criterion = MSELoss()
+
+    # Optimizer over all sub-model parameters (joint training)
+    optimizer = torch.optim.Adam(
+        model.parameters(),  # full model parameters
+        lr=args.lr
+    )
+    scheduler = torch.optim.lr_scheduler.StepLR(
+        optimizer,
+        step_size=args.scheduler_step_size,
+        gamma=args.scheduler_gamma
+    )
+
+    # Train the full model (all 8/16 sub-models jointly)
+    print("=== 开始训练包含8/16个子模型的整体模型 ===")
+    trainer.train_full_model(
+        train_loader,
+        val_loader,
+        optimizer,
+        criterion,
+        scheduler
+    )
+
+    # Save the full model (parameters of all 8/16 sub-models)
+    trainer.save_model()
+    print("\n=== 模型训练完成,已保存整体模型 ===")
+
+    # Evaluate on the test set
+    trainer.evaluate_model(test_loader, MSELoss())
+
+if __name__ == "__main__":
+    main()