HiveBrain v1.2.0
Get Started
← Back to all entries
patterncsharpMinor

Listen to multiple RabbitMQ queue by task and process the message

Submitted by: @import:stackexchange-codereview··
0
Viewed 0 times
theprocessrabbitmqlistenmessagemultipleandqueuetask

Problem

Single app which listen to multiple RabbitMQ queue and process the message. This is working fine but not sure this implementation is right one or I am missing something.

Implementation is inspired from this answer https://stackoverflow.com/a/21847234/37571

```
//Message subscriber implementation
public class AuditSubscriber : IMessageSubscriber
{
public IList SubscribedRouteKeys
{
get { return new List()
{
".inceitive.attested."
};
}
}

public async Task Process(Core.MessageInfo MessageItem)
{
//Start new task to process the message
bool _ProcessedResult = await Task.Factory.StartNew(() => MessageProcesser(MessageItem), CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);

return _ProcessedResult;
}

protected bool MessageProcesser(MessageInfo MessageItem)
{
Thread.Sleep(1000); //Acthual work
return true;
}

}

public class RabbitMQMessageConsumer : AbstractRabbitMQClient, IMessageConsumer
{

//Message consumer method, which will initiate number of tasks based upon the available subscriber.
public void Consume(CancellationToken token)
{
//Start Rabbit MQ connection
StartConnection(_ConnectionFactory.Get());

List tasks = new List();
foreach (SubscriberType subscriberType in (SubscriberType[])Enum.GetValues(typeof(SubscriberType)))
{
//Start listeing to all queues based upon the number of subscriber type availbale in the system
Task task = Task.Factory.StartNew(() => ConsumeMessage(subscriberType, token), CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);
tasks.Add(task);
}

Solution

public async Task Process(Core.MessageInfo MessageItem)
{
    //Start new task to process the message
   bool _ProcessedResult =  await Task.Factory.StartNew(() => MessageProcesser(MessageItem), CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);

   return _ProcessedResult;
}


-
That's a weird name for a local variable. The common naming is camelCase (e.g. processedResult). Underscores are sometimes used for fields, but certainly not for locals.

-
The variable is actually unnecessary, you can just directly return the expression.

-
If all awaits in a method are return awaits, then you don't need await at all, just return the Task directly, after removing async from the signature.

-
Are you sure TaskCreationOptions.LongRunning is appropriate here? Its practical (and undocumented) effect is that it creates a new Thread to execute the Task. If you don't need that, just use Task.Run()

With all those changes the method would look like this:

public Task Process(Core.MessageInfo MessageItem)
{
    return Task.Run(() => MessageProcesser(MessageItem));
}


There are many empty lines in your code that I think are unnecessary. Empty lines are useful, but I think the way you're using them (after { or between two }) is just wasting space. And multiple empty lines are usually not useful either.

Task.WhenAll(tasks);


WhenAll() returns a Task that represents waiting for all the passed-in Tasks, so ignoring its return value like this doesn't make any sense. Ideally, you should await the returned Task, but for that you need async. And async void methods shouldn't be used. So, if you need to wait for all the Tasks here and you can't use await, you will have to block the thread by using Task.WaitAll(tasks).

consumer.Queue.Dequeue(1000, out eventArgs)


This looks like a blocking method. Isn't there an asynchronous version available? If there is, you should probably use that instead.

Code Snippets

public async Task<bool> Process(Core.MessageInfo MessageItem)
{
    //Start new task to process the message
   bool _ProcessedResult =  await Task<bool>.Factory.StartNew(() => MessageProcesser(MessageItem), CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);

   return _ProcessedResult;
}
public Task<bool> Process(Core.MessageInfo MessageItem)
{
    return Task.Run(() => MessageProcesser(MessageItem));
}
Task.WhenAll(tasks);
consumer.Queue.Dequeue(1000, out eventArgs)

Context

StackExchange Code Review Q#42836, answer score: 6

Revisions (0)

No revisions yet.