HiveBrain v1.2.0
Get Started
← Back to all entries
patterncsharpMinor

Loop for periodic processing in a background thread

Submitted by: @import:stackexchange-codereview··
0
Viewed 0 times
backgroundloopperiodicthreadforprocessing

Problem

Occasionally I need to implement periodic checks in a loop in a background thread, a typical example being asynchronous processing using a message queue. While it’s not terribly complicated, I wonder what simpler, more elegant solutions exist.

I post my solution, and I’m eager to hear any constructive criticism!

The requirements in more details are these: I need:

  • A loop that does periodic checks and actions if needed, and blocks


in-between

  • It runs in a background thread (since it does an active


loop)

  • It can be stopped properly (e.g. the loop exits and the thread


freed)

  • The blocking wait can be awaken if needed (e.g. new entry in


queue, or exit)

  • Thread safety



My solution:

`using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading;

namespace BgLoopExample
{
///
/// Perform asynchronous processing in a background thread.
/// Handle thread creation, exit, thread safety.
///
public class BgLoopExample : IDisposable
{
///
/// Queue for the incoming messages.
///
private Queue myQueue;

///
/// Task for background processing.
///
private Task myBgLoopTask;

///
/// Event for awakening the processing loop, used when new entry is added to the queue or exit requested.
///
private AutoResetEvent myQueueEvent;

///
/// Flag to signal stop for the parallel sender thread.
///
private bool myStopTaskFlag;

///
/// Flag to store if this class is disposing.
///
private bool myDisposed;

///
/// Class initializer, start the background thread.
///
public void Init()
{
myQueue = new Queue();
myQueueEvent = new AutoResetEvent(false);
myStopTaskFlag = false;
myDisposed = false;

StartSenderTask();
}

///
//

Solution

So the first thing is that you'll want to have a more generalized queue. You don't want to have to create a different type of queue for every situation. Write it once in such a way that each time you need it you can provide it with the operations to perform and it will perform them, without needing to know anything about those operations.

The thread pool is specifically designed for short lived operations. You're providing an operation to the thread pool that is going to be long running, and blocking it when there is nothing to do. Don't do that. Rather, each time you have an operation to run, ask the thread pool to run it, and simply don't have any thread doing anything whenever you don't have any work to do. This means that you don't have a thread pool thread sitting there doing nothing when you have no work, and it means the thread pool will be able to more effectively manage its work as you'll be using it in a way that's in line with its expected usage. Doing this also means that you have no need to stop it; since it's just doing nothing when you don't have any work to it, all you need to do to clean it up is stop giving it more work and there is no longer anything to clean up.

Doing this is actually rather straightforward. You keep a Task representing the "last" operation in the queue, and then each time you go to add a new operation to the queue you have it await that task and then set "itself" as "the last item in the queue".

public class TaskQueue
{
    private Task previous = Task.FromResult(false);
    private object key = new object();

    public Task Enqueue(Func> taskGenerator)
    {
        lock (key)
        {
            var next = AddContinuation(taskGenerator);
            previous = next;
            return next;
        }
    }

    public Task Enqueue(Func function)
    {
        return Enqueue(() => Task.Run(function));
    }

    public Task Enqueue(Func taskGenerator)
    {
        lock (key)
        {
            var next = AddContinuation(taskGenerator);
            previous = next;
            return next;
        }
    }

    public Task Enqueue(Action action)
    {
        return Enqueue(() => Task.Run(action));
    }

    private async Task AddContinuation(Func> taskGenerator)
    {
        await previous
            .ContinueWith(t => { }); //ignore errors of previous task here
        return await taskGenerator();
    }

    private async Task AddContinuation(Func taskGenerator)
    {
        await previous
            .ContinueWith(t => { }); //ignore errors of previous task here
        await taskGenerator();
    }
}


So we still need a lock here, to make sure that two threads don't end up adding continuations to the same Task, and end up running in parallel. There are also a number of trivial variations that we can make for each overload to add useful features. First, there are generic and non-generic versions, so that the operation itself can either compute a value or not. Additionally, we can trivially create overloads for delegates to be executed in a thread pool thread, as a convenience, but more importantly, we can provide operations that return Task or Task, which means the queue can support inherently asynchronous operations rather than just synchronous operations that need to be executed in a thread pool thread.

Another highly useful feature of this implementation is that every time you add an operation to the queue you're given a Task (or Task) representing that operation, so you can tell when it finished, as well as being given information about whether it completed successfully or errored.

Code Snippets

public class TaskQueue
{
    private Task previous = Task.FromResult(false);
    private object key = new object();

    public Task<T> Enqueue<T>(Func<Task<T>> taskGenerator)
    {
        lock (key)
        {
            var next = AddContinuation(taskGenerator);
            previous = next;
            return next;
        }
    }

    public Task<T> Enqueue<T>(Func<T> function)
    {
        return Enqueue(() => Task.Run(function));
    }

    public Task Enqueue(Func<Task> taskGenerator)
    {
        lock (key)
        {
            var next = AddContinuation(taskGenerator);
            previous = next;
            return next;
        }
    }

    public Task Enqueue(Action action)
    {
        return Enqueue(() => Task.Run(action));
    }

    private async Task<T> AddContinuation<T>(Func<Task<T>> taskGenerator)
    {
        await previous
            .ContinueWith(t => { }); //ignore errors of previous task here
        return await taskGenerator();
    }

    private async Task AddContinuation(Func<Task> taskGenerator)
    {
        await previous
            .ContinueWith(t => { }); //ignore errors of previous task here
        await taskGenerator();
    }
}

Context

StackExchange Code Review Q#151562, answer score: 7

Revisions (0)

No revisions yet.