patterncsharpMinor
Are there pitfalls to this solution to read messages from a queue in parallel?
Viewed 0 times
thisfromsolutionreadaremessagesparallelpitfallstherequeue
Problem
I've posted a question on stackoverflow: How can I consequently read messages from a queue in parallel? I would like my own answer to be reviewed.
Situation
We have one message queue. We would like to process messages in parallel and limit the number of simultaneously processed messages.
Our solution
Based on answers given and some research of our own, we've come to a solution. We're using a
Are there any pitfalls to this solution?
I'm also interested in any better solutions.
Situation
We have one message queue. We would like to process messages in parallel and limit the number of simultaneously processed messages.
Our solution
Based on answers given and some research of our own, we've come to a solution. We're using a
SemaphoreSlim to limit our number of parallel Tasks.Are there any pitfalls to this solution?
I'm also interested in any better solutions.
static string queue = @".\Private$\concurrenttest";
private static async Task Process(CancellationToken token)
{
MessageQueue msMq = new MessageQueue(queue);
msMq.Formatter = new XmlMessageFormatter(new Type[] { typeof(Command1) });
SemaphoreSlim s = new SemaphoreSlim(15, 15);
while (true)
{
await s.WaitAsync();
await PeekAsync(msMq);
Command1 message = await ReceiveAsync(msMq);
Task.Run(async () =>
{
try
{
await HandleMessage(message);
}
catch (Exception)
{
// Exception handling
}
s.Release();
});
}
}
private static Task HandleMessage(Command1 message)
{
Console.WriteLine("id: " + message.id + ", name: " + message.name);
Thread.Sleep(1000);
return Task.FromResult(1);
}
private static Task PeekAsync(MessageQueue msMq)
{
return Task.Factory.FromAsync(msMq.BeginPeek(), msMq.EndPeek);
}
public class Command1
{
public int id { get; set; }
public string name { get; set; }
}
private static async Task ReceiveAsync(MessageQueue msMq)
{
var receiveAsync = await Task.Factory.FromAsync(msMq.BeginReceive(), msMq.EndPeek);
return (Command1)receiveAsync.Body;
}Solution
Readability
Use keyword
Configuration
Avoid hard-coded settings:
Make use of a pattern to allow the number of concurrent processing jobs to be configurable (instance property, configuration file, ..).
Main Loop
The main loop
sand-box mode:
explicit mode:
Threading Integrity
Once you have acquired the semaphore, you should immediately use the try-finally block to make sure to always release it.
MessageQueue
There is no reason to peek before receiving, because both operations are blocking on the same trigger - an incoming message.
You can replace the above with:
Use keyword
var any time that the initialization of the variable clearly tells what the variable represents. Avoid abbreviations in variable names.var messageQueue = new MessageQueue(queue);
var semaphore = new SemaphoreSlim(15, 15);Configuration
Avoid hard-coded settings:
var semaphore = new SemaphoreSlim(15, 15);Make use of a pattern to allow the number of concurrent processing jobs to be configurable (instance property, configuration file, ..).
Main Loop
The main loop
while (true) is considered a code smell by some because it's not clear what the exit conditions are. But what's definately an issue is that the loop never exits. Always make sure the loop has a graceful exit. Since there's a cancellation token provided, why not use it to drive the loop? sand-box mode:
while (!token.IsCancellationRequested)
{
// .. loop body
}explicit mode:
while (true)
{
token.ThrowIfCancellationRequested();
// .. loop body
}Threading Integrity
Once you have acquired the semaphore, you should immediately use the try-finally block to make sure to always release it.
await semaphore.WaitAsync();
// What if the code below throws an exception? The semaphore is not released!
await PeekAsync(messageQueue);
var message = await ReceiveAsync(messageQueue);await semaphore.WaitAsync();
try
{
// .. code while holding lock
}
finally
{
semaphore.Release();
}MessageQueue
There is no reason to peek before receiving, because both operations are blocking on the same trigger - an incoming message.
await PeekAsync(messageQueue);
var message = await ReceiveAsync(messageQueue);You can replace the above with:
var message = await ReceiveAsync(messageQueue);Code Snippets
var messageQueue = new MessageQueue(queue);
var semaphore = new SemaphoreSlim(15, 15);var semaphore = new SemaphoreSlim(15, 15);while (!token.IsCancellationRequested)
{
// .. loop body
}while (true)
{
token.ThrowIfCancellationRequested();
// .. loop body
}await semaphore.WaitAsync();
// What if the code below throws an exception? The semaphore is not released!
await PeekAsync(messageQueue);
var message = await ReceiveAsync(messageQueue);Context
StackExchange Code Review Q#115788, answer score: 4
Revisions (0)
No revisions yet.