在 Python 的多執行緒環境中,即使看似簡單的計數器操作,也可能因為執行緒的切換時機而引發資料競爭。這是因為像 counter.count += offset 這樣的操作,實際上是由讀取、計算和寫入三個步驟組成。如果執行緒在這些步驟之間被中斷,就可能導致資料不一致。例如,兩個執行緒同時讀取同一個計數值,各自加 1 後再寫回,最終結果只增加了 1 而不是 2。為瞭解決這個問題,Python 的 threading 模組提供了 Lock 類別,也就是互斥鎖。透過在程式碼區塊使用 with self.lock:,可以確保同一時間只有一個執行緒能存取被保護的程式碼,有效避免資料競爭。

除了鎖機制,佇列也是 Python 多執行緒程式設計中重要的工具,尤其適用於處理類別似生產線的工作流程。例如,一個程式需要下載圖片、調整大小,最後上傳。這三個步驟可以看作三個階段,每個階段由專門的函式負責。利用佇列,可以將每個階段的輸出作為下一個階段的輸入,形成一個Pipeline。每個階段可以由獨立的執行緒處理,提高程式平行處理能力。為避免佇列操作也產生資料競爭,同樣需要使用鎖機制確保佇列的存取安全。實務上,collections.deque 搭配 threading.Lock 可以實作一個簡單的執行緒安全佇列。在設計多執行緒程式時,需要仔細考慮執行緒間的互動和資料同步,才能確保程式正確與高效地執行。 身為玄貓(BlackCat),我將根據提供的技術內容,以台灣頂尖技術專家的視角進行全面重寫與重構。所有標題與內容都將經過重新設計,確保技術深度、可讀性與原創性。

解決Python多執行緒資料競爭:玄貓的鎖定策略

Python 的多執行緒環境為了確保公平性,會分配給每個執行緒大致相同的執行時間。然而,這種強制切換執行緒的機制,可能在我們意想不到的時刻發生,即使是看似原子性的操作,也可能被中斷。這就可能導致資料競爭的問題,就像原始碼中計數器範例所展示的那樣。

counter.count += offset 這個簡單的陳述式,實際上在 Python 內部被分解為三個獨立的操作:

value = getattr(counter, 'count')
result = value + offset
setattr(counter, 'count', result')

執行緒可能在這三個操作中的任何兩個之間被暫停,這會導致舊的 value 被指定給計數器,從而產生錯誤的結果。以下是一個執行緒 A 和執行緒 B 之間發生不良互動的例子:

# 執行緒 A
value_a = getattr(counter, 'count')
# 切換到執行緒 B
value_b = getattr(counter, 'count')
result_b = value_b + 1
setattr(counter, 'count', result_b)
# 切換回執行緒 A
result_a = value_a + 1
setattr(counter, 'count', result_a)

在這個例子中,執行緒 A 覆寫了執行緒 B 的結果,導致計數器增加的進度丟失。

為了避免這種資料競爭以及其他形式的資料結構損壞,Python 的 threading 模組提供了一系列強大的工具。其中最簡單與最有用的就是 Lock 類別,它實作了互斥鎖(mutex)。

透過使用鎖,Counter 類別可以保護其當前值,防止多個執行緒同時存取。在任何給定時間,只有一個執行緒能夠獲得鎖。以下是如何使用 Lock 類別來保護 Counter 類別的範例:

from threading import Lock

class LockingCounter(object):
    def __init__(self):
        self.lock = Lock()
        self.count = 0

    def increment(self, offset):
        with self.lock:
            self.count += offset

在這個範例中,with 陳述式用於取得和釋放鎖。這使得更容易看到哪些程式碼在持有鎖時正在執行。

現在,我們可以像之前一樣執行工作執行緒,但使用 LockingCounter 而不是 Counter

counter = LockingCounter()
run_threads(worker, how_many, counter)
print('Counter should be %d, found %d' %
      (5 * how_many, counter.count))

執行結果會符合預期。Lock 解決了這個問題。

玄貓提醒:

  • 即使 Python 有全域直譯器鎖(GIL),你仍然有責任保護你的程式免受執行緒之間的資料競爭。
  • 如果允許多個執行緒在沒有鎖的情況下修改相同的物件,你的程式將會損壞其資料結構。
  • threading 內建模組中的 Lock 類別是 Python 的標準互斥鎖實作。

運用Queue在Python執行緒間協調工作:玄貓的生產線策略

在平行處理多項任務的 Python 程式中,協調不同執行緒之間的工作至關重要。其中一種有效的平行工作模式是使用函式管道(pipeline)。

管道的工作方式類別似於製造業中的組裝線。它由多個序列階段組成,每個階段都有特定的函式。新的工作不斷增加到管道的開頭,每個函式可以平行處理其階段中的工作。當每個函式完成時,工作向前移動,直到沒有剩餘的階段。這種方法特別適用於包含阻塞 I/O 或子行程的工作——這些活動可以使用 Python 輕鬆地平行化。

例如,假設你想建立一個系統,該系統可以從你的數位相機中取得連續的影像流,調整它們的大小,然後將它們增加到線上照片函式庫。這樣的程式可以分為管道的三個階段。新的影像在第一階段檢索。下載的影像在第二階段透過調整大小的函式傳遞。調整大小後的影像由最後階段的上傳函式使用。

假設你已經編寫了執行這些階段的 Python 函式:downloadresizeupload。你如何組裝一個管道來平行執行這些工作?

首先,你需要一種在管道階段之間傳遞工作的方式。這可以建模為執行緒安全的生產者-消費者佇列。

from collections import deque
from threading import Lock

class MyQueue(object):
    def __init__(self):
        self.items = deque()
        self.lock = Lock()

    def put(self, item):
        with self.lock:
            self.items.append(item)

    def get(self):
        with self.lock:
            return self.items.popleft()

生產者(你的數位相機)將新的影像增加到待處理專案列表的末尾,而消費者(你的處理管道的第一階段)從待處理專案列表的前面刪除影像。

在這裡,管道的每個階段都表示為一個 Python 執行緒,它從一個佇列中取得工作,對其執行函式,然後將結果放入另一個佇列。我還追蹤了工作執行緒檢查新輸入的次數以及它完成的工作量。

from threading import Thread
from time import sleep

class Worker(Thread):
    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue
        self.polled_count = 0
        self.work_done = 0

    def run(self):
        while True:
            self.polled_count += 1
            try:
                item = self.in_queue.get()
            except IndexError:
                sleep(0.01)  # No work to do
            else:
                result = self.func(item)
                self.out_queue.put(result)
                self.work_done += 1

最棘手的部分是,當輸入佇列為空時,工作執行緒必須正確處理,因為前一階段尚未完成其工作。這發生在我捕捉 IndexError 異常的地方。你可以將其視為組裝線上的延遲。

現在,我可以透過建立協調點的佇列和相應的工作執行緒將三個階段連線在一起。

download_queue = MyQueue()
resize_queue = MyQueue()

玄貓總結: 本篇文章中,玄貓分享了在 Python 多執行緒環境中,如何透過鎖定機制避免資料競爭,以及如何運用佇列(Queue)在不同執行緒間協調工作,建立高效的生產線模式。這些技巧對於開發高並發、高效能的應用程式至關重要。