Tokio 作為 Rust 非同步程式設計的根本,其高效的事件迴圈機制是關鍵。為了維持系統的回應性,必須避免長時間阻塞操作,善用 spawn_blocking 將 CPU 密集型任務解除安裝到專用執行緒池。非同步程式設計中,資料分享需要謹慎處理,Tokio 提供了通道和互斥鎖等機制確保執行緒安全。一次性通道適用於單一值傳遞,而多生產者單消費者通道則支援多個生產者向單一消費者傳送資料。此外,結合 Arc 和 Mutex 能有效管理分享變數,避免資料競爭。迭代器和組合子是簡化集合操作的利器,mapfiltercollect 等組合子能以函式式風格串聯操作,提升程式碼可讀性和效率。理解這些核心概念並靈活運用,才能充分發揮 Tokio 的非同步效能。

3.7 避免阻塞事件迴圈

在非同步程式設計的世界中,最重要的規則就是不要阻塞事件迴圈。這意味著不要直接呼叫可能會執行超過10到100微秒的函式,而是應該使用spawn_blocking來處理這些函式。

3.7.1 CPU密集型操作

那麼,如何執行像加密、影像編碼或檔案雜湊等CPU密集型操作呢?Tokio提供了tokio::task::spawn_blocking函式,用於處理最終會自行完成的阻塞操作。對於這類別任務,Rust的執行緒(Thread)更為合適。

以下是一個使用spawn_blocking的範例:

let is_code_valid = spawn_blocking(move || crypto::verify_password(&code, &code_hash)).await?;

在這個範例中,crypto::verify_password函式預計需要幾毫秒才能完成,如果直接呼叫它,就會阻塞事件迴圈。使用spawn_blocking後,這個操作會被分派到Tokio的阻塞任務執行緒池中。

內容解密:

  1. spawn_blocking的作用:將阻塞操作分派到Tokio的阻塞任務執行緒池,避免阻塞事件迴圈。
  2. Tokio的執行緒池:Tokio維護兩個執行緒池,一個用於執行非同步任務,另一個用於執行阻塞任務。
  3. 非同步任務與阻塞任務的區別:非同步任務適合用於I/O密集型操作,而阻塞任務適合用於CPU密集型操作。

3.8 資料分享

在非同步程式設計中,資料分享是一個常見的需求。由於每個任務都可能在不同的執行緒中執行,因此資料分享需要遵循與執行緒間資料分享相同的規則。

3.8.1 通道(Channels)

通道是一種透過通訊來分享記憶體的方式,而不是透過分享記憶體來通訊。Tokio提供了多種通道型別,以滿足不同的需求。

3.8.1.1 一次性通道(oneshot channel)

一次性通道支援從單一生產者向單一消費者傳送單一值。它通常用於將計算結果傳送給等待者。

use tokio::sync::oneshot;

async fn some_computation() -> String {
    "represents the result of the computation".to_string()
}

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel();
    tokio::spawn(async move {
        let res = some_computation().await;
        tx.send(res).unwrap();
    });
    // 在背景計算的同時進行其他工作
    // 等待計算結果
    let res = rx.await.unwrap();
}

內容解密:

  1. 一次性通道的使用場景:用於從單一生產者向單一消費者傳送單一值。
  2. oneshot::channel的建立:使用oneshot::channel()函式建立一次性通道。
  3. tx.sendrx.await:生產者使用tx.send傳送值,消費者使用rx.await接收值。

3.8.1.2 多生產者單消費者通道(mpsc channel)

mpsc通道支援從多個生產者向單一消費者傳送多個值。它通常用於將工作分派給任務或接收多個計算結果。

use tokio::sync::mpsc;

async fn some_computation(input: u32) -> String {
    format!("the result of computation {}", input)
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(100);
    tokio::spawn(async move {
        for i in 0..10 {
            let res = some_computation(i).await;
            tx.send(res).await.unwrap();
        }
    });
    while let Some(res) = rx.recv().await {
        println!("got = {}", res);
    }
}

內容解密:

  1. mpsc通道的使用場景:用於從多個生產者向單一消費者傳送多個值。
  2. mpsc::channel的建立:使用mpsc::channel(100)函式建立mpsc通道,緩衝區大小為100。
  3. tx.sendrx.recv:生產者使用tx.send傳送值,消費者使用rx.recv接收值。

3.8.2 Arc<Mutex>

在非同步Rust中,使用互斥鎖(Mutex)來安全地分享變數是必要的。然而,由於Rust的所有權模型,Mutex需要被包裹在std::sync::Arc智慧指標中。

use tokio::sync::Mutex;
use std::sync::Arc;

#[tokio::main]
async fn main() {
    let data1 = Arc::new(Mutex::new(0));
    let data2 = Arc::clone(&data1);
    tokio::spawn(async move {
        let mut lock = data2.lock().await;
        *lock += 1;
    });
    let mut lock = data1.lock().await;
    *lock += 1;
}

內容解密:

  1. Mutex的使用:使用Mutex來安全地分享變數。
  2. Arc的作用:使用Arc來實作多個所有者分享同一個Mutex。
  3. RAII的優勢:Mutex會在超出作用域時自動解鎖,避免了手動解鎖的麻煩。

圖表翻譯:

此圖示展示了Tokio的執行緒池架構,包括用於執行非同步任務的固定大小執行緒池和用於執行阻塞任務的動態大小執行緒池。

  graph LR
A[Tokio Runtime] --> B[Async Thread Pool]
A --> C[Blocking Thread Pool]
B --> D[Execute Async Tasks]
C --> E[Execute Blocking Tasks]

圖表翻譯:

  1. Tokio Runtime:Tokio的執行環境。
  2. Async Thread Pool:用於執行非同步任務的執行緒池。
  3. Blocking Thread Pool:用於執行阻塞任務的執行緒池。
  4. Execute Async Tasks:執行非同步任務。
  5. Execute Blocking Tasks:執行阻塞任務。

3.9 組合子(Combinators)

組合子是一個非常有趣的話題。網上大多數的定義都會讓人感到困惑,因為它們引發的問題比答案還要多。因此,這裡提供一個經驗性的定義:組合子是一種簡化某種型別 T 操作的方法。它們傾向於使用函式式(方法鏈式)的程式設計風格。

let sum: u64 = vec![1, 2, 3].into_iter().map(|x| x * x).sum();

本文將專注於介紹如何使用組合子以及實際應用模式,說明它們如何使程式碼更容易閱讀或重構。

3.9.1 迭代器(Iterators)

首先,我們來討論迭代器,因為這是組合子最常被使用的場景。

3.9.1.1 取得迭代器

迭代器是一種能夠讓開發者遍歷集合的物件。大多數標準函式庫中的集合都可以提供迭代器。

首先,into_iter 方法提供了一個擁有迭代器,即集合被移動後,原變數無法再被使用。

fn vector() {
    let v = vec![1, 2, 3];
    for x in v.into_iter() {
        println!("{}", x);
    }
    // 你不能再使用 v
}

內容解密:

  • into_iter 方法將 Vec 轉換為一個迭代器,並且取得其所有權。
  • 在迴圈中使用 into_iter 後,原始的 Vec 變數 v 就無法再被使用了,因為它的所有權已經被轉移。

接著,iter 方法提供了一個借用迭代器。這裡的 keyvalue 變數是參照(在這個例子中是 &String)。

fn hashmap() {
    let mut h = HashMap::new();
    h.insert(String::from("Hello"), String::from("World"));
    for (key, value) in h.iter() {
        println!("{}: {}", key, value);
    }
}

內容解密:

  • iter 方法提供了一個對 HashMap 的借用迭代器。
  • 在迴圈中,keyvalue 是對應的鍵值對的參照,因此不會取得所有權。

從 1.53 版本(2021 年 6 月 17 日發布)開始,陣列也可以提供迭代器:

fn array() {
    let a = [1, 2, 3];
    for x in a.iter() {
        println!("{}", x);
    }
}

內容解密:

  • iter 方法同樣適用於陣列,提供了一個對陣列元素的借用迭代器。

3.9.1.2 使用迭代器

迭代器是惰性的,只有在被使用的時候才會進行操作。就像我們剛剛看到的,迭代器可以透過 for x in 迴圈來使用。但這並不是它們最常用的方式。Rust 的慣用方式是偏好函式式程式設計,因為它更符合 Rust 的所有權模型。

for_eachfor .. in .. 迴圈的函式式等價物:

fn for_each() {
    let v = vec!["Hello", "World", "!"].into_iter();
    v.for_each(|word| {
        println!("{}", word);
    });
}

內容解密:

  • for_each 方法對迭代器中的每個元素執行給定的閉包。
  • 這種寫法比傳統的 for 迴圈更簡潔,並且更具函式式風格。

collect 可以用來將迭代器轉換為集合:

fn collect() {
    let x = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10].into_iter();
    let _: Vec<u64> = x.collect();
}

內容解密:

  • collect 方法將迭代器的結果收集到一個新的集合中。
  • 在這個例子中,它將迭代器的結果收集到一個 Vec<u64> 中。

相反地,你可以使用 from_iter 從迭代器中建立一個 HashMap(或其他集合):

fn from_iter() {
    let x = vec![(1, 2), (3, 4), (5, 6)].into_iter();
    let _: HashMap<u64, u64> = HashMap::from_iter(x);
}

內容解密:

  • from_iter 方法從迭代器中建立一個新的 HashMap
  • 這裡的迭代器包含鍵值對,因此可以直接用來建立 HashMap

reduce 可以透過應用閉包來累積迭代器的結果:

fn reduce() {
    let values = vec![1, 2, 3, 4, 5].into_iter();
    let _sum = values.reduce(|acc, x| acc + x);
}

內容解密:

  • reduce 方法將迭代器的元素累積起來,計算結果。
  • 在這個例子中,它計算了所有元素的和。

foldreduce 類別似,但它可以傳回與迭代器元素不同型別的累積結果:

fn fold() {
    let values = vec!["Hello", "World", "!"].into_iter();
    let _sentence = values.fold(String::new(), |acc, x| acc + x);
}

內容解密:

  • fold 方法同樣是累積迭代器的結果,但它可以指定初始值和累積的方式。
  • 在這個例子中,它將多個字串連線成一個單一的字串。

3.9.1.3 組合子

首先,我們來看一個幾乎所有語言都有的著名組合子:filter

fn filter() {
    let v = vec![-1, 2, -3, 4, 5].into_iter();
    let _positive_numbers: Vec<i32> = v.filter(|x: &i32| x.is_positive()).collect();
}

內容解密:

  • filter 方法根據給定的條件過濾迭代器的元素。
  • 在這個例子中,它過濾出正數。

inspect 可以用來檢查流經迭代器的值:

fn inspect() {
    let v = vec![-1, 2, -3, 4, 5].into_iter();
    let _positive_numbers: Vec<i32> = v
        .inspect(|x| println!("Before filter: {}", x))
        .filter(|x: &i32| x.is_positive())
        .inspect(|x| println!("After filter: {}", x))
        .collect();
}

內容解密:

  • inspect 方法允許你在不改變迭代器的情況下檢查它的元素。
  • 在這個例子中,它用來在過濾前後列印元素的值。

map 用於將迭代器的元素從一種型別轉換為另一種:

fn map() {
    let v = vec!["Hello", "World", "!"].into_iter();
    let w: Vec<String> = v.map(String::from).collect();
}

內容解密:

  • map 方法將迭代器的每個元素轉換為另一種型別。
  • 在這個例子中,它將 &str 轉換為 String

還有許多其他有用的組合子,例如 filter_map, chain, 和 flatten 等。這些組合子可以有效地簡化程式碼,使其更具可讀性。

圖表翻譯:

下圖展示了不同組合子的作用流程:

  graph LR;
    A["vec![1, 2, 3]"] -->|into_iter|> B["Iterator"];
    B -->|map|> C["Mapped Iterator"];
    C -->|filter|> D["Filtered Iterator"];
    D -->|collect|> E["Vec"];

圖表翻譯: 此圖示展示了從原始向量到經過多個組合子處理後的最終向量的流程。首先,將向量轉換為迭代器,然後透過 mapfilter 等操作,最終收集結果到一個新的向量中。整個過程清晰地展示了組合子的鏈式呼叫和資料流動。

總之,組合子提供了一種優雅且高效的方式來處理集合和迭代器。它們使得程式碼更加簡潔、易讀,並且減少了錯誤的可能性。在實際開發中,熟練掌握和使用組合子可以顯著提高程式碼的品質和可維護性。