Concurrent/parallel ForEachAsync - proper handling of exceptions and cancellations

Submitted by: @import:stackexchange-codereview

Problem

I've created an asynchronous parallel ForEach mechanism that enumerates a sequence with up to N concurrent workers and runs an action for each item. It can either stop when the first exception is encountered, or process the entire sequence and return all exceptions at the end. I want to make sure my exception handling is correct and that I understand the flow. My original code review can be found here (Original Answer). The code below incorporates suggestions from that answer, but I'm not sure all the assumptions stated there actually hold.

My assumptions and concerns (some contrary to original answer) are posted with each test case. Please feel free to correct any incorrect assumptions.

Test Harness In All Test Cases

// This really would be a CreateLinkedTokenSource that could be cancelled
// from the caller of this code or internally within the body of the ForEachAsync body
var internalCancel = new CancellationTokenSource();

try
{
    // Simulate running something for 10 data batches...
    await Enumerable.Range(0, 10)
        .ForEachAsync(
            async jobData =>
            {
                // body of code that might throw exceptions or set internalCancel.Cancel()...

                internalCancel.Token.ThrowIfCancellationRequested();

                Console.WriteLine( $"Task {jobKey}: FooAsync - Finish DataChunk {jobData}" );
            },
            new AsyncParallelOptions { MaxDegreeOfParallelism = 4, CancellationToken = internalCancel.Token }
        );
}
catch (Exception ex)
{
    Console.WriteLine( $"Task {jobKey}: FooAsync - Exception: {ex.GetType().ToString()}, internalCancel.Token.IsCancellationRequested: {internalCancel.Token.IsCancellationRequested}" );
    throw;
}


Test Case 1 - Run body() to completion

This seems to work as expected.

Test Case 2 - Inside body() exception is thrown

Inside the body(), throw a `NotSupportedE

Solution

Test Case 2

  1. Correct, but only if those exceptions are thrown asynchronously. See (3) below.

  2. Correct. When you do await task.ContinueWith(...), you are no longer awaiting the original task but its continuation. The fact that the original task threw is reflected only in the task object passed to the continuation (e.g. t.Exception). You are allowed to observe the exception, but unless you throw from the continuation, the continuation will not be considered faulted or canceled.

  3. If you throw synchronously (e.g. body = i => throw new Exception("foo")), you never get a task back, and evaluation stops before ContinueWith is ever called, let alone awaited. However, since your test harness uses an async delegate (async jobData => {...}), that can never happen: async delegates always return a Task, and any exception you throw is attached to that task.

  4. It's because you don't throw from either body (synchronously) or from ContinueWith, so in the eyes of the TPL everything completes successfully.
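The difference between a faulted antecedent and its continuation, and between synchronous and asynchronous throws, can be checked with a small sketch. The class and method names here (ContinuationDemo, ThrowAsync, ThrowSync) are made up for illustration and are not part of the code under review.

```csharp
using System;
using System.Threading.Tasks;

public static class ContinuationDemo
{
    // Async delegate: the exception is captured in the returned Task.
    private static async Task ThrowAsync()
    {
        await Task.Yield();
        throw new NotSupportedException("foo");
    }

    // Non-async delegate: the exception escapes before any Task exists.
    private static Task ThrowSync() => throw new NotSupportedException("foo");

    public static async Task<(bool antecedentFaulted, bool continuationFaulted)> ObserveAsync()
    {
        Task original = ThrowAsync();
        Task continuation = original.ContinueWith(t =>
        {
            // Observing t.Exception does not fault the continuation.
            _ = t.Exception;
        });

        await continuation; // does not throw, the continuation completed normally
        return (original.IsFaulted, continuation.IsFaulted);
    }

    public static bool SyncThrowEscapes()
    {
        try
        {
            // ContinueWith is never reached because ThrowSync throws first.
            Task unused = ThrowSync().ContinueWith(_ => { });
            return false;
        }
        catch (NotSupportedException)
        {
            return true;
        }
    }
}
```

Awaiting the continuation succeeds even though the original task is faulted, which is why the outer await sees nothing unless the continuation itself throws.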



Test Case 3

  1. If that scenario concerns you, you could always do something like while (true) { cts.Token.ThrowIfCancellationRequested(); if (!partition.MoveNext()) { break; } }

  2. Correct.


2(1). If you rethrow the exceptions from the continuation (and don't catch them in the outer await task.ContinueWith), it isn't guaranteed either way. Execution stops only for the offending partition's task. Since the partitions are dynamic, the other tasks keep processing the enumerable until all elements are consumed, because Task.WhenAll waits for all tasks to complete (even if some failed along the way). Of course, if you throw as many exceptions as your degree of parallelism, all tasks fault and execution stops. But that's a fairly random condition, so for this approach to be reasonable you'd have to always trigger cancellation as well (not just when FailImmediately is true).
2(2). Correct.
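The point in 2(1) about Task.WhenAll can be illustrated with a minimal sketch. It assumes a ConcurrentQueue as a stand-in for the dynamic partitions and uses hypothetical names (WhenAllDemo, WorkerAsync); it is not the ForEachAsync implementation under review.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class WhenAllDemo
{
    // Returns how many items were processed and whether the combined task faulted.
    public static async Task<(int processed, bool faulted)> RunAsync()
    {
        // Shared work source standing in for the dynamic partitions (assumption).
        var queue = new ConcurrentQueue<int>(Enumerable.Range(0, 10));
        int processed = 0;

        async Task WorkerAsync()
        {
            while (queue.TryDequeue(out int item))
            {
                await Task.Yield();
                if (item == 0)
                {
                    // Faults only this worker; the other keeps draining the queue.
                    throw new InvalidOperationException("item 0 failed");
                }
                Interlocked.Increment(ref processed);
            }
        }

        Task allDone = Task.WhenAll(WorkerAsync(), WorkerAsync());
        try
        {
            await allDone;
        }
        catch (InvalidOperationException)
        {
            // Surfaces only after every worker has finished.
        }

        return (processed, allDone.IsFaulted);
    }
}
```

One worker faults on item 0, yet the remaining nine items are still processed by the other worker before the exception surfaces, which is why triggering cancellation alongside the throw is needed to stop early.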

Test Case 4

There is a race condition here. If one of the tasks hits the cancellation code quickly enough, one of the Task.Run calls will complete as canceled before ever running its task (once it has started running a task, it can't magically cancel it, so the token is only relevant before the task actually starts). In that case, cancellation propagates to allDone, and a TaskCanceledException is thrown at await allDone. To see this in action (with high probability), try increasing your range to, say, 1000 and your degree of parallelism to, say, 100. The other outcome of the race is that the tasks are launched so quickly that nothing actually gets canceled (since, again, true cancellation is only possible before task launch in your code, and everything was launched by the time cancellation was triggered), leading to the scenario you've described.
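The "canceled before ever running" half of the race can be reproduced deterministically by canceling the token before calling Task.Run. This is only a sketch with a hypothetical class name (PreCanceledDemo); the real code hits this case only when the timing happens to line up.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class PreCanceledDemo
{
    public static async Task<(bool ran, bool canceled)> RunAsync()
    {
        using (var cts = new CancellationTokenSource())
        {
            cts.Cancel(); // token is already canceled when Task.Run sees it

            bool ran = false;
            Task task = Task.Run(() => { ran = true; }, cts.Token);

            try
            {
                await task;
            }
            catch (TaskCanceledException)
            {
                // Same exception type the answer describes at await allDone.
            }

            // The delegate never ran, and the task ended in the Canceled state.
            return (ran, task.IsCanceled);
        }
    }
}
```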

More Comments

  • You should still be able to do cts = CancellationTokenSource.CreateLinkedTokenSource(parallelOptions.CancellationToken); I don't see the need for the separation.



  • I don't like your manual check of t.IsCanceled. The user should signal cancellation via their own token. Maybe they wanted to cancel a specific task but not the entire operation.



  • It turns out ConcurrentQueue (which is generally more lightweight) performs better in this use case, both in terms of run-time performance and memory. For proof: http://pastebin.com/ig47x6VV.
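For the first comment above, a short sketch of how a linked token source behaves: canceling the linked source does not cancel the caller's source, while canceling the caller's source does cancel the linked one. The names (LinkedTokenDemo, outer, linked) are illustrative only.

```csharp
using System.Threading;

public static class LinkedTokenDemo
{
    // Cancel the internal (linked) source and report both states.
    public static (bool outerCanceled, bool linkedCanceled) CancelInner()
    {
        using (var outer = new CancellationTokenSource())
        using (var linked = CancellationTokenSource.CreateLinkedTokenSource(outer.Token))
        {
            linked.Cancel();
            return (outer.IsCancellationRequested, linked.IsCancellationRequested);
        }
    }

    // Cancel the caller's source and report whether the linked one followed.
    public static bool CancelOuter()
    {
        using (var outer = new CancellationTokenSource())
        using (var linked = CancellationTokenSource.CreateLinkedTokenSource(outer.Token))
        {
            outer.Cancel();
            return linked.IsCancellationRequested;
        }
    }
}
```

A single linked source therefore covers both the caller-initiated and the internal cancellation paths, which is the reason a separate source looks unnecessary.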

Context

StackExchange Code Review Q#145326, answer score: 7
