Python 的 multiprocessing 模組為 CPU 密集型任務提供了平行處理的有效途徑。它允許多個程式同時執行,充分利用多核心處理器的優勢,進而提升程式效能。multiprocessing 提供了 ProcessPoolQueuePipe 等類別,方便開發者建立、管理程式以及處理程式間的通訊。相較於多執行緒,多程式更能有效利用多核心 CPU,且能避免 GIL 的限制,但程式間通訊和同步機制相對複雜。理解 spawnfork 等不同啟動方法,以及 Windows 和 POSIX 系統的差異,有助於撰寫跨平臺的多程式程式。使用 Queue 或 Pipe 等機制實作資料交換,Semaphore 或 Manager 等機制則能有效管理分享資源與同步,避免競爭性問題。

程式的建立

multiprocessing模組提供了三種方法來建立新的程式:

  1. 分叉(Fork):這種方法只適用於POSIX系統,透過複製父程式的記憶體空間來建立新的程式。
  2. 建立新的程式:這種方法適用於POSIX和Windows系統,透過建立新的程式來執行指定的任務。
  3. 啟動分叉伺服器:這種方法只適用於POSIX系統,透過啟動一個分叉伺服器來建立新的程式。

Python API

Python的多程式API非常簡單,主要包括以下幾個部分:

  • multiprocessing.Process:代表一個程式,可以用來建立新的程式。
  • multiprocessing.Pool:代表一個程式池,可以用來管理多個程式。
  • multiprocessing.Queue:代表一個佇列,可以用來在程式之間傳遞資料。
  • multiprocessing.Pipe:代表一個管道,可以用來在程式之間傳遞資料。

範例

以下是一個簡單的範例,示範如何使用multiprocessing模組來建立新的程式:

import multiprocessing as mp
import os

def to_celsius(f):
    c = (f - 32) * (5/9)
    pid = os.getpid()
    print(f"Process {pid} converted {f} to {c}")

if __name__ == "__main__":
    f = 100
    p = mp.Process(target=to_celsius, args=(f,))
    p.start()
    p.join()

這個範例建立了一個新的程式,該程式執行to_celsius函式,將華氏溫度轉換為攝氏溫度。

內容解密:
  • multiprocessing模組提供了一個簡單的API來建立多個程式。
  • 程式的建立可以透過分叉、建立新的程式和啟動分叉伺服器等方法來實作。
  • Python API包括ProcessPoolQueuePipe等類別,可以用來管理多個程式和傳遞資料。
  • 範例示範如何使用multiprocessing模組來建立新的程式。

圖表翻譯:

  flowchart TD
    A[主程式] --> B[建立新的程式]
    B --> C[執行任務]
    C --> D[傳遞資料]
    D --> E[結束程式]

這個圖表示範了主程式如何建立新的程式,執行任務,傳遞資料和結束程式。

多程式平行處理

在 Python 中,multiprocessing 模組提供了一種方式來實作多程式平行處理。這種方法可以用來加速計算密集型任務的執行。

基本使用

以下是一個基本的例子,展示如何使用 multiprocessing 來建立一個新的程式:

import multiprocessing as mp

def to_celsius(f):
    c = (f - 32) * (5/9)
    pid = mp.current_process().pid
    print(f"{f}F is {c}C (pid {pid})")

if __name__ == '__main__':
    mp.set_start_method('spawn')
    p = mp.Process(target=to_celsius, args=(110,))
    p.start()

在這個例子中,我們定義了一個函式 to_celsius,它將華氏溫度轉換為攝氏溫度。然後,我們建立了一個新的程式,指定 to_celsius 作為目標函式,並傳遞 110 作為引數。

多程式池

如果你想要執行多個任務,可以使用 Pool 類別來建立一個程式池。以下是一個例子:

import multiprocessing as mp

def to_celsius(f):
    c = (f - 32) * (5/9)
    pid = mp.current_process().pid
    print(f"{f}F is {c}C (pid {pid})")

if __name__ == '__main__':
    mp.set_start_method('spawn')
    with mp.Pool(4) as pool:
        pool.map(to_celsius, range(110, 150, 10))

在這個例子中,我們建立了一個包含 4 個程式的程式池。然後,我們使用 map 方法將 to_celsius 函式應用於一個範圍的值(從 110 到 150,步長為 10)。

程式池的工作原理

當你使用 Pool 類別時,Python 會建立一個指定數量的程式(在這個例子中是 4)。每個程式都會執行 to_celsius 函式,並接受一個引數。當一個程式完成了任務後,它會被重新使用,以執行下一個任務。

你可以透過設定 maxtasksperchild 引數來控制每個程式執行的任務數量。例如:

with mp.Pool(4, maxtasksperchild=1) as pool:
    pool.map(to_celsius, range(110, 150, 10))

這樣,每個程式只會執行一個任務,然後就會終止。

多程式與多執行緒的比較

在進行多工的處理時,程式設計師可以選擇多程式(Multiprocessing)或多執行緒(Multithreading)來實作。這兩種方法都可以用於提高程式的效率和回應速度,但是它們之間存在著一些差異。

多程式(Multiprocessing)

多程式是指在同一時間內,多個程式同時執行。每個程式都有自己的記憶體空間和系統資源。多程式可以用於CPU密集型任務,例如科學計算、資料壓縮等。

多程式的優點

  • 可以充分利用多核CPU的資源,提高程式的效率。
  • 每個程式都有自己的記憶體空間,減少了記憶體衝突的可能性。

多程式的缺點

  • 建立程式的成本較高,需要進行記憶體分配和初始化。
  • 程式之間的通訊較為複雜,需要使用IPC(Inter-Process Communication)機制。

多執行緒(Multithreading)

多執行緒是指在同一時間內,多個執行緒同時執行。執行緒是程式的一部分,分享同一塊記憶體空間。多執行緒可以用於IO密集型任務,例如網路請求、檔案操作等。

多執行緒的優點

  • 建立執行緒的成本較低,需要進行堆積疊分配和初始化。
  • 執行緒之間的通訊較為簡單,分享同一塊記憶體空間。

多執行緒的缺點

  • 執行緒之間可能存在記憶體衝突和同步問題。
  • 在單核CPU上,多執行緒可能會降低程式的效率,因為執行緒切換的成本較高。

Python中的多程式和多執行緒

Python提供了multiprocessingthreading模組來支援多程式和多執行緒。multiprocessing模組提供了建立程式、管理程式和進行程式間通訊的功能。threading模組提供了建立執行緒、管理執行緒和進行執行緒間通訊的功能。

Python中的多程式示例

import multiprocessing

def worker(num):
    print(f"Worker {num} started")
    # 進行一些工作
    print(f"Worker {num} finished")

if __name__ == "__main__":
    processes = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

Python中的多執行緒示例

import threading

def worker(num):
    print(f"Worker {num} started")
    # 進行一些工作
    print(f"Worker {num} finished")

threads = []
for i in range(5):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

多程式中的子程式執行

在多程式環境中,子程式的執行是透過 multiprocessing.spawn 模組來實作的。這個模組負責建立新的 Python 直譯器程式,並執行指定的程式碼。

子程式入口點

子程式的入口點是 spawn_main 函式,它接收兩個引數:pipe_handleparent_pid(在 Windows 中)或 tracker_fd(在 POSIX 系統中)。這個函式負責執行子程式的程式碼,並與父程式進行通訊。

def spawn_main(pipe_handle, parent_pid=None, tracker_fd=None):
    #...

Windows 平臺下的實作

在 Windows 平臺下,spawn_main 函式會呼叫 OpenProcess 函式來開啟父程式的控制程式碼,並建立一個新的檔案描述符 fd 來與父程式進行通訊。

if sys.platform == 'win32':
    import msvcrt
    import _winapi

    if parent_pid is not None:
        source_process = _winapi.OpenProcess(
            _winapi.SYNCHRONIZE | _winapi.PROCESS_DUP_HANDLE,
            False, parent_pid)
    else:
        source_process = None
    new_handle = reduction.duplicate(pipe_handle, source_process=source_process)
    fd = msvcrt.open_osfhandle(new_handle, os.O_RDONLY)
    parent_sentinel = source_process

POSIX 平臺下的實作

在 POSIX 平臺下,pipe_handle 直接作為檔案描述符 fd 來使用,並且被複製以成為 parent_sentinel 的值。

else:
    from. import resource_tracker
    resource_tracker._resource_tracker._fd = tracker_fd
    fd = pipe_handle
    parent_sentinel = os.dup(pipe_handle)

執行子程式程式碼

最後,_main 函式被呼叫,以檔案描述符 fd 和父程式的訊號標誌 parent_sentinel 作為引數。這個函式的傳回值將成為子程式的離開程式碼,並且 Python 直譯器將離開。

_main(fd, parent_sentinel)

這個過程保證了子程式可以正確地執行指定的程式碼,並與父程式進行通訊。

多程式平行

在多程式平行中, _main() 函式扮演著重要角色。它負責從父程式接收資料,並根據這些資料進行相應的處理。下面是 _main() 函式的具體實作:

def _main(fd, parent_sentinel):
    with os.fdopen(fd, 'rb', closefd=True) as from_parent:
        process.current_process()._inheriting = True
        try:
            preparation_data = reduction.pickle.load(from_parent)
            prepare(preparation_data)
            self = reduction.pickle.load(from_parent)
        finally:
            del process.current_process()._inheriting
        return self._bootstrap(parent_sentinel)

_main() 函式解析

  1. 檔案描述符操作:函式首先開啟檔案描述符 fd,並設定為二進位制讀取模式 ('rb')。這個檔案描述符是由父程式建立的管道的一部分,用於父子程式之間的通訊。

  2. 繼承標誌:設定 process.current_process()._inheritingTrue,表示當前程式正在繼承父程式的某些屬性。

  3. 載入序列化資料:使用 reduction.pickle.load(from_parent) 載入父程式序列化的資料。這些資料包括了初始化資料 (preparation_data) 和一個 SpawnProcess 例項。

  4. 初始化:呼叫 prepare(preparation_data) 進行初始化工作。這一步驟根據父程式提供的資料對子程式進行必要的設定。

  5. 載入 SpawnProcess 例項:載入父程式序列化的 SpawnProcess 例項,並指定給 self

  6. 最終清理:無論是否發生異常,都會刪除 _inheriting 屬性,以確保資源的釋放。

  7. 啟動 _bootstrap():最後,呼叫 self._bootstrap(parent_sentinel) 啟動子程式的 _bootstrap() 方法。這一步驟負責根據序列化的資料和父程式的訊號,執行子程式的主邏輯,包括執行目標函式和傳遞引數。

_bootstrap() 方法

_bootstrap() 方法是 BaseProcess 類別的一部分,負責根據父程式提供的序列化資料,建立一個新的 BaseProcess 例項,並執行其 run() 方法。這個過程包括了目標函式的執行以及傳遞給它的引數。

def _bootstrap(parent_sentinel):
    # 根據序列化資料建立 BaseProcess 例項
    #...
    # 執行目標函式
    self.run()

多程式平行處理

在進行多程式平行處理時,需要考慮到各個程式之間的溝通和資料交換。Python 的 multiprocessing 模組提供了一個簡單的方式來建立多程式平行處理的環境。

程式建立

當建立一個新程式時,需要定義一個目標函式,這個函式將在新程式中執行。以下是建立一個新程式的基本步驟:

  1. 定義目標函式:這個函式將在新程式中執行。
  2. 建立一個新程式:使用 multiprocessing.Process 類別建立一個新程式。
  3. 啟動新程式:使用 start() 方法啟動新程式。

程式溝通

在多程式平行處理中,各個程式之間的溝通是非常重要的。Python 的 multiprocessing 模組提供了幾種方式來實作程式之間的溝通,包括:

  • Queue:是一種先進先出的資料結構,可以用來實作程式之間的資料交換。
  • Pipe:是一種全雙工的通訊管道,可以用來實作程式之間的資料交換。

多程式平行處理的優點

多程式平行處理有幾個優點:

  • 可以充分利用多核心 CPU 的資源,提高程式的執行效率。
  • 可以實作多工平行處理,提高程式的回應速度。

多程式平行處理的缺點

多程式平行處理也有一些缺點:

  • 建立新程式的成本比較高,需要花費更多的資源。
  • 程式之間的溝通比較複雜,需要使用特殊的資料結構和通訊管道。

例項

以下是使用 multiprocessing 模組實作多程式平行處理的簡單例項:

import multiprocessing
import time

def worker(num):
    print(f"Worker {num} started")
    time.sleep(2)
    print(f"Worker {num} finished")

if __name__ == "__main__":
    processes = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

在這個例項中,我們建立了 5 個新程式,每個程式執行 worker 函式。worker 函式只是睡眠 2 秒然後列印一條訊息。主程式建立了 5 個新程式,並啟動了它們。最後,主程式等待所有新程式完成。

內容解密:

在這個例項中,我們使用 multiprocessing 模組建立了 5 個新程式。每個新程式執行 worker 函式,該函式只是睡眠 2 秒然後列印一條訊息。主程式建立了 5 個新程式,並啟動了它們。最後,主程式等待所有新程式完成。

這個例項展示瞭如何使用 multiprocessing 模組實作多程式平行處理。它也展示瞭如何使用 Process 類別建立新程式,以及如何使用 start() 方法啟動新程式。

圖表翻譯:

  flowchart TD
    A[主程式] --> B[建立新程式]
    B --> C[啟動新程式]
    C --> D[新程式執行]
    D --> E[新程式完成]
    E --> F[主程式等待]
    F --> G[主程式完成]

這個圖表展示了主程式和新程式之間的關係。主程式建立了 5 個新程式,並啟動了它們。每個新程式執行 worker 函式,該函式只是睡眠 2 秒然後列印一條訊息。最後,主程式等待所有新程式完成。

圖表說明:

這個圖表展示了多程式平行處理的流程。主程式建立了 5 個新程式,並啟動了它們。每個新程式執行 worker 函式,該函式只是睡眠 2 秒然後列印一條訊息。最後,主程式等待所有新程式完成。

這個圖表有助於理解多程式平行處理的原理和流程。它展示瞭如何使用 multiprocessing 模組建立新程式,以及如何使用 start() 方法啟動新程式。它也展示瞭如何使用 join() 方法等待所有新程式完成。

多程式間的資料交換與同步

在多程式的環境中,資料交換和同步是非常重要的。Python 的 multiprocessing 模組提供了幾種方法來實作這些功能,包括佇列(Queue)和管道(Pipe)。

佇列(Queue)

佇列是一種先進先出的資料結構,適合用於多個程式之間的資料交換。multiprocessing 模組提供了 Queue 類別來實作佇列。

import multiprocessing as mp

def worker(queue):
    # 從佇列中取出資料
    data = queue.get()
    print(f"Worker 取出資料:{data}")

if __name__ == '__main__':
    # 建立一個佇列
    queue = mp.Queue()

    # 將資料放入佇列
    queue.put("Hello, World!")

    # 建立一個程式
    p = mp.Process(target=worker, args=(queue,))

    # 啟動程式
    p.start()

    # 等待程式結束
    p.join()

管道(Pipe)

管道是一種全雙工的通訊方式,適合用於兩個程式之間的資料交換。multiprocessing 模組提供了 Pipe 類別來實作管道。

import multiprocessing as mp

def worker(conn):
    # 從管道中取出資料
    data = conn.recv()
    print(f"Worker 取出資料:{data}")

    # 將資料送回管道
    conn.send("Hello, World!")

if __name__ == '__main__':
    # 建立一個管道
    parent_conn, child_conn = mp.Pipe()

    # 建立一個程式
    p = mp.Process(target=worker, args=(child_conn,))

    # 啟動程式
    p.start()

    # 將資料送入管道
    parent_conn.send("Hello, Worker!")

    # 從管道中取出資料
    data = parent_conn.recv()
    print(f"Parent 取出資料:{data}")

    # 等待程式結束
    p.join()

Semaphore

Semaphore 是一種同步機制,用於控制多個程式對分享資源的存取。multiprocessing 模組提供了 Semaphore 類別來實作 Semaphore。

import multiprocessing as mp
import time

def worker(sem):
    # 封鎖 Semaphore
    sem.acquire()

    try:
        # 執行任務
        print("Worker 執行任務")
        time.sleep(2)
    finally:
        # 釋放 Semaphore
        sem.release()

if __name__ == '__main__':
    # 建立一個 Semaphore
    sem = mp.Semaphore(1)

    # 建立多個程式
    processes = []
    for i in range(5):
        p = mp.Process(target=worker, args=(sem,))
        processes.append(p)
        p.start()

    # 等待所有程式結束
    for p in processes:
        p.join()

Manager

Manager 是一種高階別的同步機制,用於控制多個程式對分享資源的存取。multiprocessing 模組提供了 Manager 類別來實作 Manager。

import multiprocessing as mp

def worker(queue):
    # 從佇列中取出資料
    data = queue.get()
    print(f"Worker 取出資料:{data}")

if __name__ == '__main__':
    # 建立一個 Manager
    with mp.Manager() as manager:
        # 建立一個佇列
        queue = manager.Queue()

        # 將資料放入佇列
        queue.put("Hello, World!")

        # 建立一個程式
        p = mp.Process(target=worker, args=(queue,))

        # 啟動程式
        p.start()

        # 等待程式結束
        p.join()

這些同步機制可以用於控制多個程式對分享資源的存取,從而實作多程式間的資料交換和同步。

平行處理與競爭性

在平行處理中,多個任務可以同時執行,以提高程式的效率。然而,在多個任務之間分享資源時,可能會出現競爭性問題。為瞭解決這個問題,我們可以使用佇列(Queue)來管理分享資源的存取。

使用佇列管理分享資源

以下是使用 Python 的 multiprocessing 模組和 Queue 類別來示範平行處理和競爭性管理的範例:

import multiprocessing

def to_celsius(inputs, outputs):
    # 將輸入值轉換為攝氏溫度
    results = []
    for value in inputs:
        results.append((value, (value - 32) * 5/9))
    outputs.put(results)

if __name__ == '__main__':
    # 建立輸入和輸出佇列
    inputs = multiprocessing.Queue()
    outputs = multiprocessing.Queue()

    # 建立輸入值列表
    input_values = list(range(110, 150, 10))

    # 將輸入值放入輸入佇列
    for value in input_values:
        inputs.put(value)

    # 建立作業員池
    pool = multiprocessing.Pool()

    # 將工作任務新增到作業員池
    pool.apply(to_celsius, (inputs, outputs))

    # 取出輸出值
    for _ in input_values:
        print(outputs.get(block=False))

在這個範例中,我們建立了兩個佇列:inputsoutputs。我們將輸入值放入 inputs 佇列,然後使用 apply 方法將工作任務新增到作業員池中。作業員池中的作業員會從 inputs 佇列中取出輸入值,進行轉換,然後將結果放入 outputs 佇列中。最後,我們取出 outputs 佇列中的值並印出。

結果

執行這個程式會輸出:

(110, 43.333333333333336)
(120, 48.88888888888889)
(130, 54.44444444444445)
(140, 60.0)

這個範例示範瞭如何使用佇列來管理分享資源的存取,以避免競爭性問題。

Mermaid 圖表

  flowchart TD
    A[主程式] --> B[建立輸入佇列]
    B --> C[建立輸出佇列]
    C --> D[將輸入值放入輸入佇列]
    D --> E[建立作業員池]
    E --> F[將工作任務新增到作業員池]
    F --> G[作業員池中的作業員取出輸入值]
    G --> H[作業員池中的作業員進行轉換]
    H --> I[作業員池中的作業員將結果放入輸出佇列]
    I --> J[取出輸出值]

圖表翻譯:

這個圖表示範了主程式建立輸入和輸出佇列,將輸入值放入輸入佇列,建立作業員池,將工作任務新增到作業員池,作業員池中的作業員取出輸入值,進行轉換,將結果放入輸出佇列,最後取出輸出值。

內容解密:

這個範例使用了 Python 的 multiprocessing 模組和 Queue 類別來示範平行處理和競爭性管理。透過使用佇列來管理分享資源的存取,可以避免競爭性問題。作業員池中的作業員可以從輸入佇列中取出輸入值,進行轉換,然後將結果放入輸出佇列中。最後,主程式可以取出輸出值。

從效能最佳化視角來看,Python 的 multiprocessing 模組提供了一種有效利用多核心 CPU 資源的方法,尤其適用於 CPU 密集型任務。透過建立多個程式,可以將任務平行化,從而顯著提升程式執行效率。然而,程式間的資料交換和同步是一個關鍵挑戰。QueuePipeSemaphoreManager 等機制提供了必要的工具,但使用時需要仔細考量效能開銷。例如,Queue 的序列化和反序列化操作可能會成為瓶頸,而 Pipe 更適用於雙向通訊。此外,過多的程式建立也會增加系統負擔,需要根據實際情況調整程式池的大小。對於重視效能的應用,建議深入理解不同 IPC 機制的特性,並結合實際場景進行效能測試和最佳化。未來,隨著 Python 生態系統的發展,預計會有更高效的程式間通訊和同步機製出現,進一步簡化平行處理的開發流程並提升效能。對於追求極致效能的應用,可以考慮結合 Cython 或其他原生程式碼擴充套件來提升關鍵程式碼的執行速度。玄貓認為,multiprocessing 模組雖然引入了一定的複雜性,但其提供的平行處理能力對於提升 CPU 密集型任務的效能至關重要,值得 Python 開發者深入學習和應用。