在現今資料驅動的應用程式中,高效的資料函式庫操作至關重要。Python 的 asyncpg 函式庫提供了一個絕佳的方案,讓我們能以非同步的方式與 PostgreSQL 資料函式庫互動,大幅提升應用程式的效能。本文將探討 asyncpg 的核心概念,包含連線池的建立與使用、交易管理的技巧,並搭配實務程式碼範例,帶您掌握非同步資料函式庫操作的精髓。

連線池:提升效能的關鍵

建立資料函式庫連線是一個相當耗時的動作。為了避免每次查詢都重新建立連線,我們可以利用連線池(Connection Pool)的概念。連線池維護一組可重複使用的連線,讓應用程式能更有效率地與資料函式庫互動。asyncpg 提供了 create_pool() 協程來建立連線池。

import asyncio
import asyncpg

async def main():
    pool = await asyncpg.create_pool(host='127.0.0.1',
                                    port=5432,
                                    user='postgres',
                                    database='products',
                                    password='password',
                                    min_size=5,  # 最小連線數量
                                    max_size=10) # 最大連線數量
    async with pool.acquire() as connection:
        results = await connection.fetch('SELECT * FROM products')
    print(results)
    await pool.close()

asyncio.run(main())

這段程式碼示範瞭如何建立一個連線池,並使用 pool.acquire() 方法取得一個可用的連線。min_sizemax_size 引數分別設定了連線池的最小和最大連線數量。透過 async with 區塊,我們確保連線在使用完畢後會自動釋放回連線池,供其他操作使用。最後,pool.close() 關閉連線池,釋放所有資源。

平行查詢:釋放非同步的威力

藉由連線池,我們可以實作查詢的平行執行,進一步提升效能。以下程式碼示範如何使用 asyncio.gather() 平行執行多個查詢:

import asyncio
import asyncpg

async def query_product(pool, product_id):
    async with pool.acquire() as connection:
        return await connection.fetchrow('SELECT * FROM products WHERE product_id = $1', product_id)

async def main():
    pool = await asyncpg.create_pool(...) # 建立連線池
    tasks = [query_product(pool, i) for i in range(1, 11)] # 建立 10 個查詢任務
    results = await asyncio.gather(*tasks)
    print(results)
    await pool.close()

asyncio.run(main())

這個例子中,query_product() 協程負責查詢單一產品資訊。我們利用 list comprehension 建立了 10 個查詢任務,並使用 asyncio.gather() 平行執行。如此一來,我們可以充分利用非同步的優勢,大幅縮短查詢時間。

  graph LR
    Task1[Task1]
    Task2[Task2]
    Task3[Task3]
subgraph Connection Pool
    A[Connection 1] --> Database
    B[Connection 2] --> Database
    C[Connection 3] --> Database
end
Task1 --> A
Task2 --> B
Task3 --> C
Task4 -.-> A
Task4 -.-> B
Task4 -.-> C

圖表説明: 此圖表展示了多個任務如何從連線池中取得連線並且資料函式庫互動。當連線數量不足時,任務會等待可用連線。

交易管理:確保資料一致性

在資料函式庫操作中,交易管理扮演著至關重要的角色,它確保一系列操作的原子性,也就是要嘛全部成功,要嘛全部失敗。asyncpg 提供了 connection.transaction() 上下文管理器,讓我們輕鬆管理交易。

import asyncio
import asyncpg

async def main():
    connection = await asyncpg.connect(...)
    try:
        async with connection.transaction():
            await connection.execute("INSERT INTO products (name, price) VALUES ('Product A', 100)")
            await connection.execute("INSERT INTO products (name, price) VALUES ('Product B', 200)")
            # 模擬錯誤
            raise Exception("Something went wrong!")
    except Exception as e:
        print(f"Transaction rolled back: {e}")
    finally:
        await connection.close()

asyncio.run(main())

這段程式碼示範瞭如何在交易中執行多個 SQL 指令。async with connection.transaction() 區塊確保了這些指令的原子性。如果區塊內發生任何錯誤,交易會自動回復,避免資料函式庫處於不一致的狀態。

asyncpg 讓 Python 開發者能以更有效率的方式操作 PostgreSQL 資料函式庫。透過連線池和非同步特性,我們可以大幅提升應用程式的效能,並藉由交易管理機制確保資料一致性。希望本文能幫助您更深入地瞭解 asyncpg 的應用,並在您的專案中發揮其強大威力。


在資料函式庫應用中,處理大量資料時,若一次性將所有查詢結果載入記憶體,可能導致記憶體不足,影響系統效能。我將探討如何使用 Python  `asyncpg` 函式庫實作非同步資料函式庫串流,有效解決這個問題。同時,我會介紹非同步產生器的概念,並示範如何結合串流遊標,逐行擷取資料,提升應用程式效能。

## 擺脫效能瓶頸:非同步產生器與串流結果集

`asyncpg` 預設的 `fetch` 方法會一次性將所有查詢結果載入記憶體,這在處理大量資料時可能造成效能瓶頸。雖然 `LIMIT` 陳述式和分頁技術可以解決部分問題,但它們會增加資料函式庫負擔。

串流結果集提供更有效率的解決方案。透過串流,我們可以根據需要逐行擷取資料,避免一次性載入所有資料到記憶體。Postgres 使用遊標來實作串流查詢結果,遊標如同指向結果集中目前位置的指標,每次擷取資料後便會移動到下一個位置。

`asyncpg` 利用非同步產生器實作串流功能,它可以非同步地逐個產生結果,類別似於 Python 產生器。我們可以使用 `async for` 迴圈迭代這些結果。

## 深入理解非同步產生器

許多開發者熟悉同步 Python 中的產生器,它允許我們「惰性地」定義和迭代資料序列,對於大型資料序列非常有用。

同步產生器是一個包含 `yield` 陳述式的 Python 函式,以下是一個範例:

```python
def positive_integers(until: int):
    for integer in range(until):
        yield integer

positive_iterator = positive_integers(2)
print(next(positive_iterator))  # 輸出 0
print(next(positive_iterator))  # 輸出 1

每次呼叫 next 會觸發函式中迴圈的一次迭代,並產生 yield 陳述式的結果。

若要非同步地產生一系列值,可以使用 Python 的非同步產生器和 async for 語法:

import asyncio
from util import delay, async_timed  # 假設您已定義 delay 和 async_timed

async def positive_integers_async(until: int):
    for integer in range(1, until):
        await delay(integer)
        yield integer

@async_timed()
async def main():
    async_generator = positive_integers_async(3)
    print(type(async_generator))  # 輸出 <class 'async_generator'>
    async for number in async_generator:
        print(f'Got number {number}')

asyncio.run(main())

非同步產生器與普通產生器的差異在於,它產生的元素是協程,需要使用 awaitasync for 才能獲得結果。

結合非同步產生器與串流遊標實戰

非同步產生器與串流資料函式庫遊標完美結合。使用這些產生器,我們可以使用類別似 for 迴圈的語法逐行擷取資料。asyncpg 需要在事務中使用遊標。啟動事務後,呼叫 Connection 類別的 cursor 方法,傳入要串流的查詢,即可取得一個非同步產生器,逐行串流結果。

以下是如何使用遊標逐行擷取所有產品的範例:

import asyncpg
import asyncio

async def main():
    connection = await asyncpg.connect(host='127.0.0.1',
                                        port=5432,
                                        user='postgres',
                                        database='products',
                                        password='password')
    query = 'SELECT product_id, product_name FROM product'
    async with connection.transaction():
        async for product in connection.cursor(query):
            print(product)
    await connection.close()

asyncio.run(main())

即使資料表中有大量產品,程式也只會一次載入少量資料到記憶體。預設情況下,遊標會預先擷取 50 條記錄,您可以透過設定 prefetch 引數調整此行為。

上述程式碼示範瞭如何使用 asyncpg 實作非同步資料函式庫串流。透過 connection.cursor() 方法和 async for 迴圈,我們可以逐行處理查詢結果,避免一次性載入所有資料到記憶體,有效處理大量資料,提升應用程式效能。prefetch 引數可以平衡記憶體使用和網路流量。

  graph LR
    B[B]
    D[D]
    A[啟動事務] --> B{執行查詢,使用 connection.cursor()};
    B --> C[建立非同步產生器];
    C --> D{async for 迴圈,逐行處理資料};
    D --> E[關閉連線];

此流程圖簡化了非同步串流的流程,突出了關鍵步驟:啟動事務、使用 connection.cursor() 執行查詢並建立非同步產生器、使用 async for 迴圈逐行處理資料,以及最後關閉連線。

  sequenceDiagram
    participant Client
    participant Asyncpg
    participant Database

    Client->>Asyncpg: 啟動事務
    activate Asyncpg
    Asyncpg->>Database: 執行查詢,建立遊標
    activate Database
    Database-->>Asyncpg: 傳回非同步產生器
    deactivate Database
    loop 逐行處理
        Asyncpg->>Client: 傳回單行資料
    end
    Client->>Asyncpg: 關閉連線
    deactivate Asyncpg

此序列圖更詳細地展示了客戶端、asyncpg 和資料函式庫之間的互動流程,強調了非同步產生器如何逐行傳回資料,讓客戶端能夠有效處理大量資料。


在資料函式庫應用程式開發中,處理巨量資料是一項常見的挑戰。我發現,傳統的同步方法在面對海量資料時往往力不從心,容易導致記憶體不足或效能瓶頸。因此,本文將探討如何結合 `asyncpg` 的非同步特性與 Python 的多程式機制,有效處理巨量資料,開發高效能資料函式庫應用程式。

## asyncpg:非同步 PostgreSQL 驅動程式

`asyncpg` 是一個高效能的非同步 PostgreSQL 驅動程式,它允許我們以非同步的方式與資料函式庫互動,避免阻塞主執行緒。我認為,這對於提升應用程式效能至關重要,尤其是在 I/O 密集型操作中,例如資料函式庫查詢。

### 遊標與非同步產生器

`asyncpg` 提供了遊標(cursor)的功能,可以更精細地控制資料擷取過程。遊標本身也是一個非同步產生器,可以逐筆產生資料,避免一次載入所有資料到記憶體,有效防止記憶體不足的窘境。

```python
import asyncpg
import asyncio

async def fetch_products(connection, batch_size=1000):
    async with connection.transaction():
        query = 'SELECT product_id, product_name FROM product'
        async for product in connection.cursor(query, prefetch=batch_size):
            yield product

async def main():
    connection = await asyncpg.connect(host='127.0.0.1', port=5432, user='postgres', database='products', password='password')
    async for product in fetch_products(connection):
        print(product)
    await connection.close()

asyncio.run(main())

fetch_products 函式利用 asyncpg 的遊標和 prefetch 引數,每次預取 batch_size 筆資料。透過非同步產生器 yield 逐筆回傳資料,實作資料的分批處理,避免記憶體爆滿。

aiostream:簡化非同步流程

aiostream 函式庫提供了更豐富的非同步產生器處理功能,可以簡化程式碼並提升效率。

import asyncpg
import asyncio
from aiostream import stream

async def process_product(product):
    # 模擬耗時操作
    await asyncio.sleep(0.1)
    return f"Processed: {product}"

async def main():
    connection = await asyncpg.connect(...)
    products = stream.iterate(fetch_products(connection))
    processed_products = products.map(process_product).to_list()
    results = await processed_products
    print(results)
    await connection.close()

asyncio.run(main())

使用 aiostreamstream.iteratefetch_products 的非同步產生器轉換為串流,再利用 map 方法將 process_product 函式應用到每一筆資料,最後使用 to_list 收集處理結果。這種寫法更加簡潔易懂,也更方便進行非同步流程控制。

多程式:突破 GIL 限制

Python 的全域性直譯器鎖(GIL)限制了多執行緒的平行運算能力。我發現,使用多程式可以有效繞過 GIL 的限制,充分利用多核心 CPU 的效能。

from multiprocessing import Pool
import asyncpg
import asyncio

async def process_batch(connection_args, products):
    connection = await asyncpg.connect(**connection_args)
    results = [await process_product(p) for p in products]  # 使用先前定義的 process_product 函式
    await connection.close()
    return results

if __name__ == "__main__":
    connection_args = dict(host='127.0.0.1', port=5432, user='postgres', database='products', password='password')
    batch_size = 10000
    with Pool() as pool:
        async def main():
            connection = await asyncpg.connect(**connection_args)
            product_batches = []
            async for i, product in enumerate(fetch_products(connection)):
                if i % batch_size == 0:
                    product_batches.append([])
                product_batches[-1].append(product)

            results = pool.map_async(process_batch, [(connection_args, batch) for batch in product_batches])
            all_processed_products = []
            for batch_result in results.get():
                all_processed_products.extend(batch_result)

            print(f"Processed {len(all_processed_products)} products.")
            await connection.close()
        asyncio.run(main())

程式碼首先將資料分成多個批次,然後使用 Pool.map_async 將每個批次分配給不同的程式處理。每個子程式都建立一個獨立的資料函式庫連線,並使用 asyncpg 進行非同步操作。results.get() 會取得所有子程式的結果,最後將所有結果合併。

流程視覺化

  graph LR
    B[B]
    A[取得資料函式庫連線] --> B{建立非同步產生器};
    B --> C[分批處理資料];
    C --> D[多程式平行處理];
    D --> E[彙總結果];

圖表説明: 此流程圖展示了使用 asyncpg 和多程式處理巨量資料的主要步驟。

  sequenceDiagram
    participant Main
    participant Pool
    participant Process 1
    participant Process 2

    Main->>Pool: 提交資料批次
    activate Pool
    Pool->>Process 1: 處理批次 1
    activate Process 1
    Pool->>Process 2: 處理批次 2
    activate Process 2
    Process 1-->>Pool: 傳回結果 1
    deactivate Process 1
    Process 2-->>Pool: 傳回結果 2
    deactivate Process 2
    Pool-->>Main: 傳回所有結果
    deactivate Pool

圖表説明: 此循序圖展示了多程式平行處理資料的流程。

透過結合 asyncpg 的非同步特性、遊標控制、非同步產生器、aiostream 函式庫以及 Python 的多程式機制,我們可以更有效率地處理巨量資料,避免效能瓶頸,開發更強健的資料函式庫應用程式。我深信,這種方法在處理巨量資料時具有顯著的優勢,值得廣泛應用。

在現代軟體開發中,並發程式設計已成為提升程式效能不可或缺的技術。Go 語言以其簡潔而強大的並發模型而聞名,其核心概念 goroutine 和 channel 提供了優雅與高效的並發處理方案。本文將探討 Go 的並發機制,並以實際程式碼範例示範如何運用 goroutine 和 channel 構建高效能的並發程式。

package main

import (
	"fmt"
	"sync"
	"time"
)

func sayHello(name string) string {
	return fmt.Sprintf("Hi there, %s", name)
}

func count(countTo int) int {
	start := time.Now()
	counter := 0
	for counter < countTo {
		counter++
	}
	fmt.Printf("計數到 %d 花費時間:%s\n", countTo, time.Since(start))
	return counter
}

func main() {
	// 使用 goroutine 和 channel 實作非同步結果處理
	{
		jeffChan := make(chan string)
		johnChan := make(chan string)

		go func() { jeffChan <- sayHello("Jeff") }()
		go func() { johnChan <- sayHello("John") }()

		fmt.Println(<-jeffChan)
		fmt.Println(<-johnChan)
	}

	// 使用 WaitGroup 和 channel 處理多個非同步操作
	{
		var wg sync.WaitGroup
		results := make(chan int, 5) // 使用 buffered channel 避免阻塞

		numbers := []int{1, 3, 5, 22, 100000000}
		for _, num := range numbers {
			wg.Add(1)
			go func(n int) {
				defer wg.Done()
				results <- count(n)
			}(num)
		}

		go func() {
			wg.Wait() // 等待所有 goroutine 完成
			close(results) // 關閉 channel,通知接收方所有結果已傳送
		}()

		for result := range results {
			fmt.Println(result)
		}
	}
}

這段程式碼展示了 Go 語言並發處理的精髓。第一部分利用 goroutine 和 channel 進行非同步操作,sayHello 函式在獨立的 goroutine 中執行,結果透過 channel 傳回。第二部分則示範如何使用 sync.WaitGroup 和 channel 協調多個 goroutine 的執行,count 函式模擬耗時操作,WaitGroup 確保所有 goroutine 完成後才繼續執行,buffered channel 則避免了阻塞,提升了程式效能。

我認為,Go 的並發模型相較於 Python 的多程式或執行緒池機制更為輕量與高效。Python 的全域解譯器鎖(GIL)限制了多執行緒的真正平行,而多程式則有較高的溝通成本。Go 的 goroutine 和 channel 則巧妙地避開了這些問題,讓開發者能更輕鬆地編寫高效能的並發程式。

  graph LR
A[主 Goroutine] --> B(建立 Goroutine 1);
A --> C(建立 Goroutine 2);
B --> D[Channel];
C --> D;
D --> E[結果處理];

圖表説明:此圖展示了 goroutine 和 channel 的協作關係。主 goroutine 建立多個 goroutine,並透過 channel 進行資料交換和同步。

  sequenceDiagram
    participant Main
    participant Goroutine1
    participant Goroutine2
    participant Channel

    Main ->> Goroutine1: 啟動
    Main ->> Goroutine2: 啟動
    Goroutine1 ->> Channel: 傳送資料
    Goroutine2 ->> Channel: 傳送資料
    Main ->> Channel: 接收資料
    Main ->> Main: 處理資料

圖表説明:此循序圖更詳細地描述了 goroutine 和 channel 的互動流程,展現了 Go 語言並發處理的優雅和高效。

從實務經驗來看,Go 的 goroutine 和 channel 非常適合處理 I/O 密集型任務,例如網路程式設計和檔案處理。其輕量級的特性讓建立大量 goroutine 成為可能,而 channel 則提供安全與高效的溝通機制,有效降低了並發程式設計的複雜度。

在技術選型上,若您的應用程式需要處理大量並發請求,與對效能有較高要求,那麼 Go 語言將是一個值得考慮的選項。其簡潔的語法、強大的並發模型以及高效的執行效能,能協助您構建更具擴充套件性和可靠性的應用程式。