patternjavaModerate
One producer multiple consumers
Viewed 0 times
oneconsumersmultipleproducer
Problem
I want to code the following scenario:
I have written the code and it is working, but I feel like it is sub-optimal, might have bugs and has room for improvement.
```
public static void main(String[] args) {
ConsumerProducer cp = new ConsumerProducer();
cp.StartconsumerProducer();
}
// Creates Threads and waits for them to finish till timeout
public class ConsumerProducer {
public void StartconsumerProducer() {
ConsumerProducerMonitor mon = new ConsumerProducerMonitor();
List threads = new ArrayList();
// Create a producer
Thread p1 = new Thread(new Producer(mon), "P1");
p1.start();
// Create consumer 1
Thread c1 = new Thread(new Consumer(mon), "C1");
c1.start();
// Create consumer 2
Thread c2 = new Thread(new Consumer(mon), "C2");
c2.start();
// Create consumer 3
Thread c3 = new Thread(new Consumer(mon), "C3");
c3.start();
threads.add(p1);
threads.add(c1);
threads.add(c2);
threads.add(c3);
for (int i = 0; i = 0) {
ret = mon.get(Thread.currentThread().getName());
}
}
private final ConsumerProducerMonitor mon;
}
// Produces and consumes items in round robin fashion. Uses Semaphores for the same.
public class ConsumerProducerMonitor {
// produces items
public synchronized void put(int item, String threadName) {
if (isProduced) {
return;
}
this.item = item;
System.out.println("Producer " + threadName + " put Item: " + this.item);
if (this.item == 99) {
i
- One Producer: produces n (n=100 here) objects
- m Consumers (m = 5 here): consumes k (k = 10 here) objects at a time in a round robin fashion; Consumer 1 first consumes the first 10 objects followed by Consumer 2 consuming another 10 objects and so on. This means that consumer 1 has to wait until consumer 5 is done consuming 10 objects.
I have written the code and it is working, but I feel like it is sub-optimal, might have bugs and has room for improvement.
```
public static void main(String[] args) {
ConsumerProducer cp = new ConsumerProducer();
cp.StartconsumerProducer();
}
// Creates Threads and waits for them to finish till timeout
public class ConsumerProducer {
public void StartconsumerProducer() {
ConsumerProducerMonitor mon = new ConsumerProducerMonitor();
List threads = new ArrayList();
// Create a producer
Thread p1 = new Thread(new Producer(mon), "P1");
p1.start();
// Create consumer 1
Thread c1 = new Thread(new Consumer(mon), "C1");
c1.start();
// Create consumer 2
Thread c2 = new Thread(new Consumer(mon), "C2");
c2.start();
// Create consumer 3
Thread c3 = new Thread(new Consumer(mon), "C3");
c3.start();
threads.add(p1);
threads.add(c1);
threads.add(c2);
threads.add(c3);
for (int i = 0; i = 0) {
ret = mon.get(Thread.currentThread().getName());
}
}
private final ConsumerProducerMonitor mon;
}
// Produces and consumes items in round robin fashion. Uses Semaphores for the same.
public class ConsumerProducerMonitor {
// produces items
public synchronized void put(int item, String threadName) {
if (isProduced) {
return;
}
this.item = item;
System.out.println("Producer " + threadName + " put Item: " + this.item);
if (this.item == 99) {
i
Solution
First of all it would be good to use the standard Java Conventions, i.e. method names start with a lower case letter.
i.e.
Also handle InterruptedException correctly.
For details have a look at the book Java Concurrency in Practice
In your case it does not make sense to catch the
The ArrayList for the threads is not necessary, you have a fix number of Threads, you can replace it by an Array.
You are not really waiting for all threads to fininsh. If one of your threads run in an timeout you simply ignore it. You can fix this by stetting a flag that signals your thread to terminate.
I've done this by adding a class
Your
The
There are a lot of issues in your code so I've decided to rewrite it in a way that actually works.
It uses a Queue to transport the elements
The
The
In the
```
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
public class ConsumerProducer {
public static void main(String[] args) throws InterruptedException {
ConsumerProducer cp = new ConsumerProducer();
cp.run();
}
// Creates Threads and waits for them to finish till timeout
private LinkedBlockingQueue queue = new LinkedBlockingQueue<>();
public void run() throws InterruptedException {
Terminatable[] terminateables = new Terminatable[] {
new Producer(queue), new Consumer(queue),
new Consumer(queue), new Consumer(queue), };
startThreads(terminateables);
join(terminateables);
}
private static void join(Terminatable[] terminatables)
throws InterruptedException {
List maybeNotTerminated = Arrays
.asList(terminatables);
while (!maybeNotTerminated.isEmpty()) {
for (int idx = maybeNotTerminated.size() - 1; idx >= 0; --idx) {
Terminatable t = maybeNotTerminated.get(idx);
if (t.isAlive()) {
t.join(20_000);
// force termination
t.terminate = true;
} else {
maybeNotTerminated.remove(idx);
}
}
}
}
private static void startThreads(Runnable[] runnables) {
for (int i = 0; i queue;
Producer(Queue queue) {
this.queue = queue;
}
@Override
public void run() {
for (int i = 0; i queue;
private final int myIdx;
private static int consumerCounter = 0;
private static AtomicInteger shouldRun = new AtomicInteger(0);
public Consumer(Queue queue) {
this.queue = queue;
myIdx = consumerCounter;
consumerCounter++;
}
@Override
// very long with two nested loops
// should be refactored.
public void run() {
while (!terminate) {
for (int idx = 0; idx < 10; ++idx) {
try {
// wait until elements are produced
do {
if (queue.isEmpty()) {
synchronized (queue) {
queue.wait(100);
}
}
} while (shouldRun.get() != myIdx);
// continue only if this consumer should execute
Elem elem = queue.poll();
if (elem != null) {
System.out.println("Thread "
+ Thread.currentThread().getName()
+ " fetch Element " + elem.idx);
}
i.e.
StartconsumerProducer shold either be startconsumerProducer or simply run. Also handle InterruptedException correctly.
For details have a look at the book Java Concurrency in Practice
In your case it does not make sense to catch the
InterruptedException you can simply declare it as thrown.The ArrayList for the threads is not necessary, you have a fix number of Threads, you can replace it by an Array.
public static void main(String[] args) throws InterruptedException {
ConsumerProducer cp = new ConsumerProducer();
cp.run();
}You are not really waiting for all threads to fininsh. If one of your threads run in an timeout you simply ignore it. You can fix this by stetting a flag that signals your thread to terminate.
I've done this by adding a class
Terminatable which Consumer and Producer extend and which have a flag that signals them to terminate as soon as possible.Your
ConsumerProducerMonitor contains only one value which is overwritten by the Producer Loop via the put method, nothing prevents this.The
notifyAll calls are useless because nobody actually waits for the ConsumerProducerMonitorThere are a lot of issues in your code so I've decided to rewrite it in a way that actually works.
It uses a Queue to transport the elements
Elem from the Producer to the ConsumerThe
Producer notifies any waiting Consumer via notifyAll about new Elements.The
Consumer uses an Index to determine whether he should pull elements from the queue and increments the shouldRun after he has taken the 10 elements from the queue.In the
Consumer I give you an example how the wait() in Java can be used, and a possible way how to deal with InterruptedExcpetions. ```
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
public class ConsumerProducer {
public static void main(String[] args) throws InterruptedException {
ConsumerProducer cp = new ConsumerProducer();
cp.run();
}
// Creates Threads and waits for them to finish till timeout
private LinkedBlockingQueue queue = new LinkedBlockingQueue<>();
public void run() throws InterruptedException {
Terminatable[] terminateables = new Terminatable[] {
new Producer(queue), new Consumer(queue),
new Consumer(queue), new Consumer(queue), };
startThreads(terminateables);
join(terminateables);
}
private static void join(Terminatable[] terminatables)
throws InterruptedException {
List maybeNotTerminated = Arrays
.asList(terminatables);
while (!maybeNotTerminated.isEmpty()) {
for (int idx = maybeNotTerminated.size() - 1; idx >= 0; --idx) {
Terminatable t = maybeNotTerminated.get(idx);
if (t.isAlive()) {
t.join(20_000);
// force termination
t.terminate = true;
} else {
maybeNotTerminated.remove(idx);
}
}
}
}
private static void startThreads(Runnable[] runnables) {
for (int i = 0; i queue;
Producer(Queue queue) {
this.queue = queue;
}
@Override
public void run() {
for (int i = 0; i queue;
private final int myIdx;
private static int consumerCounter = 0;
private static AtomicInteger shouldRun = new AtomicInteger(0);
public Consumer(Queue queue) {
this.queue = queue;
myIdx = consumerCounter;
consumerCounter++;
}
@Override
// very long with two nested loops
// should be refactored.
public void run() {
while (!terminate) {
for (int idx = 0; idx < 10; ++idx) {
try {
// wait until elements are produced
do {
if (queue.isEmpty()) {
synchronized (queue) {
queue.wait(100);
}
}
} while (shouldRun.get() != myIdx);
// continue only if this consumer should execute
Elem elem = queue.poll();
if (elem != null) {
System.out.println("Thread "
+ Thread.currentThread().getName()
+ " fetch Element " + elem.idx);
}
Code Snippets
public static void main(String[] args) throws InterruptedException {
ConsumerProducer cp = new ConsumerProducer();
cp.run();
}import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
public class ConsumerProducer {
public static void main(String[] args) throws InterruptedException {
ConsumerProducer cp = new ConsumerProducer();
cp.run();
}
// Creates Threads and waits for them to finish till timeout
private LinkedBlockingQueue<Elem> queue = new LinkedBlockingQueue<>();
public void run() throws InterruptedException {
Terminatable[] terminateables = new Terminatable[] {
new Producer(queue), new Consumer(queue),
new Consumer(queue), new Consumer(queue), };
startThreads(terminateables);
join(terminateables);
}
private static void join(Terminatable[] terminatables)
throws InterruptedException {
List<Terminatable> maybeNotTerminated = Arrays
.asList(terminatables);
while (!maybeNotTerminated.isEmpty()) {
for (int idx = maybeNotTerminated.size() - 1; idx >= 0; --idx) {
Terminatable t = maybeNotTerminated.get(idx);
if (t.isAlive()) {
t.join(20_000);
// force termination
t.terminate = true;
} else {
maybeNotTerminated.remove(idx);
}
}
}
}
private static void startThreads(Runnable[] runnables) {
for (int i = 0; i < runnables.length; i++) {
new Thread(runnables[i]).start();
}
}
static abstract private class Terminatable implements Runnable {
volatile boolean terminate = false;
public boolean isAlive() {
return Thread.currentThread().isAlive();
}
public void join(long millis) throws InterruptedException {
Thread.currentThread().join(millis);
}
}
static private class Elem {
public final int idx;
public final String name;
public Elem(int i, String name) {
this.idx = i;
this.name = name;
}
}
// Producer class
static private class Producer extends Terminatable implements Runnable {
private Queue<Elem> queue;
Producer(Queue<Elem> queue) {
this.queue = queue;
}
@Override
public void run() {
for (int i = 0; i <= 100; ++i) {
Elem e = new Elem(i, Thread.currentThread().getName());
queue.offer(e);
synchronized (queue) {
queue.notifyAll();
}
}
}
}
// Consumer Class
static private class Consumer extends Terminatable implements Runnable {
privateContext
StackExchange Code Review Q#93718, answer score: 10
Revisions (0)
No revisions yet.