事件驅動架構的核心概念在於利用事件來觸發和協調系統中的各種操作,藉此提升系統的鬆耦合性和可擴充套件性。實務上,我們將業務邏輯從傳統的服務層轉移至事件處理器,並透過訊息匯流排來分發事件並觸發對應的操作。本文除了探討批次數量變更事件的處理流程與實作細節之外,也包含了領域模型的調整、倉函式庫查詢方法的擴充套件,以及事件處理器的註冊等導向。同時,我們也引入了命令和命令處理器的概念,進一步強化系統的意圖表達和錯誤處理機制。

事件驅動架構的實作與最佳化

在現代軟體開發中,事件驅動架構(Event-Driven Architecture, EDA)已成為一種流行的設計模式。它透過使用事件來觸發和協調系統中的各種操作,從而實作了系統的鬆耦合和高度可擴充套件性。本文將探討事件驅動架構的實作細節,並透過具體的程式碼範例來說明其在實際專案中的應用。

從服務層到事件處理器的轉變

在傳統的服務層架構中,業務邏輯通常被封裝在服務層(Service Layer)中,由外部請求直接呼叫。然而,在事件驅動架構中,我們將業務邏輯轉移到事件處理器(Event Handlers)中,由事件來觸發相應的操作。

程式碼範例:從服務層函式到事件處理器

# 原始的服務層函式
def allocate(orderid, sku, qty, uow):
    # 業務邏輯實作
    pass

# 事件驅動架構下的事件處理器
def handle_allocation_required(event, uow):
    # 業務邏輯實作
    pass

在上述範例中,allocate 函式被替換為 handle_allocation_required 事件處理器。該處理器負責處理 AllocationRequired 事件,並執行相應的業務邏輯。

#### 內容解密:

  1. 事件處理器的角色:事件處理器是事件驅動架構中的核心元件,負責處理特定的事件並執行相應的業務邏輯。
  2. 業務邏輯的分離:透過將業務邏輯移到事件處理器中,我們實作了業務邏輯與外部請求的分離,提高了系統的可維護性和可擴充套件性。
  3. 事件的觸發機制:事件的觸發機制是事件驅動架構的關鍵。系統透過監聽和處理事件來觸發相應的操作,從而實作了系統的自動化和鬆耦合。

訊息匯流排(Message Bus)的實作

訊息匯流排是事件驅動架構中的另一個重要元件,負責將事件分發到相應的事件處理器。

程式碼範例:訊息匯流排的實作

def handle(event: events.Event, uow: unit_of_work.AbstractUnitOfWork):
    results = []
    queue = [event]
    while queue:
        event = queue.pop(0)
        for handler in HANDLERS[type(event)]:
            results.append(handler(event, uow=uow))
        queue.extend(uow.collect_new_events())
    return results

#### 內容解密:

  1. 訊息匯流排的功能:訊息匯流排負責接收事件並將其分發到相應的事件處理器,同時收集和處理新的事件。
  2. 事件處理的遞迴性質:在某些情況下,事件處理器可能會產生新的事件。訊息匯流排透過遞迴處理這些新事件,確保了系統的完整性和一致性。
  3. 結果的傳回機制:訊息匯流排還負責傳回事件處理的結果,這對於某些需要即時回饋的場景至關重要。

新需求的實作:批次數量變更事件

在實際應用中,我們可能會遇到新的需求,例如需要處理批次數量變更的事件。

程式碼範例:批次數量變更事件的處理

@dataclass
class BatchQuantityChanged(Event):
    ref: str
    qty: int

def handle_batch_quantity_changed(event, uow):
    # 處理批次數量變更事件的邏輯
    pass

#### 內容解密:

  1. 新事件的定義:我們定義了一個新的事件 BatchQuantityChanged,用於表示批次數量的變更。
  2. 事件處理器的實作:我們實作了相應的事件處理器 handle_batch_quantity_changed,負責處理 BatchQuantityChanged 事件並執行相應的業務邏輯。
  3. 業務邏輯的調整:在處理批次數量變更事件時,我們需要根據新的數量調整相關的業務邏輯,例如重新分配訂單等。

事件處理與領域模型最佳化

在軟體開發中,事件驅動架構(Event-Driven Architecture, EDA)扮演著舉足輕重的角色。透過事件驅動,可以有效地實作業務邏輯的解耦和系統的可擴充套件性。本文將探討如何透過事件處理機制和領域模型的最佳化來實作一個變更批次數量的新功能。

新的事件處理器

為了實作變更批次數量的功能,首先需要建立一個新的事件處理器。這個處理器負責處理BatchQuantityChanged事件,並呼叫領域模型的相應方法來更新批次數量。

事件處理器實作

def change_batch_quantity(
    event: events.BatchQuantityChanged, uow: unit_of_work.AbstractUnitOfWork
):
    with uow:
        product = uow.products.get_by_batchref(batchref=event.ref)
        product.change_batch_quantity(ref=event.ref, qty=event.qty)
        uow.commit()

內容解密:

  1. change_batch_quantity函式接收BatchQuantityChanged事件和工作單元(Unit of Work)作為引數。
  2. 透過工作單元的products倉函式庫,使用get_by_batchref方法根據批次參考(batchref)取得對應的產品(Product)。
  3. 呼叫產品的change_batch_quantity方法,更新指定批次的數量。
  4. 最後,提交工作單元的變更。

倉函式庫查詢方法的擴充套件

為了支援新的事件處理器,需要在倉函式庫(Repository)中新增一個新的查詢方法get_by_batchref,以便根據批次參考取得產品。

倉函式庫介面定義

class AbstractRepository(abc.ABC):
    ...
    def get_by_batchref(self, batchref) -> model.Product:
        product = self._get_by_batchref(batchref)
        if product:
            self.seen.add(product)
        return product

    @abc.abstractmethod
    def _get_by_batchref(self, batchref) -> model.Product:
        raise NotImplementedError

SQLAlchemy 倉函式庫實作

class SqlAlchemyRepository(AbstractRepository):
    ...
    def _get_by_batchref(self, batchref):
        return self.session.query(model.Product).join(model.Batch).filter(
            orm.batches.c.reference == batchref,
        ).first()

FakeRepository 實作

class FakeRepository(repository.AbstractRepository):
    ...
    def _get_by_batchref(self, batchref):
        return next((
            p for p in self._products for b in p.batches
            if b.reference == batchref
        ), None)

內容解密:

  1. AbstractRepository定義了get_by_batchref抽象方法和其實作邏輯。
  2. SqlAlchemyRepository透過SQLAlchemy實作了根據批次參考查詢產品的功能。
  3. FakeRepository為測試目的提供了假的倉函式庫實作,同樣支援根據批次參考查詢產品。

領域模型的更新

領域模型需要新增一個方法來處理批次數量的變更,並發布新的事件。

產品模型的更新

class Product:
    ...
    def change_batch_quantity(self, ref: str, qty: int):
        batch = next(b for b in self.batches if b.reference == ref)
        batch._purchased_quantity = qty
        while batch.available_quantity < 0:
            line = batch.deallocate_one()
            self.events.append(
                events.AllocationRequired(line.orderid, line.sku, line.qty)
            )

批次模型的更新

class Batch:
    ...
    def deallocate_one(self) -> OrderLine:
        return self._allocations.pop()

內容解密:

  1. Product類別新增了change_batch_quantity方法,用於更新指定批次的數量。
  2. 如果更新後的數量導致可用數量小於零,則進行解除分配,並發布AllocationRequired事件。
  3. Batch類別新增了deallocate_one方法,用於解除分配一個訂單行。

事件處理器的註冊

最後,需要在訊息匯流排(Message Bus)中註冊新的事件處理器。

訊息匯流排更新

HANDLERS = {
    events.BatchCreated: [handlers.add_batch],
    events.BatchQuantityChanged: [handlers.change_batch_quantity],
    events.AllocationRequired: [handlers.allocate],
    events.OutOfStock: [handlers.send_out_of_stock_notification],
}

內容解密:

  1. 在訊息匯流排的HANDLERS字典中註冊了新的事件處理器change_batch_quantity,用於處理BatchQuantityChanged事件。

測試策略

對於事件驅動的系統,可以採用邊緣到邊緣(edge-to-edge)的測試策略,或者使用假的訊息匯流排進行隔離測試。

隔離測試例項

def test_reallocates_if_necessary_isolated():
    uow = FakeUnitOfWorkWithFakeMessageBus()
    # test setup as before
    event_history = [
        events.BatchCreated("batch1", "INDIFFERENT-TABLE", 50, None),
        events.BatchCreated("batch2", "INDIFFERENT-TABLE", 50, date.today()),
        events.AllocationRequired("order1", "INDIFFERENT-TABLE", 20),
        events.AllocationRequired("order2", "INDIFFERENT-TABLE", 20),
    ]
    for e in event_history:
        messagebus.handle(e, uow)
    [batch1, batch2] = uow.products.get(sku="INDIFFERENT-TABLE").batches
    assert batch1.available_quantity == 10
    assert batch2.available_quantity == 50
    messagebus.handle(events.BatchQuantityChanged("batch1", 25), uow)
    # assert on new events emitted rather than downstream side-effects
    [reallocation_event] = uow.events_published
    assert isinstance(reallocation_event, events.AllocationRequired)
    assert reallocation_event.orderid in {'order1', 'order2'}
    assert reallocation_event.sku == 'INDIFFERENT-TABLE'

內容解密:

  1. 使用假的工作單元和假的訊息匯流排進行隔離測試。
  2. 測試了當批次數量變更時,是否正確發布了新的AllocationRequired事件。

命令與命令處理器

在前一章中,我們討論了使用事件作為系統輸入的一種方式,並將應用程式轉變為訊息處理機制。為了實作這一點,我們將所有使用案例函式轉換為事件處理器。當API接收到POST請求以建立新的批次時,它會建立一個新的BatchCreated事件並將其視為內部事件進行處理。這可能感覺有點違反直覺,因為批次尚未建立;這就是為什麼我們呼叫API。我們將透過引入命令並展示如何透過相同的訊息匯流排處理它們,但使用稍微不同的規則來解決這個概念上的問題。

命令與事件

與事件一樣,命令是一種訊息——由系統的一部分傳送到另一部分的指令。我們通常用簡單的資料結構表示命令,並以與事件相同的方式處理它們。

然而,命令和事件之間的差異很重要。命令由一個參與者傳送到另一個特定的參與者,期望發生特定的事情。當我們向API處理器發布表單時,我們正在傳送一個命令。我們使用具有命令式動詞短語(如「分配庫存」或「延遲出貨」)的名稱來命名命令。

命令捕捉意圖。它們表達了我們對系統做某事的願望。因此,當它們失敗時,傳送者需要接收錯誤資訊。

事件由參與者廣播給所有感興趣的監聽者。當我們發布BatchQuantityChanged時,我們不知道誰將接收它。我們使用過去式的動詞短語(如「訂單已分配庫存」或「出貨已延遲」)來命名事件。

事件通常用於傳播成功命令的知識。事件捕捉過去發生的事情的事實。由於我們不知道誰正在處理事件,傳送者不應該關心接收者是否成功或失敗。表10-1總結了差異。

表10-1. 事件與命令

事件命令
名稱過去式命令式
錯誤處理獨立失敗嘈雜失敗
傳送至所有監聽者一個接收者

從系統中提取命令(src/allocation/domain/commands.py)

from dataclasses import dataclass

class Command:
    pass

@dataclass
class Allocate(Command):
    orderid: str
    sku: str
    qty: int

內容解密:

  1. Command類別被定義為基礎類別,用於表示系統中的命令。
  2. Allocate類別繼承自Command,並使用@dataclass裝飾器定義了三個屬性:orderidskuqty,分別代表訂單ID、產品SKU和數量。
  3. 這種設計允許我們建立具體的命令類別,如Allocate,以表示特定的系統操作。

為什麼需要命令和命令處理器?

在前一章中,我們將所有使用案例函式轉換為事件處理器。然而,這種方法存在一些概念上的問題。事件通常用於表示已經發生的事情,而命令則用於表示我們希望系統做的事情。透過引入命令和命令處理器,我們可以更好地表達系統的意圖,並提供更明確的錯誤處理機制。