Ray 是一個新興的分散式計算框架,能有效簡化機器學習工作流程。本文將引導讀者如何在 GCP 和 Azure 上部署 Ray 叢集,並運用 Ray AIR 進行模型訓練、調整與部署。首先,我們會介紹如何在 YAML 設定檔中定義叢集規格,包含叢集名稱、節點數量、雲端平臺設定和 SSH 連線資訊,並使用 ray up 命令部署叢集。接著,我們將深入探討 Ray AIR 的核心概念,包含 Dataset、Preprocessor、Trainer、Tuner 和 Predictor,並說明如何利用這些元件構建完整的機器學習流程。文章也會示範如何使用 Ray AIR 內建的前處理器,像是 StandardScaler,以及如何自定義前處理器來滿足特定需求。後續將會介紹如何使用 Trainer 進行模型訓練,並利用 Tuner 進行超引數調整,以最佳化模型效能。最後,我們將探討如何使用 BatchPredictor 進行批次預測,以及如何透過 Ray Serve 將訓練好的模型部署成線上服務,提供即時預測能力。

部署 Ray 叢集於其他雲端平臺

Ray 叢集可以部署在大多數主要的雲端平臺上,包括 Google Cloud Platform (GCP) 和 Microsoft Azure。以下是如何在這些平臺上部署 Ray 叢集的範例。

在 Google Cloud Platform (GCP) 上部署 Ray 叢集

要在 GCP 上部署 Ray 叢集,需要建立一個 YAML 檔案,定義叢集的設定。以下是範例設定檔:

cluster_name: my-ray-cluster
max_workers: 1
provider:
  type: gcp
  region: us-west1
  availability_zone: us-west1-a
  project_id: my-project-id
auth:
  ssh_user: ubuntu

這個設定檔定義了一個名為 my-ray-cluster 的叢集,最大工作者節點數為 1,使用 GCP 的 us-west1 區域和 us-west1-a 可用性區域。同時,設定檔也指定了 SSH 使用者名稱為 ubuntu

在 Microsoft Azure 上部署 Ray 叢集

要在 Azure 上部署 Ray 叢集,需要建立一個 YAML 檔案,定義叢集的設定。以下是範例設定檔:

cluster_name: my-ray-cluster
max_workers: 1
provider:
  type: azure
  location: westus2
  resource_group: my-resource-group
auth:
  ssh_user: ubuntu

這個設定檔定義了一個名為 my-ray-cluster 的叢集,最大工作者節點數為 1,使用 Azure 的 westus2 位置和 my-resource-group 資源群組。同時,設定檔也指定了 SSH 使用者名稱為 ubuntu

部署 Ray 叢集

在建立設定檔後,可以使用 Ray 的 ray up 命令來部署叢集。以下是範例命令:

ray up my-ray-cluster.yaml

這個命令會部署名為 my-ray-cluster 的叢集,使用 my-ray-cluster.yaml 設定檔中的設定。

內容解密:

Ray 的 ray up 命令會根據設定檔中的設定,自動建立和配置叢集的節點。同時,命令也會設定叢集的安全性和網路設定。使用者可以使用 ray down 命令來刪除叢集。

圖表翻譯:

  flowchart TD
    A[Ray 叢集設定] --> B[建立 YAML 檔案]
    B --> C[設定叢集名稱和最大工作者節點數]
    C --> D[設定雲端平臺和區域]
    D --> E[設定 SSH 使用者名稱]
    E --> F[部署 Ray 叢集]
    F --> G[設定叢集安全性和網路設定]

這個圖表展示了部署 Ray 叢集的流程,從建立 YAML 檔案到設定叢集安全性和網路設定。

Ray AI Runtime 入門

Ray AI Runtime(AIR)是一個統一的工具包,用於管理機器學習(ML)工作流程。它提供了一個簡單的 API,讓您可以輕鬆地建立和部署 ML 模型。Ray AIR 是根據 Ray Core 建立的,Ray Core 是一個高效能的分散式計算框架。

Ray AI Runtime 的優點

Ray AIR 有以下幾個優點:

  • 統一的 API:Ray AIR 提供了一個統一的 API,讓您可以輕鬆地建立和部署 ML 模型。
  • 可擴充套件性:Ray AIR 可以輕鬆地擴充套件到大型的叢集中,讓您可以處理大量的資料。
  • 高效能:Ray AIR 根據 Ray Core,提供了高效能的計算能力。
  • 靈活性:Ray AIR 可以輕鬆地與其他 ML 框架和工具整合。

Ray AI Runtime 的核心概念

Ray AIR 有以下幾個核心概念:

  • Dataset:Ray Dataset 是一個用於儲存和管理資料的工具。
  • Preprocessor:Ray Preprocessor 是一個用於預處理資料的工具。
  • Trainer:Ray Trainer 是一個用於訓練 ML 模型的工具。
  • Tuner:Ray Tuner 是一個用於調整超引數的工具。
  • Predictor:Ray Predictor 是一個用於預測的工具。

Ray AI Runtime 的使用

Ray AIR 可以用於以下幾個場景:

  • 建立和部署 ML 模型:Ray AIR 可以用於建立和部署 ML 模型。
  • 資料預處理:Ray AIR 可以用於預處理資料。
  • 超引數調整:Ray AIR 可以用於調整超引數。
  • 預測:Ray AIR 可以用於預測。

Ray AI Runtime 的優勢

Ray AIR 有以下幾個優勢:

  • 簡單的 API:Ray AIR 提供了一個簡單的 API,讓您可以輕鬆地建立和部署 ML 模型。
  • 高效能:Ray AIR 根據 Ray Core,提供了高效能的計算能力。
  • 靈活性:Ray AIR 可以輕鬆地與其他 ML 框架和工具整合。

使用 Ray AI Runtime 進行機器學習

Ray AI Runtime 是一個強大的機器學習框架,提供了許多預先封裝的前處理器,涵蓋了許多用例。如果您找不到所需的前處理器,可以輕鬆定義自訂前處理器。

在本例中,我們想要從 S3 儲存桶中讀取 CSV 檔案到列式資料集,然後將資料集分割成訓練和測試資料集,並定義一個 AIR 前處理器,StandardScaler,將資料集中的指定欄位標準化為均值 0 和方差 1。

import ray
from ray.data.preprocessors import StandardScaler

# 讀取 CSV 檔案
dataset = ray.data.read_csv("s3://anonymous@air-example-data/breast_cancer.csv")

# 分割資料集
train_dataset, valid_dataset = dataset.train_test_split(test_size=0.2)
test_dataset = valid_dataset.drop_columns(cols=["target"])

# 定義前處理器
preprocessor = StandardScaler(columns=["mean radius", "mean texture"])

Ray AI Runtime 提供了多種前處理器,包括特徵縮放器、通用前處理器、類別編碼器和文字編碼器。

前處理器型別例子
特徵縮放器MaxAbsScaler, MinMaxScaler, Normalizer, PowerTransformer, StandardScaler
通用前處理器BatchMapper, Chain, Concatenator, SimpleImputer
類別編碼器Categorizer, LabelEncoder, OneHotEncoder
文字編碼器Tokenizer, FeatureHasher

定義前處理器後,可以使用 Trainer 執行機器學習演算法。Trainer 是 Ray Train 包的一部分,提供了一個一致的包裝器,用於訓練框架,如 TensorFlow、PyTorch 或 XGBoost。

from ray.train.xgboost import XGBoostTrainer

# 定義 Trainer
trainer = XGBoostTrainer(
    scaling_config=None,
    label_column="target",
    datasets={"train": train_dataset, "valid": valid_dataset},
    preprocessor=preprocessor,
    params={"num_boosting_rounds": 10}
)

Trainer 需要指定多個引數,包括:

  • scaling_config: 描述如何在 Ray Cluster 上擴充套件訓練
  • label_column: 指定資料集中的哪一欄位用作監督學習中的標籤
  • datasets: 指定訓練和驗證資料集
  • preprocessor: 指定前處理器
  • params: 指定框架特定的引數

使用 Ray AI Runtime,可以輕鬆地定義和執行機器學習工作流程,從資料預處理到模型訓練和評估。

Ray AI Runtime 的 Trainer 和 Tuner

Ray AI Runtime 提供了 Trainer 和 Tuner 來實作可擴充套件的機器學習訓練和超引數調整。Trainer 可以用於訓練模型,而 Tuner 可以用於調整超引數。

Trainer

Trainer 是 Ray AI Runtime 中的一個核心概念,它提供了一種簡單的方式來訓練模型。Trainer 可以用於訓練不同的模型,包括 XGBoost、TensorFlow 等。

以下是使用 Trainer 訓練 XGBoost 模型的例子:

from ray import tune
from ray.air import ScalingConfig
from ray.air import Trainer

# 定義 Trainer 的配置
scaling_config = ScalingConfig(
    num_workers=2,
    use_gpu=False,
)

# 定義 Trainer 的引數
params = {
    "objective": "binary:logistic",
    "eval_metric": ["logloss", "error"],
}

# 定義 Trainer 的資料集
datasets = {"train": train_dataset, "valid": valid_dataset}

# 建立 Trainer
trainer = Trainer(
    scaling_config=scaling_config,
    label_column="target",
    num_boost_round=20,
    params=params,
    datasets=datasets,
)

# 訓練模型
result = trainer.fit()

# 印出訓練結果
print(result.metrics)

Tuner

Tuner 是 Ray AI Runtime 中的一個核心概念,它提供了一種簡單的方式來調整超引數。Tuner 可以用於調整不同的超引數,包括模型的超引數、最佳化器的超引數等。

以下是使用 Tuner 調整 XGBoost 模型的超引數的例子:

from ray import tune
from ray.tune.tuner import Tuner, TuneConfig

# 定義 Tuner 的引數空間
param_space = {"params": {"max_depth": tune.randint(1, 9)}}

# 定義 Tuner 的配置
metric = "train-logloss"
tune_config = TuneConfig(num_samples=2, metric=metric, mode="min")

# 建立 Tuner
tuner = Tuner(
    trainer,
    param_space=param_space,
    run_config=RunConfig(verbose=1),
    tune_config=tune_config,
)

# 調整超引數
result_grid = tuner.fit()

# 印出最佳結果
best_result = result_grid.get_best_result()
print("Best Result:", best_result)

使用 Ray AI Runtime 進行模型訓練和部署

Ray AI Runtime (AIR) 是一個強大的工具,允許您使用不同的機器學習框架進行模型訓練和部署。在這篇文章中,我們將探討如何使用 AIR 進行模型訓練、檢查點管理和批次預測。

模型訓練和檢查點管理

首先,我們需要定義一個模型並進行訓練。以下是使用 TensorFlow Keras 進行模型定義和訓練的例子:

import tensorflow as tf
from ray.train.tensorflow import TensorflowCheckpoint

model = tf.keras.Sequential([
    tf.keras.layers.InputLayer(input_shape=(1,)),
    tf.keras.layers.Dense(1)
])

keras_checkpoint = TensorflowCheckpoint.from_model(model)

在訓練過程中,AIR 會自動儲存檢查點。檢查點是模型訓練過程中的快照,允許您在稍後恢復訓練。您可以使用 TensorflowCheckpoint 類別從模型中建立檢查點。

批次預測

一旦您已經訓練了模型,您就可以使用批次預測器進行預測。批次預測器是從檢查點中建立的,允許您在批次資料上進行預測。以下是使用批次預測器進行預測的例子:

from ray.train.batch_predictor import BatchPredictor
from ray.train.xgboost import XGBoostPredictor

checkpoint = best_result.checkpoint
batch_predictor = BatchPredictor.from_checkpoint(checkpoint, XGBoostPredictor)
predicted_probabilities = batch_predictor.predict(test_dataset)

在這個例子中,我們從檢查點中建立了一個批次預測器,然後使用它進行預測。

部署模型

除了使用批次預測器,您還可以使用 Ray Serve 部署模型。Ray Serve 是一個強大的工具,允許您將模型部署為 Web 服務。以下是使用 Ray Serve 部署模型的例子:

from ray import serve
from fastapi import Request
import pandas as pd
from ray.serve import PredictorDeployment

async def adapter(request: Request):
    payload = await request.json()
    return pd.DataFrame.from_dict(payload)

在這個例子中,我們定義了一個介面卡函式,將 JSON 資料轉換為 Pandas 資料框。然後,我們使用 Ray Serve 部署模型。

圖表翻譯:

  flowchart TD
    A[模型訓練] --> B[檢查點管理]
    B --> C[批次預測]
    C --> D[模型部署]
    D --> E[Web 服務]

這個圖表顯示了模型訓練、檢查點管理、批次預測和模型部署的流程。

部署XGBoost模型到Ray Serve

在上一節中,我們已經訓練好了XGBoost模型並儲存為checkpoint。現在,我們要將這個模型部署到Ray Serve上,以便可以透過HTTP請求進行預測。

首先,我們需要啟動Ray Serve:

serve.start(detached=True)

接下來,我們建立一個 PredictorDeployment 物件,指定部署的名稱為 “XGBoostService”:

deployment = PredictorDeployment.options(name="XGBoostService")

然後,我們部署模型到Ray Serve上,傳入模型checkpoint、adapter函式和XGBoostPredictor類:

deployment.deploy(
    XGBoostPredictor,
    checkpoint,
    http_adapter=adapter
)

部署完成後,我們可以透過 deployment.url 獲取部署的URL:

print(deployment.url)

現在,我們可以使用Requests庫向部署的URL傳送HTTP請求,進行預測。首先,我們從測試資料集中取出第一個樣本,並將其轉換為Python字典:

first_item = test_dataset.take(1)
sample_input = dict(first_item[0])

然後,我們使用Requests庫傳送POST請求到部署的URL,傳入樣本輸入:

result = requests.post(
    deployment.url,
    json=[sample_input]
)

最後,我們可以打印出預測結果:

print(result.json())

當我們完成使用服務後,可以安全地關閉Ray Serve:

serve.shutdown()

這樣,我們就完成了XGBoost模型的部署和預測。Ray Serve提供了一個簡單的方式將機器學習模型部署為Web服務,從而可以輕鬆地將模型整合到其他應用程式中。

人工智慧工作負載與 Ray AI Runtime

Ray AI Runtime(AIR)是一個強大的工具,能夠幫助開發者管理和部署人工智慧(AI)工作負載。AIR 支援多種工作負載,包括無狀態計算、有狀態計算、複合工作負載和線上服務。

無狀態計算

無狀態計算是指不需要維護任何狀態的計算任務,例如資料預處理或模型預測。這型別的工作負載可以使用 Ray tasks 進行計算,Ray tasks 是一個輕量級的計算單元,可以輕鬆地進行平行計算。

有狀態計算

有狀態計算是指需要維護狀態的計算任務,例如模型訓練或超引數調整。這型別的工作負載需要使用 Ray actors 進行計算,Ray actors 是一個可以維護狀態的計算單元,可以用於分散式訓練。

複合工作負載

複合工作負載是指結合無狀態計算和有狀態計算的工作負載,例如在模型訓練過程中進行資料預處理。這型別的工作負載可以使用 Ray tasks 和 Ray actors 進行計算。

線上服務

線上服務是指使用模型進行預測的工作負載,例如使用模型進行圖片分類或自然語言處理。這型別的工作負載可以使用 Ray Serve 進行計算,Ray Serve 是一個可以用於線上服務的工具,可以輕鬆地部署和管理模型。

Ray AI Runtime 的執行模型

Ray AI Runtime 的執行模型是根據 Ray tasks 和 Ray actors 的。無狀態計算使用 Ray tasks 進行計算,有狀態計算使用 Ray actors 進行計算。複合工作負載可以使用 Ray tasks 和 Ray actors 進行計算。

無狀態執行

無狀態執行使用 Ray tasks 進行計算,Ray tasks 是一個輕量級的計算單元,可以輕鬆地進行平行計算。無狀態執行可以使用 pipelining 技術,pipelining 技術是指將資料流水線化,減少資料的複製和移動。

有狀態執行

有狀態執行使用 Ray actors 進行計算,Ray actors 是一個可以維護狀態的計算單元,可以用於分散式訓練。有狀態執行需要使用 Ray actors 進行計算,Ray actors 可以維護狀態和進行計算。

第10章:開始使用Ray AI Runtime

合成工作負載執行

合成工作負載同時利用任務和演員基礎的計算。這可能會導致有趣的資源分配挑戰。試驗演員需要提前保留其資源,但無狀態任務不需要。您可能遇到的問題是,所有可用的資源都可能被您的訓練演員保留,沒有資源留給資料載入任務。

Ray AI Runtime(AIR)透過限制節點的CPU使用率來防止這種情況。您可以調整這個引數,但這是一個合理的預設值,確保基本的資源可用性供無狀態計算使用。在您的訓練操作使用GPU而資料處理步驟不使用的情況下,這不是一個問題。

線上服務執行

Ray Serve管理了一組無狀態演員來服務您的請求。一些演員監聽傳入請求並呼叫其他演員進行預測。請求使用輪詢演算法自動負載平衡到演員池。負載指標被傳送到Serve元件以執行自動擴充套件。

AIR記憶體管理

在本文中,我們將深入探討AIR的具體記憶體管理技術。我們將討論Ray物件儲存如何被AIR使用。您可以跳過本文,如果您認為它太技術化。簡而言之,Ray使用智慧技術確保您的資料和計算被正確分佈和排程。

當您使用Ray Datasets載入資料時,您已經知道這些資料集被分割成資料塊。資料塊是Ray物件的集合。選擇合適的塊大小是困難的,並且需要在管理太多小塊的開銷和風險太大塊導致記憶體不足(OOM)異常之間進行權衡。AIR採取了一種實用主義的方法,嘗試以不超過512 MB的塊大小進行分佈。如果不能保證,將發出警告。如果塊不適合記憶體,AIR將您的資料寫入本地磁碟。

您的有狀態工作負載將使用Ray物件儲存以不同的程度。例如,RLlib使用Ray物件廣播模型權重到個別的滾動工人和經驗資料收集。Tune使用它來設定試驗。由於技術原因,演員風險執行到OOM問題,如果太多記憶體相對於分配的資源被要求。

如果您提前知道記憶體需求,您可以根據需要調整ScalingConfig中的記憶體,或簡單地要求額外的CPU資源。

在合成工作負載中,有狀態演員(例如,訓練)需要訪問由無狀態任務(例如,預處理)建立的資料,這使得記憶體分配更加具有挑戰性。讓我們看兩種情景:

  • 如果負責訓練的演員有足夠的空間(在物件儲存中)來適合所有訓練資料,情況很簡單。首先,預處理步驟執行,然後所有資料塊被下載到各個節點。訓練演員然後簡單地迭代記憶體中的資料。
  • 否則,資料處理需要管道化執行,這意味著資料將由無狀態任務處理。如果相關的訓練演員與節點上的無狀態任務共同定位,資料將從共享記憶體中檢索。

AIR故障模型

AIR為大多數無狀態計算提供了容錯能力,透過線性重建。這意味著Ray將在節點故障的情況下重建Dataset塊,從而使工作負載可以擴充套件到大型叢集。請注意,容錯能力不適用於頭節點故障。Global Control Service(GCS)儲存叢集後設資料的崩潰將殺死叢集中的所有工作。

自動擴充套件AIR工作負載

AIR庫可以在自動擴充套件的Ray叢集上執行。對於無狀態工作負載,Ray將自動擴充套件,如果有排隊的任務(或Dataset計算演員)。對於有狀態工作負載,Ray將自動擴充套件,如果有待處理的放置群組(即Tune試驗)。Ray將在節點空閒時自動擴充套件下。節點被視為空閒,如果節點上沒有資源使用和沒有Ray物件在記憶體或磁碟上。

您應該知道,自動擴充套件可能會導致叢集中資料平衡不佳,因為早期啟動的節點自然會執行更多工。請考慮限制(例如,從最小叢集大小開始)或停用自動擴充套件,以最佳化資料密集型工作負載的效率。

使用 Ray AIR 進行資料載入和模型訓練

Ray AIR 是一個強大的工具,能夠幫助我們進行資料載入、模型訓練和部署。在這個章節中,我們將展示如何使用 Ray AIR 來載入和轉換資料,如何定義和訓練模型,如何使用 MLflow 進行超引數調整,和如何使用 Gradio 進行模型部署。

資料載入和轉換

首先,我們需要載入和轉換資料。為了示範這個過程,我們將使用 PyTorch 的 torchvision 包來載入 CIFAR-10 資料集。CIFAR-10 是一個常用的影像分類資料集,包含 32x32 的影像和 10 個類別的標籤。

import torch
from torchvision import transforms, datasets

def load_cifar(train: bool):
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
    ])

    return datasets.CIFAR10(
        root="./data",
        download=True,
        train=train,
        transform=transform
    )

接下來,我們需要將這個資料集轉換為 Ray Dataset。Ray Dataset 是 Ray AIR 中的一個重要概念,能夠幫助我們進行資料的載入、轉換和儲存。

from ray.data import from_torch

train_dataset = from_torch(load_cifar(train=True))
test_dataset = from_torch(load_cifar(train=False))

資料轉換和模型定義

現在,我們需要將資料轉換為模型能夠接受的格式。為了示範這個過程,我們將定義一個簡單的模型和訓練迴圈。

import numpy as np

def to_labeled_image(batch):
    return {
        "image": np.array([image.numpy() for image, _ in batch]),
        "label": np.array([label for _, label in batch])
    }

train_dataset = train_dataset.map_batches(to_labeled_image)
test_dataset = test_dataset.map_batches(to_labeled_image)

模型訓練和超引數調整

接下來,我們需要定義和訓練模型。為了示範這個過程,我們將使用 PyTorch 的 nn.Module 來定義模型,並使用 Ray Train 進行模型訓練。

import torch.nn as nn

class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(32 * 32 * 3, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        x = x.view(-1, 32 * 32 * 3)
        x = torch.relu(self.fc1(x))
        x = self.fc2(x)
        return x

模型部署

最後,我們需要部署模型。為了示範這個過程,我們將使用 Gradio 進行模型部署。

import gradio as gr

def deploy_model(model):
    def predict(image):
        image = torch.tensor(image)
        output = model(image)
        return torch.argmax(output)

    demo = gr.Interface(
        predict,
        gr.Image(type="numpy"),
        gr.Label(label="Prediction")
    )

    demo.launch()

高階第三方整合

Ray 與複雜的資料處理系統進行整合,例如 Spark on Ray(RayDP)、Dask on Ray、MARS on Ray 或 Pandas on Ray(Modin)。在「分散式 Python 框架」章節中,我們將更詳細地探討 Ray 與 Dask 或 Spark 等系統的關係。

從技術架構視角來看,Ray 提供了相當靈活且完整的機器學習訓練及部署方案。透過 Ray AIR,開發者能以簡潔的 API 管理從資料處理、模型訓練、超引數調整到部署的完整機器學習生命週期,同時支援多種機器學習框架,展現高度的整合性。然而,Ray 的自動擴充套件機制在資料密集型工作負載上,可能導致叢集資料分配不均,需要開發者關注並調整相關設定。綜合評估,Ray AIR 有效降低了分散式機器學習的門檻,其高度整合性及彈性,使其成為建構複雜 AI 應用程式值得關注的選擇。預期未來 Ray 將持續強化其生態系統,並針對不同工作負載的效能最佳化提供更精細的控制,以滿足日益增長的 AI 應用需求。對於追求高效能和可擴充套件性的機器學習應用,技術團隊應深入研究 Ray AIR 的核心概念和最佳實務,以充分發揮其潛力。