HiveBrain v1.2.0
Get Started
← Back to all entries
patternjavaMinor

Producer/Consumer implementation

Submitted by: @import:stackexchange-codereview··
0
Viewed 0 times
implementationproducerconsumer

Problem

I am using Netty embedded within Grails to process and display incoming SNMP messages. Since Netty 4 doesn't come with a built-in ChannelExecutionHandler I had to make my own and would like some feedback on it.

```
package org.ciscotalk.common.netty;

import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.socket.DatagramPacket;
import lombok.Getter;
import lombok.Setter;
import org.ciscotalk.common.netty.exec.ChannelDecoderEvent;
import org.ciscotalk.common.snmp.SnmpMessage;
import org.ciscotalk.snmp.SnmpService;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.concurrent.*;

/**
* @author Diljot
*/
@ChannelHandler.Sharable
public class ChannelExecutionHandler
extends CombinedChannelDuplexHandler
{

@Autowired
@Getter @Setter
private SnmpService snmpService;

private final ExecutorService executor = Executors.newCachedThreadPool();
private final BlockingDeque incoming = new LinkedBlockingDeque<>();
//TODO private static final BlockingDeque outgoing = new LinkedBlockingDeque<>();
private final BlockingDeque> futures = new LinkedBlockingDeque<>();

public ChannelExecutionHandler() {
init(new ChannelInboundExecutionHandler(), new ChannelOutboundExecutionHandler());
Executors.newSingleThreadExecutor().execute(producer);
Executors.newSingleThreadExecutor().execute(consumer);
}

public final Runnable producer = new Runnable() {
@Override
public void run() {
try {
ChannelDecoderEvent ev = nextEvent(incoming.take());
if (ev != null) {
Future f = executor.submit((Runnable) ev, ev);
futures.put(f);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};

public final Runnable consumer = new Runnable() {
@Override
public void run() {
try {

Solution

So your code/concept looks functional to me. There are some comments though... and they are a bit scattered, forgive me....

Threads

You use the Executors class to create three ExecutorServices. Each of these services create threads that are non-daemon threads...

This is a problem because you will need to manually shut down all threads before you can exit your program correctly.

Using the default ThreadFactory is also a problem because the threads have horrible names. It is very convenient to name your threads so that, if there are problems, the stack-traces and java dumps are more easy to interpret.

creating a class like:

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Simple thread-safe ThreadFactory implementation that can be
 * customised to have a special name-base, and daemon setting.
 */
public class CustomizableThreadFactory implements ThreadFactory {

    private final AtomicInteger idgen = new AtomicInteger();
    private final String basename;
    private final boolean daemon;

    /**
     * Create a new ThreadFactory with the given base-name and daemon settings.
     * Each thread created from this factory will have a unique name (basename + " - " + id)
     * @param basename the basename to use for threads from this factory
     * @param daemon whether the threads are daemon threads, or not (see Thread.setDaemon(boolean) )
     */
    public CustomizableThreadFactory(final String basename, final boolean daemon) {
        this.basename = basename;
        this.daemon = daemon;
    }

    @Override
    public Thread newThread(final Runnable torun) {
        Thread thread = new Thread(torun, String.format("%s - %d", basename, idgen.incrementAndGet()));
        thread.setDaemon(daemon);
        return thread;
    }

}


You can use that class anywhere you use an ExecutorService, and the threads will be much nicer to use. Consider your constructor:

private final ExecutorService executor = Executors.newCachedThreadPool();
private final BlockingDeque incoming = new LinkedBlockingDeque<>();
//TODO private static final BlockingDeque outgoing = new LinkedBlockingDeque<>();
private final BlockingDeque> futures = new LinkedBlockingDeque<>();

public ChannelExecutionHandler() {
    init(new ChannelInboundExecutionHandler(), new ChannelOutboundExecutionHandler());
    Executors.newSingleThreadExecutor().execute(producer);
    Executors.newSingleThreadExecutor().execute(consumer);
}


This code would be better as:

private final ExecutorService executor = Executors.newCachedThreadPool(
      new CustomizedThreadFactory("SNMP Decoder" + true));
private final BlockingDeque incoming = new LinkedBlockingDeque<>();
private final BlockingDeque> futures = new LinkedBlockingDeque<>();

public ChannelExecutionHandler() {
    init(new ChannelInboundExecutionHandler(), new ChannelOutboundExecutionHandler());
    Executors.newSingleThreadExecutor(
         new CustomizedThreadFactory("SNMP Producer", true)).execute(producer);
    Executors.newSingleThreadExecutor(
         new CustomizedThreadFactory("SNMP Consumer", true)).execute(consumer);
}


Alright, apart from the thread daemon status and names, the rest of the thread model looks OK.....

You have a thread that queues the events in an ordered queue, then a bunch of threads that decode them in paralle, and a final thread that removes them in the same order as their insert order.

From what I can tell, this is good.

Generics

You go part way to working with your generics, but then you get lazy, or there is an inconsistency in your code.

Inconsistency:

you have Future f = executor.submit((Runnable) ev, ev); where ev is ChannelDecoderEvent ev = nextEvent(incoming.take());

Now, ChannelDecoderEvent is:

public class ChannelDecoderEvent implements Callable { ...


So, you are creating a future that will result in a ChannelDecoderEvent output (that is what the ExecutorService.submit(...) does).

But, when you pull values from the futures queue with:

Future f = futures.take();
            if (f != null) {
                Object o = f.get();
                snmpService.pushToQueue((SnmpMessage) o);


you are casting the o value to be an SnmpMessage, but that o value is a ChannelDecoderEvent, and that is not an SnmpMessage, so you will get ClassCastException.

If you did it right ....

Your generics should be worked correctly the whole way though.

There is no need for Future, it should be Future, and instead of loading the executors as a submit(Runnable r, T result); you should be using the correct callable-nature of the ChannelDecoderEvent, and submitting as:

Future f = submit(ev);


Then, your future queue:

private final BlockingDeque> futures = new LinkedBlockingDeque<>();


should be:

private final BlockingDeque> futures = new LinkedBlockingDeque<>();


Finally, your queue retrieval code becomes:

```
Future f = futures.t

Code Snippets

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;


/**
 * Simple thread-safe ThreadFactory implementation that can be
 * customised to have a special name-base, and daemon setting.
 */
public class CustomizableThreadFactory implements ThreadFactory {

    private final AtomicInteger idgen = new AtomicInteger();
    private final String basename;
    private final boolean daemon;

    /**
     * Create a new ThreadFactory with the given base-name and daemon settings.
     * Each thread created from this factory will have a unique name (basename + " - " + id)
     * @param basename the basename to use for threads from this factory
     * @param daemon whether the threads are daemon threads, or not (see Thread.setDaemon(boolean) )
     */
    public CustomizableThreadFactory(final String basename, final boolean daemon) {
        this.basename = basename;
        this.daemon = daemon;
    }

    @Override
    public Thread newThread(final Runnable torun) {
        Thread thread = new Thread(torun, String.format("%s - %d", basename, idgen.incrementAndGet()));
        thread.setDaemon(daemon);
        return thread;
    }

}
private final ExecutorService executor = Executors.newCachedThreadPool();
private final BlockingDeque<ByteBuf> incoming = new LinkedBlockingDeque<>();
//TODO private static final BlockingDeque<?> outgoing = new LinkedBlockingDeque<>();
private final BlockingDeque<Future<?>> futures = new LinkedBlockingDeque<>();

public ChannelExecutionHandler() {
    init(new ChannelInboundExecutionHandler(), new ChannelOutboundExecutionHandler());
    Executors.newSingleThreadExecutor().execute(producer);
    Executors.newSingleThreadExecutor().execute(consumer);
}
private final ExecutorService executor = Executors.newCachedThreadPool(
      new CustomizedThreadFactory("SNMP Decoder" + true));
private final BlockingDeque<ByteBuf> incoming = new LinkedBlockingDeque<>();
private final BlockingDeque<Future<?>> futures = new LinkedBlockingDeque<>();

public ChannelExecutionHandler() {
    init(new ChannelInboundExecutionHandler(), new ChannelOutboundExecutionHandler());
    Executors.newSingleThreadExecutor(
         new CustomizedThreadFactory("SNMP Producer", true)).execute(producer);
    Executors.newSingleThreadExecutor(
         new CustomizedThreadFactory("SNMP Consumer", true)).execute(consumer);
}
public class ChannelDecoderEvent<T extends SnmpMessage> implements Callable<T> { ...
Future<?> f = futures.take();
            if (f != null) {
                Object o = f.get();
                snmpService.pushToQueue((SnmpMessage) o);

Context

StackExchange Code Review Q#43015, answer score: 6

Revisions (0)

No revisions yet.