HiveBrain v1.2.0
Get Started
← Back to all entries
patterncsharpMinor

Parallel foreach with configurable level of concurrency

Submitted by: @import:stackexchange-codereview··
0
Viewed 0 times
configurablelevelwithconcurrencyparallelforeach

Problem

The purpose of this code is to let me loop over 100 items (up to MAX_CONCURRENT at a time), performing some action on them, and then return only once all items have been processed:

/// Generic method to perform an action or set of actions
/// in parallel on each item in a collection of items, returning
/// only when all actions have been completed.
/// The element type
/// A collection of elements, each of which to
/// perform the action on.
/// The action to perform on each element. The
/// action should of course be thread safe.
/// The maximum number of concurrent actions.
public static void PerformActionsInParallel(IEnumerable elements, Action action)
{
    // Semaphore limiting the number of parallel requests
    Semaphore limit = new Semaphore(MAX_CONCURRENT, MAX_CONCURRENT);
    // Count of the number of remaining threads to be completed
    int remaining = 0;
    // Signal to notify the main thread when a worker is done
    AutoResetEvent onComplete = new AutoResetEvent(false);

    foreach (T element in elements)
    {
        Interlocked.Increment(ref remaining);
        limit.WaitOne();
        new Thread(() =>
        {
            try
            {
                action(element);
            }
            catch (Exception ex)
            {
                Console.WriteLine("Error performing concurrent action: " + ex);
            }
            finally
            {
                Interlocked.Decrement(ref remaining);
                limit.Release();
                onComplete.Set();
            }
        }).Start();
    }
    // Wait for all requests to complete
    while (remaining > 0)
        onComplete.WaitOne(10); // Slightly better than Thread.Sleep(10)
}


I include a timeout on the WaitOne() before checking remaining again to protect against the rare case where the last outstanding thread decrements 'remaining' and then signals completion between the main thread checking 'remaining' and waiting for the next completion signal, wh

Solution

I've made use of a CountdownEvent signal to avoid the use of the remaining integer and avoid the busy waiting involved in polling it with the unreliable AutoResetEvent onComplete:

public static void PerformActionsInParallel(IEnumerable elements, Action action)
{
    int threads = MaxConcurrent ?? DefaultMaxConcurrentRequests;
    // Ensure elements is only enumerated once.
    elements = elements as T[] ?? elements.ToArray();
    // Semaphore limiting the number of parallel requests
    Semaphore limit = new Semaphore(MAX_CONCURRENT, MAX_CONCURRENT);
    // Count of the number of remaining threads to be completed
    CountdownEvent remaining = new CountdownEvent(elements.Count());

    foreach (T element in elements)
    {
        limit.WaitOne();
        new Thread(() =>
        {
            try
            {
                action(element);
            }
            catch (Exception ex)
            {
                Console.WriteLine("Error performing concurrent action: " + ex);
            }
            finally
            {
                remaining.Signal();
                limit.Release();
            }
        }).Start();
    }
    // Wait for all requests to complete
    remaining.Wait();
}

Code Snippets

public static void PerformActionsInParallel<T>(IEnumerable<T> elements, Action<T> action)
{
    int threads = MaxConcurrent ?? DefaultMaxConcurrentRequests;
    // Ensure elements is only enumerated once.
    elements = elements as T[] ?? elements.ToArray();
    // Semaphore limiting the number of parallel requests
    Semaphore limit = new Semaphore(MAX_CONCURRENT, MAX_CONCURRENT);
    // Count of the number of remaining threads to be completed
    CountdownEvent remaining = new CountdownEvent(elements.Count());

    foreach (T element in elements)
    {
        limit.WaitOne();
        new Thread(() =>
        {
            try
            {
                action(element);
            }
            catch (Exception ex)
            {
                Console.WriteLine("Error performing concurrent action: " + ex);
            }
            finally
            {
                remaining.Signal();
                limit.Release();
            }
        }).Start();
    }
    // Wait for all requests to complete
    remaining.Wait();
}

Context

StackExchange Code Review Q#110936, answer score: 3

Revisions (0)

No revisions yet.