patterncppMinor
Lock-free, multiple consumer, multiple producer queue
Viewed 0 times
freeconsumermultipleproducerqueuelock
Problem
I'm implementing a lock-free, multiple consumer, multiple producer FIFO queue/pipe as an exercise in thinking about atomicity in operations.
My main concern is correctness of operation, my second concern is good practices around atomics and general C++11. Performance is interesting but not important for this exercise.
Without futher ado, here's the code:
```
#include
#include
// For dump
#include
#include
/// A lock free queue implementation.
///
/// Design notes: Here be dragons. The queue is implemented as a single linked
/// list with a head, divider and tail pointer. These are always ordered such
/// that "head -> divider -> tail" and are always non-null. The divider's next
/// pointer points to the first node with data or is null. In other words, this
/// means that "divider == tail -> empty container". Nodes between head and
/// divider are empty and will be freed lazily.
///
/// * Thread Safety : Full.
/// * Exception Safety: Basic.
///
/// Generic type parameter.
template
class lockfree_queue{
struct link;
using link_ptr = std::atomic ;
struct link{
link() noexcept = default;
link(const link&) = delete;
link& operator = (const link&) = delete;
link_ptr m_next{ nullptr };
};
struct node : link{
template
node(Args&&... args)
: m_data(std::forward(args)...)
{}
T m_data;
};
public:
using size_type = std::size_t;
using value_type = T;
/// Destructor, it's the users responsibility to make sure that
/// no one uses the class after it's destruction and that no
/// thread is in any of the function bodies.
~lockfree_queue(){
free_nodes(m_head.m_next.load());
}
/// Tests if this container is empty. This operation only makes
/// sense if there is only one thread reading/consuming the queue.
///
/// True if the queue is empty, false otherwise.
My main concern is correctness of operation, my second concern is good practices around atomics and general C++11. Performance is interesting but not important for this exercise.
Without futher ado, here's the code:
```
#include
#include
// For dump
#include
#include
/// A lock free queue implementation.
///
/// Design notes: Here be dragons. The queue is implemented as a single linked
/// list with a head, divider and tail pointer. These are always ordered such
/// that "head -> divider -> tail" and are always non-null. The divider's next
/// pointer points to the first node with data or is null. In other words, this
/// means that "divider == tail -> empty container". Nodes between head and
/// divider are empty and will be freed lazily.
///
/// * Thread Safety : Full.
/// * Exception Safety: Basic.
///
/// Generic type parameter.
template
class lockfree_queue{
struct link;
using link_ptr = std::atomic ;
struct link{
link() noexcept = default;
link(const link&) = delete;
link& operator = (const link&) = delete;
link_ptr m_next{ nullptr };
};
struct node : link{
template
node(Args&&... args)
: m_data(std::forward(args)...)
{}
T m_data;
};
public:
using size_type = std::size_t;
using value_type = T;
/// Destructor, it's the users responsibility to make sure that
/// no one uses the class after it's destruction and that no
/// thread is in any of the function bodies.
~lockfree_queue(){
free_nodes(m_head.m_next.load());
}
/// Tests if this container is empty. This operation only makes
/// sense if there is only one thread reading/consuming the queue.
///
/// True if the queue is empty, false otherwise.
Solution
ABA problem
I was able to break your queue (but it wasn't easy). I inserted some code to freeze one thread here in
At this point, one thread was trying to move the divider from A to B like this:
So the thread was frozen with
Then I ran another thread and caused it to consume the whole queue (
When I say I manipulated the allocator, I mean I did an extra allocation to make sure that
But of course
This problem is known as the ABA problem in case you have not already learned about it.
I was able to break your queue (but it wasn't easy). I inserted some code to freeze one thread here in
consume():do{
// The divider's next pointer points to the next node with data.
l_divider = m_divider.load();
l_snack = l_divider->m_next.load(); // divider is never null.
if (nullptr == l_snack)
return false; // empty
// Special hack to freeze one thread at a dangerous spot.
if (freeze) {
freeze = 0;
frozen = 1;
while (frozen);
}
// If the CAS below succeeds, then no one has moved the divider since
// we loaded the new divider position (which is non-null) and we have
// moved the divider to the next node without interruption.
} while (!m_divider.compare_exchange_weak(l_divider, l_snack));At this point, one thread was trying to move the divider from A to B like this:
divider(A) -> B -> C -> D
trying to swap A with B to end up like this:
divider(B) -> C -> DSo the thread was frozen with
l_divider being A and l_snack being B.Then I ran another thread and caused it to consume the whole queue (
ABCD all freed). In that other thread, I used emplace() to put new nodes on the stack, and I carefully manipulated the allocator to force this situation:divider(A) -> C -> DWhen I say I manipulated the allocator, I mean I did an extra allocation to make sure that
B was skipped. At that point, I set frozen = 0 to unfreeze the first thread. What happened was that it swapped A with B like this:divider(B) -> ?But of course
B was no longer part of the queue. So after that, any future consumes were broken. I actually made B point at itself, so the consumes kept consuming B forever.This problem is known as the ABA problem in case you have not already learned about it.
Code Snippets
do{
// The divider's next pointer points to the next node with data.
l_divider = m_divider.load();
l_snack = l_divider->m_next.load(); // divider is never null.
if (nullptr == l_snack)
return false; // empty
// Special hack to freeze one thread at a dangerous spot.
if (freeze) {
freeze = 0;
frozen = 1;
while (frozen);
}
// If the CAS below succeeds, then no one has moved the divider since
// we loaded the new divider position (which is non-null) and we have
// moved the divider to the next node without interruption.
} while (!m_divider.compare_exchange_weak(l_divider, l_snack));divider(A) -> B -> C -> D
trying to swap A with B to end up like this:
divider(B) -> C -> Ddivider(A) -> C -> Ddivider(B) -> ?Context
StackExchange Code Review Q#90697, answer score: 5
Revisions (0)
No revisions yet.