HiveBrain v1.2.0
Get Started
← Back to all entries
patterncppModerate

Thread-safe concurrent FIFO queue in C++

Submitted by: @import:stackexchange-codereview··
0
Viewed 0 times
fifoconcurrentthreadsafequeue

Problem

Is this the correct way to implement a thread-safe concurrent FIFO queue in C++? It requires passing unsigned char* arrays of binary data.

Thread Safe Concurrent Queue

#include 
#include 

class concurrent_queue
{

private:

    std::queue _queue_;
    pthread_mutex_t push_mutex;
    pthread_mutex_t pop_mutex;
    pthread_cond_t cond;

public:

    concurrent_queue()
    {
        pthread_mutex_init(&push_mutex, NULL);
        pthread_mutex_init(&pop_mutex, NULL);
        pthread_cond_init(&cond, NULL);
    }

    void push(unsigned char* data)
    {
        pthread_mutex_lock(&push_mutex);

        _queue_.push(data);

        pthread_cond_signal(&cond);
        pthread_mutex_unlock(&push_mutex);
    }

    void pop(unsigned char** popped_data)
    {
        pthread_mutex_lock(&pop_mutex);

        while (_queue_.empty() == true)
        {
            pthread_cond_wait(&cond, &pop_mutex);
        }

        *popped_data = _queue_.front();
        _queue_.pop();

        pthread_mutex_unlock(&pop_mutex);
    }
};


CONSUMER TEST:

void *consumer_thread(void *arguments)
{
    concurrent_queue *cq = static_cast(arguments);

    while (true)
    {
        unsigned char* data = NULL;

        cq->pop(&data);

        if (data != NULL)
        {
            // Eureka! Received from the other thread!!!
            // Delete it so memory keeps free.
            // NOTE: In the real scenario for which I need
            // this class, the data received are bitmap pixels
            // and at this point it would be processed

            delete[] data;
        }
    }

    return 0;
}


PRODUCER TEST:

```
void main()
{
concurrent_queue cq;

// Create the consumer
pthread_t consumer;
pthread_create(&consumer, NULL, consumer_thread, &cq));

// Start producing
while(true)
{
// Push data.
// Expected behaviour: memory should never run out, as the
// consumer should receive the data and delete it.
//

Solution

You are guarding the state of one variable

std::queue _queue_;


So you only have one mutex.

pthread_mutex_t push_mutex;
pthread_mutex_t pop_mutex;


If you have two then push and pop can modify queue at the same time.

Make sure your mutex locking is exception safe by using RAII. Note all operations were you do start operation stop should be done by using RAII. This is the most important concept in C++ look it up.

Don't use pointers in C++. They do not convey ownership symantics (which means you don't know who should delete it (or when it should be deleted)). In this case use std::string it solves all the problems.

I see calls pthread_XXX_init but not the equivalent destroy.

Because you have not defined them the default copy constructor and assignment operator will be created. Since your object contains resources (mutext and conditional) you probably don't want the default behavior. Either define them or disable them.

C++11 has a better set of threading constructs use those rather than the C pthread library.

Code Snippets

std::queue<unsigned char*> _queue_;
pthread_mutex_t push_mutex;
pthread_mutex_t pop_mutex;

Context

StackExchange Code Review Q#41604, answer score: 12

Revisions (0)

No revisions yet.