HiveBrain v1.2.0
Get Started
← Back to all entries
patterncsharpMinor

TaskScheduler that uses a dedicated thread

Submitted by: @import:stackexchange-codereview··
0
Viewed 0 times
usesdedicatedtaskschedulerthreadthat

Problem

I'm trying to implement a TaskScheduler that runs all tasks in the order they are submitted, on a single dedicated thread. Here's my implementation, using a BlockingCollection:

class SequentialScheduler : TaskScheduler, IDisposable {
    readonly BlockingCollection m_taskQueue = new BlockingCollection();
    readonly Thread m_thread;
    bool m_disposed;

    public SequentialScheduler() {
        m_thread = new Thread(Run);
        m_thread.Start();
    }

    public void Dispose() {
        m_disposed = true;
    }

    void Run() {
        while (!m_disposed) {
            var task = m_taskQueue.Take();
            Debug.Assert(TryExecuteTask(task));
        }
    }

    protected override IEnumerable GetScheduledTasks() {
        return m_taskQueue;
    }

    protected override void QueueTask(Task task) {
        m_taskQueue.Add(task);
    }

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) {
        if (Thread.CurrentThread == m_thread) {
            return TryExecuteTask(task);
        }
        return false;
    }
}


I've played with it a bit and it seems to work well. I have some lingering questions however:

  • As you can tell by the Debug.Assert, I'm not sure how TryExecuteTask could return false and I'm just assuming that given my current implementation, it won't. Can that actually happen and what I should I do if it does?



  • I'm not sure my implementation of TryExecuteTaskInline makes sense as it doesn't get called in my tests. If I understand correctly, this method should run the task synchronously if possible; hence why I'm checking if the current thread is the scheduler's dedicated thread.



  • Also, my implementation of Dispose doesn't wait for the current task to complete, and will cause the rest of the tasks in the queue to just wait there forever, but short of making it blocking and wait for the queue to empty, I don't see how to do that any differently. I just need a way to release the thr

Solution

TryExecuteTaskInline is used in what is called "task inlining": basically, when you call Wait() on a Task that didn't start executing yet, it might be executed on the current thread. A simple way to test that is:

var factory = new TaskFactory(new SequentialScheduler());

factory.StartNew(
    () =>
    {
        factory.StartNew(() => { }).Wait();
    });


For more information, see Stephen Toub's article Task.Wait and “Inlining”.

But this all means that a Task might be executed outside of your Run() loop, so the call to TryExecuteTask() there might return false. Because of that, you should simply ignore the return value there (just like the official example scheduler does, in its NotifyThreadPoolOfPendingWork()).

Another option would be to remove inlined Tasks from the queue, but there is no simple way to do that for BlockingCollection.

I think that m_disposed should be volatile, otherwise, the Run() loop can be optimized into an infinite loop that checks the value of m_disposed only once, at the start.

For disposal, you might want to use the completion capability of BlockingQueue. That way, trying to schedule a new Task after the scheduler has been disposed will throw, which I think is the correct behavior.

If you do this, you can also rewrite Run() to use GetConsumingEnumerable(), and remove m_disposed altogether.

Code Snippets

var factory = new TaskFactory(new SequentialScheduler());

factory.StartNew(
    () =>
    {
        factory.StartNew(() => { }).Wait();
    });

Context

StackExchange Code Review Q#43814, answer score: 7

Revisions (0)

No revisions yet.