使用亚马逊SageMaker高效地训练、调优和部署自定义集成模型

使用亚马逊SageMaker集成模型

人工智能(AI)已成为技术社区中重要且热门的话题。随着AI的发展,我们看到了不同类型的机器学习(ML)模型出现。其中一种方法被称为集成建模,已在数据科学家和实践者中迅速获得认可。在本文中,我们将讨论什么是集成模型以及为什么使用它们是有益的。然后,我们将提供一个示例,介绍如何使用Amazon SageMaker训练、优化和部署自定义集成模型。

集成学习指的是使用多个学习模型和算法来获得比任何单个独立学习算法更准确的预测。它们在各种应用和学习环境中被证明是高效的,例如网络安全[1]和欺诈检测、遥感、金融决策中预测最佳下一步、医学诊断,甚至计算机视觉和自然语言处理(NLP)任务。我们倾向于根据用于训练它们的技术、它们的组合方式以及它们将不同的预测合并成单个推理的方式对集成模型进行分类。这些类别包括:

  • 提升(Boosting) – 顺序训练多个弱学习器,其中每个序列中以前学习器的每个错误预测都赋予更高的权重并输入到下一个学习器中,从而创建一个更强大的学习器。示例包括AdaBoost、梯度提升和XGBoost。
  • 装袋(Bagging) – 使用多个模型来减少单个模型的方差。示例包括随机森林和极端随机树。
  • 堆叠(blending) – 常常使用不同的模型,其中每个独立评估器的预测被堆叠在一起并用作输入到最终评估器中,该评估器处理预测。这个最终评估器的训练过程通常使用交叉验证。

将预测结果合并为最终输出的单个结果有多种方法,例如使用元估计器(例如线性学习器)、使用多个模型进行基于多数投票的分类任务预测,或者对回归进行集成平均。

虽然有几个库和框架提供集成模型的实现,例如XGBoost、CatBoost或scikit-learn的随机森林,但在本文中,我们专注于使用自己的模型并将其用作堆叠集成的方式。然而,与为每个模型使用专用资源(专用训练和调优作业以及每个模型的托管端点)不同,我们使用单个SageMaker训练作业、单个调优作业来训练、调优和部署自定义集合(多个模型),并部署到单个端点,从而减少可能的成本和运营开销。

BYOE:带上你的集成模型

使用SageMaker训练和部署异构集成模型有几种方法:您可以在单独的训练作业中训练每个模型,并使用Amazon SageMaker自动模型调优来优化每个模型。在托管这些模型时,SageMaker提供了各种经济高效的方式来在同一租户基础架构上托管多个模型。有关这种设置的详细部署模式可以在Amazon SageMaker中的模型托管模式,第1部分:构建Amazon SageMaker上机器学习应用程序的常见设计模式中找到。这些模式包括使用多个端点(用于每个训练模型)或单个多模型端点,甚至是单个多容器端点,其中容器可以单独调用或在管道中链接。所有这些解决方案都包括一个元估计器(例如在AWS Lambda函数中),它调用每个模型并实现混合或投票功能。

然而,运行多个培训作业可能会引入操作和成本开销,特别是如果您的集成模型需要在相同的数据上训练。类似地,将不同模型托管在单独的端点或容器上,并结合它们的预测结果以获得更好的准确性,需要多次调用,从而引入了额外的管理、成本和监控工作。例如,SageMaker支持使用Triton推理服务器的集成ML模型,但此解决方案要求模型或模型集合受到Triton后端的支持。此外,客户需要额外的工作来设置Triton服务器并学习如何使用不同的Triton后端。因此,客户更喜欢一种更直接的实现解决方案的方式,他们只需要一次将调用发送到端点,并具有灵活性来控制如何聚合结果以生成最终输出。

解决方案概述

为了解决这些问题,我们将通过一个例子来介绍如何使用单个训练作业进行集成训练,优化模型的超参数,并使用单个容器部署到无服务器端点。我们在我们的集成堆栈中使用了两个模型:CatBoost和XGBoost(都是提升集成模型)。对于我们的数据,我们使用了scikit-learn库中的糖尿病数据集[2]:它包含10个特征(年龄、性别、体重、血压和六项血清测量),我们的模型预测基线特征收集后1年后的疾病进展(回归模型)。

完整的代码存储库可以在GitHub上找到。

在单个SageMaker作业中训练多个模型

为了训练我们的模型,我们在脚本模式下使用SageMaker训练作业。使用脚本模式,您可以编写自定义的训练(以及后续的推理代码),同时使用SageMaker框架容器。框架容器使您能够使用由AWS管理的预配置环境,其中包括所有必要的配置和模块。为了演示如何自定义框架容器,我们使用预构建的SKLearn容器作为示例,该容器不包括XGBoost和CatBoost软件包。有两种方法可以添加这些软件包:要么扩展内置容器以安装CatBoost和XGBoost(然后部署为自定义容器),要么使用SageMaker训练作业脚本模式功能,在创建训练估计器时提供一个requirements.txt文件。SageMaker训练作业会在运行时安装requirements.txt文件中列出的库。这样,您就不需要管理自己的Docker镜像存储库,并且可以更灵活地运行需要额外Python软件包的训练脚本。

以下代码块显示了我们用于启动训练的代码。 entry_point参数指向我们的训练脚本。我们还使用了SageMaker SDK API的两个强大功能:

  • 首先,我们在source_dirdependencies参数中指定源目录和依赖项的本地路径。SDK将压缩并上传这些目录到Amazon Simple Storage Service(Amazon S3),SageMaker将在训练实例上的工作目录/opt/ml/code下提供这些目录。
  • 其次,我们使用SDK的SKLearn估计器对象,指定我们偏好的Python和框架版本,以便SageMaker拉取相应的容器。我们还定义了一个自定义训练指标“validation:rmse”,这将在训练日志中发出并被SageMaker捕获。稍后,我们将使用此指标作为调优作业的目标指标。
hyperparameters = {"num_round": 6, "max_depth": 5}
estimator_parameters = {
    "entry_point": "multi_model_hpo.py",
    "source_dir": "code",
    "dependencies": ["my_custom_library"],
    "instance_type": training_instance_type,
    "instance_count": 1,
    "hyperparameters": hyperparameters,
    "role": role,
    "base_job_name": "xgboost-model",
    "framework_version": "1.0-1",
    "keep_alive_period_in_seconds": 60,
    "metric_definitions":[
       {'Name': 'validation:rmse', 'Regex': 'validation-rmse:(.*?);'}
    ]
}
estimator = SKLearn(**estimator_parameters)

接下来,我们编写我们的训练脚本(multi_model_hpo.py)。我们的脚本按照简单的流程进行:捕捉作业配置的超参数,并训练CatBoost模型和XGBoost模型。我们还实现了一个k折交叉验证函数。请参阅下面的代码:

if __name__ == "__main__":
    parser = argparse.ArgumentParser()

    # Sagemaker特定的参数。默认值在环境变量中设置。
    parser.add_argument("--output-data-dir", type=str, default=os.environ["SM_OUTPUT_DATA_DIR"])
    parser.add_argument("--model-dir", type=str, default=os.environ["SM_MODEL_DIR"])
    parser.add_argument("--train", type=str, default=os.environ["SM_CHANNEL_TRAIN"])
    parser.add_argument("--validation", type=str, default=os.environ["SM_CHANNEL_VALIDATION"])
    .
    .
    .
    
    """
    训练CatBoost
    """
    
    K = args.k_fold    
    catboost_hyperparameters = {
        "max_depth": args.max_depth,
        "eta": args.eta,
    }
    rmse_list, model_catboost = cross_validation_catboost(train_df, K, catboost_hyperparameters)
    .
    .
    .
    
    """
    训练XGBoost模型
    """

    hyperparameters = {
        "max_depth": args.max_depth,
        "eta": args.eta,
        "objective": args.objective,
        "num_round": args.num_round,
    }

    rmse_list, model_xgb = cross_validation(train_df, K, hyperparameters)

在模型训练完成后,我们计算CatBoost和XGBoost预测结果的平均值。结果pred_mean是我们集成模型的最终预测结果。然后,我们计算预测结果与验证集的均方根误差mean_squared_error。在训练过程中,使用val_rmse来评估整个集成模型。需要注意的是,我们还按照metric_definitions中使用的正则表达式模式打印了RMSE值。稍后,SageMaker自动模型调优将使用该值来捕捉目标指标。请参考以下代码:

pred_mean = np.mean(np.array([pred_catboost, pred_xgb]), axis=0)
val_rmse = mean_squared_error(y_validation, pred_mean, squared=False)
print(f"最终评估结果:验证集均方根误差:{val_rmse}")

最后,我们的脚本将两个模型的结果保存到位于/opt/ml/model的输出文件夹中。

当训练作业完成时,SageMaker将将/opt/ml/model目录的内容打包为压缩的TAR格式文件,并将其复制到您在作业配置中指定的S3位置。在我们的情况下,SageMaker将两个模型打包成一个TAR文件,并在训练作业结束时将其上传到Amazon S3。请参考以下代码:

model_file_name = 'catboost-regressor-model.dump'
   
    # 保存CatBoost模型
    path = os.path.join(args.model_dir, model_file_name)
    print('将模型文件保存到{}'.format(path))
    model.save_model(path)
   .
   .
   .
   # 保存XGBoost模型
   model_location = args.model_dir + "/xgboost-model"
   pickle.dump(model, open(model_location, "wb"))
   logging.info("已在{}处保存训练好的模型".format(model_location))

总结一下,您应该注意到在这个过程中,我们只下载了一次数据,并使用单个训练作业训练了两个模型。

自动集成模型调优

由于我们正在构建一组机器学习模型,探索所有可能的超参数组合是不切实际的。SageMaker提供了自动模型调优(AMT)功能,它通过专注于您指定范围内最有希望的数值组合来寻找最佳的模型超参数(您需要定义适合探索的正确范围)。SageMaker支持多种优化方法供您选择。

我们首先定义优化过程的两个部分:目标指标和要调优的超参数。在我们的示例中,我们使用验证集的RMSE作为目标指标,并调优etamax_depth(其他超参数请参考XGBoost超参数和CatBoost超参数):

from sagemaker.tuner import (
    IntegerParameter,
    ContinuousParameter,
    HyperparameterTuner,
)

hyperparameter_ranges = {
    "eta": ContinuousParameter(0.2, 0.3),
    "max_depth": IntegerParameter(3, 4)
}
metric_definitions = [{"Name": "validation:rmse", "Regex": "validation-rmse:([0-9\\.]+)"}]
objective_metric_name = "validation:rmse"

我们还需要确保在训练脚本中,超参数不是硬编码的,而是从SageMaker的运行时参数中获取:

catboost_hyperparameters = {
    "max_depth": args.max_depth,
    "eta": args.eta,
}

SageMaker还会将超参数写入JSON文件,并可以从训练实例上的/opt/ml/input/config/hyperparameters.json中读取。

与CatBoost类似,我们还捕获了XGBoost模型的超参数(请注意objectivenum_round没有被调优):

catboost_hyperparameters = {
    "max_depth": args.max_depth,
    "eta": args.eta,
}

最后,我们使用这些配置启动超参数调优作业:

tuner = HyperparameterTuner(
    estimator, 
    objective_metric_name,
    hyperparameter_ranges, 
    max_jobs=4, 
    max_parallel_jobs=2, 
    objective_type='Minimize'
)
tuner.fit({"train": train_location, "validation": validation_location}, include_cls_metadata=False)

当作业完成后,您可以检索得到最佳训练作业(具有最小RMSE)的值:

job_name=tuner.latest_tuning_job.name
attached_tuner = HyperparameterTuner.attach(job_name)
attached_tuner.describe()["BestTrainingJob"]

有关AMT的更多信息,请参阅在SageMaker中执行自动模型调整。

部署

为了部署我们的自定义集成模型,我们需要提供一个处理推理请求并配置SageMaker托管的脚本。在这个例子中,我们使用了一个包含训练和推理代码的单个文件(multi_model_hpo.py)。SageMaker在部署和提供模型时使用__name__ == "__main__"下的代码进行训练,以及model_fninput_fnpredict_fn函数进行模型的部署和服务。

推理脚本

与训练一样,我们使用SageMaker SKLearn框架容器和自己的推理脚本。脚本将实现SageMaker所需的三个方法。

首先,model_fn方法读取我们保存的模型文件并将其加载到内存中。在我们的例子中,该方法将返回我们的集成模型all_model,它是一个Python列表,但您也可以使用以模型名称为键的字典。

def model_fn(model_dir):
    catboost_model = CatBoostRegressor()
    catboost_model.load_model(os.path.join(model_dir, model_file_name))
    
    model_file = "xgboost-model"
    model = pickle.load(open(os.path.join(model_dir, model_file), "rb"))
    
    all_model = [catboost_model, model]
    return all_model

其次,input_fn方法将反序列化请求输入数据,以传递给我们的推理处理程序。有关输入处理程序的更多信息,请参阅适应您自己的推理容器。

def input_fn(input_data, content_type):
    dtype=None
    payload = StringIO(input_data)
    return np.genfromtxt(payload, dtype=dtype, delimiter=",")

第三,predict_fn方法负责从模型获取预测。该方法接受模型和从input_fn返回的数据作为参数,并返回最终的预测结果。在我们的例子中,我们首先从模型列表的第一个成员(model[0])获取CatBoost的结果,然后从第二个成员(model[1])获取XGBoost的结果,并使用一个融合函数返回两个预测结果的平均值:

def predict_fn(input_data, model):
    predictions_catb = model[0].predict(input_data)
    
    dtest = xgb.DMatrix(input_data)
    predictions_xgb = model[1].predict(dtest,
                                       ntree_limit=getattr(model, "best_ntree_limit", 0),
                                       validate_features=False)
    
    return np.mean(np.array([predictions_catb, predictions_xgb]), axis=0)

现在我们已经有了训练好的模型和推理脚本,我们可以配置环境来部署我们的集成模型。

SageMaker无服务器推理

尽管SageMaker有许多托管选项,在这个例子中,我们使用了无服务器端点。无服务器端点会自动启动计算资源,并根据流量的多少进行扩缩。这样就不需要管理服务器了。这个选项适用于在流量突增之间有空闲时间,并且可以容忍冷启动的工作负载。

配置无服务器端点非常简单,因为我们不需要选择实例类型或管理扩缩策略。我们只需要提供两个参数:内存大小和最大并发数。无服务器端点会根据您选择的内存大小自动分配计算资源。如果选择较大的内存大小,容器将可以访问更多的虚拟CPU。您应该根据您的模型大小选择端点的内存大小。我们需要提供的第二个参数是最大并发数。对于单个端点,该参数可以设置为最大200(截至目前,每个区域的无服务器端点总数限制为50)。您应该注意,对于单个端点,最大并发数会阻止该端点占用您帐户允许的所有调用次数,因为超出最大并发数的任何端点调用都会被限制(有关每个区域所有无服务器端点的总并发数的更多信息,请参阅Amazon SageMaker端点和配额)。

from sagemaker.serverless.serverless_inference_config import ServerlessInferenceConfig
serverless_config = ServerlessInferenceConfig(
    memory_size_in_mb=6144,
    max_concurrency=1,
)

现在我们已经配置了端点,我们可以最终部署在超参数优化作业中选择的模型:

estimator=attached_tuner.best_estimator()
predictor = estimator.deploy(serverless_inference_config=serverless_config)

清理

尽管无服务器端点在不使用时没有成本,但在运行完本示例后,您应该确保删除端点:

predictor.delete_endpoint(predictor.endpoint)

结论

在本文中,我们介绍了一种训练、优化和部署自定义集成模型的方法。我们详细介绍了使用单个训练作业训练多个模型的过程,如何使用自动模型调优来优化集成模型的超参数,以及如何部署一个单一的无服务器端点来融合多个模型的推断结果。

使用这种方法可以解决潜在的成本和运营问题。训练作业的成本基于您在使用期间所使用的资源。通过仅下载数据一次来训练两个模型,我们减少了作业的数据下载阶段和存储数据的使用量,从而降低了训练作业的总成本。此外,AMT作业运行了四个训练作业,每个作业都有上述减少的时间和存储,因此节约了4倍的成本!至于在无服务器端点上部署模型,因为您还需要支付处理的数据量,通过仅调用一次端点来处理两个模型,您只需支付一半的I/O数据费用。

尽管本文只展示了两个模型的好处,但您可以使用该方法训练、优化和部署多个集成模型,以实现更大的效果。

参考资料

[1] Raj Kumar, P. Arun; Selvakumar, S. (2011). “Distributed denial of service attack detection using an ensemble of neural classifier”. Computer Communications. 34 (11): 1328–1341. doi:10.1016/j.comcom.2011.01.012.

[2] Bradley Efron, Trevor Hastie, Iain Johnstone and Robert Tibshirani (2004) “Least Angle Regression,” Annals of Statistics (with discussion), 407-499. (https://web.stanford.edu/~hastie/Papers/LARS/LeastAngle_2002.pdf)