隨著深度學習模型日益複雜,訓練資料量不斷增長,單機訓練的效率已難以滿足需求。Ray 作為一個新興的分散式運算框架,提供了一套完整的工具鏈,涵蓋從資料處理、模型訓練到線上推論的完整流程。本文將聚焦於 Ray 生態系中的 Ray Train 和 Ray Serve,帶領讀者瞭解如何利用這些工具構建高效的分散式機器學習應用。Ray Train 簡化了分散式訓練的流程,讓開發者無需深入瞭解分散式系統的細節,即可輕鬆擴充套件訓練規模。透過設定 scaling_config,我們可以指定工作節點數量和 GPU 使用情況,快速提升訓練速度。同時,Ray Train 也支援內建和自訂的資料前處理器,確保訓練和線上推論資料處理的一致性,避免潛在的效能差異。

分散式訓練與 Ray Train

在深度學習中,分散式訓練是一種常見的技術,用於加速模型訓練的速度。Ray Train 是一個強大的工具,允許我們輕鬆地實作分散式訓練。以下是使用 Ray Train 進行分散式訓練的步驟:

準備模型

首先,我們需要準備好模型。這涉及到初始化模型、定義損失函式和最佳化器等步驟。

model = prepare_model(model)
loss_fn = nn.MSELoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.1)

定義訓練迴圈

接下來,我們需要定義訓練迴圈。這個迴圈會在每個工作者上執行,負責訓練模型。

def train_one_epoch(model, loss_fn, optimizer):
    # 訓練模型
    pass

建立 TorchTrainer

現在,我們可以建立一個 TorchTrainer 例項。TorchTrainer 需要三個引數:train_loop_per_workerdatasetsscaling_config

from ray.train.torch import TorchTrainer

trainer = TorchTrainer(
    train_loop_per_worker=distributed_training_loop,
    scaling_config=ScalingConfig(
        num_workers=2,
        use_gpu=False
    ),
    datasets={"train": train_dataset}
)

執行訓練

最後,我們可以呼叫 fit() 方法來執行訓練。

result = trainer.fit()

這樣就完成了使用 Ray Train 進行分散式訓練的基本步驟。

內容解密:

在上面的程式碼中,我們首先定義了模型、損失函式和最佳化器。然後,我們定義了訓練迴圈,負責訓練模型。在建立 TorchTrainer 例項時,我們需要指定訓練迴圈、資料集和擴充套件配置。最後,我們呼叫 fit() 方法來執行訓練。

圖表翻譯:

以下是使用 Mermaid 圖表來展示上述過程:

  flowchart TD
    A[準備模型] --> B[定義訓練迴圈]
    B --> C[建立 TorchTrainer]
    C --> D[執行訓練]
    D --> E[取得結果]

這個圖表展示了使用 Ray Train 進行分散式訓練的基本步驟。

分散式訓練與預處理

Ray Train 的設計理念是讓使用者無需考慮如何將程式碼平行化。透過指定 scaling_config,您可以在不需要撰寫分散式邏輯的情況下,對訓練進行擴充套件,並宣告式地指定計算資源。這種方法的優點在於您不需要考慮底層硬體。例如,您可以使用數百個工作者來進行訓練。

擴充套件訓練

要擴充套件訓練,您需要先初始化 Ray,然後定義一個 ScalingConfig 物件,指定工作者的數量和是否使用 GPU。接著,您可以建立一個 XGBoostTrainer 物件,傳入 scaling_config 作為引數。

import ray
from ray.train.xgboost import XGBoostTrainer

ray.init(address="auto")
scaling_config = ScalingConfig(num_workers=200, use_gpu=True)

trainer = XGBoostTrainer(
    scaling_config=scaling_config,
    # ...
)

預處理

預處理是一種常見的技術,用於將原始資料轉換為機器學習模型的特徵。Ray Train 提供了多個內建的前處理器,同時也允許您定義自己的自訂邏輯。前處理器的核心類別是 Preprocessor,每個前處理器都具有以下 API:

  • transform(): 用於對資料集進行處理和應用轉換。
  • fit(): 用於計算和儲存前處理器的聚合狀態,傳回 self 以便鏈式呼叫。
  • fit_transform(): 用於執行需要聚合狀態的轉換,可能會在實作級別上進行最佳化。
  • transform_batch(): 用於對批次資料進行相同的轉換,用於預測。

您可以使用這些前處理器來確保訓練和服務時的資料處理的一致性。這可以幫助避免訓練和服務時的效能差異。

from ray.data.preprocessors import StandardScaler
from ray.train.xgboost import XGBoostTrainer

trainer = XGBoostTrainer(
    preprocessor=StandardScaler(),
    # ...
)

result = trainer.fit()

一些預處理操作,例如 one-hot 編碼,很容易在訓練和服務時轉移。然而,其他操作,例如標準化,可能更複雜,因為您不希望在服務時進行大量的資料計算。

幸運的是,Ray Train 的前處理器是可序列化的,因此您可以輕鬆地在訓練和服務之間取得一致性。例如,您可以使用 pickle 將前處理器序列化:

import pickle
from ray.data.preprocessors import StandardScaler

# ...
preprocessor = StandardScaler()
with open("preprocessor.pkl", "wb") as f:
    pickle.dump(preprocessor, f)

這樣,您就可以在服務時載入序列化的前處理器,以確保資料處理的一致性。

整合 Ray Train 和 Ray Tune 進行超引數最佳化

Ray Train 提供了與 Ray Tune 的整合,允許您只需幾行程式碼就能進行超引數最佳化(HPO)。Tune 會為每個超引數配置建立一個試驗。在每個試驗中,將初始化一個新的 Trainer 並使用生成的配置執行訓練函式。

以下程式碼示例中,我們建立了一個 XGBoostTrainer,並為常見的超引數指定了範圍。具體而言,我們將在訓練場景中之間選擇兩種不同的前處理器。為了精確起見,我們將使用 StandardScaler,它將每個指定的列轉換和縮放(結果列將遵循標準正態分佈),以及 MinMaxScaler,它簡單地將每個列縮放到 [0, 1] 範圍。

以下是我們將要搜尋的引數空間:

import ray
from ray import tune
from ray.data.preprocessors import StandardScaler, MinMaxScaler

# 建立示例資料集
dataset = ray.data.from_items(
    [{"X": x, "Y": 1} for x in range(0, 100)] +
    [{"X": x, "Y": 0} for x in range(100, 200)]
)

# 定義兩種前處理器
prep_v1 = StandardScaler(columns=["X"])
prep_v2 = MinMaxScaler(columns=["X"])

# 定義超引數搜尋空間
param_space = {
    "scaling_config": {
        "num_workers": tune.grid_search([2, 4]),
        "resources_per_worker": {
            "CPU": 2,
            "GPU": 0,
        },
    },
    "preprocessor": tune.grid_search([prep_v1, prep_v2]),
    "params": {
        "objective": "binary:logistic",
        "tree_method": "hist",
        "eval_metric": ["logloss", "error"],
        "eta": tune.loguniform(1e-4, 1e-1),
        "subsample": tune.uniform(0.5, 1.0),
        "max_depth": tune.randint(1, 9),
    },
}

現在,我們可以建立一個 Trainer,如前所述,這次使用 XGBoostTrainer,然後將其傳遞給 Ray Tune 的 Tuner 例項,然後使用 .fit() 方法進行訓練:

from ray.train.xgboost import XGBoostTrainer
from ray.tune import Tuner

# 建立 XGBoostTrainer
trainer = XGBoostTrainer(
    dataset=dataset,
    scaling_config=param_space["scaling_config"],
    preprocessor=param_space["preprocessor"],
    params=param_space["params"],
)

# 建立 Tuner
tuner = Tuner(
    trainer,
    param_space=param_space,
)

# 進行超引數最佳化
tuner.fit()

這樣就可以使用 Ray Tune 進行超引數最佳化,並使用 XGBoostTrainer 進行模型訓練。最終,Tune 會傳回最佳的超引數配置和對應的模型效能指標。

線上推論與 Ray Serve

在前幾章中,我們探討瞭如何使用 Ray 進行資料處理、機器學習模型訓練和批次推論。但是,許多機器學習的應用案例都需要線上推論。

線上推論是指使用機器學習模型來增強 API 端點的過程,尤其是在使用者直接或間接互動的情況下。這在延遲時間很重要的情況下尤其重要,因為我們不能簡單地將模型應用於資料並在後臺提供結果。有許多實際案例表明,線上推論可以提供很多價值,例如:

  • 推薦系統:為使用者提供產品或內容推薦是一個機器學習的基本應用案例。雖然可以離線進行,但推薦系統通常可以從使用者的實時行為中受益。這需要使用最近的行為作為關鍵特徵來進行線上推論。
  • 自然語言處理:線上推論可以用於實時回答使用者的問題或提供語言翻譯等自然語言處理任務。
  • 影像識別:線上推論可以用於實時識別影像或視訊中的物體或人臉等。

Ray Serve 是一個根據 Ray 的線上推論框架,提供了一個簡單的方式來部署和管理機器學習模型。下面是一個使用 Ray Serve 進行線上推論的例子:

from ray import serve
import numpy as np

# 定義一個機器學習模型
class MyModel:
    def __init__(self):
        self.model = np.random.rand(10, 10)

    def predict(self, input_data):
        return np.dot(input_data, self.model)

# 建立一個 Ray Serve 代理
serve.init()

# 部署模型
serve.create_endpoint("my_model", MyModel)

# 定義一個 API 端點
@serve.deployment(route_prefix="/predict")
class MyDeployment:
    def __init__(self):
        self.model = MyModel()

    async def predict(self, request):
        input_data = np.array(request.json["input_data"])
        output = self.model.predict(input_data)
        return {"output": output.tolist()}

# 部署 API 端點
serve.create_deployment("my_deployment", MyDeployment)

# 測試 API 端點
import requests
input_data = np.random.rand(10)
response = requests.post("http://localhost:8000/predict", json={"input_data": input_data.tolist()})
print(response.json())

在這個例子中,我們定義了一個機器學習模型 MyModel,然後使用 Ray Serve 部署模型和 API 端點。最後,我們使用 requests 庫測試 API 端點。

圖表翻譯:

  graph LR
    A[使用者請求] --> B[API 端點]
    B --> C[Ray Serve]
    C --> D[機器學習模型]
    D --> E[預測結果]
    E --> F[使用者回應]

這個圖表顯示了使用者請求、API 端點、Ray Serve、機器學習模型、預測結果和使用者回應之間的關係。

內容解密:

在這個例子中,我們使用 Ray Serve 部署了一個機器學習模型和 API 端點。Ray Serve 提供了一個簡單的方式來部署和管理機器學習模型,讓我們可以專注於模型的開發和訓練。透過使用 Ray Serve,我們可以輕鬆地將機器學習模型部署到生產環境中,並提供實時的預測結果。

線上推論的挑戰

線上推論是指使用機器學習模型進行實時預測或分類的過程。這個過程需要低延遲和高可擴充套件性,以滿足使用者的需求。然而,線上推論也面臨著許多挑戰,例如模型的計算複雜度、資源分配和可擴充套件性等。

Ray Serve 介紹

Ray Serve是一個根據Ray的線上推論框架,提供了一個可擴充套件和高效能的平臺,用於部署和管理機器學習模型。Ray Serve的核心功能包括支援多種機器學習框架、動態資源分配和自動擴充套件等。

Ray Serve 的架構

Ray Serve的架構包括多個部署(deployment),每個部署是一個被管理的Ray演員(actor)群組。每個演員都可以處理請求,並且可以與其他演員直接通訊。Ray Serve還提供了一個HTTP代理,用於負載均衡和路由請求。

定義一個基本的HTTP端點

使用Ray Serve,可以定義一個基本的HTTP端點,用於提供機器學習模型的預測或分類功能。這個過程需要定義一個Python類別,並使用@serve.deployment裝飾器將其轉換為一個Ray Serve部署。

Ray Serve 的優點

Ray Serve的優點包括支援多種機器學習框架、動態資源分配和自動擴充套件等。這些功能使得Ray Serve成為一個強大的線上推論框架,能夠滿足使用者的需求。

  flowchart TD
    A[使用者請求] --> B[Ray Serve]
    B --> C[部署]
    C --> D[演員]
    D --> E[模型預測]
    E --> F[傳回結果]

圖表翻譯:

此圖表示Ray Serve的工作流程。使用者請求被送到Ray Serve,然後被路由到相應的部署。部署負責管理演員,演員執行模型預測並傳回結果。

內容解密:

Ray Serve是一個強大的線上推論框架,提供了一個可擴充套件和高效能的平臺,用於部署和管理機器學習模型。使用Ray Serve,可以定義一個基本的HTTP端點,用於提供機器學習模型的預測或分類功能。Ray Serve的優點包括支援多種機器學習框架、動態資源分配和自動擴充套件等。

線上推論與 Ray Serve

Ray Serve 是一個強大的框架,能夠讓你輕鬆地部署機器學習模型到線上環境中。以下是使用 Ray Serve 部署一個簡單的文字情感分析模型的範例。

基本部署

首先,我們需要定義一個類別,繼承自 serve.deployment。在這個類別中,我們可以定義模型的初始化和推論邏輯。

from ray import serve
from transformers import pipeline

@serve.deployment
class SentimentAnalysis:
    def __init__(self):
        self._classifier = pipeline("sentiment-analysis")

    def __call__(self, input_text: str) -> str:
        return self._classifier(input_text)[0]["label"]

然後,我們可以使用 serve.run 函式來啟動這個部署。

serve run app:SentimentAnalysis

這會啟動一個單一的複製品(replica),並且會在本地主機上啟動一個 HTTP 伺服器。

測試

我們可以使用 requests 函式來測試這個部署。

import requests

print(requests.get("http://localhost:8000/", params={"input_text": "Hello friend!"}).json())

這會輸出正確的結果,例如 "POSITIVE"

使用 FastAPI

Ray Serve 也可以與 FastAPI 整合,讓你可以使用 FastAPI 的強大功能來定義 API 的行為。

from fastapi import FastAPI
from ray import serve

app = FastAPI()

@serve.deployment
@serve.ingress(app)
class SentimentAnalysis:
    def __init__(self):
        self._classifier = pipeline("sentiment-analysis")

    @app.get("/")
    def classify(self, input_text: str) -> str:
        return self._classifier(input_text)[0]["label"]

這會讓你可以使用 FastAPI 的功能來定義 API 的行為,例如定義輸入引數的型別和格式。

圖表翻譯:

  graph LR
    A[Ray Serve] -->|部署|> B[模型]
    B -->|推論|> C[結果]
    C -->|輸出|> D[使用者]

這個圖表展示了 Ray Serve 的基本流程,從部署模型到輸出結果。

內容解密:

Ray Serve 的部署過程涉及到多個步驟,包括定義模型的初始化和推論邏輯,使用 serve.run 函式來啟動部署,和使用 FastAPI 的功能來定義 API 的行為。使用 Ray Serve,可以輕鬆地部署機器學習模型到線上環境中,並且可以使用 FastAPI 的功能來定義 API 的行為。

什麼是 Ray Serve?

Ray Serve 是一個根據 Ray 的開源框架,旨在簡化機器學習模型的線上推理和部署。它提供了一個簡單且高效的方式來部署和管理機器學習模型,同時支援多種資源分配策略和請求批次處理。

Ray Serve 的特點

Ray Serve 的一些主要特點包括:

  • 支援多種資源分配策略,包括 CPU、GPU、TPU 等
  • 支援動態自動擴充套件,根據請求負載動態調整模型的複製數
  • 支援請求批次處理,提高模型推理的效率和吞吐量
  • 支援多種模型格式,包括 TensorFlow、PyTorch 等

Ray Serve 的應用

Ray Serve 可以用於多種機器學習模型的線上推理和部署,包括但不限於:

  • 文字分類和情感分析
  • 影像分類和物體偵測
  • 語音識別和語音合成
  • 推薦系統和個性化

Ray Serve 的優點

Ray Serve 的一些主要優點包括:

  • 簡單且高效的模型部署和管理
  • 支援多種資源分配策略和請求批次處理
  • 高度可擴充套件和靈活的架構
  • 支援多種模型格式和框架

Ray Serve 的使用

Ray Serve 的使用包括以下步驟:

  1. 安裝 Ray Serve
  2. 建立一個 Ray Serve 的部署
  3. 定義模型和推理函式
  4. 配置資源分配策略和請求批次處理
  5. 啟動 Ray Serve 服務

以下是一個簡單的例子,展示如何使用 Ray Serve 部署一個情感分析模型:

import ray
from ray import serve
from transformers import pipeline

# 建立一個 Ray Serve 的部署
serve.start()

# 定義模型和推理函式
class SentimentAnalysis:
    def __init__(self):
        self._classifier = pipeline("sentiment-analysis")

    @serve.batch(max_batch_size=10, batch_wait_timeout_s=0.1)
    async def classify_batched(self, batched_inputs):
        results = self._classifier(batched_inputs)
        return [result["label"] for result in results]

# 配置資源分配策略和請求批次處理
@serve.deployment
class SentimentAnalysisDeployment:
    def __init__(self):
        self._sentiment_analysis = SentimentAnalysis()

    async def classify(self, input_text: str) -> str:
        return await self._sentiment_analysis.classify_batched([input_text])

# 啟動 Ray Serve 服務
sentiment_analysis_deployment = SentimentAnalysisDeployment.get_handle()
print(sentiment_analysis_deployment.classify.remote("I love this product!"))

這個例子展示瞭如何使用 Ray Serve 部署一個情感分析模型,支援請求批次處理和動態自動擴充套件。

多模型推理圖

Ray Serve 的真正強大之處在於其能夠將多個模型以及正規的 Python 邏輯組合成一個單一的應用程式。這是透過在一個部署的建構函式中傳遞對另一個部署的參照來實作的。每個這樣的部署都可以使用我們迄今為止討論的所有功能:它們可以獨立縮放,執行請求批處理,並使用靈活的資源分配。

核心功能:繫結多個部署

所有 Ray Serve 中的多模型推理圖都圍繞著將一個部署的參照傳遞給另一個部署的建構函式的能力。為此,我們使用 .bind() API 的另一個功能:一個繫結的部署可以傳遞給另一個 .bind() 呼叫,這將在執行時解析為對部署的參照。

這使得部署可以獨立部署和例項化,然後在執行時相互呼叫。以下是 Ray Serve 中最基本的多部署應用程式示例:

@serve.deployment
class DownstreamModel:
    def __call__(self, inp: str):
        return "Hi from downstream model!"

@serve.deployment
class Driver:
    def __init__(self, downstream):
        self._d = downstream

    async def __call__(self, *args) -> str:
        return await self._d.remote()

downstream = DownstreamModel.bind()
driver = Driver.bind(downstream)

在這個示例中,下游模型被傳遞給「driver」部署。然後在執行時,driver 部署呼叫下游模型。driver 可以接受任意多個模型作為輸入,下游模型甚至可以接受自己的下游模型。

模式 1:管道

多模型應用程式中最常見的模式之一是「管道」:按順序呼叫多個模型,其中一個模型的輸入取決於前一個模型的輸出。例如,影像處理通常由多個階段組成,包括裁剪、分割和物體識別或光學字元識別(OCR)。每個這些模型可能具有不同的屬性,其中一些是可以在 CPU 上執行的輕量級轉換,其他的是在 GPU 上執行的重型深度學習模型。

這樣的管道可以使用 Serve 的 API輕鬆表達。管道的每個階段都被定義為一個獨立的部署,每個部署都被傳遞給一個頂級的「管道驅動程式」。在以下示例中,我們將兩個部署傳遞給一個頂級驅動程式,驅動程式按順序呼叫它們。請注意,對驅動程式的多個請求可能正在同時發生;因此,可能會高效地飽和管道的所有階段:

@serve.deployment
class DownstreamModel:
    def __init__(self, my_val: str):
        self._my_val = my_val

    def __call__(self, inp: str):
        return inp + "|" + self._my_val

@serve.deployment
class PipelineDriver:
    def __init__(self, downstream1, downstream2):
        self._d1 = downstream1
        self._d2 = downstream2

    async def __call__(self, inp: str):
        output1 = await self._d1.remote(inp)
        output2 = await self._d2.remote(output1)
        return output2

圖表翻譯:

  graph LR
    A[Pipeline Driver] --> B[Downstream Model 1]
    B --> C[Downstream Model 2]
    C --> D[Output]

這個圖表展示了管道驅動程式如何呼叫下游模型 1,然後呼叫下游模型 2,最終產生輸出。這種管道模式允許高效地組合多個模型,從而實作複雜的機器學習工作流程。

線上推論與 Ray Serve

Ray Serve 是一個強大的框架,允許使用者輕鬆地將機器學習模型部署到線上環境中。它提供了多種模式來組織和管理模型,包括管道(Pipelining)和廣播(Broadcasting)。

從技術架構視角來看,Ray 生態圈為機器學習任務提供了一套完整高效的解決方案,涵蓋了從資料預處理、分散式訓練到線上推論的整個流程。Ray Train 簡化了分散式訓練的複雜性,讓開發者無需深入底層分散式邏輯即可輕鬆擴充套件訓練規模。其與 Ray Tune 的整合更進一步簡化了超引數調整的流程,提升了模型訓練效率。Ray Serve 則提供了一個強大的線上推論框架,支援多種部署模式,例如管道和廣播,方便管理和組合多個模型,滿足複雜的線上應用需求。然而,Ray Serve 的自動擴充套件和資源分配策略仍有改進空間,例如更精細的資源排程和更靈活的擴充套件策略。對於重視線上推論效能和可擴充套件性的企業而言,深入理解 Ray Serve 的架構和部署策略至關重要。技術團隊應關注如何最佳化模型部署和資源配置,才能充分發揮 Ray Serve 的優勢,構建高效能、可擴充套件的線上機器學習應用。未來,隨著 Ray 生態圈的持續發展,我們預見其線上上機器學習領域的應用將更加廣泛,並推動更多創新應用場景的出現。