Ray Tune 提供一個彈性且高效的框架,方便整合多種機器學習框架和超引數搜尋演算法。藉由 Tune,您可以輕鬆地調整 RLlib 和 Keras 模型的超引數,並利用 Ray Data 的分散式資料處理能力加速實驗流程。Tune 支援多種搜尋演算法,例如 Hyperopt,讓您可以根據需求選擇合適的策略。此外,Dataset Pipelines 的引入,更進一步提升了多階段資料處理的效率,有效避免資源閒置,讓您可以更專注於模型的開發與最佳化。
使用Ray Tune進行機器學習
Ray Tune是一個強大的超引數調優工具,可以與多種機器學習框架結合使用。本章節將介紹如何使用Ray Tune來最佳化機器學習模型的超引數。
與RLlib結合使用
RLlib是一個根據Ray的強化學習函式庫,與Tune緊密整合。您可以使用Tune來最佳化RLlib模型的超引數。以下是一個使用Tune最佳化RLlib模型的例子:
from ray import tune
analysis = tune.run(
"DQN",
metric="episode_reward_mean",
mode="max",
config={
"env": "CartPole-v1",
"lr": tune.uniform(1e-5, 1e-4),
"train_batch_size": tune.choice([10000, 20000, 40000]),
},
)
內容解密:
這段程式碼使用Tune來執行一個DQN模型的超引數調優實驗。其中,metric引數指定了要最佳化的指標,mode引數指定了最佳化的方向(最大化或最小化)。config引數定義了模型的超引數搜尋空間,包括學習率(lr)和訓練批次大小(train_batch_size)。
調整Keras模型
本文將介紹如何使用Tune來最佳化Keras模型的超引數。首先,我們需要載入MNIST資料集並進行預處理:
from tensorflow.keras.datasets import mnist
from tensorflow.keras.utils import to_categorical
def load_data():
(x_train, y_train), (x_test, y_test) = mnist.load_data()
num_classes = 10
x_train, x_test = x_train / 255.0, x_test / 255.0
y_train = to_categorical(y_train, num_classes)
y_test = to_categorical(y_test, num_classes)
return (x_train, y_train), (x_test, y_test)
load_data()
內容解密:
這段程式碼定義了一個load_data函式,用於載入MNIST資料集並進行預處理,包括將畫素值歸一化到0和1之間,並將標籤轉換為類別變數。我們首先呼叫load_data函式來下載資料,以避免多個Tune工作執行緒平行下載資料導致的問題。
接下來,我們定義一個Tune目標函式,使用Keras建立一個深度學習模型,並使用Tune的Hyperopt整合來定義超引數搜尋演算法:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Flatten, Dense, Dropout
from ray.tune.integration.keras import TuneReportCallback
def trainable(config):
# 建立Keras模型
model = Sequential()
model.add(Flatten(input_shape=(28, 28)))
model.add(Dense(config["hidden_units"], activation=config["activation"]))
model.add(Dropout(config["dropout_rate"]))
model.add(Dense(10, activation="softmax"))
# 編譯模型
model.compile(optimizer="adam", loss="categorical_crossentropy", metrics=["accuracy"])
# 載入資料
(x_train, y_train), (x_test, y_test) = load_data()
# 訓練模型
model.fit(x_train, y_train, epochs=10, batch_size=128, validation_data=(x_test, y_test),
callbacks=[TuneReportCallback()])
內容解密:
這段程式碼定義了一個trainable函式,用於建立一個Keras模型並訓練它。模型的超引數,包括隱藏單元數量(hidden_units)、啟用函式(activation)和丟棄率(dropout_rate),都是從config字典中取得的。我們使用TuneReportCallback作為Keras的回撥函式,將準確率指標回報給Tune。
Tune的工作流程
Tune的工作流程如下圖所示:
@startuml
skinparam backgroundColor #FEFEFE
skinparam defaultTextAlignment center
skinparam rectangleBackgroundColor #F5F5F5
skinparam rectangleBorderColor #333333
skinparam arrowColor #333333
title Tune的工作流程
rectangle "定義目標函式" as node1
rectangle "定義搜尋空間" as node2
rectangle "執行試驗" as node3
rectangle "取樣超引數" as node4
rectangle "回報指標" as node5
node1 --> node2
node2 --> node3
node3 --> node4
node4 --> node5
@enduml圖表翻譯: 這張圖描述了Tune的工作流程。首先,Tune定義了一個目標函式和一個搜尋空間。然後,Tune執行多個試驗,每個試驗都從搜尋空間中取樣超引數。試驗執行後,將指標回報給Tune。Tune根據這些指標來最佳化超引數。
使用Ray Tune進行超引數調優與Ray Data資料處理
超引數調優客製化搜尋演算法
在機器學習實驗中,超引數的調優至關重要。Ray Tune提供了一個強大的框架,能夠與多種超引數調優(HPO)函式庫整合,如Hyperopt。以下展示如何使用HyperOptSearch演算法來調優Keras模型。
Keras模型定義與目標函式
def objective(config):
(x_train, y_train), (x_test, y_test) = load_data()
model = Sequential()
model.add(Flatten(input_shape=(28, 28)))
model.add(Dense(config["hidden"], activation=config["activation"]))
model.add(Dropout(config["rate"]))
model.add(Dense(10, activation="softmax"))
model.compile(loss="categorical_crossentropy", metrics=["accuracy"])
model.fit(x_train, y_train, batch_size=128, epochs=10,
validation_data=(x_test, y_test),
callbacks=[TuneReportCallback({"mean_accuracy": "accuracy"})])
使用HyperOptSearch進行超引數調優
from ray import tune
from ray.tune.suggest.hyperopt import HyperOptSearch
initial_params = [{"rate": 0.2, "hidden": 128, "activation": "relu"}]
algo = HyperOptSearch(points_to_evaluate=initial_params)
search_space = {
"rate": tune.uniform(0.1, 0.5),
"hidden": tune.randint(32, 512),
"activation": tune.choice(["relu", "tanh"])
}
analysis = tune.run(
objective,
name="keras_hyperopt_exp",
search_alg=algo,
metric="mean_accuracy",
mode="max",
stop={"mean_accuracy": 0.99},
num_samples=10,
config=search_space,
)
print("Best hyperparameters found were: ", analysis.best_config)
內容解密:
objective函式:定義了Keras模型的架構和訓練過程,接受一個包含超引數的config字典。HyperOptSearch:利用Hyperopt的TPE演算法進行超引數搜尋。可以指定初始引數initial_params來引導搜尋過程。search_space:定義了超引數的搜尋空間,包括丟棄率rate、隱藏層單元數hidden和啟用函式activation。tune.run:執行超引數調優,根據指定的搜尋演算法和搜尋空間進行最佳化。
Ray Data資料處理
Ray Data提供了一套靈活且高效的資料處理能力,能夠與多種資料處理函式庫(如Dask)整合,支援多種資料格式。
Ray Datasets核心概念
Ray Datasets是Ray Data的核心元件,提供了一套可擴充套件的資料處理抽象。它支援讀取、轉換和傳遞資料,並且能夠與Ray的其他元件(如Ray Train和Ray Tune)無縫整合。
Ray Datasets的優勢
- 靈活性:支援多種資料格式,能夠與多種資料處理函式庫整合。
- 效能:針對機器學習工作負載進行了最佳化,支援加速器、Pipeline處理和全域隨機洗牌。
使用Ray Datasets進行資料處理
import ray
# 初始化Ray
ray.init()
# 建立Dataset
ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
# 資料轉換
ds = ds.map(lambda x: x * 2)
# 輸出結果
print(ds.take(5))
內容解密:
ray.data.read_csv:從CSV檔案讀取資料,建立一個Ray Dataset。ds.map:對Dataset中的每個元素進行轉換操作。ds.take:取出Dataset中的前幾個元素。
Ray Datasets 基礎介紹
Ray Datasets 是 Ray 框架中的一個重要元件,用於高效處理大規模資料。本文將介紹 Ray Datasets 的基本概念、建立、讀取、寫入和轉換資料集的方法。
建立 Ray Dataset
首先,我們來建立一個簡單的 Dataset 並進行一些基本操作:
import ray
# 建立一個包含整數的資料集,範圍在 [0, 10000)
ds = ray.data.range(10000)
# 基本操作:顯示資料集大小、取得幾個樣本、列印 schema
print(ds.count()) # 輸出:10000
print(ds.take(5)) # 輸出:[0, 1, 2, 3, 4]
print(ds.schema()) # 輸出:<class 'int'>
內容解密:
ray.data.range(10000)建立了一個包含整數的資料集,範圍在 [0, 10000)。ds.count()傳回資料集的大小。ds.take(5)傳回資料集的前 5 個元素。ds.schema()傳回資料集的 schema。
讀取和寫入儲存
在實際應用中,我們通常需要從持久儲存中讀取資料或將結果寫入儲存。Ray Datasets 提供了簡單的 API 來實作這一點。例如,將 Dataset 寫入 CSV 檔案並讀取回記憶體:
# 將資料集寫入本地檔案並讀取回來
ray.data.range(10000).write_csv("local_dir")
ds = ray.data.read_csv("local_dir")
print(ds.count())
內容解密:
write_csv("local_dir")將資料集寫入本地目錄。read_csv("local_dir")從本地目錄讀取資料集。
內建轉換
Ray Datasets 支援多種內建轉換操作,例如:
ds1 = ray.data.range(10000)
ds2 = ray.data.range(10000)
ds3 = ds1.union(ds2)
print(ds3.count()) # 輸出:20000
# 篩選出偶數元素
ds3 = ds3.filter(lambda x: x % 2 == 0)
print(ds3.count()) # 輸出:10000
print(ds3.take(5)) # 輸出:[0, 2, 4, 6, 8]
# 對篩選後的資料集進行排序
ds3 = ds3.sort()
print(ds3.take(5)) # 輸出:[0, 0, 2, 2, 4]
內容解密:
ds1.union(ds2)將兩個資料集合併成一個新的資料集。ds3.filter(lambda x: x % 2 == 0)篩選出偶數元素。ds3.sort()對篩選後的資料集進行排序。
Blocks 和重新分割
Ray Datasets 中的資料被分成多個 blocks,每個 block 代表資料的一個片段。操作的平行度取決於 blocks 的數量。如果 blocks 太少,操作可能無法充分平行化;如果 blocks 太多,則會增加額外的開銷。
ds1 = ray.data.range(10000)
print(ds1.num_blocks()) # 輸出:200
ds2 = ray.data.range(10000)
print(ds2.num_blocks()) # 輸出:200
ds3 = ds1.union(ds2)
print(ds3.num_blocks()) # 輸出:400
# 重新分割資料集
print(ds3.repartition(200).num_blocks()) # 輸出:200
內容解密:
num_blocks()傳回資料集的 blocks 數量。repartition(200)將資料集重新分割成 200 個 blocks。
Ray Datasets 資料處理與運算最佳化
Ray Datasets 提供了一種高效處理大規模資料集的方法,支援多種資料格式如 Python 字典、DataFrames 和序列化的 Parquet 檔案。透過 Ray Datasets,使用者可以輕鬆建立、轉換和運算資料集。
建立與轉換資料集
建立具有結構描述(schema)的資料集最簡單的方法是從 Python 字典列表建立:
ds = ray.data.from_items([{"id": "abc", "value": 1}, {"id": "def", "value": 2}])
print(ds.schema()) # 輸出:id: string, value: int64
內容解密:
ray.data.from_items()方法用於從 Python 字典列表建立 Ray Dataset。ds.schema()用於輸出資料集的結構描述,顯示欄位名稱和資料型別。- 資料集的結構描述是從輸入的字典鍵值自動推斷出來的。
此外,Ray Datasets 可以與 Pandas DataFrames 無縫轉換:
pandas_df = ds.to_pandas() # 將 Ray Dataset 轉換為 Pandas DataFrame
內容解密:
ds.to_pandas()將 Ray Dataset 轉換為 Pandas DataFrame,保持原有的結構描述。- 這種轉換是雙向的,可以從 Pandas DataFrame 建立 Ray Dataset,並自動繼承其結構描述。
在 Ray Datasets 上進行運算
Ray Datasets 提供了強大的運算能力,允許使用者對大規模資料進行自定義轉換。最常用的方法是使用 .map() 對資料集中的每筆記錄進行操作:
ds = ray.data.range(10000).map(lambda x: x ** 2)
ds.take(5) # 輸出:[0, 1, 4, 9, 16]
內容解密:
ray.data.range(10000)生成一個包含 0 到 9999 的整數序列的資料集。.map(lambda x: x ** 2)對資料集中的每個元素進行平方運算。ds.take(5)取出結果的前五筆記錄。
對於需要向量化運算的場景,可以使用 .map_batches() 對批次資料進行操作,以提高效率:
import numpy as np
ds = ray.data.range(10000).map_batches(lambda batch: np.square(batch).tolist())
ds.take(5) # 輸出:[0, 1, 4, 9, 16]
內容解密:
.map_batches()對資料批次進行操作,這裡使用 NumPy 的square方法對批次進行平方運算。- 使用向量化運算(如 NumPy)可以顯著提高效能,特別是在 GPU 上進行深度學習訓練或推斷時。
使用 Ray Actors 進行高效運算
Ray Datasets 支援使用 Ray actors 對資料進行對映(mapping),這對於需要在 GPU 上進行模型推斷等昂貴操作的場景非常有用:
class MLModel:
def __init__(self):
self._model = load_model() # 在 actor 初始化時載入模型
def __call__(self, batch):
return self._model(batch)
ds.map_batches(MLModel, compute="actors")
內容解密:
MLModel類別定義了一個可呼叫的物件,在初始化時載入模型,避免重複載入模型的開銷。ds.map_batches(MLModel, compute="actors")使用 Ray actors 對資料批次進行模型推斷,利用 GPU 資源。
資料集管線(Dataset Pipelines)
預設情況下,Ray Datasets 的操作是阻塞的,即每個操作完成後才會開始下一個操作。這種方式可能導致資源閒置,尤其是在多階段處理流程中。
以下是一個典型的多階段處理流程範例:
ds = (ray.data.read_parquet("s3://my_bucket/input_data")
.map(cpu_intensive_preprocessing)
.map_batches(gpu_intensive_inference, compute="actors", num_gpus=1)
.repartition(10))
ds.write_parquet("s3://my_bucket/output_predictions")
圖表翻譯:
此圖示展示了資料處理流程中的五個階段,包括從遠端儲存讀取資料、CPU密集的前處理、GPU密集的模型推斷、資料重新分割和寫入遠端儲存。每個階段都可能受到不同系統資源的限制,如網路頻寬、CPU或GPU資源等。
內容解密:
- 從遠端儲存讀取資料,需要 ingress 網路頻寬,並可能受儲存系統吞吐量的限制。
- CPU密集的前處理階段,需要大量的 CPU 資源來執行預處理函式。
- GPU密集的模型推斷階段,使用 Ray actors 和 GPU 資源進行向量化推斷。
- 資料重新分割階段,需要網路頻寬在叢集中傳輸資料。
- 最後將結果寫入遠端儲存,需要 egress 網路頻寬,並可能再次受儲存系統吞吐量的限制。
透過使用 Ray Datasets 和 Dataset Pipelines,使用者可以更高效地管理和處理大規模資料集,充分利用叢集資源並最佳化運算流程。