Ray 提供了比 Python 內建 multiprocessing 模組更強大的分散式計算能力,尤其在處理跨機器、容錯、大型引數傳遞和流程間通訊等方面更具優勢。遠端函式即使在單機上也能有效運用,Ray 會自動將函式呼叫對映到合適的流程,並在後台處理分散式呼叫。建立 Ray 遠端函式時,呼叫後會立即傳回一個 ObjectRef,作為指向遠端物件的參考。ray.get 可取得物件值,但它是阻塞方法,需等待任務完成。遠端物件可能位於其他節點,而 ObjectRefs 就像指向這些物件的指標。使用 ray.put 可明確建立 ObjectRefs。將迭代器轉換為列表後再傳遞給 ray.get 是必要的,尤其在使用 ray.map 進行平行處理時,能有效提高效率。需要注意的是,短於幾秒的小型任務不適合使用遠端呼叫,因為每次呼叫都會產生額外開銷,反而降低效率。

遠端函式:快速入門

在現代應用程式開發中,分散式或平行計算往往是不可或缺的。許多 Python 開發者通常是透過 multiprocessing 模組初次接觸平行計算。然而,multiprocessing 在處理現代應用所需的需求方面有其侷限性。這些需求包括:

  • 在多個核心或機器上執行相同的程式碼
  • 使用工具來處理機器和處理器故障
  • 高效處理大型引數
  • 簡單地在不同的流程之間傳遞資訊

multiprocessing 不同,Ray 的遠端函式能夠滿足這些需求。需要注意的是,雖然遠端函式的名稱暗示它們必須在不同的電腦上執行,但實際上它們可以在同一台機器上執行。Ray 提供的是將函式呼叫對映到合適的流程,並且在後台進行分散式呼叫。

遠端函式的基本概念

在本章節中,我們將瞭解如何建立遠端函式、等待它們完成並取得結果。當你掌握了基本概念之後,我們將探討如何組合遠端函式以建立更複雜的操作。

首先,讓我們來看看一些之前章節中提到但未探討的細節。

建立基本的 Ray 遠端函式

在例子 2-7 中,我們學到了如何建立一個基本的 Ray 遠端函式。當你呼叫一個遠端函式時,Ray 會立即傳回一個 ObjectRef(未來值),這是一個指向遠端物件的參考。Ray 在背景中建立並執行一個任務,然後在完成後將結果寫入原始參考中。你可以使用 ray.get 來取得物件的值。

需要注意的是,ray.get 是一個阻塞方法,它會等待任務執行完成後才傳回結果。

Ray 中的遠端物件

遠端物件只是物件而已,可能位於其他節點上。ObjectRefs 就像指向物件的指標或 ID,你可以用它來取得或查詢遠端函式的狀態或值。除了從遠端函式呼叫中建立外,你也可以使用 ray.put 函式明確地建立 ObjectRefs

Ray 中的 ObjectRefs

在例子 2-7 中有一些細節值得深入理解。該例子將迭代器轉換為列表後再傳遞給 ray.get。當 ray.get 接受一個未來值列表或單個未來值時,你需要這樣做。

def process_data(data):
    results = ray.map(process_item, data)
    return ray.get(results)

這段程式碼使用了 ray.map 函式來將每個專案分配給獨立的工作流程處理。這樣做可以提高平行效能並減少等待時間。

超過一秒鐘以上才會被視為不值得排程執行

如果你在每次遠端呼叫中執行大量工作,則很有可能會導致效能瓶頸。例如,使用 ray.remote 遞迴計算階乘會比在本地計算慢得多,即使整體工作量很大也是如此。具體所需時間取決於你叢集中的繁忙程度,但一般規則是任何不使用特殊資源且在幾秒鐘內完成的操作都不值得進行排程。

錯誤教訓

曾經試圖使用 ray.remote 進行小型工作量操作時發現效率不高,因為每次呼叫都會產生額外開銷。從此以後我就避免使用它來處理小型任務,專注於大型批次處理或需要高度平行性的工作。

實際案例

案例:批次資料處理

假設我們有一批需要進行清洗和分析的資料集:

import ray
import numpy as np

ray.init()

@ray.remote
def process_item(item):
    # 假設這裡有一些複雜的計算
    return item * 2

data = np.random.randint(0, 100, size=(1000,))

results = [process_item.remote(item) for item in data]
final_results = ray.get(results)

print(final_results)

此圖示展示瞭如何使用 Ray 的遠端函式來加速資料處理任務。

  graph TD;
    A[建立資料集] --> B[生成Remote呼叫];
    B --> C[等待任務完成];
    C --> D[取得結果];

內容解密:

  1. 建立資料集:首先我們生成了一組隨機整數。
  2. 生成Remote呼叫:對每個專案生成一個遠端呼叫。
  3. 等待任務完成:使用 Ray 的機制等待所有任務完成。
  4. 取得結果:最後從所有未來值中取得結果。

透過這種方式,我們可以高效地利用多核心或多機器來處理大型資料集。

總結來說,Ray 的遠端函式提供了一種強大且靈活的方法來進行分散式和平行計算。透過合理設計和實踐經驗分享,我們可以更好地利用 Ray 提供的功能來提升應用程式的效能和效率。

遠端函式生命週期與最佳化

遠端函式的工作流程

在Ray的架構中,遠端函式的生命週期從任務提交開始,直到結果傳回並解析為止。提交任務的過程中,Ray會確保所有依賴項(ObjectRef物件)都可用。這些依賴項可以是本地的,也可以是分佈在叢集中的。當所有依賴項都準備就緒時,Ray會向分散式排程器請求資源來執行任務。一旦資源可用,排程器會指定一個工作節點來執行該任務。

在任務執行過程中,工作節點會將結果儲存在本地分享記憶體中。如果傳回值較小(預設小於100 KiB),工作節點會直接將值傳回給提交者(owner),並將其儲存在本地物件儲存中。如果傳回值較大,工作節點會將物件儲存在分享記憶體中,並通知提交者這些物件已在分散式記憶體中。這樣可以讓提交者無需將物件取回到本地節點即可參照它們。

任務錯誤處理

在Ray中,任務可能會因應用層錯誤或系統層錯誤而失敗。應用層錯誤是指工作者程式還在執行,但任務以錯誤結束(例如Python中的IndexError)。系統層錯誤則是指工作者程式意外終止(例如段錯或Raylet的當機)。

應用層錯誤的任務不會重試,異常會被捕捉並儲存為任務的傳回值。而系統層錯誤的任務則可能會自動重試,具體重試次數可以透過設定來控制。

@ray.remote
def remote_task(x):
    time.sleep(x)
    return x

內容解密:

這段程式碼定義了一個遠端函式 remote_task,該函式接受一個引數 x,並根據這個引數進行睡眠(模擬耗時操作),然後傳回 x。這個函式的設計目的是展示如何處理不同執行時間的遠端任務。

使用 ray.wait 最佳化效能

當多個遠端任務有不同的執行時間時,直接使用 ray.get 來取得結果可能會導致資源浪費。因為 ray.get 會等待所有任務完成後才傳回結果。相比之下,ray.wait 可以在任務完成後立即傳回已完成的任務結果,從而提高效率。

# Process in order
def in_order():
    # Make the futures
    futures = list(map(lambda x: remote_task.remote(x), things))
    values = ray.get(futures)
    for v in values:
        print(f" Completed {v}")
        time.sleep(1) # Business logic goes here

內容解密:

這段程式碼展示瞭如何使用 ray.get 來按順序處理遠端任務的結果。它首先建立了一組遠端任務,然後使用 ray.get 等待所有任務完成後取得結果。

# Process as results become available
def as_available():
    # Make the futures
    futures = list(map(lambda x: remote_task.remote(x), things))
    while len(futures) > 0:
        ready_futures, rest_futures = ray.wait(futures)
        print(f"Ready {len(ready_futures)} rest {len(rest_futures)}")
        for id in ready_futures:
            print(f'completed value {id}, result {ray.get(id)}')
            time.sleep(1) # Business logic goes here
        futures = rest_futures

內容解密:

這段程式碼展示瞭如何使用 ray.wait 來處理遠端任務的結果。它首先建立了一組遠端任務,然後使用 ray.wait 等待部分或全部任務完成後取得結果。這樣可以避免因等待所有任務完成而導致的資源浪費。

ray.wait 的選項

ray.wait 提供了幾個可選引數來控制其行為:

  • num_returns:指定要等待完成的 ObjectRef 數量。
  • timeout:指定等待的最大時間。
  • fetch_local:如果僅需確保未來已完成而不需要取得結果時,可設定為 false。

使用這些引數可以根據具體需求最佳化 ray.wait 的效能。

效能比較

透過實際測試發現,使用 ray.wait 比直接使用 ray.get 效能更好。特別是在非平行化業務邏輯比較複雜的情況下,效能差異更加顯著。因此,建議在實際應用中使用 ray.wait 來最佳化效能。

  graph TD;
    A[Task Submitted] --> B[Wait for Dependencies];
    B --> C{Dependencies Ready?};
    C -- Yes --> D[Request Resources];
    C -- No --> B;
    D --> E{Resources Available?};
    E -- Yes --> F[Send Task Specification];
    E -- No --> D;
    F --> G[Execute Task];
    G --> H{Return Values Small?};
    H -- Yes --> I[Return Values Inline];
    H -- No --> J[Store Objects in Shared Memory];
    J --> K[Notify Owner];

此圖示展示了Ray遠端函式生命週期的主要流程:

  1. Task Submitted:提交遠端函式。
  2. Wait for Dependencies:等待所有依賴項準備就緒。
  3. Dependencies Ready?:檢查依賴項是否都可用。
  4. Request Resources:向排程器請求執行資源。
  5. Resources Available?:檢查資源是否可用。
  6. Send Task Specification:將任務規範傳送到工作節點。
  7. Execute Task:工作節點執行任務。
  8. Return Values Small?:檢查傳回值大小。
  9. Return Values Inline:如果傳回值較小,直接傳回給提交者。
  10. Store Objects in Shared Memory:如果傳回值較大,儲存在分享記憶體中並通知提交者。

使用 Ray 組合遠端函式的技術深度分析

在分散式計算環境中,Ray 提供了強大的工具來處理遠端函式(Remote Functions)。這些工具不僅能夠提升計算效率,還能夠實作複雜的任務排程與錯誤處理。本文將探討 Ray 的 getwait 函式的超時處理機制,以及如何透過組合遠端函式來提升系統效能。

Ray 的超時處理機制

Ray 的 getwait 函式在處理超時時有所不同。當 ray.wait 函式遇到超時時,並不會丟擲異常,而是簡單地傳回少於期望的已完成的未來任務(futures)。然而,當 ray.get 函式遇到超時時,Ray 會丟擲 GetTimeoutError。這意味著在超時後,遠端函式並不會立即終止,它們仍會在專用的程式中繼續執行。如果需要釋放資源,則需要手動終止未來任務。

取消未完成的任務

當某個任務在合理時間內無法完成時(例如出現「拖延者」straggler),可以使用 ray.cancel 函式來取消這些任務。以下是一個具體的範例:

import ray
import threading

@ray.remote
def remote_task(x):
    # 模擬一個長時間執行的任務
    return x * x

futures = list(map(lambda x: remote_task.remote(x), [1, threading.TIMEOUT_MAX]))

# 當還有未完成的任務時
while len(futures) > 0:
    # 在實際應用中,10秒可能過短
    ready_futures, rest_futures = ray.wait(futures, timeout=10, num_returns=1)

    # 如果傳回的已完成任務少於期望值
    if len(ready_futures) < 1:
        print(f"超時:{rest_futures}")
        # 取消未完成的任務
        ray.cancel(*rest_futures)
        # 超過超時時間則中止迴圈
        break

    for id in ready_futures:
        print(f'已完成任務 ID {id},結果 {ray.get(id)}')

    futures = rest_futures

內容解密:

  • 程式碼邏輯:這段程式碼定義了一個遠端函式 remote_task,並且模擬了一個長時間執行的任務。接著,它使用 map 函式生成多個未來任務。
  • 超時處理:在迴圈中,使用 ray.wait 函式等待任務完成,設定超時時間為 10秒。如果在指定時間內沒有任何任務完成,則列印超時資訊並取消未完成的任務。
  • 資源釋放:透過取消未完成的任務,可以釋放系統資源,避免長時間佔用資源。
  • 錯誤處理:當發現頻繁取消任務時,應該調查根本原因,確保系統穩定執行。

處理多值傳回

雖然之前的範例只傳回單一值,但 Ray 的遠端函式實際上可以傳回多個值,就像普通的 Python 函式一樣。這在需要處理多個結果的場景中非常有用。

分散式計算中的錯誤容忍

在分散式環境中,錯誤容忍是一個重要考量。如果執行任務的工作節點意外當機(無論是程式當機還是機器故障),Ray 會自動重新執行該任務(在延遲後),直到該任務成功或達到最大重試次數。更多關於錯誤容忍的討論將在第五章進行詳細介紹。

組合遠端 Ray 函式

Ray 提供了兩種主要的組合方式來提升遠端函式的功能:管道化(pipelining)和巢狀平行(nested parallelism)。

管道化

管道化是指使用前一個遠端函式傳回的 ObjectRef 作為後一個遠端函式的引數。這樣可以自動取得並傳遞 ObjectRef 中的實際物件,從而實作無阻塞、高效率地協調不同階段之間的計算。

import ray
import random
import time

@ray.remote
def generate_number(s: int, limit: int, sl: float) -> int:
    random.seed(s)
    time.sleep(sl)
    return random.randint(0, limit)

@ray.remote
def sum_values(v1: int, v2: int, v3: int) -> int:
    return v1 + v2 + v3

# 獲得結果
result = ray.get(sum_values.remote(
    generate_number.remote(1, 10, .1),
    generate_number.remote(5, 20, .2),
    generate_number.remote(7, 15, .3)
))
print(result)

內容解密:

  • 程式碼邏輯:這段程式碼定義了兩個遠端函式:generate_numbersum_values
  • 生成隨機數generate_number 函式生成隨機數並休眠一定時間。
  • 求和計算sum_values 函式將三個隨機數相加。
  • 管道化實作:透過將 generate_number 的傳回值作為 sum_values 的引數,實作了管道化操作。
  • 資料傳遞:Ray 自動取得並傳遞 ObjectRef 中的實際物件,避免了額外的資料傳輸開銷。

巢狀平行

巢狀平行是指在一個遠端函式內部啟動其他遠端函式。這種方式常用於實作遞迴演算法和結合超參調優與平行模型訓練等場景。

@ray.remote
def nested_parallel_task(param):
    # 模擬一些計算
    time.sleep(param)
    return param * param

@ray.remote
def parent_task(params):
    futures = [nested_parallel_task.remote(p) for p in params]
    results = ray.get(futures)
    return sum(results)

# 啟動父級任務
parent_future = parent_task.remote([1, 2, 3])
print(ray.get(parent_future))

內容解密:

  • 程式碼邏輯:這段程式碼定義了兩個遠端函式:nested_parallel_taskparent_task
  • 巢狀平行:在 parent_task 函式內部啟動多個 nested_parallel_task 遠端函式。
  • 資料聚合:透過 ray.get 函式取得所有子任務的結果並進行聚合計算。
  • 效率提升:巢狀平行可以顯著提升計算效率,特別是在需要多層次計算或遞迴操作的場景中。

錯誤示範與修正建議

以下是一個錯誤示範和修正建議:

@ray.remote
def generate_number(s: int, limit: int, sl: float) -> int:
    random.seed(s)
    time.sleep(sl)
    return random.randint(0, limit)

@ray.remote
def sum_values(values: list) -> int:
    return sum(values)

# 不正確的使用方式
try:
    result = ray.get(sum_values.remote([
        generate_number.remote(1, 10, .1),
        generate_number.remote(5, 20, .2),
        generate_number.remote(7, 15, .3)
    ]))
except Exception as e:
    print(e)

錯誤說明:

  • 問題所在:這段程式碼嘗試將包含 ObjectRef 的列表直接傳遞給遠端函式。Ray 不會自動解析列表中的 ObjectRef,
  • 錯誤表現:會導致型別錯誤 (TypeError) 或其他異常情況。

建議修正:

  • 直接傳遞 ObjectRef:應該直接傳遞 ObjectRef 作為引數,
  • 使用 ray.wait 和 ray.get:如果需要處理列表中的 ObjectRef,應該在遠端函式內部使用 ray.waitray.get 排程其執行。