TaskScheduler that uses a dedicated thread
Problem
I'm trying to implement a TaskScheduler that runs all tasks in the order they are submitted, on a single dedicated thread. Here's my implementation, using a BlockingCollection:

    class SequentialScheduler : TaskScheduler, IDisposable {
        readonly BlockingCollection<Task> m_taskQueue = new BlockingCollection<Task>();
        readonly Thread m_thread;
        bool m_disposed;

        public SequentialScheduler() {
            m_thread = new Thread(Run);
            m_thread.Start();
        }

        public void Dispose() {
            m_disposed = true;
        }

        void Run() {
            while (!m_disposed) {
                var task = m_taskQueue.Take();
                Debug.Assert(TryExecuteTask(task));
            }
        }

        protected override IEnumerable<Task> GetScheduledTasks() {
            return m_taskQueue;
        }

        protected override void QueueTask(Task task) {
            m_taskQueue.Add(task);
        }

        protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) {
            if (Thread.CurrentThread == m_thread) {
                return TryExecuteTask(task);
            }
            return false;
        }
    }

I've played with it a bit and it seems to work well. I have some lingering questions however:
- As you can tell by the Debug.Assert, I'm not sure how TryExecuteTask could return false, and I'm just assuming that, given my current implementation, it won't. Can that actually happen, and what should I do if it does?
- I'm not sure my implementation of TryExecuteTaskInline makes sense, as it doesn't get called in my tests. If I understand correctly, this method should run the task synchronously if possible, which is why I'm checking whether the current thread is the scheduler's dedicated thread.
- Also, my implementation of Dispose doesn't wait for the current task to complete, and will cause the rest of the tasks in the queue to just wait there forever, but short of making it block and wait for the queue to empty, I don't see how to do that any differently. I just need a way to release the thr
Solution
TryExecuteTaskInline is used in what is called "task inlining": basically, when you call Wait() on a Task that hasn't started executing yet, it might be executed on the current thread. A simple way to test that is:

    var factory = new TaskFactory(new SequentialScheduler());
    factory.StartNew(
        () =>
        {
            factory.StartNew(() => { }).Wait();
        });

For more information, see Stephen Toub's article Task.Wait and “Inlining”.
But this all means that a Task might be executed outside of your Run() loop, so the call to TryExecuteTask() there might return false. Because of that, you should simply ignore the return value there (just like the official example scheduler does, in its NotifyThreadPoolOfPendingWork()).

Another option would be to remove inlined Tasks from the queue, but there is no simple way to do that for BlockingCollection.

I think that m_disposed should be volatile; otherwise, the Run() loop can be optimized into an infinite loop that checks the value of m_disposed only once, at the start.

For disposal, you might want to use the completion capability of BlockingCollection (its CompleteAdding() method). That way, trying to schedule a new Task after the scheduler has been disposed will throw, which I think is the correct behavior.

If you do this, you can also rewrite Run() to use GetConsumingEnumerable(), and remove m_disposed altogether.

Code Snippets
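As a rough illustration of how the answer's suggestions might fit together, here is a minimal sketch (not the answerer's own code). It assumes CompleteAdding() in Dispose(), a GetConsumingEnumerable() loop in Run(), and an ignored TryExecuteTask() result. The IsBackground setting and the ToArray() snapshot in GetScheduledTasks() are assumptions of this sketch, not something the answer asks for.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Sketch only: a variant of the question's SequentialScheduler.
class SequentialScheduler : TaskScheduler, IDisposable {
    readonly BlockingCollection<Task> m_taskQueue = new BlockingCollection<Task>();
    readonly Thread m_thread;

    public SequentialScheduler() {
        // Assumption of this sketch: a background thread, so a scheduler
        // that is never disposed cannot keep the process alive.
        m_thread = new Thread(Run) { IsBackground = true };
        m_thread.Start();
    }

    public void Dispose() {
        // After this call, Add() throws InvalidOperationException, and the
        // loop in Run() ends once the already-queued tasks have been taken.
        m_taskQueue.CompleteAdding();
    }

    void Run() {
        foreach (var task in m_taskQueue.GetConsumingEnumerable()) {
            // Called directly rather than inside Debug.Assert, which is
            // compiled out (argument included) when DEBUG is not defined.
            // A false result means the task already ran inline; ignore it.
            TryExecuteTask(task);
        }
    }

    protected override IEnumerable<Task> GetScheduledTasks() {
        // Snapshot of the queue; may still contain tasks that were inlined.
        return m_taskQueue.ToArray();
    }

    protected override void QueueTask(Task task) {
        m_taskQueue.Add(task);
    }

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) {
        // Inline only on the dedicated thread, as in the question.
        return Thread.CurrentThread == m_thread && TryExecuteTask(task);
    }
}
```

With this shape, Dispose() does not block: tasks that are already queued still run, and then the dedicated thread exits. The inlining test from the answer is a reasonable smoke test here, because without inlining a nested Wait() on the dedicated thread would wait forever for a task queued behind it.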

    var factory = new TaskFactory(new SequentialScheduler());
    factory.StartNew(
        () =>
        {
            factory.StartNew(() => { }).Wait();
        });

Context
StackExchange Code Review Q#43814, answer score: 7