Ray Datasets 提供了一種在機器學習流程中最佳化資料處理和模型訓練的有效方法。藉由 DatasetPipeline 功能,可以將資料讀取、預處理和推斷等階段Pipeline化,在多個區塊的視窗中平行執行,從而最大限度地減少資源閒置並提高整體效率。這對於處理大量資料和執行計算密集型操作特別有用。此外,Ray Datasets 還支援在多個平行訓練任務之間分享記憶體中的資料,簡化了超引數調優等過程,有助於提升分散式機器學習訓練的效率。它與 Dask 等外部函式庫的整合,更進一步擴充套件了資料處理能力,為構建更複雜的機器學習流程提供了堅實基礎。
使用 Ray Datasets 最佳化資料處理與機器學習訓練
在現代機器學習工作流程中,資料處理和模型訓練是兩個非常重要的階段。Ray Datasets 提供了一種高效的方式來處理這些任務,特別是在分散式環境中。本文將探討如何使用 Ray Datasets 來最佳化資料處理流程,並展示如何將其應用於分散式機器學習訓練。
資料處理的最佳化
在傳統的資料處理流程中,各個階段(如資料讀取、預處理、推斷等)通常是順序執行的,這可能導致資源閒置和效率低下。如圖 6-1 所示,這種順序執行方式會在階段之間產生閒置資源。
圖 6-1:傳統的資料處理流程導致階段間資源閒置
為了最佳化這一流程,我們可以使用 Ray Datasets 的 DatasetPipeline 功能,將資料處理流程進行Pipeline化(pipelining),使得各個階段能夠重疊執行,如圖 6-2 所示。
圖 6-2:最佳化後的 DatasetPipeline 減少了階段間的資源閒置
透過使用 ds.window() 方法,可以將 Dataset 轉換為 DatasetPipeline,並指定每個視窗的區塊數量(blocks_per_window)。這樣,各個階段可以在五個區塊的視窗中平行執行,從而減少資源閒置,提高整體效率。
ds = (ray.data.read_parquet("s3://my_bucket/input_data")
.window(blocks_per_window=5)
.map(cpu_intensive_preprocessing)
.map_batches(gpu_intensive_inference, compute="actors", num_gpus=1)
.repartition(10))
ds.write_parquet("s3://my_bucket/output_predictions")
內容解密:
ray.data.read_parquet("s3://my_bucket/input_data"):從指定的 S3 儲存桶中讀取 Parquet 格式的資料。.window(blocks_per_window=5):將資料集轉換為 DatasetPipeline,並設定每個視窗包含 5 個區塊。.map(cpu_intensive_preprocessing):對資料進行 CPU 密集型的預處理操作。.map_batches(gpu_intensive_inference, compute="actors", num_gpus=1):使用 GPU 加速推斷操作,每批次資料將被分配到具有一個 GPU 的 actor 上進行處理。.repartition(10):將資料重新分割槽為 10 個部分,以便於後續處理或輸出。ds.write_parquet("s3://my_bucket/output_predictions"):將處理後的資料寫入指定的 S3 儲存桶中,以 Parquet 格式儲存。
分散式機器學習訓練
在機器學習訓練中,超引數調優是一個常見的需求。Ray Datasets 可以輕鬆地在多個平行訓練任務之間分享記憶體中的資料,從而提高訓練效率。
圖 6-3:使用 ray.data 包建立 Ray Dataset
透過使用 Ray Datasets,我們可以實作分散式訓練的多個副本,每個副本使用不同的超引數進行訓練。下面是一個示例,展示瞭如何使用 Ray Datasets 進行分散式訓練。
首先,定義一個 TrainingWorker 類別,該類別將負責在一份資料上訓練一個分類別器模型。
from sklearn import datasets
from sklearn.linear_model import SGDClassifier
from sklearn.model_selection import train_test_split
@ray.remote
class TrainingWorker:
# 類別的實作細節
內容解密:
@ray.remote:將TrainingWorker類別標記為一個可以在 Ray 分散式叢集中遠端執行的 actor。- 使用 scikit-learn 的
SGDClassifier演算法進行模型訓練。 - 超引數調優是透過調整分類別器的正則化項來實作的。
使用Ray進行分散式機器學習訓練
簡介
本文將介紹如何使用Ray進行分散式機器學習訓練。Ray是一個用於分散式計算的Python函式庫,能夠輕鬆地擴充套件到多台機器上。
TrainingWorker類別實作
def __init__(self, alpha: float):
self._model = SGDClassifier(alpha=alpha)
def train(self, train_shard: ray.data.Dataset):
for i, epoch in enumerate(train_shard.iter_epochs()):
X, Y = zip(*list(epoch.iter_rows()))
self._model.partial_fit(X, Y, classes=[0, 1])
return self._model
def test(self, X_test: np.ndarray, Y_test: np.ndarray):
return self._model.score(X_test, Y_test)
程式碼解密:
- 類別初始化:
__init__方法初始化了一個SGDClassifier模型,引數alpha控制了模型的正則化強度。 - 訓練方法:
train方法接收一個ray.data.Dataset物件,並對其進行迭代訓練。在每個迭代中,資料被分割成輸入特徵X和目標變數Y,然後用於模型的partial_fit方法進行訓練。 - 測試方法:
test方法用於評估訓練好的模型在測試資料集上的表現,傳回模型的準確率。
分散式訓練流程
ALPHA_VALS = [0.00008, 0.00009, 0.0001, 0.00011, 0.00012]
workers = [TrainingWorker.remote(alpha) for alpha in ALPHA_VALS]
X_train, X_test, Y_train, Y_test = train_test_split(*datasets.make_classification())
train_ds = ray.data.from_items(list(zip(X_train, Y_train)))
shards = (train_ds.repeat(10)
.random_shuffle_each_window()
.split(len(workers), locality_hints=workers))
ray.get([worker.train.remote(shard) for worker, shard in zip(workers, shards)])
程式碼解密:
- 初始化多個TrainingWorker例項:使用不同的
alpha值建立多個TrainingWorker例項。 - 生成訓練和測試資料:使用
train_test_split函式生成訓練和測試資料集。 - 建立DatasetPipeline:將訓練資料轉換為
ray.data.Dataset,並使用.repeat()建立一個DatasetPipeline,以定義訓練的迭代次數。 - 隨機洗牌和分割資料:對資料進行隨機洗牌,並將其分割成多個小資料集,分別分配給不同的worker。
- 分散式訓練:對每個worker呼叫
train方法,並傳入對應的資料分片。
模型評估
print(ray.get([worker.test.remote(X_test, Y_test) for worker in workers]))
程式碼解密:
- 遠端呼叫測試方法:對每個worker呼叫
test方法,評估模型在測試資料集上的表現。 - 取得評估結果:使用
ray.get()取得所有worker的測試結果。
Ray Datasets與外部函式庫整合
Ray Datasets支援與多個外部函式庫整合,例如Dask、RayDP、Modin等,以實作更豐富的資料處理功能。
Dask on Ray例項
import ray
from ray.util.dask import enable_dask_on_ray
ray.init()
enable_dask_on_ray()
程式碼解密:
- 初始化Ray:啟動Ray叢集或連線到現有的叢集。
- 啟用Dask on Ray:透過呼叫
enable_dask_on_ray(),使Dask能夠使用Ray作為其排程器後端。
在Ray上使用Dask DataFrames進行分散式資料處理
現在我們可以執行常規的Dask DataFrames程式碼,並將其擴充套件到Ray叢集上。例如,我們可能想要使用標準的DataFrame操作(如篩選和分組)進行時間序列分析,並計算標準差(範例取自Dask檔案):
import dask
df = dask.datasets.timeseries()
df = df[df.y > 0].groupby("name").x.std()
df.compute() # 觸發任務圖的評估。
內容解密:
這段程式碼首先匯入dask函式庫,然後使用dask.datasets.timeseries()生成一個範例時間序列資料集。接著,它篩選出y大於0的資料,並按name欄位分組,計算每個組別中x欄位的標準差。最後,呼叫compute()方法觸發任務圖的執行,得到最終結果。
如果你習慣使用Pandas或其他DataFrame函式庫,你可能會疑惑為什麼需要呼叫df.compute()。這是因為Dask預設是懶惰執行的,只有在需要結果時才會計算,這使得它能夠最佳化跨叢集執行的任務圖。
Dask與Ray Datasets的整合
Dask在Ray上的強大之處在於它與Ray Datasets的整合非常順暢。我們可以使用內建的工具將Ray Dataset轉換為Dask DataFrame,反之亦然:
import ray
ds = ray.data.range(10000)
# 將Dataset轉換為Dask DataFrame。
df = ds.to_dask()
print(df.std().compute()) # -> 2886.89568
# 將Dask DataFrame轉換回Dataset。
ds = ray.data.from_dask(df)
print(ds.std()) # -> 2886.89568
內容解密:
這段程式碼展示瞭如何在Ray Dataset和Dask DataFrame之間進行轉換。首先,它建立了一個包含10000個元素的Ray Dataset,然後將其轉換為Dask DataFrame,並計算標準差。接著,它將Dask DataFrame轉換回Ray Dataset,並再次計算標準差。
這個簡單的例子可能看起來並不令人印象深刻,因為我們可以使用Dask DataFrames或Ray Datasets計算標準差。然而,當我們在下一節建立端對端的機器學習流程時,你將會看到這使得強大的工作流程成為可能。
建立機器學習流程
雖然我們在上一節中能夠從頭開始建立一個簡單的分散式訓練應用程式,但要建立一個真實世界的應用程式,還有很多邊緣情況、效能最佳化機會和可用性功能需要處理。Ray擁有豐富的函式庫生態系統,使我們能夠建立生產就緒的機器學習應用程式。
在本文中,我們將探討如何使用Datasets作為“膠水層”來建立一個端對端的機器學習流程。
機器學習流程的挑戰
要成功地將機器學習模型投入生產,我們需要使用標準的ETL流程收集和編目資料。然而,這還不是故事的全部:為了訓練模型,我們通常還需要在將資料輸入訓練過程之前進行特徵工程,而我們如何將資料輸入訓練過程會強烈影響成本和效能。在訓練模型之後,我們還需要在許多不同的資料集上執行推論——畢竟,這就是訓練模型的全部意義!
雖然這看起來似乎只是一系列步驟,但在實踐中,機器學習的資料處理工作流程是一個迭代的實驗過程,用於定義正確的特徵集並在其上訓練高效能模型。高效地載入、轉換和將資料輸入訓練和推論對於效能至關重要,這直接轉化為計算密集型模型的成本。
使用Ray,我們能夠將完整的機器學習流程建立為單一應用程式,可以作為單一Python指令碼執行,如圖6-6所示。豐富的內建和第三方函式庫生態系統使得根據給定的使用案例混合和匹配正確的功能成為可能,並建立可擴充套件的、生產就緒的流程。重要的是,Ray Datasets充當了膠水層,能夠高效地載入、預處理和計算資料,同時避免了昂貴的序列化成本,並將中間資料保留在分享記憶體中。
圖表翻譯:
圖6-6展示了典型的機器學習工作流程的簡化版本,以及Ray在該流程中的位置。機器學習的多個步驟通常需要迭代;沒有Ray,這意味著要為一個端對端的流程拼接許多獨立的系統。Ray充當了統一的計算層,使得大部分工作流程能夠作為單一應用程式執行。
分散式訓練的基礎與 Ray Train 介紹
在前一章中,我們探討瞭如何使用 Ray Datasets 在資料分片上訓練簡單模型的副本,但分散式訓練遠不止於此。正如我們在第一章中所指出的,Ray 有一個專門用於分散式訓練的函式庫,稱為 Ray Train。它具有廣泛的機器學習訓練整合,能夠在 Ray 叢集上無縫地擴充套件您的實驗。
本章將首先說明為什麼需要擴充套件您的機器學習訓練,接著介紹不同的擴充套件方法。然後,我們將介紹 Ray Train,並透過一個完整的範例進行詳細說明。同時,我們還將涵蓋使用 Ray Train 所需瞭解的一些關鍵概念,例如前處理器、訓練器和檢查點。最後,我們將介紹 Ray Train 提供的一些進階功能。
分散式模型訓練的基礎
機器學習通常需要大量的計算。根據您所訓練的模型型別,無論是梯度提升樹還是神經網路,您都可能面臨一些訓練機器學習模型的常見問題:
- 完成訓練所需的時間太長。
- 資料太大,無法放入一台機器中。
- 模型本身太大,無法放入單一機器。
對於第一種情況,可以透過增加處理資料的吞吐量來加速訓練。一些機器學習演算法,例如神經網路,可以平行化部分計算以加快訓練速度。
在第二種情況下,您的演算法選擇可能需要將資料集中的所有可用資料放入記憶體,但單一節點的記憶體可能不足。如果是這種情況,您需要將資料分散到多個節點,並以分散式方式進行訓練。另一方面,有時您的演算法可能不需要分散資料,但如果您一開始就使用分散式資料函式庫系統,您仍然希望有一個能夠利用分散式資料的訓練框架。
當您的模型無法放入單一機器時,您可能需要將其拆分成多個部分,分散在多台機器上。這種跨多台機器拆分模型的方法稱為模型平行性。要遇到這個問題,您首先需要一個足夠大的模型,無法放入單一機器。通常,像 Google 或 Meta 這樣的大公司需要模型平行性,並且它們也依賴於內部解決方案來處理分散式訓練。
前兩個問題通常比第三個問題更早出現在機器學習開發中。我們剛才為這些問題所提出的解決方案屬於資料平行訓練的範疇。您不是將模型分散在多台機器上,而是依靠分散式資料來加速訓練。
具體來說,對於第一個問題,如果您可以加快訓練過程,希望在準確度上損失最小或沒有損失,並且可以以成本效益高的方式實作,為什麼不去做呢?如果您有分散式資料,無論是出於演算法的必要性還是儲存資料的方式,您都需要一個能夠處理它的訓練解決方案。正如您將看到的,Ray Train 是為高效的資料平行訓練而構建的。圖 7-1 總結了分散式訓練中的兩種基本型別。
透過範例介紹 Ray Train
Ray Train 是用於在 Ray 上進行分散式資料平行訓練的函式庫。它為訓練工作流程的不同部分提供了關鍵工具,從特徵處理到可擴充套件的訓練,再到與機器學習追蹤工具的整合,以及模型的匯出機制。
在基本的機器學習訓練流程中,您將使用以下 Ray Train 的關鍵元件:
- 訓練器(Trainers):Ray Train 有多個 Trainer 類別,使得進行分散式訓練成為可能。Trainers 是圍繞第三方訓練框架(如 XGBoost、PyTorch 和 TensorFlow)的包裝類別,提供與核心 Ray 角色(用於分佈)、Ray Tune 和 Ray Datasets 的整合。
- 預測器(Predictors):一旦您有了訓練好的模型,您就可以使用它來取得預測結果。對於批次輸入資料,您使用所謂的批次預測器,它們也用於評估模型在驗證集上的效能。
- 前處理器(Preprocessors):Ray Train 提供了幾個常見的 Preprocessor 物件和實用工具,用於將資料集物件處理成可供 Trainers 使用的特徵。
- 檢查點(Checkpoint):Ray Train 提供了一個 Checkpoint 類別,允許您儲存和還原訓練運作的狀態。
讓我們透過一個完整的 Ray Train 範例,將這些元件付諸實踐。首先,我們將利用第 6 章的知識,大量使用 Ray Datasets 來載入訓練資料。
使用 Ray 預測紐約市計程車大筆小費
本文將透過一個實際的、端對端的範例,展示如何使用 Ray 建立深度學習流程。我們將建立一個二元分類別模型,以預測一次計程車行程是否會產生大筆小費(> 車費的 20%),使用公開的紐約市計程車和豪華轎車委員會(TLC)行程記錄資料。我們的流程將與典型的機器學習從業者的流程非常相似:
- 載入資料,進行一些基本的預處理,並計算我們將在模型中使用的特徵。
- 定義一個神經網路,並使用分散式資料平行訓練進行訓練。
載入與預處理資料
首先,我們需要載入紐約市計程車資料集,並進行必要的預處理。這包括選擇相關特徵、處理缺失值和轉換資料型別等步驟。
import ray
from ray import train
from ray.train import Trainer
from ray.data import Dataset
# 初始化 Ray
ray.init()
# 載入資料集
dataset = ray.data.read_csv("nyc_taxi_data.csv")
# 資料預處理步驟
def preprocess_data(dataset: Dataset) -> Dataset:
# 選擇相關特徵
dataset = dataset.select_columns(["feature1", "feature2", "label"])
# 處理缺失值
dataset = dataset.fillna({"feature1": 0, "feature2": 0})
# 轉換資料型別
dataset = dataset.map_batches(lambda df: df.astype({"feature1": "float64", "feature2": "float64"}))
return dataset
dataset = preprocess_data(dataset)
定義神經網路與分散式訓練
接下來,我們定義一個簡單的神經網路,並使用 Ray Train 進行分散式訓練。
import torch
import torch.nn as nn
from ray.train.torch import TorchTrainer
# 定義神經網路模型
class NeuralNetwork(nn.Module):
def __init__(self):
super(NeuralNetwork, self).__init__()
self.fc1 = nn.Linear(2, 128)
self.fc2 = nn.Linear(128, 2)
def forward(self, x):
x = torch.relu(self.fc1(x))
x = self.fc2(x)
return x
# 定義訓練函式
def train_func(config):
# 從 config 中取得必要的引數
batch_size = config["batch_size"]
epochs = config["epochs"]
# 初始化模型、損失函式和最佳化器
model = NeuralNetwork()
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
# 取得訓練和驗證資料集
train_dataset = train.get_dataset_shard("train")
val_dataset = train.get_dataset_shard("val")
# 進行多輪訓練
for epoch in range(epochs):
# 訓練迴圈
model.train()
total_loss = 0
for batch in train_dataset.iter_batches(batch_size=batch_size):
inputs = torch.tensor(batch["feature1", "feature2"].values)
labels = torch.tensor(batch["label"].values)
optimizer.zero_grad()
outputs = model(inputs)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
total_loss += loss.item()
print(f"Epoch {epoch+1}, Loss: {total_loss / len(train_dataset)}")
# 驗證迴圈
model.eval()
with torch.no_grad():
total_correct = 0
for batch in val_dataset.iter_batches(batch_size=batch_size):
inputs = torch.tensor(batch["feature1", "feature2"].values)
labels = torch.tensor(batch["label"].values)
outputs = model(inputs)
_, predicted = torch.max(outputs, dim=1)
total_correct += (predicted == labels).sum().item()
accuracy = total_correct / len(val_dataset)
print(f"Epoch {epoch+1}, Val Accuracy: {accuracy:.4f}")
# 組態 TorchTrainer
trainer = TorchTrainer(
train_func,
scaling_config={"num_workers": 4, "use_gpu": False},
datasets={"train": dataset.train_test_split(test_size=0.2, shuffle=True)["train"],
"val": dataset.train_test_split(test_size=0.2, shuffle=True)["test"]},
config={"batch_size": 128, "epochs": 10}
)
# 開始訓練
result = trainer.fit()
print("最終結果:", result.metrics)
結果分析與未來改進方向
透過上述程式碼,我們成功地使用 Ray Train 在紐約市計程車資料集上進行了分散式神經網路訓練。未來,我們可以進一步最佳化模型結構、調整超引數或嘗試不同的分散式策略來提高模型的效能。
#### 程式碼重點解說:
1. **載入與預處理資料**:利用 `ray.data.read_csv` 載入 CSV 資料,並透過自定義函式 `preprocess_data` 對資料進行預處理,包括選擇特徵欄位、填補缺失值以及轉換資料型別。
2. **定義神經網路**:使用 PyTorch 定義了一個簡單的全連線神經網路,用於二元分類別任務。
3. **分散式訓練**:藉助 `TorchTrainer` 和 `train_func`,我們組態了分散式訓練環境,包括設定 worker 節點數量、是否使用 GPU 以及傳入必要的超引數等。並透過 `datasets` 引數傳入已分割好的訓練集與驗證集。
4. **結果輸出**:透過 `trainer.fit()` 方法啟動分散式訓練,並輸出最終結果指標(如準確率等)。
圖表翻譯:
此圖示展示了整個分散式深度學習流程,從原始資料載入到最終模型評估,中間經過資料預處理、模型定義、分散式訓練等關鍵步驟。它清晰地闡述了每個階段的主要任務及相互之間的邏輯關係,有助於讀者全面理解整個系統的工作原理及實作細節。
@startuml
skinparam backgroundColor #FEFEFE
skinparam defaultTextAlignment center
skinparam rectangleBackgroundColor #F5F5F5
skinparam rectangleBorderColor #333333
skinparam arrowColor #333333
title 圖表翻譯:
rectangle "載入資料" as node1
rectangle "特徵工程" as node2
rectangle "分割資料" as node3
rectangle "組態超引數" as node4
rectangle "評估模型" as node5
rectangle "反饋結果" as node6
node1 --> node2
node2 --> node3
node3 --> node4
node4 --> node5
node5 --> node6
@enduml圖表翻譯: 此圖示清晰地展示了從原始資料到最終模型評估的完整流程,涵蓋了資料預處理、特徵工程、資料分割、分散式訓練以及模型評估等關鍵步驟。每一步驟之間的邏輯關係明確,有助於讀者深入理解整個深度學習流程的工作原理與實作細節。