从PyTorch的DDP到Accelerate再到Trainer,轻松掌握分布式训练
'PyTorch DDP to Accelerate to Trainer, mastering distributed training effortlessly.'
总览
本教程假设您对PyTorch和如何训练简单模型有基本的了解。它将展示通过一种称为分布式数据并行(DDP)的过程在多个GPU上进行训练,通过三个不同级别的递增抽象:
- 通过
pytorch.distributed
模块的原生PyTorch DDP - 利用🤗 Accelerate对
pytorch.distributed
的轻量级封装,该封装还可以确保代码可以在单个GPU和TPU上运行,并且对原始代码的更改很小 - 利用🤗 Transformer的高级Trainer API,它抽象了所有样板代码,并支持各种设备和分布式场景
什么是“分布式”训练,为什么它很重要?
请看下面一些非常基本的PyTorch训练代码,它基于官方MNIST示例设置和训练一个模型:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
class BasicNet(nn.Module):
def __init__(self):
super().__init__()
self.conv1 = nn.Conv2d(1, 32, 3, 1)
self.conv2 = nn.Conv2d(32, 64, 3, 1)
self.dropout1 = nn.Dropout(0.25)
self.dropout2 = nn.Dropout(0.5)
self.fc1 = nn.Linear(9216, 128)
self.fc2 = nn.Linear(128, 10)
self.act = F.relu
def forward(self, x):
x = self.act(self.conv1(x))
x = self.act(self.conv2(x))
x = F.max_pool2d(x, 2)
x = self.dropout1(x)
x = torch.flatten(x, 1)
x = self.act(self.fc1(x))
x = self.dropout2(x)
x = self.fc2(x)
output = F.log_softmax(x, dim=1)
return output
我们定义训练设备(cuda
):
device = "cuda"
构建一些PyTorch数据加载器:
transform = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307), (0.3081))
])
train_dset = datasets.MNIST('data', train=True, download=True, transform=transform)
test_dset = datasets.MNIST('data', train=False, transform=transform)
train_loader = torch.utils.data.DataLoader(train_dset, shuffle=True, batch_size=64)
test_loader = torch.utils.data.DataLoader(test_dset, shuffle=False, batch_size=64)
将模型移动到CUDA设备上:
model = BasicNet().to(device)
构建一个PyTorch优化器:
optimizer = optim.AdamW(model.parameters(), lr=1e-3)
最后创建一个简单的训练和评估循环,对数据集执行一次完整的迭代并计算测试准确率:
model.train()
for batch_idx, (data, target) in enumerate(train_loader):
data, target = data.to(device), target.to(device)
output = model(data)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()
optimizer.zero_grad()
model.eval()
correct = 0
with torch.no_grad():
for data, target in test_loader:
output = model(data)
pred = output.argmax(dim=1, keepdim=True)
correct += pred.eq(target.view_as(pred)).sum().item()
print(f'准确率:{100. * correct / len(test_loader.dataset)}')
通常情况下,您可以将所有这些内容放入一个Python脚本中,或者在Jupyter Notebook上运行它。
然而,如果有两个GPU或多个机器可用,您如何让这个脚本在这些资源上运行,以通过分布式训练提高训练速度呢?仅仅运行python myscript.py
只会使用单个GPU运行脚本。这就是torch.distributed
发挥作用的地方。
PyTorch分布式数据并行
正如其名所示,torch.distributed
是用于分布式设置的。这可以包括多节点,其中每个节点都有一个单独的GPU,或者多GPU,其中一个系统有多个GPU,或者两者的组合。
要将上面的代码转换为在分布式设置中工作,首先必须定义一些设置配置,详细说明在DDP教程中。
首先,必须声明一个setup
和一个cleanup
函数。这将打开一个处理组,所有计算进程都可以通过该组进行通信。
注意:对于教程的本节,应假设这些是以python脚本文件的形式发送的。稍后将讨论使用Accelerate的启动器,它将消除这种必要性。
import os
import torch.distributed as dist
def setup(rank, world_size):
"为PyTorch分布式数据并行设置进程组和配置"
os.environ["MASTER_ADDR"] = 'localhost'
os.environ["MASTER_PORT"] = "12355"
# 初始化进程组
dist.init_process_group("gloo", rank=rank, world_size=world_size)
def cleanup():
"清理分布式环境"
dist.destroy_process_group()
拼图的最后一块是如何将数据和模型发送到另一个GPU上?
这就是DistributedDataParallel
模块的作用。它会将模型复制到每个GPU上,当调用loss.backward()
时,会执行反向传播,并对所有这些模型副本上的梯度进行平均/缩减。这确保每个设备在优化器步骤后具有相同的权重。
下面是我们的训练设置的示例,重构为一个具有这种功能的函数:
注意:这里的rank是当前GPU与所有其他可用GPU相比的整体排名,意味着它们的排名是
0 -> n-1
from torch.nn.parallel import DistributedDataParallel as DDP
def train(model, rank, world_size):
setup(rank, world_size)
model = model.to(rank)
ddp_model = DDP(model, device_ids=[rank])
optimizer = optim.AdamW(ddp_model.parameters(), lr=1e-3)
# 训练一个epoch
model.train()
for batch_idx, (data, target) in enumerate(train_loader):
data, target = data.to(device), target.to(device)
output = model(data)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()
optimizer.zero_grad()
cleanup()
为了正确计算所有梯度,优化器需要根据特定设备上的模型(即ddp_model
而不是model
)声明。
最后,要运行脚本,PyTorch有一个方便的torchrun
命令行模块可以帮助。只需传入它应该使用的节点数以及要运行的脚本,就可以了:
torchrun --nproc_per_nodes=2 --nnodes=1 example_script.py
上述命令将在一台单机上的两个GPU上运行训练脚本,这是使用PyTorch进行分布式训练的基本操作。
现在让我们谈谈Accelerate,这是一个旨在使这个过程更加无缝并帮助实践的库。
🤗 Accelerate
Accelerate是一个设计用于允许您执行上述操作的库,而无需大幅修改代码。除此之外,Accelerate固有的数据流水线也可以提高代码的性能。
首先,让我们将我们刚刚执行的所有代码包装到一个函数中,以帮助我们可视化差异:
def train_ddp(rank, world_size):
setup(rank, world_size)
# 构建DataLoader
transform = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307), (0.3081))
])
train_dset = datasets.MNIST('data', train=True, download=True, transform=transform)
test_dset = datasets.MNIST('data', train=False, transform=transform)
train_loader = torch.utils.data.DataLoader(train_dset, shuffle=True, batch_size=64)
test_loader = torch.utils.data.DataLoader(test_dset, shuffle=False, batch_size=64)
# 构建模型
model = model.to(rank)
ddp_model = DDP(model, device_ids=[rank])
# 构建优化器
optimizer = optim.AdamW(ddp_model.parameters(), lr=1e-3)
# 训练一个epoch
model.train()
for batch_idx, (data, target) in enumerate(train_loader):
data, target = data.to(device), target.to(device)
output = model(data)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()
optimizer.zero_grad()
# 评估
model.eval()
correct = 0
with torch.no_grad():
for data, target in test_loader:
data, target = data.to(device), target.to(device)
output = model(data)
pred = output.argmax(dim=1, keepdim=True)
correct += pred.eq(target.view_as(pred)).sum().item()
print(f'准确率:{100. * correct / len(test_loader.dataset)}')
接下来让我们来谈谈加速器如何帮助。上述代码存在一些问题:
- 这种方法稍微低效,因为每个设备都会生成
n
个数据加载器并进行推送。 - 此代码仅适用于多GPU,因此需要特别注意在单节点上重新运行或在TPU上运行它。
加速器通过 Accelerator
类来解决这个问题。通过它,除了比较单节点和多节点时的三行代码之外,其余代码基本相同,如下所示:
def train_ddp_accelerate():
accelerator = Accelerator()
# 构建数据加载器
transform = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307), (0.3081))
])
train_dset = datasets.MNIST('data', train=True, download=True, transform=transform)
test_dset = datasets.MNIST('data', train=False, transform=transform)
train_loader = torch.utils.data.DataLoader(train_dset, shuffle=True, batch_size=64)
test_loader = torch.utils.data.DataLoader(test_dset, shuffle=False, batch_size=64)
# 构建模型
model = BasicModel()
# 构建优化器
optimizer = optim.AdamW(model.parameters(), lr=1e-3)
# 通过 `accelerator.prepare` 将所有内容发送
train_loader, test_loader, model, optimizer = accelerator.prepare(
train_loader, test_loader, model, optimizer
)
# 进行单个epoch的训练
model.train()
for batch_idx, (data, target) in enumerate(train_loader):
output = model(data)
loss = F.nll_loss(output, target)
accelerator.backward(loss)
optimizer.step()
optimizer.zero_grad()
# 评估
model.eval()
correct = 0
with torch.no_grad():
for data, target in test_loader:
data, target = data.to(device), target.to(device)
output = model(data)
pred = output.argmax(dim=1, keepdim=True)
correct += pred.eq(target.view_as(pred)).sum().item()
print(f'准确率: {100. * correct / len(test_loader.dataset)}')
有了这个,你的PyTorch训练循环现在已经设置好可以在任何分布式环境中运行,这要归功于 Accelerator
对象。这段代码仍然可以通过 torchrun
CLI 或者 Accelerate 自带的CLI界面 accelerate launch
来启动。
因此,使用 Accelerate 进行分布式训练变得非常简单,并且尽可能保持了原始的PyTorch代码。
之前提到过,Accelerate 还可以使数据加载器更高效。这是通过自定义的采样器实现的,训练期间可以自动将部分批次发送到不同的设备上,从而在内存中一次只知道数据的一个副本,而不是根据配置一次性将四个副本全部加载到内存中。此外,内存中只有一个原始数据集的完整副本。该数据集的子集被分割在用于训练的所有节点之间,这样就可以在单个实例上训练更大的数据集,而不会导致内存使用量激增。
使用 notebook_launcher
之前提到可以直接从Jupyter Notebook中启动分布式代码。这是通过 Accelerate 的 notebook_launcher
工具实现的,它允许在Jupyter Notebook中基于代码启动多GPU训练。
使用它非常简单,只需导入启动器:
from accelerate import notebook_launcher
然后传入之前声明的训练函数、要传递的任何参数以及要使用的进程数量(例如在TPU上使用8个进程,或者在两个GPU上使用2个进程)。可以运行上述两个训练函数,但请注意,在启动一个单独的实例后,需要重新启动该实例才能生成另一个实例。
notebook_launcher(train_ddp, args=(), num_processes=2)
或者:
notebook_launcher(train_accelerate_ddp, args=(), num_processes=2)
使用 🤗 Trainer
最后,我们来到了最高级别的API — Hugging Face Trainer。
这个包装了尽可能多的训练,同时还能在分布式系统上进行训练,而用户无需做任何操作。
首先,我们需要导入Trainer:
from transformers import Trainer
然后我们定义一些TrainingArguments
来控制所有常见的超参数。Trainer也能处理字典,所以需要创建一个自定义的collate函数。
最后,我们继承Trainer并编写自己的compute_loss
。
之后,这段代码也可以在分布式环境下运行,而无需编写任何训练代码!
from transformers import Trainer, TrainingArguments
model = BasicNet()
training_args = TrainingArguments(
"basic-trainer",
per_device_train_batch_size=64,
per_device_eval_batch_size=64,
num_train_epochs=1,
evaluation_strategy="epoch",
remove_unused_columns=False
)
def collate_fn(examples):
pixel_values = torch.stack([example[0] for example in examples])
labels = torch.tensor([example[1] for example in examples])
return {"x":pixel_values, "labels":labels}
class MyTrainer(Trainer):
def compute_loss(self, model, inputs, return_outputs=False):
outputs = model(inputs["x"])
target = inputs["labels"]
loss = F.nll_loss(outputs, target)
return (loss, outputs) if return_outputs else loss
trainer = MyTrainer(
model,
training_args,
train_dataset=train_dset,
eval_dataset=test_dset,
data_collator=collate_fn,
)
trainer.train()
***** 运行训练 *****
样本数量 = 60000
训练轮数 = 1
每个设备的批量训练大小 = 64
总的训练批量大小(并行、分布式和累积) = 64
梯度累积步数 = 1
总的优化步数 = 938
与上面使用notebook_launcher
的示例类似,可以通过将所有内容放入一个训练函数中来实现:
def train_trainer_ddp():
model = BasicNet()
training_args = TrainingArguments(
"basic-trainer",
per_device_train_batch_size=64,
per_device_eval_batch_size=64,
num_train_epochs=1,
evaluation_strategy="epoch",
remove_unused_columns=False
)
def collate_fn(examples):
pixel_values = torch.stack([example[0] for example in examples])
labels = torch.tensor([example[1] for example in examples])
return {"x":pixel_values, "labels":labels}
class MyTrainer(Trainer):
def compute_loss(self, model, inputs, return_outputs=False):
outputs = model(inputs["x"])
target = inputs["labels"]
loss = F.nll_loss(outputs, target)
return (loss, outputs) if return_outputs else loss
trainer = MyTrainer(
model,
training_args,
train_dataset=train_dset,
eval_dataset=test_dset,
data_collator=collate_fn,
)
trainer.train()
notebook_launcher(train_trainer_ddp, args=(), num_processes=2)
资源
要了解有关PyTorch分布式数据并行性的更多信息,请查看此处的文档
要了解更多关于🤗加速的信息,请查看此处的文档
要了解更多关于🤗变压器的信息,请查看此处的文档