【NER】命名实体识别项目实战

发布于 26 天前  72 次阅读


命名实体识别(Named Entity Recognition, NER)是一种自然语言处理(NLP)任务,旨在识别文本中具有特定意义的实体并将其分类。常见的实体类型包括:

  • 人名(Person Names)
  • 地名(Location Names)
  • 组织机构名(Organization Names)
  • 时间表达(Temporal Expressions)
  • 货币单位(Monetary Units)

NER 通常用于信息提取、问答系统和文本分析等应用中。通过识别文本中的命名实体,可以更好地理解其内容和结构,从而支持更高级的自然语言处理任务。

数据标注

在命名实体识别(NER)任务中,数据标注是至关重要的一步,它涉及到对文本中的特定实体进行分类和标记。标注的符号通常包括以下几种:

  1. B(Begin):表示实体的开始部分。
  2. I(Inside):表示实体的内部部分,用于连续的字符中除了开始和结束的其他部分。
  3. E(End):表示实体的结束部分。
  4. S(Single):表示单个字符的实体。如果实体只由一个字组成,那么这个字会被标记为S。
  5. O(Outside):表示不属于任何实体的字符或词。

例如:右图就是一个标注的例子,如果是一个字表示一个实体,比如“赣”,那么将会标注为S。

M和I都表示是一个词的中间部分,但有的python包不支持M标签,所以按实际情况来吧。

评价指标

例如上图有五个预测的标签,可以看到有位置对但标签错,多或者少预测了一个字,以及预测到了其他的不应该出现的实体。

对于这么多种预测值的打分标准,是要根据实际需求来的,例如有的只要求预测对了位置就行,标签无所谓,那么pre1就可以得分。

目前常用的指标

目前常用的指标是seqeval.metrics包里的f1_score分数。

要介绍这个f1_score分数,就要先从准确率(accuracy),精确率(precision)和召回率(recall)开始说起。

准确率(accuracy_score):即将文本中所有的文字对应标签的预测正确率做统计,一般不采用准确率,因为准确率会虚高。例如一个含五百字的文本,实体只有五个字,那么即使全部预测为O,那准确率也有90%。

精确率(precision_score):即只对实体部分的文字对应的标签预测正确率做统计,这种的可行度就很高,例如一个含五百字的文本,有一个五字实体(正确标签是BMMME),预测是BMMEE,那么精确率就是4/5

召回率(recall_score):即按照实体个数为单位进行正确率的统计,例如上一个例子即使五个字对了四个,没有全队也依然算全错,例如一个含五百字的文本,有三个实体,其中有两个实体的标签完全预测正确,那么召回率就是2/3

f1_score:该指标由精确率和召回率决定,计算公式为f1_score = (2 * 精确率 * 召回率 )/ ( 精确率 + 召回率),所以该指标是较为公正的一个指标。

代码实现

采用的是Bert+BiLSTM+CRF

另外我发现两个特别无语的问题,第一个是显示bert模型导入失败,我一开始以为是版本过高,结果是pillow库和这个冲突了。第二个是显示接受到的数据是空列表,也就是没读到数据,然后找了一万遍发现我原来的数据格式是beems,我手动改后缀成txt格式后虽然能打开看到,但机器不认,必须用格式转换。

				
					'''
torch==1.9.1
transformers == 4.18.0
seqeval == 1.2.2
pytorch-crf == 0.7.2
'''
import os
from torch.utils.data import Dataset,DataLoader
from transformers import BertTokenizer
from transformers import BertModel
# from transformers import AdamW
import torch
import torch.nn as nn
from seqeval.metrics import accuracy_score as seq_accuracy_score
from seqeval.metrics import f1_score as seq_f1_score
from seqeval.metrics import precision_score as seq_precision_score
from seqeval.metrics import recall_score as seq_recall_score
from torchcrf import CRF


def read_data(file):
    with open(file, "r", encoding="utf-8") as f:  # 默认用utf-8读取,如果是乱码改成GBK试试
        all_data = f.read().split("\n")  # 一行一行读进来

    all_text = []
    all_label = []

    text = []
    label = []
    for data in all_data:
        if data == "":  # 如果该行为空,说明一个句子借宿了,那就把这个句子添加进all_text列表里,标签同理
            all_text.append(text)
            all_label.append(label)
            text = []  # 每读进去一个句子清空,为读取下一个句子做准备
            label = []
        else:  # 如果该行不为空,那说明这个句子还没完,用空格分隔字和标签,然后存储到text和label列表中
            t, l = data.split(" ")
            text.append(t)
            label.append(l)

    return all_text, all_label


def build_label(train_label):  # BMSE这些标签机器看不懂,要转换成0123这种机器看的懂的符号
    label_2_index = {"PAD": 0, "UNK": 1}  # 额外增加PAD标注和UNK未知标注
    for label in train_label:
        for l in label:
            if l not in label_2_index:
                label_2_index[l] = len(label_2_index)  # 构建标注符号-下标的索引
    return label_2_index, list(label_2_index)  # list是把所有的标签放在一个列表里了,而列表天生带有下标,因此后期可以用来做index_2_label


class BertDataset(Dataset):  #
    def __init__(self, all_text, all_label, label_2_index, max_len, tokenizer):
        self.all_text = all_text
        self.all_label = all_label
        self.label_2_index = label_2_index
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __getitem__(self, index):
        text = self.all_text[index]
        label = self.all_label[index][:self.max_len]  # 后面这个是截断的意思,因为文本在bert编码器那里截断了,这里也要截断
        # 参数分别为:需要编码的文本,是否添加特殊字符(开头结尾符等),最大长度(因为有开头结尾符所以要+2),按最长句长度填充,允许按最长句截断,返回类型为pytorch的tensor型
        text_index = self.tokenizer.encode(text, add_special_tokens=True, max_length=self.max_len + 2,
                                           padding="max_length", truncation=True, return_tensors="pt")
        # 开头结尾符标志为0,1是未知符的标号,所以get(l,1)就是如果见过就取下标为l的标志,没见过就去下标为1(UNK)的标志,最后再加上填充的PAD字符
        label_index = [0] + [self.label_2_index.get(l, 1) for l in label] + [0] + [0] * (max_len - len(text))

        label_index = torch.tensor(label_index)  # 标签也要转换成tensor类型
        return text_index.reshape(-1), label_index, len(label)  # text_index是维度是【1,32】我们不需要第一个维度,所以reshape-1删掉第一个维度

    def __len__(self):
        return self.all_text.__len__()


class Bert_LSTM_NerModel(nn.Module):
    def __init__(self,lstm_hidden,class_num):
        super().__init__()

        self.bert = BertModel.from_pretrained(os.path.join("..","bert_base_chinese"))
        for name,param in self.bert.named_parameters(): # 这两行的意思是把bert预训练模型里的参数固定住,只更新下游任务的参数,因为bert模型太大了可能会炸显存
            param.requires_grad = False

        self.lstm = nn.LSTM(768,lstm_hidden,batch_first=True,num_layers=1,bidirectional=False) # 768是bert输出的固定数量,不能改的
        self.classifier = nn.Linear(lstm_hidden,class_num)

        self.crf = CRF(class_num,batch_first=True)  # crf代替loss的位置,我的理解是crf并没有改变模型,只是纵观所有可能的结果选出较好的一种,具体的选法就是计算各个结果的loss
        # self.loss_fun = nn.CrossEntropyLoss()  # 交叉熵,包括softmax函数+DLL损失计算

    def forward(self,batch_index,batch_label=None):  # 传了真实标签就说明是训练集,要做loss,没传(None)就说明是验证集/测试集,不需要计算loss,直接输出预测就行
        bert_out = self.bert(batch_index)
        # 这是bert模型自带的两个输出,前一个是字符级别特征(一般用于NER),后一个是篇章级别特征(一般用于文本分类等等)
        bert_out0,bert_out1 = bert_out[0],bert_out[1]  # bert_out0:字符级别特征, bert_out1:篇章级别
        # 可以把bert看成50×32×768的矩阵,lstm是768×lstm_hidden_size的矩阵,所以lstm的输出就是50×32×lstm_hidden
        lstm_out,_ = self.lstm(bert_out0)  # lstm有两个输出,第二个输出用不到

        pre = self.classifier(lstm_out)
        if batch_label is not None:  # 如果传了标签过来。开始计算loss
            # pre的维度是【10(10篇文章一个batch),32(每篇文章32个字),30(预测是第三十类别】,batch laber的维度是【10,32】,loss做不了二维和三维的运算,要用reshape转换为【320.30】和【320】
            # loss = self.loss_fun(pre.reshape(-1,pre.shape[-1]),batch_label.reshape(-1))
            loss = -self.crf(pre,batch_label)  # crf不用展开,很方便
            # 这里加负号是因为crf输出就是负数,但神奇的是不管加不加负号,最后的loss训练都会降低
            return loss
        else:  # 如果不需要计算loss

            pre = self.crf.decode(pre)  # 需要crf解码才会得到正确的预测,不过计算loss的时候不用解码
            return pre
            #  return torch.argmax(pre, dim=-1)   # 直接返回最大概率的标签,dim=-1是倒数第一个的维度,也就是表示标签的维度


if __name__ == "__main__":
    # 读取数据,其实用data\\train.txt也行,但mac系统和win系统的路径拼接不一样,所以为了兼容,用os拼接路径,这是个好习惯
    train_text, train_label = read_data(os.path.join("data", "train.txt"))
    dev_text, dev_label = read_data(os.path.join("data", "dev.txt"))
    test_text, test_label = read_data(os.path.join("data", "test.txt"))

    # 构建标签-下标的双向索引
    label_2_index, index_2_label = build_label(train_label)

    tokenizer = BertTokenizer.from_pretrained(os.path.join("..", "bert_base_chinese"))  # 编码器直接调用BERT现成的预训练模型

    batch_size = 50
    epoch = 10
    max_len = 30
    lr = 0.0005
    lstm_hidden = 128
    do_train = False
    do_test = True
    do_input = False

    device = "cuda:0" if torch.cuda.is_available() else "cpu"

    if do_train:
        train_dataset = BertDataset(train_text, train_label, label_2_index, max_len, tokenizer)
        train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=False)

        dev_dataset = BertDataset(dev_text, dev_label, label_2_index, max_len, tokenizer)
        dev_dataloader = DataLoader(dev_dataset, batch_size=batch_size, shuffle=False)

        model = Bert_LSTM_NerModel(lstm_hidden, len(label_2_index)).to(device)  # len(label_2_index)是分类器的数量,也就是最后一共有多少类,我们用的是预训练模型参数是官方给的
        opt = torch.optim.AdamW(model.parameters(), lr)  # 这个Adam优化器和pytorch自带的没什么区别,都可以用.咳咳,现在Transformers的Adam优化器已经被弃用了,建议用pytorch的

        best_score = -1
        for e in range(epoch):
            model.train()
            timeset = 0
            for batch_text_index,batch_label_index,batch_len in train_dataloader:
                batch_text_index = batch_text_index.to(device)
                batch_label_index = batch_label_index.to(device)
                loss = model.forward(batch_text_index,batch_label_index)
                loss.backward()

                opt.step()
                opt.zero_grad()

                timeset = timeset + 1
                if timeset % 20 == 0:
                    print(f'loss:{loss:.2f}')

            model.eval()

            all_pre = []
            all_tag = []
            for batch_text_index,batch_label_index,batch_len in dev_dataloader:
                batch_text_index = batch_text_index.to(device)
                batch_label_index = batch_label_index.to(device)
                pre = model.forward(batch_text_index)  # 不传标签的时候输出的是预测值,不是loss
                # 之前没用crf的时候需要把tensor转换成list,但crf返回的就是list,所以不用转换
                # pre = pre.cpu().numpy().tolist()  # 因为pre是双重tensor类型的,要转回list类型,所以用的.numpy().tolist()
                tag = batch_label_index.cpu().numpy().tolist()

                for p,t,l in zip(pre,tag,batch_len):
                    p = p[1:1+l]  # 这里为什么要截断,因为之前加了开头结尾符,还有后面的一坨padding,当然这些可以一起拿来算分数,但那些都不算实体,也就没必要,怕影响准确率
                    t = t[1:1+l]

                    p = [index_2_label[i] for i in p]
                    t = [index_2_label[i] for i in t]

                    all_pre.append(p)
                    all_tag.append(t)

            f1_score = seq_f1_score(all_tag,all_pre)
            if f1_score > best_score:
                torch.save(model, "best_model.pt")
                best_score = f1_score

            print(f"best_score:{best_score:.2f},f1_score:{f1_score:.2f}")

    if do_test:
        test_dataset = BertDataset(test_text, test_label, label_2_index, max_len, tokenizer,True)
        test_dataloader = DataLoader(test_dataset, batch_size=1, shuffle=False)

        model = torch.load("best_model.pt")

        all_pre = []
        all_tag = []
        test_out = []
        for idx,(batch_text_index, batch_label_index, batch_len) in enumerate(test_dataloader):
            text = test_text[idx]

            batch_text_index = batch_text_index.to(device)
            batch_label_index = batch_label_index.to(device)
            pre = model.forward(batch_text_index)

            # pre = pre.cpu().numpy().tolist()
            tag = batch_label_index.cpu().numpy().tolist()


            pre = pre[0][1:-1]
            tag = tag[0][1:-1]

            pre = [index_2_label[i] for i in pre]
            tag = [index_2_label[i] for i in tag]

            all_pre.append(pre)
            all_tag.append(tag)

            test_out.extend([f"{w} {t}" for w,t in zip(text,pre)])
            test_out.append("")

        f1_score = seq_f1_score(all_tag, all_pre)
        print(f"test_f1_score:{f1_score:.2f}")

        with open("test_out.txt","w",encoding='utf-8') as f:
            f.write("\n".join(test_out))

    if do_input:
        model = torch.load("best_model.pt")
        text = input("请输入:")
        # text = text[:510]
        text_idx = tokenizer.encode(text, add_special_tokens=True, return_tensors="pt")
        text_idx = text_idx.to(device)

        pre = model.forward(text_idx)
        pre = pre[0][1:-1]
        pre = [index_2_label[i] for i in pre]
        print("\n".join([f"{w}:{t}" for w,t in zip(text,pre)]))



				
			
届ける言葉を今は育ててる
最后更新于 2024-09-18