Python 的非同步程式設計能力近年來日益重要,尤其在網路應用、資料處理等領域。本文將深入探討 Python 非同步程式設計的各種技巧,從協程的基礎概念到 asyncio 模組的應用,再到 RxPY 框架的反應式程式設計正規化,帶領讀者逐步掌握 Python 非同步程式設計的精髓。協程是 Python 非同步程式設計的基本,透過 yield
和 send
關鍵字,可以實作協程之間的資料交換與控制流程。asyncio 模組提供更進階的非同步程式設計工具,例如事件迴圈、任務排程和非阻塞網路請求,讓開發者能更有效率地處理 I/O 密集型任務。最後,RxPY 框架引入了反應式程式設計的正規化,透過可觀察序列和運運算元,以更簡潔、宣告式的方式處理非同步資料流。
基本概念
要建立一個協程,需要使用 yield
關鍵字將值從協程中產生出來。要向協程傳送值,可以使用 send
方法。這樣,協程就可以接收和處理值。
使用 yield
和 send
的協程
以下是一個簡單的協程範例:
def parrot():
while True:
message = yield
print(message)
gen = parrot()
gen.send(None) # 啟動協程
gen.send("Hello")
gen.send("World")
在這個範例中,parrot
函式是一個協程,它使用 yield
關鍵字接收值。要啟動協程,需要先傳送 None
,然後就可以傳送其他值。
使用 asyncio
的非同步程式設計
Python 的 asyncio
模組提供了一種更直觀的方式來定義協程。使用 async def
關鍵字可以定義一個協程:
import asyncio
async def hello():
print("Hello, async!")
coro = hello()
print(coro) # 輸出:<coroutine object hello at 0x...>
在這個範例中,hello
函式是一個協程,它使用 async def
關鍵字定義。當呼叫 hello
函式時,傳回的是一個協程物件。
執行協程
要執行協程,需要使用 asyncio
的事件迴圈(event loop)。以下是一個簡單的範例:
import asyncio
async def hello():
print("Hello, async!")
loop = asyncio.get_event_loop()
coro = hello()
loop.run_until_complete(coro)
在這個範例中,使用 asyncio.get_event_loop
函式取得事件迴圈物件,然後使用 run_until_complete
方法執行協程。
非同步程式設計的美麗:Native Coroutines
Native Coroutines 是使用 async def
陳述式定義的協程。這種協程可以使用 await
語法等待資源,讓程式設計師可以撰寫乾淨、簡潔的非同步程式碼。
使用 asyncio.sleep 函式
例如,我們可以使用 asyncio.sleep
函式讓程式暫停一段時間,然後執行某個動作。以下是範例程式碼:
import asyncio
async def wait_and_print(msg):
await asyncio.sleep(1)
print("Message: ", msg)
loop = asyncio.get_event_loop()
loop.run_until_complete(wait_and_print("Hello"))
這段程式碼使用 asyncio.sleep
函式暫停 1 秒,然後印出 “Message: Hello”。
事件迴圈的斷點
await
語法提供了事件迴圈的斷點,讓事件迴圈可以在等待資源的同時,管理其他協程。這使得協程可以彼此之間進行非同步的溝通。
協程的串接
協程也可以使用 await
語法串接在一起,讓程式設計師可以撰寫複雜的非同步程式碼。以下是範例程式碼:
async def network_request(number):
await asyncio.sleep(1.0)
return {"success": True, "result": number ** 2}
async def main():
result = await network_request(10)
print(result)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
這段程式碼使用 asyncio.sleep
函式模擬網路請求,然後使用 await
語法串接 network_request
協程和 main
協程。
內容解密:
async def
陳述式用於定義協程。await
語法用於等待資源。asyncio.sleep
函式用於暫停程式執行。- 協程可以使用
await
語法串接在一起。
圖表翻譯:
flowchart TD A[開始] --> B[等待資源] B --> C[執行動作] C --> D[傳回結果] D --> E[結束]
這個圖表展示了協程的執行流程,包括等待資源、執行動作和傳回結果。
208 實作並發
要實作並發,玄貓可以使用 asyncio
的 ensure_future
函式。這個函式可以排程協程(coroutine)和期物(future)以便執行。以下是使用 ensure_future
的範例:
import asyncio
async def fetch_square(number):
# 模擬網路請求
response = await network_request(number)
if response["success"]:
print("結果是:{}".format(response["result"]))
# 排程多個任務
asyncio.ensure_future(fetch_square(2))
asyncio.ensure_future(fetch_square(3))
asyncio.ensure_future(fetch_square(4))
# 啟動事件迴圈
loop = asyncio.get_event_loop()
loop.run_forever()
在這個範例中,fetch_square
是一個協程,負責模擬網路請求。使用 ensure_future
函式可以排程多個 fetch_square
任務,以便並發執行。
使用 run_until_complete
執行任務
如果您想要執行單一任務,可以使用 run_until_complete
函式。以下是範例:
loop = asyncio.get_event_loop()
loop.run_until_complete(fetch_square(2))
loop.run_until_complete(fetch_square(3))
loop.run_until_complete(fetch_square(4))
但是,這種方法不適合於並發執行多個任務。
使用 ensure_future
的好處
使用 ensure_future
的好處是可以在事件迴圈已經啟動的情況下提交新的任務。這在實際應用中非常有用,因為我們通常需要在事件迴圈已經啟動的情況下提交新的任務。
圖表翻譯:
flowchart TD A[啟動事件迴圈] --> B[提交任務] B --> C[執行任務] C --> D[完成任務] D --> E[提交新任務] E --> B
這個圖表展示了事件迴圈的執行流程。首先啟動事件迴圈,然後提交任務,執行任務,完成任務,最後提交新任務。這個流程可以不斷重複,以便實作高效的並發執行。
非阻塞程式設計與 asyncio
在 Python 中,asyncio 是一個強大的工具,讓我們可以輕易地實作非阻塞的程式設計。然而,在某些情況下,我們可能需要使用阻塞的 API 或執行長時間的計算。這個時候,我們需要找到方法將阻塞的程式碼轉換為非阻塞的程式碼。
處理阻塞程式碼
有一個有效的策略是將阻塞的程式碼執行在一個獨立的執行緒中。執行緒是由作業系統實作的,允許我們平行執行阻塞的程式碼。Python 提供了 Executor
介面來執行任務在一個獨立的執行緒中,並使用 Future
來監控其進度。
使用 ThreadPoolExecutor
我們可以使用 ThreadPoolExecutor
來初始化一個執行緒池。執行緒池會產生一組執行緒(稱為工作者),這些工作者會等待執行任務。一旦提交了一個函式,執行緒池會負責將其分派給一個可用的工作者執行緒,並追蹤結果。max_workers
引數可以用來選擇執行緒的數量。
import time
from concurrent.futures import ThreadPoolExecutor
# 初始化執行緒池
executor = ThreadPoolExecutor(max_workers=3)
def wait_and_return(msg):
# 阻塞 1 秒
time.sleep(1)
return msg
# 提交任務
future = executor.submit(wait_and_return, "Hello, World!")
print(future) # <Future at 0x7ff616ff6748 state=running>
結果
在上面的例子中,我們建立了一個 ThreadPoolExecutor
例項,具有 3 個工作者執行緒。然後,我們提交了一個 wait_and_return
函式,該函式會阻塞 1 秒並傳回一個訊息字串。最後,我們使用 submit
方法來安排其執行。
非同步執行與並發實作
在 Python 中,非同步執行和並發可以使用 asyncio
和 concurrent.futures
模組來實作。以下是使用 loop.run_in_executor
方法來管理任務執行的範例。
使用 loop.run_in_executor
方法
loop.run_in_executor
方法可以用來將任務提交到執行器(executor)中執行。這個方法會傳回一個 asyncio.Future
例項,可以用來等待任務完成。
import asyncio
# 建立一個執行器
executor = asyncio.get_event_loop().run_in_executor(None, lambda: None)
# 定義一個同步函式
def wait_and_return(msg):
# 等待 1 秒
import time
time.sleep(1)
return msg
# 提交任務到執行器
loop = asyncio.get_event_loop()
fut = loop.run_in_executor(None, wait_and_return, "Hello, asyncio executor")
# 等待任務完成
result = loop.run_until_complete(fut)
print(result) # 輸出:Hello, asyncio executor
實作並發網頁抓取
以下是使用 loop.run_in_executor
方法來實作並發網頁抓取的範例。
import asyncio
import requests
# 建立一個執行器
loop = asyncio.get_event_loop()
# 定義一個非同步函式
async def fetch_urls(urls):
responses = []
for url in urls:
# 提交任務到執行器
response = await loop.run_in_executor(None, requests.get, url)
responses.append(response)
return responses
# 定義要抓取的網頁 URL
urls = ["http://example.com", "http://www.python.org"]
# 執行非同步函式
responses = loop.run_until_complete(fetch_urls(urls))
# 列印抓取的網頁內容
for response in responses:
print(response.text)
在這個範例中,fetch_urls
函式會提交多個任務到執行器中執行,每個任務都會抓取一個網頁的內容。然後,loop.run_until_complete
方法會等待所有任務完成,並傳回抓取的網頁內容。
非阻塞式網路請求與反應式程式設計
在前面的章節中,我們討論瞭如何使用 Python 的 asyncio 函式函式庫實作非阻塞式網路請求。然而,當面對大量的網路請求時,使用 asyncio.gather 來提交所有的 coroutine 並收集結果可能不是最有效的方法。這是因為 asyncio.gather 會將所有的 coroutine 提交給事件迴圈,然後等待所有的結果傳回。
使用 aiohttp 實作非阻塞式網路請求
為了避免這個限制,我們可以使用一個原生非阻塞式的函式函式庫,如 aiohttp。aiohttp 是一個根據 asyncio 的 HTTP 客戶端和伺服器函式函式庫,允許我們實作非阻塞式的網路請求。
import aiohttp
import asyncio
async def fetch_url(session, url):
async with session.get(url) as response:
return await response.text()
async def main():
urls = ["http://example.com", "http://example.org"]
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
for result in results:
print(result)
asyncio.run(main())
反應式程式設計
反應式程式設計是一種程式設計正規化,旨在建立更好的並發系統。反應式應用程式設計目的是符合以下要求:
- 回應式:系統立即回應使用者的輸入。
- 彈性:系統可以處理不同級別的負載,並可以適應增加的需求。
- 容錯:系統優雅地處理故障。
- 根據訊息:系統不應該阻塞,並且應該利用事件和訊息。
使用 RxPY 實作反應式程式設計
RxPY 是一個 Python 的反應式程式設計函式函式庫,提供了一個簡單的方式來實作反應式程式設計。以下是使用 RxPY 實作一個簡單的反應式程式設計範例:
import rx
from rx import operators as ops
# 建立一個可觀察序列
observable = rx.from_iterable([1, 2, 3, 4, 5])
# 訂閱可觀察序列
observable.pipe(
ops.map(lambda x: x * 2),
ops.filter(lambda x: x > 4)
).subscribe(
on_next=lambda x: print(f"接收到值: {x}"),
on_error=lambda e: print(f"發生錯誤: {e}"),
on_completed=lambda: print("完成")
)
在這個範例中,我們建立了一個可觀察序列,然後使用 map
和 filter
運算子來轉換和過濾值。最後,我們訂閱了可觀察序列,並處理接收到的值、錯誤和完成事件。
瞭解 Reactive Programming 的基礎
Reactive Programming 是一種程式設計風格,主要思想是對事件做出反應。在前面的章節中,我們已經看到了一些使用回呼函式(callbacks)的例子,當事件發生時,回呼函式會被執行。在 Reactive Programming 中,這個思想被擴充套件到事件被視為資料流的概念。
資料流的概念
資料流可以從一個迭代器(iterator)中建立。例如,使用 from_iterable
方法可以從一個範圍(range)建立一個資料流:
from rx import from_iterable
obs = from_iterable(range(4))
訂閱資料流
要接收資料流中的資料,我們可以使用 subscribe
方法,並傳入一個函式,這個函式會在資料源發出每個值時被執行:
obs.subscribe(print)
這會輸出:
0
1
2
3
觀察者(Observables)和迭代器(Iterators)
觀察者和迭代器都是有序的集合,類似於列表或其他迭代器。觀察者這個術語來自於觀察者(observer)和迭代器(iterable)的組合。觀察者是一個對變數的變化做出反應的物件,而迭代器是一個可以產生和跟蹤迭代器的物件。
在 Python 中,迭代器是定義了 __next__
方法的物件,其元素可以透過 next
函式或 for
迴圈提取。一次迭代器可以被取得,然後我們可以使用 next
函式或 for
迴圈提取元素。當一個元素被消費後,就不能回頭了。
示例:使用迭代器
下面是使用迭代器的示例:
collection = list([1, 2, 3, 4, 5])
iterator = iter(collection)
print("Next")
print(next(iterator)) # 輸出: 1
print(next(iterator)) # 輸出: 2
這個示例展示瞭如何從一個列表中建立一個迭代器,並使用 next
函式提取元素。
圖表翻譯:
graph LR A[資料流] --> B[觀察者] B --> C[迭代器] C --> D[元素提取] D --> E[消費元素] E --> F[不能回頭]
這個圖表展示了資料流、觀察者、迭代器、元素提取、消費元素和不能回頭的關係。
反應式程式設計與迭代器
在探討反應式程式設計的世界時,瞭解迭代器和生成器的差異是非常重要的。迭代器是一種可以用於迴圈遍歷序列的物件,它允許我們一次取出序列中的一個元素。生成器則是一種特殊的迭代器,它可以在執行過程中動態生成序列中的元素。
迭代器的使用
下面是一個簡單的迭代器範例:
iterator = iter([1, 2, 3, 4, 5])
print(next(iterator)) # 輸出: 1
print(next(iterator)) # 輸出: 2
print("For loop")
for i in iterator:
print(i) # 輸出: 3, 4, 5
在這個範例中,next()
函式用於從迭代器中取出下一個元素。當我們使用 for
迴圈時,迭代器也會自動地產生序列中的元素。
迭代器與生成器的差異
雖然迭代器和生成器都可以用於迴圈遍歷序列,但是它們之間存在著一些差異。生成器是一種特殊的迭代器,它可以在執行過程中動態生成序列中的元素。換句話說,生成器可以在執行過程中產生新的元素,而迭代器只能從既有的序列中取出元素。
觀察者模式
觀察者模式是一種設計模式,它允許物件在狀態改變時通知其他物件。觀察者模式可以用於實作反應式程式設計的功能。觀察者可以在資料源中註冊,當資料源的狀態改變時,觀察者會收到通知。
下面是一個簡單的觀察者模式範例:
import rx
from rx import operators as ops
# 建立一個觀察者
observable = rx.from_iterable([1, 2, 3, 4, 5])
# 註冊回呼函式
observable.subscribe(
on_next=lambda i: print(f"接收到資料: {i}"),
on_error=lambda e: print(f"發生錯誤: {e}"),
on_completed=lambda: print("資料傳輸完成")
)
在這個範例中,rx.from_iterable()
函式用於建立一個觀察者,subscribe()
方法用於註冊回呼函式。當觀察者收到資料時,會呼叫 on_next()
回呼函式;當觀察者發生錯誤時,會呼叫 on_error()
回呼函式;當觀察者完成資料傳輸時,會呼叫 on_completed()
回呼函式。
圖表翻譯:
graph LR A[資料源] -->|註冊|> B[觀察者] B -->|接收資料|> C[回呼函式] C -->|處理資料|> D[結果] D -->|通知觀察者|> B
在這個圖表中,資料源註冊觀察者,觀察者接收資料並呼叫回呼函式,回呼函式處理資料並產生結果,結果通知觀察者。這個過程實作了觀察者模式的功能。
RxPy 的觀察者模式和迭代器的相似性
RxPy 的觀察者模式和迭代器有著相似的特性。觀察者模式可以使用 on_next
函式來處理下一個可用的專案,使用 on_completed
函式來處理沒有更多資料的情況。以下是一個範例:
from rx import from_iterable
obs = from_iterable(range(4))
obs.subscribe(
on_next=lambda x: print("Next item:", x),
on_completed=lambda: print("No more data")
)
這個範例會輸出:
Next item: 0
Next item: 1
Next item: 2
Next item: 3
No more data
RxPy 的運運算元
RxPy 提供了許多運運算元來建立、轉換、過濾和分組觀察者。這些運運算元傳回新的觀察者,可以方便地連結和組合在一起。一個簡單的範例是使用 take
運運算元。
from rx import from_iterable
from rx.operators import take
op = take(4)
obs = from_iterable(range(1000))
op(obs).subscribe(print)
這個範例會輸出前 4 個專案:
0
1
2
3
take
運運算元會傳回一個新的觀察者,在接收到 4 個專案後停止。這個範例展示了 RxPy 的觀察者模式和迭代器的相似性,以及 RxPy 的運運算元如何方便地連結和組合在一起。
反應式程式設計中的運運算元
反應式程式設計(Reactive Programming)是一種強大的程式設計模型,允許開發人員以宣告式的方式處理非同步資料流。在 RxPy 中,運運算元(Operators)是用於轉換和操作可觀察序列(Observables)的基本單位。
有用的運運算元
在本節中,我們將探討一些常用的運運算元,它們可以用於轉換和操作可觀察序列中的元素。其中最著名的運運算元是 map
,它將一個函式應用於可觀察序列中的每個元素,並傳回一個新的可觀察序列。
例如,我們可以使用 map
運運算元計算一系列數字的平方:
from rx.operators import map
numbers = [1, 2, 3, 4]
squared_numbers = map(lambda x: x**2)(numbers)
print(list(squared_numbers)) # [1, 4, 9, 16]
運運算元可以使用大理石圖(Marble Diagrams)來表示,這有助於我們更好地理解運運算元的工作原理,尤其是在考慮到元素可以在一段時間內發射的情況下。
大理石圖
在大理石圖中,一個資料流(在我們的例子中是一個可觀察序列)由一條水平線表示。一個圓圈(或其他形狀)代表一個發射的值,一個 X 符號代表一個錯誤,而一條垂直線代表資料流的結束。
以下是 map
運運算元的大理石圖:
+-----------+
| Source |
+-----------+
|
|
v
+-----------+
| Transformation |
+-----------+
|
|
v
+-----------+
| Result |
+-----------+
另一個轉換運運算元的例子是 group_by
,它根據一個鍵將專案分組。group_by
運運算元可以用於動態地將專案分組,當專案被發射時。
分組運運算元
以下是 group_by
運運算元的大理石圖:
+-----------+
| Source |
+-----------+
|
|
v
+-----------+
| Grouping |
+-----------+
|
|
v
+-----------+
| Result |
+-----------+
假設我們想要根據一個數字是偶數還是奇數來分組。 我們可以使用 group_by
運運算元來實作這個功能:
from rx.operators import group_by
numbers = [1, 2, 3, 4, 5, 6]
grouped_numbers = group_by(lambda x: x % 2 == 0)(numbers)
print(list(grouped_numbers)) # [(True, [2, 4, 6]), (False, [1, 3, 5])]
在這個例子中,group_by
運運算元根據一個數字是偶數還是奇數來分組,並傳回一個新的可觀察序列,其中包含分組的結果。
內容解密:
在這個例子中,我們使用 map
和 group_by
運運算元來轉換和操作可觀察序列中的元素。 map
運運算元將一個函式應用於每個元素,並傳回一個新的可觀察序列。 group_by
運運算元根據一個鍵將專案分組,並傳回一個新的可觀察序列,其中包含分組的結果。
圖表翻譯:
以下是 map
和 group_by
運運算元的大理石圖:
+-----------+
| Source |
+-----------+
|
|
v
+-----------+
| Transformation |
+-----------+
|
|
v
+-----------+
| Result |
+-----------+
這個圖表顯示了 map
和 group_by
運運算元的工作原理,包括轉換和分組的過程。
使用 group_by
進行資料分組
在 reactive programming 中,group_by
是一個非常有用的 operator,可以根據某個條件將資料分組。以下是使用 group_by
的範例:
from rx.operators import group_by
# 定義一個 observable,包含 0 到 3 的整數
obs = group_by(lambda x: x % 2)(range(4))
# 訂閱 obs 並印出其內容
obs.subscribe(print)
這將輸出兩個 observable,分別對應於偶數和奇數。
提取分組資料
要提取分組資料,可以使用 key
屬性來確定分組的 key。例如,要提取所有偶數,可以取第一個 observable(對應於 key 等於 0)並訂閱它:
obs.subscribe(lambda x: print("group key: ", x.key))
這將輸出:
group key: 0
group key: 1
然後,可以使用 take(1)
取得第一個 observable 並訂閱它:
take(1)(obs).subscribe(lambda x: x.subscribe(print))
這將輸出:
0
2
合併 observable
group_by
會產生一個 observable,該 observable 會發出其他 observable。這是一個常見的模式,在 reactive programming 中。有許多函式可以用來合併不同的 observable。
一個有用的工具是 merge_all
,它可以取多個 observable 並產生一個單一的 observable,包含所有 observable 的元素,以發出順序排列。以下是 marble diagram:
+----+----+----+
| A | B | C |
+----+----+----+
| | |
| | |
v v v
+----+----+----+
| A | B | C |
+----+----+----+
可以使用 merge_all
將 group_by
傳回的 observable 合併成一個單一的 observable:
from rx.operators import merge_all
# 定義一個 observable,包含 0 到 3 的整數
obs = group_by(lambda x: x % 2)(range(4))
# 合併 observable
merged_obs = merge_all()(obs)
# 訂閱 merged_obs 並印出其內容
merged_obs.subscribe(print)
這將輸出:
0
1
2
3
這是因為 merge_all
會合併所有 observable 的元素,以發出順序排列。
並發實作
在前面的章節中,我們學習瞭如何使用 from_
方法建立可觀察物件。RxPy 提供了更多工具來建立更有趣的事件源。
從底層實作到高階應用的全面檢視顯示,Python 的非同步程式設計能力已日趨成熟,async
和 await
語法讓撰寫非同步程式碼更直觀,而asyncio
以及aiohttp
等函式函式庫的發展,更簡化了非同步程式碼的複雜性。透過多維度效能指標的實測分析,非同步程式設計在I/O密集型應用中展現出顯著的效能優勢,尤其在網路請求和資料處理等場景。然而,技術限制深析顯示,處理CPU密集型任務時,非同步程式設計的優勢並不明顯,甚至可能因為上下文切換的開銷而降低效能。將非同步程式設計整合至現有系統的策略和價值,在於提升系統的回應速度和資源利用率,但需仔細評估整合成本和潛在的程式碼複雜度。對於重視效能和可擴充套件性的應用,玄貓認為,Python 的非同步程式設計已成為不可或缺的工具,開發者應積極掌握asyncio
、aiohttp
、RxPY 等函式函式庫的應用技巧,並深入理解反應式程式設計的精髓,才能在實務中充分發揮其效能優勢。未來幾年,隨著更多原生非同步函式函式庫的出現和生態系統的完善,預見Python 非同步程式設計的應用將更加普及,並在更多領域展現其強大潛力。