Continuously receive messages async from Azure Service Bus Queues
Problem
I'd like to get input on my approach of continuously receiving messages from an Azure Service Bus queue using the async part of the library.
My main concern is whether it's "safe" to use Task.Factory.StartNew to process the incoming messages and then keep calling the Actions to continue receiving new messages.
The processor class is started like this:
```
public void Start()
{
    ThrowIfDisposed();
    lock (_lockObject)
    {
        Trace.WriteLine(string.Format("Started '{0}' allocation.", _projectSettings.Name));
        _cancellationSource = new CancellationTokenSource();
        Task.Factory.StartNew(ReceiveAllocationMessages, _cancellationSource.Token);
    }
}
```
And then the actual continuous receiving of messages from the (Azure Service Bus) QueueClient (from the Microsoft.ServiceBus.Messaging namespace):
```
private void ReceiveMessage()
{
    Task.Run(async () =>
    {
        //Receive a new message async within 1 minute
        return await _queueClient.ReceiveAsync(TimeSpan.FromMinutes(1));
    })
    .ContinueWith(t =>
    {
        if (t.Result != null)
        {
            //The result is not null so we process the Brokered Message
            Task.Factory.StartNew(() =>
            {
                ProcessMessage(t.Result);
                ReceiveNextMessage();
            }, _cancellationSource.Token);
        }
        else
        {
            // Continue receiving and processing
            // new messages until told to stop.
            ReceiveNextMessage();
        }
    });
}
private void ReceiveNextMessage()
{
    if (_cancellationSource.IsCancellationRequested == false)
    {
        //Continue to receive new messages every 1 minute
        Task.Delay(TimeSpan.FromMinutes(1))
            .ContinueWith(t => ReceiveMessage());
    }
}
```
When the process is stopped or disposed, the Stop method is called in order to cancel the CancellationTokenSource, as shown here:
```
public void Stop()
{
    lock (_lockObject)
    {
```
Solution
First, some notes:
- You shouldn't be using Task.Factory.StartNew() unless you have to; Task.Run() is better.
- I think you're using Task.Run() (and Task.Factory.StartNew()) and ContinueWith() too much. Only use Task.Run() when you actually need to run the code on another thread. And ContinueWith() is almost always better written using await. If you're using it to support cancellation, you can just use ThrowIfCancellationRequested() instead.
- Don't use == false; ! is much clearer.
- In your code, the processing can be started twice. You should probably guard against that.
- Your Start() and Stop() methods seem to be written with support for restarting in mind. It might be simpler to forbid that.

Now, about the structure of your code: I think a much clearer way to write your code would be to use a loop instead of recursion. That way, your top-level method can be made very clear:
```
while (!_cancellationSource.IsCancellationRequested)
{
    await ReceiveMessageAsync();
    await Task.Delay(TimeSpan.FromMinutes(1));
}
```
The ReceiveMessageAsync() method could then look like this, which is a lot shorter and much more readable:
```
private async Task ReceiveMessageAsync()
{
    var message = await _queueClient.ReceiveAsync(TimeSpan.FromMinutes(1));
    if (message != null)
    {
        _cancellationSource.ThrowIfCancellationRequested();
        ProcessMessage(message);
    }
}
```
Note that this version behaves differently from yours with regard to exceptions. In your code, if an exception happens, it's either silently ignored (if it's in ReceiveAsync()) or it just silently stops receiving (if it's in ProcessMessage()). Both are very bad and make errors hard to debug. With my code, any exception stops receiving and is reported to the top-level Task.

You should then devise some way of reporting any exceptions on that Task.

A simple solution would be to store the Task in Start() and then rethrow any exceptions in Stop(). A cleaner solution might be to change void Start() into something like Task RunAsync() and report exceptions using the returned Task.

You could also consider not stopping if some specific exception happens (e.g. the server is temporarily unavailable). But that should happen only for some specific exceptions, and those exceptions should be logged anyway, to help with debugging.
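
To make the RunAsync() idea more concrete, here is a minimal sketch of how the returned Task could carry exceptions back to the caller. It is only an illustration under some assumptions: IMessageSource and InMemorySource are hypothetical stand-ins for QueueClient so that the example runs without Azure, messages are plain strings rather than BrokeredMessage, the delay between polls is a constructor parameter instead of a fixed minute, and async Main needs C# 7.1 or later.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for QueueClient, so the sketch runs without Azure.
public interface IMessageSource
{
    // Returns null when no message arrived within the timeout.
    Task<string> ReceiveAsync(TimeSpan timeout);
}

// Hypothetical in-memory source used only for this demonstration.
public sealed class InMemorySource : IMessageSource
{
    private readonly ConcurrentQueue<string> _queue = new ConcurrentQueue<string>();

    public void Enqueue(string message)
    {
        _queue.Enqueue(message);
    }

    public Task<string> ReceiveAsync(TimeSpan timeout)
    {
        string message;
        // The timeout is ignored here; an empty queue returns null immediately.
        return Task.FromResult(_queue.TryDequeue(out message) ? message : null);
    }
}

public sealed class MessageProcessor
{
    private readonly IMessageSource _source;
    private readonly Action<string> _processMessage;
    private readonly TimeSpan _pollDelay;

    public MessageProcessor(IMessageSource source, Action<string> processMessage, TimeSpan pollDelay)
    {
        _source = source;
        _processMessage = processMessage;
        _pollDelay = pollDelay;
    }

    // Takes the place of void Start(): the caller keeps the returned Task
    // and observes any exception from receiving or processing through it.
    public async Task RunAsync(CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            var message = await _source.ReceiveAsync(TimeSpan.FromMinutes(1));
            if (message != null)
            {
                token.ThrowIfCancellationRequested();
                _processMessage(message);
            }
            // Passing the token lets cancellation interrupt the wait.
            await Task.Delay(_pollDelay, token);
        }
    }
}

public static class Program
{
    public static async Task Main()
    {
        var source = new InMemorySource();
        source.Enqueue("first");
        source.Enqueue("poison");

        var processor = new MessageProcessor(source, m =>
        {
            if (m == "poison")
                throw new InvalidOperationException("cannot process " + m);
            Console.WriteLine("processed " + m);
        }, TimeSpan.FromMilliseconds(10));

        using (var cts = new CancellationTokenSource())
        {
            Task run = processor.RunAsync(cts.Token);
            try
            {
                await run; // rethrows whatever stopped the loop
            }
            catch (InvalidOperationException ex)
            {
                Console.WriteLine("receive loop stopped: " + ex.Message);
            }
        }
    }
}
// Prints:
// processed first
// receive loop stopped: cannot process poison
```

In this sketch the caller owns the CancellationTokenSource, so "stopping" is just cancelling the token and awaiting the Task. Because the token is also passed to Task.Delay(), a cancellation during the wait surfaces as an OperationCanceledException on the returned Task, which the caller would normally treat as a clean shutdown rather than an error.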
Context
StackExchange Code Review Q#41462, answer score: 5