Rust 的多執行緒模型提供安全且穩健的錯誤處理機制。join() 方法傳回 Result 型別,能捕捉子執行緒的恐慌錯誤,並允許父執行緒進行處理,避免程式當機。跨執行緒分享不可變資料時,使用 Arc 智慧指標能有效避免資料競爭,同時提升效能。Rayon 函式庫提供更簡潔易用的 API,例如 par_iter() 和 join(),讓開發者能更輕鬆地實作資料平行化,提升多核 CPU 的利用率。此外,通道(Channel)提供執行緒間的單向通訊機制,能有效地傳遞資料並確保執行緒安全。
跨執行緒的錯誤處理
在我們的範例中,用於合併子執行緒的程式碼由於錯誤處理而變得複雜。讓我們重新檢視這行程式碼:
handle.join().unwrap()?;
.join() 方法為我們做了兩件很棒的事情。
首先,handle.join() 傳回一個 std::thread::Result,如果子執行緒發生了恐慌(panic),則傳回一個錯誤。這使得 Rust 中的多執行緒比 C++ 更加穩健。在 C++ 中,超出邊界的陣列存取是未定義行為,並且無法保護系統的其他部分免受其後果。在 Rust 中,恐慌是安全的,並且是每個執行緒獨立的。執行緒之間的邊界就像是一道防火牆,防止恐慌自動傳播到依賴它的其他執行緒。相反,一個執行緒中的恐慌會在其他執行緒中被報告為錯誤結果。整個程式可以輕易地從中還原。
然而,在我們的程式中,我們並沒有嘗試任何複雜的恐慌處理。相反,我們立即對這個結果使用 .unwrap(),斷言它是一個 Ok 結果,而不是 Err 結果。如果子執行緒發生了恐慌,那麼這個斷言就會失敗,因此父執行緒也會發生恐慌。我們正在明確地將子執行緒的恐慌傳播到父執行緒。
其次,handle.join() 將子執行緒的傳回值傳遞回父執行緒。我們傳遞給 spawn 的閉包具有 io::Result<()> 的傳回型別,因為這是 process_files 傳回的型別。這個傳回值並沒有被丟棄。當子執行緒完成時,其傳回值被儲存,並且 JoinHandle::join() 將該值傳遞回父執行緒。
在這個程式中,handle.join() 傳回的完整數型別是 std::thread::Result<std::io::Result<()>>。thread::Result 是 spawn/join API 的一部分;io::Result 是我們應用程式的一部分。
在我們的例子中,在解開 thread::Result 之後,我們對 io::Result 使用 ? 運算子,明確地將子執行緒中的 I/O 錯誤傳播到父執行緒。
內容解密:
.join()方法傳回std::thread::Result以處理子執行緒的恐慌。- 使用
.unwrap()將子執行緒的恐慌傳播到父執行緒。 handle.join()將子執行緒的傳回值傳遞回父執行緒。- 對
io::Result使用?運算子以傳播 I/O 錯誤。
所有這些看起來相當複雜。但是請考慮這只是一行程式碼,然後與其他語言進行比較。在 Java 和 C# 中,預設行為是將子執行緒中的例外丟棄到終端並忘記。在 C++ 中,預設是中止程式。在 Rust 中,錯誤是結果值(資料),而不是例外(控制流程)。它們像其他值一樣在執行緒之間傳遞。任何時候你使用低階多執行緒 API,你最終都必須編寫仔細的錯誤處理程式碼,但既然你必須編寫它,有結果值是非常好的。
跨執行緒分享不可變資料
假設我們正在進行的分析需要一個龐大的英文單字和片語資料函式庫:
// before
fn process_files(filenames: Vec<String>)
// after
fn process_files(filenames: Vec<String>, glossary: &GigabyteMap)
這個辭彙表非常龐大,因此我們透過參考傳遞它。我們如何更新 process_files_in_parallel 以將辭彙表傳遞給工作執行緒?
顯而易見的更改並不起作用:
fn process_files_in_parallel(filenames: Vec<String>,
glossary: &GigabyteMap)
-> io::Result<()>
{
...
for worklist in worklists {
thread_handles.push(
spawn(move || process_files(worklist, glossary)) // error
);
}
...
}
我們只是簡單地為函式增加了一個辭彙表引數,並將其傳遞給 process_files。Rust 抱怨:
error[E0477]: the type `[closure@...]` does not fulfill the required lifetime
--> concurrency_spawn_lifetimes.rs:35:13
|
35 | spawn(move || process_files(worklist, glossary)) // error
| ^^^^^
|
= note: type must satisfy the static lifetime
Rust 在抱怨我們傳遞給 spawn 的閉包的生命週期。spawn 啟動獨立的執行緒。Rust 無法知道子執行緒將執行多久,因此它假設最壞的情況:它假設子執行緒可能會繼續執行,即使父執行緒已經完成並且父執行緒中的所有值都消失了。
顯然,如果子執行緒要持續那麼久,那麼它正在執行的閉包也需要持續那麼久。但是這個閉包具有有限的生命週期:它依賴於參考辭彙表,而參考並不會永遠存在。
內容解密:
- Rust 拒絕了程式碼,因為它無法保證閉包的生命週期足夠長。
- 子執行緒可能會在父執行緒完成後繼續執行,並嘗試使用已經被釋放的辭彙表。
- 使用
Arc可以解決這個問題,因為它提供了原子參考計數。
看起來 spawn 太過開放,以至於無法支援跨執行緒分享參考。事實上,我們已經看到了類別似的情況,在「偷竊的閉包」一節中。我們的解決方案是使用移動閉包將資料的所有權轉移到新的執行緒。但是在這裡,這行不通,因為我們有多個執行緒都需要使用相同的資料。一種安全的替代方案是為每個執行緒複製整個辭彙表,但由於它很大,我們希望避免這樣做。幸運的是,標準函式庫提供了另一種方法:原子參考計數。
use std::sync::Arc;
fn process_files_in_parallel(filenames: Vec<String>,
glossary: Arc<GigabyteMap>)
-> io::Result<()>
{
...
for worklist in worklists {
// This call to .clone() only clones the Arc and bumps the
// reference count. It does not clone the GigabyteMap.
let glossary_for_child = glossary.clone();
thread_handles.push(
spawn(move || process_files(worklist, &glossary_for_child))
);
}
...
}
我們更改了辭彙表的型別:要平行執行分析,呼叫者必須透過 Arc::new(giga_map) 將一個 Arc<GigabyteMap> 傳遞進來,這是一個智慧指標,指向已經被移到堆積上的 GigabyteMap。
當我們呼叫 glossary.clone() 時,我們正在複製 Arc 智慧指標,而不是整個 GigabyteMap。這相當於增加一個參考計數。
內容解密:
- 使用
Arc可以安全地在多個執行緒之間分享資料。 - 對
Arc呼叫.clone()只會複製智慧指標,而不會複製底層資料。 - 這樣可以確保資料在任何一個執行緒擁有
Arc時保持存活,從而避免了資料競爭。
Rayon:簡化平行處理的強大工具
在前面的章節中,我們已經討論過Rust標準函式庫中的spawn函式,它是平行程式設計的一個重要根本。然而,spawn函式並不是專門為分叉-合併(fork-join)平行設計的。幸運的是,已經有許多優秀的函式庫在spawn的基礎上建立了更適合fork-join平行的API。其中,Rayon就是一個非常出色的例子。
Rayon的基本用法
Rayon提供了兩種主要的方式來平行執行任務:
- 使用
rayon::join來平行執行兩個任務。 - 使用
.par_iter()方法來建立一個平行迭代器(ParallelIterator),對多個元素進行平行處理。
extern crate rayon;
use rayon::prelude::*;
let (v1, v2) = rayon::join(fn1, fn2);
giant_vector.par_iter().for_each(|value| {
do_thing_with_value(value);
});
在這裡,rayon::join(fn1, fn2)簡單地呼叫兩個函式並傳回它們的結果。.par_iter()方法則建立了一個ParallelIterator,它具有map、filter等方法,就像Rust的Iterator一樣。Rayon使用自己的工作執行緒池來分配工作,這使得它能夠高效地利用多核CPU。
使用Rayon實作檔案平行處理
下面是一個使用Rayon來平行處理檔案的例子:
extern crate rayon;
use rayon::prelude::*;
fn process_files_in_parallel(filenames: Vec<String>, glossary: &GigabyteMap) -> io::Result<()> {
filenames.par_iter()
.map(|filename| process_file(filename, glossary))
.reduce_with(|r1, r2| {
if r1.is_err() { r1 } else { r2 }
})
.unwrap_or(Ok(()))
}
內容解密:
- 首先,我們使用
filenames.par_iter()建立了一個平行迭代器。 - 然後,我們使用
.map()方法對每個檔名呼叫process_file函式,這產生了一個包含`io::Result<()>``值的ParallelIterator。 - 接著,我們使用
.reduce_with()方法來合併結果。在這裡,我們保留了第一個錯誤(如果有的話),並丟棄了其他的結果。 reduce_with傳回一個Option,只有當filenames為空時才會是None。我們使用Option的.unwrap_or()方法在這種情況下傳回Ok(())。
Rayon背後使用了稱為工作竊取(work-stealing)的技術來動態平衡各執行緒之間的負載。這通常比手動劃分工作更有效地保持所有CPU核心忙碌。
重新審視Mandelbrot集合
在第2章中,我們使用fork-join並發來渲染Mandelbrot集合。雖然這樣做使渲染速度提高了四倍,但考慮到我們啟動了八個工作執行緒並在八核機器上執行,改進的幅度並不如預期。
問題在於我們沒有均勻地分配工作量。計算影像的一個畫素相當於執行一個迴圈,而影像中淺灰色部分(迴圈快速離開)的渲染速度遠快於黑色部分(迴圈執行255次迭代)。因此,雖然我們將區域劃分為大小相等的水平帶,但實際上卻建立了不相等的工作量。
使用Rayon可以輕鬆解決這個問題。我們可以為輸出影像的每一行畫素觸發一個平行任務,這樣就建立了數百個任務,讓Rayon能夠跨執行緒分配這些任務。由於工作竊取機制的存在,任務大小的不同並不會成為問題,Rayon會在執行過程中平衡工作量。
以下是修改後的程式碼:
let mut pixels = vec![0; bounds.0 * bounds.1];
{
let bands: Vec<(usize, &mut [u8])> = pixels.chunks_mut(bounds.0).enumerate().collect();
bands.into_par_iter()
.weight_max()
.for_each(|(i, band)| {
let top = i;
let band_bounds = (bounds.0, 1);
let band_upper_left = pixel_to_point(bounds, (0, top), upper_left, lower_right);
let band_lower_right = pixel_to_point(bounds, (bounds.0, top + 1), upper_left, lower_right);
// ...
});
}
內容解密:
- 我們首先建立了一個畫素向量,並將其劃分為多個帶(band)。
- 然後,我們使用
into_par_iter()將這些帶轉換為一個平行迭代器。 .weight_max()方法用於指示Rayon這個迭代器的元素具有不同的計算成本,從而更好地進行負載平衡。- 在
.for_each()中,我們對每個帶進行渲染計算。
透過這種方式,Rayon能夠更有效地分配工作量,從而充分利用多核CPU的優勢。
多執行緒程式設計與通道(Channel)應用
在Rust中,多執行緒程式設計能夠有效地提升程式的效能,而通道(Channel)則是實作執行緒間通訊的重要工具。本文將介紹如何使用Rayon函式庫進行平行計算,以及如何利用通道進行執行緒間的資料傳遞。
使用Rayon進行平行計算
Rayon是一個用於Rust的資料平行函式庫,能夠簡化平行程式的開發。以下是一個使用Rayon進行影像處理的範例:
extern crate rayon;
use rayon::prelude::*;
fn main() {
// 建立任務集合
let mut pixels = vec![0; width * height];
let bands: Vec<(usize, &mut [u8])> = pixels.chunks_mut(width).enumerate().collect();
// 轉換為平行迭代器並執行任務
bands.into_par_iter().weight_max().for_each(|(row, band)| {
let upper_left = (0, row);
let lower_right = (width, row + 1);
let band_upper_left = (0, 0);
let band_lower_right = (width, 1);
render(band, band_upper_left, band_lower_right);
});
// 將結果寫入PNG檔案
write_bitmap(&args[1], &pixels, bounds).expect("error writing PNG file");
}
內容解密:
bands向量儲存了影像處理的任務,每個任務是一個元組,包含行號和該行的畫素資料切片。into_par_iter()將向量轉換為平行迭代器,使Rayon能夠自動分配任務到多個執行緒。weight_max()提示Rayon這些任務是CPU密集型的,以便最佳化任務分配。for_each()定義了每個任務要執行的操作,這裡是呼叫render()函式進行影像處理。- 最後,將處理好的畫素資料寫入PNG檔案。
通道(Channel)的使用
通道是一種單向的執行緒間通訊機制,用於在不同執行緒之間傳遞值。以下是建立和使用通道的基本步驟:
use std::sync::mpsc::channel;
use std::thread::spawn;
fn main() {
// 建立通道
let (sender, receiver) = channel();
// 在新執行緒中傳送資料
let handle = spawn(move || {
for text in vec!["Hello", "World"] {
if sender.send(text.to_string()).is_err() {
break;
}
}
Ok(())
});
// 在主執行緒中接收資料
while let Ok(text) = receiver.recv() {
println!("{}", text);
}
handle.join().unwrap();
}
內容解密:
channel()函式傳回一個傳送者和一個接收者,兩者透過通道連線。- 在新執行緒中,
sender.send()用於傳送字串到通道。 - 在主執行緒中,
receiver.recv()用於接收通道中的字串。 - 資料的所有權從傳送執行緒轉移到接收執行緒,實作了執行緒間的安全通訊。
建構倒排索引的管道
以下是一個使用通道建構倒排索引的範例,展示瞭如何利用多個執行緒和通道來實作平行處理:
use std::fs::File;
use std::io::prelude::*;
use std::thread::spawn;
use std::sync::mpsc::channel;
fn main() {
// 建立通道和啟動檔案讀取執行緒
let (sender, receiver) = channel();
let documents = vec![PathBuf::from("file1.txt"), PathBuf::from("file2.txt")];
let handle = spawn(move || {
for filename in documents {
let mut f = File::open(filename).unwrap();
let mut text = String::new();
f.read_to_string(&mut text).unwrap();
if sender.send(text).is_err() {
break;
}
}
Ok(())
});
// 接收檔案內容並進行後續處理
while let Ok(text) = receiver.recv() {
// 對text進行處理,例如建構倒排索引
println!("{}", text);
}
handle.join().unwrap();
}
內容解密:
- 建立一個通道,用於在檔案讀取執行緒和主執行緒之間傳遞檔案內容。
- 在新執行緒中,讀取檔案內容並透過通道傳送給主執行緒。
- 主執行緒接收檔案內容,並可進行後續的處理,如建構倒排索引。