與 PostgreSQL 資料函式庫互動:建立連線與執行查詢

首先,我們需要建立與 PostgreSQL 資料函式庫的連線,才能執行 SQL 查詢。asyncpg 提供了 connect() 協程用於建立連線,execute() 協程用於執行 SQL 陳述式,以及 fetch() 協程用於取得查詢結果。以下程式碼示範如何插入和查詢品牌資料:

import asyncpg
import asyncio
from asyncpg import Record
from typing import List

async def main():
    conn = await asyncpg.connect(host='127.0.0.1',
                                        port=5432,
                                        user='postgres',
                                        database='products',
                                        password='password')
    await conn.execute("INSERT INTO brand VALUES(DEFAULT, 'Levis')")
    await conn.execute("INSERT INTO brand VALUES(DEFAULT, 'Seven')")
    brand_query = 'SELECT brand_id, brand_name FROM brand'
    results: List[Record] = await conn.fetch(brand_query)
    for brand in results:
        print(f'id: {brand["brand_id"]}, name: {brand["brand_name"]}')
    await conn.close()

asyncio.run(main())

這段程式碼首先透過 asyncpg.connect() 建立與 PostgreSQL 資料函式庫的連線。接著,它依序插入兩個品牌 “Levis” 和 “Seven” 到 brand 表中。然後,它利用 conn.fetch() 執行 brand_query 查詢所有品牌資料,並將結果儲存在 results 變數中,這個變數的型別是 List[Record],其中 Record 物件類別似於字典,允許我們使用欄位名稱取值。最後,程式迴圈印出每個品牌的 ID 和名稱,並關閉資料函式庫連線。

非同步查詢:提升效能的關鍵

asyncpg 的 execute()fetch() 都是協程,這讓我們能充分利用 asyncio 的 API 進行非同步操作,例如 asyncio.gather()。非同步操作允許多個查詢同時執行,尤其在 I/O 密集型操作中,能大幅提升效能。

  graph LR
    B[B]
    C[C]
    A[建立連線] --> B{執行查詢 1};
    A --> C{執行查詢 2};
    B --> D[處理結果 1];
    C --> E[處理結果 2];
    D --> F[結束];
    E --> F;

這個流程圖清楚地展示了非同步查詢的執行流程。建立連線後,我們可以同時執行查詢 1 和查詢 2,不必等其中一個查詢完成才能執行另一個。這種平行處理方式顯著縮短了整體執行時間。

  sequenceDiagram
    participant 使用者
    participant 應用程式
    participant 資料函式庫

    使用者->>應用程式: 傳送請求
    activate 應用程式
    應用程式->>資料函式庫: 非同步查詢 1
    應用程式->>資料函式庫: 非同步查詢 2
    資料函式庫-->>應用程式: 傳回結果 1
    資料函式庫-->>應用程式: 傳回結果 2
    應用程式-->>使用者: 回應結果
    deactivate 應用程式

此序列圖展現了使用者、應用程式和資料函式庫之間的互動流程。應用程式利用非同步查詢同時向資料函式庫傳送查詢 1 和查詢 2,並在收到兩個查詢的結果後,將結果回傳給使用者。這種非同步的處理方式有效地減少了使用者的等待時間。

我個人在開發高併發網路應用時,經常使用 asyncpg 搭配連線池來管理資料函式庫連線,有效地避免了頻繁建立和關閉連線的開銷,進一步提升了系統的整體效能和穩定性。

```python
import asyncio
import aiohttp

async def fetch_status(session, url, delay=0):
    await asyncio.sleep(delay)
    try:
        async with session.get(url) as response:
            return response.status
    except aiohttp.ClientError as e:
        print(f"網址 {url} 發生錯誤: {e}")
        raise

async def main():
    async with aiohttp.ClientSession() as session:
        urls = ['https://www.example.com', 'https://www.google.com', 'https://不存在的網址.com']
        tasks = [fetch_status(session, url, delay=i) for i, url in enumerate(urls)]
        done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)

        for task in done:
            try:
                result = task.result()
                print(f"網址 {urls[tasks.index(task)]} 的狀態碼: {result}")
            except Exception as e:
                print(f"任務 {task} 丟擲異常: {e}")

        for task in pending:
            task.cancel()
            print(f"任務 {task} 已取消")

asyncio.run(main())

這個程式碼範例示範了使用 asyncio.wait 並搭配 asyncio.FIRST_EXCEPTION 引數。當其中一個任務丟擲異常時,asyncio.wait 會立即傳回,並將已完成和未完成的任務分別放入 donepending 集合中。程式碼接著取消所有未完成的任務,避免浪費資源。fetch_status 函式加入了錯誤處理,模擬網路請求可能發生的錯誤,並使用 delay 引數模擬不同任務的執行時間。

  sequenceDiagram
    participant main
    participant task1
    participant task2
    participant task3

    main->>task1: 建立 task1 (delay=0)
    main->>task2: 建立 task2 (delay=1)
    main->>task3: 建立 task3 (delay=2)

    main->>+asyncio.wait: 等待任務,return_when=FIRST_EXCEPTION

    alt task3 發生錯誤
        task3-->>-asyncio.wait: 丟擲異常
        main->>task2: 取消 task2
    else task2 發生錯誤
        task2-->>-asyncio.wait: 丟擲異常
        main->>task3: 取消 task3
    else task1 發生錯誤
        task1-->>-asyncio.wait: 丟擲異常
        main->>task2: 取消 task2
        main->>task3: 取消 task3
    else 其他情況
        note right of main: 所有任務正常完成
    end

使用 asyncio.waitreturn_when=asyncio.FIRST_EXCEPTION 可以更有效地管理併發任務,特別是在需要及時處理錯誤並停止其他任務的情況下。

  graph LR
    ALL_COMPLETED[ALL_COMPLETED]
    B[B]
    C[C]
    E[E]
    FIRST_EXCEPTION[FIRST_EXCEPTION]
    A[建立任務列表] --> B{使用 asyncio.wait 等待任務};
    B -- FIRST_EXCEPTION --> C{第一個任務發生異常};
    C --> D[取消剩餘任務];
    B -- ALL_COMPLETED --> E{所有任務完成};
    E --> F[處理結果];
import asyncio
import asyncpg

async def connect_to_database(user, password, database, host='127.0.0.1', port=5432):
    conn = await asyncpg.connect(user=user, password=password,
                                database=database, host=host, port=port)
    return conn

async def fetch_product_data(conn, product_ids):
    query = "SELECT * FROM products WHERE id = ANY($1::int[])"
    return await conn.fetch(query, product_ids)

async def main():
    conn = await connect_to_database('user', 'password', 'products')
    try:
        product_ids = [1, 2, 3, 4, 5]
        products = await fetch_product_data(conn, product_ids)
        for product in products:
            print(product)
    finally:
        await conn.close()

asyncio.run(main())

這段程式碼示範使用asyncpg 進行資料函式庫查詢。connect_to_database 函式建立與資料函式庫的連線。fetch_product_data 函式則執行 SQL 查詢,利用 ANY 運算元有效查詢多個產品 ID。main 函式中,我們取得產品資料並印出。finally 區塊確保資料函式庫連線關閉。

  sequenceDiagram
    participant User
    participant Application
    participant Database

    User->>Application: 請求產品資料
    activate Application
    Application->>Database: 傳送 SQL 查詢 (asyncpg)
    activate Database
    Database-->>Application: 回傳查詢結果
    deactivate Database
    Application-->>User: 顯示產品資料
    deactivate Application
  graph LR
    B[B]
    C[C]
    A[建立資料函式庫連線] --> B{執行 SQL 查詢}
    B --> C{處理結果}
    C --> D[關閉連線]

透過 asyncpg,我們可以有效利用非同步程式設計的優勢,提升資料函式庫查詢效率。以下是一些進階應用:

  • 連線池: 建立連線池可以重複利用資料函式庫連線,減少建立連線的開銷。asyncpg 提供了 create_pool 函式來建立連線池。
  • 交易管理: asyncpg 支援交易操作,確保資料一致性。可以使用 conn.transaction() 方法來管理交易。
  • Prepared Statements: 使用 Prepared Statements 可以提高查詢效能,並防止 SQL 注入攻擊。

asyncpg 提供了豐富的功能和高度的靈活性,讓開發者能夠在 Python 的非同步環境中輕鬆操作 PostgreSQL 資料函式庫,開發高效能的應用程式。

藉由結合非同步程式設計和高效的資料函式庫驅動程式 asyncpg,我們可以顯著提升應用程式在高併發情境下的效能。這對於需要處理大量資料函式庫操作的應用,例如電商平台、社群網路等,至關重要。

使用 Asyncpg 進行高效能 Python 資料函式庫查詢

身為一個技術工作者,我經常需要處理大量資料,而資料函式庫查詢的效率對應用程式的效能至關重要。Asyncpg 是一個絕佳的 PostgreSQL 非同步客戶端函式庫,它允許我們以非同步的方式執行資料函式庫操作,從而提升應用程式的反應速度和吞吐量。以下,我將分享一些使用 Asyncpg 進行高效能資料函式庫查詢的技巧和心得。

建立資料函式庫連線和 Schema

在開始任何資料函式庫操作之前,我們必須先建立與資料函式庫的連線。以下程式碼片段示範如何使用 Asyncpg 連線到 PostgreSQL 資料函式庫,並檢查伺服器版本:

import asyncpg
import asyncio

async def connect_to_database(user, password, database):
    conn = await asyncpg.connect(user=user, password=password, database=database)
    version = conn.get_server_version()
    print(f"已連線 PostgreSQL 版本:{version}")
    return conn

async def main():
    conn = await connect_to_database('postgres', 'password', 'products')
    await conn.close()

asyncio.run(main())

這個 connect_to_database 函式封裝了連線邏輯,接收使用者名稱、密碼和資料函式庫名稱作為引數,並回傳一個 Asyncpg 連線物件。main 函式示範瞭如何使用這個函式連線到名為 products 的資料函式庫。

建立連線後,我們需要定義資料函式庫 Schema。以下 圖表展示了產品資料函式庫的 Schema 設計:

  erDiagram
    Brand {
        int brand_id PK
        text brand_name
    }
    Product {
        int product_id PK
        text product_name
        int brand_id FK
    }
    SKU {
        int sku_id PK
        int product_id FK
        int size_id FK
        int color_id FK
    }
    Size {
        int size_id PK
        text size_name
    }
    Color {
        int color_id PK
        text color_name
    }

    Brand ||--o{ Product : has
    Product ||--o{ SKU : has
    Size ||--o{ SKU : has
    Color ||--o{ SKU : has}}}}

這個實體關係圖 (ERD) 描述了產品、品牌、SKU、尺寸和顏色之間的關係。每個實體都有其主鍵,SKU 則透過外部索引鍵與產品、尺寸和顏色關聯。

接著,我們可以使用 SQL 陳述式建立這些表格:

CREATE_BRAND = """CREATE TABLE IF NOT EXISTS brand (brand_id SERIAL PRIMARY KEY, brand_name TEXT NOT NULL);"""
# ... 其他 SQL 建立表格陳述式 ...

async def create_tables(conn):
    await conn.execute(CREATE_BRAND)
    # ... 執行其他 SQL 建立表格陳述式 ...

async def main():
    conn = await connect_to_database('postgres', 'password', 'products')
    await create_tables(conn)
    await conn.close()

asyncio.run(main())

create_tables 函式使用 conn.execute() 方法執行 SQL 建立表格陳述式。

執行資料函式庫查詢和操作

建立表格後,我們可以開始執行資料函式庫查詢和操作。以下程式碼示範如何插入產品資料:

INSERT_PRODUCT = """INSERT INTO product (product_name, brand_id) VALUES ($1, $2);"""

async def insert_product(conn, product_name, brand_id):
    await conn.execute(INSERT_PRODUCT, product_name, brand_id)

# ...其他插入資料的函式...

insert_product 函式使用引數化查詢,有效防止 SQL 注入攻擊。

Asyncpg 也支援非同步的 fetch() 方法,用於查詢資料:

async def get_all_products(conn):
    rows = await conn.fetch("SELECT * FROM product;")
    return rows

get_all_products 函式查詢所有產品資料,並以列表形式回傳。

Asyncpg 提供了強大的非同步資料函式庫操作功能,能大幅提升 Python 應用程式的效能。透過結合非同步程式設計和 Asyncpg,我們可以更有效率地處理資料函式庫操作,開發更具回應性和高吞吐量的應用程式。