0x16 Concurrency

Review

  1. 2024-07-16 07:04

1. Introduction #

Concurrency and parallelism are two terms related to multitasking. In concurrency, tasks have the ability to run in an overlapping manner, which does not necessarily mean they run at the same exact time. It means the start and end times of the tasks intersect. On the other hand, parallelism is the process where several tasks are executed simultaneously.

In concurrent programming, different parts of a program execute independently; in parallel programming, different parts of a program execute at the same time.

[!Info]

  1. Futures
  2. Async/Await
  3. Atomic Operations and Memory Barriers
  4. Threads, Channels, and Message Passing

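Rust's ownership and type checking turn many concurrency errors into compile-time errors rather than runtime bugs. This is often called fearless concurrency: it lets you write code that is free of subtle bugs and is easy to refactor without introducing new ones.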

Splitting the computation in your program into multiple threads to run multiple tasks at the same time can improve performance, but it also adds complexity. Because threads can run simultaneously, there’s no inherent guarantee about the order in which parts of your code on different threads will run. This can lead to problems, such as:

  • Race conditions, where threads are accessing data or resources in an inconsistent order
  • Deadlocks, where two threads are waiting for each other, preventing both threads from continuing
  • Bugs that happen only in certain situations and are hard to reproduce and fix reliably

To create a new thread, we call the thread::spawn function and pass it a closure containing the code we want to run in the new thread.

use std::thread;
use std::time::Duration;

fn main() {
    thread::spawn(|| {
        for i in 1..10 {
            println!("hi number {i} from the spawned thread!");
            thread::sleep(Duration::from_millis(1));
        }
    });

    for i in 1..5 {
        println!("hi number {i} from the main thread!");
        thread::sleep(Duration::from_millis(1));
    }
}

Note that when the main thread of a Rust program completes, all spawned threads are shut down, whether or not they have finished running.

The calls to thread::sleep force a thread to stop its execution for a short duration, allowing a different thread to run.

The return type of thread::spawn is JoinHandle. A JoinHandle is an owned value that, when we call the join method on it, will wait for its thread to finish.

use std::thread;
use std::time::Duration;

fn main() {
    let handle = thread::spawn(|| {
        for i in 1..10 {
            println!("hi number {i} from the spawned thread!");
            thread::sleep(Duration::from_millis(1));
        }
    });

    for i in 1..5 {
        println!("hi number {i} from the main thread!");
        thread::sleep(Duration::from_millis(1));
    }

    handle.join().unwrap();
}
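
Besides waiting, join also hands back whatever the closure returned. The following is a minimal sketch of that behavior; the helper name sum_on_worker is made up for illustration and is not part of the original notes.

```rust
use std::thread;

// Hypothetical helper: computes a sum on a spawned thread and returns it.
fn sum_on_worker() -> u32 {
    // The closure captures nothing, so no `move` is needed here.
    let handle = thread::spawn(|| (1..=10).sum::<u32>());

    // `join` blocks until the thread finishes. It yields `Ok(value)` with the
    // closure's return value, or `Err` if the thread panicked.
    handle.join().expect("spawned thread panicked")
}

fn main() {
    let total = sum_on_worker();
    println!("total computed on the worker thread: {total}");
}
```

Returning a value through join is often enough for a single result; channels become more useful when a thread produces many values over time.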

We’ll often use the move keyword with closures passed to thread::spawn because the closure will then take ownership of the values it uses from the environment, thus transferring ownership of those values from one thread to another.

To use data from the main thread in the spawned thread, the spawned thread’s closure must capture the values it needs.

By adding the move keyword before the closure, we force the closure to take ownership of the values it’s using rather than allowing Rust to infer that it should borrow the values.

use std::thread;

fn main() {
    let v = vec![1, 2, 3];

    let handle = thread::spawn(move || {
        println!("Here's a vector: {v:?}");
    });

    handle.join().unwrap();
}
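
Because move transfers ownership, the main thread can no longer use v after spawning. One way around this, sketched below under the assumption that copying the data is acceptable, is to clone the value first and move the clone into the thread. The helper name len_on_worker is hypothetical.

```rust
use std::thread;

// Hypothetical helper: lets a worker thread inspect a copy of the data
// while the caller keeps the original.
fn len_on_worker(values: &[i32]) -> usize {
    // Make an owned copy; the thread takes ownership of the copy only.
    let owned: Vec<i32> = values.to_vec();

    let handle = thread::spawn(move || owned.len());
    handle.join().expect("spawned thread panicked")
}

fn main() {
    let v = vec![1, 2, 3];

    let len = len_on_worker(&v);

    // `v` was never moved, so it is still usable here.
    println!("worker saw {len} items; main still has {v:?}");
}
```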

Message passing #

As a slogan from the Go language documentation puts it: "Do not communicate by sharing memory; instead, share memory by communicating."

use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();
}

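mpsc stands for multiple producer, single consumer. The mpsc::channel function returns a tuple: the first element is the sending end and the second element is the receiving end. For historical reasons, tx and rx are the conventional abbreviations for transmitter and receiver.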

use std::thread;
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });

    let received = rx.recv().unwrap();
    println!("Got: {}", received);
}

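The receiving end of a channel has two useful methods: recv and try_recv. Here we use recv, short for receive, which blocks the main thread until a value arrives on the channel. Once a value has been sent, recv returns it in a Result<T, E>. When the sending end of the channel closes, recv returns an error to signal that no more values will arrive.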

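try_recv does not block. Instead, it returns a Result<T, E> immediately: an Ok value holding a message if one is available, and an Err value if there is no message at this time.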
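
The non-blocking behavior of try_recv can be sketched as a polling loop. This is an illustrative sketch, not the notes' own code: the helper name poll_until_closed and the sleep intervals are assumptions. In the standard library the error type is TryRecvError, which distinguishes an empty channel (Empty) from one whose senders have all been dropped (Disconnected).

```rust
use std::sync::mpsc::{self, TryRecvError};
use std::thread;
use std::time::Duration;

// Hypothetical helper: polls the channel without blocking and collects
// every message until all senders have been dropped.
fn poll_until_closed(rx: mpsc::Receiver<String>) -> Vec<String> {
    let mut messages = Vec::new();
    loop {
        match rx.try_recv() {
            // A message was waiting in the channel.
            Ok(msg) => messages.push(msg),
            // Nothing available right now; a real program could do other
            // work here instead of sleeping.
            Err(TryRecvError::Empty) => thread::sleep(Duration::from_millis(5)),
            // All senders are gone and the buffer is drained.
            Err(TryRecvError::Disconnected) => break,
        }
    }
    messages
}

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        for word in ["hi", "from", "the", "thread"] {
            tx.send(String::from(word)).unwrap();
            thread::sleep(Duration::from_millis(10));
        }
        // `tx` is dropped when the closure ends, which closes the channel.
    });

    for msg in poll_until_closed(rx) {
        println!("Got: {msg}");
    }
}
```

Polling like this suits a thread that has other work to do between messages; when the thread has nothing else to do, the blocking recv is simpler.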

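The receiver can also be used as an iterator. In the following example, the spawned thread sends several values with a pause between each, and the main thread prints each value as it arrives. The loop ends when the channel is closed.

use std::thread;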
use std::sync::mpsc;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {}", received);
    }
}

Cloning the transmitter gives each producer thread its own sending end, while all messages still arrive at the single receiver:

use std::thread;
use std::sync::mpsc;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    // Clone the transmitter so a second thread can send on the same channel.
    let tx1 = tx.clone();
    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx1.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    thread::spawn(move || {
        let vals = vec![
            String::from("more"),
            String::from("messages"),
            String::from("for"),
            String::from("you"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    // The loop ends once both transmitters have been dropped.
    for received in rx {
        println!("Got: {}", received);
    }
}
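
Shared state #

Threads can also communicate by sharing memory. In the example below, Mutex<T> ensures that only one thread at a time can modify the counter, and Arc<T> (an atomically reference-counted pointer) lets the ten threads share ownership of it.

use std::sync::{Mutex, Arc};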
use std::thread;

fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}
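
For a plain counter like the Mutex-based one above, an atomic integer is a possible alternative. The sketch below is an assumption about how that variant might look, not part of the original notes: the helper name count_with_atomics is hypothetical, and Ordering::SeqCst is used as the conservative choice rather than a tuned one.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

// Hypothetical helper: each of `threads` workers increments a shared
// atomic counter once; the final count is returned.
fn count_with_atomics(threads: usize) -> usize {
    let counter = Arc::new(AtomicUsize::new(0));
    let mut handles = Vec::with_capacity(threads);

    for _ in 0..threads {
        let counter = Arc::clone(&counter);
        handles.push(thread::spawn(move || {
            // `fetch_add` is a single indivisible read-modify-write,
            // so no lock is needed.
            counter.fetch_add(1, Ordering::SeqCst);
        }));
    }

    // Wait for every worker before reading the final value.
    for handle in handles {
        handle.join().unwrap();
    }

    counter.load(Ordering::SeqCst)
}

fn main() {
    println!("Result: {}", count_with_atomics(10));
}
```

Atomics fit single values such as counters and flags; a Mutex remains the simpler tool when several fields must change together.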

Reference #