Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

测试实例:map-reduce

Rust 使并行数据处理变得非常简单,避免了传统并行处理中常见的诸多问题。

标准库提供了开箱即用的优秀线程原语。这些原语结合 Rust 的所有权概念和别名规则,自动防止了数据竞争。

别名规则(一个可写引用异或多个可读引用)自动防止你操作对其他线程可见的状态。(在需要同步的情况下,可以使用 MutexChannel 等同步原语。)

在这个例子中,我们将计算一个数字块中所有数字的总和。我们通过将数字块分成小块并分配给不同的线程来完成这个任务。每个线程将计算其小块数字的总和,随后我们将汇总每个线程产生的中间结果。

注意,尽管我们在线程间传递引用,但 Rust 理解我们只是传递只读引用,因此不会发生不安全操作或数据竞争。此外,由于我们传递的引用具有 'static 生命周期,Rust 确保这些线程运行时数据不会被销毁。(当需要在线程间共享非 static 数据时,可以使用 Arc 等智能指针来保持数据存活并避免非 static 生命周期。)

use std::thread;

// 这是主线程
fn main() {

    // 这是我们要处理的数据
    // 我们将通过一个线程化的 map-reduce 算法计算所有数字的总和
    // 每个由空格分隔的块将在不同的线程中处理
    //
    // TODO:试试插入空格会对输出有什么影响!
    let data = "86967897737416471853297327050364959
11861322575564723963297542624962850
70856234701860851907960690014725639
38397966707106094172783238747669219
52380795257888236525459303330302837
58495327135744041048897885734297812
69920216438980873548808413720956532
16278424637452589860345374828574668";

    // 创建一个向量来存储我们将要生成的子线程。
    let mut children = vec![];

    /*************************************************************************
     * "Map"阶段
     *
     * 将数据分割成多个段,并进行初步处理
     ************************************************************************/

    // 将数据分割成多个段以进行单独计算
    // 每个数据块都是指向实际数据的引用(&str)
    let chunked_data = data.split_whitespace();

    // 遍历数据段
    // .enumerate() 为迭代的每个元素添加当前循环索引
    // 生成的元组"(索引, 元素)"随后立即通过
    // "解构赋值"被"解构"为两个变量:"i"和"data_segment"
    // 
    for (i, data_segment) in chunked_data.enumerate() {
        println!("数据段 {} 是\"{}\"", i, data_segment);

        // 在单独的线程中处理每个数据段
        //
        // spawn() 返回新线程的句柄,
        // 我们必须保留该句柄以访问返回值
        //
        // 'move || -> u32' 是一个闭包的语法,它:
        // * 不接受参数('||')
        // * 获取其捕获变量的所有权('move')
        // * 返回一个无符号 32 位整数('-> u32')
        //
        // Rust 足够智能,能从闭包本身推断出 '-> u32',
        // 所以我们可以省略它。
        //
        // TODO:尝试移除 'move' 并观察结果
        children.push(thread::spawn(move || -> u32 {
            // 计算此段的中间和:
            let result = data_segment
                        // 遍历此段中的字符...
                        .chars()
                        // ...将文本字符转换为对应的数值...
                        .map(|c| c.to_digit(10).expect("应该是一个数字"))
                        // ...并对结果数字迭代器求和
                        .sum();

            // println! 会锁定标准输出,因此不会出现文本交错
            println!("已处理段 {},结果={}", i, result);

            // 不需要使用 "return",因为 Rust 是一种"表达式语言",
// 每个代码块中最后求值的表达式会自动成为该块的返回值。
            result

        }));
    }


    /*************************************************************************
     * "归约"阶段
     *
     * 收集中间结果,并将它们合并成最终结果
     ************************************************************************/

    // 将每个线程的中间结果合并成一个最终总和。
    //
    // 我们使用 "turbofish" ::<> 为 sum() 提供类型提示。
    //
    // TODO:尝试不使用 turbofish,而是显式
    // 指定 final_result 的类型
    let final_result = children.into_iter().map(|c| c.join().unwrap()).sum::<u32>();

    println!("最终求和结果:{}", final_result);
}

练习

让线程数量依赖于用户输入的数据并不明智。如果用户决定插入大量空格,我们真的想要创建 2,000 个线程吗?修改程序,使数据始终被分割成固定数量的块,这个数量应由程序开头定义的静态常量来确定。

另请参阅: