
Running multiple Kafka consumers on the same box independently of each other

Submitted by: @import:stackexchange-codereview

Problem

I have two Kafka consumers, ConsumerA and ConsumerB. I want to run these two Kafka consumers independently of each other on the same machine; there is no relation between them at all. They will work on different topics on the same machine.

  • Each consumer should have a different Properties object.
  • Each consumer should have a different thread pool configuration, since each one can be run in a multithreaded way (as a consumer group) if needed, independently of the other consumer.

Here is my design:

Consumer class (abstract):

```java
import java.util.Properties;

public abstract class Consumer implements Runnable {
    private final Properties consumerProps;
    private final String consumerName;

    public Consumer(String consumerName, Properties consumerProps) {
        this.consumerName = consumerName;
        this.consumerProps = consumerProps;
    }

    protected abstract void shutdown();
    protected abstract void run(String consumerName, Properties consumerProps);

    @Override
    public final void run() {
        run(consumerName, consumerProps);
    }
}
```


ConsumerA class:

```java
public class ConsumerA extends Consumer {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private KafkaConsumer consumer;

    public ConsumerA(String consumerName, Properties consumerProps) {
        super(consumerName, consumerProps);
    }

    @Override
    public void shutdown() {
        closed.set(true);
        consumer.wakeup();
    }

    @Override
    protected void run(String consumerName, Properties consumerProps) {
        consumer = new KafkaConsumer<>(consumerProps);
        consumer.subscribe(getTopicsBasisOnConsumerName());

        Map config = new HashMap<>();
        config.put(Config.URLS, TEST_URL);
        GenericRecordDomainDataDecoder decoder = new GenericRecordDomainDataDecoder(config);

        try {
            while (!closed.get()) {
                ConsumerRecords records = consumer.poll(Long.MAX_VALUE);
                for (ConsumerRecord record :
```

Solution

The majority of Java coding standards recommend a 4-space indent (or a tab size of 4 spaces). This code is indented with two.

Multithreading

The docs you linked explicitly state that the Kafka consumer is not thread-safe. While you're using the pattern from the docs, there is a pretty big problem with how you use it:

```java
public ConsumerHandler(Consumer consumer, int poolSize) {
    this.executorServiceConsumer = Executors.newFixedThreadPool(poolSize);
    this.consumer = consumer;
    for (int i = 0; i < poolSize; i++) {
        this.consumers.add(consumer);
        executorServiceConsumer.submit(consumer);
    }
}
```


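Your code is reusing the same consumer instance for every thread in the pool. This is a bad idea, because calling the same method on the same instance from several threads at once is generally not thread-safe. Here in particular, ConsumerA stores its KafkaConsumer in a single instance field that every pool thread overwrites in run(), so the threads can end up concurrently polling one shared KafkaConsumer (KafkaConsumer detects such multi-threaded access and throws a ConcurrentModificationException). A further problem remains: you're calling shutdown on the same Consumer instance once per thread. That in itself is not a major problem, because the method is written in a way that tolerates repeated calls.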

But it's wasteful. Since you're reusing the same instance anyway, the following code in shutdown

```java
for (Consumer consumer : consumers) {
    consumer.shutdown();
}
```


is equivalent to:

```java
for (int i = 0; i < consumers.size(); i++) {
    consumer.shutdown();
}
```
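
A minimal sketch of the alternative, assuming the handler receives a factory instead of a single instance, so that each pool thread gets its own object (and would therefore create its own KafkaConsumer). `SketchWorker` and `SketchHandler` are hypothetical stand-ins for ConsumerA and ConsumerHandler that use only the JDK; no Kafka calls are involved.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

// Hypothetical stand-in for ConsumerA: every instance owns its own state,
// the way every thread should own its own KafkaConsumer.
class SketchWorker implements Runnable {
    private final AtomicBoolean closed = new AtomicBoolean(false);

    @Override
    public void run() {
        while (!closed.get()) {
            try {
                Thread.sleep(5); // a real consumer would poll here
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    void shutdown() {
        closed.set(true);
    }
}

// Hypothetical stand-in for ConsumerHandler: one fresh worker per pool thread.
class SketchHandler {
    private final ExecutorService pool;
    private final List<SketchWorker> workers = new ArrayList<>();

    SketchHandler(Supplier<SketchWorker> factory, int poolSize) {
        pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < poolSize; i++) {
            SketchWorker worker = factory.get(); // a new instance every time
            workers.add(worker);
            pool.submit(worker);
        }
    }

    List<SketchWorker> workers() {
        return Collections.unmodifiableList(workers);
    }

    // Each loop iteration now stops a different worker.
    // Returns true if all pool threads finished within five seconds.
    boolean shutdown() {
        for (SketchWorker worker : workers) {
            worker.shutdown();
        }
        pool.shutdown();
        try {
            return pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

With the real classes, the factory could be something like `() -> new ConsumerA(consumerName, consumerProps)`; the shutdown loop then stops poolSize distinct consumers instead of one consumer poolSize times.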


Also note that ConsumerHandler#shutdown does not actually shut anything down; it just registers a shutdown hook with the JVM. As such, this method is:

  • named incorrectly
  • not idempotent (that is, it can't be called multiple times without problems)
  • inappropriately visible
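
On the idempotency point, one common approach is to guard the shutdown work with an AtomicBoolean so that only the first call does anything. The `IdempotentShutdown` class below is a hypothetical helper, not part of the original code.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical helper: runs the given shutdown action at most once,
// no matter how many times (or from how many threads) shutdown() is called.
final class IdempotentShutdown {
    private final AtomicBoolean done = new AtomicBoolean(false);
    private final Runnable action;

    IdempotentShutdown(Runnable action) {
        this.action = action;
    }

    // Returns true only for the call that actually performed the shutdown.
    boolean shutdown() {
        if (done.compareAndSet(false, true)) {
            action.run();
            return true;
        }
        return false;
    }
}
```

A handler could route both its public shutdown method and its shutdown hook through such a guard, so a second call becomes a harmless no-op.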



I strongly suggest the following pattern instead, which allows you to properly shut down the consumer threads without waiting for the JVM to terminate:

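```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public final class ConsumerHandler {
    private final ExecutorService executorServiceConsumer;
    private final Consumer consumer;
    // A single hook instance, so that it can be deregistered later
    private final Thread shutdownHook = new Thread() {
        @Override
        public void run() {
            consumer.shutdown();
            executorServiceConsumer.shutdown();
        }
    };

    public ConsumerHandler(Consumer consumer, int poolSize) {
        this.executorServiceConsumer = Executors.newFixedThreadPool(poolSize);
        this.consumer = consumer;
        // submit the consumer thread(s) to executorServiceConsumer here, then
        // register the hook so the consumers also stop when the JVM exits:
        Runtime.getRuntime().addShutdownHook(shutdownHook);
    }

    // Deregister the hook so it (and the objects it references) can be
    // garbage-collected, then run the same shutdown logic right away.
    public void shutdown() throws InterruptedException {
        Runtime.getRuntime().removeShutdownHook(shutdownHook);
        shutdownHook.start();
        shutdownHook.join();
    }
}
```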


Note that the shutdown hook is deregistered once it's no longer needed. Additionally, nobody can use your class to register shutdown hooks inadvertently, and calling shutdown actually shuts the consumers down instead of waiting for the JVM to terminate.

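Interestingly, your current implementation would only shut the consumers down when you call System.exit() (or when the process is terminated externally, for example with Ctrl+C). This is really undesirable behaviour. It happens because your ExecutorService does not use daemon threads: the JVM waits for those threads to terminate before exiting, and since they never terminate on their own, a normal exit never starts and the shutdown hook never runs.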
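
To illustrate the daemon-thread point: Executors.newFixedThreadPool has an overload that takes a ThreadFactory, and a factory can mark each thread as a daemon so that the pool alone no longer keeps the JVM alive. The `DaemonThreadFactory` class and its naming scheme are hypothetical. Keep in mind that daemon threads are simply abandoned when the JVM exits, so cleanup such as closing a KafkaConsumer would still have to happen in a shutdown hook or an explicit shutdown call.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical factory that marks every pool thread as a daemon thread,
// so the pool by itself does not prevent the JVM from exiting.
final class DaemonThreadFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger();

    DaemonThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, prefix + "-" + counter.incrementAndGet());
        t.setDaemon(true);
        return t;
    }

    // Convenience: a fixed-size pool whose threads are all daemons.
    static ExecutorService newDaemonPool(String prefix, int poolSize) {
        return Executors.newFixedThreadPool(poolSize, new DaemonThreadFactory(prefix));
    }
}
```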

Timing

A final note about timing and waiting: your shutdown hook can wait for up to a second before terminating, which is usually a bad sign. Instead of calling shutdown() followed by awaitTermination(), you could have used shutdownNow().
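
To illustrate the difference: shutdownNow() interrupts tasks that are already running instead of waiting for them to finish. The hypothetical `ShutdownNowDemo` below uses `Thread.sleep(Long.MAX_VALUE)` as a stand-in for a blocking poll; it does not model Kafka itself (KafkaConsumer.poll documents that it throws an InterruptException when the calling thread is interrupted).

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class ShutdownNowDemo {
    // Starts one task that blocks "forever", like poll(Long.MAX_VALUE), and
    // stops it with shutdownNow(). awaitTermination() is only used afterwards
    // to confirm the pool really stopped. Returns true if everything worked.
    static boolean run() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch interrupted = new CountDownLatch(1);
        pool.submit(() -> {
            started.countDown();
            try {
                Thread.sleep(Long.MAX_VALUE); // stand-in for a blocking poll
            } catch (InterruptedException e) {
                interrupted.countDown(); // shutdownNow() interrupted the task
            }
        });
        try {
            started.await();
            List<Runnable> neverRun = pool.shutdownNow(); // interrupts running tasks
            return neverRun.isEmpty()
                    && interrupted.await(5, TimeUnit.SECONDS)
                    && pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```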

Your consumers are extremely unlikely to free their assigned partitions by themselves, since poll(Long.MAX_VALUE) gives them the astonishingly long time of almost 300 million years to report back to your cluster. Luckily, the cluster doesn't allow quite such long gaps between heartbeats, but you get the gist.
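
The figure is easy to check: poll takes its timeout in milliseconds here, so the question is how many years Long.MAX_VALUE milliseconds amount to. A small sketch, assuming an average Gregorian year of 365.2425 days (`PollTimeout` is a hypothetical helper name):

```java
// Hypothetical helper for converting a poll() timeout into years.
final class PollTimeout {
    // Average Gregorian year (365.2425 days) expressed in milliseconds.
    static final double MILLIS_PER_YEAR = 365.2425 * 24 * 60 * 60 * 1000;

    // Converts a timeout in milliseconds into approximate years.
    static double millisToYears(long timeoutMillis) {
        return timeoutMillis / MILLIS_PER_YEAR;
    }
}
```

`PollTimeout.millisToYears(Long.MAX_VALUE)` comes out at roughly 292 million years.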


Context

StackExchange Code Review Q#150174, answer score: 3
