使用Amazon SageMaker Pipelines构建机器学习工作流程的最佳实践和设计模式

使用Amazon SageMaker Pipelines构建机器学习工作流程的最佳实践和设计模式' can be condensed to '使用Amazon SageMaker Pipelines构建机器学习工作流的最佳实践和设计模式

Amazon SageMaker Pipelines是一项完全托管的AWS服务,用于构建和编排机器学习(ML)工作流。SageMaker Pipelines为ML应用程序开发人员提供了编排ML工作流的能力,包括数据加载、数据转换、训练、调优和部署等不同步骤。您可以使用SageMaker Pipelines来编排SageMaker中的ML作业,其与更大的AWS生态系统的集成还允许您使用AWS Lambda函数、Amazon EMR作业等资源。这使您能够为ML工作流中的特定需求构建一个定制和可重现的流水线。

在本文中,我们提供了一些最佳实践,以最大化SageMaker Pipelines的价值,并使开发体验更加无缝。我们还讨论了构建SageMaker Pipelines时的一些常见设计场景和模式,并提供了解决这些场景的示例。

SageMaker Pipelines的最佳实践

在本节中,我们讨论了在使用SageMaker Pipelines设计工作流时可以遵循的一些最佳实践。采用这些实践可以改善开发过程,并简化SageMaker Pipelines的运营管理。

使用Pipeline Session实现流水线的延迟加载

Pipeline Session可以实现流水线资源的延迟初始化(直到流水线运行时才启动任务)。PipelineSession上下文继承了SageMaker Session,并实现了与其他SageMaker实体和资源(如训练作业、终端节点、Amazon Simple Storage Service(Amazon S3)中的输入数据集等)交互的便捷方法。在定义SageMaker Pipelines时,应使用PipelineSession而不是常规的SageMaker Session:

from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.sklearn.processing import SKLearnProcessor
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
sklearn_processor = SKLearnProcessor(
    framework_version=’0.20.0’,
    instance_type=’ml.m5.xlarge’,
    instance_count=1,
    base_job_name="sklearn-abalone-process",
    role=role,
    sagemaker_session=pipeline_session,
)

在本地模式下运行流水线以实现开发过程中的节约成本和快速迭代

您可以使用LocalPipelineSession上下文在本地模式下运行流水线。在此模式下,流水线和作业在本地机器上使用资源运行,而不是使用SageMaker托管资源。本地模式提供了一种使用较小数据子集对流水线代码进行迭代的经济有效方式。在本地测试流水线后,可以扩展为使用PipelineSession上下文运行。

from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.pipeline_context import LocalPipelineSession
local_pipeline_session = LocalPipelineSession()
role = sagemaker.get_execution_role()
sklearn_processor = SKLearnProcessor(
    framework_version=’0.20.0’,
    instance_type=’ml.m5.xlarge,
    instance_count=1,
    base_job_name="sklearn-abalone-process",
    role=role,
    sagemaker_session=local_pipeline_session,
)

通过版本控制管理SageMaker流水线

对工件和流水线定义进行版本控制是开发生命周期中的常见需求。您可以通过为流水线对象命名唯一的前缀或后缀来创建多个版本的流水线,其中最常见的是时间戳,如下所示:

from sagemaker.workflow.pipeline_context import PipelineSession
import time

current_time = time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())
pipeline_name = "pipeline_" + current_time
pipeline_session = PipelineSession()
pipeline = Pipeline(
    name=pipeline_name,
    steps=[step_process, step_train, step_eval, step_cond],
    sagemaker_session=pipeline_session,
)

通过与SageMaker Experiments集成来组织和跟踪SageMaker流水线运行

SageMaker Pipelines可以轻松与SageMaker Experiments集成,以组织和跟踪流水线运行。这可以通过在创建流水线对象时指定PipelineExperimentConfig来实现。使用此配置对象,可以指定实验名称和试验名称。SageMaker流水线的运行详细信息将组织在指定的实验和试验下。如果您没有显式指定实验名称,则使用流水线名称作为实验名称。类似地,如果您没有显式指定试验名称,则使用流水线运行ID作为试验或运行组名称。请参阅以下代码:

Pipeline(
    name="MyPipeline",
    parameters=[...],
    pipeline_experiment_config=PipelineExperimentConfig(
        experiment_name = ExecutionVariables.PIPELINE_NAME,
        trial_name = ExecutionVariables.PIPELINE_EXECUTION_ID
        ),
    steps=[...]
)

在私有VPC中安全地运行SageMaker管道

为了保护ML工作负载的安全性,最佳实践是在私有VPC、私有子网和安全组内部署由SageMaker管道编排的作业。为了确保和强制使用这个安全环境,您可以为SageMaker执行角色(管道在运行时所假设的角色)实施以下AWS Identity and Access Management (IAM)策略。您还可以添加策略以以网络隔离模式运行由SageMaker管道编排的作业。

# IAM策略以强制在私有VPC中执行

{

    "Action": [

        "sagemaker:CreateProcessingJob",
        "sagemaker:CreateTrainingJob",
        "sagemaker:CreateModel"
    ],

    "Resource": "*",
    "Effect": "Deny",
    "Condition": {
        "Null": {
            "sagemaker:VpcSubnets": "true"
        }
    }
}

# IAM策略以强制在网络隔离模式下执行
{

    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Deny",
            "Action": [
                "sagemaker:Create*"
            ],
            "Resource": "*",
            "Condition": {
                "StringNotEqualsIfExists": {
                    "sagemaker:NetworkIsolation": "true"
                }
            }
        }
    ]
}

有关在这些安全控制下实施管道的示例,请参阅在安全环境中使用Amazon SageMaker编排作业、模型注册和持续部署。

使用标签监视管道运行的成本

单独使用SageMaker管道是免费的;您需要支付作为处理、训练和批量推理等各个管道步骤的一部分而启动的计算和存储资源的费用。为了按照每个管道运行的成本进行汇总,您可以在创建资源的每个管道步骤中包含标签。然后,您可以在成本资源管理器中使用这些标签来过滤和汇总总体管道运行成本,如以下示例所示:

sklearn_processor = SKLearnProcessor(
    framework_version=’0.20.0’,
    instance_type=’ml.m5.xlarge,
    instance_count=1,
    base_job_name="sklearn-abalone-process",
    role=role,
    tags=[{'Key':'pipeline-cost-tag', 'Value':'<<tag_parameter>>'}]
)

step_process = ProcessingStep(
    name="AbaloneProcess",
    processor=sklearn_processor,
    ...
)

现在,您可以通过成本资源管理器按标签获取成本:

response = client.get_cost_and_usage(
    TimePeriod={
        'Start': '2023-07-01',
        'End': '2023-07-15'
        },
    Metrics=['BLENDED_COST','USAGE_QUANTITY','UNBLENDED_COST'],
    Granularity='MONTHLY',
    Filter={
        'Dimensions': {
            'Key':'USAGE_TYPE',
            'Values': [
                ‘SageMaker:Pipeline’
            ]
        },
        'Tags': {
            'Key': 'keyName',
            'Values': [
                'keyValue',
                ]
        }
    }
)

一些常见情况的设计模式

在本节中,我们将讨论一些使用SageMaker管道的常见用例的设计模式。

使用Lambda步骤运行轻量级Python函数

Python函数在ML工作流中无处不在;它们用于预处理、后处理、评估等等。Lambda是一种无服务器计算服务,它可以让您在不提供或管理服务器的情况下运行代码。使用Lambda,您可以运行包含Python的首选语言的代码。您可以使用它来作为您的管道的一部分运行自定义的Python代码。Lambda步骤使您能够在SageMaker管道中运行Lambda函数。请使用以下代码开始:

%%writefile lambdafunc.py

import json

def lambda_handler(event, context):
    str1 = event["str1"]
    str2 = event["str2"]
    str3 = str1 + str2
    return {
        "str3": str3
    }

</pre

使用SageMaker Python SDK的Lambda助手创建Lambda函数:

from sagemaker.lambda_helper import Lambda

def create_lambda(function_name, script, handler):
    response = Lambda(
        function_name=function_name,
        execution_role_arn=role,
        script= script,
        handler=handler,
        timeout=600,
        memory_size=10240,
    ).upsert()

    function_arn = response['FunctionArn']
    return function_arn

fn_arn = create_Lambda("func", "lambdafunc.py", handler = "lambdafunc.lambda_handler")

调用Lambda步骤:

from sagemaker.lambda_helper import Lambda
from sagemaker.workflow.lambda_step import (
    LambdaStep,
    LambdaOutput,
    LambdaOutputTypeEnum
)

str3 = LambdaOutput(output_name="str3", output_type=LambdaOutputTypeEnum.String)

# Lambda步骤
step_lambda1 = LambdaStep(
    name="LambdaStep1",
    lambda_func=Lambda(
        function_arn=fn_arn
    ),
    inputs={
        "str1": "Hello",
        "str2": " World"
    },
    outputs=[str3],
)

在步骤之间传递数据

用于流水线步骤的输入数据可以是可访问的数据位置或由流水线中的前一步骤生成的数据。您可以将此信息提供为ProcessingInput参数。让我们看看如何使用ProcessingInput的几个场景。

场景1:将Lambda步骤的输出(原始数据类型)传递给处理步骤

原始数据类型是指标量数据类型,如字符串、整数、布尔值和浮点数。

以下代码片段定义了一个Lambda函数,该函数返回一个包含原始数据类型变量的字典。当从SageMaker流水线中的Lambda步骤调用时,您的Lambda函数代码将返回一个键值对的JSON。

def handler(event, context):
    ...
    return {
        "output1": "string_value",
        "output2": 1,
        "output3": True,
        "output4": 2.0,
    }

在流水线定义中,您可以定义特定数据类型的SageMaker流水线参数,并将变量设置为Lambda函数的输出:

from sagemaker.workflow.lambda_step import (
    LambdaStep,
    LambdaOutput,
    LambdaOutputTypeEnum
)
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.sklearn.processing import SKLearnProcessor

role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()

# 1. 定义Lambda步骤的输出参数

str_outputParam = LambdaOutput(output_name="output1", output_type=LambdaOutputTypeEnum.String)
int_outputParam = LambdaOutput(output_name"output2", output_type=LambdaOutputTypeEnum.Integer)
bool_outputParam = LambdaOutput(output_name"output3", output_type=LambdaOutputTypeEnum.Boolean)
float_outputParam = LambdaOutput(output_name"output4", output_type=LambdaOutputTypeEnum.Float)

# 2. 调用Lambda步骤并返回输出

step_lambda = LambdaStep(
    name="MyLambdaStep",
    lambda_func=Lambda(
        function_arn="arn:aws:lambda:us-west-2:123456789012:function:sagemaker_test_lambda",
        session=PipelineSession(),
        ),
    inputs={"arg1": "foo", "arg2": "foo1"},
    outputs=[
        str_outputParam, int_outputParam, bool_outputParam, float_outputParam
        ],
)

# 3. 提取Lambda的输出

str_outputParam = step_lambda.properties.Outputs["output1"]

# 4. 在后续步骤中使用它,例如处理步骤

sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1",
    instance_type="ml.m5.xlarge",
    instance_count=1,
    sagemaker_session=pipeline_session,
    role=role
)

processor_args = sklearn_processor.run(
    code="code/preprocess.py", # 要运行的Python脚本
    arguments=["--input-args", str_outputParam]
)

step_process = ProcessingStep(
    name="processstep1",
    step_args=processor_args,
)

场景2:将Lambda步骤的输出(非原始数据类型)传递给处理步骤

非原始数据类型是指非标量数据类型(例如,NamedTuple)。您可能会遇到这样的情况,即您需要从Lambda函数返回非原始数据类型。为此,您需要将非原始数据类型转换为字符串:

# 返回非原始数据类型的Lambda函数代码

from collections import namedtuple

def lambda_handler(event, context):
    Outputs = namedtuple("Outputs", "sample_output")
    named_tuple = Outputs(
                    [
                        {'output1': 1, 'output2': 2},
                        {'output3': 'foo', 'output4': 'foo1'}
                    ]
                )
    return {
        "named_tuple_string": str(named_tuple)
    }

# 使用Lambda输出作为“参数输入”的管道步骤

output_ref = step_lambda.properties.Outputs["named_tuple_string"]

然后,您可以将此字符串用作管道中后续步骤的输入。要在代码中使用命名元组,请使用eval()来解析字符串中的Python表达式:

# 在处理逻辑代码中解析字符串

import argparse
from collections import namedtuple

Outputs = namedtuple("Outputs", "sample_output")

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--named_tuple_string", type=str, required=True)
    args = parser.parse_args()
    # 使用eval获得命名元组
    named_tuple = eval(args.named_tuple_string)

场景3:通过属性文件传递步骤的输出

您还可以将处理步骤的输出存储在属性JSON文件中,以供ConditionStep或另一个ProcessingStep在下游使用。您可以使用JSONGet函数查询属性文件。请参考以下代码:

# 1. 使用ProcessingOutput定义一个Processor
sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1",
    instance_type="ml.m5.xlarge",
    instance_count=1,
    base_job_name="sklearn-abalone-preprocess",
    sagemaker_session=session,
    role=sagemaker.get_execution_role(),
)

step_args = sklearn_processor.run(

                outputs=[
                    ProcessingOutput(
                        output_name="hyperparam",
                        source="/opt/ml/processing/evaluation"
                    ),
                ],
            code="./local/preprocess.py",
            arguments=["--input-data", "s3://my-input"],
)

# 2. 使用与Processor中使用的output_name相匹配的名称定义一个PropertyFile

hyperparam_report = PropertyFile(
    name="AbaloneHyperparamReport",
    output_name="hyperparam",
    path="hyperparam.json",
)

假设属性文件的内容如下:

{
    "hyperparam": {
        "eta": {
            "value": 0.6
        }
    }
}

在这种情况下,可以使用JsonGet函数查询特定的值,并在后续步骤中使用:

# 3. 查询属性文件
eta = JsonGet(
    step_name=step_process.name,
    property_file=hyperparam_report,
    json_path="hyperparam.eta.value",
)

在管道定义中参数化变量

参数化变量以便在运行时使用通常是可取的,例如用于构建S3 URI。您可以使用Join函数对字符串进行参数化处理,以便在运行时进行评估。以下代码片段显示了如何使用Join函数定义变量,并将其用于在处理步骤中设置输出位置:

# 定义变量以存储S3 URI
s3_location = Join(
    on="/", 
    values=[
        "s3:/",
        ParameterString(
            name="MyBucket", 
            default_value=""
        ),
        "training",
        ExecutionVariables.PIPELINE_EXECUTION_ID
    ]
)

# 定义处理步骤
sklearn_processor = SKLearnProcessor(
    framework_version="1.2-1",
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    base_job_name=f"{base_job_prefix}/sklearn-abalone-preprocess",
    sagemaker_session=pipeline_session,
    role=role,
)

# 在处理步骤中使用s3uri作为输出位置
processor_run_args = sklearn_processor.run(
    outputs=[
        ProcessingOutput(
            output_name="train",
            source="/opt/ml/processing/train",
            destination=s3_location,
        ),
    ],
    code="code/preprocess.py"
)

step_process = ProcessingStep(
    name="PreprocessingJob”,
    step_args=processor_run_args,
)

在可迭代对象上运行并行代码

某些机器学习工作流在静态集合(可迭代对象)上并行运行代码的循环。它可以是在不同数据上运行的相同代码,也可以是需要为每个项运行的不同代码片段。例如,如果您在文件中有非常大量的行并且希望加快处理时间,可以依赖前一种模式。如果您想要对数据中的特定子组执行不同的转换,您可能需要为每个子组运行不同的代码片段。以下两个场景说明了如何设计用于此目的的SageMaker管道。

场景1:在数据的不同部分上实现处理逻辑

您可以使用多个实例运行处理作业(通过将instance_count设置为大于1的值)。这会将来自Amazon S3的输入数据分发到所有处理实例中。然后,您可以使用脚本(process.py)根据实例编号和项目列表中的相应元素来处理数据的特定部分。process.py中的编程逻辑可以编写为根据它处理的项目列表而运行不同模块或代码片段。以下示例定义了一个可用于ProcessingStep的处理器:

sklearn_processor = FrameworkProcessor(
    estimator_cls=sagemaker.sklearn.estimator.SKLearn,
    framework_version="0.23-1",
    instance_type='ml.m5.4xlarge',
    instance_count=4, #并行执行/实例数
    base_job_name="parallel-step",
    sagemaker_session=session,
    role=role,
)

step_args = sklearn_processor.run(
    code='process.py',
    arguments=[
        "--items", 
        list_of_items, #包含项目列表的数据结构
        inputs=[
            ProcessingInput(source="s3://sagemaker-us-east-1-xxxxxxxxxxxx/abalone/abalone-dataset.csv",
                    destination="/opt/ml/processing/input"
            )
        ],
    ]
)

场景2:运行一系列步骤

当您有一系列需要并行运行的步骤时,您可以将每个序列定义为独立的SageMaker管道。这些SageMaker管道的运行可以从Lambda函数中触发,该函数是父管道中的LambdaStep的一部分。以下代码段示例了触发两个不同的SageMaker管道运行的情况:

import boto3
def lambda_handler(event, context):
    items = [1, 2]
    #sagemaker客户端
    sm_client = boto3.client("sagemaker")
    
    #需要触发的管道的名称。
    #如果有多个,您可以使用boto3 api获取可用的管道
    #并根据您的逻辑触发适当的管道。
    pipeline_name = 'child-pipeline-1'

    #为每个项目触发管道
    response_ppl = sm_client.start_pipeline_execution(
                        PipelineName=pipeline_name,
                        PipelineExecutionDisplayName=pipeline_name+'-item-%d' %(s),
                    )
    pipeline_name = 'child-pipeline-2'
    response_ppl = sm_client.start_pipeline_execution(
                        PipelineName=pipeline_name,
                        PipelineExecutionDisplayName=pipeline_name+'-item-%d' %(s),
                    )
return

总结

在本文中,我们讨论了使用和维护SageMaker管道的一些最佳实践。我们还提供了一些模式,您可以在使用SageMaker Pipelines设计工作流时采用,无论您是在编写新的管道还是将ML工作流从其他编排工具迁移过来。要开始使用SageMaker Pipelines进行ML工作流编排,请参阅GitHub上的代码示例和Amazon SageMaker Model Building Pipelines。