Shutdown method in queue implementations
Problem
I am trying to implement a shutdown method in a queue implementation. I started from the `BlockingQueue` source in Java and want to check my shutdown method. Is the following code thread-safe, and is it an acceptable implementation?
```
import java.util.concurrent.atomic.*;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class BlockingQueue<E> {
/* Node encapsulating the object that needs to be stored */
static class Node<E> {
/* Make sure all the threads read / write are done on updated variable */
volatile E element;
Node<E> next;
Node(E value) { element = value; }
}
private final int capacity;
private final AtomicInteger count = new AtomicInteger(0);
private final AtomicInteger shutdown = new AtomicInteger(0);
/* Need lock to make the thread re-entrant */
private final ReentrantLock putLock = new ReentrantLock(true);
private final ReentrantLock getLock = new ReentrantLock();
private final Condition queueNotEmpty = getLock.newCondition();
private final Condition queueNotFull = putLock.newCondition();
/* Make sure updates to head and tail variables are atomic
* May be redundant as I have already enclosed the updated to head and tail
* using locks.
*/
private AtomicReference<Node<E>> head, tail;
public BlockingQueue()
{
this.capacity = Integer.MAX_VALUE;
}
public BlockingQueue(int capacity){
if (capacity >(null);
}
public void add(E e) throws InterruptedException, Exception{
if (e == null) throw new NullPointerException();
/* We need to wait if there is not enough space on the queue
*
*/
final ReentrantLock lock = this.putLock;
final AtomicInteger count = this.count;
int c =-1;
lock.lockInterruptibly();
try {
try {
while(count.get() == capacity)
qu
```
Solution
My immediate thought is that this is an unacceptable implementation. In particular, you seem to be trying to shut down the threads accessing the queue by signalling conditions, which I think confuses things badly.
See Concurrent Programming in Java, Second Edition by Doug Lea, especially Chapter 3 ("State Dependence").
Consider this use case: you have a service with multiple publishers (`Runnables`) and consumers (also `Runnables`); publishers offer items to the queue, consumers take items from it. There's a UI with a big red SHUTDOWN button on it. When the user hits that button, what does the user want?
You seem to be implementing the idea that the user wants to shut down the queue. But I put it to you that it's more accurate that the user wants to shut down the service itself. The service maintains its own shutdown state, which the runnables can access. Invoking service shutdown then does two things: (1) it sets the shutdown flag, so that the producers and consumers can see the change, and (2) it interrupts the producers and consumers, so that they look at the shutdown flag.
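A minimal sketch of that split, assuming the workers run in an `ExecutorService` and using `java.util.concurrent.LinkedBlockingQueue` as a stand-in for the hand-written queue. The class `QueueService` and its method names are hypothetical, chosen only for illustration:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical service; none of these names come from the question.
class QueueService {
    // Stand-in for the hand-written queue (assumption): any queue whose
    // blocking calls respond to interruption would behave the same way here.
    private final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
    private final ExecutorService workers = Executors.newFixedThreadPool(2);
    // The service, not the queue, owns the shutdown state.
    private volatile boolean shutdown = false;

    void start() {
        workers.execute(this::produce);
        workers.execute(this::consume);
    }

    private void produce() {
        int next = 0;
        while (!shutdown) {
            try {
                queue.put(next++); // may block while the queue is full
            } catch (InterruptedException e) {
                // Interrupted by shutdownNow(); the loop re-checks the flag.
            }
        }
    }

    private void consume() {
        while (!shutdown) {
            try {
                queue.take(); // may block while the queue is empty
            } catch (InterruptedException e) {
                // Interrupted by shutdownNow(); the loop re-checks the flag.
            }
        }
    }

    // (1) Set the flag, (2) interrupt the workers so that they look at it.
    // Returns true if both workers finished within the timeout.
    boolean shutdownAndWait(long timeoutMillis) {
        shutdown = true;
        workers.shutdownNow();
        try {
            return workers.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the caller's interrupt
            return false;
        }
    }

    public static void main(String[] args) {
        QueueService service = new QueueService();
        service.start();
        System.out.println("terminated: " + service.shutdownAndWait(1000));
    }
}
```

Note that the queue in this sketch has no shutdown method at all. The flag is written before the interrupt is sent, so a worker that wakes with an `InterruptedException` should already see `shutdown == true` when it re-checks the loop condition.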
So the usual idiom would be to run the publishers and consumers in an `ExecutorService`; after the shutdown flag has been set, `ExecutorService.shutdownNow()` would be invoked to signal the interruption to the running threads. The threads that are parked on your conditions (`queueNotFull.await()`) throw `InterruptedException`s, and the handlers for these exceptions can then check the shutdown flag and do the right thing.
`Condition.signalAll` doesn't `interrupt()` the threads; it just gets them contending for the lock again. When they get the lock, they simply return from `await`, which means they check the `while` condition and then put themselves back to sleep (`Condition.await`) because it is still true.
In short, `Lock`s and `Condition`s are for co-ordination; if you are trying to implement cancellation, you should be using interrupts.
Context
StackExchange Code Review Q#87042, answer score: 2