《分布式数据并行(DDP)的综合指南》
《分布式数据并行(DDP)的综合指南》
使用分布式数据并行(DDP)加速训练模型的全面指南
介绍
大家好!我是Francois,是Meta的研究科学家。欢迎来到本教程,它是系列教程Awesome AI Tutorials的一部分。
在本教程中,我们将解密一种名为DDP的众所周知的技术,用于同时在多个GPU上训练模型。
在我在工程学校的日子里,我还记得利用Google Colab的GPU进行训练。然而,在公司领域,情况有所不同。如果你所在的组织在人工智能方面投入了大量资源,尤其是如果你在一家科技巨头公司工作,那么你可能拥有丰富的GPU集群资源。
本次讲座旨在为您提供利用多个GPU的知识,实现快速高效的训练。你知道吗?它比你想象的要简单!在我们继续之前,我建议你对PyTorch有很好的掌握,包括它的核心组件,如数据集、数据加载器、优化器、CUDA和训练循环。
最初,我认为DDP是一种复杂、几乎无法实现的工具,认为它需要一个庞大的团队来设置必要的基础设施。然而,我向你保证,DDP既直观又简洁,只需几行代码即可实现。让我们一起踏上这个启发性的旅程吧!
DDP的高层直觉
分布式数据并行(DDP)是一个简单的概念,一旦我们将其分解开来。想象一下,你有一个由4个GPU组成的集群可供使用。通过DDP,相同的模型和优化器被加载到每个GPU上。主要区别在于我们如何分布数据。
如果你熟悉深度学习,你会回忆起DataLoader,这是一个将数据集分成不同批次的工具。通常将整个数据集分成这些批次,每个批次的计算完成后更新模型。
进一步放大,DDP通过将每个批次划分为我们可以称之为“子批次”的方式来优化这个过程。实际上,每个模型副本处理主批次的一部分,从而为每个GPU产生独特的梯度计算。
在DDP中,我们通过一个称为DistributedSampler的工具将这个批次分成子批次,如下图所示:
将每个子批次分发给各个GPU后,每个GPU计算其独特的梯度。
- 现在是DDP的魔术时刻。在更新模型参数之前,需要聚合每个GPU上计算的梯度,使每个GPU都有整个数据批次上计算得到的平均梯度。
- 这是通过取所有GPU的梯度并对其进行平均来实现的。例如,如果你有4个GPU,特定模型参数的平均梯度是该参数在每个GPU上的梯度之和除以4。
- DDP使用NCCL或Gloo后端(NCCL针对NVIDIA GPU进行了优化,Gloo更通用)来高效地在GPU之间通信和平均梯度。
词汇表:节点和排名
在深入代码之前,了解我们将经常使用的词汇是至关重要的。让我们解密这些术语:
节点
:将节点视为配备多个GPU的强大机器。当我们谈论一个集群时,它不仅仅是一堆GPU拼接在一起。相反,它们被组织成组或“节点”。例如,一个节点可能有8个GPU。主节点
:在多节点环境中,通常有一个节点负责主要任务。这个“主节点”处理诸如同步、启动模型副本、监督模型装载和管理日志条目等任务。没有主节点,每个GPU都会独立生成日志,导致混乱。本地排名:
术语“排名”可以被视为ID或位置。本地排名指的是GPU在其特定节点(或机器)内的位置或ID。它是“本地”的,因为它局限于那个特定的机器。全局排名:
从更广泛的视角看,全局排名标识了所有可用节点中的GPU。它是一个独一无二的标识符,不受机器影响。世界规模:
在本质上,这是您所有节点上可用的GPU的计数。简单来说,它是节点数乘以每个节点中GPU数量的乘积。
为了让事情更具体,如果您只使用一台机器,那么本地排名等同于全局排名。
使用图像来进一步说明:
了解DDP的局限性:
分布式数据并行(DDP)在许多深度学习工作流中具有转变性,但了解其边界非常重要。
DDP的局限性主要在于内存消耗。使用DDP时,每个GPU加载模型、优化器以及它们对应的数据批次的副本。 GPU的内存通常从几GB到高端GPU的80GB不等。
对于较小的模型,这不是一个问题。然而,当涉足大型语言模型(LLM)或类似GPT的架构时,单个GPU的内存限制可能不足。
在计算机视觉中,虽然有大量轻量级模型,但在涉及增加批次大小的情况下,特别是涉及3D图像或目标检测任务的情况下,会出现挑战。
于是进入了全分片数据并行(FSDP)方法。此方法不仅分布数据,还将模型和优化器状态分散到GPU内存中。虽然听起来很有优势,但FSDP会增加GPU间的通信,可能会减慢训练速度。
总结:
- 如果您的模型及其相应的批次可以轻松适应单个GPU的内存,DDP是您的最佳选择,因为它速度快。
- 对于需要更多内存的巨型模型,FSDP是更合适的选择。但是,请记住它的权衡:在性能方面牺牲了速度以换取内存。
为什么你应该优先选择DDP而不是DP?
如果你访问PyTorch的网站,你会发现有两个选项:DP和DDP。但我只提到这一点是为了避免你迷失或困惑:只需使用DDP,它更快且不限于单个节点。
代码演示:
实现分布式深度学习比你想象的要简单。其美妙之处在于,你不会受到手动GPU配置或梯度分布复杂性的困扰。
你会在以下位置找到所有的模板和脚本:
GitHub – FrancoisPorcher/awesome-ai-tutorials: 最好的AI教程集合以让你成为BOSS数据科学家!
最好的AI教程集合以让你成为BOSS数据科学家!- GitHub …
github.com
下面是我们将采取的步骤简述:
- 过程初始化:这涉及到指定主节点、指定端口和设置
world_size
。 - 分布式数据加载器设置:这一步的关键是将每个批次在可用的GPU上进行划分。我们将确保数据均匀分布且没有重叠。
- 模型训练/测试:实质上,这一步与单个GPU的过程基本相同。
在1个GPU 1个节点上训练(基准)
首先,让我们定义一个基本代码,它加载数据集,创建模型,并在单个GPU上端到端地进行训练。这将是我们的起点:
import torchimport torch.nn.functional as Ffrom torch.utils.data import Dataset, DataLoaderfrom sklearn.datasets import load_winefrom sklearn.model_selection import train_test_splitfrom sklearn.preprocessing import StandardScalerimport numpy as npclass WineDataset(Dataset): def __init__(self, data, targets): self.data = data self.targets = targets def __len__(self): return len(self.data) def __getitem__(self, idx): return torch.tensor(self.data[idx], dtype=torch.float), torch.tensor(self.targets[idx], dtype=torch.long)class SimpleNN(torch.nn.Module): def __init__(self): super(SimpleNN, self).__init__() self.fc1 = torch.nn.Linear(13, 64) self.fc2 = torch.nn.Linear(64, 3) def forward(self, x): x = F.relu(self.fc1(x)) x = self.fc2(x) return xclass Trainer(): def __init__(self, model, train_data, optimizer, gpu_id, save_every): self.model = model self.train_data = train_data self.optimizer = optimizer self.gpu_id = gpu_id self.save_every = save_every self.losses = [] def _run_batch(self, source, targets): self.optimizer.zero_grad() output = self.model(source) loss = F.cross_entropy(output, targets) loss.backward() self.optimizer.step() return loss.item() def _run_epoch(self, epoch): total_loss = 0.0 num_batches = len(self.train_data) for source, targets in self.train_data: source = source.to(self.gpu_id) targets = targets.to(self.gpu_id) loss = self._run_batch(source, targets) total_loss += loss avg_loss = total_loss / num_batches self.losses.append(avg_loss) print(f"Epoch {epoch}, Loss: {avg_loss:.4f}") def _save_checkpoint(self, epoch): checkpoint = self.model.state_dict() PATH = f"model_{epoch}.pt" torch.save(checkpoint, PATH) print(f"Epoch {epoch} | Model saved to {PATH}") def train(self, max_epochs): self.model.train() for epoch in range(max_epochs): self._run_epoch(epoch) if epoch % self.save_every == 0: self._save_checkpoint(epoch)def load_train_objs(): wine_data = load_wine() X = wine_data.data y = wine_data.target # Normalize and split X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42) scaler = StandardScaler().fit(X_train) X_train = scaler.transform(X_train) X_test = scaler.transform(X_test) train_set = WineDataset(X_train, y_train) test_set = WineDataset(X_test, y_test) print("Sample from dataset:") sample_data, sample_target = train_set[0] print(f"Data: {sample_data}") print(f"Target: {sample_target}") model = SimpleNN() optimizer = torch.optim.Adam(model.parameters(), lr=0.001) return train_set, model, optimizerdef prepare_dataloader(dataset, batch_size): return DataLoader(dataset, batch_size=batch_size, pin_memory=True, shuffle=True)def main(device, total_epochs, save_every, batch_size): dataset, model, optimizer = load_train_objs() train_data = prepare_dataloader(dataset, batch_size) trainer = Trainer(model, train_data, optimizer, device, save_every) trainer.train(total_epochs)main(device=torch.device("cuda:0" if torch.cuda.is_available() else "cpu"), total_epochs=100, save_every=50, batch_size=32)
使用多个GPU进行训练,1个节点
现在我们将在单个节点中使用所有的GPU,具体步骤如下:
- 导入所需的库以进行分布式训练。
- 初始化分布环境(特别是
MASTER_ADDR
和MASTER_PORT
)。 - 使用
DistributedDataParallel
包装器将模型转换为DDP。 - 使用分布式采样器确保数据集在分布式GPU上进行划分。
- 调整主函数以
spawn
多个进程进行多GPU训练。
对于这些库,我们需要以下代码:
import torch.multiprocessing as mpfrom torch.utils.data.distributed import DistributedSamplerfrom torch.nn.parallel import DistributedDataParallel as DDPfrom torch.distributed import init_process_group, destroy_process_groupimport os
然后我们需要设置每个进程。例如,如果我们在1个节点上有8个GPU,我们将针对每个GPU调用以下函数8次,并使用正确的local_rank
:
def ddp_setup(rank, world_size): """ 设置分布式环境。 参数: rank:当前进程的排名。分布式训练中每个进程的唯一标识符。 world_size:参与分布式训练的总进程数。 """ # 主节点的地址。由于我们是单节点训练,设置为localhost。 os.environ["MASTER_ADDR"] = "localhost" # 主节点用于监听来自工作进程或其他进程的通信的端口。 os.environ["MASTER_PORT"] = "12355" # 初始化进程组。 # 'backend'指定要使用的通信后端,“nccl”用于优化GPU训练。 init_process_group(backend="nccl", rank=rank, world_size=world_size) # 将当前CUDA设备设置为指定的设备(由排名标识)。 # 这样确保每个进程在多GPU设置中使用不同的GPU。 torch.cuda.set_device(rank)
关于这个函数的一些解释:
MASTER_ADDR
是主机的主机名,即运行主节点(或0号进程)的机器。此处为localhost。MASTER_PORT
:指定主节点用于监听来自工作进程或其他进程的连接的端口。12355是任意选择的。只要系统中没有其他服务在使用此端口号,并且防火墙规则允许,可以选择任何未使用的端口号。torch.cuda.set_device(rank)
:这确保每个进程使用其对应的GPU。
然后我们需要稍微更改Trainer类。我们只需使用DDP函数包装模型:
class Trainer(): def __init__(self, model, train_data, optimizer, gpu_id, save_every): self.model = model.to(gpu_id) self.train_data = train_data self.optimizer = optimizer self.gpu_id = gpu_id self.save_every = save_every self.losses = [] # 这里有改动 self.model = DDP(self.model, device_ids=[gpu_id])
Trainer类的其余部分保持不变,太棒了!
现在我们需要更改数据加载器,因为请记住,我们必须在每个GPU上拆分批次:
def prepare_dataloader(dataset: Dataset, batch_size: int): return DataLoader( dataset, batch_size=batch_size, pin_memory=True, shuffle=False, sampler=DistributedSampler(dataset) )
现在我们可以修改main
函数,该函数将为每个进程调用(在我们的情况下为8次):
def main(rank: int, world_size: int, save_every: int, total_epochs: int, batch_size: int): """ 分布式数据并行(DDP)设置的主要训练函数。 参数: rank(int):当前进程的排名(0 <= rank < world_size)。每个进程被分配一个唯一的排名。 world_size(int):参与分布式训练的进程总数。 save_every(int):模型检查点保存的频率,以epoch为单位。 total_epochs(int):训练的总epoch数。 batch_size(int):每次迭代中处理的样本数(前向和后向传递)。 """ # 设置分布式环境,包括设置主地址、端口和后端。 ddp_setup(rank, world_size) # 加载所需的训练对象 - 数据集、模型和优化器。 dataset, model, optimizer = load_train_objs() # 为分布式训练准备数据加载器。它将数据集划分到各个进程,并处理洗牌。 train_data = prepare_dataloader(dataset, batch_size) # 使用加载的模型、数据和其他配置初始化训练器实例。 trainer = Trainer(model, train_data, optimizer, rank, save_every) # 训练模型指定的epoch数。 trainer.train(total_epochs) # 训练完成后清理分布式环境。 destroy_process_group()
最后,执行脚本时,我们将必须启动8个进程。使用mp.spawn()
函数来完成:
if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description='简单的分布式训练任务') parser.add_argument('total_epochs', type=int, help='模型的总训练轮数') parser.add_argument('save_every', type=int, help='多久保存一次快照') parser.add_argument('--batch_size', default=32, type=int, help='每个设备上的输入批量大小(默认为32)') args = parser.parse_args() world_size = torch.cuda.device_count() mp.spawn(main, args=(world_size, args.save_every, args.total_epochs, args.batch_size), nprocs=world_size)
终极步骤:在多个节点上进行训练
如果你到达这一步,恭喜!终极步骤是能够在不同节点上招募所有可用的GPU。但如果你理解我们到目前为止所做的,这非常简单。
在跨多个节点扩展时的关键区别是从local_rank
切换到global_rank
。这是必要的,因为每个进程都需要一个唯一的标识符。例如,如果你使用两个节点,每个节点有8个GPU,那么进程0和9都会有一个local_rank
为0。
global_rank
由以下直观的公式给出:
global_rank = node_rank * world_size_per_node + local_rank
所以我们首先修改ddp_setup
函数:
def ddp_setup(local_rank, world_size_per_node, node_rank): os.environ["MASTER_ADDR"] = "主节点IP" # <-- 使用你的主节点IP替换 os.environ["MASTER_PORT"] = "12355" global_rank = node_rank * world_size_per_node + local_rank init_process_group(backend="nccl", rank=global_rank, world_size=world_size_per_node*torch.cuda.device_count()) torch.cuda.set_device(local_rank)
我们还必须调整主函数,现在它接受wold_size_per_node
作为参数:
def main(local_rank: int, world_size_per_node: int, save_every: int, total_epochs: int, batch_size: int, node_rank: int): ddp_setup(local_rank, world_size_per_node, node_rank) # ...(主函数剩余部分)
最后,我们还要调整带有world_size_per_node
的mp.spawn()
函数:
if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description='简单的分布式训练任务') parser.add_argument('total_epochs', type=int, help='模型的总训练轮数') parser.add_argument('save_every', type=int, help='多久保存一次快照') parser.add_argument('--batch_size', default=32, type=int, help='每个设备上的输入批量大小(默认为32)') parser.add_argument('--node_rank', default=0, type=int, help='多节点训练中节点的排名') args = parser.parse_args() world_size_per_node = torch.cuda.device_count() mp.spawn(main, args=(world_size_per_node, args.save_every, args.total_epochs, args.batch_size, args.node_rank), nprocs=world_size_per_node)
使用集群(SLURM)
现在你可以将训练发送到集群了。非常简单,你只需要调用所需的节点数。
这是一个SLURM脚本的模板:
#!/bin/bash#SBATCH --job-name=DDPTraining # 作业名称#SBATCH --nodes=$1 # 用户指定的节点数#SBATCH --ntasks-per-node=1 # 确保每个节点只运行一个任务#SBATCH --cpus-per-task=1 # 每个任务的CPU核心数#SBATCH --gres=gpu:1 # 每个节点的GPU数#SBATCH --time=01:00:00 # 时间限制小时:分钟:秒(此示例中为1小时)#SBATCH --mem=4GB # 每个GPU的内存限制#SBATCH --output=training_%j.log # 输出和错误日志名称(%j 扩展为作业ID)#SBATCH --partition=gpu # 指定分区或队列run python3 your_python_script.py --total_epochs 10 --save_every 2 --batch_size 32 --node_rank $SLURM_NODEID
现在,您可以通过以下命令从终端启动训练:
sbatch train_net.sh 2 # 使用2个节点
恭喜,您成功了!
感谢阅读!在您离开之前:
要获取更多精彩的教程,请查看我的AI教程合集在Github上
GitHub – FrancoisPorcher/awesome-ai-tutorials: AI教程的最佳合集,让您成为…
AI教程的最佳合集,让您成为数据科学达人! – GitHub …
github.com
您应该在收件箱中收到我的文章。 在此处订阅。
如果您想要访问VoAGI的高级文章,您只需要支付每月5美元的会员费用。如果您使用 我的链接注册,您将在不增加额外费用的情况下支持我一部分费用。
如果您觉得本文有启发性和益处,请考虑关注我并为更深入的内容点赞!您的支持帮助我继续制作有助于我们共同理解的内容。