隨著資料規模和模型複雜度的提升,傳統的單機訓練方式已無法滿足需求。Ray 提供了一個分散式機器學習框架,結合 Ray Tune 和 Ray Dataset,可以更有效率地進行模型訓練和超引數調整。利用 Ray Tune,可以輕鬆整合 HyperOptSearch 等演算法,快速找到最佳的超引陣列合,提升模型效能。同時,Ray Dataset 提供了分散式資料處理能力,能有效載入、預處理和轉換大規模資料,並與 TensorFlow、PyTorch 等深度學習框架無縫整合。此外,Ray 還支援與 Dask 等外部庫整合,進一步擴充套件其功能,滿足不同場景下的資料處理需求,為 Python 分散式機器學習提供更全面的解決方案。
載入資料
首先,我們需要載入MNIST資料集。為了避免多個工作者同時下載資料,我們先定義一個load_data
函式,然後立即呼叫它以便下載資料。
import numpy as np
def load_data():
(x_train, y_train), (x_test, y_test) = ...
return (x_train, y_train), (x_test, y_test)
(x_train, y_train), (x_test, y_test) = load_data()
定義目標函式
接下來,我們定義一個目標函式objective
,它將接受一組超引數配置,並傳回模型的效能指標。在這個例子中,我們使用Keras的順序模型,新增兩個全連線層和一個Dropout層。
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Flatten, Dense, Dropout
from ray.tune.integration.keras import TuneReportCallback
def objective(config):
model = Sequential()
model.add(Flatten(input_shape=(28, 28)))
model.add(Dense(config["hidden"], activation=config["activation"]))
model.add(Dropout(config["rate"]))
model.add(Dense(10, activation="softmax"))
model.fit(x_train, y_train, batch_size=128, epochs=10,
validation_data=(x_test, y_test),
callbacks=[TuneReportCallback({"mean_accuracy": "accuracy"})])
使用HyperOptSearch進行搜尋
現在,我們可以使用HyperOptSearch演算法來搜尋最佳的超引數配置。首先,我們需要安裝Hyperopt庫。
pip install hyperopt==0.2.7
然後,我們定義了一個初始的超引數配置,並建立了一個HyperOptSearch例項。
from ray import tune
from ray.tune.suggest.hyperopt import HyperOptSearch
initial_params = [{"rate": 0.2, "hidden": 128, "activation": "relu"}]
algo = HyperOptSearch(points_to_evaluate=initial_params)
search_space = {
"rate": tune.uniform(0.1, 0.5),
"hidden": tune.randint(32, 512),
"activation": tune.choice(["relu", "tanh"])
}
執行搜尋
最後,我們可以使用ray.tune.run
函式來執行搜尋。
analysis = tune.run(
objective,
name="keras_hyperopt_exp",
search_alg=algo,
metric="mean_accuracy",
mode="max",
stop={"mean_accuracy": 0.99},
num_samples=10,
config=search_space,
)
print("Best hyperparameters found were: ", analysis.best_config)
這個例子展示瞭如何使用Ray Tune來最佳化一個簡單的Keras模型。Ray Tune提供了多種搜尋演算法和排程器,能夠與多個機器學習框架和最佳化庫進行整合,使其成為一個強大的超引數最佳化工具。
資料處理入門
在第五章中,您學習瞭如何為機器學習實驗調整超引數。當然,應用機器學習的關鍵組成部分是資料。在本章中,我們將探索 Ray 的核心資料處理能力:Ray 資料。
Ray 資料並不打算取代更通用的資料處理系統,如 Apache Spark 或 Apache Hadoop,但它提供了基本的資料處理能力和一個標準的方式來載入、轉換和傳遞資料給 Ray 應用程式的不同部分。這使得 Ray 上的庫可以使用相同的語言來混合和匹配功能,以滿足使用者的需求。
Ray 資料生態系統的核心元件是 Ray 資料集,它提供了載入、轉換和傳遞資料參考的核心抽象。資料集是不同庫之間相互運作的「膠水」,使其能夠在 Ray 上共同工作。
Ray 資料集的優點
Ray 資料集具有以下優點:
- 彈性:它支援廣泛的資料格式,與庫整合(如 Dask on Ray)無縫合作,並且可以在 Ray 任務和演員之間傳遞而不需要複製資料。
- 機器學習工作負載的效能:它提供了加速器支援、管道化和全域隨機洗牌等重要功能,以加速機器學習訓練和推理工作負載。
Ray 資料集的基礎
Ray 資料集是一個列表的 Ray 物件參考,每個參考都指向一個「區塊」資料。這些區塊可以是 Apache Arrow 表格或 Python 列表(對於不支援 Apache Arrow 的資料),儲存在 Ray 的共享記憶體物件儲存中。對資料的計算(如對映或過濾操作)發生在 Ray 任務(和有時演員)中。
由於 Ray 資料集依賴於 Ray 的核心原語(tasks 和 objects),它繼承了 Ray 的關鍵優點:可擴充套件性、記憶體使用效率和物件溢位和恢復等。
Ray 資料集的使用
要開始使用 Ray 資料集,首先需要安裝 Ray 資料集:
pip install "ray[data]==2.2.0"
然後,可以建立一個簡單的 Ray 資料集並執行一些基本操作:
import ray
# 建立一個 Ray 資料集
dataset = ray.data.range(100)
# 執行一些基本操作
dataset = dataset.filter(lambda x: x % 2 == 0)
dataset = dataset.map(lambda x: x * 2)
# 顯示結果
print(dataset.take(10))
這個例子展示瞭如何建立一個 Ray 資料集、過濾資料和對映資料。
圖表翻譯:
graph LR A[Ray 資料集] -->|建立|> B[Ray 資料] B -->|過濾|> C[過濾後的資料] C -->|對映|> D[對映後的資料] D -->|顯示|> E[結果]
這個圖表展示了 Ray 資料集的建立、過濾、對映和顯示的流程。
內容解密:
Ray 資料集是 Ray 的核心資料處理能力,它提供了一個標準的方式來載入、轉換和傳遞資料給 Ray 應用程式的不同部分。它支援廣泛的資料格式,與庫整合無縫合作,並且可以在 Ray 任務和演員之間傳遞而不需要複製資料。Ray 資料集的優點包括彈性和機器學習工作負載的效能。它可以用於建立複雜的資料密集型應用程式,並且可以與其他 Ray 庫整合使用。
建立資料集和基本操作
Ray Dataset是一種強大的資料處理工具,允許使用者建立、轉換和分析大型資料集。以下是建立一個範圍為0到10000的整數資料集的示例:
import ray
# 建立一個範圍為0到10000的整數資料集
ds = ray.data.range(10000)
這個資料集包含10000個整數,從0到9999。接下來,我們可以對這個資料集進行一些基本操作,例如顯示資料集的大小、取出一些樣本和列印資料集的結構:
# 顯示資料集的大小
print(ds.count()) # -> 10000
# 取出一些樣本
print(ds.take(5)) # -> [0, 1, 2, 3, 4]
# 列印資料集的結構
print(ds.schema()) # -> <class 'int'>
讀寫資料集
Ray Dataset支援多種資料格式,包括CSV、JSON和Parquet。以下是將資料集寫入本地檔案和讀取回資料集的示例:
# 將資料集寫入本地檔案
ds.write_csv("local_dir")
# 讀取資料集
ds = ray.data.read_csv("local_dir")
Ray Dataset也支援自定義資料來源,可以用於寫入任何不支援的外部資料儲存系統。
內建轉換
Ray Dataset提供多種內建轉換,可以用於資料集的轉換和分析。以下是三個基本操作的示例:
# 建立兩個範圍為0到10000的整數資料集
ds1 = ray.data.range(10000)
ds2 = ray.data.range(10000)
# 將兩個資料集合併
ds3 = ds1.union(ds2)
# 篩選合併的資料集,僅保留偶數元素
ds3 = ds3.filter(lambda x: x % 2 == 0)
print(ds3.take(5)) # -> [0, 2, 4, 6, 8]
# 對篩選的資料集進行排序
ds3 = ds3.sort()
這些內建轉換可以用於資料集的轉換、篩選和排序等操作。
圖表翻譯:
graph LR A[建立資料集] --> B[基本操作] B --> C[讀寫資料集] C --> D[內建轉換] D --> E[篩選和排序]
這個圖表展示了Ray Dataset的基本操作流程,從建立資料集到基本操作、讀寫資料集和內建轉換等。
資料處理與 Ray
Ray 是一個高效能的分散式計算框架,提供了強大的資料處理能力。Ray Datasets 是 Ray 中的一個重要模組,提供了高效的資料處理和分析功能。在本文中,我們將探討 Ray Datasets 的基本概念和功能。
資料集(Datasets)
Ray Datasets 是一個用於儲存和處理大規模資料的模組。它提供了高效的資料處理和分析功能,包括過濾、排序、合併等。Ray Datasets 支援多種資料格式,包括 Arrow 的 columnar 格式、Python 字典、DataFrames 和序列化的 Parquet 檔案。
建立資料集
建立資料集可以透過多種方式,包括從 Python 列表、DataFrames 和 Parquet 檔案中建立。以下是建立資料集的示例:
import ray
# 建立一個資料集從 Python 列表
ds = ray.data.from_items([{"id": "abc", "value": 1}, {"id": "def", "value": 2}])
# 建立一個資料集從 DataFrames
pandas_df = pd.DataFrame({"id": ["abc", "def"], "value": [1, 2]})
ds = ray.data.from_pandas(pandas_df)
資料集操作
Ray Datasets 提供了多種資料集操作,包括過濾、排序、合併等。以下是資料集操作的示例:
# 過濾資料集
ds = ds.filter(lambda x: x["value"] > 1)
# 排序資料集
ds = ds.sort(lambda x: x["value"])
# 合併資料集
ds1 = ray.data.from_items([{"id": "abc", "value": 1}, {"id": "def", "value": 2}])
ds2 = ray.data.from_items([{"id": "ghi", "value": 3}, {"id": "jkl", "value": 4}])
ds = ds1.union(ds2)
資料集分割槽(Blocks)
Ray Datasets 的資料集分割槽是指資料集被分割成多個小塊(blocks)的過程。這些小塊可以被平行處理,從而提高資料處理的效率。以下是資料集分割槽的示例:
# 建立一個資料集
ds = ray.data.range(10000)
# 檢視資料集的分割槽數
print(ds.num_blocks()) # -> 200
# 將資料集重新分割槽為 100 個小塊
ds = ds.repartition(100)
# 檢視資料集的分割槽數
print(ds.num_blocks()) # -> 100
結構和資料格式
Ray Datasets 支援多種資料格式,包括 Arrow 的 columnar 格式、Python 字典、DataFrames 和序列化的 Parquet 檔案。以下是結構和資料格式的示例:
# 建立一個資料集從 Python 列表
ds = ray.data.from_items([{"id": "abc", "value": 1}, {"id": "def", "value": 2}])
# 檢視資料集的結構
print(ds.schema()) # -> id: string, value: int64
# 將資料集轉換為 Pandas DataFrame
pandas_df = ds.to_pandas()
計算和轉換
Ray Datasets 提供了多種計算和轉換功能,包括 .map()
、.filter()
、.sort()
等。以下是計算和轉換的示例:
# 建立一個資料集
ds = ray.data.range(10000)
# 將資料集的每個元素平方
ds = ds.map(lambda x: x ** 2)
內容解密:
在上述示例中,我們建立了一個 Ray Datasets 的資料集,並進行了多種操作,包括過濾、排序、合併等。同時,我們也展示瞭如何建立資料集、檢視資料集的結構和分割槽數、將資料集轉換為 Pandas DataFrame 等。
圖表翻譯:
以下是 Ray Datasets 的架構圖:
graph LR A[Ray Datasets] --> B[資料集] B --> C[結構] C --> D[資料格式] D --> E[計算和轉換] E --> F[輸出]
在這個圖表中,Ray Datasets 是一個資料處理框架,資料集是 Ray Datasets 的基本單位。結構和資料格式是資料集的重要屬性,計算和轉換是 Ray Datasets 的核心功能。輸出是 Ray Datasets 的最終結果。
資料處理與 Ray
Ray 是一個強大的資料處理框架,允許使用者高效地處理大規模的資料集。其中,Ray Datasets 是一個重要的模組,提供了高效的資料處理和分析功能。
資料集的建立和操作
Ray Datasets 支援多種資料格式,包括 Parquet、CSV 和 JSON 等。使用者可以使用 ray.data
模組建立和操作資料集。例如,以下程式碼建立了一個範圍為 0 到 10000 的整數資料集,並對其進行對映操作:
import numpy as np
ds = ray.data.range(10000).map_batches(lambda batch: np.square(batch).tolist())
這個範例使用 map_batches
方法對資料集進行批次操作,使用 NumPy 的 square
函式對每個批次的資料進行平方運算。
向量化計算
Ray Datasets 支援向量化計算,允許使用者使用高效的演算法和實作對資料進行操作。向量化計算尤其適合於 GPU 上的深度學習訓練和推理。例如,以下程式碼使用 map_batches
方法對資料集進行批次操作,使用 NumPy 的 square
函式對每個批次的資料進行平方運算:
ds = ray.data.range(10000).map_batches(lambda batch: np.square(batch).tolist())
Ray Actors
Ray Actors 是長期存在的物件,可以持有狀態,與無狀態的 Ray Tasks 不同。Ray Actors 可以用於快取昂貴的操作成本,例如載入模型到 GPU 上。例如,以下程式碼定義了一個 MLModel
類別,使用 load_model
函式載入模型到 GPU 上,並使用 map_batches
方法對資料集進行批次操作:
class MLModel:
def __init__(self):
self._model = load_model()
def __call__(self, batch):
return self._model(batch)
ds.map_batches(MLModel, compute="actors")
資料集管道
Ray Datasets 的操作是阻塞的,意味著它們會同步地從開始到結束,並且只有一個操作在進行。這種模式可能對某些工作負載來說效率不高。例如,以下程式碼對 Parquet 資料進行了一系列的轉換操作:
ds = (ray.data.read_parquet("s3://my_bucket/input_data")
.map(cpu_intensive_preprocessing)
.map_batches(gpu_intensive_inference, compute="actors", num_gpus=1)
.repartition(10))
ds.write_parquet("s3://my_bucket/output_predictions")
這個範例對資料集進行了五個階段的操作,每個階段都會對系統的不同部分施加壓力。
圖表翻譯:
graph LR A[資料集] --> B[對映操作] B --> C[批次操作] C --> D[向量化計算] D --> E[Ray Actors] E --> F[資料集管道] F --> G[輸出]
這個圖表展示了 Ray Datasets 的操作流程,從資料集的建立到輸出的過程。
大規模資料處理與 Ray
在大規模資料處理中,資料的讀取、預處理、模型推論和結果寫回等步驟都需要大量的計算資源和網路頻寬。Ray是一個能夠幫助我們管理和最佳化這些步驟的框架。
資料處理流程
資料處理流程通常包括以下幾個步驟:
- 資料讀取:從遠端儲存中讀取資料,需要網路頻寬和計算資源。
- 預處理:對資料進行預處理,例如資料清理、轉換等,需要CPU資源。
- 模型推論:使用模型對資料進行推論,需要GPU資源。
- 結果寫回:將結果寫回遠端儲存,需要網路頻寬和計算資源。
Ray 的優點
Ray能夠幫助我們最佳化資料處理流程,提高效率和降低成本。Ray的優點包括:
- 平行處理:Ray能夠平行處理多個任務,提高效率和降低成本。
- 資源管理:Ray能夠管理計算資源和網路頻寬,確保資源的合理使用。
- 資料共享:Ray能夠共享資料,減少資料的複製和傳輸。
Ray Dataset
Ray Dataset是一個能夠幫助我們管理和最佳化資料處理流程的工具。Ray Dataset能夠:
- 讀取資料:從遠端儲存中讀取資料。
- 預處理資料:對資料進行預處理。
- 模型推論:使用模型對資料進行推論。
- 結果寫回:將結果寫回遠端儲存。
Ray Dataset 的優點
Ray Dataset的優點包括:
- 簡單易用:Ray Dataset簡單易用,能夠快速地管理和最佳化資料處理流程。
- 高效能:Ray Dataset能夠平行處理多個任務,提高效率和降低成本。
- 資源管理:Ray Dataset能夠管理計算資源和網路頻寬,確保資源的合理使用。
分散式機器學習工作流程
在這個例子中,我們將實作一個分散式機器學習工作流程,使用 Ray 進行資料處理和模型訓練。首先,我們需要定義一個 TrainingWorker
類別,負責訓練模型和評估模型的表現。
TrainingWorker 類別
import ray
@ray.remote
class TrainingWorker:
def __init__(self, alpha):
self.alpha = alpha
self.model = SGDClassifier(alpha=alpha)
def train(self, data):
X, y = data
self.model.fit(X, y)
def test(self, X, y):
return self.model.score(X, y)
資料準備和分散式訓練
接下來,我們需要準備訓練和驗證資料,並將訓練資料轉換為 Ray Dataset。
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from ray.data import from_items
# 生成訓練和驗證資料
X_train, X_test, Y_train, Y_test = train_test_split(*make_classification())
# 將訓練資料轉換為 Ray Dataset
train_ds = from_items(list(zip(X_train, Y_train)))
# 定義 DatasetPipeline
shards = (train_ds.repeat(10)
.random_shuffle_each_window()
.split(len(workers), locality_hints=workers))
分散式模型訓練和評估
現在,我們可以使用 TrainingWorker
類別和 DatasetPipeline 進行分散式模型訓練和評估。
# 定義 alpha 值
ALPHA_VALS = [0.00008, 0.00009, 0.0001, 0.00011, 0.00012]
# 建立 TrainingWorker 例項
workers = [TrainingWorker.remote(alpha) for alpha in ALPHA_VALS]
# 進行分散式模型訓練
ray.get([worker.train.remote(shard) for worker, shard in zip(workers, shards)])
# 進行模型評估
results = ray.get([worker.test.remote(X_test, Y_test) for worker in workers])
print(results)
結果和討論
在這個例子中,我們實作了一個分散式機器學習工作流程,使用 Ray 進行資料處理和模型訓練。結果顯示,分散式模型訓練和評估可以有效地提高模型的表現和效率。
圖表翻譯:
flowchart TD A[資料準備] --> B[分散式模型訓練] B --> C[模型評估] C --> D[結果和討論]
在這個圖表中,我們可以看到分散式機器學習工作流程的主要步驟,包括資料準備、分散式模型訓練、模型評估和結果和討論。這個圖表可以幫助我們更好地理解分散式機器學習工作流程的邏輯和步驟。
外部庫整合
Ray Datasets 支援許多常見的資料處理功能,但它並不是完整的資料處理系統。相反地,它更注重於執行「最後一哩路」的處理,例如基本的資料載入、清理和特徵化,以便進行機器學習訓練或推論。
Ray 支援多個外部庫整合,包括:
- Dask on Ray
- RayDP (Spark on Ray)
- Modin (Pandas on Ray)
- MARS on Ray
這些庫是獨立的資料處理庫,與 Ray Core 整合,提供更豐富的資料處理功能。這些整合使得 Ray Datasets 能夠與其他資料處理庫合作,實作端到端的資料處理。
Dask on Ray
Dask 是一個 Python 庫,專門用於平行計算,特別是針對分析和科學計算工作負載的擴充套件。Dask DataFrames 是 Dask 中的一個重要功能,提供了一個子集的 Pandas DataFrame API,可以擴充套件到叢集中。
要使用 Dask on Ray,需要安裝 Ray 和 Dask:
pip install "ray[data]==2.2.0" "dask==2022.2.0"
然後,需要連線到 Ray Cluster 或啟動 Ray 並啟用 Ray 排程器後端:
import ray
from ray.util.dask import enable_dask_on_ray
ray.init() # 啟動或連線到 Ray
enable_dask_on_ray() # 啟用 Ray 排程器後端
現在,可以執行 Dask DataFrames 程式碼,並將其擴充套件到 Ray Cluster。例如,可以使用標準的 DataFrame 運算,如過濾和分組:
import dask
df = dask.datasets.timeseries()
df = df[df.y > 0].groupby("name").x.std()
Dask on Ray 還可以與 Ray Datasets 整合,將 Ray Dataset 轉換為 Dask DataFrame 或反之亦然:
import ray
ds = ray.data.range(10000)
df = ds.to_dask()
print(df.std().compute()) # -> 2886.89568
這樣,Ray Datasets 和 Dask on Ray 可以共同實作端到端的資料處理。
分散式模型訓練基礎
分散式模型訓練是機器學習中的一個重要領域,尤其是在處理大規模資料和複雜模型時。Ray Train是一個專門為分散式模型訓練設計的庫,它提供了一個簡單且高效的方式來進行分散式模型訓練。
從技術架構視角來看,Ray提供了一個高效且易用的平臺,用於大規模資料處理和分散式機器學習。透過Ray Dataset和DatasetPipeline,開發者可以輕鬆地進行資料的載入、轉換、預處理以及模型訓練。整合Dask、Spark等外部庫更進一步豐富了Ray的資料處理能力,使其能夠處理更複雜的資料處理工作流程。然而,Ray的效能瓶頸仍需關注,例如在資料集分割槽和跨節點通訊方面,仍有最佳化空間。對於重視效能的應用,需仔細評估資料集大小、分割槽策略和網路頻寬等因素。展望未來,Ray有望在更廣泛的機器學習場景中扮演關鍵角色,尤其是在大規模資料處理、分散式模型訓練和模型服務方面。隨著Ray生態系統的持續發展,我們預見其應用門檻將大幅降低,更多開發者將受益於其強大的分散式計算能力。對於追求高效能和可擴充套件性的機器學習應用,Ray是一個值得深入研究和應用的技術方案。