現代軟體開發中,非同步處理和效能最佳化至關重要。本文首先介紹 Python 的 concurrent.futures 和 asyncio 函式庫,並示範如何使用它們實作 Future 和 Promise 模式,以有效管理非同步操作。接著,文章探討反應式程式設計中的觀察者模式,以及 ReactiveX 和 Observable 的應用,說明如何處理非同步資料流。最後,文章介紹了 Cache-Aside 效能模式,並以 Python 和 Redis 為例,展示如何實作此模式以提升應用程式效能。
深入理解 Future 與 Promise 模式
在現代軟體開發中,處理非同步操作已成為日常任務。Python 提供了多種方式來實作非同步處理,其中包括 concurrent.futures 模組和 asyncio 函式庫。本文將探討這兩種實作方式。
使用 ThreadPoolExecutor 實作 Future 和 Promise 模式
程式碼實作
from concurrent.futures import ThreadPoolExecutor, as_completed
def square(x):
    """計算數字的平方"""
    return x * x
with ThreadPoolExecutor() as executor:
    future1 = executor.submit(square, 2)
    future2 = executor.submit(square, 3)
    future3 = executor.submit(square, 4)
    futures = [future1, future2, future3]
    for future in as_completed(futures):
        print(f"結果:{future.result()}")
內容解密:
- 定義任務函式:我們首先定義了一個名為 square的函式,用於計算輸入數字的平方。
- 建立 ThreadPoolExecutor:使用 with陳述式建立一個ThreadPoolExecutor物件,該物件會在區塊結束時自動關閉。
- 提交任務:透過 executor.submit方法提交三個任務,分別計算 2、3 和 4 的平方,並獲得對應的Future物件。
- 收集結果:使用 as_completed函式迭代已完成的Future物件,並列印其結果。
使用 asyncio 實作 Future 和 Promise 模式
程式碼實作
import asyncio
async def square(x):
    """模擬 I/O 繫結操作並計算平方"""
    # 模擬 I/O 操作
    await asyncio.sleep(1)
    return x * x
async def main():
    """主函式,建立 Future 物件並收集結果"""
    fut1 = asyncio.ensure_future(square(2))
    fut2 = asyncio.ensure_future(square(3))
    fut3 = asyncio.ensure_future(square(4))
    results = await asyncio.gather(fut1, fut2, fut3)
    for result in results:
        print(f"結果:{result}")
if __name__ == "__main__":
    asyncio.run(main())
內容解密:
- 定義協程:使用 async def定義了一個名為square的協程,模擬 I/O 操作並計算平方。
- 建立 Future 物件:在 main協程中,使用asyncio.ensure_future建立三個Future物件。
- 收集結果:使用 asyncio.gather等待所有Future物件完成並收集結果。
- 執行事件迴圈:透過 asyncio.run(main())執行事件迴圈。
反應式程式設計中的觀察者模式
觀察者模式是一種行為設計模式,用於在物件狀態改變時通知其他物件。然而,在處理多個相互依賴的事件時,傳統的觀察者模式可能會導致程式碼複雜且難以維護。反應式程式設計提供了一種更優雅的解決方案。
ReactiveX 與 Observable
ReactiveX 提供了一種非同步程式設計的 API,根據可觀察的資料流。Observable 可以視為一個不斷流動的資料流,Observer 可以訂閱這個資料流並對接收到的資料做出反應。
真實世界的例子
- 機場航班資訊顯示系統:該系統不斷更新航班狀態,觀察者(旅客、航空公司員工等)可以訂閱這些更新並做出相應的反應。
- 試算表應用程式:當使用者更改某個儲存格的值時,試算表會自動重新計算依賴該儲存格的所有公式並更新顯示。
觀察者模式在反應式程式設計中的應使用案例項
觀察者模式(Observer Pattern)在反應式程式設計(Reactive Programming)中扮演著重要的角色,尤其是在處理非同步資料流時。其中一個典型的應用場景是集合管道(Collection Pipeline)的概念,這是由Martin Fowler在其部落格中提出的。
集合管道模式
集合管道是一種將計算過程組織成一系列操作的程式設計模式,這些操作透過將一個集合作為上一個操作的輸出並將其饋送到下一個操作來組合。在反應式程式設計中,我們可以使用Observable來執行諸如“對映和歸約”(map and reduce)或“分組”(groupby)等操作,以處理資料序列。
觀察者模式的實作
在這個例子中,我們構建了一個根據文字檔(people.txt)的人名串流,並根據此建立了一個Observable。首先,我們定義了一個函式firstnames_from_db(),它從文字檔中傳回一個Observable,並對資料進行了一系列的轉換操作,包括flat_map()、filter()、map()和group_by()。
from pathlib import Path
import reactivex as rx
from reactivex import operators as ops
def firstnames_from_db(path: Path):
    file = path.open()
    # 蒐集並推播儲存的人名首名
    return rx.from_iterable(file).pipe(
        ops.flat_map(
            lambda content: rx.from_iterable(
                content.split(", ")
            )
        ),
        ops.filter(lambda name: name != ""),
        ops.map(lambda name: name.split()[0]),
        ops.group_by(lambda firstname: firstname),
        ops.flat_map(
            lambda grp: grp.pipe(
                ops.count(),
                ops.map(lambda ct: (grp.key, ct)),
            )
        ),
    )
def main():
    db_path = Path(__file__).parent / Path("people.txt")
    # 每5秒發出資料
    rx.interval(5.0).pipe(
        ops.flat_map(lambda i: firstnames_from_db(db_path))
    ).subscribe(lambda val: print(str(val)))
    # 保持執行直到使用者按下任意鍵
    input("啟動中... 按任意鍵並輸入ENTER以離開\n")
#### 內容解密:
1. `firstnames_from_db()`函式開啟指定的文字檔並讀取其內容。
2. 使用`rx.from_iterable()`將檔案內容轉換為Observable。
3. 透過一系列的運算元對Observable進行轉換,包括分割字串、過濾空值、提取首名、分組並計數。
4. 在`main()`函式中,建立了一個每5秒發出資料的Observable,並將其與`firstnames_from_db()`傳回的Observable合併。
5. 訂閱合併後的Observable並列印發出的值。
### 處理新的資料流
為了使這個例子更加動態,我們使用了Faker這個第三方模組來生成假資料。透過執行`peoplelist.py`指令碼,可以生成新的假人名並追加到`people.txt`檔案中。當重新執行`rx_peoplelist.py`時,它會讀取更新後的檔案並發出新的資料。
#### 生成假資料的指令碼
```python
from faker import Faker
import sys
fake = Faker()
args = sys.argv[1:]
if len(args) == 1:
    output_filename = args[0]
    persons = []
    for _ in range(0, 20):
        p = {"firstname": fake.first_name(), "lastname": fake.last_name()}
        persons.append(p)
    persons = iter(persons)
    data = [f"{p['firstname']} {p['lastname']}" for p in persons]
    data = ", ".join(data) + ", "
    with open(output_filename, "a") as f:
        f.write(data)
else:
    print("您需要傳遞輸出檔案路徑!")
#### 內容解密:
1. 使用Faker模組生成假的人名資料。
2. 將生成的資料寫入指定的輸出檔案中。
3. 每次執行指令碼都會追加新的資料到檔案中。
### 其他並發和非同步模式
除了觀察者模式外,還有其他一些並發和非同步模式可供開發者使用,例如Actor模型。Actor模型是一種處理並發計算的概念模型,它定義了actor例項應該如何行為的規則:一個actor可以做出本地決策、建立更多actor、傳送更多訊息,並決定如何回應收到的下一個訊息。
## 效能模式探討
在前一章中,我們討論了並發和非同步模式,這些模式對於編寫能夠高效處理多工的軟體至關重要。接下來,我們將探討特定的效能模式,這些模式能夠提升應用程式的速度和資源利用率。
效能模式旨在解決常見的瓶頸和最佳化挑戰,為開發者提供經過驗證的方法來改善執行時間、減少記憶體使用並實作有效擴充套件。
本章節主要涵蓋以下主題:
* Cache-Aside 模式
* Memoization 模式
* Lazy Loading 模式
### 技術需求
請參閱第1章節中提到的需求。本章節中討論的程式碼額外技術需求如下:
* 使用以下命令將 Faker 模組新增到您的 Python 環境中:`python -m pip install faker`
* 使用以下命令將 Redis 模組新增到您的 Python 環境中:`python -m pip install redis`
* 安裝 Redis 伺服器並使用 Docker 執行:`docker run --name myredis -p 6379:6379 redis`
如有需要,請參考 https://redis.io/docs/latest/ 的檔案。
## Cache-Aside 模式
在資料讀取頻率遠高於更新頻率的情況下,應用程式通常會使用快取來最佳化對儲存在資料函式庫或資料儲存中的資訊的重複存取。某些系統中,這種快取機制是內建的並自動運作。當這種情況不存在時,我們必須在應用程式中自行實施適合特定使用案例的快取策略。
其中一種策略稱為 Cache-Aside,透過將頻繁存取的資料儲存在快取中,減少了重複從資料儲存中取得資料的需求,從而提高效能。
### 真實世界範例
我們可以在軟體領域中參照以下範例:
* Memcached 常用作快取伺服器。它是一種流行的記憶體鍵值儲存,用於儲存資料函式庫呼叫、API 呼叫或 HTML 頁面內容的結果等小型資料塊。
* Redis 是另一種用作快取的伺服器解決方案。目前,它是我用於快取或應用程式記憶體儲存使用案例的首選伺服器,因為它表現出色。
* 根據 Amazon 的檔案網站(https://docs.aws.amazon.com/elasticache/),Amazon 的 ElastiCache 是一項 Web 服務,使得在雲中設定、管理和擴充套件分散式記憶體資料儲存或快取環境變得容易。
### Cache-Aside 模式的使用案例
當我們需要減少應用程式中的資料函式庫負載時,Cache-Aside 模式非常有用。透過快取頻繁存取的資料,可以減少傳送到資料函式庫的查詢次數。它還有助於提高應用程式的回應速度,因為可以更快地檢索快取資料。
請注意,此模式適用於不經常更改的資料,以及不依賴於儲存中一組條目的完整性(多個鍵)的資料儲存。例如,它可能適用於某些型別的檔案儲存或資料函式庫,其中鍵永遠不會被更新,偶爾會刪除資料條目,但沒有強烈的需求在一段時間內繼續提供它們(直到快取被重新整理)。
### 實施 Cache-Aside 模式
我們可以總結實施 Cache-Aside 模式所需的步驟,包括資料函式庫和快取,如下所示:
* 情況1 – 當我們想要取得一個資料項時:如果在快取中找到該項,則從快取中傳回該項。如果在快取中未找到,則從資料函式庫中讀取資料。將我們獲得的項放入快取中並傳回它。
#### 程式碼範例:
```python
import redis
# 連線到 Redis
redis_client = redis.Redis(host='localhost', port=6379, db=0)
def get_data(key):
    # 首先檢查快取
    cached_data = redis_client.get(key)
    if cached_data is not None:
        # 快取中有資料,直接傳回
        return cached_data.decode('utf-8')
    else:
        # 快取中無資料,從資料函式庫取得
        data = fetch_data_from_database(key)
        # 將資料放入快取
        redis_client.set(key, data)
        return data
def fetch_data_from_database(key):
    # 模擬從資料函式庫取得資料
    return f"Data for {key}"
# 示例呼叫
print(get_data("example_key"))
內容解密:
此程式碼範例演示瞭如何實施 Cache-Aside 模式。首先,我們嘗試從 Redis 快取中取得資料。如果資料存在於快取中,則直接傳回。否則,我們從資料函式庫中取得資料,將其存入快取,並傳回資料。此過程減少了對資料函式庫的查詢次數,提高了應用程式的效能。
- 連線到 Redis:使用 redis.Redis連線到本地執行的 Redis 伺服器。
- get_data函式:檢查指定鍵的資料是否在 Redis 快取中。如果是,則傳回快取中的資料;否則,從資料函式庫取得資料,並將其存入 Redis 快取。
- fetch_data_from_database函式:模擬從資料函式庫取得資料的操作。
- 示例呼叫:呼叫 get_data函式來取得指定鍵的資料,展示了 Cache-Aside 模式的工作流程。
此範例清晰展示了 Cache-Aside 模式如何透過減少對資料函式庫的直接存取來提高應用程式的效率。
 
            