Python 的 GIL(全域直譯器鎖)機制讓許多開發者對多執行緒的效能產生疑慮。實際上,Python 多執行緒並非一無是處,尤其在 I/O 密集型任務中,它能有效提升程式效率。然而,使用多執行緒時,資料競爭問題是不可忽視的陷阱。本文將剖析 Python 多執行緒的運作原理,並提供實戰技巧,讓你避開 GIL 的陷阱,充分發揮多執行緒的優勢。在 I/O 密集型任務中,程式頻繁地等待外部資源,例如網路請求、檔案讀寫或使用者輸入。這些等待時間會讓 CPU 閒置,而多執行緒允許程式在等待 I/O 操作完成的同時執行其他任務,從而提高整體效率。然而,當多個執行緒同時存取和修改分享資料時,就可能發生資料競爭,導致程式出現不可預期的錯誤。理解 GIL 的作用範圍至關重要,它主要影響 CPU 密集型任務,而 I/O 操作通常發生在作業系統層級,GIL 並不會限制它們的平行執行。因此,即使在 GIL 的限制下,多執行緒仍然可以有效地處理 I/O 密集型任務。

Python 多行程平行處理:玄貓的實戰經驗分享

平行(Parallelism)與並發(Concurrency)是提升程式效率的關鍵概念。並發是指程式能夠同時處理多個任務,而平行則是真正地同時執行多個任務。在單核 CPU 上,作業系統透過快速切換執行緒,模擬並發的效果。而在多核 CPU 上,則可以實作真正的平行。

為何 Python 適合並發而非平行?

Python 由於其 GIL(Global Interpreter Lock)的限制,在多執行緒環境下難以實作真正的平行。GIL 確保同一時間只有一個執行緒可以執行 Python bytecode,這限制了 CPU 密集型任務的平行效能。但 Python 在並發處理方面非常出色,尤其是在 I/O 密集型任務中。

使用 subprocess 模組管理子行程:釋放 CPU 潛能

subprocess 模組是 Python 中管理子行程的利器。它允許 Python 程式啟動新的行程,並且之進行互動,例如讀取輸出、傳送輸入等。這使得 Python 能夠有效地整合其他工具和程式,尤其是在需要利用多個 CPU 核心時。

範例:啟動子行程並讀取輸出

以下是一個簡單的範例,展示如何使用 subprocess 啟動一個子行程,並讀取其輸出:

import subprocess

# 啟動 echo 命令,並將輸出導向到管道
proc = subprocess.Popen(
    ['echo', 'Hello from the child!'],
    stdout=subprocess.PIPE
)

# 讀取子行程的輸出
out, err = proc.communicate()

# 解碼輸出並印出
print(out.decode('utf-8'))

內容解密

  1. subprocess.Popen(): 啟動一個新的子行程。
    • ['echo', 'Hello from the child!']: 指定要執行的命令及其引數。
    • stdout=subprocess.PIPE: 將子行程的標準輸出導向到管道,以便 Python 程式可以讀取它。
  2. proc.communicate(): 讀取子行程的輸出,並等待子行程結束。
    • out: 子行程的標準輸出。
    • err: 子行程的標準錯誤輸出。
  3. out.decode('utf-8'): 將位元組串流的輸出解碼為 UTF-8 字串,以便正確顯示。

範例:平行執行多個子行程

以下是如何平行執行多個子行程的範例:

import subprocess
import time

def run_sleep(period):
    """啟動 sleep 命令,並傳回 Popen 物件"""
    proc = subprocess.Popen(['sleep', str(period)])
    return proc

start = time.time()
procs = []

# 啟動 10 個 sleep 行程
for _ in range(10):
    proc = run_sleep(0.1)
    procs.append(proc)

# 等待所有行程結束
for proc in procs:
    proc.communicate()

end = time.time()
print('Finished in %.3f seconds' % (end - start))

內容解密

  1. run_sleep(period) 函式: 封裝了啟動 sleep 命令的邏輯,方便重複使用。
  2. procs = []: 建立一個列表,用於儲存所有啟動的 Popen 物件。
  3. 迴圈啟動行程: 迴圈啟動 10 個 sleep 行程,每個行程休眠 0.1 秒。
  4. proc.communicate(): 等待每個行程結束。由於行程是平行執行的,所以總執行時間約為 0.1 秒,而不是 1 秒。

範例:使用管道傳輸資料

subprocess 模組還允許你將資料從 Python 程式傳輸到子行程,並取得其輸出。以下範例展示如何使用 openssl 命令列工具加密資料:

import subprocess
import os

def run_openssl(data):
    """使用 openssl 加密資料"""
    env = os.environ.copy()
    env['password'] = b'\xe24U\n\xd0Ql3S\x11'  # 設定密碼環境變數
    proc = subprocess.Popen(
        ['openssl', 'enc', '-des3', '-pass', 'env:password'],
        env=env,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE
    )
    proc.stdin.write(data)  # 將資料寫入子行程的標準輸入
    proc.stdin.flush()  # 確保資料被傳送到子行程
    return proc

procs = []
for _ in range(3):
    data = os.urandom(10)  # 產生隨機位元組
    proc = run_openssl(data)
    procs.append(proc)

for proc in procs:
    out, err = proc.communicate()
    print(out[-10:])  # 印出加密後的最後 10 個位元組

內容解密

  1. run_openssl(data) 函式: 封裝了使用 openssl 加密資料的邏輯。
  2. env = os.environ.copy(): 複製目前的環境變數,以便設定 openssl 命令所需的密碼。
  3. env['password'] = b'\xe24U\n\xd0Ql3S\x11': 設定密碼環境變數。
  4. proc.stdin.write(data): 將要加密的資料寫入子行程的標準輸入。
  5. proc.stdin.flush(): 確保資料被傳送到子行程。

玄貓的建議:

  • 善用 subprocess: subprocess 模組是 Python 進行平行處理的強大工具,尤其是在需要整合外部程式時。
  • 理解 GIL 的限制: 在 CPU 密集型任務中,多執行緒可能無法帶來效能提升。考慮使用多行程或 C 擴充套件來克服 GIL 的限制。
  • 仔細設計程式架構: 合理地將任務分解為多個子行程,可以充分利用多核 CPU 的效能。

透過 subprocess 模組,Python 可以輕鬆地管理和協調多個子行程,實作平行處理,從而提升程式的整體效能。這是在 Python 中實作平行的重要途徑之一。

Python Subprocess 的妙用:開發高效能資料處理管線

在資料處理的世界裡,效率至關重要。身為玄貓(BlackCat),我經常需要處理大量資料,並將其轉換為可用的資訊。其中一個我最喜歡的工具是 Python 的 subprocess 模組。它讓我能夠輕鬆地與作業系統互動,並利用外部程式來加速我的工作流程。

子程式的力量:平行處理的起點

subprocess 模組的核心概念是建立子程式。這些子程式就像是獨立的小幫手,可以同時執行不同的任務。這對於需要大量 CPU 運算的任務來說非常有用,因為你可以將工作分散到多個核心上,從而縮短處理時間。

我曾經在一個金融科技專案中,需要快速計算大量交易資料的 MD5 雜湊值。如果使用 Python 內建的 hashlib 模組,速度可能會比較慢。因此,我決定使用 subprocess 模組來呼叫系統上的 openssl 命令列工具。

開發資料處理管線:opensslmd5 的完美結合

以下是一個簡單的範例,展示如何使用 subprocess 模組來建立一個資料處理管線:

import subprocess
import os

# 定義一個函式來執行 openssl 命令
def run_openssl(data):
    proc = subprocess.Popen(
        ['openssl', 'enc', '-aes-256-cbc', '-k', 'password', '-salt'],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
    )
    proc.stdin.write(data)
    proc.stdin.close()
    return proc

# 定義一個函式來執行 md5 命令
def run_md5(input_data):
    proc = subprocess.Popen(
        ['md5'],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
    )
    proc.stdin.write(input_data)
    proc.stdin.close()
    return proc

程式碼解密:

  1. run_openssl(data) 函式: 這個函式會建立一個 openssl 子程式,並將資料傳遞給它進行加密。
    • subprocess.Popen:建立一個新的子程式。
    • ['openssl', 'enc', '-aes-256-cbc', '-k', 'password', '-salt']:指定要執行的命令和引數。
    • stdin=subprocess.PIPE:將子程式的標準輸入設定為管道。
    • stdout=subprocess.PIPE:將子程式的標準輸出設定為管道。
    • proc.stdin.write(data):將資料寫入子程式的標準輸入。
    • proc.stdin.close():關閉子程式的標準輸入。
  2. run_md5(input_data) 函式: 這個函式會建立一個 md5 子程式,並將加密後的資料傳遞給它來計算雜湊值。
    • subprocess.Popen:建立一個新的子程式。
    • ['md5']:指定要執行的命令。
    • stdin=subprocess.PIPE:將子程式的標準輸入設定為管道。
    • stdout=subprocess.PIPE:將子程式的標準輸出設定為管道。
    • proc.stdin.write(input_data):將資料寫入子程式的標準輸入。
    • proc.stdin.close():關閉子程式的標準輸入。

接下來,我可以啟動一組 openssl 程式來加密一些資料,然後再啟動另一組程式來計算加密後輸出的 MD5 雜湊值:

input_procs = []
hash_procs = []
for _ in range(3):
    data = os.urandom(10)
    proc = run_openssl(data)
    input_procs.append(proc)
    hash_proc = run_md5(proc.stdout)
    hash_procs.append(hash_proc)

子程式之間的 I/O 會在你啟動它們後自動發生。你只需要等待它們完成並印出最終輸出即可:

for proc in input_procs:
    proc.communicate()
for proc in hash_procs:
    out, err = proc.communicate()
    print(out.strip())

超時處理:避免程式卡死

有時候,子程式可能會因為某些原因而無法完成,導致程式卡死。為了避免這種情況,可以使用 communicate 方法的 timeout 引數。如果子程式在指定的時間內沒有回應,就會引發一個例外,讓你能夠終止該子程式。

import subprocess

def run_sleep(sleep_duration):
    proc = subprocess.Popen(['sleep', str(sleep_duration)])
    return proc

proc = run_sleep(10)
try:
    proc.communicate(timeout=0.1)
except subprocess.TimeoutExpired:
    proc.terminate()
    proc.wait()
    print('Exit status', proc.poll())

玄貓提醒: timeout 引數僅在 Python 3.3 及更高版本中可用。在較早的版本中,你需要使用 select 模組來實作 I/O 超時。

重點回顧

  • 使用 subprocess 模組來執行子程式並管理它們的輸入和輸出流。
  • 子程式與 Python 直譯器平行執行,讓你能夠最大化 CPU 使用率。
  • 使用 communicate 方法的 timeout 引數來避免死鎖和程式卡死。

Python Thread 的正確開啟方式:非平行運算的 Blocking I/O

Python 的標準實作稱為 CPython。CPython 執行 Python 程式需要兩個步驟。首先,它解析原始碼並將其編譯為位元組碼。然後,它使用根據堆積疊的直譯器來執行位元組碼。位元組碼直譯器具有在 Python 程式執行時必須維護和保持一致的狀態。Python 使用一種稱為全域直譯器鎖 (GIL) 的機制來強制執行一致性。

GIL 本質上是一個互斥鎖 (mutex),可防止 CPython 受到搶佔式多執行緒的影響,在搶佔式多執行緒中,一個執行緒透過中斷另一個執行緒來控制程式。如果在意外的時間發生此類別中斷,可能會損壞直譯器狀態。GIL 可防止這些中斷,並確保每個位元組碼指令都能與 CPython 實作及其 C 擴充模組正確協同工作。

GIL 具有重要的負面影響。對於用 C++ 或 Java 等語言編寫的程式,擁有多個執行緒意味著你的程式可以同時利用多個 CPU 核心。雖然 Python 支援多個執行緒,但 GIL 會導致一次只有一個執行緒取得進展。這意味著,當你使用執行緒進行平行運算並加速 Python 程式時,你會非常失望。

舉例來說,假設你想用 Python 做一些運算密集的事情。我將使用一個簡單的數字分解演算法作為範例。

import time

def factorize(number):
    for i in range(1, number + 1):
        if number % i == 0:
            yield i

串列分解一組數字需要相當長的時間。

numbers = [2139079, 1214759, 1516637, 1852285]
start = time.time()
for number in numbers:
    list(factorize(number))
end = time.time()
print('Took %.3f seconds' % (end - start))

在其他語言中,使用多個執行緒進行此運算會很有意義,因為你可以利用電腦的所有 CPU 核心。讓我在 Python 中嘗試一下。在這裡,我定義一個 Python 執行緒,用於執行與之前相同的運算:

from threading import Thread

class FactorizeThread(Thread):
    def __init__(self, number):
        super().__init__()
        self.number = number

    def run(self):
        self.factors = list(factorize(self.number))

然後,我啟動一個執行緒來平行分解每個數字。

start = time.time()
threads = []
for number in numbers:
    thread = FactorizeThread(number)
    thread.start()
    threads.append(thread)

最後,我等待所有執行緒完成。

for thread in threads:
    thread.join()
end = time.time()
print('Took %.3f seconds' % (end - start))

令人驚訝的是,這甚至比串列執行 factorize 花費的時間更長。使用每個數字一個執行緒,你可能會期望在其他語言中獲得小於 4 倍的加速,因為建立執行緒和與它們協調需要額外的開銷。你可能期望在我用來執行此程式碼的雙核心機器上僅獲得 2 倍的加速。但是你永遠不會期望這些執行緒的效能在使用多個 CPU 時會更差。這展示了 GIL 對在標準 CPython 直譯器中執行的程式的影響。

有一些方法可以讓 CPython 利用多個核心,但它不適用於標準 Thread 類別(請參閱專案 41:「考慮使用 concurrent.futures 進行真正的平行運算」),並且可能需要大量的努力。瞭解這些限制後,你可能會想知道,為什麼 Python 仍然支援執行緒?有兩個很好的理由。

首先,多個執行緒使你的程式很容易看起來像在同時執行多個操作。管理同時任務的協調工作非常困難,

Python多執行緒的真相:解鎖I/O並避開GIL的陷阱

多執行緒在許多程式語言中都是實作平行運算的常用手段。Python 雖然也支援多執行緒,但由於全域直譯器鎖(GIL)的存在,讓許多開發者對 Python 的多執行緒機制感到困惑。今天,玄貓(BlackCat)將探討 Python 多執行緒的本質,揭示其在 I/O 密集型任務中的妙用,並說明如何避免潛在的資料競爭問題。

為何 Python 需要多執行緒?

Python 支援多執行緒主要有兩個原因:

  1. 公平性:即使在單一 CPU 核心上,CPython 也能確保各個 Python 執行緒之間的執行具有一定的公平性。
  2. 處理阻塞式 I/O:當 Python 進行某些系統呼叫時,例如讀寫檔案、網路互動或裝置通訊,會發生阻塞式 I/O。多執行緒可以隔離程式,使其不受作業系統回應時間的影響。

突破 I/O 瓶頸:多執行緒的妙用

讓玄貓(BlackCat)用一個例子來說明。假設你需要透過序列埠向遠端遙控直升機傳送訊號。為了模擬這個過程,玄貓(BlackCat)使用一個會阻塞 0.1 秒的 select 系統呼叫:

import select
import time
from threading import Thread

def slow_systemcall():
    select.select([], [], [], 0.1)

如果循序執行這個系統呼叫,所需時間會線性增加:

start = time.time()
for _ in range(5):
    slow_systemcall()
end = time.time()
print('Took %.3f seconds' % (end - start))
Took 0.503 seconds

問題在於,當 slow_systemcall 執行時,程式的其他部分無法繼續執行。主執行緒被 select 系統呼叫阻塞,這在實際應用中是難以接受的。你需要在傳送訊號的同時計算直升機的下一步動作,否則直升機可能會墜毀。

此時,多執行緒就派上用場了。玄貓(BlackCat)可以將系統呼叫移到不同的執行緒中,讓它們平行執行。

start = time.time()
threads = []
for _ in range(5):
    thread = Thread(target=slow_systemcall)
    thread.start()
    threads.append(thread)

def compute_helicopter_location(index):
    # 計算直升機位置
    pass

for i in range(5):
    compute_helicopter_location(i)

for thread in threads:
    thread.join()
end = time.time()
print('Took %.3f seconds' % (end - start))
Took 0.102 seconds

平行執行時間遠小於循序執行時間。即使受到 GIL 的限制,系統呼叫仍然可以平行執行。這是因為 Python 執行緒在進行系統呼叫之前會釋放 GIL,完成後再重新取得。

資料競爭:GIL 無法保護你

許多 Python 開發者誤以為有了 GIL,就可以避免在程式碼中使用互斥鎖(mutexes)。他們認為既然 Python 執行緒無法在多個 CPU 核心上平行執行,那麼 GIL 應該也能保護程式的資料結構。但玄貓(BlackCat)要告訴你,這是一個非常危險的誤解!

雖然一次只有一個 Python 執行緒可以執行,但執行緒對資料結構的操作可能會在任何兩個位元組碼指令之間被中斷。如果在多個執行緒中同時存取相同的物件,資料結構的恆定性可能會隨時被破壞,導致程式進入損毀狀態。

例如,假設你想編寫一個程式來平行計算多個感測器的讀數。你可以使用一個類別來彙總這些讀數:

class Counter(object):
    def __init__(self):
        self.count = 0

    def increment(self, offset):
        self.count += offset

每個感測器都有自己的工作執行緒,因為從感測器讀取資料需要進行阻塞式 I/O。每次感測器測量後,工作執行緒會增加計數器的值,直到達到所需讀數的最大值。

def worker(sensor_index, how_many, counter):
    for _ in range(how_many):
        # 從感測器讀取資料
        # ...
        counter.increment(1)

以下是如何為每個感測器啟動一個工作執行緒,並等待它們完成讀數:

from threading import Thread

def run_threads(func, how_many, counter):
    threads = []
    for i in range(5):
        args = (i, how_many, counter)
        thread = Thread(target=func, args=args)
        threads.append(thread)
        thread.start()
    for thread in threads:
        thread.join()

平行執行五個執行緒看似簡單,結果應該也很明顯:

how_many = 10**5
counter = Counter()
run_threads(worker, how_many, counter)
print('Counter should be %d, found %d' %
      (5 * how_many, counter.count))
Counter should be 500000, found 278328

但結果卻大錯特錯!這是怎麼回事?為什麼這麼簡單的事情會出錯?