Continuously receive messages async from Azure Service Bus Queues

Submitted by: @import:stackexchange-codereview

Problem

I'd like to get input on my approach to continuously receiving messages from an Azure Service Bus queue using the async part of the library.
My main concern is whether it's "safe" to use Task.Factory.StartNew to process the incoming messages and then call the Actions again to continue receiving new messages.

The processor class is started like this:

public void Start()
{
    ThrowIfDisposed();
    lock (_lockObject)
    {
        Trace.WriteLine(string.Format("Started '{0}' allocation.", _projectSettings.Name));

        _cancellationSource = new CancellationTokenSource();
        Task.Factory.StartNew(ReceiveAllocationMessages, _cancellationSource.Token);
    }
}


And then the actual continuous receiving of messages from the (Azure Service Bus) QueueClient (from the Microsoft.ServiceBus.Messaging namespace):

private void ReceiveMessage()
{
    Task.Run(async () =>
    {
        //Receive a new message async within 1 minute
        return await _queueClient.ReceiveAsync(TimeSpan.FromMinutes(1));
    })
    .ContinueWith(t =>
    {
        if (t.Result != null)
        {
            //The result is not null so we process the Brokered Message
            Task.Factory.StartNew(() =>
            {
                ProcessMessage(t.Result);
                ReceiveNextMessage();
            }, _cancellationSource.Token);
        }
        else
        {
            // Continue receiving and processing 
            // new messages until told to stop.
            ReceiveNextMessage();
        }
    });
}

private void ReceiveNextMessage()
{
    if (_cancellationSource.IsCancellationRequested == false)
    {
        //Continue to receive new messages every 1 minute
        Task.Delay(TimeSpan.FromMinutes(1))
            .ContinueWith(t => ReceiveMessage());
    }
}


When the process is stopped or disposed, the Stop method is called to cancel the CancellationTokenSource, as shown here:

public void Stop()
{
    lock (_lockObject)
    {

Solution

First, some notes:

- You shouldn't use Task.Factory.StartNew() unless you have to; Task.Run() is better.

- I think you're using Task.Run() (and Task.Factory.StartNew()) and ContinueWith() too much. Only use Task.Run() when you actually need to run the code on another thread. And ContinueWith() is almost always better written using await.

  If you're using them to support cancellation, you can just use ThrowIfCancellationRequested() instead.

- Don't use == false; ! is much clearer.

- In your code, the processing can be started twice. You should probably guard against that.

- Your Start() and Stop() methods seem to be written with support for restarting in mind. It might be simpler to forbid that.
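
As a rough sketch of the last two points, one way to guard against a double start and forbid restarting is an atomic flag. The class name OneShotProcessor, the receiveLoop delegate, and the Completion property are all hypothetical and are not part of your class; this only illustrates the guard, under the assumption that the receive loop can be passed in as a delegate.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical processor: names are illustrative, not taken from the original class.
public sealed class OneShotProcessor
{
    private readonly CancellationTokenSource _cancellationSource = new CancellationTokenSource();
    private readonly Func<CancellationToken, Task> _receiveLoop;
    private int _started; // 0 = never started, 1 = started (stays 1, so no restart)

    public OneShotProcessor(Func<CancellationToken, Task> receiveLoop)
    {
        _receiveLoop = receiveLoop ?? throw new ArgumentNullException(nameof(receiveLoop));
    }

    // Task representing the running loop; completed until Start() is called.
    public Task Completion { get; private set; } = Task.CompletedTask;

    public void Start()
    {
        // Atomically flip 0 -> 1; every later call sees 1 and is rejected.
        if (Interlocked.CompareExchange(ref _started, 1, 0) != 0)
            throw new InvalidOperationException("The processor has already been started.");

        Completion = Task.Run(() => _receiveLoop(_cancellationSource.Token));
    }

    public void Stop()
    {
        // The flag is deliberately not reset, so Start() after Stop() still throws.
        _cancellationSource.Cancel();
    }
}
```

Because the flag is never reset, there is no need for the lock or for creating a new CancellationTokenSource on every Start().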

Now, about the structure of your code: I think a much clearer way to write it would be to use a loop instead of recursion. That way, your top-level method can be made very clear:

while (!_cancellationSource.IsCancellationRequested)
{
    await ReceiveMessageAsync();
    await Task.Delay(TimeSpan.FromMinutes(1));
}


The ReceiveMessageAsync() method could then look like this, which is a lot shorter and much more readable:

private async Task ReceiveMessageAsync()
{
    var message = await _queueClient.ReceiveAsync(TimeSpan.FromMinutes(1));

    if (message != null)
    {
        _cancellationSource.ThrowIfCancellationRequested();
        ProcessMessage(message);
    }
}


Note that this version behaves differently from yours with regard to exceptions. In your code, if an exception happens, it's either silently ignored (if it's in ReceiveAsync()) or it just silently stops receiving (if it's in ProcessMessage()). Both are very bad and make errors hard to debug. With my code, any exception stops receiving and is reported to the top-level Task.

You should then devise some way of reporting any exceptions on that Task.

A simple solution would be to store the Task in Start() and then rethrow any exceptions in Stop(). A cleaner solution might be to change void Start() into something like Task RunAsync() and report exceptions using the returned Task.
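
The Task RunAsync() idea could look roughly like the sketch below. To keep it compilable without the Service Bus SDK, the queue is replaced by a hypothetical receiveAsync delegate with the same shape as ReceiveAsync(TimeSpan), and messages are plain strings; MessageReceiver, processMessage, and pollDelay are also made-up names. Unlike the loop above, the delay here takes the token, which is my own assumption about how prompt you want cancellation to be.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical receiver: the delegates stand in for QueueClient.ReceiveAsync and ProcessMessage.
public sealed class MessageReceiver
{
    private readonly Func<TimeSpan, Task<string>> _receiveAsync;
    private readonly Action<string> _processMessage;
    private readonly TimeSpan _pollDelay;

    public MessageReceiver(
        Func<TimeSpan, Task<string>> receiveAsync,
        Action<string> processMessage,
        TimeSpan pollDelay)
    {
        _receiveAsync = receiveAsync ?? throw new ArgumentNullException(nameof(receiveAsync));
        _processMessage = processMessage ?? throw new ArgumentNullException(nameof(processMessage));
        _pollDelay = pollDelay;
    }

    // The returned Task faults with the first exception thrown while receiving
    // or processing, and ends (usually as canceled) once cancellation is requested.
    public async Task RunAsync(CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            // A null result means the wait timed out without a message.
            var message = await _receiveAsync(TimeSpan.FromMinutes(1));

            if (message != null)
            {
                token.ThrowIfCancellationRequested();
                _processMessage(message);
            }

            // Passing the token lets cancellation interrupt the delay.
            await Task.Delay(_pollDelay, token);
        }
    }
}
```

The caller then owns the Task and decides how to observe it: await it, attach logging to it, or inspect its Exception when shutting down.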

You could also consider not stopping if some specific exception happens (e.g. the server is temporarily unavailable). But that should happen only for some specific exceptions, and those exceptions should be logged anyway to help with debugging.
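
A minimal sketch of that idea, assuming the caller supplies the decision about which exceptions are recoverable: ResilientReceive, receiveAndProcessAsync, isTransient, and log are all hypothetical names introduced only for this illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ResilientReceive
{
    // receiveAndProcessAsync, isTransient and log are hypothetical hooks supplied by the caller.
    public static async Task RunAsync(
        Func<Task> receiveAndProcessAsync,
        Func<Exception, bool> isTransient,
        Action<Exception> log,
        TimeSpan pollDelay,
        CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            try
            {
                await receiveAndProcessAsync();
            }
            catch (Exception ex) when (isTransient(ex))
            {
                // Known-recoverable failure: log it and keep the loop alive.
                log(ex);
            }
            // Any other exception escapes the loop and faults the returned Task.

            await Task.Delay(pollDelay, token);
        }
    }
}
```

The exception filter keeps the list of tolerated failures explicit, so everything else still stops the loop and is reported. What counts as transient for Service Bus is something to check against the SDK version in use; I have not verified any particular exception type here.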

Context

StackExchange Code Review Q#41462, answer score: 5
