# gat_lstm.py
import torch
import torch.nn as nn


# Single independent model (corresponds to one target variable)
class SingleGATLSTM(nn.Module):
    def __init__(self, args):
        super(SingleGATLSTM, self).__init__()
        self.args = args
        # Independent LSTM layer
        self.lstm = nn.LSTM(
            input_size=args.feature_num,
            hidden_size=args.hidden_size,
            num_layers=args.num_layers,
            batch_first=True
        )
        # Independent output head
        self.final_linear = nn.Sequential(
            nn.Linear(args.hidden_size, args.hidden_size),
            nn.LeakyReLU(0.01),
            nn.Dropout(args.dropout * 0.4),
            nn.Linear(args.hidden_size, args.output_size)
        )
        self._init_weights()

    def _init_weights(self):
        # Initialize linear-layer weights
        for m in self.modules():
            if isinstance(m, nn.Linear):
                nn.init.xavier_uniform_(m.weight)
                if m.bias is not None:
                    nn.init.zeros_(m.bias)
            elif isinstance(m, nn.BatchNorm1d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
        # Initialize LSTM weights
        for name, param in self.lstm.named_parameters():
            if 'weight_ih' in name:
                nn.init.xavier_uniform_(param.data)
            elif 'weight_hh' in name:
                nn.init.orthogonal_(param.data)
            elif 'bias' in name:
                param.data.fill_(0)
                # Set the forget-gate bias to 1 (PyTorch gate order: input, forget, cell, output)
                n = param.size(0)
                start, end = n // 4, n // 2
                param.data[start:end].fill_(1)

    def forward(self, x):
        # Run the LSTM over the input sequence
        batch_size, seq_len, feature_num = x.size()
        lstm_out, _ = self.lstm(x)
        # Take the output at the last time step
        last_out = lstm_out[:, -1, :]
        # Predict with the output head
        output = self.final_linear(last_out)
        return output  # [batch_size, output_size]


# Container of 16 independent models (the overall model)
class GAT_LSTM(nn.Module):
    def __init__(self, args):
        super(GAT_LSTM, self).__init__()
        self.args = args
        # Create 16 independent models (the count is given by labels_num)
        self.models = nn.ModuleList([SingleGATLSTM(args) for _ in range(args.labels_num)])

    def forward(self, x):
        # Collect the outputs of all models and concatenate them
        outputs = []
        for model in self.models:
            outputs.append(model(x))  # each output is [batch, output_size]
        return torch.cat(outputs, dim=1)  # concatenated: [batch, output_size * labels_num]
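

# A minimal usage sketch (not part of the original module): the argparse.Namespace
# fields match the attributes the classes read from `args`, but the concrete values
# below are placeholder assumptions; the real `args` presumably comes from the
# project's own config or argument parser.
if __name__ == "__main__":
    from argparse import Namespace

    args = Namespace(
        feature_num=8,    # input features per time step (assumed value)
        hidden_size=64,   # LSTM hidden size (assumed value)
        num_layers=2,     # stacked LSTM layers (assumed value)
        dropout=0.5,      # base dropout rate; the head applies dropout * 0.4 (assumed value)
        output_size=1,    # prediction size per target variable (assumed value)
        labels_num=16,    # number of target variables, i.e. independent sub-models
    )

    model = GAT_LSTM(args)
    x = torch.randn(4, 30, args.feature_num)  # [batch=4, seq_len=30, feature_num]
    y = model(x)
    print(y.shape)  # expected: torch.Size([4, args.output_size * args.labels_num])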