使用Amazon SageMaker Python SDK从Amazon SageMaker离线特征存储构建适用于机器学习的数据集

Amazon SageMaker Feature Store 是一种专门用于存储和检索特征数据供机器学习(ML)模型使用的服务。Feature Store 提供了一种在线存储,可进行低延迟、高吞吐量的读写,以及一种离线存储,可提供所有历史记录数据的批量访问。Feature Store 处理在线和离线存储之间的数据同步。

由于模型开发是一个迭代的过程,客户经常会查询离线存储并构建各种数据集以进行模型训练。目前,有几种访问离线存储中特征的方法,包括使用 Amazon Athena 运行 SQL 查询或使用 Apache Spark 中的 Spark SQL。然而,这些模式需要编写临时(有时是复杂的)SQL 语句,这并不总是适合数据科学家的角色。

Feature Store 最近扩展了 SageMaker Python SDK,以便更轻松地从离线存储中创建数据集。通过此版本,您可以使用 SDK 中的一组新方法创建数据集,而无需编写 SQL 查询。这些新方法支持常见操作,如时间旅行、过滤重复记录和连接多个特征组,同时确保在某一时刻的准确性。

在本文中,我们演示如何使用 SageMaker Python SDK 构建 ML-ready 数据集,而无需编写任何 SQL 语句。

解决方案概述

为了演示新功能,我们使用两个数据集:潜在客户和网络营销指标。这些数据集可用于构建模型,预测给定营销活动和为该潜在客户捕获的指标的情况下,潜在客户是否会转化为销售。

潜在客户数据包含有关潜在客户的信息,这些客户使用Lead_ProspectID 进行识别。一个潜在客户的特征(例如,LeadSource)可以随时间更新,这会导致该潜在客户的新记录。Lead_EventTime 表示创建每个记录的时间。以下截图显示了此数据的示例。

网络营销指标数据跟踪潜在客户的参与度指标,其中每个潜在客户使用 Web_ProspectID 进行识别。Web_EventTime 表示记录创建的时间。与潜在客户特征组不同,此特征组中每个潜在客户仅有一条记录。以下截图显示了此数据的示例。

我们将介绍 sagemaker-feature-store-offline-sdk.ipynb 笔记本的关键部分,该笔记本演示以下步骤:

  1. 使用特征组创建数据集。
  2. 连接多个特征组。
  3. 基于特定时间戳的一组事件,创建特征组和数据集之间的时序连接。
  4. 检索特定时间范围内的特征历史记录。
  5. 检索特定时间戳的特征。

先决条件

您需要以下先决条件:

  • 一个 AWS 帐户。
  • 一个 SageMaker Jupyter 笔记本实例。从 GitHub 存储库中访问代码并将其上传到您的笔记本实例。
  • 您还可以在 Amazon SageMaker Studio 环境中运行笔记本,这是一个面向 ML 开发的 IDE。您可以使用以下命令通过 Studio 环境内的终端克隆 GitHub 存储库:
git clone https://github.com/aws-samples/amazon-sagemaker-feature-store-offline-queries.git

我们假设已经使用现有的 FeatureGroup.create 方法创建了潜在客户数据的特征组,并且可以使用变量 base_fg 引用。有关特征组的更多信息,请参阅创建特征组。

使用特征组创建数据集

要使用 SageMaker SDK 创建数据集,我们使用 FeatureStore 类,该类包含 create_dataset 方法。此方法接受一个基础特征组,该组可能与其他特征组或 DataFrame 进行连接。我们首先提供潜在客户特征组作为基础,以及一个 Amazon Simple Storage Service(Amazon S3)路径来存储数据集:

from sagemaker.feature_store.feature_store import FeatureStore
feature_store = FeatureStore(sagemaker_session=feature_store_session)
ds1_builder = feature_store.create_dataset (base=base_fg,
output_path=f"s3://{s3_bucket_name}/dataset_query_results",)

create_dataset方法返回一个DatasetBuilder对象,可以用于从一个或多个特征组生成数据集(我们在下一部分中演示)。要创建仅包含leads特征的简单数据集,我们调用to_csv_file方法。这将在Athena中运行一个查询,从离线存储中检索特征,并将结果保存到指定的S3路径。

csv, query = ds1_builder.to_csv_file()
# 显示CSV文件的S3位置
print(f'CSV文件: {csv}')

连接多个特征组

使用SageMaker SDK,可以轻松地连接多个特征组来构建数据集。还可以在现有的Pandas DataFrame与单个或多个特征组之间执行连接操作。基本特征组是连接的重要概念。基本特征组是具有其他特征组或已连接到它的Pandas DataFrame的特征组。

在使用create_dataset函数创建数据集时,我们使用with_feature_group方法,该方法使用记录标识符和基本特征组中的目标特征名称在基本特征组和另一个特征组之间执行内部连接。在我们的示例中,基本特征组是leads特征组,目标特征组是Web营销特征组。 with_feature_group方法接受以下参数:

  • feature_group – 这是我们要连接的特征组。在我们的代码示例中,通过使用Web营销数据集创建目标特征组。
  • target_feature_name_in_base – 我们在连接中使用作为键的基本特征组中的特征名称。我们使用Lead_ProspectID作为基本特征组的记录标识符。
  • included_feature_names – 这是基本特征组的特征名称列表。我们使用此字段指定要包含在数据集中的特征。

以下代码显示了通过将基本特征组与目标特征组连接来创建数据集的示例:

join_builder = feature_store.create_dataset(base=base_fg, 
output_path=f"s3://{s3_bucket_name}/dataset_query_results").with_feature_group(
feature_group=target_fg,
target_feature_name_in_base="Lead_ProspectID",
included_feature_names=["Web_ProspectID",
"LastCampaignActivity","PageViewsPerVisit",
"TotalTimeOnWebsite","TotalWebVisits",
"AttendedMarketingEvent","OrganicSearch",
"ViewedAdvertisement",],)

您可以通过在前面的代码示例末尾添加with_feature_group方法并定义新特征组的必要参数来扩展连接操作,以包括多个特征组。您还可以通过将基础定义为现有的Pandas DataFrame并与感兴趣的特征组连接来执行连接操作。以下代码示例显示如何使用现有的Pandas DataFrame和现有的特征组创建数据集:

ds2_builder = feature_store.create_dataset(
base=new_records_df2, # Pandas DataFrame
event_time_identifier_feature_name="Lead_EventTime",
record_identifier_feature_name="Lead_ProspectID",
output_path=f"s3://{s3_bucket_name}/dataset_query_results",).with_feature_group(
base_fg, "Lead_ProspectID", ["LeadSource"])

有关这些各种配置的更多示例,请参阅从特征组创建数据集。

创建时间点连接

这个增强功能最强大的能力之一是可以简单地执行时间点连接,而无需编写复杂的SQL代码。在构建ML模型时,数据科学家需要避免数据泄漏或目标泄漏,即意外使用在模型训练期间不可用的数据进行模型训练。例如,如果我们正在尝试预测信用卡欺诈,我们应该排除在我们正在预测的欺诈行为之后到达的交易,否则训练的模型可能会使用这些欺诈后的信息来更改模型,使其更不容易推广。

检索准确的时间点特征数据需要您提供一个实体 DataFrame,该 DataFrame 提供一组记录 ID(或主键)和对应的事件时间作为事件的截止时间。这种检索机制有时被称为行级时间旅行,因为它允许为每个行键应用不同的时间约束。为了使用 SageMaker SDK 进行准确的时间点连接,我们使用 Dataset Builder 类,并将实体 DataFrame 作为构造函数的基本参数。

在下面的代码中,我们创建了一个简单的实体 DataFrame,其中包含两条记录。我们设置事件时间,用于指示截止时间,在时间序列数据的中间位置(2023年1月中旬)附近:

# 创建事件(实体表)DataFrame 以传递时间戳以进行准确的时间点连接
events = [['2023-01-20T00:00:00Z', record_id1],
['2023-01-15T00:00:00Z', record_id2]]
df_events = pd.DataFrame(events, columns=['Event_Time', 'Lead_ProspectID'])

当我们使用 point_in_time_accurate_join 函数与 create_dataset 调用一起使用时,内部查询排除所有时间戳晚于提供的截止时间的记录,返回在事件发生时可用的最新特征值:

# 使用准确的时间点连接函数创建 Dataset Builder

pit_builder = feature_store.create_dataset(
base=df_events,
event_time_identifier_feature_name='Event_Time',
record_identifier_feature_name='Lead_ProspectID',
output_path=f"s3://{s3_bucket_name}/{s3_prefix}/dataset_query_results"
).with_feature_group(base_fg, "Lead_ProspectID"
).point_in_time_accurate_join(
).with_number_of_recent_records_by_record_identifier(1)

请注意,由准确的时间点连接返回的 DataFrame 中只有两条记录。这是因为我们在实体 DataFrame 中只提交了两个记录 ID,分别是我们要检索的每个 Lead_ProspectID 的一个。时间点条件指定记录的事件时间(存储在 Lead_Eventtime 字段中)必须包含小于截止时间的值。

此外,我们指示查询仅检索符合此条件的最新记录,因为我们应用了 with_number_of_recent_records_by_record_identifier 方法。当与 point_in_time_accurate_join 方法一起使用时,这允许调用者指定要从满足准确的时间点连接条件的记录中返回多少条记录。

将准确的时间点连接结果与 Athena 查询结果进行比较

为了验证 SageMaker SDK point_in_time_accurate_join 函数返回的输出,我们将其与 Athena 查询结果进行比较。首先,我们使用 SELECT 语句创建一个标准的 Athena 查询,该查询与 Feature Store 运行时创建的特定表相关联。可以通过在从 FeatureGroup API 实例化的 athena_query 后引用 table_name 字段来找到此表名称:

SELECT * FROM "sagemaker_featurestore"."off_sdk_fg_lead_1682348629" 
WHERE "off_sdk_fg_lead_1682348629"."Lead_ProspectID" = '5e84c78f-6438-4d91-aa96-b492f7e91029'

Athena 查询不包含任何准确的时间点连接语义,因此它返回与指定的记录 ID(Lead_ProspectID)匹配的所有记录。

接下来,我们使用 Pandas 库按事件时间对 Athena 结果进行排序,以便进行比较。时间戳晚于提交给 point_in_time_accurate_join 的实体 DataFrame 中指定的事件时间(例如,2023-01-15T00:00:00Z)的记录不会出现在准确的时间点结果中。因为我们还指定了我们只想要前面的 create_dataset 代码中的单个记录,所以我们只得到截止时间之前的最新记录。通过将 SageMaker SDK 的结果与 Athena 查询结果进行比较,我们可以看到准确的时间点连接函数返回了正确的记录。

因此,我们有信心使用SageMaker SDK执行行级时间旅行并避免目标泄漏。此外,该功能适用于刷新时间完全不同的多个特征组。

检索特定时间范围内的特征历史记录

我们还想演示在连接特征组以形成数据集时指定时间范围窗口的使用。时间窗口使用with_event_time_range定义,该函数接受两个输入starting_timestampending_timestamp,并返回数据集构建器对象。在我们的代码示例中,我们将检索时间窗口设置为从2022-07-01 00:00:002022-07-02 00:00:00的一整天。

以下代码显示了如何在将基础特征组与目标特征组连接时创建具有指定事件时间窗口的数据集:

# 设置事件时间窗口:Unix时间戳秒
# 从2022年7月1日开始,并将时间窗口设置为一天
start_ts = 1656633600
time_window = 86400
# 使用数据集中硬编码的时间戳,然后添加时间窗口
datetime_start = datetime.fromtimestamp(start_ts)
datetime_end = datetime.fromtimestamp(start_ts+time_window)
print(f'设置检索时间窗口:{datetime_start}到{datetime_end}')
time_window_builder = (feature_store.create_dataset(
base=base_fg, output_path=f"s3://{s3_bucket_name}/dataset_query_results").with_feature_group(
feature_group=target_fg,
target_feature_name_in_base="Lead_ProspectID",
included_feature_names=["Web_ProspectID","LastCampaignActivity","PageViewsPerVisit",
"TotalTimeOnWebsite","TotalWebVisits","AttendedMarketingEvent",
"OrganicSearch","ViewedAdvertisement",],)
.with_event_time_range(starting_timestamp=datetime_start,ending_timestamp=datetime_end))

我们还通过使用with_event_time_range导出到Pandas DataFrame并显示数据,确认了使用with_event_time_range创建的数据集大小之间的差异。请注意,结果集仅有原始10,020条记录的一小部分,因为它仅检索其event_time在1天时间段内的记录。

按特定时间戳检索特征

DatasetBuilder as_of方法从数据集中检索满足基于时间戳的约束的特征,调用方将其作为函数参数提供。此机制对于重新运行以前收集的数据的实验、回测时间序列模型或从离线存储的先前状态构建数据集以进行数据审计等场景非常有用。此功能有时被称为时间旅行,因为它实质上将数据存储回滚到较早的日期和时间。此时间约束也称为截止时间戳。

在我们的示例代码中,我们首先通过读取Feature Store中写入的最后一条记录的write_time值(使用put_record写入的记录)来创建截止时间戳。然后,我们将此截止时间戳作为参数提供给DatasetBuilderas_of方法:

# 使用as-of时间戳创建数据集
print(f'使用截止时间:{asof_cutoff_datetime}')
as_of_builder = feature_store.create_dataset(
base=base_fg,
output_path=f"s3://{s3_bucket_name}/{s3_prefix}/dataset_query_results").with_feature_group(
feature_group=target_fg,
target_feature_name_in_base='Lead_ProspectID',
included_feature_names=['Web_ProspectID','Web_EventTime',
'TotalWebVisits']).as_of(asof_cutoff_datetime)

需要注意的是,as_of方法将时间约束应用于内部的write_time字段,该字段由Feature Store自动生成。write_time字段表示记录写入数据存储的实际时间戳。这与其他方法(例如point-in-time-accurate-joinwith_event_time_range)不同,这些方法使用客户提供的event_time字段作为比较器。

清理

请务必删除此示例创建的所有资源,以避免产生持续的费用。这包括特征组和包含离线存储数据的S3存储桶。

SageMaker Python SDK体验 vs. 编写SQL

SageMaker Python SDK中的新方法使您能够快速创建数据集,并在ML生命周期中快速转向训练步骤。为了展示可以节省的时间和精力,让我们来看一个用例,在此用例中,我们需要在检索指定时间范围内的特征的同时加入两个特征组。以下图比较了离线特征存储中Python查询和用于创建Python查询背后数据集的SQL。

您可以看到,加入两个特征组的相同操作需要您创建一个长而复杂的SQL查询,而可以使用SageMaker Python SDK中的with_feature_groupwith_event_time_range方法来完成。

结论

Python SageMaker SDK中的新离线存储方法使您可以查询离线特征,而无需编写复杂的SQL语句。这为习惯于在模型开发期间编写Python代码的客户提供了无缝的体验。有关特征组的更多信息,请参阅“从您的特征组创建数据集”和“特征存储API:特征组”。

此帖子中的完整示例可在GitHub存储库中找到。试试并在评论中告诉我们您的反馈。