仙守
作者仙守·2024-05-14 10:15
算法工程师·苏宁易购

自然语言处理——NLP之预训练语言模型BERT(下)

字数 42359阅读 2300评论 0赞 1

上篇:自然语言处理——NLP之预训练语言模型BERT(上)

5.2 BertModel实现


创建一个文件名称为Bert.py

import torch
from torch.nn.init import normal_
from .BertEmbedding import BertEmbeddings
from .MyTransformer import MyMultiheadAttention
import torch.nn as nn
import os
import logging
from copy import deepcopy


def get_activation(activation_string):
    act = activation_string.lower()
    if act == "linear":
        return None
    elif act == "relu":
        return nn.ReLU()
    elif act == "gelu":
        return nn.GELU()
    elif act == "tanh":
        return nn.Tanh()
    else:
        raise ValueError("Unsupported activation: %s" % act)


class BertSelfAttention(nn.Module):
    """
    实现多头注意力机制,对应的是GoogleResearch代码中的attention_layer方法
    https://github.com/google-research/bert/blob/eedf5716ce1268e56f0a50264a88cafad334ac61/modeling.py#L558
    """

    def __init__(self, config):
        super(BertSelfAttention, self).__init__()
        if 'use_torch_multi_head' in config.__dict__ and config.use_torch_multi_head:
            MultiHeadAttention = nn.MultiheadAttention
        else:
            MultiHeadAttention = MyMultiheadAttention
        self.multi_head_attention = MultiHeadAttention(embed_dim=config.hidden_size,
                                                       num_heads=config.num_attention_heads,
                                                       dropout=config.attention_probs_dropout_prob)

    def forward(self, query, key, value, attn_mask=None, key_padding_mask=None):
        """
        :param query: # [tgt_len, batch_size, hidden_size], tgt_len 表示目标序列的长度
        :param key:  #  [src_len, batch_size, hidden_size], src_len 表示源序列的长度
        :param value: # [src_len, batch_size, hidden_size], src_len 表示源序列的长度
        :param attn_mask: # [tgt_len,src_len] or [num_heads*batch_size,tgt_len, src_len]
        一般只在解码时使用,为了并行一次喂入所有解码部分的输入,所以要用mask来进行掩盖当前时刻之后的位置信息
        在Bert中,attention_mask指代的其实是key_padding_mask,因为Bert主要是基于Transformer Encoder部分构建的,
        所有没有Decoder部分,因此也就不需要用mask来进行掩盖当前时刻之后的位置信息
        :param key_padding_mask: [batch_size, src_len], src_len 表示源序列的长度
        :return:
        attn_output: [tgt_len, batch_size, hidden_size]
        attn_output_weights: # [batch_size, tgt_len, src_len]
        """
        return self.multi_head_attention(query, key, value, attn_mask=attn_mask, key_padding_mask=key_padding_mask)


class BertSelfOutput(nn.Module):
    def __init__(self, config):
        super().__init__()
        # self.dense = nn.Linear(config.hidden_size, config.hidden_size)
        self.LayerNorm = nn.LayerNorm(config.hidden_size, eps=1e-12)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)

    def forward(self, hidden_states, input_tensor):
        """
        :param hidden_states: [src_len, batch_size, hidden_size]
        :param input_tensor: [src_len, batch_size, hidden_size]
        :return: [src_len, batch_size, hidden_size]
        """
        # hidden_states = self.dense(hidden_states)  # [src_len, batch_size, hidden_size]
        hidden_states = self.dropout(hidden_states)
        hidden_states = self.LayerNorm(hidden_states + input_tensor)
        return hidden_states


class BertAttention(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.self = BertSelfAttention(config)
        self.output = BertSelfOutput(config)

    def forward(self,
                hidden_states,
                attention_mask=None):
        """
        :param hidden_states: [src_len, batch_size, hidden_size]
        :param attention_mask: [batch_size, src_len]
        :return: [src_len, batch_size, hidden_size]
        """
        self_outputs = self.self(hidden_states,
                                 hidden_states,
                                 hidden_states,
                                 attn_mask=None,
                                 key_padding_mask=attention_mask)
        # self_outputs[0] shape: [src_len, batch_size, hidden_size]
        attention_output = self.output(self_outputs[0], hidden_states)
        return attention_output


class BertIntermediate(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.dense = nn.Linear(config.hidden_size, config.intermediate_size)
        if isinstance(config.hidden_act, str):
            self.intermediate_act_fn = get_activation(config.hidden_act)
        else:
            self.intermediate_act_fn = config.hidden_act

    def forward(self, hidden_states):
        """
        :param hidden_states: [src_len, batch_size, hidden_size]
        :return: [src_len, batch_size, intermediate_size]
        """
        hidden_states = self.dense(hidden_states)  # [src_len, batch_size, intermediate_size]
        if self.intermediate_act_fn is None:
            hidden_states = hidden_states
        else:
            hidden_states = self.intermediate_act_fn(hidden_states)
        return hidden_states


class BertOutput(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.dense = nn.Linear(config.intermediate_size, config.hidden_size)
        self.LayerNorm = nn.LayerNorm(config.hidden_size, eps=1e-12)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)

    def forward(self, hidden_states, input_tensor):
        """
        :param hidden_states: [src_len, batch_size, intermediate_size]
        :param input_tensor: [src_len, batch_size, hidden_size]
        :return: [src_len, batch_size, hidden_size]
        """
        hidden_states = self.dense(hidden_states)  # [src_len, batch_size, hidden_size]
        hidden_states = self.dropout(hidden_states)
        hidden_states = self.LayerNorm(hidden_states + input_tensor)
        return hidden_states


class BertLayer(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.bert_attention = BertAttention(config)
        self.bert_intermediate = BertIntermediate(config)
        self.bert_output = BertOutput(config)

    def forward(self,
                hidden_states,
                attention_mask=None):
        """
        :param hidden_states: [src_len, batch_size, hidden_size]
        :param attention_mask: [batch_size, src_len] mask掉padding部分的内容
        :return: [src_len, batch_size, hidden_size]
        """
        attention_output = self.bert_attention(hidden_states, attention_mask)
        # [src_len, batch_size, hidden_size]
        intermediate_output = self.bert_intermediate(attention_output)
        # [src_len, batch_size, intermediate_size]
        layer_output = self.bert_output(intermediate_output, attention_output)
        # [src_len, batch_size, hidden_size]
        return layer_output


class BertEncoder(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.config = config
        self.bert_layers = nn.ModuleList([BertLayer(config) for _ in range(config.num_hidden_layers)])

    def forward(
            self,
            hidden_states,
            attention_mask=None):
        """
        :param hidden_states: [src_len, batch_size, hidden_size]
        :param attention_mask: [batch_size, src_len]
        :return:
        """
        all_encoder_layers = []
        layer_output = hidden_states
        '''重复n个编码层 '''
        for i, layer_module in enumerate(self.bert_layers):
            layer_output = layer_module(layer_output,
                                        attention_mask)
            #  [src_len, batch_size, hidden_size]
            all_encoder_layers.append(layer_output)
        return all_encoder_layers


class BertPooler(nn.Module):
    ''' 在将 BertEncoder 部分的输出结果输入到下游任务前,需要将其进行略微的处理, '''
    def __init__(self, config):
        super().__init__()
        self.dense = nn.Linear(config.hidden_size, config.hidden_size)
        self.activation = nn.Tanh()
        self.config = config

    def forward(self, hidden_states):
        """
        :param hidden_states:  [src_len, batch_size, hidden_size]
        :return: [batch_size, hidden_size]
        """
        '''来取 BertEncoder输出的第一个位置([cls]位置),例如在进行文本分类时可以取该位置上的结果进行下一步的分类处理 '''
        if self.config.pooler_type == "first_token_transform":
            token_tensor = hidden_states[0, :].reshape(-1, self.config.hidden_size)
        '''是掌柜自己加入的一个选项,表示取所有位置的平均值,当然我们也可以根据自己的需要在添加下面添加其它的方式  '''
        elif self.config.pooler_type == "all_token_average":
            token_tensor = torch.mean(hidden_states, dim=0)
        pooled_output = self.dense(token_tensor)  # [batch_size, hidden_size]
        pooled_output = self.activation(pooled_output)
        return pooled_output  # [batch_size, hidden_size]


def format_paras_for_torch(loaded_paras_names, loaded_paras):
    """
    该函数的作用是将预训练参数格式化成符合torch(1.5.0)框架中MultiHeadAttention的参数形式
    :param loaded_paras_names:
    :param loaded_paras:
    :return:
    """
    qkv_weight_names = ['query.weight', 'key.weight', 'value.weight']
    qkv_bias_names = ['query.bias', 'key.bias', 'value.bias']
    qkv_weight, qkv_bias = [], []
    torch_paras = []
    for i in range(len(loaded_paras_names)):
        para_name_in_pretrained = loaded_paras_names[i]
        para_name = ".".join(para_name_in_pretrained.split('.')[-2:])
        if para_name in qkv_weight_names:
            qkv_weight.append(loaded_paras[para_name_in_pretrained])
        elif para_name in qkv_bias_names:
            qkv_bias.append(loaded_paras[para_name_in_pretrained])
        else:
            torch_paras.append(loaded_paras[para_name_in_pretrained])
        if len(qkv_weight) == 3:
            torch_paras.append(torch.cat(qkv_weight, dim=0))
            qkv_weight = []
        if len(qkv_bias) == 3:
            torch_paras.append(torch.cat(qkv_bias, dim=0))
            qkv_bias = []
    return torch_paras


def replace_512_position(init_embedding, loaded_embedding):
    """
    本函数的作用是当max_positional_embedding > 512时,用预训练模型中的512个向量来
    替换随机初始化的positional embedding中的前512个向量
    :param init_embedding:  初始化的positional embedding矩阵,大于512行
    :param loaded_embedding: 预训练模型中的positional embedding矩阵,等于512行
    :return: 前512行被替换后的初始化的positional embedding矩阵
    """
    logging.info(f"模型参数max_positional_embedding > 512,采用替换处理!")
    init_embedding[:512, :] = loaded_embedding[:512, :]
    return init_embedding


class BertModel(nn.Module):
    """
    """

    def __init__(self, config):
        super().__init__()
        self.bert_embeddings = BertEmbeddings(config)
        self.bert_encoder = BertEncoder(config)
        self.bert_pooler = BertPooler(config)
        self.config = config
        self._reset_parameters()

    def forward(self,
                input_ids=None,
                attention_mask=None,
                token_type_ids=None,
                position_ids=None):
        """
        ***** 一定要注意,attention_mask中,被mask的Token用1(True)表示,没有mask的用0(false)表示
        这一点一定一定要注意
        :param input_ids:  [src_len, batch_size]
        :param attention_mask: [batch_size, src_len] mask掉padding部分的内容
        :param token_type_ids: [src_len, batch_size]  # 如果输入模型的只有一个序列,那么这个参数也不用传值
        :param position_ids: [1,src_len] # 在实际建模时这个参数其实可以不用传值
        :return:
        """
        ''' Input Embedding 后的输出结果,其形状为[src_len, batch_size, hidden_size]; '''
        embedding_output = self.bert_embeddings(input_ids=input_ids,
                                                position_ids=position_ids,
                                                token_type_ids=token_type_ids)
        '''是整个 BERT 编码部分的输出,其中 all_encoder_outputs 为一个包含有 num_hidden_layers 个层的输出 '''
        # embedding_output: [src_len, batch_size, hidden_size]
        all_encoder_outputs = self.bert_encoder(embedding_output,
                                                attention_mask=attention_mask)

        '''处理得到整个 BERT 网络的输出,这里取了最后一层的输出,形状为[src_len, batch_size, hidden_size] '''
        # all_encoder_outputs 为一个包含有num_hidden_layers个层的输出
        sequence_output = all_encoder_outputs[-1]  # 取最后一层

        '''默认是最后一层的第 1 个 token 即[cls]位置经 dense + tanh 后的结果,其形状为[batch_size, hidden_size] '''
        # sequence_output: [src_len, batch_size, hidden_size]
        pooled_output = self.bert_pooler(sequence_output)

        # 默认是最后一层的first token 即[cls]位置经dense + tanh 后的结果
        # pooled_output: [batch_size, hidden_size]
        return pooled_output, all_encoder_outputs

    def _reset_parameters(self):
        r"""Initiate parameters in the transformer model."""
        """
        初始化
        """
        for p in self.parameters():
            if p.dim() > 1:
                normal_(p, mean=0.0, std=self.config.initializer_range)

    @classmethod
    def from_pretrained(cls, config, pretrained_model_dir=None):
        model = cls(config)  # 初始化模型,cls为未实例化的对象,即一个未实例化的BertModel对象
        pretrained_model_path = os.path.join(pretrained_model_dir, "pytorch_model.bin")
        if not os.path.exists(pretrained_model_path):
            raise ValueError(f"<路径:{pretrained_model_path} 中的模型不存在,请仔细检查!>")
        loaded_paras = torch.load(pretrained_model_path)
        state_dict = deepcopy(model.state_dict())
        loaded_paras_names = list(loaded_paras.keys())[:-8]
        model_paras_names = list(state_dict.keys())[1:]
        if 'use_torch_multi_head' in config.__dict__ and config.use_torch_multi_head:
            torch_paras = format_paras_for_torch(loaded_paras_names, loaded_paras)
            for i in range(len(model_paras_names)):
                logging.debug(f"## 成功赋值参数:{model_paras_names[i]},形状为: {torch_paras[i].size()}")
                if "position_embeddings" in model_paras_names[i]:
                    # 这部分代码用来消除预训练模型只能输入小于512个字符的限制
                    if config.max_position_embeddings > 512:
                        new_embedding = replace_512_position(state_dict[model_paras_names[i]],
                                                             loaded_paras[loaded_paras_names[i]])
                        state_dict[model_paras_names[i]] = new_embedding
                        continue
                state_dict[model_paras_names[i]] = torch_paras[i]
            logging.info(f"## 注意,正在使用torch框架中的MultiHeadAttention实现")
        else:
            for i in range(len(loaded_paras_names)):
                logging.debug(f"## 成功将参数:{loaded_paras_names[i]}赋值给{model_paras_names[i]},"
                              f"参数形状为:{state_dict[model_paras_names[i]].size()}")
                if "position_embeddings" in model_paras_names[i]:
                    # 这部分代码用来消除预训练模型只能输入小于512个字符的限制
                    if config.max_position_embeddings > 512:
                        new_embedding = replace_512_position(state_dict[model_paras_names[i]],
                                                             loaded_paras[loaded_paras_names[i]])
                        state_dict[model_paras_names[i]] = new_embedding
                        continue
                state_dict[model_paras_names[i]] = loaded_paras[loaded_paras_names[i]]
            logging.info(f"## 注意,正在使用本地MyTransformer中的MyMultiHeadAttention实现,"
                         f"如需使用torch框架中的MultiHeadAttention模块可通过config.__dict__['use_torch_multi_head'] = True实现")
        '''加载模型 '''
        model.load_state_dict(state_dict)
        return model

if __name__ == '__main__':
   
    json_file = '../bert_base_chinese/config.json'
    config = BertConfig.from_json_file(json_file)
    config.__dict__['use_torch_multi_head'] = True  # 表示使用 torch框架中的MultiHeadAttention 注意力实现方法
    config.max_position_embeddings = 518 # 测试大于512时的情况
    src = torch.tensor([[1, 3, 5, 7, 9, 2, 3], [2, 4, 6, 8, 10, 0, 0]], dtype=torch.long)
    src = src.transpose(0, 1)  # [src_len, batch_size]
    print(f"input shape [src_len,batch_size]: ", src.shape)
    token_type_ids = torch.LongTensor([[0, 0, 0, 1, 1, 1, 1], [0, 0, 1, 1, 1, 0, 0]]).transpose(0, 1)
    attention_mask = torch.tensor([[True, True, True, True, True, True, True],
                                   [True, True, True, True, True, False, False]])
    # attention_mask 实际就是Transformer中指代的key_padding_mask

    # ------ BertEmbedding -------
    bert_embedding = BertEmbeddings(config)
    bert_embedding_result = bert_embedding(src, token_type_ids=token_type_ids)
    # [src_len, batch_size, hidden_size]

    # 测试类BertAttention
    bert_attention = BertAttention(config)
    bert_attention_output = bert_attention(bert_embedding_result, attention_mask=attention_mask)
    print(f"BertAttention output shape [src_len, batch_size, hidden_size]: ", bert_attention_output.shape)

    # 测试类BertLayer
    bert_layer = BertLayer(config)
    bert_layer_output = bert_layer(bert_embedding_result, attention_mask)
    print(f"BertLayer output shape [src_len, batch_size, hidden_size]: ", bert_layer_output.shape)

    # 测试类BertEncoder
    bert_encoder = BertEncoder(config)
    bert_encoder_outputs = bert_encoder(bert_embedding_result, attention_mask)
    print(f"num of BertEncoder [config.num_hidden_layers]: ", len(bert_encoder_outputs))
    print(f"each output shape in BertEncoder [src_len, batch_size, hidden_size]: ", bert_encoder_outputs[0].shape)

    # 测试类BertModel
    position_ids = torch.arange(src.size()[0]).expand((1, -1))  # [1,src_len]
    bert_model = BertModel(config)
    bert_model_output = bert_model(input_ids=src,
                                   attention_mask=attention_mask,
                                   token_type_ids=token_type_ids,
                                   position_ids=position_ids)[0]
    print(f"BertModel's pooler output shape [batch_size, hidden_size]: ", bert_model_output.shape)
    print("\\n  =======  BertMolde 参数: ========")
    for param_tensor in bert_model.state_dict():
        print(param_tensor, "\\t", bert_model.state_dict()[param_tensor].size())

    print(f"\\n  =======  测试BertModel载入预训练模型: ========")
    model = BertModel.from_pretrained(config, pretrained_model_dir="../bert_base_chinese")

5.3 Bert进行文本分类的代码解析

基于 BERT的文本分类(准确的是单文本,也就是输入只包含一个句子)模型就是在原始的 BERT 模型后再加上一个分类层即可。同时,对于分类层的输入(也就是原始 BERT 的输出),默认情况下取 BERT输出结果中[CLS]位置对于的向量即可,当然也可以修改为其它方式,例如所有位置向量的均值等(将配置文件 config.json 中的 pooler_type 字段设置为"all_token_average"即可)。因此,对于基于 BERT 的文本分类模型来说其输入就是 BERT 的输入,输出则是每个类别对应的 logits 值。

  • 由于对于文本分类这个场景来说其输入只有一个序列,所以在构建数据集的时候并不需要构造 Segment Embedding 的输入,直接默认使用全为 0 即可
  • 同时,对于 Position Embedding 来说在任何场景下都不需要对其指定输入,因为我们在代码实现时已经做了相应默认时的处理
  • 因此,对于文本分类这个场景来说,只需要构造原始文本对应的 Token序列,并在首尾分别再加上一个[CLS]符和[SEP]符作为输入即可

5.3.1 数据集预览

数据集是今日头条开放的一个新闻分类数据集[13],一共包含有 382688 条数据,15 个类别。同时掌柜已近将其进行了格式化处理,以 7:2:1 的比例划分成了训练集、验证集和测试集 3 个部分。假如我们现在有两个样本构成了一个 batch,那么其整个数据的处理过程。

  • 第 1步需要将原始的数据样本进行分字(tokenize)处理;
  • 第2 步再根据 tokenize 后的结果构造一个字典,不过在使用 BERT 预训练时并不需要我们自己来构造这个字典,直接载入谷歌开源的 vocab.txt 文件构造字典即可,因为只有 vocab.txt 中每个字的索引顺序才与开源模型中每个字的Embedding 向量一一对应的。
  • 第 3 步则是根据字典将 tokenize 后的文本序列转换为 Token 序列,同时在 Token 序列的首尾分别加上[CLS]和[SEP]符号,并进行 Padding。
  • 第4 步则是根据第 3 步处理后的结果生成对应的 Padding Mask 向量。
  • 最后,在模型训练时只需要将第 3 步和第 4 步处理后的结果一起喂给模型即可

5.3.2 数据集构建
1)定义tokenize
因为预训练模型打算用huggingface提供的bert,所以直接用对应的BertTokenizer方法

import torch
from torch.utils.data import DataLoader
from tqdm import tqdm
import pandas as pd
import json
import logging
import os
from sklearn.model_selection import train_test_split
import collections
import six


class Vocab:
    """
    根据本地的vocab文件,构造一个词表
    vocab = Vocab()
    print(vocab.itos)  # 得到一个列表,返回词表中的每一个词;
    print(vocab.itos[2])  # 通过索引返回得到词表中对应的词;
    print(vocab.stoi)  # 得到一个字典,返回词表中每个词的索引;
    print(vocab.stoi['我'])  # 通过单词返回得到词表中对应的索引
    print(len(vocab))  # 返回词表长度
    """
    UNK = '[UNK]'

    def __init__(self, vocab_path):
        self.stoi = {}
        self.itos = []
        with open(vocab_path, 'r', encoding='utf-8') as f:
            for i, word in enumerate(f):
                w = word.strip('\\n')
                self.stoi[w] = i
                self.itos.append(w)

    def __getitem__(self, token):
        return self.stoi.get(token, self.stoi.get(Vocab.UNK))

    def __len__(self):
        return len(self.itos)


def build_vocab(vocab_path):
    """
    vocab = Vocab()
    print(vocab.itos)  # 得到一个列表,返回词表中的每一个词;
    print(vocab.itos[2])  # 通过索引返回得到词表中对应的词;
    print(vocab.stoi)  # 得到一个字典,返回词表中每个词的索引;
    print(vocab.stoi['我'])  # 通过单词返回得到词表中对应的索引
    """
    return Vocab(vocab_path)


def pad_sequence(sequences, batch_first=False, max_len=None, padding_value=0):
    """
    对一个List中的元素进行padding
    Pad a list of variable length Tensors with ``padding_value``
    a = torch.ones(25)
    b = torch.ones(22)
    c = torch.ones(15)
    pad_sequence([a, b, c],max_len=None).size()
    torch.Size([25, 3])
        sequences:
        batch_first: 是否把batch_size放到第一个维度
        padding_value:
        max_len :
                当max_len = 50时,表示以某个固定长度对样本进行padding,多余的截掉;
                当max_len=None是,表示以当前batch中最长样本的长度对其它进行padding;
    Returns:
    """
    if max_len is None:
        max_len = max([s.size(0) for s in sequences])
    out_tensors = []
    for tensor in sequences:
        if tensor.size(0) < max_len:
            tensor = torch.cat([tensor, torch.tensor([padding_value] * (max_len - tensor.size(0)))], dim=0)
        else:
            tensor = tensor[:max_len]
        out_tensors.append(tensor)
    out_tensors = torch.stack(out_tensors, dim=1)
    if batch_first:
        return out_tensors.transpose(0, 1)
    return out_tensors


def cache(func):
    """
    本修饰器的作用是将SQuAD数据集中data_process()方法处理后的结果进行缓存,下次使用时可直接载入!
    :param func:
    :return:
    """

    def wrapper(*args, **kwargs):
        filepath = kwargs['filepath']
        postfix = kwargs['postfix']
        data_path = filepath.split('.')[0] + '_' + postfix + '.pt'
        if not os.path.exists(data_path):
            logging.info(f"缓存文件 {data_path} 不存在,重新处理并缓存!")
            data = func(*args, **kwargs)
            with open(data_path, 'wb') as f:
                torch.save(data, f)
        else:
            logging.info(f"缓存文件 {data_path} 存在,直接载入缓存文件!")
            with open(data_path, 'rb') as f:
                data = torch.load(f)
        return data

    return wrapper


class LoadSingleSentenceClassificationDataset:
    def __init__(self,
                 vocab_path='./vocab.txt',  #
                 tokenizer=None,
                 batch_size=32,
                 max_sen_len=None,
                 split_sep='\\n',
                 max_position_embeddings=512,
                 pad_index=0,
                 is_sample_shuffle=True
                 ):

        """
        :param vocab_path: 本地词表vocab.txt的路径
        :param tokenizer:
        :param batch_size:
        :param max_sen_len: 在对每个batch进行处理时的配置;
                            当max_sen_len = None时,即以每个batch中最长样本长度为标准,对其它进行padding
                            当max_sen_len = 'same'时,以整个数据集中最长样本为标准,对其它进行padding
                            当max_sen_len = 50, 表示以某个固定长度符样本进行padding,多余的截掉;
        :param split_sep: 文本和标签之前的分隔符,默认为'\\t'
        :param max_position_embeddings: 指定最大样本长度,超过这个长度的部分将本截取掉
        :param is_sample_shuffle: 是否打乱训练集样本(只针对训练集)
                在后续构造DataLoader时,验证集和测试集均指定为了固定顺序(即不进行打乱),修改程序时请勿进行打乱
                因为当shuffle为True时,每次通过for循环遍历data_iter时样本的顺序都不一样,这会导致在模型预测时
                返回的标签顺序与原始的顺序不一样,不方便处理。
        """
        self.tokenizer = tokenizer
        self.vocab = build_vocab(vocab_path)
        self.PAD_IDX = pad_index
        self.SEP_IDX = self.vocab['[SEP]']
        self.CLS_IDX = self.vocab['[CLS]']
        # self.UNK_IDX = '[UNK]'

        self.batch_size = batch_size
        self.split_sep = split_sep
        self.max_position_embeddings = max_position_embeddings
        if isinstance(max_sen_len, int) and max_sen_len > max_position_embeddings:
            max_sen_len = max_position_embeddings
        self.max_sen_len = max_sen_len
        self.is_sample_shuffle = is_sample_shuffle

    @cache
    def data_process(self, filepath, postfix='cache'):
        """
        将每一句话中的每一个词根据字典转换成索引的形式,同时返回所有样本中最长样本的长度
        :param filepath: 数据集路径
        :return:
        """
        raw_iter = open(filepath, encoding="utf8").readlines()
        data = []
        max_len = 0
        for raw in tqdm(raw_iter, ncols=80):
            line = raw.rstrip("\\n").split(self.split_sep)
            s, l = line[0], line[1]
            tmp = [self.CLS_IDX] + [self.vocab[token] for token in self.tokenizer(s)]
            if len(tmp) > self.max_position_embeddings - 1:
                tmp = tmp[:self.max_position_embeddings - 1]  # BERT预训练模型只取前512个字符
            tmp += [self.SEP_IDX]
            tensor_ = torch.tensor(tmp, dtype=torch.long)
            l = torch.tensor(int(l), dtype=torch.long)
            max_len = max(max_len, tensor_.size(0))
            data.append((tensor_, l))
        return data, max_len

    def load_train_val_test_data(self, train_file_path=None,
                                 val_file_path=None,
                                 test_file_path=None,
                                 only_test=False):
        postfix = str(self.max_sen_len)
        test_data, _ = self.data_process(filepath=test_file_path, postfix=postfix)
        test_iter = DataLoader(test_data, batch_size=self.batch_size,
                               shuffle=False, collate_fn=self.generate_batch)
        if only_test:
            return test_iter
        train_data, max_sen_len = self.data_process(filepath=train_file_path,
                                                    postfix=postfix)  # 得到处理好的所有样本
        if self.max_sen_len == 'same':
            self.max_sen_len = max_sen_len
        val_data, _ = self.data_process(filepath=val_file_path,
                                        postfix=postfix)
        train_iter = DataLoader(train_data, batch_size=self.batch_size,  # 构造DataLoader
                                shuffle=self.is_sample_shuffle, collate_fn=self.generate_batch)
        val_iter = DataLoader(val_data, batch_size=self.batch_size,
                              shuffle=False, collate_fn=self.generate_batch)
        return train_iter, test_iter, val_iter

    def generate_batch(self, data_batch):
        batch_sentence, batch_label = [], []
        for (sen, label) in data_batch:  # 开始对一个batch中的每一个样本进行处理。
            batch_sentence.append(sen)
            batch_label.append(label)
        batch_sentence = pad_sequence(batch_sentence,  # [batch_size,max_len]
                                      padding_value=self.PAD_IDX,
                                      batch_first=False,
                                      max_len=self.max_sen_len)
        batch_label = torch.tensor(batch_label, dtype=torch.long)
        return batch_sentence, batch_label

测试上面的代码

from Tasks.TaskForSingleSentenceClassification import ModelConfig
from utils.data_helpers import LoadSingleSentenceClassificationDataset
from transformers import BertTokenizer

if __name__ == '__main__':
    model_config = ModelConfig()
    load_dataset = LoadSingleSentenceClassificationDataset(
        vocab_path=model_config.vocab_path,
        tokenizer=BertTokenizer.from_pretrained(model_config.pretrained_model_dir).tokenize,
        batch_size=model_config.batch_size,
        max_sen_len=model_config.max_sen_len,
        split_sep=model_config.split_sep,
        max_position_embeddings=model_config.max_position_embeddings,
        pad_index=model_config.pad_token_id,
        is_sample_shuffle=model_config.is_sample_shuffle)

    train_iter, test_iter, val_iter = \\
        load_dataset.load_train_val_test_data(model_config.train_file_path,
                                              model_config.val_file_path,
                                              model_config.test_file_path)
    for sample, label in train_iter:
        print(sample.shape)  # [seq_len,batch_size]
        print(sample.transpose(0, 1))
        padding_mask = (sample == load_dataset.PAD_IDX).transpose(0, 1)
        print(padding_mask)
        # print(label)
        break

5.3.3下游任务
创建一个文件名称为BertForSentenceClassification.py
在bert输出的基础上增加dropout层和一层dnn层用于分类

from ..BasicBert.Bert import BertModel
import torch.nn as nn


class BertForSentenceClassification(nn.Module):
    def __init__(self, config, bert_pretrained_model_dir=None):
        super(BertForSentenceClassification, self).__init__()
        self.num_labels = config.num_labels
        if bert_pretrained_model_dir is not None:
            self.bert = BertModel.from_pretrained(config, bert_pretrained_model_dir)
        else:
            self.bert = BertModel(config)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)
        self.classifier = nn.Linear(config.hidden_size, self.num_labels)

    def forward(self, input_ids,
                attention_mask=None,
                token_type_ids=None,
                position_ids=None,
                labels=None):
        """
        :param input_ids:  [src_len, batch_size]
        :param attention_mask: [batch_size, src_len]
        :param token_type_ids: 句子分类时为None
        :param position_ids: [1,src_len]
        :param labels: [batch_size,]
        :return:
        """
        pooled_output, _ = self.bert(input_ids=input_ids,
                                     attention_mask=attention_mask,
                                     token_type_ids=token_type_ids,
                                     position_ids=position_ids)  # [batch_size,hidden_size]

        '''将模型输出输入到dropout层和dnn层,label有则输出loss,无则直接输出logit '''
        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)  # [batch_size, num_label]
        if labels is not None:
            loss_fct = nn.CrossEntropyLoss()
            loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))
            return loss, logits
        else:
            return logits

5.3.4train和inference
创建一个文件名称为TaskForSingleSentenceClassification.py

import sys

sys.path.append('../')
from model import BertForSentenceClassification
from model import BertConfig
from utils import LoadSingleSentenceClassificationDataset
from utils import logger_init
from transformers import BertTokenizer
import logging
import torch
import os
import time


class ModelConfig:
    def __init__(self):
        self.project_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        self.dataset_dir = os.path.join(self.project_dir, 'data', 'SingleSentenceClassification')
        self.pretrained_model_dir = os.path.join(self.project_dir, "bert_base_chinese")
        self.vocab_path = os.path.join(self.pretrained_model_dir, 'vocab.txt')
        self.device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
        self.train_file_path = os.path.join(self.dataset_dir, 'toutiao_train.txt')
        self.val_file_path = os.path.join(self.dataset_dir, 'toutiao_val.txt')
        self.test_file_path = os.path.join(self.dataset_dir, 'toutiao_test.txt')
        self.model_save_dir = os.path.join(self.project_dir, 'cache')
        self.logs_save_dir = os.path.join(self.project_dir, 'logs')
        self.split_sep = '_!_'
        self.is_sample_shuffle = True
        self.batch_size = 64
        self.max_sen_len = None
        self.num_labels = 15
        self.epochs = 10
        self.model_val_per_epoch = 2
        logger_init(log_file_name='single', log_level=logging.INFO,
                    log_dir=self.logs_save_dir)
        if not os.path.exists(self.model_save_dir):
            os.makedirs(self.model_save_dir)

        # 把原始bert中的配置参数也导入进来
        bert_config_path = os.path.join(self.pretrained_model_dir, "config.json")
        bert_config = BertConfig.from_json_file(bert_config_path)
        for key, value in bert_config.__dict__.items():
            self.__dict__[key] = value
        # 将当前配置打印到日志文件中
        logging.info(" ### 将当前配置打印到日志文件中 ")
        for key, value in self.__dict__.items():
            logging.info(f"###  {key} = {value}")


def train(config):
    '''在bert基础上增加dnn层,然后整个网络进行微调 '''
    model = BertForSentenceClassification(config,
                                          config.pretrained_model_dir)
    model_save_path = os.path.join(config.model_save_dir, 'model.pt')
    if os.path.exists(model_save_path):
        loaded_paras = torch.load(model_save_path)
        model.load_state_dict(loaded_paras)
        logging.info("## 成功载入已有模型,进行追加训练......")
    model = model.to(config.device)
    optimizer = torch.optim.Adam(model.parameters(), lr=5e-5)
    model.train()
    bert_tokenize = BertTokenizer.from_pretrained(config.pretrained_model_dir).tokenize
    data_loader = LoadSingleSentenceClassificationDataset(vocab_path=config.vocab_path,
                                                          tokenizer=bert_tokenize,
                                                          batch_size=config.batch_size,
                                                          max_sen_len=config.max_sen_len,
                                                          split_sep=config.split_sep,
                                                          max_position_embeddings=config.max_position_embeddings,
                                                          pad_index=config.pad_token_id,
                                                          is_sample_shuffle=config.is_sample_shuffle)
    train_iter, test_iter, val_iter = data_loader.load_train_val_test_data(config.train_file_path,
                                                                           config.val_file_path,
                                                                           config.test_file_path)
    max_acc = 0
    for epoch in range(config.epochs):
        losses = 0
        start_time = time.time()
        for idx, (sample, label) in enumerate(train_iter):
            sample = sample.to(config.device)  # [src_len, batch_size]
            label = label.to(config.device)
            padding_mask = (sample == data_loader.PAD_IDX).transpose(0, 1)
            loss, logits = model(
                input_ids=sample,
                attention_mask=padding_mask,
                token_type_ids=None,
                position_ids=None,
                labels=label)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            losses += loss.item()
            acc = (logits.argmax(1) == label).float().mean()
            if idx % 10 == 0:
                logging.info(f"Epoch: {epoch}, Batch[{idx}/{len(train_iter)}], "
                             f"Train loss :{loss.item():.3f}, Train acc: {acc:.3f}")
        end_time = time.time()
        train_loss = losses / len(train_iter)
        logging.info(f"Epoch: {epoch}, Train loss: {train_loss:.3f}, Epoch time = {(end_time - start_time):.3f}s")
        if (epoch + 1) % config.model_val_per_epoch == 0:
            acc = evaluate(val_iter, model, config.device, data_loader.PAD_IDX)
            logging.info(f"Accuracy on val {acc:.3f}")
            if acc > max_acc:
                max_acc = acc
                torch.save(model.state_dict(), model_save_path)


def inference(config):
    model = BertForSentenceClassification(config,
                                          config.pretrained_model_dir)
    model_save_path = os.path.join(config.model_save_dir, 'model.pt')
    if os.path.exists(model_save_path):
        loaded_paras = torch.load(model_save_path)
        model.load_state_dict(loaded_paras)
        logging.info("## 成功载入已有模型,进行预测......")
    model = model.to(config.device)
    data_loader = LoadSingleSentenceClassificationDataset(vocab_path=config.vocab_path,
                                                          tokenizer=BertTokenizer.from_pretrained(
                                                              config.pretrained_model_dir).tokenize,
                                                          batch_size=config.batch_size,
                                                          max_sen_len=config.max_sen_len,
                                                          split_sep=config.split_sep,
                                                          max_position_embeddings=config.max_position_embeddings,
                                                          pad_index=config.pad_token_id,
                                                          is_sample_shuffle=config.is_sample_shuffle)
    train_iter, test_iter, val_iter = data_loader.load_train_val_test_data(config.train_file_path,
                                                                           config.val_file_path,
                                                                           config.test_file_path)
    acc = evaluate(test_iter, model, device=config.device, PAD_IDX=data_loader.PAD_IDX)
    logging.info(f"Acc on test:{acc:.3f}")


def evaluate(data_iter, model, device, PAD_IDX):
    model.eval()
    with torch.no_grad():
        acc_sum, n = 0.0, 0
        for x, y in data_iter:
            x, y = x.to(device), y.to(device)
            padding_mask = (x == PAD_IDX).transpose(0, 1)
            '''输出模型预测的概率值 '''
            logits = model(x, attention_mask=padding_mask)
            acc_sum += (logits.argmax(1) == y).float().sum().item()
            n += len(y)
        model.train()
        return acc_sum / n


if __name__ == '__main__':
    model_config = ModelConfig()
    train(model_config)
    inference(model_config)

5.4文本蕴含任务

所谓文本对分类指的就是同时给模型输入两句话,然后让模型来判断两句话之间的关系,所以本质上也就变成了一个文本分类任务。总的来说,基于 BERT 的文本蕴含任务同第 4 节中介绍的单文本分类任务本质上没有任何不同,最终都是对一个文本序列进行分类。只是按照 BERT 模型的思想,文本对分类任务在数据集的构建过程中需要通过 Segment Embedding来区分前后两个不同的序列,并且两个句子之间需要通过一个[SEP]符号来进行分割,因此本节内容的核心就在于如何构建数据集。文本对的分类任务除了在模型输入上发生了变换,其它地方均与单文本分类任务一样,同样也是取最后一层的[CLS]向量进行分类。

5.5 Swag选择任务

通常来说,在 NLP 领域的很多场景中模型最后所做的基本上都是一个分类任务,虽然表面上看起来不是。例如:文本蕴含任务其实就是将两个序列拼接在一起,然后预测其所属的类别;基于神经网络的序列生成模型(翻译、文本生成等)本质就是预测词表中下一个最有可能出现的词,此时的分类类别就是词表的大小。因此,从本质上来说本文介绍的问答选择任务以及在后面将要介绍的问题回答任务其实都是一个分类任务,而关键的地方就在于如何构建模型的输入和输出。

从图中可以看出,原始数据的形式是一个问题和四个选项,模型需要做的就是从四个选项中给出最合理的一个,于是也就变成了一个四分类任务。同时,构建模型输入的方式就是将原始问题和每一个答案都拼接起来构成一个序列中间用[SEP]符号隔开,然后再分别输入到 BERT 模型中进行特征提取得到四个特征向量形状为[4,hidden_size],最后再经过一个分类层进行分类处理得到预测选项。值得一提的是,通常情况下这里的四个特征都是直接取每个序列经 BERT编码后的[CLS]向量。

5.6 SQuAD问答任务

所谓问题回答指的就是同时给模型输入一个问题和一段描述,最后需要模型从给定的描述中预测出问题答案所在的位置(text span)。

在做这个任务之前首先需要明白的就是:

  • ①最终问题的答案一定是在给定的描述中;
  • ②问题再描述中的答案一定是一段连续的字符,不能有间隔。例如对于上面的描述内容来说,如果给出的问题是“苏轼生活在什么年代以及他是哪里人?”,那么模型最终并不会给出类似“北宋”和“眉州眉山人”这两个分离的答案,最好的情况下便是给出“北宋著名的文学家与政治家,眉州眉山人”这一个连续的答案。

在有了这两个限制条件后,对于这类问答任务的本质也就变成了需要让模型预测得到答案在描述中的起始位置(start position)以及它的结束位置(end position)。所以,问题最终又变成了如何在 BERT 模型的基础上再构建一个分类器来对 BERT 最后一层输出的每个 Token 进行分类,判断它们是否属于 start position 或者是 end position。
构建模型输入的方式就是将原始问题和上下文描述拼接成一个序列中间用[SEP]符号隔开,然后再分别输入到 BERT 模型中进行特征提取。在BERT 编码完成后,再取最后一层的输出对每个 Token 进行分类即可得到 start position 和 end position 的预测输出。

  • 值得注意的是在问答场景中是将问题放在上下文描述前面的,即Sentence A 为问题,Sentence B 为描述。而在上一个问题选择任务场景(Swag选择)中是将答案放在描述之后。 猜测这是因为在 SWAG 这一推理数据集中,每个选项其实都可以看作是问题(描述)的下半句,两者具有强烈的先后顺序算是一种逻辑推理,因此将选项放在了描述了后面。
  • 在问题回答这一场景中,论文中将问题放在描述前面猜测是因为:①两者并没有强烈的先后顺序;②问题相对较短放到前面可能好处理一点。所以基于这样的考虑,在问答任务中将问题放在了描述前面。不过后续大家依旧可以尝试交换一下顺序看看效果。

5.6.1数据输入介绍
如果在建模时碰到上下文过长时的情况该怎么办?是直接采取截断处理吗?如果问题答案恰巧是在被截断的那部分里面呢,还能直接截断吗?显然,认真想一想截断这种做法在这里肯定是不行的,因此在论文中作者也采用了另外一种方法来解决这一问题,那就是滑动窗口

第①步需要做的是根据指定最大长度和滑动窗口大小将原始样本进行滑动窗口处理并得到多个子样本。不过这里需要注意的是,sentence A,也就是问题部分不参与滑动处理。同时,图中样本右边的3 列数字分别表示每个子样本的起始、结束索引和原始样本对应的数据集中的ID(如其中的9,13是“[CLS]苏轼哪里人?[SEP],眉州眉山人”这句话从0开始算,即“眉州眉山人”)。紧接着第②步便是将所有原始样本滑动处理后的结果作为训练集来训练模型。
总的来说,在这一场景中训练程并没有太大的问题,因为每个子样本也都有其对应的标签值,因此和普通的训练过程并没有什么本质上的差异。因此,最关键的地方在于如何在推理过程中也使用滑动窗口。
5.6.2 结果筛选
一种最直观的做法就是直接取起始位置预测概率值加结束位置预测概率值最大的子样本对应的结果,作为整个原始样本对应的预测结果。不过下面将来介绍另外一种效果更好的处理方式(这也是论文中所采取的方式),其整个处理流程如图所示。

如图所示,在推理过程中第①步要做的仍旧是需要根据指定最大长度和滑动窗口大小将原始样本进行滑动窗口处理。接着第②步便是根据 BERT 分类的输出取前 K 个概率值最大的结果。在图中这里的 K 值为 4,因此对于每个子样本来说其 start position 和 end position 分别都有 4 个候选结果。例如,
第②步中第 1行的 7:0.41,10:02,9:0.12,2:01表示函数就是对于第 1个子样本来说,start position 为索引 7 的概率值为 0.41,其它同理。这样对于每一个子样本来说,在分别得到 start position 和 end position的 K 个候选值后便可以通过组合来得到更多的候选预测结果,然后再根据一些规则来选择最终原始样本对应的预测输出。
根据图中样本重构后的结果可以看出:

  • (1)最终的索引预测结果肯定是大于 8 的,因为答案只可能在上下文中出现;
  • (2)在进行结果组合的过程中,起始索引肯定是小于等于结束索引的。

因此,根据这两个条件在经过步骤③的处理后,便可以得到进一步的筛选结果。例如,对于第 1 个子样本来说,start position 中 7 和 2 是不满足条件(1)的,所以可以直接去掉;同时,为了满足第(2)个条件所以在 end position 中 8,6,7 均需要去掉。
进一步,将第③步处理后的结果在每个子样本内部进行组合,并按照 start position 加 end position 值的大小进行排序,便可以得到如下图所示的结果

如图所示表示根据概率和排序后的结果。例如第 1 列 9,13,0.65 的含义便是最终原始样本预测结果为 9,13 的概率值为 0.65。因此,最终该原始样本对应的预测值便可以取 9 和 13。

5.7命名实体识别任务

所谓命名体指的是给模型输入一句文本,最后需要模型将其中的实体(例如人名、地名、组织等等)标记出来

对于任意一个 NLP 任务来说最后所要完成的基本上都是一个分类任务,尽管表面上看起来可能不太像。根据给出的标签来看,对于原始句子中的每个字符来说其都有一个对应的类别标签,因此对于 NER 任务来说只需对原始句子里每个字符进行分类即可,然后再将预测后的结果进行后处理便能够得到句子中存在的相应实体
5.7.1任务构造原理

从图中可以看出原始数据输入为一个句子,我们只需要在句子的首尾分别加上[CLS]和[SEP],然后输入到模型当中进行特征提取并最终通过一个分类层对输出的每个Token进行分类即可,最后只需要对各个 Token的预测结果进行后处理便能够实现整个 NER 任务。

5.8 从零实现NSP和MlM预训练任务

6、huggingface例子

参照Huggingface简介及BERT代码浅析中的例子

import torch
from transformers import BertModel, BertTokenizer

# 这里我们调用bert-base模型,同时模型的词典经过小写处理
model_name = 'bert-base-uncased'

# 读取模型对应的tokenizer
tokenizer = BertTokenizer.from_pretrained(model_name)

# 载入模型
model = BertModel.from_pretrained(model_name)

# 输入文本
input_text = "Here is some text to encode"

# 通过tokenizer把文本变成 token_id
input_ids = tokenizer.encode(input_text, add_special_tokens=True)
# input_ids: [101, 2182, 2003, 2070, 3793, 2000, 4372, 16044, 102]
input_ids = torch.tensor([input_ids])

# 获得BERT模型最后一个隐层结果
with torch.no_grad():
    last_hidden_states = model(input_ids)[0]  # Models outputs are now tuples

""" tensor([[[-0.0549,  0.1053, -0.1065,  ..., -0.3550,  0.0686,  0.6506],
         [-0.5759, -0.3650, -0.1383,  ..., -0.6782,  0.2092, -0.1639],
         [-0.1641, -0.5597,  0.0150,  ..., -0.1603, -0.1346,  0.6216],
         ...,
         [ 0.2448,  0.1254,  0.1587,  ..., -0.2749, -0.1163,  0.8809],
         [ 0.0481,  0.4950, -0.2827,  ..., -0.6097, -0.1212,  0.2527],
         [ 0.9046,  0.2137, -0.5897,  ...,  0.3040, -0.6172, -0.1950]]]) 
    shape: (1, 9, 768)     
"""

以tokenization开头的都是跟vocab有关的代码,比如在 tokenization_bert.py 中有函数如whitespace_tokenize,还有不同的tokenizer的类。同时也有各个模型对应的vocab.txt。从第一个链接进去就是bert-base-uncased的词典,这里面有30522个词,对应着config里面的vocab_size。
https://github.com/huggingface/transformers/blob/main/src/transformers/models/bert/tokenization_bert.py

其中,第0个token是[pad],第101个token是[CLS],第102个token是[SEP],所以之前我们encode得到的 [101, 2182, 2003, 2070, 3793, 2000, 4372, 16044, 102] ,其实tokenize后convert前的token就是 ['[CLS]', 'here', 'is', 'some', 'text', 'to', 'en', '##code', '[SEP]'],这是通过子词化得到的,及encode切分成了更小的子词,中文就是切词后的结果
读取一个预训练过的BERT模型,来encode我们指定的一个文本,对文本的每一个token生成768维的向量。如果是二分类任务,我们接下来就可以把第一个token也就是[CLS]的768维向量,接一个linear层,预测出分类的logits,或者根据标签进行训练。

参考文献:
论文解读:BERT模型及fine-tuning
最强预训练模型BERT的Pytorch实现(非官方)
Huggingface简介及BERT代码浅析
bert可以做哪些nlp任务
月来客栈
codertimo/BERT-pytorch
莫凡-BERT 双向语言模型

如果觉得我的文章对您有用,请点赞。您的支持将鼓励我继续创作!

1

添加新评论0 条评论

Ctrl+Enter 发表

作者其他文章

相关文章

相关问题

相关资料

X社区推广