# gat_lstm.py import torch import torch.nn as nn # 单个独立模型(对应1个因变量) class SingleGATLSTM(nn.Module): def __init__(self, args): """ 初始化单个预测子模型 :param args: 包含模型参数的配置对象(如输入特征数、隐藏层大小等) """ super(SingleGATLSTM, self).__init__() self.args = args # 独立的LSTM层 self.lstm = nn.LSTM( input_size=args.feature_num, hidden_size=args.hidden_size, num_layers=args.num_layers, batch_first=True ) # 独立的输出层:将LSTM输出映射到预测结果(输出长度为output_size) self.final_linear = nn.Sequential( nn.Linear(args.hidden_size, args.hidden_size), nn.LeakyReLU(0.01), nn.Dropout(args.dropout * 0.4), nn.Linear(args.hidden_size, args.output_size) ) self._init_weights() def _init_weights(self): """初始化模型权重,提升训练稳定性和收敛速度""" # 初始化线性层权重 for m in self.modules(): if isinstance(m, nn.Linear): nn.init.xavier_uniform_(m.weight) if m.bias is not None: nn.init.zeros_(m.bias) elif isinstance(m, nn.BatchNorm1d): nn.init.constant_(m.weight, 1) nn.init.constant_(m.bias, 0) # 初始化LSTM权重 for name, param in self.lstm.named_parameters(): if 'weight_ih' in name: nn.init.xavier_uniform_(param.data) elif 'weight_hh' in name: nn.init.orthogonal_(param.data) elif 'bias' in name: param.data.fill_(0) n = param.size(0) start, end = n // 4, n // 2 param.data[start:end].fill_(1) def forward(self, x): """ 前向传播:输入序列 -> LSTM处理 -> 输出层预测 :param x: 输入张量,形状为[batch_size, seq_len, feature_num] :return: 预测结果,形状为[batch_size, output_size] """ batch_size, seq_len, feature_num = x.size() lstm_out, _ = self.lstm(x) # 取最后一个时间步的输出 last_out = lstm_out[:, -1, :] # 输出层预测 output = self.final_linear(last_out) return output # [batch_size, output_size] # 16个独立模型的容器(总模型) class GAT_LSTM(nn.Module): def __init__(self, args): """ 初始化多输出模型容器 :param args: 配置参数,labels_num指定目标特征数量(即子模型数量) """ super(GAT_LSTM, self).__init__() self.args = args # 创建16个独立模型(数量由labels_num指定) self.models = nn.ModuleList([SingleGATLSTM(args) for _ in range(args.labels_num)]) def forward(self, x): """ 前向传播:所有子模型并行处理输入,拼接结果 :param x: 输入张量,形状为[batch_size, seq_len, feature_num] :return: 拼接后的预测结果,形状为[batch_size, output_size * labels_num] """ outputs = [] for model in self.models: outputs.append(model(x)) # 每个输出为[batch, output_size] return torch.cat(outputs, dim=1) # 拼接后[batch, output_size * labels_num]