MPSC messages + Shared-State Concurrency in Rust

As promised, here's the rest of MPSC (read the previous post on Message-Sending Concurrency in Rust first), plus a beginner's introduction to the concept of Shared-State concurrency!

In this article, we present MPSC messages and Shared-State Concurrency in Rust, along with Mutex, pointers, smart pointers, Reference Counting and Atomic Reference Counting.

Table of contents:

  1. Quick Recap!
  2. Actual MP in MPSC
  3. Shared-State Concurrency Overview
  4. Mutex
  5. Pointers and Smart Pointers
  6. RC (Reference Counting)
  7. Arc (Atomic Reference Counting)

Quick recap!

Here's where we left our code off!

use std::sync::mpsc;
use std::thread;

fn main() {
    let (transmitter, receiver) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("Hi");
        transmitter.send(val).unwrap();
    });

    let received = receiver.recv().unwrap();
    println!("Got \"{}\"", received);
}

As we can see, mpsc::channel() returns a tuple, which we destructure into a transmitter and a receiver. We then use a child thread to send a message through the transmitter; the receiver gets it on the main thread and prints it. Sounds good? Okay, let's continue.

We're using MPSC, which stands for Multiple Producer, Single Consumer. But we only have one producer here. Let's add a bit more to the code, and start using multiple producers!

Actual MP in MPSC

If you can, try to read the code first, and don't look at the image with the output. Scroll down just enough so you can read the code, but not see the picture. Think about what's being done, and try to figure out or guess what the output should be. What would happen if we ran this a few times?

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (transmitter_one, receiver) = mpsc::channel();

    // Second Transmitter clones the first one
    let transmitter_two = transmitter_one.clone();
    thread::spawn(move || {
        let values = vec!["Hello", "Message", "Thread", "Producer."];
        for value in values {
            transmitter_two.send(value).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    // First Transmitter
    thread::spawn(move || {
        let values = vec!["This is a", "from another.", "And another"];
        for value in values {
            transmitter_one.send(value).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in receiver {
        println!("Got {}!", received);
    }
}

I ran the code a few times and something funny happened: the output was slightly different between runs. This is because we have no guarantee as to which thread our OS will execute first. We don't know how the OS schedules threads as they sleep and wake up, or which thread it will give priority to. Keep this in mind.

[Image: output of the program over several runs]

With that out of the way, as you can see, having multiple producers is really easy. We just need to clone the original transmitter. A channel links transmitters to a receiver, so a cloned transmitter is still linked to the same receiver. Rust's standard library does not provide a way to clone receivers, so multiple consumers need to be handled in some other way. I'm sure there are crates out there that implement MPMC (Multiple Producers Multiple Consumers), but I'll leave this digging and exploring to you.
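To make the cloning idea a bit more concrete, here is a minimal sketch that spawns any number of producers from one original transmitter. The function name collect_from_producers and the choice to send each thread's id are my own assumptions for illustration, not something from the book. Since we can't predict the arrival order, the sketch sorts the messages before returning them.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical helper: spawns `producers` threads that each send their id once,
// then returns everything the single receiver got, sorted.
fn collect_from_producers(producers: usize) -> Vec<usize> {
    let (transmitter, receiver) = mpsc::channel();

    for id in 0..producers {
        // Every clone is linked to the same receiver
        let tx = transmitter.clone();
        thread::spawn(move || {
            tx.send(id).unwrap();
        });
    }

    // Drop the original transmitter: the receiver's iterator only ends
    // once every transmitter (original and clones) has been dropped
    drop(transmitter);

    let mut received: Vec<usize> = receiver.iter().collect();
    // Arrival order depends on the OS scheduler, so sort for a stable result
    received.sort();
    received
}

fn main() {
    println!("Got {:?}", collect_from_producers(4));
}
```

Note the drop(transmitter) line. In the article's example both transmitters are moved into threads, so they are dropped when those threads finish. Here the original stays on the main thread, and without the explicit drop the receiving loop would wait forever.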

Now then, let us explore a different method of concurrency: the Shared-State method.

Shared-State Concurrency

So, when we talk about Shared-State, we're essentially talking about sharing memory. As I mentioned in the first Concurrency article, this can bring some issues.

Let's imagine for a moment that we have two independent threads running at the same time, and there is a particular variable that both of them need to read and/or write. We have no guarantee as to which one will access this shared resource first, so the result can change from run to run. This is called a race condition.

Multiple Threads can access the same memory at the same time, creating problems.
The message passing model is single ownership, because once you pass down a value, you should not be using it anymore. Shared-State is multiple ownership.

Let's have a look at the Mutex, the first of Rust's tools for Shared-State concurrency.

Mutex

Mutex stands for Mutual Exclusion, as in, a mutex only allows one thread to access the data at a time. This is to solve the race condition.

use std::sync::Mutex;

fn main() {
    let number = Mutex::new(5);

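    {
        // lock() waits until the mutex is free, then gives us a guard
        let mut number = number.lock().unwrap();
        *number = 6;
    } // the guard goes out of scope here, which releases the lock

    // Printing the Mutex itself with {:?} shows its Debug representation
    println!("Number is {:?}", number);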
}

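We create a Mutex using the ::new function, as we've seen with other types. The exclusion part is evident when we want to get at the number inside: the first thing we do is call .lock(), which waits until the mutex is free and then locks it for this thread. The call returns a guard that we dereference to change the value, and the lock is released when that guard goes out of scope at the end of the inner block.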
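If you want to see the lock being held and released, here is a small sketch using try_lock, which returns an error instead of waiting when the mutex is already locked. The function name lock_demo and the returned tuple are assumptions of mine, just to make the behaviour observable.

```rust
use std::sync::Mutex;

// Hypothetical helper: returns (was the mutex busy inside the block?,
// was it free after the block?, the final value)
fn lock_demo() -> (bool, bool, i32) {
    let number = Mutex::new(5);
    let busy_inside;

    {
        let mut guard = number.lock().unwrap();
        *guard = 6;
        // The guard is still alive, so a second attempt to lock fails
        busy_inside = number.try_lock().is_err();
    } // guard dropped here, lock released

    // Now the mutex can be locked again
    let free_after = number.try_lock().is_ok();
    let value = *number.lock().unwrap();

    (busy_inside, free_after, value)
}

fn main() {
    let (busy_inside, free_after, value) = lock_demo();
    println!("Busy inside the block: {}", busy_inside);
    println!("Free after the block: {}", free_after);
    println!("Number is {}", value);
}
```

I used try_lock here on purpose. Calling lock() a second time on the same thread while the guard is alive would never succeed, so the sketch avoids that.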

Okay, but we're not using several threads here. Let's take a look at the example in the book, which actually will not compile.

We create a counter, which is a mutex holding a value of 0. Then we create a vector of handles for 10 threads, which will each increment our number by 1.

use std::sync::Mutex;
use std::thread;

fn main() {
    let counter = Mutex::new(0);
    let mut handles = vec![];

    for _ in 0..10 {
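        // This does not compile: `move` moves counter into the first closure,
        // so it is no longer available on the next loop iteration
        let handle = thread::spawn(move || {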
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}

[Image: compiler error for the counter example]

This error, as shown by my editor, is telling us that the value of counter was moved into the thread's closure in a previous iteration of the loop, so it cannot be used again. More specifically, here:

//snip
thread::spawn(move || // Here
//snip

So, we need to introduce Multiple Ownership. Since these articles skipped ahead a bit in the book, we'll go back to Chapter 15, which is all about pointers, or more specifically, smart pointers.

Pointers and Smart Pointers

But what is a pointer? A pointer is a thing that points. Clever huh? Jokes aside, we have to talk about the stack and the heap.

The Stack and the Heap

Both the stack and the heap live in your computer's RAM. The stack has a fixed size, while the heap can grow as needed. Allocating on the stack is faster than allocating on the heap. (Oversimplified of course; this is not the subject of this article.)

If you try to store an object that is bigger than the stack, you get a stack overflow. What you do in that case is create the object in the heap, which has no fixed size. But how do you access it? You keep the address of the object's starting point in the heap. This address is stored on the stack, and is called a pointer.

Say our pointer P points to, or references, the position X in memory, where the number 1 is stored. P itself is not 1; it literally holds a position in memory. To access the number 1, you'd dereference your pointer, which gives you access to that value.
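Here is roughly what that looks like in Rust, as a minimal sketch. I'm using Box<T>, the simplest heap-allocating pointer in the standard library, and the function name one_on_the_heap is made up for the example.

```rust
// Hypothetical helper: puts the number 1 on the heap and returns the pointer to it
fn one_on_the_heap() -> Box<i32> {
    Box::new(1)
}

fn main() {
    let p = one_on_the_heap();

    // The pointer itself holds a position in memory, not the number 1.
    // The exact address will differ from run to run.
    println!("P points to position {:p}", p);

    // Dereferencing with * follows the pointer to the value stored there
    println!("The value stored there is {}", *p);
}
```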

If it sounds convoluted, it is at first. I recommend doing a bit of digging on your own. It really is not as complicated as it seems.

RC (Reference Counted) Smart pointer

Smart pointers, on the other hand, are data structures that not only act like a pointer but also have additional metadata and capabilities. The concept of smart pointers isn’t unique to Rust: smart pointers originated in C++ and exist in other languages as well. In Rust, the different smart pointers defined in the standard library provide functionality beyond that provided by references.

As always, the book is our best resource in Rust. I also like the real-life example they provide (which I will quote below). Reference Counted, or Reference Counting, means that this smart pointer keeps track of how many owners its value has.

Imagine Rc<T> as a TV in a family room. When one person enters to watch TV, they turn it on. Others can come into the room and watch the TV. When the last person leaves the room, they turn off the TV because it’s no longer being used. If someone turns off the TV while others are still watching it, there would be uproar from the remaining TV watchers!
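The TV analogy can be sketched in a few lines. Rc::strong_count tells us how many owners a value currently has, so we can watch the number of "viewers" go up and down. The function name viewers_over_time and the String standing in for the TV are assumptions for illustration.

```rust
use std::rc::Rc;

// Hypothetical helper: records how many owners the "TV" has at each step
fn viewers_over_time() -> Vec<usize> {
    let mut counts = Vec::new();

    // The first person enters the room and turns the TV on
    let tv = Rc::new(String::from("TV"));
    counts.push(Rc::strong_count(&tv));

    // A second person joins: Rc::clone adds an owner, it does not copy the String
    let second = Rc::clone(&tv);
    counts.push(Rc::strong_count(&tv));

    {
        // A third person watches for a while, then leaves at the end of this block
        let _third = Rc::clone(&tv);
        counts.push(Rc::strong_count(&tv));
    }
    counts.push(Rc::strong_count(&tv));

    // The second person leaves too
    drop(second);
    counts.push(Rc::strong_count(&tv));

    counts
} // the last owner goes away here, and the TV is turned off (the value is freed)

fn main() {
    println!("Viewers over time: {:?}", viewers_over_time());
}
```

Running it prints the counts 1, 2, 3, 2, 1, matching people entering and leaving the room.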

So we can use an Rc smart pointer when we need multiple parts of our program to own the same value. Let's implement this in our code. If we make our counter an Rc pointer, we should be set, right? ...Right?

use std::rc::Rc;
use std::sync::Mutex;
use std::thread;

fn main() {
    let counter = Rc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Rc::clone(&counter);
        // This does not compile: Rc does not implement Send,
        // so it cannot be moved into another thread
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}

[Image: compiler error for the Rc example]

Unfortunately, Rc cannot be shared safely between threads. This is the important part:

the trait Send is not implemented for Rc<Mutex<i32>>

The Send trait marks types whose ownership can safely be transferred to another thread. Rc does not implement it, because its reference count is not updated in a thread-safe way.
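One way to poke at this yourself is a tiny generic function with a Send bound, as sketched below. The helper can_cross_threads is hypothetical (it is not part of the standard library); it only compiles when the type we ask about implements Send, so the check happens at compile time.

```rust
use std::sync::Mutex;

// Hypothetical helper: the `T: Send` bound means this only compiles
// for types that can be moved to another thread
fn can_cross_threads<T: Send>() -> bool {
    true
}

fn main() {
    // Both of these types implement Send, so this compiles and prints true
    println!("Mutex<i32>: {}", can_cross_threads::<Mutex<i32>>());
    println!("String: {}", can_cross_threads::<String>());

    // Uncommenting the next line gives the same kind of error as above,
    // because Rc does not implement Send:
    // can_cross_threads::<std::rc::Rc<Mutex<i32>>>();
}
```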

Luckily, there's an alternative.

Atomic Reference Counting

I will not go into detail about what atomics are exactly, or how they work, because to be honest it is beyond my understanding as well. But what we do need to know is that Arc<T> is safe to use in concurrent situations. Atomics work like primitive types, but are safe to share across threads. So, let us use these instead. Rc and Arc share the same API, so we only need to change a few things here and there: change the use statement to import Arc and Mutex from std::sync, and replace Rc with Arc everywhere we used it. I added a print statement in our new threads just to make it clearer when I'm spawning a new thread, and how the count is increasing.

We don't use atomics everywhere because thread safety comes at a performance cost. If you need to share values across threads, use atomics. If you don't, use simple primitives!

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();
            *num += 1;
            println!("New thread! Count is now {}", *num);
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}

[Image: output of the Arc example]
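Since I said atomics work like primitive types, here is a small sketch of what that means for our counter. It swaps the Mutex<i32> for an AtomicUsize from std::sync::atomic, which is a different technique from the one in the book's example. The function name count_with_atomic and the choice of Ordering::SeqCst are my own assumptions for illustration.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

// Hypothetical helper: spawns `threads` threads that each add 1 to a shared atomic counter
fn count_with_atomic(threads: usize) -> usize {
    // Arc still provides the multiple ownership; the atomic replaces the Mutex
    let counter = Arc::new(AtomicUsize::new(0));
    let mut handles = vec![];

    for _ in 0..threads {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            // fetch_add increments the value in one indivisible step, no lock needed
            counter.fetch_add(1, Ordering::SeqCst);
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    counter.load(Ordering::SeqCst)
}

fn main() {
    println!("Result: {}", count_with_atomic(10));
}
```

For a plain number this works nicely. For anything more complex than a primitive, the Arc plus Mutex combination from the example above is still the way to go.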

This was a rather lengthy article, the lengthiest I've written so far. But it was necessary to avoid leaving gaps in our basic theory knowledge.

I wonder what the next stop in our rusty journey is. I'm excited to find out. Will you join me again next time? Hope to see you there!

References

MPSC Chapter in the Rust Book
Shared-State Concurrency Chapter in the Rust Book