使用Amazon SageMaker特征存储特征处理器解锁机器学习洞察力
使用Amazon SageMaker特征存储特征处理器解锁机器学习洞察力 Condensed result 使用Amazon SageMaker特征存储特征处理器解锁机器学习洞察力
Amazon SageMaker特征存储提供了一种端到端的解决方案,用于自动化机器学习(ML)的特征工程。对于许多ML用例,原始数据(如日志文件、传感器读数或交易记录)需要转化为适用于模型训练的有意义的特征。
特征质量对于确保高度准确的ML模型至关重要。通常需要使用聚合、编码、归一化和其他操作将原始数据转化为特征,并且可能需要付出很大的努力。工程师必须手动使用Python或Spark为每个用例编写自定义数据预处理和聚合逻辑。
这种重复性、繁琐且容易出错的工作是冗余的。SageMaker特征存储特征处理器通过自动将原始数据转化为适用于批量训练ML模型的聚合特征来减轻这种负担。它允许工程师提供简单的数据转换函数,然后在Spark上扩展运行它们并管理底层基础设施。这使得数据科学家和数据工程师能够专注于特征工程逻辑而不是实现细节。
在本文中,我们演示了一家汽车销售公司如何使用特征处理器将原始销售交易数据转化为特征的三个步骤:
- 使用Amazon SageMaker和Hugging Face,Fetch将ML处理延迟减少了50%
- 火箭金钱 x Hugging Face:扩展生产中的波动机器学习模型
- 如何判断你的人工智能是否有意识
- 本地运行数据转换。
- 使用Spark进行规模化远程运行。
- 通过管道进行操作。
我们展示了SageMaker特征存储如何接收原始数据,使用Spark远程运行特征转换,并将结果聚合的特征加载到特征组中。然后可以使用这些特征来训练ML模型。
对于这个用例,我们看到SageMaker特征存储如何将原始汽车销售数据转化为结构化特征。这些特征随后用于获取以下见解:
- 2010年红色敞篷车的平均价格和最高价格
- 里程数与价格之间的最佳车型
- 多年来新车和二手车的销售趋势
- 不同位置的平均建议零售价(MSRP)的差异
我们还可以看到SageMaker特征存储管道如何在有新数据时保持特征的更新,使公司能够持续获得见解。
解决方案概述
我们使用数据集car_data.csv
进行工作,该数据集包含公司销售的二手车和新车的型号、年份、状态、里程、价格和建议零售价(MSRP)等规格。下面的截图显示了数据集的示例。
解决方案笔记本feature_processor.ipynb
包含了以下主要步骤,我们将在本文中进行解释:
- 创建两个特征组:一个名为
car-data
用于原始汽车销售记录,另一个名为car-data-aggregated
用于聚合汽车销售记录。 - 使用
@feature_processor
装饰器从Amazon Simple Storage Service(Amazon S3)将数据加载到car-data特征组中。 - 远程运行
@feature_processor code
作为Spark应用程序来聚合数据。 - 通过SageMaker管道将特征处理器进行操作化并安排运行。
- 在Amazon SageMaker Studio中探索特征处理管道和血统。
- 使用聚合特征来训练ML模型。
先决条件
要按照本教程进行操作,您需要以下内容:
- AWS帐户。
- 设置SageMaker Studio。
- AWS Identity and Access Management(IAM)权限。在创建此IAM角色时,遵循授予最低特权访问的最佳实践。
对于本文,我们参考以下笔记本,该笔记本演示了如何使用SageMaker Python SDK开始使用特征处理器。
创建特征组
要创建特征组,请完成以下步骤:
-
按照以下方式创建
car-data
的特征组定义:# Feature Group - Car Sales CAR_SALES_FG_NAME = "car-data" CAR_SALES_FG_ARN = f"arn:aws:sagemaker:{region}:{aws_account_id}:feature-group/{CAR_SALES_FG_NAME}" CAR_SALES_FG_ROLE_ARN = offline_store_role CAR_SALES_FG_OFFLINE_STORE_S3_URI = f"s3://{s3_bucket}/{s3_offline_store_prefix}" CAR_SALES_FG_FEATURE_DEFINITIONS = [ FeatureDefinition(feature_name="id", feature_type=FeatureTypeEnum.STRING), FeatureDefinition(feature_name="model", feature_type=FeatureTypeEnum.STRING), FeatureDefinition(feature_name="model_year", feature_type=FeatureTypeEnum.STRING), FeatureDefinition(feature_name="status", feature_type=FeatureTypeEnum.STRING), FeatureDefinition(feature_name="mileage", feature_type=FeatureTypeEnum.STRING), FeatureDefinition(feature_name="price", feature_type=FeatureTypeEnum.STRING), FeatureDefinition(feature_name="msrp", feature_type=FeatureTypeEnum.STRING), FeatureDefinition(feature_name="ingest_time", feature_type=FeatureTypeEnum.FRACTIONAL), ]
特征对应于car_data.csv
数据集中的每一列(Model
,Year
,Status
,Mileage
,Price
和MSRP
)。
- 将记录标识符
id
和事件时间ingest_time
添加到特征组中:
CAR_SALES_FG_RECORD_IDENTIFIER_NAME = "id"
CAR_SALES_FG_EVENT_TIME_FEATURE_NAME = "ingest_time"
- 创建
car-data-aggregated
特征组定义如下:
# 特征组 - 聚合汽车销售AGG_CAR_SALES_FG_NAME = "car-data-aggregated"
AGG_CAR_SALES_FG_ARN = (
f"arn:aws:sagemaker:{region}:{aws_account_id}:feature-group/{AGG_CAR_SALES_FG_NAME}"
)
AGG_CAR_SALES_FG_ROLE_ARN = offline_store_role
AGG_CAR_SALES_FG_OFFLINE_STORE_S3_URI = f"s3://{s3_bucket}/{s3_offline_store_prefix}"
AGG_CAR_SALES_FG_FEATURE_DEFINITIONS = [
FeatureDefinition(feature_name="model_year_status", feature_type=FeatureTypeEnum.STRING),
FeatureDefinition(feature_name="avg_mileage", feature_type=FeatureTypeEnum.STRING),
FeatureDefinition(feature_name="max_mileage", feature_type=FeatureTypeEnum.STRING),
FeatureDefinition(feature_name="avg_price", feature_type=FeatureTypeEnum.STRING),
FeatureDefinition(feature_name="max_price", feature_type=FeatureTypeEnum.STRING),
FeatureDefinition(feature_name="avg_msrp", feature_type=FeatureTypeEnum.STRING),
FeatureDefinition(feature_name="max_msrp", feature_type=FeatureTypeEnum.STRING),
FeatureDefinition(feature_name="ingest_time", feature_type=FeatureTypeEnum.FRACTIONAL),
]
对于聚合特征组,特征是车型年份状态、平均里程、最大里程、平均价格、最大价格、平均MSRP、最大MSRP和事件时间。我们将记录标识符model_year_status
和事件时间ingest_time
添加到该特征组中。
- 现在,创建
car-data
特征组:
# 创建特征组 - 汽车销售记录。
car_sales_fg = FeatureGroup(
name=CAR_SALES_FG_NAME,
feature_definitions=CAR_SALES_FG_FEATURE_DEFINITIONS,
sagemaker_session=sagemaker_session,
)
create_car_sales_fg_resp = car_sales_fg.create(
record_identifier_name=CAR_SALES_FG_RECORD_IDENTIFIER_NAME,
event_time_feature_name=CAR_SALES_FG_EVENT_TIME_FEATURE_NAME,
s3_uri=CAR_SALES_FG_OFFLINE_STORE_S3_URI,
enable_online_store=True,
role_arn=CAR_SALES_FG_ROLE_ARN,
)
- 创建
car-data-aggregated
特征组:
# 创建特征组 - 聚合汽车销售记录。
agg_car_sales_fg = FeatureGroup(
name=AGG_CAR_SALES_FG_NAME,
feature_definitions=AGG_CAR_SALES_FG_FEATURE_DEFINITIONS,
sagemaker_session=sagemaker_session,
)
create_agg_car_sales_fg_resp = agg_car_sales_fg.create(
record_identifier_name=AGG_CAR_SALES_FG_RECORD_IDENTIFIER_NAME,
event_time_feature_name=AGG_CAR_SALES_FG_EVENT_TIME_FEATURE_NAME,
s3_uri=AGG_CAR_SALES_FG_OFFLINE_STORE_S3_URI,
enable_online_store=True,
role_arn=AGG_CAR_SALES_FG_ROLE_ARN,
)
您可以在SageMaker工作室主页的Data下拉菜单中选择SageMaker特征存储选项,以查看特征组。
使用 @feature_processor 装饰器加载数据
在本节中,我们使用特征存储特征处理器将来自Amazon S3的原始输入数据(car_data.csv)在本地转换为 car-data 特征组。这个初始的本地运行允许我们在远程运行之前进行开发和迭代,并且如果需要更快的迭代,可以对数据进行抽样处理。
使用 @feature_processor 装饰器,您的转换函数在一个 Spark 运行时环境中运行,其中提供给函数的输入参数和其返回值都是 Spark DataFrames。
- 使用以下命令安装 SageMaker Python SDK 的 Feature Processor SDK 及其额外组件:
pip install sagemaker[feature-processor]
您的转换函数中的输入参数数量必须与 @feature_processor 装饰器中配置的输入数量相匹配。在本例中,@feature_processor 装饰器的输入为 car-data.csv,输出为 car-data 特征组,表示这是一个批量操作,其中的 target_store 为 OfflineStore:
from sagemaker.feature_store.feature_processor import (
feature_processor,
FeatureGroupDataSource,
CSVDataSource,
)
@feature_processor(
inputs=[CSVDataSource(RAW_CAR_SALES_S3_URI)],
output=CAR_SALES_FG_ARN,
target_stores=["OfflineStore"],
)
- 定义 transform() 函数以转换数据。该函数执行以下操作:
- 将列名转换为小写。
- 将事件时间添加到 ingest_time 列。
- 删除标点符号并用 NA 替换缺失值。
def transform(raw_s3_data_as_df):
"""从 S3 加载数据,执行基本特征工程,将其存储在特征组中"""
from pyspark.sql.functions import regexp_replace
from pyspark.sql.functions import lit
import time
transformed_df = (
raw_s3_data_as_df.withColumn("Price", regexp_replace("Price", "\$", ""))
# 重命名列
.withColumnRenamed("Id", "id")
.withColumnRenamed("Model", "model")
.withColumnRenamed("Year", "model_year")
.withColumnRenamed("Status", "status")
.withColumnRenamed("Mileage", "mileage")
.withColumnRenamed("Price", "price")
.withColumnRenamed("MSRP", "msrp")
# 添加事件时间
.withColumn("ingest_time", lit(int(time.time())))
# 删除标点符号和冗余信息,用 NA 替换
.withColumn("mileage", regexp_replace("mileage", "(,)|(mi\.)", ""))
.withColumn("mileage", regexp_replace("mileage", "Not available", "NA"))
.withColumn("price", regexp_replace("price", ",", ""))
.withColumn("msrp", regexp_replace("msrp", "(^MSRP\s\\$)|(,)", ""))
.withColumn("msrp", regexp_replace("msrp", "Not specified", "NA"))
.withColumn("msrp", regexp_replace("msrp", "\\$\d+[a-zA-Z\s]+", "NA"))
.withColumn("model", regexp_replace("model", "^\d\d\d\d\s", ""))
)
- 调用 transform() 函数将数据存储在 car-data 特征组中:
# 执行 FeatureProcessor
transform()
输出显示数据已成功摄入 car-data 特征组。
transform_df.show() 函数的输出如下:
INFO:sagemaker:正在将转换后的数据摄入到 arn:aws:sagemaker:us-west-2:416578662734:feature-group/car-data,目标存储:['OfflineStore']
+---+--------------------+----------+------+-------+--------+-----+-----------+
| id| model|model_year|status|mileage| price| msrp|ingest_time|
+---+--------------------+----------+------+-------+--------+-----+-----------+
| 0| Acura TLX A-Spec| 2022| New| NA|49445.00|49445| 1686627154|
| 1| Acura RDX A-Spec| 2023| New| NA|50895.00| NA| 1686627154|
| 2| Acura TLX Type S| 2023| New| NA|57745.00| NA| 1686627154|
| 3| Acura TLX Type S| 2023| New| NA|57545.00| NA| 1686627154|
| 4|Acura MDX Sport H...| 2019| Used| 32675 |40990.00| NA| 1686627154|
| 5| Acura TLX A-Spec| 2023| New| NA|50195.00|50195| 1686627154|
| 6| Acura TLX A-Spec| 2023| New| NA|50195.00|50195| 1686627154|
| 7| Acura TLX Type S| 2023| New| NA|57745.00| NA| 1686627154|
| 8| Acura TLX A-Spec| 2023| New| NA|47995.00| NA| 1686627154|
| 9| Acura TLX A-Spec| 2022| New| NA|49545.00| NA| 1686627154|
| 10|Acura Integra w/A...| 2023| New| NA|36895.00|36895| 1686627154|
| 11| Acura TLX A-Spec| 2023| New| NA|48395.00|48395| 1686627154|
| 12|Acura MDX Type S ...| 2023| New| NA|75590.00| NA| 1686627154|
| 13|Acura RDX A-Spec ...| 2023| New| NA|55345.00| NA| 1686627154|
| 14| Acura TLX A-Spec| 2023| New| NA|50195.00|50195| 1686627154|
| 15|Acura RDX A-Spec ...| 2023| New| NA|55045.00| NA| 1686627154|
| 16| Acura TLX Type S| 2023| New| NA|56445.00| NA| 1686627154|
| 17| Acura TLX A-Spec| 2023| New| NA|47495.00|47495| 1686627154|
| 18| Acura TLX Advance| 2023| New| NA|52245.00|52245| 1686627154|
| 19| Acura TLX A-Spec| 2023| New| NA|50595.00|50595| 1686627154|
+---+--------------------+----------+------+-------+--------+-----+-----------+
仅显示前 20 行
我们已成功转换输入数据并将其摄取到car-data
特征组中。
远程运行@feature_processor代码
在本节中,我们演示了如何使用之前描述的@remote
装饰器将特征处理代码远程运行为Spark应用程序。我们使用Spark远程运行特征处理,以适应大型数据集。Spark提供了在集群上进行分布式处理的能力,以处理单台机器无法处理的大型数据。@remote
装饰器将本地Python代码作为单节点或多节点SageMaker训练作业运行。
- 使用以下方式将
@remote
装饰器与@feature_processor
装饰器一起使用:
@remote(spark_config=SparkConfig(), instance_type = "ml.m5.xlarge", ...)
@feature_processor(inputs=[FeatureGroupDataSource(CAR_SALES_FG_ARN)],
output=AGG_CAR_SALES_FG_ARN, target_stores=["OfflineStore"], enable_ingestion=False )
spark_config
参数指示此代码作为Spark应用程序
运行。SparkConfig实例用于配置Spark配置和依赖项。
- 定义
aggregate()
函数,使用PySpark SQL和用户定义函数(UDFs)对数据进行聚合。此函数执行以下操作:- 将
model
、year
和status
连接起来,创建model_year_status
。 - 取
price
的平均值,创建avg_price
。 - 取
price
的最大值,创建max_price
。 - 取
mileage
的平均值,创建avg_mileage
。 - 取
mileage
的最大值,创建max_mileage
。 - 取
msrp
的平均值,创建avg_msrp
。 - 取
msrp
的最大值,创建max_msrp
。 - 按
model_year_status
分组。
- 将
def aggregate(source_feature_group, spark):
"""
使用SQL查询和UDF对数据进行聚合。
"""
import time
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
@udf(returnType=StringType())
def custom_concat(*cols, delimeter: str = ""):
return delimeter.join(cols)
spark.udf.register("custom_concat", custom_concat)
# 执行SQL字符串。
source_feature_group.createOrReplaceTempView("car_data")
aggregated_car_data = spark.sql(
f"""
SELECT
custom_concat(model, "_", model_year, "_", status) as model_year_status,
AVG(price) as avg_price,
MAX(price) as max_price,
AVG(mileage) as avg_mileage,
MAX(mileage) as max_mileage,
AVG(msrp) as avg_msrp,
MAX(msrp) as max_msrp,
"{int(time.time())}" as ingest_time
FROM car_data
GROUP BY model_year_status
"""
)
aggregated_car_data.show()
return aggregated_car_data
- 运行
aggregate()
函数,创建一个SageMaker训练作业来运行Spark应用程序:
# 执行aggregate函数
aggregate()
结果是,SageMaker创建了一个训练作业,用于运行之前定义的Spark应用程序。它将使用sagemaker-spark-processing image
创建一个Spark运行时环境。
我们在这里使用SageMaker训练作业来运行我们的Spark特征处理应用程序。使用SageMaker训练,通过使用暖池技术,可以将启动时间减少到1分钟或更短,而此技术在SageMaker处理中无法使用。这使得SageMaker训练更适合于启动时间重要的短批处理作业,如特征处理。
- 要查看详细信息,请在SageMaker控制台上,在导航窗格中选择训练下的训练作业,然后选择名称为
aggregate-<timestamp>
的作业。
aggregate()函数的输出会生成遥测代码。在输出中,您将看到如下聚合数据:
+--------------------+------------------+---------+------------------+-----------+--------+--------+-----------+
| model_year_status| avg_price|max_price| avg_mileage|max_mileage|avg_msrp|max_msrp|ingest_time|
+--------------------+------------------+---------+------------------+-----------+--------+--------+-----------+
|Acura CL 3.0_1997...| 7950.0| 7950.00| 100934.0| 100934 | null| NA| 1686634807|
|Acura CL 3.2 Type...| 6795.0| 7591.00| 118692.5| 135760 | null| NA| 1686634807|
|Acura CL 3_1998_Used| 9899.0| 9899.00| 63000.0| 63000 | null| NA| 1686634807|
|Acura ILX 2.0L Te...| 14014.125| 18995.00| 95534.875| 89103 | null| NA| 1686634807|
|Acura ILX 2.0L Te...| 15008.2| 16998.00| 94935.0| 88449 | null| NA| 1686634807|
|Acura ILX 2.0L Te...| 16394.6| 19985.00| 97719.4| 80000 | null| NA| 1686634807|
|Acura ILX 2.0L w/...|14567.181818181818| 16999.00| 96624.72727272728| 98919 | null| NA| 1686634807|
|Acura ILX 2.0L w/...| 16673.4| 18995.00| 84848.6| 96637 | null| NA| 1686634807|
|Acura ILX 2.0L w/...|12580.333333333334| 14546.00|100207.33333333333| 95782 | null| NA| 1686634807|
|Acura ILX 2.0L_20...| 14565.375| 17590.00| 92941.125| 81842 | null| NA| 1686634807|
|Acura ILX 2.0L_20...| 14877.9| 9995.00| 99739.5| 89252 | null| NA| 1686634807|
|Acura ILX 2.0L_20...| 15659.5| 15660.00| 82136.0| 89942 | null| NA| 1686634807|
|Acura ILX 2.0L_20...|17121.785714285714| 20990.00| 78278.14285714286| 96067 | null| NA| 1686634807|
|Acura ILX 2.4L (A...| 17846.0| 21995.00| 101558.0| 85974 | null| NA| 1686634807|
|Acura ILX 2.4L Pr...| 16327.0| 16995.00| 85238.0| 95356 | null| NA| 1686634807|
|Acura ILX 2.4L w/...| 12846.0| 12846.00| 75209.0| 75209 | null| NA| 1686634807|
|Acura ILX 2.4L_20...| 18998.0| 18998.00| 51002.0| 51002 | null| NA| 1686634807|
|Acura ILX 2.4L_20...|17908.615384615383| 19316.00| 74325.38461538461| 89116 | null| NA| 1686634807|
|Acura ILX 4DR SDN...| 18995.0| 18995.00| 37017.0| 37017 | null| NA| 1686634807|
|Acura ILX 8-SPD_2...| 24995.0| 24995.00| 22334.0| 22334 | null| NA| 1686634807|
+--------------------+------------------+---------+------------------+-----------+--------+--------+-----------+
only showing top 20 rows
当训练作业完成后,您应该会看到以下输出:
06-13 05:40 smspark-submit INFO 提交Spark成功。主节点退出。
训练秒数:153
计费秒数:153
通过SageMaker管道将特征处理器投入运营
在本节中,我们将演示如何通过将特征处理器提升为SageMaker管道并安排运行来将其投入运营。
- 首先,将包含特征处理逻辑的transformation_code.py文件上传到Amazon S3:
car_data_s3_uri = s3_path_join("s3://", sagemaker_session.default_bucket(),
'transformation_code', 'car-data-ingestion.py')
S3Uploader.upload(local_path='car-data-ingestion.py', desired_s3_uri=car_data_s3_uri)
print(car_data_s3_uri)
- 接下来,使用.to_pipeline()函数创建一个名为car_data_pipeline的特征处理器管道:
car_data_pipeline_name = f"{CAR_SALES_FG_NAME}-ingestion-pipeline"
car_data_pipeline_arn = fp.to_pipeline(pipeline_name=car_data_pipeline_name,
step=transform,
transformation_code=TransformationCode(s3_uri=car_data_s3_uri) )
print(f"创建了SageMaker管道:{car_data_pipeline_arn}。")
- 要运行管道,请使用以下代码:
car_data_pipeline_execution_arn = fp.execute(pipeline_name=car_data_pipeline_name)
print(f"启动了一个执行,执行ARN:{car_data_pipeline_execution_arn}")
- 同样,您可以为聚合特征创建一个名为
car_data_aggregated_pipeline
的管道并启动一个运行。 - 将
car_data_aggregated_pipeline
安排每24小时运行一次:
fp.schedule(pipeline_name=car_data_aggregated_pipeline_name,
schedule_expression="rate(24 hours)", state="ENABLED")
print(f"创建了一个调度。")
在输出部分,您将看到管道的ARN、管道执行角色以及调度的详细信息:
{'pipeline_arn': 'arn:aws:sagemaker:us-west-2:416578662734:pipeline/car-data-aggregated-ingestion-pipeline',
'pipeline_execution_role_arn': 'arn:aws:iam::416578662734:role/service-role/AmazonSageMaker-ExecutionRole-20230612T120731',
'schedule_arn': 'arn:aws:scheduler:us-west-2:416578662734:schedule/default/car-data-aggregated-ingestion-pipeline',
'schedule_expression': 'rate(24 hours)',
'schedule_state': 'ENABLED',
'schedule_start_date': '2023-06-13T06:05:17Z',
'schedule_role': 'arn:aws:iam::416578662734:role/service-role/AmazonSageMaker-ExecutionRole-20230612T120731'}
- 要获取此帐户中的所有特征处理器管道,请在特征处理器上使用
list_pipelines()
函数:
fp.list_pipelines()
输出结果如下:
[{'pipeline_name': 'car-data-aggregated-ingestion-pipeline'},
{'pipeline_name': 'car-data-ingestion-pipeline'}]
我们已成功创建了SageMaker特征处理器管道。
探索特征处理管道和ML血统
在SageMaker Studio中完成以下步骤:
- 在SageMaker Studio控制台上,选择主页菜单,然后选择管道。
您应该看到创建了两个流水线: car-data-ingestion-pipeline
和 car-data-aggregated-ingestion-pipeline
。
- 选择
car-data-ingestion-pipeline
。
它会在执行选项卡上显示运行详细信息。
- 要查看由该流水线填充的特征组,请选择 数据 下的 特征存储,然后选择
car-data
。
您将看到我们在之前步骤中创建的两个特征组。
- 选择
car-data
特征组。
您将在 特征 选项卡上看到特征的详细信息。
查看流水线运行
要查看流水线运行,请完成以下步骤:
- 在流水线执行选项卡上,选择
car-data-ingestion-pipeline
。
这将显示所有运行。
- 选择其中一个链接以查看运行的详细信息。
- 要查看血统,请选择血统。
car-data
的完整血统显示输入数据源 car_data.csv
和上游实体。 car-data-aggregated
的血统显示输入的 car-data
特征组。
- 选择 加载特征,然后在
car-data
和car-data-ingestion-pipeline
上选择 查询上游血统,以查看所有上游实体。
完整的car-data
特征组的血统应如下所示的截图。
同样地,car-aggregated-data
特征组的血统应如下所示的截图。
SageMaker Studio提供了一个单一的环境,用于跟踪计划的流水线、查看运行、探索血统和查看特征处理代码。
car-data-aggregated
特征组中的聚合特征,如平均价格、最高价格、平均里程等,提供了对数据性质的洞察。您还可以使用这些特征作为训练模型预测汽车价格或进行其他操作的数据集。然而,本文重点是演示SageMaker Feature Store在特征工程方面的能力,训练模型不在本文的范围之内。
清理
不要忘记清理本文中创建的资源,以避免产生持续的费用。
- 使用
fp.schedule()
方法将计划的流水线禁用,状态参数设置为Disabled
:
# 禁用计划的流水线
fp.schedule(
pipeline_name=car_data_aggregated_pipeline_name,
schedule_expression="rate(24 hours)",
state="DISABLED",
)
- 删除两个特征组:
# 删除特征组
car_sales_fg.delete()
agg_car_sales_fg.delete()
位于S3存储桶和离线特征存储中的数据可能会产生费用,因此应删除它们以避免任何费用。
- 删除S3对象。
- 从特征存储中删除记录。
结论
在本文中,我们演示了一家汽车销售公司如何使用SageMaker Feature Store特征处理器从原始销售数据中获得有价值的洞察,具体包括:
- 使用Spark批量摄取和转换数据
- 通过SageMaker流水线操作化特征工程工作流程
- 提供血统跟踪和单一环境以监视流水线和探索特征
- 准备用于训练ML模型的优化聚合特征
通过遵循这些步骤,该公司能够将以前无法使用的数据转化为结构化特征,然后用这些特征来训练模型以预测汽车价格。SageMaker Feature Store使他们能够专注于特征工程而不是底层基础设施。
我们希望本文能帮助您使用SageMaker Feature Store特征处理器从自己的数据中解锁有价值的ML洞察!
有关更多信息,请参阅Amazon SageMaker Feature Store: Feature Processor Introduction中的特征处理和SageMaker示例。