Concurrency in C++


Reading time: 40 minutes

Concurrency is having multiple threads of execution for a given process. Since C++11, the standard library supports it directly through headers such as <thread>, <mutex>, <atomic>, <future> and <condition_variable>; before that, you had to rely on platform or third-party libraries to tie a given function to a new thread of execution. In this article we will learn the basics of running concurrent threads with shared memory. It will also give a general idea about:

  1. Shared memory parallelization

    • Threads
    • Race conditions
    • Mutexes
    • Atomicity
    • Asynchronous tasks
    • Condition variables
  2. Producer-consumer problem

Threads

Creating threads is easy:

#include <iostream>
#include <thread>
using namespace std;

void func(int x) {
    cout << "Inside thread " << x << endl;
}

int main() {
    thread th(&func, 100);
    th.join();
    cout << "Outside thread" << endl;
    return 0;
}

Copy this to main.cpp and compile it by running:

$ g++ -std=c++0x main.cpp -pthread

Note that we have to specify that we want to use C++11. We're using its pre-release name, c++0x, since we have an old version of GCC; newer versions also accept -std=c++11. We also have to specify -pthread, since the thread implementation for our GCC uses pthreads (POSIX threads) as its backend. Make sure it compiles and runs.

Race conditions

Let us imagine that x * x is a very costly operation (it's not, but use your imagination) and we want to calculate the sum of squares up to a certain number. It would make sense to parallelize the calculation of each square across threads. We can do something like this:

#include <iostream>
#include <vector>
#include <thread>
using namespace std;

int accum = 0;

void square(int x) {
    accum += x * x;
}

int main() {
    vector<thread> ths;
    for (int i = 1; i <= 20; i++) {
        ths.push_back(thread(&square, i));
    }

    for (auto& th : ths) {
        th.join();
    }
    cout << "accum = " << accum << endl;
    return 0;
}

This should sum all squares up to and including 20. We iterate up to 20 and, in each iteration, launch a new thread that handles one square. After this, we call join() on each of our threads, which is a blocking operation that waits for that thread to finish before continuing the execution. This is important to do before we print accum, since otherwise our threads might not be done yet. You should always join your threads before leaving main, if you haven't already.

Now, run this. Chances are it spits out 2870, which is the correct answer.

Run it many times in a row, however, and you may well see some incorrect answers, even though most of the time it gets it right. This is because of something called a race condition. When the program executes accum += x * x;, reading the current value of accum and writing the updated value is not an atomic (meaning indivisible) event. Let's re-write the body of square to capture this:

int temp = accum;
temp += x * x;
accum = temp;

Now, let's say the first two threads are interleaved over time, giving us something like this:

// Thread 1             // Thread 2
int temp1 = accum;      int temp2 = accum;          // temp1 = temp2 = 0
                        temp2 += 2 * 2;             // temp2 = 4
temp1 += 1 * 1;                                     // temp1 = 1
accum = temp1;                                      // accum = 1
                        accum = temp2;              // accum = 4

We end up with accum as 4, instead of the correct 5.

Mutex

A mutex (mutual exclusion) allows us to mark blocks of code that should only be executed by one thread at a time. Keeping the main function the same:

#include <mutex>

int accum = 0;
mutex accum_mutex;

void square(int x) {
    int temp = x * x;
    accum_mutex.lock();
    accum += temp;
    accum_mutex.unlock();
}

Try running the program repeatedly again and the problem should now be fixed. The first thread that calls lock() gets the lock. During this time, all other threads that call lock() will simply block, waiting at that line for the mutex to be unlocked. It is important to introduce the variable temp, since we want the x * x calculations to be outside the lock-unlock block; otherwise we would be hogging the lock while we're running our calculations.

Atomic

C++11 offers even nicer abstractions to solve this problem. For instance, the atomic class template:

#include <atomic>

atomic<int> accum(0);

void square(int x) {
    accum += x * x;
}

We don't need to introduce temp here, since x * x will be evaluated before handed off to accum, so it will be outside the atomic event.

Tasks

An even higher level of abstraction avoids the concept of threads altogether and talks in terms of tasks instead. Consider the following example:

#include <iostream>
#include <future>
#include <chrono>
using namespace std;

int square(int x) {
    return x * x;
}

int main() {
    auto a = async(&square, 10);
    int v = a.get();

    cout << "The thread returned " << v << endl;
    return 0;
}

The async construct uses an object pair called a promise and a future. The former has made a promise to eventually provide a value. The future is linked to the promise and can at any time try to retrieve the value by calling get(). If the promise hasn't been fulfilled yet, get() will simply wait until the value is ready. async hides most of this from us, except that in this case it returns a future object. Since the compiler knows what this call to async returns, we can use auto to declare the future.

Condition variables

If we return to threads, it would be useful to be able to have one thread wait for another thread to finish processing something, essentially sending a signal between the threads. This can be done with mutexes, but it would be awkward. It can also be done using a global boolean variable called notified that is set to true when we want to send the signal. The other thread would then run a loop that checks if notified is true and stops looping when that happens. Writing a bool looks indivisible, but the C++ memory model still treats unsynchronized access to a plain bool from two threads as a data race, so notified should be declared as atomic<bool>; with that, and since in this example we're only setting it once, we don't even need a mutex. However, on the receiving thread we are running a loop at full speed, wasting a lot of CPU time. We could add a short sleep_for inside the loop, making the CPU idle most of the time.
A more principled way, however, is to add a call that waits for a condition variable inside the loop. The instructor will cover this in Thursday's class, so this will be a sneak preview:

#include <iostream>
#include <thread>
#include <condition_variable>
#include <mutex>
#include <chrono>
#include <queue>
using namespace std;

condition_variable cond_var;
mutex m;

int main() {
    int value = 100;
    bool notified = false;
    thread reporter([&]() {
        /*
        unique_lock<mutex> lock(m);
        while (!notified) {
            cond_var.wait(lock);
        }
        */
        cout << "The value is " << value << endl;
    });

    thread assigner([&]() {
        value = 20;
        /*
        {
            lock_guard<mutex> guard(m);  // notified is shared, so set it under m
            notified = true;
        }
        cond_var.notify_one();
        */
    });

    reporter.join();
    assigner.join();
    return 0;
}

First of all, we're using some new syntax from C++11 that enables us to define the thread functions in-place as anonymous functions (lambdas). The [&] capture gives them access to the local variables by reference, so they can read and write value and notified. If you compile it as it is, it will output 100 most of the time. However, we want the reporter thread to wait for the assigner thread to give it the value 20 before outputting it. You can do this by uncommenting the /* ... */ block in each of the two thread functions. The assigner thread will then set notified to true and send a signal through the condition variable cond_var. In the reporter thread, we're looping as long as notified is false, and in each iteration we wait for a signal. Try running it; it should work.

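But wait, if cond_var can send a signal and the call cond_var.wait(lock) blocks until it receives it, why are we still using notified and a while loop? Well, that's because a thread waiting on a condition variable can be spuriously awakened even if nobody called notify_one, and in those cases we need to fall back to checking notified. The flag also covers the case where the assigner sends its notification before the reporter has started waiting: the notification itself is lost, but notified is already true, so the reporter never calls wait. The while loop runs once per wakeup, spurious or not.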

This is a simplified description, since we are also giving wait the object lock, which owns the mutex m. When wait is called, it atomically unlocks m and puts the thread to sleep, so other threads can lock m while the reporter is waiting. When the thread wakes up, whether from a notification or spuriously, wait re-acquires m before it returns, so notified is always checked with the lock held. This gives us some structure of mutual exclusion between the two threads, as we will see in the following example.

Producer-consumer problem

You should now have all the tools needed to fix an instance of the producer-consumer problem. Simply put, one thread is producing goods and another thread is consuming goods. We want the consumer thread to wait using a condition variable, and we want goods.push(i) to be mutually exclusive with goods.pop(), so our data doesn't get corrupted. We are letting c++ and c-- be surrogates for this mutual exclusion, since we can easily check whether we correctly end up with 0 in the end. Run the code as it is, and you will see that the net value is way off:

#include <iostream>
#include <thread>
#include <condition_variable>
#include <mutex>
#include <chrono>
#include <queue>
using namespace std;

int main() {
    int c = 0;
    bool done = false;
    queue<int> goods;

    thread producer([&]() {
        for (int i = 0; i < 500; ++i) {
            goods.push(i);
            c++;
        }

        done = true;
    });

    thread consumer([&]() {
        while (!done) {
            while (!goods.empty()) {
                goods.pop();
                c--;
            }
        }
    });

    producer.join();
    consumer.join();
    cout << "Net: " << c << endl;
}