Ray Workflows 作為 Ray 生態系統的一部分,提供了一種建構和管理複雜分散式工作流程的有效方法。它結合了 Ray 的分散式運算能力和持久化機制,讓開發者能夠輕鬆地定義、執行和監控長時間執行、需要高用性的任務。不同於傳統的工作流程引擎,Ray Workflows 能夠根據執行狀態動態調整依賴關係,並利用 Ray 的物件儲存功能實作步驟之間的資料分享和狀態管理。這使得 Ray Workflows 非常適合處理需要動態擴充套件、容錯和高吞吐量的應用場景,例如機器學習模型訓練、資料處理Pipeline和科學計算等。理解 Ray Workflows 的核心概念,例如步驟、物件、工作流程和虛擬演員,對於有效運用其功能至關重要。透過 Python API 和裝飾器,開發者可以簡潔地定義工作流程步驟,並利用 Ray 的分散式能力進行平行運算。此外,Ray Workflows 也提供了豐富的管理工具,用於監控工作流程狀態、處理錯誤和還原失敗的任務,確保工作流程的可靠性和穩定性。

Ray Workflows 的核心概念與應用

Ray Workflows 提供了永續性的工作流程執行功能,利用 Ray 的分散式函式庫,確保低延遲和動態依賴管理。Ray Workflows 提供了多種核心原始元素來構建工作流程,並且這些元素之間可以靈活地進行互動。

基本概念與核心原始元素

步驟(Steps)

步驟是使用 @workflow.step 裝飾器註解的函式。步驟在成功完成時只會執行一次,並在失敗時重試。步驟可以作為其他步驟未來的引數。為了確保可還原性,步驟不支援 ray.getray.wait 呼叫。

@workflow.step
def example_step():
    pass

物件(Objects)

物件是儲存在 Ray 物件儲存中的資料物件,這些物件的參照可以傳遞給步驟並從步驟中傳回。當物件最初從步驟傳回時,它們會被檢查點並可以透過 Ray 物件儲存與其他工作流程步驟分享。

@workflow.step
def create_object() -> ray.ObjectRef:
    return ray.put("some data")

工作流程(Workflows)

工作流程是使用 @Workflow.runWorkflow.run_async 建立的執行圖。工作流程執行開始後,會被記錄到儲存中以實作永續性,並在任何具有儲存存取許可權的 Ray 叢集上失敗時還原。

工作流程還可以是動態的,在執行時生成新的步驟和子工作流程。工作流程支援動態迴圈、巢狀和遞迴。

@Workflow.run
def dynamic_workflow():
    # 動態生成新的步驟
    return new_step()

虛擬演員(Virtual Actors)

虛擬演員類別似於普通的 Ray 演員,可以保持成員狀態。主要區別在於,虛擬演員由永續性儲存支援,而不是僅僅依賴程式記憶體,這樣可以在叢集重啟或工作者失敗後繼續存在。

虛擬演員管理長時間執行的業務工作流程,將其狀態儲存在外部儲存中以實作永續性。它們還支援從方法呼叫啟動子工作流程並接收外部觸發事件。

@workflow.virtual_actor
class Counter:
    def __init__(self):
        self.count = 0

    def increment(self):
        self.count += 1
        return self.count

基本範例

讓我們來看一個簡單的「Hello World」工作流程範例,這展示了步驟、物件和工作流程原始元素如何一起工作。

import ray
from ray import workflow
from typing import List

# 建立一個任意的 Ray 遠端函式
@ray.remote
def hello():
    return "hello"

# 定義一個將物件放入物件儲存的工作流程步驟
@workflow.step
def words() -> List[ray.ObjectRef]:
    return [hello.remote(), ray.put("world")]

# 定義一個接收物件的步驟
@workflow.step
def concat(words: List[ray.ObjectRef]) -> str:
    return " ".join([ray.get(w) for w in words])

# 建立工作流程
workflow.init("tmp/workflow_data")
output: "Workflow[int]" = concat.step(words.step())
# 執行工作流程
assert output.run(workflow_id="workflow_1") == "hello world"
assert workflow.get_status("workflow_1") == workflow.WorkflowStatus.SUCCESSFUL
assert workflow.get_output("workflow_1") == "hello world"

內容解密:

  • hello:這是一個簡單的遠端函式,傳回字串 “hello”。
  • words:這是一個使用 @workflow.step 裝飾器註解的函式,它傳回兩個物件參照:一個是遠端呼叫 hello 的結果,另一個是字串 “world” 的參照。
  • concat:這是另一個使用 @workflow.step 裝飾器註解的函式,它接受一個物件參照列表並傳回拼接後的字串。
  • concat.step(words.step()):這行程式碼將 words 步驟作為引數傳遞給 concat 步驟。
  • output.run(workflow_id="workflow_1"):這行程式碼執行工作流程並取得結果。
  • assert workflow.get_status("workflow_1") == workflow.WorkflowStatus.SUCCESSFULassert workflow.get_output("workflow_1") == "hello world":這些行程式碼檢查工作流程是否成功並取得預期結果。

動態工作流程

除了預定義的 DAG(有向無環圖)工作流程,Ray 還允許您根據當前工作流程執行狀態動態建立步驟。例如,您可以使用遞迴或更複雜的執行流程。以下是一個簡單遞迴計算階乘的範例。

from ray import workflow

@workflow.step
def factorial(n: int) -> int:
    if n == 1:
        return 1
    else:
        return mult.step(n, factorial.step(n - 1))

@workflow.step
def mult(a: int, b: int) -> int:
    return a * b

# 計算 5 的階乘並建立遞迴步驟
factorial_workflow = factorial.step(5).run()
assert factorial_workflow.run() == 120

內容解密:

  • factorial:這是一個遞迴函式,如果 n 是 1 則傳回 1,否則傳回 n 與 n-1 的階乘乘積。
  • mult:這是一個乘法函式,接受兩個整數引數並傳回其乘積。
  • factorial_step(5).run():這行程式碼執行階乘計算並傳回結果。
  • assert factorial_workflow.run() == 120:這行程式碼檢查計算結果是否為預期值。

虛擬演員應用

虛擬演員是 Ray 演員的一種變體,它們由永續性儲存支援而非記憶體。以下範例展示瞭如何使用虛擬演員實作計數器。

from ray import workflow

@workflow.virtual_actor
class Counter:
    def __init__(self):
        self.count = 0

    def increment(self):
        self.count += 1
        return self.count

workflow.init(storage="/tmp/workflows")
counter_instance = Counter.get_or_create("counter_workflw")
assert counter_instance.increment.run() == 1
assert counter_instance.increment.run() == 2

內容解密:

  • @workflow.virtual_actor:這裝飾器將類別定義為虛擬演員。
  • __init__:初始化方法設定計數器初始值為0。
  • increment:方法增加計數器值並傳回新值。
  • Counter.get_or_create("counter_workflw"):建立或取得現有計數器例項。
  • assert counter_instance.increment.run() == 1assert counter_instance.increment.run() == 2:檢查計數器值是否正確增加。

工作流程中的實際應用

在實際應用中,Ray Workflows 應該如何設計和管理呢?以下是一些常見步驟和最佳實踐:

首先,您需要定義每個單獨的工作流程步驟並使用 @workflow.step 裝飾器註解它們。每個步驟都可以接受多個輸入引數,每個輸入可以是具體值或未來值(即之前工作流程步驟完成後的結果)。

@workflow.step(num_gpus=1)
def train_model() -> Model:
    pass # 模型訓練邏輯在此處處理。
train_model.step().run()

內容解密:

  • @workflow.step(num_gpus=1):指定此步需要使用 GPU 資源進行模型訓練。
  • train_model():定義模型訓練邏輯。
  • train_model.step().run():執行模型訓練步驟。

動態迴圈與遞迴

Ray Workflows 支援動態迴圈和遞迴操作。例如您可以根據某些條件動態生成新的工作流程戶程式。

@startuml
skinparam backgroundColor #FEFEFE
skinparam componentStyle rectangle

title Ray Workflows 核心概念、實作與動態工作流程管理

package "Python 應用架構" {
    package "應用層" {
        component [主程式] as main
        component [模組/套件] as modules
        component [設定檔] as config
    }

    package "框架層" {
        component [Web 框架] as web
        component [ORM] as orm
        component [非同步處理] as async
    }

    package "資料層" {
        database [資料庫] as db
        component [快取] as cache
        component [檔案系統] as fs
    }
}

main --> modules : 匯入模組
main --> config : 載入設定
modules --> web : HTTP 處理
web --> orm : 資料操作
orm --> db : 持久化
web --> cache : 快取查詢
web --> async : 背景任務
async --> fs : 檔案處理

note right of web
  Flask / FastAPI / Django
end note

@enduml

此圖示展示了一個簡單的動態迴圈和遞迴過程:

內容解密:

此圖示展示了一個簡單條件判斷過程: A. 開始: 工作開始。 B. 條件判斷: 根據某些條件決定下一步驟是否生成新步驟。 C. 生成新步驟: 滿足條件時生成新的一系列計算任務. D.結束: 不滿足條件時, 工作結束. E.執行新步驟: 控制執行已建立任務.

持久化與高用性

Ray Workflows 的持久化特性使其非常適合長時間執行和需要高用性的任務。透過記錄到持久儲存中並支援在任何有儲存存取許可權的叢集上還原執行。

總結來說,Ray Workflows 提供了強大且靈活的工具來構建和管理分散式、持久化和動態調整依賴關係的工作流程。透過合理設計和利用其各種核心原始元素和功能,您可以構建出高效且可靠的分散式應用系統。

Ray Workflows 實作與管理

Ray Workflows 提供了一個靈活且強大的框架,讓使用者能夠輕鬆構建、管理及監控複雜的工作流程。在這篇文章中,玄貓將探討如何實作工作流程步驟、管理工作流程以及處理動態與條件化的工作流程。

工作流程步驟的實作

首先,讓我們來看看如何使用 Ray Workflows 來實作工作流程步驟。以下是一個簡單的例子,展示如何定義和執行一個基本的工作流程:

from ray import workflow
import random

@workflow.step
def sum(x: int, y: int, z: int) -> int:
    return x + y + z

@workflow.step
def get_value1() -> int:
    return 100

@workflow.step
def get_value2(x: int) -> int:
    return 10 * x

# 建立並執行工作流程
sum_workflow = sum.step(get_value1.step(), get_value2.step(10), 100)
assert sum_workflow.run("sum_example") == 300

內容解密:

  • sum 函式定義了一個簡單的加法步驟,接受三個整數引數並傳回其總和。
  • get_value1 函式傳回一個固定的整數 100。
  • get_value2 函式接受一個整數引數並傳回其十倍。
  • sum.workflow 請求分別呼叫 get_value1get_value2,並將其結果傳遞給 sum 函式。
  • run 方法啟動工作流程並等待完成。

工作流程步驟的命名與管理

在 Ray Workflows 中,步驟可以被明確命名以便於管理和除錯。以下是兩種命名步驟的方式:

  1. 使用 .options(name="step_name")

    sum_workflow = sum.step(get_value1.step().options(name="step1"), get_value2.step(10).options(name="step2"), 100)
    
  2. 使用 @workflows.step(name="step_name") 裝飾器

    @workflow.step(name="step1")
    def get_value1() -> int:
        return 100
    
    @workflow.step(name="step2")
    def get_value2(x: int) -> int:
        return 10 * x
    

工作流程的狀態管理

每個工作流程都有一個唯一的 workflow_id,可以在執行時明確設定或自動生成。工作流程可以處於以下幾種狀態:

  • Running:當前在叢集中執行。
  • Failed:因應用錯誤而失敗,可從失敗步驟還原。
  • Resumable:因系統錯誤而失敗,可從失敗步驟還原。
  • Canceled:已被取消,無法還原且結果不可用。
  • Successful:成功完成。

以下是一些常用的工作流程管理 API:

# 取得單個工作流程狀態
status = workflow.get_status(workflow_id="sum_example")

# 列出所有特定狀態的工作流程
workflows = workflow.list_all(workflow_state=["failed", "resumable"])

# 還原單個工作流程
workflow.resume(workflow_id="failed_workflow")

# 還原所有可還原的工作流程
workflow.resume_all()

# 取消單個工作流程
workflow.cancel(workflow_id="running_workflow")

# 刪除單個工作流程
workflow.delete(workflow_id="finished_workflow")

工作流程的儲存組態

Ray Workflows 預設將工作流程資料儲存在 /tmp/ray/workflow_data。可以透過設定環境變數或使用裝飾器來組態儲存位置:

# 使用環境變陣列態儲存位置
os.environ["RAY_WORKFLOW_STORAGE"] = "/path/to/storage"

# 使用裝飾器組態儲存位置
@workflow.init(storage="/path/to/storage")

支援本地檔案系統和 S3 相容後端。

動態工作品專案案例解析(動態建構)

動態工作品專案案例能夠根據當前狀況來建構步驟。以下是一個計算斐波那契數列的例子:

@workflow.step
def add(a: int, b: int) -> int:
    return a + b

@workflow.step
def fib(n: int) -> int:
    if n <= 1:
        return n
    return add.step(fib.step(n - 1), fib.step(n - 2))

assert fib.step(10).run() == 55

內容解密:

  • add 函式定義了一個簡單的加法步驟。
  • fib 函式遞迴計算斐波那契數列,並根據當前狀況動態生成步驟。

條件化工作品專案案例(書旅行預訂)

條件化工作品專案案例通常用於處理複雜的業務邏輯。以下是一個簡化的旅行預訂案例:

from typing import List

@workflow.step
def book_flight(origin: str, dest: str, date: str) -> str:
    # 模擬航班預訂邏輯
    return f"Flight booked from {origin} to {dest} on {date}"

@workflow.step
def book_hotel(dest: str, date: str) -> str:
    # 模擬酒店預訂邏輯
    return f"Hotel booked in {dest} on {date}"

@workflow.step
def finalize_or_cancel(flights: List[str], hotels: List[str]) -> str:
    # 模擬最終確認或取消邏輯
    return f"Finalized or canceled with flights: {flights} and hotels: {hotels}"

@workflow.step
def book_trip(origin: str, dest: str, dates: List[str]) -> str:
    f1 = book_flight.step(origin, dest, dates[0])
    f2 = book_flight.step(dest, origin, dates[1])
    hotel = book_hotel.step(dest, dates[0])
    return finalize_or_cancel.step([f1, f2], [hotel])

book_trip_result = book_trip.step("OAK", "SAN", ["6/12", "7/5"])
book_trip_result.run()

內容解密:

  • book_flightbook_hotel 函式分別模擬航班和酒店預訂邏輯。
  • finalize_or_cancel 函式模擬最終確認或取消邏輯。
  • book_trip 函式動態生成航班和酒店預訂步驟,並最終確認或取消。

問題處理機制

Ray Workflows 提供兩種錯誤處理機制:自動重試和捕捉異常。

@workflow.step(max_retries=5)
def random_failure() -> str:
    if random.random() > 0.95:
        raise RuntimeError("Found failure")
    return "Success"

@workflow.step(catch_exceptions=True)
def safe_random_failure() -> str:
    result, exception = random_failure()
    if exception:
        return f"Caught exception: {exception}"
    return result

safe_random_failure.run()

內容解密:

  • max_retries:設定最大重試次數,當步驟失敗時自動重試。
  • catch_exceptions:捕捉異常並傳回異常資訊。

其他重要考量

在實際應用中,還需要考慮以下幾點:

  • 依賴管理:確保所有依賴都已安裝並可用。
  • 資源組態:根據工作負載適當組態叢集資源。
  • 監控與日誌:使用監控工具和日誌記錄來追蹤工作流程狀態和效能。

這些考量將幫助你更有效地管理和最佳化 Ray Workflows。