
Using a Java queue

Submitted by: @import:stackexchange-codereview··

Problem

I am developing a plugin for a system. The plugin catches some events in that system, wraps them, and pushes them out to another system for analysis. The overhead generated by the plugin must be as low as possible, because the main system is blocked until the plugin finishes its work. So far, I catch the event and push it into a BlockingQueue. Another thread listens on that queue and, when an item arrives, tries to push it out of the system.

private StatsManager statsManager;

@Override
public void start(){
    // Assign the field; a local variable would be lost when start() returns
    statsManager = new StatsManager();
}

//catch event
public void taskFinished(){
    statsManager.pushData(Context.getData());
}


The StatsManager:

private BlockingQueue<MyItem> blockingQueue;

public StatsManager(){
    blockingQueue = new ArrayBlockingQueue<MyItem>(32768);
    SenderThread sender = new SenderThread(blockingQueue);
    new Thread(sender).start();
}

public void pushData(ItemFromContext ctxItem){
    try {
        MyItem item = CollectInterestingData(ctxItem);
        blockingQueue.put(item);
    } catch (InterruptedException e) {
        logger.error(e.toString());
        // Is the thread interrupted anyway? 
        Thread.currentThread().interrupt();
    }
}

private class SenderThread implements Runnable{

    private BlockingQueue<MyItem> blockingQueue;

    public SenderThread(BlockingQueue<MyItem> queue){
        blockingQueue = queue;
    }

    @Override
    public void run() {
        while (true){
            try {
                MyItem item = blockingQueue.take();
                postItem(item);

            } catch (InterruptedException e) {
                logger.error(e.toString());
                Thread.currentThread().interrupt();
            }
        }
    }
}


At the other end, a REST service is waiting. Once it receives an item, it pushes it into a BlockingQueue and returns. Another thread listens on that queue and, once an item arrives, writes it to the database.

What problems am I not foreseeing? One thing that seems obvious is th

Solution

private BlockingQueue<MyItem> blockingQueue;


Make the fields final and initialize them inline, if possible.

blockingQueue = new ArrayBlockingQueue<MyItem>(32768);


Usually, you should define a constant for such a number. IMHO it's OK to omit it if the value appears only once and only in a field initializer (where it's clearly visible anyway).
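As a minimal sketch of the two suggestions above (a final field initialized inline, and a named constant for the capacity), something like the following could work. The class name StatsQueueHolder, the constant name QUEUE_CAPACITY, and the use of String in place of MyItem are assumptions made only for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical holder class; String stands in for the question's MyItem.
class StatsQueueHolder {
    // Named constant instead of a bare number (value taken from the question).
    private static final int QUEUE_CAPACITY = 32768;

    // final + inline initialization: the reference can never be reassigned.
    private final BlockingQueue<String> blockingQueue =
            new ArrayBlockingQueue<>(QUEUE_CAPACITY);

    // Free slots left in the queue; equals QUEUE_CAPACITY while it is empty.
    int remainingCapacity() {
        return blockingQueue.remainingCapacity();
    }
}
```

With the field declared this way, the constructor no longer needs to assign it, and the compiler rejects any later reassignment.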

try {
    MyItem item = CollectInterestingData(ctxItem);
    blockingQueue.put(item);
} catch (InterruptedException e) {
     logger.error(e.toString());
     // Is the thread interrupted anyway? 
     Thread.currentThread().interrupt();
}


The comment makes no sense to me.

Re-interrupting a thread inside an endless loop is rather pointless. All you get is another InterruptedException on the next blocking call, and in the sender loop on every call after that. This considerably slows down all subsequent operations and fills your logs for no gain at all.

Logging and swallowing the exception without re-interrupting would be better. However, an interrupt is the standard way to ask a thread that is blocked on a queue to terminate. So in the catch block you should test whether termination is really desired and, if so, exit the loop.
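One way to test whether termination is really desired is a volatile flag that is set before the thread is interrupted. The sketch below illustrates that idea under a few assumptions: StoppableSender, requestStop, and StoppableSenderDemo are hypothetical names, String stands in for MyItem, and adding to a list stands in for postItem.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for the question's SenderThread (String replaces MyItem).
class StoppableSender implements Runnable {
    private final BlockingQueue<String> queue;
    private final List<String> sent = new CopyOnWriteArrayList<>();
    private volatile boolean stopRequested;

    StoppableSender(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    // Set the flag first, then interrupt, so the worker sees the flag in its catch block.
    void requestStop(Thread worker) {
        stopRequested = true;
        worker.interrupt();
    }

    List<String> sent() {
        return sent;
    }

    @Override
    public void run() {
        while (true) {
            try {
                sent.add(queue.take()); // sent.add stands in for postItem(item)
            } catch (InterruptedException e) {
                if (stopRequested) {
                    return; // termination was really desired: leave the loop
                }
                // Otherwise swallow the interrupt and keep serving the queue.
            }
        }
    }
}

class StoppableSenderDemo {
    // Starts a sender, feeds it two items, stops it, and returns what it sent.
    static List<String> runOnce() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(16);
        StoppableSender sender = new StoppableSender(queue);
        Thread worker = new Thread(sender, "stats-sender");
        worker.start();
        try {
            queue.put("a");
            queue.put("b");
            long deadline = System.currentTimeMillis() + 5_000;
            while (sender.sent().size() < 2 && System.currentTimeMillis() < deadline) {
                Thread.sleep(5);
            }
            sender.requestStop(worker);
            worker.join(5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        if (worker.isAlive()) {
            throw new IllegalStateException("sender did not stop");
        }
        return sender.sent();
    }

    public static void main(String[] args) {
        System.out.println(runOnce());
    }
}
```

Here the loop ends only when requestStop was called. An interrupt that arrives without the flag being set is ignored, and the sender keeps taking items from the queue.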


"If it's relevant, I am expecting roughly 80000-100000 items per 24 hours."

If this were that many per second, you'd have a serious problem. Per minute should be doable (assuming the items are small).


"So the balance here would be whether I risk losing some data or I risk blocking the whole system."

True. Measure whether you have enough memory to hold out long enough for someone to come and fix the problem. Make sure someone is alerted before a problem happens, for example by testing whether the queue is half full.
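The trade-off and the half-full check can be sketched as follows. This is only an illustration: NonBlockingStatsBuffer and its methods are hypothetical names, and it takes the "risk losing data" side by using offer(), which returns false on a full queue, where put() would block the caller.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical buffer that prefers dropping items over blocking the main system.
class NonBlockingStatsBuffer<T> {
    private final int capacity;
    private final BlockingQueue<T> queue;
    private final AtomicLong dropped = new AtomicLong();

    NonBlockingStatsBuffer(int capacity) {
        this.capacity = capacity;
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // Never blocks: returns false and counts a drop when the queue is full.
    boolean push(T item) {
        boolean accepted = queue.offer(item);
        if (!accepted) {
            dropped.incrementAndGet();
        }
        return accepted;
    }

    // Early-warning signal that could feed an alert before the queue fills up.
    boolean isHalfFull() {
        return queue.size() * 2 >= capacity;
    }

    long droppedCount() {
        return dropped.get();
    }
}
```

A monitoring job could poll isHalfFull() and droppedCount() and raise an alert. How the alert is delivered is outside the scope of this sketch.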


"Also, would it make sense to implement the whole StatsManager as Runnable and use ThreadPoolExecutor to spin off wrapping and sending tasks?"

Only if you want to parallelize the communication and are prepared to deal with out-of-order arrivals at the other end.
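To make the ordering caveat concrete, here is a small sketch that sends items through a fixed thread pool. ParallelSender and sendAll are hypothetical names, and adding to a queue stands in for the real postItem call. Every item arrives, but with more than one thread the arrival order is not guaranteed to match the submission order.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical parallel sender; arrived.add stands in for postItem(item).
class ParallelSender {
    // Submits itemCount tasks to a pool and returns the items in arrival order.
    static Queue<Integer> sendAll(int itemCount, int threads) {
        Queue<Integer> arrived = new ConcurrentLinkedQueue<>();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < itemCount; i++) {
            final int item = i;
            pool.execute(() -> arrived.add(item));
        }
        pool.shutdown(); // accept no new tasks, let the submitted ones finish
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("sending timed out");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return arrived;
    }
}
```

If the receiving side needs ordering, each item would have to carry a sequence number or timestamp, since the pool itself makes no promise about completion order.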


"Could that be more efficient?"

Not more efficient in terms of CPU usage, but maybe in terms of overall throughput. Before you optimize, ask yourself whether you need it. How long does one item take to send? Multiply that by the number of items per day and look at the result.
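The multiplication suggested above can be written down as a short calculation. In the sketch below, SenderLoadEstimate is a hypothetical helper, the 100000 items per day come from the question, and the 50 ms per item is an assumed figure that you would replace with your own measurement.

```java
// Hypothetical helper for a back-of-the-envelope load estimate.
class SenderLoadEstimate {
    private static final double SECONDS_PER_DAY = 86_400.0;
    private static final double MILLIS_PER_DAY = 86_400_000.0;

    // Average arrival rate over a full day.
    static double itemsPerSecond(long itemsPerDay) {
        return itemsPerDay / SECONDS_PER_DAY;
    }

    // Fraction of the day a single sender thread spends sending.
    static double busyFraction(long itemsPerDay, double millisPerItem) {
        return itemsPerDay * millisPerItem / MILLIS_PER_DAY;
    }

    public static void main(String[] args) {
        long itemsPerDay = 100_000;   // upper bound from the question
        double millisPerItem = 50.0;  // assumed, not measured
        System.out.printf("items/s: %.2f%n", itemsPerSecond(itemsPerDay));
        System.out.printf("busy fraction: %.3f%n", busyFraction(itemsPerDay, millisPerItem));
    }
}
```

Under these assumptions the average rate is a little over one item per second, and a single sender thread is busy for roughly 6% of the day. Bursts can be much denser than the average, so the peak rate matters more than the daily total.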


"Would this be thread-safe?"

Yes, unless your definition of safety includes in-order arrival at the other end (which, AFAIK, is normally not guaranteed anyway). The BlockingQueue and Executor are designed for such tasks.

Context

StackExchange Code Review Q#71086, answer score: 3
