数据科学的最佳实践,第一部分-测试您的查询
数据科学的最佳实践-测试查询
如何确保我们的查询达到预期效果-以及其他未来的好处。

数据科学领域的基础是数学、统计学和计算机科学。虽然它在过去几十年中取得了很大的发展,但直到过去10-15年,它才在组织中崭露头角,成为一个成熟的角色,并成为技术行业中独立的领域。
作为一个相对年轻的职业,数据科学的最佳实践还没有足够的时间来凝聚和记录。这与相关的软件工程领域形成了鲜明对比,后者更加成熟,并且在知识指南、结构和方法论方面非常丰富,这些都经过了时间的验证。
可以合理地期望数据科学家从与软件工程师的重叠和密切合作中受益,特别是在实践方面。不幸的是,情况往往并非如此,因为许多数据科学家要么不知道这些方法,要么不愿意学习,声称它们要么不相关,要么不属于他们的责任范围。
在这个博客系列中,我想分享一些可以在数据科学家的工作中使用的技巧、窍门和系统方法,以提高我们代码的正确性和稳定性,更好地管理我们的模型,并改善团队合作。
前提
我们从任何使用大数据的人在某个时候都会面临的场景开始,有些人甚至可能每天都会面临:
您正在使用PySpark,并希望从一个大表中提取一些信息。您无法将大量相关数据全部保存在内存中,因此您被迫在查询语言中进行所有的转换、聚合、连接等操作。
您开始编写查询,并且对此感到满意,因为PySpark使得即使查询太复杂以至于无法向其他人解释,也很容易使用Pythonic和优雅的API。即使您决定使用SQL接口,您仍然充满喜悦地输入。
然后,您意识到在groupBy调用中忘记了一个关键列,于是回去修复它。
然后,您意识到一个窗口函数缺少了orderBy子句。
然后,您决定第四行使用的这个魔术数字应该是1.25,而不是1.2。
您不断地在这20-50行查询代码中来回修改,不断地构建最终的查询结构。
然后…您运行查询,结果失败。
您再次遍历您刚刚创建的代码行,努力找出您遗漏的逻辑要求,并逐个进行修复。
最终,查询运行并返回一些结果。
但是…
谁能保证这些结果确实反映了您一直努力获取的结果,并且与您目前心中所持的过程相符呢?
这就是测试的用武之地。
测试?
是的。我们所做的是:
- 手工制作一个小数据集。
- 手动计算我们希望从查询中获得的结果。
- 将我们编写的查询应用于这个小数据集。
- 将查询结果与我们自己的计算进行比较。
如果结果不匹配,我们必须修复一些问题-要么是我们的手动计算有误,要么是查询的执行结果与我们的预期不符。另一方面,如果结果匹配-我们可以继续进行下一步。
现在,我将逐步介绍我在编写这些测试时使用的结构。
设置环境
让我们从创建我们需要在PySpark中使用的环境(也称为夹具)开始。我们可能会运行多个测试用例,所以我们在模块级别设置PySpark会话。否则,我们可能会为每个测试启动和停止一个会话,这会产生不可忽视的开销。
我使用Python内置的unittest,但是如果你或你的团队成员使用pytest、nose或其他任何测试框架,我相信你们会找到一种方法来执行这些操作。
unittest有两个钩子setUpModule和tearDownModule,分别在测试之前和之后运行。我们将使用它们来启动和停止我们的PySpark会话。
# test_pyspark.py
import pysparkimport unittestspark: pyspark.sql.SparkSession | None = Nonedef setUpModule(): global spark spark = get_spark_session('local')def tearDownModule(): global spark if spark is None: return try: spark.stop() finally: spark = None
我喜欢我的会话创建函数可重用,所以这里是它(当需要时,我会填写非本地选项):
# query_pyspark.py
import pysparkdef get_spark_session(scope='local'): if scope == 'local': return ( pyspark.sql.SparkSession.builder .appName('unit-tests') .master('local[4]') ).getOrCreate() else: ... # TODO
如果项目变得更大,我会将该函数放在一个特定于PySpark的工具文件中,但目前让我们保持项目的扁平和简洁。
我们的第一个测试
现在我要测试的第一件事是当运行这个测试时,我是否真的得到了一个会话。这是测试:
# test_pyspark.py
class TestPysparkQueries(unittest.TestCase): def test_session_created(self): self.assertIsNotNone(spark)
你瞧,我运行了这个测试(PyCharm允许你直接从代码中运行测试,如你所见,代码旁边有绿色的“播放”按钮),我得到了一个OK的消息:
创建和测试我们的数据
现在我们可以开始讨论数据了。你应该准备一个小型数据集,你觉得它涵盖了不同情况,并且你仍然可以处理它。就实际大小而言,根据领域和查询的复杂性,我通常会选择20-50行。如果涉及分组,请选择5-10个不同的分组。
为了教学目的,我创建了一个包含姓名和出生日期的数据集。为了简单起见,我假设同姓的人是兄弟姐妹。我还对行的顺序引入了随机性,以防止对顺序敏感的查询直接得到正确答案。数据如下所示:
现在是时候将数据加载到我们的PySpark会话中了。但是首先,让我们为它创建一个健全性测试。顺便说一下,先创建一个测试,然后再编写使测试通过的代码,是测试驱动开发(TDD)方法的一部分,但我不建议数据科学家这样做,只是测试部分。
对于健全性测试,我们可以测试列名,可以测试数据集的大小,可以同时进行测试,或者可以提出更深入的测试。实际上,我们甚至可以编写一个将CSV文件与DataFrame逐行匹配的测试。
在编写测试时,我们越严格,后来我们就越确定代码是正确的,但这也会使将来的更改变得更加困难,例如,如果我们想要添加/更改数据集中的一行以测试特定的边缘情况,会怎么样?
在我们的工作领域,平衡速度和正确性因素是一门艺术而不是科学的一部分,随着时间和实践的增长,这种平衡将更加自然。
# test_pyspark.py
def test_load_data(self): df = get_family_data() self.assertEqual(25, df.count())
接下来,让我们编写加载数据的函数:
# query_pyspark.py
def get_family_data(): return ( get_spark_session(scope='local') .read.csv(os.path.join(os.path.dirname(__file__), '../assets/data_sample.csv')) )
当我运行测试时…它失败了?但是怎么可能呢?
再次计算行数并确保为25后,我将header=True添加到代码中,测试通过了(不用担心,在下一个示例中,我会省略虚假的戏剧效果):
# query_pyspark.py
def get_family_data(): return ( get_spark_session(scope='local') .read.csv(os.path.join(os.path.dirname(__file__), '../assets/data_sample.csv'), header=True) )
测试我们的查询
现在是特定查询的测试时间了。假设我想从每个家庭中获取最年长的孩子。我通过目视检查数据集(或使用排序后的电子表格)找到我期望获取的确切姓名集,并将其硬编码到我的测试中:
# test_pyspark.py
def test_elder_child_query(self): df = get_elder_child(get_family_data()) elders = {_.elder_child for _ in df.toLocalIterator()} self.assertEqual(elders, {'Gus', 'Rita', 'Sam', 'Trent', 'Ursula'})
使测试通过的代码:
# query_pyspark.py
def get_elder_child(family_df: pyspark.sql.DataFrame): return ( family_df .orderBy(f.col('date_born').desc()) .groupby('last_name') .agg(f.first('first_name').alias('elder_child')) )
尽管我省略了戏剧效果,但我必须多次修复查询才能使测试通过。例如,我按first_name分组,聚合了last_name的值,并且忘记了进行降序排序。
在我的工作中,测试多次帮我保住了颜面。
我们完成了吗?绝对不是。
我们应该考虑一些边缘情况,比如如果有双胞胎怎么办?是否有没有孩子的家庭?如果我们的数据不可靠,那么有关空值的情况呢?
对于这些选项中的每一个,我们将转到我们的数据集,更改它以产生这种情况,然后更新我们的测试和代码。
如果我们通过后续出现的错误遇到这些特殊情况(即使我们没有自己提出它们),我们将做同样的操作-更改数据集以反映这些情况,并从那里继续。
我们还应该为其他查询编写测试,而且我们将遇到不同类型的测试。在上面的测试中,我们关心的是结果集合,但是如果我们想要测试一个简单的1:1转换,即f(row) = y,我们需要考虑Spark在行顺序方面的非确定性。
例如,假设我们想要获取数据集中的姓名首字母。
一种选择是对DataFrame进行排序,并在与我们手动创建的列表进行相等性断言时信任此顺序:
# query_pyspark.py
def get_initials_col(): return ( f.concat( f.substring('first_name', 0, 1), f.lit('. '), f.substring('last_name', 0, 1), f.lit('.'), ) ).alias('initials')
# test_pyspark.py
def test_get_initials_col_1_by_1(self): df = ( get_family_data() .withColumn('initials', get_initials_col()) .orderBy('date_born') ) expected_list = ['V. A.', 'W. W.', 'X. M.', 'Y. T.', 'Z. C.', 'I. M.', 'J. T.', 'K. C.', 'L. A.', 'M. W.', 'N. M.', 'O. T.', 'P. C.', 'Q. A.', 'A. A.', 'B. W.', 'C. M.', 'E. T.', 'F. C.', 'G. A.', 'H. W.', 'R. W.', 'S. M.', 'T. T.', 'U. C.'] for expected, actual in zip(expected_list, [_.initials for _ in df.toLocalIterator()]): self.assertEqual(expected, actual)
另一种选择是编写一个本地函数来执行相同的工作,并对其进行充分的测试。然后,在将结果加载到内存后,我们可以将其应用于输入,并编写一个测试来断言每一行的相等性。以下是一个示例:
# query_pyspark.py
def get_initials(first_name, last_name): return f'{first_name[:1]}. {last_name[:1]}.'
# test_pyspark.py
def test_get_initials(self): self.assertEqual('B. H.', get_initials('Bob', 'Hope')) self.assertEqual('C. C.', get_initials('Charlie', 'Chaplin')) self.assertEqual('J. L.', get_initials('Jonathan', 'Livingstone')) def test_get_initials_col_support_function(self): df = ( get_family_data() .withColumn('initials', get_initials_col()) ) for row in df.toLocalIterator(): self.assertEqual(get_initials(row.first_name, row.last_name), row.initials)
这两种选择中,我肯定更喜欢后者,因为它更加灵活,不直接依赖于数据,而是通过支持函数进行测试,而支持函数则没有与数据集的任何耦合。
当然,如果对于查询来说,这个函数的负担不太重,你可能更喜欢将其作为UDF使用,以保持代码的复杂性低。
等等,还有更多?
当然。还有很多不同的情况,比如连接和窗口函数的结果,但是我相信上面的例子足以说明测试是写查询时的一种重要工具和有效的方法选择,即使对于像我这样的数据科学家也是如此。
请注意,我选择演示这个模式在使用PySpark时如何工作,因为它是一种常见的大数据工具,但是这种模式不仅限于PySpark,也不仅限于大数据数据库。实际上,它应该适用于任何数据库。您还可以将此方法用于内存数据库类似的工具,如pandas。
只要您能够:
- 连接到数据源。
- 将数据加载/模拟到数据源中。
- 执行查询。
- 检索和处理查询结果。
您就可以开始了。如果您使用的工具不允许您执行其中一步操作,您可能需要重新考虑是否使用该工具。
而且,你知道吗,测试代码还有一个隐藏的好处。假设您发现其中一个函数在运行时间或内存消耗方面表现不佳,并决定尝试优化或重构它。现在,您可以使用现有的测试来确保您的新代码与之前的代码输出相同。如果没有这些测试,我个人甚至害怕改变一行代码,害怕破坏下游的某些依赖关系。
总结
测试是一种强大而重要的方法,可以确保代码的正确性,并使任何重构更容易管理。
在未来的文章中,我将给出其他在数据科学领域中我认为是好的实践的例子。我们将涉及一些主题,例如如何在同一个模型上合作而不互相干扰,如何管理数据集的版本,如何观察我们的代码在生产环境中的性能等等。
敬请关注。
常见问题
问:等等,什么意思?
答:欢迎在这里或其他地方就本博客系列中提出的概念进行讨论。
问:如果在我的查询中需要测试成千上万行怎么办?
答:编写查询的参数化版本,例如def get_n_smalles_children(family_df, n): …,并将参数设得足够小。另一种选择是以编程方式模拟数据,但这也会带来新的问题和挑战。
问:如果我不断更改查询,那么是否意味着我需要更改测试?
答:理想情况下,您不会随着时间的推移更改查询,但我知道我们领域的探索性质。所以答案是是的。这是您可能会感到写测试时速度降低的原因之一。然而,速度是与准确性/正确性相权衡的。在查询结构更加稳定时,您可以在过程的后期编写测试。
问:如果我不使用PyCharm,如何运行测试?
答:在测试文件的末尾添加下面的魔术行,并使用python test_pyspark.py命令运行它。不要忘记确保代码根目录被包含在PYTHONPATH中,以确保导入工作正常(PyCharm会自动完成这个操作)。
if __name__ == '__main__': unittest.main()
问:如果我不想(或者不能)将我的数据保存为.csv文件怎么办?
答:任何适合您的存储和加载数据的方法都可以,只要保持整洁即可。对于非常小的数据集,我使用字典转为DataFrame(或者如果您愿意,可以将JSON转为DataFrame);对于较大的数据集,我使用永久存储在Hadoop上的表格。
问:你上面给出的示例函数不是很简单吗?
答:是的。这是我教学的方法之一——通过给出简单的示例,并逐渐使它们变得更复杂。不幸的是,本文的页边空间太小,容纳不了后面的部分。
问:你是否在某个代码仓库中有上述代码,以便我可以用作参考?
答:是的,我有。

