現代軟體開發中,為了充分利用多核心處理器的效能,平行與非同步程式設計至關重要。執行緒池模式透過重複使用執行緒,有效降低建立和銷毀執行緒的開銷,提升系統資源利用率。工作者模型模式則將大型任務分解成小型工作單元,讓多個工作者平行處理,進一步提升效率和可擴充套件性。Future 與 Promise 模式則是非同步程式設計的根本,允許程式在等待非同步操作完成的同時執行其他任務,提高程式回應速度。這些模式各有優劣,開發者需要根據應用場景選擇合適的模式。

現代軟體開發中的平行與非同步模式

在上一章中,我們探討了架構設計模式,這些模式有助於解決複雜專案所帶來的獨特挑戰。接下來,我們需要討論平行與非同步模式,這是我們解決方案目錄中的另一個重要類別。

平行與非同步程式設計的重要性

平行允許程式同時管理多個操作,充分利用現代處理器的全部功能。這就像一位廚師同時準備多道菜,每一步都精心安排,以便所有菜餚都能同時完成。另一方面,非同步程式設計允許應用程式在等待操作完成的同時執行其他任務,例如將食物訂單傳送到廚房,並在訂單準備好之前為其他顧客提供服務。

在本章中,我們將涵蓋以下主要主題:

  • 執行緒池模式
  • 工作者模型模式
  • Future 與 Promise 模式
  • 反應式程式設計中的觀察者模式
  • 其他平行與非同步模式

技術需求

請參閱第1章中介紹的技術需求。本章討論的程式碼的額外技術需求如下:

  • Faker,使用 pip install faker 安裝
  • ReactiveX,使用 pip install reactivex 安裝

執行緒池模式

首先,瞭解什麼是執行緒非常重要。在計算中,執行緒是作業系統可以排程的最小處理單元。

執行緒就像可以在電腦上同時執行的執行軌跡,這使得可以同時完成許多活動,從而提高效能。它們在需要多工的應用程式中特別重要,例如服務多個網頁請求或執行多個計算。

現在,讓我們來看看執行緒池模式本身。假設您有很多工要完成,但啟動每個任務(這意味著在這種情況下建立一個執行緒)可能會耗費大量資源和時間。這就像每次有工作要做時僱用一個新員工,然後在工作完成時讓他們離開。這個過程可能效率低下且成本高昂。透過維護一個工作執行緒的集合或池,可以建立一次執行緒,然後在多個作業中重複使用它們,執行緒池模式有助於減少這種低效率。當一個執行緒完成任務時,它不會終止,而是傳回到池中,等待另一個可以再次使用的任務。

什麼是工作執行緒?

工作執行緒是用於執行特定任務或一組任務的執行緒。工作執行緒用於從主執行緒中解除安裝處理任務,透過非同步執行耗時或資源密集型任務來幫助保持應用程式的回應性。

除了更快的應用程式效能之外,還有兩個好處:

  • 減少開銷:透過重複使用執行緒,應用程式避免了為每個任務建立和銷毀執行緒的開銷
  • 更好的資源管理:執行緒池限制了執行緒的數量,防止了因建立太多執行緒而導致的資源耗盡

真實世界的例子

在現實生活中,想象一家只有有限數量的廚師(執行緒)的小餐館,他們為顧客烹飪餐點(任務)。由於廚房空間(系統資源)的限制,餐館一次只能容納一定數量的廚師工作。當新的訂單到來時,如果所有廚師都忙碌,訂單就會在佇列中等待,直到有可用的廚師為止。這樣,餐館就能有效地管理訂單流程,利用現有的廚師,確保所有廚師都被有效地利用,而不會讓廚房不堪重負,也不需要為每個新訂單僱用更多的員工。

內容解密:

  1. 多工處理:現代作業系統支援多工處理,允許電腦同時執行多個任務。執行緒是實作多工處理的一種方式。
  2. 執行緒池的好處:使用執行緒池可以減少建立和銷毀執行緒的開銷,提高系統的回應速度和效能。
  3. 工作執行緒的作用:工作執行緒負責執行實際的任務,將主執行緒從耗時的操作中解放出來,提高了應用程式的整體效能。
  4. 真實世界的類別比:透過餐館廚師的例子,可以直觀地理解執行緒池的工作原理和優勢。
import concurrent.futures
import time

def task(n):
    print(f"Task {n} started")
    time.sleep(2)  # 模擬耗時操作
    print(f"Task {n} finished")
    return n * n

def main():
    # 建立一個包含5個執行緒的執行緒池
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        # 提交10個任務到執行緒池
        futures = [executor.submit(task, n) for n in range(10)]
        
        # 取得任務結果
        for future in concurrent.futures.as_completed(futures):
            result = future.result()
            print(f"Result: {result}")

if __name__ == "__main__":
    main()

內容解密:

  1. concurrent.futures模組:Python 的 concurrent.futures 模組提供了高階介面,用於非同步執行可呼叫物件。
  2. ThreadPoolExecutor類別ThreadPoolExecutor 類別用於建立一個執行緒池,可以重複使用固定數量的執行緒來執行任務。
  3. max_workers引數max_workers 引數指定了執行緒池中的最大工作者數量,即最多可以同時執行的任務數量。
  4. executor.submit()方法submit() 方法用於向執行緒池提交任務,並傳回一個 Future 物件,表示任務的執行狀態。
  5. future.result()方法result() 方法用於取得任務的結果,如果任務尚未完成,則會阻塞直到任務完成。

其他平行與非同步模式

除了上述模式之外,還有其他一些重要的平行與非同步模式,例如:

  • 工作者模型模式
  • Future 與 Promise 模式
  • 反應式程式設計中的觀察者模式

這些模式各有其特點和適用場景,可以根據具體需求選擇合適的模式來提高應用程式的效能和可維護性。

執行緒池模式(Thread Pool Pattern)與工作者模型模式(Worker Model Pattern)詳解

在軟體開發領域中,多執行緒處理是一種常見的技術,用於提升程式的效能與回應速度。其中,執行緒池模式(Thread Pool Pattern)與工作者模型模式(Worker Model Pattern)是兩種重要的設計模式。本文將探討這兩種模式的原理、應用場景及其實作方法。

執行緒池模式(Thread Pool Pattern)

執行緒池模式是一種用於管理多執行緒的設計模式,主要目的是減少建立和銷毀執行緒所帶來的系統開銷。透過預先建立一定數量的執行緒,並將任務分配給這些執行緒執行,可以有效提高系統資源的利用率。

執行緒池模式的應用場景

  1. 批次處理:當有多個任務可以平行處理時,執行緒池可以將這些任務分配給其工作執行緒,從而提高處理效率。
  2. 負載平衡:執行緒池可以平衡工作負載,確保沒有單一執行緒承擔過多的任務。
  3. 資源最佳化:透過重複使用執行緒,執行緒池可以最小化系統資源(如記憶體和CPU時間)的使用。

實作執行緒池模式

  1. 初始化:在應用程式啟動時,建立一定數量的工作執行緒。
  2. 任務提交:當有任務需要執行時,將任務提交給執行緒池,而不是直接建立新的執行緒。
  3. 任務執行:執行緒池將任務分配給可用的工作執行緒。如果所有執行緒都忙碌,任務可能會在佇列中等待,直到有執行緒可用。
  4. 執行緒重複使用:當一個執行緒完成任務後,它不會被銷毀,而是傳回到池中,準備被分配新的任務。

以下是一個使用Python的concurrent.futures模組中的ThreadPoolExecutor類別實作執行緒池的範例:

from concurrent.futures import ThreadPoolExecutor
import time

def task(n):
    print(f"正在執行任務 {n}")
    time.sleep(1)
    print(f"任務 {n} 已完成")

with ThreadPoolExecutor(max_workers=5) as executor:
    for i in range(10):
        executor.submit(task, i)

程式碼詳解

此範例程式碼展示瞭如何使用ThreadPoolExecutor建立一個包含5個工作執行緒的執行緒池,並提交10個任務給該池。工作執行緒會平行執行這些任務,一旦一個工作執行緒完成一個任務,就會接著處理佇列中的下一個任務。

輸出結果顯示,任務完成的順序與提交的順序不同,這表明任務是透過執行緒池中的可用執行緒平行執行的。

工作者模型模式(Worker Model Pattern)

工作者模型模式的核心思想是將大型任務或多個任務分解成較小、可管理的單位(稱為工作者),這些工作者可以平行處理。這種平行處理方式不僅加快了處理速度,還提高了應用程式的效能。

工作者模型模式的優點

  1. 可擴充套件性:可以輕易地透過增加更多的工作者來擴充套件系統,尤其是在分散式系統中,任務可以在多台機器上處理。
  2. 效率:透過將任務分配給多個工作者,系統可以更好地利用可用的運算資源,平行處理任務。
  3. 靈活性:工作者模型模式可以適應各種處理策略,從簡單的根據執行緒的工作者到跨越多台伺服器的複雜分散式系統。

工作者模型模式的應用場景

  1. 資料轉換:當需要轉換大型資料集時,可以將工作分配給多個工作者。
  2. 任務平行:在不同任務之間相互獨立的應用程式中,工作者模型模式非常有效。

Concurrency and Asynchronous Patterns

Worker Model Pattern 的應用與實作

Worker Model pattern 是一種在平行運算中非常實用的設計模式,尤其適用於任務獨立且可平行處理的場景。除了單機環境,該模式還能擴充套件至多台機器,實作分散式運算。

Worker Model Pattern 的核心元件

在探討實作範例之前,我們先了解 Worker Model pattern 的工作原理。該模式涉及三個主要元件:workers、task queue 和可選的 dispatcher:

  • Workers:作為主要執行者,每個 worker 獨立執行任務的一部分。根據實作方式,worker 可一次處理一個任務或同時處理多個任務。
  • Task Queue:作為中央儲存元件,任務在被處理前會暫存在佇列中。Workers 通常從佇列中提取任務,確保任務在 worker 之間的有效分配。佇列扮演緩衝區的角色,將任務提交與任務處理解耦。
  • Dispatcher:在某些實作中,dispatcher 根據 worker 的可用性、負載或優先順序分配任務,有助於最佳化任務分配和資源利用。

實作範例:平行執行函式

以下是一個使用 Python multiprocessing 模組實作 Worker Model pattern 的範例:

from multiprocessing import Process, Queue
import time

def worker(task_queue):
    while not task_queue.empty():
        task = task_queue.get()
        print(f"Worker {task} is processing")
        time.sleep(1)
        print(f"Worker {task} completed")

def main():
    task_queue = Queue()
    for i in range(10):
        task_queue.put(i)

    processes = [
        Process(target=worker, args=(task_queue,))
        for _ in range(5)
    ]

    # 啟動 worker 程式
    for p in processes:
        p.start()

    # 等待所有 worker 程式完成
    for p in processes:
        p.join()

    print("All tasks completed.")

if __name__ == "__main__":
    main()

內容解密:

  1. 建立 Task Queue 和 Workers:在 main 函式中,我們建立了一個 Queue 物件來儲存任務,並將 10 個任務新增到佇列中。然後,我們建立了 5 個 worker 程式,每個程式執行 worker 函式。
  2. worker 函式的執行邏輯worker 函式不斷從 task queue 中提取任務並執行,直到佇列為空。每個 worker 執行一個任務後會模擬耗時操作(time.sleep(1)),然後輸出完成訊息。
  3. 平行處理:透過啟動多個 worker 程式,我們實作了任務的平行處理。每個 worker 獨立執行任務,提高了整體處理效率。
  4. 程式管理:使用 p.start() 啟動 worker 程式,並透過 p.join() 等待所有程式完成,確保主程式在所有任務處理完畢後才終止。

執行範例程式碼後,我們可以看到 5 個 workers 同時處理 task queue 中的 10 個任務,最終輸出 “All tasks completed."。

Future 和 Promise Pattern

在非同步程式設計中,Future 代表一個尚未獲得但最終會可用的值。當函式發起非同步操作時,它會立即傳回一個 Future 物件,而不是阻塞直到操作完成。Future 物件作為稍後可用的實際結果的佔位符。

Future 和 Promise 的關係

  • Future:代表非同步操作的結果,可透過 callback、輪詢或阻塞等方式取得結果。
  • Promise:是 Future 的可寫、可控的對應物,代表非同步操作的生產者端。當操作完成時,Promise 被履行(fulfill)或拒絕(reject),從而解析其關聯的 Future。

實務應用範例

訂製餐桌的例子生動地說明瞭 Future 和 Promise pattern。當你下訂單時,你會收到預計完成日期和設計草圖(Future),這代表了木匠對交付餐桌的承諾(Promise)。隨著木匠的工作,這個承諾逐漸走向履行。餐桌的交付完成了 Future,標誌著木匠對你的承諾的實作。

好處

  • 提高回應性:允許程式在非同步操作完成前繼續執行,使應用程式更加回應迅速。
  • 可組合性:多個非同步操作可以清晰、可管理地組合、排序或平行執行。

Future 與 Promise 模式的未來與應使用案例項

在數位領域中,我們可以找到許多 Future 與 Promise 模式的應使用案例項,例如:

  • 線上購物訂單追蹤:當你線上上商城下單時,網站會立即提供訂單確認和追蹤編號(Future)。隨著訂單的處理、運送和交付,狀態更新(Promise 實作)會即時反映在追蹤頁面上,最終解析為最終的交付狀態。
  • 外送應用程式:當你透過外送應用程式訂購餐點時,你會獲得預估的送達時間(Future)。應用程式會持續更新訂單狀態——從準備到取貨再到送達(Promise 正在實作)——直到餐點送達你的門前,此時 Future 會隨著訂單的完成而被解析。
  • 客戶支援票證:當你在網站上提交支援票證時,你會立即收到票證編號和訊息,表示有人會回覆你(Future)。在幕後,支援團隊根據優先順序或收到票證的順序處理票證。一旦你的票證被處理,你就會收到回覆,實作了提交票證時所做的 Promise。

Future 與 Promise 模式的應用案例

至少有四種情況建議使用 Future 與 Promise 模式:

  1. 資料管道:在資料處理管道中,資料通常需要經過多個階段的轉換才能達到最終形式。透過使用 Future 表示每個階段,你可以有效地管理資料的非同步流程。例如,一個階段的輸出可以作為下一個階段的輸入,但由於每個階段都會傳回 Future,因此後續階段不必等待前一個階段完成。
  2. 任務排程:任務排程系統(例如作業系統或高階應用程式中的任務排程系統)可以使用 Future 表示將來要執行的任務。當任務被排程時,會傳回一個 Future 以表示該任務最終的完成情況。這使得系統或應用程式能夠追蹤任務的狀態而不會阻塞執行。
  3. 複雜的資料函式庫查詢或交易:非同步執行資料函式庫查詢對於保持應用程式的回應速度至關重要,特別是在使用者經驗至關重要的網頁應用程式中。透過使用 Future 表示資料函式庫操作的結果,應用程式可以啟動查詢並立即將控制權傳回給使用者介面或呼叫函式。Future 最終會解析為查詢結果,讓應用程式能夠更新 UI 或處理資料,而不會在等待資料函式庫回應時凍結或變得無回應。
  4. 檔案 I/O 操作:檔案 I/O 操作可能會對應用程式效能產生重大影響,特別是在主執行緒上同步執行時。透過套用 Future 與 Promise 模式,檔案 I/O 操作會被解除安裝到背景程式,並傳回一個 Future 以表示操作的完成。這種方法允許應用程式繼續執行其他任務或回應使用者互動,同時檔案正在被讀取或寫入。一旦 I/O 操作完成,Future 就會解析,應用程式就可以處理或顯示檔案資料。

在這些應用案例中,Future 與 Promise 模式促進了非同步操作,使應用程式能夠保持回應速度和效率,而不會因為長時間執行的任務而阻塞主執行緒。

實作 Future 與 Promise 模式 – 使用 concurrent.futures

要了解如何實作 Future 與 Promise 模式,首先必須瞭解其機制的三個步驟。接下來讓我們來分析這些步驟:

  1. 初始化:初始化步驟涉及使用函式啟動非同步操作,該函式會立即傳回一個「Future」物件,而不是等待操作完成。這個物件充當稍後可用的結果的佔位符。在內部,非同步函式會建立一個「Promise」物件。這個物件負責處理非同步操作的結果。Promise 與 Future 相關聯,這意味著 Promise 的狀態(無論是實作還是拒絕)將直接影響 Future。
  2. 執行:在執行步驟中,操作獨立於主程式流程進行。這使得程式能夠保持回應速度並繼續執行其他任務。一旦非同步任務完成,其結果就需要傳達給發起操作的程式部分。操作的結果(無論是成功的結果還是錯誤)會傳遞給先前建立的 Promise。
  3. 解析:如果操作成功,Promise 就會以結果「實作」。如果操作失敗,Promise 就會以錯誤「拒絕」。Promise 的實作或拒絕會解析 Future。使用結果通常是透過回呼函式或接續函式來完成的,這是一段指定如何處理結果的程式碼。Future 提供機制(例如方法或運算元)來指定這些回呼函式,這些函式將在 Future 被解析時執行。
from concurrent.futures import ThreadPoolExecutor, as_completed

#### 內容解密:
此段程式碼匯入了 `ThreadPoolExecutor`  `as_completed` 函式用於管理非同步任務的執行其中,`ThreadPoolExecutor` 用於建立一個執行緒池 `as_completed` 函式則用於迭代已完成的 Future 物件

在我們的範例中,我們使用 ThreadPoolExecutor 類別的例項來非同步執行任務。submit 方法傳回一個 Future 物件,該物件最終將包含計算結果。

with ThreadPoolExecutor() as executor:
    future = executor.submit(some_function, args)

#### 內容解密:
此段程式碼展示瞭如何使用 `ThreadPoolExecutor` 提交一個非同步任務。`with` 陳述式確保執行緒池在使用後正確關閉。`executor.submit` 方法提交任務並傳回一個代表任務結果的 Future 物件