在多執行緒程式設計中,分享狀態的同步至關重要。本文介紹如何使用 Rust 的原子操作來有效地報告進度和收集統計資料,避免鎖的開銷,提升效能。首先,我們使用 AtomicUsize 和 fetch_add 來追蹤已完成的任務數量,並在主執行緒中定期顯示進度。接著,我們擴充套件這個範例,加入 AtomicU64 和 fetch_max 來計算總處理時間和最大處理時間,並在進度報告中顯示平均和峰值時間。然而,單純使用 fetch_add 可能會遇到溢位問題,因此我們引入了比較與交換操作 compare_exchange 來解決這個問題,並示範如何使用它來實作原子遞增和無溢位 ID 分配。此外,compare_exchange 也可用於延遲初始化,確保資源只被初始化一次。最後,我們討論了 ABA 問題以及如何選擇合適的記憶體排序策略。
使用原子操作進行多執行緒進度報告與統計
在多執行緒程式設計中,如何有效地報告進度與收集統計資料是一個重要的議題。本文將探討如何利用原子操作(Atomic Operations)來實作多執行緒的進度報告與統計功能。
多執行緒進度報告範例
在之前的範例中,我們使用了一個 AtomicUsize 來報告單一背景執行緒的進度。如果我們將工作分配給多個執行緒,例如四個執行緒各自處理 25 個專案,我們就需要知道所有執行緒的進度。
方法一:使用多個 AtomicUsize
一種方法是為每個執行緒使用一個獨立的 AtomicUsize,然後在主執行緒中加總這些值。這種方法雖然可行,但稍顯繁瑣。
方法二:使用單一 AtomicUsize 與原子加法操作
更簡單的方法是使用一個 AtomicUsize 來追蹤所有執行緒處理的專案總數。為了實作這一點,我們不能再使用 store 方法,因為它會覆寫其他執行緒的進度。相反,我們可以使用原子加法操作(fetch_add)在每次處理完一個專案後遞增計數器。
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
use std::time::Duration;
fn main() {
let num_done = &AtomicUsize::new(0);
thread::scope(|s| {
// 四個背景執行緒處理所有100個專案,每個執行緒處理25個。
for t in 0..4 {
s.spawn(move || {
for i in 0..25 {
process_item(t * 25 + i); // 假設這需要一些時間。
num_done.fetch_add(1, Ordering::Relaxed);
}
});
}
// 主執行緒每秒顯示進度更新。
loop {
let n = num_done.load(Ordering::Relaxed);
if n == 100 { break; }
println!("Working.. {n}/100 done");
thread::sleep(Duration::from_secs(1));
}
});
println!("Done!");
}
fn process_item(_i: usize) {
// 模擬處理專案需要一些時間。
thread::sleep(Duration::from_millis(50));
}
#### 內容解密:
fetch_add方法:用於原子地將計數器增加 1。這個操作是執行緒安全的,不會出現競爭條件(Race Condition)。Ordering::Relaxed:這裡使用了寬鬆的記憶體排序(Relaxed Memory Ordering),因為我們只需要保證fetch_add操作的原子性,而不需要保證其他操作的順序。- 主執行緒中的迴圈:主執行緒每秒載入
num_done的值並列印進度,直到所有專案都處理完畢。
擴充套件範例:收集統計資料
除了報告進度,我們還可以擴充套件這個範例來收集和報告一些統計資料,例如處理每個專案所需的時間。
新增原子變數
我們新增兩個原子變數 total_time 和 max_time 來追蹤處理所有專案所花費的總時間和最大處理時間。
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::thread;
use std::time::{Duration, Instant};
fn main() {
let num_done = &AtomicUsize::new(0);
let total_time = &AtomicU64::new(0);
let max_time = &AtomicU64::new(0);
thread::scope(|s| {
// 四個背景執行緒處理所有100個專案,每個執行緒處理25個。
for t in 0..4 {
s.spawn(move || {
for i in 0..25 {
let start = Instant::now();
process_item(t * 25 + i); // 假設這需要一些時間。
let time_taken = start.elapsed().as_micros() as u64;
num_done.fetch_add(1, Ordering::Relaxed);
total_time.fetch_add(time_taken, Ordering::Relaxed);
max_time.fetch_max(time_taken, Ordering::Relaxed);
}
});
}
// 主執行緒每秒顯示進度更新。
loop {
let n = num_done.load(Ordering::Relaxed);
let total_time_val = total_time.load(Ordering::Relaxed);
let max_time_val = max_time.load(Ordering::Relaxed);
if n == 100 { break; }
if n == 0 {
println!("Working.. nothing done yet.");
} else {
println!(
"Working.. {n}/100 done, average: {:?}, peak: {:?}",
Duration::from_micros(total_time_val / n as u64),
Duration::from_micros(max_time_val)
);
}
thread::sleep(Duration::from_secs(1));
}
});
println!("Done!");
}
fn process_item(_i: usize) {
// 模擬處理專案需要一些時間。
thread::sleep(Duration::from_millis(50));
}
#### 內容解密:
total_time和max_time:用於追蹤總處理時間和最大處理時間。fetch_max方法:用於原子地更新最大值。- 平均處理時間的計算:主執行緒透過將總處理時間除以已處理的專案數量來計算平均處理時間。
- 峰值處理時間:直接使用
max_time的值。
注意事項
- 統計資料的準確性:由於三個原子變數是獨立更新的,主執行緒在載入這些值時可能會看到不一致的狀態。例如,可能會看到
num_done更新了,但total_time還沒更新完畢,從而導致平均處理時間的計算不準確。 - 使用 Mutex 提高準確性:如果需要更準確的統計資料,可以將這三個統計資料放在一個
Mutex中,更新時鎖定Mutex,這樣可以保證三個值的更新是原子的,但代價是增加了鎖的開銷。
比較與交換運算:原子操作的進階應用
在探討原子操作的過程中,我們已經瞭解了基本的原子操作,如fetch_add。現在,讓我們探討更進階的原子操作:比較與交換(Compare-and-Exchange)運算。這種運算提供了更大的靈活性,讓我們能夠實作更複雜的同步機制。
使用fetch_add實作ID分配
首先,讓我們回顧一個使用fetch_add的例子:實作一個分配唯一ID的函式。
use std::sync::atomic::AtomicU32;
fn allocate_new_id() -> u32 {
static NEXT_ID: AtomicU32 = AtomicU32::new(0);
NEXT_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
}
內容解密:
- 我們使用
AtomicU32來儲存下一個可用的ID。 fetch_add(1, Relaxed)原子地將NEXT_ID增加1並傳回舊值。- 這種方法確保每個呼叫者獲得一個唯一的ID。
然而,這種實作存在一個問題:當呼叫次數超過$2^{32}$次時,ID會溢位並重新從0開始。為瞭解決這個問題,我們需要引入更複雜的邏輯。
處理溢位問題
方法1:程式終止
一種簡單的方法是在溢位時終止整個程式。
use std::sync::atomic::AtomicU32;
fn allocate_new_id() -> u32 {
static NEXT_ID: AtomicU32 = AtomicU32::new(0);
let id = NEXT_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if id >= 1000 {
std::process::abort();
}
id
}
內容解密:
- 當ID達到1000時,呼叫
std::process::abort()終止程式。 - 這種方法確保不會有重複的ID被分配。
方法2:遞減計數器
另一種方法是,在觸發錯誤之前,先遞減計數器。
use std::sync::atomic::AtomicU32;
fn allocate_new_id() -> u32 {
static NEXT_ID: AtomicU32 = AtomicU32::new(0);
let id = NEXT_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if id >= 1000 {
NEXT_ID.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
panic!("too many IDs!");
}
id
}
內容解密:
- 當ID達到1000時,先遞減
NEXT_ID,然後觸發panic。 - 這種方法減少了溢位的可能性。
比較與交換運算
比較與交換(Compare-and-Exchange)運算是原子操作中最進階和最靈活的操作。它檢查原子變數是否等於給定的值,只有在這種情況下才會將其替換為新值,所有這些都作為單一的原子操作。
impl AtomicI32 {
pub fn compare_exchange(
&self,
expected: i32,
new: i32,
success_order: Ordering,
failure_order: Ordering
) -> Result<i32, i32>;
}
內容解密:
compare_exchange檢查原子變數是否等於expected。- 如果相等,則將其替換為
new並傳回Ok(舊值)。 - 如果不相等,則傳回
Err(當前值)。
使用compare_exchange實作原子遞增
讓我們使用compare_exchange來實作一個原子遞增操作,而不使用fetch_add。
use std::sync::atomic::{AtomicU32, Ordering};
fn increment(a: &AtomicU32) {
let mut current = a.load(Ordering::Relaxed);
loop {
let new = current + 1;
match a.compare_exchange(current, new, Ordering::Relaxed, Ordering::Relaxed) {
Ok(_) => return,
Err(v) => current = v,
}
}
}
內容解密:
- 我們首先載入
a的當前值。 - 進入迴圈,計算新的值
new。 - 使用
compare_exchange嘗試將a的值從current更新為new。 - 如果成功,傳回;如果失敗,更新
current並重試。
這種方法展示了compare_exchange的強大之處:它允許我們實作複雜的原子操作,並且能夠處理競爭條件。
圖表翻譯:
graph LR
A[開始] --> B{是否第一次呼叫}
B -->|是| C[分配ID 0]
B -->|否| D[分配下一個ID]
D --> E[增加NEXT_ID]
E --> F[檢查是否溢位]
F -->|是| G[處理溢位]
F -->|否| H[傳回ID]
G --> I[終止或遞減計數器]
圖表翻譯: 此圖示展示了ID分配的流程。首先檢查是否為第一次呼叫,然後分配相應的ID。接著,增加NEXT_ID並檢查是否溢位。如果溢位,則進行相應的處理,否則傳回分配的ID。
透過深入理解和應用這些原子操作,我們可以編寫出更高效、更安全的並發程式。
比較與交換操作(Compare-and-Exchange Operations)深度解析
在多執行緒程式設計中,比較與交換(compare-and-exchange)操作是一種至關重要的原子操作,用於實作無鎖(lock-free)資料結構和演算法。本章節將探討比較與交換操作的原理、應用場景以及相關實作細節。
比較與交換操作的基本原理
比較與交換操作是一種特殊的原子指令,能夠在單一操作中完成以下三個步驟:
- 載入目前的變數值
- 比較目前值是否與預期值相同
- 如果相同,則更新變數為新值
程式碼範例:基本比較與交換操作
use std::sync::atomic::{AtomicU32, Ordering};
fn main() {
let a = AtomicU32::new(5);
let old_value = a.load(Ordering::SeqCst);
let new_value = old_value + 1;
match a.compare_exchange_strong(old_value, new_value, Ordering::SeqCst, Ordering::SeqCst) {
Ok(_) => println!("成功更新值"),
Err(actual) => println!("更新失敗,目前值為:{}", actual),
}
}
內容解密:
- 使用
AtomicU32建立一個原子變數a,初始值為5 load方法用於載入目前的變數值compare_exchange_strong方法執行比較與交換操作- 第一個引數是預期值
- 第二個引數是新值
- 第三個引數是成功時的記憶體順序
- 第四個引數是失敗時的記憶體順序
- 操作成功則傳回
Ok(()),失敗則傳回Err(actual_value)
比較與交換操作的實務應用
案例1:實作無溢位ID分配
在多執行緒環境中,如何確保ID分配的正確性和唯一性是一個常見挑戰。以下是一個使用比較與交換操作實作無溢位ID分配的範例:
use std::sync::atomic::{AtomicU32, Ordering};
fn allocate_new_id() -> u32 {
static NEXT_ID: AtomicU32 = AtomicU32::new(0);
let mut id = NEXT_ID.load(Ordering::Relaxed);
loop {
assert!(id < 1000, "ID數量超出限制!");
match NEXT_ID.compare_exchange_weak(id, id + 1, Ordering::Relaxed, Ordering::Relaxed) {
Ok(_) => return id,
Err(v) => id = v,
}
}
}
內容解密:
- 使用
AtomicU32作為靜態變數NEXT_ID來儲存下一個ID loop迴圈確保在競爭條件下仍能正確分配ID- 使用
compare_exchange_weak進行比較與交換操作 - 如果操作失敗,則使用新的
id值重試 assert!確保ID不會超出限制
案例2:延遲初始化(Lazy Initialization)
在某些場景下,我們需要確保某些資源只被初始化一次,即使在多執行緒環境下。以下是一個使用比較與交換操作實作延遲初始化的範例:
use std::sync::atomic::{AtomicU64, Ordering};
fn get_key() -> u64 {
static KEY: AtomicU64 = AtomicU64::new(0);
let key = KEY.load(Ordering::Relaxed);
if key == 0 {
let new_key = generate_random_key();
match KEY.compare_exchange(0, new_key, Ordering::Relaxed, Ordering::Relaxed) {
Ok(_) => new_key,
Err(k) => k,
}
} else {
key
}
}
fn generate_random_key() -> u64 {
// 實作隨機金鑰生成邏輯
42 // 示例傳回值
}
內容解密:
- 使用
AtomicU64儲存金鑰 - 第一次呼叫
get_key()時會生成新的隨機金鑰 - 使用
compare_exchange確保只有第一個執行緒能夠成功設定金鑰 - 後續呼叫直接傳回已初始化的金鑰
ABA問題及其解決方案
在某些特定的演算法中,特別是涉及原子指標的操作,比較與交換操作可能會遇到ABA問題:
- 變數原本的值是A
- 被其他執行緒修改為B
- 又被修改回A
這種情況下,比較與交換操作會誤以為變數沒有被修改過。解決ABA問題的常見方法包括:
- 使用標籤指標(tagged pointers)
- 使用額外的版本計數器
比較與交換操作的效能考量
compare_exchangevscompare_exchange_weak:compare_exchange保證在值匹配時一定會成功compare_exchange_weak在某些平台上效能更好,但可能會出現偽失敗(spurious failure)- 在迴圈中使用
compare_exchange_weak通常是更好的選擇
最佳實踐與注意事項
-
記憶體順序(Memory Ordering)的選擇:
Relaxed:提供最少的同步保證,但在某些場景下效能最佳SeqCst:提供最強的同步保證,但在某些場景下可能影響效能
-
迴圈處理:
- 在競爭較高的場景下,需要謹慎設計重試邏輯
- 避免無限迴圈的可能性
-
錯誤處理:
- 正確處理
compare_exchange操作的失敗情況 - 根據具體場景選擇是否使用
compare_exchange_weak
- 正確處理