Ray 提供了簡化的 API 和無伺服器擴充套件能力,適合處理中大型規模的分散式計算問題。對於 x86 架構的使用者,可以直接使用 pip 安裝;而 ARM 架構使用者則需要從原始碼編譯安裝,過程中需要注意一些依賴項的安裝和設定。Ray 的核心概念之一是遠端函式,它允許在不同機器上非同步執行任務。透過 @ray.remote 裝飾器,可以將普通 Python 函式轉換為遠端函式,並使用 ray.get 取得執行結果。這個機制有效簡化了分散式程式設計的複雜度,讓開發者更專注於業務邏輯。

除了基本的遠端函式,Ray 還支援巢狀任務和任務連結,這對於處理複雜的遞迴演算法或工作流程非常有用。開發者可以在一個任務內部啟動其他任務,而無需中央協調器的介入,提高了分散式計算的靈活性。此外,Ray 的 Datasets API 提供了處理結構化資料的便捷方式,根據 Apache Arrow 格式,可以與其他主流資料處理工具無縫銜接。利用 Datasets API,可以輕鬆進行資料轉換、分組、聚合等操作,簡化資料處理流程。最後,Ray 的演員模型提供了一種狀態管理機制,允許開發者建立具有內部狀態的物件,並透過訊息傳遞的方式與之互動,有效解決了分散式系統中狀態管理的難題。

Ray 入門:從本地環境開始

玄貓認為,Ray 是一個強大的工具,能夠有效管理從單一電腦到叢集的資源。雖然 Ray 可以處理一些小型問題,但最適合用來解決中型到大型規模的問題。它提供了一個統一的 API,簡化了開發與營運流程,並且具有無伺服器擴充套件能力。如果你的問題涵蓋了 Ray 支援的多個領域,或者你厭倦了管理自己的叢集所帶來的營運負擔,玄貓建議你嘗試使用 Ray。

安裝 Ray

本地環境安裝

安裝 Ray 可能會從相對簡單到相當複雜。Ray 會在 Python Package Index (PyPI) 上發布輪子(wheels),這些輪子目前僅提供 x86 使用者使用。ARM 使用者通常需要從原始碼編譯 Ray。

x86 與 M1 ARM 使用者安裝

大多數使用者可以使用 pip install -U ray 命令自動從 PyPI 安裝 Ray。當你需要在多台機器上分配計算任務時,最好使用 Conda 環境來比對 Python 版本和知曉套件依賴性。以下是使用 Conda 環境安裝 Ray 的命令範例:

conda create -n ray python=3.7 mamba -y
conda activate ray
pip install jinja2 python-dateutil cloudpickle packaging pygments psutil nbconvert ray
ARM 使用者從原始碼安裝

對於 ARM 使用者或沒有預編譯輪子的系統架構使用者,你需要從原始碼編譯 Ray。以下是 ARM Ubuntu 系統上的安裝步驟:

sudo apt-get install -y git tzdata bash libhdf5-dev curl pkg-config wget cmake build-essential zlib1g-dev zlib1g openssh-client gnupg unzip libunwind8 libunwind-dev openjdk-11-jdk git
sudo apt-get install -y libhdf5-100 || sudo apt-get install -y libhdf5-103
wget -q https://github.com/bazelbuild/bazelisk/releases/download/v1.10.1/bazelisk-linux-arm64 -O /tmp/bazel
chmod a+x /tmp/bazel
sudo mv /tmp/bazel /usr/bin/bazel
curl -fsSL https://deb.nodesource.com/setup_16.x | sudo bash -
sudo apt-get install -y nodejs

對於 M1 Mac 使用者,可以使用 Homebrew 和 pip 安裝必要的依賴項:

brew install bazelisk wget python@3.8 npm
export PATH=$(brew --prefix)/opt/python@3.8/bin/:$PATH
echo "export PATH=$(brew --prefix)/opt/python@3.8/bin/:$PATH" >> ~/.zshrc
echo "export PATH=$(brew --prefix)/opt/python@3.8/bin/:$PATH" >> ~/.bashrc
pip3 install --user psutil cython colorama

接著,克隆 Ray 倉函式庫並編譯必要的元件:

git clone https://github.com/ray-project/ray.git
cd ray
pushd python/ray/new_dashboard/client; npm install && npm ci && npm run build; popd
export USE_BAZEL_VERSION=4.2.1
cd python
rm -rf ./thirdparty_files  # Mac ARM 使用者必須清理錯誤的第三方檔案
pip install -e .

編譯過程中可能遇到的挑戰

編譯過程中最耗時的是編譯 C++ 程式碼,這可能需要數小時時間。如果你有一個包含多個 ARM 機器的叢集,建議一次編譯輪子並重複使用。

Hello World 與基礎 API

現在,讓我們來看看一些基礎的 Ray API。這些 API 在後續章節會有更詳細的介紹。

Ray Remote (Task/Futures) Hello World

Ray 的核心概念之一是遠端函式(Remote Functions),這些函式傳回未來物件(Futures)。這些函式可以在主程式或其他機器上執行。

import ray

# 初始化 Ray
ray.init()

@ray.remote
def hello_world():
    return "Hello, World!"

# 建立一個遠端任務並取得未來物件
future = hello_world.remote()

# 取得任務結果並列印預出來。
print(ray.get(future))

內容解密:

這段程式碼展示瞭如何在 Ray 中使用遠端函式。首先,我們需要初始化 Ray。然後,我們定義了一個遠端函式 hello_world,這個函式傳回一個簡單的字串 “Hello, World!"。

接著,我們呼叫了 hello_world.remote() 方法建立一個遠端任務,並取得未來物件 future。最後,我們使用 ray.get(future) 方法取得任務結果並列印預出來。

這段程式碼展示瞭如何在 Ray 中使用遠端函式來進行非同步計算。遠端函式可以在不同的機器上執行,這使得它們非常適合用於分散式計算。

  graph TD;
    A[初始化Ray] --> B[定義遠端函式];
    B --> C[建立遠端任務];
    C --> D[取得未來物件];
    D --> E[取得任務結果並列印];
此圖示說明:
  1. 初始化Ray:首先需要初始化 Ray。
  2. 定義遠端函式:定義一個遠端函式 hello_world
  3. 建立遠端任務:呼叫 hello_world.remote() 建立一個遠端任務。
  4. 取得未來物件:獲得未來物件 future
  5. 取得任務結果並列印:使用 ray.get(future) 取得任務結果並列印。

推薦與展望

玄貓認為,Ray 不僅能夠簡化開發和營運過程,還能提供無伺服器擴充套件能力。如果你有跨越多個領域的問題或厭倦了管理自己的叢集所帶來的營運負擔,玄貓強烈推薦你嘗試使用 Ray。在下一章節中,玄貓將指導你如何在本地環境中安裝和組態 Ray,並展示一些基本的例子來幫助你快速上手。

希望玄貓此文能幫助你更好地理解和使用 Ray。如果有任何問題或需要進一步的幫助,歡迎隨時聯絡玄貓。

使用Ray進行分散式計算

Ray是一個強大的分散式計算框架,能夠在單機或多機環境中高效地分配和執行任務。它的設計目的是簡化分散式計算的複雜性,使得開發者可以專注於應用邏輯而不必擔心底層的平行和通訊細節。以下是一些基本的概念和例項,幫助你理解如何使用Ray進行分散式計算。

瞭解遠端函式

Ray透過遠端函式來實作分散式計算。遠端函式可以在不同的程式或機器上執行,這使得它們能夠平行處理任務。首先,我們來看一個簡單的本地函式示例:

def hi():
    import os
    import socket
    return f"Running on {socket.gethostname()} in pid {os.getpid()}"

這個函式會傳回當前執行它的主機名稱和程式ID。接下來,我們將這個函式轉換為Ray的遠端函式:

import ray

@ray.remote
def remote_hi():
    import os
    import socket
    return f"Running on {socket.gethostname()} in pid {os.getpid()}"

future = remote_hi.remote()
result = ray.get(future)
print(result)

內容解密:

  • import ray:匯入Ray函式庫,以便使用其功能。
  • @ray.remote:這個裝飾器將普通函式轉換為遠端函式,使其可以在Ray的執行時環境中執行。
  • remote_hi.remote():呼叫遠端函式時,會傳回一個未來物件(future),而不是立即執行。
  • ray.get(future):這個函式會阻塞直到未來物件完成,然後傳回結果。

當你執行這些程式碼時,會發現第一個函式在同一個程式中執行,而第二個函式則由Ray排程到另一個程式中執行。

使用遠端函式進行平行處理

要理解如何使用遠端未來物件來加速計算,我們可以建立一個故意讓其執行較慢的函式,並在普通函式呼叫和Ray遠端呼叫之間進行比較。

import timeit
import ray

def slow_task(x):
    time.sleep(2)  # 模擬耗時操作
    return x

@ray.remote
def remote_task(x):
    return slow_task(x)

things = range(10)
very_slow_result = map(slow_task, things)
slowish_result = map(lambda x: remote_task.remote(x), things)

slow_time = timeit.timeit(lambda: list(very_slow_result), number=1)
fast_time = timeit.timeit(lambda: list(ray.get(list(slowish_result))), number=1)

print(f"In sequence {slow_time}, in parallel {fast_time}")

內容解密:

  • slow_task:這是一個模擬耗時操作的普通函式。
  • remote_task:將slow_task轉換為Ray的遠端函式。
  • things:建立一個範圍從0到9的列表。
  • very_slow_result:使用普通函式呼叫處理每個元素。
  • slowish_result:使用遠端函式呼叫處理每個元素。
  • timeit.timeit:測量兩種方法所需的時間。
  • ray.get(list(slowish_result)):取得所有未來物件的結果。

當你執行這段程式碼時,會發現使用Ray遠端函式可以顯著加速處理時間。儘管Python的多處理模組也能實作類別似功能,但Ray會自動處理所有細節並且能夠擴充套件到多台機器。

巢狀和連結任務

Ray在分散式處理領域中的一大特色是支援巢狀和連結任務。這意味著你可以在一個任務內部啟動更多工,從而簡化某些遞迴演算法的實作。

例如,以下是一個使用巢狀任務實作網頁爬蟲的例子:

import ray
import requests
from bs4 import BeautifulSoup

@ray.remote
def crawl(url, depth=0, maxdepth=1, maxlinks=4):
    links = []
    link_futures = []

    try:
        f = requests.get(url)
        links += [(url, f.text)]
        if depth > maxdepth:
            return links  # 基本情況

        soup = BeautifulSoup(f.text, 'html.parser')
        c = 0
        for link in soup.find_all('a'):
            try:
                c += 1
                link_futures += [crawl.remote(link["href"], depth=(depth + 1), maxdepth=maxdepth)]
                if c > maxlinks:
                    break
            except:
                pass

        for r in ray.get(link_futures):
            links += r

        return links

    except requests.exceptions.InvalidSchema:
        return []  # 跳過非網頁連結
    except requests.exceptions.MissingSchema:
        return []  # 跳過非網頁連結

result = ray.get(crawl.remote("http://holdenkarau.com/"))

內容解密:

  • @ray.remote:定義了一個可在分散環境中執行的爬蟲功能。
  • requests.get(url):請求目標網頁並取得其HTML內容。
  • BeautifulSoup:解析HTML內容並提取所有連結。
  • crawl.remote():遞迴地呼叫爬蟲功能以存取更多連結。

許多其他系統要求所有任務都由中央協調器啟動。即使是那些支援巢狀任務啟動的系統通常也依賴於中央排程器。然而,Ray允許在任務內部啟動更多工而不需要中央排程器。

資料集API

Ray提供了用於處理結構化資料的一些有限API,稱為Datasets API。Apache Arrow是支援這些API的核心技術。Arrow是一種列向、語言獨立格式,具有許多流行操作。許多知名工具如Spark、Dask和TensorFlow都支援Arrow格式,從而實作了資料之間的無縫轉移。

以下是如何構建一組網頁URL資料集:

urls = ray.data.from_items([
    "https://github.com/scalingpythonml/scalingpythonml",
    "https://github.com/ray-project/ray"
])

def fetch_page(url):
    response = requests.get(url)
    return response.text

pages = urls.map(fetch_page)

# 檢視第一個網頁以確保成功取得
first_page = pages.take(1)
print(first_page[0])

內容解密:

  • ray.data.from_items():建立一組包含URL的資料集。
  • fetch_page():定義了一個取得網頁內容的功能。
  • urls.map(fetch_page):將fetch_page功能應用到每個URL上以取得其HTML內容。

Ray還支援根據資料集的分組和聚合操作。例如,你可以計算每個網頁中的單詞頻率:

words = pages.flat_map(lambda x: x.split(" ")).map(lambda w: (w, 1))
grouped_words = words.groupby(lambda wc: wc[0])

內容解密:

  • flat_map(lambda x: x.split(" ")):將每個網頁內容拆分為單詞列表。
  • map(lambda w: (w, 1)):將每個單詞轉換為(單詞, 1)元組。
  • groupby(lambda wc: wc[0]):根據單詞對元組進行分組。

當你需要超出內建操作時,Ray也支援自定義聚合操作。具體細節會在第9章中介紹。

演員模型(Actors)

演員(Actors)是Ray的一大特色之一。演員提供了一種管理狀態執行的工具,這對於擴充套件系統來說是非常具有挑戰性的一部分。演員透過傳送和接收訊息來更新其狀態。這些訊息可以來自其他演員、程式或主執行執行緒。

每當你建立一個演員時,Ray都會啟動一個專用程式來執行該演員。每個演員都有一個信箱來存放待處理訊息。當你呼叫演員方法時,Ray會將訊息新增到該信箱中。這樣做可以序列化訊息處理過程並避免昂貴的分散式鎖定機制。

總結來說,Ray 是一種非常靈活且高效地進行分散式計算的一種方式。透過使用遠端函式、巢狀任務、資料集API以及演員模型,Ray 提供了強大且易於使用的工具來進行各種平行計算需求。