Ray Serve 是一個用於建構線上推理 API 的 Ray 原生函式庫,專注於解決生產環境中佈署機器學習模型的挑戰。它能有效擴充套件模型、分配資源,並將多個模型與商業邏輯整合。本文將示範如何使用 Ray Serve 建構一個整合情感分析、文字摘要和命名實體識別模型的 NLP API,並提供程式碼範例與佈署,包含手動建立 Ray 叢集與 Kubernetes 佈署的詳細步驟。透過 Ray Serve,我們可以有效地管理模型佈署、擴充套件和資源分配,簡化線上推理 API 的建構流程。文章中也包含了使用 Pydantic 定義 API 回應模型、使用 FastAPI 建構 HTTP 處理邏輯,以及使用 Ray Cluster 進行擴充套件的說明,提供一個完整的實作範例。此外,文章也說明瞭如何使用 KubeRay Operator 在 Kubernetes 上佈署和管理 Ray 叢集,並提供了一些常見問題的解決方案,例如連線失敗的處理和連線成功的驗證。
建構以 NLP 為基礎的 API:一個完整的例項
介紹
在本章節中,我們將展示如何使用 Ray Serve 建構一個以自然語言處理(NLP)為基礎的 API。此 API 將會結合多個 NLP 模型,包括情感分析、文字摘要和命名實體識別,以提供對輸入文字的深入分析。
NLP 模型佈署
情感分析模型
首先,我們定義一個情感分析模型,用於判斷輸入文字的情感是正面還是負面。這個模型將根據其判斷結果決定是否繼續後續的處理流程。
@serve.deployment
class SentimentAnalysis:
def __init__(self):
self._sentiment_classifier = pipeline("sentiment-analysis")
def __call__(self, input_text: str) -> bool:
result = self._sentiment_classifier(input_text)
return result[0]["label"] == "POSITIVE"
內容解密:
SentimentAnalysis類別初始化了一個情感分析的 pipeline,用於對輸入文字進行情感判斷。__call__方法接收輸入文字,傳回一個布林值,表示文字的情感是否為正面。
文字摘要模型
接下來,我們使用一個文字摘要模型來為選定的文章提供簡潔的摘要。這個模型可以接受一個可選的 max_length 引數來限制摘要的長度。
@serve.deployment(num_replicas=2)
class Summarizer:
def __init__(self, max_length: Optional[int] = None):
self._summarizer = pipeline("summarization")
self._max_length = max_length
def __call__(self, input_text: str) -> str:
result = self._summarizer(input_text, max_length=self._max_length, truncation=True)
return result[0]["summary_text"]
內容解密:
Summarizer類別初始化了一個文字摘要的 pipeline,用於對輸入文字進行摘要。num_replicas=2表示該佈署將會啟動兩個副本,以提高處理並發請求的能力。__call__方法接收輸入文字,傳回一個字串,表示文字的摘要。
命名實體識別模型
最後,我們定義了一個命名實體識別模型,用於從文字中提取命名實體。這個模型可以根據置信度閾值過濾結果,並限制傳回的實體數量。
@serve.deployment
class EntityRecognition:
def __init__(self, threshold: float = 0.90, max_entities: int = 10):
self._entity_recognition = pipeline("ner")
self._threshold = threshold
self._max_entities = max_entities
def __call__(self, input_text: str) -> List[str]:
final_results = []
for result in self._entity_recognition(input_text):
if result["score"] > self._threshold:
final_results.append(result["word"])
if len(final_results) == self._max_entities:
break
return final_results
內容解密:
EntityRecognition類別初始化了一個命名實體識別的 pipeline,用於對輸入文字進行實體識別。threshold和max_entities引數用於控制傳回結果的置信度和數量。__call__方法接收輸入文字,傳回一個列表,表示提取出的命名實體。
HTTP 處理和驅動邏輯
定義 API 回應模型
我們使用 Pydantic 定義了 API 的回應模型,包括是否成功、狀態訊息、摘要和命名實體等資訊。
from pydantic import BaseModel
class Response(BaseModel):
success: bool
message: str = ""
summary: str = ""
named_entities: List[str] = []
定義驅動佈署
驅動佈署負責協調下游模型的呼叫,並根據結果傳回正確的回應。
from fastapi import FastAPI
app = FastAPI()
@serve.deployment
@serve.ingress(app)
class NLPPipelineDriver:
def __init__(self, sentiment_analysis, summarizer, entity_recognition):
self._sentiment_analysis = sentiment_analysis
self._summarizer = summarizer
self._entity_recognition = entity_recognition
@app.get("/", response_model=Response)
async def summarize_article(self, search_term: str) -> Response:
# Fetch the top page content for the search term if found.
page_content = fetch_wikipedia_page(search_term)
if page_content is None:
return Response(success=False, message="No pages found.")
# Conditionally continue based on the sentiment analysis.
is_positive = await self._sentiment_analysis.remote(page_content)
if not is_positive:
return Response(success=False, message="Only positivitiy allowed!")
# Query the summarizer and named entity recognition models in parallel.
summary_result = self._summarizer.remote(page_content)
entities_result = self._entity_recognition.remote(page_content)
return Response(
success=True,
summary=await summary_result,
named_entities=await entities_result
)
內容解密:
NLPPipelineDriver類別初始化了三個 NLP 模型,分別用於情感分析、文字摘要和命名實體識別。summarize_article方法負責處理 API 請求,首先進行情感分析,然後平行呼叫文字摘要和命名實體識別模型,最後傳回整合後的結果。
綜合上述步驟
首先,我們需要例項化每個佈署,並將相關的輸入引數傳遞給它們。然後,使用 serve run 命令執行整個 NLP 管道。
serve run app:nlp_pipeline_driver
這將在本機佈署四個佈署,並使驅動程式可在 http://localhost:8000 上存取。我們可以使用請求查詢管道,以觀察其執行情況。
使用Ray Serve建立線上NLP API
Ray Serve是一個用於建置線上推理API的Ray原生函式庫,專注於解決在生產環境中提供機器學習模型的獨特挑戰。透過Ray Serve,我們可以有效地擴充套件模型、分配資源,並將多個模型與商業邏輯組合在一起。
建立NLP API
首先,我們需要建立一個NLP API,該API將處理搜尋請求並傳回相關結果。讓我們嘗試查詢一個不存在的頁面:
print(requests.get("http://localhost:8000/", params={"search_term": "不存在的頁面"}).text)
輸出結果為:
'{"success":false,"message":"No pages found","summary":"","named_entities":[]}'
接下來,我們嘗試查詢一些更常見的內容:
print(requests.get("http://localhost:8000/", params={"search_term": "war"}).text)
輸出結果為:
'{"success":false,"message":"Only positivitiy allowed!","summary":"","named_entities":[]}'
這次查詢失敗是因為我們的情感分析器認為結果太負面。讓我們嘗試查詢一些更中立的內容:
print(requests.get("http://localhost:8000/", params={"search_term": "physicist"}).text)
輸出結果為:
'{"success":true,"message":"","summary":" 物理學是研究物質、其基本組成、運動和透過空間和時間的行為的自然科學...","named_entities":["科學革命", "古代", "希臘", "埃及人"]}'
內容解密:
此段程式碼展示瞭如何使用Ray Serve建立一個線上NLP API,該API可以處理搜尋請求並傳回相關結果。程式碼中使用了requests函式庫來傳送HTTP請求到API端點,並傳遞搜尋詞作為引數。API傳回的結果是一個JSON物件,包含了搜尋結果的摘要和相關的命名實體。
Ray Serve的功能和優勢
Ray Serve提供了以下功能和優勢:
- 有效地擴充套件模型和分配資源
- 將多個模型與商業邏輯組合在一起
- 提供了一個通用的解決方案,避免了供應商鎖定
使用Ray Cluster進行擴充套件
Ray Cluster是Ray的一個關鍵功能,它允許我們將應用程式擴充套件到多台機器上。我們可以使用Ray Cluster Launcher或Kubernetes運算元來啟動和管理Ray Cluster。
手動建立Ray Cluster
要手動建立Ray Cluster,我們需要在每台機器上安裝Ray,並選擇一台機器作為頭節點。在頭節點上執行以下命令:
ray start --head --port=6379
這將啟動Ray GCS伺服器並列印出其位址。然後,我們可以在其他節點上執行以下命令來連線到頭節點:
ray start --address=<head-address>:6379
內容解密:
此段程式碼展示瞭如何手動建立一個Ray Cluster。首先,我們需要在頭節點上執行ray start --head --port=6379命令來啟動Ray GCS伺服器。然後,我們可以在其他節點上執行ray start --address=<head-address>:6379命令來連線到頭節點。這樣,我們就可以建立一個由多台機器組成的Ray Cluster。
手動建立 Ray 叢集與 Kubernetes 佈署
在前面的章節中,我們討論了 Ray 的基本概念及其在分散式運算中的重要性。現在,讓我們探討如何手動建立 Ray 叢集以及如何使用 Kubernetes 進行佈署。
手動建立 Ray 叢集
要手動建立 Ray 叢集,首先需要在主節點(head node)上啟動 Ray。啟動主節點的命令如下:
ray start --head --address=<head-address>
當主節點成功啟動後,您可以透過在其他節點上執行以下命令來將它們連線到叢集:
ray start --address=<head-address>
連線失敗的處理
如果連線失敗,您可能會看到「Unable to connect to GCS at …」的錯誤訊息。這通常是由於主節點無法存取或指定的地址錯誤所致。您可以使用 nmap 或 nc 工具檢查埠是否可存取。
$ nmap -sV --reason -p $PORT $HEAD_ADDRESS
$ nc -vv -z $HEAD_ADDRESS $PORT
連線成功的驗證
如果連線成功,您應該能夠看到類別似以下的輸出:
$ nmap -sV --reason -p $PORT $HEAD_ADDRESS
Nmap scan report for compute04.berkeley.edu (123.456.78.910)
Host is up, received echo-reply ttl 60 (0.00087s latency).
rDNS record for 123.456.78.910: compute04.berkeley.edu
PORT STATE SERVICE REASON VERSION
6379/tcp open redis? syn-ack
Service detection performed. Please report any incorrect results at https://nmap.org/submit/ .
$ nc -vv -z $HEAD_ADDRESS $PORT
Connection to compute04.berkeley.edu 6379 port [tcp/...] succeeded!
#### 內容解密:
$PORT和$HEAD_ADDRESS是環境變數,分別代表需要檢查的埠和主節點的地址。nmap命令用於掃描指定埠的狀態,而nc命令則用於測試與該埠的連線。- 如果連線成功,則表示該埠是開放的,可以正常存取。
Kubernetes 上的佈署
Kubernetes 是業界標準的叢集資源管理平台,能夠無縫地佈署、管理和擴充套件業務應用程式。KubeRay 專案提供了在 Kubernetes 上佈署和管理 Ray 叢集的標準方法。
KubeRay Operator 的主要功能
- 透過自定義資源管理 RayCluster。
- 支援單個 Ray Cluster 中的異質工作節點型別。
- 內建 Prometheus 監控。
- 使用 PodTemplate 建立 Ray pods。
- 根據執行中的 pods 更新狀態。
- 自動填充容器中的環境變數。
- 自動在容器命令前新增 Ray start 命令。
- 自動在
/dev/shm新增分享記憶體的 volumeMount。
設定您的第一個 KubeRay 叢集
首先,需要透過以下命令佈署 KubeRay operator:
export KUBERAY_VERSION=v0.3.0
kubectl create -k "github.com/ray-project/kuberay/manifests/cluster-scope-resources?ref=${KUBERAY_VERSION}&timeout=90s"
kubectl apply -k "github.com/ray-project/kuberay/manifests/base?ref=${KUBERAY_VERSION}&timeout=90s"
然後,可以透過以下命令驗證 operator 是否已成功佈署:
kubectl -n ray-system get pods
接下來,下載預設的叢集組態檔案並建立一個新的 Ray Cluster:
wget "https://raw.githubusercontent.com/ray-project/kuberay/${KUBERAY_VERSION}/ray-operator/config/samples/ray-cluster.complete.yaml"
kubectl create -f ray-cluster.complete.yaml
#### 內容解密:
KUBERAY_VERSION環境變數指定了 KubeRay 的版本。kubectl create和kubectl apply命令用於建立和組態 KubeRay operator 的資源。ray-cluster.complete.yaml檔案包含了預設的 Ray Cluster 組態。
Kubernetes 服務的識別
建立叢集後,可以透過以下命令識別 KubeRay operator 組態的 Kubernetes 服務:
kubectl get service --selector=ray.io/cluster=raycluster-complete
輸出的服務資訊中,包含了三個重要的埠:6379、8265 和 10001,分別對應於 Ray head pod 的 GCS 服務、Ray Dashboard 和 Ray Job Submission 服務,以及 Ray Client 伺服器。
#### 內容解密:
kubectl get service命令用於取得 Kubernetes 中的服務資訊。--selector引數用於篩選具有特定標籤的服務。- 輸出的服務資訊包括服務名稱、型別、叢集 IP 和外部 IP,以及埠和年齡等。