Rust 提供的 Option 和 Result 組合子簡化了錯誤處理流程,使程式碼更簡潔易讀。非同步程式設計在網路掃描等 I/O 密集型任務中至關重要。本文結合 Future、Stream 和 tokio,展示如何構建高效的非同步網路掃描工具,並探討不同平行處理策略,例如 buffer_unordered 和通道,以最大化資源利用率。同時,文章也涵蓋了錯誤處理和程式碼最佳化的最佳實務,讓讀者能更深入地理解 Rust 非同步程式設計的精髓。
Rust 中的組合子(Combinators)詳解
Rust 語言提供了一系列強大的工具來處理 Option 和 Result 型別,這些工具被稱為組合子(Combinators)。組合子允許開發者以函式語言程式設計風格處理可能存在或不存在的值,以及可能成功或失敗的操作。本章將探討 Rust 中的 Option 和 Result 組合子,並介紹非同步程式設計中的 FutureExt 和 StreamExt 特徵。
3.9.1 Option 組合子
在 Rust 中,Option 型別用於表示一個值可能存在或不存在。Option 組合子提供了一種靈活的方式來處理這種不確定性。
使用 or_else 提供預設值
當我們需要為 Option 提供一個預設值時,可以使用 or_else 方法。以下是一個範例:
fn get_default_port() -> Option<String> {
// 假設這裡有取得預設埠的邏輯
Some(String::from("8080"))
}
fn main() {
let _port = std::env::var("PORT").ok().or_else(get_default_port);
// 如果環境變數 "PORT" 存在,則使用其值;否則,使用 get_default_port 的傳回值
}
#### 內容解密:
std::env::var("PORT").ok()嘗試從環境變數中取得 “PORT” 的值,並將其轉換為Option<String>。.or_else(get_default_port)如果前一步驟的結果是None,則呼叫get_default_port函式來取得預設值。
檢查 Option 狀態
is_some 和 is_none 方法可以用於檢查 Option 是否包含值或為空。
fn main() {
let a: Option<u32> = Some(1);
if a.is_some() {
println!("a 包含值");
}
let b: Option<u32> = None;
if b.is_none() {
println!("b 為空");
}
}
#### 內容解密:
is_some方法檢查Option是否為Some,如果是,則傳回true。is_none方法檢查Option是否為None,如果是,則傳回true。
3.9.2 Result 組合子
Result 型別用於表示一個操作可能成功或失敗。Result 組合子提供了一種處理這種結果的方式。
將 Result 轉換為 Option
可以使用 ok 方法將 Result 轉換為 Option,丟棄錯誤資訊。
fn main() {
let _port: Option<String> = std::env::var("PORT").ok();
// 將取得環境變數 "PORT" 的結果轉換為 Option<String>
}
#### 內容解密:
std::env::var("PORT")傳回一個Result<String, std::env::VarError>。.ok()將其轉換為Option<String>,忽略可能的錯誤。
提供預設的 Result
使用 or 方法可以為 Result 提供一個預設值。
fn main() {
let _port: Result<String, std::env::VarError> =
std::env::var("PORT").or(Ok(String::from("8080")));
// 如果取得 "PORT" 失敗,則使用 "8080" 作為預設值
}
#### 內容解密:
.or(Ok(String::from("8080")))如果前一步驟的結果是Err,則傳回Ok(String::from("8080"))。
轉換錯誤型別
使用 map_err 方法可以將 Result 的錯誤型別轉換為另一種型別。
fn convert_error(err: std::env::VarError) -> ErrorType2 {
// 將 std::env::VarError 轉換為 ErrorType2 的邏輯
}
fn main() {
let _port: Result<String, ErrorType2> =
std::env::var("PORT").map_err(convert_error);
// 將錯誤型別從 std::env::VarError 轉換為 ErrorType2
}
#### 內容解密:
.map_err(convert_error)將錯誤型別透過convert_error函式進行轉換。
鏈式呼叫
使用 and_then 可以在 Result 為 Ok 時呼叫另一個函式。
fn port_to_address(port: String) -> Option<String> {
// 將埠號轉換為地址的邏輯
}
fn main() {
let _address = std::env::var("PORT").and_then(port_to_address);
// 如果 "PORT" 取得成功,則呼叫 port_to_address 將其轉換為地址
}
#### 內容解密:
.and_then(port_to_address)如果前一步驟的結果是Ok,則呼叫port_to_address函式。
3.9.3 何時使用 .unwrap() 和 .expect()
.unwrap() 和 .expect() 可以用於直接取得 Option 或 Result 中的值,但如果值不存在或操作失敗,程式將會當機。因此,應謹慎使用這些方法。
建議在以下情況下使用它們:
- 在探索性程式設計或快速指令碼中,為了避免處理所有邊界情況。
- 當你確定它們不會導致程式當機時,但應附上註解說明原因。
3.9.4 非同步組合子
在非同步程式設計中,FutureExt 和 StreamExt 特徵提供了額外的組合子來處理非同步操作。
FutureExt
.then()在初始Future完成後呼叫另一個傳回Future的函式。.map()將Future的輸出轉換為另一種型別。.flatten()將巢狀的Future合併為單一的Future。.into_stream()將Future轉換為單元素流。
async fn compute_a() -> i64 {
40
}
async fn compute_b(a: i64) -> i64 {
a + 2
}
fn main() {
let b = compute_a().then(compute_b).await;
// b = 42
}
#### 內容解密:
.then(compute_b)在compute_a()完成後呼叫compute_b()。
StreamExt
.filter()、.fold()、.for_each()、.map()等方法與迭代器類別似,但適用於非同步流。.for_each_concurrent()和.buffer_unordered()可以平行處理流中的元素。
use futures::{stream, StreamExt};
use rand::{thread_rng, Rng};
use std::time::Duration;
#[tokio::main(flavor = "multi_thread")]
async fn main() {
stream::iter(0..200u64)
.for_each_concurrent(20, |number| async move {
let mut rng = thread_rng();
let sleep_ms: u64 = rng.gen_range(0..20);
tokio::time::sleep(Duration::from_millis(sleep_ms)).await;
println!("{}", number);
})
.await;
}
#### 內容解密:
.for_each_concurrent(20, ...)同時平行處理最多 20 個元素。- 在每個非同步區塊中,模擬了一個隨機延遲後列印數字。
使用非同步程式設計最佳化網路掃描工具
在現代軟體開發中,非同步程式設計已成為提升程式效能的重要技術。透過非同步處理,可以有效提高系統資源利用率,特別是在涉及大量I/O操作的網路掃描工具中。本章將探討如何在Rust中使用非同步程式設計來最佳化網路掃描工具的效能。
非同步程式設計基礎
非同步程式設計的核心概念是允許程式在等待某些操作完成時繼續執行其他任務。在Rust中,這是透過async和await關鍵字實作的。非同步函式傳回一個Future,代表一個可能尚未完成的操作。
程式碼範例:簡單的非同步函式
async fn fetch_data() {
// 模擬網路請求
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
println!("資料擷取完成");
}
內容解密:
async fn定義了一個非同步函式。await關鍵字用於等待一個Future完成。tokio::time::sleep是一個非同步函式,用於模擬延遲。
Streams與平行處理
在處理大量資料時,Rust的Streams提供了一種有效的方式來管理平行操作。Streams可以被視為非同步的迭代器,允許我們對一系列值進行非同步操作。
使用buffer_unordered和collect
let subdomains: Vec<Subdomain> = stream::iter(subdomains.into_iter())
.map(|subdomain| ports::scan_ports(ports_concurrency, subdomain))
.buffer_unordered(subdomains_concurrency)
.collect()
.await;
內容解密:
stream::iter將一個迭代器轉換為Stream。map將每個子網域轉換為一個非同步任務。buffer_unordered允許這些任務平行執行,並限制最大平行數。collect將結果收集到一個Vec中。
實作平行處理的不同方法
-
使用
buffer_unordered和collect- 這是一種函式式和慣用的方式來實作工作池。
- 適合用於需要收集所有結果的情況。
-
使用
Arc<Mutex<T>>let res: Arc<Mutex<Vec<Subdomain>>> = Arc::new(Mutex::new(Vec::new())); stream::iter(subdomains.into_iter()) .for_each_concurrent(subdomains_concurrency, |subdomain| { let res = res.clone(); async move { let subdomain = ports::scan_ports(ports_concurrency, subdomain).await; res.lock().await.push(subdomain) } }) .await;內容解密:
- 使用
Arc和Mutex來分享可變狀態。 for_each_concurrent平行執行任務,並將結果存入共用向量。
- 使用
-
使用通道(channels)
let (input_tx, input_rx) = mpsc::channel(concurrency); let (output_tx, output_rx) = mpsc::channel(concurrency); // ... 省略部分實作細節 let open_ports: Vec<Port> = output_rx_stream.collect().await;內容解密:
- 使用兩個通道,一個用於傳送任務,一個用於收集結果。
- 將接收者轉換為Stream以便進行非同步處理。
將掃描器移植到非同步模式
要將我們的掃描器移植到非同步模式,首先需要使用tokio::main巨集來裝飾main函式。
#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
// ... 主函式內容
}
內容解密:
tokio::main巨集建立了一個多執行緒的執行環境。- 將
main函式轉換為非同步函式。
子網域列舉的非同步實作
pub async fn enumerate(http_client: &Client, target: &str) -> Result<Vec<Subdomain>, Error> {
// ... 省略部分實作細節
let subdomains: Vec<Subdomain> = stream::iter(subdomains.into_iter())
.map(|domain| Subdomain {
domain,
open_ports: Vec::new(),
})
.filter_map(|subdomain| {
let dns_resolver = dns_resolver.clone();
async move {
if resolves(&dns_resolver, &subdomain).await {
Some(subdomain)
} else {
None
}
}
})
.collect()
.await;
Ok(subdomains)
}
內容解密:
- 使用Stream處理子網域列表。
- 使用
filter_map過濾掉無法解析的子網域。 - 將結果收集到一個
Vec中。