一个基于MLOps技术增强的客户流失预测项目
基于MLOps技术的客户流失预测项目的增强
介绍
当我们听到数据科学时,第一个想到的是在笔记本上建模并训练数据。但这在真实世界的数据科学中并不是这样。在现实世界中,数据科学家构建模型并将其投入生产环境中。生产环境在模型的开发、部署和可靠性之间存在差距,并为促进高效和可扩展的运营而使用MLOps(机器学习运营)在生产环境中构建和部署ML应用。在本文中,我们将使用MLOps构建和部署一个客户流失预测项目。

学习目标
在本文中,您将学到:
- 项目概述
- 我们将介绍ZenML和MLOPS的基础知识。
- 学习如何在本地部署模型进行预测
- 深入了解数据预处理和工程、训练和评估模型
本文是数据科学博客马拉松的一部分。
- 遇见密斯特拉尔·特里斯迈吉斯托7B:一份关于秘传、属灵、玄学和智慧传统的指令数据集…
- 用深度学习揭开基因调控:一种理解可变剪接的新人工智能方法
- 斯坦福大学研究人员提出了MLAgentBench:一套用于对比AI研究智能体的机器学习任务集合
项目概述
首先,我们需要了解我们的项目是什么。对于这个项目,我们拥有一家电信公司的数据集。现在,我们要建立一个模型来预测用户是否有可能继续使用该公司的服务。我们将借助ZenML和MLFlow来构建这个ML应用。以下是我们项目的工作流程。
我们项目的工作流程
- 数据收集
- 数据预处理
- 训练模型
- 评估模型
- 部署
什么是MLOps?
MLOps是从开发到部署和持续维护的端到端机器学习生命周期。MLOps是优化和自动化机器学习模型整个生命周期的实践,同时确保可扩展性、可靠性和效率。
让我们通过一个简单的例子来解释:
想象一下,在你的城市中建造一座摩天大楼。建筑物的建设已经完成。但缺乏电力、自来水、排水系统等。这座摩天大楼将无法运作,没有实际意义。
同样适用于机器学习模型。如果这些模型在不考虑模型的部署、可扩展性和长期维护的情况下设计,它们可能会变得无效和不切实际。这对于数据科学家在构建用于生产环境的机器学习模型时构成了重大障碍。

MLOps是一组最佳实践和策略,指导机器学习模型的生产、部署和长期维护。它确保这些模型不仅能够提供准确的预测,还能保持稳健、可扩展和有价值的公司资产。因此,没有MLOps,这些任务的高效完成将成为一场噩梦,具有挑战性。在本项目中,我们将解释MLOps的工作原理、不同阶段,并进行一个端到端的项目,展示如何构建一个客户流失预测模型。
介绍ZenML
ZenML是一个开源的MLOPS框架,用于构建便携和面向生产的流水线。ZenML框架将帮助我们使用MLOps完成这个项目。
⚠️ 如果您是Windows用户,请尝试在电脑上安装wsl。Zenml不支持Windows。
在继续进行项目之前。
MLOPS的基本概念
- 步骤:步骤是流水线或工作流中的单个任务单元。每个步骤表示需要执行的特定操作或操作,以开发机器学习工作流。例如,数据清理、数据预处理、训练模型等都是开发机器学习模型的某些步骤。
- 流水线:它们将多个步骤连接在一起,以创建用于机器学习任务的结构化和自动化的过程。例如,数据处理流水线、模型评估流水线和模型训练流水线。
开始入门
为项目创建虚拟环境:
conda create -n churn_prediction python=3.9
然后安装这些库:
pip install numpy pandas matplotlib scikit-learn
安装完毕后,安装ZenML:
pip install zenml["server"]
然后初始化ZenML存储库。
zenml init
![]()
如果屏幕显示这个画面,则意味着你已经成功初始化。此时,你的目录中会创建一个名为.zenml的文件夹。
在目录中创建一个用于存放数据的文件夹,通过这个链接获取数据:
按照这个结构创建文件夹。
![]()
数据收集
在这一步中,我们将从CSV文件中导入数据。这些数据将在清洗和编码后用于训练模型。
在文件夹“steps”中创建一个名为ingest_data.py的文件。
import pandas as pdimport numpy as npimport loggingfrom zenml import stepclass IngestData: """ 将数据收集到工作流中。 """ def __init__(self, path:str) -> None: """ Args: data_path(str): 数据文件的路径 """ self.path = path def get_data(self): df = pd.read_csv(self.path) logging.info("成功读取csv文件。") return df @step(enable_cache = False)def ingest_df(data_path:str) -> pd.DataFrame: """ 用于从CSV文件中获取数据的ZenML步骤。 """ try: # 创建IngestData类的实例,并将数据导入 ingest_data = IngestData(data_path) df = ingest_data.get_data() logging.info("数据导入完成") return df except Exception as e: # 如果数据导入失败,则记录错误消息并引发异常 logging.error("数据导入时发生错误") raise e
点击这里查看项目的链接。
在此代码中,我们首先创建了IngestData类来封装数据导入逻辑。然后,我们创建了一个名为ingest_df的ZenML步骤,它是数据收集流水线的一个单独单元。
在文件夹“pipeline”中创建一个名为training_pipeline.py的文件。
![]()
编写代码
from zenml import pipelinefrom steps.ingest_data import ingest_df# 定义一个名为training_pipeline的ZenML流水线。@pipeline(enable_cache=False)def train_pipeline(data_path:str): ''' 用于训练模型的数据流水线。 Args: data_path (str): 要导入的数据的路径。 ''' df = ingest_df(data_path=data_path)
在这里,我们正在创建一个训练流水线,用于使用一系列步骤训练机器学习模型。
然后,在基础目录中创建一个名为run_pipeline.py的文件以运行pipeline。
from pipelines.training_pipeline import train_pipelineif __name__ == '__main__': # 运行流水线 train_pipeline(data_path="/mnt/e/Customer_churn/data/WA_Fn-UseC_-Telco-Customer-Churn.csv")
此代码用于运行流水线。
现在,我们已经完成了数据导入流水线。让我们运行它。
在终端中运行以下命令:
python run_pipeline.py
![]()
然后,您可以看到表明训练流程已成功完成的命令。
数据预处理
在这一步中,我们将为清理数据创建不同的策略。不需要的列将被删除,并且使用标签编码对分类列进行编码。最后,数据将被分割为训练和测试数据。
在src目录中创建一个名为clean_data.py的文件。
在这个文件中,我们将创建用于清理数据的策略类。
import pandas as pdimport numpy as npimport loggingfrom sklearn.model_selection import train_test_splitfrom abc import abstractmethod, ABCfrom typing import Unionfrom sklearn.preprocessing import LabelEncoderclass DataStrategy(ABC): @abstractmethod def handle_data(self, df:pd.DataFrame) -> Union[pd.DataFrame,pd.Series]: pass # 数据预处理策略类class DataPreprocessing(DataStrategy): def handle_data(self, df: pd.DataFrame) -> Union[pd.DataFrame, pd.Series]: try: df['TotalCharges'] = df['TotalCharges'].replace(' ', 0).astype(float) df.drop('customerID', axis=1, inplace=True) df['Churn'] = df['Churn'].replace({'Yes': 1, 'No': 0}).astype(int) service = ['PhoneService', 'MultipleLines', 'InternetService', 'OnlineSecurity', 'OnlineBackup', 'DeviceProtection', 'TechSupport', 'StreamingTV', 'StreamingMovies'] for col in service: df[col] = df[col].replace({'No phone service': 'No', 'No internet service': 'No'}) logging.info("df的长度:", len(df.columns)) return df except Exception as e: logging.error("预处理错误", e) raise e# 特征编码策略类class LabelEncoding(DataStrategy): def handle_data(self, df: pd.DataFrame) -> Union[pd.DataFrame, pd.Series]: try: df_cat = ['gender', 'Partner', 'Dependents', 'PhoneService', 'MultipleLines', 'InternetService', 'OnlineSecurity', 'OnlineBackup', 'DeviceProtection', 'TechSupport', 'StreamingTV', 'StreamingMovies', 'Contract', 'PaperlessBilling', 'PaymentMethod'] lencod = LabelEncoder() for col in df_cat: df[col] = lencod.fit_transform(df[col]) logging.info(df.head()) return df except Exception as e: logging.error(e) raise e # 数据拆分策略类class DataDivideStrategy(DataStrategy): def handle_data(self, df:pd.DataFrame) -> Union[pd.DataFrame, pd.Series]: try: X = df.drop('Churn', axis=1) y = df['Churn'] X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=1) return X_train, X_test, y_train, y_test except Exception as e: logging.error("DataDividing错误", e) raise e
这段代码实现了一个用于机器学习的模块化数据预处理流程。它包括数据预处理、特征编码和数据拆分步骤,用于清理预测建模的数据。
1. DataPreprocessing:这个类负责删除不需要的列和处理数据集中的缺失值(NA值)。
2. LabelEncoding:LabelEncoding类旨在将分类变量编码为机器学习算法可以有效处理的数值格式。它将基于文本的类别转换为数字值。
3. DataDivideStrategy:这个类将数据集分成独立变量(X)和因变量(y),然后将数据分成训练集和测试集。
我们将逐步实施它们,为机器学习任务准备我们的数据。
这些策略确保数据的结构和格式对于模型的训练和评估是正确的。
在steps文件夹中创建data_cleaning.py文件。
import pandas as pdimport numpy as npfrom src.clean_data import DataPreprocessing, DataDivideStrategy, LabelEncodingimport loggingfrom typing_extensions import Annotatedfrom typing import Tuplefrom zenml import step# 为数据清洗和预处理定义一个ZenML步骤@step(enable_cache=False)def cleaning_data(df: pd.DataFrame) -> Tuple[ Annotated[pd.DataFrame, "X_train"], Annotated[pd.DataFrame, "X_test"], Annotated[pd.Series, "y_train"], Annotated[pd.Series, "y_test"],]: try: # 实例化DataPreprocessing策略 data_preprocessing = DataPreprocessing() # 对输入DataFrame应用数据预处理 data = data_preprocessing.handle_data(df) # 实例化LabelEncoding策略 feature_encode = LabelEncoding() # 对预处理后的数据应用标签编码 df_encoded = feature_encode.handle_data(data) # 记录DataFrame列的信息 logging.info(df_encoded.columns) logging.info("列数:", len(df_encoded)) # 实例化DataDivideStrategy策略 split_data = DataDivideStrategy() # 将编码后的数据拆分为训练集和测试集 X_train, X_test, y_train, y_test = split_data.handle_data(df_encoded) # 将拆分后的数据作为元组返回 return X_train, X_test, y_train, y_test except Exception as e: # 处理并记录数据清洗过程中发生的任何错误 logging.error("清洗数据步骤出错", e) raise e
在这一步中,我们实现了在clean_data.py中创建的策略
让我们在training_pipeline.py中实现这个步骤
from zenml import pipeline#importing steps from steps.ingest_data import ingest_dffrom steps.data_cleaning import cleaning_dataimport logging#定义一个名为training_pipeline的ZenML管道.@pipeline(enable_cache=False)def train_pipeline(data_path:str): ''' 用于训练模型的数据流水线。 ''' df = ingest_df(data_path=data_path) X_train, X_test, y_train, y_test = cleaning_data(df=df)
就这样,我们已经完成了训练流程中的数据预处理步骤。
模型训练
现在,我们将为这个项目建立模型。在这里,我们正在预测一个二分类问题。我们可以使用逻辑回归。我们的重点不是模型的准确性,而是基于MLOps的部分。
对于那些不了解逻辑回归的人,您可以在这里阅读相关内容。我们将实施与数据预处理步骤相同的步骤。首先,在src文件夹中创建一个名为training_model.py的文件。
import pandas as pdfrom sklearn.linear_model import LogisticRegressionfrom abc import ABC, abstractmethodimport logging#抽象模型类class Model(ABC): @abstractmethod def train(self,X_train:pd.DataFrame,y_train:pd.Series): """ 在给定数据上训练模型 """ pass class LogisticReg(Model): """ 实现逻辑回归模型。 """ def train(self, X_train: pd.DataFrame, y_train: pd.Series): """ 训练模型 Args: X_train: pd.DataFrame, y_train: pd.Series """ logistic_reg = LogisticRegression() logistic_reg.fit(X_train,y_train) return logistic_reg
我们定义了一个抽象的Model类,具有一个所有模型都必须实现的’train’方法。LogisticReg类是使用逻辑回归的具体实现。下一步是在steps文件夹中配置一个名为config.py的文件。创建一个名为config.py的文件并放置在steps文件夹中。
配置模型参数
from zenml.steps import BaseParameters"""该文件用于配置和指定与机器学习模型和训练过程相关的各种参数"""class ModelName(BaseParameters): """ 模型配置 """ model_name: str = "logistic regression"
在名为config.py的文件中,您正在配置与机器学习模型相关的参数。您可以创建一个继承自BaseParameters的ModelName类来指定模型名称。这使得更改模型类型变得容易。
import logging import pandas as pdfrom src.training_model import LogisticRegfrom zenml import stepfrom .config import ModelName#定义一个名为train_model的步骤@step(enable_cache=False)def train_model(X_train:pd.DataFrame,y_train:pd.Series,config:ModelName): """ 根据配置的模型训练数据 """ try: model = None if config == "logistic regression": model = LogisticReg() else: raise ValueError("不支持的模型名称") trained_model = model.train(X_train=X_train,y_train=y_train) return trained_model except Exception as e: logging.error("训练模型时发生错误",e) raise e
在steps文件夹中的名为model_train.py的文件中,使用ZenML定义一个名为train_model的步骤。这个步骤的目的是根据ModelName中的模型名称训练一个机器学习模型。
在程序中
检查配置的模型名称。如果是“logistic regression”,则创建LogisticReg模型的实例,并用提供的训练数据(X_train和y_train)对其进行训练。如果不支持该模型名称,则引发错误。在此过程中记录任何错误,并引发错误。
完成后,我们将在training_pipeline.py中实现这一步骤
from zenml import pipelinefrom steps.ingest_data import ingest_dffrom steps.data_cleaning import cleaning_datafrom steps.model_train import train_modelimport logging#定义一个名为training_pipeline的ZenML管道.@pipeline(enable_cache=False)def train_pipeline(data_path:str): ''' 用于训练模型的数据流水线。 ''' #将数据导入到ingesting data步骤:返回数据。 df = ingest_df(data_path=data_path) #清理数据的步骤。 X_train, X_test, y_train, y_test = cleaning_data(df=df) #训练模型 model = train_model(X_train=X_train,y_train=y_train)
现在,我们已经在流水线中实施了train_model步骤。 因此,model_train.py步骤已经完成。
评估模型
在这一步中,我们将评估我们的模型效果有多好。 为此,我们将检查预测测试数据的准确性得分。因此,首先我们要创建在流水线中将使用的策略。
在src文件夹中创建一个名为evaluate_model.py的文件。
import loggingfrom sklearn.metrics import confusion_matrix, classification_report, accuracy_scorefrom abc import ABC, abstractmethodimport numpy as np# 模型评估的抽象类class Evaluate(ABC): @abstractmethod def evaluate_model(self, y_true: np.ndarray, y_pred: np.ndarray) -> float: """ 评估机器学习模型效果的抽象方法。 Args: y_true (np.ndarray): 真实标签。 y_pred (np.ndarray): 预测标签。 Returns: float: 评估结果。 """ pass# 计算准确性得分的类class Accuracy_score(Evaluate): """ 计算并返回模型预测的准确性得分。 """ def evaluate_model(self, y_true: np.ndarray, y_pred: np.ndarray) -> float: try: accuracy_scr = accuracy_score(y_true=y_true, y_pred=y_pred) * 100 logging.info("准确性得分:", accuracy_scr) return accuracy_scr except Exception as e: logging.error("在评估模型准确性时出错",e) raise e# 计算精确性得分的类class Precision_Score(Evaluate): def evaluate_model(self, y_true: np.ndarray, y_pred: np.ndarray) -> float: """ 生成并返回模型预测的精确性得分。 """ try: precision = precision_score(y_true=y_true,y_pred=y_pred) logging.info("精确性得分: ",precision) return float(precision) except Exception as e: logging.error("计算精确性得分时出错",e) raise eclass F1_Score(Evaluate): def evaluate_model(self, y_true: np.ndarray, y_pred: np.ndarray): """ 生成并返回模型预测的F1得分。 """ try: f1_scr = f1_score(y_pred=y_pred, y_true=y_true) logging.info("F1得分: ", f1_scr) return f1_scr except Exception as e: logging.error("计算F1得分时出错", e) raise e
现在,我们已经构建了评估策略,我们将使用它们来评估模型。让我们在steps文件夹中实现代码evaluate_model.py中的代码。在这里,召回率、准确性得分和精确性得分是我们用作评估模型指标的策略。
让我们在步骤中实现这些。在steps文件夹中创建一个名为evaluation.py的文件:
import loggingimport pandas as pdimport numpy as npfrom zenml import stepfrom src.evaluate_model import ClassificationReport, ConfusionMatrix, Accuracy_scorefrom typing import Tuplefrom typing_extensions import Annotatedfrom sklearn.base import ClassifierMixin@step(enable_cache=False)def evaluate_model( model: ClassifierMixin, X_test: pd.DataFrame, y_test: pd.Series) -> Tuple[ Annotated[np.ndarray,"confusion_matix"], Annotated[str,"classification_report"], Annotated[float,"accuracy_score"], Annotated[float,"precision_score"], Annotated[float,"recall_score"] ]: """ 使用常见的度量指标评估机器学习模型的性能。 """ try: y_pred = model.predict(X_test) precision_score_class = Precision_Score() precision_score = precision_score_class.evaluate_model(y_pred=y_pred,y_true=y_test) mlflow.log_metric("精确性得分 ",precision_score) accuracy_score_class = Accuracy_score() accuracy_score = accuracy_score_class.evaluate_model(y_true=y_test, y_pred=y_pred) logging.info("准确性得分:",accuracy_score) return accuracy_score, precision_score except Exception as e: logging.error("在评估模型时出错",e) raise e
现在,让我们将这一步实现到流水线中。 更新training_pipeline.py:
这段代码在机器学习流水线中定义了一个evaluate_model步骤。 它以训练好的分类模型(model)、独立的测试数据(X_test)和测试数据的真实标签(y_test)作为输入。 然后,它使用常见的分类度量指标评估模型的性能,并返回结果,如精确性得分和准确性得分。
现在,让我们在流水线中实施这一步骤。更新 training_pipeline.py :
from zenml import pipelinefrom steps.ingest_data import ingest_dffrom steps.data_cleaning import cleaning_datafrom steps.model_train import train_modelfrom steps.evaluation import evaluate_modelimport logging# 定义一个名为training_pipeline的ZenML流水线.@pipeline(enable_cache=False)def train_pipeline(data_path:str): '''训练模型的数据流水线。 args:data_path(str):要导入的数据路径。 ''' #步骤导入数据:返回数据。 df = ingest_df(data_path=data_path) #步骤清理数据。 X_train,X_test,y_train,y_test = cleaning_data(df=df) #培训模型 model = train_model(X_train=X_train,y_train=y_train) #数据的评估指标 accuracy_score,precision_score = evaluate_model(model=model,X_test=X_test,y_test=y_test)
就是这样。现在,我们已经完成了培训流水线。运行
python run_pipeline.py
在终端中。如果它成功运行。现在,我们已经成功运行本地培训流水线,它将如下所示:
实验跟踪器是什么?
实验跟踪器是机器学习中用于记录、监控和管理各种实验的工具。开展数据科学家试验不同模型以获得最佳结果。因此,他们需要跟踪数据并使用不同的模型。如果他们使用Excel表格手动记录,那将非常困难。
MLflow
MLflow是一种有助于高效跟踪和管理机器学习实验的工具。它自动化实验跟踪、监控模型迭代和相关数据。这简化了模型开发过程,并提供了一个用户友好的界面来可视化结果。
将MLflow与ZenML集成可增强机器学习操作框架中的实验稳健性和管理性。
要在ZenML中设置MLflow,请按照以下步骤进行操作:
- 安装MLflow集成:
- 使用以下命令安装MLflow集成:
zenml integration install mlflow -y
2. 注册MLflow实验跟踪器:
使用此命令在MLflow中注册实验跟踪器:
zenml experiment-tracker register mlflow_tracker --flavor=mlflow
3. 注册堆栈:
在ZenML中,堆栈是定义ML工作流中的任务的组件集合。它有助于高效组织和管理ML流水线步骤。使用以下命令注册堆栈:
您可以在文档中找到更多细节。
zenml model-deployer register mlflow --flavor=mlflowzenml stack register mlflow_stack -a default -o default -d mlflow -e mlflow_tracker --set
这将将您的Stack与特定的制品存储、编排程序、部署目标和实验跟踪关联起来。
4. 查看堆栈详细信息:
您可以使用以下命令查看您的堆栈组件:
zenml stack describe
这将显示与“mlflow_tracker”堆栈关联的组件。
现在,让我们在训练模型中实现一个实验跟踪器并评估模型:
您可以看到组件的名称为mlflow_tracker。
设置ZenML实验跟踪器
首先,开始更新 train_model.py:
import loggingimport mlflowimport pandas as pdfrom src.training_model import LogisticRegfrom sklearn.base import ClassifierMixinfrom zenml import stepfrom .config import ModelName#import from zenml.client import Client# 获取活动堆栈的实验跟踪器experiment_tracker = Client().active_stack.experiment_tracker#定义名为train_model的步骤@步骤(experiment_tracker = experiment_tracker.name,enable_cache=False)def train_model( X_train:pd.DataFrame, y_train:pd.Series, config:ModelName )-> ClassifierMixin: """ 基于配置的模型训练数据 Args: X_train:pd.DataFrame = 独立的训练数据, y_train:pd.Series = 依赖的训练数据。 """ 尝试: model = None if config.model_name ==“逻辑回归”: #自动记录分数、模型等... mlflow.sklearn.autolog() model = LogisticReg() else: raise ValueError(“不支持的模型名称”) trained_model = model.train(X_train=X_train,y_train=y_train) logging.info(“训练模型完成。”) return trained_model except Exception as e: logging.error(“训练模型步骤错误”,e) raise e
在这段代码中,我们使用 mlflow.sklearn.autolog()来设置实验跟踪器,该跟踪器会自动记录有关模型的所有细节,使得跟踪和分析实验变得更加容易。
在 evaluation.py
from zenml.client import Clientexperiment_tracker = Client().active_stack.experiment_tracker@step(experiment_tracker=experiment_tracker.name, enable_cache = False)
运行管道
将您的 run_pipeline.py 脚本更新如下:
from pipelines.training_pipeline import train_pipelinefrom zenml.client import Clientif __name__ == '__main__': # 打印实验跟踪 URI print(Client().active_stack.experiment_tracker.get_tracking_uri()) # 运行管道 train_pipeline(data_path="/mnt/e/Customer_churn/data/WA_Fn-UseC_-Telco-Customer-Churn.csv")
复制并粘贴到此命令中。
![]()
mlflow ui --backend-store-uri "--uri 文件顶部的文件:/home/ "
探索您的实验
单击上述命令生成的链接打开 MLflow 用户界面。在这里,您将会发现一宝库的见解:
![]()
- 管道:轻松访问您运行过的所有管道。
![]()
- 模型详情:点击管道可了解关于模型的每个细节。
- 指标:深入指标部分可视化您的模型性能。
现在,你可以用 ZenML 和 MLflow 来掌控你的机器学习实验跟踪了!
部署
在接下来的部分,我们将部署这个模型。您需要了解以下概念:
a). 连续部署管道
这个管道将自动化模型部署过程。一旦模型通过了评估标准,它将自动部署到生产环境中。例如,它从数据预处理、数据清洗、数据训练、模型评估等开始。
b). 推理部署管道
推理部署管道专注于将机器学习模型部署到实时或批量推断中。推理部署管道专门用于在生产环境中部署用于进行预测的模型。例如,它设置了一个 API 端点,用户可以发送文本到此端点。它确保模型的可用性和可扩展性,并监控其实时性能。这些管道对于维护机器学习系统的效率和效果至关重要。现在,我们将要实现连续管道。
在 pipelines 文件夹中创建一个名为 deployment_pipeline.py 的文件。
import numpy as npimport jsonimport loggingimport pandas as pdfrom zenml import pipeline, stepfrom zenml.config import DockerSettingsfrom zenml.constants import DEFAULT_SERVICE_START_STOP_TIMEOUTfrom zenml.integrations.constants import MLFLOWfrom zenml.integrations.mlflow.model_deployers.mlflow_model_deployer import ( MLFlowModelDeployer,)from zenml.integrations.mlflow.services import MLFlowDeploymentServicefrom zenml.integrations.mlflow.steps import mlflow_model_deployer_stepfrom zenml.steps import BaseParameters, Outputfrom src.clean_data import FeatureEncodingfrom .utils import get_data_for_testfrom steps.data_cleaning import cleaning_datafrom steps.evaluation import evaluate_modelfrom steps.ingest_data import ingest_df# 使用 MLflow 集成定义 Docker 设置docker_settings = DockerSettings(required_integrations = {MLFLOW})# 定义部署管道配置的类class DeploymentTriggerConfig(BaseParameters): min_accuracy:float = 0.92@step def deployment_trigger( accuracy: float, config: DeploymentTriggerConfig,): """ 仅当准确度大于最小准确度时触发部署。 参数: accuracy: 模型的准确度。 config: 最小准确度阈值。 """ try: return accuracy >= config.min_accuracy except Exception as e: logging.error("部署触发器错误",e) raise e# 定义连续管道@pipeline(enable_cache=False,settings={"docker":docker_settings})def continuous_deployment_pipeline( data_path:str, min_accuracy:float = 0.92, workers: int = 1, timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT): df = ingest_df(data_path=data_path) X_train, X_test, y_train, y_test = cleaning_data(df=df) model = train_model(X_train=X_train, y_train=y_train) accuracy_score, precision_score = evaluate_model(model=model, X_test=X_test, y_test=y_test) deployment_decision = deployment_trigger(accuracy=accuracy_score) mlflow_model_deployer_step( model=model, deploy_decision = deployment_decision, workers = workers, timeout = timeout )
ZenML框架用于机器学习项目
这段代码使用ZenML框架定义了一个用于机器学习项目的持续部署。
1. 导入必要的库:导入部署模型所需的必要库。
2. Docker设置:通过配置Docker设置与MLflow一起使用,Docker可以一致地打包和运行这些模型。
3. DeploymentTriggerConfig: 这是一个配置模型部署的最低准确度阈值的类。
4. deployment_trigger: 如果模型准确度超过最低准确度,此步骤将返回。
5. continuous_deployment_pipeline: 此管道由多个步骤组成:数据摄取、数据清理、模型训练和模型评估。模型只会在满足最低准确度阈值时进行部署。
接下来,我们将在deployment_pipeline.py中实现推理管道。
从 zenml.steps 中导入 BaseParameters, Output从 zenml.integrations.mlflow.model_deployers.mlflow_model_deployer 导入 MLFlowModelDeployer从 zenml.integrations.mlflow.services 导入 MLFlowDeploymentServiceclass MLFlowDeploymentLoaderStepParameters(BaseParameters): pipeline_name: str step_name: str running: bool = True@step(enable_cache=False)def dynamic_importer() -> str: data = get_data_for_test() return data@step(enable_cache=False)def prediction_service_loader( pipeline_name: str, pipeline_step_name: str, running: bool = True, model_name: str = "model",) -> MLFlowDeploymentService: model_deployer = MLFlowModelDeployer.get_active_model_deployer() existing_services = model_deployer.find_model_server( pipeline_name=pipeline_name, pipeline_step_name=pipeline_step_name, model_name=model_name, running=running, ) if not existing_services: raise RuntimeError( f"当前没有由{pipeline_name}管道中的{pipeline_step_name}步骤部署的" f"{model_name}模型的MLflow预测服务正在运行。" ) return existing_services[0]@stepdef predictor(service: MLFlowDeploymentService, data: str) -> np.ndarray: service.start(timeout=10) data = json.loads(data) prediction = service.predict(data) return prediction@pipeline(enable_cache=False, settings={"docker": docker_settings})def inference_pipeline(pipeline_name: str, pipeline_step_name: str): batch_data = dynamic_importer() model_deployment_service = prediction_service_loader( pipeline_name=pipeline_name, pipeline_step_name=pipeline_step_name, running=False, ) prediction = predictor(service=model_deployment_service, data=batch_data) return prediction
这段代码设置了一个管道,用于通过MLflow对部署的机器学习模型进行预测。它导入数据,加载部署的模型,并使用其进行预测。
我们需要在pipelines文件夹中的utils.py中创建get_data_for_test()函数,以便更高效地管理我们的代码。
从 src.clean_data 中导入 DataPreprocessing, LabelEncoding# 为测试目的获取数据的函数def get_data_for_test(): try: df = pd.read_csv('./data/WA_Fn-UseC_-Telco-Customer-Churn.csv') df = df.sample(n=100) data_preprocessing = DataPreprocessing() data = data_preprocessing.handle_data(df) # 实例化FeatureEncoding策略 label_encode = LabelEncoding() df_encoded = label_encode.handle_data(data) df_encoded.drop(['Churn'],axis=1,inplace=True) logging.info(df_encoded.columns) result = df_encoded.to_json(orient="split") return result except Exception as e: logging.error("e") raise e
现在,让我们实现我们创建的管道,以部署模型并在部署模型上进行预测。
在项目目录中创建run_deployment.py文件:
从 click 中导入 click # 用于处理命令行参数import logging from typing import cast从 rich 中导入 print # 用于控制台输出格式化引入部署和推理的管道from pipelines.deployment_pipeline 导入 continuous_deployment_pipeline, inference_pipeline# 导入MLflow实用程序和组件from zenml.integrations.mlflow.mlflow_utils 导入 get_tracking_urifrom zenml.integrations.mlflow.model_deployers.mlflow_model_deployer 导入 MLFlowModelDeployerfrom zenml.integrations.mlflow.services 导入 MLFlowDeploymentService# 定义不同配置的常量:DEPLOY, PREDICT, DEPLOY_AND_PREDICTDEPLOY = "deploy"PREDICT = "predict"DEPLOY_AND_PREDICT = "deploy_and_predict"# 定义一个使用Click处理命令行参数的主要函数@click.command()@click.option( "--config", "-c", type=click.Choice([DEPLOY, PREDICT, DEPLOY_AND_PREDICT]), default=DEPLOY_AND_PREDICT, help="如果需要,您可以选择仅运行部署管道以训练和部署模型(`deploy`)," "或仅对部署模型进行预测(`predict`)。默认情况下,两者都将运行(`deploy_and_predict`)。",)@click.option( "--min-accuracy", default=0.92, help="部署模型所需的最低准确度",)def run_main(config:str, min_accuracy:float ): # 获取活动的MLFlow模型部署组件 mlflow_model_deployer_component = MLFlowModelDeployer.get_active_model_deployer() # 确定用户是要部署模型(deploy),还是要进行预测(predict),或同时进行(deploy_and_predict) deploy = config == DEPLOY or config == DEPLOY_AND_PREDICT predict = config == PREDICT or config == DEPLOY_AND_PREDICT # 如果请求部署模型: if deploy: continuous_deployment_pipeline( data_path='/mnt/e/Customer_churn/data/WA_Fn-UseC_-Telco-Customer-Churn.csv', min_accuracy=min_accuracy, workers=3, timeout=60 ) # 如果请求进行预测: if predict: # 初始化推理管道运行 inference_pipeline( pipeline_name="continuous_deployment_pipeline", pipeline_step_name="mlflow_model_deployer_step", ) # 打印在MLflow UI中查看实验运行的指令 print( "您可以运行:\n " f"[italic green] mlflow ui --backend-store-uri '{get_tracking_uri()}" "[/italic green]\n ... 来在MLflow UI中查看您的实验运行。" "您可以在`mlflow_example_pipeline`实验中找到您的运行。" "在那里,您还可以比较两个或多个运行。\n\n" ) # 检索具有相同管道名称、步骤名称和模型名称的现有服务 existing_services = mlflow_model_deployer_component.find_model_server( pipeline_name = "continuous_deployment_pipeline", pipeline_step_name = "mlflow_model_deployer_step", ) # 检查预测服务器的状态: if existing_services: service = cast(MLFlowDeploymentService, existing_services[0]) if service.is_running: print( f"MLflow预测服务器正在本地作为守护进程服务运行,并接受推理请求,地址为:\n" f" {service.prediction_url}\n" f"要停止服务,请运行" f"[italic green]zenml model-deployer models delete" f"{str(service.uuid)}'[/italic green]。" ) elif service.is_failed: print( f"MLflow预测服务器处于失败状态:\n" f" 上一个状态:'{service.status.state.value}'\n" f" 上一个错误:'{service.status.last_error}'" ) else: print( "当前没有运行任何MLflow预测服务器。首先必须运行部署 pipeline" "来训练模型并部署它。执行相同的命令并带上'--deploy'参数来部署模型。" ) # 入口点:如果直接执行此脚本,则运行主要函数if __name__ == "__main__": run_main()
此代码是使用MLFlow和ZenML管理和部署机器学习模型的命令行脚本。
现在,让我们部署模型。
在终端上运行以下命令。
python run_deployment.py --config deploy
现在,我们已经部署了我们的模型。您的流水线将成功运行,并且您可以在zenml仪表板中查看它们。
![]()
python run_deployment.py --config predict
启动预测过程
![]()
现在,我们的MLFlow预测服务器正在运行。
我们需要一个Web应用程序来输入数据并查看结果。您可能想知道为什么我们必须从头开始创建Web应用程序。
实际上不是。我们将使用Streamlit,这是一个开源的前端框架,可以帮助您快速轻松地构建用于机器学习模型的前端Web应用程序。
安装库
pip install streamlit
在您的项目目录中创建一个名为streamlit_app.py的文件。
import jsonimport loggingimport numpy as npimport pandas as pdimport streamlit as stfrom PIL import Imagefrom pipelines.deployment_pipeline import prediction_service_loaderfrom run_deployment import maindef main(): st.title("使用ZenML的端到端客户满意度流水线") st.markdown( """ #### 问题陈述 这里的目标是预测给定订单的客户满意度得分,基于订单状态、价格、付款等特征。我将使用[ZenML](https://zenml.io/)来构建一个用于预测下一个订单或购买的客户满意度得分的生产就绪流水线。 """ ) st.markdown( """ 上面是整个流水线的图示,我们首先摄入数据,清洗数据,训练模型,评估模型,并且如果数据源发生变化或任何超参数值发生变化,将会触发部署并(重新)训练模型,如果模型满足最低准确性要求,将部署模型。 """ ) st.markdown( """ #### 特征描述 该应用程序旨在预测给定客户的客户满意度得分。您可以输入下面列出的产品特征并获取客户满意度得分。 | Models | 描述 | | ------------- | - | | SeniorCitizen | 指示客户是否为老年人。 | | tenure | 客户与公司合作的月数。 | | MonthlyCharges | 客户产生的月费用。 | | TotalCharges | 客户产生的总费用。 | | gender | 客户的性别(男:1,女:0)。 | | Partner | 客户是否有伴侣(是:1,否:0)。 | | Dependents | 客户是否有抚养人(是:1,否:0)。 | | PhoneService | 客户是否有电话服务(是:1,否:0)。 | | MultipleLines | 客户是否有多条线路(是:1,否:0)。 | | InternetService | 网络服务类型(否:1,其他:0)。 | | OnlineSecurity | 客户是否有在线安全服务(是:1,否:0)。 | | OnlineBackup | 客户是否有在线备份服务(是:1,否:0)。 | | DeviceProtection | 客户是否有设备保护服务(是:1,否:0)。 | | TechSupport | 客户是否有技术支持服务(是:1,否:0)。 | | StreamingTV | 客户是否有流媒体电视服务(是:1,否:0)。 | | StreamingMovies | 客户是否有流媒体电影服务(是:1,否:0)。 | | Contract | 合同类型(一年:1,其他:0)。 | | PaperlessBilling | 客户是否采用无纸化计费(是:1,否:0)。 | | PaymentMethod | 付款方式(信用卡:1,其他:0)。 | | Churn | 客户是否流失(是:1,否:0)。 | """ ) payment_options = { 2: "电子支票", 3: "邮寄支票", 1: "银行转账(自动)", 0: "信用卡(自动)" } contract = { 0: "按月付款", 2: "两年", 1: "一年" } def format_func(PaymentMethod): return payment_options[PaymentMethod] def format_func_contract(Contract): return contract[Contract] display = ("男性", "女性") options = list(range(len(display))) # 定义具有其各自值的数据列 SeniorCitizen = st.selectbox("您是否是老年人?", options=[True, False],) tenure = st.number_input("合作时间") MonthlyCharges = st.number_input("月费用:") TotalCharges = st.number_input("总费用:") gender = st.radio("性别:", options, format_func=lambda x: display[x]) Partner = st.radio("您是否有伴侣? ", options=[True, False]) Dependents = st.radio("有抚养人吗? ", options=[True, False]) PhoneService = st.radio("您是否有电话服务?: ", options=[True, False]) MultipleLines = st.radio("是否有多条线路? ", options=[True, False]) InternetService = st.radio("是否订阅互联网服务? ", options=[True, False]) OnlineSecurity = st.radio("是否订阅在线安全服务? ", options=[True, False]) OnlineBackup = st.radio("是否订阅在线备份服务? ", options=[True, False]) DeviceProtection = st.radio("是否仅订阅设备保护服务?", options=[True, False]) TechSupport =st.radio("是否订阅技术支持? ", options=[True, False]) StreamingTV = st.radio("是否订阅电视流媒体", options=[True, False]) StreamingMovies = st.radio("是否订阅流媒体电影? ", options=[True, False]) Contract = st.radio("合同期限: ", options=list(contract.keys()), format_func=format_func_contract) PaperlessBilling = st.radio("是否使用无纸化计费? ", options=[True, False]) PaymentMethod = st.selectbox("付款方式:", options=list(payment_options.keys()), format_func=format_func) # 使用付款方式获取所选付款方式的数值 if st.button("预测"): service = prediction_service_loader( pipeline_name="continuous_deployment_pipeline", pipeline_step_name="mlflow_model_deployer_step", running=False, ) if service is None: st.write( "找不到服务。将首先运行流水线以创建服务。" ) run_main() try: data_point = { 'SeniorCitizen': int(SeniorCitizen), 'tenure': tenure, 'MonthlyCharges': MonthlyCharges, 'TotalCharges': TotalCharges, 'gender': int(gender), 'Partner': int(Partner), 'Dependents': int(Dependents), 'PhoneService': int(PhoneService), 'MultipleLines':这段代码定义了一个 StreamLit,将为基于客户数据和人口统计学详细信息,在电信公司中预测客户流失提供前端界面。
用户可以通过用户友好界面输入他们的信息,代码使用经过训练的机器学习模型(使用 ZenML 和 MLflow 部署)进行预测。
预测结果随后显示给用户。
现在运行以下命令:
⚠️ 确保您的预测模型正在运行
streamlit run streamlit_app.py
点击链接。
就是这样;我们完成了我们的项目。


就是这样,我们已经成功完成了我们的端到端机器学习项目,展示了专业人士如何处理整个过程。
结论
通过开发和部署客户流失预测模型,我们在机器学习运营(MLOps)的全面探索中见证了 MLOps 的变革力量,它可以简化机器学习生命周期。从数据收集和预处理到模型训练、评估和部署,我们的项目展示了 MLOps 在开发和生产之间构建桥梁的重要作用。随着组织越来越依赖于数据驱动的决策,这里展示的高效可扩展实践突显了 MLOps 在确保机器学习应用成功方面的关键重要性。
关键要点
- MLOps(机器学习运营)对于简化端到端机器学习生命周期至关重要,确保高效、可靠和可扩展的运营。
- ZenML 和 MLflow 是功能强大的框架,有助于在实际应用中开发、追踪和部署机器学习模型。
- 正确的数据预处理,包括清洁、编码和拆分,是构建健壮的机器学习模型的基础。
- 准确率、精确度、召回率和 F1 分数等评估指标提供了对模型性能的全面理解。
- 像 MLflow 这样的实验跟踪工具增强了数据科学项目中的协作和实验管理。
- 持续和推理部署管道对于在生产环境中保持模型效率和可用性至关重要。
常见问题
本文章中所显示的媒体不属于 Analytics Vidhya 所有,并由作者自行决定使用。





