在資料科學領域,我經常需要處理龐大的文字資料集,例如 Google Books Ngram。傳統的 MapReduce 方法在面對海量資料時,I/O 瓶頸往往成為效能殺手。為此,我深入研究了 Python 的 asyncio 非同步 I/O 和行程池,並將它們巧妙地結合起來,開發出一個高效能的 MapReduce 解決方案。

同步 MapReduce 的效能瓶頸

首先,讓我們回顧一下傳統同步 MapReduce 的執行流程以及其效能瓶頸。以下程式碼片段示範了從 Google Books Ngram 資料集中統計單詞出現次數的同步方法:

import time

freqs = {}
with open('googlebooks-eng-all-1gram-20120701-a', encoding='utf-8') as f:
    lines = f.readlines()

start = time.time()
for line in lines:
    data = line.split('\t')
    word = data[0]
    count = int(data[2])
    if word in freqs:
        freqs[word] += count
    else:
        freqs[word] = count
end = time.time()
print(f'{end-start:.4f}')

程式碼逐行讀取 Ngram 資料集,解析每一行資料,累計每個單詞的出現次數。然而,這種同步的 I/O 操作使得程式必須等待檔案讀取完成才能繼續處理,造成 CPU 閒置,大幅降低了處理效率。

非同步 I/O 與行程池:效能提升的關鍵

為了突破 I/O 瓶頸,我決定採用 asyncio 非同步 I/O 和行程池來最佳化 MapReduce 流程。asyncio 允許程式在等待 I/O 操作完成的同時執行其他任務,而行程池則可以充分利用多核心 CPU 的運算能力,實作真正的平行處理。

以下程式碼展示瞭如何結合 asyncio 和行程池來實作高效能的 MapReduce:

import asyncio
import concurrent.futures
import functools
import time
from typing import Dict, List

def partition(data: List, chunk_size: int) -> List:
    for i in range(0, len(data), chunk_size):
        yield data[i:i + chunk_size]

def map_frequencies(chunk: List[str]) -> Dict[str, int]:
    counter = {}
    for line in chunk:
        word, _, count, _ = line.split('\t')
        try:  # 處理可能存在的錯誤資料
            count = int(count)
            counter[word] = counter.get(word, 0) + count
        except ValueError:
            pass  # 或記錄錯誤訊息
    return counter

async def main(partition_size: int):
    with open('googlebooks-eng-all-1gram-20120701-a', encoding='utf-8') as f:
        contents = f.readlines()

    loop = asyncio.get_running_loop()
    tasks = []
    start = time.time()

    with concurrent.futures.ProcessPoolExecutor() as pool:
        for chunk in partition(contents, partition_size):
            tasks.append(loop.run_in_executor(pool, map_frequencies, chunk))

    intermediate_results = await asyncio.gather(*tasks)
    final_result = functools.reduce(lambda x, y: {k: x.get(k, 0) + y.get(k, 0) for k in set(x) | set(y)}, intermediate_results)


    print(f"Aardvark has appeared {final_result.get('Aardvark', 0)} times.")
    end = time.time()
    print(f'MapReduce took: {(end - start):.4f} seconds')

if __name__ == "__main__":
    asyncio.run(main(partition_size=60000))
  1. 資料分塊: partition 函式將資料分割成多個區塊,方便平行處理。
  2. 行程池對映: 使用 concurrent.futures.ProcessPoolExecutor 建立行程池,並利用 loop.run_in_executormap_frequencies 任務提交到行程池中執行,實作真正的平行處理。
  3. 非同步收集結果: asyncio.gather 協程用於非同步收集所有對映任務的結果。
  4. 結果合併: 使用 functools.reduce 和一個更健壯的 lambda 函式合併所有中間結果,得到最終的詞頻統計。
  5. 錯誤處理:map_frequencies 中加入了 try...except 區塊,處理可能出現的資料轉換錯誤,提升程式碼的穩健性。

視覺化 MapReduce 流程

  graph LR
    B[B]
    subgraph 資料分塊
        A[原始資料] --> B{分割資料};
    end
    B --> C[行程 1];
    B --> D[行程 2];
    B --> E[行程 n];
    C --> F(中間結果 1);
    D --> G(中間結果 2);
    E --> H(中間結果 n);
    F --> I[合併結果];
    G --> I;
    H --> I;

圖表説明: 此流程圖展示了資料分塊後,如何在多個行程中平行執行對映操作,最後將中間結果合併成最終結果。

  sequenceDiagram
    participant Main
    participant ProcessPool
    Main ->> ProcessPool: 提交任務到行程池
    activate ProcessPool
    ProcessPool ->> ProcessPool: 平行處理資料區塊
    ProcessPool -->> Main: 傳回中間結果
    deactivate ProcessPool
    Main ->> Main: 合併中間結果

圖表説明: 此序列圖展示了主程式與行程池之間的互動流程,強調了任務提交、平行處理和結果傳回的順序。

透過 asyncio 非同步 I/O 和行程池的協同作用,我成功地提升了 MapReduce 的效能,大幅縮短了處理 Google Books Ngram 資料集所需的時間。這種最佳化策略在我處理其他大型文字資料集時也展現了極佳的效果,有效地解決了 I/O 瓶頸的挑戰。

Python 在處理 I/O 密集型任務時,asyncio 提供了非同步處理的優勢,而多程式則能充分利用多核心 CPU 的效能。結合兩者,便能開發高效的文書處理流程。本文將探討如何最佳化 asyncio 與多程式的整合,並深入解析資料分享與鎖定機制,以避免競爭條件。

首先,我們回顧先前討論的 MapReduce 範例,並著重於 reduce 操作的效能瓶頸。reduce 操作需要整合大量的字典,這個過程可以進一步平行化。以下程式碼展示瞭如何修改 reduce 函式,使其能夠分割資料並在多個 worker 程式中執行:

import asyncio
import concurrent.futures
import functools
import time
from typing import Dict, List
from chapter_06.listing_6_8 import partition, merge_dictionaries, map_frequencies

async def reduce(loop, pool, counters, chunk_size) -> Dict[str, int]:
    chunks: List[List[Dict]] = list(partition(counters, chunk_size))
    reducers = []
    while len(chunks[0]) > 1:
        for chunk in chunks:
            reducer = functools.partial(functools.reduce, merge_dictionaries, chunk)
            reducers.append(loop.run_in_executor(pool, reducer))
        reducer_chunks = await asyncio.gather(*reducers)
        chunks = list(partition(reducer_chunks, chunk_size))
        reducers.clear()
    return chunks[0][0]

async def main(partition_size: int):
    with open('googlebooks-eng-all-1gram-20120701-a', encoding='utf-8') as f:
        contents = f.readlines()
    loop = asyncio.get_running_loop()
    tasks = []
    with concurrent.futures.ProcessPoolExecutor() as pool:
        start = time.time()
        for chunk in partition(contents, partition_size):
            tasks.append(loop.run_in_executor(pool, functools.partial(map_frequencies, chunk)))
        intermediate_results = await asyncio.gather(*tasks)
        final_result = await reduce(loop, pool, intermediate_results, 500)
        print(f"Aardvark has appeared {final_result['Aardvark']} times.")
        end = time.time()
        print(f'MapReduce took: {(end - start):.4f} seconds')

if __name__ == "__main__":
    asyncio.run(main(partition_size=60000))

此程式碼的核心在於 reduce 函式的改造。它將 counters 列表分割成多個 chunk,並利用 ProcessPoolExecutor 將每個 chunk 的 reduce 操作分配給不同的 worker 程式。asyncio.gather 則用於非同步等待所有 worker 程式完成。此過程會迭代進行,直到最終得到單一字典。

然而,我發現平行化 reduce 操作的效能提升並非總是顯著,甚至可能下降。這是因為程式間傳遞字典的序列化和反序列化操作相當耗時,可能抵消平行處理的效益。

  graph LR
    B[B]
    A[分割字典] --> B{chunks > 1?};
    B -- 是 --> C[平行 Reduce];
    C --> D[合併結果];
    D --> B;
    B -- 否 --> E[最終結果];

圖表説明: 此流程圖展示了平行化 reduce 過程。

在多程式環境中,分享資料是一大挑戰。處理不當可能導致難以除錯的錯誤。儘管如此,某些情況下分享資料仍不可避免,例如分享計數器。

  sequenceDiagram
    participant Main
    participant Process 1
    participant Process 2
    participant Shared Memory

    Main->>Shared Memory: 初始化計數器
    Main->>Process 1: 啟動
    activate Process 1
    Process 1->>Shared Memory: 增加計數器
    Main->>Process 2: 啟動
    activate Process 2
    Process 2->>Shared Memory: 增加計數器
    Process 1-->>Main: 完成
    deactivate Process 1
    Process 2-->>Main: 完成
    deactivate Process 2
    Main->>Shared Memory: 讀取計數器

圖表説明: 此時序圖展示了多程式如何透過分享記憶體操作計數器。

透過分享記憶體,多個程式可以存取同一塊記憶體區域。然而,分享狀態的處理非常複雜,若無適當的同步機制,很容易導致競爭條件。

為避免競爭條件,可以使用鎖定機制(Lock 或 Mutex)。鎖定機制允許單一程式鎖定一段程式碼(臨界區段),阻止其他程式同時執行該程式碼。

from multiprocessing import Process, Value

def increment_value(shared_int: Value):
    with shared_int.get_lock():
        shared_int.value = shared_int.value + 1

if __name__ == '__main__':
    for _ in range(100):
        integer = Value('i', 0)
        procs = [Process(target=increment_value, args=(integer,)),
                 Process(target=increment_value, args=(integer,))]
        [p.start() for p in procs]
        [p.join() for p in procs]
        print(integer.value)
        assert (integer.value == 2)

此程式碼示範瞭如何使用 Value 物件和其 get_lock() 方法來保護分享計數器,確保每次只有一個程式可以修改它,避免競爭條件。

後續文章將探討如何使用分享記憶體和鎖定機制安全地分享資料,並解決競爭條件問題,敬請期待。

在現代軟體開發中,提升程式效能至關重要。Python 提供了多行程和 Asyncio 兩種強大的工具來實作併發程式設計,進而提升程式效能。本文將探討如何巧妙地結合這兩種工具,讓它們協同工作,發揮各自的優勢,開發高效能的 Python 應用程式。

分享資料的挑戰與解決方案

多行程環境下,處理分享資料需要格外小心,避免競爭條件造成資料不一致。我發現使用行程池初始化器(ProcessPoolExecutorinitializer 引數)能有效管理分享資料。以下程式碼示範如何安全地操作分享計數器:

from concurrent.futures import ProcessPoolExecutor
import asyncio
from multiprocessing import Value

# 分享計數器
shared_counter: Value

# 初始化函式,設定每個 worker process 的分享計數器
def init(counter: Value):
    global shared_counter
    shared_counter = counter

# 計數器遞增函式
def increment():
    with shared_counter.get_lock():  # 使用鎖確保資料一致性
        shared_counter.value += 1

async def main():
    counter = Value('d', 0)  # 建立分享 Value 物件
    with ProcessPoolExecutor(initializer=init, initargs=(counter,)) as pool:
        await asyncio.get_running_loop().run_in_executor(pool, increment)
    print(counter.value)

if __name__ == "__main__":
    asyncio.run(main())

shared_counter 作為分享計數器,透過 init 函式在每個行程中初始化,確保所有行程指向同一記憶體區域。increment 函式則使用 shared_counter.get_lock() 鎖定機制,確保計數器遞增操作的原子性,避免競爭條件。

MapReduce 應用中的分享資料與進度回報

在 MapReduce 架構中,追蹤 map 操作的進度對於監控程式執行狀態至關重要。以下程式碼展示如何利用分享計數器和後台任務來實作進度回報:

from concurrent.futures import ProcessPoolExecutor
import functools
import asyncio
from multiprocessing import Value
from typing import List, Dict

# 假設的 partition 和 merge_dictionaries 函式
def partition(data: List[any], num_chunks: int) -> List[List[any]]:
    chunk_size = len(data) // num_chunks
    return [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]

def merge_dictionaries(dictionaries: List[Dict]) -> Dict:
    merged = {}
    for d in dictionaries:
        for key, value in d.items():
            merged[key] = merged.get(key, 0) + value
    return merged


map_progress: Value

def init(progress: Value):
    global map_progress
    map_progress = progress

def map_frequencies(chunk: List[str]) -> Dict[str, int]:
    counter = {}
    for item in chunk:
        counter[item] = counter.get(item, 0) + 1
    with map_progress.get_lock():
        map_progress.value += 1
    return counter

async def progress_reporter(total_partitions: int):
    while map_progress.value < total_partitions:
        print(f'已完成 {map_progress.value}/{total_partitions} 個 map 操作')
        await asyncio.sleep(1)

async def main():
    data = ["apple", "banana", "apple", "orange", "banana", "apple"]
    num_partitions = 3
    partitioned_data = partition(data, num_partitions)
    progress = Value('i', 0)

    with ProcessPoolExecutor(initializer=init, initargs=(progress,)) as pool:
        reporter = asyncio.create_task(progress_reporter(num_partitions))
        map_results = await asyncio.gather(
            *[loop.run_in_executor(pool, map_frequencies, chunk) for chunk in partitioned_data]
        )

    await reporter
    merged_results = merge_dictionaries(map_results)
    print(f"合併結果: {merged_results}")


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

map_frequencies 函式處理完每個資料區塊後,會遞增 map_progressprogress_reporter 協程則定期檢查 map_progress 並輸出進度。

多事件迴圈提升 I/O 效能

對於 I/O 密集型任務,多行程搭配多事件迴圈能更有效率地分散 I/O 負載。

  graph LR
    B[B]
    C[C]
    D[D]
subgraph 父行程
    A[行程池] --> B{Worker 行程 1}
    A --> C{Worker 行程 2}
    A --> D{Worker 行程 N}
end
B --> E(事件迴圈)
C --> F(事件迴圈)
D --> G(事件迴圈)
B --> H(資料函式庫連線池)
C --> I(資料函式庫連線池)
D --> J(資料函式庫連線池)

圖表説明:父行程建立行程池,每個 worker 行程都擁有獨立的事件迴圈和資料函式庫連線池,有效分散 I/O 負載,提升整體吞吐量。

透過上述技巧,我們可以更有效地結合多行程和 Asyncio,充分發揮多核心 CPU 的效能,並提升 I/O 密集型任務的效率。

Asyncio 與多執行緒整合應用於阻塞式 I/O 操作

Asyncio 擅長處理非阻塞式 I/O,但與既有阻塞式 I/O 程式碼互動時,多執行緒便成為重要的橋樑。以下範例示範如何使用執行緒池執行資料函式庫查詢:

import asyncio
import asyncpg
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from typing import List, Dict

product_query = """
SELECT
    p.brand_id,
    s.sku_id,
    pc.product_color_name,
    ps.product_size_name
FROM product as p
JOIN sku as s on s.product_id = p.product_id
JOIN product_color as pc on pc.product_color_id = s.product_color_id
JOIN product_size as ps on ps.product_size_id = s.product_size_id
WHERE p.product_id = 100
"""

async def query_product(pool):
    async with pool.acquire() as connection:
        return await connection.fetchrow(product_query)

async def query_products_concurrently(pool, queries):
    tasks = [query_product(pool) for _ in range(queries)]
    return await asyncio.gather(*tasks)


async def run_queries(num_queries: int) -> List[Dict]:
    async with asyncpg.create_pool(
        host='127.0.0.1',
        port=5432,
        user='postgres',  # 替換為您的資料函式庫使用者名稱
        password='password',  # 替換為您的資料函式庫密碼
        database='products',  # 替換為您的資料函式庫名稱
        min_size=6,
        max_size=6
    ) as pool:
        results = await query_products_concurrently(pool, num_queries)
        return [dict(result) for result in results]



async def main():
    num_queries = 10000
    with ThreadPoolExecutor() as pool:
        results = await asyncio.get_running_loop().run_in_executor(pool, run_queries, num_queries)

    print(f'Retrieved {len(results)} products the product database.')

if __name__ == "__main__":
    asyncio.run(main())

此程式碼使用 ThreadPoolExecutor 建立執行緒池,並利用 run_in_executor 方法在執行緒池中執行 run_queries 函式,該函式使用 asyncpg 進行非同步資料函式庫查詢。透過這種方式,可以將阻塞式的資料函式庫操作交給執行緒池處理,避免阻塞 asyncio 的事件迴圈。

執行緒與 Asyncio 的協同

async/await 語法能優雅地整合執行緒和 Asyncio。以下範例説明如何使用執行緒執行阻塞式 I/O 操作:

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_io():
    time.sleep(5)  # 模擬阻塞式 I/O 操作
    return "I/O 完成"

async def main():
    with ThreadPoolExecutor() as pool:
        result = await asyncio.get_running_loop().run_in_executor(pool, blocking_io)
    print(result)


if __name__ == "__main__":
    asyncio.run(main())

blocking_io 函式模擬阻塞式 I/O 操作。main 協程使用 run_in_executor 方法將 blocking_io 提交到執行緒池執行,非同步等待結果,避免阻塞事件迴圈。

透過以上技巧,我們可以更靈活地結合多行程、Asyncio 和多執行緒,充分利用系統資源,提升 Python 應用程式的效能。

在 Python 的非同步程式設計中,asyncio 允許我們以非阻塞的方式執行 I/O 密集型任務,例如網路請求。然而,當遇到阻塞式 I/O 操作時,asyncio 的效能優勢可能會受到限制。這時,結合執行緒池可以有效解決這個問題,讓 asyncio 在處理網路請求等場景下發揮更大的效能提升作用。

執行緒池:解決阻塞式 I/O 的利器

asyncio 的核心概念是非阻塞 I/O,它允許單個執行緒同時處理多個 I/O 操作。然而,當遇到阻塞式 I/O 操作(例如網路請求)時,asyncio 的事件迴圈會被阻塞,直到操作完成。這會降低應用程式的反應速度,尤其是在高併發的場景下。

執行緒池可以有效解決這個問題。藉由將阻塞式 I/O 操作交給執行緒池中的執行緒來執行,asyncio 的事件迴圈可以繼續處理其他任務,而不會被阻塞。當執行緒池中的執行緒完成阻塞式 I/O 操作後,會將結果傳回給 asyncio 的事件迴圈。

程式碼範例:使用執行緒池處理網路請求

以下是一個使用 ThreadPoolExecutorasyncio 處理網路請求的程式碼範例:

import asyncio
import concurrent.futures
import requests

async def fetch_url(url):
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor() as pool:
        response = await loop.run_in_executor(pool, requests.get, url)
        return response.text

async def main():
    urls = [
        "https://www.example.com",
        "https://www.google.com",
        "https://www.wikipedia.org",
    ]
    tasks = [fetch_url(url) for url in urls]
    results = await asyncio.gather(*tasks)
    for result in results:
        print(result[:100])  # 列印前 100 個字元

asyncio.run(main())

這個程式碼範例中,fetch_url 函式使用 ThreadPoolExecutor 建立一個執行緒池,並使用 run_in_executor 方法將 requests.get 函式提交到執行緒池中執行。await 關鍵字確保程式碼會等待網路請求完成後再繼續執行。main 函式使用 asyncio.gather 併發執行多個網路請求,並收集結果。

asyncio 與執行緒池的協作

  graph LR
    B[B]
    D[D]
    A[asyncio 事件迴圈] --> B{提交任務到執行緒池};
    B --> C[執行緒池執行阻塞式 I/O];
    C --> D{傳回結果給 asyncio};
    D --> A;

優雅地關閉多執行緒應用程式

當使用執行緒池時,正確地關閉執行緒池至關重要,以避免資源洩漏和殭屍程式。在上面的程式碼範例中,我們使用了 with concurrent.futures.ThreadPoolExecutor() as pool: 這個上下文管理器,它會在程式碼塊執行結束後自動關閉執行緒池。

  graph LR
    B[B]
    A[主程式] --> B{建立執行緒池};
    B --> C[提交任務];
    C --> D[執行任務];
    D --> E[關閉執行緒池];
    E --> A;

此圖表展示了程式如何建立、使用和關閉執行緒池的流程。使用 with 陳述式確保即使發生異常,執行緒池也能被正確關閉。

透過結合 asyncio 和執行緒池,我們可以有效地處理阻塞式 I/O 操作,例如網路請求,並提升應用程式的效能和反應速度。同時,正確地關閉執行緒池對於避免資源洩漏和殭屍程式至關重要。 我在實際專案中運用此技巧,發現它能顯著提升應用程式的效能,尤其在處理大量併發網路請求時效果更為明顯。

藉由理解這些技巧,您可以更靈活地運用 asyncio,並在處理既有程式碼的同時,享受非同步程式設計帶來的效能提升。