HiveBrain v1.2.0
Get Started
← Back to all entries
patternjavaMinor

Producer Consumer scenario implementation in Java

Submitted by: @import:stackexchange-codereview··
0
Viewed 0 times
implementationjavaconsumerscenarioproducer

Problem

Please suggest improvements to the following Java program I've written for the producer-consumer scenario. The program seems to be working fine. Does it suffer from possible deadlock scenarios? How could I have done this better? I am using Stack, whose read/write operations (push/pop) are already synchronized; what would happen if they were not?

```
import java.util.Stack;

import logger.CustomLogger;

public class TestProducerConsumer {

private Stack buffer;
public static final int MAX_SIZE = 10;
public int count;

public TestProducerConsumer(){

buffer = new Stack();
count = 0;
}

public Stack getBuffer(){
return buffer;
}

public void addToBuffer(Integer i) throws StackException{

if(buffer.size() buf = pc.getBuffer();

while(true){
synchronized(pc){
if(buf.size() buf = pc.getBuffer();
int i;

while(true){

synchronized(pc){

if(buf.size() == 0){

try {
CustomLogger.logger.info("Consumer Sleeping");
pc.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
} else
{
try {
i = pc.removeFromBuffer();
CustomLogger.logger.info("poped "+i);

if(buf.size() == 0){
CustomLogger.logger.info("Wake up Producer");
pc.notifyAll();
}
} catch (StackException e) {
System.out.println(e.getError());
break;
}
}
}
}
}
}

class StackException extends Exception{
private String reason;

public StackException(){
super();
}
public StackException(String reason){

Solution

You could try using a queue as they are designed for this sort of thing. The code is much shorter.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Bounded-buffer producer/consumer demo built on a {@link BlockingQueue}.
 *
 * <p>Producers claim consecutive numbers from {@link #count} and enqueue them
 * until the counter overflows. Consumers dequeue and print numbers until they
 * see {@link #POISON_VALUE}.
 *
 * <p>Shutdown protocol: only the last producer to finish enqueues a single
 * poison pill, so every produced number sits ahead of the pill in the FIFO
 * queue. Each consumer that takes the pill puts it back for its siblings
 * before exiting, so one pill stops any number of consumers.
 */
public class TestProducerConsumer {
    /** Capacity of the shared buffer; {@link #addToBuffer} blocks while it is full. */
    public static final int MAX_SIZE = 10;

    /** Sentinel telling a consumer to stop. Producers only enqueue non-negative numbers. */
    public static final int POISON_VALUE = -1;

    private final BlockingQueue<Integer> tasks = new ArrayBlockingQueue<Integer>(MAX_SIZE);
    public final ExecutorService executor = Executors.newCachedThreadPool();
    public final AtomicInteger count = new AtomicInteger();

    /** Number of producers that have been started and have not finished yet. */
    private final AtomicInteger activeProducers = new AtomicInteger();

    /**
     * Enqueues {@code i}, blocking while the buffer is full.
     *
     * @param i value to enqueue; must not be {@code null}
     * @throws AssertionError if the calling thread is interrupted while waiting;
     *         the thread's interrupt flag is restored before throwing
     */
    public void addToBuffer(Integer i) {
        try {
            tasks.put(i);
        } catch (InterruptedException e) {
            // Keep the interruption visible to the code that owns this thread.
            Thread.currentThread().interrupt();
            throw new AssertionError(e);
        }
    }

    /**
     * Dequeues the oldest value, blocking while the buffer is empty.
     *
     * @return the value at the head of the buffer
     * @throws AssertionError if the calling thread is interrupted while waiting;
     *         the thread's interrupt flag is restored before throwing
     */
    public Integer removeFromBuffer() {
        try {
            return tasks.take();
        } catch (InterruptedException e) {
            // Keep the interruption visible to the code that owns this thread.
            Thread.currentThread().interrupt();
            throw new AssertionError(e);
        }
    }

    /**
     * Starts three producers and five consumers, then asks the executor to shut
     * down once all of them have finished.
     *
     * @param args ignored
     */
    public static void main(String... args) {
        TestProducerConsumer pd = new TestProducerConsumer();
        for (int i = 0; i < 3; i++) {
            pd.new Producer();
        }
        for (int i = 0; i < 5; i++) {
            pd.new Consumer();
        }
        // Already-submitted tasks still run; this only lets the pool threads
        // end (and the JVM exit) as soon as the work is done.
        pd.executor.shutdown();
    }

    /** Enqueues numbers claimed from {@link #count}; submits itself to the executor on construction. */
    class Producer implements Runnable {
        public Producer() {
            // Register before starting so the "last producer" check in run() is accurate.
            activeProducers.incrementAndGet();
            executor.execute(this);
        }

        @Override
        public void run() {
            int next;
            // Claim and test in one atomic step so a value that overflowed to
            // negative is never enqueued.
            while ((next = count.getAndIncrement()) >= 0) {
                addToBuffer(next);
            }
            // Only the last producer signals the end, which guarantees that no
            // number is queued behind the pill.
            if (activeProducers.decrementAndGet() == 0) {
                addToBuffer(POISON_VALUE);
            }
        }
    }

    /** Prints dequeued numbers until the poison pill arrives; submits itself to the executor on construction. */
    class Consumer implements Runnable {
        public Consumer() {
            executor.execute(this);
        }

        @Override
        public void run() {
            int num;
            while ((num = removeFromBuffer()) != POISON_VALUE) {
                System.out.println("popped " + num);
            }
            // Hand the pill on so the remaining consumers stop as well.
            addToBuffer(POISON_VALUE);
        }
    }
}

Code Snippets

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Bounded-buffer producer/consumer demo built on a {@link BlockingQueue}.
 *
 * <p>Producers claim consecutive numbers from {@link #count} and enqueue them
 * until the counter overflows. Consumers dequeue and print numbers until they
 * see {@link #POISON_VALUE}.
 *
 * <p>Shutdown protocol: only the last producer to finish enqueues a single
 * poison pill, so every produced number sits ahead of the pill in the FIFO
 * queue. Each consumer that takes the pill puts it back for its siblings
 * before exiting, so one pill stops any number of consumers.
 */
public class TestProducerConsumer {
    /** Capacity of the shared buffer; {@link #addToBuffer} blocks while it is full. */
    public static final int MAX_SIZE = 10;

    /** Sentinel telling a consumer to stop. Producers only enqueue non-negative numbers. */
    public static final int POISON_VALUE = -1;

    private final BlockingQueue<Integer> tasks = new ArrayBlockingQueue<Integer>(MAX_SIZE);
    public final ExecutorService executor = Executors.newCachedThreadPool();
    public final AtomicInteger count = new AtomicInteger();

    /** Number of producers that have been started and have not finished yet. */
    private final AtomicInteger activeProducers = new AtomicInteger();

    /**
     * Enqueues {@code i}, blocking while the buffer is full.
     *
     * @param i value to enqueue; must not be {@code null}
     * @throws AssertionError if the calling thread is interrupted while waiting;
     *         the thread's interrupt flag is restored before throwing
     */
    public void addToBuffer(Integer i) {
        try {
            tasks.put(i);
        } catch (InterruptedException e) {
            // Keep the interruption visible to the code that owns this thread.
            Thread.currentThread().interrupt();
            throw new AssertionError(e);
        }
    }

    /**
     * Dequeues the oldest value, blocking while the buffer is empty.
     *
     * @return the value at the head of the buffer
     * @throws AssertionError if the calling thread is interrupted while waiting;
     *         the thread's interrupt flag is restored before throwing
     */
    public Integer removeFromBuffer() {
        try {
            return tasks.take();
        } catch (InterruptedException e) {
            // Keep the interruption visible to the code that owns this thread.
            Thread.currentThread().interrupt();
            throw new AssertionError(e);
        }
    }

    /**
     * Starts three producers and five consumers, then asks the executor to shut
     * down once all of them have finished.
     *
     * @param args ignored
     */
    public static void main(String... args) {
        TestProducerConsumer pd = new TestProducerConsumer();
        for (int i = 0; i < 3; i++) {
            pd.new Producer();
        }
        for (int i = 0; i < 5; i++) {
            pd.new Consumer();
        }
        // Already-submitted tasks still run; this only lets the pool threads
        // end (and the JVM exit) as soon as the work is done.
        pd.executor.shutdown();
    }

    /** Enqueues numbers claimed from {@link #count}; submits itself to the executor on construction. */
    class Producer implements Runnable {
        public Producer() {
            // Register before starting so the "last producer" check in run() is accurate.
            activeProducers.incrementAndGet();
            executor.execute(this);
        }

        @Override
        public void run() {
            int next;
            // Claim and test in one atomic step so a value that overflowed to
            // negative is never enqueued.
            while ((next = count.getAndIncrement()) >= 0) {
                addToBuffer(next);
            }
            // Only the last producer signals the end, which guarantees that no
            // number is queued behind the pill.
            if (activeProducers.decrementAndGet() == 0) {
                addToBuffer(POISON_VALUE);
            }
        }
    }

    /** Prints dequeued numbers until the poison pill arrives; submits itself to the executor on construction. */
    class Consumer implements Runnable {
        public Consumer() {
            executor.execute(this);
        }

        @Override
        public void run() {
            int num;
            while ((num = removeFromBuffer()) != POISON_VALUE) {
                System.out.println("popped " + num);
            }
            // Hand the pill on so the remaining consumers stop as well.
            addToBuffer(POISON_VALUE);
        }
    }
}

Context

StackExchange Code Review Q#2538, answer score: 4

Revisions (0)

No revisions yet.