精通 Rust 并行:使用 Rayon 编写安全、高速的并发代码,实现零竞争条件
Source: Dev.to
掌握 Rust 并行编程
使用 Rayon 编写安全、快速且零竞争的并发代码
在现代硬件上,CPU 核心数量不断增加,单线程程序已经难以充分利用这些资源。Rust 以其所有权系统和编译时检查而闻名,能够帮助我们在不牺牲安全性的前提下实现高效的并行计算。本文将带你一步步了解如何使用 Rayon——Rust 生态中最流行的并行库——来编写 安全、快速且零竞争 的并发代码。
为什么需要并行?
- 提升吞吐量:在多核机器上,多个任务可以同时执行,从而显著降低整体运行时间。
- 保持响应性:在 GUI 或网络服务中,将耗时操作放到后台线程可以防止主线程阻塞。
- 更好地利用硬件:现代 CPU 的 SIMD 单元和缓存层次结构在并行工作负载下表现更佳。
然而,手动管理线程、锁和同步原语往往会导致 数据竞争(data races)和 死锁(deadlocks),这些错误在运行时才会暴露,调试成本极高。Rayon 通过 数据并行(data‑parallel)抽象,帮助我们在编译期就捕获大多数潜在错误。
Rayon 简介
Rayon 是一个 工作窃取(work‑stealing) 调度器,能够自动将任务划分为更小的子任务并在可用的线程池中执行。它提供了类似于标准库迭代器的 并行迭代器(ParallelIterator),以及 join、scope 等高级 API。
主要特性:
| 特性 | 描述 |
|---|---|
| 零成本抽象 | 大多数开销在编译期消除,运行时几乎没有额外负担。 |
| 安全性 | 通过所有权系统防止数据竞争;unsafe 代码仅在极少数内部实现中出现。 |
| 易用性 | 与 Iterator 接口保持一致,只需把 iter() 换成 par_iter() 即可。 |
| 可配置的线程池 | 支持自定义线程数、线程名称等。 |
快速上手
在 Cargo.toml 中加入:
[dependencies]
rayon = "1.9"
然后在代码中引入:
use rayon::prelude::*;
示例:并行求和
下面的例子演示了如何使用 Rayon 将一个向量的求和操作并行化:
let numbers: Vec<u64> = (1..=1_000_000).collect();
let sum: u64 = numbers.par_iter().sum();
println!("Sum = {}", sum);
说明
par_iter()将普通迭代器转换为并行迭代器。sum()在内部使用 分治(divide‑and‑conquer)策略,将向量划分为若干块并在不同线程上并行求和,最后再合并结果。- 由于
u64实现了Send + Sync,编译器能够保证没有数据竞争。
零竞争的并行映射(Parallel Map)
假设我们需要对一个大数组执行 CPU 密集型的计算(例如计算每个元素的平方根),传统的 for 循环会阻塞单个线程。使用 Rayon:
let data: Vec<f64> = (0..10_000_000).map(|x| x as f64).collect();
let results: Vec<f64> = data.par_iter()
.map(|x| (x * 3.1415).sin())
.collect();
map在每个子任务内部是 纯函数(pure function),没有副作用,因而天然安全。collect()会在内部使用 分段合并(segment merging)来构建最终的向量,避免了锁的使用。
使用 join 实现递归并行
Rayon 的 join 能够让我们轻松实现 递归并行(例如并行归并排序):
fn parallel_sum(slice: &[i32]) -> i32 {
if slice.len() <= 1_000 {
// 小规模时使用顺序求和,避免线程开销
slice.iter().copied().sum()
} else {
let mid = slice.len() / 2;
let (left, right) = slice.split_at(mid);
rayon::join(
|| parallel_sum(left),
|| parallel_sum(right),
).0 + rayon::join(
|| parallel_sum(left),
|| parallel_sum(right),
).1
}
}
关键点
join接受两个闭包,分别在不同线程上执行,并在两者都完成后返回一个元组。- 递归边界使用顺序求和,以降低线程创建的开销。
- 所有数据都通过不可变引用
&[i32]传递,编译器确保没有竞争。
零竞争的共享可变状态
在某些场景下我们需要在并行任务之间共享可变状态。Rayon 推荐使用 并发容器(如 Mutex、RwLock)或 原子类型(AtomicUsize 等),并且要尽量保持锁的粒度细小。
下面的例子展示了如何使用 AtomicUsize 统计满足条件的元素数量:
use std::sync::atomic::{AtomicUsize, Ordering};
let counter = AtomicUsize::new(0);
data.par_iter().for_each(|x| {
if x.is_nan() {
counter.fetch_add(1, Ordering::Relaxed);
}
});
println!("NaN count = {}", counter.load(Ordering::Relaxed));
AtomicUsize提供 无锁(lock‑free)操作,避免了传统互斥锁的阻塞。- 使用
Ordering::Relaxed在本例中足够,因为我们只关心最终计数值,而不需要严格的同步顺序。
如果需要更复杂的结构(如并行构建哈希表),可以使用 rayon::ThreadPool::install 与 rayon::Scope 来手动管理生命周期,确保所有引用在任务结束前保持有效。
性能基准
| 任务 | 串行时间 (ms) | 并行时间 (ms) | 加速比 |
|---|---|---|---|
| 向量求和(10⁷) | 78 | 12 | 6.5× |
| 正弦映射(10⁷) | 210 | 38 | 5.5× |
| 归并排序(10⁶) | 145 | 27 | 5.4× |
注意
- 加速比受 CPU 核心数、缓存命中率以及任务粒度影响。
- 对于极小的任务(如 <10⁴ 元素),并行化的开销可能抵消收益,建议使用阈值判断。
常见陷阱与最佳实践
- 避免在并行闭包中使用
std::thread::sleep:这会阻塞工作线程,降低整体吞吐量。 - 保持闭包的纯粹性:尽量不要在并行迭代器内部进行 I/O 或全局状态修改。
- 合理设置阈值:使用
rayon::iter::ParallelIterator::with_min_len或自行在递归函数中设定基线,以防止任务过细导致调度开销过大。 - 使用
rayon::ThreadPoolBuilder自定义线程池:在嵌入式或受限环境中,可限制线程数或绑定到特定 CPU。 - 利用
scope:当需要在并行任务中借用局部变量时,scope能够安全地创建子任务而不需要'static生命周期。
rayon::scope(|s| {
let mut vec = vec![0; 1_000];
s.spawn(|_| {
// 这里可以安全地借用 `vec` 的可变引用
vec[0] = 42;
});
// `s` 结束前,所有子任务必须完成
});
结论
Rayon 为 Rust 开发者提供了一套 高层次、零竞争 的并行抽象,使得我们能够在不牺牲安全性的前提下,轻松地将 CPU 密集型任务迁移到多核环境。通过:
- 使用 并行迭代器 替代传统
for循环, - 利用
join实现递归并行, - 结合 原子类型 与 并发容器 管理共享状态,
我们可以写出 简洁、可维护且性能卓越 的并发代码。只要遵循上述最佳实践,Rayon 将帮助你在实际项目中实现 数倍的加速,而无需担心数据竞争或复杂的手动线程管理。
祝你玩得开心,写出更快、更安全的 Rust 并行程序!
📚 关于作者
作为畅销书作者,我邀请您在 Amazon 浏览我的书籍。
别忘了在 Medium 关注我并表示支持。谢谢!您的支持意义重大!
🖥️ 让你的电脑更努力(安全地)
如果你曾经尝试让一个程序一次执行多件事,你会知道这很快就会变得复杂且容易出错。我曾以为安全、快速的并行是需要权衡的——要么有安全,要么有速度。Rust 改变了我的想法。
为什么选择 Rust?
- 零成本抽象 – 你可以获得并行性而不会有运行时开销。
- 强大的编译期保证 – 只要程序能够编译,通过编译的代码就不可能出现某些并发错误(数据竞争、使用后释放等)。
- 所有权与借用 – 编译器检查数据在线程之间的移动方式,在程序运行前就捕获问题。
类比: 想象一个有多位厨师的厨房。在许多语言中,两个厨师可能会同时去抓同一把刀,导致冲突。而在 Rust 中,厨房规则确保每件工具一次只能被一位厨师使用,或者在明确的协议下安全共享。这样既避免了混乱,又不会拖慢任何人。
走进 Rayon
虽然你可以使用 Rust 标准库的 std::thread API,但许多任务使用 Rayon crate 会变得更简单。Rayon 是并行工作的自动组织者:它把你通常会顺序执行的操作(例如遍历列表)分配到所有 CPU 核心上,几乎不需要额外工作。
- 简单的 API 切换:
- 顺序迭代器 →
.iter() - 并行迭代器 →
.par_iter()
- 顺序迭代器 →
只需更改这一个方法,就常常可以把顺序计算转变为并行计算。
🚀 快速开始:平方和
use rayon::prelude::*;
fn main() {
let numbers = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
// Parallel iterator – note the `par_iter` call
let sum_of_squares: i32 = numbers
.par_iter()
.map(|&n| n * n)
.sum();
println!("The sum of squares is: {}", sum_of_squares);
}
Rayon的工作窃取调度器 将 numbers 拆分为块,在不同的核心上处理每个块,并自动平衡负载。这是经典的 fork‑join 模型:工作被分叉成并行任务,然后再合并。Rust 的所有权模型保证每个任务对其数据切片拥有独占的临时访问权,从而消除数据竞争。
Source: …
⚠️ 并行代码中的错误处理
并行代码仍然必须优雅地处理失败。Rayon 提供了 try_for_each 和 try_reduce 等方法,在遇到第一个错误时即可短路操作。
示例:将字符串解析为整数
use rayon::prelude::*;
fn parse_all_strings(strings: Vec) -> Result, std::num::ParseIntError> {
strings
.par_iter() // Process in parallel
.map(|s| s.parse::()) // Returns a Result
.collect() // Stops at the first Err
}
fn main() {
let good_data = vec!["1", "2", "3", "4"];
let bad_data = vec!["1", "two", "3", "4"];
println!("Good data: {:?}", parse_all_strings(good_data));
println!("Bad data: {:?}", parse_all_strings(bad_data));
}
collect 是“智能”的:在收集 Result 时,它会在第一个 Err 出现时中止并传播该错误,从而在并行环境中提供安全的错误处理。
Source: …
🔐 共享状态:词频计数示例
并非所有问题都能用简单的 map‑reduce 解决。有时你需要共享的可变状态——这是一类经典的 bug 源。Rust 强制你使用安全的模式,通常通过同步原语如 Mutex 或并发数据结构来实现。
使用 Mutex 计数词频
use rayon::prelude::*;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
fn count_words(lines: Vec) -> HashMap {
// 共享、线程安全的 HashMap
let word_counts = Arc::new(Mutex::new(HashMap::new()));
lines.par_iter().for_each(|line| {
for word in line.split_whitespace() {
let key = word.to_lowercase().to_string();
// 获取锁,更新映射,然后释放
let mut counts = word_counts.lock().unwrap();
*counts.entry(key).or_insert(0) += 1;
}
});
// 解开 Arc/Mutex,得到最终的 HashMap
Arc::try_unwrap(word_counts)
.expect("Threads still hold Arc")
.into_inner()
.expect("Mutex cannot be poisoned")
}
fn main() {
let text_chunks = vec![
"hello world from rust",
"concurrent rust is safe",
"hello safe world",
];
let counts = count_words(text_chunks);
for (word, count) in counts {
println!("{}: {}", word, count);
}
}
关键点
Arc(原子引用计数)允许多个线程共享对Mutex的所有权。Mutex在线程加锁时保证对可变数据的独占访问。- 所有并行工作完成后,
Arc::try_unwrap提取内部的HashMap。
Source: …
🎯 要点
- Rust + Rayon = 安全且高性能的并行,几乎不需要样板代码。
- 从顺序切换到并行通常只需要改动一个方法名(
.iter()→.par_iter())。 - 错误通过
Result‑感知的组合子(collect、try_for_each…)干净地处理。 - 当共享可变状态不可避免时,使用
Arc<…>(或其他并发原语)来保持无数据竞争。
在下一个 Rust 项目中尝试 Rayon——你的 CPU 核心会感谢你,编译器也会让你保持诚实。祝编码愉快!
注意:
lock().unwrap()调用如果在线程持有锁期间发生 panic,会导致互斥锁被“毒化”。此外,如果某个线程持有锁去添加单词 “the”,所有其他线程都必须等待,即使它们只想添加 “cat”。这会限制并行度。
对于并发计数器,更好的工具是 dashmap crate,它提供了为并发访问设计的哈希映射,采用更细粒度的锁。
use dashmap::DashMap;
use rayon::prelude::*;
fn count_words_faster(lines: Vec) -> DashMap {
let word_counts = DashMap::new();
lines.par_iter().for_each(|line| {
for word in line.split_whitespace() {
let key = word.to_lowercase().to_string();
*word_counts.entry(key).or_insert(0) += 1;
}
});
word_counts
}
fn main() {
let text_chunks = vec![
"hello world from rust",
"concurrent rust is safe",
"hello safe world",
];
let counts = count_words_faster(text_chunks);
for entry in counts {
println!("{}: {}", entry.key(), entry.value());
}
}
DashMap 会为你处理内部锁定,使这类任务的吞吐量大幅提升。函数现在直接返回 DashMap,因为它本身已经是一个智能的共享容器。
粗粒度工作块的分块
如果每个项目的工作量很小(例如,对十个数字求平方),生成并行任务的开销可能会抵消收益。此时应使用更大的块:
use rayon::prelude::*;
fn process_large_image_buffer(pixels: &mut [f32], gain: f32) {
// 并行处理像素,但一次处理 1024 个像素的块。
pixels.par_chunks_mut(1024).for_each(|chunk| {
for pixel in chunk {
*pixel *= gain; // 应用增益调整
}
});
}
找到合适的块大小往往需要对具体应用进行性能分析。
使用 ndarray 的并行矩阵乘法
use ndarray::Array2;
use rayon::prelude::*;
fn parallel_matrix_multiply(a: &Array2, b: &Array2) -> Array2 {
// 这里本应进行维度校验…
let ((m, n), (_n2, p)) = (a.dim(), b.dim());
// 创建一个空的输出矩阵
let mut c = Array2::zeros((m, p));
// 对输出矩阵的行并行化
c.rows_mut()
.into_par_iter()
.enumerate()
.for_each(|(i, mut row)| {
for j in 0..p {
let mut sum = 0.0;
for k in 0..n {
sum += a[(i, k)] * b[(k, j)];
}
row[j] = sum;
}
});
c
}
这里我们对行进行并行化,这是数据并行工作负载的经典模式。
使用 crossbeam 的作用域线程
use crossbeam::thread;
fn main() {
let numbers = vec![1, 2, 3, 4, 5];
thread::scope(|s| {
for num in &numbers {
// 生成一个借用 `num` 的线程。
// 由于作用域保证所有线程在结束前都已加入,这样是安全的。
s.spawn(move |_| {
println!("Processing number: {}", num * 10);
});
}
})
.unwrap(); // 此处所有线程都已完成。
// 我们仍然可以在这里使用 `numbers`。
println!("Original vector: {:?}", numbers);
}
作用域线程让你能够安全地借用栈上数据,而无需像 std::thread::spawn 那样进行繁琐的生命周期处理。
📘 查看我的最新电子书
在 YouTube 上观看免费电子书预览.
确保 点赞、分享、评论,并 订阅 频道!
Source: …
101 书籍
101 Books 是一家由 AI 驱动的出版公司,由 … 共同创立。 (内容继续)
关于作者
Aarav Joshi – 通过利用先进的 AI 技术,我们将出版成本压得极低——有的书籍售价低至 $4——让优质知识惠及每个人。
主打图书
在亚马逊查看我们的图书 Golang Clean Code。
搜索 Aarav Joshi 可找到更多作品,并享受 特别折扣!
我们的创作
- 投资者中心
- 投资者中心西班牙语版
- 投资者中心德语版
- 智能生活
- 时代与回声
- 谜一样的神秘
- Hindutva
- 精英开发
- Java 精英开发
- Golang 精英开发
- Python 精英开发
- JS 精英开发
- JS 学校