
One producer multiple consumers

Submitted by: @import:stackexchange-codereview

Problem

I want to code the following scenario:

  • One Producer: produces n objects (n = 100 here).

  • m Consumers (m = 5 here): each consumes k objects at a time (k = 10 here) in round-robin fashion. Consumer 1 consumes the first 10 objects, then Consumer 2 consumes the next 10, and so on. This means that Consumer 1 has to wait until Consumer 5 has consumed its 10 objects before it can take its next batch.



I have written the code and it is working, but I feel like it is sub-optimal, might have bugs and has room for improvement.



```
public static void main(String[] args) {
ConsumerProducer cp = new ConsumerProducer();
cp.StartconsumerProducer();
}

// Creates Threads and waits for them to finish till timeout
public class ConsumerProducer {

public void StartconsumerProducer() {
ConsumerProducerMonitor mon = new ConsumerProducerMonitor();
List<Thread> threads = new ArrayList<Thread>();
// Create a producer
Thread p1 = new Thread(new Producer(mon), "P1");
p1.start();
// Create consumer 1
Thread c1 = new Thread(new Consumer(mon), "C1");
c1.start();
// Create consumer 2
Thread c2 = new Thread(new Consumer(mon), "C2");
c2.start();
// Create consumer 3
Thread c3 = new Thread(new Consumer(mon), "C3");
c3.start();
threads.add(p1);
threads.add(c1);
threads.add(c2);
threads.add(c3);

for (int i = 0; i < [...] >= 0) {
ret = mon.get(Thread.currentThread().getName());
}
}

private final ConsumerProducerMonitor mon;
}

// Produces and consumes items in round robin fashion. Uses Semaphores for the same.
public class ConsumerProducerMonitor {

// produces items
public synchronized void put(int item, String threadName) {
if (isProduced) {
return;
}

this.item = item;
System.out.println("Producer " + threadName + " put Item: " + this.item);
if (this.item == 99) {
i
```

Solution

First of all, it would be good to follow the standard Java conventions: method names start with a lower-case letter and use camelCase.
For example, StartconsumerProducer should be either startConsumerProducer or simply run.

Also handle InterruptedException correctly.
For details, have a look at the book Java Concurrency in Practice.

In your case it does not make sense to catch the InterruptedException; you can simply declare it as thrown.
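
As a minimal sketch of the two usual options: either declare the exception and let the caller decide, or, where the signature cannot throw (as in Runnable.run), restore the interrupt flag and stop. The class and method names here (InterruptHandling, pause, Worker) are illustrative and not part of the code under review.

```java
public class InterruptHandling {

    // Option 1: propagate. The caller decides how to react to the interrupt.
    static void pause(long millis) throws InterruptedException {
        Thread.sleep(millis);
    }

    // Option 2: Runnable.run() cannot declare checked exceptions, so restore
    // the interrupt flag and return instead of silently swallowing it.
    static class Worker implements Runnable {
        volatile boolean stoppedByInterrupt = false;

        @Override
        public void run() {
            try {
                while (true) {
                    pause(50);
                }
            } catch (InterruptedException e) {
                stoppedByInterrupt = true;
                // re-set the flag so code further up the stack can still see it
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Worker worker = new Worker();
        Thread t = new Thread(worker, "worker");
        t.start();
        t.interrupt();
        t.join();
        System.out.println("stopped by interrupt: " + worker.stoppedByInterrupt);
    }
}
```

The main method itself simply declares InterruptedException, which is the approach suggested above for the code in the question.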

The ArrayList for the threads is not necessary: you have a fixed number of threads, so you can replace it with an array.

public static void main(String[] args) throws InterruptedException {
    ConsumerProducer cp = new ConsumerProducer();
    cp.run();
}


You are not really waiting for all threads to finish. If one of your threads runs into the timeout, you simply ignore it. You can fix this by setting a flag that signals the thread to terminate.

I've done this by adding a class Terminatable, which Consumer and Producer extend and which has a flag that signals them to terminate as soon as possible.
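
The idea can be sketched in isolation as follows. This is only an illustration under assumed names (TerminationFlag, Stoppable, Spinner, joinOrStop are hypothetical): the waiting thread joins with a timeout and, if the worker is still alive afterwards, sets the flag and then waits for the worker to actually finish.

```java
public class TerminationFlag {

    // Hypothetical base class: a Runnable with a cooperative stop flag.
    static abstract class Stoppable implements Runnable {
        // volatile so that the worker thread sees the write of the waiting thread
        protected volatile boolean terminate = false;

        void requestStop() {
            terminate = true;
        }
    }

    // A worker that never finishes on its own; it only checks the flag.
    static class Spinner extends Stoppable {
        @Override
        public void run() {
            while (!terminate) {
                Thread.yield();
            }
        }
    }

    // Waits up to timeoutMillis. If the thread is still running after that,
    // asks it to stop and waits until it has really terminated.
    // Returns true if the timeout was hit.
    static boolean joinOrStop(Thread thread, Stoppable task, long timeoutMillis)
            throws InterruptedException {
        thread.join(timeoutMillis);
        boolean timedOut = thread.isAlive();
        if (timedOut) {
            task.requestStop();
            thread.join();
        }
        return timedOut;
    }

    public static void main(String[] args) throws InterruptedException {
        Spinner task = new Spinner();
        Thread t = new Thread(task, "spinner");
        t.start();
        boolean timedOut = joinOrStop(t, task, 200);
        System.out.println("timed out: " + timedOut + ", alive: " + t.isAlive());
    }
}
```

Note that isAlive and join are called on the worker's own Thread object, not on Thread.currentThread(), which would refer to the waiting thread.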

Your ConsumerProducerMonitor holds only one value, which the Producer loop overwrites via the put method; nothing prevents this.
The notifyAll calls are useless because nobody actually waits on the ConsumerProducerMonitor.
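
For comparison, here is a minimal sketch of a single-value monitor in which waiting actually happens: put blocks while the slot is full, get blocks while it is empty, and each side wakes the other with notifyAll. The class SingleSlotMonitor and its method signatures are illustrative assumptions, not the monitor from the question.

```java
public class SingleSlotMonitor {

    private Integer item = null; // null means the slot is empty

    // Blocks until the previous item has been taken, so nothing is overwritten.
    public synchronized void put(int value) throws InterruptedException {
        while (item != null) {
            wait(); // always wait in a loop: wake-ups can be spurious
        }
        item = value;
        notifyAll(); // wake consumers waiting in get()
    }

    // Blocks until an item is available, then empties the slot.
    public synchronized int get() throws InterruptedException {
        while (item == null) {
            wait();
        }
        int value = item;
        item = null;
        notifyAll(); // wake the producer waiting in put()
        return value;
    }

    public static void main(String[] args) throws InterruptedException {
        SingleSlotMonitor slot = new SingleSlotMonitor();
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    slot.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "P1");
        producer.start();
        for (int i = 0; i < 5; i++) {
            System.out.println("got " + slot.get());
        }
        producer.join();
    }
}
```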

There are a lot of issues in your code so I've decided to rewrite it in a way that actually works.

It uses a Queue to transport the elements (Elem) from the Producer to the Consumers.

The Producer notifies any waiting Consumer about new elements via notifyAll.

The Consumer uses an index to determine whether it should pull elements from the queue, and increments the shouldRun counter after it has taken its 10 elements from the queue.

In the Consumer I give an example of how wait() can be used in Java, and one possible way to deal with InterruptedException.

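```
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class ConsumerProducer {

    public static void main(String[] args) throws InterruptedException {
        ConsumerProducer cp = new ConsumerProducer();
        cp.run();
    }

    private LinkedBlockingQueue<Elem> queue = new LinkedBlockingQueue<>();

    // Creates threads and waits for them to finish till timeout
    public void run() throws InterruptedException {
        Terminatable[] terminateables = new Terminatable[] {
                new Producer(queue), new Consumer(queue),
                new Consumer(queue), new Consumer(queue), };

        startThreads(terminateables);
        join(terminateables);
    }

    private static void join(Terminatable[] terminatables)
            throws InterruptedException {
        // copy into an ArrayList: the list returned by Arrays.asList is
        // fixed-size and does not support remove()
        List<Terminatable> maybeNotTerminated = new ArrayList<>(
                Arrays.asList(terminatables));

        while (!maybeNotTerminated.isEmpty()) {
            for (int idx = maybeNotTerminated.size() - 1; idx >= 0; --idx) {
                Terminatable t = maybeNotTerminated.get(idx);
                if (t.isAlive()) {
                    t.join(20_000);
                    // force termination
                    t.terminate = true;
                } else {
                    maybeNotTerminated.remove(idx);
                }
            }
        }
    }

    private static void startThreads(Terminatable[] terminatables) {
        for (int i = 0; i < terminatables.length; i++) {
            terminatables[i].start();
        }
    }

    static abstract private class Terminatable implements Runnable {
        volatile boolean terminate = false;
        // the thread that executes this task, set by start();
        // isAlive() and join() must refer to it, not to the calling thread
        private Thread thread;

        void start() {
            thread = new Thread(this);
            thread.start();
        }

        public boolean isAlive() {
            return thread.isAlive();
        }

        public void join(long millis) throws InterruptedException {
            thread.join(millis);
        }
    }

    static private class Elem {
        public final int idx;
        public final String name;

        public Elem(int i, String name) {
            this.idx = i;
            this.name = name;
        }
    }

    // Producer class
    static private class Producer extends Terminatable implements Runnable {

        private Queue<Elem> queue;

        Producer(Queue<Elem> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            // n = 100 elements, indices 0..99
            for (int i = 0; i < 100; ++i) {
                Elem e = new Elem(i, Thread.currentThread().getName());
                queue.offer(e);

                // wake up consumers waiting on the queue
                synchronized (queue) {
                    queue.notifyAll();
                }
            }
        }
    }

    // Consumer class
    static private class Consumer extends Terminatable implements Runnable {

        private Queue<Elem> queue;
        private final int myIdx;
        private static int consumerCounter = 0;
        private static AtomicInteger shouldRun = new AtomicInteger(0);

        public Consumer(Queue<Elem> queue) {
            this.queue = queue;
            myIdx = consumerCounter;
            consumerCounter++;
        }

        @Override
        // very long with two nested loops
        // should be refactored.
        public void run() {
            while (!terminate) {
                for (int idx = 0; idx < 10; ++idx) {
                    try {
                        // wait until elements are produced
                        do {
                            if (queue.isEmpty()) {
                                synchronized (queue) {
                                    queue.wait(100);
                                }
                            }
                        } while (shouldRun.get() != myIdx);
                        // continue only if this consumer should execute
                        Elem elem = queue.poll();
                        if (elem != null) {
                            System.out.println("Thread "
                                    + Thread.currentThread().getName()
                                    + " fetch Element " + elem.idx);
                        }
```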
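
Since the listing above is cut off, here is a compact, self-contained sketch of the same scenario (one producer, m = 5 consumers, batches of k = 10, n = 100 items). It is not the answer's code: it swaps in a different technique, namely a batch counter guarded by a lock with wait/notifyAll to hand over the turn, and BlockingQueue.take() to wait for items. All names (RoundRobinConsumers, turnLock, nextBatch, log) are assumptions made for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class RoundRobinConsumers {
    static final int N = 100; // items produced
    static final int M = 5;   // consumers
    static final int K = 10;  // items per batch

    private final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
    private final Object turnLock = new Object();
    private int nextBatch = 0;                             // guarded by turnLock
    private final List<int[]> log = new ArrayList<>();     // {consumer, item}, guarded by turnLock

    // Consumer myIdx handles batches myIdx, myIdx + M, myIdx + 2M, ...
    void consume(int myIdx) throws InterruptedException {
        for (int batch = myIdx; batch < N / K; batch += M) {
            synchronized (turnLock) {
                while (nextBatch != batch) {
                    turnLock.wait(); // not this consumer's turn yet
                }
                for (int i = 0; i < K; i++) {
                    int item = queue.take(); // blocks until the producer delivers
                    log.add(new int[] { myIdx, item });
                }
                nextBatch++;
                turnLock.notifyAll(); // hand the turn to the next consumer
            }
        }
    }

    // Starts all threads, waits for them and returns the consumption log.
    List<int[]> run() throws InterruptedException {
        List<Thread> threads = new ArrayList<>();
        threads.add(new Thread(() -> {
            for (int i = 0; i < N; i++) {
                queue.add(i);
            }
        }, "P1"));
        for (int c = 0; c < M; c++) {
            final int idx = c;
            threads.add(new Thread(() -> {
                try {
                    consume(idx);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "C" + (c + 1)));
        }
        for (Thread t : threads) {
            t.start();
        }
        for (Thread t : threads) {
            t.join();
        }
        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        for (int[] entry : new RoundRobinConsumers().run()) {
            System.out.println("C" + (entry[0] + 1) + " consumed " + entry[1]);
        }
    }
}
```

Because every consumer knows in advance which batches belong to it, all threads end on their own once the 100 items are consumed, so no timeout or termination flag is needed in this variant. The producer never takes turnLock, so holding it while blocking in take() cannot deadlock.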

Code Snippets

public static void main(String[] args) throws InterruptedException {
    ConsumerProducer cp = new ConsumerProducer();
    cp.run();
}

import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class ConsumerProducer {

    public static void main(String[] args) throws InterruptedException {
        ConsumerProducer cp = new ConsumerProducer();
        cp.run();
    }

       // Creates Threads and waits for them to finish till timeout
        private LinkedBlockingQueue<Elem> queue = new LinkedBlockingQueue<>();

        public void run() throws InterruptedException {
            Terminatable[] terminateables = new Terminatable[] {
                    new Producer(queue), new Consumer(queue),
                    new Consumer(queue), new Consumer(queue), };

            startThreads(terminateables);
            join(terminateables);
        }

        private static void join(Terminatable[] terminatables)
                throws InterruptedException {
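            // copy into an ArrayList: the list returned by Arrays.asList is
            // fixed-size and does not support remove()
            List<Terminatable> maybeNotTerminated = new java.util.ArrayList<>(
                    Arrays.asList(terminatables));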

            while (!maybeNotTerminated.isEmpty()) {
                for (int idx = maybeNotTerminated.size() - 1; idx >= 0; --idx) {
                    Terminatable t = maybeNotTerminated.get(idx);
                    if (t.isAlive()) {
                        t.join(20_000);
                        // force termination
                        t.terminate = true;
                    } else {
                        maybeNotTerminated.remove(idx);
                    }
                }
            }
        }

        private static void startThreads(Terminatable[] terminatables) {
            for (int i = 0; i < terminatables.length; i++) {
                terminatables[i].start();
            }
        }

    static abstract private class Terminatable implements Runnable {
        volatile boolean terminate = false;
        // the thread that executes this task, set by start();
        // isAlive() and join() must refer to it, not to the calling thread
        private Thread thread;

        void start() {
            thread = new Thread(this);
            thread.start();
        }

        public boolean isAlive() {
            return thread.isAlive();
        }

        public void join(long millis) throws InterruptedException {
            thread.join(millis);
        }
    }

    static private class Elem {
        public final int idx;
        public final String name;

        public Elem(int i, String name) {
            this.idx = i;
            this.name = name;
        }
    }

    // Producer class
    static private class Producer extends Terminatable implements Runnable {

        private Queue<Elem> queue;

        Producer(Queue<Elem> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            // n = 100 elements, indices 0..99
            for (int i = 0; i < 100; ++i) {
                Elem e = new Elem(i, Thread.currentThread().getName());                 
                queue.offer(e);

                synchronized (queue) {
                    queue.notifyAll();
                }

            }
        }
    }

    // Consumer Class
    static private class Consumer extends Terminatable implements Runnable {

        private

Context

StackExchange Code Review Q#93718, answer score: 10
