Executor Service with Blocking Queue

Submitted by: @import:stackexchange-codereview

Problem

I could not find a single working example of an ExecutorService used together with a BlockingQueue, so I have come up with a working sample. Thoughts?

```
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class Producer implements Runnable {

protected BlockingQueue queue = null;

public Producer(BlockingQueue queue) {
this.queue = queue;
}

public void run() {
try {

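for (int i = 0; i < ... // [extraction gap: the rest of Producer is missing]

// [reconstructed from context: start of Consumer]
class Consumer implements Runnable {

protected BlockingQueue queue = null;
ConcurrentHashMap<Integer, Integer> hashmap;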

public Consumer(BlockingQueue queue,
ConcurrentHashMap hashmap) {
this.queue = queue;
this.hashmap = hashmap;
}

public void run() {
try {

while (ExecutorTest.isRunning) {
Integer i = (Integer) queue.take();
System.out.println("Consumer " + Thread.currentThread().getId()
+ ": taking Task : " + i);
if (i == -1) {
queue.put(i);
ExecutorTest.isRunning = false;
// System.out.println("Setting isRunning to false : "+Thread.currentThread().getId());
break;
}
hashmap.put(i, i);
}

} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

public class ExecutorTest {

static BlockingQueue queue = new LinkedBlockingQueue();
static ConcurrentHashMap hashmap = new ConcurrentHashMap();
static volatile boolean isRunning = true;

public static void main(String[] args) {
Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue, hashmap);

new Thread(producer).start();
ExecutorService executorService = Executors.newFixedThreadPool(4);

executorServ
```

Solution

Producer

protected BlockingQueue queue = null;


Why not apply generics? BlockingQueue<Integer> avoids awkward casting later on.

Why is this protected? Fields are typically private, and there doesn't seem to be any motivation in your example to widen the visibility of the field itself.
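A minimal sketch of what both remarks could look like together. The class name `TypedProducer`, the `itemCount` parameter, and the `POISON_PILL` constant are assumptions made for illustration; only the `-1` sentinel comes from the original Consumer.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical rewrite of Producer: typed queue, private final field.
class TypedProducer implements Runnable {

    // Sentinel the original Consumer checks for (-1).
    static final int POISON_PILL = -1;

    // private: no subclass needs it; final: assigned once in the constructor.
    private final BlockingQueue<Integer> queue;
    private final int itemCount; // assumed parameter, not in the original

    TypedProducer(BlockingQueue<Integer> queue, int itemCount) {
        this.queue = queue;
        this.itemCount = itemCount;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < itemCount; i++) {
                queue.put(i);
            }
            queue.put(POISON_PILL); // tell consumers there is no more work
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible
        }
    }
}

class TypedProducerDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        Thread producer = new Thread(new TypedProducer(queue, 3));
        producer.start();
        producer.join();
        Integer first = queue.take(); // no (Integer) cast needed
        System.out.println(first + " " + queue); // prints: 0 [1, 2, -1]
    }
}
```

With the type parameter in place, the compiler rejects anything other than an Integer going into the queue, so the cast on `take()` disappears.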

} catch (InterruptedException e) {
    e.printStackTrace();
}


This is not the proper way to handle an InterruptedException. The ExecutorService uses interruption for forced shutdown (shutdownNow()), but because you swallow the interruption, the ExecutorService will not respond to a forced shutdown. In general, you should always propagate the interruption to the caller.
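One common way to propagate the interruption from a Runnable, which cannot rethrow a checked exception, is to restore the thread's interrupt flag and return. The sketch below assumes hypothetical names (`InterruptibleWorker`, `InterruptDemo`) and a worker that only drains a queue; it is meant to show that such tasks let shutdownNow() terminate the pool.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical worker that blocks on an empty queue until interrupted.
class InterruptibleWorker implements Runnable {
    private final BlockingQueue<Integer> queue;

    InterruptibleWorker(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                queue.take(); // throws InterruptedException when interrupted
            }
        } catch (InterruptedException e) {
            // run() cannot rethrow, so restore the flag for code further up
            // the stack and let the task end.
            Thread.currentThread().interrupt();
        }
    }
}

class InterruptDemo {
    // Returns true if the pool terminates after a forced shutdown.
    static boolean stopsOnShutdownNow() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        pool.execute(new InterruptibleWorker(queue));
        pool.execute(new InterruptibleWorker(queue));
        pool.shutdownNow(); // interrupts the worker threads
        try {
            return pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(stopsOnShutdownNow());
    }
}
```

A task that instead caught the exception inside its loop and carried on would keep blocking on `take()`, and the pool would never terminate.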

Consumer

The remarks for Producer apply to Consumer as well, but there is more:

while (ExecutorTest.isRunning) {


Alternatively, you could simply use interruption to stop the task: while (!Thread.currentThread().isInterrupted()).
What is worse, isRunning is a field on ExecutorTest, which means the Consumer class depends on the ExecutorTest class to work properly.

ExecutorTest

ExecutorService executorService = Executors.newFixedThreadPool(4);

executorService.execute(consumer);
executorService.execute(consumer);
executorService.execute(consumer);
executorService.execute(consumer);


Perhaps use a small for loop, and make the literal 4 a variable or a constant.
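For illustration, the four repeated calls might collapse into something like the following. `ConsumerPool`, `CONSUMER_COUNT`, and the counting task are assumed names used only for this sketch.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ConsumerPool {

    // One named constant sizes the pool and drives the submit loop.
    static final int CONSUMER_COUNT = 4;

    static void submitAll(ExecutorService pool, Runnable consumer) {
        for (int i = 0; i < CONSUMER_COUNT; i++) {
            pool.execute(consumer);
        }
    }

    // Demo helper: counts how often the shared task was run.
    static int countRuns() {
        ExecutorService pool = Executors.newFixedThreadPool(CONSUMER_COUNT);
        AtomicInteger runs = new AtomicInteger();
        submitAll(pool, runs::incrementAndGet);
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println(countRuns()); // prints 4
    }
}
```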

// wait for the threads to finish
while (isRunning) {
};


This is unnecessary: you already call executorService.awaitTermination(10, TimeUnit.SECONDS).
A busy wait like this is also a poor way to wait for another thread, because it wastes CPU cycles.
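A sketch of waiting without the spin loop: call shutdown() so the pool stops accepting work, then block in awaitTermination() until the submitted tasks finish or the timeout expires. The class and method names here are assumptions, and the tasks are trivial counters rather than the original consumers.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class AwaitDemo {

    // Returns true if all tasks completed within the timeout.
    static boolean runAndWait(int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.execute(done::incrementAndGet);
        }
        pool.shutdown(); // no new tasks; already submitted ones still run
        try {
            // The calling thread sleeps here instead of spinning on a flag.
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                pool.shutdownNow(); // timed out: interrupt what is left
                return false;
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
            return false;
        }
        return done.get() == tasks;
    }

    public static void main(String[] args) {
        System.out.println(runAndWait(100)); // prints true
    }
}
```

Checking the boolean returned by awaitTermination() distinguishes a clean finish from a timeout, which the busy wait could not do.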

Context

StackExchange Code Review Q#120059, answer score: 5
