Ray Serve 提供了靈活的模型部署和組合策略,允許開發者建構複雜的機器學習應用。本文將深入探討模型串接、模型廣播和條件邏輯的實作技巧,並以 NLP 管線為例,展示如何結合多個模型,例如情感分析、摘要和實體識別,來處理實際的應用場景。此外,文章也涵蓋瞭如何使用 Ray Serve 建立和管理 Ray 叢集,以實作應用程式的分散式部署和自動擴充套件。透過這些技術,開發者可以更有效地管理和部署機器學習模型,並提升應用程式的效能和可擴充套件性。
管道模式
管道模式允許使用者將多個模型串接起來,形成一個管道。每個模型都可以接收前一個模型的輸出作為輸入,並將自己的輸出傳遞給下一個模型。這種模式可以用於多種應用,例如資料預處理、特徵提取和模型組合。
以下是管道模式的範例:
@serve.deployment
class PipelineDriver:
def __init__(self, model1, model2):
self._m1 = model1
self._m2 = model2
async def __call__(self, *args) -> str:
intermediate = self._m1.remote("input")
final = self._m2.remote(intermediate)
return await final
m1 = DownstreamModel.bind("val1")
m2 = DownstreamModel.bind("val2")
pipeline_driver = PipelineDriver.bind(m1, m2)
在這個範例中,PipelineDriver
類別負責管理管道中的模型。它接收兩個模型作為輸入,model1
和 model2
,並將它們串接起來。當使用者呼叫 pipeline_driver
時,它會先呼叫 model1
,然後將 model1
的輸出傳遞給 model2
,最後傳回 model2
的輸出。
廣播模式
廣播模式允許使用者將輸入或中間結果廣播到多個模型中。這種模式可以用於整合(Ensembling)或將多個獨立模型的結果結合起來。
以下是廣播模式的範例:
@serve.deployment
class BroadcastDriver:
def __init__(self, model1, model2):
self._m1 = model1
self._m2 = model2
async def __call__(self):
results = await asyncio.gather(
self._m1.remote(),
self._m2.remote()
)
return results
m1 = DownstreamModel.bind("val1")
m2 = DownstreamModel.bind("val2")
broadcast_driver = BroadcastDriver.bind(m1, m2)
在這個範例中,BroadcastDriver
類別負責管理廣播中的模型。它接收兩個模型作為輸入,model1
和 model2
,並將輸入廣播到這兩個模型中。然後,它會等待兩個模型的結果,並傳回這兩個結果。
flowchart TD A[輸入] --> B[模型1] B --> C[模型2] C --> D[輸出]
這個圖表展示了管道模式的流程。輸入先被傳遞給模型1,然後模型1的輸出被傳遞給模型2,最後模型2的輸出被傳回給使用者。
flowchart TD A[輸入] --> B[模型1] A --> C[模型2] B --> D[輸出] C --> D
這個圖表展示了廣播模式的流程。輸入被廣播到模型1和模型2中,然後兩個模型的結果被結合起來並傳回給使用者。
平行模型呼叫與條件邏輯
在機器學習應用中,常見的模式包括單一模型呼叫、模型串接和模型廣播。然而,在某些情況下,我們需要根據特定條件來決定哪個模型應該被呼叫。這種需求可以透過Ray Serve的多模型API來實作。
平行模型呼叫
首先,讓我們看一下如何實作平行模型呼叫。假設我們有兩個下游模型(DownstreamModel),分別綁定了不同的值(“val1"和"val2”)。我們可以使用BroadcastDriver來平行呼叫這兩個模型。
@serve.deployment
class DownstreamModel:
def __init__(self, my_val: str):
self._my_val = my_val
def __call__(self):
return self._my_val
@serve.deployment
class BroadcastDriver:
def __init__(self, model1, model2):
self._m1 = model1
self._m2 = model2
async def __call__(self, *args) -> str:
output1, output2 = self._m1.remote(), self._m2.remote()
return [await output1, await output2]
m1 = DownstreamModel.bind("val1")
m2 = DownstreamModel.bind("val2")
broadcast_driver = BroadcastDriver.bind(m1, m2)
條件邏輯
接下來,讓我們看一下如何實作條件邏輯。假設我們需要根據某個條件來決定哪個模型應該被呼叫。例如,我們可以使用一個隨機數生成器來決定呼叫哪個模型。
@serve.deployment
class ConditionalDriver:
def __init__(self, model1, model2):
self._m1 = model1
self._m2 = model2
async def __call__(self, *args) -> str:
import random
if random.random() > 0.5:
return await self._m1.remote()
else:
return await self._m2.remote()
m1 = DownstreamModel.bind("val1")
m2 = DownstreamModel.bind("val2")
conditional_driver = ConditionalDriver.bind(m1, m2)
在這個例子中,ConditionalDriver會根據隨機數的值來決定呼叫哪個模型。如果隨機數大於0.5,則呼叫m1,否則呼叫m2。
flowchart TD A[開始] --> B[隨機數生成] B --> C{隨機數 > 0.5} C -->|是| D[呼叫m1] C -->|否| E[呼叫m2] D --> F[傳回m1結果] E --> F[傳回m2結果] F --> G[結束]
這個圖表展示了ConditionalDriver的工作流程。首先,生成一個隨機數,然後根據隨機數的值來決定呼叫哪個模型。最終,傳回被呼叫模型的結果。
自然語言處理(NLP)pipeline 的建構
本文中,我們將使用 Ray Serve 建構一個端到端的 NLP pipeline,該 pipeline 將提供一個維基百科摘要端點,利用多個 NLP 模型和自定義邏輯,為給定的搜尋詞彙提供最相關的維基百科頁面的摘要。
pipeline 的架構
pipeline 的架構如圖 8-2 所示:
- 使用者提供一個關鍵詞搜尋詞彙。
- 我們抓取最相關的維基百科文章的內容。
- 對文章內容進行情感分析,如果文章內容具有負面情感,則拒絕並傳回。
- 將文章內容廣播到摘要模型和命名實體識別模型。
- 根據摘要模型和命名實體識別模型的輸出,傳回一個組合結果。
所需的 Python 套件
在開始之前,您需要安裝以下 Python 套件:
pip install "ray[serve]==2.2.0" "transformers==4.21.2"
pip install "requests==2.28.1" "wikipedia==1.4.0"
抓取內容和預處理
第一步是抓取最相關的維基百科頁面,給定一個使用者提供的搜尋詞彙。為此,我們將使用 Wikipedia 套件進行搜尋和抓取頁面內容:
from typing import Optional
import wikipedia
def fetch_wikipedia_page(search_term: str) -> Optional[str]:
results = wikipedia.search(search_term)
# 如果沒有結果,傳回 None
if len(results) == 0:
return None
# 傳回最相關頁面的內容
return wikipedia.page(results[0]).content
NLP 模型
接下來,我們需要定義 ML 模型,該模型將進行 NLP 任務。第一個模型是情感分析模型,使用 Hugging Face Transformers 套件:
from ray import serve
from transformers import pipeline
from typing import List
@serve.deployment
class SentimentAnalysis:
def __init__(self):
self._classifier = pipeline("sentiment-analysis")
@serve.batch(max_batch_size=10, batch_wait_timeout_s=0.1)
async def is_positive_batched(self, inputs: List[str]) -> List[bool]:
results = self._classifier(inputs, truncation=True)
return [result["label"] == "POSITIVE" for result in results]
async def __call__(self, input_text: str) -> bool:
return await self.is_positive_batched(input_text)
我們還需要一個文字摘要模型,提供給選定的文章的摘要。該模型接受一個可選的 max_length
引數,限制摘要的長度:
@serve.deployment(num_replicas=2)
class Summarizer:
def __init__(self, max_length: Optional[int] = None):
self._summarizer = pipeline("summarization")
self._max_length = max_length
def __call__(self, input_text: str) -> str:
# ...
flowchart TD A[使用者提供搜尋詞彙] --> B[抓取維基百科頁面] B --> C[情感分析] C --> D[摘要模型] D --> E[命名實體識別模型] E --> F[傳回組合結果]
這個 pipeline 將提供一個維基百科摘要端點,利用多個 NLP 模型和自定義邏輯,為給定的搜尋詞彙提供最相關的維基百科頁面的摘要。
自然語言處理(NLP)技術在 API 中的應用
在本文中,我們將探討如何使用自然語言處理(NLP)技術建立一個強大的 API。NLP 是一種人工智慧(AI)技術,允許電腦理解和處理人類語言。
文字摘要技術
首先,我們需要建立一個文字摘要模型。這個模型可以從一段長文字中提取出最重要的資訊,然後將其壓縮成一個短小的摘要。以下是使用 Hugging Face Transformers 實作的文字摘要模型:
from transformers import pipeline
class Summarizer:
def __init__(self, max_length: int = 100):
self._summarizer = pipeline("summarization")
self._max_length = max_length
def __call__(self, input_text: str) -> str:
result = self._summarizer(
input_text, max_length=self._max_length, truncation=True
)
return result[0]["summary_text"]
這個模型使用 Hugging Face 的 pipeline
函式建立一個文字摘要模型。然後,使用 __call__
方法對輸入文字進行摘要。
實體識別技術
接下來,我們需要建立一個實體識別模型。這個模型可以從文字中提取出具體的實體,例如人名、地名、組織名等。以下是使用 Hugging Face Transformers 實作的實體識別模型:
from transformers import pipeline
class EntityRecognition:
def __init__(self, threshold: float = 0.90, max_entities: int = 10):
self._entity_recognition = pipeline("ner")
self._threshold = threshold
self._max_entities = max_entities
def __call__(self, input_text: str) -> List[str]:
final_results = []
for result in self._entity_recognition(input_text):
if result["score"] > self._threshold:
final_results.append(result["word"])
if len(final_results) == self._max_entities:
break
return final_results
這個模型使用 Hugging Face 的 pipeline
函式建立一個實體識別模型。然後,使用 __call__
方法對輸入文字進行實體識別。
HTTP 處理和驅動邏輯
最後,我們需要建立一個 HTTP API 和驅動邏輯。以下是使用 FastAPI 實作的 HTTP API:
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
class Response(BaseModel):
success: bool
message: str = ""
summary: str = ""
entities: List[str] = []
@app.post("/analyze")
def analyze(input_text: str):
# 進行文字摘要和實體識別
summary = Summarizer()(input_text)
entities = EntityRecognition()(input_text)
# 傳回結果
return Response(
success=True,
message="分析成功",
summary=summary,
entities=entities
)
這個 API 使用 FastAPI 框架建立一個 HTTP API。然後,使用 @app.post("/analyze")
裝飾器建立一個路由,接收輸入文字並傳回分析結果。
flowchart TD A[輸入文字] --> B[文字摘要] B --> C[實體識別] C --> D[傳回結果] D --> E[HTTP API]
這個圖表展示了文字分析的流程,從輸入文字到傳回結果。
自然語言處理管線驅動程式
為了實作自然語言處理管線,我們需要定義一個驅動程式,負責呼叫下游模型部署並解釋結果。驅動程式本身不會執行任何實際的計算,而是負責呼叫三個下游模型部署並傳回最終結果。
驅動程式定義
from fastapi import FastAPI
app = FastAPI()
class NLPPipelineDriver:
def __init__(self, sentiment_analysis, summarizer, entity_recognition):
self._sentiment_analysis = sentiment_analysis
self._summarizer = summarizer
self._entity_recognition = entity_recognition
@app.get("/", response_model=Response)
async def summarize_article(self, search_term: str) -> Response:
# 取得搜尋結果的頁面內容
page_content = fetch_wikipedia_page(search_term)
if page_content is None:
return Response(success=False, message="No pages found.")
# 進行情感分析
is_positive = await self._sentiment_analysis.remote(page_content)
if not is_positive:
return Response(success=False, message="Only positivity allowed!")
# 平行呼叫摘要和實體識別模型
summary_result = self._summarizer.remote(page_content)
entities_result = self._entity_recognition.remote(page_content)
return Response(
success=True,
summary=await summary_result,
named_entities=await entities_result
)
管線工作流程
- 取得搜尋結果的頁面內容。
- 進行情感分析,如果結果為負面,則終止並傳回錯誤。
- 平行呼叫摘要和實體識別模型。
- 將模型結果合併並傳回最終結果。
執行管線
現在,我們已經定義了所有核心邏輯。接下來,只需要繫結部署圖並執行即可。
flowchart TD A[搜尋結果] --> B[情感分析] B --> C[摘要和實體識別] C --> D[傳回結果]
驅動程式的 summarize_article
方法負責取得搜尋結果的頁面內容,進行情感分析,然後平行呼叫摘要和實體識別模型。最終結果是合併模型結果並傳回最終結果。這個過程中,驅動程式不會阻塞下游模型的呼叫,允許多個請求平行執行。
自然語言處理管線驅動器
為了建立一個完整的自然語言處理(NLP)管線,我們需要將多個模型和業務邏輯整合在一起。這包括情感分析、摘要和實體識別等任務。以下是如何使用Ray Serve建立一個NLP管線驅動器的示例。
模型繫結
首先,我們需要將每個模型繫結到Ray Serve的部署中。這包括情感分析、摘要和實體識別模型。每個模型可以有自己的輸入引數,例如實體識別模型的閾值和最大實體數量。
sentiment_analysis = SentimentAnalysis.bind()
summarizer = Summarizer.bind()
entity_recognition = EntityRecognition.bind(threshold=0.95, max_entities=5)
管線驅動器
接下來,我們需要建立一個管線驅動器來協調這些模型之間的計算。管線驅動器需要參照每個模型,以便它可以呼叫它們。
nlp_pipeline_driver = NLPPipelineDriver.bind(
sentiment_analysis, summarizer, entity_recognition
)
執行管線
現在,我們可以使用serve run
命令來執行管線。這將部署每個模型和驅動器到本地,並使驅動器可用於呼叫。
serve run app:nlp_pipeline_driver
測試管線
我們可以使用requests
庫來測試管線。首先,讓我們嘗試查詢一個不存在的頁面。
import requests
print(requests.get("http://localhost:8000/nlp_pipeline_driver?query=不存在的頁面").text)
這應該會傳回一個「沒有找到頁面」的訊息。接下來,讓我們嘗試查詢一個比較常見的主題。
print(requests.get("http://localhost:8000/nlp_pipeline_driver?query=歷史").text)
這可能會傳回一個「只有正面情緒允許」的訊息,因為這篇文章可能對於我們的情感分析模型來說太負面了。最後,讓我們嘗試查詢一個中立的主題。
print(requests.get("http://localhost:8000/nlp_pipeline_driver?query=科學").text)
這應該會傳回一個成功的訊息,包括一個科學文章的摘要和相關實體的列表。
Ray 叢集
Ray 叢集是 Ray 的一個重要特性,允許使用者將應用程式無縫擴充套件到多臺機器上。在本章中,我們將介紹如何建立和管理 Ray 叢集。
手動建立 Ray 叢集
手動建立 Ray 叢集是最基本的方法。要建立一個 Ray 叢集,我們需要有一份可以相互通訊的機器列表,並且每臺機器上都需要安裝 Ray。
首先,選擇一臺機器作為頭結點,在這臺機器上執行以下命令:
ray start --head --port=6379
這個命令會啟動 Ray 的全球控制伺服器(GCS),並列印出頭結點的 IP 地址和埠號。
接下來,其他機器可以透過以下命令連線到頭結點:
ray start --address='<head-address>:6379'
請注意,需要替換 <head-address>
為頭結點的實際 IP 地址和埠號。
使用 Kubernetes Operator 建立 Ray 叢集
Kubernetes Operator 是一個更方便的方法,允許使用者使用 Kubernetes 來建立和管理 Ray 叢集。使用 Kubernetes Operator,可以自動化 Ray 叢集的建立和管理過程。
使用 Cluster Launcher CLI 工具建立 Ray 叢集
Cluster Launcher CLI 工具是另一個建立 Ray 叢集的方法。這個工具允許使用者使用命令列介面建立和管理 Ray 叢集。
Ray 叢集的自動擴充套件
Ray 叢集可以自動擴充套件,允許使用者根據需求動態增加或減少叢集中的機器數量。這個功能可以透過 Kubernetes Operator 或 Cluster Launcher CLI 工具來實作。
在這個章節中,我們使用了 Ray 的 start
命令來建立和管理 Ray 叢集。start
命令可以用來啟動 Ray 的全球控制伺服器(GCS),並連線到其他機器。透過 --head
和 --address
引數,可以指定頭結點和其他機器的 IP 地址和埠號。
graph LR A[頭結點] -->|啟動 Ray GCS| B[Ray GCS] B -->|連線到其他機器| C[其他機器] C -->|啟動 Ray| D[Ray]
這個圖表展示瞭如何建立和管理 Ray 叢集。首先,頭結點啟動 Ray 的全球控制伺服器(GCS),然後其他機器連線到頭結點,最後啟動 Ray。
Ray Serve的模型組合模式,包含管道和廣播,為機器學習應用部署提供了高度靈活性。深入剖析這兩種模式的核心機制,可以發現它們有效解決了複雜機器學習任務拆解和整合的關鍵挑戰。管道模式透過模型串接,實作了資料流的順暢傳遞和處理,尤其適用於多階段處理流程,例如自然語言處理中的文字預處理、情感分析、摘要提取等。廣播模式則透過將輸入廣播至多個模型,實作平行處理和結果整合,適用於模型整合和多模型決策場景。
技術限制深析方面,雖然Ray Serve簡化了模型組合的流程,但仍需仔細考量模型間的依賴關係、資料傳輸效率和錯誤處理機制。例如,在管道模式中,上游模型的錯誤會影響下游模型的執行,需要設計相應的錯誤隔離和恢復策略。廣播模式則需要關注平行任務的資源分配和同步問題。此外,模型組合的複雜度提升也增加了除錯和維護的難度,需要更完善的監控和日誌系統。
技術演進預測方面,隨著Serverless計算和模型即服務(Model-as-a-Service)的興起,Ray Serve這類模型組合框架將扮演更重要的角色。預計未來會出現更自動化的模型組合方案,例如根據AutoML技術的自動管線生成和最佳化。同時,模型組合的標準化和可移植性也將成為重要的發展方向,以促進不同平臺和框架之間的模型互通。
玄貓認為,Ray Serve的模型組合模式為構建高效能、可擴充套件的機器學習應用提供了堅實基礎。開發團隊應深入理解其工作原理和最佳實務,並結合自身業務需求,才能充分發揮其效能優勢,打造更具競爭力的AI解決方案。