HuggingFace Transformers 基础组件之Model(下)

Reference:【HuggingFace Transformers-入门篇】基础组件之Model(下)

文本分类示例

导入相关包

1
from transformers import AutoTokenizer, AutoModelForSequenceClassification

加载数据

本次用到的数据集是一个用于情感分类的中文数据集,下载地址这里还有很多用于其他任务的中文语料库。

1
2
data = pd.read_csv("./ChnSentiCorp_htl_all.csv")
data = data.dropna() # 去除空数据

数据分为两列,label和review,正面评论的label为1反之为0。

dropna()用于数据中的空数据,即丢弃含空值的行或列。

创建Dataset

然后要创建一个自己的数据类,它继承自Dataset类,包括三个方法。

  1. __init__:初始化方法,用于读取数据,并进行简单的数据清洗。
  2. __getitem__:可以根据索引index返回数据,这里让它分开返回label和review。
  3. __len__:返回当前数据的大小size。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
from torch.utils.data import Dataset

class MyDataset(Dataset):

def __init__(self) -> None:
super().__init__()
self.data = pd.read_csv("./ChnSentiCorp_htl_all.csv")
self.data = self.data.dropna()

def __getitem__(self, index):
return self.data.iloc[index]["review"], self.data.iloc[index]["label"]

def __len__(self):
return len(self.data)

然后创建一个MyDataset()对象,现在dataset是一条一条的。

1
2
3
dataset = MyDataset()
for i in range(5):
print(dataset[i])

划分数据集

有了总的数据集之后,要进行训练集、验证集的划分。这里用到random_split(),它需要传入两个参数,一个是dataset,一个是分割的比例(如果用比例写的话和要为1)。

1
2
3
from torch.utils.data import random_split

trainset, validset = random_split(dataset, lengths=[0.9, 0.1])

创建Dataloader

划分好数据集之后,需要针对不同的数据集创建不同的DataLoader。因为现在的dataset是一条一条数据的返回,如果希望以batch形式返回,就要用到DataLoader,这里的参数shuffle=True意味着要打乱数据。

1
2
3
4
from torch.utils.data import DataLoader

trainloader = DataLoader(trainset, batch_size=32, shuffle=True)
validloader = DataLoader(validset, batch_size=64, shuffle=False)

这个时候validloader里面的数据如下图,可以看到存储的是原始文本数据和label的tensor向量形式,但我们还需要把原始文本数据处理为编码数据,因此要用到参数collate_fn

collate_fn是用于整理数据的函数,这个可视化视频可以帮助理解。它让data loader按照batch取数据时,

  1. 取出大小等同于batch size的index列表;
  2. 将index输入到dataset的getitem()中取出index对应的数据;
  3. 对每个index对应的数据进行堆叠,形成一个batch的数据。

在这里,指定collate_fn=collate_func来整理数据,我们需要编写collate_func这个函数。 首先我们要将文本内容和label都拿到,所以创建两个list,通过遍历来分别存储text和label。然后用tokenizer进行分词,设置填充和截断。

input是个字典,且上一篇有提到如果传入了label模型会自动计算loss,所以最后新增一个label的键来存储label的tensor向量,不需要额外写计算loss的函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import torch
from torch.utils.data import DataLoader

tokenizer = AutoTokenizer.from_pretrained("hflrbt3")

def collate_func(batch):
texts, labels = [], []
for item in batch:
texts.append(item[0])
labels.append(item[1])
inputs = tokenizer(texts, max_length=128, padding="max_length", truncation=True, return_tensors="pt")
inputs["labels"] = torch.tensor(labels)
return inputs

trainloader = DataLoader(trainset, batch_size=32, shuffle=True, collate_fn=collate_func)
validloader = DataLoader(validset, batch_size=64, shuffle=False, collate_fn=collate_func)

这里的input返回的是list,所以要设置参数return_tensors="pt"让他返回tensor。可以看到数据以batch size的大小进行堆叠,便于后续计算。

创建模型及优化器

优化器可以从torch里面直接导,使用Adam优化器,要传入两个参数:模型要优化的参数model.parameters()和学习率lr=2e-5,因为是做迁移学习所以不需要很高的学习率。

1
2
3
4
5
6
7
8
from torch.optim import Adam

model = AutoModelForSequenceClassification.from_pretrained("hflrbt3")

# 如果有gpu,则将模型放到gpu上
if torch.cuda.is_available():
model = model.cuda()
optimizer = Adam(model.parameters(), lr=2e-5)

训练与验证

在pytorch中,通过调用model.train()model.eval()来实现训练、评估两种模式的切换。这两种模式下一些层的运行方式会有不同,具体可以看这篇博客

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# 验证部分
def evaluate():
model.eval() # 将模型设置为评估模式
acc_num = 0 # 统计预测正确的个数
with torch.inference_mode():
for batch in validloader:
if torch.cuda.is_available():
batch = {k: v.cuda() for k, v in batch.items()}
output = model(**batch)
pred = torch.argmax(output.logits, dim=-1)
acc_num += (pred.long() == batch["labels"].long()).float().sum()
return acc_num / len(validset)

# 训练三轮,每一百步打印一次
def train(epoch=3, log_step=100):
global_step = 0
for ep in range(epoch):
model.train() # 将模型设置为训练模式
for batch in trainloader:
# 如果有gpu,就把要计算的数据都放到gpu上
if torch.cuda.is_available():
batch = {k: v.cuda() for k, v in batch.items()}
optimizer.zero_grad() # 优化器梯度归零
output = model(**batch)
output.loss.backward()
optimizer.step() # 更新模型
if global_step % log_step == 0:
print(f"ep: {ep}, global_step: {global_step}, loss: {output.loss.item()}")
global_step += 1
acc = evaluate() # 每轮训练结束后评估一次模型
print(f"ep: {ep}, acc: {acc}")

模型训练

直接调用train()函数。有gpu还是好,我是纯cpu挂着跑,四十多分钟才跑出来真是泪目了……

模型预测

做一个id到label的映射,切换到模型评估模式进行预测。

1
2
3
4
5
6
7
8
9
10
sen = "我觉得这家酒店不错,饭很好吃!"
id2_label = {0: "差评!", 1: "好评!"}
model.eval()
with torch.inference_mode():
inputs = tokenizer(sen, return_tensors="pt")
if torch.cuda.is_available():
inputs = {k: v.cuda() for k, v in inputs.items()}
logits = model(**inputs).logits
pred = torch.argmax(logits, dim=-1)
print(f"输入:{sen}\n模型预测结果:{id2_label.get(pred.item())}")

除此之外,也可以用pipeline来实现预测。

【代码汇总】

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import pandas as pd
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from torch.utils.data import Dataset
from torch.utils.data import random_split
import torch
from torch.utils.data import DataLoader
from torch.optim import Adam

csv_path = "D:\\transformers_test\\01 Getting Started\\04 model\\ChnSentiCorp_htl_all.csv"

# 0.定义MyDataset类,继承自Dataset类
class MyDataset(Dataset):

def __init__(self) -> None:
super().__init__()
self.data = pd.read_csv(csv_path)
self.data = self.data.dropna()

def __getitem__(self, index):
return self.data.iloc[index]["review"], self.data.iloc[index]["label"]

def __len__(self):
return len(self.data)

# 1. 读取数据 + 简单清洗数据
dataset = MyDataset()

# 2. 划分数据集
trainset, validset = random_split(dataset, lengths=[0.9, 0.1])

# 3. 分词
tokenizer = AutoTokenizer.from_pretrained("hflrbt3")

# 4. 定义dataloader
def collate_func(batch):
texts, labels = [], []
for item in batch:
texts.append(item[0])
labels.append(item[1])
inputs = tokenizer(texts, max_length=128, padding="max_length", truncation=True, return_tensors="pt")
inputs["labels"] = torch.tensor(labels)
return inputs

trainloader = DataLoader(trainset, batch_size=32, shuffle=True, collate_fn=collate_func)
validloader = DataLoader(validset, batch_size=64, shuffle=False, collate_fn=collate_func)

# 5. 创建模型和优化器
model = AutoModelForSequenceClassification.from_pretrained("hflrbt3")
if torch.cuda.is_available():
model = model.cuda()

optimizer = Adam(model.parameters(), lr=2e-5)

# 6. 训练与验证
def evaluate():
model.eval()
acc_num = 0
with torch.inference_mode():
for batch in validloader:
if torch.cuda.is_available():
batch = {k: v.cuda() for k, v in batch.items()}
output = model(**batch)
pred = torch.argmax(output.logits, dim=-1)
acc_num += (pred.long() == batch["labels"].long()).float().sum()
return acc_num / len(validset)

def train(epoch=3, log_step=100):
global_step = 0
for ep in range(epoch):
model.train()
for batch in trainloader:
if torch.cuda.is_available():
batch = {k: v.cuda() for k, v in batch.items()}
optimizer.zero_grad()
output = model(**batch)
output.loss.backward()
optimizer.step()
if global_step % log_step == 0:
print(f"ep: {ep}, global_step: {global_step}, loss: {output.loss.item()}")
global_step += 1
acc = evaluate()
print(f"ep: {ep}, acc: {acc}")

# 7. 模型训练
train()

# 8. 模型预测
sen = "我觉得这家酒店不错,饭很好吃!"
id2_label = {0: "差评!", 1: "好评!"}
model.eval()
with torch.inference_mode():
inputs = tokenizer(sen, return_tensors="pt")
if torch.cuda.is_available():
inputs = {k: v.cuda() for k, v in inputs.items()}
logits = model(**inputs).logits
pred = torch.argmax(logits, dim=-1)
print(f"输入:{sen}\n模型预测结果:{id2_label.get(pred.item())}")


HuggingFace Transformers 基础组件之Model(下)
https://jiangcara.github.io/posts/f568602f/
作者
Jiang Cara
发布于
2024年7月26日
许可协议