Python 的 concurrent.futures 模組提供了一種簡潔有效的方式來實作平行處理,透過 ThreadPoolExecutor 和 ProcessPoolExecutor 分別運用執行緒和行程來提升程式效能。然而,concurrent.futures 本身缺乏更進階的任務管理功能。Futurist 根據 concurrent.futures 擴充套件,提供了更豐富的功能,例如監控任務執行狀態、取得統計資訊等,讓開發者更方便地控制平行任務。對於需要長時間執行的背景程式,Cotyledon 提供了更強大的控制能力,使其成為管理 daemon 程式的理想選擇。

平行處理的效能優勢

平行處理(concurrent processing)是指在同一時間內,執行多個任務或程式,以提高系統的效能和生產力。在 Python 中,concurrent.futures 模組提供了一種簡單且高效的方式來實作平行處理。

執行緒(Thread)與程式(Process)

concurrent.futures 中,有兩種方式來實作平行處理:執行緒(Thread)和程式(Process)。執行緒是指在同一程式中,執行多個任務或程式,而程式是指在不同的程式中,執行多個任務或程式。

使用 concurrent.futures.ThreadPoolExecutor

使用 concurrent.futures.ThreadPoolExecutor 來實作執行緒平行處理,可以簡單地透過提交任務到執行緒池中來實作。以下是範例程式碼:

import concurrent.futures
import random

def compute():
    return sum([random.randint(1, 100) for i in range(1000000)])

with concurrent.futures.ThreadPoolExecutor() as executor:
    futs = [executor.submit(compute) for _ in range(8)]
    results = [f.result() for f in futs]
    print("Results: %s" % results)

使用 concurrent.futures.ProcessPoolExecutor

使用 concurrent.futures.ProcessPoolExecutor 來實作程式平行處理,可以簡單地透過提交任務到程式池中來實作。以下是範例程式碼:

import concurrent.futures
import random

def compute():
    return sum([random.randint(1, 100) for i in range(1000000)])

with concurrent.futures.ProcessPoolExecutor() as executor:
    futs = [executor.submit(compute) for _ in range(8)]
    results = [f.result() for f in futs]
    print("Results: %s" % results)

效能比較

使用程式平行處理比使用執行緒平行處理更快,因為程式可以利用多核 CPU 的優勢。以下是範例輸出:

$ time python futures-threads-worker.py
Results: [50485099, 50461662, 50553224, 50458097, 50520276,...
python futures-threads-worker.py 19.48s user 0.30s syste

$ time python futures-processes-worker.py
Results: [50485099, 50461662, 50553224, 50458097, 50520276,...
python futures-processes-worker.py 2.35s user 0.10s syste

注意事項

使用 concurrent.futures 時,需要注意的是,執行緒和程式的管理方式不同。執行緒池會為每個任務建立一個新的執行緒,而程式池會為每個任務建立一個新的程式。這意味著,如果您提交了太多工,執行緒池可能會建立太多執行緒,而程式池可能會建立太多程式。

圖表翻譯:

  flowchart TD
    A[開始] --> B[提交任務]
    B --> C[執行緒池/程式池]
    C --> D[任務執行]
    D --> E[結果傳回]
    E --> F[結束]

內容解密:

以上範例程式碼展示瞭如何使用 concurrent.futures 來實作平行處理。使用 ThreadPoolExecutorProcessPoolExecutor 可以簡單地提交任務到執行緒池或程式池中,並取得結果。需要注意的是,執行緒和程式的管理方式不同,需要根據具體情況選擇合適的方式。

進階的Futures使用

在第2.3節中,我們瞭解瞭如何使用Futures來平行化任務。Futurist函式庫是在concurrent.futures基礎上構建的,並提供了一些額外的功能。在這裡,我們將介紹這些功能。由於Futurist函式庫幾乎是對concurrent.futures的透明替換,因此任何根據concurrent.futures的程式碼都可以輕鬆地適應這個函式庫。

使用Futurist.ThreadPoolExecutor的工作者

以下是一個使用Futurist函式庫的ThreadPoolExecutor的例子:

import futurist
from futurist import waiters
import random

def compute():
    # 進行一些計算任務
    return sum([random.randint(1, 100) for i in range(10000)])

with futurist.ThreadPoolExecutor(max_workers=8) as executor:
    # 提交8個計算任務
    futs = [executor.submit(compute) for _ in range(8)]
    
    # 列印執行器的統計資訊
    print(executor.statistics)
    
    # 等待所有任務完成
    results = waiters.wait_for_all(futs)

在這個例子中,我們建立了一個ThreadPoolExecutor執行器,最大工作者數為8。然後,我們提交8個計算任務,並等待所有任務完成。最終,我們列印執行器的統計資訊。

進階功能

Futurist函式庫提供了一些進階功能,例如:

  • wait_for_all:等待所有Future物件完成。
  • statistics:獲得執行器的統計資訊,例如已完成任務數、正在執行任務數等。

這些功能可以幫助您更好地控制和監視您的平行化任務。

圖表翻譯:

  graph LR
    A[建立執行器] --> B[提交任務]
    B --> C[等待任務完成]
    C --> D[列印統計資訊]

在這個圖表中,我們展示了建立執行器、提交任務、等待任務完成和列印統計資訊的流程。這個圖表幫助您瞭解Futurist函式庫的基本使用流程。

使用Futurist進行平行計算與任務管理

Futurist是一個強大的Python函式庫,提供了多種方式來進行平行計算和任務管理。以下是使用Futurist進行平行計算和任務管理的範例。

執行緒池(ThreadPoolExecutor)

Futurist提供了執行緒池(ThreadPoolExecutor)的功能,允許使用者提交任務到執行緒池中進行執行。以下是使用執行緒池的範例:

import futurist
from futurist import ThreadPoolExecutor

def compute():
    return sum([random.randint(1, 100) for _ in range(1000000)])

with ThreadPoolExecutor(max_workers=8) as executor:
    futs = [executor.submit(compute) for _ in range(20)]
    results = [f.result() for f in futs]
    print("Results:", results)

在這個範例中,我們建立了一個執行緒池,最大工作者數為8。然後,我們提交了20個任務到執行緒池中進行執行。最後,我們列印預出了所有任務的結果。

限制佇列大小

Futurist還提供了限制佇列大小的功能,避免記憶體溢位。以下是使用限制佇列大小的範例:

import futurist
from futurist import rejection

def compute():
    return sum([random.randint(1, 100) for _ in range(1000000)])

with futurist.ThreadPoolExecutor(
    max_workers=8,
    check_and_reject=rejection.reject_when_reached(2)
) as executor:
    futs = [executor.submit(compute) for _ in range(20)]
    results = [f.result() for f in futs]
    print("Results:", results)

在這個範例中,我們建立了一個執行緒池,最大工作者數為8,並且限制佇列大小為2。如果提交的任務數量超過2,則會引發futurist.RejectedSubmission異常。

定期執行任務

Futurist還提供了定期執行任務的功能,允許使用者提交任務到定期執行器中進行執行。以下是使用定期執行任務的範例:

import time
from futurist import periodics

@periodics.periodic(1)
def every_one(started_at):
    print("1:", time.time() - started_at)

w = periodics.PeriodicWorker([
    (every_one, (time.time(),), {}),
])

在這個範例中,我們建立了一個定期執行器,每1秒鐘執行一次every_one函式。

使用Futurist實作週期性任務

Futurist是一個強大的Python函式庫,允許您輕鬆地實作週期性任務。以下是使用Futurist實作週期性任務的範例:

import time
from futurist import periodics

def print_stats():
    print("統計:", [w.iter_watchers() for w in periodics._watchers])

w = periodics.PeriodicWorker()
w.add(print_stats)

w.start()

while True:
    print("任務執行中:", time.time())
    time.sleep(1)

在這個範例中,我們定義了一個print_stats函式,該函式會列印預出當前的統計資訊。然後,我們建立了一個PeriodicWorker物件,並將print_stats函式新增到其中。最後,我們啟動了週期性任務,並在無窮迴圈中執行任務。

輸出結果

當我們執行這個範例時,輸出結果如下:

任務執行中: 1.00364780426
任務執行中: 2.00827693939
任務執行中: 3.00964093208
統計: [<Watcher object at 0x1104fc790 (runs=3, successes=3), <Watcher object at 0x1104fc810 (runs=0, successes=0)]
任務執行中: 4.00993490219
任務執行中: 5.01245594025
任務執行中: 6.01481294632
任務執行中: 7.0150718689
統計: [<Watcher object at 0x1104fc790 (runs=7, successes=7), <Watcher object at 0x1104fc810 (runs=1, successes=1)]
...

從輸出結果中,我們可以看到任務正在週期性地執行,並且統計資訊正在被列印預出來。

內容解密:

  • periodics.PeriodicWorker():建立了一個週期性任務工作者物件。
  • w.add(print_stats) :將print_stats函式新增到週期性任務工作者中。
  • w.start() :啟動了週期性任務。
  • while True::在無窮迴圈中執行任務。
  • print("任務執行中:", time.time()) :列印預出當前的時間。
  • time.sleep(1) :暫停1秒鐘。

圖表翻譯:

  flowchart TD
    A[啟動週期性任務] --> B[執行任務]
    B --> C[列印統計資訊]
    C --> D[暫停1秒鐘]
    D --> B

這個圖表展示了週期性任務的執行流程。首先,啟動週期性任務,然後執行任務,列印統計資訊,暫停1秒鐘,然後再次執行任務。

使用 Cotyledon 建立長時間執行的背景程式

在 Python 中,使用多個程式來安排不同的工作是非常有效的。一個常見的使用案例是執行長時間執行的背景程式(通常稱為 daemon),這些程式負責定期安排某些任務或從佇列中處理工作。

雖然可以使用 concurrent.futuresProcessPoolExecutor 來實作這一點,但這種方法並不提供對工作派遣的控制。同樣,使用 multiprocessing 模組也存在這個問題。這兩種方法都使得控制背景任務的執行變得困難。

什麼是 Cotyledon?

Cotyledon 是一個 Python 函式庫,旨在建立長時間執行的程式。它提供了一種簡單的方式來建立和管理 daemon 程式。

從系統資源消耗與處理效率的衡量來看,Python 平行處理技術能顯著提升程式效能。本文分析了 concurrent.futuresfuturistcotyledon 三種重要的平行處理工具,比較了執行緒與程式兩種平行方式的效能差異,並深入探討瞭如何利用這些工具實作更精細的任務管理,例如限制佇列大小、週期性任務執行以及長時間執行背景程式。然而,平行處理並非沒有限制,開發者仍需關注任務之間的同步、資源競爭以及死鎖等潛在問題。對於追求極致效能的應用,更需仔細評估任務粒度、CPU 核心數以及 I/O 瓶頸等因素。展望未來,隨著 Python 生態的持續發展,預計會有更多更精細化的平行處理工具出現,進一步簡化開發流程並提升程式效能。對於注重效能的 Python 開發者而言,深入理解並掌握這些工具將是提升程式效能的關鍵。玄貓認為,在處理 CPU 密集型任務時,ProcessPoolExecutorcotyledon 更具優勢;而 I/O 密集型任務則更適合使用 ThreadPoolExecutorfuturist。選擇正確的工具並妥善管理資源,才能最大化發揮平行處理的效能優勢。