Rust 的 crossbeam 函式庫提供了一種高效的多執行緒通道實作,可用於構建任務佇列,以提升程式效能。透過 crossbeam::channel::unbounded() 函式,可以建立一個無界通道,用於線上程之間傳遞任務和結果。工作執行緒從通道接收任務,執行後將結果送回主執行緒。這個過程有效地利用了多核心處理器的優勢,平行處理任務,從而縮短了程式執行時間。相較於其他多執行緒處理方式,crossbeam 的通道機制更輕量,且易於使用,適合處理大量的小型任務。

通道的基本使用

本文使用了 crossbeam 函式庫中的通道實作,而不是 Rust 標準函式庫中的 std::sync::mpsc 模組。兩者提供相同的 API,但 crossbeam 提供了更多功能和靈活性。

以下是一個簡單的通道示例:

use crossbeam::channel;

fn main() {
    let (tx, rx) = channel::unbounded();

    tx.send(42).unwrap();
    println!("Received: {:?}", rx.recv().unwrap());
}

這個示例建立了一個無界通道,然後透過傳送端將資料 42 傳送到通道中。接收端從通道中接收資料並列印預出來。

有界通道和無界通道

crossbeam 函式庫提供了有界通道和無界通道兩種型別。有界通道在記憶體使用方面具有確定性,但當生產者需要等待空間可用時會阻塞。無界通道不會阻塞生產者,但可能會導致記憶體使用量增加。

通道示例

以下是一個簡單的通道示例,展示瞭如何建立一個通道並透過它傳送和接收資料:

use crossbeam::channel;

fn main() {
    let (tx, rx) = channel::unbounded();

    tx.send(42).unwrap();
    println!("Received: {:?}", rx.recv().unwrap());
}

這個示例建立了一個無界通道,然後透過傳送端將資料 42 傳送到通道中。接收端從通道中接收資料並列印預出來。

圖表翻譯:

  graph LR
    A[傳送端] -->|傳送資料|> B[通道]
    B -->|接收資料|> C[接收端]
    C -->|列印結果|> D[終端]

這個圖表展示了傳送端、通道和接收端之間的資料流動過程。傳送端將資料傳送到通道中,接收端從通道中接收資料並列印預出來。

使用 Crossbeam 實作通道(Channel)通訊

在並發程式設計中,通道(Channel)是一種重要的同步原語,允許不同執行緒之間進行通訊。在 Rust 中,我們可以使用 Crossbeam 這個 crate 來實作通道通訊。

首先,我們需要在 Cargo.toml 中新增 Crossbeam 的依賴:

[dependencies]
crossbeam = "0.7"

然後,在我們的 Rust 程式碼中,我們可以使用 unbounded 函式來建立一個無界通道(Unbounded Channel):

use crossbeam::channel::unbounded;

fn main() {
    let (tx, rx) = unbounded();
    //...
}

這裡,tx 是傳送端(Sender),而 rx 是接收端(Receiver)。

接下來,我們可以使用 thread::spawn 來建立一個新執行緒,並在該執行緒中傳送一條訊息:

thread::spawn(move || {
    tx.send(42).unwrap();
});

在主執行緒中,我們可以使用 select! 宏來接收訊息:

select! {
    recv(rx) -> msg => println!("{:?}", msg),
}

這裡,recv 函式會阻塞直到有一條訊息可供接收,一旦接收到訊息,就會將其列印預出來。

完整程式碼

以下是完整的程式碼:

use std::thread;
use crossbeam::channel::unbounded;

fn main() {
    let (tx, rx) = unbounded();

    thread::spawn(move || {
        tx.send(42).unwrap();
    });

    select! {
        recv(rx) -> msg => println!("{:?}", msg),
    }
}

這個程式碼會建立一個新執行緒,並在該執行緒中傳送一條訊息,然後在主執行緒中接收該訊息並列印預出來。

Mermaid 圖表

以下是使用 Mermaid 圖表來展示這個過程:

  flowchart TD
    A[主執行緒] -->|建立通道|> B[通道]
    B -->|傳送端|> C[新執行緒]
    C -->|傳送訊息|> B
    B -->|接收端|> A
    A -->|接收訊息|> D[列印訊息]

這個圖表展示了主執行緒建立一個通道,然後建立一個新執行緒,並在該執行緒中傳送一條訊息。主執行緒然後接收該訊息並列印預出來。

通道(Channel)的基本概念

通道是一種允許不同執行緒之間進行通訊的機制。在Rust中,通道是透過crossbeam函式庫實作的。當建立一個通道時,會傳回一個Sender<T>和一個Receiver<T>,其中T是通道中傳遞的資料型別。

通道的使用

在使用通道時,我們可以使用select!巨集來等待多個通道中的訊息。這個巨集允許主執行緒阻塞並等待訊息的到來。需要注意的是,巨集可以定義自己的語法規則,這就是為什麼select!巨集可以使用像recv(rx) ->這樣的語法,而這種語法在標準Rust中是不合法的。

可以透過通道傳遞的資料

透過通道可以傳遞任何實作了Send特性的型別。這意味著我們可以透過通道傳遞任何可以安全地線上程之間傳遞的資料。需要注意的是,通道比簡單的位元組流([u8])更強大,因為它提供了完整的Rust型別系統。

雙向通訊

如果需要實作雙向通訊,可以建立兩套傳送者和接收者,一套用於每個方向。這種方法比使用單個通道實作雙向通訊更簡單。channels-complex專案提供了一個雙向通訊的例子,展示瞭如何使用兩個通道實作雙向通訊。

實作雙向通訊的例子

下面是channels-complex專案中的一個例子,展示瞭如何使用兩個通道實作雙向通訊:

use crossbeam::channel;

fn main() {
    let (tx1, rx1) = channel::unbounded();
    let (tx2, rx2) = channel::unbounded();

    //...
}

在這個例子中,我們建立了兩個通道,分別用於兩個方向的通訊。然後,我們可以使用select!巨集來等待多個通道中的訊息。

使用巨集進行訊息接收

下面是使用select!巨集進行訊息接收的例子:

use crossbeam::channel;

fn main() {
    let (tx1, rx1) = channel::unbounded();
    let (tx2, rx2) = channel::unbounded();

    select! {
        recv(rx1) -> msg => {
            println!("Received message: {:?}", msg);
        }
        recv(rx2) -> msg => {
            println!("Received message: {:?}", msg);
        }
    }
}

在這個例子中,我們使用select!巨集來等待兩個通道中的訊息。當有一個通道中有訊息到來時,巨集會執行相應的程式碼塊。

圖表翻譯:

  flowchart TD
    A[建立通道] --> B[傳回Sender<T>和Receiver<T>]
    B --> C[使用select!巨集等待訊息]
    C --> D[接收訊息]
    D --> E[處理訊息]

這個圖表展示了建立通道、傳回傳送者和接收者、使用select!巨集等待訊息、接收訊息和處理訊息的過程。

使用 Crossbeam 進行通道通訊

在 Rust 中,Crossbeam 是一個提供高效能、低延遲的並發性原語的 crate。它提供了多種工具,包括通道(channel),用於在不同執行緒之間進行通訊。在這個範例中,我們將使用 Crossbeam 的 unbounded 函式建立兩個通道:一個用於傳送請求,另一個用於接收回應。

通道建立

首先,我們需要建立兩個通道:requests_txrequests_rx,分別用於傳送和接收請求;以及 responses_txresponses_rx,分別用於傳送和接收回應。

let (requests_tx, requests_rx) = unbounded();
let (responses_tx, responses_rx) = unbounded();

定義列舉

接下來,我們定義一個列舉 ConnectivityCheck,它代表了三種不同的連線檢查型別:Ping、Pong 和 Pang。

#[derive(Debug)]
enum ConnectivityCheck {
    Ping,
    Pong,
    Pang,
}

主函式

main 函式中,我們指定了要傳送的訊息數量 n_messages,然後建立了兩個通道,如上所述。

fn main() {
    let n_messages = 3;
    //...
}

執行緒間通訊

使用 Crossbeam 的通道,可以輕鬆地在不同執行緒之間進行通訊。以下是如何在一個新執行緒中傳送和接收訊息的簡單範例:

use crossbeam::channel::unbounded;
use std::thread;

fn main() {
    let (tx, rx) = unbounded();

    thread::spawn(move || {
        let msg = ConnectivityCheck::Ping;
        tx.send(msg).unwrap();
    });

    let received_msg = rx.recv().unwrap();
    println!("Received: {:?}", received_msg);
}

在這個範例中,我們建立了一個通道,然後在一個新執行緒中發送了一個 ConnectivityCheck::Ping 訊息。主執行緒等待並接收這個訊息,然後將其列印預出來。

結合所有程式碼

最終,完整的程式碼結構應該如下所示:

use crossbeam::channel::unbounded;
use std::thread;

#[derive(Debug)]
enum ConnectivityCheck {
    Ping,
    Pong,
    Pang,
}

fn main() {
    let n_messages = 3;
    let (requests_tx, requests_rx) = unbounded();
    let (responses_tx, responses_rx) = unbounded();

    // 在這裡實作執行緒間的通訊
    thread::spawn(move || {
        for _ in 0..n_messages {
            let msg = ConnectivityCheck::Ping; // 或 Pong、Pang
            requests_tx.send(msg).unwrap();
        }
    });

    for _ in 0..n_messages {
        let received_msg = requests_rx.recv().unwrap();
        println!("Received: {:?}", received_msg);
        // 處理接收到的訊息,並可透過 responses_tx 發送回應
    }
}

這個範例展示瞭如何使用 Crossbeam 的通道進行執行緒間通訊,以及如何定義和使用列舉來代表不同型別的連線檢查。

自定義訊息型別的優點

在設計複雜的訊息傳遞系統時,自定義訊息型別可以大大簡化後續的訊息解釋過程。透過定義一個明確的訊息結構,可以讓系統更容易地辨識和處理不同的訊息型別,從而提高系統的可靠性和效率。

多緒處理和訊息傳遞

在多緒(thread)環境中,訊息傳遞是一個非常重要的機制。以下是一個簡單的範例,展示如何使用 Rust 的 threadmpsc (多產者單消費者)通道來實作訊息傳遞:

use std::thread;
use std::sync::mpsc;

// 定義訊息型別
enum Message {
    Ping,
    Pong,
    Pang,
}

fn main() {
    // 建立一個通道
    let (requests_tx, requests_rx) = mpsc::channel();

    // 建立一個新的執行緒
    thread::spawn(move || {
        loop {
            // 接收訊息
            match requests_rx.recv().unwrap() {
                Message::Pong => eprintln!("unexpected pong response"),
                Message::Ping => {
                    // 傳送反饋訊息
                    let (responses_tx, _) = mpsc::channel();
                    responses_tx.send(Message::Pong).unwrap();
                }
                Message::Pang => return,
            }
        }
    });

    // 傳送訊息
    for _ in 0..10 {
        requests_tx.send(Message::Ping).unwrap();
    }
}

在這個範例中,我們定義了一個 Message 列舉,代表不同的訊息型別。然後,我們建立了一個通道和一個新的執行緒,負責接收和處理訊息。在主執行緒中,我們傳送多個 Ping 訊息到通道中。

Mermaid 圖表:訊息傳遞流程

  sequenceDiagram
    participant 主執行緒 as Main Thread
    participant 子執行緒 as Worker Thread
    participant 通道 as Channel

    Note over 主執行緒,子執行緒: 建立通道和子執行緒
    主執行緒->>通道: 傳送 Ping 訊息
    通道->>子執行緒: 接收 Ping 訊息
    子執行緒->>通道: 傳送 Pong 訊息
    通道->>主執行緒: 接收 Pong 訊息

圖表翻譯:

這個 Mermaid 圖表展示了訊息傳遞的流程。首先,主執行緒建立了一個通道和一個子執行緒。然後,主執行緒傳送一個 Ping 訊息到通道中。子執行緒接收到這個訊息,並傳送一個 Pong 訊息回應。最後,主執行緒接收到 Pong 訊息,並完成了訊息傳遞的迴圈。

內容解密:

在這個範例中,我們使用 Rust 的 threadmpsc 來實作訊息傳遞。mpsc 通道提供了一種安全的方式來在不同執行緒之間傳遞資料。透過定義一個明確的訊息型別,可以讓系統更容易地辨識和處理不同的訊息型別。這個範例展示瞭如何使用 threadmpsc 來實作一個簡單的訊息傳遞系統。

實作任務佇列

在討論了頻道(channel)後,現在是時候將其應用於最初在清單 10.18 中提出的問題。您會發現接下來在清單 10.28 中的程式碼比清單 10.24 中的平行迭代器方法複雜得多。

以下清單顯示了渲染十六進位制的頻道基礎任務佇列實作的中繼資料。該清單的來源位於 ch10/ch10-render-hex-threadpool/Cargo.toml

[package]
name = "render-hex"
version = "0.1.0"
edition = "2018"

[dependencies]
svg = "0.6"
crossbeam = "0.7"

以下清單著重於 parse() 函式。其餘的程式碼與清單 10.18 相同。您可以在 ch10/ch10-render-hex-threadpool/src/main.rs 中找到以下清單的程式碼。

use std::thread;
use std::env;

use crossbeam::channel::{unbounded};

內容解密:

這段程式碼使用 Rust 的 crossbeam 函式庫來建立一個無界頻道(unbounded channel),以實作任務佇列。unbounded 函式傳回一個無界頻道的傳送器和接收器。這個頻道可以用來在不同執行緒之間傳遞任務。

圖表翻譯:

  flowchart TD
    A[主執行緒] -->|建立頻道|> B[無界頻道]
    B -->|傳送任務|> C[工作執行緒]
    C -->|接收任務|> D[任務處理]
    D -->|傳回結果|> E[主執行緒]

這個流程圖描述了主執行緒建立一個無界頻道,然後傳送任務到工作執行緒。工作執行緒接收任務,處理任務,然後傳回結果給主執行緒。

程式碼實作示例:

let (tx, rx) = unbounded();
tx.send("任務1").unwrap();
tx.send("任務2").unwrap();

for _ in 0..2 {
    let msg = rx.recv().unwrap();
    println!("接收到任務:{}", msg);
}

這個例子展示瞭如何使用無界頻道傳送和接收任務。

圖表翻譯:

  sequenceDiagram
    participant 主執行緒 as 主執行緒
    participant 工作執行緒 as 工作執行緒
    participant 頻道 as 頻道

    主執行緒->>頻道: 傳送任務1
    頻道->>工作執行緒: 傳遞任務1
    工作執行緒->>頻道: 傳回結果1
    頻道->>主執行緒: 傳遞結果1

    主執行緒->>頻道: 傳送任務2
    頻道->>工作執行緒: 傳遞任務2
    工作執行緒->>頻道: 傳回結果2
    頻道->>主執行緒: 傳遞結果2

這個序列圖描述了主執行緒和工作執行緒之間的溝透過程,展示瞭如何使用無界頻道傳遞任務和結果。

多執行緒與控制流程

在 Rust 中,控制流程可以被視為一個表示式,這使得語言具有更大的彈性。例如,loop 關鍵字可以用於建立迴圈,而這個迴圈可以根據特定的條件進行終止。

使用 loop 關鍵字

loop 關鍵字是 Rust 中的一種基本控制流程結構,它允許程式設計師建立無限迴圈。然而,透過使用 breakreturn 等關鍵字,可以從迴圈中離開。

處理多執行緒

在多執行緒環境中,程式需要能夠處理不同執行緒之間的溝通和同步。Rust 的標準函式庫提供了 std::thread 模組,用於建立和管理執行緒。

使用 Crossbeam

Crossbeam 是一個 Rust 的外部函式庫,提供了高階別的多執行緒原語。它允許程式設計師建立和管理執行緒,並提供了一系列工具用於執行緒之間的同步和溝通。

處理工作

在多執行緒程式中,工作(Work)通常被定義為一個列舉(enum),它代表了執行緒需要執行的任務。例如:

enum Work {
    Task((usize, u8)),
    Finished,
}

這個列舉定義了兩種工作型別:TaskFinishedTask 代表了一個需要執行的任務,它包含了一個元組 (usize, u8);而 Finished 代表了工作已經完成。

解析 byte

在某些情況下,程式需要解析 byte 並根據其值執行不同的動作。例如:

fn parse_byte(byte: u8) -> Operation {
    match byte {
        b'0' => Home,
        //...
    }
}

這個函式根據輸入的 byte 值傳回一個 Operation 列舉值。

內容解密:

上述程式碼展示瞭如何使用 loop 關鍵字和 match 陳述式來控制程式的流程。同時,它也展示瞭如何使用列舉來定義工作型別和解析 byte 值。

圖表翻譯:

  flowchart TD
    A[開始] --> B[解析 byte]
    B --> C[執行任務]
    C --> D[完成]
    D --> E[終止]

這個圖表展示了程式的流程:從開始到解析 byte,再到執行任務,最後完成並終止。

處理字元輸入並生成操作序列

在這個程式碼片段中,我們看到了一個 Rust 函式 parse,它負責將輸入的字串轉換成一系列的操作。這些操作是根據輸入的字元來決定,例如數字、字母等。

處理數字輸入

當輸入的字元是數字(b'1'b'9')時,程式碼會計算出需要移動的距離,並呼叫 Forward 函式來執行這個動作。距離的計算是根據輸入的數字減去 0x30(即 ASCII 中數字 0 的編碼),然後乘以 HEIGHT/10

let distance = (byte - 0x30) as isize;
Forward(distance * (HEIGHT/10))

處理字母輸入

當輸入的字元是字母(b'a'b'b'b'c'b'd'b'e'b'f')時,程式碼會根據字母的型別呼叫不同的函式。例如,當輸入的是 b'a'b'b'b'c' 時,會呼叫 TurnLeft 函式;當輸入的是 b'd'b'e'b'f' 時,會呼叫 TurnRight 函式。

b'a' | b'b' | b'c' => TurnLeft,
b'd' | b'e' | b'f' => TurnRight,

處理其他輸入

當輸入的字元不是數字或字母時,程式碼會呼叫 Noop 函式,並傳入原始的字元。

_ => Noop(byte),

多執行緒處理

parse 函式中,我們看到了一個多執行緒的實作。程式碼建立了兩個無界通道(unbounded):todo_txtodo_rx,以及 results_txresults_rx。這些通道用於在多個執行緒之間傳遞資料。

let (todo_tx, todo_rx) = unbounded();
let (results_tx, results_rx) = unbounded();

結果

最終,程式碼會傳回一個包含操作序列的向量。

fn parse(input: &str) -> Vec<Operation> {
    //...
}

圖表翻譯:

以下是對這個程式碼的流程圖表的視覺化呈現:

  flowchart TD
    A[輸入字串] --> B[解析字元]
    B --> C{是數字?}
    C -->|是|> D[計算距離]
    D --> E[呼叫 Forward]
    C -->|否|> F{是字母?}
    F -->|是|> G[呼叫 TurnLeft 或 TurnRight]
    F -->|否|> H[呼叫 Noop]
    G --> I[傳回操作序列]
    H --> I

這個圖表展示了程式碼的主要流程,包括解析字元、計算距離、呼叫函式等步驟。

內容解密:

上述程式碼是 Rust 語言實作的多執行緒任務處理系統的一部分。下面是逐步解說:

  1. 列舉輸入bytesfor (i, byte) in input.bytes().enumerate() 這行程式碼對輸入的 bytes 進行列舉,同時取得每個 byte 的索引 i 和 byte 值 byte

  2. 傳送任務todo_tx.send(Work::Task((i, byte))).unwrap(); 將每個 byte 以 Work::Task 的形式傳送給 todo_tx,這是一個用於傳遞任務的通道。unwrap 方法用於處理可能的錯誤,如果傳送失敗,程式將會 panic。

  3. 計數 bytesn_bytes += 1; 計數目前已經處理的 bytes 數量。

  4. 傳送完成訊號:第二個迴圈 for _ in 0..n_threads 中,程式傳送 Work::Finished 訊號給每個執行緒,表示任務已經全部傳送完畢。

  5. 啟動工作執行緒:第三個迴圈 for _ in 0..n_threads 中,程式為每個執行緒建立一個新的工作執行緒。每個執行緒都會接收任務並處理它們。

  6. 執行緒內容:在每個執行緒中,程式會不斷接收任務並處理,直到接收到 Work::Finished 訊號。這部分的實作並沒有在給出的程式碼片段中顯示,但一般來說,執行緒會不斷地從 todo_rx 通道中接收任務,並將結果傳送給 results_tx 通道。

  7. 執行緒啟動thread::spawn(move || {... }); 啟動一個新的執行緒,並將 todo_rxresults_tx 的複製品移動到新的執行緒中,以便它們可以獨立地運作。

這段程式碼展示瞭如何使用 Rust 的標準函式庫來實作一個簡單的多執行緒任務處理系統,包括任務的分發、接收和處理。然而,這段程式碼似乎缺乏對於任務結果的處理和錯誤控制的詳細實作。

圖表翻譯:

  flowchart TD
    A[輸入列舉] --> B[傳送任務]
    B --> C[計數bytes]
    C --> D[傳送完成訊號]
    D --> E[啟動工作執行緒]
    E --> F[執行緒內容: 接收任務、處理任務、傳送結果]
    F --> G[執行緒啟動]

上述Mermaid圖表描述了多執行緒任務處理系統的流程,從輸入列舉、傳送任務、計數bytes、傳送完成訊號,到啟動工作執行緒和執行緒內容的處理。這個圖表簡單地展示了系統中各部分之間的邏輯關係和執行順序。

處理任務結果並更新操作列表

在這個部分,我們將會處理從工作執行緒傳回的任務結果,並根據這些結果更新操作列表。

從效能最佳化視角來看,使用 Crossbeam 的通道(Channel)構建任務佇列,相較於簡單的平行迭代器,雖然程式碼複雜度有所提升,但在處理大量資料,特別是 I/O 密集型任務時,能更有效地利用多核心 CPU 的效能。通道的非同步特性允許工作執行緒在等待資料的同時執行其他任務,避免了執行緒阻塞造成的效能損耗。然而,通道的管理和同步也帶來了額外的開銷,例如任務的傳送、接收和佇列管理。對於少量資料或計算密集型任務,平行迭代器可能更具效率。技術限制深析顯示,無界通道雖然簡化了程式碼,但存在潛在的記憶體溢位風險。在實際應用中,應根據資料量和系統資源限制,謹慎選擇有界或無界通道。此外,任務粒度的大小也會影響整體效能。過小的粒度會增加通道通訊的開銷,而過大的粒度則可能無法充分利用多核心效能。未來,隨著 Rust 非同步生態的發展,async/await 模式與通道的結合將進一步簡化程式碼,並提升效能。玄貓認為,對於需要高吞吐量和低延遲的並發應用,根據 Crossbeam 通道的任務佇列是值得深入研究和應用的解決方案,但需仔細權衡通道型別、任務粒度和錯誤處理策略,以達到最佳的效能和穩定性。