Ray Serve 提供了簡潔易用的介面,方便開發者快速佈署機器學習模型,並構建線上推理應用。透過調整副本數量和資源分配,可以有效控制服務效能。批次處理功能則能提升模型推論效率,尤其適用於向量化運算的場景。此外,Ray Serve 支援多種多模型服務模式,例如Pipeline、廣播和條件邏輯控制,賦予開發者高度靈活性,以應對複雜的線上推理需求。結合 FastAPI 等 Web 框架,可以輕鬆建構功能完善的線上推理 API,提供穩定的服務。
線上推理與 Ray Serve 的進階應用
調整佈署規模與資源分配
Ray Serve 允許使用者透過兩種方式調整佈署的資源:改變佈署的副本數量以及調整每個副本的資源分配。預設情況下,一個佈署包含一個使用單一 CPU 的副本,但這些引數可以在 @serve.deployment 裝飾器中調整。
以下範例將 SentimentClassifier 修改為多個副本,並將每個副本的 CPU 資源調整為 2:
app = FastAPI()
@serve.deployment(num_replicas=2, ray_actor_options={"num_cpus": 2})
@serve.ingress(app)
class SentimentAnalysis:
def __init__(self):
self._classifier = pipeline("sentiment-analysis")
@app.get("/")
def classify(self, input_text: str) -> str:
import os
print("from process:", os.getpid())
return self._classifier(input_text)[0]["label"]
scaled_deployment = SentimentAnalysis.bind()
內容解密:
@serve.deployment(num_replicas=2, ray_actor_options={"num_cpus": 2}):此行程式碼設定佈署的副本數量為 2,每個副本使用 2 個 CPU。self._classifier = pipeline("sentiment-analysis"):在此初始化了一個用於情感分析的 Hugging Face pipeline。print("from process:", os.getpid()):這行程式碼印出處理請求的程式 ID,以展示請求現在被負載平衡到兩個副本上。
資源分配策略
Ray Serve 支援多種資源分配策略,例如:
- 使用 GPU:只需設定
num_gpus而非num_cpus。 - 分數資源:允許副本有效地分享資源,如將
num_gpus設定為 0.5 以在兩個模型之間分享一個 GPU。 - 自動擴充套件:根據請求數量動態調整副本數量。
請求批次處理
許多機器學習模型可以高效地向量化處理,Ray Serve 提供了伺服器端批次處理的支援。使用 @serve.batch 裝飾器,可以輕鬆實作批次處理。
app = FastAPI()
@serve.deployment
@serve.ingress(app)
class SentimentAnalysis:
def __init__(self):
self._classifier = pipeline("sentiment-analysis")
@serve.batch(max_batch_size=10, batch_wait_timeout_s=0.1)
async def classify_batched(self, batched_inputs):
print("Got batch size:", len(batched_inputs))
results = self._classifier(batched_inputs)
return [result["label"] for result in results]
@app.get("/")
async def classify(self, input_text: str) -> str:
return await self.classify_batched(input_text)
batched_deployment = SentimentAnalysis.bind()
內容解密:
@serve.batch(max_batch_size=10, batch_wait_timeout_s=0.1):此裝飾器啟用了批次處理,最大批次大小為 10,等待超時為 0.1 秒。classify_batched方法對輸入進行批次處理,並傳回對應的結果列表。classify方法現在是非同步的,並呼叫classify_batched方法。
多模型推理圖
Ray Serve 的強大之處在於能夠組合多個模型以及常規的 Python 邏輯到一個應用中。這可以透過例項化多個不同的佈署並傳遞控制來實作。
@startuml
skinparam backgroundColor #FEFEFE
skinparam defaultTextAlignment center
skinparam rectangleBackgroundColor #F5F5F5
skinparam rectangleBorderColor #333333
skinparam arrowColor #333333
title 多模型推理圖
rectangle "請求" as node1
rectangle "負載平衡" as node2
rectangle "處理請求" as node3
node1 --> node2
node2 --> node3
@enduml圖表翻譯: 此圖示展示了客戶端傳送請求到 Ray Serve,並由 Ray Serve 將請求負載平衡到不同的佈署(佈署1 和 佈署2),每個佈署處理請求並使用不同的模型(模型1 和 模型2)進行推理。
圖表內容解密:
- 圖表展示了 Ray Serve 的負載平衡機制,將請求分配到不同的佈署。
- 每個佈署可以使用不同的模型進行推理,從而實作多模型推理圖。
Ray Serve 多模型服務模式詳解
Ray Serve 提供多種多模型服務模式,能夠滿足不同人工智慧應用場景的需求。本章節將探討三種常見的多模型服務模式:Pipeline處理、廣播處理以及條件邏輯控制,並詳細解析如何使用 Ray Serve 實作這些模式。
核心功能:繫結多個佈署
Ray Serve 的多模型推理圖形圍繞著將一個佈署的參考傳遞給另一個佈署的建構函式這一核心能力。這透過 .bind() API 實作,使得佈署之間能夠在執行時相互呼叫。
@serve.deployment
class 下游模型:
def __call__(self, 輸入: str):
return "來自下游模型的問候!"
@serve.deployment
class 驅動器:
def __init__(self, 下游):
self._下游 = 下游
async def __call__(self, *args) -> str:
return await self._下游.remote()
下游 = 下游模型.bind()
驅動器 = 驅動器.bind(下游)
內容解密:
- 定義下游模型:
下游模型類別是一個簡單的佈署,包含一個__call__方法,用於處理輸入字串。 - 定義驅動器:
驅動器類別在建構函式中接收下游模型的例項,並在__call__方法中呼叫下游模型的remote方法。 - 繫結佈署:使用
.bind()方法將下游模型和驅動器繫結,並在驅動器中傳遞下游模型的參考。
模式 1:Pipeline處理
Pipeline處理涉及按順序呼叫多個模型,每個模型的輸入依賴於前一個模型的輸出。Ray Serve 可以輕鬆實作這種Pipeline結構,每個階段作為獨立的佈署,並透過頂層的「Pipeline驅動器」協調。
@serve.deployment
class 下游模型:
def __init__(self, 值: str):
self._值 = 值
def __call__(self, 輸入: str):
return 輸入 + "|" + self._值
@serve.deployment
class Pipeline驅動器:
def __init__(self, 模型1, 模型2):
self._模型1 = 模型1
self._模型2 = 模型2
async def __call__(self, *args) -> str:
中間結果 = self._模型1.remote("輸入")
最終結果 = self._模型2.remote(中間結果)
return await 最終結果
模型1 = 下游模型.bind("值1")
模型2 = 下游模型.bind("值2")
Pipeline驅動器 = Pipeline驅動器.bind(模型1, 模型2)
內容解密:
- 定義下游模型:每個
下游模型例項在建構時接收一個特定值,並在被呼叫時將該值附加到輸入字串後。 - 定義Pipeline驅動器:
Pipeline驅動器在建構函式中接收兩個模型,並按順序呼叫它們,將第一個模型的輸出作為第二個模型的輸入。 - 繫結與執行:透過
serve runAPI 測試該Pipeline,傳送請求後傳回最終結果,例如"輸入|值1|值2"。
模式 2:廣播處理
廣播處理涉及將輸入或中間結果平行傳送到多個模型。這可以用於整合多個模型的結果,或在不同輸入上選擇最合適的模型。
@serve.deployment
class 下游模型:
def __init__(self, 值: str):
self._值 = 值
def __call__(self):
return self._值
@serve.deployment
class 廣播驅動器:
def __init__(self, 模型1, 模型2):
self._模型1 = 模型1
self._模型2 = 模型2
async def __call__(self, *args) -> str:
結果1, 結果2 = self._模型1.remote(), self._模型2.remote()
return [await 結果1, await 結果2]
模型1 = 下游模型.bind("值1")
模型2 = 下游模型.bind("值2")
廣播驅動器 = 廣播驅動器.bind(模型1, 模型2)
內容解密:
- 平行呼叫:
廣播驅動器同時呼叫兩個下游模型,並等待它們的結果。 - 結果整合:最終傳回兩個模型的輸出列表,例如
["值1", "值2"]。
模式 3:條件邏輯控制
在許多實際應用中,靜態控制流可能過於受限。Ray Serve 的多模型 API 允許嵌入自定義邏輯,因為計算圖是透過普通 Python 邏輯定義的,而非靜態圖。
import random
@serve.deployment
class 簡單模型:
def __call__(self):
if random.random() > 0.5:
return "大於0.5"
else:
return "小於或等於0.5"
簡單模型例項 = 簡單模型.bind()
內容解密:
- 隨機結果生成:該範例展示瞭如何根據隨機條件傳回不同結果,模擬實際應用中的動態邏輯控制。
使用Ray Serve建立自然語言處理API
本章節將介紹如何使用Ray Serve構建端對端的自然語言處理(NLP)線上推論管道,並將其佈署為API。我們的目標是建立一個Wikipedia摘要端點,能夠利用多個NLP模型和自定義邏輯,為給定的搜尋詞提供最相關的Wikipedia頁面摘要。
管道架構
我們的線上推論管道架構如圖 8-2所示:
- 使用者提供關鍵字搜尋詞。
- 擷取與搜尋詞最相關的Wikipedia文章內容。
- 對文章進行情感分析,任何具有「負面」情感的文章都將被拒絕,並提前傳回結果。
- 將文章內容廣播到摘要模型和命名實體辨識模型。
- 根據摘要模型和命名實體辨識模型的輸出結果,傳回組合結果。
此圖示
顯示了Wikipedia文章摘要NLP管道的架構。
環境準備
在開始之前,請確保已安裝以下Python套件:
pip install "ray[serve]==2.2.0" "transformers==4.21.2"
pip install "requests==2.28.1" "wikipedia==1.4.0"
擷取內容和預處理
首先,我們需要根據使用者提供的搜尋詞,從Wikipedia擷取最相關的頁面內容。為此,我們將使用Wikipedia套件來完成這項工作。我們將首先搜尋該詞,然後選擇第一個結果並傳回其頁面內容。如果沒有找到結果,我們將傳回None,這個邊緣情況將在稍後定義API時處理:
from typing import Optional
import wikipedia
def fetch_wikipedia_page(search_term: str) -> Optional[str]:
results = wikipedia.search(search_term)
# 如果沒有結果,傳回給呼叫者。
if len(results) == 0:
return None
# 取得第一個結果的頁面。
return wikipedia.page(results[0]).content
內容解密:
此函式使用wikipedia套件根據搜尋詞搜尋Wikipedia,並傳回最相關頁面的內容。如果沒有找到結果,則傳回None。
NLP模型
接下來,我們需要定義將為我們的API執行主要工作的機器學習模型。我們將使用Hugging Face Transformers函式庫,因為它提供了方便的API來存取預訓練的最先進的機器學習模型,這樣我們就可以專注於服務邏輯。
第一個模型是情感分類別器,與前面的範例中使用的相同。該模型的佈署將利用Serve的批次API進行向量化計算:
from ray import serve
from transformers import pipeline
from typing import List
@serve.deployment
class SentimentAnalysis:
def __init__(self):
self._classifier = pipeline("sentiment-analysis")
@serve.batch(max_batch_size=10, batch_wait_timeout_s=0.1)
async def is_positive_batched(self, inputs: List[str]) -> List[bool]:
results = self._classifier(inputs, truncation=True)
return [result["label"] == "POSITIVE" for result in results]
async def __call__(self, input_text: str) -> bool:
return await self.is_positive_batched(input_text)
內容解密:
此類別定義了一個情感分析模型,使用Hugging Face的pipeline函式載入預訓練的情感分析模型。is_positive_batched方法允許批次處理輸入文字,以提高效率。__call__方法允許例項像函式一樣被呼叫,用於進行情感分析。