HiveBrain v1.2.0
Get Started
← Back to all entries
patterncsharpMinor

Parallel Task Queue that runs in sequence

Submitted by: @import:stackexchange-codereview··
0
Viewed 0 times
sequenceparallelthatqueuetaskruns

Problem

I'm in need of a solution that runs constantly incoming requests in a per-resource sequence, but parallel in general.

The use-case:

Many clients connect to a server and start issuing work. The work of a single client needs to run in sequential order, so the downward code doesn't need to cope with concurrency, but in general all work should be run on multiple threads. I'm trusting the .NET framework a lot here, which I hope is a good thing.

I've also read into DataFlow and parallel Rx but could not find a general solution there. But hints into that direction are welcome!

class TaskGroup
{
    public int CurrentlyQueuedTasks { get { return _currentlyQueued; } }

    private readonly object _previousTaskMonitor;
    private Task _previousTask;
    private int _currentlyQueued;

    public TaskGroup()
    {
        _previousTaskMonitor = new object();
        _previousTask = Task.CompletedTask;
    }

    public void Append(Action action)
    {
        lock(_previousTaskMonitor)
        {
            Interlocked.Increment(ref _currentlyQueued);
            _previousTask = _previousTask.ContinueWith(task =>
            {
                try
                {
                    action();
                }catch(Exception)
                {
                    //TODO
                }
                finally
                {
                    Interlocked.Decrement(ref _currentlyQueued);
                }
            });
        }
    }
}

Solution

This is an interesting approach. I would have used a queue by default since it seems to express the semantics a bit clearer (the queuing is slightly more obvious). Also ContinueWith creates a Task wrapping the original task which I'm not sure if that has any form of performance downsides (it probably shouldn't). I hacked a quick benchmark together with the alternative being implemented using the BlockingCollection:

using System;
using System.Diagnostics;
using System.Threading.Tasks;
using System.Threading;
using System.Collections.Concurrent;

public interface IAppendable
{
    void Append(Action action);
}

public class TaskGroup : IAppendable
{
    public int CurrentlyQueuedTasks { get { return _currentlyQueued; } }

    private readonly object _previousTaskMonitor;
    private Task _previousTask;
    private int _currentlyQueued;

    public TaskGroup()
    {
        _previousTaskMonitor = new object();
        _previousTask = Task.FromResult(false);
    }

    public void Append(Action action)
    {
        lock(_previousTaskMonitor)
        {
            Interlocked.Increment(ref _currentlyQueued);
            _previousTask = _previousTask.ContinueWith(task =>
            {
                try
                {
                    action();
                }catch(Exception)
                {
                    //TODO
                }
                finally
                {
                    Interlocked.Decrement(ref _currentlyQueued);
                }
            });
        }
    }
}

public class QueueAppendable : IAppendable, IDisposable
{
    public int CurrentlyQueuedTasks { get { return _Queue.Count; } }

    BlockingCollection _Queue = new BlockingCollection();

    public QueueAppendable()
    {
        Task.Factory.StartNew(() =>
        {
            while (true)
            {
                try 
                {
                    var action = _Queue.Take();
                    action();
                }
                catch (InvalidOperationException)
                {
                    break;
                }
                catch
                {
                    // TODO log me
                }
            }
        });
    }

    public void Append(Action action)
    {
        _Queue.Add(action);
    }

    public void Dispose()
    {
        _Queue.CompleteAdding();
    }
}

public class Test
{
    public static void TimeIt(string name, IAppendable appendable)
    {
        var finishEvent = new ManualResetEvent(false);
        var sw = new Stopwatch();
        sw.Start();
        for (int i = 0; i  { Thread.Sleep(1); });
        }
        appendable.Append(() => { finishEvent.Set(); });
        finishEvent.WaitOne();
        sw.Stop();
        Console.WriteLine("{0} elapsed time: {1}ms", name, sw.ElapsedMilliseconds);
        (appendable as IDisposable)?.Dispose();
    }

    public static void Main()
    {
        TimeIt("TaskGroup", new TaskGroup());
        TimeIt("Queue", new QueueAppendable());
    }
}


Output:

TaskGroup elapsed time: 2135ms
Queue elapsed time: 2121ms


So there is pretty much no performance difference between the two however I think the BlockingCollection approach has a few advantages:

  • Easier to debug. You can simply set a break point and peek the queue. This is quite difficult to do with the wrapped task approach.



  • No use use of lower level synchronization primitives. The first time I read your code I instinctively thought "Hang he's got a lock why the Interlocked calls" until I realized that the decrement happen in the async task outside of the lock. With the BlockingQueue you program against a slightly higher level of abstraction which is often a good thing.



  • Fewer class members which reduce the state complexity of the object (the queue is the only member).



Apart from that I think your approach should be fine. You may want to consider adding support for cancellation via CancellationToken

Code Snippets

using System;
using System.Diagnostics;
using System.Threading.Tasks;
using System.Threading;
using System.Collections.Concurrent;

public interface IAppendable
{
    void Append(Action action);
}

public class TaskGroup : IAppendable
{
    public int CurrentlyQueuedTasks { get { return _currentlyQueued; } }

    private readonly object _previousTaskMonitor;
    private Task _previousTask;
    private int _currentlyQueued;

    public TaskGroup()
    {
        _previousTaskMonitor = new object();
        _previousTask = Task.FromResult(false);
    }

    public void Append(Action action)
    {
        lock(_previousTaskMonitor)
        {
            Interlocked.Increment(ref _currentlyQueued);
            _previousTask = _previousTask.ContinueWith(task =>
            {
                try
                {
                    action();
                }catch(Exception)
                {
                    //TODO
                }
                finally
                {
                    Interlocked.Decrement(ref _currentlyQueued);
                }
            });
        }
    }
}

public class QueueAppendable : IAppendable, IDisposable
{
    public int CurrentlyQueuedTasks { get { return _Queue.Count; } }

    BlockingCollection<Action> _Queue = new BlockingCollection<Action>();

    public QueueAppendable()
    {
        Task.Factory.StartNew(() =>
        {
            while (true)
            {
                try 
                {
                    var action = _Queue.Take();
                    action();
                }
                catch (InvalidOperationException)
                {
                    break;
                }
                catch
                {
                    // TODO log me
                }
            }
        });
    }

    public void Append(Action action)
    {
        _Queue.Add(action);
    }

    public void Dispose()
    {
        _Queue.CompleteAdding();
    }
}

public class Test
{
    public static void TimeIt(string name, IAppendable appendable)
    {
        var finishEvent = new ManualResetEvent(false);
        var sw = new Stopwatch();
        sw.Start();
        for (int i = 0; i < 2000; ++i)
        {
            appendable.Append(() => { Thread.Sleep(1); });
        }
        appendable.Append(() => { finishEvent.Set(); });
        finishEvent.WaitOne();
        sw.Stop();
        Console.WriteLine("{0} elapsed time: {1}ms", name, sw.ElapsedMilliseconds);
        (appendable as IDisposable)?.Dispose();
    }

    public static void Main()
    {
        TimeIt("TaskGroup", new TaskGroup());
        TimeIt("Queue", new QueueAppendable());
    }
}
TaskGroup elapsed time: 2135ms
Queue elapsed time: 2121ms

Context

StackExchange Code Review Q#119097, answer score: 4

Revisions (0)

No revisions yet.