Parallel Task Queue that runs in sequence
Problem
I need a solution that runs constantly incoming requests sequentially per resource, but in parallel overall.
The use-case:
Many clients connect to a server and start issuing work. The work of a single client needs to run in sequential order, so the downstream code doesn't need to cope with concurrency, but overall the work of all clients should be spread across multiple threads. I'm trusting the .NET framework a lot here, which I hope is a good thing.
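To make the requirement concrete, here is a minimal usage sketch. The Server wrapper, the per-client dictionary, and the compact TaskGroup stand-in are illustrative inventions of this sketch, not existing code:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Compact stand-in for the TaskGroup class shown below, so the
// sketch is self-contained.
class TaskGroup
{
    private readonly object _gate = new object();
    private Task _tail = Task.CompletedTask;

    public void Append(Action action)
    {
        lock (_gate) { _tail = _tail.ContinueWith(_ => action()); }
    }
}

// Desired semantics: one TaskGroup per client. Work for the same
// client runs in submission order; different clients run in
// parallel on the thread pool.
class Server
{
    private readonly ConcurrentDictionary<int, TaskGroup> _groups =
        new ConcurrentDictionary<int, TaskGroup>();

    public void Submit(int clientId, Action work)
    {
        _groups.GetOrAdd(clientId, _ => new TaskGroup()).Append(work);
    }
}
```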
I've also read into
TPL Dataflow and parallel Rx, but I could not find a general solution there. Hints in that direction are welcome, though!

class TaskGroup
{
public int CurrentlyQueuedTasks { get { return _currentlyQueued; } }
private readonly object _previousTaskMonitor;
private Task _previousTask;
private int _currentlyQueued;
public TaskGroup()
{
_previousTaskMonitor = new object();
_previousTask = Task.CompletedTask;
}
public void Append(Action action)
{
lock(_previousTaskMonitor)
{
Interlocked.Increment(ref _currentlyQueued);
_previousTask = _previousTask.ContinueWith(task =>
{
try
{
action();
}catch(Exception)
{
//TODO
}
finally
{
Interlocked.Decrement(ref _currentlyQueued);
}
});
}
}
}

Solution
This is an interesting approach. I would have used a queue by default, since it expresses the semantics a bit more clearly (the queuing is more obvious). Also,
ContinueWith creates a Task wrapping the original task, and I'm not sure whether that has any performance downside (it probably doesn't). I hacked together a quick benchmark, with the alternative implemented using a BlockingCollection:

using System;
using System.Diagnostics;
using System.Threading.Tasks;
using System.Threading;
using System.Collections.Concurrent;
public interface IAppendable
{
void Append(Action action);
}
public class TaskGroup : IAppendable
{
public int CurrentlyQueuedTasks { get { return _currentlyQueued; } }
private readonly object _previousTaskMonitor;
private Task _previousTask;
private int _currentlyQueued;
public TaskGroup()
{
_previousTaskMonitor = new object();
_previousTask = Task.FromResult(false);
}
public void Append(Action action)
{
lock(_previousTaskMonitor)
{
Interlocked.Increment(ref _currentlyQueued);
_previousTask = _previousTask.ContinueWith(task =>
{
try
{
action();
}catch(Exception)
{
//TODO
}
finally
{
Interlocked.Decrement(ref _currentlyQueued);
}
});
}
}
}
public class QueueAppendable : IAppendable, IDisposable
{
public int CurrentlyQueuedTasks { get { return _Queue.Count; } }
BlockingCollection<Action> _Queue = new BlockingCollection<Action>();
public QueueAppendable()
{
Task.Factory.StartNew(() =>
{
while (true)
{
try
{
var action = _Queue.Take();
action();
}
catch (InvalidOperationException)
{
break;
}
catch
{
// TODO log me
}
}
});
}
public void Append(Action action)
{
_Queue.Add(action);
}
public void Dispose()
{
_Queue.CompleteAdding();
}
}
public class Test
{
public static void TimeIt(string name, IAppendable appendable)
{
var finishEvent = new ManualResetEvent(false);
var sw = new Stopwatch();
sw.Start();
for (int i = 0; i < 2000; ++i)
{
appendable.Append(() => { Thread.Sleep(1); });
}
appendable.Append(() => { finishEvent.Set(); });
finishEvent.WaitOne();
sw.Stop();
Console.WriteLine("{0} elapsed time: {1}ms", name, sw.ElapsedMilliseconds);
(appendable as IDisposable)?.Dispose();
}
public static void Main()
{
TimeIt("TaskGroup", new TaskGroup());
TimeIt("Queue", new QueueAppendable());
}
}

Output:
TaskGroup elapsed time: 2135ms
Queue elapsed time: 2121ms

So there is pretty much no performance difference between the two. However, I think the
BlockingCollection approach has a few advantages:

- Easier to debug. You can simply set a breakpoint and peek at the queue. This is quite difficult to do with the wrapped-task approach.
- No use of lower-level synchronization primitives. The first time I read your code I instinctively thought "Hang on, he's got a lock, why the Interlocked calls?" until I realized that the decrement happens in the async task outside of the lock. With the BlockingCollection you program against a slightly higher level of abstraction, which is often a good thing.
- Fewer class members, which reduces the state complexity of the object (the queue is the only member).
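On the Dataflow hint from the question: a per-client ActionBlock would give the same semantics with very little code, since an ActionBlock's default MaxDegreeOfParallelism of 1 makes it process its queue strictly in FIFO order. A sketch, assuming the System.Threading.Tasks.Dataflow NuGet package is available:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

class DataflowExample
{
    static void Main()
    {
        var log = new List<string>();

        // Default MaxDegreeOfParallelism is 1, so this block processes
        // posted items one at a time, in order. One block per client
        // gives per-client sequencing with global parallelism.
        var block = new ActionBlock<Action>(action => action());

        block.Post(() => log.Add("first"));
        block.Post(() => log.Add("second"));

        block.Complete();
        block.Completion.Wait(); // drain the queue

        Console.WriteLine(string.Join(",", log)); // first,second
    }
}
```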
Apart from that, I think your approach should be fine. You may want to consider adding support for cancellation via CancellationToken.
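A minimal sketch of what token support could look like (my variation, not the author's code): the token is observed when the queued item's turn comes, so work cancelled while still queued is skipped rather than executed.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Variation of TaskGroup whose Append takes an optional token.
class CancellableTaskGroup
{
    private readonly object _gate = new object();
    private Task _tail = Task.CompletedTask;

    public void Append(Action action, CancellationToken token = default(CancellationToken))
    {
        lock (_gate)
        {
            _tail = _tail.ContinueWith(_ =>
            {
                if (token.IsCancellationRequested)
                    return; // skip work that was cancelled while queued
                try { action(); }
                catch (Exception) { /* TODO: log */ }
            });
        }
    }
}
```

Note that this skips cancelled work but still runs a (trivial) continuation for it; removing items from the chain entirely is not possible with the ContinueWith approach, which is another point in favor of an explicit queue.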
Context
StackExchange Code Review Q#119097, answer score: 4