隨著機器學習專案日益複雜,有效管理實驗、模型和佈署流程變得至關重要。MLflow 提供了一個開源平台,能有效解決這些挑戰。本文以 PySpark 環境下的機器學習專案為例,示範如何使用 MLflow 追蹤實驗、管理模型版本,並簡化佈署流程。透過整合 MLflow,資料科學家可以更專注於模型開發,而不必費心於繁瑣的管理工作。此方法能提升團隊協作效率,並確保機器學習專案的可重複性和可維護性,對於建構穩健的機器學習應用至關重要。
機器學習流程與自動化管線
將模型投入生產是資料科學領域中最具挑戰性的任務之一。這是許多組織持續面臨的「最後一哩」問題。儘管有許多工具可用於管理工作流程,但隨著組織的成熟,其需求會發生變化,管理現有模型可能會變成一項艱鉅的任務。
為何難以管理模型?
當你退一步分析為何如此具有挑戰性時,你會發現這是由於大多陣列織中存在的結構所致。工程團隊維護生產平台,而資料科學工具集與生產平台之間存在著差距。一些資料科學工作可以在 Jupyter Notebook 中開發,對雲端環境的考慮甚少。一些資料流程在本地建立,擴充套件性有限。這類別應用程式往往難以處理大量資料。
軟體開發週期與機器學習生命週期的差異
軟體開發週期中的最佳實踐並不適用於機器學習生命週期,因為涉及的任務種類別繁多。標準主要由組織中的資料科學團隊定義。此外,該領域的快速發展使得模型的管理和佈署存在空白。
本章旨在探討可用於管理和佈署資料科學管線的工具。我們將介紹業界中一些可用的工具,並透過範例演示如何構建自動化管線。我們將使用到目前為止所學的所有概念來構建這些管線。在本章結束時,您將學習如何構建機器學習區塊並將其與自定義資料預處理自動化協同工作。
本章涵蓋以下主題:
- MLflow:介紹 MLflow 及其元件和優勢,並進行演示。在前半部分,我們重點介紹安裝和概念介紹。在後半部分,我們重點介紹在 MLflow 中實作模型。
- 自動化機器學習管線:介紹在 PySpark 中設計和實作自動化 ML 框架的概念。前半部分著重於實作,後半部分則著重於從管線產生的輸出。
MLflow 簡介
透過前幾章的學習,你已經知道如何構建個別的監督式和非監督式模型。通常,你會反覆運算多個模型以選擇最佳模型。你如何追蹤所有實驗和使用的超引數?你如何在多次實驗後重現特定的結果?你如何在存在多種佈署工具和環境的情況下將模型投入生產?是否有更好的方法來管理工作流程並與更廣泛的社群分享工作?這些是你在使用 PySpark 構建模型時會遇到的問題。
Databricks 開發了一款名為 MLflow 的工具,可以優雅地處理上述問題,同時管理工作流程。MLflow 設計為開放介面和開源。MLflow 可以輕鬆地處理實驗,並圍繞 REST API 構建,多種工具都可以輕鬆使用該 API。此框架還可以輕鬆新增現有的機器學習程式碼區塊。其開源性質使得跨不同團隊分享工作流程和模型變得容易。簡而言之,MLflow 提供了一個框架來管理機器學習生命週期,包括實驗、可重現性、佈署和中央模型註冊。
MLflow 的元件
MLflow 的元件如圖 8-1 所示,並在以下清單中描述。
圖 8-1:MLflow 的元件
此圖示呈現了 MLflow 的四個主要元件,分別是 Tracking、Projects、Models 和 Model Registry。
圖表翻譯: 此圖示顯示了 MLflow 的架構,包括四個主要元件:Tracking 用於記錄實驗過程、Projects 用於封裝機器學習程式碼、Models 用於佈署模型、Model Registry 用於儲存和管理模型。
- Tracking:此元件有助於記錄和紀錄程式碼、組態、資料和結果。它還提供了一種查詢實驗的機制。
- Projects:此元件有助於將機器學習程式碼封裝成易於在多個平台上複製的格式。
- Models:此元件有助於在多樣化的環境中佈署機器學習程式碼。
- Model Registry:此元件充當儲存、管理和搜尋模型的儲存函式庫。
MLflow 程式碼設定和安裝
在本文中,我們將介紹支援 MLflow 所需的程式碼變更及其安裝步驟。我們使用 PySpark Docker 版本進行所有演示。啟動和執行 MLflow 有兩個主要步驟,如下所示:
- 程式碼/指令碼變更
- Docker 端變更和 MLflow 伺服器安裝
程式碼變更範例
# 以下是一個簡單的範例,展示如何使用 MLflow 進行實驗追蹤
import mlflow
# 初始化 MLflow
mlflow.set_experiment("我的實驗")
# 開始一個新的執行
with mlflow.start_run():
# 紀錄引數
mlflow.log_param("param1", 5)
# 紀錄指標
mlflow.log_metric("metric1", 0.85)
# 紀錄模型
# model = ...
# mlflow.sklearn.log_model(model, "model")
內容解密:
此範例程式碼展示瞭如何使用 MLflow 進行實驗追蹤。首先,我們初始化 MLflow 並設定實驗名稱。接著,我們開始一個新的執行,並在執行中紀錄引數、指標和模型。這使得我們可以追蹤和管理實驗結果。
自動化機器學習管線
在本文中,我們將介紹在 PySpark 中設計和實作自動化 ML 框架的概念。我們將使用 PySpark 提供的各種工具和技術來構建自動化管線。
自動化管線範例
# 以下是一個簡單的範例,展示如何使用 PySpark 構建自動化機器學習管線
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
# 定義管線階段
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashing_tf = HashingTF(inputCol="words", outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
# 構建管線
pipeline = Pipeline(stages=[tokenizer, hashing_tf, lr])
# 訓練管線
model = pipeline.fit(training_data)
內容解密:
此範例程式碼展示瞭如何使用 PySpark 構建自動化機器學習管線。我們首先定義了管線的各個階段,包括分詞、特徵提取和邏輯迴歸。接著,我們構建了管線並訓練了模型。這使得我們可以自動化機器學習流程並提高效率。
MLflow 與自動化機器學習流程的整合應用
在機器學習專案中,追蹤實驗結果和模型效能是一項重要的任務。MLflow 提供了一套完整的解決方案,能夠幫助資料科學家和機器學習工程師更有效地管理和比較不同的實驗結果。本文將介紹如何將 MLflow 整合到 PySpark 的機器學習程式碼中,以實作實驗追蹤和模型管理的自動化。
為何需要 MLflow?
在進行機器學習模型開發時,我們經常需要調整不同的超引數或輸入變數,以觀察模型效能的變化。傳統的做法是手動記錄每次實驗的結果,但這種方法在面對大量實驗時變得非常低效。MLflow 的出現正是為瞭解決這個問題,它允許我們輕鬆地記錄實驗引數、評估指標和模型本身,從而實作實驗結果的有效管理和比較。
在 PySpark 中整合 MLflow
以下是一個簡單的範例,展示如何將 MLflow 整合到一個使用 PySpark 進行隨機森林分類別的程式碼中。
# 匯入必要的函式庫
import pyspark
from pyspark.sql import SparkSession
import mlflow
import mlflow.spark
import sys
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
# 初始化 SparkSession
spark = SparkSession.builder.appName("mlflow_example").getOrCreate()
# 載入資料集
filename = "bank-full.csv"
target_variable_name = "y"
df = spark.read.csv(filename, header=True, inferSchema=True, sep=';')
# 資料預處理
df = df.withColumn('label', F.when(F.col("y") == 'yes', 1).otherwise(0))
df = df.drop('y')
train, test = df.randomSplit([0.7, 0.3], seed=12345)
# 選擇特徵欄位
features_list = ['age', 'balance', 'day', 'duration', 'campaign', 'pdays', 'previous']
assembler = VectorAssembler(inputCols=features_list, outputCol='features')
stages = [assembler]
pipeline = Pipeline(stages=stages)
assembleModel = pipeline.fit(df)
df = assembleModel.transform(df).select('label', 'features')
# 分割訓練和測試資料集
assembled_train_df, assembled_test_df = df.randomSplit([0.7, 0.3], seed=12345)
#### 內容解密:
此段程式碼主要進行資料預處理,包括載入資料、轉換目標變數、分割訓練和測試資料集,以及使用 `VectorAssembler` 將特徵欄位組合成一個向量欄位。
# 設定 MLflow 追蹤伺服器
mlflow.set_tracking_uri(uri="http://127.0.0.1:8080")
mlflow.set_experiment("MLflow PySpark Example")
# 從命令列引數取得超引數
maxBinsVal = float(sys.argv[1])
maxDepthVal = float(sys.argv[2])
with mlflow.start_run():
# 定義隨機森林分類別器
classifier = RandomForestClassifier(labelCol='label', featuresCol='features', maxBins=maxBinsVal, maxDepth=maxDepthVal)
stages_tree = [classifier]
pipeline_tree = Pipeline(stages=stages_tree)
# 訓練模型
RFmodel = pipeline_tree.fit(assembled_train_df)
# 評估模型
predictions = RFmodel.transform(assembled_test_df)
evaluator = BinaryClassificationEvaluator()
roc = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})
# 紀錄引數和評估指標到 MLflow
mlflow.log_param("maxBins", maxBinsVal)
mlflow.log_param("maxDepth", maxDepthVal)
mlflow.log_metric("ROC", roc)
# 紀錄模型到 MLflow
mlflow.spark.log_model(RFmodel, "spark-model")
#### 內容解密:
在此段程式碼中,我們使用 MLflow 的 `start_run` 方法開始一個新的實驗執行,並在其中訓練一個隨機森林分類別模型。模型的超引數 `maxBins` 和 `maxDepth` 是從命令列引數取得的,並使用 `log_param` 方法記錄到 MLflow。同時,我們使用 `BinaryClassificationEvaluator` 評估模型的 ROC 指標,並使用 `log_metric` 方法記錄到 MLflow。最後,我們使用 `log_model` 方法將訓練好的模型記錄到 MLflow。
### 使用 MLflow UI 檢視實驗結果
執行上述程式碼後,我們可以開啟 MLflow UI(通常位於 `http://127.0.0.1:8080/`)來檢視實驗結果。在 MLflow UI 中,我們可以看到不同實驗執行的詳細資訊,包括引數、評估指標和模型等。
```plantuml
@startuml
skinparam backgroundColor #FEFEFE
skinparam componentStyle rectangle
title MLflow 自動化機器學習管線建置與管理
package "機器學習流程" {
package "資料處理" {
component [資料收集] as collect
component [資料清洗] as clean
component [特徵工程] as feature
}
package "模型訓練" {
component [模型選擇] as select
component [超參數調優] as tune
component [交叉驗證] as cv
}
package "評估部署" {
component [模型評估] as eval
component [模型部署] as deploy
component [監控維護] as monitor
}
}
collect --> clean : 原始資料
clean --> feature : 乾淨資料
feature --> select : 特徵向量
select --> tune : 基礎模型
tune --> cv : 最佳參數
cv --> eval : 訓練模型
eval --> deploy : 驗證模型
deploy --> monitor : 生產模型
note right of feature
特徵工程包含:
- 特徵選擇
- 特徵轉換
- 降維處理
end note
note right of eval
評估指標:
- 準確率/召回率
- F1 Score
- AUC-ROC
end note
@enduml
圖表翻譯: 此圖表呈現了使用 MLflow 進行實驗追蹤的流程。首先,我們開始一個新的實驗並設定 MLflow 追蹤伺服器。接著,我們定義實驗內容,包括訓練模型和評估模型。最後,我們將實驗的引數、評估指標和模型記錄到 MLflow 中,以便於後續的比較和分析。
MLflow 在機器學習生命週期中的應用
MLflow 是一個開源平台,用於管理機器學習生命週期,包括實驗、可重現性和佈署。在本章中,我們將探討如何使用 MLflow 來跟蹤和比較不同的機器學習實驗,並展示如何使用 MLflow 來佈署和評分模型。
使用 MLflow 跟蹤實驗
要使用 MLflow 跟蹤實驗,我們需要安裝 MLflow 並將其整合到我們的機器學習程式碼中。以下是一個使用 Spark 和 MLflow 的例子:
spark-submit --master local chapter8_mlflow_example.py --maxBinsVal 16 --maxDepthVal 3
spark-submit --master local chapter8_mlflow_example.py --maxBinsVal 16 --maxDepthVal 5
spark-submit --master local chapter8_mlflow_example.py --maxBinsVal 32 --maxDepthVal 5
內容解密:
- 上述命令用於執行三個不同的機器學習實驗,每個實驗使用不同的超引數(
maxBinsVal和maxDepthVal)。 spark-submit命令用於提交 Spark 作業,--master local表示在本地模式下執行。chapter8_mlflow_example.py是包含機器學習程式碼的 Python 指令碼。
使用 MLflow UI 檢視實驗結果
執行完上述命令後,我們可以透過存取 http://127.0.0.1:8080 來檢視 MLflow UI。MLflow UI 提供了一個視覺化的介面,用於檢視和管理實驗結果。
圖表翻譯:
- 圖 8-2 展示了 MLflow UI 的主介面,顯示了所有實驗的列表。
- 每個實驗都有詳細的資訊,包括超引數、指標和模型。
使用 MLflow 比較實驗結果
MLflow UI 允許我們比較不同的實驗結果。透過選擇多個實驗並點選“Compare”按鈕,我們可以檢視不同實驗之間的指標比較。
圖表翻譯:
- 圖 8-3 展示了兩個實驗之間的指標比較。
- 圖表顯示了不同超引數設定下模型的效能。
使用 MLflow 佈署和評分模型
要佈署和評分模型,我們需要將模型註冊到 MLflow。註冊模型後,我們可以使用 MLflow 提供的 API 來載入模型並對新資料進行評分。
# Importing necessary libraries
import mlflow
import mlflow.spark
import sys
from pyspark.sql import SparkSession
# Load the Spark ML model from MLflow
model_uri = sys.argv[1]
model = mlflow.spark.load_model(model_uri)
# Prepare the data for scoring
spark = SparkSession.builder.appName("mlflow_predict").getOrCreate()
filename = "bank-full.csv"
target_variable_name = "y"
df = spark.read.csv(filename, header=True, inferSchema=True, sep=';')
# Assemble individual columns to one column - 'features'
def assemble_vectors(df, features_list, target_variable_name):
stages = []
assembler = VectorAssembler(inputCols=features_list, outputCol='features')
stages = [assembler]
selectedCols = [target_variable_name, 'features']
pipeline = Pipeline(stages=stages)
assembleModel = pipeline.fit(df)
df = assembleModel.transform(df).select(selectedCols)
return df
# Apply the function on our dataframe
features_list = df.columns
features_list.remove('label')
assembled_test_df = assemble_vectors(test, features_list, 'label')
# Make predictions on the test data
predictions = model.transform(assembled_test_df)
內容解密:
- 上述程式碼用於載入 MLflow 中註冊的模型並對新資料進行評分。
mlflow.spark.load_model用於載入 Spark ML 模型。assemble_vectors函式用於將多個列組合成一個“features”列。model.transform用於對測試資料進行預測。