命名实体识别(Named Entity Recognition, NER)是一种自然语言处理(NLP)任务,旨在识别文本中具有特定意义的实体并将其分类。常见的实体类型包括:
- 人名(Person Names)
- 地名(Location Names)
- 组织机构名(Organization Names)
- 时间表达(Temporal Expressions)
- 货币单位(Monetary Units)
NER 通常用于信息提取、问答系统和文本分析等应用中。通过识别文本中的命名实体,可以更好地理解其内容和结构,从而支持更高级的自然语言处理任务。
数据标注
在命名实体识别(NER)任务中,数据标注是至关重要的一步,它涉及到对文本中的特定实体进行分类和标记。标注的符号通常包括以下几种:
- B(Begin):表示实体的开始部分。
- I(Inside):表示实体的内部部分,用于连续的字符中除了开始和结束的其他部分。
- E(End):表示实体的结束部分。
- S(Single):表示单个字符的实体。如果实体只由一个字组成,那么这个字会被标记为S。
- O(Outside):表示不属于任何实体的字符或词。
例如:右图就是一个标注的例子,如果是一个字表示一个实体,比如“赣”,那么将会标注为S。
M和I都表示是一个词的中间部分,但有的python包不支持M标签,所以按实际情况来吧。
评价指标
例如上图有五个预测的标签,可以看到有位置对但标签错,多或者少预测了一个字,以及预测到了其他的不应该出现的实体。
对于这么多种预测值的打分标准,是要根据实际需求来的,例如有的只要求预测对了位置就行,标签无所谓,那么pre1就可以得分。
目前常用的指标
目前常用的指标是seqeval.metrics包里的f1_score分数。
要介绍这个f1_score分数,就要先从准确率(accuracy),精确率(precision)和召回率(recall)开始说起。
准确率(accuracy_score):即将文本中所有的文字对应标签的预测正确率做统计,一般不采用准确率,因为准确率会虚高。例如一个含五百字的文本,实体只有五个字,那么即使全部预测为O,那准确率也有90%。
精确率(precision_score):即只对实体部分的文字对应的标签预测正确率做统计,这种的可行度就很高,例如一个含五百字的文本,有一个五字实体(正确标签是BMMME),预测是BMMEE,那么精确率就是4/5
召回率(recall_score):即按照实体个数为单位进行正确率的统计,例如上一个例子即使五个字对了四个,没有全队也依然算全错,例如一个含五百字的文本,有三个实体,其中有两个实体的标签完全预测正确,那么召回率就是2/3
f1_score:该指标由精确率和召回率决定,计算公式为f1_score = (2 * 精确率 * 召回率 )/ ( 精确率 + 召回率),所以该指标是较为公正的一个指标。
代码实现
采用的是Bert+BiLSTM+CRF
另外我发现两个特别无语的问题,第一个是显示bert模型导入失败,我一开始以为是版本过高,结果是pillow库和这个冲突了。第二个是显示接受到的数据是空列表,也就是没读到数据,然后找了一万遍发现我原来的数据格式是beems,我手动改后缀成txt格式后虽然能打开看到,但机器不认,必须用格式转换。
'''
torch==1.9.1
transformers == 4.18.0
seqeval == 1.2.2
pytorch-crf == 0.7.2
'''
import os
from torch.utils.data import Dataset,DataLoader
from transformers import BertTokenizer
from transformers import BertModel
# from transformers import AdamW
import torch
import torch.nn as nn
from seqeval.metrics import accuracy_score as seq_accuracy_score
from seqeval.metrics import f1_score as seq_f1_score
from seqeval.metrics import precision_score as seq_precision_score
from seqeval.metrics import recall_score as seq_recall_score
from torchcrf import CRF
def read_data(file):
with open(file, "r", encoding="utf-8") as f: # 默认用utf-8读取,如果是乱码改成GBK试试
all_data = f.read().split("\n") # 一行一行读进来
all_text = []
all_label = []
text = []
label = []
for data in all_data:
if data == "": # 如果该行为空,说明一个句子借宿了,那就把这个句子添加进all_text列表里,标签同理
all_text.append(text)
all_label.append(label)
text = [] # 每读进去一个句子清空,为读取下一个句子做准备
label = []
else: # 如果该行不为空,那说明这个句子还没完,用空格分隔字和标签,然后存储到text和label列表中
t, l = data.split(" ")
text.append(t)
label.append(l)
return all_text, all_label
def build_label(train_label): # BMSE这些标签机器看不懂,要转换成0123这种机器看的懂的符号
label_2_index = {"PAD": 0, "UNK": 1} # 额外增加PAD标注和UNK未知标注
for label in train_label:
for l in label:
if l not in label_2_index:
label_2_index[l] = len(label_2_index) # 构建标注符号-下标的索引
return label_2_index, list(label_2_index) # list是把所有的标签放在一个列表里了,而列表天生带有下标,因此后期可以用来做index_2_label
class BertDataset(Dataset): #
def __init__(self, all_text, all_label, label_2_index, max_len, tokenizer):
self.all_text = all_text
self.all_label = all_label
self.label_2_index = label_2_index
self.tokenizer = tokenizer
self.max_len = max_len
def __getitem__(self, index):
text = self.all_text[index]
label = self.all_label[index][:self.max_len] # 后面这个是截断的意思,因为文本在bert编码器那里截断了,这里也要截断
# 参数分别为:需要编码的文本,是否添加特殊字符(开头结尾符等),最大长度(因为有开头结尾符所以要+2),按最长句长度填充,允许按最长句截断,返回类型为pytorch的tensor型
text_index = self.tokenizer.encode(text, add_special_tokens=True, max_length=self.max_len + 2,
padding="max_length", truncation=True, return_tensors="pt")
# 开头结尾符标志为0,1是未知符的标号,所以get(l,1)就是如果见过就取下标为l的标志,没见过就去下标为1(UNK)的标志,最后再加上填充的PAD字符
label_index = [0] + [self.label_2_index.get(l, 1) for l in label] + [0] + [0] * (max_len - len(text))
label_index = torch.tensor(label_index) # 标签也要转换成tensor类型
return text_index.reshape(-1), label_index, len(label) # text_index是维度是【1,32】我们不需要第一个维度,所以reshape-1删掉第一个维度
def __len__(self):
return self.all_text.__len__()
class Bert_LSTM_NerModel(nn.Module):
def __init__(self,lstm_hidden,class_num):
super().__init__()
self.bert = BertModel.from_pretrained(os.path.join("..","bert_base_chinese"))
for name,param in self.bert.named_parameters(): # 这两行的意思是把bert预训练模型里的参数固定住,只更新下游任务的参数,因为bert模型太大了可能会炸显存
param.requires_grad = False
self.lstm = nn.LSTM(768,lstm_hidden,batch_first=True,num_layers=1,bidirectional=False) # 768是bert输出的固定数量,不能改的
self.classifier = nn.Linear(lstm_hidden,class_num)
self.crf = CRF(class_num,batch_first=True) # crf代替loss的位置,我的理解是crf并没有改变模型,只是纵观所有可能的结果选出较好的一种,具体的选法就是计算各个结果的loss
# self.loss_fun = nn.CrossEntropyLoss() # 交叉熵,包括softmax函数+DLL损失计算
def forward(self,batch_index,batch_label=None): # 传了真实标签就说明是训练集,要做loss,没传(None)就说明是验证集/测试集,不需要计算loss,直接输出预测就行
bert_out = self.bert(batch_index)
# 这是bert模型自带的两个输出,前一个是字符级别特征(一般用于NER),后一个是篇章级别特征(一般用于文本分类等等)
bert_out0,bert_out1 = bert_out[0],bert_out[1] # bert_out0:字符级别特征, bert_out1:篇章级别
# 可以把bert看成50×32×768的矩阵,lstm是768×lstm_hidden_size的矩阵,所以lstm的输出就是50×32×lstm_hidden
lstm_out,_ = self.lstm(bert_out0) # lstm有两个输出,第二个输出用不到
pre = self.classifier(lstm_out)
if batch_label is not None: # 如果传了标签过来。开始计算loss
# pre的维度是【10(10篇文章一个batch),32(每篇文章32个字),30(预测是第三十类别】,batch laber的维度是【10,32】,loss做不了二维和三维的运算,要用reshape转换为【320.30】和【320】
# loss = self.loss_fun(pre.reshape(-1,pre.shape[-1]),batch_label.reshape(-1))
loss = -self.crf(pre,batch_label) # crf不用展开,很方便
# 这里加负号是因为crf输出就是负数,但神奇的是不管加不加负号,最后的loss训练都会降低
return loss
else: # 如果不需要计算loss
pre = self.crf.decode(pre) # 需要crf解码才会得到正确的预测,不过计算loss的时候不用解码
return pre
# return torch.argmax(pre, dim=-1) # 直接返回最大概率的标签,dim=-1是倒数第一个的维度,也就是表示标签的维度
if __name__ == "__main__":
# 读取数据,其实用data\\train.txt也行,但mac系统和win系统的路径拼接不一样,所以为了兼容,用os拼接路径,这是个好习惯
train_text, train_label = read_data(os.path.join("data", "train.txt"))
dev_text, dev_label = read_data(os.path.join("data", "dev.txt"))
test_text, test_label = read_data(os.path.join("data", "test.txt"))
# 构建标签-下标的双向索引
label_2_index, index_2_label = build_label(train_label)
tokenizer = BertTokenizer.from_pretrained(os.path.join("..", "bert_base_chinese")) # 编码器直接调用BERT现成的预训练模型
batch_size = 50
epoch = 10
max_len = 30
lr = 0.0005
lstm_hidden = 128
do_train = False
do_test = True
do_input = False
device = "cuda:0" if torch.cuda.is_available() else "cpu"
if do_train:
train_dataset = BertDataset(train_text, train_label, label_2_index, max_len, tokenizer)
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=False)
dev_dataset = BertDataset(dev_text, dev_label, label_2_index, max_len, tokenizer)
dev_dataloader = DataLoader(dev_dataset, batch_size=batch_size, shuffle=False)
model = Bert_LSTM_NerModel(lstm_hidden, len(label_2_index)).to(device) # len(label_2_index)是分类器的数量,也就是最后一共有多少类,我们用的是预训练模型参数是官方给的
opt = torch.optim.AdamW(model.parameters(), lr) # 这个Adam优化器和pytorch自带的没什么区别,都可以用.咳咳,现在Transformers的Adam优化器已经被弃用了,建议用pytorch的
best_score = -1
for e in range(epoch):
model.train()
timeset = 0
for batch_text_index,batch_label_index,batch_len in train_dataloader:
batch_text_index = batch_text_index.to(device)
batch_label_index = batch_label_index.to(device)
loss = model.forward(batch_text_index,batch_label_index)
loss.backward()
opt.step()
opt.zero_grad()
timeset = timeset + 1
if timeset % 20 == 0:
print(f'loss:{loss:.2f}')
model.eval()
all_pre = []
all_tag = []
for batch_text_index,batch_label_index,batch_len in dev_dataloader:
batch_text_index = batch_text_index.to(device)
batch_label_index = batch_label_index.to(device)
pre = model.forward(batch_text_index) # 不传标签的时候输出的是预测值,不是loss
# 之前没用crf的时候需要把tensor转换成list,但crf返回的就是list,所以不用转换
# pre = pre.cpu().numpy().tolist() # 因为pre是双重tensor类型的,要转回list类型,所以用的.numpy().tolist()
tag = batch_label_index.cpu().numpy().tolist()
for p,t,l in zip(pre,tag,batch_len):
p = p[1:1+l] # 这里为什么要截断,因为之前加了开头结尾符,还有后面的一坨padding,当然这些可以一起拿来算分数,但那些都不算实体,也就没必要,怕影响准确率
t = t[1:1+l]
p = [index_2_label[i] for i in p]
t = [index_2_label[i] for i in t]
all_pre.append(p)
all_tag.append(t)
f1_score = seq_f1_score(all_tag,all_pre)
if f1_score > best_score:
torch.save(model, "best_model.pt")
best_score = f1_score
print(f"best_score:{best_score:.2f},f1_score:{f1_score:.2f}")
if do_test:
test_dataset = BertDataset(test_text, test_label, label_2_index, max_len, tokenizer,True)
test_dataloader = DataLoader(test_dataset, batch_size=1, shuffle=False)
model = torch.load("best_model.pt")
all_pre = []
all_tag = []
test_out = []
for idx,(batch_text_index, batch_label_index, batch_len) in enumerate(test_dataloader):
text = test_text[idx]
batch_text_index = batch_text_index.to(device)
batch_label_index = batch_label_index.to(device)
pre = model.forward(batch_text_index)
# pre = pre.cpu().numpy().tolist()
tag = batch_label_index.cpu().numpy().tolist()
pre = pre[0][1:-1]
tag = tag[0][1:-1]
pre = [index_2_label[i] for i in pre]
tag = [index_2_label[i] for i in tag]
all_pre.append(pre)
all_tag.append(tag)
test_out.extend([f"{w} {t}" for w,t in zip(text,pre)])
test_out.append("")
f1_score = seq_f1_score(all_tag, all_pre)
print(f"test_f1_score:{f1_score:.2f}")
with open("test_out.txt","w",encoding='utf-8') as f:
f.write("\n".join(test_out))
if do_input:
model = torch.load("best_model.pt")
text = input("请输入:")
# text = text[:510]
text_idx = tokenizer.encode(text, add_special_tokens=True, return_tensors="pt")
text_idx = text_idx.to(device)
pre = model.forward(text_idx)
pre = pre[0][1:-1]
pre = [index_2_label[i] for i in pre]
print("\n".join([f"{w}:{t}" for w,t in zip(text,pre)]))
Comments NOTHING