在多執行緒程式設計中,觀察者模式的應用需要特別謹慎處理執行緒安全問題。本文提供的多執行緒觀察者模式範例程式碼,使用 threading.Lock 確保觀察者列表和狀態變更的原子性,避免資料競爭和不一致的狀態。此外,我們也探討瞭如何使用弱參照來管理觀察者,避免主題持有對觀察者的強參照導致的記憶體洩漏風險,提升程式的穩定性和資源利用效率。接著,我們介紹了命令模式,它能將請求封裝成物件,讓開發者能更彈性地處理請求的執行順序、引數化、紀錄和復原等操作。

import threading
import weakref
from abc import ABC, abstractmethod
from dataclasses import dataclass

# 觀察者模式

class Observer(ABC):
    @abstractmethod
    def update(self, subject):
        pass

class WeakSubject:
    def __init__(self):
        self._observers = weakref.WeakSet()
        self._state = None
        self._lock = threading.Lock()

    def attach(self, observer: Observer):
        with self._lock:
            self._observers.add(observer)

    def detach(self, observer: Observer):
        with self._lock:
            self._observers.discard(observer)

    def notify(self):
        with self._lock:
            for observer in self._observers:
                observer.update(self)

    def set_state(self, state):
        with self._lock:
            self._state = state
            self.notify()

    def get_state(self):
        with self._lock:
            return self._state

# 命令模式

class Command(ABC):
    @abstractmethod
    def execute(self):
        pass

class Receiver:
    def action(self, data):
        print(f"Receiver processing: {data}")

class ConcreteCommand(Command):
    def __init__(self, receiver: Receiver, data):
        self._receiver = receiver
        self._data = data

    def execute(self):
        self._receiver.action(self._data)

class Invoker:
    def __init__(self):
        self._commands = []

    def add_command(self, command: Command):
        self._commands.append(command)

    def run(self):
        for command in self._commands:
            command.execute()

@dataclass
class Event:
    event_type: str
    payload: dict

class AdvancedSubject:
    def __init__(self):
        self._observers = set()
        self._state = None

    def attach(self, observer):
        self._observers.add(observer)

    def detach(self, observer):
        self._observers.discard(observer)

    def notify(self, event: Event):
        for observer in self._observers:
            observer.update(self, event)

    def set_state(self, state):
        self._state = state
        event = Event(event_type="state_change", payload={"state": state})
        self.notify(event)

    def get_state(self):
        return self._state

class AdvancedObserver(ABC):
    @abstractmethod
    def update(self, subject: AdvancedSubject, event: Event):
        pass

觀察者模式的多執行緒實作

觀察者模式是一種常見的設計模式,允許物件之間相互溝通,當一個物件的狀態發生改變時,可以通知其他相關物件。然而,在多執行緒環境中,觀察者模式需要特別注意,以確保執行緒安全和正確的通知。

基本觀察者模式實作

首先,讓我們看一下基本的觀察者模式實作:

from abc import ABC, abstractmethod

class Observer(ABC):
    @abstractmethod
    def update(self, subject):
        pass

class Subject:
    def __init__(self):
        self._observers = set()
        self._state = None

    def attach(self, observer):
        self._observers.add(observer)

    def detach(self, observer):
        self._observers.discard(observer)

    def notify(self):
        for observer in self._observers:
            observer.update(self)

    def set_state(self, state):
        self._state = state
        self.notify()

    def get_state(self):
        return self._state

class ConcreteObserver(Observer):
    def update(self, subject):
        print(f"{self.__class__.__name__} observed state: {subject.get_state()}")

多執行緒環境下的觀察者模式

在多執行緒環境中,觀察者模式需要特別注意,以確保執行緒安全和正確的通知。以下是使用 threading.Lock 來實作執行緒安全的觀察者模式:

import threading
from abc import ABC, abstractmethod

class ThreadSafeSubject:
    def __init__(self):
        self._observers = set()
        self._state = None
        self._lock = threading.Lock()

    def attach(self, observer):
        with self._lock:
            self._observers.add(observer)

    def detach(self, observer):
        with self._lock:
            self._observers.discard(observer)

    def notify(self):
        with self._lock:
            for observer in self._observers:
                observer.update(self)

    def set_state(self, state):
        with self._lock:
            self._state = state
            self.notify()

    def get_state(self):
        with self._lock:
            return self._state

使用 threading.Lock 的好處

使用 threading.Lock 來實作執行緒安全的觀察者模式,有以下好處:

  • 執行緒安全threading.Lock 可以確保只有一個執行緒可以存取觀察者列表和狀態,從而避免了多執行緒環境下的競爭條件。
  • 簡單易用:使用 threading.Lock 來實作執行緒安全的觀察者模式相對簡單易用,只需要在相關方法中新增 with self._lock: 即可。

觀察者模式的實作

觀察者模式是一種設計模式,允許物件在狀態改變時通知其他物件。以下是觀察者模式的實作:

觀察者模式的核心元件

  • 觀察者(Observer):觀察者是指想要接收通知的物件。它必須實作更新方法,以便在被觀察物件狀態改變時接收通知。
  • 被觀察物件(Subject):被觀察物件是指狀態可能改變的物件。它維護了一個觀察者集合,並在狀態改變時通知所有觀察者。

實作觀察者模式

from abc import ABC, abstractmethod
import threading

class Observer(ABC):
    """觀察者抽象類別"""

    @abstractmethod
    def update(self, subject):
        """更新方法,當被觀察物件狀態改變時呼叫"""
        pass

class Subject:
    """被觀察物件類別"""

    def __init__(self):
        self._observers = set()
        self._state = None
        self._lock = threading.Lock()

    def attach(self, observer: 'Observer'):
        """新增觀察者"""
        with self._lock:
            self._observers.add(observer)

    def detach(self, observer: 'Observer'):
        """移除觀察者"""
        with self._lock:
            self._observers.discard(observer)

    def notify(self):
        """通知所有觀察者"""
        with self._lock:
            observers = list(self._observers)
            for observer in observers:
                observer.update(self)

    def set_state(self, state):
        """設定被觀察物件的狀態"""
        with self._lock:
            self._state = state
            self.notify()

    def get_state(self):
        """取得被觀察物件的狀態"""
        with self._lock:
            return self._state

# 範例使用
class ConcreteObserver(Observer):
    """具體觀察者類別"""

    def update(self, subject):
        print(f"收到通知:{subject.get_state()}")

if __name__ == "__main__":
    subject = Subject()
    observer1 = ConcreteObserver()
    observer2 = ConcreteObserver()

    subject.attach(observer1)
    subject.attach(observer2)

    subject.set_state("狀態1")
    subject.set_state("狀態2")

    subject.detach(observer1)
    subject.set_state("狀態3")

內容解密:

  • Observer 類別定義了更新方法 update,它是抽象方法,必須由具體觀察者類別實作。
  • Subject 類別維護了一個觀察者集合,並提供了新增、移除觀察者和通知觀察者的方法。
  • attach 方法新增一個觀察者到集合中。
  • detach 方法移除一個觀察者從集合中。
  • notify 方法通知所有觀察者,被觀察物件狀態改變時呼叫。
  • set_state 方法設定被觀察物件的狀態,並通知所有觀察者。
  • get_state 方法取得被觀察物件的狀態。

圖表翻譯:

  classDiagram
    class Observer {
        +update(subject: Subject)
    }
    class Subject {
        -_observers: Set~Observer~
        -_state: any
        +attach(observer: Observer)
        +detach(observer: Observer)
        +notify()
        +set_state(state: any)
        +get_state(): any
    }
    class ConcreteObserver {
        +update(subject: Subject)
    }
    Observer <|-- ConcreteObserver
    Subject --* Observer

圖表說明:

  • Observer 類別定義了更新方法 update
  • Subject 類別維護了一個觀察者集合,並提供了新增、移除觀察者和通知觀察者的方法。
  • ConcreteObserver 類別實作了更新方法 update

觀察者模式中的記憶體管理

在觀察者模式中,記憶體管理是一個重要的挑戰,尤其是在具有自動垃圾回收的語言中,如 Python。觀察者可能會建立記憶體洩漏,如果主題持有強參照,防止適當的垃圾回收。一個複雜的解決方案涉及使用弱參照。Python 的weakref模組允許主題維護一個弱觀察者集合,自動清除當觀察者不再使用時的參照。

弱參照觀察者模式實作

import weakref
from abc import ABC, abstractmethod

class Observer(ABC):
    @abstractmethod
    def update(self, subject):
        pass

class WeakSubject:
    def __init__(self):
        self._observers = weakref.WeakSet()
        self._state = None

    def attach(self, observer: Observer):
        self._observers.add(observer)

    def detach(self, observer: Observer):
        self._observers.discard(observer)

    def notify(self):
        for observer in self._observers:
            observer.update(self)

    def set_state(self, state):
        self._state = state
        self.notify()

    def get_state(self):
        return self._state

class ConcreteObserver(Observer):
    def update(self, subject: WeakSubject):
        print(f"{self.__class__.__name__} observed state: {subject.get_state()}")

# 建立弱主題和觀察者
weak_subject = WeakSubject()
observer = ConcreteObserver()

# 將觀察者附加到主題
weak_subject.attach(observer)

# 設定主題狀態
weak_subject.set_state("新狀態")

在這個實作中,WeakSubject類別使用weakref.WeakSet來維護觀察者集合。當觀察者不再需要時,垃圾回收器可以自動清除對其的參照,防止記憶體洩漏。

記憶體管理優點

使用弱參照觀察者模式可以提供以下優點:

  • 防止記憶體洩漏:透過使用弱參照,觀察者可以在不再需要時被垃圾回收器自動清除。
  • 減少記憶體使用:弱參照可以幫助減少記憶體使用,因為不再需要的觀察者可以被清除。
  • 改善系統穩定性:透過防止記憶體洩漏,弱參照觀察者模式可以幫助改善系統的穩定性和可靠性。

觀察者模式的進階應用

在複雜系統中,觀察者模式(Observer Pattern)是一種重要的設計模式,允許物件之間的溝通和協調。然而,當系統變得更加複雜時,觀察者模式也需要進一步的改進和最佳化。這篇文章將探討觀察者模式的進階應用,包括如何實作更細粒度的通知和如何擴充套件觀察者介面以支援多種事件型別。

問題描述

在傳統的觀察者模式中,觀察者會接收到所有狀態變化的通知,但這可能會導致效能瓶頸或過度的通知。另一方面,如果通知過於粗糙,觀察者的反應就會減少。為瞭解決這個問題,我們需要在觀察者介面中加入篩選機制,允許觀察者只對特定的狀態變化或值範圍感興趣。

解決方案

為瞭解決上述問題,我們可以使用以下幾種方法:

  1. 篩選機制:在觀察者介面中加入篩選機制,允許觀察者只對特定的狀態變化或值範圍感興趣。
  2. 事件物件:使用事件物件(Event Object)來傳遞狀態變化的詳細資訊,例如狀態變化的型別和相關資料。
  3. 多種事件型別:擴充套件觀察者介面以支援多種事件型別,允許觀察者只對特定的事件型別感興趣。

實作細節

以下是實作細節:

from dataclasses import dataclass
from abc import ABC, abstractmethod

@dataclass
class Event:
    """事件物件"""
    event_type: str
    payload: dict

class AdvancedSubject:
    """進階主題類別"""
    def __init__(self):
        self._observers = set()
        self._state = None

    def attach(self, observer: 'AdvancedObserver'):
        """新增觀察者"""
        self._observers.add(observer)

    def detach(self, observer: 'AdvancedObserver'):
        """移除觀察者"""
        self._observers.discard(observer)

    def notify(self, event: Event):
        """通知觀察者"""
        for observer in self._observers:
            observer.update(self, event)

    def set_state(self, state):
        """設定狀態"""
        self._state = state
        event = Event(event_type="state_change", payload={"state": state})
        self.notify(event)

    def get_state(self):
        """取得狀態"""
        return self._state

class AdvancedObserver(ABC):
    """進階觀察者介面"""
    @abstractmethod
    def update(self, subject: AdvancedSubject, event: Event):
        """更新方法"""
        pass

class ConcreteAdvancedObserver(AdvancedObserver):
    """具體進階觀察者類別"""
    def update(self, subject: AdvancedSubject, event: Event):
        if event.event_type == "state_change":
            print(f"{self.__class__.__name__} received new state: {event.payload['state']}")

# 使用範例
advanced_subject = AdvancedSubject()
observer = ConcreteAdvancedObserver()
advanced_subject.attach(observer)
advanced_subject.set_state("new_state")

封裝請求為物件:命令模式

命令模式是一種強大的設計模式,能夠將請求封裝為物件,從而實作請求的引數化、佇列化、記錄化和復原等功能。這種模式透過將請求的接收者和執行者解耦,實作了高程度的模組化和靈活性。

命令模式的核心概念

命令模式圍繞著三個基本概念:命令介面、具體命令實作和呼叫者。命令介面定義了執行方法,通常命名為 execute(),該方法封裝了對接收者的行為。具體命令類別實作了這個介面,並將接收者繫結到特定的動作上。呼叫者持有對命令物件的參照,並觸發其執行,通常不需要了解命令的內部邏輯。

基本實作

以下是 Python 中的一個基本實作,使用抽象基礎類別來正式定義命令介面:

from abc import ABC, abstractmethod

class Command(ABC):
    @abstractmethod
    def execute(self):
        pass

class Receiver:
    def action(self, data):
        print(f"Receiver processing: {data}")

class ConcreteCommand(Command):
    def __init__(self, receiver: Receiver, data):
        self._receiver = receiver
        self._data = data

    def execute(self):
        self._receiver.action(self._data)

class Invoker:
    def __init__(self):
        self._commands = []

    def add_command(self, command: Command):
        self._commands.append(command)

    def run(self):
        for command in self._commands:
            command.execute()

# 使用示例
receiver = Receiver()
command = ConcreteCommand(receiver, "Hello, World!")
invoker = Invoker()
invoker.add_command(command)
invoker.run()

命令模式的優點

命令模式提供了多種優點,包括:

  • 解耦: 命令模式允許請求的傳送者和接收者之間解耦,這提高了系統的模組化和靈活性。
  • 引數化: 命令可以被引數化,這使得系統可以根據不同的輸入執行不同的動作。
  • 佇列化: 命令可以被佇列化,這使得系統可以延遲執行請求或重新排序請求。
  • 記錄化: 命令可以被記錄,這使得系統可以追蹤請求的歷史和狀態。
  • 復原: 命令模式提供了一種復原機制,允許系統在執行錯誤時還原到之前的狀態。

命令模式的應用場景

命令模式在以下場景中特別有用:

  • GUI 程式設計: 命令模式可以用於實作 GUI 中的按鈕、選單項等互動元素的行為。
  • 網路請求處理: 命令模式可以用於實作網路請求的處理,例如 HTTP 請求。
  • 工作流程管理: 命令模式可以用於實作工作流程管理,例如任務排程和執行。

從技術架構視角來看,觀察者模式的多執行緒實作,核心在於如何確保執行緒安全並有效管理觀察者集合。本文探討了從基礎的 threading.Lock 應用到更進階的弱參照 weakref.WeakSet 的實作方式,有效解決了潛在的競爭條件和記憶體洩漏問題。更進一步,透過事件物件和篩選機制,觀察者模式的通知粒度得以精細化,避免了過度通知造成的效能損耗,也提升了系統回應的靈活性。然而,引入弱參照需要仔細考量觀察者生命週期管理,避免過早回收造成程式錯誤。對於重視效能和資源利用的系統,建議優先採用弱參照方案,並搭配事件機制實作更精確的狀態變化通知。隨著系統複雜度的提升,預期更多細粒度的控制和客製化通知機制將成為觀察者模式演進的重點方向,例如根據發布/訂閱模式的擴充套件,以支援更複雜的事件路由和過濾邏輯。對於追求高效能和可維護性的開發團隊而言,深入理解並善用這些進階技巧,將有助於打造更穩健且靈活的應用程式。