RxPY 回應式程式設計的核心概念在於處理非同步資料流,其中熱可觀察物件和冷可觀察物件的區別至關重要。冷可觀察物件,例如使用 interval
建立的計時器,只有在訂閱後才會開始運作,每個訂閱者都會觸發獨立的計時器執行。而熱可觀察物件則會持續傳送資料,無論是否有訂閱者,所有訂閱者分享同一資料流。publish
方法能將冷可觀察物件轉換為熱可觀察物件,確保所有訂閱者都能接收到相同的資料序列。理解這兩種觀察者的差異對於構建高效的回應式應用至關重要。
熱可觀察物件和冷可觀察物件
我們可以使用 interval
方法建立一個可觀察物件,它會在指定的時間間隔內發出事件。以下程式碼定義了一個可觀察物件 obs
,它會每秒發出一個數字,從 0 開始。我們使用 take
運算子來限制計時器的事件數量:
from rx import interval
from rx.operators import take
obs = take(4)(interval(1))
obs.subscribe(print)
輸出:
0
1
2
3
合併多個可觀察物件
RxPy 還提供了 merge_all
方法,可以用來合併多個可觀察物件。以下程式碼示範瞭如何使用 merge_all
合併多個可觀察物件:
from rx.operators import merge_all
import rx
obs1 = rx.from_iterable([0, 1, 2])
obs2 = rx.from_iterable([3, 4, 5])
merged_obs = merge_all([obs1, obs2])
merged_obs.subscribe(print)
輸出:
0
1
2
3
4
5
個別合併可觀察物件
RxPy 也提供了 merge
方法,可以用來個別合併可觀察物件。以下程式碼示範瞭如何使用 merge
合併兩個可觀察物件:
import rx
obs1 = rx.from_iterable([0, 1, 2])
obs2 = rx.from_iterable([3, 4, 5])
merged_obs = rx.merge(obs1, obs2)
merged_obs.subscribe(print)
輸出:
0
1
2
3
4
5
圖表翻譯:
flowchart TD A[可觀察物件 1] --> B[合併] C[可觀察物件 2] --> B B --> D[合併後的可觀察物件] D --> E[訂閱者]
在這個圖表中,兩個可觀察物件 obs1
和 obs2
被合併成一個新的可觀察物件 merged_obs
,然後被訂閱者訂閱。
間隔觀察(Interval Observables)
在 reactive programming 中,間隔觀察(interval)是一種特殊的觀察者,它可以在指定的時間間隔內產生值。然而,間隔觀察有一個重要的特性:計時器只有在訂閱(subscribe)後才會開始運作。
以下是一個示例,使用 time
函式來觀察間隔觀察的行為:
import time
from rx import interval
from rx import operators as ops
start = time.time()
obs = interval(1).pipe(
ops.map(lambda a: (a, time.time() - start))
)
# 等待 2 秒後再訂閱
time.sleep(2)
obs.pipe(ops.take(4)).subscribe(print)
輸出結果:
(0, 3.003735303878784)
(1, 4.004871129989624)
(2, 5.005947589874268)
(3, 6.00749135017395)
如您所見,第一個元素(對應於索引 0)是在 3 秒後產生的,這意味著計時器在 subscribe
方法被呼叫時才開始運作。
這種觀察者被稱為「懶惰觀察者」(lazy observables),因為它們只有在被請求(即訂閱)時才會開始產生值。這種觀察者也被稱為「冷觀察者」(cold observables),因為它們只有在訂閱後才會開始運作。
如果我們將兩個訂閱者附加到同一個間隔觀察者上,計時器將會被多次啟動。以下是一個示例:
import time
from rx import interval
from rx import operators as ops
obs = interval(1)
# 第一個訂閱者
obs.pipe(ops.take(4)).subscribe(lambda x: print(f"訂閱者 1: {x}"))
# 等待 0.5 秒後再新增第二個訂閱者
time.sleep(0.5)
obs.pipe(ops.take(4)).subscribe(lambda x: print(f"訂閱者 2: {x}"))
輸出結果將顯示兩個訂閱者的輸出時間不同,證明計時器被多次啟動。
時間間隔觀察與訂閱
在觀察者模式中,訂閱者可以接收到觀察者發出的事件或資料。在這個例子中,我們使用 interval
函式建立一個每秒傳送事件的觀察者。
import time
from rx import interval
start = time.time()
# 建立一個每秒傳送事件的觀察者
obs = map(lambda a: (a, time.time() - start))(interval(1))
接下來,我們等待 2 秒鐘後開始訂閱觀察者。
# 等待 2 秒鐘
time.sleep(2)
然後,我們使用 take
函式限制觀察者傳送的事件數量為 4,並訂閱觀察者。
from rx import take
# 訂閱觀察者並列印接收到的事件
take(4)(obs).subscribe(lambda x: print("First subscriber: {}".format(x)))
等待 0.5 秒鐘後,我們再次訂閱觀察者。
# 等待 0.5 秒鐘
time.sleep(0.5)
# 再次訂閱觀察者
take(4)(obs).subscribe(lambda x: print("Second subscriber: {}".format(x)))
這個例子展示了觀察者模式的非同步性質,兩個訂閱者可以在不同的時間點接收到相同的事件。
內容解密:
interval(1)
建立一個每秒傳送事件的觀察者。map
函式用於轉換觀察者傳送的事件,增加了事件傳送的時間戳。take(4)
函式限制觀察者傳送的事件數量為 4。subscribe
函式用於訂閱觀察者,接收到事件後列印事件內容。
圖表翻譯:
flowchart TD A[觀察者] --> B[事件傳送] B --> C[訂閱者1] B --> D[訂閱者2] C --> E[列印事件] D --> F[列印事件]
這個圖表展示了觀察者模式的基本結構,觀察者傳送事件,多個訂閱者可以接收到事件並進行處理。
瞭解RxPY的publish方法
在RxPY中,publish
方法是一種強大的工具,允許我們控制Observable的行為,特別是在多個訂閱者訂閱同一個資料源的情況下。當我們使用publish
方法時,Observable會被轉換成ConnectableObservable,這意味著資料的生產會被延遲,直到我們呼叫connect
方法。
使用publish和connect
以下是使用publish
和connect
的範例:
import time
from rx import interval
from rx.operators import publish, take, map
start = time.time()
# 建立一個Observable,傳送間隔1秒的資料
obs = interval(1).pipe(
map(lambda a: (a, time.time() - start)),
publish() # 將Observable轉換成ConnectableObservable
)
# 訂閱者1
take(4)(obs).subscribe(lambda x: print("First subscriber: {}".format(x)))
# 等待2秒
time.sleep(2)
# 訂閱者2
take(4)(obs).subscribe(lambda x: print("Second subscriber: {}".format(x)))
# 啟動資料生產
obs.connect()
在這個範例中,publish
方法被用來轉換Observable成ConnectableObservable。然後,兩個訂閱者被建立,分別訂閱同一個資料源。由於publish
方法的作用,資料的生產會被延遲,直到connect
方法被呼叫。
結果
當我們執行這個範例時,兩個訂閱者都會收到相同的資料。這是因為publish
方法保證了所有訂閱者都會收到相同的資料,即使他們在不同的時間點訂閱。
First subscriber: (0, 2.0016899108886719)
First subscriber: (1, 3.0027990341186523)
First subscriber: (2, 4.003532648086548)
First subscriber: (3, 5.0043666848449707)
Second subscriber: (0, 2.0016899108886719)
Second subscriber: (1, 3.0027990341186523)
Second subscriber: (2, 4.003532648086548)
Second subscriber: (3, 5.0043666848449707)
在這個結果中,兩個訂閱者都收到了相同的資料,儘管他們在不同的時間點訂閱。這是publish
方法的作用,保證了所有訂閱者都會收到相同的資料。
熱資料源(Hot Data Source)與冷資料源(Cold Data Source)
在回應式程式設計中,資料源可以分為兩種型別:熱資料源(Hot Data Source)和冷資料源(Cold Data Source)。熱資料源是指資料的生產與訂閱者無關,資料會持續生產並傳送給所有訂閱者。另一方面,冷資料源則是指資料的生產與訂閱者有關,當訂閱者訂閱資料源時,資料會從頭開始重新生產。
熱資料源的特性
熱資料源的特性是資料的生產與訂閱者無關,資料會持續生產並傳送給所有訂閱者。這意味著,即使訂閱者晚加入,也會從最後一個資料開始接收。熱資料源常見於實時資料處理、事件驅動系統等應用場景。
冷資料源的特性
冷資料源的特性是資料的生產與訂閱者有關,當訂閱者訂閱資料源時,資料會從頭開始重新生產。這意味著,每個訂閱者都會從頭開始接收資料。冷資料源常見於批次處理、離線計算等應用場景。
RxPY 中的 publish
和 replay
方法
在 RxPY 中,publish
方法可以用來建立一個熱資料源,而 replay
方法可以用來建立一個冷資料源。publish
方法會將資料源轉換為一個熱資料源,資料會持續生產並傳送給所有訂閱者。replay
方法則會將資料源轉換為一個冷資料源,資料會從頭開始重新生產並傳送給每個訂閱者。
範例
以下範例展示了 publish
和 replay
方法的使用:
import rx
from rx import operators as ops
import time
# 建立一個間隔 1 秒的資料源
source = rx.interval(1).pipe(
ops.map(lambda x: (x, time.time() - start))
)
# 使用 publish 方法建立一個熱資料源
hot_source = source.pipe(ops.publish())
# 使用 replay 方法建立一個冷資料源
cold_source = source.pipe(ops.replay())
# 訂閱熱資料源
hot_source.subscribe(lambda x: print("First subscriber:", x))
# 等待 2 秒
time.sleep(2)
# 訂閱熱資料源
hot_source.subscribe(lambda x: print("Second subscriber:", x))
# 訂閱冷資料源
cold_source.subscribe(lambda x: print("First subscriber:", x))
# 等待 2 秒
time.sleep(2)
# 訂閱冷資料源
cold_source.subscribe(lambda x: print("Second subscriber:", x))
在這個範例中,publish
方法用來建立一個熱資料源,資料會持續生產並傳送給所有訂閱者。replay
方法則用來建立一個冷資料源,資料會從頭開始重新生產並傳送給每個訂閱者。
並發性實作
在RxPY中,觀察者(Observer)可以訂閱多個觀察序列(Observable),而觀察序列可以是冷的(Cold)或熱的(Hot)。冷觀察序列只有當有訂閱者時才會開始傳送資料,而熱觀察序列則會在建立時立即開始傳送資料。
熱觀察序列
熱觀察序列可以透過多種方式建立,例如使用rx.subject.Subject
類。Subject
類是一種特殊的觀察序列,可以同時接收和傳送資料。下面的程式碼示範瞭如何使用Subject
類建立一個熱觀察序列:
from rx.subject import Subject
s = Subject()
s.subscribe(lambda x: print("觀察序列傳送值:{}".format(x)))
s.on_next(1)
# 觀察序列傳送值:1
s.on_next(2)
# 觀察序列傳送值:2
在上面的程式碼中,建立了一個Subject
例項,並訂閱了它。然後,使用on_next
方法手動推播資料到觀察序列。每當推播資料時,訂閱者都會被呼叫。
並發性實作
RxPY提供了多種方式來實作並發性,包括使用rx.concurrency
模組。下面的程式碼示範瞭如何使用rx.concurrency
模組實作並發性:
import rx
from rx import operators as ops
from rx.concurrency import ThreadPoolScheduler
def intensive_task(x):
# 模擬一個耗時的任務
import time
time.sleep(1)
return x * 2
scheduler = ThreadPoolScheduler()
rx.from_iterable([1, 2, 3, 4, 5]) \
.pipe(ops.map(intensive_task)) \
.subscribe_on(scheduler) \
.subscribe(lambda x: print("任務完成:{}".format(x)))
在上面的程式碼中,使用rx.concurrency
模組建立了一個執行緒池排程器(ThreadPoolScheduler)。然後,使用rx.from_iterable
建立了一個觀察序列,並使用ops.map
運運算元將每個元素對映到一個耗時的任務。最後,使用subscribe_on
方法指定排程器,並訂閱觀察序列。每當任務完成時,訂閱者都會被呼叫。
建立 CPU 監控器
現在我們已經掌握了主要的反應式程式設計概念,我們可以實作一個示例應用:一個監控器,它將提供我們的 CPU 使用率的實時資訊,並且可以檢測尖峰。
第一步:實作資料來源
我們將使用 psutil
模組,它提供了一個函式 psutil.cpu_percent()
,傳回最新可用的 CPU 使用率作為百分比(且不會阻塞)。以下是示例程式碼:
import psutil
print(psutil.cpu_percent())
第二步:建立間隔觀察器
由於我們正在開發一個監控器,我們希望在幾個時間間隔內取樣此資訊。為了實作這一點,我們可以使用熟悉的間隔觀察器,然後使用 publish()
和 connect()
方法使其變熱。以下是建立 cpu_data
觀察器的完整程式碼:
import psutil
from rx import interval, map, publish
cpu_data = interval(0.1).pipe(
map(lambda x: psutil.cpu_percent()),
publish()
)
第三步:測試監控器
我們可以透過以下方式測試監控器:
from rx import take
take(4)(cpu_data).subscribe(print)
結果
輸出結果將顯示 CPU 使用率的實時資訊:
12.5
5.6
4.5
圖表翻譯
以下是 CPU 監控器的 Mermaid 圖表:
flowchart TD A[開始] --> B[建立資料來源] B --> C[建立間隔觀察器] C --> D[測試監控器] D --> E[顯示結果]
圖表翻譯
此圖表顯示了 CPU 監控器的工作流程。首先,建立資料來源,然後建立間隔觀察器,最後測試監控器並顯示結果。
9.6 反應式程式設計
現在我們的主要資料來源已經就位,我們可以使用 matplotlib 實作監視器視覺化。想法是建立一個包含固定數量測量的圖表,並且當新的資料到達時,我們包含最新的測量並移除最舊的那一個。這通常被稱為移動視窗,可以透過圖示來更好地理解。
以下是移動視窗的圖示:
flowchart TD A[cpu_data] --> B[移動視窗] B --> C[圖表] C --> D[更新圖表]
圖表翻譯:
這個圖示展示了 cpu_data 的移動視窗如何被轉換成圖表。當新的資料到達時,移動視窗會向前移動,圖表會被更新。
要實作這個演算法,我們可以寫一個叫做 monitor_cpu
的函式,它會建立和更新我們的圖表視窗。這個函式會做以下事情:
- 初始化一個空的圖表並設定正確的圖表限制。
- 將我們的
cpu_data
可觀察資料轉換為傳回移動視窗的資料。這可以使用buffer_with_count
運算子實作,它會取視窗中的點數npoints
作為引數和 1 作為偏移量。 - 訂閱這個新的資料流並更新圖表以反映傳入的資料。
以下是 monitor_cpu
函式的完整程式碼:
from rx.operators import buffer_with_count
import numpy as np
import pylab as plt
def monitor_cpu(npoints):
lines, = plt.plot([], [])
plt.xlim(0, npoints)
plt.ylim(0, 100) # 0 to 100 percent
cpu_data_window = buffer_with_count(npoints, 1)(cpu_data)
def update_plot(cpu_readings):
lines.set_xdata(np.arange(len(cpu_readings)))
# 更新圖表的 y 資料
lines.set_ydata(cpu_readings)
plt.draw()
plt.pause(0.01)
cpu_data_window.subscribe(update_plot)
plt.show()
內容解密:
這個程式碼建立了一個圖表視窗,並使用 buffer_with_count
運算子將 cpu_data
可觀察資料轉換為傳回移動視窗的資料。然後,它訂閱這個新的資料流並更新圖表以反映傳入的資料。圖表的 x 資料是視窗中的點數,y 資料是傳入的 cpu_readings 資料。
即時CPU使用率監控系統
在這個範例中,我們將使用Python和matplotlib函式庫來建立一個即時CPU使用率監控系統。這個系統可以即時顯示CPU使用率,並在使用率超過一定閾值時觸發警告。
系統架構
系統架構如下:
- CPU使用率資料採集:使用
psutil
函式庫來採集CPU使用率資料。 - 資料處理:使用
numpy
函式庫來處理資料。 - 即時繪圖:使用
matplotlib
函式庫來即時繪圖。 - 警告系統:使用
matplotlib
函式庫來顯示警告訊息。
程式碼實作
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import psutil
# CPU使用率資料採集
def get_cpu_usage():
return psutil.cpu_percent()
# 即時繪圖
def update_plot(frame):
cpu_usage = get_cpu_usage()
lines.set_ydata([cpu_usage])
plt.draw()
# 警告系統
def update_warning(is_high):
if is_high:
label.set_text("高")
else:
label.set_text("正常")
# 主函式
def main():
global lines, label
fig, ax = plt.subplots()
lines, = ax.plot([1], [1])
ax.set_ylim(0, 100)
label = plt.text(1, 1, "正常")
plt.draw()
# 即時繪圖
ani = animation.FuncAnimation(fig, update_plot, interval=1000)
# 警告系統
def check_cpu_usage():
cpu_usage = get_cpu_usage()
if cpu_usage > 20:
update_warning(True)
else:
update_warning(False)
# 每秒檢查一次CPU使用率
import threading
def check_cpu_usage_loop():
while True:
check_cpu_usage()
plt.pause(1)
threading.Thread(target=check_cpu_usage_loop).start()
plt.show()
if __name__ == "__main__":
main()
結果
執行這個程式後,會開啟一個互動式視窗,顯示CPU使用率的即時資料。如果使用率超過20%,會顯示警告訊息。
這個範例展示瞭如何使用Python和matplotlib函式庫來建立一個即時CPU使用率監控系統,並在使用率超過一定閾值時觸發警告。
平行處理與反應式程式設計
平行處理是指使用多個核心或處理器來執行多個任務,以提高程式的執行效率。在這一章中,我們將探討平行處理的基本概念和技術,包括使用多個程式、平行Cython和自動平行化。
平行處理的需求
平行處理是解決大規模問題的必要條件。每天,公司都會產生大量的資料,這些資料需要被儲存和分析在多臺電腦上。科學家和工程師在超級電腦上執行平行程式碼來模擬大規模系統。平行處理允許您利用多核心中央處理器(CPU)和圖形處理器(GPU),這些圖形處理器在高度平行的問題上工作得非常好。
平行程式設計的介紹
要平行化一個程式,需要將問題分解成可以獨立執行的子單元。一個子單元完全獨立於其他子單元的問題被稱為「令人尷尬的平行」(embarrassingly parallel)。例如,對陣列進行元素級別的操作是一個典型的例子,因為這種操作只需要知道它正在處理的元素。另一個例子是粒子模擬器,因為沒有相互作用,每個粒子可以獨立於其他粒子演化。
平行程式設計可以分為兩種:分享記憶體和分散式記憶體。在分享記憶體中,子單元可以存取相同的記憶體空間。這種方法的優點是您不需要明確地處理通訊,因為它足以從分享記憶體中讀取或寫入。然而,當多個程式試圖存取和修改相同的記憶體位置時,問題就會出現。需要使用同步技術來避免這種衝突。
在分散式記憶體模型中,每個程式都是完全獨立的,具有自己的記憶體空間。在這種情況下,通訊是明確地在程式之間處理的。通訊的開銷通常比分享記憶體更昂貴,因為資料可能需要透過網路介面傳輸。
反應式程式設計
反應式程式設計是一種程式設計模型,旨在處理資料流和事件。它非常適合處理實時應用和UI中的資料流。在這一章中,我們將探討反應式程式設計的基本概念和技術,包括使用RxPY函式庫。
RxPY是一個Python函式庫,提供了一種簡單的方式來處理資料流和事件。它允許您使用觀察者模式來處理資料流,從而可以簡化複雜的程式設計任務。
圖表翻譯:
此圖示為RxPY的觀察者模式,使用from_iterable
建立一個資料流,然後使用map
和filter
運運算元處理資料流,最後使用subscribe
方法訂閱資料流。
flowchart TD A[資料流] --> B[觀察者模式] B --> C[map運運算元] C --> D[filter運運算元] D --> E[subscribe方法] E --> F[處理結果]
從技術架構視角來看,RxPY 提供了豐富的運運算元,例如 interval
、merge
、publish
和 buffer_with_count
等,讓開發者能以簡潔的語法實作複雜的非同步資料流操作。然而,深入剖析其冷熱可觀察物件的差異,可以發現正確區分和運用兩者對於避免非預期行為至關重要。冷可觀察物件如 interval
,每個訂閱都會觸發新的資料流,而熱可觀察物件則會分享同一資料流。冷熱可觀察物件的選擇取決於應用場景,例如實時監控系統更適合採用熱可觀察物件。技術團隊應著重理解這些核心差異,才能充分發揮 RxPY 的非同步程式設計優勢。對於追求高效能的系統,建議優先考慮熱可觀察物件,並善用 publish
、connect
等方法控制資料流的生命週期。接下來,隨著更多根據 RxPY 的實務案例出現,預期其在 Python 非同步程式設計領域的影響力將持續提升。