Ray Workflows 提供了穩定的永續性機制,確保工作流程步驟成功執行後不會重複執行,即使在叢集或驅動器故障的情況下也能夠還原。其錯誤處理機制允許設定重試次數和捕捉例外,提升工作流程的容錯能力。虛擬演員則允許動態擴充套件工作流程,並透過其方法呼叫子工作流程,實作更複雜的邏輯。結合永續性保證和虛擬演員,可以輕鬆構建冪等性工作流程,避免重複操作。此外,Ray Workflows 也能與機器學習管線整合,透過虛擬演員封裝每個階段的邏輯,例如資料標準化和模型訓練,簡化管線的構建和管理,並支援模型的傳遞與資料分享,提升機器學習工作流程的效率和靈活性。文章中的程式碼範例展示瞭如何使用 Ray Workflows 構建完整的機器學習工作流程,包含資料準備、虛擬演員定義、工作流程步驟構建以及執行訓練和預測流程。同時,也說明瞭如何處理長時間執行的工作流程以及與 Ray 核心元件的整合方式,例如在工作步驟中執行任務或演員,以及在多個步驟之間傳遞物件參照。
Ray Workflows 中的永續性保證與虛擬演員
功能錯誤處理
在 Ray Workflows 中,處理功能錯誤是確保工作流程穩定執行的重要部分。以下是處理錯誤的基本步驟:
return "OK"
# 設定最多重試次數為 5 次
s1 = faulty_function.options(max_retries=5).step()
s1.run()
@workflow.step
def handle_errors(result: Tuple[str, Exception]):
# 成功時將例外欄位設定為 NONE
err = result[1]
if err:
return "There was an error: {}".format(err)
else:
return "OK"
# `handle_errors` 接受一個元組 (結果, 例外)。
s2 = faulty_function.options(catch_exceptions=True).step()
handle_errors.step(s2).run()
內容解密:
faulty_function.options(max_retries=5).step():這行程式碼設定了faulty_function最多重試 5 次。s1.run():執行第一個步驟。@workflow.step:這是一個工作流程步驟的裝飾器,用於定義工作流程中的步驟。handle_errors函式:這個函式用於處理錯誤。它接受一個元組(result, exception)作為引數,如果發生錯誤,則傳回錯誤訊息,否則傳回 “OK”。s2 = faulty_function.options(catch_exceptions=True).step():這行程式碼設定了faulty_function在發生例外時進行捕捉。handle_errors.step(s2).run():執行錯誤處理步驟。
永續性保證
Ray Workflows 確保一旦某個步驟成功執行,它就不會被重複執行。為了實作這一點,Ray Workflows 將步驟結果記錄到持久化儲存中,確保前一步成功的結果在後續步驟中不會改變。
Ray 的工作流程超越了在叢集或單一應用程式內重試的永續性。工作流程實作了一種根據兩種狀態的失敗模型:
- 叢集失敗:如果叢集失敗,任何在該叢集上執行的工作流程都會進入可還原狀態。這些工作流程可以使用
ray.workflow.resume.all在不同的叢集上還原。 - 驅動器失敗:如果工作流程進入失敗狀態,解決問題後可以從失敗的步驟還原。
目前,工作流程的還原功能是beta API,可能在穩定之前會有所變化。
內容解密:
- 永續性保證:這意味著一旦某個步驟成功,它就不會被重複執行。這透過記錄結果到持久化儲存來實作。
- 叢集失敗:當叢集失敗時,工作流程進入可還原狀態,可以在不同的叢集上還原。
- 驅動器失敗:當驅動器失敗時,可以從最後一個成功步驟還原。
說明案例
假設我們有一個需要保證冪等性(idempotent)的工作流程。以下是如何使用永續性保證來實作冪等性:
from ray import workflow
@workflow.step
def generate_id() -> str:
# 生成唯一的冪等性程式碼。
return uuid.uuid4().hex
@workflow.step
def book_flight_idempotent(request_id: str) -> FlightTicket:
if service.has_ticket(request_id):
# 檢索之前建立的票。
return service.get_ticket(request_id)
return service.book_flight(request_id)
# 安全:book_flight 是設計為冪等性的
request_id = generate_id.step()
book_flight_idempotent.step(request_id).run()
內容解密:
@workflow.step:這是一個工作流程步驟的裝飾器,用於定義工作流程中的步驟。generate_id函式:這個函式生成一個唯一的冪等性程式碼。book_flight_idempotent函式:這個函式檢查服務是否已經有票,如果有則傳回之前建立的票;如果沒有則建立新票。request_id = generate_id.step():執行生成唯一 ID 的步驟並取得結果。book_flight_idempotent.step(request_id).run():執行預訂航班的步驟。
虛擬演員與動態工作流程擴充套件
虛擬演員允許子工作流程從其方法中呼叫。當你建立虛擬演員時,Ray 將其初始狀態和類別定義儲存在持久化儲存中。當虛擬演員方法建立新步驟時,它們會動態附加到工作流程並執行。
以下是如何使用虛擬演員來管理工作流程狀態:
from ray import workflow
import ray
@workflow.virtual_actor
class Counter:
def __init__(self, init_val):
self._val = init_val
def incr(self, val=1):
self._val += val
print(self._val)
@workflow.virtual_actor.readonly
def value(self):
return self._val
workflow.init()
# 初始化 ID="my_counter" 的 Counter 虛擬演員
counter = Counter.get_or_create("my_counter", 0)
# 和工作流程步驟類別似,虛擬演員方法支援:
# - `run()`,將傳回值
# - `run_async()`,將傳回 ObjectRef
counter.incr.run(10)
assert counter.value.run() == 10
# 非阻塞執行
counter.incr.run_async(10)
counter.incr.run(10)
assert 30 == ray.get(counter.value.run_async())
內容解密:
@workflow.virtual_actor:這是一個虛擬演員的裝飾器,用於定義虛擬演員類別。Counter類別:這個類別包含初始化、增加和讀取值的方法。workflow.init():初始化工作流程環境。Counter.get_or_create("my_counter", 0):取得或建立 ID 為 “my_counter” 的 Counter 虛擬演員。counter.incr.run(10):呼叫虛擬演員的增加方法並立即執行。counter.value.run():呼叫虛擬演員的讀取值方法並立即執行。
虛擬演員與子工作流程
以下是如何使用子工作流程來處理更複雜的情況:
from ray import workflow
import ray
@workflow.step
def double(s):
return 2 * s
@workflow.virtual_actor
class Actor:
def __init__(self):
self.val = 1
def double(self, update):
step = double.step(self.val)
if not update:
# 在方法內部啟動一個工作流程
return step
else:
# 工作流程也可以傳遞給另一個方法
return self.update.step(step)
def update(self, v):
self.val = v
return self.val
handler = Actor.get_or_create("actor")
assert handler.double.run(False) == 2
assert handler.double.run(False) == 2
assert handler.double.run(True) == 2
assert handler.double.run(True) == 4
內容解密:
@workflow.virtual_actor:這是一個虛擬演員的裝飾器,用於定義虛擬演員類別。@workflow.step:這是一個工作流程步驟的裝飾器,用於定義工作流程中的步驟。Actor類別:這個類別包含初始化、雙倍和更新值的方法。double(s)函式:這個函式接受一個數字並傳回其雙倍值。
# 虛擬演員與機器學習管線示例
training_tuple = (X_train, y_train, 'fit')
classification.step(scaling.step(training_tuple, 'standardscalar'), 'decisiontree').run('training_pipeline')
predict_tuple = (X_test, y_test, 'predict')
(X, pred_y, mode) = classification.step(scaling.step(predict_tuple, 'standardscalar'),'decisiontree').run('prediction_pipeline')
此圖示展示了機器學習管線中的兩階段過程
graph TD;
A[標準化] --> B[分類別];
小段落標題(圖表說明)
上面圖表展示了機器學習管線中的兩階段過程,首先對資料進行標準化處理後再進行分類別操作。
機器學習管線中的虛擬演員應用案例
假設我們有一個簡單的機器學習管線,由標準化和決策樹分類別器組成。每個階段都被定義為虛擬演員的一部分:
from ray import workflow
@workflow.virtual_actor.readonly_step("standardscalar")
def scaling(X, y, mode):
if mode == 'fit':
scaler.fit(X)
return X_train_scaled, y_train_scaled, mode
@workflow.virtual_actor.readonly_step("decisiontree")
def classification(X_scaled, y_scaled, mode):
if mode == 'fit':
clf.fit(X_scaled,y_scaled)
return X_train_scaled, y_train_scaled,'mode'
小段落標題(內容解密)
本段落介紹了一種將機器學習管線中的標準化及分類別階段轉換為虛擬演員讀取僅供參考之方式。透過使用Ray Workflows工具套件及其讀取僅供參考功能強大特點來提升整體系統穩定性及可靠性。
training_tuple = (X_train, y_train,'fit')
classification.step(scaling.step(training_tuple),'standardscalar').run('training_pipeline')
predict_tuple=(X_test,y_test,'predict')
(X,pred_y,mode)=classification.step(scaling.step(predict_tuple,'standardscalar'),'decisiontree').run('prediction_pipeline')
推論模型傳遞與分享資料
透過Ray Workflows功能可利用虛擬演員來傳遞訓練好的推論模型並且支援跨叢集分享資料、例如進行時序分析、預測、異常偵測等等。
Ray 工作流程:深入理解與實作
在現代科技環境中,機器學習工作流程的自動化和高效運作是至關重要的。Ray 是一個功能強大的分散式計算框架,能夠有效地處理複雜的工作流程。本文將探討如何使用 Ray 來構建和管理機器學習工作流程,並透過具體案例來說明其實際應用。
使用 Ray 構建機器學習工作流程
Ray 提供了豐富的 API,使得構建和管理機器學習工作流程變得相對簡單。以下是一個完整的範例,展示瞭如何使用 Ray 來構建一個機器學習工作流程。
準備資料
首先,我們需要準備一些資料來進行訓練和預測。這裡我們使用隨機生成的資料來模擬真實情況。
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
# 生成隨機資料
X = pd.DataFrame(np.random.randint(0, 100, size=(10000, 4)), columns=list('ABCD'))
y = pd.DataFrame(np.random.randint(0, 2, size=(10000, 1)), columns=['Label'])
# 分割資料集
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
定義虛擬演員
在 Ray 中,虛擬演員(Virtual Actor)可以用來封裝機器學習模型的訓練和預測邏輯。這樣可以更好地管理和排程工作流程。
from ray import workflow
from sklearn.base import BaseEstimator
from sklearn.preprocessing import StandardScaler
from sklearn.tree import DecisionTreeClassifier
@ray.workflow.virtual_actor
class EstimatorVirtualActor:
def __init__(self, estimator: BaseEstimator):
if estimator is not None:
self.estimator = estimator
def fit(self, inputtuple):
(X, y, mode) = inputtuple
if isinstance(self.estimator, (BaseEstimator, DecisionTreeClassifier)):
self.estimator.fit(X, y)
return X, y, mode
else:
X = self.estimator.fit_transform(X)
return X, y, mode
@workflow.virtual_actor.readonly
def predict(self, inputtuple):
(X, y, mode) = inputtuple
if isinstance(self.estimator, (BaseEstimator, DecisionTreeClassifier)):
pred_y = self.estimator.predict(X)
return X, pred_y, mode
else:
X = self.estimator.transform(X)
return X, y, mode
def run_workflow_step(self, inputtuple):
(X, y, mode) = inputtuple
if mode == 'fit':
return self.fit(inputtuple)
elif mode == 'predict':
return self.predict(inputtuple)
def __getstate__(self):
return self.estimator
def __setstate__(self, estimator):
self.estimator = estimator
構建工作流程步驟
接下來,我們定義兩個工作流程步驟:標準化和分類別。這兩個步驟將分別使用標準化處理和決策樹分類別器來處理資料。
@workflow.step
def scaling(inputtuple, name):
va = EstimatorVirtualActor.get_or_create(name, StandardScaler())
outputtuple = va.run_workflow_step.run_async(inputtuple)
return outputtuple
@workflow.step
def classification(inputtuple, name):
va = EstimatorVirtualActor.get_or_create(name,
DecisionTreeClassifier(max_depth=3))
outputtuple = va.run_workflow_step.run_async(inputtuple)
return outputtuple
執行工作流程
最後,我們將這些步驟組合起來,構建完整的訓練和預測工作流程。
training_tuple = (X_train, y_train, 'fit')
classification.step(scaling.step(training_tuple, 'standardscalar'), 'decisiontree').run('training_pipeline')
predict_tuple = (X_test, y_test, 'predict')
(X_pred_y) = classification.step(scaling.step(predict_tuple,
'standardscalar'), 'decisiontree').run('prediction_pipeline')
assert pred_y.shape[0] == 2000 # 檢查預測結果的數量是否正確
長時間執行的工作流程處理
長時間執行的工作流程需要特殊處理,以避免阻塞後續的演員方法呼叫。Ray 提供了一些最佳實踐來處理這種情況。
非阻塞工作流程
以下是一個非阻塞工作流程的範例,展示瞭如何避免阻塞問題。
@workflow.virtual_actor
class ShoppingCart:
...
# 檢查狀態以避免阻塞
def do_checkout(self):
# 生成確定性的工作流程 ID 以實作冪等性。
self.shipment_workflow_id = "ship_{}".format(self.order_id)
# 作為獨立的非同步工作流程執行配送工作流程。
ship_items.step(self.items).run_async(
workflow_id=self.shipment_workflow_id)
工作流程與 Ray 原始元件的整合
Ray 工作流程可以與 Ray 的核心原始元件無縫整合。以下是一些常見的整合場景。
在 Ray 工作步驟中執行任務或演員
我們可以在 Ray 工作步驟中執行任務或演員,從而利用 Ray 的分散式計算能力。
@ray.remote
def do_add(a: int , b: int) -> int:
return a + b
@workflow.step
def add(a: int , b: int) -> int:
return do_add.remote(a , b)
add.step(ray.put(10), ray.put(20)).run() == 30 # 檢查結果是否正確
在多個步驟之間傳遞物件參照
Ray 物件參照可以在多個步驟之間傳遞,這樣可以提高工作流程的效率和可靠性。
from typing import List
@workflow.step
def add(values: List[int]) -> int:
return sum(values)
@workflow.step
def get_val() -> int:
return 10
ret = add.step([get_val.step() for _ in range(3)])
assert ret.run() == 30 # 檢查結果是否正確
建立事件觸發器
Ray 提供了外掛式事件系統,允許外部事件觸發工作流程。這樣可以更靈活地處理各種情況。
建立事件觸發器步驟
以下是一個事件觸發器步驟的範例,展示瞭如何使用事件來觸發工作流程。
import time
# 建立一個在60秒後完成的事件。
event1_step = workflow.wait_for_event(
workflow.event_listener.TimerListener , time.time() + 60)
# 建立另一個在30秒後完成的事件。
event2_step = workflow.wait_for_event(
workflow.event_listener.TimerListener , time.time() + 30)
說明圖示:
此圖示展示了 Ray 工作流程中的各個元件之間的關係及其執行邏輯。
graph TD;
A[開始] --> B[準備資料];
B --> C[定義虛擬演員];
C --> D[構建標準化步驟];
D --> E[構建分類別步驟];
E --> F[執行訓練工作流程];
F --> G[執行預測工作流程];
G --> H[結束];