×

Search anything:

Linux threads: synchronization

Internship at OpenGenus

Get this book -> Problems on Array: For Interviews and Competitive Programming

Programming and debugging a threaded program has its difficulties since we don't always know how a system will schedule threads or even reproduce a behavior caused by a bug in such a program. In this article we have discussed three synchronization techniques namely mutexes, semaphores and condition variables which solve such issues.

Table of contents.

  1. Introduction.
  2. Race condition.
  3. Mutual exclusion locks(mutexes).
  4. Deadlocks.
  5. Semaphores.
  6. Condition variables.
  7. Summary.
  8. References.

Prerequisites.

  1. Linux threads: Creation, Data passing, Waiting
  2. Linux threads: cancellation, data and cleanup

Introduction.

Threaded programs are concurrent programs, there is no way of knowing how a system will schedule threads, that is, a system might let a thread run for a long time or it can switch between multiple threads.

Debugging a threaded program is difficult since we can't easily reproduce a bug. This is because of how a system will schedule threads, when we run the program the first time we might find a bug but when we run the second time the bug is nowhere to be found. This is due to the way threads are scheduled, at both times, the threads are scheduled differently.

Usually the cause of bugs in threaded programs is caused by data access. Threads can use the same data for each of their operation which is a huge pro but also a con since it is the source of bugs e.g a thread accessing data while another is updating the same data.

Race condition.

We have the following example of a thread function which processes jobs it takes from a queue.

#include<malloc.h>

struct job{
    // field for linked list
    struct job* next;

    // other fields describing work
};

// pending jobs
struct job* jobQueue;

// processing jobs
void *threadFunction(void* arg){
    while(jobQueue != NULL){
        // get next job
        struct job* nextJob = jobQueue;
        // remove job from list
        jobQueue = jobQueue->next;
        // process the job
        processJob(nextJob);
        // cleanup
        free(nextJob);
    }
    return NULL;
}

After a job is completed the function will check the queue to look for other jobs.
Assume that a single job is left in the queue and two threads complete their tasks at the same time now they are looking for the next job.

The first checks the queue and finds a job, it enters the loop and stores the pointer to the job object in nextJob.
The second also does the same.

Now the two threads are executing the same job, furthermore one thread will unlink a job from the queue leaving it NULL and thus when a thread evaluates job_queue->next we will have a segmentation fault.
This is a race condition.

On one instance this program might run successfully without hitches since the threads were scheduled right on another a similar problem as described above will occur and will lead to bugs that are hard to fix.

Race conditions are eliminated by making operations atomic, whereby once an operation begins, it will be indivisible and uninterruptible until it is completed.

In our example we could do this by checking the queue, if it is not empty we remove the first job as a single atomic operation.

Mutual Exclusion Locks(mutexes).

A solution to race conditions is to only allow a single thread to access the queue at a time, that is, when a thread is evaluating the queue, no other thread should be able to access the queue until the current thread decides the process the job after which the job is removed from the queue.

This is possible by use of mutexes, these is a locking mechanism that limits resource access.

In this case, if a thread locks a mutex and another tries to lock the same mutex, it is blocked or put on hold. When the first thread unlocks the mutex then the second is allowed to proceed, this will guarantee that race conditions are avoided.

We demonstrate this with an example of a job queue protected by a mutex.

#include<malloc.h>
#include<pthread.h>

struct job{
    // link field for linked list
    struct job* next;

    // other fields
};

// pending jobs
struct job* jobQueue;

// a mutex
pthread_mutex_t jobQueueMutex = PTHREAD_MUTEX_INITIALIZER;

// process jobs
void* threadFunction(void* arg){
    while(1){
        struct job* nextJob;

        // lock mutex on job queue
        pthread_mutex_lock(&jobQueueMutex);

        // check if queue is empty
        if(jobQueue == NULL)
            nextJob = NULL;
        else{
            // get next job
            nextJob = jobQueue;
            // remove job from queue
            jobQueue = jobQueue->next;
        }
        // done with queue - unlock mutex in job
        pthread_mutex_unlock(&jobQueueMutex);
        
        // terminate thread if queue was empty
        if(nextJob == NULL)
            break;

        // process the job
        processJob(nextJob);

        // cleanup
        free(nextJob);
    }
    return NULL;
}

We create a mutex with default attributes by initializing it with PTHREAD_MUTEX_INITIALIZER.

A thread attempts to lock a mutex by calling pthread_mutex_lock, if it is unlocked, it becomes locked and the function returns immediately, otherwise pthread_mutex_lock will block execution and only returns when mutex is unlocked by the other thread.

Multiple threads can be blocked at a time, when a mutex is unlocked, one thread is selected and it locks the mutex while others stay blocked.
pthread_mutex_unlock called from the same function that locked the mutex is used to unlock a mutex.

From the above example, each thread will lock a mutex and only when the checking sequence of the queue is completed, the mutex is unlocked, this prevents race conditions.

Accesses to jobQueue and shared data pointer are between pthread_mutex_lock and pthread_mutex_unlock, nextJob is accessed outside this region after it has been removed from the queue. At this point it is accessible to other threads.
If jobQueue is empty we don't break out of the while loop as it will leave mutex locked permanently and thus prevent other threads from accessing it.

We set nextJob to null so as to remember this and exit loop only after mutex is unlocked.

Deadlocks.

Although mutexes allow a thread to block the execution of another this leads to a new class of bugs referred to as deadlock whereby threads are stuck waiting.
A deadlock may occur when the same thread attempts to lock a mutex twice in a row.

We solve this by testing whether a mutex is locked without actually blocking on it, that is, a thread might need to lock a mutex but may have other work to do instead of blocking, if the mutex is already locked and since pthread_mutex_lock only returns when a mutex is unlocked, the function pthread_mutex_trylock is used. When called on an unlocked mutex, it locks the mutex as if we called pthread_mutex_lock and it returns zero.

If mutex is already locked by another thread, pthread_mutex_lock will not block, instead it returns with an error code EBUSY.
In this case the mutex lock held by the other thread is not affected.

Semaphores.

In the previous example where several threads process jobs from the queue, the main thread function carries out the next job until the queue is empty then exits.
This works on the condition that all jobs are queued in advance or if new jobs are queued they are queued at least as quickly as they are processed, however if the threads work very fast, the queue will be empty and they will exit then if new jobs are queued there will be no threads left to process them.

A mechanism that blocks threads when the queue empties until new jobs are added to the queue is needed in such a case.
A semaphore is a counter used to synchronize multiple threads.
Each semaphore will have a non-negative integer as a counter value.
Operations of a semaphore include,
wait, which decrements the semaphore value by 1, if it was previously 0 the operation blocks until its value is positive. When it is positive, it is decremented by 1 and the operation returns.
post, increments the value by 1, if it was previously 0 and other threads are blocked in a wait operation on that semaphore, one thread is unblocked and its wait operation completes.

An example of a job queue controlled by a semaphore is as follows,

#include<malloc.h>
#include<pthread.h>
#include<semaphore.h>

struct job{
    // link field for linked list
    struct job* next;

    // other fields
};

// pending jobs
struct job* jobQueue;

// mutex to protect jobQueue
pthread_mutex_t jobQueueMutex = PTHREAD_MUTEX_INITIALIZER;

// semaphore to count jobs in queue
sem_t jobQueueCount;

// initialize job queue
void initializeJobQueue(){
    jobQueue = NULL;
    // initialize semaphore with initial count as 0
    sem_init(&jobQueueCount, 0, 0);
}

// process queued jobs
void* threadFunction(void* arg){
    while(1){
        struct job* nextJob;
        
        // wait on jobQueue sempahore
        // if value >= 1 decrement count
        // if queue is empty, block until new job is added to queue
        sem_wait(&jobQueueCount);

        // lock mutex on job queue
        pthread_mutex_lock(&jobQueueMutex);
        
        // queue not null, get next job and remove it from list
        nextJob = jobQueue;
        jobQueue = jobQueue->next;

        // unlock mutex
        pthread_mutex_unlock(&jobQueueMutex);

        // process jobs
        processJob(nextJob);
        // clean up
        free(nextJob);
    }
    return null;
}

void enqueueJobs(/* job data */){
    struct job* newJob;
    // allocate new job
    newJob = (struct job*) malloc(sizeof(struct job));
    // other job struct fields ...

    // lock mutex on job queue before accessing
    pthread_mutex_lock(&jobQueueMutex);

    // place job at head of queue
    newJob->next = jobQueue;
    jobQueue = newJob;

    // post semaphore - another job is available
    // if threads are blocked, waiting on semaphore
    // one is unblocked to process the job
    sem_post(&jobQueueCount);

    // unlock job queue mutex
    pthread_mutex_unlock(&jobQueueMutex);
}

sem_t represents a semaphore. We initialize it using sem_init and pass a pointer to sem_t variable, 0 as the second parameter and its initial value as the third.
enqueueJobs adds a job to the queue.

Threads here never exit, if no jobs are available they block in sem_wait.

Condition variables.

A condition variable enables us to synchronize threads by implementing complex conditions under which threads will execute.

Assuming we write a thread function which executes an infinite loop whereby at each iteration, work is performed and we control it with a flag such that it only runs when the flag is set otherwise it pauses.
This is inefficient as it consumes alot of processing because the thread function will spend time checking if the flag is set each time locking and unlocking the mutex.
A better approach would be to put the thread to sleep when the flag is not set until a circumstance changes that might cause it to become set.

A condition variable will help us implement a condition whereby a thread executes and when it blocks.
A thread may wait on a condition variable, that is if a thread x waits on a condition variable, it is blocked until another thread y signals the same condition variable.
A condition variable does not have a counter or memory and therefore thread x must wait on the condition variable before thread y begins, otherwise if thread y signals the condition variable before thread x waits on it, the signal is lost.

To optimize the previous example, we would use loop in thread_function to check the flag. If it is not set, the thread will wait on the condition variable. set_thread_flag will signal the condition variable after changing the value of the flag and thus if thread_function is blocked on the condition variable, it will be unlocked and will check the condition again.

However a race condition exists in this approach between checking the flag value and signaling or waiting on the condition variable.

We solve it by locking the flag and condition variable with a single mutex as follows,
The loop in thread_function will lock the mutex and read the flag value, if it is set, it unlocks the mutex and executes the work function, otherwise it atomically unlocks the mutex and waits on the condition variable.

The possibility of another thread changing the flag value and signal the condition variable between thread_function's test of flag value and wait in the condition variable is eliminated.

When a program performs an action which might change the sense of the condition we are protecting with the condition variable,(e.g the condition is the state of the thread flag and therefore we take these steps when it is changed), first we lock the mutex accompanying the condition variable, next we take an action that might change the sense of the condition in this case set the flag, Thirdly, we signal the condition variable depending on the desired behavior and finally we unlock the mutex accompanying the condition variable.

An example of controlling a thread using a condition variable.

#include<pthread.h>

int threadFlag;
pthread_cond_t threadFlagCv;
thread_mutex_t threadFlagMutex;

void initializeFlag(){
    // initialize mutex and condition variable
    pthread_mutex_init(&threadFlagMutex, NULL);
    pthread_cond_init(&threadFlagCv, NULL);

    //initialize flag value
    threadFlag = 0;
}

// do work while flag is set
// block if flag is clear

void* threadFunction(void* threadArg){
    while(1){
        // lock mutex before accessing flag value
        pthread_mutex_lock(&threadFlagMutex);
        while(!threadFlag){
            // flag is clear
            // wait for signal on condition variable indicating flag has changed
            // when signal arrives and thread unblocks, loop and check flag again
            pthread_cond_wait(&threadFlagCv, &threadFlagMutex);
            
            // flag is set, unlock mutex
            pthread_mutex_unlock(&threadFlagMutex);

            // do work
            doWork();
        }
        return NULL;
    }
}

// set value of thread flag to flagValue
void setThreadFlag(int flagValue){
    // lock mutex before access =ing flag value
    pthread_mutex_lock(&threadFlagMutex);
    // set flag value and signal incase thread function is blocked waiting for flag to become set
    // thread function cannot check flag until mutex is unlocked.
    threadFlag = flagValue;
    pthread_cond_signal(&threadFlagCv);
    // unlock mutex
    pthread_mutex_unlock(&threadFlagMutex);
}

pthread_cond_init will initialize a condition variable, its first argument is a pointer to pthread_cond_t instance, the second a pointer to condition variable attribute object, in this case we set it to NULL(it is ignored under GNU/Linux).
pthread_cond_signal signals a condition variable. A single thread blocked on the condition variable will be unblocked and if there is no other thread blocked, the signal is ignored. It takes an argument which is a pointer to pthread_cont_t instance.

pthread_cond_wait is used to block the calling thread until the condition variable is signaled. Its argument is a pointer to pthread_cont_t instance, the second argument is a pointer to pthread_mutex_t mutex instance.
When called, the mutex must be locked by the calling thread. This function will unclock it and block on the condition variable.
When the condition variable is signaled and the calling thread unblocks, pthread_cond_wait reacquires a lock on the mutex.

Summary.

In this article we have discussed how to use mutex to protect a variable from simultaneous access by two threads and also how semaphores are used to implement a shared counter.

We have also discussed condition variables which we use to implement complex conditions under which thread execute.

All these are synchronizations are aimed to prevent race conditions and deadlocks in threaded programs.

References.

  1. Advanced Programming in the UNIX Environment 3rd Edition W. Richard Stevens.
  2. Advanced Linux Programming Mark Mitchell, Jeffrey Oldham,and Alex Samuel.
Linux threads: synchronization
Share this