隨著資料科學的發展,自動化機器學習流程變得越來越重要。本文將探討如何利用 PySpark 建立一個自動化機器學習流程,並以客戶流失預測案例示範從資料處理到模型佈署的完整步驟。此流程涵蓋了資料清洗、特徵工程、模型訓練、效能評估以及使用 Docker 容器化佈署和建構即時評分 API 等關鍵環節。透過 PySpark 的分散式運算能力,可以有效處理大規模資料集,並提升模型訓練效率。在模型評估階段,我們將使用多種指標,例如 KS 值、ROC 曲線和準確率,來評估模型的效能。最後,我們將使用 Docker 將模型封裝成可移植的容器,並透過 Flask 框架建立即時評分 API,以便在實際應用中提供即時預測服務。

自動化機器學習流程與實務應用

在機器學習的生命週期中,引數調校、輸入調整和資料處理流程的迭代可能變得非常繁瑣。建立自動化的流程可以節省大量的時間。本章將結合前幾章所學的資料處理、演算法和模型技術,建立一個自動化的 PySpark 流程,用於生成基準模型以進行快速實驗。

資料集與目標

本實驗使用來自 Kaggle 的客戶流失資料集(https://www.kaggle.com/shrutimechlearn/churn-modelling#Churn_Modelling.csv)。該銀行資料集包含了客戶屬性和客戶是否流失的資訊。目標是根據給定的屬性識別可能流失的客戶。表 8-1 列出了資料集中的屬性。

表 8-1:資料集屬性說明

欄位名稱描述
RowNumber識別符號
CustomerId客戶的唯一ID
Surname客戶的姓氏
CreditScore客戶的信用評分
Geography客戶所屬國家
Gender性別(男或女)
Age客戶年齡
Tenure客戶與銀行的合作年限
Balance客戶的銀行餘額
NumOfProducts客戶使用的銀行產品數量
HasCrCard是否持有該銀行的信用卡(二元標誌)
IsActiveMember是否為活躍客戶(二元標誌)
EstimatedSalary預估的客戶年薪(美元)
Exited是否流失(二元標誌:1 表示已流失,0 表示仍為客戶)

自動化模型建立的需求與框架

在設計任何流程時,定義預期的輸出和需求是至關重要的。對於一個二元分類別問題,以下輸出是有用的:

  • KS(Kolmogorov-Smirnov 測試):衡量好壞客戶的分離程度,越高越好。
  • ROC(Receiver Operating Characteristic 曲線):比較診斷測試的方法,繪製真正率與假正率的關係圖,越高越好。
  • 準確率(Accuracy):正確預測的比例,越高越好。

對於自動化模型建立工具,以下需求是必要的:

  • 能夠自行處理缺失值和類別變數。
  • 能夠進行變數選擇。
  • 能夠測試多種演算法並選擇最佳模型。
  • 能夠根據偏好的指標比較不同演算法並選出冠軍模型和挑戰者模型。
  • 能夠根據字首、字尾或變數名稱移除特定變數。
  • 能夠收集和儲存所有輸出指標並生成報告。
  • 能夠儲存模型物件並自動生成評分程式碼以便佈署。

這些需求是根據 CRISP-DM 框架(見圖 8-9)所設計的。

圖 8-9:CRISP-DM 框架

自動化流程的步驟

本章將自動化流程分為以下幾個邏輯步驟:

  1. 資料處理:處理缺失值、類別變數編碼、資料分割等。
  2. 特徵選擇:使用隨機森林等方法選擇最重要的變數。
  3. 模型建立:測試多種演算法並選擇最佳模型。
  4. 指標計算:計算 KS、ROC、準確率等指標。
  5. 驗證與圖表生成:驗證模型並生成相關圖表。
  6. 模型選擇:根據指標選擇最佳模型。
  7. 評分程式碼生成:自動生成評分程式碼。
  8. 結果整理:收集和儲存所有輸出指標並生成報告。

以下是一個簡單的 PySpark 程式碼範例,用於展示模型的評分流程:

print("predictions.type:", type(predictions))
predictions.printSchema()
df = predictions.select('rawPrediction','probability', 'label', 'features')
df.show(5, False)

#### 內容解密:

這段程式碼展示瞭如何檢視 PySpark 中模型預測結果的型別和結構,並選取特定的欄位進行展示。首先,print("predictions.type:", type(predictions)) 用於輸出 predictions 物件的型別。接著,predictions.printSchema() 用於顯示預測結果的結構,包括各欄位的名稱和資料型別。然後,df = predictions.select('rawPrediction','probability', 'label', 'features') 選取了 rawPredictionprobabilitylabelfeatures 四個欄位,並將結果儲存在 df 資料框中。最後,df.show(5, False) 用於展示 df 資料框的前 5 行,且不截斷欄位內容。

自動化機器學習流程與輸出結果分析

在現代資料科學領域中,自動化機器學習流程已成為提升工作效率的關鍵技術。本文將探討一個完整的機器學習自動化框架,包括其各個模組的功能、實作細節以及最終生成的輸出結果。

機器學習自動化框架架構

該自動化框架由多個獨立模組組成,每個模組負責特定的任務,共同構成了完整的機器學習流程。

資料處理模組

資料處理是整個機器學習流程的基礎,主要負責資料清理、轉換和特徵工程。該模組透過 PySpark 實作了高效的資料處理能力,能夠處理大規模資料集。

特徵選擇模組

特徵選擇模組採用特定的演算法來評估不同特徵對模型預測能力的貢獻度,從而篩選出最重要的特徵子集,有效降低模型的複雜度並提升預測效能。

模型建構模組

模型建構模組實作了多種常見的機器學習演算法,包括:

  • 邏輯迴歸(Logistic Regression)
  • 隨機森林(Random Forest)
  • 梯度提升樹(Gradient Boosting)
  • 神經網路(Neural Network)

這些演算法均針對二元分類別問題進行了最佳化,能夠滿足不同的業務需求。

模型評估與驗證

模型評估是機器學習流程中的關鍵環節,該框架透過多種評估指標來全面衡量模型的效能,包括:

  • ROC 曲線下面積(AUC-ROC)
  • 準確率(Accuracy)
  • Kolmogorov-Smirnov 統計量(KS 統計量)

透過這些指標的綜合評估,可以準確判斷不同模型的預測能力。

自動化流程實作細節

環境組態與執行

該框架設計為可在 Docker 環境中執行,透過以下指令啟動:

docker run -it -p 8888:8888 -v /Users/ramcharankakarla/demo_data/:/home/jovyan/work/ jupyter/pyspark-notebook:latest bash

執行前需確保安裝必要的套件:

pip install openpyxl
pip install xlsxwriter

模型訓練與評估的執行指令如下:

spark-submit --master local[*] /home/jovyan/work/Chapter8_automator/build_and_execute_pipe.py

程式碼結構解析

框架中的各個模組均以 Python 實作,並透過 PySpark 進行資料處理和模型訓練。主要模組包括:

  1. 資料處理模組:data_manipulations.py
  2. 特徵選擇模組:feature_selection.py
  3. 模型建構模組:model_builder.py
  4. 評估指標計算模組:metrics_calculator.py
  5. 模型驗證與視覺化模組:validation_and_plots.py

#### 內容解密:

該框架的程式碼結構清晰地劃分了不同的功能模組,每個模組負責特定的任務。這種模組化的設計使得整個流程具有良好的可維護性和擴充套件性。

輸出結果分析

該自動化框架能夠生成多種有價值的輸出結果,包括:

  1. 特徵重要性分析

    • 圖表顯示不同模型對特徵重要性的評估結果
    • 有助於理解不同特徵對預測結果的貢獻度
  2. 模型效能比較

    • 表格形式呈現不同模型的評估指標(ROC、準確率、KS 統計量等)
    • 便於比較不同模型的預測效能
  3. 視覺化結果

    • ROC 曲線圖
    • 混淆矩陣圖
    • KS 統計量圖

這些輸出結果為模型評估和選擇提供了全面的依據。

佈署機器學習模型

在前一章中,我們探討了模型管理和佈署的挑戰,以及如何使用MLflow來管理實驗和佈署模型。本章將重點介紹佈署機器學習模型的最佳實踐,並使用三種不同的方法來演示生產環境中的模型佈署。

儲存模型物件和建立評分程式碼

在建立模型並使用多種指標進行驗證後,需要儲存模型物件以便未來進行評分,並建立評分程式碼。

  • 模型物件是指包含訓練過程中的資訊的PySpark或Python引數。需要識別出所有相關的模型物件,以便在不重新訓練模型的情況下重現結果。
  • 評分程式碼是指用於在新資料集上執行模型物件以產生評分的必要程式碼。評分程式碼只用於評分過程,不應包含模型訓練指令碼。

本例中,需要儲存六個物件以便未來使用:

  1. char_labels:用於對新資料進行標籤編碼。
  2. assembleModel:用於組裝向量並使資料準備好進行評分。
  3. clf_model:隨機森林分類別器模型。
  4. features_list:用於從新資料中選擇輸入特徵的列表。
  5. char_vars:字元變數。
  6. num_vars:數值變數。

評分程式碼

評分程式碼是一個Python指令碼檔案,用於執行評分操作。該指令碼檔案不應包含模型訓練指令碼。至少應執行以下任務:

  1. 匯入必要的套件。
  2. 讀取新輸入檔案以進行評分。
  3. 讀取從訓練過程中儲存的模型物件。
  4. 使用模型物件執行必要的轉換。
  5. 使用分類別器/迴歸物件計算模型評分。
  6. 將最終評分輸出到指定位置。

首先,建立一個名為helper.py的輔助指令碼檔案,包含執行評分所需的必要程式碼。然後,使用run.py檔案執行評分操作。

helper.py 範例程式碼

# helper.py
import pyspark.sql.functions as F

def load_model_objects(model_path):
    # 載入模型物件
    char_labels = ...  # 載入 char_labels
    assembleModel = ...  # 載入 assembleModel
    clf_model = ...  # 載入 clf_model
    features_list = ...  # 載入 features_list
    char_vars = ...  # 載入 char_vars
    num_vars = ...  # 載入 num_vars
    
    return char_labels, assembleModel, clf_model, features_list, char_vars, num_vars

def score_data(df, char_labels, assembleModel, clf_model, features_list, char_vars, num_vars):
    # 對資料進行評分
    df = df.transform(assembleModel)
    predictions = clf_model.transform(df)
    
    return predictions

#### 內容解密:
此段程式碼定義了兩個函式:`load_model_objects``score_data`。`load_model_objects` 用於載入儲存的模型物件`score_data` 則使用這些模型物件對輸入資料進行評分評分過程中首先使用 `assembleModel` 對資料進行轉換然後使用 `clf_model` 進行預測

run.py 範例程式碼

# run.py
from helper import load_model_objects, score_data

def main():
    # 載入模型物件
    char_labels, assembleModel, clf_model, features_list, char_vars, num_vars = load_model_objects("model_path")
    
    # 載入新資料
    new_data = ...  # 載入新資料
    
    # 對新資料進行評分
    predictions = score_data(new_data, char_labels, assembleModel, clf_model, features_list, char_vars, num_vars)
    
    # 輸出評分結果
    predictions.write.csv("final_scores", header=True)

if __name__ == "__main__":
    main()

#### 內容解密:
此段程式碼定義了一個 `main` 函式用於執行評分操作首先載入儲存的模型物件然後載入新資料並使用 `score_data` 函式進行評分最後將評分結果輸出到 CSV 檔案中

使用HDFS物件和Pickle檔案佈署模型

這是一種簡單的佈署模型的方法。當有多個模型需要評分時,可以手動提交每個run.py指令碼,或建立排程器來定期執行指令碼。

spark-submit run.py

使用Docker佈署模型

Docker佈署提供了多種好處,例如可攜性、自包含的容器、微服務架構、更快的軟體交付週期等。

Dockerfile 範例

# Dockerfile
FROM apache/spark-py:3.3.1

# 設定工作目錄
WORKDIR /app

# 複製必要檔案
COPY run.py helper.py /app/

# 安裝必要的套件
RUN pip install -r requirements.txt

# 設定環境變數
ENV SPARK_HOME=/opt/spark

# 執行評分操作
CMD ["spark-submit", "run.py"]

#### 圖表翻譯:
此Dockerfile首先從 `apache/spark-py:3.3.1` 映像檔建立一個新的 Docker 映像檔。然後,設定工作目錄並複製必要的檔案到容器中。接著,安裝必要的套件並設定環境變數。最後,定義了容器啟動時要執行的命令,即使用 `spark-submit` 提交 `run.py` 指令碼。

佈署機器學習模型:Docker 與即時評分 API 的實作

在前面的章節中,我們已經完成了模型的建立與訓練,接下來要將模型佈署到實際環境中。本章將介紹如何使用 Docker 將模型容器化,並實作即時評分 API。

Docker 容器化佈署

Docker 提供了一個輕量級的容器化技術,能夠將應用程式及其依賴封裝成一個可移植的容器。我們首先需要建立一個 Docker 映像檔(image),然後使用該映像檔建立容器並執行評分程式。

檔案結構與 Dockerfile

在專案目錄中,我們可以看到兩個新的檔案:Dockerfilerequirements.txtDockerfile 包含了建立 Docker 容器的指令,而 requirements.txt 則列出了執行程式所需的 Python 套件。

# 使用 jupyter/pyspark-notebook 作為基礎映像檔
FROM jupyter/pyspark-notebook:latest

# 設定工作目錄
WORKDIR /deploy/

# 複製 requirements.txt 檔案到容器目錄
COPY ./requirements.txt /deploy/

# 安裝 Python 依賴套件
RUN pip install -r requirements.txt

# 開放 5000 連線埠
EXPOSE 5000

# 複製必要檔案到容器目錄
COPY ./file.pkl /deploy/
COPY ./run.py /deploy/
COPY ./helper.py /deploy/
COPY ./assembleModel.h5 /deploy/assembleModel.h5
COPY ./char_label_model.h5 /deploy/char_label_model.h5
COPY ./clf_model.h5 /deploy/clf_model.h5

# 指定容器啟動時執行的指令
ENTRYPOINT ["spark-submit", "run.py"]

建立 Docker 映像檔與執行容器

使用以下指令建立 Docker 映像檔:

docker build -t scoring_image .

建立完成後,使用以下指令執行容器:

docker run -p 5000:5000 -v ${PWD}:/localuser scoring_image:latest

即時評分 API

即時評分 API 能夠讓模型在即時環境中進行評分,並將結果回傳給使用者。我們使用 Flask 框架來實作即時評分 API。

# app.py
from flask import Flask, request, jsonify
import pandas as pd
from helper import load_model, predict

app = Flask(__name__)

# 載入模型
model = load_model()

@app.route('/predict', methods=['POST'])
def predict_api():
    data = request.get_json()
    df = pd.DataFrame(data)
    predictions = predict(model, df)
    return jsonify(predictions.tolist())

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)

Dockerfile 更新

為了執行即時評分 API,我們需要更新 Dockerfile 以複製 app.py 檔案到容器目錄,並指定執行 app.py 的指令。

# 使用 jupyter/pyspark-notebook 作為基礎映像檔
FROM jupyter/pyspark-notebook:latest

# 設定工作目錄
WORKDIR /deploy/

# 複製 requirements.txt 檔案到容器目錄
COPY ./requirements.txt /deploy/

# 安裝 Python 依賴套件
RUN pip install -r requirements.txt

# 開放 5000 連線埠
EXPOSE 5000

# 複製必要檔案到容器目錄
COPY ./file.pkl /deploy/
COPY ./run.py /deploy/
COPY ./helper.py /deploy/
COPY ./assembleModel.h5 /deploy/assembleModel.h5
COPY ./char_label_model.h5 /deploy/char_label_model.h5
COPY ./clf_model.h5 /deploy/clf_model.h5
COPY ./app.py /deploy/

# 指定容器啟動時執行的指令
ENTRYPOINT ["python", "app.py"]

Postman API 測試

我們可以使用 Postman API 來測試即時評分 API。首先,建立一個新的 API 請求,並設定請求方法為 POST,請求網址為 http://localhost:5000/predict

設定請求標頭

在 Headers 分頁中,新增一個標頭,名稱為 Content-Type,值為 application/json

設定請求主體

在 Body 分頁中,選擇 raw,並選擇 JSON 作為文字格式。輸入測試資料,例如:

[
    {"feature1": 1, "feature2": 2},
    {"feature1": 3, "feature2": 4}
]

傳送請求

點選 Send 按鈕,傳送請求到即時評分 API。API 將回傳評分結果,例如:

[0.8, 0.9]