Databricks 提供強大的機器學習平台,允許開發者建立自動化的機器學習流程。本文示範如何建立定時執行和事件觸發的機器學習工作,並以 spaCy 自然語言處理模型為例,說明如何利用 AWS Lambda 函式實作事件觸發管線。此外,文章也介紹如何使用 MLflow 管理模型版本、佈署模型為 REST API,並透過瀏覽器、cURL 和 Python 等方式呼叫模型服務,取得預測結果並進行解析。此文涵蓋了從資料處理、模型訓練、佈署到監控的完整機器學習流程,提供開發者在 Databricks 上建構高效能機器學習應用的實用。

在Databricks上建立機器學習工作排程

在前面的章節中,我們展示了Databricks在執行機器學習模型方面的速度優勢。現在,讓我們進一步建立一個機器學習流程(Machine Learning Pipeline),並探討如何在Databricks上根據事件或排程自動觸發該流程。

建立生產環境用的筆記本(Production Pipeline Notebook)

首先,我們需要在Databricks的工作空間(Workspace)中建立一個新的筆記本,名為“nlp-demo-pipeline”。這個筆記本將包含應用spaCy模型到AG News Dataset的基本步驟。

載入必要的函式庫

# 載入函式庫
import spacy
import numpy as np
import pandas as pd
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, ArrayType
from pyspark.sql.types import *

定義讀取資料的函式並載入資料

# 定義函式讀取資料
def read_data(file):
    read_path = '/content/drive/My Drive/Applied-NLP-in-the-Enterprise'
    data = pd.read_csv(read_path+file)
    return data

# 載入資料
inputPath = getArgument("inputPath", "default")
df = spark.read.format('csv').options(header='true', inferSchema='true', quote="\"", escape= "\"").load(inputPath)

#### 內容解密:

  • getArgument函式用於取得在建立工作排程時定義的引數值,如輸入路徑(inputPath)。
  • spark.read.format('csv')用於從指定的路徑讀取CSV檔案。

定義schema與取得實體的函式

# 定義schema
schema = ArrayType(StructType([
    StructField("text", StringType(), False),
    StructField("start_char", IntegerType(), False),
    StructField("end_char", IntegerType(), False),
    StructField("label", StringType(), False)
]))

# 定義函式取得實體
def get_entities(text):
    global nlp
    try:
        doc = nlp(text)
    except:
        nlp = spacy.load('en_ner_base_V3')
        doc = nlp(text)
    return [[e.text, e.start_char, e.end_char, e.label_] for e in doc.ents]

get_entities_udf = udf(lambda x: get_entities(x), schema)

#### 內容解密:

  • schema定義了從文字中提取的實體的結構,包括實體的文字、起始字元位置、結束字元位置和標籤。
  • get_entities函式使用spaCy模型對輸入文字進行實體識別,並傳回識別出的實體列表。

應用函式並寫入Parquet檔案

# 取得實體
documents_df = df.withColumn('entities', get_entities_udf('description'))

# 寫入Parquet檔案
outPath = getArgument("outputPath", "default")
documents_df.write.format("parquet").mode("overwrite").save(outPath)

#### 內容解密:

  • 使用withColumn方法在DataFrame中新增一個名為“entities”的列,該列的內容是透過get_entities_udf函式處理“description”列得到的。
  • 將處理後的DataFrame以Parquet格式寫入指定的輸出路徑。

排程機器學習工作(Scheduled Machine Learning Jobs)

  1. 在Databricks首頁點選“Jobs”,然後點選“Create Job”按鈕。
  2. 設定工作詳情,包括工作名稱、選擇之前建立的筆記本等。
  3. 在“Parameters”下點選“Edit”,輸入輸入路徑(inputPath)和輸出路徑(outputPath)。
  4. 在“Dependent Libraries”下新增所需的函式庫,如spaCy。
  5. 設定叢集組態,可以選擇現有的叢集或建立新的叢集。
  6. 設定排程頻率或點選“Run Now”立即執行工作。

這樣,您就成功地在Databricks上建立了一個定時執行的機器學習流程。執行的結果和日誌可以在Databricks的工作頁面檢視。

事件驅動的機器學習管線

雖然定時執行的機器學習任務對於生產環境中的重複性機器學習任務非常有幫助,但通常我們希望機器學習任務只在特定事件發生時才被觸發,例如新的資料被寫入S3儲存桶。事件驅動的管線更適合用於當我們希望機器學習模型在新資料可用時立即執行並更新輸出;否則,如果選擇定時任務,機器學習模型將不會立即在新資料上執行,而是依賴於排程。

在許多情況下,事件驅動的管線比定時任務更為合適;例如,如果我們有一個新聞推薦應用程式,我們希望應用程式在新聞更新可用時立即更新新聞推薦,而不是等待定時任務執行。

設定事件驅動的機器學習管線

讓我們來設定一個事件驅動的機器學習管線,該管線在新的資料被寫入AWS上的S3儲存桶時被觸發。為了實作這一點,我們將使用AWS Lambda函式,該函式將在S3儲存桶中有新的寫入操作時觸發我們的Databricks上的機器學習任務。

AWS Lambda非常適合這種場景,因為它們只在需要時才會觸發計算資源,一旦工作完成,計算資源就會被釋放。

設定Lambda函式

導航到AWS帳戶中的Lambda服務,並點選橙色的「Create function」按鈕。我們將從零開始建立一個函式,因此請確保選擇了「Author from scratch」標籤。

輸入函式的名稱。我們將使用Node.js來編寫函式。點選橙色的「Create function」按鈕。

編寫Lambda函式程式碼

在「Function code」部分的程式碼區塊中,輸入以下程式碼片段。您需要修改程式碼中的三個地方。首先,從Databricks的「Jobs」標籤中輸入Job ID。這應該是我們在上一節中一起建立的Job ID(參見圖11-22)。然後,在「Host」下輸入您的Databricks URL。

const https = require("https");
exports.handler = (event, context, callback) => {
    var data = JSON.stringify({
        "job_id": XXX
    });
    var options = {
        host: "XXX-XXXXXXX-XXX.cloud.databricks.com",
        port: 443,
        path: "/api/2.0/jobs/run-now",
        method: "POST",
        // authentication headers
        headers: {
            "Authorization": "Bearer XXXXXXXXXXXXXXXXXXXXXXXXXXXXX",
            "Content-Type": "application/json",
            "Content-Length": Buffer.byteLength(data)
        }
    };
    var request = https.request(options, function(res){
        var body = "";
        res.on("data", function(data) {
            body += data;
        });
        res.on("end", function() {
            console.log(body);
        });
        res.on("error", function(e) {
            console.log("Got error: " + e.message);
        });
    });
    request.write(data);
    request.end();
};

內容解密:

  1. 引入https模組:使用const https = require("https");來引入Node.js的https模組,以便能夠傳送HTTPS請求。
  2. 定義Lambda函式處理程式exports.handler是Lambda函式的入口點,負責處理事件。
  3. 設定請求資料:將Job ID字串化為JSON格式,用於觸發Databricks上的Job。
  4. 設定請求選項:定義了HTTPS請求的選項,包括主機、埠、路徑、方法和請求頭。其中,「Authorization」頭部用於驗證,使用Bearer Token進行身份驗證。
  5. 傳送HTTPS請求:使用https.request方法傳送請求到Databricks API,以觸發指定的Job。
  6. 處理回應:對回應進行處理,包括資料接收、請求結束和錯誤處理。

新增觸發器

修改完程式碼後,讓我們新增一個觸發器。在Lambda函式頁面中,點選「Add trigger」,然後在「Trigger configuration」下選擇S3。輸入您希望Lambda函式監控的儲存桶名稱。

選擇所有物件建立事件作為觸發條件。在「Prefix」中輸入路徑字首,在「Suffix」中輸入.csv,以確保只有CSV檔案才能觸發Lambda函式。

測試Lambda函式

首先,傳回Databricks並修改Job的inputPath引數。導航到「Jobs」,選擇我們之前建立的Job,然後點選「Edit」。

確保outputPath引數設定為您希望輸出Parquet檔案的位置。

現在,我們準備測試Lambda函式。上傳train_prepared.csv到您為Lambda函式觸發器選擇的S3儲存桶和路徑。如果一切正常,您應該在Databricks的Job頁面中看到新的執行例項。

驗證結果

檢查Databricks上的Job執行結果,並驗證輸出Parquet檔案是否已寫入指定的輸出路徑。同時,也可以在Databricks的「Clusters」→「Job Clusters」下檢視用於執行Job的叢集。

使用MLflow管理機器學習生命週期

現在,我們已經展示了Spark在處理機器學習操作方面的速度,並佈署了定時和批次基礎的機器學習管線。接下來,讓我們使用開源機器學習平台MLflow將我們的模型佈署為REST API。

MLflow管理整個機器學習生命週期,包括實驗跟蹤、模型註冊和佈署。團隊可以透過MLflow更好地協作,重現同行的結果,並利用先前的實驗和建模工作。

登入和註冊模型

我們將登入和註冊在第3章中訓練的spaCy文字分類別模型,而不是命名實體識別模型。該模型接受文章描述並生成四個類別的分類別預測:Business、Sci_Tech、Sports和World。

透過使用MLflow,我們可以更好地管理模型的生命週期,並將其佈署為REST API,以供其他應用程式呼叫。

在 Databricks 上使用 MLflow 佈署 spaCy 文字分類別模型

步驟一:建立筆記本並載入必要函式庫

首先,從 Databricks 首頁建立一個名為 mlflow_spacy_model 的新筆記本,並開啟該筆記本。確保叢集已啟動並執行,且已連線到筆記本。

接下來,載入 MLflow 和 spaCy 模型所需的函式庫:

# 載入函式庫
import spacy
import mlflow
import mlflow.spacy

# 載入模型
nlp = spacy.load("en_textcat_prodigy_V3_base_full")

內容解密:

  • import spacy:載入 spaCy 函式庫,用於自然語言處理任務。
  • import mlflowimport mlflow.spacy:載入 MLflow 函式庫及其 spaCy 擴充套件,用於模型追蹤和佈署。
  • spacy.load("en_textcat_prodigy_V3_base_full"):載入預訓練的 spaCy 文字分類別模型。

步驟二:檢視模型元資料並使用 MLflow 記錄模型

檢視模型的元資料以確認文字分類別的詳細資訊:

# 列印元資料
nlp.meta

內容解密:

  • nlp.meta:顯示模型的元資料,包括模型的描述、精確度等資訊。

接下來,使用 MLflow 記錄模型:

# MLflow 追蹤
with mlflow.start_run(run_name='SpaCy-TextCat-Prodigy-V3-Base-Full'):
    mlflow.set_tag('model_flavor', 'spacy')
    mlflow.spacy.log_model(spacy_model=nlp, artifact_path='model')
    mlflow.log_metric('textcat_score', 91.774875419)
    my_run_id = mlflow.active_run().info.run_id

內容解密:

  • mlflow.start_run:開始一個新的 MLflow 執行,並指定執行的名稱。
  • mlflow.set_tag:設定執行的標籤,這裡設定 model_flavorspacy
  • mlflow.spacy.log_model:記錄 spaCy 模型,指定模型和成品路徑。
  • mlflow.log_metric:記錄模型的精確度指標,這裡是 textcat_score
  • mlflow.active_run().info.run_id:取得當前執行 ID。

步驟三:檢視實驗結果並註冊模型

執行完程式碼後,點選頁面右上角的「Experiment」標籤,檢視執行的詳細資訊。展開最新的執行,可以看到執行的詳細資訊,包括指標、引數和標籤。

導航到頁面底部,找到「Artifacts」部分,選擇「model」,可以看到 MLflow 模型的詳細資訊。點選藍色的「Register Model」按鈕,將模型註冊到模型倉函式庫。

步驟四:佈署模型並透過 REST API 提供服務

點選左側面板的「Models」,找到剛註冊的 spaCy 文字分類別模型。點選模型名稱,然後點選「Version 1」,可以檢視該版本模型的詳細資訊。

導航到「Serving」標籤,點選「Enable Serving」按鈕,啟用模型服務。啟用後,可以透過瀏覽器、cURL 或 Python 呼叫模型的 REST API。

使用瀏覽器測試 REST API

在請求框中輸入正確格式的文字列表,例如:

[
    "Reuters - Short-sellers, Wall Street's dwindling band of ultra-cynics, are seeing green again.",
    "Reuters - Private investment firm Carlyle Group, which has a reputation for making well-timed and occasionally controversial plays in the defense industry, has quietly placed its bets on another part of the market.",
    "Reuters - Soaring crude prices plus worries about the economy and the outlook"
]

內容解密:

  • 這裡輸入的是 AG News Dataset 中的文章描述,用於測試模型的文字分類別功能。

透過上述步驟,我們成功地在 Databricks 上使用 MLflow 佈署了 spaCy 文字分類別模型,並透過 REST API 提供服務。

透過REST API與cURL實作模型服務呼叫

前言

本篇文章將探討如何利用REST API和cURL技術來呼叫已佈署的機器學習模型服務。我們將以一個具體的案例來說明整個過程,包括準備資料、傳送請求以及處理回應。

準備資料

首先,我們需要準備用於測試模型服務的資料。在本案例中,我們使用了AG News Dataset,並選取了前10條新聞描述作為測試資料。

# 載入必要的函式庫
import numpy as np
import pandas as pd

# 定義讀取資料的函式
def read_data(file):
    read_path = '/content/drive/My Drive/Applied-NLP-in-the-Enterprise'
    data = pd.read_csv(read_path+file)
    return data

# 讀取資料
data = read_data('/data/ag_dataset/prepared/train_prepared.csv')

# 將前10條描述轉換為JSON格式並儲存
data.loc[:10,"description"].to_json(path_or_buf='/content/drive/My Drive/Applied-NLP-in-the-Enterprise/data/ag_dataset/prepared/sample.json', orient="records")

內容解密:

  1. 載入函式庫:匯入必要的Python函式庫,包括numpypandas,用於資料處理。
  2. 定義讀取資料函式:建立一個函式read_data,用於從指定路徑讀取CSV檔案。
  3. 讀取資料:呼叫read_data函式讀取AG News Dataset的訓練資料。
  4. 轉換為JSON:將DataFrame中的前10條新聞描述轉換為JSON格式,並儲存到指定路徑。

傳送cURL請求

接下來,我們將透過cURL命令來呼叫模型服務。

curl \
-u token:$DATABRICKS_TOKEN \
-H "Content-Type: application/json; format=pandas-records" \
-d@data.json $MODEL_PATH

內容解密:

  1. curl命令:使用curl命令傳送HTTP請求到模型服務。
  2. 認證資訊:透過-u引數傳遞Databricks的token進行身份驗證。
  3. 請求頭:設定Content-Typeapplication/json; format=pandas-records,表明請求體的格式。
  4. 請求體:使用-d引數指定JSON檔案的路徑,檔案中包含了用於預測的新聞描述。
  5. 模型路徑$MODEL_PATH代表模型服務的URL。

在Google Colab中執行cURL請求

為了在Google Colab中執行cURL請求,我們需要準備相應的引數。

# 呼叫模型 - CURL
MODEL_VERSION_URI = "XXXXXX"  # 模型路徑
DATABRICKS_TOKEN = "XXXXXX"  # Databricks token
JSON_PATH = "XXXXXX"  # JSON檔案路徑

!curl -u token:$DATABRICKS_TOKEN -H \
"Content-Type: application/json; format=pandas-records" \
-d@$JSON_PATH $MODEL_VERSION_URI

內容解密:

  1. 引數設定:分別設定MODEL_VERSION_URIDATABRICKS_TOKENJSON_PATH
  2. 執行cURL命令:使用!curl在Google Colab中執行cURL命令,呼叫模型服務。

處理回應

成功傳送請求後,我們將收到模型的預測結果。

[
    {
        "predictions": {
            "World": 0.0957181304693222,
            "Sci_Tech": 0.6589348912239075,
            "Business": 0.24223914742469788,
            "Sports": 0.0031078553292900324
        }
    },
    ...
]

內容解密:

  1. 預測結果:模型傳回每個新聞描述對應四個類別(World、Sci_Tech、Business、Sports)的預測機率。
  2. 結果解析:可以根據預測機率判斷新聞描述最可能屬於哪個類別。