从PyTorch的DDP到Accelerate再到Trainer,轻松掌握分布式训练

'PyTorch DDP to Accelerate to Trainer, mastering distributed training effortlessly.'

总览

本教程假设您对PyTorch和如何训练简单模型有基本的了解。它将展示通过一种称为分布式数据并行(DDP)的过程在多个GPU上进行训练,通过三个不同级别的递增抽象:

  • 通过pytorch.distributed模块的原生PyTorch DDP
  • 利用🤗 Accelerate对pytorch.distributed的轻量级封装,该封装还可以确保代码可以在单个GPU和TPU上运行,并且对原始代码的更改很小
  • 利用🤗 Transformer的高级Trainer API,它抽象了所有样板代码,并支持各种设备和分布式场景

什么是“分布式”训练,为什么它很重要?

请看下面一些非常基本的PyTorch训练代码,它基于官方MNIST示例设置和训练一个模型:

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms

class BasicNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.dropout1 = nn.Dropout(0.25)
        self.dropout2 = nn.Dropout(0.5)
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, 10)
        self.act = F.relu

    def forward(self, x):
        x = self.act(self.conv1(x))
        x = self.act(self.conv2(x))
        x = F.max_pool2d(x, 2)
        x = self.dropout1(x)
        x = torch.flatten(x, 1)
        x = self.act(self.fc1(x))
        x = self.dropout2(x)
        x = self.fc2(x)
        output = F.log_softmax(x, dim=1)
        return output

我们定义训练设备(cuda):

device = "cuda"

构建一些PyTorch数据加载器:

transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.1307), (0.3081))
])

train_dset = datasets.MNIST('data', train=True, download=True, transform=transform)
test_dset = datasets.MNIST('data', train=False, transform=transform)

train_loader = torch.utils.data.DataLoader(train_dset, shuffle=True, batch_size=64)
test_loader = torch.utils.data.DataLoader(test_dset, shuffle=False, batch_size=64)

将模型移动到CUDA设备上:

model = BasicNet().to(device)

构建一个PyTorch优化器:

optimizer = optim.AdamW(model.parameters(), lr=1e-3)

最后创建一个简单的训练和评估循环,对数据集执行一次完整的迭代并计算测试准确率:

model.train()
for batch_idx, (data, target) in enumerate(train_loader):
    data, target = data.to(device), target.to(device)
    output = model(data)
    loss = F.nll_loss(output, target)
    loss.backward()
    optimizer.step()
    optimizer.zero_grad()

model.eval()
correct = 0
with torch.no_grad():
    for data, target in test_loader:
        output = model(data)
        pred = output.argmax(dim=1, keepdim=True)
        correct += pred.eq(target.view_as(pred)).sum().item()
print(f'准确率:{100. * correct / len(test_loader.dataset)}')

通常情况下,您可以将所有这些内容放入一个Python脚本中,或者在Jupyter Notebook上运行它。

然而,如果有两个GPU或多个机器可用,您如何让这个脚本在这些资源上运行,以通过分布式训练提高训练速度呢?仅仅运行python myscript.py只会使用单个GPU运行脚本。这就是torch.distributed发挥作用的地方。

PyTorch分布式数据并行

正如其名所示,torch.distributed是用于分布式设置的。这可以包括多节点,其中每个节点都有一个单独的GPU,或者多GPU,其中一个系统有多个GPU,或者两者的组合。

要将上面的代码转换为在分布式设置中工作,首先必须定义一些设置配置,详细说明在DDP教程中。

首先,必须声明一个setup和一个cleanup函数。这将打开一个处理组,所有计算进程都可以通过该组进行通信。

注意:对于教程的本节,应假设这些是以python脚本文件的形式发送的。稍后将讨论使用Accelerate的启动器,它将消除这种必要性。

import os
import torch.distributed as dist

def setup(rank, world_size):
    "为PyTorch分布式数据并行设置进程组和配置"
    os.environ["MASTER_ADDR"] = 'localhost'
    os.environ["MASTER_PORT"] = "12355"

    # 初始化进程组
    dist.init_process_group("gloo", rank=rank, world_size=world_size)

def cleanup():
    "清理分布式环境"
    dist.destroy_process_group()

拼图的最后一块是如何将数据和模型发送到另一个GPU上?

这就是DistributedDataParallel模块的作用。它会将模型复制到每个GPU上,当调用loss.backward()时,会执行反向传播,并对所有这些模型副本上的梯度进行平均/缩减。这确保每个设备在优化器步骤后具有相同的权重。

下面是我们的训练设置的示例,重构为一个具有这种功能的函数:

注意:这里的rank是当前GPU与所有其他可用GPU相比的整体排名,意味着它们的排名是0 -> n-1

from torch.nn.parallel import DistributedDataParallel as DDP

def train(model, rank, world_size):
    setup(rank, world_size)
    model = model.to(rank)
    ddp_model = DDP(model, device_ids=[rank])
    optimizer = optim.AdamW(ddp_model.parameters(), lr=1e-3)
    # 训练一个epoch
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
    cleanup()

为了正确计算所有梯度,优化器需要根据特定设备上的模型(即ddp_model而不是model)声明。

最后,要运行脚本,PyTorch有一个方便的torchrun命令行模块可以帮助。只需传入它应该使用的节点数以及要运行的脚本,就可以了:

torchrun --nproc_per_nodes=2 --nnodes=1 example_script.py

上述命令将在一台单机上的两个GPU上运行训练脚本,这是使用PyTorch进行分布式训练的基本操作。

现在让我们谈谈Accelerate,这是一个旨在使这个过程更加无缝并帮助实践的库。

🤗 Accelerate

Accelerate是一个设计用于允许您执行上述操作的库,而无需大幅修改代码。除此之外,Accelerate固有的数据流水线也可以提高代码的性能。

首先,让我们将我们刚刚执行的所有代码包装到一个函数中,以帮助我们可视化差异:

def train_ddp(rank, world_size):
    setup(rank, world_size)
    # 构建DataLoader
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307), (0.3081))
    ])

    train_dset = datasets.MNIST('data', train=True, download=True, transform=transform)
    test_dset = datasets.MNIST('data', train=False, transform=transform)

    train_loader = torch.utils.data.DataLoader(train_dset, shuffle=True, batch_size=64)
    test_loader = torch.utils.data.DataLoader(test_dset, shuffle=False, batch_size=64)

    # 构建模型
    model = model.to(rank)
    ddp_model = DDP(model, device_ids=[rank])

    # 构建优化器
    optimizer = optim.AdamW(ddp_model.parameters(), lr=1e-3)

    # 训练一个epoch
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
    
    # 评估
    model.eval()
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()
    print(f'准确率:{100. * correct / len(test_loader.dataset)}')

接下来让我们来谈谈加速器如何帮助。上述代码存在一些问题:

  1. 这种方法稍微低效,因为每个设备都会生成 n 个数据加载器并进行推送。
  2. 此代码仅适用于多GPU,因此需要特别注意在单节点上重新运行或在TPU上运行它。

加速器通过 Accelerator 类来解决这个问题。通过它,除了比较单节点和多节点时的三行代码之外,其余代码基本相同,如下所示:

def train_ddp_accelerate():
    accelerator = Accelerator()
    # 构建数据加载器
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307), (0.3081))
    ])

    train_dset = datasets.MNIST('data', train=True, download=True, transform=transform)
    test_dset = datasets.MNIST('data', train=False, transform=transform)

    train_loader = torch.utils.data.DataLoader(train_dset, shuffle=True, batch_size=64)
    test_loader = torch.utils.data.DataLoader(test_dset, shuffle=False, batch_size=64)

    # 构建模型
    model = BasicModel()

    # 构建优化器
    optimizer = optim.AdamW(model.parameters(), lr=1e-3)

    # 通过 `accelerator.prepare` 将所有内容发送
    train_loader, test_loader, model, optimizer = accelerator.prepare(
        train_loader, test_loader, model, optimizer
    )

    # 进行单个epoch的训练
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        output = model(data)
        loss = F.nll_loss(output, target)
        accelerator.backward(loss)
        optimizer.step()
        optimizer.zero_grad()
    
    # 评估
    model.eval()
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()
    print(f'准确率: {100. * correct / len(test_loader.dataset)}')

有了这个,你的PyTorch训练循环现在已经设置好可以在任何分布式环境中运行,这要归功于 Accelerator 对象。这段代码仍然可以通过 torchrun CLI 或者 Accelerate 自带的CLI界面 accelerate launch 来启动。

因此,使用 Accelerate 进行分布式训练变得非常简单,并且尽可能保持了原始的PyTorch代码。

之前提到过,Accelerate 还可以使数据加载器更高效。这是通过自定义的采样器实现的,训练期间可以自动将部分批次发送到不同的设备上,从而在内存中一次只知道数据的一个副本,而不是根据配置一次性将四个副本全部加载到内存中。此外,内存中只有一个原始数据集的完整副本。该数据集的子集被分割在用于训练的所有节点之间,这样就可以在单个实例上训练更大的数据集,而不会导致内存使用量激增。

使用 notebook_launcher

之前提到可以直接从Jupyter Notebook中启动分布式代码。这是通过 Accelerate 的 notebook_launcher 工具实现的,它允许在Jupyter Notebook中基于代码启动多GPU训练。

使用它非常简单,只需导入启动器:

from accelerate import notebook_launcher

然后传入之前声明的训练函数、要传递的任何参数以及要使用的进程数量(例如在TPU上使用8个进程,或者在两个GPU上使用2个进程)。可以运行上述两个训练函数,但请注意,在启动一个单独的实例后,需要重新启动该实例才能生成另一个实例。

notebook_launcher(train_ddp, args=(), num_processes=2)

或者:

notebook_launcher(train_accelerate_ddp, args=(), num_processes=2)

使用 🤗 Trainer

最后,我们来到了最高级别的API — Hugging Face Trainer。

这个包装了尽可能多的训练,同时还能在分布式系统上进行训练,而用户无需做任何操作。

首先,我们需要导入Trainer:

from transformers import Trainer

然后我们定义一些TrainingArguments来控制所有常见的超参数。Trainer也能处理字典,所以需要创建一个自定义的collate函数。

最后,我们继承Trainer并编写自己的compute_loss

之后,这段代码也可以在分布式环境下运行,而无需编写任何训练代码!

from transformers import Trainer, TrainingArguments

model = BasicNet()

training_args = TrainingArguments(
    "basic-trainer",
    per_device_train_batch_size=64,
    per_device_eval_batch_size=64,
    num_train_epochs=1,
    evaluation_strategy="epoch",
    remove_unused_columns=False
)

def collate_fn(examples):
    pixel_values = torch.stack([example[0] for example in examples])
    labels = torch.tensor([example[1] for example in examples])
    return {"x":pixel_values, "labels":labels}

class MyTrainer(Trainer):
    def compute_loss(self, model, inputs, return_outputs=False):
        outputs = model(inputs["x"])
        target = inputs["labels"]
        loss = F.nll_loss(outputs, target)
        return (loss, outputs) if return_outputs else loss

trainer = MyTrainer(
    model,
    training_args,
    train_dataset=train_dset,
    eval_dataset=test_dset,
    data_collator=collate_fn,
)

trainer.train()

    ***** 运行训练 *****
      样本数量 = 60000
      训练轮数 = 1
      每个设备的批量训练大小 = 64
      总的训练批量大小(并行、分布式和累积) = 64
      梯度累积步数 = 1
      总的优化步数 = 938

与上面使用notebook_launcher的示例类似,可以通过将所有内容放入一个训练函数中来实现:

def train_trainer_ddp():
    model = BasicNet()

    training_args = TrainingArguments(
        "basic-trainer",
        per_device_train_batch_size=64,
        per_device_eval_batch_size=64,
        num_train_epochs=1,
        evaluation_strategy="epoch",
        remove_unused_columns=False
    )

    def collate_fn(examples):
        pixel_values = torch.stack([example[0] for example in examples])
        labels = torch.tensor([example[1] for example in examples])
        return {"x":pixel_values, "labels":labels}

    class MyTrainer(Trainer):
        def compute_loss(self, model, inputs, return_outputs=False):
            outputs = model(inputs["x"])
            target = inputs["labels"]
            loss = F.nll_loss(outputs, target)
            return (loss, outputs) if return_outputs else loss

    trainer = MyTrainer(
        model,
        training_args,
        train_dataset=train_dset,
        eval_dataset=test_dset,
        data_collator=collate_fn,
    )

    trainer.train()

notebook_launcher(train_trainer_ddp, args=(), num_processes=2)

资源

要了解有关PyTorch分布式数据并行性的更多信息,请查看此处的文档

要了解更多关于🤗加速的信息,请查看此处的文档

要了解更多关于🤗变压器的信息,请查看此处的文档