“全球最小的数据管道框架”

领先全球的微型数据管道框架

一个简单而快速的数据管道基础,具有复杂的功能。

Photo by Ana Lucia Cottone on Unsplash

数据整理可能是数据科学家最耗时的工作。数据整理包括清理、转换和一般操纵数据,将其从未经加工的状态转化为有用的数据。与许多活动一样,整理过程通常需要随着时间的推移不断完善。因此,重要的是跟踪数据集是如何进行整理的,以便您的团队可以在时间上进行管理和重现整个过程。数据整理虽然并不总是有趣,但却是任何现代公司最重要的活动。

有一些专门从事数据管道的公司,它们可能非常复杂和高级。但在这个探讨中,我们先考虑将文本文件转化为一组单词或“标记”的任务,将对我们不有用的文本排除。让我们从简单的开始,逐步提高。

首先,让我们定义一系列的步骤,对文本中的单词执行整理功能。我们将使用Python的text.translate()函数来完成其中的一些工作。考虑以下4个函数:

import stringdef step1(word):    trans = str.maketrans("", "", string.punctuation)    return word.replace("\n", " ").translate(trans)def step2(word):    return word.lower()def step3(word):    trans = str.maketrans("", "", "0123456789")    return word.replace("\n", " ").translate(trans)def step4(word):    return (all([char in string.ascii_letters for char in word]) and             len(word) > 0)

step1是一个函数,从单词中删除所有的标点符号,并去除换行符。 step2将单词转换为小写。 step3再次使用了 text.translate()来删除数字。而step4将用作过滤器,以过滤掉包含非ASCII字母的单词。您可以想象出额外的步骤,例如词干提取。

由于这些是简单的函数,如果我们将 step1 应用于一个单词,我们将得到:

>>> step1("Testing---123;")'Testing123'

事实上,它已经从文本中删除了标点符号。我们可以通过将它们像俄罗斯套娃一样嵌套在单词周围来应用所有三个函数:

>>> step3(step2(step1("Testing---123;")))'testing'

在这里我们可以看到函数 step1step2step3已经被应用,只剩下了字母“testing”。请注意,我们将定义我们的函数按特定的顺序工作。也就是说,step1 应该在 step2 之前完成,等等。

这种基于函数的处理过程简单创建和使用。当然,我们可以一次执行所有的函数。但是随着“管道”中的函数越来越长和更复杂,将过程分解为离散的步骤将使过程更可控。实际上,每个步骤可能变得非常复杂,需要不同的团队来处理。

好的,到目前为止都很好。但是当然,我们不想手动将函数管道应用到每个单词上。相反,我们希望将其应用于列表中的每个单词。为此,我们创建一个非常简单的函数 apply():

def apply(step, values):    return [step(value) for value in values]

现在我们可以在整个单词列表上使用相同的函数:

>>> apply(step3,           apply(step2,                 apply(step1,                       ["Testing---123;", "456---", "Hello!"])))['testing', '', 'hello']

啊,是的,我们需要删除空单词。 step4 正是为此而设计的,但使用起来稍微复杂一些。它看起来像这样:

>>> list(filter(step4,             apply(step3,                   apply(step2,                         apply(step1,                               ["Testing---123;", "456---", "Hello!"])))))['testing', 'hello']

也就是说,因为 step4 是一个过滤函数,返回 True 保留它,返回 False 移除它,所以它的应用方式是这样的:filter(step4, data)

这个简单方法存在几个问题:

  1. 步骤是从内而外应用的。也就是说,第一步 step1 是最内层的函数,而 step3 处于最外层。不太直观。
  2. 非常冗长,因为我们必须为每个步骤函数重复使用 apply() 函数。
  3. 过滤器(如 step4)无法像其他函数那样使用。

考虑到这些问题,我们可以将主要功能抽象成一个通用的流水线吗?我想到了一个两步的方法:

# 首先,我们创建一个流水线函数:p = my_pipeline(step1, step2, step3)# 然后,我们将其应用于一个数据集:p(["Testing---123;", "456---", "Hello!"])

我们如何定义 my_pipeline 呢?事实证明,这非常简单:

def my_pipeline(*steps):    def wrapper(inputs):        for step in steps:            inputs = apply(step, inputs)        return inputs    return wrapper

也就是说, my_pipeline 是一个接受一系列步骤函数的函数,返回一个接受单词列表的函数,将系列中的每个步骤应用于列表,并返回处理后的单词列表。

让我们试一试:

>>> p = my_pipeline(step1, step2, step3)>>> p(["Testing---123;", "456---", "Hello!"])['testing', '', 'hello']

成功了 – 我们得到了与之前完全相同的结果!那么, step4 过滤函数呢?我们暂时放一放,先在“真实”数据上试试这个系统。嗯,其实是假的真实数据。为了这些实验,我们将创建 10,000 个由 10 个段落组成的文档。我们将使用 Python 包 essential_generators 中的 DocumentGenerator()

from essential_generators import DocumentGeneratorimport osgen = DocumentGenerator()def generate_documents(    count=10_000,     paragraphs=10,     output_folder="documents",     overwrite=False):    os.makedirs(output_folder, exist_ok=True)    for n in range(count):        filename = os.path.join(            output_folder,             "doc_%05d.txt" % (n + 1)        )        if overwrite or not os.path.exists(filename):            with open(filename, "w") as fp:                for p in range(paragraphs):                    fp.write(gen.paragraph() + "\n\n")generate_documents()

生成所有数据需要约 30 秒的时间。为了继续我们简单的代码,我们需要引入一个额外的步骤:

def step0(filename):    return open(filename).read().split(" ")

这一步将接受一个文件名,打开文件,并以空格分割文本。我们还需要对我们的 apply() 函数进行微小调整,以处理单词列表而不是单词:

def apply(step, outputs):    return (step(input) if not isinstance(input, list) else             [step(i) for i in input] for input in outputs)

我还对 apply 做了一个微小的调整:它现在使用周围的括号而不是方括号返回一个生成器表达式而不是列表理解式。这将推迟处理直到需要(有时称为“惰性求值”)。

现在我们可以构建一个几乎完整的流水线系统:

p = my_pipeline(step0, step1, step2, step3)list(p(["documents/doc_00001.txt"]))

请注意,它以文件名列表作为输入。简单而明了。但还有几个我仍希望看到的东西:

  1. 能够简单地处理过滤器
  2. 能够并行运行流水线以快速处理数据集
  3. 能够可视化流水线

对于这三个添加,我要推荐你使用我基于上述思想开发的picopipe项目。你可以通过 pip 安装它:

pip install picopipe

使用与上面相同的步骤函数运行它:

from picopipe import pipeline, pfilterp = pipeline(step0, step1, step2, step3, pfilter(step4))list(p(["documents/doc_00001.txt"])[0])

这里,pfilter 代表流水线过滤器,你只需将其包裹在step4 函数周围。我对这个设计非常满意。但让我们看看它能跑多快。

首先,让我们获取所有文档的文件名。一个简单的方法是使用 glob

import globdataset = glob.glob("documents/doc_*.txt")

现在我们可以处理所有的文档:

results = list(p(dataset))

在我的笔记本电脑上,处理所有 10,000 个文档大约需要 21 秒。简短而精简!我们能让它运行得更快吗?

当然可以!现在,管道还有一个 n_jobs 参数,用于指示可以并行运行的作业数。下面是一小段代码,它将使用 0 到 9 个线程多次处理数据集。你认为使用 9 个线程并行处理速度会提高多少?

import timex = []y = []for i in range(10):    start = time.time()    results = list(p(dataset, n_jobs=i))    total_time = time.time() - start    x.append(i)    y.append(total_time)

这将需要几分钟时间。绘制结果时间与线程数的图表如下:

绘制分割为一系列并行作业的运行时间的图表。图像由作者创建。

有趣的是,图表的曲线在达到某个水平后趋于平稳,而不是继续随着线程数量的增加而下降。也就是说,使用 9 个线程并不比使用一个线程快 9 倍。为什么呢?不幸的是,你无法违背定律。而定律是存在的: 阿姆达尔定律。它基本上说,你永远不会得到 N 倍的加速,因为有一个无法减少的开销。在这个案例中,我可以将时间从大约 21 秒减少到使用 4 个线程只需 8 秒。仍然不错!

最后,我想要可视化流水线。在这个项目中,我选择尝试 Mermaid 图表格式。它最近得到了很多支持,包括在 github 的仓库中。这种格式非常简单,易于创建。对于 github 渲染,只需将文件命名为 .mmd 扩展名。下面是如何使用 picopipe 生成一个 Mermaid 脚本:

from picopipe import to_mermaidwith open("pipeline.mmd", "w") as fp:    fp.write(to_mermaid(p))

这是在 github 渲染中显示的结果:

Github.com 直接支持 Mermaid 文档文件。图像由作者创建。

不幸的是,GitHub无法展示CSS中定义的鼠标悬停功能。不过,如果您可以设置自己的CSS样式,那么它不仅可以可视化流水线,还可以在您将鼠标悬停在步骤框上时显示步骤代码:

作者提供的Comet自定义面板中显示的一张美人鱼图表。

上述带有鼠标悬停支持的美人鱼图表是使用Comet的自定义面板系统(对所有用户免费)创建的。创建一个显示美人鱼文件的自定义面板非常容易。以下是上述美人鱼图表的实时渲染演示:comet.com/dsblank/picopipe/a4c044c1657b464087ec44f67ae22709

以上是我们对开发世界上最小的数据流水线框架以及探索其并行化和可视化的探索。您可以在这里找到所有的代码:github.com/dsblank/picopipe希望您觉得这里提出的想法和最终模块都很有用。

对人工智能、机器学习或数据科学感兴趣吗?请鼓掌并关注。Doug是comet.com的研究主管,该公司提供机器学习实验跟踪和模型监控服务。