patternjavaMinor
Non blocking high performance message processor
Viewed 0 times
nonprocessormessagehighblockingperformance
Problem
I have some kind of high thoughput message handler receiving ordered messages. Now the task is - adding monitoring stuff in it in order to recognize wrong input messages, more exactly: it must notify about missed messages and just ignore the stale ones (with smaller order).
Due to high thoughput - it mustn't block.
So it must:
Let's have some long counter in message for ordering - here is my initial version:
```
public class OrderedNonBlockingProcessor implements YetAnotherMessageHandler {
private static Logger log = Logger.getLogger(OrderedNonBlockingProcessor.class);
private final AtomicLong messageCounter = new AtomicLong(0);
@Override
public boolean onSomeHighThroughputMessage(final YetAnotherMessage message) {
final long expected = messageCounter.getAndSet(message.getCounter()) + 1;
if (expected == message.getCounter()) {
processBusinessStuff(message);
return true;
} else if (expected > message.getCounter()) {
/* wrong message, attempt to restore the sequence to prevent an error on next good message
* TODO: fix ABA problem here
*/
messageCounter.compareAndSet(message.getCounter(), expected - 1);
log.error(String.format("messaging system ordering bug: got stale message %s while expected %s!",
message.getCounter(), expected));
// some other notifying stuff...
} else if (expected < message.getCounter()) {
log.error(String.format("got forward message %s while expected %s, missed: %s",
message.getCounter(), expected, message.getCounter() - expected));
// some other notifying stuff...
}
return false;
}
private void processBusinessStuff(YetAnotherMessage message) {
log.info(String.format("process message %s", message.getCounter()));
// some
Due to high thoughput - it mustn't block.
So it must:
- not use locks
- find missing messages
- find wrong ordered messages
Let's have some long counter in message for ordering - here is my initial version:
```
public class OrderedNonBlockingProcessor implements YetAnotherMessageHandler {
private static Logger log = Logger.getLogger(OrderedNonBlockingProcessor.class);
private final AtomicLong messageCounter = new AtomicLong(0);
@Override
public boolean onSomeHighThroughputMessage(final YetAnotherMessage message) {
final long expected = messageCounter.getAndSet(message.getCounter()) + 1;
if (expected == message.getCounter()) {
processBusinessStuff(message);
return true;
} else if (expected > message.getCounter()) {
/* wrong message, attempt to restore the sequence to prevent an error on next good message
* TODO: fix ABA problem here
*/
messageCounter.compareAndSet(message.getCounter(), expected - 1);
log.error(String.format("messaging system ordering bug: got stale message %s while expected %s!",
message.getCounter(), expected));
// some other notifying stuff...
} else if (expected < message.getCounter()) {
log.error(String.format("got forward message %s while expected %s, missed: %s",
message.getCounter(), expected, message.getCounter() - expected));
// some other notifying stuff...
}
return false;
}
private void processBusinessStuff(YetAnotherMessage message) {
log.info(String.format("process message %s", message.getCounter()));
// some
Solution
I think that there is a certain amount of fuzziness to the question but I'll make a couple of suggestions and perhaps they will help.
First of all, I would recommend giving serious consideration to using Akka actors (http://akka.io/). Actors are a very straightforward way to handle multi-threaded processing without locks.
Just to be clear, I think that you are saying that the program receives messages that have an order but does not necessarily receive them IN order and so it needs to restore the correct order before they can be processed.
If this is true, then I would recommend using TWO actors. The first actor would get the messages from wherever they are coming from and dealing with putting them into the correct order. I assume that it would be ok to cache out-of-order messages until it finds the next message to be processed, perhaps on a priority heap.
Once the first actor finds the next message to be processed, it would send it on to the second actor who would then handle the actual processing (by calling processBusinessStuff in your example).
First of all, I would recommend giving serious consideration to using Akka actors (http://akka.io/). Actors are a very straightforward way to handle multi-threaded processing without locks.
Just to be clear, I think that you are saying that the program receives messages that have an order but does not necessarily receive them IN order and so it needs to restore the correct order before they can be processed.
If this is true, then I would recommend using TWO actors. The first actor would get the messages from wherever they are coming from and dealing with putting them into the correct order. I assume that it would be ok to cache out-of-order messages until it finds the next message to be processed, perhaps on a priority heap.
Once the first actor finds the next message to be processed, it would send it on to the second actor who would then handle the actual processing (by calling processBusinessStuff in your example).
Context
StackExchange Code Review Q#10329, answer score: 2
Revisions (0)
No revisions yet.