Rust 的卖点之一是 Fearless Concurrency。 并发编程常常充满危险:
当多个线程在没有原子类型或锁定机制保护的情况下写入相同的数据时,可能会发生数据竞争。当线程超过变量声明时,可能会出现生命周期问题。 该变量可能被父级销毁,而子线程仍然依赖它。
让我们比较一些 Rust 和 C++ 代码,看看 Rust 如何保护您免受这些问题的影响。
寻找素数
有很多方法可以分解一个数字并检测它是否为素数。 一些算法非常快——但在本文中,我们将故意使用缓慢的方法。 一个简单的算法使用更多的 CPU 时间,更好地展示了多线程的好处。 我们将使用以下函数来确定一个数字是否为素数:
fn is_prime(n: u32) -> bool {
(2 ..= n/2).all(|i| n % i != 0 )
}
is_prime 从 2 迭代到数字的一半,并确保当测试的数字除以迭代器时,所有迭代都返回零以外的余数(% 运算符)。
为了确保它有效,这里有一个快速的单元测试,它检查 100 以下的素数列表:
#[test]
fn test_first_hundred_primes() {
let primes: Vec<u32> = (2..100).filter(|n| is_prime(*n)).collect();
assert_eq!(
primes,
[2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97]
);
}
这是一个等效的 C++ 函数:
bool is_prime(uint32_t n) {
for (uint32_t i = 2; i <= n / 2; ++i) {
if (n % i == 0) return false;
}
return true;
}
我保持简单; 范围支持还没有在很多发布的 C++ 编译器中使用——使用 C++20 标准的 iota 视图将启用类似的基于迭代器的方法。 但是,该代码在功能上是等效的。
单线程素数检测
下面的 Rust 程序计算它可以在 2 到 200,000 之间找到多少个素数:
fn main() {
const MAX: u32 = 200_000;
let n_primes = (2..MAX).filter(|n| is_prime(*n)).count();
println!(“Found {n_primes} prime numbers in the range 2..{MAX}”);
}
该程序在 2 到 200,000 范围内找到 17,984 个素数。
等效的 C++ 程序如下所示:
int main() {
const auto max = 200000;
uint32_t count = 0;
for (auto i=2; i < max; i++) {
if (is_prime(i)) ++count;
}
std::cout << “Found “ << count << ” prime numbers.\n”;
}
这个程序还找到了 17,984 个素数。 到目前为止,一切都很好。 让我们来一场数据竞赛吧!
赛车数据
为了加快我们的素数计数,我们急切地启动了一些 C++ std::thread 对象来在两个线程之间分配我们的工作负载:
int main() {
const auto max = 200000;
uint32_t count = 0;
std::thread t1([&count, max]() {
for (auto i = 2; i < max/2; i++) {
if (is_prime(i)) ++count;
}
});
std::thread t2([&count, max]() {
for (auto i = max/2; i < max; i++) {
if (is_prime(i)) ++count;
}
});
t1.join();
t2.join();
std::cout << “Found ” << count << ” prime numbers.\n“;
}
该程序编译并运行。 Visual Studio(我的编译器)不显示任何警告。 代码将工作负载一分为二,产生了两个线程——每个线程覆盖了质数的一半。
不幸的是,它不起作用。 我第一次运行它时,它计算了 17,983 个素数。 第二次,它只数了17,981! 线程同时访问计数。 递增变量是一个三阶段的过程:读取当前值,将其加一,然后存储结果。 无法保证所有这些步骤都独立于其他线程完成——导致数据竞争。
也许相同代码的 Rust 版本可以工作?
fn main() {
const MAX: u32 = 200_000;
let mut counter = 0;
let t1 = std::thread::spawn(|| {
counter += (2 .. MAX/2).filter(|n| is_prime(*n)).count();
});
let t2 = std::thread::spawn(|| {
counter += (MAX/2 .. MAX).filter(|n| is_prime(*n)).count();
});
t1.join();
t2.join();
println!(“Found {counter} prime numbers in the range 2..{MAX}”);
}
同一程序的 Rust 版本无法编译。 它给出了两个错误:
counter 可能不会多次被可变借用。counter 可能比它的借来的价值更长。 一辈子的问题。
Rust 检测到您的程序不安全,并阻止您创建无意中给出错误答案的代码。
安全多线程
这是 Rust 中的一个安全版本,它既可以编译又给出正确答案:
fn main() {
const MAX: u32 = 200_000;
let counter = Arc::new(AtomicUsize::new(0));
let counter_1 = counter.clone();
let counter_2 = counter.clone();
let t1 = std::thread::spawn(move || {
counter_1.fetch_add(
(2 .. MAX/2).filter(|n| is_prime(*n)).count(),
std::sync::atomic::Ordering::SeqCst
);
});
let t2 = std::thread::spawn(move || {
counter_2.fetch_add(
(MAX/2 .. MAX).filter(|n| is_prime(*n)).count(),
std::sync::atomic::Ordering::SeqCst
);
});
t1.join();
t2.join();
println!(“Found {} prime numbers in the range 2..{MAX}”,
counter.load(Ordering::SeqCst));
}
这个程序中有三个新概念:
AtomicUSize 是一种特殊的“原子”类型。原子类型提供针对数据竞争的自动保护。它们通常非常快,直接映射到等效的 CPU 指令。每当你访问一个原子类型时,你必须指出你需要的同步保证——在这种情况下,SeqCst 提供了最高级别的保护。Arc 是一个“原子引用计数器”。当您在 Arc 中包装类型时,您可以安全地克隆它并在线程之间移动克隆。每个克隆都是指向所包含数据的指针。 Arc 保证在每次引用都使用它之前不会删除基础数据。线程闭包(内联函数)移动它们捕获的数据。它们在 counter Arc 的克隆上运行——所以它们都指向同一个地方。
结合这些概念可以保护我们免受数据竞争和生命周期问题的影响:
Rust 的借用检查器很高兴,因为数据不会在线程之间借用:Arc 被克隆,安全地共享指向计数器数据的指针。Rust 的数据竞争保护对使用 AtomicUsize 不会导致数据竞争感到满意。结果是一个快速的程序,每次都能给出正确的答案。
包起来
Rust 无所畏惧的并发保证使我们免于数据竞争! C++ 没有警告错误,它只是给出了不正确的结果。 Rust 通过拒绝编译不正确的代码挽救了局面。在大型程序中,这可以节省数小时的艰苦调试。
Rust 的 Arc 和 AtomicUSize 类型使代码看起来很复杂。在本系列的下一部分中,我将向您展示如何控制复杂性并创建快速、多线程且易于阅读的代码。
暂无评论内容