Python 的 concurrent.futures 模組提供了一種簡潔有效的方式來實作平行處理,透過 ThreadPoolExecutor 和 ProcessPoolExecutor 分別運用執行緒和行程來提升程式效能。然而,concurrent.futures 本身缺乏更進階的任務管理功能。Futurist 根據 concurrent.futures 擴充套件,提供了更豐富的功能,例如監控任務執行狀態、取得統計資訊等,讓開發者更方便地控制平行任務。對於需要長時間執行的背景程式,Cotyledon 提供了更強大的控制能力,使其成為管理 daemon 程式的理想選擇。
平行處理的效能優勢
平行處理(concurrent processing)是指在同一時間內,執行多個任務或程式,以提高系統的效能和生產力。在 Python 中,concurrent.futures 模組提供了一種簡單且高效的方式來實作平行處理。
執行緒(Thread)與程式(Process)
在 concurrent.futures 中,有兩種方式來實作平行處理:執行緒(Thread)和程式(Process)。執行緒是指在同一程式中,執行多個任務或程式,而程式是指在不同的程式中,執行多個任務或程式。
使用 concurrent.futures.ThreadPoolExecutor
使用 concurrent.futures.ThreadPoolExecutor 來實作執行緒平行處理,可以簡單地透過提交任務到執行緒池中來實作。以下是範例程式碼:
import concurrent.futures
import random
def compute():
return sum([random.randint(1, 100) for i in range(1000000)])
with concurrent.futures.ThreadPoolExecutor() as executor:
futs = [executor.submit(compute) for _ in range(8)]
results = [f.result() for f in futs]
print("Results: %s" % results)
使用 concurrent.futures.ProcessPoolExecutor
使用 concurrent.futures.ProcessPoolExecutor 來實作程式平行處理,可以簡單地透過提交任務到程式池中來實作。以下是範例程式碼:
import concurrent.futures
import random
def compute():
return sum([random.randint(1, 100) for i in range(1000000)])
with concurrent.futures.ProcessPoolExecutor() as executor:
futs = [executor.submit(compute) for _ in range(8)]
results = [f.result() for f in futs]
print("Results: %s" % results)
效能比較
使用程式平行處理比使用執行緒平行處理更快,因為程式可以利用多核 CPU 的優勢。以下是範例輸出:
$ time python futures-threads-worker.py
Results: [50485099, 50461662, 50553224, 50458097, 50520276,...
python futures-threads-worker.py 19.48s user 0.30s syste
$ time python futures-processes-worker.py
Results: [50485099, 50461662, 50553224, 50458097, 50520276,...
python futures-processes-worker.py 2.35s user 0.10s syste
注意事項
使用 concurrent.futures 時,需要注意的是,執行緒和程式的管理方式不同。執行緒池會為每個任務建立一個新的執行緒,而程式池會為每個任務建立一個新的程式。這意味著,如果您提交了太多工,執行緒池可能會建立太多執行緒,而程式池可能會建立太多程式。
圖表翻譯:
flowchart TD
A[開始] --> B[提交任務]
B --> C[執行緒池/程式池]
C --> D[任務執行]
D --> E[結果傳回]
E --> F[結束]
內容解密:
以上範例程式碼展示瞭如何使用 concurrent.futures 來實作平行處理。使用 ThreadPoolExecutor 和 ProcessPoolExecutor 可以簡單地提交任務到執行緒池或程式池中,並取得結果。需要注意的是,執行緒和程式的管理方式不同,需要根據具體情況選擇合適的方式。
進階的Futures使用
在第2.3節中,我們瞭解瞭如何使用Futures來平行化任務。Futurist函式庫是在concurrent.futures基礎上構建的,並提供了一些額外的功能。在這裡,我們將介紹這些功能。由於Futurist函式庫幾乎是對concurrent.futures的透明替換,因此任何根據concurrent.futures的程式碼都可以輕鬆地適應這個函式庫。
使用Futurist.ThreadPoolExecutor的工作者
以下是一個使用Futurist函式庫的ThreadPoolExecutor的例子:
import futurist
from futurist import waiters
import random
def compute():
# 進行一些計算任務
return sum([random.randint(1, 100) for i in range(10000)])
with futurist.ThreadPoolExecutor(max_workers=8) as executor:
# 提交8個計算任務
futs = [executor.submit(compute) for _ in range(8)]
# 列印執行器的統計資訊
print(executor.statistics)
# 等待所有任務完成
results = waiters.wait_for_all(futs)
在這個例子中,我們建立了一個ThreadPoolExecutor執行器,最大工作者數為8。然後,我們提交8個計算任務,並等待所有任務完成。最終,我們列印執行器的統計資訊。
進階功能
Futurist函式庫提供了一些進階功能,例如:
- wait_for_all:等待所有Future物件完成。
- statistics:獲得執行器的統計資訊,例如已完成任務數、正在執行任務數等。
這些功能可以幫助您更好地控制和監視您的平行化任務。
圖表翻譯:
graph LR
A[建立執行器] --> B[提交任務]
B --> C[等待任務完成]
C --> D[列印統計資訊]
在這個圖表中,我們展示了建立執行器、提交任務、等待任務完成和列印統計資訊的流程。這個圖表幫助您瞭解Futurist函式庫的基本使用流程。
使用Futurist進行平行計算與任務管理
Futurist是一個強大的Python函式庫,提供了多種方式來進行平行計算和任務管理。以下是使用Futurist進行平行計算和任務管理的範例。
執行緒池(ThreadPoolExecutor)
Futurist提供了執行緒池(ThreadPoolExecutor)的功能,允許使用者提交任務到執行緒池中進行執行。以下是使用執行緒池的範例:
import futurist
from futurist import ThreadPoolExecutor
def compute():
return sum([random.randint(1, 100) for _ in range(1000000)])
with ThreadPoolExecutor(max_workers=8) as executor:
futs = [executor.submit(compute) for _ in range(20)]
results = [f.result() for f in futs]
print("Results:", results)
在這個範例中,我們建立了一個執行緒池,最大工作者數為8。然後,我們提交了20個任務到執行緒池中進行執行。最後,我們列印預出了所有任務的結果。
限制佇列大小
Futurist還提供了限制佇列大小的功能,避免記憶體溢位。以下是使用限制佇列大小的範例:
import futurist
from futurist import rejection
def compute():
return sum([random.randint(1, 100) for _ in range(1000000)])
with futurist.ThreadPoolExecutor(
max_workers=8,
check_and_reject=rejection.reject_when_reached(2)
) as executor:
futs = [executor.submit(compute) for _ in range(20)]
results = [f.result() for f in futs]
print("Results:", results)
在這個範例中,我們建立了一個執行緒池,最大工作者數為8,並且限制佇列大小為2。如果提交的任務數量超過2,則會引發futurist.RejectedSubmission異常。
定期執行任務
Futurist還提供了定期執行任務的功能,允許使用者提交任務到定期執行器中進行執行。以下是使用定期執行任務的範例:
import time
from futurist import periodics
@periodics.periodic(1)
def every_one(started_at):
print("1:", time.time() - started_at)
w = periodics.PeriodicWorker([
(every_one, (time.time(),), {}),
])
在這個範例中,我們建立了一個定期執行器,每1秒鐘執行一次every_one函式。
使用Futurist實作週期性任務
Futurist是一個強大的Python函式庫,允許您輕鬆地實作週期性任務。以下是使用Futurist實作週期性任務的範例:
import time
from futurist import periodics
def print_stats():
print("統計:", [w.iter_watchers() for w in periodics._watchers])
w = periodics.PeriodicWorker()
w.add(print_stats)
w.start()
while True:
print("任務執行中:", time.time())
time.sleep(1)
在這個範例中,我們定義了一個print_stats函式,該函式會列印預出當前的統計資訊。然後,我們建立了一個PeriodicWorker物件,並將print_stats函式新增到其中。最後,我們啟動了週期性任務,並在無窮迴圈中執行任務。
輸出結果
當我們執行這個範例時,輸出結果如下:
任務執行中: 1.00364780426
任務執行中: 2.00827693939
任務執行中: 3.00964093208
統計: [<Watcher object at 0x1104fc790 (runs=3, successes=3), <Watcher object at 0x1104fc810 (runs=0, successes=0)]
任務執行中: 4.00993490219
任務執行中: 5.01245594025
任務執行中: 6.01481294632
任務執行中: 7.0150718689
統計: [<Watcher object at 0x1104fc790 (runs=7, successes=7), <Watcher object at 0x1104fc810 (runs=1, successes=1)]
...
從輸出結果中,我們可以看到任務正在週期性地執行,並且統計資訊正在被列印預出來。
內容解密:
periodics.PeriodicWorker():建立了一個週期性任務工作者物件。w.add(print_stats):將print_stats函式新增到週期性任務工作者中。w.start():啟動了週期性任務。while True::在無窮迴圈中執行任務。print("任務執行中:", time.time()):列印預出當前的時間。time.sleep(1):暫停1秒鐘。
圖表翻譯:
flowchart TD
A[啟動週期性任務] --> B[執行任務]
B --> C[列印統計資訊]
C --> D[暫停1秒鐘]
D --> B
這個圖表展示了週期性任務的執行流程。首先,啟動週期性任務,然後執行任務,列印統計資訊,暫停1秒鐘,然後再次執行任務。
使用 Cotyledon 建立長時間執行的背景程式
在 Python 中,使用多個程式來安排不同的工作是非常有效的。一個常見的使用案例是執行長時間執行的背景程式(通常稱為 daemon),這些程式負責定期安排某些任務或從佇列中處理工作。
雖然可以使用 concurrent.futures 和 ProcessPoolExecutor 來實作這一點,但這種方法並不提供對工作派遣的控制。同樣,使用 multiprocessing 模組也存在這個問題。這兩種方法都使得控制背景任務的執行變得困難。
什麼是 Cotyledon?
Cotyledon 是一個 Python 函式庫,旨在建立長時間執行的程式。它提供了一種簡單的方式來建立和管理 daemon 程式。
從系統資源消耗與處理效率的衡量來看,Python 平行處理技術能顯著提升程式效能。本文分析了 concurrent.futures、futurist 和 cotyledon 三種重要的平行處理工具,比較了執行緒與程式兩種平行方式的效能差異,並深入探討瞭如何利用這些工具實作更精細的任務管理,例如限制佇列大小、週期性任務執行以及長時間執行背景程式。然而,平行處理並非沒有限制,開發者仍需關注任務之間的同步、資源競爭以及死鎖等潛在問題。對於追求極致效能的應用,更需仔細評估任務粒度、CPU 核心數以及 I/O 瓶頸等因素。展望未來,隨著 Python 生態的持續發展,預計會有更多更精細化的平行處理工具出現,進一步簡化開發流程並提升程式效能。對於注重效能的 Python 開發者而言,深入理解並掌握這些工具將是提升程式效能的關鍵。玄貓認為,在處理 CPU 密集型任務時,ProcessPoolExecutor 與 cotyledon 更具優勢;而 I/O 密集型任務則更適合使用 ThreadPoolExecutor 和 futurist。選擇正確的工具並妥善管理資源,才能最大化發揮平行處理的效能優勢。