Are there pitfalls to this solution to read messages from a queue in parallel?

Submitted by: @import:stackexchange-codereview

Problem

I posted a question on Stack Overflow, "How can I consequently read messages from a queue in parallel?", and I would like my own answer to be reviewed.
Situation

We have one message queue. We would like to process messages in parallel and limit the number of simultaneously processed messages.
Our solution

Based on answers given and some research of our own, we've come to a solution. We're using a SemaphoreSlim to limit our number of parallel Tasks.

Are there any pitfalls to this solution?
I'm also interested in any better solutions.

static string queue = @".\Private$\concurrenttest";

private static async Task Process(CancellationToken token)
{
    MessageQueue msMq = new MessageQueue(queue);
    msMq.Formatter = new XmlMessageFormatter(new Type[] { typeof(Command1) });
    SemaphoreSlim s = new SemaphoreSlim(15, 15);

    while (true)
    {
        await s.WaitAsync();
        await PeekAsync(msMq);
        Command1 message = await ReceiveAsync(msMq);
        Task.Run(async () =>
        {
            try
            {
                await HandleMessage(message);
            }
            catch (Exception)
            {
                // Exception handling
            }
            s.Release();
        });
    }
}

private static Task HandleMessage(Command1 message)
{
    Console.WriteLine("id: " + message.id + ", name: " + message.name);
    Thread.Sleep(1000);
    return Task.FromResult(1);
}

private static Task PeekAsync(MessageQueue msMq)
{
    return Task.Factory.FromAsync(msMq.BeginPeek(), msMq.EndPeek);
}

public class Command1
{
    public int id { get; set; }
    public string name { get; set; }
}

private static async Task<Command1> ReceiveAsync(MessageQueue msMq)
{
    // Complete the asynchronous receive with the matching EndReceive call.
    var receiveAsync = await Task.Factory.FromAsync<Message>(msMq.BeginReceive(), msMq.EndReceive);
    return (Command1)receiveAsync.Body;
}

Solution

Readability

Use the var keyword whenever the initializer makes it obvious what the variable represents. Avoid abbreviations in variable names.

var messageQueue = new MessageQueue(queue);
var semaphore = new SemaphoreSlim(15, 15);


Configuration

Avoid hard-coded settings:

var semaphore = new SemaphoreSlim(15, 15);


Make the number of concurrent processing jobs configurable, for example through an instance property or a configuration file.
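
A minimal sketch of one way to do this. QueueProcessorOptions, MaxConcurrency and CreateThrottle are hypothetical names that do not appear in the original code, and the default of 15 only mirrors the literal used above.

```csharp
using System;
using System.Threading;

// Hypothetical options type; the names and the default value are illustrative only.
public sealed class QueueProcessorOptions
{
    public int MaxConcurrency { get; }

    public QueueProcessorOptions(int maxConcurrency = 15)
    {
        if (maxConcurrency < 1)
        {
            throw new ArgumentOutOfRangeException(
                nameof(maxConcurrency), "At least one concurrent job is required.");
        }

        MaxConcurrency = maxConcurrency;
    }

    // Builds the throttle from the configured value instead of a literal.
    public SemaphoreSlim CreateThrottle()
    {
        return new SemaphoreSlim(MaxConcurrency, MaxConcurrency);
    }
}
```

The value passed to the constructor could come from wherever the application already keeps its settings; the processing code then only sees the semaphore.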

Main Loop

Some consider while (true) a code smell because the exit conditions are not clear. The definite problem here is that the loop never exits. Always give a loop a graceful exit. Since a cancellation token is already provided, why not use it to drive the loop?

Sand-box mode (the loop ends quietly once cancellation is requested):

while (!token.IsCancellationRequested)
{
    // .. loop body
}


Explicit mode (throws OperationCanceledException once cancellation is requested):

while (true)
{
    token.ThrowIfCancellationRequested();
    // .. loop body
}
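
To make the difference between the two loop styles concrete, here is a small sketch. CancellationModes, RunQuietly and RunOrThrow are hypothetical names, and the counter stands in for the real loop body; maxIterations exists only so the demonstration terminates without cancellation.

```csharp
using System;
using System.Threading;

public static class CancellationModes
{
    // Sand-box mode: returns normally as soon as cancellation is observed.
    public static int RunQuietly(CancellationToken token, int maxIterations)
    {
        var iterations = 0;
        while (!token.IsCancellationRequested && iterations < maxIterations)
        {
            iterations++; // stand-in for the loop body
        }

        return iterations;
    }

    // Explicit mode: the caller sees an OperationCanceledException.
    public static int RunOrThrow(CancellationToken token, int maxIterations)
    {
        var iterations = 0;
        while (iterations < maxIterations)
        {
            token.ThrowIfCancellationRequested();
            iterations++; // stand-in for the loop body
        }

        return iterations;
    }
}
```

With an already cancelled token, RunQuietly returns without running the body, while RunOrThrow raises OperationCanceledException, so the caller can tell a cancelled run from a completed one.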


Threading Integrity

Once you have acquired the semaphore, immediately enter a try-finally block so that it is always released.

await semaphore.WaitAsync();

// What if the code below throws an exception? The semaphore is not released!
await PeekAsync(messageQueue);
var message = await ReceiveAsync(messageQueue);


await semaphore.WaitAsync();
try
{
    // .. code while holding lock
}
finally 
{
    semaphore.Release();
}
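
In the posted loop the slot is acquired in one place and released inside the spawned task, so the try-finally has to cover both the receive step and the handler. The following is a sketch of how that could look, not a drop-in replacement: ThrottledPump, RunAsync and HandleAndReleaseAsync are hypothetical names, and the receive and handle delegates stand in for ReceiveAsync and HandleMessage so the example does not depend on MSMQ.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledPump
{
    // receive and handle are hypothetical stand-ins for ReceiveAsync and HandleMessage.
    public static async Task RunAsync<T>(
        Func<CancellationToken, Task<T>> receive,
        Func<T, Task> handle,
        int maxConcurrency,
        CancellationToken token)
    {
        using (var semaphore = new SemaphoreSlim(maxConcurrency, maxConcurrency))
        {
            var running = new List<Task>();
            try
            {
                while (!token.IsCancellationRequested)
                {
                    // Throws OperationCanceledException if cancelled while waiting.
                    await semaphore.WaitAsync(token);

                    T message;
                    try
                    {
                        message = await receive(token);
                    }
                    catch
                    {
                        // No handler was started, so nobody else will free this slot.
                        semaphore.Release();
                        throw;
                    }

                    running.RemoveAll(t => t.IsCompleted);
                    running.Add(HandleAndReleaseAsync(message, handle, semaphore));
                }
            }
            finally
            {
                // Let in-flight handlers finish before the semaphore is disposed.
                await Task.WhenAll(running);
            }
        }
    }

    private static async Task HandleAndReleaseAsync<T>(
        T message, Func<T, Task> handle, SemaphoreSlim semaphore)
    {
        try
        {
            // Task.Run keeps a handler that blocks synchronously off the receive loop.
            await Task.Run(() => handle(message));
        }
        catch (Exception)
        {
            // Exception handling, as in the original code.
        }
        finally
        {
            semaphore.Release();
        }
    }
}
```

Every acquired slot is released exactly once: by the catch block if receiving fails, otherwise by the handler's finally block. If cancellation arrives while the loop is waiting, RunAsync completes with an OperationCanceledException after the running handlers have finished.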


MessageQueue

There is no reason to peek before receiving, because both operations wait on the same trigger: an incoming message.

await PeekAsync(messageQueue);
var message = await ReceiveAsync(messageQueue);


You can replace the above with:

var message = await ReceiveAsync(messageQueue);

Context

StackExchange Code Review Q#115788, answer score: 4
