Multi producer/consumers lockfree stack
lockfree multi stack producer consumers
Problem
Can you please take a look at the following x86-64 C++ code, which should implement a multi-producer/multi-consumer lock-free stack? Do you think I have missed anything?
```
namespace lockfree {
    // NOTE: this lock-free stack uses the highest 2 bytes
    // of each pointer to store a 'counter' (an "id") to
    // ensure that the ABA problem doesn't happen in case
    // of multiple consumers.
    // If the stack was to be read by _one only_ consumer
    // the ABA problem wouldn't manifest and the code would
    // not need the 'counter' trick.
    // Using an unsigned short is only ok-ish: 65536 values
    // might not be enough in some cases. Ideally we should
    // be using 32+ bits.
    template <typename T>
    class stack {
        struct element {
            element *next;
            const T data;
            element(const T& data_) : data(data_) {
            }
        private:
            element(const element&);
            element& operator=(const element&);
        };
        // NOTE: we use the highest 16 bits as counter
        // this trick should perform ok for quite
        // _a while_ on Linux 64-bit...
        static const unsigned short get_ptr_cnt(element** p) {
            unsigned short *us_p = (unsigned short *)p;
            return us_p[3];
        }
        // NOTE: same as above!
        static void set_ptr_cnt(element** p, const unsigned short cnt) {
            unsigned short *us_p = (unsigned short *)p;
            us_p[3] = cnt;
        }
        element *_head;
    public:
        // this lockfree stack can only be initialised and destroyed
        // in single threaded contexts
        stack() : _head(0) {
        }
        // clean the stack (only in single threaded context!)
        ~stack() {
            element *head_ptr = _head;
            set_ptr_cnt(&head_ptr, 0);
            while(head_ptr) {
                element *next = head_ptr->next;
                delete head_ptr;
                head_ptr = next;
```
Solution
I don't see any problem with the synchronization, although it is usually quite difficult to see race conditions (which is why you posted it, I guess). The code is clearly non-portable (but maybe that is not important), both with regard to endianness and because `unsigned short` is not guaranteed to be 16 bits (`uint16_t` is).

I did wonder about two lines that seem unnecessary. Firstly, setting the count after the swap in `push`:

```
if(__sync_bool_compare_and_swap(&_head, local_head, to_be_pushed))
    return;
set_ptr_cnt(&to_be_pushed, 0); // this line
```

Secondly, setting the count on `next_head` in `pop`:

```
set_ptr_cnt(&local_head, head_cnt);
set_ptr_cnt(&next_head, head_cnt+1); // this line
if(__sync_bool_compare_and_swap(&_head, local_head, next_head)) {
```

EDIT - after discussion with Emanuele, it is clear that the first of these (resetting the `to_be_pushed` pointer's counter) is absolutely necessary. The second however (`set_ptr_cnt(&next_head, head_cnt+1);`) is not, but instead is currently (but not reliably) causing the compiler to generate code that 'works'. The reason, I think, is that the stack is really volatile - meaning that the compiler cannot assume that pointer or data values do not change and cannot optimize access to these locations.

So `_head` maybe should be declared

```
volatile element *_head;
```

and other references to the stack maybe too. Having said that, I have modified the code to use `volatile` for `_head` and elsewhere (see previous edit) but have been unable to make the compiler take any notice - in other words, the assembler code generated with `volatile` has been the same as that without (`gcc -S -O3` using gcc 4.2.1 and 4.4.5). So maybe `volatile` is not the answer...

As regards coding style, I think the code is heavily over-commented. Most of the comments are not very useful and could be omitted. Also, the variable names are either verbose or cryptic. To make the code more readable, I did a global deletion of `_ptr`, and a global replace of:

- `local_head` -> `head`
- `next_head` -> `next`
- `_head` -> `stack_head` (leading underscores should be avoided: they are reserved in some contexts, and in any case add nothing)
- `head_cnt` -> `count`
- `_cnt` -> `_count`
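Returning to the portability and `volatile` points above, here is a minimal sketch of one way to keep the 16-bit counter without depending on byte order or on the width of `unsigned short`. It is not the original poster's code: it assumes a C++11 compiler, 64-bit pointers whose top 16 bits are unused in user space (typical on x86-64 with 48-bit virtual addresses, but not guaranteed), and the names `node`, `pack`, `ptr_of`, `tag_of` and `stale_cas_is_rejected` are hypothetical. It uses `std::atomic` rather than `volatile`, since `volatile` gives neither atomicity nor inter-thread ordering in C++ (and `volatile element *_head` qualifies the pointed-to element, not the pointer itself).

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Assumption: pointers are 64 bits wide and their top 16 bits are zero.
static_assert(sizeof(std::uintptr_t) == 8, "sketch assumes 64-bit pointers");

struct node { node* next; int data; };  // hypothetical stand-in for `element`

const int            tag_shift = 48;
const std::uintptr_t ptr_mask  = (std::uintptr_t(1) << tag_shift) - 1;

// Pack a pointer and a 16-bit counter into one word using shifts,
// so the layout does not depend on the machine's byte order.
inline std::uintptr_t pack(node* p, std::uint16_t tag) {
    return (reinterpret_cast<std::uintptr_t>(p) & ptr_mask)
         | (static_cast<std::uintptr_t>(tag) << tag_shift);
}

// Recover the plain pointer (counter bits cleared).
inline node* ptr_of(std::uintptr_t w) {
    return reinterpret_cast<node*>(w & ptr_mask);
}

// Recover the 16-bit counter.
inline std::uint16_t tag_of(std::uintptr_t w) {
    return static_cast<std::uint16_t>(w >> tag_shift);
}

// Single-threaded demonstration of why the counter helps against ABA:
// a compare-and-swap made with a stale counter fails even though the
// pointer bits are identical.
inline bool stale_cas_is_rejected() {
    node n = {nullptr, 42};
    std::atomic<std::uintptr_t> head(pack(&n, 0));

    std::uintptr_t stale = head.load();  // snapshot: (&n, counter 0)
    head.store(pack(&n, 1));             // simulates a pop and re-push of
                                         // the same node by another thread

    bool swapped = head.compare_exchange_strong(stale, pack(nullptr, 1));
    return !swapped
        && ptr_of(head.load()) == &n
        && tag_of(head.load()) == 1;
}
```

Calling `stale_cas_is_rejected()` from `main` should return `true`. The demonstration only covers the tagging scheme; it does not address safe memory reclamation of popped nodes, and a 16-bit counter can still wrap around, as the question's own comments note.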
Finally, the compiler warns (`-Wignored-qualifiers`) against the `const` in:

```
static const unsigned short get_ptr_cnt(element** p) {
```

Code Snippets
```
if(__sync_bool_compare_and_swap(&_head, local_head, to_be_pushed))
    return;
set_ptr_cnt(&to_be_pushed, 0); // this line
```

```
set_ptr_cnt(&local_head, head_cnt);
set_ptr_cnt(&next_head, head_cnt+1); // this line
if(__sync_bool_compare_and_swap(&_head, local_head, next_head)) {
```

```
volatile element *_head;
```

```
static const unsigned short get_ptr_cnt(element** p) {
```

Context
StackExchange Code Review Q#35502, answer score: 3