使用PyTorch Profiler和TensorBoard解决数据输入管道上的瓶颈问题

使用PyTorch Profiler和TensorBoard解决数据输入管道上的瓶颈问题' can be condensed to '使用PyTorch Profiler和TensorBoard解决数据输入管道瓶颈

PyTorch模型性能分析与优化-第4部分

Photo by Alexander Grey on Unsplash

这是我们关于GPU-based PyTorch工作负载性能分析和优化系列文章的第四篇。本文将重点讨论训练数据输入流程。在典型的训练应用中,主机的CPU在将数据加载、预处理和整理后输入GPU进行训练。当主机无法跟上GPU速度时,输入流程中就会出现瓶颈。这导致GPU,即训练设置中最昂贵的资源,在等待主机输入数据时处于空闲状态。在之前的文章中(例如,这里),我们详细讨论了输入流程中的瓶颈问题,并介绍了解决这些问题的不同方法,例如:

  1. 选择适合工作负载的CPU到GPU计算比例的训练实例(例如,参见我们关于选择最佳实例类型的ML工作负载建议的先前文章),
  2. 通过将一些CPU预处理活动移至GPU来改善CPU和GPU之间的工作负载平衡,
  3. 将一些CPU计算卸载到辅助CPU工作器设备上(例如,参见这里)。

当然,解决数据输入流程中的性能瓶颈的第一步是识别和理解瓶颈。在本文中,我们将演示如何使用PyTorch Profiler及其关联的TensorBoard插件来完成这一任务。

与之前的文章一样,我们将定义一个玩具PyTorch模型,并通过迭代地分析其性能、识别瓶颈并尝试修复它们。我们将在Amazon EC2 g5.2xlarge实例上运行实验(包含NVIDIA A10G GPU和8个虚拟CPU),并使用官方的AWS PyTorch 2.0 Docker镜像。请注意,我们描述的行为可能会因PyTorch的不同版本而有所不同。

非常感谢Yitzhak Levi在本文中的贡献。

玩具模型

在以下代码块中,我们介绍了我们将用于演示的玩具示例。我们首先定义了一个简单的图像分类模型。模型的输入是一批256×256的YUV图像,输出是其对应的语义类别预测批次。

from math import log2import torchimport torch.nn as nnimport torch.nn.functional as Fimg_size = 256num_classes = 10hidden_size = 30# 玩具CNN分类模型class Net(nn.Module):    def __init__(self, img_size=img_size, num_classes=num_classes):        super().__init__()        self.conv_in = nn.Conv2d(3, hidden_size, 3, padding='same')        num_hidden = int(log2(img_size))        hidden = []        for i in range(num_hidden):            hidden.append(nn.Conv2d(hidden_size, hidden_size, 3, padding='same'))            hidden.append(nn.ReLU())            hidden.append(nn.MaxPool2d(2))        self.hidden = nn.Sequential(*hidden)        self.conv_out = nn.Conv2d(hidden_size, num_classes, 3, padding='same')    def forward(self, x):        x = F.relu(self.conv_in(x))        x = self.hidden(x)        x = self.conv_out(x)        x = torch.flatten(x, 1)        return x

下面的代码块包含了我们的数据集定义。我们的数据集包含一万个jpeg图像文件路径及其相关(随机生成的)语义标签。为了简化演示,我们假设所有的jpeg文件路径都指向同一张图片-本文开头色彩丰富的“瓶颈”图片。

import numpy as npfrom PIL import Imagefrom torchvision.datasets.vision import VisionDatasetinput_img_size = [533, 800]class FakeDataset(VisionDataset):    def __init__(self, transform):        super().__init__(root=None, transform=transform)        size = 10000        self.img_files = [f'0.jpg' for i in range(size)]        self.targets = np.random.randint(low=0,high=num_classes,                                         size=(size),dtype=np.uint8).tolist()    def __getitem__(self, index):        img_file, target = self.img_files[index], self.targets[index]        with torch.profiler.record_function('PIL open'):            img = Image.open(img_file)        if self.transform is not None:            img = self.transform(img)        return img, target    def __len__(self):        return len(self.img_files)

请注意,我们已经使用torch.profiler.record_function上下文管理器封装了文件读取器。

我们的输入数据流水线包括对图像进行以下转换:

  1. PILToTensor将PIL图像转换为PyTorch Tensor。
  2. RandomCrop在图像中的随机偏移处返回一个256×256的裁剪。
  3. RandomMask是一个自定义转换,它创建一个随机的256×256布尔掩码,并将其应用于图像。该转换包括对掩码进行四邻域膨胀操作。
  4. ConvertColor是一个自定义转换,它将图像格式从RGB转换为YUV。
  5. Scale是一个自定义转换,它将像素缩放到范围[0,1]。
class RandomMask(torch.nn.Module):    def __init__(self, ratio=0.25):        super().__init__()        self.ratio=ratio    def dilate_mask(self, mask):        # 对掩码进行四邻域膨胀操作        with torch.profiler.record_function('膨胀'):            from scipy.signal import convolve2d            dilated = convolve2d(mask, [[0, 1, 0],                                     [1, 1, 1],                                     [0, 1, 0]], mode='same').astype(bool)        return dilated    def forward(self, img):        with torch.profiler.record_function('随机'):            mask = np.random.uniform(size=(img_size, img_size)) < self.ratio        dilated_mask = torch.unsqueeze(torch.tensor(self.dilate_mask(mask)),0)        dilated_mask = dilated_mask.expand(3,-1,-1)        img[dilated_mask] = 0.        return img    def __repr__(self):        return f"{self.__class__.__name__}(ratio={self.ratio})"class ConvertColor(torch.nn.Module):    def __init__(self):        super().__init__()        self.A=torch.tensor(            [[0.299, 0.587, 0.114],             [-0.16874, -0.33126, 0.5],             [0.5, -0.41869, -0.08131]]        )        self.b=torch.tensor([0.,128.,128.])    def forward(self, img):        img = img.to(dtype=torch.get_default_dtype())        img = torch.matmul(self.A,img.view([3,-1])).view(img.shape)        img = img + self.b[:,None,None]        return img    def __repr__(self):        return f"{self.__class__.__name__}()"class Scale(object):    def __call__(self, img):        return img.to(dtype=torch.get_default_dtype()).div(255)    def __repr__(self):        return f"{self.__class__.__name__}()"

我们使用Compose类链接转换,稍微修改了该类以在每个转换调用周围添加torch.profiler.record_function上下文管理器。

import torchvision.transforms as Tclass CustomCompose(T.Compose):    def __call__(self, img):        for t in self.transforms:            with torch.profiler.record_function(t.__class__.__name__):                img = t(img)        return imgtransform = CustomCompose(    [T.PILToTensor(),     T.RandomCrop(img_size),     RandomMask(),     ConvertColor(),     Scale()])

在下面的代码块中,我们定义了数据集和数据加载器。我们配置DataLoader使用自定义collate函数,在该函数中我们使用torch.profiler.record_function上下文管理器包装了默认的collate函数。

train_set = FakeDataset(transform=transform)def custom_collate(batch):    from torch.utils.data._utils.collate import default_collate    with torch.profiler.record_function('整理'):        batch = default_collate(batch)    image, label = batch    return image, labeltrain_loader = torch.utils.data.DataLoader(train_set, batch_size=256,                                           collate_fn=custom_collate,                                           num_workers=4, pin_memory=True)

最后,我们定义了模型、损失函数、优化器和训练循环,并使用profiler上下文管理器包装了它们。

from statistics import mean, variancefrom time import timedevice = torch.device("cuda:0")model = Net().cuda(device)criterion = nn.CrossEntropyLoss().cuda(device)optimizer = torch.optim.SGD(model.parameters(), lr=0.001, momentum=0.9)model.train()t0 = time()times = []with torch.profiler.profile(    schedule=torch.profiler.schedule(wait=10, warmup=2, active=10, repeat=1),    on_trace_ready=torch.profiler.tensorboard_trace_handler('/tmp/prof'),    record_shapes=True,    profile_memory=True,    with_stack=True) as prof:    for step, data in enumerate(train_loader):        with torch.profiler.record_function('h2d复制'):            inputs, labels = data[0].to(device=device, non_blocking=True), \                             data[1].to(device=device, non_blocking=True)        if step >= 40:            break        outputs = model(inputs)        loss = criterion(outputs, labels)        optimizer.zero_grad(set_to_none=True)        loss.backward()        optimizer.step()        prof.step()        times.append(time()-t0)        t0 = time()print(f'平均时间:{mean(times[1:])},方差:{variance(times[1:])}')

在下面的部分中,我们将使用PyTorch Profiler及其关联的TensorBoard插件来评估我们模型的性能。我们的重点将放在分析器报告的Trace View上。请查看我们系列中的第一篇文章,了解如何使用报告的其他部分。

初始性能结果

我们定义的脚本报告的平均步骤时间为1.3秒,平均GPU利用率非常低,为18.21%。在下面的图像中,我们捕捉了TensorBoard插件Trace View中显示的性能结果:

基准模型的Trace View(作者捕捉)

我们可以看到,每第四个训练步骤都包括一个长约5.5秒的数据加载期间,在此期间GPU完全空闲。这种情况发生在每第四个步骤是直接与我们选择的DataLoader工作器数量(四个)有关。每第四个步骤中,我们发现所有工作器都忙于生成下一批样本,而GPU则等待。这清楚地表明了数据输入管道中的瓶颈。问题是我们如何分析它?复杂化问题的是,我们在代码中插入的许多record_function标记在性能分析中找不到。

DataLoader中使用多个工作器对于优化性能至关重要。不幸的是,这也使得性能分析过程更加困难。虽然存在支持多进程分析的性能分析工具(例如,可以尝试VizTracer),但在本文中,我们将采取的方法是在单进程模式下运行、分析和优化我们的模型(即零个DataLoader工作器),然后将优化应用于多工作器模式。诚然,优化独立函数的速度并不保证多个(并行)调用相同函数也会受益。然而,正如我们在本文中所看到的,这种策略将使我们能够识别和解决一些我们无法以其他方式识别的核心问题,并且至少在这里讨论的问题方面,我们将发现两种模式的性能影响之间存在很强的相关性。但是在应用这种策略之前,让我们调整一下工作器数量的选择。

优化1:调整多进程策略

确定多进程/多线程应用程序中的最佳线程或进程数量可能有些棘手。一方面,如果选择的数目太低,可能会低效利用CPU资源。另一方面,如果选择的数目太高,可能会出现抖动现象,即操作系统花费大部分时间来管理多线程/多进程,而不是运行我们的代码。对于PyTorch训练负载,建议尝试不同的DataLoader num_workers设置。一个好的起点是根据主机上的CPU数量来设置该数目(例如,num_workers:=num_cpus/num_gpus)。在我们的情况下,Amazon EC2 g5.2xlarge有八个虚拟CPU,实际上,将DataLoader工作器的数量增加到八个会导致平均步骤时间略有改善,为1.17秒(提升了11%)。

重要的是,要注意其他不太明显的配置设置可能会影响数据输入管道使用的线程或进程的数量。例如,用于计算机视觉工作负载中常用的图像预处理库opencv-python包括cv2.setNumThreads(int)函数,用于控制线程的数量。

在下面的图像中,我们捕捉了使用num_workers设置为零时运行脚本的Trace View的一部分。

单进程模式下基准模型的Trace View(作者捕捉)

以这种方式运行脚本使我们能够看到我们设置的record_function标签,并确定RandomMask transform,或更具体地说,我们的膨胀函数是检索每个单独样本中耗时最长的操作。

优化2:优化膨胀函数

我们当前的膨胀函数实现使用了2D卷积,通常是使用矩阵乘法来实现的,在CPU上运行速度并不快。一种选择是在GPU上运行膨胀函数(如本文所述)。然而,主机与设备之间的开销可能会超过此类解决方案的潜在性能提升,更不用说我们不希望增加GPU的负载了。

在下面的代码块中,我们提出了一种替代方案,更适合CPU的实现膨胀函数,它使用布尔操作代替了卷积:

    def dilate_mask(self, mask):        # 对掩膜进行4邻域膨胀        with torch.profiler.record_function('dilation'):            padded = np.pad(mask, [(1,1),(1,1)])            dilated = padded[0:-2,1:-1] | padded[1:-1,1:-1] | padded[2:,1:-1] | padded[1:-1,0:-2]| padded[1:-1,2:]        return dilated

在进行这一修改后,我们的步骤时间下降到了0.78秒,相当于额外的50%的改进。下面是更新后的单进程Trace-View:

膨胀优化后的单进程模式Trace-View(作者拍摄)

我们可以看到,膨胀操作已经显著缩小,最耗时的操作现在是PILToTensor转换。

对PILToTensor函数(见此处)进行更详细的查看,可以发现其中有三个基本操作:

  1. 加载PIL图像 – 由于Image.open具有惰性加载属性,图像在此处被加载。
  2. PIL图像被转换为numpy数组。
  3. numpy数组被转换为PyTorch Tensor。

虽然图像加载占据了大部分时间,但我们注意到在对整个大小的图像应用后续操作之后立即将其裁剪掉的极大浪费。这引导我们进行下一个优化。

优化3:重新排序变换

幸运的是,RandomCrop变换可以直接应用于PIL图像,使得我们可以将图像尺寸缩小作为我们管道上的第一个操作:

transform = CustomCompose(    [T.RandomCrop(img_size),     T.PILToTensor(),     RandomMask(),     ConvertColor(),     Scale()])

在进行这一优化后,我们的步骤时间下降到了0.72秒,额外的8%的优化。下面的Trace-View捕获显示RandomCrop变换现在是主要操作:

重新排序变换后的单进程模式Trace-View(作者拍摄)

实际上,和之前一样,实际上是PIL图像(惰性加载)加载导致了瓶颈,而不是随机裁剪。

理想情况下,我们希望进一步优化,将读取操作限制在我们感兴趣的裁剪区域。然而,截至本文撰写时,torchvision不支持此选项。在未来的文章中,我们将演示如何通过实现自定义的decode_and_crop PyTorch运算符来克服这个缺点。

优化4:应用批量变换

在我们当前的实现中,每个图像变换都在每个图像上单独应用。然而,有些变换在一次性应用于整个批次时可能会更加高效。在下面的代码块中,我们修改了我们的管道,使得ColorTransformation和Scale变换在我们自定义的collate函数中应用于图像批次

def batch_transform(img):    img = img.to(dtype=torch.get_default_dtype())    A = torch.tensor(        [[0.299, 0.587, 0.114],         [-0.16874, -0.33126, 0.5],         [0.5, -0.41869, -0.08131]]    )    b = torch.tensor([0., 128., 128.])    A = torch.broadcast_to(A, ([img.shape[0],3,3]))    t_img = torch.bmm(A,img.view(img.shape[0],3,-1))    t_img = t_img + b[None,:, None]    return t_img.view(img.shape)/255def custom_collate(batch):    from torch.utils.data._utils.collate import default_collate    with torch.profiler.record_function('collate'):        batch = default_collate(batch)    image, label = batch    with torch.profiler.record_function('batch_transform'):        image = batch_transform(image)    return image, label

这个改变的结果实际上是步骤时间的轻微增加,为0.75秒。虽然对于我们的玩具模型来说没有帮助,但是能够将某些操作作为批处理转换而不是每个样本的转换,有潜力优化某些工作负载。

结果

我们在本文中应用的连续优化导致运行时性能提高了80%。然而,尽管不那么严重,输入管道仍存在瓶颈,GPU 的利用率仍然很低(约30%)。请参阅我们之前的帖子(例如,这里)以获取解决此类问题的其他方法。

总结

在本文中,我们重点关注了训练数据输入管道的性能问题。与本系列中的前几篇文章一样,我们选择了 PyTorch Profiler 及其相关的 TensorBoard 插件作为工具,并展示了如何加速训练速度的使用。特别是,我们展示了如何使用零工作线程运行 DataLoader,以增加我们识别、分析和优化数据输入管道中的瓶颈的能力。

与我们之前的帖子一样,我们强调成功优化的路径将根据训练项目的细节(包括模型架构和训练环境)而大不相同。实际上,达到目标可能比我们在这里介绍的示例更困难。我们描述的一些技术可能对您的性能影响很小,甚至可能使其变得更糟。我们还注意到,我们选择的精确优化和应用顺序在一定程度上是随意的。强烈建议您根据项目的具体细节开发自己的工具和技术,以实现优化目标。