事件驅動架構有效降低系統模組耦合度,提升系統彈性與擴充套件性。本文以 Python 為例,示範如何實作事件驅動架構,並利用訊息匯流排處理領域事件。領域事件在領域驅動設計中扮演重要角色,用於描述領域中的關鍵事件,例如產品缺貨。訊息匯流排作為發布訂閱系統,將領域事件分發給對應的處理器。文章探討了服務層發布事件、服務層直接建立發布事件、工作單元發布事件三種方式,並提供程式碼範例說明。此外,文章還討論瞭如何利用儲存函式庫追蹤聚合根,簡化服務層,並使用包裝類別替代繼承。最後,文章介紹瞭如何將服務層函式重構為訊息處理器,以及如何修改訊息匯流排和工作單元以適應新的架構。

事件驅動架構的實作與訊息匯流排的整合

在軟體開發中,事件驅動架構(Event-Driven Architecture, EDA)是一種能夠有效解耦系統元件、提高系統可擴充套件性和維護性的設計模式。本文將探討如何在 Python 專案中實作事件驅動架構,並利用訊息匯流排(Message Bus)來處理領域事件(Domain Events)。

為何需要事件驅動架構?

在傳統的軟體架構中,不同模組之間的耦合度往往較高,這使得系統的維護和擴充套件變得困難。事件驅動架構透過引入事件的概念,能夠有效地降低模組之間的耦合度,使得系統更加靈活和可擴充套件。

領域事件的定義與處理

在領域驅動設計(Domain-Driven Design, DDD)中,領域事件是用於描述領域中重要事件的物件。例如,在一個電商系統中,「產品缺貨」可以被視為一個領域事件。

# 定義領域事件
class OutOfStock(events.Event):
    def __init__(self, sku):
        self.sku = sku

當領域事件發生時,系統需要對其進行處理。這通常涉及到通知相關人員或觸發其他業務邏輯。

訊息匯流排的實作

訊息匯流排是一個簡單的發布-訂閱系統,它負責將領域事件分發給對應的處理器(Handler)。

# 簡單的訊息匯流排實作
def handle(event: events.Event):
    for handler in HANDLERS[type(event)]:
        handler(event)

# 定義事件處理器
def send_out_of_stock_notification(event: events.OutOfStock):
    email.send_mail(
        'stock@made.com',
        f'Out of stock for {event.sku}',
    )

# 事件與處理器的對映關係
HANDLERS = {
    events.OutOfStock: [send_out_of_stock_notification],
}

內容解密:

  1. handle函式:負責接收領域事件,並根據事件型別呼叫對應的處理器。
  2. send_out_of_stock_notification函式:當OutOfStock事件發生時,傳送通知郵件給相關人員。
  3. HANDLERS字典:定義了事件與處理器之間的對映關係,使得訊息匯流排能夠正確地將事件分發給對應的處理器。

將事件發布到訊息匯流排

為了使領域事件能夠被訊息匯流排處理,需要在適當的時機將事件發布到訊息匯流排。有三種常見的方式來實作這一點:

  1. 服務層發布事件:在服務層中顯式地收集領域事件,並將其發布到訊息匯流排。
# 服務層顯式收集事件並發布到訊息匯流排
def allocate(
    orderid: str, sku: str, qty: int,
    uow: unit_of_work.AbstractUnitOfWork
) -> str:
    # ...
    try:
        batchref = product.allocate(line)
        uow.commit()
        return batchref
    finally:
        messagebus.handle(product.events)

內容解密:

  • try/finally區塊:確保即使在發生異常的情況下,也能正確地處理領域事件。
  • messagebus.handle(product.events):將產品相關的領域事件發布到訊息匯流排。
  1. 服務層直接建立並發布事件:服務層直接建立領域事件,並將其發布到訊息匯流排。
# 服務層直接建立並發布事件
def allocate(
    orderid: str, sku: str, qty: int,
    uow: unit_of_work.AbstractUnitOfWork
) -> str:
    # ...
    batchref = product.allocate(line)
    uow.commit()
    if batchref is None:
        messagebus.handle(events.OutOfStock(line.sku))
    return batchref

內容解密:

  • if batchref is None條件:當分配失敗時,建立OutOfStock事件並發布到訊息匯流排。
  1. 工作單元(UoW)發布事件:工作單元負責收集領域事件,並在提交後將其發布到訊息匯流排。
# 工作單元發布事件到訊息匯流排
class AbstractUnitOfWork(abc.ABC):
    # ...
    def commit(self):
        self._commit()
        self.publish_events()

    def publish_events(self):
        for product in self.products.seen:
            while product.events:
                event = product.events.pop(0)
                messagebus.handle(event)

內容解密:

  • publish_events方法:遍歷所有被儲存函式庫追蹤的產品,並將其領域事件發布到訊息匯流排。
  • self._commit()方法:提交事務,確保資料的一致性。

領域事件:工作流程的管理與實作

在軟體開發中,特別是在複雜的業務系統中,工作流程的管理是一個重要的課題。領域事件(Domain Events)提供了一種有效的方式來處理系統中的工作流程。本文將探討如何使用領域事件來管理和實作工作流程。

領域事件的基本概念

領域事件是指在業務領域中發生的重要事件,例如訂單的建立、產品的分配等。這些事件通常會觸發後續的業務流程。透過將事件作為系統中的第一類別公民(first-class citizen),我們可以更好地建模業務流程,提高程式碼的可測試性和可觀察性。

使用領域事件管理工作流程

在系統中使用領域事件,可以將不同的業務邏輯解耦,使得系統更加靈活和可維護。例如,當一個訂單被建立時,可以觸發一個事件,該事件會被多個處理器(handler)捕捉並執行相應的業務邏輯。

儲存函式庫(Repository)的實作

儲存函式庫是領域事件實作中的一個關鍵元件。它負責跟蹤在當前會話中使用的聚合根(aggregate)。以下是一個基本的儲存函式庫實作:

class AbstractRepository(abc.ABC):
    def __init__(self):
        self.seen = set()  # type: Set[model.Product]

    def add(self, product: model.Product):
        self._add(product)
        self.seen.add(product)

    def get(self, sku) -> model.Product:
        product = self._get(sku)
        if product:
            self.seen.add(product)
        return product

    @abc.abstractmethod
    def _add(self, product: model.Product):
        raise NotImplementedError

    @abc.abstractmethod
    def _get(self, sku) -> model.Product:
        raise NotImplementedError

class SqlAlchemyRepository(AbstractRepository):
    def __init__(self, session):
        super().__init__()
        self.session = session

    def _add(self, product):
        self.session.add(product)

    def _get(self, sku):
        return self.session.query(model.Product).filter_by(sku=sku).first()

服務層(Service Layer)的簡化

透過使用領域事件和儲存函式庫,服務層的實作可以大大簡化。以下是一個簡化的服務層函式:

def allocate(
    orderid: str, sku: str, qty: int,
    uow: unit_of_work.AbstractUnitOfWork
) -> str:
    line = OrderLine(orderid, sku, qty)
    with uow:
        product = uow.products.get(sku=line.sku)
        if product is None:
            raise InvalidSku(f'Invalid sku {line.sku}')
        batchref = product.allocate(line)
        uow.commit()
        return batchref

#### 內容解密:

此段程式碼展示瞭如何使用領域事件和儲存函式庫簡化服務層的實作。allocate函式負責分配產品,首先檢查產品是否存在,如果不存在則丟擲異常。如果產品存在,則呼叫allocate方法進行分配,並提交事務。

使用包裝類別(Wrapper Class)替代繼承

為了避免使用繼承,可以使用包裝類別來新增功能。以下是一個包裝儲存函式庫的例子:

class TrackingRepository:
    seen: Set[model.Product]

    def __init__(self, repo: AbstractRepository):
        self.seen = set()  # type: Set[model.Product]
        self._repo = repo

    def add(self, product: model.Product):
        self._repo.add(product)
        self.seen.add(product)

    def get(self, sku) -> model.Product:
        product = self._repo.get(sku)
        if product:
            self.seen.add(product)
        return product

#### 內容解密:

此段程式碼展示瞭如何使用包裝類別替代繼承來實作儲存函式庫的功能。TrackingRepository類別包裝了一個AbstractRepository例項,並增加了跟蹤功能的實作。

事件驅動架構與訊息匯流排的進階應用

在軟體開發中,當多個關注點混合在同一處時,程式碼會變得混亂。事件驅動架構可以幫助我們保持程式碼的整潔,將主要的業務邏輯與次要的業務邏輯分離。此外,事件驅動架構還可以用於聚合物件之間的通訊,避免長時間鎖定多個資料表。

訊息匯流排:事件的路由機制

訊息匯流排可以被視為一個將事件對映到其處理程式的字典。它不瞭解事件的含義,只是系統中傳遞訊息的基礎設施。透過訊息匯流排,我們可以實作事件的發布與訂閱機制。

事件驅動架構的三種實作方式

  1. 服務層觸發事件並傳遞給訊息匯流排:在服務層中,我們可以在提交工作單元後呼叫 bus.handle(some_new_event) 來觸發事件。這種方式簡單直接,易於實作。

  2. 領域模型觸發事件,服務層傳遞給訊息匯流排:將事件觸發的邏輯放在領域模型中,可以提高系統的設計和可測試性。在提交工作單元後,服務層可以從模型物件中收集事件並傳遞給訊息匯流排。

  3. 工作單元收集聚合物件的事件並傳遞給訊息匯流排:這種方式是將事件收集和傳遞的責任交給工作單元,使其負責將聚合物件觸發的事件傳遞給訊息匯流排。這種方式雖然實作較為複雜,但使用起來非常方便。

邁向事件驅動的核心架構

在第九章中,我們將進一步探討如何使事件成為應用程式內部結構的基本組成部分。我們的目標是將目前的架構轉變為一個以訊息匯流排為核心的事件驅動架構。

新需求驅動架構變革

在現實世界的軟體系統中,意外事件頻繁發生。為了應對這些意外事件,我們需要設計出能夠靈活應變的系統。例如,當批次數量變更時,我們需要重新分配相關的訂單。

實作事件驅動的核心架構

為了實作事件驅動的核心架構,我們可以將所有的 API 呼叫視為捕捉事件,而服務層函式則成為事件處理程式。這樣一來,我們就不再需要區分內部和外部事件處理程式。

# 範例程式碼:事件處理程式
def change_batch_quantity(event):
    # 修改批次數量
    batch = batch_repository.get(event.batch_id)
    batch.change_quantity(event.new_quantity)
    # 重新分配相關訂單
    if batch.quantity < batch.allocated_quantity:
        deallocate_excess_orders(batch)
        # 發出 AllocationRequired 事件
        for order_line in batch.deallocated_order_lines:
            bus.handle(AllocationRequired(order_line))

# #### 內容解密:
# 1. `change_batch_quantity` 函式是一個事件處理程式,用於處理 `BatchQuantityChanged` 事件。
# 2. 當批次數量變更時,該函式會修改批次數量,並重新分配相關的訂單。
# 3. 如果新的批次數量小於已分配的數量,則會取消多餘的訂單分配,並發出 `AllocationRequired` 事件。

事件驅動架構

@startuml
skinparam backgroundColor #FEFEFE
skinparam defaultTextAlignment center
skinparam rectangleBackgroundColor #F5F5F5
skinparam rectangleBorderColor #333333
skinparam arrowColor #333333

title 事件驅動架構

rectangle "捕捉事件" as node1
rectangle "修改批次數量" as node2
rectangle "發出 AllocationRequired 事件" as node3
rectangle "分配新批次" as node4

node1 --> node2
node2 --> node3
node3 --> node4

@enduml

重構服務函式為訊息處理器

在重構服務層至事件處理器的過程中,首先定義了兩個事件,分別是BatchCreatedAllocationRequired,用於捕捉目前API的輸入。

BatchCreated 和 AllocationRequired 事件

# src/allocation/domain/events.py

@dataclass
class BatchCreated(Event):
    ref: str
    sku: str
    qty: int
    eta: Optional[date] = None

@dataclass
class AllocationRequired(Event):
    orderid: str
    sku: str
    qty: int

內容解密:

  1. BatchCreated 事件: 當新的批次被建立時觸發,包含批次參考(ref)、產品SKU(sku)、數量(qty)及預計到貨日期(eta)等資訊。
  2. AllocationRequired 事件: 當需要分配訂單時觸發,包含訂單ID(orderid)、產品SKU(sku)及所需數量(qty)。

重構服務層為事件處理器

將原有的services.py重新命名為handlers.py,並將現有的訊息處理函式(如send_out_of_stock_notification)加入其中。最重要的是,將所有處理函式的輸入引數統一為事件和工作單元(UoW)。

# src/allocation/service_layer/handlers.py

def add_batch(
    event: events.BatchCreated, uow: unit_of_work.AbstractUnitOfWork
):
    with uow:
        product = uow.products.get(sku=event.sku)
        # ...

def allocate(
    event: events.AllocationRequired, uow: unit_of_work.AbstractUnitOfWork
) -> str:
    line = OrderLine(event.orderid, event.sku, event.qty)
    # ...

def send_out_of_stock_notification(
    event: events.OutOfStock, uow: unit_of_work.AbstractUnitOfWork,
):
    email.send(
        'stock@made.com',
        f'Out of stock for {event.sku}',
    )

內容解密:

  1. add_batch 處理器: 處理BatchCreated事件,將新的批次加入系統。
  2. allocate 處理器: 處理AllocationRequired事件,執行訂單分配邏輯。
  3. send_out_of_stock_notification 處理器: 處理OutOfStock事件,傳送缺貨通知郵件。

訊息匯流排現在負責收集來自UoW的事件

修改訊息匯流排(message bus),使其負責收集和處理新的事件,從而避免UoW和訊息匯流排之間的迴圈依賴。

# src/allocation/service_layer/messagebus.py

def handle(event: events.Event, uow: unit_of_work.AbstractUnitOfWork):
    queue = [event]
    while queue:
        event = queue.pop(0)
        for handler in HANDLERS[type(event)]:
            handler(event, uow=uow)
        queue.extend(uow.collect_new_events())

內容解密:

  1. 初始化事件佇列: 將初始事件加入佇列。
  2. 迴圈處理事件: 從佇列中取出事件並呼叫對應的處理器。
  3. 收集新事件: 在每個處理器執行完畢後,收集新產生的事件並加入佇列。

工作單元(UoW)不再直接將事件釋出到訊息匯流排

修改UoW,將publish_events方法改為collect_new_events,使其僅收集新事件而不直接釋出到訊息匯流排。

# src/allocation/service_layer/unit_of_work.py

class AbstractUnitOfWork(abc.ABC):
    # ...

    def commit(self):
        self._commit()
        # 不再呼叫 self.publish_events()

內容解密:

  1. collect_new_events 方法: 取代原有的publish_events,僅收集新事件傳回給呼叫者(訊息匯流排)。