Code snippet for a method dealing with IDs from multiple threads

Submitted by: @import:stackexchange-codereview

Problem

I have multiple threads calling a method, passing in an ID and value.

I have two constraints I need to place on this method:

  • Only one call per ID can be processed at a time. Additional threads with the same ID must wait.
  • Subsequent calls with the same ID must be processed in the order they were received.

Here's an example.

The method is called by four different threads with the parameters:

  • ID 1 Value a
  • ID 2 Value b
  • ID 1 Value c
  • ID 1 Value d

The program should be able to process [ID 1 Value a] and [ID 2 Value b] immediately, while [ID 1 Value c] and [ID 1 Value d] wait for [ID 1 Value a] to finish.

  • Once [ID 1 Value a] finishes, the method runs for [ID 1 Value c] while [ID 1 Value d] waits.
  • When [ID 1 Value c] finishes, the method runs for [ID 1 Value d].
  • If the method wrote the value to the console, I'd expect the output to be either a, b, c, d or b, a, c, d.

I've had a quick go at it but it's messy. I'm writing this post in hopes of getting an improved and abstracted code snippet for doing this.

Here is my attempt:

```
static object _lockObj = new object();
static Dictionary<int, Queue<ManualResetEvent>> runningIds = new Dictionary<int, Queue<ManualResetEvent>>();

public async Task TheMethod(int id, string val)
{
    ManualResetEvent enquedResetEvent = null;
    lock (_lockObj)
    {
        if (runningIds.ContainsKey(id))
        {
            enquedResetEvent = new ManualResetEvent(false);
            runningIds[id].Enqueue(enquedResetEvent);
        }
        else
        {
            runningIds.Add(id, new Queue<ManualResetEvent>());
        }
    }
    if (enquedResetEvent != null)
        enquedResetEvent.WaitOne();
    try
    {
        // DO LONG RUNNING TASK
        await Task.Delay(3000);
        Console.WriteLine(id + " " + val + " at time " + DateTime.Now.ToString("m:ss"));
    }
    finally
    {
        lock (_lockObj)
        {
            if (runningIds[id].Count > 0)
            {
                var dequedResetEvent = runningIds[id].Dequeue();
                dequedRese
```

Solution

Here are my thoughts on the code that you provided:

  • You should mark your _lockObj and runningIds fields as readonly.
  • You are using the presence of the queue in the dictionary as a signal that another thread is currently working on a specific ID, which is not obvious.
  • You mix a non-blocking approach (await) with a blocking one (ManualResetEvent.WaitOne). It would be better to stick to one of them, preferably the non-blocking one.
  • If you change the approach from "notify the guy after me that he can start working" to "I'm done", you only need to capture the last "guy" for a specific ID. "I'm done" is easily modelled by a Task.

As a result, my first refactoring of your code looks like this:

```
private static readonly object _lockObj = new object();
private static readonly Dictionary<int, Task> _runningIds = new Dictionary<int, Task>();

public async Task TheMethod(int id, string val)
{
    //Will be used to signal others that the work has finished
    var completionSource = new TaskCompletionSource<bool>();

    Task currentTask;

    lock (_lockObj)
    {
        _runningIds.TryGetValue(id, out currentTask);
        _runningIds[id] = completionSource.Task;
    }

    if (currentTask != null)
        await currentTask;

    // DO LONG RUNNING TASK
    await Task.Delay(3000);
    Console.WriteLine(id + " " + val + " at time " + DateTime.Now.ToString("m:ss"));

    completionSource.SetResult(true);

    //cleanup
    lock (_lockObj)
    {
        if (_runningIds.TryGetValue(id, out currentTask) && currentTask == completionSource.Task)
            _runningIds.Remove(id);
    }
}
```
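
One caveat with the refactoring above: if the long-running work throws, `completionSource` is never completed, so later callers with the same ID would wait forever. The original attempt guarded against this with try/finally. Below is a minimal sketch of the same lock-based idea with that guard restored. The `PerIdSerializer` class, the `RunAsync` name, and the `Func<Task>` parameter are hypothetical and stand in for `TheMethod` and its long-running task. `TaskCreationOptions.RunContinuationsAsynchronously` is an optional addition, assuming .NET Framework 4.6 or later, so that waiting callers do not resume inline inside `SetResult`.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class PerIdSerializer
{
    private static readonly object _lockObj = new object();
    private static readonly Dictionary<int, Task> _runningIds = new Dictionary<int, Task>();

    // Hypothetical helper: runs `work` only after every earlier call
    // registered for the same id has signalled "I'm done".
    public static async Task RunAsync(int id, Func<Task> work)
    {
        // Completed in the finally block, so it never faults and never stays pending.
        var completionSource = new TaskCompletionSource<bool>(
            TaskCreationOptions.RunContinuationsAsynchronously);

        Task previousTask;
        lock (_lockObj)
        {
            // Capture the last registered task for this id and take its place.
            _runningIds.TryGetValue(id, out previousTask);
            _runningIds[id] = completionSource.Task;
        }

        try
        {
            if (previousTask != null)
                await previousTask;

            await work();
        }
        finally
        {
            // Signal the next caller even if `work` threw.
            completionSource.SetResult(true);

            lock (_lockObj)
            {
                Task lastTask;
                // Remove the entry only if nobody registered after this call.
                if (_runningIds.TryGetValue(id, out lastTask) && lastTask == completionSource.Task)
                    _runningIds.Remove(id);
            }
        }
    }
}
```

A call would look like `await PerIdSerializer.RunAsync(id, () => DoWorkAsync(id, val));`, where `DoWorkAsync` is a placeholder for the real work. An exception from the work still propagates to its own caller; it just no longer blocks the callers queued behind it.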


The second take was mostly an exercise in getting rid of the lock altogether by using ConcurrentDictionary.

```
private static readonly ConcurrentDictionary<int, Task> _runningIds = new ConcurrentDictionary<int, Task>();

private static Task RegisterAsLastScheduledTask(int id, Task task)
{
    Task previousTask = null;

    //Either adding a new task to the dictionary, or capturing and replacing previous task
    while (!_runningIds.TryAdd(id, task) &&
        !(_runningIds.TryGetValue(id, out previousTask) && _runningIds.TryUpdate(id, task, previousTask)))
    {
        previousTask = null;
    }

    return previousTask;
}

public async Task TheMethod(int id, string val)
{
    //Will be used to signal others that the work has finished
    var completionSource = new TaskCompletionSource<bool>();
    Task previousTask = RegisterAsLastScheduledTask(id, completionSource.Task);

    if (previousTask != null)
        await previousTask;

    // DO LONG RUNNING TASK
    await Task.Delay(3000);
    Console.WriteLine(id + " " + val + " at time " + DateTime.Now.ToString("m:ss"));

    completionSource.SetResult(true);

    //"hack" to atomically remove the element only if both key and value match, see http://blogs.msdn.com/b/pfxteam/archive/2011/04/02/10149222.aspx
    ((ICollection<KeyValuePair<int, Task>>)_runningIds).Remove(new KeyValuePair<int, Task>(id, completionSource.Task));
}
```
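
On .NET 5 and later, `ConcurrentDictionary` has a `TryRemove` overload that takes a `KeyValuePair` and removes the entry only when both key and value match, so the `ICollection` cast may no longer be needed. A minimal sketch, assuming a .NET 5+ target; the `RemoveIfSameDemo` class and `RemoveIfSame` helper are hypothetical names used only for illustration:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class RemoveIfSameDemo
{
    // Removes the entry for `id` only if it is still mapped to `expected`.
    // Values are compared with the default equality comparer, which for Task
    // means reference equality.
    public static bool RemoveIfSame(ConcurrentDictionary<int, Task> map, int id, Task expected)
    {
        return map.TryRemove(new KeyValuePair<int, Task>(id, expected));
    }
}
```

In the cleanup step this would be called as `RemoveIfSameDemo.RemoveIfSame(_runningIds, id, completionSource.Task)`. On older targets the explicit `ICollection<KeyValuePair<int, Task>>` cast remains the way to get the same conditional removal.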

Code Snippets

```
private static readonly object _lockObj = new object();
private static readonly Dictionary<int, Task> _runningIds = new Dictionary<int, Task>();

public async Task TheMethod(int id, string val)
{
    //Will be used to signal others that the work has finished
    var completionSource = new TaskCompletionSource<bool>();

    Task currentTask;

    lock (_lockObj)
    {
        _runningIds.TryGetValue(id, out currentTask);
        _runningIds[id] = completionSource.Task;
    }

    if (currentTask != null)
        await currentTask;

    // DO LONG RUNNING TASK
    await Task.Delay(3000);
    Console.WriteLine(id + " " + val + " at time " + DateTime.Now.ToString("m:ss"));

    completionSource.SetResult(true);

    //cleanup
    lock (_lockObj)
    {
        if (_runningIds.TryGetValue(id, out currentTask) && currentTask == completionSource.Task)
            _runningIds.Remove(id);
    }
}
```

```
private static readonly ConcurrentDictionary<int, Task> _runningIds = new ConcurrentDictionary<int, Task>();

private static Task RegisterAsLastScheduledTask(int id, Task task)
{
    Task previousTask = null;

    //Either adding a new task to the dictionary, or capturing and replacing previous task
    while (!_runningIds.TryAdd(id, task) &&
        !(_runningIds.TryGetValue(id, out previousTask) && _runningIds.TryUpdate(id, task, previousTask)))
    {
        previousTask = null;
    }

    return previousTask;
}

public async Task TheMethod(int id, string val)
{
    //Will be used to signal others that the work has finished
    var completionSource = new TaskCompletionSource<bool>();
    Task previousTask = RegisterAsLastScheduledTask(id, completionSource.Task);

    if (previousTask != null)
        await previousTask;

    // DO LONG RUNNING TASK
    await Task.Delay(3000);
    Console.WriteLine(id + " " + val + " at time " + DateTime.Now.ToString("m:ss"));

    completionSource.SetResult(true);

    //"hack" to atomically remove the element only if both key and value match, see http://blogs.msdn.com/b/pfxteam/archive/2011/04/02/10149222.aspx
    ((ICollection<KeyValuePair<int, Task>>)_runningIds).Remove(new KeyValuePair<int, Task>(id, completionSource.Task));
}
```

Context

StackExchange Code Review Q#80208, answer score: 4
