持久化是遠端佇列技術的關鍵環節。本文首先示範如何將持久化邏輯從帳戶 Actor 中抽離,建立一個抽象的持久化類別 BasePersistence,並以檔案持久化 FilePersistence 作為具體實作。此舉提升程式碼的可維護性和擴充套件性,允許日後輕鬆切換不同的持久化方案,例如資料函式庫或雲端儲存。接著,針對 Actor 的可擴充套件性,我們引入 Ray 的 Actor 池概念。Actor 池允許我們建立一組 Actor 例項,並以類別似多程式池的方式進行任務排程,有效地將負載分散到多個 Actor 上,提升系統吞吐量。程式碼範例中,我們示範如何將 FilePersistence Actor 放入池中,並在 Account Actor 中使用此池進行狀態的存取。最後,我們簡要討論了 Ray 中的並發機制,包括 Threading 和 Asyncio,這些技術可以進一步提升 Actor 的執行效率,但需注意分享狀態的管理,避免 race condition 等問題。
遠端佇列技術與持久化改良
遠端佇列實作的改變
當我們將這個實作與原本的範例4-1進行比較時,會發現幾個重要的變化:
- 建構函式多了兩個額外的引數:
account_key和basedir。 account_key是帳戶的唯一識別碼,也用作持久化檔案的名稱。basedir引數則指定用於存放持久化檔案的基礎目錄。當建構函式被呼叫時,我們首先檢查是否有此帳戶的持久化狀態儲存,如果有,則忽略傳入的餘額和最低餘額,並從持久化狀態中還原它們。
程式碼範例
class Account:
def __init__(self, balance: float, minimal_balance: float, account_key: str, basedir: str):
self.account_key = account_key
self.basedir = basedir
self.key = account_key
if self.restorestate():
return
if balance < minimal_balance:
raise Exception("Starting balance is less than minimal balance")
self.balance = balance
self.minimal = minimal_balance
self.storestate()
內容解密:
-
建構函式接收四個引數:初始餘額 (
balance)、最低餘額 (minimal_balance)、account_key和basedir。 -
account_key用於唯一識別每個帳戶並作為持久化檔案的名稱。 -
basedir用於指定存放持久化檔案的基礎目錄。 -
如果存在持久化狀態,則從檔案中還原餘額和最低餘額,否則使用傳入的值。
-
最後,呼叫
storestate()方法將當前狀態存入檔案。 -
新增了兩個方法到類別中:
store_state和restore_state。
程式碼範例
def store_state(self):
state = {'balance': self.balance, 'minimal': self.minimal}
bytes = ray.cloudpickle.dumps(state)
with open(self.basedir + '/' + self.key, "wb") as f:
f.write(bytes)
def restore_state(self):
if not os.path.exists(self.basedir + '/' + self.key):
return False
with open(self.basedir + '/' + self.key, "rb") as f:
bytes = f.read()
state = ray.cloudpickle.loads(bytes)
self.balance = state['balance']
self.minimal = state['minimal']
return True
內容解密:
store_state方法將帳戶狀態(餘額和最低餘額)儲存到檔案中。使用 Ray 的雲端序列化技術將狀態轉換為位元組字串,然後寫入指定的檔案。restore_state方法從檔案中讀取位元組字串,並使用 Ray 的雲端序列化技術將其轉換回字典,然後更新帳戶狀態。
更靈活且可擴充套件的持久化設計
上述實作雖然運作良好,但帳戶 Actor 的實作現在包含了過多的持久化相關程式碼,且與檔案持久化緊密耦合。更好的解決方案是將持久化相關程式碼分離到獨立的類別中。
我們先建立一個抽象類別來定義任何持久化類別必須實作的方法(範例4-8)。
抽象類別範例
class BasePersistence:
def exists(self, key: str) -> bool:
pass
def save(self, key: str, data: dict):
pass
def restore(self, key: str) -> dict:
pass
建立具體持久化類別
具體的持久化類別可以根據抽象類別來實作。例如,以下是以檔案作為持久化媒介的具體類別(範例4-9)。
檔案持久化類別範例
class FilePersistence(BasePersistence):
def __init__(self, basedir: str = '.'):
self.basedir = basedir
def exists(self, key: str) -> bool:
return os.path.exists(os.path.join(self.basedir, key))
def save(self, key: str, data: dict):
bytes_data = ray.cloudpickle.dumps(data)
with open(os.path.join(self.basedir, key), "wb") as f:
f.write(bytes_data)
def restore(self, key: str) -> dict:
if not self.exists(key):
return None
with open(os.path.join(self.basedir, key), "rb") as f:
bytes_data = f.read()
return ray.cloudpickle.loads(bytes_data)
內容解密:
FilePersistence類別繼承自BasePersistence,實作了所有必須的方法。__init__方法初始化基礎目錄 (basedir)。exists方法檢查指定鑰匙對應的檔案是否存在。save方法將狀態資料寫入檔案中。restore方法從檔案中讀取並還原狀態資料。
改進後的帳戶 Actor 做法
有了這些改進後,我們可以簡化並普遍化帳戶的實作(範例4-10)。
改進後的帳戶 Actor 做法範例
@ray.remote
class Account:
def __init__(self, balance: float, minimal_balance: float, account_key: str,
persistence: BasePersistence):
self.persistence = persistence
self.key = account_key
if not self.restore_state():
if balance < minimal_balance:
raise Exception("Starting balance is less than minimal balance")
self.balance = balance
self.minimal_balance = minimal_balance
self.store_state()
def balance(self) -> float:
return self.balance
def deposit(self, amount: float) -> float:
if amount < 0:
raise Exception("Cannot deposit negative amount")
self.balance += amount
self.store_state()
return self.balance
def withdraw(self, amount: float) -> float:
if amount < 0:
raise Exception("Cannot withdraw negative amount")
new_balance = self.balance - amount
if new_balance < self.minimal_balance:
raise Exception("Withdrawal is not supported by current balance")
self.balance = new_balance
self.store_state()
return new_balance
def restore_state(self) -> bool:
state = self.persistence.restore(self.key)
if state is not None:
self.balance = state['balance']
self.minimal_balance = state['minimal']
return True
return False
def store_state(self):
state_to_save = {'balance': self.balance, 'minimal': self.minimal_balance}
self.persistence.save(self.key, state_to_save)
內容解密:
- 建構函式現在接受一個
BasePersistence的例項,允許我們輕鬆更換不同的持久化實作而不需要改變 Actor 的程式碼。 restore_state和store_state被普遍化成移動所有持久化相關程式碼到持久化類別中。deposit和withdraw方法在狀態變更後呼叫store_state()以更新持久化狀態。
持續連線需求與演進
上述設計可以靈活支援不同的持續連線需求。例如,如果某些持續連線需要永久性地維護資料函式庫連線(如與資料函式庫互動),可能會導致無法擴充套件問題。這種情況下可以考慮將持續連線實作為另一個 Actor。這樣可以避免維護過多連線而導致系統壓力過大。接著我們來看 Ray 提供的一些擴充套件選項。
Ray 演算遠端佇列之擴充套件選項
Ray 提供多種方式來擴充套件遠端佇列。由於最初定義中的佇列模型通常假設佇列是輕量級且不需要擴充套件,因此在實際應用中可能需要根據不同情境進行調整和擴充套件。以下是一些常見選項:
- 自動擴充套件:Ray 支援自動擴充套件功能,可以根據系統負載自動增加或減少佇列例項數量。
- 手動擴充套件:使用者可以手動調整佇列數量來適應不同負載情況。
- 分割槽管理:針對特定任務或工作流程進行分割槽管理,提高效率和可擴充套件性。
玄貓認為這些改進對於處理複雜且動態變化的業務場景非常有幫助。無論是單純地儲存狀態還是需要高效能及可靠性地進行資料函式庫操作,Ray 提供了強大且靈活的解決方案。希望這些經驗和見解能夠對於開發者在處理這些問題時提供一些啟發。
最佳化 Ray 遠端 Actor 的可擴充套件性
在分散式系統中,Actor 模型是一種常見的設計模式,用來管理並發處理和狀態管理。Ray 是一個流行的分散式計算框架,支援 Actor 模型。本文將探討如何最佳化 Ray 遠端 Actor 的可擴充套件性,並提供具體的例項來說明這些概念。
粗粒度與細粒度 Actor
粗粒度 Actor 是指一個單一的 Actor,可以包含多個狀態片段。與之相反,細粒度 Actor 則是每個狀態片段都由一個獨立的 Actor 表示。這種設計類別似於粗粒度鎖定或平行化。
在 Ray 和類別似的系統(如 Akka)中,Actor 通常用於較粗粒度的實作,並且可能需要進行擴充套件。Actor 的擴充套件可以分為水平擴充套件(跨程式/機器)和垂直擴充套件(增加資源)。本文將重點介紹水平擴充套件。
水平擴充套件與 Actor 池
水平擴充套件是指增加更多的程式來處理請求。Ray 提供了 ray.util 模組中的 Actor 池,類別似於多程式池,讓你可以在固定的 Actor 池中排程任務。
Actor 池有效地使用了一組固定的 Actor 作為單一實體,並管理哪個 Actor 接收下一個請求。需要注意的是,池中的每個 Actor 仍然是獨立的,它們的狀態不會合併。因此,這種擴充套件選項僅適用於 Actor 的狀態在建構函式中建立且在執行過程中不變化的情況。
具體例項:使用 Actor 池來提升 Account 類別的可擴充套件性
以下是一個使用 Actor 池來提升 Account 類別可擴充套件性的具體例項:
pool = ActorPool([
FilePersistence.remote(), FilePersistence.remote(), FilePersistence.remote()])
@ray.remote
class Account:
def __init__(self, balance: float, minimal_balance: float,
account_key: str, persistence: ActorPool):
self.persistence = persistence
self.key = account_key
if not self.restore_state():
if balance < minimal_balance:
raise Exception("Starting balance is less than minimal balance")
self.balance = balance
self.minimal = minimal_balance
self.store_state()
def balance(self) -> float:
return self.balance
def deposit(self, amount: float) -> float:
if amount < 0:
raise Exception("Cannot deposit negative amount")
self.balance += amount
self.store_state()
return self.balance
def withdraw(self, amount: float) -> float:
if amount < 0:
raise Exception("Cannot withdraw negative amount")
new_balance = self.balance - amount
if new_balance < self.minimal:
raise Exception("Withdrawal is not supported by current balance")
self.balance = new_balance
self.store_state()
return new_balance
def restore_state(self) -> bool:
while self.persistence.has_next():
self.persistence.get_next()
state = self.persistence.submit(
lambda a, v: a.restore.remote(v), self.key)
if state is not None:
print(f'Restoring state {state}')
self.balance = state['balance']
self.minimal = state['minimal']
return True
return False
def store_state(self):
self.persistence.submit(
lambda a, v: a.save.remote(v),
(self.key, {'balance': self.balance, 'minimal': self.minimal}))
account_actor = Account.options(name='Account').remote(
balance=100., minimal_balance=20.,
account_key='1234567', persistence=pool)
內容解密:
- Actor 池建立:首先建立了一個包含三個相同 FilePersistence Actors 的池。
- Account 類別初始化:將這個池傳遞給 Account 類別。
- 存取狀態方法:
store_state和restore_state方法使用池來存取狀態。 - 彈性擴充套件:
store_state方法是非同步執行的,不等待結果即開始執行。而restore_state則需要等待結果才能繼續執行。 - 排程與管理:池內部管理請求和結果佇列,確保正確的請求和結果對應。
Concurrency 擴充套件
除了多程式池之外,Ray 還支援透過 Concurrency(並發)來擴充套件 Actors 的執行。Ray 提供了兩種並發方式:執行緒(Threading)和非同步執行(Async Execution)。
Threading 與 Asyncio
- Threading:適用於程式碼大量阻塞但不主動讓出控制權的情況。作業系統決定何時執行哪個執行緒。
- Asyncio:Python 的協同多工處理函式庫。當程式碼或函式庫顯示等待結果時,Python 可以切換到另一個任務執行。適合 I/O 操作或呼叫原生函式庫時。
必要注意事項
使用並發時需要小心管理分享狀態。無論是執行緒還是 asyncio,都需要使用鎖來確保只有一個執行緒或任務可以存取特定記憶體。
內容解密:
- Actor Pool:表示由多個 Actors 構成的池。
- State Management:所有 Actors 處理狀態管理。
- 水平擴充套件:增加更多的 Actors 來處理請求。
- 垂直擴充套件:為單個 Actor 提供更多資源來處理請求。