释放效率:利用Amazon SageMaker Pipelines中的选择性执行的力量
释放效率:利用Amazon SageMaker Pipelines的选择性执行
MLOps是一门关键学科,通常负责监督将机器学习(ML)模型投入生产的路径。专注于要训练和部署的单个模型是很自然的。然而,在现实中,您可能会与数十甚至数百个模型一起工作,并且该过程可能涉及多个复杂步骤。因此,建立基础设施以跟踪、训练、部署和监控具有不同复杂性的模型是很重要的。这就是MLOps工具的用武之地。MLOps工具可以帮助您可重复、可靠地构建和简化这些过程,形成适合机器学习的工作流程。
Amazon SageMaker Pipelines是Amazon SageMaker的一个功能,它是一个专为机器学习而设计的工作流程编排服务,可帮助您自动化大规模的端到端机器学习工作流程。它通过提供一个集中平台来协调数据准备、模型训练、调优和验证等任务,简化了ML模型的开发和维护。SageMaker Pipelines可以帮助您简化工作流程管理,加速实验和更轻松地重新训练模型。
在本文中,我们将重点介绍SageMaker Pipelines的一个令人兴奋的新功能,即选择性执行。这个新功能使您能够有选择地运行ML工作流程的特定部分,从而通过限制运行范围内的流水线步骤并消除运行范围外的步骤的需要,实现显著的时间和计算资源节省。此外,我们还将探讨使用选择性执行的各种用例,进一步巩固其价值主张。
解决方案概述
SageMaker Pipelines通过发布选择性执行来不断改进其开发者体验。ML构建者现在可以选择在流水线中运行特定步骤,而无需重新运行整个流水线。这个功能使您能够在修改与所选步骤相关的运行时参数的同时,重新运行流水线的特定部分。
- 遇见Embroid:一种AI方法,可以将LLM与来自多个较小模型的嵌入信息相互拼接,从而实现在没有监督的情况下自动纠正LLM预测
- 使用ONNX框架提升模型的互操作性和效率
- 汤森路透(Thomson Reuters)在不到6周的时间内开发了Open Arena,一个企业级大型语言模型平台的游乐场
需要注意的是,所选步骤可能依赖于非选定步骤的结果。在这种情况下,这些非选定步骤的输出将从当前流水线版本的参考运行中重用。这意味着参考运行必须已经完成。默认的参考运行是当前流水线版本的最新运行,但您也可以选择使用当前流水线版本的其他运行作为参考。
参考运行的整体状态必须为成功、失败或停止。当选择性执行尝试使用其输出时,它不能为运行中。在使用选择性执行时,您可以选择任意数量的步骤运行,只要它们形成流水线的连续部分。
下图说明了完整运行的流水线行为。
下图说明了使用选择性执行的流水线行为。
在接下来的几节中,我们将展示如何在各种场景中使用选择性执行,包括流水线有向无环图(DAGs)中的复杂工作流程。
先决条件
要开始尝试选择性执行,我们需要首先设置SageMaker环境的以下组件:
- SageMaker Python SDK – 确保您的Python环境中安装了更新的SageMaker Python SDK。您可以从笔记本或终端运行以下命令来安装或升级SageMaker Python SDK版本至2.162.0或更高版本:
python3 -m pip install sagemaker>=2.162.0
或pip3 install sagemaker>=2.162.0
。 - 访问SageMaker Studio(可选) – Amazon SageMaker Studio可用于可视化流水线运行并与现有流水线ARN进行交互。如果您没有访问SageMaker Studio或者正在使用按需笔记本或其他IDE,您仍然可以按照本文并使用Python SDK与流水线ARN进行交互。
完整端到端演示的示例代码可在GitHub存储库中找到。
设置
使用sagemaker>=1.162.0
的Python SDK,我们引入了sagemaker.workflow.selective_execution_config
模块中的SelectiveExecutionConfig
类。选择性执行功能依赖于先前标记为成功、失败或停止的流水线ARN。下面的代码片段演示了如何导入SelectiveExecutionConfig
类,检索参考流水线ARN,并收集控制流水线运行的关联流水线步骤和运行时参数:
import boto3
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.selective_execution_config import SelectiveExecutionConfig
sm_client = boto3.client('sagemaker')
# 引用您的示例流水线名称
pipeline_name = "鲍鱼流水线"
# 过滤出先前成功的流水线执行arn
pipeline_executions = [_exec
for _exec in Pipeline(name=pipeline_name).list_executions()['PipelineExecutionSummaries']
if _exec['PipelineExecutionStatus'] == "成功"
]
# 获取最后一个成功的执行
latest_pipeline_arn = pipeline_executions[0]['PipelineExecutionArn']
print(latest_pipeline_arn)
>>> arn:aws:sagemaker:us-east-1:123123123123:pipeline/鲍鱼流水线/execution/x62pbar3gs6h
# 列出示例流水线的所有步骤
execution_steps = sm_client.list_pipeline_execution_steps(
PipelineExecutionArn=latest_pipeline_arn
)['PipelineExecutionSteps']
print(execution_steps)
>>>
[{'StepName': '鲍鱼-预处理',
'StartTime': datetime.datetime(2023, 6, 27, 4, 41, 30, 519000, tzinfo=tzlocal()),
'EndTime': datetime.datetime(2023, 6, 27, 4, 41, 30, 986000, tzinfo=tzlocal()),
'StepStatus': '成功',
'AttemptCount': 0,
'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:123123123123:processing-job/pipelines-fvsmu7m7ki3q-鲍鱼-预处理-d68CecvHLU'}},
'SelectiveExecutionResult': {'SourcePipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:123123123123:pipeline/鲍鱼流水线/execution/ksm2mjwut6oz'}},
{'StepName': '鲍鱼-训练',
'StartTime': datetime.datetime(2023, 6, 27, 4, 41, 31, 320000, tzinfo=tzlocal()),
'EndTime': datetime.datetime(2023, 6, 27, 4, 43, 58, 224000, tzinfo=tzlocal()),
'StepStatus': '成功',
'AttemptCount': 0,
'Metadata': {'TrainingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:123123123123:training-job/pipelines-x62pbar3gs6h-鲍鱼-训练-PKhAc1Q6lx'}}},
{'StepName': '鲍鱼-评估',
'StartTime': datetime.datetime(2023, 6, 27, 4, 43, 59, 40000, tzinfo=tzlocal()),
'EndTime': datetime.datetime(2023, 6, 27, 4, 57, 43, 76000, tzinfo=tzlocal()),
'StepStatus': '成功',
'AttemptCount': 0,
'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:123123123123:processing-job/pipelines-x62pbar3gs6h-鲍鱼-评估-vmkZDKDwhk'}}},
{'StepName': '鲍鱼-MSE检查',
'StartTime': datetime.datetime(2023, 6, 27, 4, 57, 43, 821000, tzinfo=tzlocal()),
'EndTime': datetime.datetime(2023, 6, 27, 4, 57, 44, 124000, tzinfo=tzlocal()),
'StepStatus': '成功',
'AttemptCount': 0,
'Metadata': {'Condition': {'Outcome': 'True'}}}]
# 列出所有可配置的流水线参数
# 参数可以在选择性执行期间更改
parameters = sm_client.list_pipeline_parameters_for_execution(
PipelineExecutionArn=latest_pipeline_arn
)['PipelineParameters']
print(parameters)
>>>
[{'Name': 'XGBNumRounds', 'Value': '120'},
{'Name': 'XGBSubSample', 'Value': '0.9'},
{'Name': 'XGBGamma', 'Value': '2'},
{'Name': 'TrainingInstanceCount', 'Value': '1'},
{'Name': 'XGBMinChildWeight', 'Value': '4'},
{'Name': 'XGBETA', 'Value': '0.25'},
{'Name': 'ApprovalStatus', 'Value': 'PendingManualApproval'},
{'Name': 'ProcessingInstanceCount', 'Value': '1'},
{'Name': 'ProcessingInstanceType', 'Value': 'ml.t3.medium'},
{'Name': 'MseThreshold', 'Value': '6'},
{'Name': 'ModelPath',
'Value': 's3://sagemaker-us-east-1-123123123123/鲍鱼/models/'},
{'Name': 'XGBMaxDepth', 'Value': '12'},
{'Name': 'TrainingInstanceType', 'Value': 'ml.c5.xlarge'},
{'Name': 'InputData',
'Value': 's3://sagemaker-us-east-1-123123123123/sample-dataset/abalone/abalone.csv'}]
应用场景
在本部分中,我们展示了一些选择性执行可以潜在地节省时间和资源的场景。我们使用典型的流水线流程作为参考,包括数据提取、训练、评估、模型注册和部署等步骤,来演示选择性执行的优势。
SageMaker Pipelines允许您使用管道参数为管道运行定义运行时参数。当触发新的运行时,它通常会从头到尾运行整个流水线。然而,如果启用了步骤缓存,SageMaker Pipelines将尝试找到具有相同属性值的当前流水线步骤的先前运行。如果找到匹配项,SageMaker Pipelines将使用先前运行的输出而不是重新计算该步骤。请注意,即使启用了步骤缓存,SageMaker Pipelines默认情况下仍会运行整个工作流至结束。
通过选择性执行功能的发布,您现在可以重新运行整个流水线工作流程或选择性地运行一部分步骤,而无需启用步骤缓存。以下应用场景说明了您可以使用选择性执行的各种方式。
应用场景1:运行单个步骤
数据科学家通常只关注MLOps流水线的训练阶段,不想担心预处理或部署步骤。选择性执行允许数据科学家只关注训练步骤,并即时修改训练参数或超参数来改进模型。这可以节省时间和降低成本,因为计算资源仅用于运行用户选择的流水线步骤。请参考以下代码:
# 选择参考流水线 ARN 和要执行的子步骤
selective_execution_config = SelectiveExecutionConfig(
source_pipeline_execution_arn="arn:aws:sagemaker:us-east-1:123123123123:pipeline/AbalonePipeline/execution/9e3ljoql7s0n",
selected_steps=["Abalone-Train"]
)
# 开始运行子流水线
select_execution = pipeline.start(
selective_execution_config=selective_execution_config,
parameters={
"XGBNumRounds": 120,
"XGBSubSample": 0.9,
"XGBGamma": 2,
"XGBMinChildWeight": 4,
"XGBETA": 0.25,
"XGBMaxDepth": 12
}
)
以下图片示例了正在进行的带有一个步骤的流水线,然后完成的流水线。
应用场景2:运行多个连续的流水线步骤
继续前一个应用场景,数据科学家想要训练一个新模型,并将其性能与黄金测试数据集进行评估。这个评估是确保模型满足用户验收测试(UAT)或生产部署的严格指南的关键。然而,数据科学家并不想运行整个流水线工作流程或部署模型。他们可以使用选择性执行只关注训练和评估步骤,节省时间和资源,同时仍然获得所需的验证结果:
# 选择参考流水线 ARN 和要执行的子步骤
selective_execution_config = SelectiveExecutionConfig(
source_pipeline_execution_arn="arn:aws:sagemaker:us-east-1:123123123123:pipeline/AbalonePipeline/execution/9e3ljoql7s0n",
selected_steps=["Abalone-Train", "Abalone-Evaluate"]
)
# 开始运行子流水线
select_execution = pipeline.start(
selective_execution_config=selective_execution_config,
parameters={
"ProcessingInstanceType": "ml.t3.medium",
"XGBNumRounds": 120,
"XGBSubSample": 0.9,
"XGBGamma": 2,
"XGBMinChildWeight": 4,
"XGBETA": 0.25,
"XGBMaxDepth": 12
}
)
用例 3:更新和重新运行失败的流水线步骤
您可以使用选择性执行来重新运行流水线中的失败步骤,或从失败步骤之后的步骤继续运行流水线。这对于故障排除和调试失败的步骤非常有用,因为它允许开发人员专注于需要解决的具体问题。这可以提高问题解决的效率并加快迭代时间。以下示例说明了如何选择仅重新运行流水线的失败步骤。
# 选择先前失败的流水线 arn
selective_execution_config = SelectiveExecutionConfig(
source_pipeline_execution_arn="arn:aws:sagemaker:us-east-1:123123123123:pipeline/AbalonePipeline/execution/fvsmu7m7ki3q",
selected_steps=["Abalone-Evaluate"]
)
# 开始执行失败的流水线子集
select_execution = pipeline.start(
selective_execution_config=selective_execution_config
)
或者,数据科学家可以通过在SelectiveExecutionConfig
中指定失败的步骤和其后的所有步骤,从失败的步骤恢复流水线的工作流到结束。
用例 4:流水线覆盖率
在某些流水线中,某些分支的运行频率较低。例如,可能会有一个分支仅在特定条件失败时运行。测试这些运行频率较低的分支非常重要,以确保它们在发生故障时按预期工作。通过测试这些较少运行的分支,开发人员可以验证他们的流水线的稳健性,并确保错误处理机制能够有效地维护期望的工作流程并产生可靠的结果。
selective_execution_config = SelectiveExecutionConfig(
source_pipeline_execution_arn="arn:aws:sagemaker:us-east-1:123123123123:pipeline/AbalonePipeline/execution/9e3ljoql7s0n",
selected_steps=["Abalone-Train", "Abalone-Evaluate", "Abalone-MSECheck", "Abalone-FailNotify"]
)
结论
在本文中,我们讨论了SageMaker Pipelines的选择性执行功能,它使您能够选择性地运行机器学习工作流程中的特定步骤。这种能力可以节省大量的时间和计算资源。我们在GitHub存储库中提供了一些示例代码,演示了如何使用选择性执行,并介绍了各种情况下它对用户的优势。如果您想了解更多关于选择性执行的内容,请参阅我们的开发人员指南和API参考指南。
要详细了解SageMaker Pipelines工作流程中可用的步骤,请参阅Amazon SageMaker模型构建流水线和SageMaker工作流程。此外,您可以在AWS SageMaker示例GitHub存储库中找到更多展示使用SageMaker Pipelines的不同用例和实现方法的示例。这些资源可以进一步提升您的理解并帮助您充分利用SageMaker Pipelines和选择性执行在当前和未来的机器学习项目中的潜力。