協程是 Python 非同步程式設計的根本,讓程式得以暫停執行並儲存狀態,待適當時機再還原,有效提升 I/O 密集型任務的效率。傳統同步程式設計在 I/O 操作時容易造成阻塞,如同餐廳單一廚師處理所有訂單,而協程則像是多個廚師同時作業,提升整體效率。Python 的 asyncio 模組提供事件迴圈和任務管理等機制,配合 asyncawait 關鍵字,讓開發者能輕鬆定義和管理協程。asyncio 的事件迴圈機制能有效管理 I/O 密集型任務,例如網路請求或檔案讀寫,它會在 I/O 操作期間切換至其他可執行任務,避免阻塞。對於 CPU 密集型任務,則可藉由 concurrent.futures 模組,將任務提交到其他執行緒或行程執行,確保事件迴圈的流暢。

協程的奧秘:解鎖 Python 非同步程式設計的無限可能

協程 (Coroutine) 是 Python 中一種強大的特性,它賦予了程式非同步執行的能力。簡單來說,協程是可以暫停執行、儲存狀態,並在稍後從暫停處還原執行的函式。這種機制對於處理 I/O 密集型任務特別有效,例如網路請求、檔案讀寫等。

為什麼要使用協程?

傳統的同步程式設計模式在處理 I/O 任務時,往往會因為等待 I/O 操作完成而阻塞整個程式。這就像在餐廳點餐,廚師一次只能做一道菜,所有客人都必須排隊等待。而協程則像有多個廚師同時工作,可以先處理其他客人的訂單,等到有菜完成後再送上。

非同步程式設計可以顯著提高程式的效率和回應速度,特別是在高併發的場景下。

如何在 Python 中使用協程?

Python 的 asyncio 模組提供了協程的基礎架構,包括事件迴圈、任務管理等。要定義一個協程,我們需要使用 async def 關鍵字。

import asyncio

async def my_coroutine(id):
    print(f'Coroutine {id} started')
    await asyncio.sleep(1)  # 模擬 I/O 操作
    return f'Coroutine {id} finished'

async def main():
    tasks = [asyncio.create_task(my_coroutine(i)) for i in range(3)]
    results = await asyncio.gather(*tasks)
    print(results)

asyncio.run(main())

內容解密:

  1. async def my_coroutine(id):: 使用 async def 定義一個協程 my_coroutine,它接收一個 ID 作為引數。
  2. print(f’Coroutine {id} started’): 協程開始時印出一條訊息,顯示其 ID。
  3. await asyncio.sleep(1): 使用 await 關鍵字暫停協程的執行,模擬一個耗時的 I/O 操作(例如網路請求)。asyncio.sleep(1) 會讓協程休眠 1 秒。
  4. return f’Coroutine {id} finished’: 協程完成後,傳回一個包含其 ID 的訊息。
  5. async def main():: 定義另一個協程 main,用於管理其他的協程。
  6. tasks = [asyncio.create_task(my_coroutine(i)) for i in range(3)]: 建立一個包含三個 my_coroutine 協程的任務列表。asyncio.create_task 函式用於將協程轉換為一個任務,使其可以在事件迴圈中執行。
  7. results = await asyncio.gather(*tasks): 使用 asyncio.gather 函式並等待所有任務完成。asyncio.gather 接收一個任務列表,並平行執行這些任務,直到所有任務都完成。await 關鍵字用於暫停 main 協程的執行,直到 asyncio.gather 傳回結果。
  8. print(results): 印出所有協程的結果。
  9. asyncio.run(main()): 啟動事件迴圈並執行 main 協程。asyncio.run 函式是一個方便的函式,用於啟動事件迴圈並執行一個協程,它會自動處理事件迴圈的建立和關閉。

在這個例子中,main() 協程建立了三個 my_coroutine() 協程的例項,並使用 asyncio.create_task() 函式將它們轉換為任務。然後,我們使用 asyncio.gather() 函式等待所有任務完成並收集它們的結果,最後印出結果。

Asyncio 如何處理 I/O 密集型任務?

Asyncio 的優勢在於能夠有效地管理 I/O 密集型任務。當程式執行 I/O 操作時,它不需要等待操作完成,而是可以切換到另一個可以執行的任務。當 I/O 操作完成時,相應的任務會被還原執行。

這種方式避免了程式因為等待 I/O 操作而阻塞,提高了程式的整體效率。

import asyncio
import aiohttp

async def download_coroutine(session, url):
    async with session.get(url) as response:
        filename = url.split("/")[-1]
        with open(filename, "wb") as f:
            while True:
                chunk = await response.content.read(1024)
                if not chunk:
                    break
                f.write(chunk)
        print(f"Downloaded {url}")

async def download_all(urls):
    async with aiohttp.ClientSession() as session:
        tasks = []
        for url in urls:
            task = asyncio.ensure_future(download_coroutine(session, url))
            tasks.append(task)
        await asyncio.gather(*tasks)

urls = [
    "https://www.python.org",
    "https://www.google.com",
    "https://www.bing.com",
    "https://www.yahoo.com",
]

loop = asyncio.get_event_loop()
loop.run_until_complete(download_all(urls))

內容解密:

  1. import asyncio: 匯入 asyncio 模組,用於非同步程式設計。
  2. import aiohttp: 匯入 aiohttp 模組,用於發起非同步 HTTP 請求。
  3. async def download_coroutine(session, url):: 定義一個協程函式 download_coroutine,用於下載指定 URL 的內容。它接收一個 session 物件和一個 url 作為引數。
  4. async with session.get(url) as response:: 使用 session.get(url) 發起一個非同步 HTTP GET 請求,並使用 async with 陳述式確保請求完成後會自動關閉連線。response 物件包含了伺服器的回應。
  5. filename = url.split("/")[-1]: 從 URL 中提取檔名。
  6. with open(filename, “wb”) as f:: 以二進位寫入模式開啟一個檔案,用於儲存下載的內容。使用 with 陳述式確保檔案在使用完畢後會自動關閉。
  7. while True:: 進入一個無限迴圈,用於分塊讀取回應內容。
  8. chunk = await response.content.read(1024):: 使用 response.content.read(1024) 非同步讀取回應內容的一個塊(1024 位元組)。await 關鍵字用於暫停協程的執行,直到讀取操作完成。
  9. if not chunk:: 檢查是否讀取到內容。如果 chunk 為空,表示已經讀取完所有內容,跳出迴圈。
  10. f.write(chunk): 將讀取到的內容塊寫入檔案。
  11. print(f"Downloaded {url}"): 印出一條訊息,表示該 URL 已經下載完成。
  12. async def download_all(urls):: 定義一個協程函式 download_all,用於平行下載多個 URL 的內容。它接收一個包含多個 URL 的列表作為引數。
  13. async with aiohttp.ClientSession() as session:: 建立一個 aiohttp.ClientSession 物件,用於管理多個 HTTP 連線。使用 async with 陳述式確保會話在使用完畢後會自動關閉。
  14. tasks = [ ]: 建立一個空列表,用於儲存所有下載任務。
  15. for url in urls:: 遍歷 URL 列表。
  16. task = asyncio.ensure_future(download_coroutine(session, url)):: 為每個 URL 建立一個下載任務。asyncio.ensure_future 函式用於將協程轉換為一個任務,使其可以在事件迴圈中執行。
  17. tasks.append(task): 將下載任務增加到任務列表中。
  18. await asyncio.gather(*tasks):: 使用 asyncio.gather 函式並等待所有下載任務完成。asyncio.gather 接收一個任務列表,並平行執行這些任務,直到所有任務都完成。await 關鍵字用於暫停 download_all 協程的執行,直到 asyncio.gather 傳回結果。
  19. urls = [ … ]: 建立一個包含多個 URL 的列表,用於測試下載功能。
  20. loop = asyncio.get_event_loop(): 取得當前事件迴圈。
  21. loop.run_until_complete(download_all(urls)): 啟動事件迴圈並執行 download_all 協程。loop.run_until_complete 函式會執行指定的協程,直到協程完成。

Asyncio 如何處理 CPU 密集型任務?

雖然 Asyncio 主要用於 I/O 密集型任務,但它也可以用於 CPU 密集型任務。對於 CPU 密集型任務,我們可以將任務提交到單獨的執行緒或行程中執行,以避免阻塞事件迴圈。

import asyncio
import concurrent.futures

async def cpu_bound_task(num):
    """
    一個 CPU 密集型任務,計算前 N 個自然數的和
    """
    return sum(range(num))

async def main():
    """
    主函式,建立一個執行器並執行任務
    """
    loop = asyncio.get_running_loop()
    executor = concurrent.futures.ThreadPoolExecutor()
    result = await loop.run_in_executor(executor, cpu_bound_task, 1000000)
    print(f"The result is {result}")

asyncio.run(main())

內容解密:

  1. import asyncio: 匯入 asyncio 模組,用於非同步程式設計。
  2. import concurrent.futures: 匯入 concurrent.futures 模組,用於平行執行 CPU 密集型任務。
  3. async def cpu_bound_task(num):: 定義一個協程函式 cpu_bound_task,用於執行 CPU 密集型任務。它接收一個數字 num 作為引數,用於指定計算範圍。
  4. “““A sample CPU-bound task that computes the sum of the first N natural numbers”””: 函式的說明檔案,說明該函式是一個 CPU 密集型任務,用於計算前 N 個自然數的和。
  5. return sum(range(num)): 計算前 N 個自然數的和,並傳回結果。
  6. async def main():: 定義一個協程函式 main,用於管理其他的協程和執行 CPU 密集型任務。
  7. loop = asyncio.get_running_loop(): 取得當前事件迴圈。
  8. executor = concurrent.futures.ThreadPoolExecutor(): 建立一個 ThreadPoolExecutor 物件,用於在執行緒池中執行 CPU 密集型任務。
  9. result = await loop.run_in_executor(executor, cpu_bound_task, 1000000): 使用 loop.run_in_executor 方法在執行緒池中執行 cpu_bound_task 函式。loop.run_in_executor 接收一個執行器物件、一個函式和函式的引數。await 關鍵字用於暫停 main 協程的執行,直到 cpu_bound_task 函式執行完成並傳回結果。
  10. print(f"The result is {result}"): 印出計算結果。
  11. asyncio.run(main()): 啟動事件迴圈並執行 main 協程。

在這個例子中,我們定義了一個 CPU 密集型任務,計算前 N 個自然數的和。然後,我們建立了一個 ThreadPoolExecutor,並使用 loop.run_in_executor() 方法在單獨的執行緒中執行該任務。await 關鍵字用於暫停協程的執行,直到任務完成。

如何將 Asyncio 與第三方函式庫結合使用?

Asyncio 可以與許多支援非同步操作的第三方函式庫結合使用,例如 aiohttpaioredisasyncpg 等。這些函式庫通常提供非阻塞的 API,可以與 Asyncio 無縫整合。

import asyncio
import aiohttp

async def main():
    """
    使用 aiohttp 非同步發起 HTTP 請求的範例
    """
    async with aiohttp.ClientSession() as session:
        async with session.get('https://www.example.com') as response:
            print("Status:", response.status)
            print("Content-type:", response.headers['content-type'])
            html = await response.text()
            print("Body:", html[:15], "...")

asyncio.run(main())

內容解密:

  1. import asyncio: 匯入 asyncio 模組,用於非同步程式設計。
  2. import aiohttp: 匯入 aiohttp 模組,用於發起非同步 HTTP 請求。
  3. async def main():: 定義一個協程函式 main,用於執行非同步 HTTP 請求。
  4. “““An example of making an HTTP request asynchronously using aiohttp”””: 函式的說明檔案,說明該函式是一個使用 aiohttp 非同步發起 HTTP 請求的範例。
  5. async with aiohttp.ClientSession() as session:: 建立一個 aiohttp.ClientSession 物件,用於管理 HTTP 連線。使用 async with 陳述式確保會話在使用完畢後會自動關閉。
  6. async with session.get(‘https://www.example.com’) as response:: 使用 session.get 方法發起一個非同步 HTTP GET 請求,請求的 URL 是 https://www.example.com。使用 async with 陳述式確保回應在使用完畢後會自動關閉。
  7. print(“Status:”, response.status): 印出 HTTP 回應的狀態碼。
  8. print(“Content-type:”, response.headers[‘content-type’]): 印出 HTTP 回應的內容型別。
  9. html = await response.text(): 使用 response.text() 方法非同步讀取 HTTP 回應的內容,並將其轉換為文字。await 關鍵字用於暫停 main 協程的執行,直到回應內容讀取完成。
  10. print(“Body:”, html[:15], “…”): 印出 HTTP 回應內容的前 15 個字元。
  11. asyncio.run(main()): 啟動事件迴圈並執行 main 協程。

在這個例子中,我們使用 aiohttp.ClientSession() 建立一個 HTTP 客戶端會話,並使用 session.get() 方法非同步發起 HTTP 請求。await 關鍵字用於暫停協程的執行,直到請求完成並收到回應。

玄貓認為,協程是 Python 中一種強大的非同步程式設計工具,可以顯著提高程式的效率和回應速度。透過 asyncio 模組和相關的第三方函式庫,我們可以輕鬆地編寫非同步程式,處理 I/O 密集型和 CPU 密集型任務。

使用第三方函式庫的 asyncio

asyncio 的強大之處在於它能與眾多第三方函式庫整合,讓非同步程式設計更加便利。許多熱門函式庫都已加入對 asyncio 的支援,讓我們能更輕鬆地編寫非同步程式碼。

以下是一些使用 asyncio 與第三方函式庫的範例:

使用 aiohttp 進行非同步 HTTP 請求

aiohttp 是一個根據 asyncio 的非同步 HTTP 使用者端/伺服器函式庫。它能讓我們傳送 HTTP 請求並非同步地處理回應。

import asyncio
import aiohttp

async def main():
    """
    使用 aiohttp 非同步傳送 HTTP 請求
    """
    async with aiohttp.ClientSession() as session:
        async with session.get('https://www.google.com') as response:
            print(response.status)
            print(await response.text())

asyncio.run(main())

程式碼解密:

  1. 引入函式庫:首先,我們匯入 asyncioaiohttp 函式庫。
  2. 建立非同步 HTTP 請求aiohttp.ClientSession() 建立一個客戶端會話,用於管理連線。
  3. 傳送 GET 請求session.get('https://www.google.com') 傳送一個 GET 請求到 Google 首頁。
  4. 非同步處理回應await response.text() 非同步地讀取回應內容。
  5. 列印狀態碼與內容:最後,我們印出回應的狀態碼和內容。

使用 aioredis 與 Redis 資料函式庫互動

aioredis 是一個根據 asyncio 的 Redis 使用者端。它能讓我們非同步地與 Redis 資料函式庫互動。

import asyncio
import aioredis

async def main():
    """
    使用 aioredis 非同步地與 Redis 互動
    """
    redis = await aioredis.create_redis_pool('redis://localhost')
    await redis.set('key', 'value')
    value = await redis.get('key', encoding='utf-8')
    print(value)
    redis.close()
    await redis.wait_closed()

asyncio.run(main())

程式碼解密:

  1. 引入函式庫:匯入 asyncioaioredis 函式庫。
  2. 建立 Redis 連線池aioredis.create_redis_pool('redis://localhost') 建立一個 Redis 連線池,用於管理與 Redis 伺服器的連線。
  3. 設定鍵值對await redis.set('key', 'value') 設定一個鍵值對。
  4. 取得鍵值await redis.get('key', encoding='utf-8') 取得指定鍵的值。
  5. 列印鍵值:印出取得到的值。
  6. 關閉連線:最後,關閉與 Redis 資料函式庫的連線。

asyncio 程式碼除錯

除錯 asyncio 程式碼可能具有挑戰性,特別是處理複雜的非同步程式時。以下是一些除錯 asyncio 程式碼的技巧與工具:

  • 使用 print 陳述式:這是最簡單直接的除錯方式。然而,由於程式碼的非同步特性,直接使用 print 可能難以追蹤程式的執行流程。
  • 使用 asyncio.gather:使用 asyncio.gather 等待多個協程完成後再列印結果,能更好地掌握程式的執行順序。
  • 使用 asyncio.Task.all_tasks():這個方法能取得所有待處理任務的列表,有助於找出卡住或執行時間過長的任務。

以下範例展示如何使用 asyncio.gatherasyncio.Task.all_tasks() 進行除錯:

import asyncio

async def coroutine1():
    print("協程 1 開始")
    await asyncio.sleep(1)
    print("協程 1 結束")

async def coroutine2():
    print("協程 2 開始")
    await asyncio.sleep(2)
    print("協程 2 結束")

async def main():
    print("主程式開始")
    await asyncio.gather(coroutine1(), coroutine2())
    print("主程式結束")

asyncio.run(main())

程式碼解密:

  1. 定義協程:定義兩個協程 coroutine1coroutine2,分別印出開始訊息、睡眠一段時間,然後印出結束訊息。
  2. 使用 asyncio.gatherasyncio.gather(coroutine1(), coroutine2()) 同時執行兩個協程,並等待它們完成。
  3. 列印訊息:在協程開始和結束時印出訊息,以追蹤程式的執行流程。

另一個範例展示如何使用 asyncio.Task.all_tasks() 找出待處理的任務:

import asyncio

async def coroutine1():
    print("協程 1 開始")
    await asyncio.sleep(1)
    print("協程 1 結束")

async def coroutine2():
    print("協程 2 開始")
    await asyncio.sleep(2)
    print("協程 2 結束")

async def main():
    print("主程式開始")
    task1 = asyncio.create_task(coroutine1())
    task2 = asyncio.create_task(coroutine2())
    tasks = asyncio.Task.all_tasks()
    print("所有任務:", tasks)
    await asyncio.gather(task1, task2)
    print("主程式結束")

asyncio.run(main())

程式碼解密:

  1. 建立任務:使用 asyncio.create_task 建立兩個任務 task1task2,分別執行 coroutine1coroutine2
  2. 取得所有任務asyncio.Task.all_tasks() 取得所有待處理任務的集合。
  3. 列印任務:印出所有任務的資訊,以便追蹤任務的狀態。

此外,aiodebugasyncio.run_in_executor 等工具也能用於除錯 asyncio 程式碼。aiodebug 是一個第三方函式庫,提供 asyncio 應用程式的除錯器。asyncio.run_in_executor 則能讓我們在執行器中執行同步程式碼,方便除錯。

Python 內建模組巡禮

Python 之所以廣受歡迎,原因之一是它擁有豐富的內建模組。這些模組提供各式各樣的功能和工具,讓開發者不必從頭編寫程式碼。

Python 的內建模組是預先存在的模組,它們與 Python 安裝包捆綁在一起,提供各種功能,可用於廣泛的應用程式。這些模組旨在透過提供可立即使用的函式來節省開發人員的時間和精力,這些函式可以快速有效地執行複雜的任務。

接下來,玄貓將探索 Python 中可用的各種內建模組,並討論如何使用它們來簡化開發並提高生產力。玄貓將介紹一系列模組,從更常用的模組(例如 mathdatetimeos)到一些不太知名的模組(例如 ctypespicklehashlib)。

  • math 模組:提供一系列數學函式,包括三角函式、對數函式和指數函式等。該模組廣泛用於科學應用程式,可用於執行各種計算,例如求一個數的平方根或產生隨機數。
  • datetime 模組:提供一系列用於處理日期和時間的函式。該模組可用於執行各種操作,例如計算兩個日期之間的差異、格式化日期和時間以及在不同時區之間進行轉換。
  • os 模組:提供用於與作業系統互動的函式。該模組可用於執行各種任務,例如建立和刪除檔案和目錄、導航檔案系統以及設定環境變數。

總結來說,asyncio 提供了強大的非同步程式設計能力,搭配第三方函式庫和有效的除錯技巧,能讓開發者編寫出高效與易於維護的程式碼。Python 的內建模組則提供了豐富的功能,讓開發更加便捷。