HiveBrain v1.2.0
Get Started
← Back to all entries
patterncppMinor

Lock-free, multiple consumer, multiple producer queue

Submitted by: @import:stackexchange-codereview··
0
Viewed 0 times
freeconsumermultipleproducerqueuelock

Problem

I'm implementing a lock-free, multiple consumer, multiple producer FIFO queue/pipe as an exercise in thinking about atomicity in operations.

My main concern is correctness of operation, my second concern is good practices around atomics and general C++11. Performance is interesting but not important for this exercise.

Without futher ado, here's the code:

```
#include
#include

// For dump
#include
#include

/// A lock free queue implementation.
///
/// Design notes: Here be dragons. The queue is implemented as a single linked
/// list with a head, divider and tail pointer. These are always ordered such
/// that "head -> divider -> tail" and are always non-null. The divider's next
/// pointer points to the first node with data or is null. In other words, this
/// means that "divider == tail -> empty container". Nodes between head and
/// divider are empty and will be freed lazily.
///
/// * Thread Safety : Full.
/// * Exception Safety: Basic.
///
/// Generic type parameter.
template
class lockfree_queue{
struct link;
using link_ptr = std::atomic ;

struct link{
link() noexcept = default;
link(const link&) = delete;
link& operator = (const link&) = delete;

link_ptr m_next{ nullptr };
};

struct node : link{
template
node(Args&&... args)
: m_data(std::forward(args)...)
{}
T m_data;
};

public:
using size_type = std::size_t;
using value_type = T;

/// Destructor, it's the users responsibility to make sure that
/// no one uses the class after it's destruction and that no
/// thread is in any of the function bodies.
~lockfree_queue(){
free_nodes(m_head.m_next.load());
}

/// Tests if this container is empty. This operation only makes
/// sense if there is only one thread reading/consuming the queue.
///
/// True if the queue is empty, false otherwise.

Solution

ABA problem

I was able to break your queue (but it wasn't easy). I inserted some code to freeze one thread here in consume():

do{
        // The divider's next pointer points to the next node with data.
        l_divider = m_divider.load();
        l_snack = l_divider->m_next.load(); // divider is never null.

        if (nullptr == l_snack)
            return false; // empty

        // Special hack to freeze one thread at a dangerous spot.
        if (freeze) {
            freeze = 0;
            frozen = 1;
            while (frozen);
        }

        // If the CAS below succeeds, then no one has moved the divider since
        // we loaded the new divider position (which is non-null) and we have
        // moved the divider to the next node without interruption.
    } while (!m_divider.compare_exchange_weak(l_divider, l_snack));


At this point, one thread was trying to move the divider from A to B like this:

divider(A) -> B -> C -> D
    trying to swap A with B to end up like this:
divider(B) -> C -> D


So the thread was frozen with l_divider being A and l_snack being B.

Then I ran another thread and caused it to consume the whole queue (ABCD all freed). In that other thread, I used emplace() to put new nodes on the stack, and I carefully manipulated the allocator to force this situation:

divider(A) -> C -> D


When I say I manipulated the allocator, I mean I did an extra allocation to make sure that B was skipped. At that point, I set frozen = 0 to unfreeze the first thread. What happened was that it swapped A with B like this:

divider(B) -> ?


But of course B was no longer part of the queue. So after that, any future consumes were broken. I actually made B point at itself, so the consumes kept consuming B forever.

This problem is known as the ABA problem in case you have not already learned about it.

Code Snippets

do{
        // The divider's next pointer points to the next node with data.
        l_divider = m_divider.load();
        l_snack = l_divider->m_next.load(); // divider is never null.

        if (nullptr == l_snack)
            return false; // empty

        // Special hack to freeze one thread at a dangerous spot.
        if (freeze) {
            freeze = 0;
            frozen = 1;
            while (frozen);
        }

        // If the CAS below succeeds, then no one has moved the divider since
        // we loaded the new divider position (which is non-null) and we have
        // moved the divider to the next node without interruption.
    } while (!m_divider.compare_exchange_weak(l_divider, l_snack));
divider(A) -> B -> C -> D
    trying to swap A with B to end up like this:
divider(B) -> C -> D
divider(A) -> C -> D
divider(B) -> ?

Context

StackExchange Code Review Q#90697, answer score: 5

Revisions (0)

No revisions yet.