使用ZenML和Streamlit进行员工流失率预测

使用ZenML和Streamlit进行员工流失率预测的高效方法

介绍

你是作为HR工作的吗?你是否苦于预测团队中的员工是否会继续工作或考虑离开组织?别担心!你不需要成为占星家来预测这个问题,通过使用数据科学的力量,我们可以准确地预测。让我们用一个简单而强大的MLOps工具——ZenML和streamlit开始我们的员工离职率的精彩之旅。让我们开始我们的旅程。

学习目标

在这篇文章中,我们将学到:

  • ZenML是什么?为什么以及如何使用它?
  • 为什么要使用MLflow以及如何与ZenML集成?
  • 使用部署管道的必要性
  • 实施员工离职率项目并进行预测

这篇文章是作为数据科学博文马拉松的一部分发表的。

项目实施

问题陈述:根据年龄、收入、表现等多个因素预测员工是否会离开组织。

解决方案:建立逻辑回归模型来预测员工的离职率。

数据集:IBM人力资源分析员工离职和绩效

[来源]:https://www.kaggle.com/datasets/pavansubhasht/ibm-hr-analytics-attrition-dataset

在看到我们的项目实施之前,我们先来看一下为什么我们在这里使用ZenML。

为什么选择ZenML?

ZenML是一个简单而强大的MLOps编排工具,用于创建ML管道,缓存管道步骤和节省计算资源。ZenML还提供与多个ML工具的集成,使其成为创建ML管道的最佳工具之一。我们可以跟踪我们的模型步骤、评估指标,可以在仪表盘中直观地看到我们的管道,等等。

在这个项目中,我们将实施一个传统的使用ZenML的管道,并将mlflow与zenml集成,用于实验跟踪。我们还将使用MLflow与ZenML集成实现一个持续部署管道,该管道将摄取和清理数据,训练模型,并在满足一定的评估标准时重新部署模型。通过这个管道,我们可以确保,如果任何新模型的表现优于先前模型的阈值预测值,那么MLFlow部署服务器将使用新模型而不是旧模型进行更新。

常见的ZenML术语

  • 管道:我们项目中的步骤序列。
  • 组件:MLOps管道中的构建块或特定函数。
  • 堆栈:本地/云中组件的集合。
  • 工件:我们项目中的步骤的输入和输出数据,存储在工件存储器中。
  • 工件存储器:用于存储和版本跟踪工件的存储空间。
  • 实体化组件:定义工件如何从工件存储器中存储和检索的组件。
  • 口味:特定工具和用例的解决方案。
  • ZenML服务器:远程运行堆栈组件的部署。

先决条件和基本的ZenML命令

  • 激活您的虚拟环境:
#创建一个虚拟环境python3 -m venv venv#在项目文件夹中激活您的虚拟环境source venv/bin/activate
  • ZenML 命令:

下面给出了所有基本的 ZenML 命令及其功能:

#安装 zenmlpip install zenml#在本地启动 zenml 服务器和仪表盘pip install "zenml[server]"#查看 zenml 版本信息zenml version#初始化新仓库zenml init#在本地运行仪表盘zenml up#了解我们的 zenml 管道的状态zenml show

了解这些命令是与 ZenML 一起工作的必要条件。

将 MLflow 与 ZenML 集成

我们使用 mlflow 作为实验跟踪器,用于跟踪我们的模型、工件和超参数值。我们在这里注册了 stack 组件、实验跟踪器和模型部署器:

#将 mlflow 与 ZenML 集成zenml integration install mlflow -y#注册实验跟踪器zenml experiment-tracker register mlflow_tracker_employee --flavor=mlflow#注册模型部署器zenml model-deployer register mlflow_employee --flavor=mlflow#注册 stackzenml stack register mlflow_stack_employee -a default -o default -d mlflow_employee -e mlflow_tracker_employee --set

Zenml 栈列表

项目结构

employee-attrition-prediction/          # 项目目录├── data/                               │   └── HR-Employee-Attrition.csv       # 数据集文件│├── pipelines/                          │   ├── deployment_pipeline.py          # 部署流程│   ├── training_pipeline.py            # 训练流程│   └── utils.py                        │├── src/                                # 源码 │   ├── data_cleaning.py                # 数据清理和预处理│   ├── evaluation.py                   # 模型评估│   └── model_dev.py                    # 模型开发│                 ├── steps/                              # ZenML 步骤的代码文件│   ├── ingest_data.py                  # 数据摄入│   ├── clean_data.py                   # 数据清理和预处理│   ├── model_train.py                  # 模型训练    │   ├── evaluation.py                   # 模型评估│   └── config.py                       │├── streamlit_app.py                    # Streamlit Web 应用程序│├── run_deployment.py                   # 运行部署和预测流程的代码├── run_pipeline.py                     # 运行训练流程的代码│├── requirements.txt                    # 项目所需软件包的列表├── README.md                           # 项目文档└── .zen/                               # ZenML 目录(ZenML 初始化后自动生成)

数据摄入

我们首先从 data 文件夹中摄入 HR-Employee-Attrition-Rate 数据集的数据。

import pandas as pdfrom zenml import stepclass IngestData:    def get_data(self) -> pd.DataFrame:        df = pd.read_csv("./data/HR-Employee-Attrition.csv")        return df@stepdef ingest_data() -> pd.DataFrame:    ingest_data = IngestData()    df = ingest_data.get_data()    return df    

@step 是一个装饰器,用于将函数 ingest_data() 设置为流程的一步。

探索性数据分析

#了解数据信息df.info()# 查看数据样本df.describe()# 检查样本数据df.head()# 检查空值df.isnull.sum()# 检查留下和离开公司的人的比例:df['Attrition'].value_counts()df_left = df[df['Attrition'] == "Yes"]df_stayed = df[df['Attrition'] == "No"]left_percentage=df_left.shape[0]*100/df.shape[0]stayed_percentage=df_stayed.shape[0]*100/df.shape[0]print(f"离开公司的人的百分比为:{left_percentage}")print(f"留在公司的人的百分比为:{stayed_percentage}")#分析留下和离开公司人员之间的特征差异df_left.describe()df_stayed.describe()

输出

观察

  • 离职员工在公司工作的时间较短。
  • 离职员工比留任员工年轻。
  • 离职员工的办公地点距离家中较远。

数据清洗和处理

  • 数据清洗:我们删除了数据集中不需要的列,例如:“EmployeeCount”,“EmployeeNumber”,“StandardHours”,然后我们将只有Yes(或)No之间数据值的特征更改为二进制1(或)0。
  • 热编码:然后,我们对分类列进行了热编码,例如“BusinessTravel”,“Department”,“EducationField”,“Gender”,“JobRole”,“MaritalStatus”。
import pandas as pdclass DataPreProcessStrategy(DataStrategy):    def __init__(self, encoder=None):        self.encoder = encoder    """此类用于预处理给定的数据集"""    def handle_data(self, data: pd.DataFrame) -> pd.DataFrame:        try:            print("预处理前列名:", data.columns)  # 添加此行            data = data.drop(["EmployeeCount", "EmployeeNumber", "StandardHours"], axis=1)            if 'Attrition' in data.columns:                print("数据中存在离职列。")            else:                print("数据中不存在离职列。")            data["Attrition"] = data["Attrition"].apply(lambda x: 1 if x == "Yes" else 0)            data["Over18"] = data["Over18"].apply(lambda x: 1 if x == "Yes" else 0)            data["OverTime"] = data["OverTime"].apply(lambda x: 1 if x == "Yes" else 0)            # 提取分类变量            cat = data[['BusinessTravel', 'Department', 'EducationField', 'Gender', 'JobRole', 'MaritalStatus']]            # 对分类变量进行独热编码            onehot = OneHotEncoder()            cat_encoded = onehot.fit_transform(cat).toarray()                                    # 将cat_encoded转换为DataFrame            cat_df = pd.DataFrame(cat_encoded)            # 提取数值变量            numerical = data[['Age', 'Attrition', 'DailyRate', 'DistanceFromHome', 'Education', 'EnvironmentSatisfaction', 'HourlyRate', 'JobInvolvement', 'JobLevel', 'JobSatisfaction', 'MonthlyIncome', 'MonthlyRate', 'NumCompaniesWorked', 'Over18', 'OverTime', 'PercentSalaryHike', 'PerformanceRating', 'RelationshipSatisfaction', 'StockOptionLevel', 'TotalWorkingYears', 'TrainingTimesLastYear', 'WorkLifeBalance', 'YearsAtCompany', 'YearsInCurrentRole', 'YearsSinceLastPromotion', 'YearsWithCurrManager']]            # 将X_cat_df和X_numerical连接起来            data = pd.concat([cat_df, numerical], axis=1)            print("预处理后列名:", data.columns)  # 添加此行            print("预处理后的数据:")            print(data.head())            return data        except Exception as e:            logging.error(f"数据预处理错误:{e}")            raise e            

输出

经过所有数据清洗和处理后,数据如下所示:您可以在下图中看到,数据在编码完成后只包含数字数据。

拆分数据

然后,我们将以80:20的比例拆分训练集和测试集。

from sklearn.model_selection import train_test_splitclass DataDivideStrategy(DataStrategy):    def handle_data(self, data: pd.DataFrame) -> Union[pd.DataFrame, pd.Series]:        try:            # 检查数据中是否存在“Attrition”            if 'Attrition' in data.columns:                X = data.drop(['Attrition'], axis=1)                Y = data['Attrition']                X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=0.2, random_state=42)                return X_train, X_test, Y_train, Y_test            else:                raise ValueError("数据中未找到“Attrition”列。")        except Exception as e:            logging.error(f"数据处理错误:{str(e)}")            raise e            

模型训练

由于这是一个分类问题,我们在这里使用逻辑回归,还可以使用随机森林分类器、梯度提升等分类算法。

from zenml import pipeline@training_pipelinedef training_pipeline(data_path: str):    df = ingest_data(data_path)    X_train, X_test, y_train, y_test = clean_and_split_data(df)    model = define_model()  # 定义机器学习模型    trained_model = train_model(model, X_train, y_train)    evaluation_metrics = evaluate_model(trained_model, X_test, y_test)

这里,@training_pipeline 装饰器用于将函数training_pipeline() 定义为 ZenML 中的一个 pipeline。

评估

对于二分类问题,我们使用准确率、精确度、F1分数、ROC-AUC曲线等评估指标。我们从 scikit-learn 库导入 classification_report,计算评估指标并生成分类报告。代码如下:

import loggingimport numpy as npfrom sklearn.metrics import classification_reportclass ClassificationReport:    @staticmethod    def calculate_scores(y_true: np.ndarray, y_pred: np.ndarray):        try:            logging.info("计算分类报告")            report = classification_report(y_true, y_pred, output_dict=True)            logging.info(f"分类报告:\n{report}")            return report        except Exception as e:            logging.error(f"计算分类报告时出错:{e}")            raise e

分类报告:

要查看 training_pipeline 的仪表板,需要运行 run_pipelilne.py:

from zenml import pipelinefrom pipelines.training_pipeline import train_pipelinefrom zenml.client import Clientimport pandas as pdif __name__ == "__main__":    uri = Client().active_stack.experiment_tracker.get_tracking_uri()    print(uri)    train_pipeline(data_path="./data/HR-Employee-Attrition.csv")

它将返回跟踪仪表板的 URL,形式如下:“仪表板 URL:http://127.0.0.1:8237/workspaces/default/pipelines/6e7941f4-cf74-4e30-b3e3-ff4428b823d2/runs/2274fc18-aba1-4536-aaee-9d2ed7d26323/dag”。您可以点击该 URL,在 ZenML 仪表板中查看您的训练流程。在这里,整个流程图被分成不同的图像部分以便详细查看。

总体上,在仪表盘中,training_pipeline 的显示如下:

模型部署

部署触发器

class DeploymentTriggerConfig(BaseParameters):    min_accuracy: float = 0.5

在 DeploymentTriggerConfig 类中,我们设置了一个最小准确率参数,它指定了我们的模型的最小准确率应该是多少。

设置部署触发器

@step(enable_cache=False)def deployment_trigger(    accuracy: float,    config: DeploymentTriggerConfig,):    return accuracy > config.min_accuracy

在这里,我们使用了 deployment_trigger() 函数来部署模型,只有当模型的准确率超过最小准确率时才会部署。我们将在下一节中介绍为什么这里要使用缓存。

持续部署流水线

@pipeline(enable_cache=False, settings={"docker":docker_settings})def continuous_deployment_pipeline(   data_path: str,   #data_path="C:/Users/user/Desktop/machine learning/Project/zenml Pipeline/Customer_Satisfaction_project/data/olist_customers_dataset.csv",   min_accuracy:float=0.0,   workers: int=1,   timeout: int=DEFAULT_SERVICE_START_STOP_TIMEOUT,):       df=ingest_data()   # Clean the data and split into training/test sets   X_train,X_test,Y_train,Y_test=clean_df(df)   model=train_model(X_train,X_test,Y_train,Y_test)   evaluation_metrics=evaluate_model(model,X_test,Y_test)   deployment_decision=deployment_trigger(evaluation_metrics)       mlflow_model_deployer_step(      model=model,      deploy_decision=deployment_decision,      workers=workers,      timeout=timeout,    )

在这个 continuous_deployment_pipeline() 函数中,我们将导入数据,清洗数据,训练模型,评估模型,并且只有在满足 deployment_trigger() 条件时才部署模型,以确保我们要部署的新模型的预测准确率超过先前模型的预测准确率,也就是阈值。这就是 continous_deployment_pipeline() 的工作原理。

缓存是指将先前执行的步骤的输出存储在流水线中。输出存储在工件存储中。我们在流水线参数中使用缓存来说明过去运行的输出和当前运行的步骤之间没有变化,这样 zenML 就会重用先前运行的输出。启用缓存将加快流水线运行过程并节省我们的计算资源。但是在某些情况下,比如我们的 continuous_deployment_pipeline() 中输入、参数、输出会有动态变化的情况下,关闭缓存是明智的选择。因此,我们在这里写下了 enable_cache=False。

推理流水线

我们使用推理流水线在部署的模型上对新数据进行预测。让我们看看在项目中如何使用这个流水线。

inference_pipeline()

@pipeline(enable_cache=False,settings={"docker":docker_settings})def inference_pipeline(pipeline_name: str, pipeline_step_name:str):   data=dynamic_importer()   #print("Data Shape for Inference:", data.shape)  # Print the shape of data for inference   service=prediction_service_loader(      pipeline_name=pipeline_name,      pipeline_step_name=pipeline_step_name,      running=False,      )   prediction=predictor(service=service,data=data)   return prediction  

这里的inference_pipeline()函数按照以下顺序工作:

  • dynamic_importer() – 首先,dynamic_importer()函数加载新数据并进行准备工作。
  • prediction_service_loader() – prediction_service_loader()函数根据参数指定的pipeline名称和步骤名称加载已部署的模型。
  • predictor() – 然后,predictor()函数用于基于已部署的模型预测新数据。

让我们来看看这些函数的详细信息:

dynamic_importer()

@step(enable_cache=False)def dynamic_importer() -> str:   data = get_data_for_test()   return data

这里调用了utils.py中的get_data_for_test()函数,用于加载新数据,进行数据处理并返回数据。

prediction_service_loader()

@step(enable_cache=False)def prediction_service_loader(   pipeline_name: str,   pipeline_step_name: str,   running: bool = True,   model_name: str = "model", ) -> MLFlowDeploymentService:   mlflow_model_deployer_component = MLFlowModelDeployer.get_active_model_deployer()   existing_services = mlflow_model_deployer_component.find_model_server(   pipeline_name=pipeline_name,   pipeline_step_name=pipeline_step_name,   model_name=model_name,   running=running,)   if not existing_services:      raise RuntimeError(         f"No MLFlow deployment service found for pipeline {pipeline_name}, step {pipeline_step_name}, and model {model_name}, and pipeline for the model {model_name} is currently running"      )

在这个prediction_service_loader()函数中,根据参数加载与已部署模型相关的部署服务。部署服务是一个运行时环境,其中我们的部署模型已准备好接受推断请求,对新数据进行预测。一行代码existing_services = mlflow_model_deployer_component.find_model_server()根据给定的参数(如pipeline名称和pipeline步骤名称)搜索是否存在任何已部署的模型服务。如果没有现有的服务可用,则意味着部署流水线尚未执行或部署流水线存在问题,因此会抛出一个Runtime Error。

predictor()

@stepdef predictor(    service: MLFlowDeploymentService,    data: str,) -> np.ndarray:        """对预测服务运行推断请求"""    service.start(timeout=21)  # 如果已经启动,应该是一个NOP    data = json.loads(data)    data.pop("columns")    data.pop("index")    columns_for_df = [        0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,"Age","DailyRate","DistanceFromHome","Education","EnvironmentSatisfaction","HourlyRate","JobInvolvement","JobLevel","JobSatisfaction","MonthlyIncome","MonthlyRate","NumCompaniesWorked","Over18","OverTime","PercentSalaryHike","PerformanceRating","RelationshipSatisfaction","StockOptionLevel","TotalWorkingYears","TrainingTimesLastYear","WorkLifeBalance","YearsAtCompany","YearsInCurrentRole","YearsSinceLastPromotion","YearsWithCurrManager",    ]    df = pd.DataFrame(data["data"], columns=columns_for_df)    json_list = json.loads(json.dumps(list(df.T.to_dict().values())))    data = np.array(json_list)    prediction = service.predict(data)    return prediction

在拥有已部署模型和新数据之后,我们可以使用predictor()函数进行预测。

要以可视化方式查看持续部署和推断流水线,需要运行run_deployment.py,其中定义了部署和预测的配置。

@click.option(    "--config",    type=click.Choice([DEPLOY, PREDICT, DEPLOY_AND_PREDICT]),    default=DEPLOY_AND_PREDICT,    help="Optionally you can choose to only run the deployment "    "pipeline to train and deploy a model (`deploy`), or to "    "only run a prediction against the deployed model "    "(`predict`). By default both will be run "    "(`deploy_and_predict`).",)

这里,我们可以通过以下命令运行连续部署流水线或推理流水线:

# 运行连续部署流水线python run_deployment.py# 查看推理流水线(即部署和预测)python run_deployment.py --config predict

执行完命令后,你可以看到 zenML 仪表板的网址,像这样:

仪表板网址:http://127.0.0.1:8237/workspaces/default/pipelines/b437cf1a-971c-4a23-a3b6-c296c1cdf8ca/runs/58826e07-6139-453d-88f9-b3c771bb6695/dag

在仪表板中享受流水线可视化:

连续部署流水线

连续部署流水线(从数据摄取到 mlflow_model_deployer_step),看起来像这样:

推理流水线

构建 Streamlit 应用

Streamlit 是一个令人惊叹的开源 Python 框架,用于快速创建用户界面,我们可以使用 Streamlit 快速构建 Web 应用程序,无需了解后端或前端开发。首先,我们需要在电脑上安装 Streamlit。在本地系统中安装和运行 Streamlit 服务器的命令如下:

# 在本地计算机上安装 Streamlitpip install streamlit# 运行 Streamlit 本地 Web 服务器streamlit run streamlit_app.py

代码:

import jsonimport numpy as npimport pandas as pdimport streamlit as stfrom PIL import Imagefrom pipelines.deployment_pipeline import prediction_service_loaderfrom run_deployment import main# 定义一个全局变量来跟踪服务状态service_started = Falsedef start_service():    global service_started    service = prediction_service_loader(        pipeline_name="continuous_deployment_pipeline",        pipeline_step_name="mlflow_model_deployer_step",        running=False,    )    service.start(timeout=21)  # 启动服务    service_started = True    return servicedef stop_service(service):    global service_started    service.stop()  # 停止服务    service_started = Falsedef main():    st.title("员工流失预测")    age = st.sidebar.slider("年龄", 18, 65, 30)    monthly_income = st.sidebar.slider("月收入", 0, 20000, 5000)    total_working_years = st.sidebar.slider("总工龄", 0, 40, 10)    years_in_current_role = st.sidebar.slider("在现任职位工作年数", 0, 20, 5)    years_since_last_promotion = st.sidebar.slider("距上次晋升年数", 0, 15, 2)    if st.button("预测"):        global service_started        if not service_started:            service = start_service()        input_data = {            "Age": [age],            "MonthlyIncome": [monthly_income],            "TotalWorkingYears": [total_working_years],            "YearsInCurrentRole": [years_in_current_role],            "YearsSinceLastPromotion": [years_since_last_promotion],        }        df = pd.DataFrame(input_data)        json_list = json.loads(json.dumps(list(df.T.to_dict().values())))        data = np.array(json_list)        pred = service.predict(data)        st.success(            "预测的员工流失概率(0 - 1):{:.2f}".format(                pred[0]            )        )        # 预测后停止服务        if service_started:            stop_service(service)if __name__ == "__main__":    main()

在这里,我们创建了一个名为“员工流失预测”的Streamlit网络应用程序,用户可以输入年龄、月收入等指标进行预测。当用户点击“预测”按钮时,输入数据将被发送到部署的模型,预测结果将显示给用户。这就是我们的Streamlit应用程序的工作原理。当我们运行streamlit_app.py文件时,我们会得到类似这样的网络URL:

通过点击网络URL,我们可以看到令人惊叹的Streamlit UI,用于进行预测。

您可以在ZenML仪表板中查看所有的堆栈、组件使用情况以及运行的流水线数量,从而使您的MLOps之旅变得简单。

ZenML仪表板:

堆栈:

组件:

流水线数量:

运行次数:

结论

我们成功构建了一个端到端员工流失率预测MLOps项目。我们对数据进行了摄取和清洗,训练模型,评估模型,触发部署,部署模型,通过获取新数据预测模型,搜索现有模型服务(如存在),从Streamlit Web应用程序获取用户输入并进行预测,从而帮助人力资源部门做出数据驱动的决策。

GitHub代码: https://github.com/VishalKumar-S/Employee-attrition-rate-MLOps-Project

要点

  • ZenML作为一个强大的编排工具,与其他机器学习工具进行集成。
  • 持续部署流水线确保只有最好的模型被部署,有助于实现高精度的预测。
  • 缓存帮助我们节省资源,日志记录帮助我们跟踪流水线,进行调试和错误跟踪。
  • 仪表板帮助我们清晰地了解机器学习流水线的工作流程。

常见问题

本文章中显示的媒体不归Analytics Vidhya所有,仅由作者决定使用。