debugcsharpMinor
Concurrent/parallel ForEachAsync - proper handling of exceptions and cancellations
Viewed 0 times
handlingexceptionscancellationsandforeachasyncparallelproperconcurrent
Problem
I've created an asynchronous parallel
My assumptions and concerns (some contrary to original answer) are posted with each test case. Please feel free to correct any incorrect assumptions.
Test Harness In All Test Cases
Test Case 1 - Run
This seems to work as expected.
Test Case 2 - Inside
Inside the
ForEach mechanism so I could enumerate an enumerable by N degrees of concurrency and process an action for each item. Additionally, I have the option of stopping when first exception is encountered or processing the entire enumerable and returning all exceptions at the end. I'm trying to ensure that all my exception handling is being done correctly and I understand the flow correctly. My original code review can be found here (Original Answer). The code below incorporates suggestions from the original answer, but I'm not sure all the assumptions stated in that answer are happening. My assumptions and concerns (some contrary to original answer) are posted with each test case. Please feel free to correct any incorrect assumptions.
Test Harness In All Test Cases
// This really would be a CreateLinkedTokenSource that could be cancelled
// from the caller of this code or internally within the body of the ForEachAsync body
var internalCancel = new CancellationTokenSource();
try
{
// Simulate running something for 10 data batches...
await Enumerable.Range(0, 10)
.ForEachAsync(
async jobData =>
{
// body of code that might throw exceptions or set internalCancel.Cancel()...
internalCancel.Token.ThrowIfCancellationRequested();
Console.WriteLine( $"Task {jobKey}: FooAsync - Finish DataChunk {jobData}" );
},
new AsyncParallelOptions { MaxDegreeOfParallelism = 4, CancellationToken = internalCancel.Token }
);
}
catch (Exception ex)
{
Console.WriteLine( $"Task {jobKey}: FooAsync - Exception: {ex.GetType().ToString()}, internalCancel.Token.IsCancellationRequested: {internalCancel.Token.IsCancellationRequested}" );
throw;
}Test Case 1 - Run
body() to completion This seems to work as expected.
Test Case 2 - Inside
body() exception is thrown Inside the
body(), throw a `NotSupportedESolution
Test Case 2
Test Case 3
2(1). If you rethrow the exceptions from the continuation (and don't catch them in the outer
2(2). Correct.
Test Case 4
There is a race condition here. If one of the tasks hits the cancellation code quickly enough, one of the
More Comments
- Correct, but only if those exceptions are thrown asynchronously. See (3) below.
- Correct. When you do
await task.ContinueWith(...), you are no longer awaiting the original task, rather its continuation. The fact that the original task threw will only reflect in the task object passed to the continuation (e.g.t.Exception). You are allowed to observe the exception, but unless you throw anything from the continuation it will not be considered faulted or canceled.
- If you throw synchronously (e.g.
body = i => throw new Exception("foo")), you'll never get back a task and evaluation will stop beforeContinueWithis ever called, let alone awaited. However, since you're using an async delegate in your test harness (async jobData => {...}) that would never happen, as async delegates make sure aTaskis always returned (and when you throw an exception, it is attached to that task).
- It's because you don't throw from either
body(synchronously) or fromContinueWithso basically in the eyes of the TPL everything is completing successfully.
Test Case 3
- If that scenario concerns you, you could always do something like
while (true) {cts.Token.ThrowIfCancellationRequested(); if (!partition.MoveNext()) {break;} }
- Correct.
2(1). If you rethrow the exceptions from the continuation (and don't catch them in the outer
await task.ContinueWith) then it won't be guaranteed either way. This is because execution would stop only for the offending partition's specific task. And since the partitions are dynamic, the enumerable will keep getting processed in the other tasks, and that will continue until all elements are processed, because Task.WhenAll waits for all tasks to complete (even if some failed along the way). Of course if you throw as many exceptions as your degree of parallelism, all tasks will fault and execution will stop. But that's a pretty random condition, so I'd say for this approach to be reasonable you'd have to always trigger cancellation as well (not just when FailImmediately is true).2(2). Correct.
Test Case 4
There is a race condition here. If one of the tasks hits the cancellation code quickly enough, one of the
Task.Run calls will complete as canceled before ever running its task (once it started running a task, it can't magically cancel it, so the token is only relevant before it actually started it). In that case, cancellation will propagate to allDone, and a TaskCanceledException will be thrown at await allDone. To see this in action (in high probability), try increasing your range to say 1000 and your degree of parallelism to say 100. The other option of the race condition is that the tasks are launched so quickly that nothing gets actually cancelled (since, again, true cancellation is only possibe before task launch in your code, and everything was launched by the time cancellation was triggered), leading to the scenario you've described.More Comments
- You should still be able to do
cts = CancellationTokenSource.CreateLinkedTokenSource(parallelOptions.CancellationToken), don't see the need for the separation.
- I don't like your manual check of
t.IsCanceled, the user should specify cancelation via his token. Maybe he wanted to cancel a specific task but not the entire operation.
- It turns out
ConcurrentQueue(which is generally more lightweight) performs better in this use case, both in terms of run-time performance and memory. For proof: http://pastebin.com/ig47x6VV.
Context
StackExchange Code Review Q#145326, answer score: 7
Revisions (0)
No revisions yet.