入门者完全指南:简化MLOps的ZenML
穿越MLOps的完全指南:简化ZenML
您是新学习数据科学、机器学习或MLOps,并且被工具选择弄得不知所措吗?考虑一下ZenML——一个用于简化生产流水线的编排工具。在本文中,我们将探索ZenML的功能和特点,以简化您的MLOps之旅。
学习目标
- ZenML的概念和命令
- 使用ZenML创建流水线
- 元数据跟踪、缓存和版本控制
- 参数和配置
- ZenML的高级功能
本文是作为数据科学博客马拉松的一部分发表的。
首先,让我们了解一下ZenML是什么,为什么它与其他工具不同以及如何使用它。
ZenML是什么?
ZenML是一个开源的MLOps(机器学习运维)框架,用于数据科学家、机器学习工程师和MLOps开发人员。它促进了在开发生产就绪的ML流水线中的协作。ZenML因其简单性、灵活性和工具无关性而闻名。它提供了专门为ML工作流程设计的接口和抽象,允许用户无缝集成其首选工具并自定义工作流程以满足其独特需求。
- 使用亚马逊Kendra的新网络爬虫对您的网络抓取内容进行索引
- 驾驭舵轮:NVIDIA NeMo SteerLM允许公司在推理过程中定制模型的回答
- Geoffrey Hinton关于高级人工智能的承诺和风险
为什么应该使用ZenML?
ZenML在以下几个方面对数据科学家、机器学习工程师和MLOps工程师有益:
- 简化流水线创建:使用ZenML的@step和@pipeline装饰器轻松构建ML流水线。
- 轻松的元数据跟踪和版本控制:ZenML提供了一个用户友好的仪表板,用于跟踪流水线、运行、组件和工件。
- 自动化部署:ZenML通过在定义为流水线时自动部署模型,简化了模型部署的流程,不再需要自定义Docker镜像。
- 云灵活性:使用ZenML的简单命令轻松在基于云的平台上部署您的模型。
- 标准化的MLOps基础架构:ZenML允许所有团队成员通过将ZenML配置为暂存环境和生产环境来运行流水线,以确保标准化的MLOps设置。
- 无缝集成:轻松将ZenML与实验跟踪工具(如Weights and Biases、MLflow等)集成。
ZenML安装指南
要在终端中安装ZenML,请使用以下命令:
安装ZenML:
pip install zenml
要访问本地仪表板,请使用以下命令进行安装:
pip install "zenml[server]
要验证ZenML是否正确安装并检查其版本,请运行:
zenml version
重要的ZenML术语
- 流水线:机器学习工作流程中的一系列步骤。
- 工件:流水线中每个步骤的输入和输出。
- 工件仓库:用于存储工件的版本化存储库,增强了流水线执行速度。ZenML默认提供了一个本地存储库,存储在您的本地系统上。
- 组件:用于ML流水线中使用的函数的配置。
- 堆栈:由组件和基础设施组成的集合。ZenML的默认堆栈包括:
- 工件仓库
- 编排器
这个图像的左边是我们将其作为一个流水线进行编码的部分,右边是基础设施。这两者之间有明确的分离,因此很容易更改流水线运行的环境。
- 口味:通过将其他MLOps工具与ZenML集成而创建的解决方案,从组件的基本抽象类扩展而来。
- 材料化程序:定义如何通过工件仓库在步骤之间传递输入和输出。所有材料化程序都属于基本材料化程序类。您还可以创建自定义材料化程序以集成ZenML中不存在的工具。
- ZenML服务器:用于部署ML模型和进行预测。
重要的ZenML命令
初始化新的存储库的命令:
zenml init
本地运行仪表板的命令:
zenml up
输出:

查看ZenML流水线状态的命令:
zenml show
查看活动堆栈配置的命令:
zenml stack describe
CLI:

查看所有注册堆栈列表的命令:
zenml stack list
输出:

仪表板:

创建您的第一个流水线
首先,我们需要导入来自ZenML的流水线和步骤以创建我们的流水线:
#导入创建步骤和流水线所需的模块
from zenml import pipeline, step
#定义步骤并返回一个字符串
@step
def sample_step_1() -> str:
return "欢迎来到"
#获取2个输入并打印输出
@step
def sample_step_2(input_1: str, input_2: str) -> None:
print(input_1 + " " + input_2)
#定义一个流水线
@pipeline
def my_first_pipeline():
input_1 = sample_step_1()
sample_step_2(input_1, "Analytics Vidhya")
#执行流水线
my_first_pipeline()
在这个示例流水线中,我们构建了两个独立的步骤,然后将它们整合到整体流水线中。我们使用了@step和@pipeline装饰器来实现这一点。
仪表板:享受您的流水线可视化
![]()
参数和重命名流水线
您可以通过引入参数来增强此流水线。例如,我将演示如何使用with_options()方法修改流水线的运行名称为“Analytics Vidya run”,并指定run_name参数。
#在这里,我们使用with_options()方法来修改流水线的运行名称
my_first_pipeline = my_first_pipeline.with_options(
run_name="Analytics Vidya run")
您可以在仪表板中看到新的名称:
![]()
如果一个步骤有多个输出,最好对其进行元组注释。例如:
#这里有4个输出,所以我们使用了元组。在这里,我们使用注释来说明这些输出的含义。
def train_data() -> Tuple[
Annotated[pd.DataFrame, "X_train"],
Annotated[pd.DataFrame, "X_test"],
Annotated[pd.Series, "Y_train"],
Annotated[pd.Series, "Y_test"],
]:
我们还可以加上日期和时间。
#在这里,我们使用日期和时间作为占位符,它们将自动被当前日期和时间替换。
my_first_pipeline = my_first_pipeline.with_options(
run_name="new_run_name_{{date}}_{{time}}")
my_first_pipeline()
仪表板:
![]()
缓存
通过利用以前运行的输出,当没有代码更改时,缓存加速管道执行过程,节省时间和资源。要启用缓存,只需在@pipeline装饰器旁边包含一个参数。
#在此处,将缓存作为一个参数启用到函数中。@pipeline(enable_cache=True)def my_first_pipeline():
有时我们需要动态调整我们的代码或输入。这种情况下,您可以通过将 enable_cache 设置为 False 来禁用缓存。
在仪表板中,层次结构将如下所示:
您可以利用模型属性来检索管道信息。例如,在下面的示例中,我们使用 model.name 来访问管道的名称。
model=my_first_pipeline.modelprint(model.name)
![]()
您可以通过以下方式查看管道的最后一次运行:
model = my_first_pipeline.modelprint(model.name)# 现在我们可以访问管道的最后一次运行run = model.last_runprint("last run is:", run)
输出:

使用CLI访问管道
您可以利用 Client().get_pipeline() 方法,在不依赖管道定义的情况下检索管道。
命令:
from zenml.client import Clientpipeline_model = Client().get_pipeline("my_first_pipeline")
输出:

虽然您可以方便地在ZenML仪表板中查看所有管道和运行,但值得注意的是,您也可以通过ZenML客户端和CLI访问这些信息。
使用 Client():
#在这里,我们创建了ZenML Client()的一个实例,用于使用 list_pipelines() 方法pipelines=Client().list_pipelines()
输出:

使用 CLI:
zenml pipeline list
输出:

仪表板:

ZenML Stack Components CLI
要查看所有现有的艺术品,只需执行以下命令:
zenml artifact-store list
输出:
![]()
仪表板:
![]()
要查看调度程序列表,
zenml orchestrator list
输出:
![]()
仪表板:
![]()
要注册新的艺术品存储,按照以下命令操作:
zenml artifact-store register my_artifact_store --flavor=local
您还可以使用“update”或“delete”关键字替换“register”关键字,对当前工件存储进行更新或删除。要查看注册的堆栈的详细信息,可以执行以下命令:
zenml artifact-store describe my_artifact_store
输出:

仪表板:
![]()
正如我们之前演示的一样,您还可以切换到不同的活动堆栈。
zenml stack register my_stack -o default -a my_artifact_store

正如我们之前演示的一样,您还可以切换到不同的活动堆栈。
zenml stack set my_stack
现在您可以看到活动堆栈已成功从“default”切换到“my_stack”。

仪表板:您可以在仪表板中查看新的堆栈。
![]()
建议和最佳实践
1. 通过将强大的日志记录方法纳入项目中:
#导入所需的模块from zenml import pipeline, stepfrom zenml.client import Clientfrom zenml.logger import get_loggerlogger=get_logger(__name__)#在此处,我们创建了一个包含2个步骤的流水线.@stepdef sample_step_1()->str: return "欢迎来到"@stepdef sample_step_2(input_1:str,input_2:str)->None: print(input_1+" "+input_2)@pipelinedef my_first_pipeline():#在这里,'logger'用于记录信息消息 logger.info("这是一个演示项目") input_1=sample_step_1() sample_step_2(input_1,"Analytics Vidya")my_first_pipeline()
输出:
![]()
2. 确保您的项目拥有良好结构的模板。清晰的模板增强了代码可读性,并便于其他审查您项目的人更容易理解。
My_Project/ # 项目存储库├── data/ # 数据集文件夹├── notebook/ .ipynb # Jupyter notebook文件├── pipelines/ # ZenML流水线文件夹│ ├── deployment_pipeline.py # 部署流水线│ ├── training_pipeline.py # 训练流水线│ └── *其他文件├──assets├── src/ # 源代码文件夹├── steps/ # ZenML步骤文件夹├── app.py # Web应用├── Dockerfile(*可选)├── requirements.txt # 项目所需软件包列表├── README.md # 项目文档└── .zen/
为了创建一个全面的端到端MLOps项目,建议遵循此项目模板。始终确保您的步骤文件和流水线文件组织在一个单独的文件夹中。包含详尽的文档以增强代码理解能力。在使用“zenml init”命令初始化ZenML时,.zen文件夹会自动创建。您还可以使用笔记本来存储您的Colab或Jupyter notebook文件。
3. 当处理步骤中的多个输出时,建议使用Tuple注释。
4. 记得将enable_cache设置为False,特别是在定期更新的管道运行中,例如动态导入新数据(我们将在本博客中详细介绍时间调度)。
ZenML服务器及其部署
ZenML服务器用作存储、管理和执行流水线的集中式中心。通过下面的图片,您可以了解其功能:
在此设置中,SQLite数据库存储所有的堆栈、组件和流水线。”部署”是指在生产环境中使您训练的模型对实时数据生成预测。ZenML提供两种部署选项:ZenML云和自托管部署。
步骤的执行顺序
默认情况下,ZenML按照定义的顺序执行步骤。但是,可以更改这个顺序。让我们来看一下如何操作:
from zenml import pipeline@pipeline定义 my_first_pipeline():# 在这里,我们指定了只有在步骤2执行完后才执行步骤1。 sample_step_1 = step_1(after="step_2") sample_step_2 = step_2()#然后,我们会在执行步骤1和步骤2之后执行步骤3。 step_3(sample_step_1, sample_step_2)
在这个场景中,我们修改了步骤的默认执行顺序。具体来说,我们安排了步骤1在步骤2之后运行,并且在步骤1和步骤2都执行完之后运行步骤3。
启用/禁用日志
您可以通过调整”enable_step_logs”参数来启用或禁用在工件存储中保存日志的功能。让我们看一下如何操作:
# 在此,我们正在为步骤禁用日志,作为一个参数。@step(enable_step_logs=False)def sample_step_2(input_1: str, input_2: str) -> None: print(input_1 + " " + input_2)
输出:
记录之前:

记录之后:

设置的类型
ZenML中有两种类型的设置:
- 通用设置:这些设置可以在所有的流水线中使用(例如Docker设置)。
- 堆栈组件特定设置:这些是运行时特定的配置设置,与静态的堆栈组件注册设置不同,因为这些是动态的。例如,MLFlowTrackingURL是一个注册设置,而实验名称及其相关的运行时配置是堆栈组件特定设置。堆栈组件特定设置可以在运行时进行覆盖,但注册设置不能。
定时调度模型
我们可以通过使用cron作业在特定时间自动部署ML模型。这不仅节省时间,还确保进程在指定的时间内按计划运行,不会延迟。让我们来看一下如何设置:
from zenml.config.schedule import Schedulefrom zenml import step,pipeline#定义步骤并返回一个字符串。@stepdef sample_step_1()->str: return "欢迎您"#接收两个输入并打印输出@stepdef sample_step_2(input_1:str,input_2:str)->None: print(input_1+" "+input_2) @pipelinedef my_first_pipeline(): logger.info("这是一个演示项目") input_1=sample_step_1() sample_step_2(input_1,"Analytics Vidya")#在这里,我们使用cron作业来调度我们的流水线。schedule = Schedule(cron_expression="0 7 * * 1")my_first_pipeline = my_first_pipeline.with_options(schedule=schedule)my_first_pipeline()
在这个上下文中,CRON作业表达式遵循格式(分钟、小时、月中的日期、月份、星期中的日期)。在这里,我已经计划使流水线每周一上午7点运行。
或者,我们还可以使用时间间隔:
from zenml.config.schedule import Schedulefrom zenml import pipeline@pipelinedef my_first_pipeline(): input_1 = sample_step_1() sample_step_2(input_1, "Analytics Vidya")#在这里,我们使用datetime.now()来指定当前时间和#interval_second参数用于指定需要执行的常规时间间隔.schedule = Schedule(start_time=datetime.now(), interval_second=3000)my_first_pipeline = my_first_pipeline.with_options(schedule=schedule)my_first_pipeline()
我编写了一段代码来启动我们的管道,从现在开始,并且每隔5分钟重复执行一次。
步骤上下文
步骤上下文用于访问有关当前执行的步骤的信息,例如其名称、运行名称和管道名称。这些信息对于日志记录和调试非常有价值。
#导入必要的模块from zenml import pipeline, stepfrom zenml.client import Clientfrom zenml.logger import get_loggerfrom zenml.config.schedule import Schedulefrom zenml import get_step_context#获取当前模块的日志记录器logger = get_logger(__name__)@stepdef sample_step_1() -> str:# 在步骤函数中访问步骤上下文 step_context = get_step_context() pipeline_name = step_context.pipeline.name run_name = step_context.pipeline_run.name step_name = step_context.step_run.name logger.info("Pipeline Name: %s", pipeline_name) logger.info("Run Name: %s", run_name) logger.info("Step Name: %s", step_name) logger.info("这是一个演示项目") return "欢迎"@step()def sample_step_2(input_1: str, input_2: str) -> None: # 在第二个步骤函数中访问步骤上下文 step_context = get_step_context() pipeline_name = step_context.pipeline.name run_name = step_context.pipeline_run.name step_name = step_context.step_run.name logger.info("Pipeline Name: %s", pipeline_name) logger.info("Run Name: %s", run_name) logger.info("Step Name: %s", step_name) print(input_1 + " " + input_2)@pipelinedef my_first_pipeline(): input_1 = sample_step_1() sample_step_2(input_1, "Analytics Vidya")my_first_pipeline()
输出:

结论
在这个综合指南中,我们涵盖了关于ZenML的所有必备知识,从安装到高级特性,如自定义执行顺序、创建时间计划表和利用步骤上下文。我们希望这些围绕ZenML的概念能够让您更高效地创建机器学习管道,使您的MLOps之旅更简单、更轻松。
主要要点
- ZenML通过@step和@pipeline等装饰器简化了机器学习管道的创建,使其对初学者来说更加易于上手。
- ZenML仪表板提供了对管道、堆栈组件、工件和运行的轻松跟踪,简化了项目管理。
- ZenML无缝集成了其他MLOps工具,如Weights & Biases和MLflow,增强了您的工具箱。
- 步骤上下文提供了关于当前步骤的有价值的信息,有助于有效的日志记录和调试。




