ArrayBlockingQueue: concurrent put and take
Problem
I have implemented an `ArrayBlockingQueue` (put and take) along the lines of `LinkedBlockingQueue`, i.e. using two locks so that my take should not block my put. For more information please see this Stack Overflow question. The code is also available at GitHub.
Please review the code for any race conditions.
```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ArrayBlockingQueue<T> {
    private final int capacity;
    private final AtomicInteger count = new AtomicInteger(0);
    private final Lock putLock = new ReentrantLock();
    private final Lock takeLock = new ReentrantLock();
    private final Condition notEmpty = takeLock.newCondition(); // putLock will signal that it's not empty
    private final Condition notFull = putLock.newCondition();   // signal when queue is not full
    private final Object[] items;
    int takeIndex;
    int putIndex;

    public ArrayBlockingQueue(int capacity) {
        this.capacity = capacity;
        items = new Object[capacity];
    }

    public void add(T t) throws InterruptedException {
        int c = -1;
        final Lock lock = this.putLock;
        final AtomicInteger count = this.count;
        lock.lock();
        try {
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(t);
            c = count.getAndIncrement();
            if (c + 1 < capacity) {
            // ... (the rest of add() and the start of take() are missing here) ...
            if (c > 1) {
                notEmpty.signal();
            }
        } finally {
            lock.unlock();
        }
        if (c == capacity) {
            signalNotFull();
        }
        return t;
    }

    private void signalNotEmpty() {
        final Lock lock = this.takeLock;
        lock.lock();
        try {
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    private void signalNotFull() {
        final Lock lock = this.putLock;
        // ...
```
Solution
Typo
In your `take()` function, you wrote this:
```java
notFull.signal();
```
But I'm pretty sure you meant to write this:
```java
notEmpty.signal();
```
Unnecessary Signalling? Nope
Originally, I thought that the extra signalling in the `add()` and `take()` functions was unnecessary. Strictly speaking, it is not required, but adding it improves performance. The technique is called "cascading notifies". The `LinkedBlockingQueue` sources explain it like this:
> Also, to minimize need for puts to get takeLock and vice-versa, cascading notifies are used. When a put notices that it has enabled at least one take, it signals taker. That taker in turn signals others if more items have been entered since the signal. And symmetrically for takes signalling puts.
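The quoted comment can be made concrete with a minimal sketch. The class below is hypothetical: the name `TwoLockQueueSketch` and its `put`/`take` methods are not taken from the question, and the code is only an illustration of the signalling pattern the `LinkedBlockingQueue` comment describes, applied to a circular array. The two `signal()` calls made while still holding the caller's own lock are the cascading notifies; the two helper methods cover the only cases where the opposite lock has to be acquired.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of a two-lock bounded queue with cascading notifies.
final class TwoLockQueueSketch<T> {
    private final Object[] items;
    private final AtomicInteger count = new AtomicInteger();
    private final ReentrantLock putLock = new ReentrantLock();
    private final ReentrantLock takeLock = new ReentrantLock();
    private final Condition notFull = putLock.newCondition();
    private final Condition notEmpty = takeLock.newCondition();
    private int putIndex;   // only touched while holding putLock
    private int takeIndex;  // only touched while holding takeLock

    TwoLockQueueSketch(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        items = new Object[capacity];
    }

    void put(T t) throws InterruptedException {
        int c;
        putLock.lockInterruptibly();
        try {
            while (count.get() == items.length) {
                notFull.await();
            }
            items[putIndex] = t;
            putIndex = (putIndex + 1) % items.length;
            c = count.getAndIncrement();   // c is the size before this put
            if (c + 1 < items.length) {
                notFull.signal();          // cascade: still room, wake another producer
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0) {
            signalNotEmpty();              // queue was empty, so a consumer may be waiting
        }
    }

    @SuppressWarnings("unchecked")
    T take() throws InterruptedException {
        T t;
        int c;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            t = (T) items[takeIndex];
            items[takeIndex] = null;       // drop the reference so it can be collected
            takeIndex = (takeIndex + 1) % items.length;
            c = count.getAndDecrement();   // c is the size before this take
            if (c > 1) {
                notEmpty.signal();         // cascade: items remain, wake another consumer
            }
        } finally {
            takeLock.unlock();
        }
        if (c == items.length) {
            signalNotFull();               // queue was full, so a producer may be waiting
        }
        return t;
    }

    private void signalNotEmpty() {
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    private void signalNotFull() {
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }
}
```

Without the two cascading `signal()` calls the sketch would still be correct as long as every empty-to-non-empty and full-to-non-full transition signals the other side, but waiting threads of the same kind would be woken one transition at a time instead of handing the wake-up along.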
Race condition? Nope
Originally, I thought that the code had a race condition involving memory reordering when accessing the shared queue. However, it was pointed out that `AtomicInteger` acts as a volatile variable and provides a memory barrier, so the accesses are safe.
Update
After reading @rolfl's answer with his multiple variants that attempt to improve upon the original function, I was inspired to come up with my own variant. I came up with one that is simpler than any of the variants listed here so far. It basically uses two semaphores, normal synchronization, and an array:
```java
import java.util.concurrent.Semaphore;

public class ArrayBlockingSemQueue<T> implements BlockQueue<T>
{
    private final int capacity;
    private final Semaphore notEmpty;   // permits = number of items available to take
    private final Semaphore notFull;    // permits = number of free slots
    private final Object[] items;
    int takeIndex;
    int putIndex;

    public ArrayBlockingSemQueue(int capacity) {
        this.capacity = capacity;
        notEmpty = new Semaphore(0);
        notFull = new Semaphore(capacity);
        items = new Object[capacity];
    }

    public void add(T t) throws InterruptedException {
        notFull.acquire();              // wait for a free slot
        synchronized (notFull) {        // serializes producers only
            items[putIndex++] = t;
            if (putIndex == capacity)
                putIndex = 0;
        }
        notEmpty.release();
    }

    @SuppressWarnings("unchecked")
    public T take() throws InterruptedException {
        T ret;
        notEmpty.acquire();             // wait for an item
        synchronized (notEmpty) {       // serializes consumers only
            ret = (T) items[takeIndex++];
            if (takeIndex == capacity)
                takeIndex = 0;
        }
        notFull.release();
        return ret;
    }
}
```
Benchmarks
I used the same code that @rolfl listed in his answer to test the timings of the various Queues. Here are my results:
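| Queue      | 1x1  | 2x2  | 3x3  | 4x4  | 6x6  | 12x12 | 1x12 | 2x6  | 3x4  | 4x3  | 6x2  | 12x1 |
|------------|------|------|------|------|------|-------|------|------|------|------|------|------|
| Veritas    | 3.32 | 3.39 | 3.54 | 3.59 | 3.54 | 3.62  | 6.63 | 3.95 | 3.67 | 3.68 | 3.88 | 6.16 |
| DoubleSync | 3.72 | 4.97 | 4.87 | 4.94 | 5.07 | 4.78  | 5.43 | 5.01 | 4.96 | 4.93 | 5.01 | 5.41 |
| CDLatch    | 2.89 | 2.56 | 4.62 | 5.02 | 5.83 | 8.01  | 8.25 | 5.95 | 5.24 | 3.60 | 2.58 | 2.65 |
| JS1        | 3.05 | 3.00 | 3.18 | 3.08 | 3.04 | 3.09  | 3.47 | 3.13 | 3.26 | 3.07 | 3.00 | 3.15 |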
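Timings like these only mean something if the queue under test actually delivers every item, so a quick sanity check is worth running first. The following is a minimal sketch and not the benchmark code from @rolfl's answer: `SemQueueSketch` is a compact copy of the semaphore-based queue from the Update section with the `BlockQueue` interface left out, and `SemQueueCheck`, its `run` method, and the fixed capacity of 8 are hypothetical names and choices made for this illustration.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;

// Compact copy of the semaphore-based queue (BlockQueue interface omitted).
final class SemQueueSketch<T> {
    private final Object[] items;
    private final Semaphore notEmpty = new Semaphore(0);
    private final Semaphore notFull;
    private int takeIndex;
    private int putIndex;

    SemQueueSketch(int capacity) {
        items = new Object[capacity];
        notFull = new Semaphore(capacity);
    }

    void add(T t) throws InterruptedException {
        notFull.acquire();
        synchronized (notFull) {
            items[putIndex++] = t;
            if (putIndex == items.length) putIndex = 0;
        }
        notEmpty.release();
    }

    @SuppressWarnings("unchecked")
    T take() throws InterruptedException {
        T ret;
        notEmpty.acquire();
        synchronized (notEmpty) {
            ret = (T) items[takeIndex++];
            if (takeIndex == items.length) takeIndex = 0;
        }
        notFull.release();
        return ret;
    }
}

// Hypothetical harness: pushes known values through the queue and sums what comes out.
final class SemQueueCheck {
    /** Each producer adds 0..perProducer-1; returns the sum of all values taken. */
    static long run(int producers, int consumers, int perProducer) throws InterruptedException {
        int total = producers * perProducer;
        if (total % consumers != 0) {
            throw new IllegalArgumentException("total items must divide evenly among consumers");
        }
        int perConsumer = total / consumers;
        SemQueueSketch<Integer> queue = new SemQueueSketch<>(8);
        AtomicLong sum = new AtomicLong();
        Thread[] threads = new Thread[producers + consumers];
        for (int p = 0; p < producers; p++) {
            threads[p] = new Thread(() -> {
                try {
                    for (int i = 0; i < perProducer; i++) queue.add(i);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        for (int c = 0; c < consumers; c++) {
            threads[producers + c] = new Thread(() -> {
                try {
                    for (int i = 0; i < perConsumer; i++) sum.addAndGet(queue.take());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        for (Thread t : threads) t.start();
        for (Thread t : threads) t.join();
        return sum.get();
    }
}
```

With two producers each adding 0 through 999, `SemQueueCheck.run(2, 2, 1000)` should return 999000. A matching sum is only a smoke test, not a proof of correctness, but a lost item would leave a consumer blocked forever and a corrupted slot would very likely change the total.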
Context
StackExchange Code Review Q#87141, answer score: 3