在 Rust 中,通道提供了一種在不同執行緒間安全地傳遞資料的機制。本文將逐步探討如何利用 Arc 和原子操作來建構一個安全且高效的通道,並進一步探討如何實作無分配的通道以及自定義 Arc 型別。首先,我們使用 Arc 封裝通道,讓 Sender 和 Receiver 能夠安全地跨執行緒共用通道例項。接著,我們引入生命週期(lifetimes)的概念,避免不必要的記憶體分配,並設計無分配的通道。最後,我們深入研究如何建構自定義的 Arc 型別,從基本參考計數開始,逐步完善其功能,並探討其記憶體管理策略。這些技術的結合,將有助於提升 Rust 程式在多執行緒環境下的效能和安全性。
透過型別系統提升安全性:自行實作通道(Channel)的進階技術探討
在前面的章節中,我們已經探討瞭如何實作一個簡單的通道(Channel),並且利用 Rust 的所有權(Ownership)系統來提升程式的安全性。在本章節中,我們將深入研究如何進一步改進這個實作,使其更加安全、方便且高效。
以 Arc 實作通道的封裝
首先,我們將探討如何使用 Arc(Atomic Reference Counting)來封裝我們的通道實作。這樣做的目的是確保通道的 Sender 和 Receiver 能夠安全地跨多執行緒共用。
程式碼實作:
use std::sync::atomic::{AtomicBool, Ordering::*};
use std::sync::{Arc, Mutex};
use std::cell::UnsafeCell;
use std::mem::MaybeUninit;
// 定義 Channel 結構
pub struct Channel<T> {
message: UnsafeCell<MaybeUninit<T>>,
ready: AtomicBool,
}
impl<T> Channel<T> {
// 建立新的 Channel
pub fn new() -> Self {
Self {
message: UnsafeCell::new(MaybeUninit::uninit()),
ready: AtomicBool::new(false),
}
}
}
// 為 Channel 實作 Send 與 Sync
unsafe impl<T> Send for Channel<T> where T: Send {}
unsafe impl<T> Sync for Channel<T> where T: Send {}
// 定義 Sender 結構
pub struct Sender<T> {
channel: Arc<Channel<T>>,
}
impl<T> Sender<T> {
// 傳送訊息
pub fn send(self, message: T) {
unsafe { (*self.channel.message.get()).write(message) };
self.channel.ready.store(true, Release);
}
}
// 定義 Receiver 結構
pub struct Receiver<T> {
channel: Arc<Channel<T>>,
}
impl<T> Receiver<T> {
// 檢查是否有訊息準備好
pub fn is_ready(&self) -> bool {
self.channel.ready.load(Relaxed)
}
// 接收訊息
pub fn receive(self) -> T {
if !self.channel.ready.swap(false, Acquire) {
panic!("no message available!");
}
unsafe { (*self.channel.message.get()).assume_init_read() }
}
}
// 建立通道並傳回 Sender 與 Receiver
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
let a = Arc::new(Channel::new());
(Sender { channel: a.clone() }, Receiver { channel: a })
}
#### 內容解密:
上述程式碼展示瞭如何使用 `Arc` 來封裝 `Channel`,並將其與 `Sender` 和 `Receiver` 結構結合。這樣的設計使得 `Sender` 和 `Receiver` 能夠共用同一個 `Channel` 例項,並且能夠安全地跨執行緒使用。
1. **`Channel` 結構的定義**:`Channel` 結構包含一個 `message` 欄位用於儲存訊息,以及一個 `ready` 旗標用於表示是否有訊息準備好。
2. **`Sender` 與 `Receiver` 的實作**:`Sender` 和 `Receiver` 分別持有 `Channel` 的 `Arc` 參考,它們的方法用於傳送和接收訊息。
3. **`send` 方法**:`send` 方法將訊息寫入 `Channel` 中,並將 `ready` 旗標設為 `true`。這個方法會取得 `self` 的所有權,以確保每個 `Sender` 只能呼叫一次 `send`。
4. **`receive` 方法**:`receive` 方法檢查 `ready` 旗標,如果有訊息準備好,則讀取訊息並將 `ready` 旗標重設為 `false`。同樣地,這個方法也會取得 `self` 的所有權,以確保每個 `Receiver` 只能呼叫一次 `receive`。
### 使用範例:
```rust
fn main() {
std::thread::scope(|s| {
let (sender, receiver) = channel();
let t = std::thread::current();
s.spawn(move || {
sender.send("hello world!");
t.unpark();
});
while !receiver.is_ready() {
std::thread::park();
}
assert_eq!(receiver.receive(), "hello world!");
});
}
內容解密:
這個範例展示瞭如何使用我們實作的 channel 函式來建立一個通道,並在不同的執行緒之間傳遞訊息。Sender 在另一個執行緒中傳送訊息,而 Receiver 在主執行緒中等待並接收訊息。
最佳化和改進
雖然上述實作已經相當安全和方便,但仍有一些可以改進的地方。例如,目前的實作需要手動使用執行緒暫停(parking)來等待訊息,這在某些情況下可能不太方便。
進一步的改進方向:
- 避免分配記憶體:目前的實作使用了
Arc,這需要在堆積積(heap)上分配記憶體。我們可以設計另一種版本,讓使用者自行管理Channel的記憶體分配,從而避免這種開銷。 - 簡化使用介面:我們可以進一步簡化
Sender和Receiver的使用介面,使其更加直觀和方便。 - 提升效能:我們可以對目前的實作進行效能分析和最佳化,以提升其在不同場景下的表現。
未來,我們可以進一步探索更多的通道實作方案,例如使用其他同步機制或不同的記憶體管理策略。同時,我們也可以將這個實作應用到更多的實際場景中,以驗證其效能和可靠性。
參考資料
- Rust 程式語言官方檔案:https://doc.rust-lang.org/
- Rust Atomics and Locks:https://marabos.nl/atomics/
實作無分配的通道(Channel)
在前面的章節中,我們探討了使用 Arc 實作的通道(Channel)。現在,我們將透過引入生命週期(lifetimes)的概念來進一步最佳化通道的實作,以避免不必要的記憶體分配。
結構定義與實作
我們首先定義通道的結構,包括 Channel、Sender 和 Receiver。這些結構將使用生命週期引數,以確保 Sender 和 Receiver 正確地借用 Channel。
pub struct Channel<T> {
message: UnsafeCell<MaybeUninit<T>>,
ready: AtomicBool,
}
pub struct Sender<'a, T> {
channel: &'a Channel<T>,
}
pub struct Receiver<'a, T> {
channel: &'a Channel<T>,
}
內容解密:
Channel<T>結構包含一個message欄位,用於儲存待傳送的訊息,以及一個ready欄位,用於指示訊息是否準備好。Sender和Receiver結構都包含一個對Channel的參照,它們的生命週期與Channel繫結。
建立通道及其元件
不同於之前使用 channel() 函式建立 Sender 和 Receiver 對,我們現在使用 Channel::new 方法建立 Channel 物件,並透過 split 方法生成 Sender 和 Receiver。
impl<T> Channel<T> {
pub const fn new() -> Self {
Self {
message: UnsafeCell::new(MaybeUninit::uninit()),
ready: AtomicBool::new(false),
}
}
pub fn split<'a>(&'a mut self) -> (Sender<'a, T>, Receiver<'a, T>) {
*self = Self::new();
(Sender { channel: self }, Receiver { channel: self })
}
}
內容解密:
Channel::new方法建立一個新的Channel例項,將message初始化為未初始化狀態,並將ready設為false。split方法將Channel例項拆分為Sender和Receiver。它首先重置Channel狀態為初始狀態,然後建立Sender和Receiver例項。
傳送與接收訊息
Sender 和 Receiver 的實作如下:
impl<T> Sender<'_, T> {
pub fn send(self, message: T) {
unsafe { (*self.channel.message.get()).write(message) };
self.channel.ready.store(true, Release);
}
}
impl<T> Receiver<'_, T> {
pub fn is_ready(&self) -> bool {
self.channel.ready.load(Relaxed)
}
pub fn receive(self) -> T {
if !self.channel.ready.swap(false, Acquire) {
panic!("no message available!");
}
unsafe { (*self.channel.message.get()).assume_init_read() }
}
}
內容解密:
Sender::send方法將訊息寫入Channel,並將ready設為true,表示訊息已準備好。Receiver::is_ready方法檢查Channel中的訊息是否準備好。Receiver::receive方法讀取Channel中的訊息,並將ready重置為false。如果沒有訊息可用,則會引發 panic。
清理與測試
Channel 的 Drop 實作確保在銷毀時正確地處理訊息:
impl<T> Drop for Channel<T> {
fn drop(&mut self) {
if *self.ready.get_mut() {
unsafe { self.message.get_mut().assume_init_drop() }
}
}
}
測試程式碼如下:
fn main() {
let mut channel = Channel::new();
thread::scope(|s| {
let (sender, receiver) = channel.split();
let t = thread::current();
s.spawn(move || {
sender.send("hello world!");
t.unpark();
});
while !receiver.is_ready() {
thread::park();
}
assert_eq!(receiver.receive(), "hello world!");
});
}
內容解密:
- 測試程式碼建立了一個
Channel例項,並線上程範圍內拆分為Sender和Receiver。 - 傳送執行緒傳送訊息並喚醒接收執行緒。
- 接收執行緒等待訊息準備好後接收並驗證訊息內容。
新增阻塞介面
為了改進通道的易用性,我們將新增阻塞介面,使接收者能夠在沒有訊息時阻塞等待。
use std::thread::Thread;
pub struct Sender<'a, T> {
channel: &'a Channel<T>,
receiving_thread: Thread,
}
內容解密:
Sender結構新增了一個receiving_thread欄位,用於儲存接收執行緒的控制程式碼,以便在傳送訊息時喚醒接收執行緒。
透過上述改進,我們實作了一個無分配且帶有阻塞介面的通道,能夠有效地線上程之間傳遞訊息。
建構自定義的「Arc」
在「參考計數」章節中,我們探討了 std::sync::Arc<T> 型別,它透過參考計數實作了分享所有權。Arc::new 函式會建立新的記憶體分配,就像 Box::new 一樣。然而,與 Box 不同,複製 Arc 會分享原有的記憶體分配,而不會建立新的。只有當最後一個 Arc 及其所有複製物件都被丟棄時,分享的記憶體分配才會被釋放。
實作 Arc<T> 涉及的記憶體排序考量非常有趣。在本章中,我們將進一步實踐理論,實作我們自己的 Arc<T>。我們將從基本版本開始,然後擴充套件它以支援弱指標用於環狀結構,最後以一個幾乎與標準函式庫實作相同的最佳化版本結束本章。
基本參考計數
我們的第一個版本將使用單一 AtomicUsize 來計算分享同一記憶體分配的 Arc 物件數量。讓我們從一個包含此計數器和 T 物件的結構體開始:
struct ArcData<T> {
ref_count: AtomicUsize,
data: T,
}
內容解密:
此結構體 ArcData<T> 不是公開的,它是我們 Arc 實作的內部實作細節。其中 ref_count 欄位用於記錄有多少個 Arc 物件正在分享此 ArcData<T>,而 data 則儲存實際的 T 物件。
接下來是 Arc<T> 結構體本身,它本質上只是一個指向(分享)ArcData<T> 物件的指標。
pub struct Arc<T> {
ptr: *const ArcData<T>,
}
內容解密:
Arc<T> 結構體包含一個指向 ArcData<T> 的指標 ptr。由於 ArcData<T> 是在堆積上分配的,因此 Arc<T> 可以透過這個指標來存取和操作 ArcData<T>。
建立和複製 Arc
為了建立一個新的 Arc,我們需要實作 Arc::new 方法,這個方法會在堆積上分配一個新的 ArcData<T>。
impl<T> Arc<T> {
pub fn new(data: T) -> Self {
let arc_data = ArcData {
ref_count: AtomicUsize::new(1),
data,
};
let ptr = Box::into_raw(Box::new(arc_data));
Arc { ptr }
}
}
內容解密:
在 Arc::new 方法中,我們首先建立一個新的 ArcData<T> 例項,將 ref_count 初始化為 1,因為最初只有一個 Arc 物件指向它。然後,我們使用 Box::into_raw 將 ArcData<T> 轉換為一個原始指標,並將其儲存在 Arc<T> 結構體中。
當 Arc 被複製時,我們需要增加 ref_count。
impl<T> Clone for Arc<T> {
fn clone(&self) -> Self {
unsafe {
(*self.ptr).ref_count.fetch_add(1, Ordering::Relaxed);
}
Arc { ptr: self.ptr }
}
}
內容解密:
在 clone 方法中,我們使用 fetch_add 原子操作來增加 ref_count。這裡使用 Ordering::Relaxed 是因為我們只關心計數的正確性,而不涉及其他同步操作。
丟棄 Arc
當 Arc 被丟棄時,我們需要減少 ref_count,並在計數歸零時釋放 ArcData<T>。
impl<T> Drop for Arc<T> {
fn drop(&mut self) {
unsafe {
if (*self.ptr).ref_count.fetch_sub(1, Ordering::Release) == 1 {
fence(Ordering::Acquire);
drop(Box::from_raw(self.ptr as *mut ArcData<T>));
}
}
}
}
內容解密:
在 drop 方法中,我們使用 fetch_sub 減少 ref_count。如果減少後計數為 1,表示當前 Arc 是最後一個指向 ArcData<T> 的物件,因此我們需要釋放 ArcData<T>。在這之前,我們使用 fence(Ordering::Acquire) 來確保任何對 ArcData<T> 的存取都已完成。最後,我們使用 Box::from_raw 將原始指標轉換回 Box 並丟棄它,從而釋放 ArcData<T>。
未來方向
在未來的章節中,我們可以進一步擴充套件這個 Arc 實作,例如新增對弱指標的支援,以允許建立環狀結構。此外,我們還可以探討如何最佳化 Arc 的效能,使其更加接近標準函式庫中的實作。
graph LR
A[Arc<T>] -->|ptr|> B[ArcData<T>]
B -->|ref_count|> C[參考計數]
B -->|data|> D[T 物件]
圖表翻譯:
此圖示展示了 Arc<T> 和 ArcData<T> 之間的關係。Arc<T> 透過指標 ptr 指向 ArcData<T>,而 ArcData<T> 包含了參考計數 ref_count 和實際的 T 物件 data。透過這種結構,Arc<T> 能夠實作分享所有權和自動記憶體管理。