《分布式数据并行(DDP)的综合指南》

《分布式数据并行(DDP)的综合指南》

使用分布式数据并行(DDP)加速训练模型的全面指南

作者发布的图片

介绍

大家好!我是Francois,是Meta的研究科学家。欢迎来到本教程,它是系列教程Awesome AI Tutorials的一部分。

在本教程中,我们将解密一种名为DDP的众所周知的技术,用于同时在多个GPU上训练模型。

在我在工程学校的日子里,我还记得利用Google Colab的GPU进行训练。然而,在公司领域,情况有所不同。如果你所在的组织在人工智能方面投入了大量资源,尤其是如果你在一家科技巨头公司工作,那么你可能拥有丰富的GPU集群资源。

本次讲座旨在为您提供利用多个GPU的知识,实现快速高效的训练。你知道吗?它比你想象的要简单!在我们继续之前,我建议你对PyTorch有很好的掌握,包括它的核心组件,如数据集、数据加载器、优化器、CUDA和训练循环。

最初,我认为DDP是一种复杂、几乎无法实现的工具,认为它需要一个庞大的团队来设置必要的基础设施。然而,我向你保证,DDP既直观又简洁,只需几行代码即可实现。让我们一起踏上这个启发性的旅程吧!

DDP的高层直觉

分布式数据并行(DDP)是一个简单的概念,一旦我们将其分解开来。想象一下,你有一个由4个GPU组成的集群可供使用。通过DDP,相同的模型和优化器被加载到每个GPU上。主要区别在于我们如何分布数据。

DDP, 图片来自于PyTorch教程

如果你熟悉深度学习,你会回忆起DataLoader,这是一个将数据集分成不同批次的工具。通常将整个数据集分成这些批次,每个批次的计算完成后更新模型。

进一步放大,DDP通过将每个批次划分为我们可以称之为“子批次”的方式来优化这个过程。实际上,每个模型副本处理主批次的一部分,从而为每个GPU产生独特的梯度计算。

在DDP中,我们通过一个称为DistributedSampler的工具将这个批次分成子批次,如下图所示:

DDP, 图片来自于PyTorch教程

将每个子批次分发给各个GPU后,每个GPU计算其独特的梯度。

DDP, 图片来自于PyTorch教程
  • 现在是DDP的魔术时刻。在更新模型参数之前,需要聚合每个GPU上计算的梯度,使每个GPU都有整个数据批次上计算得到的平均梯度。
  • 这是通过取所有GPU的梯度并对其进行平均来实现的。例如,如果你有4个GPU,特定模型参数的平均梯度是该参数在每个GPU上的梯度之和除以4。
  • DDP使用NCCL或Gloo后端(NCCL针对NVIDIA GPU进行了优化,Gloo更通用)来高效地在GPU之间通信和平均梯度。
DDP,图像来自PyTorch教程

词汇表:节点和排名

在深入代码之前,了解我们将经常使用的词汇是至关重要的。让我们解密这些术语:

  • 节点:将节点视为配备多个GPU的强大机器。当我们谈论一个集群时,它不仅仅是一堆GPU拼接在一起。相反,它们被组织成组或“节点”。例如,一个节点可能有8个GPU。
  • 主节点:在多节点环境中,通常有一个节点负责主要任务。这个“主节点”处理诸如同步、启动模型副本、监督模型装载和管理日志条目等任务。没有主节点,每个GPU都会独立生成日志,导致混乱。
  • 本地排名:术语“排名”可以被视为ID或位置。本地排名指的是GPU在其特定节点(或机器)内的位置或ID。它是“本地”的,因为它局限于那个特定的机器。
  • 全局排名:从更广泛的视角看,全局排名标识了所有可用节点中的GPU。它是一个独一无二的标识符,不受机器影响。
  • 世界规模:在本质上,这是您所有节点上可用的GPU的计数。简单来说,它是节点数乘以每个节点中GPU数量的乘积。

为了让事情更具体,如果您只使用一台机器,那么本地排名等同于全局排名。

使用图像来进一步说明:

本地排名,来自教程的图像
本地排名,来自教程的图像

了解DDP的局限性:

分布式数据并行(DDP)在许多深度学习工作流中具有转变性,但了解其边界非常重要。

DDP的局限性主要在于内存消耗。使用DDP时,每个GPU加载模型、优化器以及它们对应的数据批次的副本。 GPU的内存通常从几GB到高端GPU的80GB不等。

对于较小的模型,这不是一个问题。然而,当涉足大型语言模型(LLM)或类似GPT的架构时,单个GPU的内存限制可能不足。

在计算机视觉中,虽然有大量轻量级模型,但在涉及增加批次大小的情况下,特别是涉及3D图像或目标检测任务的情况下,会出现挑战。

于是进入了全分片数据并行(FSDP)方法。此方法不仅分布数据,还将模型和优化器状态分散到GPU内存中。虽然听起来很有优势,但FSDP会增加GPU间的通信,可能会减慢训练速度。

总结:

  • 如果您的模型及其相应的批次可以轻松适应单个GPU的内存,DDP是您的最佳选择,因为它速度快。
  • 对于需要更多内存的巨型模型,FSDP是更合适的选择。但是,请记住它的权衡:在性能方面牺牲了速度以换取内存。

为什么你应该优先选择DDP而不是DP?

如果你访问PyTorch的网站,你会发现有两个选项:DP和DDP。但我只提到这一点是为了避免你迷失或困惑:只需使用DDP,它更快且不限于单个节点。

Comparison from Pytorch tutorial

代码演示:

实现分布式深度学习比你想象的要简单。其美妙之处在于,你不会受到手动GPU配置或梯度分布复杂性的困扰。

你会在以下位置找到所有的模板和脚本:

GitHub – FrancoisPorcher/awesome-ai-tutorials: 最好的AI教程集合以让你成为BOSS数据科学家!

最好的AI教程集合以让你成为BOSS数据科学家!- GitHub …

github.com

下面是我们将采取的步骤简述:

  1. 过程初始化:这涉及到指定主节点、指定端口和设置world_size
  2. 分布式数据加载器设置:这一步的关键是将每个批次在可用的GPU上进行划分。我们将确保数据均匀分布且没有重叠。
  3. 模型训练/测试:实质上,这一步与单个GPU的过程基本相同。

在1个GPU 1个节点上训练(基准)

首先,让我们定义一个基本代码,它加载数据集,创建模型,并在单个GPU上端到端地进行训练。这将是我们的起点:

import torchimport torch.nn.functional as Ffrom torch.utils.data import Dataset, DataLoaderfrom sklearn.datasets import load_winefrom sklearn.model_selection import train_test_splitfrom sklearn.preprocessing import StandardScalerimport numpy as npclass WineDataset(Dataset):    def __init__(self, data, targets):        self.data = data        self.targets = targets    def __len__(self):        return len(self.data)    def __getitem__(self, idx):        return torch.tensor(self.data[idx], dtype=torch.float), torch.tensor(self.targets[idx], dtype=torch.long)class SimpleNN(torch.nn.Module):    def __init__(self):        super(SimpleNN, self).__init__()        self.fc1 = torch.nn.Linear(13, 64)        self.fc2 = torch.nn.Linear(64, 3)    def forward(self, x):        x = F.relu(self.fc1(x))        x = self.fc2(x)        return xclass Trainer():    def __init__(self, model, train_data, optimizer, gpu_id, save_every):        self.model = model        self.train_data = train_data        self.optimizer = optimizer        self.gpu_id = gpu_id        self.save_every = save_every        self.losses = []    def _run_batch(self, source, targets):        self.optimizer.zero_grad()        output = self.model(source)        loss = F.cross_entropy(output, targets)        loss.backward()        self.optimizer.step()        return loss.item()    def _run_epoch(self, epoch):        total_loss = 0.0        num_batches = len(self.train_data)        for source, targets in self.train_data:            source = source.to(self.gpu_id)            targets = targets.to(self.gpu_id)            loss = self._run_batch(source, targets)            total_loss += loss        avg_loss = total_loss / num_batches        self.losses.append(avg_loss)        print(f"Epoch {epoch}, Loss: {avg_loss:.4f}")    def _save_checkpoint(self, epoch):        checkpoint = self.model.state_dict()        PATH = f"model_{epoch}.pt"        torch.save(checkpoint, PATH)        print(f"Epoch {epoch} | Model saved to {PATH}")    def train(self, max_epochs):        self.model.train()        for epoch in range(max_epochs):            self._run_epoch(epoch)            if epoch % self.save_every == 0:                self._save_checkpoint(epoch)def load_train_objs():    wine_data = load_wine()    X = wine_data.data    y = wine_data.target    # Normalize and split    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)    scaler = StandardScaler().fit(X_train)    X_train = scaler.transform(X_train)    X_test = scaler.transform(X_test)    train_set = WineDataset(X_train, y_train)    test_set = WineDataset(X_test, y_test)    print("Sample from dataset:")    sample_data, sample_target = train_set[0]    print(f"Data: {sample_data}")    print(f"Target: {sample_target}")    model = SimpleNN()    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)    return train_set, model, optimizerdef prepare_dataloader(dataset, batch_size):    return DataLoader(dataset, batch_size=batch_size, pin_memory=True, shuffle=True)def main(device, total_epochs, save_every, batch_size):    dataset, model, optimizer = load_train_objs()    train_data = prepare_dataloader(dataset, batch_size)    trainer = Trainer(model, train_data, optimizer, device, save_every)    trainer.train(total_epochs)main(device=torch.device("cuda:0" if torch.cuda.is_available() else "cpu"), total_epochs=100, save_every=50, batch_size=32)

使用多个GPU进行训练,1个节点

现在我们将在单个节点中使用所有的GPU,具体步骤如下:

  1. 导入所需的库以进行分布式训练。
  2. 初始化分布环境(特别是MASTER_ADDRMASTER_PORT)。
  3. 使用DistributedDataParallel包装器将模型转换为DDP。
  4. 使用分布式采样器确保数据集在分布式GPU上进行划分。
  5. 调整主函数以spawn多个进程进行多GPU训练。

对于这些库,我们需要以下代码:

import torch.multiprocessing as mpfrom torch.utils.data.distributed import DistributedSamplerfrom torch.nn.parallel import DistributedDataParallel as DDPfrom torch.distributed import init_process_group, destroy_process_groupimport os

然后我们需要设置每个进程。例如,如果我们在1个节点上有8个GPU,我们将针对每个GPU调用以下函数8次,并使用正确的local_rank

def ddp_setup(rank, world_size):    """    设置分布式环境。        参数:        rank:当前进程的排名。分布式训练中每个进程的唯一标识符。        world_size:参与分布式训练的总进程数。    """        # 主节点的地址。由于我们是单节点训练,设置为localhost。    os.environ["MASTER_ADDR"] = "localhost"        # 主节点用于监听来自工作进程或其他进程的通信的端口。    os.environ["MASTER_PORT"] = "12355"        # 初始化进程组。     # 'backend'指定要使用的通信后端,“nccl”用于优化GPU训练。    init_process_group(backend="nccl", rank=rank, world_size=world_size)        # 将当前CUDA设备设置为指定的设备(由排名标识)。    # 这样确保每个进程在多GPU设置中使用不同的GPU。    torch.cuda.set_device(rank)

关于这个函数的一些解释:

  • MASTER_ADDR是主机的主机名,即运行主节点(或0号进程)的机器。此处为localhost。
  • MASTER_PORT:指定主节点用于监听来自工作进程或其他进程的连接的端口。12355是任意选择的。只要系统中没有其他服务在使用此端口号,并且防火墙规则允许,可以选择任何未使用的端口号。
  • torch.cuda.set_device(rank):这确保每个进程使用其对应的GPU。

然后我们需要稍微更改Trainer类。我们只需使用DDP函数包装模型:

class Trainer():    def __init__(self, model, train_data, optimizer, gpu_id, save_every):        self.model = model.to(gpu_id)        self.train_data = train_data        self.optimizer = optimizer        self.gpu_id = gpu_id        self.save_every = save_every        self.losses = []                # 这里有改动        self.model = DDP(self.model, device_ids=[gpu_id])

Trainer类的其余部分保持不变,太棒了!

现在我们需要更改数据加载器,因为请记住,我们必须在每个GPU上拆分批次:

def prepare_dataloader(dataset: Dataset, batch_size: int):    return DataLoader(        dataset,        batch_size=batch_size,        pin_memory=True,        shuffle=False,        sampler=DistributedSampler(dataset)    )

现在我们可以修改main函数,该函数将为每个进程调用(在我们的情况下为8次):

def main(rank: int, world_size: int, save_every: int, total_epochs: int, batch_size: int):    """    分布式数据并行(DDP)设置的主要训练函数。        参数:        rank(int):当前进程的排名(0 <= rank < world_size)。每个进程被分配一个唯一的排名。        world_size(int):参与分布式训练的进程总数。        save_every(int):模型检查点保存的频率,以epoch为单位。        total_epochs(int):训练的总epoch数。        batch_size(int):每次迭代中处理的样本数(前向和后向传递)。    """        # 设置分布式环境,包括设置主地址、端口和后端。    ddp_setup(rank, world_size)        # 加载所需的训练对象 - 数据集、模型和优化器。    dataset, model, optimizer = load_train_objs()        # 为分布式训练准备数据加载器。它将数据集划分到各个进程,并处理洗牌。    train_data = prepare_dataloader(dataset, batch_size)        # 使用加载的模型、数据和其他配置初始化训练器实例。    trainer = Trainer(model, train_data, optimizer, rank, save_every)        # 训练模型指定的epoch数。    trainer.train(total_epochs)        # 训练完成后清理分布式环境。    destroy_process_group()

最后,执行脚本时,我们将必须启动8个进程。使用mp.spawn()函数来完成:

if __name__ == "__main__":    import argparse    parser = argparse.ArgumentParser(description='简单的分布式训练任务')    parser.add_argument('total_epochs', type=int, help='模型的总训练轮数')    parser.add_argument('save_every', type=int, help='多久保存一次快照')    parser.add_argument('--batch_size', default=32, type=int, help='每个设备上的输入批量大小(默认为32)')    args = parser.parse_args()        world_size = torch.cuda.device_count()    mp.spawn(main, args=(world_size, args.save_every, args.total_epochs, args.batch_size), nprocs=world_size)

终极步骤:在多个节点上进行训练

如果你到达这一步,恭喜!终极步骤是能够在不同节点上招募所有可用的GPU。但如果你理解我们到目前为止所做的,这非常简单。

在跨多个节点扩展时的关键区别是从local_rank切换到global_rank。这是必要的,因为每个进程都需要一个唯一的标识符。例如,如果你使用两个节点,每个节点有8个GPU,那么进程0和9都会有一个local_rank为0。

global_rank由以下直观的公式给出:

global_rank = node_rank * world_size_per_node + local_rank

所以我们首先修改ddp_setup函数:

def ddp_setup(local_rank, world_size_per_node, node_rank):    os.environ["MASTER_ADDR"] = "主节点IP"  # <-- 使用你的主节点IP替换    os.environ["MASTER_PORT"] = "12355"      global_rank = node_rank * world_size_per_node + local_rank    init_process_group(backend="nccl", rank=global_rank, world_size=world_size_per_node*torch.cuda.device_count())    torch.cuda.set_device(local_rank)

我们还必须调整主函数,现在它接受wold_size_per_node作为参数:

def main(local_rank: int, world_size_per_node: int, save_every: int, total_epochs: int, batch_size: int, node_rank: int):    ddp_setup(local_rank, world_size_per_node, node_rank)    # ...(主函数剩余部分)

最后,我们还要调整带有world_size_per_nodemp.spawn()函数:

if __name__ == "__main__":    import argparse    parser = argparse.ArgumentParser(description='简单的分布式训练任务')    parser.add_argument('total_epochs', type=int, help='模型的总训练轮数')    parser.add_argument('save_every', type=int, help='多久保存一次快照')    parser.add_argument('--batch_size', default=32, type=int, help='每个设备上的输入批量大小(默认为32)')    parser.add_argument('--node_rank', default=0, type=int, help='多节点训练中节点的排名')    args = parser.parse_args()    world_size_per_node = torch.cuda.device_count()    mp.spawn(main, args=(world_size_per_node, args.save_every, args.total_epochs, args.batch_size, args.node_rank), nprocs=world_size_per_node)

使用集群(SLURM)

现在你可以将训练发送到集群了。非常简单,你只需要调用所需的节点数。

这是一个SLURM脚本的模板:

#!/bin/bash#SBATCH --job-name=DDPTraining       # 作业名称#SBATCH --nodes=$1                   # 用户指定的节点数#SBATCH --ntasks-per-node=1          # 确保每个节点只运行一个任务#SBATCH --cpus-per-task=1            # 每个任务的CPU核心数#SBATCH --gres=gpu:1                 # 每个节点的GPU数#SBATCH --time=01:00:00              # 时间限制小时:分钟:秒(此示例中为1小时)#SBATCH --mem=4GB                    # 每个GPU的内存限制#SBATCH --output=training_%j.log     # 输出和错误日志名称(%j 扩展为作业ID)#SBATCH --partition=gpu              # 指定分区或队列run python3 your_python_script.py --total_epochs 10 --save_every 2 --batch_size 32 --node_rank $SLURM_NODEID

现在,您可以通过以下命令从终端启动训练:

sbatch train_net.sh 2  # 使用2个节点

恭喜,您成功了!

感谢阅读!在您离开之前:

要获取更多精彩的教程,请查看我的AI教程合集在Github上

GitHub – FrancoisPorcher/awesome-ai-tutorials: AI教程的最佳合集,让您成为…

AI教程的最佳合集,让您成为数据科学达人! – GitHub …

github.com

您应该在收件箱中收到我的文章。 在此处订阅。

如果您想要访问VoAGI的高级文章,您只需要支付每月5美元的会员费用。如果您使用 我的链接注册,您将在不增加额外费用的情况下支持我一部分费用。

如果您觉得本文有启发性和益处,请考虑关注我并为更深入的内容点赞!您的支持帮助我继续制作有助于我们共同理解的内容。

参考资料