ArrayBlockingQueue: concurrent put and take

Submitted by: @import:stackexchange-codereview

Problem

I have implemented an ArrayBlockingQueue (put and take) along the lines of LinkedBlockingQueue, i.e. using two locks so that a take does not block a put.

For more information please see this Stack Overflow question. The code is also available at GitHub.

Please review the code for any race conditions.

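```
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ArrayBlockingQueue<T> {

    private final int capacity;
    private final AtomicInteger count = new AtomicInteger(0);
    private final Lock putLock = new ReentrantLock();
    private final Lock takeLock = new ReentrantLock();
    private final Condition notEmpty = takeLock.newCondition(); // signalled when the queue is not empty
    private final Condition notFull = putLock.newCondition();   // signalled when the queue is not full
    private final Object[] items;
    int takeIndex; // guarded by takeLock
    int putIndex;  // guarded by putLock

    public ArrayBlockingQueue(int capacity) {
        this.capacity = capacity;
        items = new Object[capacity];
    }

    public void add(T t) throws InterruptedException {
        int c = -1;
        final Lock lock = this.putLock;
        final AtomicInteger count = this.count;
        lock.lock();
        try {
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(t);
            c = count.getAndIncrement();
            // NOTE: the text between this comparison and the "c > 1" check in take()
            // was lost in extraction. It is reconstructed here on the
            // LinkedBlockingQueue pattern that the question says it follows.
            if (c + 1 < capacity) {
                notFull.signal();
            }
        } finally {
            lock.unlock();
        }
        if (c == 0) {
            signalNotEmpty();
        }
    }

    public T take() throws InterruptedException {
        T t;
        int c = -1;
        final Lock lock = this.takeLock;
        final AtomicInteger count = this.count;
        lock.lock();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            t = dequeue();
            c = count.getAndDecrement();
            if (c > 1) {
                notEmpty.signal();
            }
        } finally {
            lock.unlock();
        }
        if (c == capacity) {
            signalNotFull();
        }
        return t;
    }

    private void signalNotEmpty() {
        final Lock lock = this.takeLock;
        lock.lock();
        try {
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    private void signalNotFull() {
        final Lock lock = this.putLock;
        lock.lock();
        try {
            notFull.signal();
        } finally {
            lock.unlock();
        }
    }

    // The source listing is truncated at this point. enqueue and dequeue below
    // are an assumed minimal ring-buffer implementation, not the asker's code.
    private void enqueue(T t) {
        items[putIndex] = t;
        if (++putIndex == capacity) {
            putIndex = 0;
        }
    }

    @SuppressWarnings("unchecked")
    private T dequeue() {
        T t = (T) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == capacity) {
            takeIndex = 0;
        }
        return t;
    }
}
```
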
Solution

Typo

In your take() function, you wrote this:

```
notFull.signal();
```

But I'm pretty sure you meant to write this:

```
notEmpty.signal();
```


Unnecessary Signalling? Nope

Originally, I thought that the extra signalling in the add() and take() functions was unnecessary. Strictly speaking, it is not required for correctness, but it improves performance. The technique is called "cascading notifies". The LinkedBlockingQueue sources explain it like this:


Also, to minimize need for puts to get takeLock and vice-versa, cascading notifies are used. When a put notices that it has enabled at least one take, it signals taker. That taker in turn signals others if more items have been entered since the signal. And symmetrically for takes signalling puts.
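
To make the cascade concrete, here is a minimal sketch of a two-lock array queue. The names TwoLockArrayQueue, TwoLockDemo, and the shared signal helper are hypothetical and are not taken from the question's code. It assumes a capacity of at least 1, and it is an illustration of the pattern rather than a tuned implementation.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of the two-lock pattern with cascading notifies.
class TwoLockArrayQueue<T> {
    private final Object[] items;
    private final AtomicInteger count = new AtomicInteger();
    private final ReentrantLock putLock = new ReentrantLock();
    private final ReentrantLock takeLock = new ReentrantLock();
    private final Condition notFull = putLock.newCondition();
    private final Condition notEmpty = takeLock.newCondition();
    private int putIndex;  // guarded by putLock
    private int takeIndex; // guarded by takeLock

    TwoLockArrayQueue(int capacity) {
        if (capacity < 1) throw new IllegalArgumentException("capacity must be >= 1");
        items = new Object[capacity];
    }

    void put(T t) throws InterruptedException {
        int c;
        putLock.lock();
        try {
            while (count.get() == items.length) notFull.await();
            items[putIndex] = t;
            putIndex = (putIndex + 1) % items.length;
            c = count.getAndIncrement();
            // Cascade: there is still room, so wake the next waiting producer
            // without any consumer having to acquire putLock.
            if (c + 1 < items.length) notFull.signal();
        } finally {
            putLock.unlock();
        }
        // Only the empty -> non-empty transition needs to cross over to takeLock.
        if (c == 0) signal(takeLock, notEmpty);
    }

    @SuppressWarnings("unchecked")
    T take() throws InterruptedException {
        T t;
        int c;
        takeLock.lock();
        try {
            while (count.get() == 0) notEmpty.await();
            t = (T) items[takeIndex];
            items[takeIndex] = null;
            takeIndex = (takeIndex + 1) % items.length;
            c = count.getAndDecrement();
            // Cascade: more items remain, so wake the next waiting consumer.
            if (c > 1) notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        // Only the full -> non-full transition needs to cross over to putLock.
        if (c == items.length) signal(putLock, notFull);
        return t;
    }

    // A condition may only be signalled while holding its own lock.
    private static void signal(ReentrantLock lock, Condition condition) {
        lock.lock();
        try {
            condition.signal();
        } finally {
            lock.unlock();
        }
    }
}

class TwoLockDemo {
    // Sends 1..n through the queue from one producer thread; returns the sum taken.
    static long run(int capacity, int n) {
        TwoLockArrayQueue<Integer> q = new TwoLockArrayQueue<>(capacity);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) q.put(i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        long sum = 0;
        try {
            for (int i = 0; i < n; i++) sum += q.take();
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(run(4, 1000)); // prints 500500, the sum of 1..1000
    }
}
```

Without the two in-lock signal calls the queue would still be correct only if every put and take signalled the opposite lock. The cascade lets the common case stay on a single lock.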

Race condition? Nope

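Originally, I thought that the code had a race condition involving memory reordering when accessing the shared array. However, it was pointed out that AtomicInteger has the memory semantics of a volatile variable: a write to count happens-before any later read of count that observes it. Since each thread updates count after touching the array and reads count before touching it, the array accesses are safely published.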
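
The same publication argument can be shown in isolation. The sketch below uses a hypothetical class name, PublishViaAtomic, and is not part of the reviewed code. It writes a plain array slot, then bumps an AtomicInteger, while a reader waits on the counter before reading the slot.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: a plain write made visible through an AtomicInteger.
class PublishViaAtomic {
    private final int[] slot = new int[1];                    // plain, non-volatile storage
    private final AtomicInteger count = new AtomicInteger();  // volatile semantics

    // Returns the value the reader thread observed in the slot.
    static int demo() {
        PublishViaAtomic p = new PublishViaAtomic();
        int[] seen = new int[1];
        Thread reader = new Thread(() -> {
            // Reading count after the writer's increment orders the read of
            // slot[0] below after the writer's plain write.
            while (p.count.get() == 0) {
                Thread.yield();
            }
            seen[0] = p.slot[0];
        });
        reader.start();
        p.slot[0] = 42;             // 1. plain write to the shared array
        p.count.incrementAndGet();  // 2. atomic write that publishes step 1
        try {
            reader.join();          // join makes seen[0] visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 42
    }
}
```

In the queue, enqueue plays the role of step 1 and count.getAndIncrement() the role of step 2. The consumer's count.get() check corresponds to the reader's loop.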

Update

After reading @rolfl's answer with his multiple variants that attempt to improve upon the original function, I was inspired to come up with my own variant. I came up with one that is simpler than any of the variants listed here so far. It basically uses two semaphores, normal synchronization, and an array:

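```
import java.util.concurrent.Semaphore;

// BlockQueue<T> is not defined in this excerpt; presumably it is the test
// interface from @rolfl's answer.
public class ArrayBlockingSemQueue<T> implements BlockQueue<T>
{
    private final int capacity;
    private final Semaphore notEmpty; // permits = number of items available to take
    private final Semaphore notFull;  // permits = number of free slots
    private final Object [] items;
    int takeIndex; // guarded by synchronized(notEmpty)
    int putIndex;  // guarded by synchronized(notFull)

    public ArrayBlockingSemQueue(int capacity) {
        this.capacity = capacity;
        notEmpty = new Semaphore(0);
        notFull  = new Semaphore(capacity);
        items = new Object[capacity];
    }

    public void add(T t) throws InterruptedException {
        // Block until a free slot is reserved for this producer.
        notFull.acquire();

        // Serialize producers only; consumers use a different monitor.
        synchronized(notFull) {
            items[putIndex++] = t;
            if (putIndex == capacity)
                putIndex = 0;
        }

        // Publish the item to consumers.
        notEmpty.release();
    }

    @SuppressWarnings("unchecked")
    public T take() throws InterruptedException {
        T ret;

        // Block until an item is reserved for this consumer.
        notEmpty.acquire();

        // Serialize consumers only; producers use a different monitor.
        synchronized(notEmpty) {
            ret = (T) items[takeIndex++];
            if (takeIndex == capacity)
                takeIndex = 0;
        }

        // Hand the freed slot back to producers.
        notFull.release();
        return ret;
    }
}
```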


Benchmarks

I used the same code that @rolfl listed in his answer to test the timings of the various Queues. Here are my results:

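| Queue      | 1x1  | 2x2  | 3x3  | 4x4  | 6x6  | 12x12 | 1x12 | 2x6  | 3x4  | 4x3  | 6x2  | 12x1 |
|------------|------|------|------|------|------|-------|------|------|------|------|------|------|
| Veritas    | 3.32 | 3.39 | 3.54 | 3.59 | 3.54 | 3.62  | 6.63 | 3.95 | 3.67 | 3.68 | 3.88 | 6.16 |
| DoubleSync | 3.72 | 4.97 | 4.87 | 4.94 | 5.07 | 4.78  | 5.43 | 5.01 | 4.96 | 4.93 | 5.01 | 5.41 |
| CDLatch    | 2.89 | 2.56 | 4.62 | 5.02 | 5.83 | 8.01  | 8.25 | 5.95 | 5.24 | 3.60 | 2.58 | 2.65 |
| JS1        | 3.05 | 3.00 | 3.18 | 3.08 | 3.04 | 3.09  | 3.47 | 3.13 | 3.26 | 3.07 | 3.00 | 3.15 |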
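
For readers who want to try a similar comparison, here is a rough sketch of a timing harness. It is not @rolfl's benchmark code, which is not reproduced here. The class name QueueTimer is hypothetical, each label is assumed to mean producer threads x consumer threads, and the JDK's own java.util.concurrent.ArrayBlockingQueue stands in for the queues under test. The total must be divisible by both thread counts so that every thread does an equal share.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical harness: moves `total` items through a queue and reports the time.
class QueueTimer {
    // Returns the number of items consumed, which should equal `total`.
    static long run(BlockingQueue<Integer> q, int producers, int consumers, int total) {
        if (total % producers != 0 || total % consumers != 0) {
            throw new IllegalArgumentException("total must divide evenly among threads");
        }
        AtomicLong consumed = new AtomicLong();
        List<Thread> threads = new ArrayList<>();
        for (int p = 0; p < producers; p++) {
            threads.add(new Thread(() -> {
                try {
                    for (int i = 0; i < total / producers; i++) q.put(i);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
        }
        for (int c = 0; c < consumers; c++) {
            threads.add(new Thread(() -> {
                try {
                    for (int i = 0; i < total / consumers; i++) {
                        q.take();
                        consumed.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
        }
        long start = System.nanoTime();
        threads.forEach(Thread::start);
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        double millis = (System.nanoTime() - start) / 1e6;
        System.out.printf("%dx%d: %.2f ms%n", producers, consumers, millis);
        return consumed.get();
    }

    public static void main(String[] args) {
        int[][] configs = {{1, 1}, {2, 2}, {1, 12}, {12, 1}};
        for (int[] cfg : configs) {
            run(new ArrayBlockingQueue<>(16), cfg[0], cfg[1], 120_000);
        }
    }
}
```

A single run like this is noisy. A serious measurement would add warm-up iterations and repeat each configuration several times.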

Context

StackExchange Code Review Q#87141, answer score: 3
