patternjavaMinor
Executor Service with Blocking Queue
Viewed 0 times
withblockingexecutorservicequeue
Problem
I did not find a single working implementation of an Executor Service used with a Blocking Queue, so I have come up with a working sample. Thoughts?
```
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
class Producer implements Runnable {
protected BlockingQueue queue = null;
public Producer(BlockingQueue queue) {
this.queue = queue;
}
public void run() {
try {
for (int i = 0; i hashmap;
public Consumer(BlockingQueue queue,
ConcurrentHashMap hashmap) {
this.queue = queue;
this.hashmap = hashmap;
}
public void run() {
try {
while (ExecutorTest.isRunning) {
Integer i = (Integer) queue.take();
System.out.println("Consumer " + Thread.currentThread().getId()
+ ": taking Task : " + i);
if (i == -1) {
queue.put(i);
ExecutorTest.isRunning = false;
// System.out.println("Setting isRunning to false : "+Thread.currentThread().getId());
break;
}
hashmap.put(i, i);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class ExecutorTest {
static BlockingQueue queue = new LinkedBlockingQueue();
static ConcurrentHashMap hashmap = new ConcurrentHashMap();
static volatile boolean isRunning = true;
public static void main(String[] args) {
Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue, hashmap);
new Thread(producer).start();
ExecutorService executorService = Executors.newFixedThreadPool(4);
executorServ
```
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
class Producer implements Runnable {
protected BlockingQueue queue = null;
public Producer(BlockingQueue queue) {
this.queue = queue;
}
public void run() {
try {
for (int i = 0; i hashmap;
public Consumer(BlockingQueue queue,
ConcurrentHashMap hashmap) {
this.queue = queue;
this.hashmap = hashmap;
}
public void run() {
try {
while (ExecutorTest.isRunning) {
Integer i = (Integer) queue.take();
System.out.println("Consumer " + Thread.currentThread().getId()
+ ": taking Task : " + i);
if (i == -1) {
queue.put(i);
ExecutorTest.isRunning = false;
// System.out.println("Setting isRunning to false : "+Thread.currentThread().getId());
break;
}
hashmap.put(i, i);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class ExecutorTest {
static BlockingQueue queue = new LinkedBlockingQueue();
static ConcurrentHashMap hashmap = new ConcurrentHashMap();
static volatile boolean isRunning = true;
public static void main(String[] args) {
Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue, hashmap);
new Thread(producer).start();
ExecutorService executorService = Executors.newFixedThreadPool(4);
executorServ
Solution
Producer
Why not apply generics?
Why is this protected? Fields are typically private, and there doesn't seem to be any motivation in your example to widen the visibility of the field itself.
This is not the proper way to handle an `InterruptedException`.
Consumer
Actually, the remarks for `Producer` go for `Consumer` as well, but also:
Alternatively you could simply use interruption to stop the task :
But what's worse is that `isRunning` is actually a field on `ExecutorTest`.
ExecutorTest
Perhaps use a small for loop, and make the literal 4 a variable or a constant.
This is unnecessary, you already do `executorService.awaitTermination(10, TimeUnit.SECONDS);`
A busy wait like this is also a poorer way to await some other thread, as you are wasting CPU cycles.
protected BlockingQueue queue = null;
Why not apply generics?
`BlockingQueue<Integer>` avoids awkward casting later on.
Why is this protected? Fields are typically private, and there doesn't seem to be any motivation in your example to widen the visibility of the field itself.
} catch (InterruptedException e) {
e.printStackTrace();
}
This is not the proper way to handle an `InterruptedException`. The `ExecutorService` uses interruption for forced shutdown (`shutdownNow()`), but since you eat the interruption, the `ExecutorService` will not be responsive to a forced shutdown. In general you would always propagate the interruption to the caller.
Consumer
Actually, the remarks for `Producer` go for `Consumer` as well, but also:
while (ExecutorTest.isRunning) {
Alternatively, you could simply use interruption to stop the task: `while (!Thread.currentThread().isInterrupted())`.
But what's worse is that `isRunning` is actually a field on `ExecutorTest`. This means the `Consumer` class depends on the `ExecutorTest` class for its proper working.
ExecutorTest
ExecutorService executorService = Executors.newFixedThreadPool(4);
executorService.execute(consumer);
executorService.execute(consumer);
executorService.execute(consumer);
executorService.execute(consumer);
Perhaps use a small for loop, and make the literal 4 a variable or a constant.
// wait for the threads to finish
while (isRunning) {
};
This is unnecessary, you already do `executorService.awaitTermination(10, TimeUnit.SECONDS);`
A busy wait like this is also a poorer way to await some other thread, as you are wasting CPU cycles.
Code Snippets
protected BlockingQueue queue = null;} catch (InterruptedException e) {
e.printStackTrace();
}while (ExecutorTest.isRunning) {ExecutorService executorService = Executors.newFixedThreadPool(4);
executorService.execute(consumer);
executorService.execute(consumer);
executorService.execute(consumer);
executorService.execute(consumer);// wait for the threads to finish
while (isRunning) {
};Context
StackExchange Code Review Q#120059, answer score: 5
Revisions (0)
No revisions yet.