patterncsharpMinor
Parallel foreach with configurable level of concurrency
Viewed 0 times
configurablelevelwithconcurrencyparallelforeach
Problem
The purpose of this code is to let me loop over 100 items (up to MAX_CONCURRENT at a time), performing some action on them, and then return only once all items have been processed:

/// <summary>
/// Generic method to perform an action or set of actions
/// in parallel on each item in a collection of items, returning
/// only when all actions have been completed.
/// </summary>
/// <typeparam name="T">The element type</typeparam>
/// <param name="elements">A collection of elements, each of which to
/// perform the action on.</param>
/// <param name="action">The action to perform on each element. The
/// action should of course be thread safe.</param>
/// <remarks>The maximum number of concurrent actions is MAX_CONCURRENT.</remarks>
public static void PerformActionsInParallel<T>(IEnumerable<T> elements, Action<T> action)
{
    // Semaphore limiting the number of parallel requests: the loop below
    // blocks on it before starting each worker thread, so at most
    // MAX_CONCURRENT workers hold a slot at any moment.
    Semaphore limit = new Semaphore(MAX_CONCURRENT, MAX_CONCURRENT);
    // Count of the number of remaining threads to be completed. It is
    // captured by the worker lambda, so it is shared between threads and
    // only modified through Interlocked.
    int remaining = 0;
    // Signal to notify the main thread when a worker is done
    AutoResetEvent onComplete = new AutoResetEvent(false);
    foreach (T element in elements)
    {
        // Count the worker before waiting for a free slot, so the counter
        // already includes it by the time its thread starts.
        Interlocked.Increment(ref remaining);
        limit.WaitOne();
        new Thread(() =>
        {
            try
            {
                action(element);
            }
            catch (Exception ex)
            {
                // A failing action is reported on the console; the
                // remaining elements are still processed.
                Console.WriteLine("Error performing concurrent action: " + ex);
            }
            finally
            {
                // Always runs, so a failed action still frees its slot
                // and wakes the waiting main thread.
                Interlocked.Decrement(ref remaining);
                limit.Release();
                onComplete.Set();
            }
        }).Start();
    }
    // Wait for all requests to complete, re-checking the counter at least
    // every 10 ms even if no completion signal arrives.
    while (remaining > 0)
    {
        onComplete.WaitOne(10); // Slightly better than Thread.Sleep(10)
    }
}
I include a timeout on the
WaitOne() before checking remaining again to protect against the rare case where the last outstanding thread decrements 'remaining' and then signals completion between the main thread checking 'remaining' and waiting for the next completion signal, which would otherwise leave the main thread blocked forever.
Solution
I've made use of a CountdownEvent signal to avoid the use of the remaining integer and avoid the busy waiting involved in polling it with the unreliable AutoResetEvent onComplete:
public static void PerformActionsInParallel<T>(IEnumerable<T> elements, Action<T> action)
{
    // Fail fast on the calling thread instead of logging one
    // NullReferenceException per element from the worker threads.
    if (elements == null)
    {
        throw new ArgumentNullException(nameof(elements));
    }
    if (action == null)
    {
        throw new ArgumentNullException(nameof(action));
    }

    // Configured concurrency level, falling back to the default when unset.
    int threads = MaxConcurrent ?? DefaultMaxConcurrentRequests;
    // Ensure elements is only enumerated once.
    T[] items = elements as T[] ?? elements.ToArray();
    // Semaphore limiting the number of parallel requests
    using (Semaphore limit = new Semaphore(threads, threads))
    // Count of the number of remaining actions to be completed
    using (CountdownEvent remaining = new CountdownEvent(items.Length))
    {
        foreach (T element in items)
        {
            // Blocks until one of the running workers frees its slot.
            limit.WaitOne();
            new Thread(() =>
            {
                try
                {
                    action(element);
                }
                catch (Exception ex)
                {
                    // A failing action is reported on the console; the
                    // remaining elements are still processed.
                    Console.WriteLine("Error performing concurrent action: " + ex);
                }
                finally
                {
                    // Release the slot BEFORE signalling: once the final
                    // Signal lets Wait() return, no worker touches the
                    // semaphore again, so disposing it is safe.
                    limit.Release();
                    remaining.Signal();
                }
            }).Start();
        }
        // Wait for all requests to complete
        remaining.Wait();
    }
}
Code Snippets
/// <summary>
/// Runs <paramref name="action"/> once for every item in
/// <paramref name="elements"/>, each on its own thread, and returns only
/// after every action has finished. The number of actions running at the
/// same time is capped at <c>MaxConcurrent ?? DefaultMaxConcurrentRequests</c>.
/// </summary>
/// <typeparam name="T">The element type.</typeparam>
/// <param name="elements">The items to process; enumerated exactly once.</param>
/// <param name="action">The action to perform on each element. It is called
/// from several threads at once. An exception it throws is written to the
/// console and does not stop the other elements from being processed.</param>
/// <exception cref="ArgumentNullException">
/// <paramref name="elements"/> or <paramref name="action"/> is null.
/// </exception>
public static void PerformActionsInParallel<T>(IEnumerable<T> elements, Action<T> action)
{
    // Fail fast on the calling thread instead of logging one
    // NullReferenceException per element from the worker threads.
    if (elements == null)
    {
        throw new ArgumentNullException(nameof(elements));
    }
    if (action == null)
    {
        throw new ArgumentNullException(nameof(action));
    }

    // Configured concurrency level, falling back to the default when unset.
    int threads = MaxConcurrent ?? DefaultMaxConcurrentRequests;
    // Ensure elements is only enumerated once.
    T[] items = elements as T[] ?? elements.ToArray();
    // Semaphore limiting the number of parallel requests
    using (Semaphore limit = new Semaphore(threads, threads))
    // Count of the number of remaining actions to be completed
    using (CountdownEvent remaining = new CountdownEvent(items.Length))
    {
        foreach (T element in items)
        {
            // Blocks until one of the running workers frees its slot.
            limit.WaitOne();
            new Thread(() =>
            {
                try
                {
                    action(element);
                }
                catch (Exception ex)
                {
                    // A failing action is reported on the console; the
                    // remaining elements are still processed.
                    Console.WriteLine("Error performing concurrent action: " + ex);
                }
                finally
                {
                    // Release the slot BEFORE signalling: once the final
                    // Signal lets Wait() return, no worker touches the
                    // semaphore again, so disposing it is safe.
                    limit.Release();
                    remaining.Signal();
                }
            }).Start();
        }
        // Wait for all requests to complete
        remaining.Wait();
    }
}
Context
StackExchange Code Review Q#110936, answer score: 3
Revisions (0)
No revisions yet.