Ray Tune 提供一個彈性且高效的框架,方便整合多種機器學習框架和超引數搜尋演算法。藉由 Tune,您可以輕鬆地調整 RLlib 和 Keras 模型的超引數,並利用 Ray Data 的分散式資料處理能力加速實驗流程。Tune 支援多種搜尋演算法,例如 Hyperopt,讓您可以根據需求選擇合適的策略。此外,Dataset Pipelines 的引入,更進一步提升了多階段資料處理的效率,有效避免資源閒置,讓您可以更專注於模型的開發與最佳化。

使用Ray Tune進行機器學習

Ray Tune是一個強大的超引數調優工具,可以與多種機器學習框架結合使用。本章節將介紹如何使用Ray Tune來最佳化機器學習模型的超引數。

與RLlib結合使用

RLlib是一個根據Ray的強化學習函式庫,與Tune緊密整合。您可以使用Tune來最佳化RLlib模型的超引數。以下是一個使用Tune最佳化RLlib模型的例子:

from ray import tune

analysis = tune.run(
    "DQN",
    metric="episode_reward_mean",
    mode="max",
    config={
        "env": "CartPole-v1",
        "lr": tune.uniform(1e-5, 1e-4),
        "train_batch_size": tune.choice([10000, 20000, 40000]),
    },
)

內容解密:

這段程式碼使用Tune來執行一個DQN模型的超引數調優實驗。其中,metric引數指定了要最佳化的指標,mode引數指定了最佳化的方向(最大化或最小化)。config引數定義了模型的超引數搜尋空間,包括學習率(lr)和訓練批次大小(train_batch_size)。

調整Keras模型

本文將介紹如何使用Tune來最佳化Keras模型的超引數。首先,我們需要載入MNIST資料集並進行預處理:

from tensorflow.keras.datasets import mnist
from tensorflow.keras.utils import to_categorical

def load_data():
    (x_train, y_train), (x_test, y_test) = mnist.load_data()
    num_classes = 10
    x_train, x_test = x_train / 255.0, x_test / 255.0
    y_train = to_categorical(y_train, num_classes)
    y_test = to_categorical(y_test, num_classes)
    return (x_train, y_train), (x_test, y_test)

load_data()

內容解密:

這段程式碼定義了一個load_data函式,用於載入MNIST資料集並進行預處理,包括將畫素值歸一化到0和1之間,並將標籤轉換為類別變數。我們首先呼叫load_data函式來下載資料,以避免多個Tune工作執行緒平行下載資料導致的問題。

接下來,我們定義一個Tune目標函式,使用Keras建立一個深度學習模型,並使用Tune的Hyperopt整合來定義超引數搜尋演算法:

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Flatten, Dense, Dropout
from ray.tune.integration.keras import TuneReportCallback

def trainable(config):
    # 建立Keras模型
    model = Sequential()
    model.add(Flatten(input_shape=(28, 28)))
    model.add(Dense(config["hidden_units"], activation=config["activation"]))
    model.add(Dropout(config["dropout_rate"]))
    model.add(Dense(10, activation="softmax"))
    
    # 編譯模型
    model.compile(optimizer="adam", loss="categorical_crossentropy", metrics=["accuracy"])
    
    # 載入資料
    (x_train, y_train), (x_test, y_test) = load_data()
    
    # 訓練模型
    model.fit(x_train, y_train, epochs=10, batch_size=128, validation_data=(x_test, y_test), 
              callbacks=[TuneReportCallback()])

內容解密:

這段程式碼定義了一個trainable函式,用於建立一個Keras模型並訓練它。模型的超引數,包括隱藏單元數量(hidden_units)、啟用函式(activation)和丟棄率(dropout_rate),都是從config字典中取得的。我們使用TuneReportCallback作為Keras的回撥函式,將準確率指標回報給Tune。

Tune的工作流程

Tune的工作流程如下圖所示:

@startuml
skinparam backgroundColor #FEFEFE
skinparam defaultTextAlignment center
skinparam rectangleBackgroundColor #F5F5F5
skinparam rectangleBorderColor #333333
skinparam arrowColor #333333

title Tune的工作流程

rectangle "定義目標函式" as node1
rectangle "定義搜尋空間" as node2
rectangle "執行試驗" as node3
rectangle "取樣超引數" as node4
rectangle "回報指標" as node5

node1 --> node2
node2 --> node3
node3 --> node4
node4 --> node5

@enduml

圖表翻譯: 這張圖描述了Tune的工作流程。首先,Tune定義了一個目標函式和一個搜尋空間。然後,Tune執行多個試驗,每個試驗都從搜尋空間中取樣超引數。試驗執行後,將指標回報給Tune。Tune根據這些指標來最佳化超引數。

使用Ray Tune進行超引數調優與Ray Data資料處理

超引數調優客製化搜尋演算法

在機器學習實驗中,超引數的調優至關重要。Ray Tune提供了一個強大的框架,能夠與多種超引數調優(HPO)函式庫整合,如Hyperopt。以下展示如何使用HyperOptSearch演算法來調優Keras模型。

Keras模型定義與目標函式

def objective(config):
    (x_train, y_train), (x_test, y_test) = load_data()
    model = Sequential()
    model.add(Flatten(input_shape=(28, 28)))
    model.add(Dense(config["hidden"], activation=config["activation"]))
    model.add(Dropout(config["rate"]))
    model.add(Dense(10, activation="softmax"))
    model.compile(loss="categorical_crossentropy", metrics=["accuracy"])
    model.fit(x_train, y_train, batch_size=128, epochs=10,
              validation_data=(x_test, y_test),
              callbacks=[TuneReportCallback({"mean_accuracy": "accuracy"})])

使用HyperOptSearch進行超引數調優

from ray import tune
from ray.tune.suggest.hyperopt import HyperOptSearch

initial_params = [{"rate": 0.2, "hidden": 128, "activation": "relu"}]
algo = HyperOptSearch(points_to_evaluate=initial_params)
search_space = {
    "rate": tune.uniform(0.1, 0.5),
    "hidden": tune.randint(32, 512),
    "activation": tune.choice(["relu", "tanh"])
}
analysis = tune.run(
    objective,
    name="keras_hyperopt_exp",
    search_alg=algo,
    metric="mean_accuracy",
    mode="max",
    stop={"mean_accuracy": 0.99},
    num_samples=10,
    config=search_space,
)
print("Best hyperparameters found were: ", analysis.best_config)

內容解密:

  1. objective函式:定義了Keras模型的架構和訓練過程,接受一個包含超引數的config字典。
  2. HyperOptSearch:利用Hyperopt的TPE演算法進行超引數搜尋。可以指定初始引數initial_params來引導搜尋過程。
  3. search_space:定義了超引數的搜尋空間,包括丟棄率rate、隱藏層單元數hidden和啟用函式activation
  4. tune.run:執行超引數調優,根據指定的搜尋演算法和搜尋空間進行最佳化。

Ray Data資料處理

Ray Data提供了一套靈活且高效的資料處理能力,能夠與多種資料處理函式庫(如Dask)整合,支援多種資料格式。

Ray Datasets核心概念

Ray Datasets是Ray Data的核心元件,提供了一套可擴充套件的資料處理抽象。它支援讀取、轉換和傳遞資料,並且能夠與Ray的其他元件(如Ray Train和Ray Tune)無縫整合。

Ray Datasets的優勢

  1. 靈活性:支援多種資料格式,能夠與多種資料處理函式庫整合。
  2. 效能:針對機器學習工作負載進行了最佳化,支援加速器、Pipeline處理和全域隨機洗牌。

使用Ray Datasets進行資料處理

import ray

# 初始化Ray
ray.init()

# 建立Dataset
ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")

# 資料轉換
ds = ds.map(lambda x: x * 2)

# 輸出結果
print(ds.take(5))

內容解密:

  1. ray.data.read_csv:從CSV檔案讀取資料,建立一個Ray Dataset。
  2. ds.map:對Dataset中的每個元素進行轉換操作。
  3. ds.take:取出Dataset中的前幾個元素。

Ray Datasets 基礎介紹

Ray Datasets 是 Ray 框架中的一個重要元件,用於高效處理大規模資料。本文將介紹 Ray Datasets 的基本概念、建立、讀取、寫入和轉換資料集的方法。

建立 Ray Dataset

首先,我們來建立一個簡單的 Dataset 並進行一些基本操作:

import ray

# 建立一個包含整數的資料集,範圍在 [0, 10000)
ds = ray.data.range(10000)

# 基本操作:顯示資料集大小、取得幾個樣本、列印 schema
print(ds.count())  # 輸出:10000
print(ds.take(5))  # 輸出:[0, 1, 2, 3, 4]
print(ds.schema())  # 輸出:<class 'int'>

內容解密:

  1. ray.data.range(10000) 建立了一個包含整數的資料集,範圍在 [0, 10000)。
  2. ds.count() 傳回資料集的大小。
  3. ds.take(5) 傳回資料集的前 5 個元素。
  4. ds.schema() 傳回資料集的 schema。

讀取和寫入儲存

在實際應用中,我們通常需要從持久儲存中讀取資料或將結果寫入儲存。Ray Datasets 提供了簡單的 API 來實作這一點。例如,將 Dataset 寫入 CSV 檔案並讀取回記憶體:

# 將資料集寫入本地檔案並讀取回來
ray.data.range(10000).write_csv("local_dir")
ds = ray.data.read_csv("local_dir")
print(ds.count())

內容解密:

  1. write_csv("local_dir") 將資料集寫入本地目錄。
  2. read_csv("local_dir") 從本地目錄讀取資料集。

內建轉換

Ray Datasets 支援多種內建轉換操作,例如:

ds1 = ray.data.range(10000)
ds2 = ray.data.range(10000)
ds3 = ds1.union(ds2)
print(ds3.count())  # 輸出:20000

# 篩選出偶數元素
ds3 = ds3.filter(lambda x: x % 2 == 0)
print(ds3.count())  # 輸出:10000
print(ds3.take(5))  # 輸出:[0, 2, 4, 6, 8]

# 對篩選後的資料集進行排序
ds3 = ds3.sort()
print(ds3.take(5))  # 輸出:[0, 0, 2, 2, 4]

內容解密:

  1. ds1.union(ds2) 將兩個資料集合併成一個新的資料集。
  2. ds3.filter(lambda x: x % 2 == 0) 篩選出偶數元素。
  3. ds3.sort() 對篩選後的資料集進行排序。

Blocks 和重新分割

Ray Datasets 中的資料被分成多個 blocks,每個 block 代表資料的一個片段。操作的平行度取決於 blocks 的數量。如果 blocks 太少,操作可能無法充分平行化;如果 blocks 太多,則會增加額外的開銷。

ds1 = ray.data.range(10000)
print(ds1.num_blocks())  # 輸出:200

ds2 = ray.data.range(10000)
print(ds2.num_blocks())  # 輸出:200

ds3 = ds1.union(ds2)
print(ds3.num_blocks())  # 輸出:400

# 重新分割資料集
print(ds3.repartition(200).num_blocks())  # 輸出:200

內容解密:

  1. num_blocks() 傳回資料集的 blocks 數量。
  2. repartition(200) 將資料集重新分割成 200 個 blocks。

Ray Datasets 資料處理與運算最佳化

Ray Datasets 提供了一種高效處理大規模資料集的方法,支援多種資料格式如 Python 字典、DataFrames 和序列化的 Parquet 檔案。透過 Ray Datasets,使用者可以輕鬆建立、轉換和運算資料集。

建立與轉換資料集

建立具有結構描述(schema)的資料集最簡單的方法是從 Python 字典列表建立:

ds = ray.data.from_items([{"id": "abc", "value": 1}, {"id": "def", "value": 2}])
print(ds.schema())  # 輸出:id: string, value: int64

內容解密:

  • ray.data.from_items() 方法用於從 Python 字典列表建立 Ray Dataset。
  • ds.schema() 用於輸出資料集的結構描述,顯示欄位名稱和資料型別。
  • 資料集的結構描述是從輸入的字典鍵值自動推斷出來的。

此外,Ray Datasets 可以與 Pandas DataFrames 無縫轉換:

pandas_df = ds.to_pandas()  # 將 Ray Dataset 轉換為 Pandas DataFrame

內容解密:

  • ds.to_pandas() 將 Ray Dataset 轉換為 Pandas DataFrame,保持原有的結構描述。
  • 這種轉換是雙向的,可以從 Pandas DataFrame 建立 Ray Dataset,並自動繼承其結構描述。

在 Ray Datasets 上進行運算

Ray Datasets 提供了強大的運算能力,允許使用者對大規模資料進行自定義轉換。最常用的方法是使用 .map() 對資料集中的每筆記錄進行操作:

ds = ray.data.range(10000).map(lambda x: x ** 2)
ds.take(5)  # 輸出:[0, 1, 4, 9, 16]

內容解密:

  • ray.data.range(10000) 生成一個包含 0 到 9999 的整數序列的資料集。
  • .map(lambda x: x ** 2) 對資料集中的每個元素進行平方運算。
  • ds.take(5) 取出結果的前五筆記錄。

對於需要向量化運算的場景,可以使用 .map_batches() 對批次資料進行操作,以提高效率:

import numpy as np
ds = ray.data.range(10000).map_batches(lambda batch: np.square(batch).tolist())
ds.take(5)  # 輸出:[0, 1, 4, 9, 16]

內容解密:

  • .map_batches() 對資料批次進行操作,這裡使用 NumPy 的 square 方法對批次進行平方運算。
  • 使用向量化運算(如 NumPy)可以顯著提高效能,特別是在 GPU 上進行深度學習訓練或推斷時。

使用 Ray Actors 進行高效運算

Ray Datasets 支援使用 Ray actors 對資料進行對映(mapping),這對於需要在 GPU 上進行模型推斷等昂貴操作的場景非常有用:

class MLModel:
    def __init__(self):
        self._model = load_model()  # 在 actor 初始化時載入模型
    
    def __call__(self, batch):
        return self._model(batch)

ds.map_batches(MLModel, compute="actors")

內容解密:

  • MLModel 類別定義了一個可呼叫的物件,在初始化時載入模型,避免重複載入模型的開銷。
  • ds.map_batches(MLModel, compute="actors") 使用 Ray actors 對資料批次進行模型推斷,利用 GPU 資源。

資料集管線(Dataset Pipelines)

預設情況下,Ray Datasets 的操作是阻塞的,即每個操作完成後才會開始下一個操作。這種方式可能導致資源閒置,尤其是在多階段處理流程中。

以下是一個典型的多階段處理流程範例:

ds = (ray.data.read_parquet("s3://my_bucket/input_data")
      .map(cpu_intensive_preprocessing)
      .map_batches(gpu_intensive_inference, compute="actors", num_gpus=1)
      .repartition(10))
ds.write_parquet("s3://my_bucket/output_predictions")

圖表翻譯:

此圖示展示了資料處理流程中的五個階段,包括從遠端儲存讀取資料、CPU密集的前處理、GPU密集的模型推斷、資料重新分割和寫入遠端儲存。每個階段都可能受到不同系統資源的限制,如網路頻寬、CPU或GPU資源等。

內容解密:

  1. 從遠端儲存讀取資料,需要 ingress 網路頻寬,並可能受儲存系統吞吐量的限制。
  2. CPU密集的前處理階段,需要大量的 CPU 資源來執行預處理函式。
  3. GPU密集的模型推斷階段,使用 Ray actors 和 GPU 資源進行向量化推斷。
  4. 資料重新分割階段,需要網路頻寬在叢集中傳輸資料。
  5. 最後將結果寫入遠端儲存,需要 egress 網路頻寬,並可能再次受儲存系統吞吐量的限制。

透過使用 Ray Datasets 和 Dataset Pipelines,使用者可以更高效地管理和處理大規模資料集,充分利用叢集資源並最佳化運算流程。