Rust 的非同步程式設計依賴 async/await 語法,但背後的執行環境(executor)才是真正驅動非同步任務的核心。與傳統同步程式不同,Rust 非同步程式碼本質上產生 Future,這些 Future 不會自動執行,而是需要執行環境來驅動。理解執行環境的工作原理對於撰寫高效能的平行程式至關重要。

非同步執行環境的工作原理

我們可以用一個生活中的例子來解釋非同步執行環境的工作方式。想像你有一件髒外套需要清洗。外套內側的標籤(指示清洗說明和材質)就像future。你走進乾洗店,將外套和指示交給店員。店員透過給外套上塑膠套並標上號碼,使其變得「可執行」。店員也給你一張帶有號碼的票據,這就像主函式獲得的task。

接著,你去做其他事情,同時外套正在被清洗。如果外套第一次清洗不乾淨,它會繼續進行清洗迴圈,直到乾淨為止。然後你帶著票據回來交給店員,這與block_on函式的階段相同。如果你真的花了很長時間才回來,外套可能已經乾淨了,你可以拿走它繼續你的一天。如果你太早去乾洗店,外套還沒洗乾淨,你就必須等到外套洗好才能帶走。乾淨的外套就是結果。

目前,我們的非同步執行環境只有一個執行緒處理佇列。這就像我們堅持只要乾洗店的一個工作人員一樣。考慮到大多數CPU都有多個核心,這並不是對可用資源的最有效利用。因此,探索如何增加工作者和佇列數量以提高處理更多工的能力將會很有用。

增加工作者與佇列

為了增加處理佇列的執行緒數量,我們可以增加另一個消費佇列的執行緒,使用佇列通道的複製接收器:

let (tx, rx) = flume::unbounded::<Runnable>();
let queue_one = rx.clone();
let queue_two = rx.clone();

thread::spawn(move || {
    while let Ok(runnable) = queue_one.recv() {
        let _ = catch_unwind(|| runnable.run());
    }
});

thread::spawn(move || {
    while let Ok(runnable) = queue_two.recv() {
        let _ = catch_unwind(|| runnable.run());
    }
});

如果我們透過通道傳送任務,流量通常會分佈在兩個執行緒上。如果一個執行緒被CPU密集型任務阻塞,另一個執行緒將繼續處理任務。回想一下,第1章透過斐波那契數列範例證明瞭CPU密集型任務可以使用執行緒平行執行。

我們可以用以下程式碼更簡潔地建立執行緒池:

for _ in 0..3 {
    let receiver = rx.clone();
    thread::spawn(move || {
        while let Ok(runnable) = receiver.recv() {
            let _ = catch_unwind(|| runnable.run());
        }
    });
}

我們可以將CPU密集型任務轉移到執行緒池,並繼續處理程式的其餘部分,在需要任務結果時阻塞它。雖然這不完全符合非同步程式設計的精神(我們使用非同步程式設計來最佳化I/O操作的處理),但這是一個有用的方法,記住某些問題可以透過提前解除安裝CPU密集型任務來解決。

在我的實務經驗中,我常遭遇「非同步不適合計算密集型任務」的警告。非同步僅是一種機制,只要合理,你可以將它用於任何你想要的地方。然而,這個警告並非毫無道理。例如,如果你正在使用非同步執行環境處理入站請求(如大多數Web框架所做的),那麼將計算密集型任務放入非同步執行佇列可能會阻礙你處理入站請求的能力,直到這些計算完成。

將任務傳遞到不同佇列

我們可能想要多個佇列的原因之一是任務可能有不同的優先順序。在這一節中,我們將建立一個高優先順序佇列(有兩個消費執行緒)和一個低優先順序佇列(有一個消費執行緒)。為了支援多個佇列,我們需要以下列舉來分類別任務的目標佇列類別:

#[derive(Debug, Clone, Copy)]
enum FutureType {
    High,
    Low
}

構建自己的非同步任務佇列:深入控制任務分配與優先順序

在現代軟體架構中,對非同步任務的精確控制變得越來越重要。雖然我們可以使用如Tokio這樣成熟的非同步執行時期,但瞭解其底層原理並能夠自行實作一個任務佇列系統,對於掌握非同步程式設計至關重要。本文將探討如何開發一個具備優先順序控制和任務竊取功能的非同步執行系統。

任務竊取機制:提高系統利用率

任務竊取(Task Stealing)是一種最佳化技術,當某個執行緒的任務佇列為空時,該執行緒可以「竊取」其他佇列中的任務來執行。這種機制能有效平衡工作負載,避免某些執行緒閒置而其他執行緒過載的情況。

在我們的系統中,如果高優先順序佇列的消費執行緒沒有任務可處理,它可以從低優先順序佇列「竊取」任務;反之亦然。這樣可以確保系統資源的最大化利用。

要實作任務竊取,我們需要讓每個佇列的消費者都能存取所有的佇列通道。首先,我們需要匯入適當的通道實作:

use flume::{Sender, Receiver};

我們使用flume函式庫標準函式庫道,因為我們需要能夠將SenderReceiver傳送到其他執行緒。接下來,我們定義兩個靜態通道:

static HIGH_CHANNEL: LazyLock<(Sender<Runnable>, Receiver<Runnable>)=
    LazyLock::new(|| flume::unbounded::<Runnable>());
static LOW_CHANNEL: LazyLock<(Sender<Runnable>, Receiver<Runnable>)=
    LazyLock::new(|| flume::unbounded::<Runnable>());

這些通道是惰性初始化的,只有在首次使用時才會被建立。

高優先順序佇列的消費者邏輯

高優先順序佇列的消費者執行緒需要按照以下步驟工作:

  1. 檢查HIGH_CHANNEL是否有任務
  2. 如果HIGH_CHANNEL沒有任務,檢查LOW_CHANNEL是否有任務
  3. 如果兩個通道都沒有任務,等待100毫秒後再次嘗試

以下是實作這個邏輯的程式碼:

static HIGH_QUEUE: LazyLock<flume::Sender<Runnable>= LazyLock::new(|| {
    for _ in 0..2 {
        let high_receiver = HIGH_CHANNEL.1.clone();
        let low_receiver = LOW_CHANNEL.1.clone();
        thread::spawn(move || {
            loop {
                match high_receiver.try_recv() {
                    Ok(runnable) ={
                        let _ = catch_unwind(|| runnable.run());
                    },
                    Err(_) ={
                        match low_receiver.try_recv() {
                            Ok(runnable) ={
                                let _ = catch_unwind(|| runnable.run());
                            },
                            Err(_) ={
                                thread::sleep(Duration::from_millis(100));
                            }
                        }
                    }
                };
            }
        });
    }
    HIGH_CHANNEL.0.clone()
});

需要注意的是,在生產環境中,我們應該避免在執行緒中使用睡眠機制,因為這會增加回應延遲。更好的方法是使用執行緒停泊(thread parking)或條件變數(condition variables)等更具回應性的機制。

低優先順序佇列的邏輯基本相同,只是會先檢查LOW_CHANNEL,然後才是HIGH_CHANNEL

重構spawn_task函式以提高易用性

目前我們的實作要求所有的Future都必須實作FutureOrderLabel特性,這對開發者來說非常不友好。我們需要重構spawn_task函式,使其能夠接受普通的async塊和async函式。

首先,我們移除FutureOrderLabel特性約束,並在函式中增加一個引數來指定優先順序:

fn spawn_task<F, T>(future: F, order: FutureType) -Task<T>
where
    F: Future<Output = T+ Send + 'static,
    T: Send + 'static,
{
    // 實作省略...
    let schedule = match order {
        FutureType::High =schedule_high,
        FutureType::Low =schedule_low
    };
    // 其餘實作省略...
}

接下來,我們建立一個巨集來簡化呼叫過程:

macro_rules! spawn_task {
    ($future:expr) ={
        spawn_task!($future, FutureType::Low)
    };
    ($future:expr, $order:expr) ={
        spawn_task($future, $order)
    };
}

這個巨集允許我們只傳入future,並預設使用低優先順序;或者同時指定future和優先順序。這樣,我們的API就更加靈活和易用:

let t_one = spawn_task!(one, FutureType::High);  // 高優先順序任務
let t_two = spawn_task!(two);                   // 預設低優先順序
let t_three = spawn_task!(async_fn());          // 傳入非同步函式
let t_four = spawn_task!(async {                // 傳入非同步塊
    async_fn().await;
    async_fn().await;
}, FutureType::High);

建立join巨集簡化等待多個任務

當需要等待多個任務完成時,重複呼叫future::block_on是很繁瑣的。我們可以建立一個join巨集來簡化這個過程:

macro_rules! join {
    ($($future:expr),*) ={
        {
            let mut results = Vec::new();
            $(
                results.push(future::block_on($future));
            )*
            results
        }
    };
}

這個巨集接受任意數量的future,並對每個future呼叫block_on,然後將結果收集到一個向量中回傳。使用方式如下:

let outcome: Vec<u32= join!(t_one, t_two);
let outcome_two: Vec<()= join!(t_four, t_three);

需要注意的是,所有傳入的future必須有相同的回傳類別。

我們還可以建立一個try_join巨集,用於處理可能出錯的情況:

macro_rules! try_join {
    ($($future:expr),*) ={
        {
            let mut results = Vec::new();
            $(
                let result = catch_unwind(|| future::block_on($future));
                results.push(result);
            )*
            results
        }
    };
}

設定我們的執行時期

最後,我們需要一個優雅的方式來設定我們的執行時期環境。我們可以建立一個Runtime結構體和相關的設定方法:

struct Runtime {
    high_num: usize,
    low_num: usize,
}

impl Runtime {
    pub fn new() -Self {
        let num_cores = std::thread::available_parallelism().unwrap()
            .get();
        Self {
            high_num: num_cores - 2,
            low_num: 1,
        }
    }
    
    pub fn with_high_num(mut self, num: usize) -Self {
        self.high_num = num;
        self
    }
    
    pub fn with_low_num(mut self, num: usize) -Self {
        self.low_num = num;
        self
    }
    
    pub fn run(&self) {
        std::env::set_var("HIGH_NUM", self.high_num.to_string());
        std::env::set_var("LOW_NUM", self.low_num.to_string());
        let high = spawn_task!(async {}, FutureType::High);
        let low = spawn_task!(async {}, FutureType::Low);
        join!(high, low);
    }
}

這個結構體允許我們設定高優先順序和低優先順序佇列的消費者執行緒數量。預設設定根據系統的核心數量,但也可以手動指定。run方法設定環境變數並啟動兩個空任務,確保兩個佇列都已初始化。

在我們的消費者執行緒建立邏輯中,我們需要使用這些環境變數:

static HIGH_QUEUE: LazyLock<flume::Sender<Runnable>= LazyLock::new(|| {
    let high_num = std::env::var("HIGH_NUM").unwrap().parse::<usize>()
        .unwrap();
    for _ in 0..high_num {
        // 執行緒建立邏輯...
    }
    // 其餘實作...
});

這樣,我們就可以在程式的開始部分設定和啟動我們的執行時期:

// 使用預設設定
Runtime::new().run();

// 或者使用自定義設定
Runtime::new().with_low_num(2).with_high_num(4).run();

非同步佇列系統的價值

透過構建自己的非同步佇列系統,我們不僅深入瞭解了非同步執行的內部機制,還獲得了對任務分配的精細控制能力。這種控制對於效能敏感的應用尤為重要,例如:

  1. 在金融交易系統中,可以將關鍵交易放入高優先順序佇列,而將報告生成等非關鍵任務放入低優先順序佇列
  2. 在遊戲伺服器中,可以優先處理玩家輸入和物理計算,而將日誌記錄和統計等工作降低優先順序
  3. 在Web服務中,可以根據請求的重要性或客戶等級分配不同的優先順序

雖然在大多數情況下,像Tokio這樣成熟的非同步執行時期已經能夠滿足需求,但瞭解並能夠自行實作這些機制,對於解決特定領域的效能瓶頸和最佳化系統行為至關重要。

在實際開發中,我們可能不會從零開始構建完整的非同步執行時期,但這些知識和技能將幫助我們更好地理解和使用現有的執行時期,並在必要時進行擴充套件或定製。最重要的是,這種深層次的理解讓我們能夠在設計系統時做出更明智的決策,建立更高效、更可靠的非同步應用程式。

完成非同步執行時:背景處理程式與網路整合

在前面的探索中,我們建立了自己的非同步執行時環境,實作了任務排程、執行和竊取等核心功能。然而,要開發一個完整與實用的非同步系統,我們還需要處理兩個關鍵元素:背景處理程式和網路整合。本文將探討這兩個主題,讓我們的非同步執行時真正具備實用價值。

背景處理程式的實作與應用

背景處理程式是在程式整個生命週期中定期在背景執行的任務。這類別處理程式通常用於監控和維護工作,如資料函式庫、日誌輪替和資料更新,確保程式始終能存取最新資訊。讓我們看如何在非同步執行時中實作這種長時間執行的任務。

首先,我們需要建立一個永不停止被輪詢的 future。此時,依據我們前面章節的學習,你應該能夠自行構建這個元件。如果處理程式是阻塞性的,它應該採取以下形式:

#[derive(Debug, Clone, Copy)]
struct BackgroundProcess;

impl Future for BackgroundProcess {
    type Output = ();
    
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -Poll<Self::Output{
        println!("背景處理程式觸發");
        std::thread::sleep(Duration::from_secs(1));
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

這個實作的關鍵在於我們始終回傳 Poll::Pending,這確保了背景處理程式會持續執行而不會完成。

在設計背景處理程式時,我們需要注意一個重要問題:如果在主函式中丟棄一個任務,非同步執行時中執行的該任務會被取消與不再執行。因此,背景任務必須在程式的整個生命週期中存在。我們可以在主函式的開頭,定義執行時後立即傳送背景任務:

Runtime::new().with_low_num(2).with_high_num(4).run();
let _background = spawn_task!(BackgroundProcess{});

然而,這種方法並不符合人體工程學設計原則。想像一下,如果某個結構或函式需要建立一個背景執行的任務,我們不希望在程式中不斷傳遞任務以防止它被丟棄。更優雅的解決方案是使用 detach 方法:

Runtime::new().with_low_num(2).with_high_num(4).run();
spawn_task!(BackgroundProcess{}).detach();

detach 方法會將任務指標移入一個不安全的迴圈中,該迴圈會持續輪詢並排程任務直到完成。主函式中與任務關聯的指標隨後被丟棄,消除了在主函式中保持任務存活的需求。

在我的實際專案中,我經常使用背景處理程式來處理定期資料同步、系統健康檢查和清理過期資料。這種模式特別適合那些需要在不幹擾主要業務邏輯的情況下執行維護工作的系統。

網路功能整合:理解執行器與聯結器

將網路功能整合到我們的非同步執行時是一個重要的擴充套件。在深入程式碼之前,我們需要理解兩個關鍵概念:執行器(executor)和聯結器(connector)。

執行器負責將 future 執行至完成。它是執行時的一部分,負責排程任務並確保它們在準備好時執行(或被執行)。當我們將網路功能引入執行時,執行器變得尤為重要,因為如果沒有它,我們的 HTTP 請求等 future 會被建立但永遠不會實際執行。

聯結器在網路程式設計中是一個建立應用程式與我們想連線的服務之間連線的元件。它處理如開啟 TCP 連線並在請求生命週期內維護這些連線等活動。

這些概念在整合像 hyper 這樣的函式庫們的非同步執行時至關重要。如果沒有適當的執行器和聯結器,我們的執行時將無法處理 hyper 依賴的 HTTP 請求和連線。

將 hyper 整合到我們的非同步執行時

如果我們檢視 hyper 官方檔案或各種線上教程,我們可能會得到這樣的印象:可以使用 hyper 套件透過以下程式碼執行簡單的 GET 請求:

use hyper::{Request, Client};

let url = "http://www.rust-lang.org";
let uri: Uri = url.parse().unwrap();
let request = Request::builder()
    .method("GET")
    .uri(uri)
    .header("User-Agent", "hyper/0.14.2")
    .header("Accept", "text/html")
    .body(hyper::Body::empty()).unwrap();

let future = async {
    let client = Client::new();
    client.request(request).await.unwrap()
};

let test = spawn_task!(future);
let response = future::block_on(test);
println!("Response status: {}", response.status());

然而,若我們執行此程式碼,會得到以下錯誤:

thread '<unnamed>' panicked at 'there is no reactor running, must be called from the context of a Tokio 1.x runtime

這是因為在底層,hyper 預設在 Tokio 執行時上執行,而我們的程式碼中沒有指定執行器。如果你使用 reqwest 或其他流行的套件,很可能會遇到類別似錯誤。

為解決這個問題,我們需要建立一個能夠在我們自定義的非同步執行時中處理任務的自定義執行器。然後,我們將構建一個自定義聯結器來管理實際的網路連線,使我們的執行時能無縫整合 hyper 和其他類別似函式庫 首先,我們需要匯入以下內容:

use std::net::Shutdown;
use std::net::{TcpStream, ToSocketAddrs};
use std::pin::Pin;
use std::task::{Context, Poll};

use anyhow::{bail, Context as _, Error, Result};
use async_native_tls::TlsStream;
use http::Uri;
use hyper::{Body, Client, Request, Response};
use smol::{io, prelude::*, Async};

接著,我們可以構建自己的執行器:

struct CustomExecutor;

impl<F: Future + Send + 'statichyper::rt::Executor<Ffor CustomExecutor {
    fn execute(&self, fut: F) {
        spawn_task!(async {
            println!("傳送請求");
            fut.await;
        }).detach();
    }
}

這段程式碼定義了我們的自定義執行器和 execute 函式的行為。在這個函式內部,我們呼叫 spawn_task 巨集,建立一個非同步區塊並等待傳入 execute 函式的 future。我們使用 detach 函式,否則通道會關閉,由於任務移出作用域並被簡單地丟棄,我們的請求將無法繼續。正如我們之前所學,detach 會將任務指標傳送到一個迴圈中,在任務完成前不斷輪詢,然後丟棄該任務。

現在,我們有了一個可以傳遞給 hyper 客戶端的自定義執行器。但是,我們的 hyper 客戶端仍然無法發出請求,因為它期望連線由 Tokio 執行時管理。要完全將 hyper 與我們的自定義非同步執行時整合,我們需要構建自己的非同步聯結器,獨立於 Tokio 處理網路連線。

構建 HTTP 連線

在網路請求方面,協定都是明確定義和標準化的。例如,TCP 使用三步握手來建立連線,然後透過該連線傳送位元組包。除非你有標準化連線協定無法提供的非常特定需求,否則從頭實作 TCP 連線沒有任何好處。

HTTP 和 HTTPS 是在 TCP 等傳輸協定之上執行的應用層協定。在構建我們的聯結器時,我們需要處理 TCP 連線的建立、資料傳輸,以及對於 HTTPS 還需處理 TLS 加密。

在開發自定義聯結器時,我遇到了一些有趣的挑戰。例如,處理 TLS 握手過程中的各種邊緣情況,以及確保連線在不同網路條件下的穩定性。這讓我更加理解了底層網路協定的複雜性,也讓我對標準函式庫的高階抽象更加感激。

接下來,我們將實作一個自定義聯結器,使我們能夠在我們的非同步執行時中支援 HTTP 和 HTTPS 請求。這部分實作較為複雜,涉及到底層網路協定和 TLS 處理,但它為我們提供了對網路連線的完全控制,使我們能夠最佳化效能並處理特定的錯誤情況。

實作自定義聯結器

要整合 HTTP 功能到我們的非同步執行時,我們需要建立一個符合 hyper 要求的聯結器。這個聯結器需要能夠建立 TCP 連線,並在需要時處理 TLS 加密。

首先,讓我們定義一個能夠處理 HTTP 和 HTTPS 連線的結構:

pub struct HttpsConnector {
    // 聯結器設定和狀態
}

impl HttpsConnector {
    pub fn new() -Self {
        // 初始化聯結器
        Self { }
    }
    
    // 實作連線方法和其他必要功能
}

在這個聯結器中,我們需要實作 hyper 的 Connect trait,該 trait 定義瞭如何建立到目標服務的連線。這涉及到解析主機名、建立 TCP 連線,以及對於 HTTPS 連線還需要進行 TLS 握手。

這樣的實作允許我們的非同步執行時與 hyper 無縫整合,使我們能夠傳送 HTTP 請求並接收回應,而無需依賴 Tokio 執行時。

從實踐中學到的經驗

在將網路功能整合到自定義非同步執行時的過程中,我學到了幾個重要的經驗:

  1. 理解底層協定:深入理解 TCP、HTTP 和 TLS 等協定的工作方式,對於有效地實作和除錯網路功能至關重要。

  2. 處理錯誤情況:網路通訊充滿了各種可能的錯誤情況,從連線超時到中斷的連線。設計強大的錯誤處理機制是關鍵。

  3. 效能考量:非同步網路操作的效能取決於多種因素,包括連線池管理、超時設定和資源回收策略。

  4. 跨平台相容性:不同作業系統處理網路操作的方式有所不同,確保你的程式碼在所有目標平台上都能正常工作是很重要的。

  5. 安全性考慮:特別是在處理 HTTPS 連線時,確保正確實作 TLS 握手和證書驗證是必不可少的。

應使用案例項:構建高效能 API 客戶端

利用我們整合到非同步執行時的網路功能,我們可以構建高效能的 API 客戶端。以下是一個範例,展示如何使用我們的自定義執行器和聯結器建立 HTTP 客戶端:

// 建立自定義執行器和聯結器
let executor = CustomExecutor;
let connector = HttpsConnector::new();

// 構建 hyper 客戶端
let client: Client<_, hyper::Body= Client::builder()
    .executor(executor)
    .build(connector);

// 建立請求
let request = Request::builder()
    .method("GET")
    .uri("https://api.example.com/data")
    .header("User-Agent", "CustomRuntime/1.0")
    .body(Body::empty())
    .unwrap();

// 在我們的非同步執行時中傳送請求
let response = spawn_task!(async move {
    match client.request(request).await {
        Ok(resp) ={
            // 處理回應
            println!("Status: {}", resp.status());
            // 讀取回應體
            let body_bytes = hyper::body::to_bytes(resp.into_body()).await.unwrap();
            String::from_utf8(body_bytes.to_vec()).unwrap()
        }
        Err(e) ={
            // 處理錯誤
            format!("請求失敗: {}", e)
        }
    }
});

整合網路功能到自定義非同步執行環境:網路協定層與非同步處理的關係

在網路通訊中,各種協定層級扮演著不同的角色。HTTP通訊需要處理請求主體、標頭等元素,而HTTPS更增加了額外的安全層,需要在客戶端傳送資料前進行憑證驗證和交換。考慮到這些協定中的多次往往返和等待回應的特性,網路請求成為非同步處理的理想應用場景。

我們無法在不犧牲安全性和連線可靠性的前提下省略網路協定中的步驟,但我們可以透過非同步技術,讓CPU在等待網路回應時去處理其他任務,提高資源利用效率。

建立HTTP連線

為了支援HTTP和HTTPS,我們需要實作一個自定義串流列舉:

enum CustomStream {
    Plain(Async<TcpStream>),
    Tls(TlsStream<Async<TcpStream>>),
}

這裡的Plain變體是非同步TCP串流,支援HTTP請求;Tls變體則在TCP和HTTP之間加入TLS層,支援HTTPS請求。

接著實作hyper的Service特徵來建立自定義聯結器:

#[derive(Clone)]
struct CustomConnector;

impl hyper::service::Service<Urifor CustomConnector {
    type Response = CustomStream;
    type Error = Error;
    type Future = Pin<Box<dyn Future<Output = Result<
        Self::Response, Self::Error>+ Send
    >>;
    
    fn poll_ready(&mut self, _: &mut Context<'_>)
        -Poll<Result<(), Self::Error>{
        // 實作內容
    }
    
    fn call(&mut self, uri: Uri) -Self::Future {
        // 實作內容
    }
}

Service特徵定義了連線的未來結果(Future)。在這個實作中,我們的連線是一個執行緒安全的Future,它會回傳我們的串流列舉,這個列舉要麼是非同步TCP連線,要麼是包裝在TLS串流中的非同步TCP連線。

poll_ready函式實作

poll_ready函式被hyper用來檢查服務是否準備好處理請求。對於客戶端實作,我們總是回傳Ready

fn poll_ready(&mut self, cx: &mut Context<'_>) -Poll<Result<(), Error>{
    Poll::Ready(Ok(()))
}

如果我們在實作伺服器時,可能會根據伺服器負載狀態回傳Pending,讓任務持續輪詢直到服務準備就緒。

call函式實作

call函式需要在poll_ready回傳Ok後才能使用:

fn call(&mut self, uri: Uri) -Self::Future {
    Box::pin(async move {
        let host = uri.host().context("cannot parse host")?;
        match uri.scheme_str() {
            Some("http") ={
                // HTTP處理邏輯
            }
            Some("https") ={
                // HTTPS處理邏輯
            }
            scheme =bail!("unsupported scheme: {:?}", scheme),
        }
    })
}

對於HTTPS連線,我們需要執行以下步驟:

let socket_addr = {
    let host = host.to_string();
    let port = uri.port_u16().unwrap_or(443);
    smol::unblock(move || (host.as_str(), port).to_socket_addrs())
        .await?
        .next()
        .context("cannot resolve address")?
};
let stream = Async::<TcpStream>::connect(socket_addr).await?;
let stream = async_native_tls::connect(host, stream).await?;
Ok(CustomStream::Tls(stream))

這裡使用443作為HTTPS的標準埠。我們將解析主機地址的工作放在unblock函式中執行,這樣在解析地址時可以釋放執行緒去做其他事情。接著建立TCP串流,然後連線到TLS,最後回傳我們的CustomStream列舉。

HTTP處理邏輯與HTTPS類別似,只是使用80作為標準埠,與不需要TLS連線:

let socket_addr = {
    let host = host.to_string();
    let port = uri.port_u16().unwrap_or(80);
    smol::unblock(move || (host.as_str(), port).to_socket_addrs())
        .await?
        .next()
        .context("cannot resolve address")?
};
let stream = Async::<TcpStream>::connect(socket_addr).await?;
Ok(CustomStream::Plain(stream))

實作Tokio的AsyncRead特徵

為了讓hyper能使用我們的串流,我們需要實作Tokio的AsyncReadAsyncWrite特徵。AsyncRead特徵類別似於標準函式庫std::io::Read`,但整合了非同步任務系統:

impl tokio::io::AsyncRead for CustomStream {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -Poll<io::Result<()>{
        match &mut *self {
            CustomStream::Plain(s) ={
                Pin::new(s)
                    .poll_read(cx, buf.initialize_unfilled())
                    .map_ok(|size| {
                        buf.advance(size);
                    })
            }
            CustomStream::Tls(s) ={
                Pin::new(s)
                    .poll_read(cx, buf.initialize_unfilled())
                    .map_ok(|size| {
                        buf.advance(size);
                    })
            }
        }
    }
}

在這個實作中,我們根據串流類別呼叫相應的poll_read函式,該函式執行讀取操作並回傳一個表示讀取結果的Poll列舉。如果讀取成功,我們會前進緩衝區的填充區域,表示讀取的資料已經存入緩衝區。

實作Tokio的AsyncWrite特徵

AsyncWrite特徵允許我們非同步地寫入位元組:

impl tokio::io::AsyncWrite for CustomStream {
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -Poll<io::Result<usize>{
        match &mut *self {
            CustomStream::Plain(s) =Pin::new(s).poll_write(cx, buf),
            CustomStream::Tls(s) =Pin::new(s).poll_write(cx, buf),
        }
    }
    
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) 
        -Poll<io::Result<()>{
        match &mut *self {
            CustomStream::Plain(s) =Pin::new(s).poll_flush(cx),
            CustomStream::Tls(s) =Pin::new(s).poll_flush(cx),
        }
    }
    
    fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) 
        -Poll<io::Result<()>{
        match &mut *self {
            CustomStream::Plain(s) ={
                s.get_ref().shutdown(Shutdown::Write)?;
                Poll::Ready(Ok(()))
            }
            CustomStream::Tls(s) =Pin::new(s).poll_close(cx),
        }
    }
}

poll_write函式嘗試將緩衝區中的位元組寫入物件。如果寫入成功,回傳寫入的位元組數;如果物件尚未準備好寫入,回傳Pending;如果回傳0,通常表示物件不再能接受位元組。

poll_flush確保緩衝區的所有內容立即到達目的地,而poll_shutdown則負責關閉連線。注意PlainTls串流的關閉方式略有不同,Plain直接關閉,而Tls需要透過poll_close非同步關閉。

連線並執行客戶端

最後,我們實作Connection特徵並建立傳送請求的函式:

impl hyper::client::connect::Connection for CustomStream {
    fn connected(&self) -hyper::client::connect::Connected {
        hyper::client::connect::Connected::new()
    }
}

async fn fetch(req: Request<Body>) -Result<Response<Body>{
    Ok(Client::builder()
        .executor(CustomExecutor)
        .build::<_, Body>(CustomConnector)
        .request(req)
        .await?)
}

然後在主函式中使用我們的非同步執行環境執行HTTP客戶端:

fn main() {
    Runtime::new().with_low_num(2).with_high_num(4).run();
    
    let future = async {
        let req = Request::get("https://www.rust-lang.org")
            .body(Body::empty())
            .unwrap();
        let response = fetch(req).await.unwrap();
        let body_bytes = hyper::body::to_bytes(response.into_body())
            .await.unwrap();
        let html = String::from_utf8(body_bytes.to_vec()).unwrap();
        println!("{}", html);
    };
    
    let test = spawn_task!(future);
    let _outcome = future::block_on(test);
}

在這個範例中,我們建立一個請求取得Rust官網的HTML內容。我們的非同步執行環境現在能夠與網際網路非同步通訊,使我們能夠在等待網路回應的同時執行其他任務。

深入理解非同步網路通訊的優勢

在開發網路應用時,玄貓發現非同步處理能帶來顯著的效能提升。當系統需要處理大量連線時,傳統的同步模型會為每個連線分配一個執行緒,這在高併發場景下會導致資源浪費和效能瓶頸。

非同步網路模型允許單一執行緒處理多個連線,在等待I/O操作完成時轉而處理其他任務。透過本章實作的自定義非同步執行環境與網路整合,我們可以更清楚地理解Rust非同步程式設計的深層機制和優勢。

從我的經驗來看,在構建高效能網路服務時,深入理解底層非同步機制是關鍵。本章介紹的模式不僅適用於客戶端,也可擴充套件到伺服器端實作,為構建可擴充套件的網路應用提供堅實基礎。

使用 mio 實作底層非同步網路通訊

探索 mio:Rust 的低階非同步 I/O 函式庫在深入實作網路功能時,我們可以跳過使用其他 crate 的 trait 來獲得非同步實作,而直接使用 mio 來監聽 socket 事件。mio(metal I/O)是 Rust 中一個低層次、非阻塞的 I/O 函式庫提供了建立高效能非同步應用程式的基礎元件,並作為作業系統非同步 I/O 功能的薄抽象層。

mio 之所以如此重要,是因為它作為其他高層次非同步執行時(包括 Tokio)的基礎。這些高層次函式庫掉複雜性,使它們更容易使用。對於需要對 I/O 操作進行精細控制並最佳化效能的開發者來說,mio 非常有用。

![Tokio 建立在 mio 之上]

在本章前面,我們將 hyper 連線到我們的執行時。為了獲得完整圖景,現在讓我們探索 mio 並將其整合到我們的執行時中。首先,我們需要在 Cargo.toml 中增加以下依賴:

mio = {version = "1.0.2", features = ["net", "os-poll"]}

我們還需要這些引入:

use mio::net::{TcpListener, TcpStream};
use mio::{Events, Interest, Poll as MioPoll, Token};
use std::io::{Read, Write};
use std::time::Duration;
use std::error::Error;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

mio 與 hyper 的比較

值得注意的是,本章探討的 mio 並非建立 TCP 伺服器的最佳方法。在生產環境中,使用類別似以下 hyper 範例的方法更為合適:

#[tokio::main]
async fn main() -Result<(), Box<dyn std::error::Error + Send + Sync>{
    let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
    let listener = TcpListener::bind(addr).await?;
    loop {
        let (stream, _) = listener.accept().await?;
        let io = TokioIo::new(stream);
        tokio::task::spawn(async move {
            if let Err(err) = http1::Builder::new()
                .serve_connection(io, service_fn(hello))
                .await
            {
                println!("Error serving connection: {:?}", err);
            }
        });
    }
}

這個範例中,主執行緒等待傳入的資料,當資料到達時,會生成一個新任務來處理該資料。這使得監聽器能夠繼續接受更多傳入的資料。雖然我們的 mio 範例將幫助你理解如何輪詢 TCP 連線,但在建構網頁應用時,使用框架或函式庫的監聽器才是最明智的選擇。

在 Future 中輪詢 Socket

mio 設計用於處理大量 socket(數千個)。因此,我們需要識別哪個 socket 觸發了通知。Token 使我們能夠做到這一點。當我們向事件迴圈註冊 socket 時,我們傳遞一個 token,該 token 在處理程式中回傳。Token 是 usize 的結構體元組,因為每個作業系統允許與 socket 關聯的資料量為指標大小。

mio 在這裡不使用回呼,因為我們需要零成本抽象,而 token 是唯一的實作方式。我們可以在 mio 之上構建回呼、流和 future。

使用 token,我們現在有以下步驟:

  1. 向事件迴圈註冊 socket
  2. 等待 socket 就緒
  3. 使用 token 查詢 socket 狀態
  4. 操作 socket
  5. 重複

我們的簡單範例無需對映,因此我們將使用以下程式碼定義 token:

const SERVER: Token = Token(0);
const CLIENT: Token = Token(1);

這裡,我們只需確保 token 是唯一的。傳入 Token 的整數用於區分它與其他 token。現在我們有了 token,我們定義將輪詢 socket 的 future:

struct ServerFuture {
    server: TcpListener,
    poll: MioPoll,
}

impl Future for ServerFuture {
    type Output = String;
    
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -Poll<Self::Output{
        // 實作內容
    }
}

我們使用 TcpListener 接受傳入的資料,並使用 MioPoll 輪詢 socket 並告訴 future 何時 socket 可讀。在 future 的 poll 函式中,我們可以定義事件並輪詢 socket:

let mut events = Events::with_capacity(1);
let _ = self.poll.poll(
    &mut events,
    Some(Duration::from_millis(200))
).unwrap();

for event in events.iter() {
    // 處理事件
}

cx.waker().wake_by_ref();
return Poll::Pending

poll 將從 socket 中提取事件到 events 迭代器中。我們還設定 socket poll 在 200 毫秒後超時。如果 socket 中沒有事件,我們會繼續,不處理任何事件,並回傳 Pending。我們將繼續輪詢,直到獲得事件。

當我們確實獲得事件時,我們遍歷它們。在前面的程式碼中,我們將容量設定為 1,但我們可以增加容量以處理多個事件。處理事件時,我們需要確定事件類別。對於我們的 future,我們需要確保 socket 是可讀的,並且 token 是 SERVER token:

if event.token() == SERVER && event.is_readable() {
    let (mut stream, _) = self.server.accept().unwrap();
    let mut buffer = [0u8; 1024];
    let mut received_data = Vec::new();
    
    loop {
        // 讀取資料
    }
    
    if !received_data.is_empty() {
        let received_str = String::from_utf8_lossy(&received_data);
        return Poll::Ready(received_str.to_string())
    }
    
    cx.waker().wake_by_ref();
    return Poll::Pending
}

如果 socket 包含資料,則事件是可讀的。如果我們的事件是正確的,我們提取 TcpStream 並在堆積積上使用 Vec 定義 data_received 集合,使用 buffer 切片進行讀取。如果資料為空,我們回傳 Pending,以便在資料不存在時可以再次輪詢 socket。然後我們將資料轉換為字元串並使用 Ready 回傳。這意味著我們有了資料後,socket 監聽器就完成了。

如果我們希望 socket 在程式生命週期內持續被輪詢,我們會生成一個分離的任務,將資料傳遞給非同步函式來處理資料:

if !received_data.is_empty() {
    spawn_task!(some_async_handle_function(&received_data)).detach();
    return Poll::Pending;
}

在我們的迴圈中,我們從 socket 讀取資料:

loop {
    match stream.read(&mut buffer) {
        Ok(n) if n 0 ={
            received_data.extend_from_slice(&buffer[..n]);
        }
        Ok(_) ={
            break;
        }
        Err(e) ={
            eprintln!("Error reading from stream: {}", e);
            break;
        }
    }
}

不管接收到的訊息是否大於緩衝區;我們的迴圈將繼續提取所有要處理的位元組,將它們增加到我們的 Vec 中。如果沒有更多位元組,我們可以停止迴圈來處理資料。

現在我們有了一個 future,它將繼續輪詢,直到從 socket 接收資料。接收資料後,future 將終止。我們也可以讓這個 future 持續輪詢 socket。我們可以說,如果需要,我們可以使用這種持續輪詢 future 來追蹤數千個 socket。每個 future 一個 socket,並將數千個 future 生成到我們的執行時中。

透過 Socket 傳送資料

對於我們的客戶端,我們將在 main 函式中執行所有內容:

fn main() -Result<(), Box<dyn Error>{
    Runtime::new().with_low_num(2).with_high_num(4).run();
    // 主要邏輯
    Ok(())
}

在我們的 main 中,我們首先建立監聽器和客戶端的流:

let addr = "127.0.0.1:13265".parse()?;
let mut server = TcpListener::bind(addr)?;
let mut stream = TcpStream::connect(server.local_addr()?)?;

我們的範例只需要一個流,但如果需要,我們可以建立多個流。我們向 mio poll 註冊伺服器,並使用伺服器和 poll 來生成監聽器任務:

let poll: MioPoll = MioPoll::new()?;
poll.registry()
    .register(&mut server, SERVER, Interest::READABLE)?;

let server_worker = ServerFuture{
    server,
    poll,
};
let test = spawn_task!(server_worker);

現在我們的任務不斷輪詢 TCP 連線埠以取得傳入的事件。然後我們建立另一個具有 CLIENT token 的 poll,用於可寫事件。如果 socket 未滿,我們可以寫入它。如果 socket 已滿,它不再可寫,需要被重新整理。我們的客戶端 poll 形式如下:

let mut client_poll: MioPoll = MioPoll::new()?;
client_poll.registry()
    .register(&mut stream, CLIENT, Interest::WRITABLE)?;

使用 mio,我們還可以建立在 socket 可讀或可寫時觸發的 poll:

.register(
    &mut server,
    SERVER,
    Interest::READABLE | Interest::WRITABLE
)?;

現在我們已經建立了 poll,我們可以等待 socket 變為可寫,然後再寫入。我們使用以下 poll 呼叫:

let mut events = Events::with_capacity(128);
let _ = client_poll.poll(
    &mut events,
    None
).unwrap();

請注意,超時為 None。這意味著我們當前的執行緒將被阻塞,直到 poll 呼叫產生事件。一旦我們有了事件,我們就向 socket 傳送一個簡單的訊息:

for event in events.iter() {
    if event.token() == CLIENT && event.is_writable() {
        let message = "that's so dingo!\n";
        let _ = stream.write_all(message.as_bytes());
    }
}

訊息已傳送,所以我們可以阻塞我們的執行緒,然後列印出訊息:

let outcome = future::block_on(test);
println!("outcome: {}", outcome);

執行程式碼時,你可能會得到以下輸出:

Error reading from stream: Resource temporarily unavailable (os error 35)
outcome: that's so dingo!

它能運作,但我們得到了初始錯誤。這可能是非阻塞 TCP 監聽器的結果;mio 是非阻塞的。“Resource temporarily unavailable” 錯誤通常是由於 socket 中沒有可用資料造成的。這可能發生在建立 TCP 流時,但這不是問題,因為我們在迴圈中處理這些錯誤,並回傳 Pending,以便 socket 繼續被輪詢:

use std::io::ErrorKind;
// ...
Err(ref e) if e.kind() == ErrorKind::WouldBlock ={
    waker.cx.waker().wake_by_ref();
    return Poll::Pending;
}

透過 mio 輪詢功能,我們實作了透過 TCP socket 的非同步通訊。我們也可以使用 mio 透過 UnixDatagram 在程式之間傳送資料。UnixDatagram 是限制在同一台機器上傳送資料的 socket。因此,UnixDatagram 更快,需要更少的連貫的背景與環境切換,並且不必經過網路堆積積疊。

在本章中,玄貓探索瞭如何使用 trait 將第三方 crate 整合到我們的執行時中,並且深入瞭解如何透過 mio 輪詢 TCP socket。當涉及到執行自定義非同步執行時,只要有 trait 檔案,就沒有什麼能阻止我們。

若要進一步掌握到目前為止獲得的非同步知識,我們已經具備建立處理各種端點的基本網頁伺服器的能力。儘管完全使用 mio 實作所有通訊可能很困難,但僅用於非同步程式設計就簡單得多。hyper 的 HttpListener 將處理協定複雜性,讓我們可以專注於將請求作為非同步任務傳遞,以及向客戶端的回應。

在我們的技術探索旅程中,我們專注於非同步程式設計而非網頁程

協程:非同步程式設計的另一種選擇

在現代程式設計中,處理多個平行任務是個常見需求,尤其是在I/O密集型應用中。前面幾章我們已經探討了非同步程式設計,瞭解了await語法背後的Future、任務、執行緒和佇列等機制。但今天,我們要探討一個更基礎的問題:有沒有可能不依賴非同步執行環境,讓程式碼能夠暫停和還原執行?如果有這樣的機制,我們是否能更容易地測試程式碼在不同輪詢順序和設定下的行為?這正是協程(Coroutine)的用武之地。

在本文中,我將帶你深入瞭解協程的概念、工作原理以及在實際開發中的應用場景。你將學會如何整合協程到你的專案中,以降低記憶體消耗,並在不使用非同步執行環境的情況下實作類別似非同步功能的效果。此外,你還將掌握如何精確控制協程的輪詢時機和順序,讓你的程式更加高效與可控。

在撰寫本文時,Rust中的協程語法仍在nightly版本中開發。語法可能會有所變化,也可能在未來穩定版中推出。儘管語法可能變更,本文討論的協程基本原理和應用方式仍然適用。

協程的本質與特性:什麼是協程?

子程式一旦啟動,就會從頭執行到尾,任何特定的子程式例項只會回傳一次。而協程不同,它可以透過多種方式結束:它可以像子程式一樣完成,也可以透過呼叫另一個協程(稱為「產出」或「讓出」)然後稍後回傳到同一點。因此,協程需要在暫停期間儲存其狀態。

協程並非Rust獨有的概念,在許多程式語言中都有不同的實作。但它們都分享一些基本特性,讓程式能夠暫停和還原執行:

非阻塞性

當協程暫停時,它不會阻塞執行緒的執行。

狀態儲存

協程能在暫停時儲存其狀態,並在還原時從該狀態繼續執行,無需從頭開始。

合作式排程

協程可以受控方式暫停並在稍後階段還原。

協程與執行緒的區別

乍看之下,協程和執行緒似乎很相似—它們都執行任務並在稍後暫停/還原。但關鍵差異在於排程機制:

  • 執行緒是由外部排程器搶佔式排程的:任務會被外部排程器中斷,目的是稍後還原該任務。
  • 協程則是合作式的:它可以暫停或讓出給另一個協程,而無需排程器或作業系統的參與。

為什麼要使用協程?

既然協程聽起來這麼好,為什麼我們還需要async/await呢?就像程式設計中的任何事物一樣,這裡也有權衡取捨。

協程的優勢

  • 無需同步原語如互斥鎖,因為協程在同一執行緒中執行
  • 簡化了程式碼的理解和編寫
  • 在同一執行緒中協程間的切換比執行緒間切換成本低得多
  • 特別適合需要長時間等待的任務

舉個例子,假設你需要監控100個檔案的變化。讓作業系統排程100個執行緒迴圈檢查每個檔案會非常耗費資源,因為連貫的背景與環境切換的計算成本很高。相比之下,使用100個協程來檢查它們各自監控的檔案是否發生變化,然後在檔案變化時將其送入執行緒池處理,會更加高效與容易實作。

協程的主要缺點是,如果只在一個執行緒中使用協程,就無法充分利用多核心處理器的優勢。在單一執行緒中執行程式意味著你無法將任務分散到多個核心上。

協程的實際應用

在高層次上,協程使我們能夠暫停操作,將控制權交還給執行協程的執行緒,然後在需要時還原操作。這聽起來很像非同步程式設計。非同步任務可以透過輪詢讓出控制權,讓另一個任務執行。而協程則使我們能夠暫停操作,並在不需要非同步執行環境或執行緒的情況下透過喚醒器還原操作。

這可能有點抽象,讓我用一個簡單的檔案寫入範例來說明協程的優勢。假設我們需要接收大量整數並將它們寫入檔案。也許我們從另一個程式接收數字,但不能等待所有數字都接收完畢才開始寫入,因為這會佔用太多記憶體。

檔案寫入範例

首先,我們需要以下依賴:

[dependencies]
rand = "0.8.5"

接著,我們需要匯入必要的模組:

#![feature(coroutines)]
#![feature(coroutine_trait)]
use std::fs::{OpenOptions, File};
use std::io::{Write, self};
use std::time::Instant;
use rand::Rng;
use std::ops::{Coroutine, CoroutineState};
use std::pin::Pin;

這些匯入看起來有點多,但在我們進行範例時你會看到它們如何被使用。巨集是為了啟用實驗性功能。

對於簡單的檔案寫入範例,我們有以下函式:

fn append_number_to_file(n: i32) -io::Result<(){
    let mut file = OpenOptions::new()
        .create(true)
        .append(true)
        .open("numbers.txt")?;
    writeln!(file, "{}", n)?;
    Ok(())
}

這個函式開啟檔案並寫入數字。現在我們想測試它並測量效能,所以我們生成20萬個隨機整數,迴圈遍歷它們,寫入檔案,並計時:

fn main() -io::Result<(){
    let mut rng = rand::thread_rng();
    let numbers: Vec<i32= (0..200000).map(|_| rng.gen()).collect();
    
    let start = Instant::now();
    for &number in &numbers {
        if let Err(e) = append_number_to_file(number) {
            eprintln!("Failed to write to file: {}", e);
        }
    }
    let duration = start.elapsed();
    
    println!("Time elapsed in file operations is: {:?}", duration);
    Ok(())
}

在我測試時,這段程式碼大約花了4.39秒完成。這並不是很快,主要是因為我們每次都重新開啟檔案,這會帶來許可權檢查和更新檔案元資料的額外開銷。

使用協程最佳化檔案寫入

現在,我們可以使用協程來處理整數的寫入。首先,我們定義一個包含檔案描述符的結構:

struct WriteCoroutine {
    pub file_handle: File,
}

impl WriteCoroutine {
    fn new(path: &str) -io::Result<Self{
        let file_handle = OpenOptions::new()
            .create(true)
            .append(true)
            .open(path)?;
        Ok(Self { file_handle })
    }
}

然後我們實作Coroutine特性:

impl Coroutine<i32for WriteCoroutine {
    type Yield = ();
    type Return = ();
    
    fn resume(mut self: Pin<&mut Self>, arg: i32) 
        -CoroutineState<Self::Yield, Self::Return{
        writeln!(self.file_handle, "{}", arg).unwrap();
        CoroutineState::Yielded(())
    }
}

我們的協程,如類別type Yield = ();type Return = ();所示,既不產出中間值也不回傳最終結果。重要的是我們回傳CoroutineState::Yielded,這類別似於在Future中回傳Pending,但回傳的是Yield類別。我們也可以回傳CoroutineState::Complete,這類別似於Future中的Ready

在我們的測試中,我們可以建立協程並迴圈遍歷數字,呼叫resume函式:

let mut coroutine = WriteCoroutine::new("numbers.txt")?;
for &number in &numbers {
    Pin::new(&mut coroutine).resume(number);
}

更新後的測試約花了622.6毫秒,大約快了六倍。當然,我們也可以建立一個檔案描述符並在迴圈中參照它來獲得相同的效果,但這演示了暫停和還原協程狀態的好處。我們成功地保持了寫入邏輯的隔離,但不需要任何執行緒或非同步執行環境就獲得了這種加速。協程可以作為執行緒和非同步任務內的構建塊,用於暫停和還原計算。

你可能注意到實作Future特性和Coroutine特性之間的相似之處。兩者都有兩種可能的狀態,根據兩種可能的結果還原或完成,並且都可以暫停和還原。從廣義上講,非同步任務可以被視為協程,區別在於它們被送到不同的執行緒並由執行器輪詢。

協程有很多用途,可用於處理網路請求、大資料處理或UI應用。與使用回呼相比,它們提供了處理非同步任務的更簡單方法。使用協程,我們可以在一個執行緒中實作非同步功能,而無需佇列或定義執行器。

使用協程實作生成器

你可能在其他語言(如Python)中接觸過生成器的概念。生成器是協程的一個子集,有時被稱為「弱協程」。之所以這樣稱呼,是因為它們總是將控制權交還給呼叫它們的程式,而不是交給另一個協程。

生成器允許我們懶惰地執行操作——只在需要時才產生輸出值。這可能是執行計算、建立網路連線或載入資料。當處理大型資料集時,這種懶惰評估特別有用,因為一次性處理可能效率低下或不可行。像range這樣的迭代器也有類別似的目的,允許你懶惰地生成值序列。

值得注意的是,Rust語言正在發展,其中一個發展領域是非同步生成器。這些是特殊類別的生成器,可以在非同步連貫的背景與環境中產生值。這方面的工作正在進行中;詳情請參見Rust RFC Book網站。

在Rust中實作簡單的生成器

假設我們想從儲存在資料檔案中的大型資料結構中提取資訊。該資料檔案非常大,理想情況下我們不想一次性將其全部載入記憶體。在我們的例子中,我們將使用一個只有五行的非常小的資料檔案來展示流式處理。記住,這是一個教學範例;在實際世界中,使用生成器讀取五行資料檔案會被視為殺雞用牛刀!

我們需要前面例子中的協程功能,並匯入這些元件:

#![feature(coroutines)]
#![feature(coroutine_trait)]
use std::fs::File;
use std::io::{self, BufRead, BufReader};
use std::ops::{Coroutine, CoroutineState};
use std::pin::Pin;

現在讓我們建立我們的ReadCoroutine結構:

struct ReadCoroutine {
    lines: io::Lines<BufReader<File>>,
}

impl ReadCoroutine {
    fn new(path: &str) -io::Result<Self{
        let file = File::

Rust協程:超越傳統並發模型的強大機制

在處理複雜的I/O操作和需要暫停執行的任務時,Rust的協程(Coroutines)提供了一種優雅與高效的解決方案。作為Rust實驗性功能,協程為開發者提供了一種能夠暫停和還原執行的程式結構,這在檔案處理、網路通訊和其他需要非阻塞操作的場景中特別有用。

在我多年的系統程式設計經驗中,協程一直是我處理複雜I/O任務的首選工具之一,尤其是在不想引入完整非同步執行時框架的情況下。本文將帶你從基礎概念出發,逐步掌握協程的實作方式和高階應用場景。

協程的基本概念與運作機制

協程本質上是一種可以暫停和還原的函式,它允許程式在特定點暫停執行,稍後再從同一點繼續。這種能力在處理I/O操作時特別有價值,因為它允許程式在等待I/O完成時釋放CPU資源。

在Rust中,協程透過Coroutine特徵(trait)實作,這是一個實驗性功能,需要在nightly版本的Rust中使用。協程有兩種主要狀態:

  1. Yielded - 協程暫時暫停,但將來可以還原
  2. Complete - 協程已完成執行

以下是一個簡單的協程實作範例,用於從檔案中讀取整數:

#![feature(coroutines, coroutine_trait)]
use std::{
    fs::File,
    io::{self, BufRead, BufReader},
    ops::{Coroutine, CoroutineState},
    pin::Pin,
};

struct ReadCoroutine {
    lines: std::io::Lines<BufReader<File>>,
}

impl ReadCoroutine {
    fn new(path: &str) -io::Result<Self{
        let file = File::open(path)?;
        let reader = BufReader::new(file);
        let lines = reader.lines();
        Ok(Self { lines })
    }
}

impl Coroutine<()for ReadCoroutine {
    type Yield = i32;
    type Return = ();

    fn resume(
        mut self: Pin<&mut Self>,
        _: ()
    ) -CoroutineState<Self::Yield, Self::Return{
        match self.lines.next() {
            Some(Ok(line)) ={
                if let Ok(num) = line.parse::<i32>() {
                    CoroutineState::Yielded(num)
                } else {
                    CoroutineState::Complete(())
                }
            }
            Some(Err(_)) | None =CoroutineState::Complete(()),
        }
    }
}

這個協程包含一個Yield陳述,允許我們從生成器中產出值。Coroutine特徵只有一個必須實作的方法:resume。這讓我們可以還原執行,從上次執行點繼續。在這個例子中,resume方法從檔案讀取行,將它們解析為整數,並產出這些值,直到沒有更多行可讀取,此時協程完成。

值得注意的是,Coroutine特徵之前被稱為Generator特徵,而CoroutineState曾被稱為GeneratorState,這些名稱更改發生在2023年底。如果你安裝了較舊版本的nightly Rust,需要更新它才能使用上述程式碼。

現在讓我們在測試檔案上呼叫這個函式:

fn main() -io::Result<(){
    let mut coroutine = ReadCoroutine::new("./data.txt")?;
    loop {
        match Pin::new(&mut coroutine).resume(()) {
            CoroutineState::Yielded(number) =println!("{:?}", number),
            CoroutineState::Complete(()) =break,
        }
    }
    Ok(())
}

執行這段程式碼,假設data.txt包含數字1到5,你會得到以下輸出:

1
2
3
4
5

協程的堆積積疊應用:檔案傳輸案例

協程真正展現威力的地方在於它們可以相互配合工作。讓我來分享一個實際案例,當我需要處理超大檔案傳輸時,使用協程讓我避免了記憶體耗盡的問題。

在這個例子中,我們將實作一個檔案傳輸系統,其中一個協程讀取檔案並產出值,而另一個協程接收這些值並寫入到另一個檔案。這種方式特別適用於處理大檔案,因為我們不需要一次將所有資料載入記憶體。

首先,我們重用之前的ReadCoroutine,並增加一個WriteCoroutine

struct WriteCoroutine {
    pub file_handle: File,
}

impl WriteCoroutine {
    fn new(path: &str) -io::Result<Self{
        let file_handle = OpenOptions::new()
            .create(true)
            .append(true)
            .open(path)?;
        Ok(Self { file_handle })
    }
}

接下來,我們建立一個協程管理器來協調這兩個協程的工作:

struct CoroutineManager {
    reader: ReadCoroutine,
    writer: WriteCoroutine
}

impl CoroutineManager {
    fn new(read_path: &str, write_path: &str) -io::Result<Self{
        let reader = ReadCoroutine::new(read_path)?;
        let writer = WriteCoroutine::new(write_path)?;
        Ok(Self {
            reader,
            writer,
        })
    }
    
    fn run(&mut self) {
        let mut read_pin = Pin::new(&mut self.reader);
        let mut write_pin = Pin::new(&mut self.writer);
        loop {
            match read_pin.as_mut().resume(()) {
                CoroutineState::Yielded(number) ={
                    write_pin.as_mut().resume(number);
                }
                CoroutineState::Complete(()) =break,
            }
        }
    }
}

main函式中使用這個協程管理器:

fn main() {
    let mut manager = CoroutineManager::new(
        "numbers.txt", "output.txt"
    ).unwrap();
    manager.run();
}

這個實作建立了一個簡單但高效的檔案傳輸系統。一個協程逐行讀取檔案並產出值給另一個協程,後者接收這些值並寫入檔案。在這兩個協程中,檔案控制程式碼在整個執行過程中保持開啟,這意味著我們不必反覆處理緩慢的I/O操作。

從更廣泛的角度來看,這種方法的好處顯而易見。我們可以用它來移動100個大型檔案,每個檔案可能有多個GB,從一個位置到另一個位置,甚至透過網路傳輸。這種惰性載入和寫入的方式非常適合處理大規模資料傳輸。

協程間的直接呼叫:邁向對稱協程

在前面的例子中,我們使用了一個協程來產出值,然後由另一個協程接收這些值,這個過程由管理器處理。在理想情況下,我們希望完全移除管理器,讓協程直接呼叫彼此並來回傳遞值。

這種類別的協程被稱為對稱協程(symmetric coroutines),在其他語言中廣泛使用。不過,這個功能在Rust中尚未標準化。為了實作類別似的功能,我們需要放棄使用YieldedComplete語法,轉而建立自己的特徵。

首先,讓我們建立一個名為SymmetricCoroutine的特徵:

trait SymmetricCoroutine {
    type Input;
    type Output;
    fn resume_with_input(
        self: Pin<&mut Self>, input: Self::Input
    ) -Self::Output;
}

然後,我們為ReadCoroutine實作這個特徵:

impl SymmetricCoroutine for ReadCoroutine {
    type Input = ();
    type Output = Option<i32>;
    fn resume_with_input(
        mut self: Pin<&mut Self>, _input: ()
    ) -Self::Output {
        if let Some(Ok(line)) = self.lines.next() {
            line.parse::<i32>().ok()
        } else {
            None
        }
    }
}

同樣,我們也為WriteCoroutine實作這個特徵:

impl SymmetricCoroutine for WriteCoroutine {
    type Input = i32;
    type Output = ();
    fn resume_with_input(
        mut self: Pin<&mut Self>, input: i32
    ) -Self::Output {
        writeln!(self.file_handle, "{}", input).unwrap();
    }
}

最後,我們在main函式中將它們組合在一起:

fn main() -io::Result<(){
    let mut reader = ReadCoroutine::new("numbers.txt")?;
    let mut writer = WriteCoroutine::new("output.txt")?;
    loop {
        let number = Pin::new(&mut reader).resume_with_input(());
        if let Some(num) = number {
            Pin::new(&mut writer).resume_with_input(num);
        } else {
            break;
        }
    }
    Ok(())
}

需要注意的是,這個main函式明確指示了協程應如何一起工作,這涉及手動排程,因此技術上它不完全符合真正的對稱協程標準。我們只是在模擬對稱協程的部分功能作為教育練習。

真正的對稱協程會直接從讀取器將控制權傳遞給寫入器,而不必回傳到main函式;這受到Rust借用規則的限制,因為兩個協程需要相互參照。儘管如此,這仍然是一個有用的例子,展示瞭如何編寫自己的協程來提供更多功能。

用協程模擬非同步行為

協程與非同步程式設計有許多相似之處,因為它們都允許暫停執行,並在特定條件滿足時還原。事實上,可以說所有非同步程式設計都是協程的一個子集,非同步執行時本質上就是跨執行緒的協程。

讓我們透過一個簡單的例子來展示執行暫停的功能。首先,我們設定一個協程,讓它睡眠1秒鐘:

#![feature(coroutines, coroutine_trait)]
use std::{
    collections::VecDeque,
    future::Future,
    ops::{Coroutine, CoroutineState},
    pin::Pin,
    task::{Context, Poll},
    time::Instant,
};

struct SleepCoroutine {
    pub start: Instant,
    pub duration: std::time::Duration,
}

impl SleepCoroutine {
    fn new(duration: std::time::Duration) -Self {
        Self {
            start: Instant::now(),
            duration,
        }
    }
}

impl Coroutine<()for SleepCoroutine {
    type Yield = ();
    type Return = ();
    fn resume(
        self: Pin<&mut Self>, _: ())
        -CoroutineState<Self::Yield, Self::Return{
        if self.start.elapsed() >= self.duration {
            CoroutineState::Complete(())
        } else {
            CoroutineState::Yielded(())
        }
    }
}

接下來,我們設定三個SleepCoroutine例項,它們將同時執行。每個例項睡眠1秒鐘:

fn main() {
    let mut sleep_coroutines = VecDeque::new();
    sleep_coroutines.push_back(
        SleepCoroutine::new(std::time::Duration::from_secs(1))
    );
    sleep_coroutines.push_back(
        SleepCoroutine::new(std::time::Duration::from_secs(1))
    );
    sleep_coroutines.push_back(
        SleepCoroutine::new(std::time::Duration::from_secs(1))
    );
    
    let mut counter = 0;
    let start = Instant::now();
    while counter < sleep_coroutines.len() {
        let mut coroutine = sleep_coroutines.pop_front().unwrap();
        match Pin::new(&mut coroutine).resume(()) {
            CoroutineState::Yielded(_) ={
                sleep_coroutines.push_back(coroutine);
            },
            CoroutineState::Complete(_) ={
                counter += 1;
            },
        }
    }
    println!("Took {:?}", start.elapsed());
}

這段程式碼只需要1秒鐘就能完成,儘管我們執行了3個協程,每個協程都需要1秒鐘才能完成。如果按順序執行,我們預期它應該需要3秒鐘。

之所以能在短時間內完成,正

協程的外部控制:靈活性與實用性

在非同步程式設計的世界中,控制流程是一項關鍵挑戰。我們通常在非同步任務內部處理流程控制 — 例如在實作 Future 特性時,我們可以決定何時回傳 PendingReady,這取決於 poll 函式的內部邏輯。同樣地,在非同步函式中,我們可以選擇何時使用 await 語法將控制權交回執行器,以及何時使用 return 陳述式讓非同步任務回傳 Ready

但有時,我們需要從外部控制這些非同步任務。雖然可以透過原子值和互斥鎖等同步原語來實作,但這種方法有其侷限性:

內部控制的侷限性

傳統上,我們需要在將非同步任務傳送到執行時期前,就在任務內部編寫對外部訊號的回應邏輯。這種方法在簡單情境下可能足夠,但會帶來幾個問題:

  1. 脆弱性:當系統環境變化時,非同步任務容易變得脆弱
  2. 重用困難:任務與特定連貫的背景與環境緊密耦合,難以在其他場景中使用
  3. 任務間依賴:任務可能需要了解其他任務的狀態才能做出反應,這可能導致死鎖等問題

死鎖可能在任務 A 需要任務 B 的某些資源才能繼續,而任務 B 又需要任務 A 的某些資源才能繼續時發生,導致雙方都無法前進。

這就是協程外部控制優勢發揮的地方。協程提供了一種簡單的方式,允許外部程式碼控制其執行流程,讓我們的程式變得更簡潔、更靈活。

隨機數協程:外部控制的實際應用

為了展示外部控制如何簡化程式,我們將編寫一個簡單的程式,它迴圈處理一個協程向量。每個協程都有一個值和一個生存狀態(活著或死亡)。當協程被呼叫時,會生成一個隨機數作為其值,然後將這個值回傳給呼叫者。我們可以累積這些值,並制定一個簡單的規則來決定何時「殺死」協程。

首先,我們需要增加隨機數生成的依賴:

[dependencies]
rand = "0.8.5"

然後是必要的引入:

#![feature(coroutines, coroutine_trait)]
use std::{
    ops::{Coroutine, CoroutineState},
    pin::Pin,
    time::Duration,
};
use rand::Rng;

建立隨機數協程

現在我們可以建立一個隨機數協程:

struct RandCoRoutine {
    pub value: u8,
    pub live: bool,
}

impl RandCoRoutine {
    fn new() -Self {
        let mut coroutine = Self {
            value: 0,
            live: true,
        };
        coroutine.generate();
        coroutine
    }

    fn generate(&mut self) {
        let mut rng = rand::thread_rng();
        self.value = rng.gen_range(0..=10);
    }
}

考慮到外部程式碼將控制我們的協程,我們使用一個簡單的生成器實作:

impl Coroutine<()for RandCoRoutine {
    type Yield = u8;
    type Return = ();
    
    fn resume(mut self: Pin<&mut Self>, _: ())
        -CoroutineState<Self::Yield, Self::Return{
        self.generate();
        CoroutineState::Yielded(self.value)
    }
}

外部控制協程流程

這個生成器可以在整個程式碼函式庫用,它完全按照其設計目的執行協程不需要外部依賴,測試也很簡單。在主函式中,我們建立一個這些協程的向量,並持續呼叫它們,直到向量中的所有協程都「死亡」:

let mut coroutines = Vec::new();
for _ in 0..10 {
    coroutines.push(RandCoRoutine::new());
}

let mut total: u32 = 0;
loop {
    let mut all_dead = true;
    for mut coroutine in coroutines.iter_mut() {
        if coroutine.live {
            // 處理活著的協程
        }
    }
    
    if all_dead {
        break
    }
}
println!("Total: {}", total);

如果迴圈中的協程還活著,我們可以假設不是所有協程都死了,將 all_dead 標誌設為 false。然後我們呼叫協程的 resume 函式,提取結果,並制定一個簡單的規則來決定是否殺死協程:

all_dead = false;
match Pin::new(&mut coroutine).resume(()) {
    CoroutineState::Yielded(result) ={
        total += result as u32;
    },
    CoroutineState::Complete(_) ={
        panic!("Coroutine should not complete");
    },
}

if coroutine.value < 9 {
    coroutine.live = false;
}

協程的靈活控制

如果我們降低迴圈中殺死協程的標準,最終總和將會更高,因為達到標準變得更困難。在主執行緒中,我們可以存取該執行緒中的所有內容。例如,我們可以跟蹤所有已死協程,並在數量過高時開始「復活」協程。我們也可以使用死亡數量來改變殺死協程的規則。

當然,這個簡單範例在非同步任務中也可以實作。例如,一個未來(Future)可以在其內部儲存並輪詢另一個未來:

struct NestingFuture {
    inner: Pin<Box<dyn Future<Output = ()+ Send>>,
}

impl Future for NestingFuture {
    type Output = ();
    
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
        -Poll<Self::Output{
        match self.inner.as_mut().poll(cx) {
            Poll::Ready(_) =Poll::Ready(()),
            Poll::Pending =Poll::Pending,
        }
    }
}

NestingFuture 可以有一個其他未來的向量,每次被輪詢時都會更新它們自己的 value 欄位,並永遠回傳 PendingNestingFuture 可以提取該值欄位,並制定規則來決定最近輪詢的未來是否應該被「殺死」。然而,NestingFuture 會在非同步執行時期的執行緒中執行,導致對主執行緒中的資料存取有限。

協程與執行緒的協作

考慮到控制協程的便利性,我們需要記住這不是非此即彼的選擇。不是選協程就不能用非同步。以下程式碼證明我們可以在執行緒之間傳遞協程:

let (sender, receiver) = std::sync::mpsc::channel::<RandCoRoutine>();
let _thread = std::thread::spawn(move || {
    loop {
        let mut coroutine = match receiver.recv() {
            Ok(coroutine) =coroutine,
            Err(_) =break,
        };
        
        match Pin::new(&mut coroutine).resume(()) {
            CoroutineState::Yielded(result) ={
                println!("Coroutine yielded: {}", result);
            },
            CoroutineState::Complete(_) ={
                panic!("Coroutine should not complete");
            },
        }
    }
});

std::thread::sleep(Duration::from_secs(1));
sender.send(RandCoRoutine::new()).unwrap();
sender.send(RandCoRoutine::new()).unwrap();
std::thread::sleep(Duration::from_secs(1));

協程作為計算基本單位

由於協程是執行緒安全的,並且可以輕鬆對映協程的結果,我們可以總結協程的理解。協程是可以暫停和還原的計算單元。此外,這些協程還實作了 Future 特性,它可以呼叫 resume 函式,並將該函式的結果對映到 poll 函式的結果。

這表明我們可以在協程函式和未來函式之間插入可選的介面卡程式碼。因此,協程可以視為基本的計算構建塊。這些協程可以在同步程式碼中暫停和還原,因此在標準測試環境中易於測試,因為你不需要非同步執行時期來測試這些協程。你還可以選擇何時呼叫協程的 resume 函式,因此測試協程彼此互動的不同順序也很簡單。

一旦你對協程及其工作方式感到滿意,你可以將一個或多個協程包裝在實作 Future 特性的結構體中。這個結構體本質上是一個介面卡,使協程能夠與非同步執行時期互動。

這給了我們對計算過程的測試和實作的終極靈活性和控制力,以及這些計算步驟和非同步執行時期之間的明確邊界,因為非同步執行時期基本上是一個帶有佇列的執行緒池。任何熟悉單元測試的人都知道,我們不應該必須與執行緒池通訊來測試函式或結構體的計算邏輯。

協程的測試策略

在測試方面,我想分享一個簡單但有效的例子,展示如何測試協程。我們將測試兩個協程,它們取得相同的互斥鎖並將互斥鎖中的值增加一。這樣,我們可以測試取得鎖時會發生什麼,以及互動後鎖的最終結果。

雖然使用協程進行測試簡單而強大,但即使不使用協程,測試也不是完全無法實作的。有專門的章節討論測試技術,而與不一定需要使用協程。

互斥鎖協程的實作

我們從一個結構體開始,它有一個互斥鎖的控制程式碼和一個閾值,協程將在達到閾值後完成:

use std::ops::{Coroutine, CoroutineState};
use std::pin::Pin;
use std::sync::{Arc, Mutex};

pub struct MutexCoRoutine {
    pub handle: Arc<Mutex<u8>>,
    pub threshold: u8,
}

然後我們實作取得鎖並將值增加一的邏輯:

impl Coroutine<()for MutexCoRoutine {
    type Yield = ();
    type Return = ();
    
    fn resume(mut self: Pin<&mut Self>, _: ())
        -CoroutineState<Self::Yield, Self::Return{
        match self.handle.try_lock() {
            Ok(mut handle) ={
                *handle += 1;
            },
            Err(_) ={
                return CoroutineState::Yielded(());
            },
        }
        
        self.threshold -= 1;
        if self.threshold == 0 {
            return CoroutineState::Complete(())
        }
        return CoroutineState::Yielded(())
    }
}

我們嘗試取得鎖,但如果無法取得,我們不想阻塞,所以我們回傳 Yielded。如果取得了鎖,我們將值增加一,將閾值減少一,然後根據閾值是否達到回傳 YieldedComplete

在非同步函式中的阻塞程式碼可能導致整個非同步執行時期停滯,因為它阻止其他任務取得進展。在 Rust 中,這個概念通常被稱為函式顏色,函式要麼是同步(阻塞)的,要麼是非同步(非阻塞)的。不正確地混合這些可能導致問題。例如,如果 Mutextry_lock 方法阻塞,這在非同步連貫的背景與環境中將是有問題的。雖然 try_lock 本身是非阻塞的,但你應該知道其他鎖定機制(如 lock)會阻塞,因此在非同步函式中應避免或謹慎處理。

協同程式與反應式程式設計:鎖釋放的重要性與狀態檢查

在使用協同程式(coroutines)時,正確管理鎖的釋放至關重要。如同我們在程式中所見,在初始測試完成兩個 yield 操作後,我們必須釋放鎖。若不這麼做,後續測試將會失敗,因為我們的協同程式永遠無法取得該鎖。

執行迴圈的目的是為了證明當協同程式無法取得鎖時,閾值也不會被改變。如果閾值在這種情況下被改變了,那麼在兩次迭代後,協同程式就會回傳 complete 狀態,而下一次呼叫協同程式時就會產生錯誤。雖然即使沒有這個迴圈,測試也能發現這個問題,但在開始時加入迴圈可以消除任何對測試中斷原因的混淆。

釋放鎖後,我們對每個協同程式各呼叫兩次,以確保它們回傳我們預期的結果,並在所有呼叫之間檢查互斥鎖,確保狀態按照我們預期的方式精確變化:

assert_eq!(check_yield(&mut first_coroutine), true);
assert_eq!(*handle.lock().unwrap(), 1);
assert_eq!(check_yield(&mut second_coroutine), true);
assert_eq!(*handle.lock().unwrap(), 2);
assert_eq!(check_yield(&mut first_coroutine), false);
assert_eq!(*handle.lock().unwrap(), 3);
assert_eq!(check_yield(&mut second_coroutine), false);
assert_eq!(*handle.lock().unwrap(), 4);

非同步測試與協同程式整合

在非同步測試中,我們以完全相同的方式建立互斥鎖和協同程式。然而,現在我們測試的是在非同步執行時環境中行為結果是否相同,以及我們的非同步介面是否按預期工作。由於我們使用 Tokio 的測試功能,我們只需啟動任務,等待它們完成,然後檢查鎖:

let handle_one = tokio::spawn(async move {
    first_coroutine.await;
});
let handle_two = tokio::spawn(async move {
    second_coroutine.await;
});
handle_one.await.unwrap();
handle_two.await.unwrap();
assert_eq!(*handle.lock().unwrap(), 4);

執行 cargo test 命令後,我們會看到兩個測試都成功透過。我們已經逐步檢查了兩個協同程式與互斥鎖之間的互動,檢視了每次迭代之間的狀態。我們的協同程式在同步程式碼中正常工作。而與,透過簡單的介面卡,我們也能確認協同程式在非同步執行時環境中也能按照我們預期的方式工作!

值得注意的是,在非同步測試中,我們無法在協同程式的每次互動中檢查互斥鎖,這是因為非同步執行器有自己的運作方式。

反應式程式設計:事件驅動的程式設計正規化

反應式程式設計是一種程式設計正規化,其中程式碼對資料值或事件的變化做出反應。它使我們能夠構建實時動態回應變化的系統。在非同步程式設計的背景下,我們將重點放在輪詢和應對資料變化的非同步方法上。

基本反應式系統的建構

在建構基本反應式系統時,我們將實作觀察者模式。在這種模式中,我們有主題和訂閱該主題更新的觀察者。當主題發布更新時,觀察者根據其特定需求對此更新作出反應。

以一個簡單的加熱系統為例,系統在溫度低於期望設定時開啟加熱器:

static TEMP: LazyLock<Arc<AtomicI16>= LazyLock::new(|| {
    Arc::new(AtomicI16::new(2090))
});

static DESIRED_TEMP: LazyLock<Arc<AtomicI16>= LazyLock::new(|| {
    Arc::new(AtomicI16::new(2100))
});

static HEAT_ON: LazyLock<Arc<AtomicBool>= LazyLock::new(|| {
    Arc::new(AtomicBool::new(false))
});

在這個系統中,溫度和期望溫度是主題,而加熱器和顯示器是觀察者。當溫度低於期望溫度設定時,加熱器會開啟。當溫度變化時,顯示器會將溫度列印到終端。

建構顯示觀察者

定義好主題後,我們可以定義顯示未來(future):

pub struct DisplayFuture {
    pub temp_snapshot: i16,
}

impl DisplayFuture {
    pub fn new() -Self {
        DisplayFuture {
            temp_snapshot: TEMP.load(Ordering::SeqCst)
        }
    }
}

建立未來時,我們載入溫度主題的值並儲存它。我們使用 Ordering::SeqCst 來確保溫度值在所有執行緒中保持一致。

然後,我們可以使用這個儲存的溫度與輪詢時的溫度進行比較:

impl Future for DisplayFuture {
    type Output = ();
    
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -Poll<Self::Output{
        let current_snapshot = TEMP.load(Ordering::SeqCst);
        let desired_temp = DESIRED_TEMP.load(Ordering::SeqCst);
        let heat_on = HEAT_ON.load(Ordering::SeqCst);
        
        if current_snapshot == self.temp_snapshot {
            cx.waker().wake_by_ref();
            return Poll::Pending
        }
        
        if current_snapshot < desired_temp && heat_on == false {
            HEAT_ON.store(true, Ordering::SeqCst);
        }
        else if current_snapshot desired_temp && heat_on == true {
            HEAT_ON.store(false, Ordering::SeqCst);
        }
        
        clearscreen::clear().unwrap();
        println!("Temperature: {}\nDesired Temp: {}\nHeater On: {}",
                 current_snapshot as f32 / 100.0,
                 desired_temp as f32 / 100.0,
                 heat_on);
                 
        self.temp_snapshot = current_snapshot;
        cx.waker().wake_by_ref();
        return Poll::Pending
    }
}

這段程式碼的功能如下:

  1. 取得系統整體快照
  2. 檢查未來持有的溫度快照與當前溫度之間是否有差異。如果沒有差異,就沒有必要重新渲染顯示或做出任何加熱決策,只需回傳 Pending,結束輪詢
  3. 檢查當前溫度是否低於期望溫度。如果是,將 HEAT_ON 標誌設為 true
  4. 如果溫度高於期望溫度,將 HEAT_ON 標誌設為 false
  5. 清除終端以進行更新
  6. 列印快照的當前狀態
  7. 更新未來參照的快照

協同程式與反應式程式設計的聯絡

在深入研究協同程式與反應式程式設計時,我發現兩者之間存在明顯的相似性。協同程式中的 Yield/Complete 與非同步程式設計中的 Pending/Ready 非常相似。最好的理解方式是將 async/await 視為協同程式的一個子類別——一種跨執行緒運作並使用佇列的協同程式。在協同程式和非同步程式設計中,我們都能暫停活動並在之後回到它。

協同程式使我們能夠構建程式碼結構,因為它們可以作為非同步和同步程式碼之間的接縫。透過協同程式,我們可以構建同步程式碼模組,然後使用標準測試進行評估。我們可以構建作為協同程式的介面卡,讓同步程式碼能夠連線需要非同步功能的程式碼,但非同步功能被表示為協同程式。

然後,我們可以對協同程式進行單元測試,看它們在不同順序和組閤中被輪詢時的行為。我們可以將這些協同程式注入 Future trait 實作中,將程式碼整合到非同步執行時環境中,因為我們可以在未來的輪詢函式中呼叫協同程式。在這裡,我們只需要透過介面隔離這些非同步程式碼。一個非同步函式可以呼叫你的程式碼,然後將輸出傳遞到第三方非同步程式碼中,反之亦然。

隔離程式碼的一個好方法是使用反應式程式設計,我們的程式碼單元可以透過訂閱廣播頻道來消費資料。這讓我們能夠構建既靈活又強大的系統,能夠實時回應變化。

協同程式與反應式程式設計都為我們提供了強大的工具,使我們能夠構建更具回應性和彈性的系統。透過理解這些程式設計正規化並將它們整合到我們的程式碼中,我們能夠建立更加模組化、可測試與可維護的軟體系統。

反應式系統中的資料一致性取捨

在建構反應式系統時,我們經常需要面對資料一致性與即時性之間的平衡。就像我在幾個大型物聯網專案中發現的,系統反應的速度與資料的準確性是一對永恆的矛盾體。

快照決策與即時顯示的平衡

對於溫度控制系統這類別應用,若溫度顯示在瞬間有輕微的誤差,通常不會造成災難性後果。實際上,採取資料快照、根據快照做決策,並顯示這個快照,往往比追求絕對即時的顯示更為合理。這種方法有幾個優點:

  • 提供明確的決策依據資料
  • 簡化除錯流程,因為你能看到系統決策所根據的確切資料
  • 避免在讀取與顯示過程中資料被修改導致的不一致

當然,我們也可以採取其他策略:先取得快照並據此更改加熱狀態,然後在顯示前重新載入所有原子變數,確保顯示的瞬時準確性。或者記錄決策快照,但在顯示時讀取最新資料。

簡單系統中的實用主義

// 在我們的簡易系統中,採用快照方法更為實用
pub fn display_temperature_snapshot(temp: i16, heat_on: bool) {
    println!("溫度: {} 度, 加熱器狀態: {}", 
             temp as f32 / 100.0, 
             if heat_on { "開啟" } else { "關閉" });
}

在我們這個簡單的溫控系統中,使用快照顯示方法已足夠,因為這讓我們能清楚觀察系統如何適應環境並做出決策。然而,在設計反應式系統時,這類別取捨值得深思。畢竟,觀察者處理的資料可能在處理過程中就已經過時。

處理過時資料的策略

有幾種方法可以降低處理過時資料的風險:

  1. 限制執行緒數量:將執行環境限制在單一執行緒,確保在顯示過程中溫度資料不會被其他執行緒修改。

  2. 使用互斥鎖:將溫度包裝在互斥鎖中,確保在讀取與顯示之間資料不會變化。

然而,這些方法在某種程度上是自欺人。溫度是一個實際存在的物理量,熱量損失和加熱器可以在任何時刻影響溫度。在系統外部有過程持續改變我們主體狀態的情況下,試圖用技巧避免系統內部溫度變化只是掩蓋問題。

使用原子比較交換功能

對於我們的簡單系統,過時資料問題不是主要關注點。但在更複雜的場景中,Rust標準函式庫的比較交換功能非常有價值:

use std::sync::atomic::{AtomicI64, Ordering};

fn atomic_compare_exchange_demo() {
    // 建立一個值為5的原子整數
    let atomic_value = AtomicI64::new(5);
    
    // 如果值是5,則更新為10
    let result = atomic_value.compare_exchange(
        5,           // 期望值
        10,          // 新值
        Ordering::Acquire,
        Ordering::Relaxed
    );
    
    // 確認交換成功,並回傳舊值5
    assert_eq!(result, Ok(5));
    
    // 確認新值已更新為10
    assert_eq!(atomic_value.load(Ordering::Relaxed), 10);
    
    // 嘗試另一次交換,但預期值不比對
    let result = atomic_value.compare_exchange(
        6,           // 錯誤的期望值
        12,          // 新值
        Ordering::SeqCst,
        Ordering::Acquire
    );
    
    // 確認交換失敗,回傳實際值10
    assert_eq!(result, Err(10));
    
    // 值仍然是10,沒有被修改
    assert_eq!(atomic_value.load(Ordering::Relaxed), 10);
}

這就是為何原子操作被稱為「原子」—它們的交易是原子性的,意味著當一個交易執行時,不會有其他交易同時對該原子值進行操作。

compare_exchange函式讓我們能在更新前斷言原子值為某個特定值。如果值不符預期,函式會回傳錯誤,並附上原子的實際值。這種機制允許觀察者根據回傳的值重新評估情況,並根據更新的資訊嘗試對原子值進行新的更新。

在瞭解了反應式程式設計中的資料並發問題及解決方案後,我們可以繼續建構加熱器和熱損失觀察者。