在 Rust 中,通道提供了一種在不同執行緒間安全地傳遞資料的機制。本文將逐步探討如何利用 Arc 和原子操作來建構一個安全且高效的通道,並進一步探討如何實作無分配的通道以及自定義 Arc 型別。首先,我們使用 Arc 封裝通道,讓 Sender 和 Receiver 能夠安全地跨執行緒共用通道例項。接著,我們引入生命週期(lifetimes)的概念,避免不必要的記憶體分配,並設計無分配的通道。最後,我們深入研究如何建構自定義的 Arc 型別,從基本參考計數開始,逐步完善其功能,並探討其記憶體管理策略。這些技術的結合,將有助於提升 Rust 程式在多執行緒環境下的效能和安全性。

透過型別系統提升安全性:自行實作通道(Channel)的進階技術探討

在前面的章節中,我們已經探討瞭如何實作一個簡單的通道(Channel),並且利用 Rust 的所有權(Ownership)系統來提升程式的安全性。在本章節中,我們將深入研究如何進一步改進這個實作,使其更加安全、方便且高效。

以 Arc 實作通道的封裝

首先,我們將探討如何使用 Arc(Atomic Reference Counting)來封裝我們的通道實作。這樣做的目的是確保通道的 SenderReceiver 能夠安全地跨多執行緒共用。

程式碼實作:

use std::sync::atomic::{AtomicBool, Ordering::*};
use std::sync::{Arc, Mutex};
use std::cell::UnsafeCell;
use std::mem::MaybeUninit;

// 定義 Channel 結構
pub struct Channel<T> {
    message: UnsafeCell<MaybeUninit<T>>,
    ready: AtomicBool,
}

impl<T> Channel<T> {
    // 建立新的 Channel
    pub fn new() -> Self {
        Self {
            message: UnsafeCell::new(MaybeUninit::uninit()),
            ready: AtomicBool::new(false),
        }
    }
}

// 為 Channel 實作 Send 與 Sync
unsafe impl<T> Send for Channel<T> where T: Send {}
unsafe impl<T> Sync for Channel<T> where T: Send {}

// 定義 Sender 結構
pub struct Sender<T> {
    channel: Arc<Channel<T>>,
}

impl<T> Sender<T> {
    // 傳送訊息
    pub fn send(self, message: T) {
        unsafe { (*self.channel.message.get()).write(message) };
        self.channel.ready.store(true, Release);
    }
}

// 定義 Receiver 結構
pub struct Receiver<T> {
    channel: Arc<Channel<T>>,
}

impl<T> Receiver<T> {
    // 檢查是否有訊息準備好
    pub fn is_ready(&self) -> bool {
        self.channel.ready.load(Relaxed)
    }

    // 接收訊息
    pub fn receive(self) -> T {
        if !self.channel.ready.swap(false, Acquire) {
            panic!("no message available!");
        }
        unsafe { (*self.channel.message.get()).assume_init_read() }
    }
}

// 建立通道並傳回 Sender 與 Receiver
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
    let a = Arc::new(Channel::new());
    (Sender { channel: a.clone() }, Receiver { channel: a })
}

#### 內容解密:
上述程式碼展示瞭如何使用 `Arc` 來封裝 `Channel`,並將其與 `Sender`  `Receiver` 結構結合。這樣的設計使得 `Sender`  `Receiver` 能夠共用同一個 `Channel` 例項,並且能夠安全地跨執行緒使用。

1. **`Channel` 結構的定義**:`Channel` 結構包含一個 `message` 欄位用於儲存訊息,以及一個 `ready` 旗標用於表示是否有訊息準備好。
2. **`Sender`  `Receiver` 的實作**:`Sender`  `Receiver` 分別持有 `Channel`  `Arc` 參考,它們的方法用於傳送和接收訊息。
3. **`send` 方法**:`send` 方法將訊息寫入 `Channel` 中,並將 `ready` 旗標設為 `true`。這個方法會取得 `self` 的所有權,以確保每個 `Sender` 只能呼叫一次 `send`。
4. **`receive` 方法**:`receive` 方法檢查 `ready` 旗標,如果有訊息準備好,則讀取訊息並將 `ready` 旗標重設為 `false`。同樣地,這個方法也會取得 `self` 的所有權,以確保每個 `Receiver` 只能呼叫一次 `receive`。

### 使用範例:

```rust
fn main() {
    std::thread::scope(|s| {
        let (sender, receiver) = channel();
        let t = std::thread::current();
        s.spawn(move || {
            sender.send("hello world!");
            t.unpark();
        });
        while !receiver.is_ready() {
            std::thread::park();
        }
        assert_eq!(receiver.receive(), "hello world!");
    });
}

內容解密:

這個範例展示瞭如何使用我們實作的 channel 函式來建立一個通道,並在不同的執行緒之間傳遞訊息。Sender 在另一個執行緒中傳送訊息,而 Receiver 在主執行緒中等待並接收訊息。

最佳化和改進

雖然上述實作已經相當安全和方便,但仍有一些可以改進的地方。例如,目前的實作需要手動使用執行緒暫停(parking)來等待訊息,這在某些情況下可能不太方便。

進一步的改進方向:

  1. 避免分配記憶體:目前的實作使用了 Arc,這需要在堆積積(heap)上分配記憶體。我們可以設計另一種版本,讓使用者自行管理 Channel 的記憶體分配,從而避免這種開銷。
  2. 簡化使用介面:我們可以進一步簡化 SenderReceiver 的使用介面,使其更加直觀和方便。
  3. 提升效能:我們可以對目前的實作進行效能分析和最佳化,以提升其在不同場景下的表現。

未來,我們可以進一步探索更多的通道實作方案,例如使用其他同步機制或不同的記憶體管理策略。同時,我們也可以將這個實作應用到更多的實際場景中,以驗證其效能和可靠性。

參考資料

實作無分配的通道(Channel)

在前面的章節中,我們探討了使用 Arc 實作的通道(Channel)。現在,我們將透過引入生命週期(lifetimes)的概念來進一步最佳化通道的實作,以避免不必要的記憶體分配。

結構定義與實作

我們首先定義通道的結構,包括 ChannelSenderReceiver。這些結構將使用生命週期引數,以確保 SenderReceiver 正確地借用 Channel

pub struct Channel<T> {
    message: UnsafeCell<MaybeUninit<T>>,
    ready: AtomicBool,
}

pub struct Sender<'a, T> {
    channel: &'a Channel<T>,
}

pub struct Receiver<'a, T> {
    channel: &'a Channel<T>,
}

內容解密:

  • Channel<T> 結構包含一個 message 欄位,用於儲存待傳送的訊息,以及一個 ready 欄位,用於指示訊息是否準備好。
  • SenderReceiver 結構都包含一個對 Channel 的參照,它們的生命週期與 Channel 繫結。

建立通道及其元件

不同於之前使用 channel() 函式建立 SenderReceiver 對,我們現在使用 Channel::new 方法建立 Channel 物件,並透過 split 方法生成 SenderReceiver

impl<T> Channel<T> {
    pub const fn new() -> Self {
        Self {
            message: UnsafeCell::new(MaybeUninit::uninit()),
            ready: AtomicBool::new(false),
        }
    }

    pub fn split<'a>(&'a mut self) -> (Sender<'a, T>, Receiver<'a, T>) {
        *self = Self::new();
        (Sender { channel: self }, Receiver { channel: self })
    }
}

內容解密:

  • Channel::new 方法建立一個新的 Channel 例項,將 message 初始化為未初始化狀態,並將 ready 設為 false
  • split 方法將 Channel 例項拆分為 SenderReceiver。它首先重置 Channel 狀態為初始狀態,然後建立 SenderReceiver 例項。

傳送與接收訊息

SenderReceiver 的實作如下:

impl<T> Sender<'_, T> {
    pub fn send(self, message: T) {
        unsafe { (*self.channel.message.get()).write(message) };
        self.channel.ready.store(true, Release);
    }
}

impl<T> Receiver<'_, T> {
    pub fn is_ready(&self) -> bool {
        self.channel.ready.load(Relaxed)
    }

    pub fn receive(self) -> T {
        if !self.channel.ready.swap(false, Acquire) {
            panic!("no message available!");
        }
        unsafe { (*self.channel.message.get()).assume_init_read() }
    }
}

內容解密:

  • Sender::send 方法將訊息寫入 Channel,並將 ready 設為 true,表示訊息已準備好。
  • Receiver::is_ready 方法檢查 Channel 中的訊息是否準備好。
  • Receiver::receive 方法讀取 Channel 中的訊息,並將 ready 重置為 false。如果沒有訊息可用,則會引發 panic。

清理與測試

ChannelDrop 實作確保在銷毀時正確地處理訊息:

impl<T> Drop for Channel<T> {
    fn drop(&mut self) {
        if *self.ready.get_mut() {
            unsafe { self.message.get_mut().assume_init_drop() }
        }
    }
}

測試程式碼如下:

fn main() {
    let mut channel = Channel::new();
    thread::scope(|s| {
        let (sender, receiver) = channel.split();
        let t = thread::current();
        s.spawn(move || {
            sender.send("hello world!");
            t.unpark();
        });
        while !receiver.is_ready() {
            thread::park();
        }
        assert_eq!(receiver.receive(), "hello world!");
    });
}

內容解密:

  • 測試程式碼建立了一個 Channel 例項,並線上程範圍內拆分為 SenderReceiver
  • 傳送執行緒傳送訊息並喚醒接收執行緒。
  • 接收執行緒等待訊息準備好後接收並驗證訊息內容。

新增阻塞介面

為了改進通道的易用性,我們將新增阻塞介面,使接收者能夠在沒有訊息時阻塞等待。

use std::thread::Thread;

pub struct Sender<'a, T> {
    channel: &'a Channel<T>,
    receiving_thread: Thread,
}

內容解密:

  • Sender 結構新增了一個 receiving_thread 欄位,用於儲存接收執行緒的控制程式碼,以便在傳送訊息時喚醒接收執行緒。

透過上述改進,我們實作了一個無分配且帶有阻塞介面的通道,能夠有效地線上程之間傳遞訊息。

建構自定義的「Arc」

在「參考計數」章節中,我們探討了 std::sync::Arc<T> 型別,它透過參考計數實作了分享所有權。Arc::new 函式會建立新的記憶體分配,就像 Box::new 一樣。然而,與 Box 不同,複製 Arc 會分享原有的記憶體分配,而不會建立新的。只有當最後一個 Arc 及其所有複製物件都被丟棄時,分享的記憶體分配才會被釋放。

實作 Arc<T> 涉及的記憶體排序考量非常有趣。在本章中,我們將進一步實踐理論,實作我們自己的 Arc<T>。我們將從基本版本開始,然後擴充套件它以支援弱指標用於環狀結構,最後以一個幾乎與標準函式庫實作相同的最佳化版本結束本章。

基本參考計數

我們的第一個版本將使用單一 AtomicUsize 來計算分享同一記憶體分配的 Arc 物件數量。讓我們從一個包含此計數器和 T 物件的結構體開始:

struct ArcData<T> {
    ref_count: AtomicUsize,
    data: T,
}

內容解密:

此結構體 ArcData<T> 不是公開的,它是我們 Arc 實作的內部實作細節。其中 ref_count 欄位用於記錄有多少個 Arc 物件正在分享此 ArcData<T>,而 data 則儲存實際的 T 物件。

接下來是 Arc<T> 結構體本身,它本質上只是一個指向(分享)ArcData<T> 物件的指標。

pub struct Arc<T> {
    ptr: *const ArcData<T>,
}

內容解密:

Arc<T> 結構體包含一個指向 ArcData<T> 的指標 ptr。由於 ArcData<T> 是在堆積上分配的,因此 Arc<T> 可以透過這個指標來存取和操作 ArcData<T>

建立和複製 Arc

為了建立一個新的 Arc,我們需要實作 Arc::new 方法,這個方法會在堆積上分配一個新的 ArcData<T>

impl<T> Arc<T> {
    pub fn new(data: T) -> Self {
        let arc_data = ArcData {
            ref_count: AtomicUsize::new(1),
            data,
        };
        let ptr = Box::into_raw(Box::new(arc_data));
        Arc { ptr }
    }
}

內容解密:

Arc::new 方法中,我們首先建立一個新的 ArcData<T> 例項,將 ref_count 初始化為 1,因為最初只有一個 Arc 物件指向它。然後,我們使用 Box::into_rawArcData<T> 轉換為一個原始指標,並將其儲存在 Arc<T> 結構體中。

Arc 被複製時,我們需要增加 ref_count

impl<T> Clone for Arc<T> {
    fn clone(&self) -> Self {
        unsafe {
            (*self.ptr).ref_count.fetch_add(1, Ordering::Relaxed);
        }
        Arc { ptr: self.ptr }
    }
}

內容解密:

clone 方法中,我們使用 fetch_add 原子操作來增加 ref_count。這裡使用 Ordering::Relaxed 是因為我們只關心計數的正確性,而不涉及其他同步操作。

丟棄 Arc

Arc 被丟棄時,我們需要減少 ref_count,並在計數歸零時釋放 ArcData<T>

impl<T> Drop for Arc<T> {
    fn drop(&mut self) {
        unsafe {
            if (*self.ptr).ref_count.fetch_sub(1, Ordering::Release) == 1 {
                fence(Ordering::Acquire);
                drop(Box::from_raw(self.ptr as *mut ArcData<T>));
            }
        }
    }
}

內容解密:

drop 方法中,我們使用 fetch_sub 減少 ref_count。如果減少後計數為 1,表示當前 Arc 是最後一個指向 ArcData<T> 的物件,因此我們需要釋放 ArcData<T>。在這之前,我們使用 fence(Ordering::Acquire) 來確保任何對 ArcData<T> 的存取都已完成。最後,我們使用 Box::from_raw 將原始指標轉換回 Box 並丟棄它,從而釋放 ArcData<T>

未來方向

在未來的章節中,我們可以進一步擴充套件這個 Arc 實作,例如新增對弱指標的支援,以允許建立環狀結構。此外,我們還可以探討如何最佳化 Arc 的效能,使其更加接近標準函式庫中的實作。

  graph LR
    A[Arc<T>] -->|ptr|> B[ArcData<T>]
    B -->|ref_count|> C[參考計數]
    B -->|data|> D[T 物件]

圖表翻譯: 此圖示展示了 Arc<T>ArcData<T> 之間的關係。Arc<T> 透過指標 ptr 指向 ArcData<T>,而 ArcData<T> 包含了參考計數 ref_count 和實際的 T 物件 data。透過這種結構,Arc<T> 能夠實作分享所有權和自動記憶體管理。