Ray Serve 是一個用於建構線上推理 API 的 Ray 原生函式庫,專注於解決生產環境中佈署機器學習模型的挑戰。它能有效擴充套件模型、分配資源,並將多個模型與商業邏輯整合。本文將示範如何使用 Ray Serve 建構一個整合情感分析、文字摘要和命名實體識別模型的 NLP API,並提供程式碼範例與佈署,包含手動建立 Ray 叢集與 Kubernetes 佈署的詳細步驟。透過 Ray Serve,我們可以有效地管理模型佈署、擴充套件和資源分配,簡化線上推理 API 的建構流程。文章中也包含了使用 Pydantic 定義 API 回應模型、使用 FastAPI 建構 HTTP 處理邏輯,以及使用 Ray Cluster 進行擴充套件的說明,提供一個完整的實作範例。此外,文章也說明瞭如何使用 KubeRay Operator 在 Kubernetes 上佈署和管理 Ray 叢集,並提供了一些常見問題的解決方案,例如連線失敗的處理和連線成功的驗證。

建構以 NLP 為基礎的 API:一個完整的例項

介紹

在本章節中,我們將展示如何使用 Ray Serve 建構一個以自然語言處理(NLP)為基礎的 API。此 API 將會結合多個 NLP 模型,包括情感分析、文字摘要和命名實體識別,以提供對輸入文字的深入分析。

NLP 模型佈署

情感分析模型

首先,我們定義一個情感分析模型,用於判斷輸入文字的情感是正面還是負面。這個模型將根據其判斷結果決定是否繼續後續的處理流程。

@serve.deployment
class SentimentAnalysis:
    def __init__(self):
        self._sentiment_classifier = pipeline("sentiment-analysis")

    def __call__(self, input_text: str) -> bool:
        result = self._sentiment_classifier(input_text)
        return result[0]["label"] == "POSITIVE"

內容解密:

  • SentimentAnalysis 類別初始化了一個情感分析的 pipeline,用於對輸入文字進行情感判斷。
  • __call__ 方法接收輸入文字,傳回一個布林值,表示文字的情感是否為正面。

文字摘要模型

接下來,我們使用一個文字摘要模型來為選定的文章提供簡潔的摘要。這個模型可以接受一個可選的 max_length 引數來限制摘要的長度。

@serve.deployment(num_replicas=2)
class Summarizer:
    def __init__(self, max_length: Optional[int] = None):
        self._summarizer = pipeline("summarization")
        self._max_length = max_length

    def __call__(self, input_text: str) -> str:
        result = self._summarizer(input_text, max_length=self._max_length, truncation=True)
        return result[0]["summary_text"]

內容解密:

  • Summarizer 類別初始化了一個文字摘要的 pipeline,用於對輸入文字進行摘要。
  • num_replicas=2 表示該佈署將會啟動兩個副本,以提高處理並發請求的能力。
  • __call__ 方法接收輸入文字,傳回一個字串,表示文字的摘要。

命名實體識別模型

最後,我們定義了一個命名實體識別模型,用於從文字中提取命名實體。這個模型可以根據置信度閾值過濾結果,並限制傳回的實體數量。

@serve.deployment
class EntityRecognition:
    def __init__(self, threshold: float = 0.90, max_entities: int = 10):
        self._entity_recognition = pipeline("ner")
        self._threshold = threshold
        self._max_entities = max_entities

    def __call__(self, input_text: str) -> List[str]:
        final_results = []
        for result in self._entity_recognition(input_text):
            if result["score"] > self._threshold:
                final_results.append(result["word"])
            if len(final_results) == self._max_entities:
                break
        return final_results

內容解密:

  • EntityRecognition 類別初始化了一個命名實體識別的 pipeline,用於對輸入文字進行實體識別。
  • thresholdmax_entities 引數用於控制傳回結果的置信度和數量。
  • __call__ 方法接收輸入文字,傳回一個列表,表示提取出的命名實體。

HTTP 處理和驅動邏輯

定義 API 回應模型

我們使用 Pydantic 定義了 API 的回應模型,包括是否成功、狀態訊息、摘要和命名實體等資訊。

from pydantic import BaseModel

class Response(BaseModel):
    success: bool
    message: str = ""
    summary: str = ""
    named_entities: List[str] = []

定義驅動佈署

驅動佈署負責協調下游模型的呼叫,並根據結果傳回正確的回應。

from fastapi import FastAPI

app = FastAPI()

@serve.deployment
@serve.ingress(app)
class NLPPipelineDriver:
    def __init__(self, sentiment_analysis, summarizer, entity_recognition):
        self._sentiment_analysis = sentiment_analysis
        self._summarizer = summarizer
        self._entity_recognition = entity_recognition

    @app.get("/", response_model=Response)
    async def summarize_article(self, search_term: str) -> Response:
        # Fetch the top page content for the search term if found.
        page_content = fetch_wikipedia_page(search_term)
        if page_content is None:
            return Response(success=False, message="No pages found.")
        
        # Conditionally continue based on the sentiment analysis.
        is_positive = await self._sentiment_analysis.remote(page_content)
        if not is_positive:
            return Response(success=False, message="Only positivitiy allowed!")
        
        # Query the summarizer and named entity recognition models in parallel.
        summary_result = self._summarizer.remote(page_content)
        entities_result = self._entity_recognition.remote(page_content)
        return Response(
            success=True,
            summary=await summary_result,
            named_entities=await entities_result
        )

內容解密:

  • NLPPipelineDriver 類別初始化了三個 NLP 模型,分別用於情感分析、文字摘要和命名實體識別。
  • summarize_article 方法負責處理 API 請求,首先進行情感分析,然後平行呼叫文字摘要和命名實體識別模型,最後傳回整合後的結果。

綜合上述步驟

首先,我們需要例項化每個佈署,並將相關的輸入引數傳遞給它們。然後,使用 serve run 命令執行整個 NLP 管道。

serve run app:nlp_pipeline_driver

這將在本機佈署四個佈署,並使驅動程式可在 http://localhost:8000 上存取。我們可以使用請求查詢管道,以觀察其執行情況。

使用Ray Serve建立線上NLP API

Ray Serve是一個用於建置線上推理API的Ray原生函式庫,專注於解決在生產環境中提供機器學習模型的獨特挑戰。透過Ray Serve,我們可以有效地擴充套件模型、分配資源,並將多個模型與商業邏輯組合在一起。

建立NLP API

首先,我們需要建立一個NLP API,該API將處理搜尋請求並傳回相關結果。讓我們嘗試查詢一個不存在的頁面:

print(requests.get("http://localhost:8000/", params={"search_term": "不存在的頁面"}).text)

輸出結果為:

'{"success":false,"message":"No pages found","summary":"","named_entities":[]}'

接下來,我們嘗試查詢一些更常見的內容:

print(requests.get("http://localhost:8000/", params={"search_term": "war"}).text)

輸出結果為:

'{"success":false,"message":"Only positivitiy allowed!","summary":"","named_entities":[]}'

這次查詢失敗是因為我們的情感分析器認為結果太負面。讓我們嘗試查詢一些更中立的內容:

print(requests.get("http://localhost:8000/", params={"search_term": "physicist"}).text)

輸出結果為:

'{"success":true,"message":"","summary":" 物理學是研究物質、其基本組成、運動和透過空間和時間的行為的自然科學...","named_entities":["科學革命", "古代", "希臘", "埃及人"]}'

內容解密:

此段程式碼展示瞭如何使用Ray Serve建立一個線上NLP API,該API可以處理搜尋請求並傳回相關結果。程式碼中使用了requests函式庫來傳送HTTP請求到API端點,並傳遞搜尋詞作為引數。API傳回的結果是一個JSON物件,包含了搜尋結果的摘要和相關的命名實體。

Ray Serve的功能和優勢

Ray Serve提供了以下功能和優勢:

  • 有效地擴充套件模型和分配資源
  • 將多個模型與商業邏輯組合在一起
  • 提供了一個通用的解決方案,避免了供應商鎖定

使用Ray Cluster進行擴充套件

Ray Cluster是Ray的一個關鍵功能,它允許我們將應用程式擴充套件到多台機器上。我們可以使用Ray Cluster Launcher或Kubernetes運算元來啟動和管理Ray Cluster。

手動建立Ray Cluster

要手動建立Ray Cluster,我們需要在每台機器上安裝Ray,並選擇一台機器作為頭節點。在頭節點上執行以下命令:

ray start --head --port=6379

這將啟動Ray GCS伺服器並列印出其位址。然後,我們可以在其他節點上執行以下命令來連線到頭節點:

ray start --address=<head-address>:6379

內容解密:

此段程式碼展示瞭如何手動建立一個Ray Cluster。首先,我們需要在頭節點上執行ray start --head --port=6379命令來啟動Ray GCS伺服器。然後,我們可以在其他節點上執行ray start --address=<head-address>:6379命令來連線到頭節點。這樣,我們就可以建立一個由多台機器組成的Ray Cluster。

手動建立 Ray 叢集與 Kubernetes 佈署

在前面的章節中,我們討論了 Ray 的基本概念及其在分散式運算中的重要性。現在,讓我們探討如何手動建立 Ray 叢集以及如何使用 Kubernetes 進行佈署。

手動建立 Ray 叢集

要手動建立 Ray 叢集,首先需要在主節點(head node)上啟動 Ray。啟動主節點的命令如下:

ray start --head --address=<head-address>

當主節點成功啟動後,您可以透過在其他節點上執行以下命令來將它們連線到叢集:

ray start --address=<head-address>

連線失敗的處理

如果連線失敗,您可能會看到「Unable to connect to GCS at …」的錯誤訊息。這通常是由於主節點無法存取或指定的地址錯誤所致。您可以使用 nmapnc 工具檢查埠是否可存取。

$ nmap -sV --reason -p $PORT $HEAD_ADDRESS
$ nc -vv -z $HEAD_ADDRESS $PORT

連線成功的驗證

如果連線成功,您應該能夠看到類別似以下的輸出:

$ nmap -sV --reason -p $PORT $HEAD_ADDRESS
Nmap scan report for compute04.berkeley.edu (123.456.78.910)
Host is up, received echo-reply ttl 60 (0.00087s latency).
rDNS record for 123.456.78.910: compute04.berkeley.edu
PORT STATE SERVICE REASON VERSION
6379/tcp open redis? syn-ack
Service detection performed. Please report any incorrect results at https://nmap.org/submit/ .
$ nc -vv -z $HEAD_ADDRESS $PORT
Connection to compute04.berkeley.edu 6379 port [tcp/...] succeeded!

#### 內容解密:

  • $PORT$HEAD_ADDRESS 是環境變數,分別代表需要檢查的埠和主節點的地址。
  • nmap 命令用於掃描指定埠的狀態,而 nc 命令則用於測試與該埠的連線。
  • 如果連線成功,則表示該埠是開放的,可以正常存取。

Kubernetes 上的佈署

Kubernetes 是業界標準的叢集資源管理平台,能夠無縫地佈署、管理和擴充套件業務應用程式。KubeRay 專案提供了在 Kubernetes 上佈署和管理 Ray 叢集的標準方法。

KubeRay Operator 的主要功能

  • 透過自定義資源管理 RayCluster。
  • 支援單個 Ray Cluster 中的異質工作節點型別。
  • 內建 Prometheus 監控。
  • 使用 PodTemplate 建立 Ray pods。
  • 根據執行中的 pods 更新狀態。
  • 自動填充容器中的環境變數。
  • 自動在容器命令前新增 Ray start 命令。
  • 自動在 /dev/shm 新增分享記憶體的 volumeMount。

設定您的第一個 KubeRay 叢集

首先,需要透過以下命令佈署 KubeRay operator:

export KUBERAY_VERSION=v0.3.0
kubectl create -k "github.com/ray-project/kuberay/manifests/cluster-scope-resources?ref=${KUBERAY_VERSION}&timeout=90s"
kubectl apply -k "github.com/ray-project/kuberay/manifests/base?ref=${KUBERAY_VERSION}&timeout=90s"

然後,可以透過以下命令驗證 operator 是否已成功佈署:

kubectl -n ray-system get pods

接下來,下載預設的叢集組態檔案並建立一個新的 Ray Cluster:

wget "https://raw.githubusercontent.com/ray-project/kuberay/${KUBERAY_VERSION}/ray-operator/config/samples/ray-cluster.complete.yaml"
kubectl create -f ray-cluster.complete.yaml

#### 內容解密:

  • KUBERAY_VERSION 環境變數指定了 KubeRay 的版本。
  • kubectl createkubectl apply 命令用於建立和組態 KubeRay operator 的資源。
  • ray-cluster.complete.yaml 檔案包含了預設的 Ray Cluster 組態。

Kubernetes 服務的識別

建立叢集後,可以透過以下命令識別 KubeRay operator 組態的 Kubernetes 服務:

kubectl get service --selector=ray.io/cluster=raycluster-complete

輸出的服務資訊中,包含了三個重要的埠:6379、8265 和 10001,分別對應於 Ray head pod 的 GCS 服務、Ray Dashboard 和 Ray Job Submission 服務,以及 Ray Client 伺服器。

#### 內容解密:

  • kubectl get service 命令用於取得 Kubernetes 中的服務資訊。
  • --selector 引數用於篩選具有特定標籤的服務。
  • 輸出的服務資訊包括服務名稱、型別、叢集 IP 和外部 IP,以及埠和年齡等。