Queue that connects multiple producers and multiple consumers
Problem
The scenario is about processing 'Message' objects: producers create them and consumers consume them.
The following is a pipeline object (queue) that connects multiple producer threads and multiple consumer threads. Both my producers & consumers are I/O bound (database) and that justifies this attempt.
API requirements
- [A] Ability to add & get batch of Message objects.
- [B] Consumer has a way to realize that no more elements are expected.
```
public class MessagePipeline {
    //[1]; to support API requirement [B]
    public static final MessageBatch SENTINEL_BATCH = new MessageBatch(Collections.emptyList());

    private BlockingQueue<Message> messageQueue = new LinkedBlockingQueue<>();

    // to support API requirement [B]
    private CountDownLatch producersWorkCompletionLatch;

    public MessagePipeline(int numOfProducerThreads) {
        this.producersWorkCompletionLatch = new CountDownLatch(numOfProducerThreads);
    }

    public boolean add(MessageBatch batch) {
        isWriteAllowed(); // UPDATE: this step is misleading and thinking of removing this.
        return messageQueue.addAll(batch.getMessages());
    }

    // returns a batch containing up to maxSize elements.
    public MessageBatch takeBatch(int maxSize) {
        List<Message> messages = drainIfMoreElementsExpected(new ArrayList<Message>(), maxSize);
        return messages.isEmpty() ? SENTINEL_BATCH : new MessageBatch(messages);
    }

    //[2]: Producer should invoke this at the end.
    public void notifyCompletion() {
        producersWorkCompletionLatch.countDown();
    }

    public boolean isMoreElementsExpected() {
        boolean isExpected = !(producersWorkCompletionLatch.getCount() == 0 && messageQueue.size() == 0);
        return isExpected;
    }

    //[3] : This is CPU bound waiting. TODO: Make
    ...
}

class Message{...}

class MessageBatch{
    public MessageBatch(Collection<Message> messages){...}
    public List<Message> getMessages(){...};
    //more utility methods like getMessageIds(), etc.
}
```
Solution
Since you say that your whole system is based on message passing, you should take a serious look at Akka. You get a rock-solid message passing system, and you can even spread it across multiple machines in the future.
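If a full actor framework is more than you need, the plain `java.util.concurrent` alternative argued for in this answer (a simple `BlockingQueue` that hands over one message at a time, plus a `CountDownLatch` so consumers can tell when the producers are finished) could look roughly like the sketch below. The class name `SimpleQueueDemo`, the `run` helper, the `String` message type, and the 50 ms poll timeout are all assumptions made for illustration; none of them come from the original code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; the names here are illustrative, not from the original post.
class SimpleQueueDemo {

    // Starts the given number of producer and consumer threads and
    // returns how many messages the consumers processed in total.
    static int run(int producers, int consumers, int messagesPerProducer) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        CountDownLatch producersDone = new CountDownLatch(producers);
        AtomicInteger consumed = new AtomicInteger();
        List<Thread> threads = new ArrayList<>();

        for (int p = 0; p < producers; p++) {
            final int id = p;
            threads.add(new Thread(() -> {
                try {
                    for (int i = 0; i < messagesPerProducer; i++) {
                        queue.put("producer-" + id + "-msg-" + i); // one message at a time
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    producersDone.countDown(); // signal completion only after the last put
                }
            }));
        }

        for (int c = 0; c < consumers; c++) {
            threads.add(new Thread(() -> {
                try {
                    while (true) {
                        // Timed poll instead of busy-waiting on an empty queue.
                        String msg = queue.poll(50, TimeUnit.MILLISECONDS);
                        if (msg != null) {
                            consumed.incrementAndGet(); // stand-in for real processing
                        } else if (producersDone.getCount() == 0 && queue.isEmpty()) {
                            return; // producers finished and nothing is left to take
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
        }

        threads.forEach(Thread::start);
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
        return consumed.get();
    }

    public static void main(String[] args) {
        System.out.println("consumed = " + run(3, 2, 100));
    }
}
```

The order of the exit check matters in this sketch: the latch is read before `queue.isEmpty()`. Once the latch has reached zero no producer can add anything, so an empty queue observed after that point stays empty. Each producer counts down in a `finally` block after its last `put`, so consumers are not left waiting if a producer is interrupted.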
I have not checked in detail, but you have some very serious concurrency bugs. For example, `add` checks whether writing is allowed and then writes to the queue. Because `add` is not synchronized, the check might pass and then, before the next line runs, some other thread might bring the `CountDownLatch` down to zero. `BlockingQueue` is thread-safe, but that is far from enough to make your code thread-safe.
I don't like the `MessagePipeline` API: it is too complex. I really don't see the advantage of writing or taking "batches" of messages. If readers or writers want to process many messages, just let them read or write those messages themselves in a loop. The API itself should only read or write one message at a time. In that case your whole `MessagePipeline` is superfluous, since a simple `BlockingQueue` is exactly what you want, and you will also avoid many concurrency bugs. You would still need to keep the `CountDownLatch`, or something similar, to let the consumers know when it's over.
Context
StackExchange Code Review Q#57533, answer score: 2