使用Pandera对PySpark应用程序进行数据验证
使用Pandera验证PySpark应用程序数据
如果您是数据从业者,您将会意识到数据验证对于确保准确性和一致性具有最重要的意义。当处理大型数据集或来自不同来源的数据时,这变得尤为关键。然而,Pandera Python库可以帮助简化和自动化数据验证过程。Pandera是一个精心设计的开源库,旨在简化架构和数据验证的任务。它建立在pandas的稳健性和多功能性之上,并引入了一个直观且表达力强的API,专门为数据验证目的而设计。
本文简要介绍了Pandera的主要特点,然后详细说明了如何将Pandera数据验证与使用原生PySpark SQL的数据处理工作流集成(自Pandera 0.16.0版本以来)。
Pandera设计用于与其他流行的Python库(如pandas、pyspark.pandas、Dask等)配合使用。这使得将数据验证纳入现有的数据处理工作流程变得容易。不久前,Pandera还不支持PySpark SQL,但为了弥补这一差距,由QuantumBlack团队(由McKinsey的Ismail Negm-PARI、Neeraj Malhotra、Jaskaran Singh Sidana、Kasper Janehag、Oleksandr Lazarchuk以及Pandera创始人Niels Bantilan组成)开发了本地PySpark SQL支持,并将其贡献给Pandera。本文的文字也是由该团队准备的,并以他们的措辞编写。
- 最强大的机器学习模型解析(Transformers, CNNs, RNNs, GANs …)
- 普林斯顿大学的研究人员推出了MeZO:一种内存高效的零阶优化器,可以对大型语言模型(LLMs)进行微调
- 3家医疗机构如何使用生成式人工智能
Pandera的主要特点
如果您不熟悉使用Pandera对数据进行验证,我们建议您阅读Khuyen Tran的“使用Pandera验证pandas DataFrame”的文章,其中描述了基础知识。在这里简要介绍一下主要特点和优点,包括简单直观的API、内置的验证函数和自定义功能。
简单直观的API
Pandera的显著特点之一是其简单直观的API。您可以使用声明性语法定义数据架构,这使得代码易于阅读和理解。这使得编写既高效又有效的数据验证代码变得容易。
以下是Pandera中模式定义的示例:
class InputSchema(pa.DataFrameModel):
year: Series[int] = pa.Field()
month: Series[int] = pa.Field()
day: Series[int] = pa.Field()
内置的验证函数
Pandera提供了一组内置函数(通常称为检查)来执行数据验证。当我们在Pandera模式上调用validate()
时,它将执行架构和数据验证。数据验证将在幕后调用check
函数。
以下是使用Pandera对数据帧对象运行数据check
的简单示例。
class InputSchema(pa.DataFrameModel):
year: Series[int] = pa.Field(gt=2000, coerce=True)
month: Series[int] = pa.Field(ge=1, le=12, coerce=True)
day: Series[int] = pa.Field(ge=0, le=365, coerce=True)
InputSchema.validate(df)
如上所示,对于year
字段,我们定义了一个gt=2000
的检查,要求该字段中的所有值都必须大于2000,否则Pandera将引发验证失败。
以下是默认情况下Pandera提供的所有内置检查的列表:
eq:检查值是否等于给定的文字
ne:检查值是否不等于给定的文字
gt:检查值是否大于给定的文字
ge:检查值是否大于等于给定的文字
lt:检查值是否小于给定的文字
le:检查值是否小于等于给定的文字
in_range:检查值是否在给定范围内
isin:检查值是否在给定的文字列表中
notin:检查值是否不在给定的文字列表中
str_contains:检查值是否包含字符串文字
str_endswith:检查值是否以字符串文字结尾
str_length:检查值长度是否匹配
str_matches:检查值是否与字符串文字匹配
str_startswith:检查值是否以字符串文字开头
自定义验证函数
除了内置的验证检查之外,Pandera还允许您定义自己的自定义验证函数。这使您能够根据用例定义自己的验证规则。
例如,您可以定义一个lambda函数作为数据验证,如下所示:
schema = pa.DataFrameSchema({
"column2": pa.Column(str, [
pa.Check(lambda s: s.str.startswith("value")),
pa.Check(lambda s: s.str.split("_", expand=True).shape[1] == 2)
]),
})
向Pandera添加对PySpark SQL DataFrame的支持
在添加对PySpark SQL的支持过程中,我们遵循了两个基本原则:
- 接口和用户体验的一致性
- 对PySpark的性能优化。
首先,让我们深入探讨一致性的话题,因为从用户的角度来看,无论选择的框架是哪个,他们都需要一套一致的API和接口。由于Pandera提供多个可选择的框架,因此在PySpark SQL API中拥有一致的用户体验更加关键。
有了这个想法,我们可以使用PySpark SQL来定义Pandera模式,如下所示:
from pyspark.sql import DataFrame, SparkSession
import pyspark.sql.types as T
import pandera.pyspark as pa
spark = SparkSession.builder.getOrCreate()
class PanderaSchema(DataFrameModel):
"""测试模式"""
id: T.IntegerType() = Field(gt=5)
product_name: T.StringType() = Field(str_startswith="B")
price: T.DecimalType(20, 5) = Field()
description: T.ArrayType(T.StringType()) = Field()
meta: T.MapType(T.StringType(), T.StringType()) = Field()
data_fail = [
(5, "Bread", 44.4, ["产品描述"], {"产品类别": "乳制品"}),
(15, "Butter", 99.0, ["更多细节"], {"产品类别": "糕点"}),
]
spark_schema = T.StructType(
[
T.StructField("id", T.IntegerType(), False),
T.StructField("product", T.StringType(), False),
T.StructField("price", T.DecimalType(20, 5), False),
T.StructField("description", T.ArrayType(T.StringType(), False), False),
T.StructField(
"meta", T.MapType(T.StringType(), T.StringType(), False), False
),
],
)
df_fail = spark_df(spark, data_fail, spark_schema)
在上面的代码中,PanderaSchema
定义了入站pyspark dataframe的模式。它有5个字段,具有不同的dtypes
,并对id
和product_name
字段执行数据检查。
class PanderaSchema(DataFrameModel):
"""测试模式"""
id: T.IntegerType() = Field(gt=5)
product_name: T.StringType() = Field(str_startswith="B")
price: T.DecimalType(20, 5) = Field()
description: T.ArrayType(T.StringType()) = Field()
meta: T.MapType(T.StringType(), T.StringType()) = Field()
接下来,我们创建了一个虚拟数据,并强制执行了原生的PySpark SQL模式,如spark_schema
中所定义的。
spark_schema = T.StructType(
[
T.StructField("id", T.IntegerType(), False),
T.StructField("product", T.StringType(), False),
T.StructField("price", T.DecimalType(20, 5), False),
T.StructField("description", T.ArrayType(T.StringType(), False), False),
T.StructField(
"meta", T.MapType(T.StringType(), T.StringType(), False), False
),
],
)
df_fail = spark_df(spark, data_fail, spark_schema)
这样做是为了模拟模式和数据验证失败。
这是df_fail
dataframe的内容:
df_fail.show()
+---+-------+--------+--------------------+--------------------+
| id|product| price| description| meta|
+---+-------+--------+--------------------+--------------------+
| 5| 面包|44.40000|[产品描述]|{产品类别...|
| 15| 黄油|99.00000| [更多细节在这里]|{产品类别...|
+---+-------+--------+--------------------+--------------------+
接下来我们可以调用Pandera的validate函数执行模式和数据级别的验证,如下所示:
df_out = PanderaSchema.validate(check_obj=df)
我们稍后将探索df_out
的内容。
PySpark的性能优化
我们的贡献专门针对使用PySpark数据框架时的最佳性能进行设计,这在处理大型数据集时非常关键,以处理PySpark的分布式计算环境的独特挑战。
Pandera利用PySpark的分布式计算架构,在处理大型数据集时能够高效地保持数据的一致性和准确性。我们为PySpark性能重新编写了Pandera的自定义验证函数,以实现更快、更高效的大型数据集验证,同时减少高容量下数据错误和不一致性的风险。
全面的错误报告
我们为Pandera增加了生成详细错误报告的功能,以Python字典对象的形式呈现。这些报告可以通过从validate函数返回的数据框架进行访问。它们提供了所有模式和数据级别验证的综合摘要,根据用户的配置。
这个功能对于开发人员迅速识别和解决任何与数据相关的问题非常有价值。通过使用生成的错误报告,团队可以编制应用程序中模式和数据问题的全面列表。这使他们能够以高效和精确的方式优先处理和解决问题。
需要注意的是,这个功能目前仅适用于PySpark SQL,为用户在Pandera中使用错误报告提供了更好的体验。
在上面的代码示例中,记得我们在Spark数据框架上调用了validate()
:
df_out = PanderaSchema.validate(check_obj=df)
它返回一个数据框架对象。使用访问器,我们可以从中提取出错误报告,如下所示:
print(df_out.pandera.errors)
{
"SCHEMA":{
"COLUMN_NOT_IN_DATAFRAME":[
{
"schema":"PanderaSchema",
"column":"PanderaSchema",
"check":"column_in_dataframe",
"error":"column 'product_name' not in dataframe Row(id=5, product='Bread', price=None, description=['description of product'], meta={'product_category': 'dairy'})"
}
],
"WRONG_DATATYPE":[
{
"schema":"PanderaSchema",
"column":"description",
"check":"dtype('ArrayType(StringType(), True)')",
"error":"expected column 'description' to have type ArrayType(StringType(), True), got ArrayType(StringType(), False)"
},
{
"schema":"PanderaSchema",
"column":"meta",
"check":"dtype('MapType(StringType(), StringType(), True)')",
"error":"expected column 'meta' to have type MapType(StringType(), StringType(), True), got MapType(StringType(), StringType(), False)"
}
]
},
"DATA":{
"DATAFRAME_CHECK":[
{
"schema":"PanderaSchema",
"column":"id",
"check":"greater_than(5)",
"error":"column 'id' with type IntegerType() failed validation greater_than(5)"
}
]
}
}
如上所示,错误报告在一个Python字典对象中聚合了2个级别,可以方便地被下游应用程序使用,比如使用Grafana等工具对错误随时间的趋势进行可视化:
- 验证类型 =
SCHEMA
或DATA
- 错误类别 =
DATAFRAME_CHECK
或WRONG_DATATYPE
等
这种重新组织错误报告的新格式是在0.16.0版本中作为我们的贡献的一部分引入的。
启用/禁用开关
对于依赖于PySpark的应用程序来说,启用/禁用开关是一个重要的功能,可以在灵活性和风险管理方面产生重大影响。具体而言,启用/禁用开关允许团队在生产环境中禁用数据验证,而无需进行代码更改。
这对于性能至关重要的大数据流水线尤为重要。在许多情况下,数据验证可能占用大量的处理时间,这会影响流水线的整体性能。通过启用/禁用开关,团队可以在需要时快速而轻松地禁用数据验证,而无需经过修改代码的耗时过程。
我们的团队将启用/禁用开关引入到Pandera中,使用户可以通过更改配置设置来轻松关闭生产环境中的数据验证。这提供了在开发中优先考虑性能时所需的灵活性,而不会牺牲数据质量或准确性。
要启用验证,请在环境变量中设置以下内容:
export PANDERA_VALIDATION_ENABLED=False
这将被Pandera捕获,以禁用应用程序中的所有验证。默认情况下,启用验证。
目前,此功能仅适用于PySpark SQL从0.16.0版本开始,因为它是我们贡献的一个新概念。
Pandera执行的细粒度控制
除了启用/禁用开关功能外,我们还引入了一种对Pandera验证流程执行的更细粒度控制。这通过引入可配置的设置来实现,允许用户在三个不同的级别上控制执行:
SCHEMA_ONLY
:此设置仅执行模式验证。它检查数据是否符合模式定义,但不执行任何附加的数据级验证。DATA_ONLY
:此设置仅执行数据级验证。它检查数据是否符合定义的约束和规则,但不验证模式。SCHEMA_AND_DATA
:此设置同时执行模式和数据级验证。它检查数据是否符合模式定义、定义的约束和规则。
通过提供这种细粒度的控制,用户可以选择最适合其特定用例的验证级别。例如,如果主要关注确保数据符合定义的模式,可以使用SCHEMA_ONLY
设置来减少总体处理时间。或者,如果已知数据符合模式,并且重点是确保数据质量,可以使用DATA_ONLY
设置来优先进行数据级验证。
对Pandera执行的增强控制使用户能够在精确性和效率之间实现精细调整的平衡,从而实现更有针对性和优化的验证体验。
export PANDERA_VALIDATION_DEPTH=SCHEMA_ONLY
默认情况下,启用验证,并将深度设置为SCHEMA_AND_DATA
,可以根据用例的需要更改为SCHEMA_ONLY
或DATA_ONLY
。
目前,此功能仅适用于PySpark SQL从0.16.0版本开始,因为它是我们贡献的一个新概念。
列和数据框级别的元数据
我们的团队向Pandera添加了一个新功能,允许用户在Field
和Schema / Model
级别存储附加的元数据。此功能旨在允许用户在其模式定义中嵌入上下文信息,以供其他应用程序使用。
例如,通过存储有关特定列的详细信息,如数据类型、格式或单位,开发人员可以确保下游应用程序能够正确解释和使用数据。类似地,通过存储有关哪些列在特定用例中需要的信息,开发人员可以优化数据处理流程,减少存储成本,并提高查询性能。
在模式级别,用户可以存储信息来帮助对整个应用程序中的不同模式进行分类。此元数据可以包括模式的目的、数据的来源或数据的日期范围等详细信息。这对于管理复杂的数据处理工作流非常有用,其中使用多个模式来实现不同的目的,并且需要高效地跟踪和管理。
class PanderaSchema(DataFrameModel):
"""Pandera模式类"""
id: T.IntegerType() = Field(
gt=5,
metadata={"usecase": ["零售定价", "消费者行为"],
"category": "产品定价"},
)
product_name: T.StringType() = Field(str_startswith="B")
price: T.DecimalType(20, 5) = Field()
class Config:
"""Pandera类的配置"""
name = "产品信息"
strict = True
coerce = True
metadata = {"category": "产品详情"}
在上面的示例中,我们在模式对象本身上引入了附加信息。这在两个级别上是允许的:字段和模式。
要提取模式级别上的元数据(包括其中的所有字段),我们提供了以下辅助函数:
PanderaSchema.get_metadata()
输出将是以下字典对象:
{
"产品信息": {
"columns": {
"id": {"usecase": ["零售定价", "消费者行为"],
"category": "产品定价"},
"product_name": None,
"price": None,
},
"dataframe": {"category": "产品详情"},
}
}
目前,此功能是0.16.0中的一个新概念,并已添加到PySpark SQL和Pandas中。
摘要
我们介绍了几个新功能和概念,包括一个开关,允许团队在生产中禁用验证而无需更改代码,对Pandera的验证流程进行细粒度控制,以及在列和数据框级别存储附加元数据的能力。您可以在版本0.16.0的更新Pandera文档中找到更多详细信息。
正如Pandera创始人Niels Bantilan在最近关于Pandera 0.16.0发布的博文中解释的那样:
为了验证Pandera的可扩展性,我们使用新的模式规范和后端API与QuantumBlack团队合作,实现了Pyspark SQL的模式和后端……我们在几个月内完成了一个MVP!
这个对Pandera开源代码库的最新贡献将使与PySpark和其他大数据技术一起工作的团队受益。
以下QuantumBlack的团队成员是这个最新贡献的责任人:Ismail Negm-PARI,Neeraj Malhotra,Jaskaran Singh Sidana,Kasper Janehag,Oleksandr Lazarchuk。我特别要感谢Neeraj在准备本文发表方面的帮助。Jo Stitchbury是一位经验丰富的技术作家。她写作关于数据科学和分析、人工智能和软件行业的内容。