Python Numba和CUDA C的批K-Means

Python Numba和CUDA C优化的批K-Means' (Python Numba and CUDA C Optimized Batch K-Means)

如何加快数据分析速度——1600倍对比Scikit-Learn(附带代码!)

基于作者绘制的图生成的图片

并行化数据分析工作负载可能是一项艰巨的任务,尤其当针对特定用例没有有效的现成实现可用时。在本教程中,我将介绍在C和Python Numba中编写CUDA内核的原则,以及如何将这些原则应用于经典的k-means聚类算法。通过本文的阅读,您将能够使用C和Python编写自定义的批量k-means并行化实现,并与标准的Scikit-Learn实现相比,实现高达1600倍的加速。如果您想直接查看代码,可以在Colab上找到它。

介绍

我们将研究两个在NVIDIA GPU上并行化的框架:Python Numba和CUDA C。然后,我们将在两个框架中实现相同的批量k-means算法,并与Scikit-Learn进行基准测试。

Numba对于那些更喜欢Python而不是C的人来说,是一个更温和的学习曲线。Numba将代码的部分编译为称为内核的专门CUDA函数。CUDA C是在抽象层次树上更低一层的,它提供了更精细的控制。Colab示例的设计紧密地与算法实现相吻合,因此一旦了解其中之一,您就可以很容易地理解另一个。

本教程是基于我自己编写的自定义批量k-means实现发展而来的,我将其用作说明性示例。通常,k-means库被优化为在极大型数据集上运行算法的单个实例。然而,在我们的项目中,我们需要并行地对数百万个单独的小数据集进行聚类,这不是我们能找到现成库来解决的问题。

本文分为四个主要部分:CUDA基础知识、并行化k-means算法、Python Numba和CUDA C。我将尽量保持材料相对独立,根据需要对概念进行简要回顾。

CUDA基础知识

CPU设计用于快速顺序处理,GPU设计用于大规模并行处理。对于我们的算法,我们需要在数百万个小独立数据集上运行k-means,这十分适合GPU实现。

我们的实现将使用CUDA,即Compute Unified Device Architecture,它是由NVIDIA开发的C库和计算平台,用于利用GPU进行并行计算。

为了理解GPU内核和设备函数,下面是一些定义:

线程是一系列可以独立执行的指令。

核心是一个可以执行单个线程的处理单元。

线程束(warp)是最小的线程调度单元。每个线程束由32个线程组成,并将它们调度到核心上。

多处理器(Multiprocessor)也称为流处理多处理器(Streaming Multiprocessor,SM),由一定数量的核心组成。

GPU内核是设计为在众多GPU线程上并行执行的函数。内核函数由CPU调用,并在GPU上执行。

设备函数是可以由内核函数或其他设备函数调用的函数。此函数在GPU上执行。

这些内核函数按照网格线程的层次结构进行组织。在GPU的上下文中,每个线程与单个核心连接,并执行内核的副本。

GPUFunction[(x, y), z](Data Structure). 作者提供的图片。

一个是在一个多处理器上运行的线程集合。

网格是一个抽象的数组,用于组织线程的块。网格被用来通过索引将内核实例映射到线程。

在GPU上,有两种类型的内存:全局内存和共享内存。

全局内存存储在DRAM(动态随机存取内存)上。所有线程可以访问其内容,但访问的延迟较高。

共享内存存储在缓存上,并且对于同一块内的线程是私有的。它比全局内存的访问速度要快得多。

并行化K-Means算法

现在让我们介绍K-Means算法。K-Means是一种无监督聚类算法,将数据集分割成k个不重叠的簇。给定一组数据点,我们首先初始化k个质心,即起始的簇中心:

质心初始化(k=3)。图像由作者提供。

然后,在选择初始质心之后,迭代执行两个步骤:

  1. 分配步骤:根据欧几里德距离将每个数据点分配给最近的质心。
  2. 更新步骤:将质心位置重新分配为上一步中分配给该质心的所有点的平均值。

重复执行这些步骤,直到收敛,即质心位置不再显著变化。输出是一组k个簇质心坐标以及一个用于每个原始数据点的簇索引标记数组(称为标签)。

K-means算法(k = 3)。图像由作者提供。

对于一个大型数据集来说,质心初始化可能会显著影响算法的输出结果。因此,程序会尝试多个初始质心,称为初始种子,并返回最佳种子的结果。每个种子从初始数据集中选择,不会重复选择初始质心。我们的算法的最佳种子数是数据点数的三分之一。在我们的程序中,因为我们对100个数据点的单行运行k-means,所以最佳种子数将是33。

在我们的k-means函数中,一百万行数据由块代表,而线程代表种子。块中的线程组织成warp,这是硬件架构中最小的线程调度单元。每个warp由32个线程组成,因此将块大小设置为32的倍数是最优的。然后,每个块会输出具有最小惯性的种子的数据,惯性是通过簇中心和其相应点之间欧几里德距离的和来衡量的。

并行化K-means — GPU端。图像由作者提供。

在这里链接的Colab中,您可以选择使用Python或C跟随。

global variables: numRows, lineSize, numClustersdef hostKMeans:    inputData = initializeInputData(numRows, lineSize)    outputCentroids = createEmptyArray(numRows, numClusters)    outputLabels = createEmptyArray(numRows, lineSize)        sendToDevice(inputData, outputCentroids, outputLabels)    cuda_kmeans[blockDimensions, threadDimensions](inputData, outputCentroids, outputLabels)    waitForKernelCompletion()    copyFromDevice(outputCentroids, outputLabels)

在我们的k-means算法中,我们首先设置全局变量。我们需要从CPU和GPU都引用它们。

全局变量可以从内核中访问,但内核不能直接向主机返回值。为了规避这个限制,我们的代码的CPU部分在输入数据旁边传递了两个空数组给内核。这些数组将用于将最终的中心点和标签数组复制回CPU。

当我们的数据结构被实例化后,我们调用内核,将网格和块的维度定义为一个元组。调用内核包括传入我们的数据,为中心点和标签进行内存分配,并为初始随机中心点初始化提供一个状态。

def KMeansKernel(data, outputCentroids, outputLabels)    row = currentBlock()    seed = currentThread()    sharedInputRow = sharedArray(shape=(lineSize))    sharedInertia = sharedArray(shape=(numSeeds))    sharedCentroids = sharedArray(shape=(numSeeds, numClusters))    sharedLabels = sharedArray(shape=(numSeeds, lineSize))    sharedInputRow = data[row]    synchronizeThreads()    if seed == 0        centroids = initializeCentroids(data)    synchronizeThreads()        KMeansAlgorithm(sharedInputRow, sharedCentroids, sharedLabels)        sharedInertia[Seed] = calculateInertia(sharedInputRow, sharedCentroids, sharedLabels)        synchronizeThreads()    if seed == 0        minInertiaIndex = findMin(Inertia)    sharedOutputCentroids = centroids[minInertiaIndex]    sharedOutputLabels = labels[minInertiaIndex]

一旦在设备上(也就是GPU上),我们的代码现在同时存在于整个网格上。要确定我们在网格上的位置,我们需要访问块和线程的索引。

在GPU上,内存默认为存储在DRAM上的全局内存。共享内存是线程在同一个块内的私有内存。

通过将数据的单行(所有块中的线程都必须从中读取)传输到共享内存中,我们减少了内存访问时间。在cuda_kmeans函数中,我们创建共享内存来存储数据的行、中心点、标签以及称为inertia的每个种子的准确度衡量。

在我们的程序中,每个线程对应一个种子,并且块中的所有线程都在同一行数据上工作。对于每个,一个线程顺序地创建32个种子,并将它们的结果聚合到其他线程的单个数据结构中。

当算法的后续步骤依赖于完成此聚合时,线程必须使用内置的CUDA同步线程函数syncthreads()进行同步注意:必须非常小心地放置syncthreads()调用,因为尝试在并非所有线程都有机会完成时同步线程可能导致死锁和整个程序的挂起。

我们的内核函数描述如下伪代码所示,名为cuda_kmeans。这个函数负责安排上面描述的过程,为所有32个种子的结果腾出空间,并在中心点和标签中选择最佳种子作为最终输出。

def KMeansDevice(dataRow, centroids, labels)    seed = currentThread()    centroidsRow = centroids[seed]    labelsRow = labels[seed]     centroidsRow = sort(centroidsRow)    yardStick = computeYardstick(sortedCentroids)     oldCentroids = localArray(shape=(numSeeds, numClusters))    for iteration in range(100):        if converged(oldCentroids, centroidsRow)            break        oldCentroids = copy(centroidsRow)        assignLabels(dataRow, centroidsRow, labelsRow)        updateCentroids(dataRow, centroidsRow, labelsRow)

从cuda_kmeans中我们调用实际的k-means算法,传递新实例化的共享内存。在我们的k-means算法中,我们首先选择初始中心点,然后将它们从最小到最大排序。我们迭代地将数据点分配给最近的中心点,并更新中心点的位置直到收敛。

为了能够确定是否已达到收敛,我们使用一个称为find_yard_stick的辅助函数。这个函数计算并返回两个初始中心点之间的最小距离(yard_stick)。当没有中心点在单次迭代中移动距离超过yard_stick乘以epsilon时,我们认为已经收敛。

收敛后,我们返回到cuda_kmeans。在这里,我们通过计算每个中心点与其数据点之间的平方欧氏距离来确定最佳种子。具有最小的inertia(表示最有效聚类的指标)的种子被视为最佳。然后,我们将这个种子的中心点和标签复制到输出数组中的单行。一旦所有的块都完成了,这些输出就会被复制回主机(CPU)。

K-means算法中的数据传输。图片由作者提供。

介绍Numba

<p设计自定义内核的最简单方法是使用numba。numba是一个可以将python代码编译成cuda内核的python库。

抽象层级。图片由作者提供。

<p在表面下,numba与cuda进行交互。为了并行化您的代码,numba将编译您指定的gpu代码为内核并将其传递给gpu,将程序逻辑分为两个主要部分:

  1. CPU级别的代码
  2. GPU级别的代码

<p使用numba,您将可串行部分和可并行部分的代码分开并交给cpu和gpu处理。为了为gpu编译函数,程序员在函数定义上方使用@cuda.jit装饰器,从而将该函数转换为在cpu(主机)上调用但在gpu(设备)上并行执行的内核。

Python Numba

链接到Colab

Numba充当Python代码和CUDA平台之间的桥梁。由于Python代码与上面的算法伪代码几乎相同,我只提供一些关键相关语法的示例。

cuda_kmeans[(NUM_ROWS,), (NUM_SEEDS,)](input_rows, output_labels, output_centroids, random_states)

<p在实例化必要的全局变量和数据结构之后,我们可以从主机调用名为cuda_kmeans的内核。numba在块和线程的维度上需要两个元组。因为我们将使用一维块和线程,所以每个元组中的第二个索引为空。我们还传入我们的数据结构和一组随机状态的数组以进行随机种子实例化。

@cuda.jit()def cuda_kmeans(input, output_labels, output_centroids, random_states):    row = cuda.blockIdx.x    seed = cuda.threadIdx.x    shared_input_row = cuda.shared.array(shape=(LINE_SIZE), dtype=np.float32)    shared_inertia = cuda.shared.array(shape=(NUM_SEEDS), dtype=np.float32)    shared_centroids = cuda.shared.array(shape=(NUM_SEEDS, NUM_CLUSTERS), dtype=np.float32)    shared_labels = cuda.shared.array(shape=(NUM_SEEDS, LINE_SIZE), dtype=np.int32)    if seed == 0:        get_initial_centroids(shared_input_row, shared_centroids, random_states)    cuda.syncthreads()    ...    kmeans(shared_input_row, shared_labels, shared_centroids)

<p我们使用numba的@cuda.jit()装饰器来标记gpu编译。使用cuda.blockidx.x和cuda.threadidx.x的表示法,我们获得内核的当前索引。使用cuda.shared.array和两个参数(形状和类型)来实例化共享数组,两者在编译时都必须是已知的。在获取质心并填充行数据之后,我们调用kmeans函数,填充共享数组,并进行cuda.syncthreads()调用。

@cuda.jit(device=True)def kmeans(data_row, output_labels, output_centroids):     seed = cuda.threadIdx.x    labels_row = output_labels[seed]    centroids_row = output_centroids[seed]        ...    old_centroids = cuda.local.array(shape=(NUM_CLUSTERS), dtype=np.float32)    for iteration in range(NUM_ITERATIONS):            if iteration > 0:                if converged(centroids_row, old_centroids, yard_stick * EPSILON_PERCENT, iteration):                    break      # 分配标签和更新质心

<pk-means是一个设备函数,因为它被从内核函数调用。因此,我们必须在装饰器中指定device=true:@cuda.jit(device=true)。然后,k-means函数获取标签和质心的当前行,并在收敛之前运行。

只需额外几行代码和一点努力,您的Python代码就可以变成一个优化的核心,准备并行使用。

我们的并行化k-means确实将计算时间大大减少了。然而,封装和编译像Python这样的高级语言并不一定是最佳选择。我很好奇,如果用C语言编写代码是否能加快我们的项目速度,于是我深入了解了CUDA C。

C简介

在Python中,内存和类型分配是自动的,而在C中,内存可以在堆栈或堆上分配,两者都需要显式类型声明,并分配固定数量的内存。堆栈内存由编译器自动分配和释放,而堆内存需要在运行时手动分配,使用诸如malloc()之类的函数,程序员负责其释放。

指针是用来保存变量内存地址的工具。指针引用的数据类型在声明时定义。使用星号(*)指定指针。使用取地址符(&)检索变量的地址。使用星号再次对指针进行解引用,可以访问指针指向的值。

双指针,或指向指针的指针,存储指针的地址。当将数组传递给函数时,这对于修改其他指针的地址很有用。将数组传递给函数时,它们被作为指向其第一个元素的指针传递,不包含大小信息,依靠指针算术来遍历数组。要从函数返回数组,可以使用&传递指针的地址,使用**接收它,允许您修改原始指针的地址,从而传递数组。

int var = 100; // 声明类型int *ptr = &var; // 指针和引用的使用int **double_ptr = &ptr; // 双指针的示例printf("解引用 double_ptr 和 ptr: %d %d \n:", **double_ptr, *ptr)int *ptr = 100; // 初始化指向int类型的指针

CUDA C

链接到 Colab

CUDA是一个利用NVIDIA GPU来并行化复杂计算问题的计算平台。在您掌握了C语言之后(开玩笑的),CUDA C代码结构与我们逐步进行的伪代码结构完全相同。

在CPU方面,我们设置了一些常量,告诉算法应该期望什么,导入库,初始化变量,并定义一些宏。

#define NUM_ROWS 00000        // 数据集的y维度,块数#define LINE_SIZE 100         // 数据集的x维度#define NUM_ITERATIONS 100    // 最大迭代次数#define NUM_CLUSTERS 3        // 运行k = 3#define MAX_INPUT_VALUE 100   // 数据的上限#define NUM_SEEDS 32          // 种子/线程数量为LINE_SIZE的1/3#define EPSILON_PERCENT 0.02  // 收敛条件void initInputData(float** input) {    srand(1);     // 为数据分配内存        ... // 使用malloc和rand初始化数据    // 在GPU上分配内存    cudaMalloc(input, NUM_ROWS * LINE_SIZE * sizeof(float));     // 将内存从CPU的sample_data复制到GPU的内存    cudaMemcpy(*input, sample_data, NUM_ROWS * LINE_SIZE * sizeof(float), cudaMemcpyHostToDevice);    free(sample_data);}int main() {    float* inputData; // 初始化输入数据,尺寸将为NUM_ROWS x LINE SIZE    initInputData(&inputData); // 解引用并传递给函数    // 初始化输出标签和质心    cudaExtent labelsExtent = make_cudaExtent(sizeof(int), LINE_SIZE, NUM_ROWS);    cudaPitchedPtr outputLabels; // 创建下一个调用所需的指针    cudaMalloc3D(&outputLabels, labelsExtent); // 在GPU上分配内存        cudaExtent centroidsExtent = make_cudaExtent(sizeof(float), NUM_CLUSTERS, NUM_ROWS);    cudaPitchedPtr outputCentroids; // 创建下一个调用所需的指针    cudaMalloc3D(&outputCentroids, centroidsExtent); // 在GPU上分配内存    cuda_kmeans <<<NUM_ROWS, NUM_SEEDS>>> (inputData, outputLabels, outputCentroids);    cudaDeviceSynchronize();        ... // 将输出从设备复制回主机}

让我们一一解释这些差异。

主要功能通过创建一个指针并将其传递给initInputData函数作为指针的地址来初始化数据。该函数将inputData作为指向指针的指针(float ** input)接收,这使得函数能够修改原始指针所持有的地址。然后,将Input指向使用cudaMalloc初始化的GPU内存地址,并使用cudaMemcpy填充该地址,将数据从临时主机数组sample_data复制到其中,该数组已经填充了随机数。

然后,代码在设备上分配内存以保存来自k-means函数的结果。该函数使用make_cudaExtent创建一个cudaExtent对象,其目的是封装多维数组的维度。

cudaPitchedPointer类型用于定义一个能够寻址这个分配内存空间的指针。这种类型的指针专门设计用于与由cudaMalloc3D分配的内存一起工作,cudaMalloc3D使用cudaPitchedPtr和cudaExtent对象来在GPU上分配线性内存。

cuda_kmeans <<<NUM_ROWS, NUM_SEEDS>>> (inputData, outputLabels, outputCentroids);

进入GPU代码,我们将网格定义为每个块对应于一行数据,每个线程对应于一个种子。

种子由每个块中的单个线程初始化,以确保完全不同的种子。

__global__ void cuda_kmeans(float* input, cudaPitchedPtr outputLabels, cudaPitchedPtr outputCentroids) {
    int row = blockIdx.x;
    int seed = threadIdx.x;
    // shared memory is shared between threads in blocks
    __shared__ float input_shm[LINE_SIZE];
    __shared__ float error_shm[NUM_SEEDS];
    __shared__ float seed_centroids[NUM_SEEDS][NUM_CLUSTERS];
    __shared__ int seed_labels[NUM_SEEDS][LINE_SIZE];
    ... // get a single row of data
    ... // populate input_shm
    ... // populating the struct core_params
    // the actual k-means function
    kmeans(core_params);
    // find seed with smallest error
    calcError(core_params);
    __syncthreads();
    if (seed == 0) {
        int* labels_line = LABELS_LINE(outputLabels, row);
        float* centroids_line = CENTROIDS_LINE(outputCentroids, row);
        labels_line[threadIdx.x] = seed_labels[seed][threadIdx.x];
        centroids_line[threadIdx.x] = seed_centroids[seed][threadIdx.x];
    }
}

CUDA C代码使用共享内存来存储数据、质心、标签和错误。然而,与Python不同的是,该代码接收共享内存的指针,并将其存储在结构体中,这只是一种批量传递变量的方法。最后,cuda_kmeans调用实际的k-means算法并传递core_params。

__device__ void kmeans(core_params_t& core_params) {
    DECLARE_CORE_PARAMS(core_params);
    getInitialCentroids(core_params);
    sort_centroids(centroids, num_clusters);
    float yard_stick = findYardStick(core_params);
    float* oldCentroids = (float*)malloc(NUM_CLUSTERS * sizeof(float));
    struct work_params_t work_params;
    work_params.min = find_min(line, LINE_SIZE);
    work_params.max = find_max(line, LINE_SIZE);
    work_params.aux_buf1 = (int*)malloc(NUM_CLUSTERS * sizeof(int));
    work_params.aux_buf2 = (int*)malloc(NUM_CLUSTERS * sizeof(int));
    work_params.aux_buf3 = (float*)malloc(NUM_CLUSTERS * sizeof(float));
    for (int iterations = 0; true; iterations++) {
        bool stop = (iterations > 100) || (iterations > 0 && (converged(core_params, oldCentroids, yard_stick * EPSILON_PERCENT)));
        if (stop)
            break;
        memcpy(oldCentroids, core_params.centroids, NUM_CLUSTERS * sizeof(float));
        getLabels(core_params);
        getCentroids(core_params, work_params);
    }
    free(work_params.aux_buf1);
    free(work_params.aux_buf2);
    free(work_params.aux_buf3);
    free(oldCentroids);
}

在设备函数之前,我们使用DECLARE_CORE_PARAMS宏将core_params结构中的值提取到变量中。

然后,我们运行与Python中相同的k-means算法,唯一的区别是我们传递结构体而不是变量,并且必须管理内存、指针和类型。

基准测试

为了将我们的算法与非并行化的k-means进行比较,我们导入了scikit-learn中的k-means模块。

在我们的基准测试中,我们运行了100,000行100列的数据集,划分为三个簇。由于scikit-learn没有针对不同行的并行化k-means,我们使用for循环依次对行进行运行。

在colab上,我们使用了免费的T4 GPU Colab实例进行基准测试。

图片由作者提供。

结果很好——Python的Numba代码比非并行化的CPU代码快两个数量级,而CUDA C代码则快三个数量级。这些内核函数具有良好的可扩展性,该算法也可以调整以支持更高维度的聚类。

请注意,C和Python算法中的随机初始质心生成并未充分优化以使用所有核心。在改进后,Python算法的运行时间可能接近C代码的运行时间。

基于免费的Colab T4 GPU在2023年11月23日的运行时间。图片由作者提供。

在对不同数据集运行k-means函数一百次并记录结果时间后,我们注意到第一次迭代较慢,这是因为在Colab中编译C和Python代码所需的时间。

结论

现在,您已经准备好编写自己的自定义GPU内核了!一个仍然存在的问题可能是——在并行化数据处理工作负载中,应该使用CUDA C还是Numba?这取决于情况。两者都比现成的scikit-learn快得多。尽管在我的情况下,CUDA C批处理的k-means实现比使用Numba编写的等效实现快大约3.5倍,但Python提供了一些重要的优势,如易读性和对专业C编程技能的依赖较少的Python团队。此外,您特定实现的运行时间将取决于您对代码进行了多少优化,例如在GPU上避免触发串行操作。总之,如果您对C和并行化编程都不熟悉,我建议先使用Numba来原型化您的算法,然后在需要额外加速时将其转换为CUDA C。

参考资料

  1. Scikit-learn: Python中的机器学习,Pedregosa等人,JMLR 12,第2825-2830页,2011年。
  2. NVIDIA,Vingelmann, P. & Fitzek, F.H.P., 2020. CUDA,版本:10.2。89,可在:https://developer.nvidia.com/cuda-toolkit。
  3. Lam, Siu Kwan, Antoine Pitrou和Stanley Seibert。 “Numba:基于llvm的Python JIT编译器。” 第二届高性能计算中LLVM编译器基础设施研讨会论文集。2015。
  4. Harris, C.R., Millman, K.J., van der Walt, S.J.等“NumPy的数组编程。” 《自然》585,357–362(2020)。DOI:10.1038/s41586–020–2649–2。(发布者链接)。

</pk-means是一个设备函数,因为它被从内核函数调用。因此,我们必须在装饰器中指定device=true:@cuda.jit(device=true)。然后,k-means函数获取标签和质心的当前行,并在收敛之前运行。</p我们使用numba的@cuda.jit()装饰器来标记gpu编译。使用cuda.blockidx.x和cuda.threadidx.x的表示法,我们获得内核的当前索引。使用cuda.shared.array和两个参数(形状和类型)来实例化共享数组,两者在编译时都必须是已知的。在获取质心并填充行数据之后,我们调用kmeans函数,填充共享数组,并进行cuda.syncthreads()调用。</p在实例化必要的全局变量和数据结构之后,我们可以从主机调用名为cuda_kmeans的内核。numba在块和线程的维度上需要两个元组。因为我们将使用一维块和线程,所以每个元组中的第二个索引为空。我们还传入我们的数据结构和一组随机状态的数组以进行随机种子实例化。</p使用numba,您将可串行部分和可并行部分的代码分开并交给cpu和gpu处理。为了为gpu编译函数,程序员在函数定义上方使用@cuda.jit装饰器,从而将该函数转换为在cpu(主机)上调用但在gpu(设备)上并行执行的内核。</p在表面下,numba与cuda进行交互。为了并行化您的代码,numba将编译您指定的gpu代码为内核并将其传递给gpu,将程序逻辑分为两个主要部分:</p设计自定义内核的最简单方法是使用numba。numba是一个可以将python代码编译成cuda内核的python库。