patterncppMinor
Lock-free MultiConsumer/MultiProducer queue
Viewed 0 times
freemulticonsumerqueuemultiproducerlock
Problem
I've written a simple lock-free multi-consumer/-producer FIFO queue.
It's supposed to work like this:
be made available later
It's meant to be used for several worker threads to communicate with the GUI simultaneously. I wouldn't necessarily need multi-consumers, single would suffice, but I figured it would be relatively easy to have multi-consumers.
So far this seems to work fine. I'd like to have it checked and validated
It's supposed to work like this:
- At any time many consumers may read from it
- At any time multiple producers can write to it, added elements will
be made available later
- After write modification is finished, we call
makePushedElementsAvailable(), so that consumers can read the newly added elements
It's meant to be used for several worker threads to communicate with the GUI simultaneously. I wouldn't necessarily need multi-consumers, single would suffice, but I figured it would be relatively easy to have multi-consumers.
#pragma once
#include
#include
template
class MCMP
{
public:
MCMP();
bool tryPushLater(const T &element);
bool tryPop(T &element);
void makePushedElementsAvailable();
bool wasEmpty();
int size();
protected:
private:
T elements[Size];
std::atomic iHead, iTail, iWrite;
};
template
int MCMP::size()
{
return (iTail.load() - iHead.load());
}
template
bool MCMP::wasEmpty()
{
return (iHead.load() == iTail.load());
}
template
MCMP::MCMP() : iHead(0), iTail(0), iWrite(0)
{
}
template
void MCMP::makePushedElementsAvailable()
{
iTail.store(iWrite.load());
}
template
bool MCMP::tryPop(T &element)
{
int newIndex;
int index;
do {
index = iHead.load();
if (index == iTail.load())
return false;
newIndex = index + 1;
} while (!iHead.compare_exchange_weak(index, newIndex));
index = index % Size;
element = elements[index];
return true;
}
template
bool MCMP::tryPushLater(const T &element)
{
int newIndex;
int index;
do {
index = iWrite.load();
if (index - iHead.load() >= Size)
return false;
newIndex = index + 1;
} while (!iWrite.compare_exchange_weak(index, newIndex));
index = index % Size;
elements[index] = element;
return true;
}So far this seems to work fine. I'd like to have it checked and validated
Solution
Hello and welcome to Code Review.
Code comments
Consider writing what you expect each function to do at its most important steps. Short concise comments are preferred (I am guilty of not doing this, too!). This would help reviewers that look at your code and yourself when debugging.
Class names
Your queue's name is
Redundancy in iHead, iTail, iWrite
One of those variables is redundant, your queue should support pushing from one end and popping from the other concurrently, so you only require
They don't do what you think they do
What happens if a value is loaded from
Compare/exchange loops
You seem to misunderstand how compare/exchange loops work. Every time the compare/exchange operation fails, the first argument of the compare/exchange function (expected) gets updated with the newest value of A.
So in your case, there is no need to load from A every single time in the loop. Your compare/exchange loops thus become:
The ABA problem
Because concurrent code can execute with undetermined timings, your
How could it fail if
Thread A can be paused for an indeterminate amount of time. While A is paused, a number of threads concurrently push/pop elements causing your index to overflow and reach the value that A has; A resumes and finds that the comparison is equal and doesn't update
While this scenario is very unlikely, it is still possible. Therefore, the code is not error-free. Your queue could not be used as part of a long running application because that issue might eventually arise causing the application's state to become corrupted.
If you require further explanation of this issue, tell me so in a comment.
The ABA problem - Extended
For an atomic compare/exchange operation:
In any compare/exchange based algorithm that loads the value of
The ABA problem occurs when thread execution timings cause
While
In the case of the first example of the previous bullet list,
Other bugs
Code comments
Consider writing what you expect each function to do at its most important steps. Short concise comments are preferred (I am guilty of not doing this, too!). This would help reviewers that look at your code and yourself when debugging.
Class names
Your queue's name is
MCMP, which I assume stands for multi-consumer/multi-producer. It is an unclear class name. Consider changing it to something more descriptive such as: lock_free_queue.template
class lock_free_queue
{
/* declarations */
}Redundancy in iHead, iTail, iWrite
One of those variables is redundant, your queue should support pushing from one end and popping from the other concurrently, so you only require
iHead and iTail. The interaction between these three variables is confusing. Can you explain how they are supposed to work together?They don't do what you think they do
size() and wasEmpty() are obviously supposed to return the size of the queue and if it is empty, respectively. However, you are not doing an atomic operation to evaluate their expressions. Therefore, nothing prevents another thread from modifying one of either atomic variable after it is loaded. For example:template
bool MCMP::wasEmpty()
{
return ( iHead.load() == iTail.load() );
}What happens if a value is loaded from
iHead.load(), but then another thread modifies iHead a bunch of times before iTail loads? It will be compared with an outdated value of iHead and could make your queue say that it is not empty, but it is actually empty in reality.Compare/exchange loops
You seem to misunderstand how compare/exchange loops work. Every time the compare/exchange operation fails, the first argument of the compare/exchange function (expected) gets updated with the newest value of A.
So in your case, there is no need to load from A every single time in the loop. Your compare/exchange loops thus become:
int newIndex;
int index = iHead.load(); // the initial load from the atomic variable
do
{
if ( index == iTail.load() )
return false;
newIndex = index + 1;
} // 'index' gets updated when the loop fails
while ( !iHead.compare_exchange_weak( index, newIndex ) );The ABA problem
tryPop( T& ) and tryPushLater( const T& )Because concurrent code can execute with undetermined timings, your
do{} while{} loops suffer from the ABA problem; newIndex gets updated in the loop when the condition fails, but it is possible for one thread to stall and for another to come in and change the value of index from A to B then back to A. In that case, newIndex would not be updated, as the condition would succeed when the original (stalled) thread resumes.How could it fail if
iHead is always incremented?Thread A can be paused for an indeterminate amount of time. While A is paused, a number of threads concurrently push/pop elements causing your index to overflow and reach the value that A has; A resumes and finds that the comparison is equal and doesn't update
newIndex. You have the ABA problem.While this scenario is very unlikely, it is still possible. Therefore, the code is not error-free. Your queue could not be used as part of a long running application because that issue might eventually arise causing the application's state to become corrupted.
If you require further explanation of this issue, tell me so in a comment.
The ABA problem - Extended
For an atomic compare/exchange operation:
- Let
T1,T2be threads.
- Let
Abe the atomic variable.
- Let
Ebe the expected value ofA.
- Let
Nbe the new value ofAon success.
In any compare/exchange based algorithm that loads the value of
A and performs another operation based on that value, such as:- Updating
Nif the compare/exchange operation fails (A.load() != E) with the result of a calculation dependent on the most up-to-date value ofA.
- Dereferencing the value loaded from
Aif it is a pointer.
- Performing a lookup based on the value loaded from
A.
- etc.
The ABA problem occurs when thread execution timings cause
T1 to stall before the compare/exchange operation (that is dependent on the value loaded from A) is evaluated.While
T1 is stalled, T2 changes the value of A to some other value and then back to the original value of A before T1 resumes. Once T1 continues, the condition of the compare/exchange operation will thus evaluate to true when it should in fact be false.In the case of the first example of the previous bullet list,
N would be stored in A while N has an invalid value/state; had the compare/exchange operation failed (as it should have) N would have been updated according to the new value of A before the next compare/exchange evaluation.Other bugs
- When you pop an element from your queue, you don't actually remove the element, you just return a reference to it.
- The values get written to the array whether `makePus
Code Snippets
template <typename T, int Size>
class lock_free_queue
{
/* declarations */
}template<typename T, int Size>
bool MCMP<T, Size>::wasEmpty()
{
return ( iHead.load() == iTail.load() );
}int newIndex;
int index = iHead.load(); // the initial load from the atomic variable
do
{
if ( index == iTail.load() )
return false;
newIndex = index + 1;
} // 'index' gets updated when the loop fails
while ( !iHead.compare_exchange_weak( index, newIndex ) );Context
StackExchange Code Review Q#106104, answer score: 3
Revisions (0)
No revisions yet.