HiveBrain v1.2.0
Get Started
← Back to all entries
patterncppMinor

Multi producer/consumers lockfree stack

Submitted by: @import:stackexchange-codereview··
0
Viewed 0 times
lockfreemultistackproducerconsumers

Problem

Can you please take a look at the following x86-64 C++ code which should implement a multi consumer/produce lockfree stack? Do you think I have missed anything?

```
namespace lockfree {
// NOTE: this lock-free stack uses the highest 2 bytes
// of each pointer to store a 'counter' (an "id") to
// ensure that the ABA problem doesn't happen in case
// of multiple consumers.
// If the stack was to be read by _one only_ consumer
// the ABA problem wouldn't manifest and the code would
// not need the 'counter' escamotage.
// Using an unsigned short is ok-ish because 65536 values
// might not be enough in some cases. Ideally we should
// be using 32+ bits.
template
class stack {

struct element {
element *next;
const T data;

element(const T& data_) : data(data_) {
}
private:
element(const element&);
element& operator=(const element&);
};

// NOTE: we use the highest 16 bits as counter
// this trick should perform ok for quite
// _a while_ on Linux 64-bit...
static const unsigned short get_ptr_cnt(element** p) {
unsigned short us_p = (unsigned short )p;
return us_p[3];
}

// NOTE: same as above!
static void set_ptr_cnt(element** p, const unsigned short cnt) {
unsigned short us_p = (unsigned short )p;
us_p[3] = cnt;
}

element *_head;
public:

// this lockfree stack can only be initialised and destroyed
// in single threaded contexts
stack() : _head(0) {
}

// clean the stack (only in single threaded context!)
~stack() {
element *head_ptr = _head;
set_ptr_cnt(&head_ptr, 0);
while(head_ptr) {
element *next = head_ptr->next;
delete head_ptr;
head_ptr = next;

Solution

I don't see any problem with the locking, although it is usually quite difficult to see race conditions (which is why you posted it I guess). The code is clearly non-portable (but maybe that is not important), with regard to endian-ness and because unsigned short is not guaranteed to be 16 bits (uint16_t is).

I did wonder about two lines that seem unnecessary. Firstly, setting the count after the swap in push:

if(__sync_bool_compare_and_swap(&_head, local_head, to_be_pushed))
     return;
set_ptr_cnt(&to_be_pushed, 0); // this line


Secondly setting the count on next_head in pop

set_ptr_cnt(&local_head, head_cnt);
set_ptr_cnt(&next_head, head_cnt+1);  // this line
if(__sync_bool_compare_and_swap(&_head, local_head, next_head)) {


EDIT - after discussion with Emanuele, it is clear that the first of these (resetting the to_be_pushed pointer's counter) is absolutely necessary. The second however (set_ptr_cnt(&next_head, head_cnt+1);) is not, but instead is currently (but not reliably) causing the compiler to generate code that 'works'. The reason, I think, is that the stack is really volatile - meaning that the compiler cannot assume that pointer or data values do not change and cannot optimize access to these locations.

So _head maybe should be declared

volatile element *_head;


and other references to the stack maybe too. Having said that, I have modified the code to use volatile for _head and elsewhere (see previous edit) but have been unable to make the compiler take any notice - in other words the assembler code generated with volatile has been the same as that without (gcc -S -O3 using gcc 4.2.1 and 4.4.5). So maybe volatile is not the answer...

As regards coding style, I think the code is heavily over-commented. Most of the comments are not very useful and could be omitted. Also the variable names are verbose or cryptic. To make the code more readable, I did a global deletion of _ptr, and a global replace of:

  • local_head -> head



  • next_head -> next



  • _head -> stack_head (leading underscores should be avoided as they are, I think, reserved, and in any case add nothing).



  • head_cnt -> count



  • _cnt -> _count



Finally, the compiler warns (-Wignored-qualifiers) against the const in:

static const unsigned short get_ptr_cnt(element** p) {

Code Snippets

if(__sync_bool_compare_and_swap(&_head, local_head, to_be_pushed))
     return;
set_ptr_cnt(&to_be_pushed, 0); // this line
set_ptr_cnt(&local_head, head_cnt);
set_ptr_cnt(&next_head, head_cnt+1);  // this line
if(__sync_bool_compare_and_swap(&_head, local_head, next_head)) {
volatile element *_head;
static const unsigned short get_ptr_cnt(element** p) {

Context

StackExchange Code Review Q#35502, answer score: 3

Revisions (0)

No revisions yet.