Python 的非同步程式設計能力近年來日益重要,尤其在網路應用、資料處理等領域。本文將深入探討 Python 非同步程式設計的各種技巧,從協程的基礎概念到 asyncio 模組的應用,再到 RxPY 框架的反應式程式設計正規化,帶領讀者逐步掌握 Python 非同步程式設計的精髓。協程是 Python 非同步程式設計的基本,透過 yieldsend 關鍵字,可以實作協程之間的資料交換與控制流程。asyncio 模組提供更進階的非同步程式設計工具,例如事件迴圈、任務排程和非阻塞網路請求,讓開發者能更有效率地處理 I/O 密集型任務。最後,RxPY 框架引入了反應式程式設計的正規化,透過可觀察序列和運運算元,以更簡潔、宣告式的方式處理非同步資料流。

基本概念

要建立一個協程,需要使用 yield 關鍵字將值從協程中產生出來。要向協程傳送值,可以使用 send 方法。這樣,協程就可以接收和處理值。

使用 yieldsend 的協程

以下是一個簡單的協程範例:

def parrot():
    while True:
        message = yield
        print(message)

gen = parrot()
gen.send(None)  # 啟動協程
gen.send("Hello")
gen.send("World")

在這個範例中,parrot 函式是一個協程,它使用 yield 關鍵字接收值。要啟動協程,需要先傳送 None,然後就可以傳送其他值。

使用 asyncio 的非同步程式設計

Python 的 asyncio 模組提供了一種更直觀的方式來定義協程。使用 async def 關鍵字可以定義一個協程:

import asyncio

async def hello():
    print("Hello, async!")

coro = hello()
print(coro)  # 輸出:<coroutine object hello at 0x...>

在這個範例中,hello 函式是一個協程,它使用 async def 關鍵字定義。當呼叫 hello 函式時,傳回的是一個協程物件。

執行協程

要執行協程,需要使用 asyncio 的事件迴圈(event loop)。以下是一個簡單的範例:

import asyncio

async def hello():
    print("Hello, async!")

loop = asyncio.get_event_loop()
coro = hello()
loop.run_until_complete(coro)

在這個範例中,使用 asyncio.get_event_loop 函式取得事件迴圈物件,然後使用 run_until_complete 方法執行協程。

非同步程式設計的美麗:Native Coroutines

Native Coroutines 是使用 async def 陳述式定義的協程。這種協程可以使用 await 語法等待資源,讓程式設計師可以撰寫乾淨、簡潔的非同步程式碼。

使用 asyncio.sleep 函式

例如,我們可以使用 asyncio.sleep 函式讓程式暫停一段時間,然後執行某個動作。以下是範例程式碼:

import asyncio

async def wait_and_print(msg):
    await asyncio.sleep(1)
    print("Message: ", msg)

loop = asyncio.get_event_loop()
loop.run_until_complete(wait_and_print("Hello"))

這段程式碼使用 asyncio.sleep 函式暫停 1 秒,然後印出 “Message: Hello”。

事件迴圈的斷點

await 語法提供了事件迴圈的斷點,讓事件迴圈可以在等待資源的同時,管理其他協程。這使得協程可以彼此之間進行非同步的溝通。

協程的串接

協程也可以使用 await 語法串接在一起,讓程式設計師可以撰寫複雜的非同步程式碼。以下是範例程式碼:

async def network_request(number):
    await asyncio.sleep(1.0)
    return {"success": True, "result": number ** 2}

async def main():
    result = await network_request(10)
    print(result)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

這段程式碼使用 asyncio.sleep 函式模擬網路請求,然後使用 await 語法串接 network_request 協程和 main 協程。

內容解密:

  • async def 陳述式用於定義協程。
  • await 語法用於等待資源。
  • asyncio.sleep 函式用於暫停程式執行。
  • 協程可以使用 await 語法串接在一起。

圖表翻譯:

  flowchart TD
    A[開始] --> B[等待資源]
    B --> C[執行動作]
    C --> D[傳回結果]
    D --> E[結束]

這個圖表展示了協程的執行流程,包括等待資源、執行動作和傳回結果。

208 實作並發

要實作並發,玄貓可以使用 asyncioensure_future 函式。這個函式可以排程協程(coroutine)和期物(future)以便執行。以下是使用 ensure_future 的範例:

import asyncio

async def fetch_square(number):
    # 模擬網路請求
    response = await network_request(number)
    if response["success"]:
        print("結果是:{}".format(response["result"]))

# 排程多個任務
asyncio.ensure_future(fetch_square(2))
asyncio.ensure_future(fetch_square(3))
asyncio.ensure_future(fetch_square(4))

# 啟動事件迴圈
loop = asyncio.get_event_loop()
loop.run_forever()

在這個範例中,fetch_square 是一個協程,負責模擬網路請求。使用 ensure_future 函式可以排程多個 fetch_square 任務,以便並發執行。

使用 run_until_complete 執行任務

如果您想要執行單一任務,可以使用 run_until_complete 函式。以下是範例:

loop = asyncio.get_event_loop()
loop.run_until_complete(fetch_square(2))
loop.run_until_complete(fetch_square(3))
loop.run_until_complete(fetch_square(4))

但是,這種方法不適合於並發執行多個任務。

使用 ensure_future 的好處

使用 ensure_future 的好處是可以在事件迴圈已經啟動的情況下提交新的任務。這在實際應用中非常有用,因為我們通常需要在事件迴圈已經啟動的情況下提交新的任務。

圖表翻譯:

  flowchart TD
    A[啟動事件迴圈] --> B[提交任務]
    B --> C[執行任務]
    C --> D[完成任務]
    D --> E[提交新任務]
    E --> B

這個圖表展示了事件迴圈的執行流程。首先啟動事件迴圈,然後提交任務,執行任務,完成任務,最後提交新任務。這個流程可以不斷重複,以便實作高效的並發執行。

非阻塞程式設計與 asyncio

在 Python 中,asyncio 是一個強大的工具,讓我們可以輕易地實作非阻塞的程式設計。然而,在某些情況下,我們可能需要使用阻塞的 API 或執行長時間的計算。這個時候,我們需要找到方法將阻塞的程式碼轉換為非阻塞的程式碼。

處理阻塞程式碼

有一個有效的策略是將阻塞的程式碼執行在一個獨立的執行緒中。執行緒是由作業系統實作的,允許我們平行執行阻塞的程式碼。Python 提供了 Executor 介面來執行任務在一個獨立的執行緒中,並使用 Future 來監控其進度。

使用 ThreadPoolExecutor

我們可以使用 ThreadPoolExecutor 來初始化一個執行緒池。執行緒池會產生一組執行緒(稱為工作者),這些工作者會等待執行任務。一旦提交了一個函式,執行緒池會負責將其分派給一個可用的工作者執行緒,並追蹤結果。max_workers 引數可以用來選擇執行緒的數量。

import time
from concurrent.futures import ThreadPoolExecutor

# 初始化執行緒池
executor = ThreadPoolExecutor(max_workers=3)

def wait_and_return(msg):
    # 阻塞 1 秒
    time.sleep(1)
    return msg

# 提交任務
future = executor.submit(wait_and_return, "Hello, World!")
print(future)  # <Future at 0x7ff616ff6748 state=running>

結果

在上面的例子中,我們建立了一個 ThreadPoolExecutor 例項,具有 3 個工作者執行緒。然後,我們提交了一個 wait_and_return 函式,該函式會阻塞 1 秒並傳回一個訊息字串。最後,我們使用 submit 方法來安排其執行。

非同步執行與並發實作

在 Python 中,非同步執行和並發可以使用 asyncioconcurrent.futures 模組來實作。以下是使用 loop.run_in_executor 方法來管理任務執行的範例。

使用 loop.run_in_executor 方法

loop.run_in_executor 方法可以用來將任務提交到執行器(executor)中執行。這個方法會傳回一個 asyncio.Future 例項,可以用來等待任務完成。

import asyncio

# 建立一個執行器
executor = asyncio.get_event_loop().run_in_executor(None, lambda: None)

# 定義一個同步函式
def wait_and_return(msg):
    # 等待 1 秒
    import time
    time.sleep(1)
    return msg

# 提交任務到執行器
loop = asyncio.get_event_loop()
fut = loop.run_in_executor(None, wait_and_return, "Hello, asyncio executor")

# 等待任務完成
result = loop.run_until_complete(fut)
print(result)  # 輸出:Hello, asyncio executor

實作並發網頁抓取

以下是使用 loop.run_in_executor 方法來實作並發網頁抓取的範例。

import asyncio
import requests

# 建立一個執行器
loop = asyncio.get_event_loop()

# 定義一個非同步函式
async def fetch_urls(urls):
    responses = []
    for url in urls:
        # 提交任務到執行器
        response = await loop.run_in_executor(None, requests.get, url)
        responses.append(response)
    return responses

# 定義要抓取的網頁 URL
urls = ["http://example.com", "http://www.python.org"]

# 執行非同步函式
responses = loop.run_until_complete(fetch_urls(urls))

# 列印抓取的網頁內容
for response in responses:
    print(response.text)

在這個範例中,fetch_urls 函式會提交多個任務到執行器中執行,每個任務都會抓取一個網頁的內容。然後,loop.run_until_complete 方法會等待所有任務完成,並傳回抓取的網頁內容。

非阻塞式網路請求與反應式程式設計

在前面的章節中,我們討論瞭如何使用 Python 的 asyncio 函式函式庫實作非阻塞式網路請求。然而,當面對大量的網路請求時,使用 asyncio.gather 來提交所有的 coroutine 並收集結果可能不是最有效的方法。這是因為 asyncio.gather 會將所有的 coroutine 提交給事件迴圈,然後等待所有的結果傳回。

使用 aiohttp 實作非阻塞式網路請求

為了避免這個限制,我們可以使用一個原生非阻塞式的函式函式庫,如 aiohttp。aiohttp 是一個根據 asyncio 的 HTTP 客戶端和伺服器函式函式庫,允許我們實作非阻塞式的網路請求。

import aiohttp
import asyncio

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = ["http://example.com", "http://example.org"]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        for result in results:
            print(result)

asyncio.run(main())

反應式程式設計

反應式程式設計是一種程式設計正規化,旨在建立更好的並發系統。反應式應用程式設計目的是符合以下要求:

  • 回應式:系統立即回應使用者的輸入。
  • 彈性:系統可以處理不同級別的負載,並可以適應增加的需求。
  • 容錯:系統優雅地處理故障。
  • 根據訊息:系統不應該阻塞,並且應該利用事件和訊息。

使用 RxPY 實作反應式程式設計

RxPY 是一個 Python 的反應式程式設計函式函式庫,提供了一個簡單的方式來實作反應式程式設計。以下是使用 RxPY 實作一個簡單的反應式程式設計範例:

import rx
from rx import operators as ops

# 建立一個可觀察序列
observable = rx.from_iterable([1, 2, 3, 4, 5])

# 訂閱可觀察序列
observable.pipe(
    ops.map(lambda x: x * 2),
    ops.filter(lambda x: x > 4)
).subscribe(
    on_next=lambda x: print(f"接收到值: {x}"),
    on_error=lambda e: print(f"發生錯誤: {e}"),
    on_completed=lambda: print("完成")
)

在這個範例中,我們建立了一個可觀察序列,然後使用 mapfilter 運算子來轉換和過濾值。最後,我們訂閱了可觀察序列,並處理接收到的值、錯誤和完成事件。

瞭解 Reactive Programming 的基礎

Reactive Programming 是一種程式設計風格,主要思想是對事件做出反應。在前面的章節中,我們已經看到了一些使用回呼函式(callbacks)的例子,當事件發生時,回呼函式會被執行。在 Reactive Programming 中,這個思想被擴充套件到事件被視為資料流的概念。

資料流的概念

資料流可以從一個迭代器(iterator)中建立。例如,使用 from_iterable 方法可以從一個範圍(range)建立一個資料流:

from rx import from_iterable

obs = from_iterable(range(4))

訂閱資料流

要接收資料流中的資料,我們可以使用 subscribe 方法,並傳入一個函式,這個函式會在資料源發出每個值時被執行:

obs.subscribe(print)

這會輸出:

0
1
2
3

觀察者(Observables)和迭代器(Iterators)

觀察者和迭代器都是有序的集合,類似於列表或其他迭代器。觀察者這個術語來自於觀察者(observer)和迭代器(iterable)的組合。觀察者是一個對變數的變化做出反應的物件,而迭代器是一個可以產生和跟蹤迭代器的物件。

在 Python 中,迭代器是定義了 __next__ 方法的物件,其元素可以透過 next 函式或 for 迴圈提取。一次迭代器可以被取得,然後我們可以使用 next 函式或 for 迴圈提取元素。當一個元素被消費後,就不能回頭了。

示例:使用迭代器

下面是使用迭代器的示例:

collection = list([1, 2, 3, 4, 5])
iterator = iter(collection)

print("Next")
print(next(iterator))  # 輸出: 1
print(next(iterator))  # 輸出: 2

這個示例展示瞭如何從一個列表中建立一個迭代器,並使用 next 函式提取元素。

圖表翻譯:

  graph LR
    A[資料流] --> B[觀察者]
    B --> C[迭代器]
    C --> D[元素提取]
    D --> E[消費元素]
    E --> F[不能回頭]

這個圖表展示了資料流、觀察者、迭代器、元素提取、消費元素和不能回頭的關係。

反應式程式設計與迭代器

在探討反應式程式設計的世界時,瞭解迭代器和生成器的差異是非常重要的。迭代器是一種可以用於迴圈遍歷序列的物件,它允許我們一次取出序列中的一個元素。生成器則是一種特殊的迭代器,它可以在執行過程中動態生成序列中的元素。

迭代器的使用

下面是一個簡單的迭代器範例:

iterator = iter([1, 2, 3, 4, 5])
print(next(iterator))  # 輸出: 1
print(next(iterator))  # 輸出: 2

print("For loop")
for i in iterator:
    print(i)  # 輸出: 3, 4, 5

在這個範例中,next() 函式用於從迭代器中取出下一個元素。當我們使用 for 迴圈時,迭代器也會自動地產生序列中的元素。

迭代器與生成器的差異

雖然迭代器和生成器都可以用於迴圈遍歷序列,但是它們之間存在著一些差異。生成器是一種特殊的迭代器,它可以在執行過程中動態生成序列中的元素。換句話說,生成器可以在執行過程中產生新的元素,而迭代器只能從既有的序列中取出元素。

觀察者模式

觀察者模式是一種設計模式,它允許物件在狀態改變時通知其他物件。觀察者模式可以用於實作反應式程式設計的功能。觀察者可以在資料源中註冊,當資料源的狀態改變時,觀察者會收到通知。

下面是一個簡單的觀察者模式範例:

import rx
from rx import operators as ops

# 建立一個觀察者
observable = rx.from_iterable([1, 2, 3, 4, 5])

# 註冊回呼函式
observable.subscribe(
    on_next=lambda i: print(f"接收到資料: {i}"),
    on_error=lambda e: print(f"發生錯誤: {e}"),
    on_completed=lambda: print("資料傳輸完成")
)

在這個範例中,rx.from_iterable() 函式用於建立一個觀察者,subscribe() 方法用於註冊回呼函式。當觀察者收到資料時,會呼叫 on_next() 回呼函式;當觀察者發生錯誤時,會呼叫 on_error() 回呼函式;當觀察者完成資料傳輸時,會呼叫 on_completed() 回呼函式。

圖表翻譯:

  graph LR
    A[資料源] -->|註冊|> B[觀察者]
    B -->|接收資料|> C[回呼函式]
    C -->|處理資料|> D[結果]
    D -->|通知觀察者|> B

在這個圖表中,資料源註冊觀察者,觀察者接收資料並呼叫回呼函式,回呼函式處理資料並產生結果,結果通知觀察者。這個過程實作了觀察者模式的功能。

RxPy 的觀察者模式和迭代器的相似性

RxPy 的觀察者模式和迭代器有著相似的特性。觀察者模式可以使用 on_next 函式來處理下一個可用的專案,使用 on_completed 函式來處理沒有更多資料的情況。以下是一個範例:

from rx import from_iterable

obs = from_iterable(range(4))
obs.subscribe(
    on_next=lambda x: print("Next item:", x),
    on_completed=lambda: print("No more data")
)

這個範例會輸出:

Next item: 0
Next item: 1
Next item: 2
Next item: 3
No more data

RxPy 的運運算元

RxPy 提供了許多運運算元來建立、轉換、過濾和分組觀察者。這些運運算元傳回新的觀察者,可以方便地連結和組合在一起。一個簡單的範例是使用 take 運運算元。

from rx import from_iterable
from rx.operators import take

op = take(4)

obs = from_iterable(range(1000))
op(obs).subscribe(print)

這個範例會輸出前 4 個專案:

0
1
2
3

take 運運算元會傳回一個新的觀察者,在接收到 4 個專案後停止。這個範例展示了 RxPy 的觀察者模式和迭代器的相似性,以及 RxPy 的運運算元如何方便地連結和組合在一起。

反應式程式設計中的運運算元

反應式程式設計(Reactive Programming)是一種強大的程式設計模型,允許開發人員以宣告式的方式處理非同步資料流。在 RxPy 中,運運算元(Operators)是用於轉換和操作可觀察序列(Observables)的基本單位。

有用的運運算元

在本節中,我們將探討一些常用的運運算元,它們可以用於轉換和操作可觀察序列中的元素。其中最著名的運運算元是 map,它將一個函式應用於可觀察序列中的每個元素,並傳回一個新的可觀察序列。

例如,我們可以使用 map 運運算元計算一系列數字的平方:

from rx.operators import map

numbers = [1, 2, 3, 4]
squared_numbers = map(lambda x: x**2)(numbers)
print(list(squared_numbers))  # [1, 4, 9, 16]

運運算元可以使用大理石圖(Marble Diagrams)來表示,這有助於我們更好地理解運運算元的工作原理,尤其是在考慮到元素可以在一段時間內發射的情況下。

大理石圖

在大理石圖中,一個資料流(在我們的例子中是一個可觀察序列)由一條水平線表示。一個圓圈(或其他形狀)代表一個發射的值,一個 X 符號代表一個錯誤,而一條垂直線代表資料流的結束。

以下是 map 運運算元的大理石圖:

  +-----------+
  |  Source  |
  +-----------+
           |
           |
           v
  +-----------+
  |  Transformation  |
  +-----------+
           |
           |
           v
  +-----------+
  |  Result  |
  +-----------+

另一個轉換運運算元的例子是 group_by,它根據一個鍵將專案分組。group_by 運運算元可以用於動態地將專案分組,當專案被發射時。

分組運運算元

以下是 group_by 運運算元的大理石圖:

  +-----------+
  |  Source  |
  +-----------+
           |
           |
           v
  +-----------+
  |  Grouping  |
  +-----------+
           |
           |
           v
  +-----------+
  |  Result  |
  +-----------+

假設我們想要根據一個數字是偶數還是奇數來分組。 我們可以使用 group_by 運運算元來實作這個功能:

from rx.operators import group_by

numbers = [1, 2, 3, 4, 5, 6]
grouped_numbers = group_by(lambda x: x % 2 == 0)(numbers)
print(list(grouped_numbers))  # [(True, [2, 4, 6]), (False, [1, 3, 5])]

在這個例子中,group_by 運運算元根據一個數字是偶數還是奇數來分組,並傳回一個新的可觀察序列,其中包含分組的結果。

內容解密:

在這個例子中,我們使用 mapgroup_by 運運算元來轉換和操作可觀察序列中的元素。 map 運運算元將一個函式應用於每個元素,並傳回一個新的可觀察序列。 group_by 運運算元根據一個鍵將專案分組,並傳回一個新的可觀察序列,其中包含分組的結果。

圖表翻譯:

以下是 mapgroup_by 運運算元的大理石圖:

  +-----------+
  |  Source  |
  +-----------+
           |
           |
           v
  +-----------+
  |  Transformation  |
  +-----------+
           |
           |
           v
  +-----------+
  |  Result  |
  +-----------+

這個圖表顯示了 mapgroup_by 運運算元的工作原理,包括轉換和分組的過程。

使用 group_by 進行資料分組

在 reactive programming 中,group_by 是一個非常有用的 operator,可以根據某個條件將資料分組。以下是使用 group_by 的範例:

from rx.operators import group_by

# 定義一個 observable,包含 0 到 3 的整數
obs = group_by(lambda x: x % 2)(range(4))

# 訂閱 obs 並印出其內容
obs.subscribe(print)

這將輸出兩個 observable,分別對應於偶數和奇數。

提取分組資料

要提取分組資料,可以使用 key 屬性來確定分組的 key。例如,要提取所有偶數,可以取第一個 observable(對應於 key 等於 0)並訂閱它:

obs.subscribe(lambda x: print("group key: ", x.key))

這將輸出:

group key:  0
group key:  1

然後,可以使用 take(1) 取得第一個 observable 並訂閱它:

take(1)(obs).subscribe(lambda x: x.subscribe(print))

這將輸出:

0
2

合併 observable

group_by 會產生一個 observable,該 observable 會發出其他 observable。這是一個常見的模式,在 reactive programming 中。有許多函式可以用來合併不同的 observable。

一個有用的工具是 merge_all,它可以取多個 observable 並產生一個單一的 observable,包含所有 observable 的元素,以發出順序排列。以下是 marble diagram:

+----+----+----+
|  A  |  B  |  C  |
+----+----+----+
   |  |  |
   |  |  |
   v  v  v
+----+----+----+
|  A  |  B  |  C  |
+----+----+----+

可以使用 merge_allgroup_by 傳回的 observable 合併成一個單一的 observable:

from rx.operators import merge_all

# 定義一個 observable,包含 0 到 3 的整數
obs = group_by(lambda x: x % 2)(range(4))

# 合併 observable
merged_obs = merge_all()(obs)

# 訂閱 merged_obs 並印出其內容
merged_obs.subscribe(print)

這將輸出:

0
1
2
3

這是因為 merge_all 會合併所有 observable 的元素,以發出順序排列。

並發實作

在前面的章節中,我們學習瞭如何使用 from_ 方法建立可觀察物件。RxPy 提供了更多工具來建立更有趣的事件源。

從底層實作到高階應用的全面檢視顯示,Python 的非同步程式設計能力已日趨成熟,asyncawait 語法讓撰寫非同步程式碼更直觀,而asyncio 以及aiohttp等函式函式庫的發展,更簡化了非同步程式碼的複雜性。透過多維度效能指標的實測分析,非同步程式設計在I/O密集型應用中展現出顯著的效能優勢,尤其在網路請求和資料處理等場景。然而,技術限制深析顯示,處理CPU密集型任務時,非同步程式設計的優勢並不明顯,甚至可能因為上下文切換的開銷而降低效能。將非同步程式設計整合至現有系統的策略和價值,在於提升系統的回應速度和資源利用率,但需仔細評估整合成本和潛在的程式碼複雜度。對於重視效能和可擴充套件性的應用,玄貓認為,Python 的非同步程式設計已成為不可或缺的工具,開發者應積極掌握asyncioaiohttp、RxPY 等函式函式庫的應用技巧,並深入理解反應式程式設計的精髓,才能在實務中充分發揮其效能優勢。未來幾年,隨著更多原生非同步函式函式庫的出現和生態系統的完善,預見Python 非同步程式設計的應用將更加普及,並在更多領域展現其強大潛力。