PS: Please credit the source when reposting; all rights reserved.
PS: This article reflects my own understanding. If it conflicts with your principles or views, please bear with me and be kind.
Environment
None
Preface
This is the seventh article in this series.
The core of this article is the Transformer model architecture. Below is a schematic diagram of the Transformer network (image source: see the references section).
As the architecture diagram shows, before diving in we first need to cover some prerequisites: multi-head attention, self-attention, and positional encoding.
Dot-Product Attention and Self-Attention
First, let us introduce a new attention scoring function: scaled dot-product attention. It is computed as $$\text{Attention}(Q, K, V) = \text{Softmax}\left(\frac{Q K^T}{\sqrt{d_k}}\right) V$$ where $d_k$ is the dimension of the keys; dividing by $\sqrt{d_k}$ keeps the magnitude of the scores stable as $d_k$ grows, so the softmax does not saturate.
Recall the attention mechanism in the seq2seq model from the earlier articles in this series (an additive attention scoring function): there, K and V come from the encoder's outputs and Q comes from the decoder's hidden state. Now suppose that Q, K, and V are all the same data. Then every query attends over the whole of K and V, that is, over the very sequence Q came from. This special case of attention is called self-attention.
Below is the code for scaled dot-product attention. When Q, K, and V are the same input, it computes self-attention.
```python
class DotProductAttention(nn.Module):  #@save
    """Scaled dot product attention."""
    def __init__(self, dropout):
        super().__init__()
        self.dropout = nn.Dropout(dropout)

    # Shape of queries: (batch_size, no. of queries, d)
    # Shape of keys: (batch_size, no. of key-value pairs, d)
    # Shape of values: (batch_size, no. of key-value pairs, value dimension)
    # Shape of valid_lens: (batch_size,) or (batch_size, no. of queries)
    def forward(self, queries, keys, values, valid_lens=None):
        d = queries.shape[-1]
        # Swap the last two dimensions of keys with keys.transpose(1, 2)
        scores = torch.bmm(queries, keys.transpose(1, 2)) / math.sqrt(d)
        self.attention_weights = masked_softmax(scores, valid_lens)
        return torch.bmm(self.dropout(self.attention_weights), values)
```
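As a quick sanity check, here is a minimal usage sketch (my own illustrative shapes, assuming the masked_softmax helper from the complete code below): feeding the same tensor as Q, K, and V yields self-attention.

```python
import torch

# Self-attention sketch: Q = K = V = X (illustrative shapes)
X = torch.randn(2, 5, 16)            # (batch_size=2, seq_len=5, d=16)
attn = DotProductAttention(dropout=0.0)
attn.eval()                          # disable dropout for a deterministic check
out = attn(X, X, X)                  # no valid_lens: attend over everything
print(out.shape)                     # torch.Size([2, 5, 16])
print(attn.attention_weights.shape)  # torch.Size([2, 5, 5]): each position attends over all 5
```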
Positional Encoding
Every element of a sequence carries positional information. However, since dot-product attention computes over all positions in parallel, every element is processed at the same time and the computation itself has no notion of order. To let the model perceive the sequence order during this parallel computation, the input itself must carry positional information; positional encoding was designed for exactly this purpose. Its code implementation is as follows:
```python
class PositionalEncoding(nn.Module):  #@save
    """Positional encoding."""
    def __init__(self, num_hiddens, dropout, max_len=1000):
        super().__init__()
        self.dropout = nn.Dropout(dropout)
        # Create a long enough P
        self.P = torch.zeros((1, max_len, num_hiddens))
        X = torch.arange(max_len, dtype=torch.float32).reshape(
            -1, 1) / torch.pow(10000, torch.arange(
            0, num_hiddens, 2, dtype=torch.float32) / num_hiddens)
        self.P[:, :, 0::2] = torch.sin(X)
        self.P[:, :, 1::2] = torch.cos(X)

    def forward(self, X):
        X = X + self.P[:, :X.shape[1], :].to(X.device)
        return self.dropout(X)
```
Once the sequence data has passed through positional encoding, the input to the dot-product attention computation carries order information, which lets the model learn order-dependent patterns.
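For intuition, a minimal usage sketch (illustrative sizes): position $i$ receives $\sin(i / 10000^{2j/d})$ in even dimension $2j$ and $\cos(i / 10000^{2j/d})$ in odd dimension $2j+1$, so each position gets a unique pattern that is simply added to the embedding.

```python
import torch

# Illustrative use of the PositionalEncoding class defined above
pe = PositionalEncoding(num_hiddens=32, dropout=0.0, max_len=100)
pe.eval()
emb = torch.zeros(1, 10, 32)   # stand-in embeddings for a length-10 sequence
out = pe(emb)                  # out[0, i, :] is exactly the encoding of position i
print(out.shape)               # torch.Size([1, 10, 32])
# Different positions get different encodings:
print(torch.allclose(out[0, 0], out[0, 1]))  # False
```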
Multi-Head Attention
The attention mechanism can already focus on data with emphasis. But we would like attention to emphasize several different aspects of the data at once, because our data carries attribute information along many dimensions. For example, an English sentence carries grammatical information, contextual information, relations between words, and so on.
To address this, the multi-head attention mechanism was proposed. Given the motivation above, it is easy to understand: each head analyzes one aspect of the data independently, so we can attend to several aspects simultaneously and improve the model's ability to understand the data.
Its code implementation is as follows:
```python
class MultiHeadAttention(nn.Module):  #@save
    """Multi-head attention."""
    def __init__(self, num_hiddens, num_heads, dropout, bias=False, **kwargs):
        super().__init__()
        self.num_heads = num_heads
        self.attention = DotProductAttention(dropout)
        self.W_q = nn.LazyLinear(num_hiddens, bias=bias)
        self.W_k = nn.LazyLinear(num_hiddens, bias=bias)
        self.W_v = nn.LazyLinear(num_hiddens, bias=bias)
        self.W_o = nn.LazyLinear(num_hiddens, bias=bias)

    def transpose_qkv(self, X):
        """Transposition for parallel computation of multiple attention heads."""
        # Shape of input X: (batch_size, no. of queries or key-value pairs,
        # num_hiddens). Shape of output X: (batch_size, no. of queries or
        # key-value pairs, num_heads, num_hiddens / num_heads)
        X = X.reshape(X.shape[0], X.shape[1], self.num_heads, -1)
        # Shape of output X: (batch_size, num_heads, no. of queries or key-value
        # pairs, num_hiddens / num_heads)
        X = X.permute(0, 2, 1, 3)
        # Shape of output: (batch_size * num_heads, no. of queries or key-value
        # pairs, num_hiddens / num_heads)
        return X.reshape(-1, X.shape[2], X.shape[3])

    def transpose_output(self, X):
        """Reverse the operation of transpose_qkv."""
        X = X.reshape(-1, self.num_heads, X.shape[1], X.shape[2])
        X = X.permute(0, 2, 1, 3)
        return X.reshape(X.shape[0], X.shape[1], -1)

    def forward(self, queries, keys, values, valid_lens):
        # Shape of queries, keys, or values:
        # (batch_size, no. of queries or key-value pairs, num_hiddens)
        # Shape of valid_lens: (batch_size,) or (batch_size, no. of queries)
        # After transposing, shape of output queries, keys, or values:
        # (batch_size * num_heads, no. of queries or key-value pairs,
        # num_hiddens / num_heads)
        queries = self.transpose_qkv(self.W_q(queries))
        keys = self.transpose_qkv(self.W_k(keys))
        values = self.transpose_qkv(self.W_v(values))

        if valid_lens is not None:
            # On axis 0, copy the first item (scalar or vector) for num_heads
            # times, then copy the next item, and so on
            valid_lens = torch.repeat_interleave(
                valid_lens, repeats=self.num_heads, dim=0)

        # Shape of output: (batch_size * num_heads, no. of queries,
        # num_hiddens / num_heads)
        output = self.attention(queries, keys, values, valid_lens)
        # Shape of output_concat: (batch_size, no. of queries, num_hiddens)
        output_concat = self.transpose_output(output)
        return self.W_o(output_concat)
```
The code above reveals an important point: multi-head attention is not simply N identical copies of attention run side by side. Instead, the inputs are projected through nn.LazyLinear and then split along the num_hiddens dimension into num_heads groups. Note that after the nn.LazyLinear projection, every element along the num_hiddens dimension depends on the whole input, so splitting into num_heads groups is valid: each group still carries information about the entire input.
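To make the reshaping concrete, here is a shape walk-through (a sketch with illustrative sizes): transpose_qkv folds the heads into the batch dimension so that all heads are computed in a single batched matrix multiplication.

```python
import torch

# Shape walk-through for the MultiHeadAttention class defined above
mha = MultiHeadAttention(num_hiddens=32, num_heads=4, dropout=0.0)
mha.eval()
X = torch.randn(2, 5, 32)           # (batch_size=2, seq_len=5, num_hiddens=32)
out = mha(X, X, X, valid_lens=None)
print(out.shape)                    # torch.Size([2, 5, 32]): same shape in and out
# Internally: (2, 5, 32) -> (2, 5, 4, 8) -> (2, 4, 5, 8) -> (8, 5, 8),
# i.e. batch_size * num_heads = 8 sequences with per-head width 32 / 4 = 8.
```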
Position-wise Feed-Forward Network
This introduces a nonlinear computation to strengthen the network's expressive power. The code is as follows:
```python
class PositionWiseFFN(nn.Module):  #@save
    """The positionwise feed-forward network."""
    def __init__(self, ffn_num_hiddens, ffn_num_outputs):
        super().__init__()
        self.dense1 = nn.LazyLinear(ffn_num_hiddens)
        self.relu = nn.ReLU()
        self.dense2 = nn.LazyLinear(ffn_num_outputs)

    def forward(self, X):
        return self.dense2(self.relu(self.dense1(X)))
```
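"Position-wise" means the same two-layer MLP is applied independently at every sequence position; positions do not interact in this sublayer. A minimal sketch (illustrative sizes):

```python
import torch

# The same MLP is applied at each position (uses the PositionWiseFFN class above)
ffn = PositionWiseFFN(ffn_num_hiddens=16, ffn_num_outputs=8)
X = torch.randn(2, 5, 8)
print(ffn(X).shape)   # torch.Size([2, 5, 8])
# Identical input rows produce identical outputs, since positions do not interact:
Y = ffn(torch.ones(1, 3, 8))
print(torch.allclose(Y[0, 0], Y[0, 1]))  # True
```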
Residual Connection and Layer Normalization
This structure adds the original input onto the output of some other computation (e.g., attention), which guarantees that the output does not lose the original input's information; this works remarkably well when the network is deep. The code is as follows:
```python
class AddNorm(nn.Module):  #@save
    """The residual connection followed by layer normalization."""
    def __init__(self, norm_shape, dropout):
        super().__init__()
        self.dropout = nn.Dropout(dropout)
        self.ln = nn.LayerNorm(norm_shape)

    def forward(self, X, Y):
        return self.ln(self.dropout(Y) + X)
```
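Note that the residual addition requires X and Y to have the same shape; a minimal usage sketch (illustrative sizes):

```python
import torch

# AddNorm requires X and Y to share a shape (uses the AddNorm class above)
addnorm = AddNorm(norm_shape=8, dropout=0.0)
X = torch.randn(2, 5, 8)
Y = torch.randn(2, 5, 8)
out = addnorm(X, Y)
print(out.shape)                        # torch.Size([2, 5, 8])
print(out.mean(-1).abs().max() < 1e-5)  # LayerNorm: ~zero mean over the last dim
```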
Below is the code for the Transformer encoder block:
```python
class TransformerEncoderBlock(nn.Module):  #@save
    """The Transformer encoder block."""
    def __init__(self, num_hiddens, ffn_num_hiddens, num_heads, dropout,
                 use_bias=False):
        super().__init__()
        self.attention = MultiHeadAttention(num_hiddens, num_heads, dropout,
                                            use_bias)
        self.addnorm1 = AddNorm(num_hiddens, dropout)
        self.ffn = PositionWiseFFN(ffn_num_hiddens, num_hiddens)
        self.addnorm2 = AddNorm(num_hiddens, dropout)

    def forward(self, X, valid_lens):
        Y = self.addnorm1(X, self.attention(X, X, X, valid_lens))
        return self.addnorm2(Y, self.ffn(Y))
```
As the code shows, the computation is: multi-head self-attention, residual connection and layer normalization, position-wise feed-forward network, then residual connection and layer normalization again.
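As a sanity check (a minimal sketch with illustrative sizes), an encoder block preserves the input shape, which is exactly what allows blocks to be stacked:

```python
import torch

# Encoder blocks preserve shape, so they can be stacked (uses the classes above)
blk = TransformerEncoderBlock(num_hiddens=24, ffn_num_hiddens=48,
                              num_heads=8, dropout=0.0)
blk.eval()
X = torch.randn(2, 100, 24)
valid_lens = torch.tensor([3, 2])   # only the first 3 / 2 positions are real tokens
print(blk(X, valid_lens).shape)     # torch.Size([2, 100, 24])
```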
Below is the code for the Transformer decoder block:
```python
class TransformerDecoderBlock(nn.Module):
    # The i-th block in the Transformer decoder
    def __init__(self, num_hiddens, ffn_num_hiddens, num_heads, dropout, i):
        super().__init__()
        self.i = i
        self.attention1 = MultiHeadAttention(num_hiddens, num_heads, dropout)
        self.addnorm1 = AddNorm(num_hiddens, dropout)
        self.attention2 = MultiHeadAttention(num_hiddens, num_heads, dropout)
        self.addnorm2 = AddNorm(num_hiddens, dropout)
        self.ffn = PositionWiseFFN(ffn_num_hiddens, num_hiddens)
        self.addnorm3 = AddNorm(num_hiddens, dropout)

    def forward(self, X, state):
        enc_outputs, enc_valid_lens = state[0], state[1]
        # During training, all the tokens of any output sequence are processed
        # at the same time, so state[2][self.i] is None as initialized. When
        # decoding any output sequence token by token during prediction,
        # state[2][self.i] contains representations of the decoded output at
        # the i-th block up to the current time step
        if state[2][self.i] is None:
            key_values = X
        else:
            key_values = torch.cat((state[2][self.i], X), dim=1)
        state[2][self.i] = key_values
        if self.training:
            batch_size, num_steps, _ = X.shape
            # Shape of dec_valid_lens: (batch_size, num_steps), where every
            # row is [1, 2, ..., num_steps]
            dec_valid_lens = torch.arange(
                1, num_steps + 1, device=X.device).repeat(batch_size, 1)
        else:
            dec_valid_lens = None
        # Self-attention
        X2 = self.attention1(X, key_values, key_values, dec_valid_lens)
        Y = self.addnorm1(X, X2)
        # Encoder-decoder attention. Shape of enc_outputs:
        # (batch_size, num_steps, num_hiddens)
        Y2 = self.attention2(Y, enc_outputs, enc_outputs, enc_valid_lens)
        Z = self.addnorm2(Y, Y2)
        return self.addnorm3(Z, self.ffn(Z)), state
```
As the code shows, the computation is: masked multi-head self-attention, residual connection and layer normalization, encoder-decoder multi-head attention, residual connection and layer normalization, position-wise feed-forward network, then a final residual connection and layer normalization.
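During training, dec_valid_lens is what makes the self-attention causal: position i may only attend to positions 1 through i. A minimal sketch of the mask it induces (illustrative sizes, assuming the masked_softmax helper from the complete code below):

```python
import torch

# dec_valid_lens produces a causal (lower-triangular) attention pattern
batch_size, num_steps = 1, 4
dec_valid_lens = torch.arange(1, num_steps + 1).repeat(batch_size, 1)
print(dec_valid_lens)                  # tensor([[1, 2, 3, 4]])
scores = torch.zeros(batch_size, num_steps, num_steps)
print(masked_softmax(scores, dec_valid_lens))
# Row i spreads its weight uniformly over the first i positions:
# [[1.00, 0.00, 0.00, 0.00],
#  [0.50, 0.50, 0.00, 0.00],
#  [0.33, 0.33, 0.33, 0.00],
#  [0.25, 0.25, 0.25, 0.25]]
```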
For the dataset part, please refer to the earlier seq2seq articles in this series.
The complete code is as follows:
```python
import os
import random
import torch
import math
from torch import nn
from torch.nn import functional as F
import numpy as np
import time
import visdom
import collections
import dataset


class Accumulator:
    """Accumulate sums over n variables."""
    def __init__(self, n):
        """Defined in :numref:`sec_softmax_scratch`"""
        self.data = [0.0] * n

    def add(self, *args):
        self.data = [a + float(b) for a, b in zip(self.data, args)]

    def reset(self):
        self.data = [0.0] * len(self.data)

    def __getitem__(self, idx):
        return self.data[idx]


class Timer:
    """Record multiple running times."""
    def __init__(self):
        """Defined in :numref:`subsec_linear_model`"""
        self.times = []
        self.start()

    def start(self):
        """Start the timer."""
        self.tik = time.time()

    def stop(self):
        """Stop the timer and record the time in a list."""
        self.times.append(time.time() - self.tik)
        return self.times[-1]

    def avg(self):
        """Return the average time."""
        return sum(self.times) / len(self.times)

    def sum(self):
        """Return the sum of the recorded times."""
        return sum(self.times)

    def cumsum(self):
        """Return the cumulative times."""
        return np.array(self.times).cumsum().tolist()


class Encoder(nn.Module):
    """The base encoder interface for the encoder-decoder architecture."""
    def __init__(self, **kwargs):
        # Call the parent nn.Module constructor to ensure proper initialization
        super(Encoder, self).__init__(**kwargs)

    def forward(self, X, *args):
        # Must be implemented by subclasses
        raise NotImplementedError


class Decoder(nn.Module):
    """The base decoder interface for the encoder-decoder architecture.

    Defined in :numref:`sec_encoder-decoder`"""
    def __init__(self, **kwargs):
        super(Decoder, self).__init__(**kwargs)

    def init_state(self, enc_outputs, *args):
        # Must be implemented by subclasses
        raise NotImplementedError

    def forward(self, X, state):
        # Must be implemented by subclasses
        raise NotImplementedError


class EncoderDecoder(nn.Module):
    """The base class for the encoder-decoder architecture.

    Defined in :numref:`sec_encoder-decoder`"""
    def __init__(self, encoder, decoder, **kwargs):
        super(EncoderDecoder, self).__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, enc_X, dec_X, enc_X_valid_len, *args):
        # Run the encoder on the source input enc_X
        enc_outputs = self.encoder(enc_X, enc_X_valid_len, *args)
        # Initialize the decoder state from the encoder outputs
        dec_state = self.decoder.init_state(enc_outputs, enc_X_valid_len)
        # Run the decoder on the target input dec_X and the initialized state
        return self.decoder(dec_X, dec_state)


def masked_softmax(X, valid_lens):  #@save
    """Perform softmax operation by masking elements on the last axis."""
    # X: 3D tensor, valid_lens: 1D or 2D tensor
    def _sequence_mask(X, valid_len, value=0):
        maxlen = X.size(1)
        mask = torch.arange((maxlen), dtype=torch.float32,
                            device=X.device)[None, :] < valid_len[:, None]
        X[~mask] = value
        return X

    if valid_lens is None:
        return nn.functional.softmax(X, dim=-1)
    else:
        shape = X.shape
        if valid_lens.dim() == 1:
            valid_lens = torch.repeat_interleave(valid_lens, shape[1])
        else:
            valid_lens = valid_lens.reshape(-1)
        # On the last axis, replace masked elements with a very large negative
        # value, whose exponentiation outputs 0
        X = _sequence_mask(X.reshape(-1, shape[-1]), valid_lens, value=-1e6)
        return nn.functional.softmax(X.reshape(shape), dim=-1)


class DotProductAttention(nn.Module):  #@save
    """Scaled dot product attention."""
    def __init__(self, dropout):
        super().__init__()
        self.dropout = nn.Dropout(dropout)

    # Shape of queries: (batch_size, no. of queries, d)
    # Shape of keys: (batch_size, no. of key-value pairs, d)
    # Shape of values: (batch_size, no. of key-value pairs, value dimension)
    # Shape of valid_lens: (batch_size,) or (batch_size, no. of queries)
    def forward(self, queries, keys, values, valid_lens=None):
        d = queries.shape[-1]
        # Swap the last two dimensions of keys with keys.transpose(1, 2)
        scores = torch.bmm(queries, keys.transpose(1, 2)) / math.sqrt(d)
        self.attention_weights = masked_softmax(scores, valid_lens)
        return torch.bmm(self.dropout(self.attention_weights), values)


class MultiHeadAttention(nn.Module):  #@save
    """Multi-head attention."""
    def __init__(self, num_hiddens, num_heads, dropout, bias=False, **kwargs):
        super().__init__()
        self.num_heads = num_heads
        self.attention = DotProductAttention(dropout)
        self.W_q = nn.LazyLinear(num_hiddens, bias=bias)
        self.W_k = nn.LazyLinear(num_hiddens, bias=bias)
        self.W_v = nn.LazyLinear(num_hiddens, bias=bias)
        self.W_o = nn.LazyLinear(num_hiddens, bias=bias)

    def transpose_qkv(self, X):
        """Transposition for parallel computation of multiple attention heads."""
        # Shape of input X: (batch_size, no. of queries or key-value pairs,
        # num_hiddens). Shape of output X: (batch_size, no. of queries or
        # key-value pairs, num_heads, num_hiddens / num_heads)
        X = X.reshape(X.shape[0], X.shape[1], self.num_heads, -1)
        # Shape of output X: (batch_size, num_heads, no. of queries or key-value
        # pairs, num_hiddens / num_heads)
        X = X.permute(0, 2, 1, 3)
        # Shape of output: (batch_size * num_heads, no. of queries or key-value
        # pairs, num_hiddens / num_heads)
        return X.reshape(-1, X.shape[2], X.shape[3])

    def transpose_output(self, X):
        """Reverse the operation of transpose_qkv."""
        X = X.reshape(-1, self.num_heads, X.shape[1], X.shape[2])
        X = X.permute(0, 2, 1, 3)
        return X.reshape(X.shape[0], X.shape[1], -1)

    def forward(self, queries, keys, values, valid_lens):
        # Shape of queries, keys, or values:
        # (batch_size, no. of queries or key-value pairs, num_hiddens)
        # Shape of valid_lens: (batch_size,) or (batch_size, no. of queries)
        # After transposing, shape of output queries, keys, or values:
        # (batch_size * num_heads, no. of queries or key-value pairs,
        # num_hiddens / num_heads)
        queries = self.transpose_qkv(self.W_q(queries))
        keys = self.transpose_qkv(self.W_k(keys))
        values = self.transpose_qkv(self.W_v(values))

        if valid_lens is not None:
            # On axis 0, copy the first item (scalar or vector) for num_heads
            # times, then copy the next item, and so on
            valid_lens = torch.repeat_interleave(
                valid_lens, repeats=self.num_heads, dim=0)

        # Shape of output: (batch_size * num_heads, no. of queries,
        # num_hiddens / num_heads)
        output = self.attention(queries, keys, values, valid_lens)
        # Shape of output_concat: (batch_size, no. of queries, num_hiddens)
        output_concat = self.transpose_output(output)
        return self.W_o(output_concat)


class PositionWiseFFN(nn.Module):  #@save
    """The positionwise feed-forward network."""
    def __init__(self, ffn_num_hiddens, ffn_num_outputs):
        super().__init__()
        self.dense1 = nn.LazyLinear(ffn_num_hiddens)
        self.relu = nn.ReLU()
        self.dense2 = nn.LazyLinear(ffn_num_outputs)

    def forward(self, X):
        return self.dense2(self.relu(self.dense1(X)))


class AddNorm(nn.Module):  #@save
    """The residual connection followed by layer normalization."""
    def __init__(self, norm_shape, dropout):
        super().__init__()
        self.dropout = nn.Dropout(dropout)
        self.ln = nn.LayerNorm(norm_shape)

    def forward(self, X, Y):
        return self.ln(self.dropout(Y) + X)


class TransformerEncoderBlock(nn.Module):  #@save
    """The Transformer encoder block."""
    def __init__(self, num_hiddens, ffn_num_hiddens, num_heads, dropout,
                 use_bias=False):
        super().__init__()
        self.attention = MultiHeadAttention(num_hiddens, num_heads, dropout,
                                            use_bias)
        self.addnorm1 = AddNorm(num_hiddens, dropout)
        self.ffn = PositionWiseFFN(ffn_num_hiddens, num_hiddens)
        self.addnorm2 = AddNorm(num_hiddens, dropout)

    def forward(self, X, valid_lens):
        Y = self.addnorm1(X, self.attention(X, X, X, valid_lens))
        return self.addnorm2(Y, self.ffn(Y))


class PositionalEncoding(nn.Module):  #@save
    """Positional encoding."""
    def __init__(self, num_hiddens, dropout, max_len=1000):
        super().__init__()
        self.dropout = nn.Dropout(dropout)
        # Create a long enough P
        self.P = torch.zeros((1, max_len, num_hiddens))
        X = torch.arange(max_len, dtype=torch.float32).reshape(
            -1, 1) / torch.pow(10000, torch.arange(
            0, num_hiddens, 2, dtype=torch.float32) / num_hiddens)
        self.P[:, :, 0::2] = torch.sin(X)
        self.P[:, :, 1::2] = torch.cos(X)

    def forward(self, X):
        X = X + self.P[:, :X.shape[1], :].to(X.device)
        return self.dropout(X)


class TransformerEncoder(Encoder):  #@save
    """The Transformer encoder."""
    def __init__(self, vocab_size, num_hiddens, ffn_num_hiddens,
                 num_heads, num_blks, dropout, use_bias=False):
        super().__init__()
        self.num_hiddens = num_hiddens
        self.embedding = nn.Embedding(vocab_size, num_hiddens)
        self.pos_encoding = PositionalEncoding(num_hiddens, dropout)
        self.blks = nn.Sequential()
        for i in range(num_blks):
            self.blks.add_module("block" + str(i), TransformerEncoderBlock(
                num_hiddens, ffn_num_hiddens, num_heads, dropout, use_bias))

    def forward(self, X, valid_lens):
        # Since positional encoding values are between -1 and 1, the embedding
        # values are multiplied by the square root of the embedding dimension
        # to rescale before they are summed up
        # X: (batch_size, seq_len, num_hiddens)
        X = self.pos_encoding(self.embedding(X) * math.sqrt(self.num_hiddens))
        self.attention_weights = [None] * len(self.blks)
        for i, blk in enumerate(self.blks):
            X = blk(X, valid_lens)
            self.attention_weights[i] = blk.attention.attention.attention_weights
        # X: (batch_size, seq_len, num_hiddens)
        return X


class TransformerDecoderBlock(nn.Module):
    # The i-th block in the Transformer decoder
    def __init__(self, num_hiddens, ffn_num_hiddens, num_heads, dropout, i):
        super().__init__()
        self.i = i
        self.attention1 = MultiHeadAttention(num_hiddens, num_heads, dropout)
        self.addnorm1 = AddNorm(num_hiddens, dropout)
        self.attention2 = MultiHeadAttention(num_hiddens, num_heads, dropout)
        self.addnorm2 = AddNorm(num_hiddens, dropout)
        self.ffn = PositionWiseFFN(ffn_num_hiddens, num_hiddens)
        self.addnorm3 = AddNorm(num_hiddens, dropout)

    def forward(self, X, state):
        enc_outputs, enc_valid_lens = state[0], state[1]
        # During training, all the tokens of any output sequence are processed
        # at the same time, so state[2][self.i] is None as initialized. When
        # decoding any output sequence token by token during prediction,
        # state[2][self.i] contains representations of the decoded output at
        # the i-th block up to the current time step
        if state[2][self.i] is None:
            key_values = X
        else:
            key_values = torch.cat((state[2][self.i], X), dim=1)
        state[2][self.i] = key_values
        if self.training:
            batch_size, num_steps, _ = X.shape
            # Shape of dec_valid_lens: (batch_size, num_steps), where every
            # row is [1, 2, ..., num_steps]
            dec_valid_lens = torch.arange(
                1, num_steps + 1, device=X.device).repeat(batch_size, 1)
        else:
            dec_valid_lens = None
        # Self-attention
        X2 = self.attention1(X, key_values, key_values, dec_valid_lens)
        Y = self.addnorm1(X, X2)
        # Encoder-decoder attention. Shape of enc_outputs:
        # (batch_size, num_steps, num_hiddens)
        Y2 = self.attention2(Y, enc_outputs, enc_outputs, enc_valid_lens)
        Z = self.addnorm2(Y, Y2)
        return self.addnorm3(Z, self.ffn(Z)), state


class TransformerDecoder(Decoder):
    def __init__(self, vocab_size, num_hiddens, ffn_num_hiddens, num_heads,
                 num_blks, dropout):
        super().__init__()
        self.num_hiddens = num_hiddens
        self.num_blks = num_blks
        self.embedding = nn.Embedding(vocab_size, num_hiddens)
        self.pos_encoding = PositionalEncoding(num_hiddens, dropout)
        self.blks = nn.Sequential()
        for i in range(num_blks):
            self.blks.add_module("block" + str(i), TransformerDecoderBlock(
                num_hiddens, ffn_num_hiddens, num_heads, dropout, i))
        self.dense = nn.LazyLinear(vocab_size)

    def init_state(self, enc_outputs, enc_valid_lens):
        return [enc_outputs, enc_valid_lens, [None] * self.num_blks]

    def forward(self, X, state):
        X = self.pos_encoding(self.embedding(X) * math.sqrt(self.num_hiddens))
        self._attention_weights = [[None] * len(self.blks) for _ in range(2)]
        for i, blk in enumerate(self.blks):
            X, state = blk(X, state)
            # Decoder self-attention weights
            self._attention_weights[0][i] = blk.attention1.attention.attention_weights
            # Encoder-decoder attention weights
            self._attention_weights[1][i] = blk.attention2.attention.attention_weights
        return self.dense(X), state

    @property
    def attention_weights(self):
        return self._attention_weights


def sequence_mask(X, valid_len, value=0):
    """Mask irrelevant entries in sequences."""
    maxlen = X.size(1)
    mask = torch.arange((maxlen), dtype=torch.float32,
                        device=X.device)[None, :] < valid_len[:, None]
    X[~mask] = value
    return X


class MaskedSoftmaxCELoss(nn.CrossEntropyLoss):
    """The softmax cross-entropy loss with masks."""
    # Shape of pred: (batch_size, num_steps, vocab_size)
    # Shape of label: (batch_size, num_steps)
    # Shape of valid_len: (batch_size,)
    def forward(self, pred, label, valid_len):
        weights = torch.ones_like(label)
        weights = sequence_mask(weights, valid_len)
        self.reduction = 'none'
        unweighted_loss = super(MaskedSoftmaxCELoss, self).forward(
            pred.permute(0, 2, 1), label)
        weighted_loss = (unweighted_loss * weights).mean(dim=1)
        return weighted_loss


def grad_clipping(net, theta):  #@save
    """Clip the gradient."""
    if isinstance(net, nn.Module):
        params = [p for p in net.parameters() if p.requires_grad]
    else:
        params = net.params
    norm = torch.sqrt(sum(torch.sum((p.grad ** 2)) for p in params))
    if norm > theta:
        for param in params:
            param.grad[:] *= theta / norm


def train_seq2seq(net, data_iter, lr, num_epochs, tgt_vocab, device):
    """Train a sequence-to-sequence model."""
    def xavier_init_weights(m):
        if type(m) == nn.Linear:
            nn.init.xavier_uniform_(m.weight)
        if type(m) == nn.GRU:
            for param in m._flat_weights_names:
                if "weight" in param:
                    nn.init.xavier_uniform_(m._parameters[param])

    net.apply(xavier_init_weights)
    net.to(device)
    optimizer = torch.optim.Adam(net.parameters(), lr=lr)
    loss = MaskedSoftmaxCELoss()
    net.train()
    vis = visdom.Visdom(env=u'test1', server="http://127.0.0.1", port=8097)
    animator = vis
    for epoch in range(num_epochs):
        timer = Timer()
        metric = Accumulator(2)  # Sum of training loss, no. of tokens
        for batch in data_iter:
            # Reset the gradients cached in the optimizer
            optimizer.zero_grad()
            # X.shape = (batch_size, num_steps)
            X, X_valid_len, Y, Y_valid_len = [x.to(device) for x in batch]
            # bos.shape = (batch_size, 1): one <bos> id per sequence
            bos = torch.tensor([tgt_vocab['<bos>']] * Y.shape[0],
                               device=device).reshape(-1, 1)
            # dec_input.shape = (batch_size, num_steps)
            # The decoder input is the start token bos concatenated with the
            # target sequence minus its last token (Y[:, :-1]).
            dec_input = torch.cat([bos, Y[:, :-1]], 1)  # Teacher forcing
            # Shape of Y_hat: (batch_size, num_steps, vocab_size)
            Y_hat, _ = net(X, dec_input, X_valid_len)
            l = loss(Y_hat, Y, Y_valid_len)
            l.sum().backward()  # Make the loss scalar for backpropagation
            grad_clipping(net, 1)
            num_tokens = Y_valid_len.sum()
            optimizer.step()
            with torch.no_grad():
                metric.add(l.sum(), num_tokens)
        if (epoch + 1) % 10 == 0:
            # print(predict('你是?'))
            # print(epoch)
            # animator.add(epoch + 1, )
            if epoch == 9:
                # Clear the chart: replace the existing contents with empty arrays
                vis.line(X=np.array([0]), Y=np.array([0]), win='train_ch8',
                         update='replace')
            # _loss_val = l
            # _loss_val = _loss_val.cpu().sum().detach().numpy()
            vis.line(
                X=np.array([epoch + 1]),
                Y=[metric[0] / metric[1]],
                win='train_ch8',
                update='append',
                opts={
                    'title': 'train_ch8',
                    'xlabel': 'epoch',
                    'ylabel': 'loss',
                    'linecolor': np.array([[0, 0, 255]]),  # Blue line
                }
            )
    print(f'loss {metric[0] / metric[1]:.3f}, {metric[1] / timer.stop():.1f} '
          f'tokens/sec on {str(device)}')
    torch.save(net.cpu().state_dict(), 'model_h.pt')
    torch.save(net.cpu(), 'model.pt')


def predict_seq2seq(net, src_sentence, src_vocab, tgt_vocab, num_steps,
                    device, save_attention_weights=False):
    """Predict with a sequence-to-sequence model."""
    # Set net to eval mode for prediction
    net.eval()
    src_tokens = src_vocab[src_sentence.lower().split(' ')] + [
        src_vocab['<eos>']]
    enc_valid_len = torch.tensor([len(src_tokens)], device=device)
    src_tokens = dataset.truncate_pad(src_tokens, num_steps,
                                      src_vocab['<pad>'])
    # Add the batch axis
    enc_X = torch.unsqueeze(
        torch.tensor(src_tokens, dtype=torch.long, device=device), dim=0)
    enc_outputs = net.encoder(enc_X, enc_valid_len)
    dec_state = net.decoder.init_state(enc_outputs, enc_valid_len)
    # Add the batch axis
    dec_X = torch.unsqueeze(torch.tensor(
        [tgt_vocab['<bos>']], dtype=torch.long, device=device), dim=0)
    output_seq, attention_weight_seq = [], []
    for _ in range(num_steps):
        Y, dec_state = net.decoder(dec_X, dec_state)
        # Use the token with the highest prediction likelihood as input to the
        # decoder at the next time step
        dec_X = Y.argmax(dim=2)
        pred = dec_X.squeeze(dim=0).type(torch.int32).item()
        # Save the attention weights (discussed later)
        if save_attention_weights:
            # Encoder-decoder attention weights of the 2nd block
            attention_weight_seq.append(
                net.decoder.attention_weights[1][1].cpu())
        # Once the end-of-sequence token is predicted, the generation of the
        # output sequence is complete
        if pred == tgt_vocab['<eos>']:
            break
        output_seq.append(pred)
    return ' '.join(tgt_vocab.to_tokens(output_seq)), attention_weight_seq


def bleu(pred_seq, label_seq, k):  #@save
    """Compute the BLEU."""
    pred_tokens, label_tokens = pred_seq.split(' '), [i for i in label_seq]
    len_pred, len_label = len(pred_tokens), len(label_tokens)
    score = math.exp(min(0, 1 - len_label / len_pred))
    for n in range(1, k + 1):
        num_matches, label_subs = 0, collections.defaultdict(int)
        for i in range(len_label - n + 1):
            label_subs[' '.join(label_tokens[i: i + n])] += 1
        for i in range(len_pred - n + 1):
            if label_subs[' '.join(pred_tokens[i: i + n])] > 0:
                num_matches += 1
                label_subs[' '.join(pred_tokens[i: i + n])] -= 1
        score *= math.pow(num_matches / (len_pred - n + 1),
                          math.pow(0.5, n))
    return score


def try_gpu(i=0):
    """Return gpu(i) if it exists, otherwise return cpu().

    Defined in :numref:`sec_use_gpu`"""
    if torch.cuda.device_count() >= i + 1:
        return torch.device(f'cuda:{i}')
    return torch.device('cpu')


from matplotlib import pyplot as plt
import matplotlib
# from matplotlib_inline import backend_inline


def show_heatmaps(matrices, xlabel, ylabel, titles=None, figsize=(2.5, 2.5),
                  cmap='Reds'):
    """Show heatmaps of matrices.

    Draws multiple matrices as a grid of subplots; typically used to
    visualize attention weights.

    Args:
        matrices: a 4D array of shape (num_rows, num_cols, height, width),
            where num_rows and num_cols define the subplot grid and height
            and width are the dimensions of each heatmap.
        xlabel (str): x-axis label for the bottom row of subplots.
        ylabel (str): y-axis label for the leftmost column of subplots.
        titles (list of str, optional): num_cols titles, one per column.
        figsize (tuple, optional): size of the whole figure.
        cmap (str, optional): colormap for the heatmaps.
    """
    # backend_inline.set_matplotlib_formats('svg')
    matplotlib.use('TkAgg')
    # Unpack the subplot grid layout from the shape of matrices
    num_rows, num_cols, _, _ = matrices.shape
    fig, axes = plt.subplots(
        num_rows, num_cols,
        figsize=figsize,
        sharex=True,   # All subplots share the x axis
        sharey=True,   # All subplots share the y axis
        squeeze=False  # Always return a 2D array of axes for easy looping
    )
    for i, (row_axes, row_matrices) in enumerate(zip(axes, matrices)):
        for j, (ax, matrix) in enumerate(zip(row_axes, row_matrices)):
            # Detach from the graph and convert to numpy before plotting
            pcm = ax.imshow(matrix.detach().numpy(), cmap=cmap)
            # Only the bottom row gets x-axis labels
            if i == num_rows - 1:
                ax.set_xlabel(xlabel)
            # Only the leftmost column gets y-axis labels
            if j == 0:
                ax.set_ylabel(ylabel)
            # Column titles are shared by all rows
            if titles:
                ax.set_title(titles[j])
    # One colorbar for the whole grid, shrunk to 60% of the figure height
    fig.colorbar(pcm, ax=axes, shrink=0.6)
    plt.show()


if __name__ == '__main__':
    num_hiddens, num_blks, dropout = 256, 2, 0.2
    ffn_num_hiddens, num_heads = 64, 4
    batch_size = 1024
    num_steps = 10
    lr, num_epochs, device = 0.001, 2000, try_gpu()

    train_iter, src_vocab, tgt_vocab, source, target = dataset.load_data(
        batch_size, num_steps)
    encoder = TransformerEncoder(
        len(src_vocab), num_hiddens, ffn_num_hiddens, num_heads, num_blks,
        dropout)
    decoder = TransformerDecoder(
        len(tgt_vocab), num_hiddens, ffn_num_hiddens, num_heads, num_blks,
        dropout)
    net = EncoderDecoder(encoder, decoder)

    is_train = False
    is_show = True
    if is_train:
        train_seq2seq(net, train_iter, lr, num_epochs, tgt_vocab, device)
    elif is_show:
        state_dict = torch.load('model_h.pt')
        net.load_state_dict(state_dict)
        net.to(device)
        src_text = "Call us."
        translation, attention_weight_seq = predict_seq2seq(
            net, src_text, src_vocab, tgt_vocab, num_steps, device, True)
        # attention_weights = torch.eye(10).reshape((1, 1, 10, 10))
        # (num_rows, num_cols, height, width)
        print(f'translation={translation}')
        # print(attention_weight_seq.shape)
        stacked_tensor = torch.stack(
            attention_weight_seq, dim=0).permute(2, 1, 0, 3)
        print(stacked_tensor.shape)
        show_heatmaps(
            stacked_tensor,
            xlabel='Attention weight',
            ylabel='Decode Step',
            titles=['Head %d' % i for i in range(1, 5)])
    else:
        state_dict = torch.load('model_h.pt')
        net.load_state_dict(state_dict)
        net.to(device)
        C = 0
        C1 = 0
        for i in range(2000):
            # print(source[i])
            # print(target[i])
            translation, attention_weight_seq = predict_seq2seq(
                net, source[i], src_vocab, tgt_vocab, num_steps, device)
            score = bleu(translation, target[i], k=2)
            if score > 0.0:
                C = C + 1
            if score > 0.8:
                C1 = C1 + 1
                print(f'{source[i]} => {translation}, bleu {score:.3f}')
        print(f'Counter(bleu > 0) = {C}')
        print(f'Valid-Counter(bleu > 0.8) = {C1}')
```
Let us first look at what TransformerEncoder does:
- As before, the input first goes through an embedding layer, and the positional encoding is added on top.
- Then each TransformerEncoderBlock is computed in turn.
A TransformerEncoderBlock does the following:
- Compute self-attention.
- Residual connection and layer normalization.
- Position-wise feed-forward network.
- Residual connection and layer normalization.
Next, let us look at what TransformerDecoder does:
- Similar to TransformerEncoder, the input first goes through an embedding layer, and the positional encoding is added on top.
- Then each TransformerDecoderBlock is computed in turn.
- Finally, a fully connected layer maps the output to the vocabulary size.
A TransformerDecoderBlock does the following:
- First prepare the keys and values (K_1, V_1) for self-attention; at each prediction step they are updated by concatenating the current input X (see the sketch after this list).
- Use the input X as Q and (K_1, V_1) as K and V to run the self-attention computation.
- Residual connection and layer normalization, producing Y.
- Use enc_outputs as K and V and Y as Q to compute the encoder-decoder attention.
- Residual connection and layer normalization.
- Position-wise feed-forward network.
- Residual connection and layer normalization.
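Here is a minimal sketch (with hypothetical sizes) of how the per-block cache state[2][i] grows during token-by-token prediction, mirroring the logic in TransformerDecoderBlock.forward:

```python
import torch

# How state[2][i] (the per-block K/V cache) grows during prediction
state_kv = None                    # state[2][i] starts as None
for step in range(3):              # decode 3 tokens, one at a time
    X = torch.randn(1, 1, 8)       # current token's representation (batch=1, len=1)
    key_values = X if state_kv is None else torch.cat((state_kv, X), dim=1)
    state_kv = key_values
    print(step, key_values.shape)  # lengths grow: (1,1,8), (1,2,8), (1,3,8)
```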
Below are some results from training and testing.