Lock-free multi-producer multi-consumer queue

Submitted by: @import:stackexchange-codereview

Problem

I'm looking for some feedback on my lock-free queue, based on Disruptor, mainly for any potential concurrency issues, such as where I need additional fences. It looks correct to me, and I can't seem to be able to break it, but I'm only testing it on my x86 machine.

```
#pragma once

#include <atomic>
#include <cstddef>
#include <cstdint>
#include <thread>

template <typename T> class LockFreeMPMCQueue
{
public:
    explicit LockFreeMPMCQueue( size_t size )
        : m_data( new T[size] ), m_size( size ), m_head_1( 0 ), m_head_2( 0 ), m_tail_1( 0 ), m_tail_2( 0 )
    {
    }

    virtual ~LockFreeMPMCQueue() { delete[] m_data; }

    // non-copyable
    LockFreeMPMCQueue( const LockFreeMPMCQueue& ) = delete;
    LockFreeMPMCQueue( const LockFreeMPMCQueue&& ) = delete;
    LockFreeMPMCQueue& operator=( const LockFreeMPMCQueue& ) = delete;
    LockFreeMPMCQueue& operator=( const LockFreeMPMCQueue&& ) = delete;

    bool try_enqueue( const T& value )
    {
        const std::uint64_t head = m_head_2.load( std::memory_order_relaxed );
        std::uint64_t tail = m_tail_1.load( std::memory_order_relaxed );

        const std::uint64_t count = tail - head;

        // count could be greater than size if between the reading of head, and the reading of tail, both head
        // and tail have been advanced
        if( count >= m_size )
        {
            return false;
        }

        if( !std::atomic_compare_exchange_strong_explicit(
                &m_tail_1, &tail, tail + 1, std::memory_order_relaxed, std::memory_order_relaxed ) )
        {
            return false;
        }

        m_data[tail % m_size] = value;

        while( m_tail_2.load( std::memory_order_relaxed ) != tail )
        {
            std::this_thread::yield();
        }

        // Release - read/write before can't be reordered with writes after
        // Make sure the write of the value to m_data is
        // not reordered past the write to m_tail_2
        std::atomic_thread_fence( std::memory_order_release );
        m_tail_2.store( tail + 1, std::memory_or
```

Solution

Missing barrier

When I looked at your code, I could immediately tell there was something wrong because you use a release barrier without an acquire barrier that pairs with it.

You have this barrier in try_enqueue() between writing to m_data and writing to m_tail_2:

std::atomic_thread_fence( std::memory_order_release );


But you don't have any corresponding acquire barrier in try_dequeue() between reading from m_tail_2 and reading from m_data. You should put in a barrier like this:

bool try_dequeue( T& out )
{
    // ...

    // Add this barrier which pairs with the other release barrier
    std::atomic_thread_fence( std::memory_order_acquire );

    out = m_data[head % m_size];

    // ...
}


Without this barrier, the CPU could read m_data[head] speculatively before it reads m_tail_2, and you could end up with an older value instead of the value that was just written.

Sequence of events that leads to the error

  1. Initially, m_data[0] = 0
  2. CPU 1: try_enqueue(): m_data[0] = 5
  3. CPU 1: try_enqueue(): fence(release)
  4. CPU 1: try_enqueue(): m_tail_2 = 1
  5. CPU 2: try_dequeue(): reads m_tail_2 as 1
  6. CPU 2: try_dequeue(): exchange(head, head+1) (no effect)
  7. CPU 2: try_dequeue(): reads m_data[0] as 0

Due to weak memory ordering, m_data[0] can be read as 0 instead of 5 in step 7 because there is no acquire barrier between steps 5 and 7. The strong compare-exchange in step 6 does not act as a barrier because it was called with std::memory_order_relaxed for both the success and the failure ordering.
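
As a rough illustration of the seven steps above, here is a minimal sketch of the same publish/consume handshake with the acquire barrier in place. The names slot, published, producer, consumer, and run_once are hypothetical stand-ins (slot for m_data[0], published for m_tail_2) and are not part of the queue under review; the compare-exchange of step 6 is left out for brevity.

```cpp
#include <atomic>
#include <cassert>
#include <thread>

// Hypothetical stand-ins for one queue slot and the published tail index.
int slot = 0;                        // plays the role of m_data[0]
std::atomic<unsigned> published{0};  // plays the role of m_tail_2

void producer()
{
    slot = 5;                                             // step 2
    std::atomic_thread_fence(std::memory_order_release);  // step 3
    published.store(1, std::memory_order_relaxed);        // step 4
}

int consumer()
{
    // Step 5: wait until the relaxed load observes the published index.
    while (published.load(std::memory_order_relaxed) != 1)
    {
        std::this_thread::yield();
    }
    // The added barrier: pairs with the release fence in producer().
    std::atomic_thread_fence(std::memory_order_acquire);
    return slot;                                          // step 7
}

// Runs one producer and one consumer and returns what the consumer read.
int run_once()
{
    slot = 0;
    published.store(0, std::memory_order_relaxed);
    int seen = -1;
    std::thread c([&] { seen = consumer(); });
    std::thread p(producer);
    p.join();
    c.join();
    return seen;
}
```

With the acquire fence present, run_once() should return 5. Removing that fence turns the read of slot into a data race, although the stale read may never show up on x86.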

Note that there are two ways that memory accesses can be "reordered". The first is that the compiler may move instructions around, so that your accesses are actually performed out of order with respect to the program execution timeline. This is not what is happening in my example above. The second is that the memory system may reorder accesses: even if two reads occur one after the other in the program execution timeline, the second read may return an older value than the first. This second case is what causes step 7 above to fail.

Possible missing barrier

Similarly, you might need an acquire barrier in try_enqueue() before your write to m_data, which pairs with the release barrier in try_dequeue(). I say "might" because this one is harder to explain. In practice the barrier only matters if the CPU could do a speculative write to m_data before reading m_head_2. But because the value of m_head_2 determines whether the write to m_data happens at all, there is a dependency (strictly speaking, a control dependency) which would seem to guarantee that m_head_2 is read before m_data is written. However, there are processors such as the DEC Alpha which have strange behavior with respect to dependencies. So I'm not 100% sure whether this barrier is needed in practice, and if it is, it may only be needed for DEC Alpha and similar processors. That said, the C++ memory model attaches no ordering guarantee to a relaxed load, so adding the acquire barrier is the portable choice.
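
To make the two pairings concrete, the following is a minimal sketch of a single-producer/single-consumer ring, deliberately simpler than the MPMC queue under review. TinyRing and ring_preserves_order are hypothetical names, and the layout (one head index, one tail index) is an assumption made for illustration only. The acquire fence in try_enqueue() is the one discussed above: it pairs with the release fence in try_dequeue(), so a slot is not overwritten until the consumer has finished reading it.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <thread>
#include <vector>

// Hypothetical single-producer/single-consumer ring (not the MPMC queue).
class TinyRing
{
public:
    explicit TinyRing(std::size_t size) : m_data(size), m_size(size) {}

    bool try_enqueue(int value)
    {
        const std::uint64_t tail = m_tail.load(std::memory_order_relaxed);
        const std::uint64_t head = m_head.load(std::memory_order_relaxed);
        if (tail - head >= m_size) return false;  // full
        // Pairs with the release fence in try_dequeue(): the consumer's
        // read of this slot happens-before the overwrite below.
        std::atomic_thread_fence(std::memory_order_acquire);
        m_data[tail % m_size] = value;
        // Publish the slot: pairs with the acquire fence in try_dequeue().
        std::atomic_thread_fence(std::memory_order_release);
        m_tail.store(tail + 1, std::memory_order_relaxed);
        return true;
    }

    bool try_dequeue(int& out)
    {
        const std::uint64_t head = m_head.load(std::memory_order_relaxed);
        const std::uint64_t tail = m_tail.load(std::memory_order_relaxed);
        if (head == tail) return false;  // empty
        // Pairs with the release fence in try_enqueue().
        std::atomic_thread_fence(std::memory_order_acquire);
        out = m_data[head % m_size];
        // Hand the slot back: pairs with the acquire fence in try_enqueue().
        std::atomic_thread_fence(std::memory_order_release);
        m_head.store(head + 1, std::memory_order_relaxed);
        return true;
    }

private:
    std::vector<int> m_data;
    const std::size_t m_size;
    std::atomic<std::uint64_t> m_head{0};
    std::atomic<std::uint64_t> m_tail{0};
};

// Pushes 0..count-1 from one thread and checks FIFO order in another.
bool ring_preserves_order(int count)
{
    TinyRing ring(8);
    bool ok = true;
    std::thread consumer([&] {
        for (int expected = 0; expected < count; ++expected)
        {
            int got = -1;
            while (!ring.try_dequeue(got)) std::this_thread::yield();
            if (got != expected) ok = false;
        }
    });
    for (int i = 0; i < count; ++i)
    {
        while (!ring.try_enqueue(i)) std::this_thread::yield();
    }
    consumer.join();
    return ok;
}
```

The sketch uses standalone fences to stay close to the code under review. Putting std::memory_order_acquire and std::memory_order_release on the loads and stores themselves would be an alternative way to express the same pairings.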

Context

StackExchange Code Review Q#123347, answer score: 3
