Python 的平行運算能力對於提升程式效能至關重要,尤其在處理 CPU 密集型或 I/O 密集型任務時。多行程利用多核心優勢,繞過 GIL 限制,有效提升 CPU 密集型任務的處理速度。multiprocessing 模組提供 ProcessPool 兩種方式管理行程,前者適用於少量行程的精細控制,後者則更適合大量任務的自動分配和結果收集。多執行緒則更適用於 I/O 密集型任務,透過 concurrent.futures 模組的 ThreadPoolExecutor,能以更簡潔的方式管理執行緒,提升 I/O 操作效率。然而,多執行緒仍受 GIL 影響,對於 CPU 密集型任務提升有限。協程則提供了更輕量級的平行方案,透過 asyncio 模組和 async/await 語法,實作單執行緒下的多工平行,尤其在非同步 I/O 操作中表現出色,能有效提升網路請求、資料函式庫查詢等操作的效率。選擇哪種平行方式需根據任務型別而定,CPU 密集型任務適合多行程,I/O 密集型任務則可選擇多執行緒或協程。

高效平行運算:Python 多行程、執行緒與協程實戰

在追求高效能的 Python 應用程式開發中,平行運算扮演著至關重要的角色。面對 CPU 密集型任務,多行程能有效利用多核心處理器;而對於 I/O 密集型任務,多執行緒或協程則能顯著提升效率。本文玄貓將探討 Python 中實作平行的幾種主要方式,並分享我在實際專案中的經驗與思考。

多行程:突破 CPU 瓶頸

Python 的 multiprocessing 模組提供了一種簡單直接的方式來建立和管理多個行程。每個行程都擁有獨立的記憶體空間,因此非常適合執行 CPU 密集型任務,避免了全域直譯器鎖(GIL)帶來的效能限制。

使用 Process 類別

以下是一個簡單的例子,展示如何使用 Process 類別建立多個行程:

import multiprocessing
import time

def worker(num):
    """工作行程函式"""
    print(f'工作行程 {num} 執行中')
    time.sleep(1)
    print(f'工作行程 {num} 執行完畢')

if __name__ == '__main__':
    processes = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()
    print("所有工作行程已完成")

內容解密:

  1. 引入模組: 匯入 multiprocessingtime 模組。
  2. 定義 worker 函式: 這是每個行程要執行的任務。它會印出一條訊息,然後休眠一秒鐘。
  3. 建立 Process 物件:if __name__ == '__main__': 區塊中,我們建立一個 Process 物件列表。每個 Process 物件都指定了要執行的目標函式 (target=worker) 和傳遞給該函式的引數 (args=(i,))。
  4. 啟動行程: 使用 p.start() 啟動每個行程。
  5. 等待行程完成: 使用 p.join() 等待每個行程完成。這可以確保主行程在所有工作行程完成後再繼續執行。

使用 Pool 類別

Pool 類別提供了一種更方便的方式來管理一組工作行程。它可以自動將任務分配給不同的行程,並收集結果。

import multiprocessing
import time

def worker(num):
    """工作行程函式"""
    print(f'工作行程 {num} 執行中')
    time.sleep(1)
    return num

if __name__ == '__main__':
    with multiprocessing.Pool(processes=5) as pool:
        results = pool.map(worker, range(5))
    print("所有工作行程已完成")
    print(f"結果: {results}")

內容解密:

  1. 建立 Pool 物件: 使用 multiprocessing.Pool(processes=5) 建立一個行程池,指定池中行程的數量。with 陳述式確保在使用完畢後正確關閉行程池。
  2. 使用 pool.map 分配任務: pool.map(worker, range(5))worker 函式應用於 range(5) 中的每個元素。map 函式會自動將這些任務分配給池中的不同行程。
  3. 收集結果: pool.map 傳回一個包含所有結果的列表。

玄貓認為,Pool 類別在處理大量獨立任務時非常有用,可以簡化程式碼並提高效率。

concurrent.futures:更高階的平行抽象

concurrent.futures 模組提供了一個更高階的介面,用於非同步執行函式,可以使用執行緒或行程。

使用 ThreadPoolExecutor

ThreadPoolExecutor 使用執行緒池來執行任務,適合 I/O 密集型任務。

import concurrent.futures
import time

def worker(num):
    """工作函式"""
    print(f'工作執行緒 {num} 執行中')
    time.sleep(1)
    return num

if __name__ == '__main__':
    with concurrent.futures.ThreadPoolExecutor() as executor:
        results = [executor.submit(worker, i) for i in range(5)]
        for f in concurrent.futures.as_completed(results):
            print(f.result())

內容解密:

  1. 建立 ThreadPoolExecutor 物件: 使用 concurrent.futures.ThreadPoolExecutor() 建立一個執行緒池執行器。with 陳述式確保在使用完畢後正確關閉執行器。
  2. 提交任務: executor.submit(worker, i)worker 函式和引數 i 提交給執行器。submit 函式傳回一個 Future 物件,代表非同步計算的結果。
  3. 取得結果: concurrent.futures.as_completed(results) 傳回一個迭代器,該迭代器在 Future 物件完成時產生它們。使用 f.result() 取得每個 Future 物件的結果。

使用 ProcessPoolExecutor

ProcessPoolExecutor 使用行程池來執行任務,適合 CPU 密集型任務。

import concurrent.futures
import time

def worker(num):
    """工作函式"""
    print(f'工作行程 {num} 執行中')
    time.sleep(1)
    return num

if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor() as executor:
        results = [executor.submit(worker, i) for i in range(5)]
        for f in concurrent.futures.as_completed(results):
            print(f.result())

玄貓在選擇 ThreadPoolExecutor 還是 ProcessPoolExecutor 時,主要考量任務的型別。對於 I/O 密集型任務,ThreadPoolExecutor 通常更有效率,因為執行緒之間的切換成本較低。而對於 CPU 密集型任務,ProcessPoolExecutor 則能更好地利用多核心處理器。

協程與 asyncio:非同步 I/O 的利器

協程是一種更輕量級的平行方式,它允許單個執行緒平行執行多個任務。Python 的 asyncio 模組提供了對協程的支援,特別適合處理非同步 I/O 操作。

import asyncio

async def my_coroutine(id):
    print(f'協程 {id} 開始')
    await asyncio.sleep(1)
    print(f'協程 {id} 還原')
    return f'協程 {id} 完成'

async def main():
    tasks = [my_coroutine(i) for i in range(5)]
    results = await asyncio.gather(*tasks)
    print(f"結果: {results}")

if __name__ == "__main__":
    asyncio.run(main())

內容解密:

  1. asyncawait 關鍵字: async 關鍵字用於定義協程函式,await 關鍵字用於暫停協程的執行,直到一個非同步操作完成。
  2. 事件迴圈: 事件迴圈是 asyncio 模組的核心,負責排程和執行協程。
  3. asyncio.gather asyncio.gather 函式用於平行執行多個協程,並收集它們的結果。
  4. asyncio.run asyncio.run 函式用於執行主協程。

玄貓認為,協程在處理高併發 I/O 操作時具有顯著優勢,例如網路請求、資料函式庫查詢等。

    An example of making an HTTP request
    asynchronously using aiohttp
    """
    async with aiohttp.ClientSession() as session:
        async with session.get('https://www.example.com') as response:
            print("Status:", response.status)
            print("Content-type:", response.headers['content-type'])
            html = await response.text()
            print("Body:", html[:15], "...")
asyncio.run(main())

In this example, we use the aiohttp.ClientSession to make an HTTP request to https://www.example.com. The async with statement ensures that the session is properly closed after the request is made. We then print the status code, content type, and the first 15 characters of the response body. When using asyncio with third-party libraries, it is important to make sure that the library is designed to work with asyncio. If the library is not designed to work with asyncio, it may block the event loop and make the program unresponsive. In general, it is best to use libraries that are specifically designed to work with asyncio. These libraries are usually designed to be non-blocking and will not block the event loop. Asyncio can be used with third-party libraries that support asyncio. Using these libraries with asyncio is usually straightforward and can make it easier to write concurrent code. However, it is important to make sure that the library is designed to work with asyncio to avoid blocking the event loop.