Thread-safe concurrent FIFO queue in C++
fifo concurrent threadsafe queue
Problem
Is this the correct way to implement a thread-safe concurrent FIFO queue in C++? It requires passing unsigned char* arrays of binary data.
Thread Safe Concurrent Queue
```
#include <queue>
#include <pthread.h>

class concurrent_queue
{
private:
    std::queue<unsigned char*> _queue_;
    pthread_mutex_t push_mutex;
    pthread_mutex_t pop_mutex;
    pthread_cond_t cond;

public:
    concurrent_queue()
    {
        pthread_mutex_init(&push_mutex, NULL);
        pthread_mutex_init(&pop_mutex, NULL);
        pthread_cond_init(&cond, NULL);
    }

    void push(unsigned char* data)
    {
        pthread_mutex_lock(&push_mutex);
        _queue_.push(data);
        pthread_cond_signal(&cond);
        pthread_mutex_unlock(&push_mutex);
    }

    void pop(unsigned char** popped_data)
    {
        pthread_mutex_lock(&pop_mutex);
        while (_queue_.empty() == true)
        {
            pthread_cond_wait(&cond, &pop_mutex);
        }
        *popped_data = _queue_.front();
        _queue_.pop();
        pthread_mutex_unlock(&pop_mutex);
    }
};

// CONSUMER TEST:
void *consumer_thread(void *arguments)
{
    concurrent_queue *cq = static_cast<concurrent_queue*>(arguments);
    while (true)
    {
        unsigned char* data = NULL;
        cq->pop(&data);
        if (data != NULL)
        {
            // Eureka! Received from the other thread!!!
            // Delete it so memory keeps free.
            // NOTE: In the real scenario for which I need
            // this class, the data received are bitmap pixels
            // and at this point it would be processed
            delete[] data;
        }
    }
    return 0;
}

// PRODUCER TEST:
int main()
{
    concurrent_queue cq;
    // Create the consumer
    pthread_t consumer;
    pthread_create(&consumer, NULL, consumer_thread, &cq);
    // Start producing
    while(true)
    {
        // Push data.
        // Expected behaviour: memory should never run out, as the
        // consumer should receive the data and delete it.
        //
```
Solution
You are guarding the state of one variable
std::queue<unsigned char*> _queue_;
So you only need one mutex, not two:
pthread_mutex_t push_mutex;
pthread_mutex_t pop_mutex;
With two mutexes, push and pop can modify the queue at the same time, and std::queue is not safe for concurrent modification.
Make your mutex locking exception safe by using RAII. Any operation of the form "start, operate, stop" should be wrapped in RAII. This is the most important concept in C++; look it up.
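As a rough illustration of the RAII point, here is a minimal sketch of a lock guard for a pthread mutex. The names pthread_lock_guard, guarded_increment and locked_failure are hypothetical and do not appear in the original post. The destructor releases the mutex on every exit path, including when an exception is thrown.

```cpp
#include <pthread.h>
#include <stdexcept>

// Hypothetical helper (not from the original post): locks the mutex in the
// constructor and unlocks it in the destructor.
class pthread_lock_guard
{
public:
    explicit pthread_lock_guard(pthread_mutex_t& m) : mutex_(m)
    {
        pthread_mutex_lock(&mutex_);
    }
    ~pthread_lock_guard()
    {
        pthread_mutex_unlock(&mutex_);
    }
    // A guard owns one lock acquisition, so copying it makes no sense.
    pthread_lock_guard(const pthread_lock_guard&) = delete;
    pthread_lock_guard& operator=(const pthread_lock_guard&) = delete;

private:
    pthread_mutex_t& mutex_;
};

// Normal path: the mutex is released when 'lock' goes out of scope.
int guarded_increment(pthread_mutex_t& m, int& counter)
{
    pthread_lock_guard lock(m);
    ++counter;
    return counter;
}

// Exceptional path: stack unwinding still runs the guard's destructor,
// so the mutex is released even though the function never returns normally.
void locked_failure(pthread_mutex_t& m)
{
    pthread_lock_guard lock(m);
    throw std::runtime_error("simulated failure while holding the lock");
}
```

With a guard like this, push and pop no longer need a matching unlock call at every exit point.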
Don't use raw pointers in C++. They do not convey ownership semantics, so you don't know who should delete the data or when. In this case use std::string; it solves these problems.
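To make the ownership point concrete, the following sketch stores the binary data in std::string objects instead of unsigned char* buffers. The helpers make_payload and drain are hypothetical names introduced only for this example. A std::string constructed with an explicit length can hold arbitrary bytes, including zeros, and frees its memory when it goes out of scope.

```cpp
#include <cstddef>
#include <queue>
#include <string>
#include <utility>

// Hypothetical helper: copies a raw buffer into an owning std::string.
// The explicit length keeps embedded zero bytes intact.
std::string make_payload(const unsigned char* bytes, std::size_t size)
{
    return std::string(reinterpret_cast<const char*>(bytes), size);
}

// Hypothetical consumer: empties the queue and returns the total byte count.
// No delete[] is needed; each item releases its memory automatically.
std::size_t drain(std::queue<std::string>& q)
{
    std::size_t total = 0;
    while (!q.empty())
    {
        std::string item = std::move(q.front());
        q.pop();
        total += item.size();
    }
    return total;
}
```

Whoever holds the std::string owns the bytes, so the question of who calls delete[] does not arise.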
I see calls to pthread_XXX_init but not the equivalent destroy calls.
Because you have not defined them, the default copy constructor and assignment operator will be generated. Since your object contains resources (a mutex and a condition variable), you probably don't want the default behavior. Either define them or disable them.
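One way to address both points, sketched under the assumption that a C++11 compiler is available: a small wrapper that pairs every init with a destroy and disables copying. The class name sync_primitives and its accessors are hypothetical and are not part of the original code.

```cpp
#include <pthread.h>
#include <type_traits>

// Hypothetical wrapper (not from the original post) around the two pthread
// resources the queue needs.
class sync_primitives
{
public:
    sync_primitives()
    {
        pthread_mutex_init(&mutex_, NULL);
        pthread_cond_init(&cond_, NULL);
    }
    ~sync_primitives()
    {
        // Every init has a matching destroy.
        pthread_cond_destroy(&cond_);
        pthread_mutex_destroy(&mutex_);
    }
    // Copying a mutex or condition variable is not meaningful, so disable it.
    sync_primitives(const sync_primitives&) = delete;
    sync_primitives& operator=(const sync_primitives&) = delete;

    pthread_mutex_t* mutex() { return &mutex_; }
    pthread_cond_t* cond() { return &cond_; }

private:
    pthread_mutex_t mutex_;
    pthread_cond_t cond_;
};

// Compile-time check that the copy operations really are disabled.
static_assert(!std::is_copy_constructible<sync_primitives>::value,
              "sync_primitives must not be copy constructible");
static_assert(!std::is_copy_assignable<sync_primitives>::value,
              "sync_primitives must not be copy assignable");
```

Before C++11 the usual equivalent was to declare the copy constructor and assignment operator private and leave them undefined.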
C++11 has a better set of threading constructs; use those rather than the C pthread library.
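Putting the points above together, a C++11 version might look roughly like the sketch below. It is one possible shape, not the reviewer's own code. It assumes std::string as the payload type, as suggested above, and the member names queue_, mutex_ and cond_ are illustrative. It uses a single mutex, RAII locks, and deleted copy operations; the standard library types clean up after themselves.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <string>
#include <utility>

class concurrent_queue
{
public:
    concurrent_queue() = default;
    concurrent_queue(const concurrent_queue&) = delete;
    concurrent_queue& operator=(const concurrent_queue&) = delete;

    void push(std::string data)
    {
        {
            // One mutex guards the one shared variable, queue_.
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(std::move(data));
        }
        // Notify after releasing the lock so the woken thread can proceed.
        cond_.notify_one();
    }

    std::string pop()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        // The predicate form re-checks the condition after spurious wakeups.
        cond_.wait(lock, [this] { return !queue_.empty(); });
        std::string data = std::move(queue_.front());
        queue_.pop();
        return data;
    }

private:
    std::queue<std::string> queue_;
    std::mutex mutex_;
    std::condition_variable cond_;
};
```

A consumer started with std::thread can then call pop() in a loop and block until the producer calls push(); items come out in the order they were pushed.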
Code Snippets
std::queue<unsigned char*> _queue_;
pthread_mutex_t push_mutex;
pthread_mutex_t pop_mutex;
Context
StackExchange Code Review Q#41604, answer score: 12