Ray 提供了 Actor 模型來實作狀態化的分散式計算。本文將探討如何利用非同步 (asyncio) 和多執行緒提升 Ray 遠端角色的效能,並分享一些實務經驗與最佳實踐。非同步角色適用於 I/O 密集型任務,透過 asyncio 事件迴圈實作非阻塞操作,提升並發處理能力。多執行緒角色則適合 CPU 密集型任務,利用多執行緒平行處理,充分利用多核心 CPU 資源。選擇合適的角色型別取決於任務的特性。Ray 也提供了 max_concurrency 引數來控制角色的最大並發數,避免資源耗盡。除了效能最佳化,容錯性也是分散式系統的關鍵。Ray 的遠端函式和角色都具備一定的容錯能力,例如自動重試和角色重啟機制。理解 Ray 的資源管理方式,例如物件的垃圾回收和 detached 資源的處理,對於構建穩定的分散式應用至關重要。此外,瞭解 Ray 物件的操作方式,包括 ObjectRef 和 ray.put 的使用,能更有效地管理資料和提升效能。
Ray 遠端角色:非同步與多執行緒的實作與最佳實踐
在現代軟體開發中,Ray 是一個強大的分散式計算框架,能夠有效地管理資源並提升應用程式的效能。Ray 的遠端角色(Remote Actors)提供了一種方便的方式來實作狀態化的執行。這些角色可以使用非同步(asyncio)或多執行緒(threaded)來提升效能。玄貓將探討如何建立和使用這些角色,並提供一些實務經驗和最佳實踐。
非同步角色的建立
要建立一個使用 asyncio 的角色,必須定義至少一個非同步方法。Ray 會為這些角色建立一個 asyncio 事件迴圈來執行其方法。從呼叫者的角度來看,提交任務給這些角色與提交給普通角色是相同的。唯一的區別在於,當任務在角色中執行時,它會被發布到一個在背景執行緒或執行緒池中的 asyncio 事件迴圈中,而不是直接在主執行緒上執行。需要注意的是,在非同步角色方法中使用阻塞式的 ray.get 或 ray.wait 輸出是不被允許的,因為它們會阻塞事件迴圈的執行。
簡單非同步角色範例
以下是一個簡單的非同步角色範例:
import ray
import asyncio
@ray.remote
class AsyncActor:
async def computation(self, num):
print(f'Actor waiting for {num} sec')
for x in range(num):
await asyncio.sleep(1)
print(f'Actor slept for {x+1} sec')
return num
actor = AsyncActor.remote()
內容解密:
- 定義非同步角色:
@ray.remote裝飾器將類別標記為 Ray 的遠端角色。 - 非同步方法:
computation方法定義為async,這意味著它是一個非阻塞式的非同步操作。 - 非阻塞等待:
await asyncio.sleep(1)使得程式在每秒鐘暫停一次,而不是阻塞整個事件迴圈。 - 傳回值:最後傳回
num。
控制最大並發數
Ray 允許在建立角色時指定最大並發數,這可以幫助控制角色的並發執行數量:
actor = AsyncActor.options(max_concurrency=5).remote()
內容解密:
- 最大並發數:
max_concurrency=5指定了這個角色最多可以同時處理五個任務。 - 執行緒池:內部實作可能會使用執行緒池來管理這些並發任務。
多執行緒角色的建立
建立多執行緒角色與非同步角色類別似,但它們不需要 async 語法。相反,它們使用 Python 的 threading 模組來管理並發性。
簡單多執行緒角色範例
以下是一個簡單的多執行緒角色範例:
import ray
from time import sleep
@ray.remote
class ThreadedActor:
def computation(self, num):
print(f'Actor waiting for {num} sec')
for x in range(num):
sleep(1)
print(f'Actor slept for {x+1} sec')
return num
actor = ThreadedActor.options(max_concurrency=3).remote()
內容解密:
- 執行緒等待:
sleep(1)是一個阻塞式的等待操作,適合用於 I/O 操作。 - 最大並發數:
max_concurrency=3指定了這個角色最多可以同時處理三個任務。
選擇適當的擴充套件方式
在選擇擴充套件方式時,需要考慮應用程式的特性和需求。根據「Multiprocessing vs. Threading vs. AsyncIO in Python」一文中的總結,可以參考以下幾種擴充套件方式:
| 擴充套件方式 | 特性 | 單位使用標準 |
|---|---|---|
| Actor pool | 較多程式、高 CPU 使用率 | CPU-bound (CPU 繫結) |
| Async actor | 單一程式、單一執行緒、協同多工、任務協同切換 | Slow I/O-bound (慢 I/O-bound) |
| Threaded actor | 單一程式、多重執行緒、搶佔式多工、作業系統決定任務切換 | Fast I/O-bound and nonasync libraries (快速 I/O-bound 和非非同步函式庫) |
根據具體需求選擇合適的擴充套件方式可以顯著提升應用程式的效能和穩定性。
Ray 遠端角色最佳實踐
由於 Ray 的遠端角色本質上是遠端函式,因此適用於一般 Ray 遠端函式的一些最佳實踐也適用於遠端角色。此外,Ray 提供了一些特定於角色的最佳實踐:
- 故障容錯:Ray 支援角色自動重啟機制。當角色或宿主節點當機時,角色會被自動重建。
- 全域變數:避免在遠端函式中修改全域變數。相反地,應該使用角色來封裝和存取這些變數。
- 角色池:對於需要頻繁執行相同遠端函式的情況下,可以使用角色池來提供更加受控制且高效能的執行環境。
此圖示
graph TD;
A[Ray Driver] --> B[Remote Function];
A --> C[Actor Pool];
B --> D[New Process];
C --> E[Controlled Workers];
圖示解說:
- Ray Driver:負責協調和管理遠端函式和角色。
- Remote Function:直接執行新程式。
- Actor Pool:提供受控制且高效能的工作者集合。
- New Process:每次執行遠端函式都會建立新程式。
- Controlled Workers:角色池中的工作者集合。
錯誤教訓與改進點
在實際應用中,玄貓曾經遇到過一些錯誤教訓:
- 避免阻塞操作:在非同步角色中避免使用阻塞操作,如
ray.get或ray.wait。 - 適當選擇擴充套件方式:根據具體需求選擇合適的擴充套件方式(如 CPU-bound 或 I/O-bound)。
- 全域變數管理:避免在遠端函式中修改全域變數,應該使用角色來封裝和存取。
未來趨勢與評估
隨著技術的不斷進步,Ray 的遠端角色將會變得更加強大和靈活。未來可能會看到更多關於故障容錯、狀態還原以及更高效能管理工具的支援。對於開發者來說,瞭解這些技術趨勢並適時採用最新技術將是提升應用程式效能和穩定性的一個重要途徑。
透過以上分析與例項,玄貓希望能夠幫助讀者更好地理解和應用 Ray 的遠端角色技術。無論是使用非同步還是多執行緒角色,選擇合適的擴充套件方式並遵循最佳實踐都是成功關鍵。
Ray 設計細節
在使用了遠端函式與演員(actor)之後,現在是瞭解背後運作機制的時候了。在本章中,我們將探討一些重要的分散式系統概念,例如容錯性、Ray 的資源管理以及如何加速遠端函式與演員。這些細節在分散式使用 Ray 的情況下尤為重要,但即使是本地使用者也能從中受益。瞭解 Ray 的運作方式將有助於決定何時及如何使用它。
容錯性
容錯性是指系統如何處理從使用者程式碼到框架本身或執行它們的機器的各種失敗。Ray 為每個系統提供不同的容錯機制。與許多系統一樣,Ray 無法從頭節點失敗中還原。某些不可還原的錯誤存在於 Ray 中,目前無法透過組態來避免。如果頭節點、GCS 或應用程式與頭節點之間的連線失敗,則應用程式將失敗且無法透過 Ray 還原。如果需要這些情況下的容錯性,則必須自行實作高用性,可能使用 ZooKeeper 或類別似的低層工具。
釋出/訂閱系統
釋出/訂閱(pub/sub)系統允許流程訂閱分類別更新。總體而言,Ray 的架構(如圖 5-1 所示)由應用層和系統層組成,兩者都可以處理失敗。
此圖示
graph TD;
A[應用層] --> B[系統層];
B --> C[GCS];
B --> D[分散式排程器];
B --> E[分散式物件儲存];
系統層
系統層由三個主要元件組成:全域性控制狀態(GCS)、分散式排程器和分散式物件儲存。除了 GCS 之外,所有元件都可以水平擴充套件且具備容錯性。
GCS
GCS 是 Ray 架構的核心,負責維護整個系統的控制狀態。內部來說,GCS 是一個具有釋出/訂閱功能的鍵值儲存。目前,GCS 是單點故障並執行在頭節點上。使用 GCS 可以使整體架構簡單化,因為它使系統層中的其他元件可以是無狀態的。這種設計對於容錯性至關重要(即在發生故障時,元件只需重新啟動並從 GCS 中讀取線緒),並使得分散式物件儲存和排程器能夠獨立擴充套件,因為所有元件透過 GCS 分享所需的狀態。
離線函式
由於離線函式不包含任何持久狀態,因此從其失敗中還原相對簡單。Ray 會不斷重試直到成功或達到最大重試次數。如上一章所述,您可以透過 @ray.remote 註解中的 max_retries 引數控制重試次數。要嘗試並更好地理解 Ray 的容錯性,請編寫一個偶爾失敗的離線函式,如範例 5-1 所示。
嘗試重啟遠端函式
import ray
ray.init()
@ray.remote
def flaky_remote_fun(x):
import random
import sys
if random.randint(0, 2) == 1:
sys.exit(0)
return x
r = flaky_remote_fun.remote(1)
## 內容解密:
本段程式碼展示瞭如何定義一個具有故障率的遠端函式 flaky_remote_fun。這個函式在執行時有三分之一的機率會失敗並離開程式。這樣設計是為了測試 Ray 的容錯能力。
import random
import sys
這兩行程式碼引入了 random 和 sys 模組。random 模組用於生成隨機數,而 sys 模組則用於離開程式。
if random.randint(0, 2) == 1:
sys.exit(0)
這段邏輯會生成一個範圍在 0 到 2 之間的隨機整數。如果生成的隨機整數等於 1(即有三分之一的機率),則會呼叫 sys.exit(0) 離開程式。
return x
如果沒有離開程式,則傳回輸入引數 x。
r = flaky_remote_fun.remote(1)
這行程式碼將 flaky_remote_fun 函式提交到 Ray 作為一個遠端任務,並且傳遞引數 1。
離線演員
對於容錯性來說,離線演員是比較複雜的情況,因為它們包含狀態。這就是為什麼在第四章中探討了持久化和還原狀態的選項。演員可以在設定、訊息處理或訊息之間經歷故障。
演員故障處理
與遠端函式不同的是,如果演員在處理訊息時失敗,Ray 不會自動重試它。即使設定了 max_restarts ,Ray 則會重啟您的演員以便處理下一條訊息。出現錯誤時您會收到 RayActorError 異常。
演員懶初始化
Ray 演員是懶初始化的,因此初始化階段的故障與第一條訊息的故障相同。
if random.randint(0, 2) == 1:
sys.exit(0)
演員重啟策略
當演員在訊息之間失敗時,Ray 在下次呼叫時會自動嘗試還原該演員(最多重試 max_retries 次)。如果您編寫了良好的狀態還原程式碼,「訊息之間」失敗通常看不見(除了稍微慢一些),如果沒有狀態還原程式碼則每次重啟都會把演員設定回初始值。
資源管理
如果您的應用程式失敗了,應用程式使用的幾乎所有資源最終都將被垃圾回收掉。唯一例外是 detached 資源(例如 detached 演員或 detached placement groups)。Ray 則會根據組態重新啟動超出當前程式生命週期之外提供叢集不失敗的情況下保證叢集能正常執行而不被縮小規模。
Ray 操作物件
Ray 物件可以包含任何可序列化內容(在下一節中進行介紹),包括其他 Ray 物件的參照(稱為 ObjectRef)。ObjectRef 本質上是一個唯一 ID ,它指向一個遠端物件並概念上類別似於 future 。Ray 物件是為任務結果自動建立的以及大型引數 actor 和遠端 function 。您可以透過呼叫 ray.put 輸入來手動建立物件 ,它將傳回一個立即就綠 ObjectRef (例如 o = ray.put(1))。