在PyTorch中进行多GPU训练和作为替代方法的梯度累积

PyTorch多GPU训练与梯度累积替代方法

代码和理论

https://unsplash.com/photos/vBzJ0UFOA70

在本文中,我们首先会看到数据并行主义(DP)和分布式数据并行主义(DDP)算法之间的区别,然后我们将解释梯度积累(GA)是什么,最后展示在PyTorch中如何实现DDP和GA以及它们如何导致相同的结果。

介绍

当训练深度神经网络(DNN)时,一个重要的超参数是批量大小。通常,批量大小不应该太大,因为网络会倾向于过拟合,但也不应该太小,因为会导致收敛速度慢。当使用高分辨率图像或占用大量内存的其他类型数据时,假设今天大多数大型DNN模型的训练都是在GPU上进行的,适应小批量大小可能会有问题,这取决于可用GPU的内存。因为如我们所说,小批量大小导致收敛速度慢,所以我们可以使用三种主要方法来增加有效批量大小:

  1. 使用多个小型GPU并行运行模型在小批量上 – DP或DDP算法
  2. 使用更大的GPU(昂贵)
  3. 在多个步骤中累积梯度

现在让我们更详细地看看1和3——如果你有一个大型GPU可以容纳你所需的所有数据,你可以阅读DDP部分,并查看在完整代码部分中PyTorch中如何实现它,跳过其余部分。

假设我们希望获得30的有效批量大小,但每个GPU只能容纳10个数据点(小批量大小)。我们有两个选择:数据并行主义或分布式数据并行主义:

数据并行主义(DP)

首先,我们定义主GPU。然后,我们执行以下步骤:

  1. 将10个数据点(小批量)和模型的副本从主GPU移动到其他2个GPU上
  2. 在每个GPU上进行前向传播,并将输出传递给主GPU
  3. 在主GPU上计算总损失,然后将损失发送回每个GPU以计算参数的梯度
  4. 将梯度发送回(这些梯度是所有训练示例的平均梯度)主GPU,将它们相加以获得整个批次的平均梯度
  5. 在主GPU上更新参数,并将这些更新发送到其他2个GPU以进行下一次迭代

这个过程存在一些问题和低效之处:

  • 在将数据分割到其他GPU之前,数据会从主GPU传递出去。此外,主GPU的利用率比其他GPU更高,因为总损失的计算和参数更新是在主GPU上进行的
  • 我们需要在每次迭代中同步其他GPU上的模型,这可能会减慢训练速度

分布式数据并行(DDP)

分布式数据并行是为了改善数据并行算法的低效性而引入的。我们仍然具有与之前相同的设置 – 每个批次有30个数据点,使用3个GPU。不同之处在于:

  1. 它不再有主GPU
  2. 因为我们不再有主GPU,我们直接从磁盘/内存并行加载数据到每个GPU上,而不会重叠 – DistributedSampler为我们执行此操作。在底层,它使用本地排名(GPU id)将数据分布到各个GPU上 – 给定30个数据点,第一个GPU将使用点[0、3、6、..、27],第二个GPU将使用点[1、4、7、..、28],第三个GPU将使用点[2、5、8、..、29]
n_gpu = 3for i in range(n_gpu):  print(np.arange(30)[i:30:n_gpu])

3. 前向传播、损失计算和反向传播在每个GPU上独立执行,梯度异步归约计算平均值,然后在所有GPU上进行更新

由于DDP具有优于DP的优势,现在更倾向于使用DDP,因此我们只会展示DDP的实现。

梯度累积

如果我们只有一个GPU但仍然想使用更大的批量大小,另一种选择是将梯度累积一定数量的步骤,有效地累积一定数量的小批量梯度,从而增加有效批量大小。从上面的例子中,我们可以累积10个数据点的梯度进行3次迭代,以实现与DDP训练相同的结果,有效批量大小为30。

DDP过程 代码

下面我将只介绍实现DDP与1个GPU代码相比的差异。完整的代码可以在本文的后面找到。首先,我们初始化进程组,允许不同的进程之间进行通信。使用int(os.environ[“LOCAL_RANK”])我们可以获取给定进程中使用的GPU。

init_process_group(backend="nccl")device = int(os.environ["LOCAL_RANK"])torch.cuda.set_device(device)

然后,我们需要将模型包装在DistributedDataParallel中,以实现多GPU训练。

model = NeuralNetwork(args.data_size) model = model.to(device)  if args.distributed:  model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[device])

最后一部分是定义在DDP部分提到的DistributedSampler。

sampler = torch.utils.data.DistributedSampler(dataset)

训练的其余部分保持不变 – 我将在本文的最后包含完整的代码。

梯度累积代码

当反向传播发生时,在调用loss.backward()之后,梯度被存储在各自的张量中。当调用optimizer.step()时,实际的更新发生,并且将梯度存储在张量中的值设置为零,以便运行下一次反向传播和参数更新的迭代。因此,为了累积梯度,我们调用loss.backward(),累积所需的梯度累积次数,而不将梯度设置为零,以便它们在多次迭代中累积,并且我们对它们进行平均以获取累积梯度的平均梯度(loss = loss/ACC_STEPS)。之后,我们调用optimizer.step()并将梯度设置为零,以开始下一次梯度累积。代码如下:

ACC_STEPS = dist.get_world_size() # == number of GPUs# iterate through the datafor i, (idxs, row) in enumerate(loader):  loss = model(row)    # scale loss according to accumulation steps  loss = loss/ACC_STEPS  loss.backward()  # keep accumualting gradients for ACC_STEPS  if ((i + 1) % ACC_STEPS == 0):    optimizer.step()      optimizer.zero_grad()

完整代码

import osos.environ["CUDA_VISIBLE_DEVICES"] = "0,1"print(os.environ["CUDA_VISIBLE_DEVICES"])import torchimport torch.nn as nnfrom torch.utils.data import DataLoader, Dataset, Samplerimport argparseimport torch.optim as optim import numpy as npimport randomimport torch.backends.cudnn as cudnnimport torch.nn.functional as Ffrom torch.distributed import init_process_groupimport torch.distributed as distclass data_set(Dataset):        def __init__(self, df):        self.df = df            def __len__(self):        return len(self.df)        def __getitem__(self, index):                    sample = self.df[index]        return index, sample    class NeuralNetwork(nn.Module):    def __init__(self, dsize):        super().__init__()        self.linear =  nn.Linear(dsize, 1, bias=False)        self.linear.weight.data[:] = 1.    def forward(self, x):        x = self.linear(x)        loss = x.sum()        return loss                class DummySampler(Sampler):    def __init__(self, data, batch_size, n_gpus=2):        self.num_samples = len(data)        self.b_size = batch_size        self.n_gpus = n_gpus    def __iter__(self):        ids = []        for i in range(0, self.num_samples, self.b_size * self.n_gpus):            ids.append(np.arange(self.num_samples)[i: i + self.b_size*self.n_gpus :self.n_gpus])            ids.append(np.arange(self.num_samples)[i+1: (i+1) + self.b_size*self.n_gpus :self.n_gpus])        return iter(np.concatenate(ids))    def __len__(self):        # print ('\tcalling Sampler:__len__')        return self.num_samples            def main(args=None):        d_size = args.data_size    if args.distributed:        init_process_group(backend="nccl")        device = int(os.environ["LOCAL_RANK"])        torch.cuda.set_device(device)    else:        device = "cuda:0"    # fix the seed for reproducibility    seed = args.seed                    torch.manual_seed(seed)    np.random.seed(seed)    random.seed(seed)    cudnn.benchmark = True        # generate data    data = torch.rand(d_size, d_size)        model = NeuralNetwork(args.data_size)        model = model.to(device)          if args.distributed:        model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[device])            optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)    dataset = data_set(data)    if args.distributed:        sampler = torch.utils.data.DistributedSampler(dataset, shuffle=False)    else:        # we define `DummySampler` for exact reproducibility with `DistributedSampler`        # which splits the data as described in the article.         sampler = DummySampler(dataset, args.batch_size)            loader = DataLoader(                dataset,                batch_size=args.batch_size,                num_workers=0,                pin_memory=True,                sampler=sampler,                shuffle=False,                collate_fn=None,            )                  if not args.distributed:        grads = []        # ACC_STEPS same as GPU as we need to divide the loss by this number    # to obtain the same gradient as from multiple GPUs that are     # averaged together    ACC_STEPS = args.acc_steps     optimizer.zero_grad()        for epoch in range(args.epochs):                if args.distributed:            loader.sampler.set_epoch(epoch)                    for i, (idxs, row) in enumerate(loader):            if args.distributed:                optimizer.zero_grad()                        row = row.to(device, non_blocking=True)                         if args.distributed:                rank = dist.get_rank() == 0            else:                rank = True                        loss = model(row)                          if args.distributed:                # does average gradients automatically thanks to model wrapper into                 # `DistributedDataParallel`                loss.backward()            else:                # scale loss according to accumulation steps                loss = loss/ACC_STEPS                loss.backward()                        if i == 0 and rank:                print(f"Epoch {epoch} {100 * '='}")            if not args.distributed:                if (i + 1) % ACC_STEPS == 0: # only step when we have done ACC_STEPS                    # acumulate grads for entire epoch                    optimizer.step()                      optimizer.zero_grad()            else:                optimizer.step()                         if not args.distributed and args.verbose:            print(100 * "=")            print("Model weights : ", model.linear.weight)            print(100 * "=")        elif args.distributed and args.verbose and rank:            print(100 * "=")            print("Model weights : ", model.module.linear.weight)            print(100 * "=")    if __name__ == "__main__":            parser = argparse.ArgumentParser()    parser.add_argument('--distributed', action='store_true',)    parser.add_argument('--seed', default=0, type=int)     parser.add_argument('--epochs', default=2, type=int)     parser.add_argument('--batch_size', default=4, type=int)     parser.add_argument('--data_size', default=16, type=int)     parser.add_argument('--acc_steps', default=3, type=int)     parser.add_argument('--verbose', action='store_true',)        args = parser.parse_args()        print(args)    main(args)

现在,如果我们运行这两个脚本:

  • python3 ddp.py — epochs 2 — batch_size 4 — data_size 8 — verbose — acc_steps 2
  • torchrun — standalone — nproc_per_node=2 ddp.py — epochs 2 — distributed — batch_size 4 — data_size 8 — verbose

我们会发现我们获得了完全相同的最终模型参数:

# 从梯度累加器获得的模型权重:Parameter containing:tensor([[0.9472, 0.9440, 0.9527, 0.9687, 0.9570, 0.9343, 0.9411, 0.9186]],       device='cuda:0', requires_grad=True)# 从DDP获得的模型权重:Parameter containing:tensor([[0.9472, 0.9440, 0.9527, 0.9687, 0.9570, 0.9343, 0.9411, 0.9186]],       device='cuda:0', requires_grad=True)

结论

在这篇文章中,我们简要介绍了DP、DDP算法和梯度累积,并展示了如何在没有多个GPU的情况下增加有效批处理大小。一个重要的事情需要注意的是,即使我们获得了相同的最终结果,使用多个GPU进行训练比使用梯度累积要快得多,因此如果训练速度很重要,那么使用多个GPU是加速训练的唯一方法。