HiveBrain v1.2.0
Get Started
← Back to all entries
patterncppMinor

Lockless multi user, multi consumer FIFO queue using C++11

Submitted by: @import:stackexchange-codereview··
0
Viewed 0 times
fifomultiuserconsumerusinglocklessqueue

Problem

One of the features that is missing from C++11 are lockless queues. Therefore I have decided to write one myself.

A few remarks:

-
I know the code isn't technically lockless. However the lock is only used for a blocking call and can be omitted.

-
I have tested the code and it seems to work. However I found I had some trouble coming up with good tests. So if anybody has a good suggestion I'd be happy to hear them.

-
I'm aware of there being a good implementation for this in the Boost library. However I don't want to include Boost in all my projects. Just for this.

```
#include
#include
#include

namespace threading
{
template
class lockless_queue
{
private:
template
struct node
{
friend class lockless_queue;
node(const U data) :
data(new U(data)),
next(nullptr),
isDummy(false)
{}
private:
node(bool isDummy) :
data(nullptr),
next(nullptr),
isDummy(isDummy)
{}
public:
const bool isDummy;
std::shared_ptr data;
std::shared_ptr> next;
};

public:
lockless_queue()
:
m_head(new node(true)),
m_running(true)
{}

~lockless_queue()
{
m_running = false;
m_newDataWaiter.notify_all();
}

//adds a new element to the end of the array
void produce(const T &&data)
{
//bool indicating whether a notification should be sent after adding
bool l_notifyUponAdding;

//the new node to be added at the end of the array
std::shared_ptr> l_newNode(new node(std::forward(data)));
//pointer to the last node
std::shared_ptr> l_lastNode(std::atomic_load(&m_head));
//value to compare the next of the last node with
st

Solution

General comments

  • The node structure is defined as a struct with private fields. This makes it a class. If it has private fields, prefer a class.



  • The node structure is an implementation detail that is not exposed, you do not need to bother with private here. Just remove the friend declaration and remove the private and public declarations. Keep it simple silly ;)



  • Writing lock-free data structures is difficult and error-prone at best with risk for very subtle bugs and race conditions that only occur very rarely. Are you really sure you need a lock-free queue? Do you have profiling data to back this up? You mentioned boost was out of the question, but for your own mental health and hair growth, please do consider using a well tested lock-free queue implemented by experts.



  • The use of template is unnecessary. The nested class is automatically a template class with the same parameters as the enclosing class. Simply change node to node and remove template from the class declaration.



Also, I'm pretty sure that this:

if (!this || !m_running) //break if the object was destroyed during the wait


is undefined behaviour. If the object has been destroyed, there is nothing that says that this will have been set to nullptr in fact I'd wager it won't. At any rate as your waiters are reading from this you need to inhibit destruction of this until all waiters have returned. Otherwise you risk reading freed memory.

You should initialize all variables when they are declared:

bool l_notifyUponAdding;


should be:

bool l_notifyUponAdding = l_lastNode->isDummy;


API and Naming

To me the names of produce and consume are not very apt as they don't reflect the way I think about a queue and they don't match the naming of the STL queue.

I would much prefer if your class implemented the same API as std::queue where applicable. Or at least used the same terminology such as push and pop.
Performance

This:

std::shared_ptr l_newNode(new node(std::forward(data)));


should be:

auto l_newNode = std::make_shared(std::forward(data));


this only does one memory allocation and gives you better performance when using the shared_ptr as the reference count will be allocated together with the data.

Which brings me to my next point:
Use Forwarding Reference Correctly

This:

void produce(const T &&data){
    ...
    std::shared_ptr l_newNode(new node(std::forward(data)));


really should be:

void produce(T&& data){
    ...
    std::shared_ptr l_newNode(new node(std::forward(data)));


in the template context T&& denotes a forwarding reference (universal reference to some). And will take the correct type depending on how it is called.
Edit

You should also properly forward the argument in the node constructor:

node(U&& data)
   :data(new U(std::forward(data)))


Thread Safety

I'm not going to review thread safety as I'm not confident enough in the correct behaviour of the code.
Addendum: Graceful Shutdown

Requested in comments. To make a graceful shutdown when you may have other threads waiting on data on the queue you need two things:

  • Ability to determine if there are any waiters.



  • Defer destruction until no one is waiting.



I haven't tested the following but it shows the concept:

```
#include
#include
#include
#include
#include

class counter_guard{
public:
counter_guard(std::atomic& a)
: v(a) { v++; }
~counter_guard(){ v--; }
private:
std::atomic& v;
};

class blocking_pipe{
public:
~blocking_pipe(){
m_enabled = false;
m_signal.notify_all();

// Busy wait or you can use another condition_variable
while (0 != m_users){
std::this_thread::yield();
}
}

void push(int val){
counter_guard cg(m_users); // Prevents "this" from being destroyed until we leave the function body.
assert(m_enabled); // It's the users responsibility to not push to a pipe being destroyed.

std::lock_guard lg(m_mutex);
m_queue.push(val);
}

int pop(){
counter_guard cg(m_users); // Prevents "this" from being destroyed until we leave the function body.
assert(m_enabled); // It's the users responsibility to not pop a pipe being destroyed.

std::unique_lock lg(m_mutex);
m_signal.wait(lg, [this](){ return !m_enabled || !m_queue.empty(); });

if (!m_queue.empty()){
// Here m_enabled might be false, but the destructor has not ran yet (we hold a user count)
// so we can still return useful data to the caller.
auto ans = m_queue.front();
m_queue.pop();
return ans;
}
else{
// This means m_enabled == false definitively.
throw std::exception("Pipe severed!"); // non-standard VS2013 constructor
}
}

private:
std::queue m_queue;
std::atomic m_enabled{ true };
std::

Code Snippets

if (!this || !m_running) //break if the object was destroyed during the wait
bool l_notifyUponAdding;
bool l_notifyUponAdding = l_lastNode->isDummy;
std::shared_ptr<node> l_newNode(new node(std::forward<const T&&>(data)));
auto l_newNode = std::make_shared<node>(std::forward<const T&&>(data));

Context

StackExchange Code Review Q#90628, answer score: 8

Revisions (0)

No revisions yet.