Message-Sending Concurrency in Rust

Internship at OpenGenus

Get FREE domain for 1st year and build your brand new site

Let us continue dissecting this whole concurrency and parallelism business. Read the previous part on Concurreny in Rust. In this article, we have covered the idea of Closures and Communication using Multiple Producer Single Consumer (MPSC) in Rust.

Table of contents:

  1. Quick recap
  2. Closures
  3. Communication using MPSC

Quick Recap

Allow me to copy the code example I used in the last article, just to give you a quick recap, so you don't have to have both articles open if you've already read the first one.


use std::thread;
use std::time::Duration;

fn main() {
    let handle = thread::spawn(|| {
        for i in 1..10 {
            println!("Hello! I've said Hello {} times from the child thread.", i);
            thread::sleep(Duration::from_secs(1));
        }
    });

    for i in 1..5 {
        println!("Hi! I've said Hi {} times from the main thread.", i);
        thread::sleep(Duration::from_secs(1));
    }

    handle.join().unwrap();
}

This piece of code spawns two threads. The Main one, the starting point of every Rust program and a child thread, who's handle is stored in the appropiately named handle variable. Using the handle right before the end of our Main Function, we ask the main thread to halt until the child thread is done. Pretty neat.

Now you might notice something strange that we haven't seen before in our articles... Namely, this:

thread::spawn(|| {
        // Snip. not important
        }

What is that syntax? "|| {}". These are called closures.

Closures

If you know the terms Lambda function or anonymous function, you already know what a Closure is. It is exactly the same. For those who don't know. Quick example. Show, don't tell. Here we go.

fn main() {
    println!("The sum is {}", simple_sum(2, 3));

    let closure = |x: i64, y: i64| x + y;

    println!("The sum using closures is {}", closure(2, 3));
}

fn simple_sum(x: i64, y: i64) -> i64 {
    x + y
}

closure-rust

As you can see, the result is exactly the same. Although one is a function, the simple_sum function. What about the other? What about the closure variable? It IS also a function. But it's anonymous. It has no name. It can be stored in a variable or..not. Just be called in place (Some standard library functions take closures as a parameter. For example, when you use the filter function of an iterator, the Predicate, is a closure.

predicate-rust

Closures have two parts.

|x: i64, y: i64| x + y;

Between the | | symbols we write the parameters our anonymous function will take. If there are none parameters needed, the space is left empty (IE: ||). Next, come the { }. That is the body of our function. One note, if the body is just one statement, like in the example I gave up there, you can skip the curly brackets, otherwise, you must use them.

So as you can see, a Closure is not that much different from a standard function. So, back to our original code.

// Snip
let handle = thread::spawn(|| {
        for i in 1..10 {
            println!("Hello! I've said Hello {} times from the child thread.", i);
            thread::sleep(Duration::from_secs(1));
        }
    });
    
    // Snip

Our spawn function takes a closure, which uses no parameters, and has a simple for loop in it's body. So, let's try something with a variable. I'll take this following example directly from the rust book, since it goes straight to the point.

use std::thread;

fn main() {
    let v = vec![1, 2, 3];

    let handle = thread::spawn(|| {
        println!("Here's a vector: {:?}", v);
    });

    handle.join().unwrap();
}

Before I can even try to compile this, my text editor is already complaining. Let's take a look at the complaint, and what it means.

closure-warning-rust

Borrowing? Ownership? These are key concepts of Rust, that will take a little while to assimilate. I encourage you to stop here, open a new tab or window and go to the Ownership chapter in the rust book, read it a few times. Ownership and Borrows are outside the scope of this article, but it is necessary for you to at least somewhat understand the concept, before moving on. It's essential to understand the above problem and it's fix.

use std::thread;

fn main() {
    let v = vec![1, 2, 3];

    let handle = thread::spawn(move || {
        println!("Here's a vector: {:?}", v);
    });

    handle.join().unwrap();
}

Did you read the chapter? Are you sure? Good! Let's continue. In the above snippet I've added the move keyword. That means we're moving ownership of our V variable into the closure's environment.

Rust's Ownership 'system' is key to attaining memory safety without using garbage collectors. You have all the power of memory management, but without the danger of breaking everything due to some silly mistake.

So far we've been using only 1 child thread. When we add more child threads, our complexity will quickly increase. A popular method of dealing with extra complexity is message passing. Threads communicate with each other, and act accordingly.

Communication using MPSC

MPSC stands for Multiple Producer Single Consumer. This means that a Channel (Which is Rust's tool for accomplishing this message-sending concurrency), can have many let's say entities producing and sending these message, but only one that will consume all these messages. I'll close this article by using a single producer single consumer example to show this, but leave the proper MPSC for the next article. (As a way to entice you to dig deeper aswell.)

use std::sync::mpsc;
use std::thread;

fn main() {
    let (transmitter, receiver) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("Hi");
        transmitter.send(val).unwrap();
    });

    let received = receiver.recv().unwrap();
    println!("Got \"{}\"", received);
}

cargo-run-rust-1

We store our channel in a tuple, Transmitter and Receiver (In the book they're called tx and rx, I've used longer variable names to make things slightly clearer. In the future I'll use shorter names though).

We then create a thread that moves the transmitter into it's scope, as seen in the closure's body. We then send a value (and unwrap it so the program will panic and exit if that returns an error. We would need to do proper error handling here, this is outside of the scope of this article).

Then, outside that child thread, in the main thread (where we still own the receiver part of the tuple), we receive that message using the receiver, store this value after unwrapping in the received variable and print it. We have successfully sent a message from the child thread, to another thread. The possibilities here are really interesting.

I'll end this article at OpenGenus, here. In the next one, we'll explore proper MPSC message sending to wrap up Message-Sending concurrency, and start looking at another type of concurrency, the Shared-State concurrency. Hope to see you then!

References

Ownership chapter in the Rust Book
MPSC chapter in the Rust Book