patterncppMinor
Lockless multi user, multi consumer FIFO queue using C++11
Viewed 0 times
fifomultiuserconsumerusinglocklessqueue
Problem
One of the features that is missing from C++11 are lockless queues. Therefore I have decided to write one myself.
A few remarks:
-
I know the code isn't technically lockless. However the lock is only used for a blocking call and can be omitted.
-
I have tested the code and it seems to work. However I found I had some trouble coming up with good tests. So if anybody has a good suggestion I'd be happy to hear them.
-
I'm aware of there being a good implementation for this in the Boost library. However I don't want to include Boost in all my projects. Just for this.
```
#include
#include
#include
namespace threading
{
template
class lockless_queue
{
private:
template
struct node
{
friend class lockless_queue;
node(const U data) :
data(new U(data)),
next(nullptr),
isDummy(false)
{}
private:
node(bool isDummy) :
data(nullptr),
next(nullptr),
isDummy(isDummy)
{}
public:
const bool isDummy;
std::shared_ptr data;
std::shared_ptr> next;
};
public:
lockless_queue()
:
m_head(new node(true)),
m_running(true)
{}
~lockless_queue()
{
m_running = false;
m_newDataWaiter.notify_all();
}
//adds a new element to the end of the array
void produce(const T &&data)
{
//bool indicating whether a notification should be sent after adding
bool l_notifyUponAdding;
//the new node to be added at the end of the array
std::shared_ptr> l_newNode(new node(std::forward(data)));
//pointer to the last node
std::shared_ptr> l_lastNode(std::atomic_load(&m_head));
//value to compare the next of the last node with
st
A few remarks:
-
I know the code isn't technically lockless. However the lock is only used for a blocking call and can be omitted.
-
I have tested the code and it seems to work. However I found I had some trouble coming up with good tests. So if anybody has a good suggestion I'd be happy to hear them.
-
I'm aware of there being a good implementation for this in the Boost library. However I don't want to include Boost in all my projects. Just for this.
```
#include
#include
#include
namespace threading
{
template
class lockless_queue
{
private:
template
struct node
{
friend class lockless_queue;
node(const U data) :
data(new U(data)),
next(nullptr),
isDummy(false)
{}
private:
node(bool isDummy) :
data(nullptr),
next(nullptr),
isDummy(isDummy)
{}
public:
const bool isDummy;
std::shared_ptr data;
std::shared_ptr> next;
};
public:
lockless_queue()
:
m_head(new node(true)),
m_running(true)
{}
~lockless_queue()
{
m_running = false;
m_newDataWaiter.notify_all();
}
//adds a new element to the end of the array
void produce(const T &&data)
{
//bool indicating whether a notification should be sent after adding
bool l_notifyUponAdding;
//the new node to be added at the end of the array
std::shared_ptr> l_newNode(new node(std::forward(data)));
//pointer to the last node
std::shared_ptr> l_lastNode(std::atomic_load(&m_head));
//value to compare the next of the last node with
st
Solution
General comments
Also, I'm pretty sure that this:
is undefined behaviour. If the object has been destroyed, there is nothing that says that
You should initialize all variables when they are declared:
should be:
API and Naming
To me the names of
I would much prefer if your class implemented the same API as
Performance
This:
should be:
this only does one memory allocation and gives you better performance when using the
Which brings me to my next point:
Use Forwarding Reference Correctly
This:
really should be:
in the template context
Edit
You should also properly forward the argument in the node constructor:
Thread Safety
I'm not going to review thread safety as I'm not confident enough in the correct behaviour of the code.
Addendum: Graceful Shutdown
Requested in comments. To make a graceful shutdown when you may have other threads waiting on data on the queue you need two things:
I haven't tested the following but it shows the concept:
```
#include
#include
#include
#include
#include
class counter_guard{
public:
counter_guard(std::atomic& a)
: v(a) { v++; }
~counter_guard(){ v--; }
private:
std::atomic& v;
};
class blocking_pipe{
public:
~blocking_pipe(){
m_enabled = false;
m_signal.notify_all();
// Busy wait or you can use another condition_variable
while (0 != m_users){
std::this_thread::yield();
}
}
void push(int val){
counter_guard cg(m_users); // Prevents "this" from being destroyed until we leave the function body.
assert(m_enabled); // It's the users responsibility to not push to a pipe being destroyed.
std::lock_guard lg(m_mutex);
m_queue.push(val);
}
int pop(){
counter_guard cg(m_users); // Prevents "this" from being destroyed until we leave the function body.
assert(m_enabled); // It's the users responsibility to not pop a pipe being destroyed.
std::unique_lock lg(m_mutex);
m_signal.wait(lg, [this](){ return !m_enabled || !m_queue.empty(); });
if (!m_queue.empty()){
// Here m_enabled might be false, but the destructor has not ran yet (we hold a user count)
// so we can still return useful data to the caller.
auto ans = m_queue.front();
m_queue.pop();
return ans;
}
else{
// This means m_enabled == false definitively.
throw std::exception("Pipe severed!"); // non-standard VS2013 constructor
}
}
private:
std::queue m_queue;
std::atomic m_enabled{ true };
std::
- The
nodestructure is defined as astructwithprivatefields. This makes it aclass. If it has private fields, prefer aclass.
- The
nodestructure is an implementation detail that is not exposed, you do not need to bother with private here. Just remove thefrienddeclaration and remove theprivateandpublicdeclarations. Keep it simple silly ;)
- Writing lock-free data structures is difficult and error-prone at best with risk for very subtle bugs and race conditions that only occur very rarely. Are you really sure you need a lock-free queue? Do you have profiling data to back this up? You mentioned
boostwas out of the question, but for your own mental health and hair growth, please do consider using a well tested lock-free queue implemented by experts.
- The use of
templateis unnecessary. The nested class is automatically a template class with the same parameters as the enclosing class. Simply changenodetonodeand removetemplatefrom the class declaration.
Also, I'm pretty sure that this:
if (!this || !m_running) //break if the object was destroyed during the waitis undefined behaviour. If the object has been destroyed, there is nothing that says that
this will have been set to nullptr in fact I'd wager it won't. At any rate as your waiters are reading from this you need to inhibit destruction of this until all waiters have returned. Otherwise you risk reading freed memory.You should initialize all variables when they are declared:
bool l_notifyUponAdding;should be:
bool l_notifyUponAdding = l_lastNode->isDummy;API and Naming
To me the names of
produce and consume are not very apt as they don't reflect the way I think about a queue and they don't match the naming of the STL queue.I would much prefer if your class implemented the same API as
std::queue where applicable. Or at least used the same terminology such as push and pop.Performance
This:
std::shared_ptr l_newNode(new node(std::forward(data)));should be:
auto l_newNode = std::make_shared(std::forward(data));this only does one memory allocation and gives you better performance when using the
shared_ptr as the reference count will be allocated together with the data.Which brings me to my next point:
Use Forwarding Reference Correctly
This:
void produce(const T &&data){
...
std::shared_ptr l_newNode(new node(std::forward(data)));really should be:
void produce(T&& data){
...
std::shared_ptr l_newNode(new node(std::forward(data)));in the template context
T&& denotes a forwarding reference (universal reference to some). And will take the correct type depending on how it is called.Edit
You should also properly forward the argument in the node constructor:
node(U&& data)
:data(new U(std::forward(data)))Thread Safety
I'm not going to review thread safety as I'm not confident enough in the correct behaviour of the code.
Addendum: Graceful Shutdown
Requested in comments. To make a graceful shutdown when you may have other threads waiting on data on the queue you need two things:
- Ability to determine if there are any waiters.
- Defer destruction until no one is waiting.
I haven't tested the following but it shows the concept:
```
#include
#include
#include
#include
#include
class counter_guard{
public:
counter_guard(std::atomic& a)
: v(a) { v++; }
~counter_guard(){ v--; }
private:
std::atomic& v;
};
class blocking_pipe{
public:
~blocking_pipe(){
m_enabled = false;
m_signal.notify_all();
// Busy wait or you can use another condition_variable
while (0 != m_users){
std::this_thread::yield();
}
}
void push(int val){
counter_guard cg(m_users); // Prevents "this" from being destroyed until we leave the function body.
assert(m_enabled); // It's the users responsibility to not push to a pipe being destroyed.
std::lock_guard lg(m_mutex);
m_queue.push(val);
}
int pop(){
counter_guard cg(m_users); // Prevents "this" from being destroyed until we leave the function body.
assert(m_enabled); // It's the users responsibility to not pop a pipe being destroyed.
std::unique_lock lg(m_mutex);
m_signal.wait(lg, [this](){ return !m_enabled || !m_queue.empty(); });
if (!m_queue.empty()){
// Here m_enabled might be false, but the destructor has not ran yet (we hold a user count)
// so we can still return useful data to the caller.
auto ans = m_queue.front();
m_queue.pop();
return ans;
}
else{
// This means m_enabled == false definitively.
throw std::exception("Pipe severed!"); // non-standard VS2013 constructor
}
}
private:
std::queue m_queue;
std::atomic m_enabled{ true };
std::
Code Snippets
if (!this || !m_running) //break if the object was destroyed during the waitbool l_notifyUponAdding;bool l_notifyUponAdding = l_lastNode->isDummy;std::shared_ptr<node> l_newNode(new node(std::forward<const T&&>(data)));auto l_newNode = std::make_shared<node>(std::forward<const T&&>(data));Context
StackExchange Code Review Q#90628, answer score: 8
Revisions (0)
No revisions yet.