# Producer/Consumer Queues in C++ #

This article describes a ‘plain’ C++ implementation of producer/consumers queues where one can see all the details and better understand the inner workings of this synchronization mechanism.

## Background #

The problem of producer-consumer processes has been studied from the ‘70-es even before multi-threading became important. That’s because these processes play an integral part of any interaction between a computer and the outside world. Even when data was read from punched cards a program needed to ‘consume’ a card before the reading process could ‘produce’ another one. Similarly a print line needed to be ‘consumed’ by the printer before the next one could be produced.

While for some time these problems have stayed in the realm of operating systems, once user processes got access to multi-threading facilities they became a concern for everyday users. For a short background on different implementations one can check the Wikipedia page.

If you are looking for a C# implementation you can check Mark Clifton’s article on this subject. Here we are going to stay with the good old C++.

## A Producer/Consumer Example #

Our example here is going to be taken directly from Mark’s article mentioned above. We will write a program that finds all the prime numbers less than a certain limit. We want however to take advantage of all the cores of the CPU in order to ‘speed-up’ the process. I put speed-up in quotation marks because here we are not interested in real speed: following Mark’s example, we will take the same very inefficient function to determine if a number is prime or not:

```
bool IsPrime (int n)
{
bool ret = true;
for (int i = 2; i <= n / 2 && ret; ret = n % i++ != 0)
;
return ret;
}
```

The general strategy is to have one producer thread adding to a queue the numbers to be checked for “prime-ness” while a number of consumer threads will pick up those numbers and place all positive results in another queue. For added fanciness, the output queue will keep both the prime number and an ID of the consumer thread that computed it.

Here are the data structures used for this setup:

```
struct result
{
int prime;
int worker;
};
//...
sync_queue<int> nums;
sync_queue<result> primes;
```

The `nums`

queue keeps all the numbers to be tested and the `primes`

queue keeps the positive results.

`sync_queue`

is a producer/consumer queue structure that provides orderly access for all threads; we will see it’s inner workings in a moment.

### Producer Thread #

This is the simplest. It just fills the `nums`

queue with all the numbers up to a certain limit. At the end it places a number of zeroes as a signal for consumers that job is done. A consumer will terminate when it extracts a 0 from the queue:

```
thread producer ([&nums]()->int {
for (int i = 2; i < 500000; i++)
nums.produce (i);
for (int i = 0; i < NTHREADS; i++)
nums.produce (0);
return 0;
});
```

### Consumer Threads #

The code for a consumer thread is not much more complicated. It extract a number from the `nums`

queue, checks if it is a prime using the `IsPrime`

function and, if it is a prime, posts a new result in the `primes`

queue adding it’s own consumer number to know who calculated it:

```
auto checker = [&nums, &primes, thnum]()->int {
int n = 1;
while (n = nums.consume ())
{
if (IsPrime (n))
primes.produce ({ n,thnum });
}
return 0;
};
```

When the number retrieved is 0, the function returns and the thread terminates.

The main application thread creates a number of consumer threads and starts them all before starting the producer thread:

```
thread* consumers[NTHREADS];
for (int thnum = 0; thnum < NTHREADS; thnum++)
{
auto checker = [&nums, &primes, thnum]()->int {
int n = 1;
while (n = nums.consume ())
{
if (IsPrime (n))
primes.produce ({ n,thnum });
}
return 0;
};
consumers[thnum] = new thread (checker);
consumers[thnum]->start ();
}
```

### Running the Rodeo #

Now that everything is setup, we can start the show:

```
stopwatch t_prod, t_cons;
t_prod.start ();
t_cons.start ();
producer.start (); //Start producer
producer.wait (); //Wait to finish producing
t_prod.stop ();
//Show producer statistics
cout << "sync_queue finished producing" << " in " << fixed
<< setprecision (2) << t_prod.msecEnd ()/1000. << "sec" << endl;
//Wait for consumers to finish
for (int i = 0; i < NTHREADS; i++)
consumers[i]->wait ();
t_cons.stop ();
cout << "finished consuming" << " in " << fixed
<< setprecision (2) << t_cons.msecEnd () / 1000. << "sec" << endl;
//Did we find all the primes?
cout << "Expecting 41538 primes, found "
<< primes.size () <<endl;
//Check who did what
vector<int> found_by(NTHREADS);
while (!primes.empty ())
{
result r = primes.consume ();
found_by[r.worker]++;
}
//Show consumers statistics
for (int i = 0; i < NTHREADS; i++)
cout << "Consumer " << i << " found " << found_by[i] << " primes." << endl;
```

On my machine (no speed monster this one), I get something like:

```
sync_queue finished producing in 1.67sec
finished consuming in 4.34sec
Expecting 41538 primes, found 41538
Consumer 0 found 4869 primes.
Consumer 1 found 5530 primes.
Consumer 2 found 5467 primes.
Consumer 3 found 4844 primes.
Consumer 4 found 4863 primes.
Consumer 5 found 5529 primes.
Consumer 6 found 5596 primes.
Consumer 7 found 4840 primes.
```

Instead of having some central control distribute the work among consumers, the `sync_queue`

allowed each consumer to pick it’s work unit and generate results. Some of the threads got a bit more, some a bit less but, all in all, the work was distributed fairly.

## The Inner Workings of a Producer/Consumer Queue #

`sync_queue`

is a template class derived from `std::queue`

. It provides two main methods: `produce`

and `consume`

. To synchronize access, it uses a semaphore as well as a critical section object to keep everything consistent. The `produce`

method is very simple:

```
template <class M, class C=std::deque<M>>
class sync_queue : protected std::queue<M, C>
{
public:
//...
/// Append an element to queue
virtual void produce (const M& obj)
{
lock l (update); //take control of the queue
this->push (obj); //put a copy of the object at the end
con_sema.signal (); //and signal the semaphore
}
//...
protected:
semaphore con_sema; ///< consumers' semaphore counts down until queue is empty
criticalsection update; ///< critical section protects queue's integrity
```

A `lock`

object acquires the critical section to prevent simultaneous access. The object to be produced is pushed in the queue and the semaphore is signaled. When the function the lock object goes out of scope and the critical section is released.

Consuming is slightly more complicated:

```
/// Extract and return first element in queue
virtual M consume ()
{
M result;
update.enter ();
while (std::queue<M, C>::empty ())
{
update.leave ();
con_sema.wait (); //wait for a producer
update.enter ();
}
result = this->front (); //get the message
this->pop ();
update.leave ();
return result;
}
```

We enter the critical section and check if the queue is empty. If so, we leave the critical section and start waiting for the consumers’ semaphore to be signaled by a producer. When awoken by a signal, we re-enter the critical section and loop again.

At this point two things might have happened:

- No one else got the object and we find that queue is not empty. In this case we exit the while loop, pick up the object and leave the critical section.
- Another hungry consumer got the object and we find the queue empty. In this case we leave the critical section and wait for another signal at the consumers’ semaphore.

In addition to these main methods there is another method to check if the queue is empty and another one to return the size of the queue. Note that both of them are only indicative because the result might change before the caller has a chance to check it.

## Bounded Producer/Consumer Queue #

A sharp-eyed reader might have noticed that `sync_queue::produce`

method has no error checking. It blissfully calls `std::queue::push`

and assumes there is enough memory for the new object. This can be seen also from the run times of the producer and consumer threads in the example above: it took producer only 1.7 seconds to fill the queue of numbers to be checked and took consumers 4.4 seconds to empty it.

The `bounded_queue`

class allows you to limit the number of objects that can be queued. If a producer finds the bounded queue full, it has to wait until a consumer removes some of the objects. To do that we need one more semaphore, `pro_sema`

that is initialized with the maximum size of the queue. The `produce`

method becomes:

```
template< class M, class C = std::deque<M> >
class bounded_queue : public sync_queue<M, C>
{
public:
bounded_queue (size_t limit_) : limit (limit_)
{
pro_sema.signal ((int)limit);
}
/// Append an element to queue. If queue is full, waits until space
/// becomes available.
void produce (const M& obj)
{
this->update.enter ();
while (std::queue<M, C>::size () > limit)
{
this->update.leave ();
pro_sema.wait ();
this->update.enter ();
}
this->push (obj);
this->con_sema.signal ();
this->update.leave ();
}
//...
protected:
size_t limit;
semaphore pro_sema; // producers' semaphore counts down until queue is full
```

You can see that it is much more similar to the `consume`

method shown before. It enters the critical section and, if the queue is full, repeatedly tries to find space for the new object by waiting on `pro_sema`

.

If we change the prime numbers example to use the `bounded_queue`

structure with 20 entries, the results look like this:

```
bounded_queue finished producing in 4.32sec
finished consuming in 4.32sec
Expecting 41538 primes, found 41538
Consumer 0 found 5103 primes.
Consumer 1 found 5156 primes.
Consumer 2 found 5192 primes.
Consumer 3 found 5240 primes.
Consumer 4 found 5227 primes.
Consumer 5 found 5091 primes.
Consumer 6 found 5267 primes.
Consumer 7 found 5262 primes.
```

The time required by the producer thread is the same as the time required by the consumers. That is because the producer is held up by the limited queue size.

## Conclusion #

The producer/consumer queues presented in this article provide an easy to use mechanism for inter-thread communication. The threading primitives shown here (`thread`

, `critical_section`

, `semaphore`

, etc.) are part of the MLIB project. You can download the complete project from the GitHub project page.