RxPY 回應式程式設計的核心概念在於處理非同步資料流,其中熱可觀察物件和冷可觀察物件的區別至關重要。冷可觀察物件,例如使用 interval 建立的計時器,只有在訂閱後才會開始運作,每個訂閱者都會觸發獨立的計時器執行。而熱可觀察物件則會持續傳送資料,無論是否有訂閱者,所有訂閱者分享同一資料流。publish 方法能將冷可觀察物件轉換為熱可觀察物件,確保所有訂閱者都能接收到相同的資料序列。理解這兩種觀察者的差異對於構建高效的回應式應用至關重要。

熱可觀察物件和冷可觀察物件

我們可以使用 interval 方法建立一個可觀察物件,它會在指定的時間間隔內發出事件。以下程式碼定義了一個可觀察物件 obs,它會每秒發出一個數字,從 0 開始。我們使用 take 運算子來限制計時器的事件數量:

from rx import interval
from rx.operators import take

obs = take(4)(interval(1))
obs.subscribe(print)

輸出:

0
1
2
3

合併多個可觀察物件

RxPy 還提供了 merge_all 方法,可以用來合併多個可觀察物件。以下程式碼示範瞭如何使用 merge_all 合併多個可觀察物件:

from rx.operators import merge_all
import rx

obs1 = rx.from_iterable([0, 1, 2])
obs2 = rx.from_iterable([3, 4, 5])

merged_obs = merge_all([obs1, obs2])
merged_obs.subscribe(print)

輸出:

0
1
2
3
4
5

個別合併可觀察物件

RxPy 也提供了 merge 方法,可以用來個別合併可觀察物件。以下程式碼示範瞭如何使用 merge 合併兩個可觀察物件:

import rx

obs1 = rx.from_iterable([0, 1, 2])
obs2 = rx.from_iterable([3, 4, 5])

merged_obs = rx.merge(obs1, obs2)
merged_obs.subscribe(print)

輸出:

0
1
2
3
4
5

圖表翻譯:

  flowchart TD
    A[可觀察物件 1] --> B[合併]
    C[可觀察物件 2] --> B
    B --> D[合併後的可觀察物件]
    D --> E[訂閱者]

在這個圖表中,兩個可觀察物件 obs1obs2 被合併成一個新的可觀察物件 merged_obs,然後被訂閱者訂閱。

間隔觀察(Interval Observables)

在 reactive programming 中,間隔觀察(interval)是一種特殊的觀察者,它可以在指定的時間間隔內產生值。然而,間隔觀察有一個重要的特性:計時器只有在訂閱(subscribe)後才會開始運作。

以下是一個示例,使用 time 函式來觀察間隔觀察的行為:

import time
from rx import interval
from rx import operators as ops

start = time.time()
obs = interval(1).pipe(
    ops.map(lambda a: (a, time.time() - start))
)

# 等待 2 秒後再訂閱
time.sleep(2)
obs.pipe(ops.take(4)).subscribe(print)

輸出結果:

(0, 3.003735303878784)
(1, 4.004871129989624)
(2, 5.005947589874268)
(3, 6.00749135017395)

如您所見,第一個元素(對應於索引 0)是在 3 秒後產生的,這意味著計時器在 subscribe 方法被呼叫時才開始運作。

這種觀察者被稱為「懶惰觀察者」(lazy observables),因為它們只有在被請求(即訂閱)時才會開始產生值。這種觀察者也被稱為「冷觀察者」(cold observables),因為它們只有在訂閱後才會開始運作。

如果我們將兩個訂閱者附加到同一個間隔觀察者上,計時器將會被多次啟動。以下是一個示例:

import time
from rx import interval
from rx import operators as ops

obs = interval(1)

# 第一個訂閱者
obs.pipe(ops.take(4)).subscribe(lambda x: print(f"訂閱者 1: {x}"))

# 等待 0.5 秒後再新增第二個訂閱者
time.sleep(0.5)
obs.pipe(ops.take(4)).subscribe(lambda x: print(f"訂閱者 2: {x}"))

輸出結果將顯示兩個訂閱者的輸出時間不同,證明計時器被多次啟動。

時間間隔觀察與訂閱

在觀察者模式中,訂閱者可以接收到觀察者發出的事件或資料。在這個例子中,我們使用 interval 函式建立一個每秒傳送事件的觀察者。

import time
from rx import interval

start = time.time()

# 建立一個每秒傳送事件的觀察者
obs = map(lambda a: (a, time.time() - start))(interval(1))

接下來,我們等待 2 秒鐘後開始訂閱觀察者。

# 等待 2 秒鐘
time.sleep(2)

然後,我們使用 take 函式限制觀察者傳送的事件數量為 4,並訂閱觀察者。

from rx import take

# 訂閱觀察者並列印接收到的事件
take(4)(obs).subscribe(lambda x: print("First subscriber: {}".format(x)))

等待 0.5 秒鐘後,我們再次訂閱觀察者。

# 等待 0.5 秒鐘
time.sleep(0.5)

# 再次訂閱觀察者
take(4)(obs).subscribe(lambda x: print("Second subscriber: {}".format(x)))

這個例子展示了觀察者模式的非同步性質,兩個訂閱者可以在不同的時間點接收到相同的事件。

內容解密:

  • interval(1) 建立一個每秒傳送事件的觀察者。
  • map 函式用於轉換觀察者傳送的事件,增加了事件傳送的時間戳。
  • take(4) 函式限制觀察者傳送的事件數量為 4。
  • subscribe 函式用於訂閱觀察者,接收到事件後列印事件內容。

圖表翻譯:

  flowchart TD
    A[觀察者] --> B[事件傳送]
    B --> C[訂閱者1]
    B --> D[訂閱者2]
    C --> E[列印事件]
    D --> F[列印事件]

這個圖表展示了觀察者模式的基本結構,觀察者傳送事件,多個訂閱者可以接收到事件並進行處理。

瞭解RxPY的publish方法

在RxPY中,publish方法是一種強大的工具,允許我們控制Observable的行為,特別是在多個訂閱者訂閱同一個資料源的情況下。當我們使用publish方法時,Observable會被轉換成ConnectableObservable,這意味著資料的生產會被延遲,直到我們呼叫connect方法。

使用publish和connect

以下是使用publishconnect的範例:

import time
from rx import interval
from rx.operators import publish, take, map

start = time.time()

# 建立一個Observable,傳送間隔1秒的資料
obs = interval(1).pipe(
    map(lambda a: (a, time.time() - start)),
    publish()  # 將Observable轉換成ConnectableObservable
)

# 訂閱者1
take(4)(obs).subscribe(lambda x: print("First subscriber: {}".format(x)))

# 等待2秒
time.sleep(2)

# 訂閱者2
take(4)(obs).subscribe(lambda x: print("Second subscriber: {}".format(x)))

# 啟動資料生產
obs.connect()

在這個範例中,publish方法被用來轉換Observable成ConnectableObservable。然後,兩個訂閱者被建立,分別訂閱同一個資料源。由於publish方法的作用,資料的生產會被延遲,直到connect方法被呼叫。

結果

當我們執行這個範例時,兩個訂閱者都會收到相同的資料。這是因為publish方法保證了所有訂閱者都會收到相同的資料,即使他們在不同的時間點訂閱。

First subscriber: (0, 2.0016899108886719)
First subscriber: (1, 3.0027990341186523)
First subscriber: (2, 4.003532648086548)
First subscriber: (3, 5.0043666848449707)
Second subscriber: (0, 2.0016899108886719)
Second subscriber: (1, 3.0027990341186523)
Second subscriber: (2, 4.003532648086548)
Second subscriber: (3, 5.0043666848449707)

在這個結果中,兩個訂閱者都收到了相同的資料,儘管他們在不同的時間點訂閱。這是publish方法的作用,保證了所有訂閱者都會收到相同的資料。

熱資料源(Hot Data Source)與冷資料源(Cold Data Source)

在回應式程式設計中,資料源可以分為兩種型別:熱資料源(Hot Data Source)和冷資料源(Cold Data Source)。熱資料源是指資料的生產與訂閱者無關,資料會持續生產並傳送給所有訂閱者。另一方面,冷資料源則是指資料的生產與訂閱者有關,當訂閱者訂閱資料源時,資料會從頭開始重新生產。

熱資料源的特性

熱資料源的特性是資料的生產與訂閱者無關,資料會持續生產並傳送給所有訂閱者。這意味著,即使訂閱者晚加入,也會從最後一個資料開始接收。熱資料源常見於實時資料處理、事件驅動系統等應用場景。

冷資料源的特性

冷資料源的特性是資料的生產與訂閱者有關,當訂閱者訂閱資料源時,資料會從頭開始重新生產。這意味著,每個訂閱者都會從頭開始接收資料。冷資料源常見於批次處理、離線計算等應用場景。

RxPY 中的 publishreplay 方法

在 RxPY 中,publish 方法可以用來建立一個熱資料源,而 replay 方法可以用來建立一個冷資料源。publish 方法會將資料源轉換為一個熱資料源,資料會持續生產並傳送給所有訂閱者。replay 方法則會將資料源轉換為一個冷資料源,資料會從頭開始重新生產並傳送給每個訂閱者。

範例

以下範例展示了 publishreplay 方法的使用:

import rx
from rx import operators as ops
import time

# 建立一個間隔 1 秒的資料源
source = rx.interval(1).pipe(
    ops.map(lambda x: (x, time.time() - start))
)

# 使用 publish 方法建立一個熱資料源
hot_source = source.pipe(ops.publish())

# 使用 replay 方法建立一個冷資料源
cold_source = source.pipe(ops.replay())

# 訂閱熱資料源
hot_source.subscribe(lambda x: print("First subscriber:", x))

# 等待 2 秒
time.sleep(2)

# 訂閱熱資料源
hot_source.subscribe(lambda x: print("Second subscriber:", x))

# 訂閱冷資料源
cold_source.subscribe(lambda x: print("First subscriber:", x))

# 等待 2 秒
time.sleep(2)

# 訂閱冷資料源
cold_source.subscribe(lambda x: print("Second subscriber:", x))

在這個範例中,publish 方法用來建立一個熱資料源,資料會持續生產並傳送給所有訂閱者。replay 方法則用來建立一個冷資料源,資料會從頭開始重新生產並傳送給每個訂閱者。

並發性實作

在RxPY中,觀察者(Observer)可以訂閱多個觀察序列(Observable),而觀察序列可以是冷的(Cold)或熱的(Hot)。冷觀察序列只有當有訂閱者時才會開始傳送資料,而熱觀察序列則會在建立時立即開始傳送資料。

熱觀察序列

熱觀察序列可以透過多種方式建立,例如使用rx.subject.Subject類。Subject類是一種特殊的觀察序列,可以同時接收和傳送資料。下面的程式碼示範瞭如何使用Subject類建立一個熱觀察序列:

from rx.subject import Subject

s = Subject()
s.subscribe(lambda x: print("觀察序列傳送值:{}".format(x)))
s.on_next(1)
# 觀察序列傳送值:1
s.on_next(2)
# 觀察序列傳送值:2

在上面的程式碼中,建立了一個Subject例項,並訂閱了它。然後,使用on_next方法手動推播資料到觀察序列。每當推播資料時,訂閱者都會被呼叫。

並發性實作

RxPY提供了多種方式來實作並發性,包括使用rx.concurrency模組。下面的程式碼示範瞭如何使用rx.concurrency模組實作並發性:

import rx
from rx import operators as ops
from rx.concurrency import ThreadPoolScheduler

def intensive_task(x):
    # 模擬一個耗時的任務
    import time
    time.sleep(1)
    return x * 2

scheduler = ThreadPoolScheduler()

rx.from_iterable([1, 2, 3, 4, 5]) \
    .pipe(ops.map(intensive_task)) \
    .subscribe_on(scheduler) \
    .subscribe(lambda x: print("任務完成:{}".format(x)))

在上面的程式碼中,使用rx.concurrency模組建立了一個執行緒池排程器(ThreadPoolScheduler)。然後,使用rx.from_iterable建立了一個觀察序列,並使用ops.map運運算元將每個元素對映到一個耗時的任務。最後,使用subscribe_on方法指定排程器,並訂閱觀察序列。每當任務完成時,訂閱者都會被呼叫。

建立 CPU 監控器

現在我們已經掌握了主要的反應式程式設計概念,我們可以實作一個示例應用:一個監控器,它將提供我們的 CPU 使用率的實時資訊,並且可以檢測尖峰。

第一步:實作資料來源

我們將使用 psutil 模組,它提供了一個函式 psutil.cpu_percent(),傳回最新可用的 CPU 使用率作為百分比(且不會阻塞)。以下是示例程式碼:

import psutil
print(psutil.cpu_percent())

第二步:建立間隔觀察器

由於我們正在開發一個監控器,我們希望在幾個時間間隔內取樣此資訊。為了實作這一點,我們可以使用熟悉的間隔觀察器,然後使用 publish()connect() 方法使其變熱。以下是建立 cpu_data 觀察器的完整程式碼:

import psutil
from rx import interval, map, publish

cpu_data = interval(0.1).pipe(
    map(lambda x: psutil.cpu_percent()),
    publish()
)

第三步:測試監控器

我們可以透過以下方式測試監控器:

from rx import take

take(4)(cpu_data).subscribe(print)

結果

輸出結果將顯示 CPU 使用率的實時資訊:

12.5
5.6
4.5

圖表翻譯

以下是 CPU 監控器的 Mermaid 圖表:

  flowchart TD
    A[開始] --> B[建立資料來源]
    B --> C[建立間隔觀察器]
    C --> D[測試監控器]
    D --> E[顯示結果]

圖表翻譯

此圖表顯示了 CPU 監控器的工作流程。首先,建立資料來源,然後建立間隔觀察器,最後測試監控器並顯示結果。

9.6 反應式程式設計

現在我們的主要資料來源已經就位,我們可以使用 matplotlib 實作監視器視覺化。想法是建立一個包含固定數量測量的圖表,並且當新的資料到達時,我們包含最新的測量並移除最舊的那一個。這通常被稱為移動視窗,可以透過圖示來更好地理解。

以下是移動視窗的圖示:

  flowchart TD
    A[cpu_data] --> B[移動視窗]
    B --> C[圖表]
    C --> D[更新圖表]

圖表翻譯:

這個圖示展示了 cpu_data 的移動視窗如何被轉換成圖表。當新的資料到達時,移動視窗會向前移動,圖表會被更新。

要實作這個演算法,我們可以寫一個叫做 monitor_cpu 的函式,它會建立和更新我們的圖表視窗。這個函式會做以下事情:

  • 初始化一個空的圖表並設定正確的圖表限制。
  • 將我們的 cpu_data 可觀察資料轉換為傳回移動視窗的資料。這可以使用 buffer_with_count 運算子實作,它會取視窗中的點數 npoints 作為引數和 1 作為偏移量。
  • 訂閱這個新的資料流並更新圖表以反映傳入的資料。

以下是 monitor_cpu 函式的完整程式碼:

from rx.operators import buffer_with_count
import numpy as np
import pylab as plt

def monitor_cpu(npoints):
    lines, = plt.plot([], [])
    plt.xlim(0, npoints)
    plt.ylim(0, 100)  # 0 to 100 percent

    cpu_data_window = buffer_with_count(npoints, 1)(cpu_data)

    def update_plot(cpu_readings):
        lines.set_xdata(np.arange(len(cpu_readings)))
        # 更新圖表的 y 資料
        lines.set_ydata(cpu_readings)
        plt.draw()
        plt.pause(0.01)

    cpu_data_window.subscribe(update_plot)
    plt.show()

內容解密:

這個程式碼建立了一個圖表視窗,並使用 buffer_with_count 運算子將 cpu_data 可觀察資料轉換為傳回移動視窗的資料。然後,它訂閱這個新的資料流並更新圖表以反映傳入的資料。圖表的 x 資料是視窗中的點數,y 資料是傳入的 cpu_readings 資料。

即時CPU使用率監控系統

在這個範例中,我們將使用Python和matplotlib函式庫來建立一個即時CPU使用率監控系統。這個系統可以即時顯示CPU使用率,並在使用率超過一定閾值時觸發警告。

系統架構

系統架構如下:

  • CPU使用率資料採集:使用psutil函式庫來採集CPU使用率資料。
  • 資料處理:使用numpy函式庫來處理資料。
  • 即時繪圖:使用matplotlib函式庫來即時繪圖。
  • 警告系統:使用matplotlib函式庫來顯示警告訊息。

程式碼實作

import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import psutil

# CPU使用率資料採集
def get_cpu_usage():
    return psutil.cpu_percent()

# 即時繪圖
def update_plot(frame):
    cpu_usage = get_cpu_usage()
    lines.set_ydata([cpu_usage])
    plt.draw()

# 警告系統
def update_warning(is_high):
    if is_high:
        label.set_text("高")
    else:
        label.set_text("正常")

# 主函式
def main():
    global lines, label
    fig, ax = plt.subplots()
    lines, = ax.plot([1], [1])
    ax.set_ylim(0, 100)
    label = plt.text(1, 1, "正常")
    plt.draw()

    # 即時繪圖
    ani = animation.FuncAnimation(fig, update_plot, interval=1000)

    # 警告系統
    def check_cpu_usage():
        cpu_usage = get_cpu_usage()
        if cpu_usage > 20:
            update_warning(True)
        else:
            update_warning(False)

    # 每秒檢查一次CPU使用率
    import threading
    def check_cpu_usage_loop():
        while True:
            check_cpu_usage()
            plt.pause(1)

    threading.Thread(target=check_cpu_usage_loop).start()

    plt.show()

if __name__ == "__main__":
    main()

結果

執行這個程式後,會開啟一個互動式視窗,顯示CPU使用率的即時資料。如果使用率超過20%,會顯示警告訊息。

這個範例展示瞭如何使用Python和matplotlib函式庫來建立一個即時CPU使用率監控系統,並在使用率超過一定閾值時觸發警告。

平行處理與反應式程式設計

平行處理是指使用多個核心或處理器來執行多個任務,以提高程式的執行效率。在這一章中,我們將探討平行處理的基本概念和技術,包括使用多個程式、平行Cython和自動平行化。

平行處理的需求

平行處理是解決大規模問題的必要條件。每天,公司都會產生大量的資料,這些資料需要被儲存和分析在多臺電腦上。科學家和工程師在超級電腦上執行平行程式碼來模擬大規模系統。平行處理允許您利用多核心中央處理器(CPU)和圖形處理器(GPU),這些圖形處理器在高度平行的問題上工作得非常好。

平行程式設計的介紹

要平行化一個程式,需要將問題分解成可以獨立執行的子單元。一個子單元完全獨立於其他子單元的問題被稱為「令人尷尬的平行」(embarrassingly parallel)。例如,對陣列進行元素級別的操作是一個典型的例子,因為這種操作只需要知道它正在處理的元素。另一個例子是粒子模擬器,因為沒有相互作用,每個粒子可以獨立於其他粒子演化。

平行程式設計可以分為兩種:分享記憶體和分散式記憶體。在分享記憶體中,子單元可以存取相同的記憶體空間。這種方法的優點是您不需要明確地處理通訊,因為它足以從分享記憶體中讀取或寫入。然而,當多個程式試圖存取和修改相同的記憶體位置時,問題就會出現。需要使用同步技術來避免這種衝突。

在分散式記憶體模型中,每個程式都是完全獨立的,具有自己的記憶體空間。在這種情況下,通訊是明確地在程式之間處理的。通訊的開銷通常比分享記憶體更昂貴,因為資料可能需要透過網路介面傳輸。

反應式程式設計

反應式程式設計是一種程式設計模型,旨在處理資料流和事件。它非常適合處理實時應用和UI中的資料流。在這一章中,我們將探討反應式程式設計的基本概念和技術,包括使用RxPY函式庫。

RxPY是一個Python函式庫,提供了一種簡單的方式來處理資料流和事件。它允許您使用觀察者模式來處理資料流,從而可以簡化複雜的程式設計任務。

圖表翻譯:

此圖示為RxPY的觀察者模式,使用from_iterable建立一個資料流,然後使用mapfilter運運算元處理資料流,最後使用subscribe方法訂閱資料流。

  flowchart TD
    A[資料流] --> B[觀察者模式]
    B --> C[map運運算元]
    C --> D[filter運運算元]
    D --> E[subscribe方法]
    E --> F[處理結果]

從技術架構視角來看,RxPY 提供了豐富的運運算元,例如 intervalmergepublishbuffer_with_count 等,讓開發者能以簡潔的語法實作複雜的非同步資料流操作。然而,深入剖析其冷熱可觀察物件的差異,可以發現正確區分和運用兩者對於避免非預期行為至關重要。冷可觀察物件如 interval,每個訂閱都會觸發新的資料流,而熱可觀察物件則會分享同一資料流。冷熱可觀察物件的選擇取決於應用場景,例如實時監控系統更適合採用熱可觀察物件。技術團隊應著重理解這些核心差異,才能充分發揮 RxPY 的非同步程式設計優勢。對於追求高效能的系統,建議優先考慮熱可觀察物件,並善用 publishconnect 等方法控制資料流的生命週期。接下來,隨著更多根據 RxPY 的實務案例出現,預期其在 Python 非同步程式設計領域的影響力將持續提升。