Python 的 multiprocessing
模組為 CPU 密集型任務提供了平行處理的有效途徑。它允許多個程式同時執行,充分利用多核心處理器的優勢,進而提升程式效能。multiprocessing
提供了 Process
、Pool
、Queue
和 Pipe
等類別,方便開發者建立、管理程式以及處理程式間的通訊。相較於多執行緒,多程式更能有效利用多核心 CPU,且能避免 GIL 的限制,但程式間通訊和同步機制相對複雜。理解 spawn
、fork
等不同啟動方法,以及 Windows 和 POSIX 系統的差異,有助於撰寫跨平臺的多程式程式。使用 Queue 或 Pipe 等機制實作資料交換,Semaphore 或 Manager 等機制則能有效管理分享資源與同步,避免競爭性問題。
程式的建立
multiprocessing
模組提供了三種方法來建立新的程式:
- 分叉(Fork):這種方法只適用於POSIX系統,透過複製父程式的記憶體空間來建立新的程式。
- 建立新的程式:這種方法適用於POSIX和Windows系統,透過建立新的程式來執行指定的任務。
- 啟動分叉伺服器:這種方法只適用於POSIX系統,透過啟動一個分叉伺服器來建立新的程式。
Python API
Python的多程式API非常簡單,主要包括以下幾個部分:
multiprocessing.Process
:代表一個程式,可以用來建立新的程式。multiprocessing.Pool
:代表一個程式池,可以用來管理多個程式。multiprocessing.Queue
:代表一個佇列,可以用來在程式之間傳遞資料。multiprocessing.Pipe
:代表一個管道,可以用來在程式之間傳遞資料。
範例
以下是一個簡單的範例,示範如何使用multiprocessing
模組來建立新的程式:
import multiprocessing as mp
import os
def to_celsius(f):
c = (f - 32) * (5/9)
pid = os.getpid()
print(f"Process {pid} converted {f} to {c}")
if __name__ == "__main__":
f = 100
p = mp.Process(target=to_celsius, args=(f,))
p.start()
p.join()
這個範例建立了一個新的程式,該程式執行to_celsius
函式,將華氏溫度轉換為攝氏溫度。
內容解密:
multiprocessing
模組提供了一個簡單的API來建立多個程式。- 程式的建立可以透過分叉、建立新的程式和啟動分叉伺服器等方法來實作。
- Python API包括
Process
、Pool
、Queue
和Pipe
等類別,可以用來管理多個程式和傳遞資料。 - 範例示範如何使用
multiprocessing
模組來建立新的程式。
圖表翻譯:
flowchart TD A[主程式] --> B[建立新的程式] B --> C[執行任務] C --> D[傳遞資料] D --> E[結束程式]
這個圖表示範了主程式如何建立新的程式,執行任務,傳遞資料和結束程式。
多程式平行處理
在 Python 中,multiprocessing
模組提供了一種方式來實作多程式平行處理。這種方法可以用來加速計算密集型任務的執行。
基本使用
以下是一個基本的例子,展示如何使用 multiprocessing
來建立一個新的程式:
import multiprocessing as mp
def to_celsius(f):
c = (f - 32) * (5/9)
pid = mp.current_process().pid
print(f"{f}F is {c}C (pid {pid})")
if __name__ == '__main__':
mp.set_start_method('spawn')
p = mp.Process(target=to_celsius, args=(110,))
p.start()
在這個例子中,我們定義了一個函式 to_celsius
,它將華氏溫度轉換為攝氏溫度。然後,我們建立了一個新的程式,指定 to_celsius
作為目標函式,並傳遞 110
作為引數。
多程式池
如果你想要執行多個任務,可以使用 Pool
類別來建立一個程式池。以下是一個例子:
import multiprocessing as mp
def to_celsius(f):
c = (f - 32) * (5/9)
pid = mp.current_process().pid
print(f"{f}F is {c}C (pid {pid})")
if __name__ == '__main__':
mp.set_start_method('spawn')
with mp.Pool(4) as pool:
pool.map(to_celsius, range(110, 150, 10))
在這個例子中,我們建立了一個包含 4 個程式的程式池。然後,我們使用 map
方法將 to_celsius
函式應用於一個範圍的值(從 110 到 150,步長為 10)。
程式池的工作原理
當你使用 Pool
類別時,Python 會建立一個指定數量的程式(在這個例子中是 4)。每個程式都會執行 to_celsius
函式,並接受一個引數。當一個程式完成了任務後,它會被重新使用,以執行下一個任務。
你可以透過設定 maxtasksperchild
引數來控制每個程式執行的任務數量。例如:
with mp.Pool(4, maxtasksperchild=1) as pool:
pool.map(to_celsius, range(110, 150, 10))
這樣,每個程式只會執行一個任務,然後就會終止。
多程式與多執行緒的比較
在進行多工的處理時,程式設計師可以選擇多程式(Multiprocessing)或多執行緒(Multithreading)來實作。這兩種方法都可以用於提高程式的效率和回應速度,但是它們之間存在著一些差異。
多程式(Multiprocessing)
多程式是指在同一時間內,多個程式同時執行。每個程式都有自己的記憶體空間和系統資源。多程式可以用於CPU密集型任務,例如科學計算、資料壓縮等。
多程式的優點
- 可以充分利用多核CPU的資源,提高程式的效率。
- 每個程式都有自己的記憶體空間,減少了記憶體衝突的可能性。
多程式的缺點
- 建立程式的成本較高,需要進行記憶體分配和初始化。
- 程式之間的通訊較為複雜,需要使用IPC(Inter-Process Communication)機制。
多執行緒(Multithreading)
多執行緒是指在同一時間內,多個執行緒同時執行。執行緒是程式的一部分,分享同一塊記憶體空間。多執行緒可以用於IO密集型任務,例如網路請求、檔案操作等。
多執行緒的優點
- 建立執行緒的成本較低,需要進行堆積疊分配和初始化。
- 執行緒之間的通訊較為簡單,分享同一塊記憶體空間。
多執行緒的缺點
- 執行緒之間可能存在記憶體衝突和同步問題。
- 在單核CPU上,多執行緒可能會降低程式的效率,因為執行緒切換的成本較高。
Python中的多程式和多執行緒
Python提供了multiprocessing
和threading
模組來支援多程式和多執行緒。multiprocessing
模組提供了建立程式、管理程式和進行程式間通訊的功能。threading
模組提供了建立執行緒、管理執行緒和進行執行緒間通訊的功能。
Python中的多程式示例
import multiprocessing
def worker(num):
print(f"Worker {num} started")
# 進行一些工作
print(f"Worker {num} finished")
if __name__ == "__main__":
processes = []
for i in range(5):
p = multiprocessing.Process(target=worker, args=(i,))
processes.append(p)
p.start()
for p in processes:
p.join()
Python中的多執行緒示例
import threading
def worker(num):
print(f"Worker {num} started")
# 進行一些工作
print(f"Worker {num} finished")
threads = []
for i in range(5):
t = threading.Thread(target=worker, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join()
多程式中的子程式執行
在多程式環境中,子程式的執行是透過 multiprocessing.spawn
模組來實作的。這個模組負責建立新的 Python 直譯器程式,並執行指定的程式碼。
子程式入口點
子程式的入口點是 spawn_main
函式,它接收兩個引數:pipe_handle
和 parent_pid
(在 Windows 中)或 tracker_fd
(在 POSIX 系統中)。這個函式負責執行子程式的程式碼,並與父程式進行通訊。
def spawn_main(pipe_handle, parent_pid=None, tracker_fd=None):
#...
Windows 平臺下的實作
在 Windows 平臺下,spawn_main
函式會呼叫 OpenProcess
函式來開啟父程式的控制程式碼,並建立一個新的檔案描述符 fd
來與父程式進行通訊。
if sys.platform == 'win32':
import msvcrt
import _winapi
if parent_pid is not None:
source_process = _winapi.OpenProcess(
_winapi.SYNCHRONIZE | _winapi.PROCESS_DUP_HANDLE,
False, parent_pid)
else:
source_process = None
new_handle = reduction.duplicate(pipe_handle, source_process=source_process)
fd = msvcrt.open_osfhandle(new_handle, os.O_RDONLY)
parent_sentinel = source_process
POSIX 平臺下的實作
在 POSIX 平臺下,pipe_handle
直接作為檔案描述符 fd
來使用,並且被複製以成為 parent_sentinel
的值。
else:
from. import resource_tracker
resource_tracker._resource_tracker._fd = tracker_fd
fd = pipe_handle
parent_sentinel = os.dup(pipe_handle)
執行子程式程式碼
最後,_main
函式被呼叫,以檔案描述符 fd
和父程式的訊號標誌 parent_sentinel
作為引數。這個函式的傳回值將成為子程式的離開程式碼,並且 Python 直譯器將離開。
_main(fd, parent_sentinel)
這個過程保證了子程式可以正確地執行指定的程式碼,並與父程式進行通訊。
多程式平行
在多程式平行中, _main()
函式扮演著重要角色。它負責從父程式接收資料,並根據這些資料進行相應的處理。下面是 _main()
函式的具體實作:
def _main(fd, parent_sentinel):
with os.fdopen(fd, 'rb', closefd=True) as from_parent:
process.current_process()._inheriting = True
try:
preparation_data = reduction.pickle.load(from_parent)
prepare(preparation_data)
self = reduction.pickle.load(from_parent)
finally:
del process.current_process()._inheriting
return self._bootstrap(parent_sentinel)
_main()
函式解析
檔案描述符操作:函式首先開啟檔案描述符
fd
,並設定為二進位制讀取模式 ('rb'
)。這個檔案描述符是由父程式建立的管道的一部分,用於父子程式之間的通訊。繼承標誌:設定
process.current_process()._inheriting
為True
,表示當前程式正在繼承父程式的某些屬性。載入序列化資料:使用
reduction.pickle.load(from_parent)
載入父程式序列化的資料。這些資料包括了初始化資料 (preparation_data
) 和一個SpawnProcess
例項。初始化:呼叫
prepare(preparation_data)
進行初始化工作。這一步驟根據父程式提供的資料對子程式進行必要的設定。載入
SpawnProcess
例項:載入父程式序列化的SpawnProcess
例項,並指定給self
。最終清理:無論是否發生異常,都會刪除
_inheriting
屬性,以確保資源的釋放。啟動
_bootstrap()
:最後,呼叫self._bootstrap(parent_sentinel)
啟動子程式的_bootstrap()
方法。這一步驟負責根據序列化的資料和父程式的訊號,執行子程式的主邏輯,包括執行目標函式和傳遞引數。
_bootstrap()
方法
_bootstrap()
方法是 BaseProcess
類別的一部分,負責根據父程式提供的序列化資料,建立一個新的 BaseProcess
例項,並執行其 run()
方法。這個過程包括了目標函式的執行以及傳遞給它的引數。
def _bootstrap(parent_sentinel):
# 根據序列化資料建立 BaseProcess 例項
#...
# 執行目標函式
self.run()
多程式平行處理
在進行多程式平行處理時,需要考慮到各個程式之間的溝通和資料交換。Python 的 multiprocessing
模組提供了一個簡單的方式來建立多程式平行處理的環境。
程式建立
當建立一個新程式時,需要定義一個目標函式,這個函式將在新程式中執行。以下是建立一個新程式的基本步驟:
- 定義目標函式:這個函式將在新程式中執行。
- 建立一個新程式:使用
multiprocessing.Process
類別建立一個新程式。 - 啟動新程式:使用
start()
方法啟動新程式。
程式溝通
在多程式平行處理中,各個程式之間的溝通是非常重要的。Python 的 multiprocessing
模組提供了幾種方式來實作程式之間的溝通,包括:
- Queue:是一種先進先出的資料結構,可以用來實作程式之間的資料交換。
- Pipe:是一種全雙工的通訊管道,可以用來實作程式之間的資料交換。
多程式平行處理的優點
多程式平行處理有幾個優點:
- 可以充分利用多核心 CPU 的資源,提高程式的執行效率。
- 可以實作多工平行處理,提高程式的回應速度。
多程式平行處理的缺點
多程式平行處理也有一些缺點:
- 建立新程式的成本比較高,需要花費更多的資源。
- 程式之間的溝通比較複雜,需要使用特殊的資料結構和通訊管道。
例項
以下是使用 multiprocessing
模組實作多程式平行處理的簡單例項:
import multiprocessing
import time
def worker(num):
print(f"Worker {num} started")
time.sleep(2)
print(f"Worker {num} finished")
if __name__ == "__main__":
processes = []
for i in range(5):
p = multiprocessing.Process(target=worker, args=(i,))
processes.append(p)
p.start()
for p in processes:
p.join()
在這個例項中,我們建立了 5 個新程式,每個程式執行 worker
函式。worker
函式只是睡眠 2 秒然後列印一條訊息。主程式建立了 5 個新程式,並啟動了它們。最後,主程式等待所有新程式完成。
內容解密:
在這個例項中,我們使用 multiprocessing
模組建立了 5 個新程式。每個新程式執行 worker
函式,該函式只是睡眠 2 秒然後列印一條訊息。主程式建立了 5 個新程式,並啟動了它們。最後,主程式等待所有新程式完成。
這個例項展示瞭如何使用 multiprocessing
模組實作多程式平行處理。它也展示瞭如何使用 Process
類別建立新程式,以及如何使用 start()
方法啟動新程式。
圖表翻譯:
flowchart TD A[主程式] --> B[建立新程式] B --> C[啟動新程式] C --> D[新程式執行] D --> E[新程式完成] E --> F[主程式等待] F --> G[主程式完成]
這個圖表展示了主程式和新程式之間的關係。主程式建立了 5 個新程式,並啟動了它們。每個新程式執行 worker
函式,該函式只是睡眠 2 秒然後列印一條訊息。最後,主程式等待所有新程式完成。
圖表說明:
這個圖表展示了多程式平行處理的流程。主程式建立了 5 個新程式,並啟動了它們。每個新程式執行 worker
函式,該函式只是睡眠 2 秒然後列印一條訊息。最後,主程式等待所有新程式完成。
這個圖表有助於理解多程式平行處理的原理和流程。它展示瞭如何使用 multiprocessing
模組建立新程式,以及如何使用 start()
方法啟動新程式。它也展示瞭如何使用 join()
方法等待所有新程式完成。
多程式間的資料交換與同步
在多程式的環境中,資料交換和同步是非常重要的。Python 的 multiprocessing
模組提供了幾種方法來實作這些功能,包括佇列(Queue)和管道(Pipe)。
佇列(Queue)
佇列是一種先進先出的資料結構,適合用於多個程式之間的資料交換。multiprocessing
模組提供了 Queue
類別來實作佇列。
import multiprocessing as mp
def worker(queue):
# 從佇列中取出資料
data = queue.get()
print(f"Worker 取出資料:{data}")
if __name__ == '__main__':
# 建立一個佇列
queue = mp.Queue()
# 將資料放入佇列
queue.put("Hello, World!")
# 建立一個程式
p = mp.Process(target=worker, args=(queue,))
# 啟動程式
p.start()
# 等待程式結束
p.join()
管道(Pipe)
管道是一種全雙工的通訊方式,適合用於兩個程式之間的資料交換。multiprocessing
模組提供了 Pipe
類別來實作管道。
import multiprocessing as mp
def worker(conn):
# 從管道中取出資料
data = conn.recv()
print(f"Worker 取出資料:{data}")
# 將資料送回管道
conn.send("Hello, World!")
if __name__ == '__main__':
# 建立一個管道
parent_conn, child_conn = mp.Pipe()
# 建立一個程式
p = mp.Process(target=worker, args=(child_conn,))
# 啟動程式
p.start()
# 將資料送入管道
parent_conn.send("Hello, Worker!")
# 從管道中取出資料
data = parent_conn.recv()
print(f"Parent 取出資料:{data}")
# 等待程式結束
p.join()
Semaphore
Semaphore 是一種同步機制,用於控制多個程式對分享資源的存取。multiprocessing
模組提供了 Semaphore
類別來實作 Semaphore。
import multiprocessing as mp
import time
def worker(sem):
# 封鎖 Semaphore
sem.acquire()
try:
# 執行任務
print("Worker 執行任務")
time.sleep(2)
finally:
# 釋放 Semaphore
sem.release()
if __name__ == '__main__':
# 建立一個 Semaphore
sem = mp.Semaphore(1)
# 建立多個程式
processes = []
for i in range(5):
p = mp.Process(target=worker, args=(sem,))
processes.append(p)
p.start()
# 等待所有程式結束
for p in processes:
p.join()
Manager
Manager 是一種高階別的同步機制,用於控制多個程式對分享資源的存取。multiprocessing
模組提供了 Manager
類別來實作 Manager。
import multiprocessing as mp
def worker(queue):
# 從佇列中取出資料
data = queue.get()
print(f"Worker 取出資料:{data}")
if __name__ == '__main__':
# 建立一個 Manager
with mp.Manager() as manager:
# 建立一個佇列
queue = manager.Queue()
# 將資料放入佇列
queue.put("Hello, World!")
# 建立一個程式
p = mp.Process(target=worker, args=(queue,))
# 啟動程式
p.start()
# 等待程式結束
p.join()
這些同步機制可以用於控制多個程式對分享資源的存取,從而實作多程式間的資料交換和同步。
平行處理與競爭性
在平行處理中,多個任務可以同時執行,以提高程式的效率。然而,在多個任務之間分享資源時,可能會出現競爭性問題。為瞭解決這個問題,我們可以使用佇列(Queue)來管理分享資源的存取。
使用佇列管理分享資源
以下是使用 Python 的 multiprocessing
模組和 Queue
類別來示範平行處理和競爭性管理的範例:
import multiprocessing
def to_celsius(inputs, outputs):
# 將輸入值轉換為攝氏溫度
results = []
for value in inputs:
results.append((value, (value - 32) * 5/9))
outputs.put(results)
if __name__ == '__main__':
# 建立輸入和輸出佇列
inputs = multiprocessing.Queue()
outputs = multiprocessing.Queue()
# 建立輸入值列表
input_values = list(range(110, 150, 10))
# 將輸入值放入輸入佇列
for value in input_values:
inputs.put(value)
# 建立作業員池
pool = multiprocessing.Pool()
# 將工作任務新增到作業員池
pool.apply(to_celsius, (inputs, outputs))
# 取出輸出值
for _ in input_values:
print(outputs.get(block=False))
在這個範例中,我們建立了兩個佇列:inputs
和 outputs
。我們將輸入值放入 inputs
佇列,然後使用 apply
方法將工作任務新增到作業員池中。作業員池中的作業員會從 inputs
佇列中取出輸入值,進行轉換,然後將結果放入 outputs
佇列中。最後,我們取出 outputs
佇列中的值並印出。
結果
執行這個程式會輸出:
(110, 43.333333333333336)
(120, 48.88888888888889)
(130, 54.44444444444445)
(140, 60.0)
這個範例示範瞭如何使用佇列來管理分享資源的存取,以避免競爭性問題。
Mermaid 圖表
flowchart TD A[主程式] --> B[建立輸入佇列] B --> C[建立輸出佇列] C --> D[將輸入值放入輸入佇列] D --> E[建立作業員池] E --> F[將工作任務新增到作業員池] F --> G[作業員池中的作業員取出輸入值] G --> H[作業員池中的作業員進行轉換] H --> I[作業員池中的作業員將結果放入輸出佇列] I --> J[取出輸出值]
圖表翻譯:
這個圖表示範了主程式建立輸入和輸出佇列,將輸入值放入輸入佇列,建立作業員池,將工作任務新增到作業員池,作業員池中的作業員取出輸入值,進行轉換,將結果放入輸出佇列,最後取出輸出值。
內容解密:
這個範例使用了 Python 的 multiprocessing
模組和 Queue
類別來示範平行處理和競爭性管理。透過使用佇列來管理分享資源的存取,可以避免競爭性問題。作業員池中的作業員可以從輸入佇列中取出輸入值,進行轉換,然後將結果放入輸出佇列中。最後,主程式可以取出輸出值。
從效能最佳化視角來看,Python 的 multiprocessing
模組提供了一種有效利用多核心 CPU 資源的方法,尤其適用於 CPU 密集型任務。透過建立多個程式,可以將任務平行化,從而顯著提升程式執行效率。然而,程式間的資料交換和同步是一個關鍵挑戰。Queue
、Pipe
、Semaphore
和 Manager
等機制提供了必要的工具,但使用時需要仔細考量效能開銷。例如,Queue
的序列化和反序列化操作可能會成為瓶頸,而 Pipe
更適用於雙向通訊。此外,過多的程式建立也會增加系統負擔,需要根據實際情況調整程式池的大小。對於重視效能的應用,建議深入理解不同 IPC 機制的特性,並結合實際場景進行效能測試和最佳化。未來,隨著 Python 生態系統的發展,預計會有更高效的程式間通訊和同步機製出現,進一步簡化平行處理的開發流程並提升效能。對於追求極致效能的應用,可以考慮結合 Cython 或其他原生程式碼擴充套件來提升關鍵程式碼的執行速度。玄貓認為,multiprocessing
模組雖然引入了一定的複雜性,但其提供的平行處理能力對於提升 CPU 密集型任務的效能至關重要,值得 Python 開發者深入學習和應用。