Python 的平行運算能力對於提升程式效能至關重要,尤其在處理 CPU 密集型或 I/O 密集型任務時。多行程利用多核心優勢,繞過 GIL 限制,有效提升 CPU 密集型任務的處理速度。multiprocessing 模組提供 Process 和 Pool 兩種方式管理行程,前者適用於少量行程的精細控制,後者則更適合大量任務的自動分配和結果收集。多執行緒則更適用於 I/O 密集型任務,透過 concurrent.futures 模組的 ThreadPoolExecutor,能以更簡潔的方式管理執行緒,提升 I/O 操作效率。然而,多執行緒仍受 GIL 影響,對於 CPU 密集型任務提升有限。協程則提供了更輕量級的平行方案,透過 asyncio 模組和 async/await 語法,實作單執行緒下的多工平行,尤其在非同步 I/O 操作中表現出色,能有效提升網路請求、資料函式庫查詢等操作的效率。選擇哪種平行方式需根據任務型別而定,CPU 密集型任務適合多行程,I/O 密集型任務則可選擇多執行緒或協程。
高效平行運算:Python 多行程、執行緒與協程實戰
在追求高效能的 Python 應用程式開發中,平行運算扮演著至關重要的角色。面對 CPU 密集型任務,多行程能有效利用多核心處理器;而對於 I/O 密集型任務,多執行緒或協程則能顯著提升效率。本文玄貓將探討 Python 中實作平行的幾種主要方式,並分享我在實際專案中的經驗與思考。
多行程:突破 CPU 瓶頸
Python 的 multiprocessing 模組提供了一種簡單直接的方式來建立和管理多個行程。每個行程都擁有獨立的記憶體空間,因此非常適合執行 CPU 密集型任務,避免了全域直譯器鎖(GIL)帶來的效能限制。
使用 Process 類別
以下是一個簡單的例子,展示如何使用 Process 類別建立多個行程:
import multiprocessing
import time
def worker(num):
"""工作行程函式"""
print(f'工作行程 {num} 執行中')
time.sleep(1)
print(f'工作行程 {num} 執行完畢')
if __name__ == '__main__':
processes = []
for i in range(5):
p = multiprocessing.Process(target=worker, args=(i,))
processes.append(p)
p.start()
for p in processes:
p.join()
print("所有工作行程已完成")
內容解密:
- 引入模組: 匯入
multiprocessing和time模組。 - 定義
worker函式: 這是每個行程要執行的任務。它會印出一條訊息,然後休眠一秒鐘。 - 建立
Process物件: 在if __name__ == '__main__':區塊中,我們建立一個Process物件列表。每個Process物件都指定了要執行的目標函式 (target=worker) 和傳遞給該函式的引數 (args=(i,))。 - 啟動行程: 使用
p.start()啟動每個行程。 - 等待行程完成: 使用
p.join()等待每個行程完成。這可以確保主行程在所有工作行程完成後再繼續執行。
使用 Pool 類別
Pool 類別提供了一種更方便的方式來管理一組工作行程。它可以自動將任務分配給不同的行程,並收集結果。
import multiprocessing
import time
def worker(num):
"""工作行程函式"""
print(f'工作行程 {num} 執行中')
time.sleep(1)
return num
if __name__ == '__main__':
with multiprocessing.Pool(processes=5) as pool:
results = pool.map(worker, range(5))
print("所有工作行程已完成")
print(f"結果: {results}")
內容解密:
- 建立
Pool物件: 使用multiprocessing.Pool(processes=5)建立一個行程池,指定池中行程的數量。with陳述式確保在使用完畢後正確關閉行程池。 - 使用
pool.map分配任務:pool.map(worker, range(5))將worker函式應用於range(5)中的每個元素。map函式會自動將這些任務分配給池中的不同行程。 - 收集結果:
pool.map傳回一個包含所有結果的列表。
玄貓認為,Pool 類別在處理大量獨立任務時非常有用,可以簡化程式碼並提高效率。
concurrent.futures:更高階的平行抽象
concurrent.futures 模組提供了一個更高階的介面,用於非同步執行函式,可以使用執行緒或行程。
使用 ThreadPoolExecutor
ThreadPoolExecutor 使用執行緒池來執行任務,適合 I/O 密集型任務。
import concurrent.futures
import time
def worker(num):
"""工作函式"""
print(f'工作執行緒 {num} 執行中')
time.sleep(1)
return num
if __name__ == '__main__':
with concurrent.futures.ThreadPoolExecutor() as executor:
results = [executor.submit(worker, i) for i in range(5)]
for f in concurrent.futures.as_completed(results):
print(f.result())
內容解密:
- 建立
ThreadPoolExecutor物件: 使用concurrent.futures.ThreadPoolExecutor()建立一個執行緒池執行器。with陳述式確保在使用完畢後正確關閉執行器。 - 提交任務:
executor.submit(worker, i)將worker函式和引數i提交給執行器。submit函式傳回一個Future物件,代表非同步計算的結果。 - 取得結果:
concurrent.futures.as_completed(results)傳回一個迭代器,該迭代器在 Future 物件完成時產生它們。使用f.result()取得每個 Future 物件的結果。
使用 ProcessPoolExecutor
ProcessPoolExecutor 使用行程池來執行任務,適合 CPU 密集型任務。
import concurrent.futures
import time
def worker(num):
"""工作函式"""
print(f'工作行程 {num} 執行中')
time.sleep(1)
return num
if __name__ == '__main__':
with concurrent.futures.ProcessPoolExecutor() as executor:
results = [executor.submit(worker, i) for i in range(5)]
for f in concurrent.futures.as_completed(results):
print(f.result())
玄貓在選擇 ThreadPoolExecutor 還是 ProcessPoolExecutor 時,主要考量任務的型別。對於 I/O 密集型任務,ThreadPoolExecutor 通常更有效率,因為執行緒之間的切換成本較低。而對於 CPU 密集型任務,ProcessPoolExecutor 則能更好地利用多核心處理器。
協程與 asyncio:非同步 I/O 的利器
協程是一種更輕量級的平行方式,它允許單個執行緒平行執行多個任務。Python 的 asyncio 模組提供了對協程的支援,特別適合處理非同步 I/O 操作。
import asyncio
async def my_coroutine(id):
print(f'協程 {id} 開始')
await asyncio.sleep(1)
print(f'協程 {id} 還原')
return f'協程 {id} 完成'
async def main():
tasks = [my_coroutine(i) for i in range(5)]
results = await asyncio.gather(*tasks)
print(f"結果: {results}")
if __name__ == "__main__":
asyncio.run(main())
內容解密:
async和await關鍵字:async關鍵字用於定義協程函式,await關鍵字用於暫停協程的執行,直到一個非同步操作完成。- 事件迴圈: 事件迴圈是
asyncio模組的核心,負責排程和執行協程。 asyncio.gather:asyncio.gather函式用於平行執行多個協程,並收集它們的結果。asyncio.run:asyncio.run函式用於執行主協程。
玄貓認為,協程在處理高併發 I/O 操作時具有顯著優勢,例如網路請求、資料函式庫查詢等。
An example of making an HTTP request
asynchronously using aiohttp
"""
async with aiohttp.ClientSession() as session:
async with session.get('https://www.example.com') as response:
print("Status:", response.status)
print("Content-type:", response.headers['content-type'])
html = await response.text()
print("Body:", html[:15], "...")
asyncio.run(main())
In this example, we use the aiohttp.ClientSession to make an HTTP
request to https://www.example.com. The async with statement ensures
that the session is properly closed after the request is made. We then
print the status code, content type, and the first 15 characters of the
response body.
When using asyncio with third-party libraries, it is important to make
sure that the library is designed to work with asyncio. If the library
is not designed to work with asyncio, it may block the event loop and
make the program unresponsive.
In general, it is best to use libraries that are specifically designed
to work with asyncio. These libraries are usually designed to be
non-blocking and will not block the event loop.
Asyncio can be used with third-party libraries that support asyncio.
Using these libraries with asyncio is usually straightforward and can
make it easier to write concurrent code. However, it is important to
make sure that the library is designed to work with asyncio to avoid
blocking the event loop.
