HiveBrain v1.2.0
Get Started
← Back to all entries
patterncsharpMinor

Thread-Safe Task Queue Implementation

Submitted by: @import:stackexchange-codereview··
0
Viewed 0 times
threadsafeimplementationqueuetask

Problem

Purpose:

This class exists to ensure actions created from other threads are executed in a synchronous manner. Additionally, it provides functionality to add actions that run after a delay.

Code: (Note that the comments may be inconsistent and inaccurate in places.)

```
class ActionQueue
{
// We don't lock on the actual objects, just in case (for whatever bad reason) we ever pass references or expose them publicly.
private readonly object _actionsLock = new object();
private readonly object _scheduledActionsLock = new object();

private List actions;
private List scheduledActions;
private ManualResetEvent hasActions;

public ActionQueue()
{
this.actions = new List();
this.scheduledActions = new List();
this.hasActions = new ManualResetEvent(true);
}

///
/// Adds an action to the queue.
///
/// The action to be executed.
/// If true, this action will be pushed to the front of the queue.
public void Enqueue(System.Action action, bool immediate = false)
{
lock (this._actionsLock)
{
if (immediate)
{
this.actions.Insert(0, new Tasking.Action(action));
}
else
{
this.actions.Add(new Tasking.Action(action));
}

this.hasActions.Set(); // The actions queue was previously empty, so we must signal that it is no longer.
}
}

///
/// Schedule an action to be added to the task queue after a specified delay.
///
/// The delay after which this action will be added to the queue.
/// The action to will be executed.
/// Optional. If true, this action will be added to the front of the queue instead of the back.
///
public ScheduledAction Schedule(int delay, System.Action action, bool immediate = false)
{
ScheduledAction scheduledAction = new ScheduledAction(delay, action, immediate);

lock (this._sch

Solution

The .NET framework provides you with tons of concurrent solutions that help you reduce the amount of locks and this way the chance of deadlocks. I've revised your code and found the already present implementations in the framework suitable for your needs.

I sincerely hope you don't mind that I have reworked your solution to be lock-free (and also solved your issue of naming the classes). You can find the reworked class below my explanation.

First of all, since we are working in a multi-threaded environment, we need an intermediate solution to synchronize the workflow of the multiple threads. There can be multiple worker threads, that pass the actions to the one processor thread.

Our intermediate layer consists of two ConcurrentQueue<> and a ConcurrentBag<>. The reason for the two ConcurrentQueue<> variables is that I have separated the primary actions from the other ones. Instead of inserting the action to the front of the queue, I insert it to a dedicated queue. When the dedicated priority queue has an action waiting to be processed, it is going to interrupt the progression of the other queue (can be also interrupted in the middle of the cycle without any problems). The ConcurrentBag<> is responsible for the scheduled actions. Their order is not necessary to be kept, since they won't processed until the timeout expires. We check in each cycle and update the time left. When it has timed out, we pass it to the appropriate queue.

(code untested)

public class ScheduledSynchronizedAction
{
    public Action Action { get; set; }
    public int DelayMilliseconds { get; set; }
    public bool IsPriority { get; set; }
}

public class SynchronizedActionQueue
{
    private ConcurrentQueue SynchronizedActions { get; set; }
    private ConcurrentQueue PrioritySynchronizedActions { get; set; }
    private ConcurrentBag ScheduledSynchronizedActions { get; set; }

    public SynchronizedActionQueue()
    {
        SynchronizedActions = new ConcurrentQueue();
        PrioritySynchronizedActions = new ConcurrentQueue();
        ScheduledSynchronizedActions = new ConcurrentBag();
    }

    public void Enqueue(Action Action, bool IsPriority = false)
    {
        if (IsPriority)
            PrioritySynchronizedActions.Enqueue(Action);
        else
            SynchronizedActions.Enqueue(Action);
    }

    public void Schedule(Action Action, int DelayMilliseconds, bool IsPriority = false) => ScheduledSynchronizedActions.Add(new ScheduledSynchronizedAction { Action = Action, DelayMilliseconds = DelayMilliseconds, IsPriority = IsPriority });

    public void Update(int Diff)
    {
        Action DummyAction;
        ScheduledSynchronizedAction DummyScheduledAction;
        while (!SynchronizedActions.IsEmpty || !PrioritySynchronizedActions.IsEmpty)
        {
            while (!PrioritySynchronizedActions.IsEmpty)
                if (PrioritySynchronizedActions.TryDequeue(out DummyAction))
                    DummyAction.Invoke();

            if (SynchronizedActions.TryDequeue(out DummyAction))
                DummyAction.Invoke();
        }        

        Queue TempScheduledSynchronizedActions = new Queue();
        while (!ScheduledSynchronizedActions.IsEmpty)
        {
            if (ScheduledSynchronizedActions.TryTake(out DummyScheduledAction))
            {
                DummyScheduledAction.DelayMilliseconds -= Diff;

                if (DummyScheduledAction.DelayMilliseconds  0)
            ScheduledSynchronizedActions.Add(TempScheduledSynchronizedActions.Dequeue());
    }

    public void Run()
    {
        while (true)
        {
            Update(50);
            Thread.Sleep(50); // todo: implement this properly
        }
    }
}

Code Snippets

public class ScheduledSynchronizedAction
{
    public Action Action { get; set; }
    public int DelayMilliseconds { get; set; }
    public bool IsPriority { get; set; }
}

public class SynchronizedActionQueue
{
    private ConcurrentQueue<Action> SynchronizedActions { get; set; }
    private ConcurrentQueue<Action> PrioritySynchronizedActions { get; set; }
    private ConcurrentBag<ScheduledSynchronizedAction> ScheduledSynchronizedActions { get; set; }

    public SynchronizedActionQueue()
    {
        SynchronizedActions = new ConcurrentQueue<Action>();
        PrioritySynchronizedActions = new ConcurrentQueue<Action>();
        ScheduledSynchronizedActions = new ConcurrentBag<ScheduledSynchronizedAction>();
    }

    public void Enqueue(Action Action, bool IsPriority = false)
    {
        if (IsPriority)
            PrioritySynchronizedActions.Enqueue(Action);
        else
            SynchronizedActions.Enqueue(Action);
    }

    public void Schedule(Action Action, int DelayMilliseconds, bool IsPriority = false) => ScheduledSynchronizedActions.Add(new ScheduledSynchronizedAction { Action = Action, DelayMilliseconds = DelayMilliseconds, IsPriority = IsPriority });

    public void Update(int Diff)
    {
        Action DummyAction;
        ScheduledSynchronizedAction DummyScheduledAction;
        while (!SynchronizedActions.IsEmpty || !PrioritySynchronizedActions.IsEmpty)
        {
            while (!PrioritySynchronizedActions.IsEmpty)
                if (PrioritySynchronizedActions.TryDequeue(out DummyAction))
                    DummyAction.Invoke();

            if (SynchronizedActions.TryDequeue(out DummyAction))
                DummyAction.Invoke();
        }        

        Queue<ScheduledSynchronizedAction> TempScheduledSynchronizedActions = new Queue<ScheduledSynchronizedAction>();
        while (!ScheduledSynchronizedActions.IsEmpty)
        {
            if (ScheduledSynchronizedActions.TryTake(out DummyScheduledAction))
            {
                DummyScheduledAction.DelayMilliseconds -= Diff;

                if (DummyScheduledAction.DelayMilliseconds <= 0)
                    Enqueue(DummyScheduledAction.Action, DummyScheduledAction.IsPriority);
                else
                    TempScheduledSynchronizedActions.Enqueue(DummyScheduledAction);
            }
        }

        while (TempScheduledSynchronizedActions.Count > 0)
            ScheduledSynchronizedActions.Add(TempScheduledSynchronizedActions.Dequeue());
    }

    public void Run()
    {
        while (true)
        {
            Update(50);
            Thread.Sleep(50); // todo: implement this properly
        }
    }
}

Context

StackExchange Code Review Q#157909, answer score: 2

Revisions (0)

No revisions yet.