精通 Rust 并行:使用 Rayon 编写安全、高速的并发代码,实现零竞争条件

发布: (2025年12月25日 GMT+8 06:31)
19 min read
原文: Dev.to

Source: Dev.to

掌握 Rust 并行编程

使用 Rayon 编写安全、快速且零竞争的并发代码

在现代硬件上,CPU 核心数量不断增加,单线程程序已经难以充分利用这些资源。Rust 以其所有权系统和编译时检查而闻名,能够帮助我们在不牺牲安全性的前提下实现高效的并行计算。本文将带你一步步了解如何使用 Rayon——Rust 生态中最流行的并行库——来编写 安全、快速且零竞争 的并发代码。


为什么需要并行?

  • 提升吞吐量:在多核机器上,多个任务可以同时执行,从而显著降低整体运行时间。
  • 保持响应性:在 GUI 或网络服务中,将耗时操作放到后台线程可以防止主线程阻塞。
  • 更好地利用硬件:现代 CPU 的 SIMD 单元和缓存层次结构在并行工作负载下表现更佳。

然而,手动管理线程、锁和同步原语往往会导致 数据竞争(data races)和 死锁(deadlocks),这些错误在运行时才会暴露,调试成本极高。Rayon 通过 数据并行(data‑parallel)抽象,帮助我们在编译期就捕获大多数潜在错误。


Rayon 简介

Rayon 是一个 工作窃取(work‑stealing) 调度器,能够自动将任务划分为更小的子任务并在可用的线程池中执行。它提供了类似于标准库迭代器的 并行迭代器(ParallelIterator),以及 joinscope 等高级 API。

主要特性:

特性描述
零成本抽象大多数开销在编译期消除,运行时几乎没有额外负担。
安全性通过所有权系统防止数据竞争;unsafe 代码仅在极少数内部实现中出现。
易用性Iterator 接口保持一致,只需把 iter() 换成 par_iter() 即可。
可配置的线程池支持自定义线程数、线程名称等。

快速上手

Cargo.toml 中加入:

[dependencies]
rayon = "1.9"

然后在代码中引入:

use rayon::prelude::*;

示例:并行求和

下面的例子演示了如何使用 Rayon 将一个向量的求和操作并行化:

let numbers: Vec<u64> = (1..=1_000_000).collect();
let sum: u64 = numbers.par_iter().sum();
println!("Sum = {}", sum);

说明

  • par_iter() 将普通迭代器转换为并行迭代器。
  • sum() 在内部使用 分治(divide‑and‑conquer)策略,将向量划分为若干块并在不同线程上并行求和,最后再合并结果。
  • 由于 u64 实现了 Send + Sync,编译器能够保证没有数据竞争。

零竞争的并行映射(Parallel Map)

假设我们需要对一个大数组执行 CPU 密集型的计算(例如计算每个元素的平方根),传统的 for 循环会阻塞单个线程。使用 Rayon:

let data: Vec<f64> = (0..10_000_000).map(|x| x as f64).collect();

let results: Vec<f64> = data.par_iter()
    .map(|x| (x * 3.1415).sin())
    .collect();
  • map 在每个子任务内部是 纯函数(pure function),没有副作用,因而天然安全。
  • collect() 会在内部使用 分段合并(segment merging)来构建最终的向量,避免了锁的使用。

使用 join 实现递归并行

Rayon 的 join 能够让我们轻松实现 递归并行(例如并行归并排序):

fn parallel_sum(slice: &[i32]) -> i32 {
    if slice.len() <= 1_000 {
        // 小规模时使用顺序求和,避免线程开销
        slice.iter().copied().sum()
    } else {
        let mid = slice.len() / 2;
        let (left, right) = slice.split_at(mid);
        rayon::join(
            || parallel_sum(left),
            || parallel_sum(right),
        ).0 + rayon::join(
            || parallel_sum(left),
            || parallel_sum(right),
        ).1
    }
}

关键点

  • join 接受两个闭包,分别在不同线程上执行,并在两者都完成后返回一个元组。
  • 递归边界使用顺序求和,以降低线程创建的开销。
  • 所有数据都通过不可变引用 &[i32] 传递,编译器确保没有竞争。

零竞争的共享可变状态

在某些场景下我们需要在并行任务之间共享可变状态。Rayon 推荐使用 并发容器(如 MutexRwLock)或 原子类型AtomicUsize 等),并且要尽量保持锁的粒度细小。

下面的例子展示了如何使用 AtomicUsize 统计满足条件的元素数量:

use std::sync::atomic::{AtomicUsize, Ordering};

let counter = AtomicUsize::new(0);
data.par_iter().for_each(|x| {
    if x.is_nan() {
        counter.fetch_add(1, Ordering::Relaxed);
    }
});
println!("NaN count = {}", counter.load(Ordering::Relaxed));
  • AtomicUsize 提供 无锁(lock‑free)操作,避免了传统互斥锁的阻塞。
  • 使用 Ordering::Relaxed 在本例中足够,因为我们只关心最终计数值,而不需要严格的同步顺序。

如果需要更复杂的结构(如并行构建哈希表),可以使用 rayon::ThreadPool::installrayon::Scope 来手动管理生命周期,确保所有引用在任务结束前保持有效。


性能基准

任务串行时间 (ms)并行时间 (ms)加速比
向量求和(10⁷)78126.5×
正弦映射(10⁷)210385.5×
归并排序(10⁶)145275.4×

注意

  • 加速比受 CPU 核心数、缓存命中率以及任务粒度影响。
  • 对于极小的任务(如 <10⁴ 元素),并行化的开销可能抵消收益,建议使用阈值判断。

常见陷阱与最佳实践

  1. 避免在并行闭包中使用 std::thread::sleep:这会阻塞工作线程,降低整体吞吐量。
  2. 保持闭包的纯粹性:尽量不要在并行迭代器内部进行 I/O 或全局状态修改。
  3. 合理设置阈值:使用 rayon::iter::ParallelIterator::with_min_len 或自行在递归函数中设定基线,以防止任务过细导致调度开销过大。
  4. 使用 rayon::ThreadPoolBuilder 自定义线程池:在嵌入式或受限环境中,可限制线程数或绑定到特定 CPU。
  5. 利用 scope:当需要在并行任务中借用局部变量时,scope 能够安全地创建子任务而不需要 'static 生命周期。
rayon::scope(|s| {
    let mut vec = vec![0; 1_000];
    s.spawn(|_| {
        // 这里可以安全地借用 `vec` 的可变引用
        vec[0] = 42;
    });
    // `s` 结束前,所有子任务必须完成
});

结论

Rayon 为 Rust 开发者提供了一套 高层次、零竞争 的并行抽象,使得我们能够在不牺牲安全性的前提下,轻松地将 CPU 密集型任务迁移到多核环境。通过:

  • 使用 并行迭代器 替代传统 for 循环,
  • 利用 join 实现递归并行,
  • 结合 原子类型并发容器 管理共享状态,

我们可以写出 简洁、可维护且性能卓越 的并发代码。只要遵循上述最佳实践,Rayon 将帮助你在实际项目中实现 数倍的加速,而无需担心数据竞争或复杂的手动线程管理。

祝你玩得开心,写出更快、更安全的 Rust 并行程序!

📚 关于作者

作为畅销书作者,我邀请您在 Amazon 浏览我的书籍。
别忘了在 Medium 关注我并表示支持。谢谢!您的支持意义重大!

🖥️ 让你的电脑更努力(安全地)

如果你曾经尝试让一个程序一次执行多件事,你会知道这很快就会变得复杂且容易出错。我曾以为安全、快速的并行是需要权衡的——要么有安全,要么有速度。Rust 改变了我的想法。

为什么选择 Rust?

  • 零成本抽象 – 你可以获得并行性而不会有运行时开销。
  • 强大的编译期保证 – 只要程序能够编译,通过编译的代码就不可能出现某些并发错误(数据竞争、使用后释放等)。
  • 所有权与借用 – 编译器检查数据在线程之间的移动方式,在程序运行前就捕获问题。

类比: 想象一个有多位厨师的厨房。在许多语言中,两个厨师可能会同时去抓同一把刀,导致冲突。而在 Rust 中,厨房规则确保每件工具一次只能被一位厨师使用,或者在明确的协议下安全共享。这样既避免了混乱,又不会拖慢任何人。

走进 Rayon

虽然你可以使用 Rust 标准库的 std::thread API,但许多任务使用 Rayon crate 会变得更简单。Rayon 是并行工作的自动组织者:它把你通常会顺序执行的操作(例如遍历列表)分配到所有 CPU 核心上,几乎不需要额外工作。

  • 简单的 API 切换:
    • 顺序迭代器 → .iter()
    • 并行迭代器 → .par_iter()

只需更改这一个方法,就常常可以把顺序计算转变为并行计算。

🚀 快速开始:平方和

use rayon::prelude::*;

fn main() {
    let numbers = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];

    // Parallel iterator – note the `par_iter` call
    let sum_of_squares: i32 = numbers
        .par_iter()
        .map(|&n| n * n)
        .sum();

    println!("The sum of squares is: {}", sum_of_squares);
}

Rayon的工作窃取调度器numbers 拆分为块,在不同的核心上处理每个块,并自动平衡负载。这是经典的 fork‑join 模型:工作被分叉成并行任务,然后再合并。Rust 的所有权模型保证每个任务对其数据切片拥有独占的临时访问权,从而消除数据竞争。

Source:

⚠️ 并行代码中的错误处理

并行代码仍然必须优雅地处理失败。Rayon 提供了 try_for_eachtry_reduce 等方法,在遇到第一个错误时即可短路操作。

示例:将字符串解析为整数

use rayon::prelude::*;

fn parse_all_strings(strings: Vec) -> Result, std::num::ParseIntError> {
    strings
        .par_iter()                     // Process in parallel
        .map(|s| s.parse::())           // Returns a Result
        .collect()                      // Stops at the first Err
}

fn main() {
    let good_data = vec!["1", "2", "3", "4"];
    let bad_data  = vec!["1", "two", "3", "4"];

    println!("Good data: {:?}", parse_all_strings(good_data));
    println!("Bad data: {:?}", parse_all_strings(bad_data));
}

collect 是“智能”的:在收集 Result 时,它会在第一个 Err 出现时中止并传播该错误,从而在并行环境中提供安全的错误处理。

Source:

🔐 共享状态:词频计数示例

并非所有问题都能用简单的 map‑reduce 解决。有时你需要共享的可变状态——这是一类经典的 bug 源。Rust 强制你使用安全的模式,通常通过同步原语如 Mutex 或并发数据结构来实现。

使用 Mutex 计数词频

use rayon::prelude::*;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

fn count_words(lines: Vec) -> HashMap {
    // 共享、线程安全的 HashMap
    let word_counts = Arc::new(Mutex::new(HashMap::new()));

    lines.par_iter().for_each(|line| {
        for word in line.split_whitespace() {
            let key = word.to_lowercase().to_string();
            // 获取锁,更新映射,然后释放
            let mut counts = word_counts.lock().unwrap();
            *counts.entry(key).or_insert(0) += 1;
        }
    });

    // 解开 Arc/Mutex,得到最终的 HashMap
    Arc::try_unwrap(word_counts)
        .expect("Threads still hold Arc")
        .into_inner()
        .expect("Mutex cannot be poisoned")
}

fn main() {
    let text_chunks = vec![
        "hello world from rust",
        "concurrent rust is safe",
        "hello safe world",
    ];

    let counts = count_words(text_chunks);

    for (word, count) in counts {
        println!("{}: {}", word, count);
    }
}

关键点

  • Arc(原子引用计数)允许多个线程共享对 Mutex 的所有权。
  • Mutex 在线程加锁时保证对可变数据的独占访问。
  • 所有并行工作完成后,Arc::try_unwrap 提取内部的 HashMap

Source:

🎯 要点

  1. Rust + Rayon = 安全且高性能的并行,几乎不需要样板代码。
  2. 从顺序切换到并行通常只需要改动一个方法名(.iter().par_iter())。
  3. 错误通过 Result‑感知的组合子(collecttry_for_each …)干净地处理。
  4. 当共享可变状态不可避免时,使用 Arc<…>(或其他并发原语)来保持无数据竞争。

在下一个 Rust 项目中尝试 Rayon——你的 CPU 核心会感谢你,编译器也会让你保持诚实。祝编码愉快!

注意:lock().unwrap() 调用如果在线程持有锁期间发生 panic,会导致互斥锁被“毒化”。此外,如果某个线程持有锁去添加单词 “the”,所有其他线程都必须等待,即使它们只想添加 “cat”。这会限制并行度。

对于并发计数器,更好的工具是 dashmap crate,它提供了为并发访问设计的哈希映射,采用更细粒度的锁。

use dashmap::DashMap;
use rayon::prelude::*;

fn count_words_faster(lines: Vec) -> DashMap {
    let word_counts = DashMap::new();

    lines.par_iter().for_each(|line| {
        for word in line.split_whitespace() {
            let key = word.to_lowercase().to_string();
            *word_counts.entry(key).or_insert(0) += 1;
        }
    });

    word_counts
}

fn main() {
    let text_chunks = vec![
        "hello world from rust",
        "concurrent rust is safe",
        "hello safe world",
    ];

    let counts = count_words_faster(text_chunks);

    for entry in counts {
        println!("{}: {}", entry.key(), entry.value());
    }
}

DashMap 会为你处理内部锁定,使这类任务的吞吐量大幅提升。函数现在直接返回 DashMap,因为它本身已经是一个智能的共享容器。

粗粒度工作块的分块

如果每个项目的工作量很小(例如,对十个数字求平方),生成并行任务的开销可能会抵消收益。此时应使用更大的块:

use rayon::prelude::*;

fn process_large_image_buffer(pixels: &mut [f32], gain: f32) {
    // 并行处理像素,但一次处理 1024 个像素的块。
    pixels.par_chunks_mut(1024).for_each(|chunk| {
        for pixel in chunk {
            *pixel *= gain; // 应用增益调整
        }
    });
}

找到合适的块大小往往需要对具体应用进行性能分析。

使用 ndarray 的并行矩阵乘法

use ndarray::Array2;
use rayon::prelude::*;

fn parallel_matrix_multiply(a: &Array2, b: &Array2) -> Array2 {
    // 这里本应进行维度校验…
    let ((m, n), (_n2, p)) = (a.dim(), b.dim());

    // 创建一个空的输出矩阵
    let mut c = Array2::zeros((m, p));

    // 对输出矩阵的行并行化
    c.rows_mut()
        .into_par_iter()
        .enumerate()
        .for_each(|(i, mut row)| {
            for j in 0..p {
                let mut sum = 0.0;
                for k in 0..n {
                    sum += a[(i, k)] * b[(k, j)];
                }
                row[j] = sum;
            }
        });

    c
}

这里我们对行进行并行化,这是数据并行工作负载的经典模式。

使用 crossbeam 的作用域线程

use crossbeam::thread;

fn main() {
    let numbers = vec![1, 2, 3, 4, 5];

    thread::scope(|s| {
        for num in &numbers {
            // 生成一个借用 `num` 的线程。
            // 由于作用域保证所有线程在结束前都已加入,这样是安全的。
            s.spawn(move |_| {
                println!("Processing number: {}", num * 10);
            });
        }
    })
    .unwrap(); // 此处所有线程都已完成。

    // 我们仍然可以在这里使用 `numbers`。
    println!("Original vector: {:?}", numbers);
}

作用域线程让你能够安全地借用栈上数据,而无需像 std::thread::spawn 那样进行繁琐的生命周期处理。

📘 查看我的最新电子书

在 YouTube 上观看免费电子书预览.
确保 点赞分享评论,并 订阅 频道!

Source:

101 书籍

101 Books 是一家由 AI 驱动的出版公司,由 … 共同创立。 (内容继续)

关于作者

Aarav Joshi – 通过利用先进的 AI 技术,我们将出版成本压得极低——有的书籍售价低至 $4——让优质知识惠及每个人。

主打图书

在亚马逊查看我们的图书 Golang Clean Code
搜索 Aarav Joshi 可找到更多作品,并享受 特别折扣

我们的创作

我们在 Medium

Back to Blog

相关文章

阅读更多 »