patterncsharpMinor
Code snippet for a method dealing with IDs from multiple threads
Viewed 0 times
snippetmethodwiththreadsidsdealingformultiplecodefrom
Problem
I have multiple threads calling a method, passing in an ID and value.
I have two constraints I need to place on this method:
Here's an Example.
-
Method is called by four different threads with the parameters
-
Program should be able to process [ID 1 Value a] and [ID 2 Value b] immediately, * while [ID 1 Value c] and [ID 1 Value d] wait for [ID 1 Value a] to finish.
I've had a quick go at it but it's messy. I'm writing this post in hopes of getting an improved and abstracted code snippet for doing this.
Here is my attempt:
```
static object _lockObj = new object();
static Dictionary> runningIds = new Dictionary>();
public async Task TheMethod(int id, string val)
{
ManualResetEvent enquedResetEvent = null;
lock (_lockObj)
{
if (runningIds.ContainsKey(id))
{
enquedResetEvent = new ManualResetEvent(false);
runningIds[id].Enqueue(enquedResetEvent);
}
else
{
runningIds.Add(id, new Queue());
}
}
if (enquedResetEvent != null)
enquedResetEvent.WaitOne();
try
{
// DO LONG RUNNING TASK
await Task.Delay(3000);
Console.WriteLine(id + " " + val + " at time " + DateTime.Now.ToString("m:ss"));
}
finally
{
lock (_lockObj)
{
if (runningIds[id].Count > 0)
{
var dequedResetEvent = runningIds[id].Dequeue();
dequedRese
I have two constraints I need to place on this method:
- Only one of the same ID can be processed at a time. Additional threads with the same ID must wait.
- Subsequent calls with the same ID must be processed in the order they were received.
Here's an Example.
-
Method is called by four different threads with the parameters
- ID 1 Value a
- ID 2 Value b
- ID 1 Value c
- ID 1 Value d
-
Program should be able to process [ID 1 Value a] and [ID 2 Value b] immediately, * while [ID 1 Value c] and [ID 1 Value d] wait for [ID 1 Value a] to finish.
- Once [ID 1 Value a] finishes, then the method runs for [ID 1 Value c] while [ID 1 Value d] waits.
- When [ID 1 Value c] finishes the method runs for [ID 1 Value d].
- If the method wrote the value to the console then I'd expect the output to be either
a, b, c, dorb, a, c, d.
I've had a quick go at it but it's messy. I'm writing this post in hopes of getting an improved and abstracted code snippet for doing this.
Here is my attempt:
```
static object _lockObj = new object();
static Dictionary> runningIds = new Dictionary>();
public async Task TheMethod(int id, string val)
{
ManualResetEvent enquedResetEvent = null;
lock (_lockObj)
{
if (runningIds.ContainsKey(id))
{
enquedResetEvent = new ManualResetEvent(false);
runningIds[id].Enqueue(enquedResetEvent);
}
else
{
runningIds.Add(id, new Queue());
}
}
if (enquedResetEvent != null)
enquedResetEvent.WaitOne();
try
{
// DO LONG RUNNING TASK
await Task.Delay(3000);
Console.WriteLine(id + " " + val + " at time " + DateTime.Now.ToString("m:ss"));
}
finally
{
lock (_lockObj)
{
if (runningIds[id].Count > 0)
{
var dequedResetEvent = runningIds[id].Dequeue();
dequedRese
Solution
Here are my thoughts on the code that you provided:
As a result, my first refactoring of your code looks like this:
Second take was mostly as an exercise to get rid of
- You should mark your
lockObjandrunningIdsas readonly
- You are using the presence of the queue in the dictionary as a signal that another thread is currently working on specific ID which is not quite obvious
- You mix non-blocking (await) approach along with blocking (ManualResetEvent.WaitOne). It would be better to stick to one of them (preferably non-blocking).
- If you change the approach from "notify the guy after me that he can start working" to "I'm done", then you would be able to capture only the last "guy" against specific ID. "I'm done" is easily modelled by
Task.
As a result, my first refactoring of your code looks like this:
private static readonly object _lockObj = new object();
private static readonly Dictionary _runningIds = new Dictionary();
public async Task TheMethod(int id, string val)
{
//Will be used to signal others that the work has finished
var completionSource = new TaskCompletionSource();
Task currentTask;
lock (_lockObj)
{
_runningIds.TryGetValue(id, out currentTask);
_runningIds[id] = completionSource.Task;
}
if (currentTask != null)
await currentTask;
// DO LONG RUNNING TASK
await Task.Delay(3000);
Console.WriteLine(id + " " + val + " at time " + DateTime.Now.ToString("m:ss"));
completionSource.SetResult(true);
//cleanup
lock (_lockObj)
{
if (_runningIds.TryGetValue(id, out currentTask) && currentTask == completionSource.Task)
_runningIds.Remove(id);
}
}Second take was mostly as an exercise to get rid of
lock at all by using the ConcurrentDictionary.private static readonly ConcurrentDictionary _runningIds = new ConcurrentDictionary();
private static Task RegisterAsLastScheduledTask(int id, Task task)
{
Task previousTask = null;
//Either adding a new task to the dictionary, or capturing and replacing previous task
while (!_runningIds.TryAdd(id, task) &&
!(_runningIds.TryGetValue(id, out previousTask) && _runningIds.TryUpdate(id, task, previousTask)))
{
previousTask = null;
}
return previousTask;
}
public async Task TheMethod(int id, string val)
{
//Will be used to signal others that the work has finished
var completionSource = new TaskCompletionSource();
Task previousTask = RegisterAsLastScheduledTask(id, completionSource.Task);
if (previousTask != null)
await previousTask;
// DO LONG RUNNING TASK
await Task.Delay(3000);
Console.WriteLine(id + " " + val + " at time " + DateTime.Now.ToString("m:ss"));
completionSource.SetResult(true);
//"hack" to atomically remove the element only if both key and value match, see http://blogs.msdn.com/b/pfxteam/archive/2011/04/02/10149222.aspx
((ICollection>)_runningIds).Remove(new KeyValuePair(id, completionSource.Task));
}Code Snippets
private static readonly object _lockObj = new object();
private static readonly Dictionary<int, Task> _runningIds = new Dictionary<int, Task>();
public async Task TheMethod(int id, string val)
{
//Will be used to signal others that the work has finished
var completionSource = new TaskCompletionSource<bool>();
Task currentTask;
lock (_lockObj)
{
_runningIds.TryGetValue(id, out currentTask);
_runningIds[id] = completionSource.Task;
}
if (currentTask != null)
await currentTask;
// DO LONG RUNNING TASK
await Task.Delay(3000);
Console.WriteLine(id + " " + val + " at time " + DateTime.Now.ToString("m:ss"));
completionSource.SetResult(true);
//cleanup
lock (_lockObj)
{
if (_runningIds.TryGetValue(id, out currentTask) && currentTask == completionSource.Task)
_runningIds.Remove(id);
}
}private static readonly ConcurrentDictionary<int, Task> _runningIds = new ConcurrentDictionary<int, Task>();
private static Task RegisterAsLastScheduledTask(int id, Task task)
{
Task previousTask = null;
//Either adding a new task to the dictionary, or capturing and replacing previous task
while (!_runningIds.TryAdd(id, task) &&
!(_runningIds.TryGetValue(id, out previousTask) && _runningIds.TryUpdate(id, task, previousTask)))
{
previousTask = null;
}
return previousTask;
}
public async Task TheMethod(int id, string val)
{
//Will be used to signal others that the work has finished
var completionSource = new TaskCompletionSource<bool>();
Task previousTask = RegisterAsLastScheduledTask(id, completionSource.Task);
if (previousTask != null)
await previousTask;
// DO LONG RUNNING TASK
await Task.Delay(3000);
Console.WriteLine(id + " " + val + " at time " + DateTime.Now.ToString("m:ss"));
completionSource.SetResult(true);
//"hack" to atomically remove the element only if both key and value match, see http://blogs.msdn.com/b/pfxteam/archive/2011/04/02/10149222.aspx
((ICollection<KeyValuePair<int, Task>>)_runningIds).Remove(new KeyValuePair<int, Task>(id, completionSource.Task));
}Context
StackExchange Code Review Q#80208, answer score: 4
Revisions (0)
No revisions yet.