在航空运营中使用Evidently和Streamlit监控数据和模型生产

运用 Evidently 和 Streamlit 监控数据和模型生产在航空运营中

介绍

您是否曾经遇到过训练和评估中表现良好的模型在生产环境中表现较差的挫折?在生产阶段面临这个常见挑战时,Evidently.ai,一款出色的开源工具就派上了用场,它能使我们的机器学习模型变得可观察和易于监控。本指南将涵盖生产环境中数据和模型性能变化的原因以及实施所需的必要操作。我们还将学习如何将这个工具与Streamlit预测应用程序集成。让我们开始我们非凡的旅程。

本文作为数据科学博文马拉松的一部分发布。

必要条件

1) 克隆代码库

git clone "https://github.com/VishalKumar-S/Flight-Delay-Prediction-and-live-Monitoring-with-Azure-Evidently-and-Streamlit-with-MVC-Architecture.git"

2) 创建并激活虚拟环境

# 创建一个虚拟环境python3 -m venv venv# 在您的项目文件夹中激活虚拟环境source venv/bin/activate

# 此命令安装requirements.txt文件中列出的Python包pip install -r requirements.txt

4) 安装Streamlit和Evidently

pip install streamlitpip install evidently

项目结构:

project_root/│├── assets/│├── data/│   └── Monitoring_data.csv│├── models/│   ├── best_model.pkl│   ├── lightgml_model.pkl│   ├── linear_regression_model.pkl│   ├── random_forest_model.pkl│   ├── ridge_regression.pkl│   ├── svm_model.pkl│   └── xgboost_model.pkl│├── notebooks/│   └── EDA.ipynb│├── src/│   ├── controller/│   │   └── flight_delay_controller.py│   ││   ├── model/│   │   └── flight_delay_model.py│   ││   ├── view/│   │   └── flight_delay_view.py│   ││   ├── data_preprocessing.py│   ├── model_evaluation.py│   └── modeling.py│├── .gitignore├── Dockerfile├── LICENSE├── Readme.md├── app.py└── requirements.txt

数据预处理

从云端获取

在此项目中,我们将从Azure获取数据集。首先,在Azure中创建存储容器,然后创建blob存储,然后将原始数据集上传到那里,使其公开访问,并将其用于未来的数据预处理步骤。

数据集链接: https://www.kaggle.com/datasets/giovamata/airlinedelaycauses

以下是从云端获取数据的代码片段,

class DataPreprocessorTemplate:    """    数据预处理的模板方法模式,可定制步骤。    """    def __init__(self, data_url):        """        初始化DataPreprocessor模板,包括带有SAS令牌的数据URL。        Args:            data_url (str): 带有SAS令牌的数据URL。        """        self.data_url = data_url    def fetch_data(self):        """        从Azure Blob Storage获取数据。        Returns:            pd.DataFrame: 作为Pandas DataFrame的获取数据集。        """        try:            # 使用提供的URL获取数据集            print("正在从云端获取数据...")            data = pd.read_csv(self.data_url)            return data        except Exception as e:            raise Exception("在数据检索过程中发生错误: " + str(e))def main():  # 包括SAS令牌在内的数据URL  data_url = "https://flightdelay.blob.core.windows.net/flight-delayed-dataset/DelayedFlights.csv"  output_path = "../data/cleaned_flight_delays.csv"  data_preprocessor = DataPreprocessorTemplate(data_url)  data = data_preprocessor.fetch_data()  cleaned_data=data_preprocessor.clean_data(data)  data_preprocessor.save_cleaned_data(cleaned_data, output_path)    

蔚蓝色图像:

数据清洗和转换

在数据世界中,数据转换将原始数据变成了精确的数据。在这里,我们将进行数据预处理的所有步骤,例如去除不需要的特征、填补缺失值、对类别列进行编码、以及去除异常值。在这里,我们已经使用了虚拟编码。然后,我们将使用Z-score检验去除异常值。

数据清洗代码段:

def clean_data(self,df):        """        清洗和预处理输入数据。        Args:            df (pd.DataFrame): 输入数据集。        Returns:            pd.DataFrame: 清洗和预处理后的数据集。        """        print("清洗数据...")        df=self.remove_features(df)        df=self.impute_missing_values(df)        df=self.encode_categorical_features(df)        df=self.remove_outliers(df)        return df    def remove_features(self,df):        """        从数据集中删除不必要的列。        Args:            df (pd.DataFrame): 输入数据集。        Returns:            pd.DataFrame: 删除不必要列后的数据集。        """        print("删除不必要的列...")        df=df.drop(['Unnamed: 0','Year','CancellationCode','TailNum','Diverted','Cancelled','ArrTime','ActualElapsedTime'],axis=1)        return df    def impute_missing_values(self,df):        """        填补数据集中的缺失值。        Args:            df (pd.DataFrame): 输入数据集。        Returns:            pd.DataFrame: 填补缺失值后的数据集。        """        print("填补缺失值...")        delay_colns=['CarrierDelay', 'WeatherDelay', 'NASDelay', 'SecurityDelay', 'LateAircraftDelay']        # 使用 0 填补这些列中的缺失值        df[delay_colns]=df[delay_colns].fillna(0)        # 使用中位数填补这些列中的缺失值        columns_to_impute = ['AirTime', 'ArrDelay', 'TaxiIn','CRSElapsedTime']        df[columns_to_impute]=df[columns_to_impute].fillna(df[columns_to_impute].median())        return df    def encode_categorical_features(self,df):        """        对数据集中的类别特征进行编码。        Args:            df (pd.DataFrame): 输入数据集。        Returns:            pd.DataFrame: 对类别特征进行编码后的数据集。        """        print("对类别特征进行编码...")        df=pd.get_dummies(df,columns=['UniqueCarrier', 'Origin', 'Dest'], drop_first=True)        return df    def remove_outliers(self,df):        """        从数据集中去除异常值。        Args:            df (pd.DataFrame): 输入数据集。        Returns:            pd.DataFrame: 去除异常值后的数据集。        """        print("去除异常值...")        z_threshold=3        z_scores=np.abs(stats.zscore(df[self.numerical_columns]))        outliers=np.where(z_scores>z_threshold)        df_no_outliers=df[(z_scores<=z_threshold).all(axis=1)]        print("数据清洗后的形状:", df_no_outliers.shape)        return df_no_outliers

然后,我们将保存清洗后的数据集,以供未来的模型训练和评估使用。我们将使用joblib来保存清洗后的数据集。

以下是代码片段:

def save_cleaned_data(self,cleaned_data, output_path):    """    将清洗后的数据保存为CSV文件。    Args:        cleaned_data (pd.DataFrame): 清洗后的数据集。        output_path (str): 保存清洗后数据的路径。    """    print("保存清洗后的数据...")                cleaned_data.to_csv(output_path,index=False)

训练和评估

数据清洗后,我们将把数据集分为两部分 – 训练集和测试集。然后,我们将训练多个回归模型,如线性回归、随机森林回归器、xgboost、岭回归和lightgbm模型。然后,我们将使用joblib将所有文件保存为.pkl文件,以减少时间和优化资源使用。

现在,我们可以使用训练好的模型文件进行评估和预测。然后,我们将根据评估指标,如R2得分、MAE(平均绝对误差)值和RMSE(均方根误差)值对模型进行评估。然后,保存表现最佳的模型以便部署使用。

“`

模型训练的代码片段:

# 创建机器学习模型的函数def create_model(model_name):    if model_name == "random_forest":        return RandomForestRegressor(n_estimators=50, random_state=42)    elif model_name == "linear_regression":        return LinearRegression()    elif model_name == "xgboost":        return xgb.XGBRegressor()    elif model_name == "ridge_regression":        return Ridge(alpha=1.0)  # 根据需要调整 alpha    elif model_name == "lightgbm":        return lgb.LGBMRegressor()# 加载清洗后的数据集print("开始加载模型...")cleaned_data = pd.read_csv("../data/cleaned_flight_delays.csv")print("模型加载完成")# 定义目标变量(ArrDelay)和特征(X)target_variable = "ArrDelay"X = cleaned_data.drop(columns=[target_variable])y = cleaned_data[target_variable]# 分割数据为训练集和测试集print("分割数据为训练集和测试集...")X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)print("数据分割完成。")

# 训练和保存模型的函数def train_and_save_model(model_name, X_train, y_train):    model = create_model(model_name)    print(f"正在训练 {model_name} 模型...")    start_time = time.time()    model.fit(X_train, y_train)    print("模型训练完成...")    end_time = time.time()    elapsed_time = end_time - start_time    print(f"训练时间:{elapsed_time:.2f} 秒")    # 保存训练好的模型供以后使用    joblib.dump(model, f"../models/{model_name}_model.pkl")    print(f"{model_name} 模型已保存为 {model_name}_model.pkl")# 创建并训练 Random Forest 模型train_and_save_model("random_forest", X_train, y_train)# 训练线性回归模型train_and_save_model("linear_regression", X_train, y_train)# 创建并训练 XGBoost 模型train_and_save_model("xgboost", X_train, y_train)# 创建并训练 Ridge Regression 模型train_and_save_model("ridge_regression", X_train, y_train)# 创建并训练 LightGBM 模型train_and_save_model("lightgbm", X_train, y_train)

模型评估的代码片段

# 加载清洗后的数据集cleaned_data = pd.read_csv("../data/cleaned_flight_delays.csv")# 加载训练过的机器学习模型random_forest_model = joblib.load("../models/random_forest_model.pkl")linear_regression_model = joblib.load("../models/linear_regression_model.pkl")xgboost_model = joblib.load("../models/xgboost_model.pkl")ridge_regression_model = joblib.load("../models/ridge_regression_model.pkl")lightgbm_model = joblib.load("../models/lightgbm_model.pkl")# 定义目标变量(ArrDelay)和特征(X)target_variable = "ArrDelay"X = cleaned_data.drop(columns=[target_variable])y = cleaned_data[target_variable]# 分割数据为训练集和测试集X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)# 定义评估模型的函数def evaluate_model(model, X_test, y_test):    y_pred = model.predict(X_test)    mae = mean_absolute_error(y_test, y_pred)    mse = mean_squared_error(y_test, y_pred)    r2 = r2_score(y_test, y_pred)    return mae, mse, r2# 创建一个模型字典以进行评估models = {    "Random Forest": random_forest_model,    "Linear Regression": linear_regression_model,    "XGBoost": xgboost_model,    "Ridge Regression": ridge_regression_model,    "LightGBM": lightgbm_model,}# 评估每个模型并存储其度量指标metrics = {}for model_name, model in models.items():    mae, mse, r2 = evaluate_model(model, X_test, y_test)    metrics[model_name] = {"MAE": mae, "MSE": mse, "R2": r2}# 打印所有模型的评估指标for model_name, model_metrics in metrics.items():    print(f"{model_name} 的指标:")    print(f"平均绝对误差(MAE):{model_metrics['MAE']:.2f}")    print(f"均方误差(MSE):{model_metrics['MSE']:.2f}")    print(f"R平方得分(R2):{model_metrics['R2']:.2f}")    print()

“““

评估后,我们将选择最佳的部署模型。

保存和打印最佳模型的代码片段:

# 基于 R2 分数找到最佳模型
best_model = max(metrics, key=lambda model: metrics[model]["R2"])
# 打印结果
print(f"所有训练模型中最佳的模型是 {best_model},具有以下指标:")
print(f"平均绝对误差(MAE):{metrics[best_model]['MAE']}")
print(f"均方误差(MSE):{metrics[best_model]['MSE']}")
print(f"R2 分数:{metrics[best_model]['R2']}")
# 保存最佳模型供以后使用
joblib.dump(models[best_model], "../models/best_model.pkl")
print("最佳模型已保存为 best_model.pkl")

输出:

明显:用于数据和模型监测

在生产环境中,模型的正常运行可能会出现各种问题。以下是一些关键注意事项:

  1. 训练和服务之间的差异:当我们用于训练和实验的数据之间存在显著差异时出现。
  2. 数据质量和完整性问题:可能存在数据处理问题,如:中断的流程、基础设施问题、数据模式的更改或来自源头的任何其他数据问题。
  3. 上游模型出错:在生产环境中,模型通常形成一个链,其中一个模型的输入依赖于另一个模型的输出。因此,一个模型输出中的任何问题都会影响整体模型的预测。

4. 渐进的概念漂移:随着时间的推移,我们正在处理的概念可能会发生变化(或)目标变量发生变化,因此需要进行监测。还可能存在意外情况,监测至关重要,我们的模型预测可能会出错。例如,任何灾难/自然灾害。

为了评估数据和模型的质量,我们将考虑两个数据集-参考数据集和当前数据集。

参考数据集:这个数据集是当前数据集质量指标的基准。虽然 Evidently 选择了默认的阈值值,但基于我们的参考数据集,我们也可以根据我们的特定需求提供自定义指标阈值。

当前数据集:它代表用于评估的实时未知数据。

我们将使用这两个数据集计算数据漂移、目标漂移、数据质量和模型性能报告。

报告将包括数据漂移、目标漂移、数据质量和模型性能指标。通常,监测在批处理中按特定间隔进行。

在重新训练模型之前需要考虑的事项:

1. 检查数据漂移:如果检测到数据漂移,建议首先检查数据质量,然后检查任何影响漂移的外部因素,如流行病和自然灾害。

2. 评估模型性能:在解决数据漂移问题后考虑模型性能。如果数据和模型报告显示漂移,重新训练模型可能是个好主意。在做出这个决定之前,考虑第三点。

3. 重新训练考虑事项:重新训练并不总是解决方案。在许多情况下,我们将没有足够的新数据来重新训练模型。在使用新数据进行训练时要谨慎,因为也有可能由于特定原因导致数据不稳定和错误。

4. 为数据漂移设置警报:在设置数据漂移警报之前,分析该特定数据/特征对预测的重要性。并非所有数据漂移都重要,需要采取行动。始终分析每个特征对预测的影响的重要性。

“`

可以生成各种格式的报告,例如HTML、JPEG、JSON等。让我们来探索生成报告的代码片段。

模型性能报告代码片段

# 创建回归性能报告对象regression_performance_report = Report(metrics=[RegressionPreset()])# 运行回归性能报告regression_performance_report.run(    reference_data=reference,  # 用于比较的参考数据集    current_data=current.loc[CUR_WEEK_START:CUR_WEEK_END],  # 用于分析的当前数据集    column_mapping=column_mapping  # 参考数据集和当前数据集之间的列映射)# 指定保存模型性能报告HTML文件的路径model_performance_report_path = reports_dir / 'model_performance.html'# 将回归性能报告保存为HTML文件regression_performance_report.save_html(model_performance_report_path)

目标漂移代码片段

# 创建目标漂移报告对象target_drift_report = Report(metrics=[TargetDriftPreset()])# 运行目标漂移报告target_drift_report.run(    reference_data=reference,  # 用于比较的参考数据集    current_data=current.loc[CUR_WEEK_START:CUR_WEEK_END],  # 用于分析的当前数据集    column_mapping=column_mapping  # 参考数据集和当前数据集之间的列映射)# 指定保存目标漂移报告HTML文件的路径target_drift_report_path = reports_dir / 'target_drift.html'# 将目标漂移报告保存为HTML文件target_drift_report.save_html(target_drift_report_path)

数据漂移

# 创建列映射对象column_mapping = ColumnMapping()column_mapping.numerical_features = numerical_features  # 定义映射的数值特征# 创建数据漂移报告对象data_drift_report = Report(metrics=[DataDriftPreset()])# 运行数据漂移报告data_drift_report.run(    reference_data=reference,  # 用于比较的参考数据集    current_data=current.loc[CUR_WEEK_START:CUR_WEEK_END],  # 用于分析的当前数据集    column_mapping=column_mapping  # 参考数据集和当前数据集之间的数值特征映射)# 指定保存数据漂移报告HTML文件的路径data_drift_report_path = reports_dir / 'data_drift.html'# 将数据漂移报告保存为HTML文件data_drift_report.save_html(data_drift_report_path)column_mapping = ColumnMapping()column_mapping.numerical_features = numerical_featuresdata_drift_report = Report(metrics=[DataDriftPreset()])data_drift_report.run(    reference_data=reference,    current_data=current.loc[CUR_WEEK_START:CUR_WEEK_END],    column_mapping=column_mapping)data_drift_report_path = reports_dir / 'data_drift.html'data_drift_report.save_html(data_drift_report_path)

数据质量代码片段

# 创建列映射对象column_mapping = ColumnMapping()column_mapping.numerical_features = numerical_features  # 定义映射的数值特征# 创建数据质量报告对象data_quality_report = Report(metrics=[DataQualityPreset()])# 运行数据质量报告data_quality_report.run(    reference_data=reference,  # 用于比较的参考数据集    current_data=current.loc[CUR_WEEK_START:CUR_WEEK_END],  # 用于分析的当前数据集    column_mapping=column_mapping  # 参考数据集和当前数据集之间的数值特征映射)# 指定保存数据质量报告HTML文件的路径data_quality_report_path = reports_dir / 'data_quality.html'# 将数据质量报告保存为HTML文件data_quality_report.save_html(data_quality_report_path)

了解模型-视图-控制器(MVC)架构

在这部分中,让我们来学习MVC架构:

模型-视图-控制器(MVC)架构是一种在Web应用程序中使用的设计模式。它用于简化代码复杂性和提高代码可读性。让我们来看看它的组成部分。

模型:逻辑部分

模型组件代表我们应用程序的核心逻辑。它处理数据处理和机器学习模型的交互。模型脚本位于src/model/目录中。

视图:构建用户界面

视图组件负责创建用户界面。它与用户进行交互并显示信息。视图脚本位于src/view/目录中。

控制器:协调桥梁

控制器组件充当模型和视图组件之间的中介桥梁。它处理用户输入和请求。控制器脚本位于src/controller/目录中。

MVC图表:

以下是MVC架构的可视化表示:

让我们看一下MVC架构中预测延迟的代码片段:

1)模型(flight_delay_model.py):

# 导入必要的库和模块import pandas as pdimport joblibimport xgboost as xgbfrom typing import Dictclass FlightDelayModel:    def __init__(self, model_file="models/best_model.pkl"):        """        初始化FlightDelayModel。        参数:            model_file(str):训练模型文件的路径。        """        # 从提供的文件中加载预训练模型        self.model = joblib.load(model_file)    def predict_delay(self, input_data: Dict):        """        根据输入数据预测航班延误。        参数:            input_data(Dict):包含预测输入特征的字典。        返回:            float:预测的航班延误。        """        # 将输入数据字典转换为DataFrame进行预测        input_df = pd.DataFrame([input_data])        # 使用预训练模型进行预测        prediction = self.model.predict(input_df)        return prediction

2)视图(flight_delay_view.py):

import streamlit as stfrom typing import Dictclass FlightDelayView:    def display_input_form(self):        """        在Streamlit侧边栏中显示输入表单供用户输入数据。        """        st.sidebar.write("输入数值:")        # 创建输入表单元素的编码部分将在此处进行。    def display_selected_inputs(self, selected_data: Dict):        """        根据用户输入显示所选的输入值。        参数:            selected_data(Dict):包含所选输入值的字典。        返回:            Dict:具有所选值的同一字典以供参考。        """        # 显示所选的输入值的编码部分将在此处进行        # 显示所选的输入值,如滑块、数字输入和选择框        return selected_data    def display_predicted_delay(self, flight_delay):        """        向用户显示预测的航班延误。        参数:            flight_delay:预测的航班延误值。        """        # 显示预测延迟的编码部分将在此处进行        # 向用户显示预测的航班延误值

3)控制器(flight_delay_controller.py):

import streamlit as stfrom src.view.flight_delay_view import FlightDelayViewfrom src.model.flight_delay_model import FlightDelayModelfrom typing import Dictclass FlightDelayController:    def __init__(self):        self.model = FlightDelayModel()        self.view = FlightDelayView()        self.selected_data = self.model.selected_data()    def run_prediction(self):        # 显示输入表单,收集用户输入并显示所选的输入        self.view.display_input_form()        input_data = self.get_user_inputs()        self.view.display_selected_inputs(input_data)        if st.button("预测航班延误"):            # 当点击预测按钮时,预测航班延误并显示结果            flight_delay = self.model.predict_delay(input_data)            self.view.display_predicted_delay(flight_delay)    def get_user_inputs(self):        # 从Streamlit侧边栏收集用户输入的编码部分将在此处进行。        user_inputs = {}        # 创建一个空字典来存储用户输入        # 您可以使用Streamlit的侧边栏小部件来收集用户输入,例如 st.sidebar.slider、st.sidebar.selectbox 等        # 在这里,您可以添加代码来收集用户输入并填充'user_inputs'字典。        # 示例:        # user_inputs['selected_feature'] = st.sidebar.slider("选择特征", min_value, max_value, default_value)        # 对于要收集的每个用户输入,重复此步骤。        return user_inputs        # 返回包含用户输入的字典

将预测和监控与Streamlit集成

在这里,我们将将Evidently监控与Streamlit预测集成在一起,以允许用户进行预测和监控数据和模型。

这种方法允许用户通过遵循MVC架构设计模式无缝地进行预测、监控或两者。

代码

在src/flight_delay_Controller.py中实现的选择参考和当前数据集的代码片段

# 导入必要的库import streamlit as stimport pandas as pdimport timeclass FlightDelayController:    def run_monitoring(self):        # Streamlit应用标题和介绍        st.title("数据和模型监控应用")        st.write("您正在使用数据和模型监控应用。从侧边栏中选择日期和月份范围,然后点击“提交”以开始模型训练和监控。")        # 允许用户选择其优选的日期范围        new_start_month = st.sidebar.selectbox("开始月份", range(1, 12), 1)        new_end_month = st.sidebar.selectbox("结束月份", range(1, 12), 1)        new_start_day = st.sidebar.selectbox("开始日期", range(1, 32), 1)        new_end_day = st.sidebar.selectbox("结束日期", range(1, 32), 30)        # 如果用户点击“提交”按钮        if st.button("提交"):            st.write("获取当前批次数据中...")            # 计算获取数据所用的时间            data_start = time.time()            df = pd.read_csv("data/Monitoring_data.csv")            data_end = time.time()            time_taken = data_end - data_start            st.write(f"在 {time_taken:.2f} 秒内获取到数据")            # 根据所选日期范围筛选数据            date_range = (                (df['Month'] >= new_start_month) & (df['DayofMonth'] >= new_start_day) &                (df['Month'] <= new_end_month) & (df['DayofMonth'] <= new_end_day)            )            # 根据日期范围创建参考和当前数据集。            reference_data = df[~date_range]            current_data = df[date_range]

选择参考和当前数据集后,我们将生成数据漂移、数据质量、模型质量和目标漂移的报告。实现报告生成的代码根据MVC架构设计分为3个部分。我们来看看这3个文件中的代码。

flight_delay_Controller.py的代码片段

import streamlit as st
from src.view.flight_delay_view import FlightDelayView
from src.model.flight_delay_model import FlightDelayModel
import numpy as np
import pandas as pd
from scipy import stats
import time

# Controllerclass 
FlightDelayController:
    """    
    Flight Delay Prediction App的控制器组件。    
    这个类协调模型和视图组件之间的交互。
    """    
    def __init__(self):
        # 通过创建模型和视图的实例来初始化控制器。
        self.model = FlightDelayModel()
        # 创建FlightDelayModel的实例。
        self.view = FlightDelayView()
        # 创建FlightDelayView的实例。
        self.selected_data = self.model.selected_data()
        # 从模型中获取选定的数据。
        self.categorical_options = self.model.categorical_features()
        # 从模型中获取分类特征。

    def run_monitoring(self):
        # 运行监控应用的函数。
        # 设置标题和简要说明。
        st.title("数据和模型监控应用")
        st.write("您现在在数据和模型监控应用中。从侧边栏选择日期和月份范围,点击“提交”开始模型训练和监控。")
        # 使用复选框选择要生成的报告。
        st.subheader("选择要生成的报告")
        generate_model_report = st.checkbox("生成模型性能报告")
        generate_target_drift = st.checkbox("生成目标漂移报告")
        generate_data_drift = st.checkbox("生成数据漂移报告")
        generate_data_quality = st.checkbox("生成数据质量报告")
        
        if st.button("提交"):
            # 'date_range'和'df'在这里没有显示,因为所有这些在之前的代码片段中已经显示了。
            # 剔除选定日期范围之外的参考数据。
            reference_data = df[~date_range]
            # 在选定的日期范围内得到当前数据。
            current_data = df[date_range]
            
            self.view.display_monitoring(reference_data, current_data)  # 显示监控数据。
            self.model.train_model(reference_data, current_data)  # 训练模型。
            
            # 生成选定的报告并显示。
            if generate_model_report:
                st.write("### 模型性能报告")
                st.write("正在生成模型性能报告...")
                performance_report = self.model.performance_report(reference_data, current_data)
                self.view.display_report(performance_report, "模型性能报告")
                
            if generate_target_drift:
                st.write("### 目标漂移报告")
                st.write("正在生成目标漂移报告...")
                target_report = self.model.target_report(reference_data, current_data)
                self.view.display_report(target_report, "目标漂移报告")
                
            if generate_data_drift:
                st.write("### 数据漂移报告")
                st.write("正在生成数据漂移报告...")
                data_drift_report = self.model.data_drift_report(reference_data, current_data)
                self.view.display_report(data_drift_report, "数据漂移报告")
                
            if generate_data_quality:
                st.write("### 数据质量报告")
                st.write("正在生成数据质量报告...")
                data_quality_report = self.model.data_quality_report(reference_data, current_data)
                self.view.display_report(data_quality_report, "数据质量报告")

Flight_delay_model.py的代码

import pandas as pd
import joblib
import xgboost as xgb
from evidently.pipeline.column_mapping import ColumnMapping
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset
from evidently.metric_preset import TargetDriftPreset
from evidently.metric_preset import DataQualityPreset
from evidently.metric_preset.regression_performance import RegressionPreset
import time
import streamlit as st
from typing import Dict

# Modelclass 
FlightDelayModel:
    """    
    Flight Delay Prediction App的模型组件。    
    这个类处理数据加载、模型加载和延误预测。
    """    
    def __init__(self, model_file="models/best_model.pkl"):
        # 初始化FlightDelayModel类
        # 为数据分析定义列映射
        self.column_mapping = ColumnMapping()
        self.column_mapping.target = self.target
        self.column_mapping.prediction = 'prediction'
        self.column_mapping.numerical_features = self.numerical_features
    
    # 模型性能报告
    def performance_report(self, reference_data: pd.DataFrame, current_data: pd.DataFrame):
        """
        为模型预测生成性能报告。
        参数:
        reference_data (pd.DataFrame): 用于比较的参考数据集。
        current_data (pd.DataFrame): 包含预测结果的当前数据集。
        返回:
        Report: 包含回归性能指标的报告。
        """
        regression_performance_report = Report(metrics=[RegressionPreset()])
        regression_performance_report.run(
            reference_data=reference_data,
            current_data=current_data,
            column_mapping=self.column_mapping
        )
        return regression_performance_report
    
    def target_report(self, reference_data: pd.DataFrame, current_data: pd.DataFrame):
        """
        生成目标漂移分析报告。
        参数:
        reference_data (pd.DataFrame): 用于比较的参考数据集。
        current_data (pd.DataFrame): 包含预测结果的当前数据集。
        返回:
        Report: 包含目标漂移指标的报告。
        """
        target_drift_report = Report(metrics=[TargetDriftPreset()])
        target_drift_report.run(
            reference_data=reference_data,
            current_data=current_data,
            column_mapping=self.column_mapping
        )
        return target_drift_report
    
    def data_drift_report(self, reference_data: pd.DataFrame, current_data: pd.DataFrame):
        """
        生成数据漂移分析报告。
        参数:
        reference_data (pd.DataFrame): 用于比较的参考数据集。
        current_data (pd.DataFrame): 包含预测结果的当前数据集。
        返回:
        Report: 包含数据漂移指标的报告。
        """
        data_drift_report = Report(metrics=[DataDriftPreset()])
        data_drift_report.run(
            reference_data=reference_data,
            current_data=current_data,
            column_mapping=self.column_mapping
        )
        return data_drift_report
    
    def data_quality_report(self, reference_data: pd.DataFrame, current_data: pd.DataFrame):
        """
        生成数据质量报告(注意:可能需要大约10分钟的时间)。
        参数:
        reference_data (pd.DataFrame): 用于比较的参考数据集。
        current_data (pd.DataFrame): 包含预测结果的当前数据集。
        返回:
        Report: 包含数据质量指标的报告。
        """
        st.write("生成数据质量报告需要更长的时间,大约10分钟,因为它需要进行详细分析。如果时间不足,您可以等待或浏览其他报告。")
        data_quality_report = Report(metrics=[DataQualityPreset()])
        data_quality_report.run(
            reference_data=reference_data,
            current_data=current_data,
            column_mapping=self.column_mapping
        )
        return data_quality_report

 Flight_delay_view.py 代码

# 导入所需的库
import streamlit as st
import pandas as pd

# 为Flight Delay View定义一个类
class FlightDelayView:
    """
    Flight Delay Prediction App的视图组件。
    此类处理Streamlit web应用程序的显示。
    """
    @staticmethod
    def display_input_form():
        """
        在Streamlit应用程序上显示输入表单。
        """
        st.title("Flight Delay Prediction App")
        st.write("此应用程序预测飞行延误时间(分钟)。")
        st.sidebar.header("用户输入")

    @staticmethod
    def display_monitoring(reference_data, current_data):
        """
        显示监控信息。
        Args:
            reference_data (DataFrame): 参考数据集。
            current_data (DataFrame): 当前数据集。
        """
        st.write("请向下滚动以查看报告")
        st.write("参考数据集的形状:", reference_data.shape)
        st.write("当前数据集的形状:", current_data.shape)
        # 模型训练信息
        st.write("### 模型正在训练中...")

    @staticmethod
    def display_selected_inputs(selected_data):
        """
        显示用户选择的输入。
        Args:
            selected_data (dict): 用户提供的输入数据。
        """
        input_data = pd.DataFrame([selected_data])
        st.write("已选输入:")
        st.write(input_data)
        return input_data

    @staticmethod
    def display_predicted_delay(flight_delay):
        """
        显示预测的飞行延误时间。
        Args:
            flight_delay (float): 预测的飞行延误时间(分钟)。
        """
        st.write("预测的飞行延误时间(分钟):", round(flight_delay[0], 2))

    @staticmethod
    def display_report(report, report_name: str):
        """
        显示由Evidently生成的报告。
        Args:
            report (Report): 要显示的Evidently报告。
            report_name (str): 报告名称(例如“模型性能报告”)。
        """
        st.write(f"{report_name}")
        # 以HTML格式显示Evidently报告,并启用滚动
        st.components.v1.html(report.get_html(), height=1000, scrolling=True)

现在,最终的Streamlit应用程序已就绪,我们已经集成了预测和监控。

app.py

import streamlit as st
import pandas as pd
from src.controller.flight_delay_controller import FlightDelayController

def main():
    st.set_page_config(page_title="Flight Delay Prediction App", layout="wide")
    # 初始化控制器
    controller = FlightDelayController()
    # 创建Streamlit应用程序
    st.sidebar.title("Flight Delay Prediction and Data & Model Monitoring App")
    choice = st.sidebar.radio("选择一个选项:", ("进行预测", "监控数据和模型"))
    if choice == "进行预测":
        controller.run_prediction()
    elif choice == "监控数据和模型":
        controller.run_monitoring()

if __name__ == '__main__':
    main()

要运行Streamlit应用程序,请执行以下命令

# 要运行应用程序,请执行以下命令:
streamlit run app.py

请在您的Web浏览器中访问http://localhost:8501以使用该应用

预测仪表盘:

监控仪表盘:

使用Evidently报告解锁见解

数据漂移报告:

在这里,我们可以看到我们项目的数据漂移报告。我们可以看到这里有5列数据发生了漂移。因此,下一步是与领域专家一起分析导致这些特征漂移的潜在原因。

目标漂移:

在这里,我们可以看到目标漂移;这里没有检测到漂移。

模型性能报告

在分析新的批处理数据集之后,我们可以看到我们模型的所有性能指标。我们可以根据这些指标做出所需的决策。

数据质量报告

通常,我们使用数据质量报告来处理原始和未处理的数据。我们可以看到所有与数据相关的指标,如列数、相关性、重复值等。我们也可以将其用于进行探索性数据分析。

部署的Docker集成:

理解Docker及其重要性:

将我们的项目进行容器化,以便在任何环境下都能运行而不出现依赖问题是非常重要的。Docker是打包和分发应用程序的基本工具。以下是设置和使用Docker进行此项目的步骤。

Dockerfile:

我们项目目录中的Dockerfile包含构建Docker映像的指令。使用正确的语法编写Dockerfile。

项目结构Docker文件:

编写高效的Dockerfile

将Docker映像的大小尽量减小是非常重要的。我们可以使用多阶段构建等技术来减小映像大小,并为Python使用非常轻量的基础映像。

代码:

# 使用官方的Python运行时作为父映像FROM python:3.9-slim# 设置工作目录为/appWORKDIR /app# 创建必要的目录RUN mkdir -p /app/data /app/models /app/src/controller /app/src/model /app/src/view# 将文件从主机复制到容器中COPY app.py /app/COPY src/controller/flight_delay_controller.py /app/src/controller/COPY src/model/flight_delay_model.py /app/src/model/COPY src/view/flight_delay_view.py /app/src/view/COPY requirements.txt /app/# 创建并激活虚拟环境RUN python -m venv venvRUN /bin/bash -c "source venv/bin/activate"# 根据 requirements.txt 安装所需的包RUN pip install -r requirements.txt# 安装wgetRUN apt-get update && apt-get install -y wget# 使用wget下载数据集文件RUN wget -O /app/data/DelayedFlights.csv https://flightdelay.blob.core.windows.net/flight-delayed-dataset/DelayedFlights.csv# 使用wget下载最佳模型文件RUN wget -O /app/models/best_model.pkl https://flightdelay.blob.core.windows.net/flight-delayed-dataset/best_model.pkl# 将端口号8501暴露给容器外部EXPOSE 8501# 定义默认命令来运行Streamlit应用程序CMD ["streamlit", "run", "app.py"]

构建和运行Docker容器

编写Dockerfile之后,按照以下步骤在终端中构建Docker映像并运行容器。

docker build -t flight-delay-prediction .docker run -p 8501:8501 flight-delay-prediction

使用Streamlit Sharing共享我们的Streamlit应用程序:

在构建好我们的Streamlit应用程序后以与他人共享,我们可以使用Streamlit Sharing来免费托管我们的应用程序。以下是所需的步骤:

1)组织项目:确保在根目录中有一个app.py文件。

2)定义依赖:创建一个requirements.txt文件,以便Streamlit共享安装此文件中提到的所有必要包。

3)使用GitHub:将存储库推送到GitHub,以与Streamlit共享无缝集成。

然后,注册Streamlit分享,在其中粘贴您的GitHub仓库链接,然后点击“部署”,现在您的项目URL将被生成。您可以与他人共享它。

结论

这个指南教会了我们如何整合像Streamlit、Azure、Docker和Evidently这样的工具来构建出色的数据驱动应用程序。随着我们结束这个指南,您将拥有足够的知识来使用Streamlit构建Web应用程序、通过Docker构建可移植应用程序,并通过Evidently确保数据和模型的质量。然而,这个指南讲述的是阅读和运用这些知识在您即将进行的数据科学项目中。尝试实验、创新和探索更多可以完成的增强功能。感谢您一直陪伴指南至结束。保持学习、持续行动和不断成长!

主要观点

  • 将我们的ML项目进行Docker化至关重要,以便在任何环境中运行而无需任何依赖问题。
  • 为了减小Docker镜像大小,考虑使用多阶段构建。
  • 在代码中实现MVC架构可以减少复杂的Web应用程序的复杂性,并提高代码的可读性。
  • 整合帮助我们在生产环境中监控数据和模型的质量。
  • 在分析所有数据和模型报告之后,必须采取适当的行动。

常见问题

GitHub仓库: https://github.com/VishalKumar-S/Flight-Delay-Prediction-and-live-Monitoring-with-Azure-Evidently-and-Streamlit-with-MVC-Architecture