测试实例:map-reduce
Rust 使并行数据处理变得非常简单,避免了传统并行处理中常见的诸多问题。
标准库提供了开箱即用的优秀线程原语。这些原语结合 Rust 的所有权概念和别名规则,自动防止了数据竞争。
别名规则(一个可写引用异或多个可读引用)自动防止你操作对其他线程可见的状态。(在需要同步的情况下,可以使用 Mutex
或 Channel
等同步原语。)
在这个例子中,我们将计算一个数字块中所有数字的总和。我们通过将数字块分成小块并分配给不同的线程来完成这个任务。每个线程将计算其小块数字的总和,随后我们将汇总每个线程产生的中间结果。
注意,尽管我们在线程间传递引用,但 Rust 理解我们只是传递只读引用,因此不会发生不安全操作或数据竞争。此外,由于我们传递的引用具有 'static
生命周期,Rust 确保这些线程运行时数据不会被销毁。(当需要在线程间共享非 static
数据时,可以使用 Arc
等智能指针来保持数据存活并避免非 static
生命周期。)
use std::thread; // 这是主线程 fn main() { // 这是我们要处理的数据 // 我们将通过一个线程化的 map-reduce 算法计算所有数字的总和 // 每个由空格分隔的块将在不同的线程中处理 // // TODO:试试插入空格会对输出有什么影响! let data = "86967897737416471853297327050364959 11861322575564723963297542624962850 70856234701860851907960690014725639 38397966707106094172783238747669219 52380795257888236525459303330302837 58495327135744041048897885734297812 69920216438980873548808413720956532 16278424637452589860345374828574668"; // 创建一个向量来存储我们将要生成的子线程。 let mut children = vec![]; /************************************************************************* * "Map"阶段 * * 将数据分割成多个段,并进行初步处理 ************************************************************************/ // 将数据分割成多个段以进行单独计算 // 每个数据块都是指向实际数据的引用(&str) let chunked_data = data.split_whitespace(); // 遍历数据段 // .enumerate() 为迭代的每个元素添加当前循环索引 // 生成的元组"(索引, 元素)"随后立即通过 // "解构赋值"被"解构"为两个变量:"i"和"data_segment" // for (i, data_segment) in chunked_data.enumerate() { println!("数据段 {} 是\"{}\"", i, data_segment); // 在单独的线程中处理每个数据段 // // spawn() 返回新线程的句柄, // 我们必须保留该句柄以访问返回值 // // 'move || -> u32' 是一个闭包的语法,它: // * 不接受参数('||') // * 获取其捕获变量的所有权('move') // * 返回一个无符号 32 位整数('-> u32') // // Rust 足够智能,能从闭包本身推断出 '-> u32', // 所以我们可以省略它。 // // TODO:尝试移除 'move' 并观察结果 children.push(thread::spawn(move || -> u32 { // 计算此段的中间和: let result = data_segment // 遍历此段中的字符... .chars() // ...将文本字符转换为对应的数值... .map(|c| c.to_digit(10).expect("应该是一个数字")) // ...并对结果数字迭代器求和 .sum(); // println! 会锁定标准输出,因此不会出现文本交错 println!("已处理段 {},结果={}", i, result); // 不需要使用 "return",因为 Rust 是一种"表达式语言", // 每个代码块中最后求值的表达式会自动成为该块的返回值。 result })); } /************************************************************************* * "归约"阶段 * * 收集中间结果,并将它们合并成最终结果 ************************************************************************/ // 将每个线程的中间结果合并成一个最终总和。 // // 我们使用 "turbofish" ::<> 为 sum() 提供类型提示。 // // TODO:尝试不使用 turbofish,而是显式 // 指定 final_result 的类型 let final_result = children.into_iter().map(|c| c.join().unwrap()).sum::<u32>(); println!("最终求和结果:{}", final_result); }
练习
让线程数量依赖于用户输入的数据并不明智。如果用户决定插入大量空格,我们真的想要创建 2,000 个线程吗?修改程序,使数据始终被分割成固定数量的块,这个数量应由程序开头定义的静态常量来确定。