Ray 是一個新興的分散式計算框架,能有效簡化機器學習工作流程。本文將引導讀者如何在 GCP 和 Azure 上部署 Ray 叢集,並運用 Ray AIR 進行模型訓練、調整與部署。首先,我們會介紹如何在 YAML 設定檔中定義叢集規格,包含叢集名稱、節點數量、雲端平臺設定和 SSH 連線資訊,並使用 ray up
命令部署叢集。接著,我們將深入探討 Ray AIR 的核心概念,包含 Dataset、Preprocessor、Trainer、Tuner 和 Predictor,並說明如何利用這些元件構建完整的機器學習流程。文章也會示範如何使用 Ray AIR 內建的前處理器,像是 StandardScaler,以及如何自定義前處理器來滿足特定需求。後續將會介紹如何使用 Trainer 進行模型訓練,並利用 Tuner 進行超引數調整,以最佳化模型效能。最後,我們將探討如何使用 BatchPredictor 進行批次預測,以及如何透過 Ray Serve 將訓練好的模型部署成線上服務,提供即時預測能力。
部署 Ray 叢集於其他雲端平臺
Ray 叢集可以部署在大多數主要的雲端平臺上,包括 Google Cloud Platform (GCP) 和 Microsoft Azure。以下是如何在這些平臺上部署 Ray 叢集的範例。
在 Google Cloud Platform (GCP) 上部署 Ray 叢集
要在 GCP 上部署 Ray 叢集,需要建立一個 YAML 檔案,定義叢集的設定。以下是範例設定檔:
cluster_name: my-ray-cluster
max_workers: 1
provider:
type: gcp
region: us-west1
availability_zone: us-west1-a
project_id: my-project-id
auth:
ssh_user: ubuntu
這個設定檔定義了一個名為 my-ray-cluster
的叢集,最大工作者節點數為 1,使用 GCP 的 us-west1
區域和 us-west1-a
可用性區域。同時,設定檔也指定了 SSH 使用者名稱為 ubuntu
。
在 Microsoft Azure 上部署 Ray 叢集
要在 Azure 上部署 Ray 叢集,需要建立一個 YAML 檔案,定義叢集的設定。以下是範例設定檔:
cluster_name: my-ray-cluster
max_workers: 1
provider:
type: azure
location: westus2
resource_group: my-resource-group
auth:
ssh_user: ubuntu
這個設定檔定義了一個名為 my-ray-cluster
的叢集,最大工作者節點數為 1,使用 Azure 的 westus2
位置和 my-resource-group
資源群組。同時,設定檔也指定了 SSH 使用者名稱為 ubuntu
。
部署 Ray 叢集
在建立設定檔後,可以使用 Ray 的 ray up
命令來部署叢集。以下是範例命令:
ray up my-ray-cluster.yaml
這個命令會部署名為 my-ray-cluster
的叢集,使用 my-ray-cluster.yaml
設定檔中的設定。
內容解密:
Ray 的 ray up
命令會根據設定檔中的設定,自動建立和配置叢集的節點。同時,命令也會設定叢集的安全性和網路設定。使用者可以使用 ray down
命令來刪除叢集。
圖表翻譯:
flowchart TD A[Ray 叢集設定] --> B[建立 YAML 檔案] B --> C[設定叢集名稱和最大工作者節點數] C --> D[設定雲端平臺和區域] D --> E[設定 SSH 使用者名稱] E --> F[部署 Ray 叢集] F --> G[設定叢集安全性和網路設定]
這個圖表展示了部署 Ray 叢集的流程,從建立 YAML 檔案到設定叢集安全性和網路設定。
Ray AI Runtime 入門
Ray AI Runtime(AIR)是一個統一的工具包,用於管理機器學習(ML)工作流程。它提供了一個簡單的 API,讓您可以輕鬆地建立和部署 ML 模型。Ray AIR 是根據 Ray Core 建立的,Ray Core 是一個高效能的分散式計算框架。
Ray AI Runtime 的優點
Ray AIR 有以下幾個優點:
- 統一的 API:Ray AIR 提供了一個統一的 API,讓您可以輕鬆地建立和部署 ML 模型。
- 可擴充套件性:Ray AIR 可以輕鬆地擴充套件到大型的叢集中,讓您可以處理大量的資料。
- 高效能:Ray AIR 根據 Ray Core,提供了高效能的計算能力。
- 靈活性:Ray AIR 可以輕鬆地與其他 ML 框架和工具整合。
Ray AI Runtime 的核心概念
Ray AIR 有以下幾個核心概念:
- Dataset:Ray Dataset 是一個用於儲存和管理資料的工具。
- Preprocessor:Ray Preprocessor 是一個用於預處理資料的工具。
- Trainer:Ray Trainer 是一個用於訓練 ML 模型的工具。
- Tuner:Ray Tuner 是一個用於調整超引數的工具。
- Predictor:Ray Predictor 是一個用於預測的工具。
Ray AI Runtime 的使用
Ray AIR 可以用於以下幾個場景:
- 建立和部署 ML 模型:Ray AIR 可以用於建立和部署 ML 模型。
- 資料預處理:Ray AIR 可以用於預處理資料。
- 超引數調整:Ray AIR 可以用於調整超引數。
- 預測:Ray AIR 可以用於預測。
Ray AI Runtime 的優勢
Ray AIR 有以下幾個優勢:
- 簡單的 API:Ray AIR 提供了一個簡單的 API,讓您可以輕鬆地建立和部署 ML 模型。
- 高效能:Ray AIR 根據 Ray Core,提供了高效能的計算能力。
- 靈活性:Ray AIR 可以輕鬆地與其他 ML 框架和工具整合。
使用 Ray AI Runtime 進行機器學習
Ray AI Runtime 是一個強大的機器學習框架,提供了許多預先封裝的前處理器,涵蓋了許多用例。如果您找不到所需的前處理器,可以輕鬆定義自訂前處理器。
在本例中,我們想要從 S3 儲存桶中讀取 CSV 檔案到列式資料集,然後將資料集分割成訓練和測試資料集,並定義一個 AIR 前處理器,StandardScaler,將資料集中的指定欄位標準化為均值 0 和方差 1。
import ray
from ray.data.preprocessors import StandardScaler
# 讀取 CSV 檔案
dataset = ray.data.read_csv("s3://anonymous@air-example-data/breast_cancer.csv")
# 分割資料集
train_dataset, valid_dataset = dataset.train_test_split(test_size=0.2)
test_dataset = valid_dataset.drop_columns(cols=["target"])
# 定義前處理器
preprocessor = StandardScaler(columns=["mean radius", "mean texture"])
Ray AI Runtime 提供了多種前處理器,包括特徵縮放器、通用前處理器、類別編碼器和文字編碼器。
前處理器型別 | 例子 |
---|---|
特徵縮放器 | MaxAbsScaler, MinMaxScaler, Normalizer, PowerTransformer, StandardScaler |
通用前處理器 | BatchMapper, Chain, Concatenator, SimpleImputer |
類別編碼器 | Categorizer, LabelEncoder, OneHotEncoder |
文字編碼器 | Tokenizer, FeatureHasher |
定義前處理器後,可以使用 Trainer 執行機器學習演算法。Trainer 是 Ray Train 包的一部分,提供了一個一致的包裝器,用於訓練框架,如 TensorFlow、PyTorch 或 XGBoost。
from ray.train.xgboost import XGBoostTrainer
# 定義 Trainer
trainer = XGBoostTrainer(
scaling_config=None,
label_column="target",
datasets={"train": train_dataset, "valid": valid_dataset},
preprocessor=preprocessor,
params={"num_boosting_rounds": 10}
)
Trainer 需要指定多個引數,包括:
scaling_config
: 描述如何在 Ray Cluster 上擴充套件訓練label_column
: 指定資料集中的哪一欄位用作監督學習中的標籤datasets
: 指定訓練和驗證資料集preprocessor
: 指定前處理器params
: 指定框架特定的引數
使用 Ray AI Runtime,可以輕鬆地定義和執行機器學習工作流程,從資料預處理到模型訓練和評估。
Ray AI Runtime 的 Trainer 和 Tuner
Ray AI Runtime 提供了 Trainer 和 Tuner 來實作可擴充套件的機器學習訓練和超引數調整。Trainer 可以用於訓練模型,而 Tuner 可以用於調整超引數。
Trainer
Trainer 是 Ray AI Runtime 中的一個核心概念,它提供了一種簡單的方式來訓練模型。Trainer 可以用於訓練不同的模型,包括 XGBoost、TensorFlow 等。
以下是使用 Trainer 訓練 XGBoost 模型的例子:
from ray import tune
from ray.air import ScalingConfig
from ray.air import Trainer
# 定義 Trainer 的配置
scaling_config = ScalingConfig(
num_workers=2,
use_gpu=False,
)
# 定義 Trainer 的引數
params = {
"objective": "binary:logistic",
"eval_metric": ["logloss", "error"],
}
# 定義 Trainer 的資料集
datasets = {"train": train_dataset, "valid": valid_dataset}
# 建立 Trainer
trainer = Trainer(
scaling_config=scaling_config,
label_column="target",
num_boost_round=20,
params=params,
datasets=datasets,
)
# 訓練模型
result = trainer.fit()
# 印出訓練結果
print(result.metrics)
Tuner
Tuner 是 Ray AI Runtime 中的一個核心概念,它提供了一種簡單的方式來調整超引數。Tuner 可以用於調整不同的超引數,包括模型的超引數、最佳化器的超引數等。
以下是使用 Tuner 調整 XGBoost 模型的超引數的例子:
from ray import tune
from ray.tune.tuner import Tuner, TuneConfig
# 定義 Tuner 的引數空間
param_space = {"params": {"max_depth": tune.randint(1, 9)}}
# 定義 Tuner 的配置
metric = "train-logloss"
tune_config = TuneConfig(num_samples=2, metric=metric, mode="min")
# 建立 Tuner
tuner = Tuner(
trainer,
param_space=param_space,
run_config=RunConfig(verbose=1),
tune_config=tune_config,
)
# 調整超引數
result_grid = tuner.fit()
# 印出最佳結果
best_result = result_grid.get_best_result()
print("Best Result:", best_result)
使用 Ray AI Runtime 進行模型訓練和部署
Ray AI Runtime (AIR) 是一個強大的工具,允許您使用不同的機器學習框架進行模型訓練和部署。在這篇文章中,我們將探討如何使用 AIR 進行模型訓練、檢查點管理和批次預測。
模型訓練和檢查點管理
首先,我們需要定義一個模型並進行訓練。以下是使用 TensorFlow Keras 進行模型定義和訓練的例子:
import tensorflow as tf
from ray.train.tensorflow import TensorflowCheckpoint
model = tf.keras.Sequential([
tf.keras.layers.InputLayer(input_shape=(1,)),
tf.keras.layers.Dense(1)
])
keras_checkpoint = TensorflowCheckpoint.from_model(model)
在訓練過程中,AIR 會自動儲存檢查點。檢查點是模型訓練過程中的快照,允許您在稍後恢復訓練。您可以使用 TensorflowCheckpoint
類別從模型中建立檢查點。
批次預測
一旦您已經訓練了模型,您就可以使用批次預測器進行預測。批次預測器是從檢查點中建立的,允許您在批次資料上進行預測。以下是使用批次預測器進行預測的例子:
from ray.train.batch_predictor import BatchPredictor
from ray.train.xgboost import XGBoostPredictor
checkpoint = best_result.checkpoint
batch_predictor = BatchPredictor.from_checkpoint(checkpoint, XGBoostPredictor)
predicted_probabilities = batch_predictor.predict(test_dataset)
在這個例子中,我們從檢查點中建立了一個批次預測器,然後使用它進行預測。
部署模型
除了使用批次預測器,您還可以使用 Ray Serve 部署模型。Ray Serve 是一個強大的工具,允許您將模型部署為 Web 服務。以下是使用 Ray Serve 部署模型的例子:
from ray import serve
from fastapi import Request
import pandas as pd
from ray.serve import PredictorDeployment
async def adapter(request: Request):
payload = await request.json()
return pd.DataFrame.from_dict(payload)
在這個例子中,我們定義了一個介面卡函式,將 JSON 資料轉換為 Pandas 資料框。然後,我們使用 Ray Serve 部署模型。
圖表翻譯:
flowchart TD A[模型訓練] --> B[檢查點管理] B --> C[批次預測] C --> D[模型部署] D --> E[Web 服務]
這個圖表顯示了模型訓練、檢查點管理、批次預測和模型部署的流程。
部署XGBoost模型到Ray Serve
在上一節中,我們已經訓練好了XGBoost模型並儲存為checkpoint。現在,我們要將這個模型部署到Ray Serve上,以便可以透過HTTP請求進行預測。
首先,我們需要啟動Ray Serve:
serve.start(detached=True)
接下來,我們建立一個 PredictorDeployment 物件,指定部署的名稱為 “XGBoostService”:
deployment = PredictorDeployment.options(name="XGBoostService")
然後,我們部署模型到Ray Serve上,傳入模型checkpoint、adapter函式和XGBoostPredictor類:
deployment.deploy(
XGBoostPredictor,
checkpoint,
http_adapter=adapter
)
部署完成後,我們可以透過 deployment.url
獲取部署的URL:
print(deployment.url)
現在,我們可以使用Requests庫向部署的URL傳送HTTP請求,進行預測。首先,我們從測試資料集中取出第一個樣本,並將其轉換為Python字典:
first_item = test_dataset.take(1)
sample_input = dict(first_item[0])
然後,我們使用Requests庫傳送POST請求到部署的URL,傳入樣本輸入:
result = requests.post(
deployment.url,
json=[sample_input]
)
最後,我們可以打印出預測結果:
print(result.json())
當我們完成使用服務後,可以安全地關閉Ray Serve:
serve.shutdown()
這樣,我們就完成了XGBoost模型的部署和預測。Ray Serve提供了一個簡單的方式將機器學習模型部署為Web服務,從而可以輕鬆地將模型整合到其他應用程式中。
人工智慧工作負載與 Ray AI Runtime
Ray AI Runtime(AIR)是一個強大的工具,能夠幫助開發者管理和部署人工智慧(AI)工作負載。AIR 支援多種工作負載,包括無狀態計算、有狀態計算、複合工作負載和線上服務。
無狀態計算
無狀態計算是指不需要維護任何狀態的計算任務,例如資料預處理或模型預測。這型別的工作負載可以使用 Ray tasks 進行計算,Ray tasks 是一個輕量級的計算單元,可以輕鬆地進行平行計算。
有狀態計算
有狀態計算是指需要維護狀態的計算任務,例如模型訓練或超引數調整。這型別的工作負載需要使用 Ray actors 進行計算,Ray actors 是一個可以維護狀態的計算單元,可以用於分散式訓練。
複合工作負載
複合工作負載是指結合無狀態計算和有狀態計算的工作負載,例如在模型訓練過程中進行資料預處理。這型別的工作負載可以使用 Ray tasks 和 Ray actors 進行計算。
線上服務
線上服務是指使用模型進行預測的工作負載,例如使用模型進行圖片分類或自然語言處理。這型別的工作負載可以使用 Ray Serve 進行計算,Ray Serve 是一個可以用於線上服務的工具,可以輕鬆地部署和管理模型。
Ray AI Runtime 的執行模型
Ray AI Runtime 的執行模型是根據 Ray tasks 和 Ray actors 的。無狀態計算使用 Ray tasks 進行計算,有狀態計算使用 Ray actors 進行計算。複合工作負載可以使用 Ray tasks 和 Ray actors 進行計算。
無狀態執行
無狀態執行使用 Ray tasks 進行計算,Ray tasks 是一個輕量級的計算單元,可以輕鬆地進行平行計算。無狀態執行可以使用 pipelining 技術,pipelining 技術是指將資料流水線化,減少資料的複製和移動。
有狀態執行
有狀態執行使用 Ray actors 進行計算,Ray actors 是一個可以維護狀態的計算單元,可以用於分散式訓練。有狀態執行需要使用 Ray actors 進行計算,Ray actors 可以維護狀態和進行計算。
第10章:開始使用Ray AI Runtime
合成工作負載執行
合成工作負載同時利用任務和演員基礎的計算。這可能會導致有趣的資源分配挑戰。試驗演員需要提前保留其資源,但無狀態任務不需要。您可能遇到的問題是,所有可用的資源都可能被您的訓練演員保留,沒有資源留給資料載入任務。
Ray AI Runtime(AIR)透過限制節點的CPU使用率來防止這種情況。您可以調整這個引數,但這是一個合理的預設值,確保基本的資源可用性供無狀態計算使用。在您的訓練操作使用GPU而資料處理步驟不使用的情況下,這不是一個問題。
線上服務執行
Ray Serve管理了一組無狀態演員來服務您的請求。一些演員監聽傳入請求並呼叫其他演員進行預測。請求使用輪詢演算法自動負載平衡到演員池。負載指標被傳送到Serve元件以執行自動擴充套件。
AIR記憶體管理
在本文中,我們將深入探討AIR的具體記憶體管理技術。我們將討論Ray物件儲存如何被AIR使用。您可以跳過本文,如果您認為它太技術化。簡而言之,Ray使用智慧技術確保您的資料和計算被正確分佈和排程。
當您使用Ray Datasets載入資料時,您已經知道這些資料集被分割成資料塊。資料塊是Ray物件的集合。選擇合適的塊大小是困難的,並且需要在管理太多小塊的開銷和風險太大塊導致記憶體不足(OOM)異常之間進行權衡。AIR採取了一種實用主義的方法,嘗試以不超過512 MB的塊大小進行分佈。如果不能保證,將發出警告。如果塊不適合記憶體,AIR將您的資料寫入本地磁碟。
您的有狀態工作負載將使用Ray物件儲存以不同的程度。例如,RLlib使用Ray物件廣播模型權重到個別的滾動工人和經驗資料收集。Tune使用它來設定試驗。由於技術原因,演員風險執行到OOM問題,如果太多記憶體相對於分配的資源被要求。
如果您提前知道記憶體需求,您可以根據需要調整ScalingConfig中的記憶體,或簡單地要求額外的CPU資源。
在合成工作負載中,有狀態演員(例如,訓練)需要訪問由無狀態任務(例如,預處理)建立的資料,這使得記憶體分配更加具有挑戰性。讓我們看兩種情景:
- 如果負責訓練的演員有足夠的空間(在物件儲存中)來適合所有訓練資料,情況很簡單。首先,預處理步驟執行,然後所有資料塊被下載到各個節點。訓練演員然後簡單地迭代記憶體中的資料。
- 否則,資料處理需要管道化執行,這意味著資料將由無狀態任務處理。如果相關的訓練演員與節點上的無狀態任務共同定位,資料將從共享記憶體中檢索。
AIR故障模型
AIR為大多數無狀態計算提供了容錯能力,透過線性重建。這意味著Ray將在節點故障的情況下重建Dataset塊,從而使工作負載可以擴充套件到大型叢集。請注意,容錯能力不適用於頭節點故障。Global Control Service(GCS)儲存叢集後設資料的崩潰將殺死叢集中的所有工作。
自動擴充套件AIR工作負載
AIR庫可以在自動擴充套件的Ray叢集上執行。對於無狀態工作負載,Ray將自動擴充套件,如果有排隊的任務(或Dataset計算演員)。對於有狀態工作負載,Ray將自動擴充套件,如果有待處理的放置群組(即Tune試驗)。Ray將在節點空閒時自動擴充套件下。節點被視為空閒,如果節點上沒有資源使用和沒有Ray物件在記憶體或磁碟上。
您應該知道,自動擴充套件可能會導致叢集中資料平衡不佳,因為早期啟動的節點自然會執行更多工。請考慮限制(例如,從最小叢集大小開始)或停用自動擴充套件,以最佳化資料密集型工作負載的效率。
使用 Ray AIR 進行資料載入和模型訓練
Ray AIR 是一個強大的工具,能夠幫助我們進行資料載入、模型訓練和部署。在這個章節中,我們將展示如何使用 Ray AIR 來載入和轉換資料,如何定義和訓練模型,如何使用 MLflow 進行超引數調整,和如何使用 Gradio 進行模型部署。
資料載入和轉換
首先,我們需要載入和轉換資料。為了示範這個過程,我們將使用 PyTorch 的 torchvision 包來載入 CIFAR-10 資料集。CIFAR-10 是一個常用的影像分類資料集,包含 32x32 的影像和 10 個類別的標籤。
import torch
from torchvision import transforms, datasets
def load_cifar(train: bool):
transform = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])
return datasets.CIFAR10(
root="./data",
download=True,
train=train,
transform=transform
)
接下來,我們需要將這個資料集轉換為 Ray Dataset。Ray Dataset 是 Ray AIR 中的一個重要概念,能夠幫助我們進行資料的載入、轉換和儲存。
from ray.data import from_torch
train_dataset = from_torch(load_cifar(train=True))
test_dataset = from_torch(load_cifar(train=False))
資料轉換和模型定義
現在,我們需要將資料轉換為模型能夠接受的格式。為了示範這個過程,我們將定義一個簡單的模型和訓練迴圈。
import numpy as np
def to_labeled_image(batch):
return {
"image": np.array([image.numpy() for image, _ in batch]),
"label": np.array([label for _, label in batch])
}
train_dataset = train_dataset.map_batches(to_labeled_image)
test_dataset = test_dataset.map_batches(to_labeled_image)
模型訓練和超引數調整
接下來,我們需要定義和訓練模型。為了示範這個過程,我們將使用 PyTorch 的 nn.Module 來定義模型,並使用 Ray Train 進行模型訓練。
import torch.nn as nn
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.fc1 = nn.Linear(32 * 32 * 3, 128)
self.fc2 = nn.Linear(128, 10)
def forward(self, x):
x = x.view(-1, 32 * 32 * 3)
x = torch.relu(self.fc1(x))
x = self.fc2(x)
return x
模型部署
最後,我們需要部署模型。為了示範這個過程,我們將使用 Gradio 進行模型部署。
import gradio as gr
def deploy_model(model):
def predict(image):
image = torch.tensor(image)
output = model(image)
return torch.argmax(output)
demo = gr.Interface(
predict,
gr.Image(type="numpy"),
gr.Label(label="Prediction")
)
demo.launch()
高階第三方整合
Ray 與複雜的資料處理系統進行整合,例如 Spark on Ray(RayDP)、Dask on Ray、MARS on Ray 或 Pandas on Ray(Modin)。在「分散式 Python 框架」章節中,我們將更詳細地探討 Ray 與 Dask 或 Spark 等系統的關係。
從技術架構視角來看,Ray 提供了相當靈活且完整的機器學習訓練及部署方案。透過 Ray AIR,開發者能以簡潔的 API 管理從資料處理、模型訓練、超引數調整到部署的完整機器學習生命週期,同時支援多種機器學習框架,展現高度的整合性。然而,Ray 的自動擴充套件機制在資料密集型工作負載上,可能導致叢集資料分配不均,需要開發者關注並調整相關設定。綜合評估,Ray AIR 有效降低了分散式機器學習的門檻,其高度整合性及彈性,使其成為建構複雜 AI 應用程式值得關注的選擇。預期未來 Ray 將持續強化其生態系統,並針對不同工作負載的效能最佳化提供更精細的控制,以滿足日益增長的 AI 應用需求。對於追求高效能和可擴充套件性的機器學習應用,技術團隊應深入研究 Ray AIR 的核心概念和最佳實務,以充分發揮其潛力。