Parallel and Sequential array looping with async/await

Submitted by: @import:stackexchange-codereview

Problem

Are these good implementations of parallel and sequential "extension" methods, and do they accomplish what I think they do?

My understanding is that the first one is parallel because the await applies to each invocation of the async lambda that is passed to map. So each invocation will block asynchronously and run in parallel.

In contrast, the await in the sequential function applies to the outer function, so each iteration of the for loop will block, and the next iteration will only occur once the previous one completes.

I based these implementations on the answer found at this SO question: https://stackoverflow.com/questions/37576685/using-async-await-with-a-foreach-loop

Array.prototype.forEachParallel = async function (
   func: (item: any) => Promise<void>
): Promise<void> {
   await Promise.all(this.map(async (item: any) => await func(item)));
};

Array.prototype.forEachSequential = async function (
   func: (item: any) => Promise<void>
): Promise<void> {
   for (let item of this)
      await func(item);
};


Example usage:

await someList.forEachParallel(item => this.SomeAsyncOperation(item));


A little additional code is:

// type declaration
interface Array<T> {
   ...
   forEachParallel(func: (item: T) => Promise<void>): Promise<void>;
   forEachSequential(func: (item: T) => Promise<void>): Promise<void>;
}

// make sure nothing is already defined on these members
[
   ...
   Array.prototype.forEachParallel,
   Array.prototype.forEachSequential
].forEach(maybeMember => { if (maybeMember) throw new Error("prototype extension collision"); });

Solution

JavaScript's execution model


"My understanding is that the first one is parallel because the await applies to each invocation of the async lambda that is passed to map."

This is not quite right since JavaScript (i.e. ECMAScript) is not multi-threaded.

Simply speaking, the VM executing the code has a queue from which it dequeues an action (e.g. "run this file") and executes it, ad infinitum. Each dequeued item runs until it completes or reaches a point where it hands over control: a yield (in generator functions), an await (in async functions) or a call to iterator.next() (on a generator).

These points are the only places where your code can be interrupted (preempted). Even then no parallelism is created; the VM just switches to another piece of work, e.g. to the caller of iterator.next() when the generator function has just yielded a value.

I recommend reading Concurrency model and Event Loop on MDN, which explains this in more detail and more formally than I can off the top of my head :)
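The ordering described above can be observed with a small sketch. The function name eventLoopOrder is made up for this illustration; it only relies on the standard setTimeout and Promise APIs and records which kind of callback runs first on the single thread.

```typescript
// Hypothetical helper: records the order in which three kinds of code run.
function eventLoopOrder(): Promise<string[]> {
  const order: string[] = [];
  return new Promise<string[]>(resolve => {
    // Timer callback: queued as a separate item, runs after the current one.
    setTimeout(() => {
      order.push('timer');
      resolve(order);
    }, 0);
    // Promise reaction: runs once the currently executing code has finished.
    Promise.resolve().then(() => order.push('microtask'));
    // Plain synchronous code: always runs to completion first.
    order.push('sync');
  });
}

eventLoopOrder().then(order => console.log(order.join(' -> ')));
// prints "sync -> microtask -> timer"
```

Nothing here runs at the same time as anything else; the three pieces are merely executed one after another in a fixed order.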

The effects of the execution model

Endless loops cannot be avoided

The following code will print Processed 1 and then hang in the endless loop.

await [1, 2, 3].forEachParallel(async (element: number) => {
  if (element === 2) {
    while (true);
  }
  else {
    console.log('Processed ' + element);
  }
});


Your forEachParallel will call Array#map, which iterates over the array's elements one by one and applies (i.e. calls, not awaits!) the passed function to each of them. It is useful to mentally desugar the function you pass from the async/await syntactic sugar:

async (item: any) => await func(item)
// <=> (just return the Promise)
async (item: any) => func(item)
// <=> (an async function without awaits does not need to be async anymore)
(item: any) => func(item)


After desugaring, it becomes apparent why Array#map hangs on the second iteration: it calls the function synchronously, so the endless loop runs before a promise is ever returned for the second element.
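A minimal sketch of this behaviour, without the endless loop: the body of an async function runs synchronously up to its first await, so every callback has already started by the time Array#map returns. The function name mapCallOrder and the log messages are invented for this illustration.

```typescript
// Hypothetical helper: shows what has happened by the time map returns.
function mapCallOrder(): string[] {
  const log: string[] = [];
  const promises = [1, 2, 3].map(async (element: number) => {
    log.push('start ' + element);   // runs synchronously, during map
    await null;                     // first point where the callback pauses
    log.push('resume ' + element);  // runs later, after map has returned
  });
  log.push('map returned ' + promises.length + ' promises');
  // Copy the log so that the later 'resume' entries do not show up in it.
  return log.slice();
}

console.log(mapCallOrder());
// [ 'start 1', 'start 2', 'start 3', 'map returned 3 promises' ]
```

If the synchronous part of one callback never finishes, map never gets to call the next one.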

forEachParallel vs. forEachSequential


"In contrast, the await in the sequential function applies to the outer function, so each iteration of the for loop will block, and the next iteration will only occur once the previous one completes."

Exactly!

forEachSequential guarantees that func is called on the array elements in order and that each call completes before the next one starts. forEachParallel also guarantees that func is called on the array elements in order, but not that the calls complete in order!

await [1, 2, 3].forEachParallel(async (element: number) => {
  console.log('Processed ' + element);
});


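With this exact callback the output is still 1, 2, 3, because each console.log runs synchronously while Array#map is calling the callbacks. As soon as the callback awaits something of varying duration before logging, the order is no longer guaranteed and the output might be, for example: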

Processed 3
Processed 1
Processed 2


Whereas the following code will always print 1, 2, 3 in-order:

await [1, 2, 3].forEachSequential(async (element: number) => {
  console.log('Processed ' + element);
});

// This will print 1, 2, 3 as well, taking 1000 ms for *each*
// element.
await [1, 2, 3].forEachSequential(async (element: number) => {
  return new Promise<void>(resolve => {
    setTimeout(() => {
      console.log('Processed ' + element);
      resolve();
    }, 1000);
  });
});
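The difference in completion order only becomes visible once the callbacks actually wait for something. The following sketch uses free functions instead of prototype extensions so that it is self-contained; the helper delay, the function compareOrders and the chosen waiting times are assumptions made for this illustration.

```typescript
// Hypothetical free-function versions of the two methods from the question.
async function forEachParallel<T>(items: T[], func: (item: T) => Promise<void>): Promise<void> {
  await Promise.all(items.map(item => func(item)));
}

async function forEachSequential<T>(items: T[], func: (item: T) => Promise<void>): Promise<void> {
  for (const item of items) {
    await func(item);
  }
}

// Hypothetical helper: resolves after `ms` milliseconds.
function delay(ms: number): Promise<void> {
  return new Promise<void>(resolve => setTimeout(resolve, ms));
}

async function compareOrders(): Promise<{ parallel: number[]; sequential: number[] }> {
  const parallel: number[] = [];
  const sequential: number[] = [];
  // Element 1 waits 60 ms, element 2 waits 40 ms, element 3 waits 20 ms.
  const waitFor = (element: number) => delay((4 - element) * 20);

  await forEachParallel([1, 2, 3], async element => {
    await waitFor(element);
    parallel.push(element);   // completion order depends on the waiting time
  });
  await forEachSequential([1, 2, 3], async element => {
    await waitFor(element);
    sequential.push(element); // completion order always follows array order
  });
  return { parallel, sequential };
}

compareOrders().then(result => console.log(result));
// { parallel: [ 3, 2, 1 ], sequential: [ 1, 2, 3 ] }
```

The parallel run takes roughly as long as the slowest callback (about 60 ms here), while the sequential run takes the sum of all waiting times (about 120 ms).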


Quiz: forEachParallel with timeouts

Will the following code hang? If yes, where does it happen (in map, in await Promise.all(), in neither)? What does it print?

await [1, 2, 3].forEachParallel(async (element: number) => {
  setTimeout(() => {
    if (element === 2) {
      while (true);
    }
    else {
      console.log('Processed ' + element);
    }
  }, 0);
});



Processed 1 will be printed and then the program will hang.

setTimeout(..., 0) schedules the function to be added to the end of the VM's queue as soon as possible (see the MDN link above). Since Array#map works sequentially on all elements, the VM's queue will look like

[current: forEachParallel] | setTimeout from first element | setTimeout from the second element | setTimeout from the third element

The hang happens after forEachParallel has entirely completed its execution. Note that the promises returned by the function passed to it are resolved directly at the end through the implicit return, not in the setTimeout callback! await Promise.all(...) therefore finishes without waiting for any timer. Promise continuations run before the VM picks up the next item from its queue, so the code after the await resumes first. The setTimeout callbacks only run afterwards, and the second one never returns.
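To check where the hang would occur, a variant of the quiz without the endless loop can record the order of events. This is only a sketch: the function name quizOrder and the log messages are invented, and Promise.all over map stands in for the body of forEachParallel.

```typescript
// Hypothetical helper: records when Promise.all settles relative to the timers.
function quizOrder(): Promise<string[]> {
  const log: string[] = [];
  return new Promise<string[]>(resolve => {
    // Same shape as the body of forEachParallel from the question.
    const all = Promise.all([1, 2, 3].map(async (element: number) => {
      setTimeout(() => {
        log.push('timer ' + element);
        if (element === 3) {
          resolve(log); // the last timer hands the finished log back
        }
      }, 0);
      // Implicit return: this promise resolves without waiting for the timer.
    }));
    all.then(() => log.push('Promise.all resolved'));
  });
}

quizOrder().then(log => console.log(log));
// [ 'Promise.all resolved', 'timer 1', 'timer 2', 'timer 3' ]
```

Promise.all settles before the first timer callback runs, so an endless loop inside the second timer callback would only be reached after the awaited promise has already resolved.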

Design Suggestions

- I would opt to not use async for the function you pass to Array#map:

await Promise.all(this.map(async (item: any) => await func(item)));


The first time I read the code, I thought: "Is this really the Array#map function which is passed an async function? But how should that work? Array#map does not deal with async functions, or does it?"

Put another way, I usually expect functions getting async functions passed as arguments to be aware of them and deal with them in a special way. Here, you are just using the fact that Array#map accepts a general and generic (element: T) => U function, where you happen to
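A sketch of what dropping the async wrapper could look like, written as free functions so that it is self-contained. The names AsyncAction, withAsyncWrapper, withPlainArrow and compareForms are hypothetical; the second function is one possible reading of the suggestion, not the only one.

```typescript
type AsyncAction<T> = (item: T) => Promise<void>;

// The form from the question: an async wrapper around func.
function withAsyncWrapper<T>(items: T[], func: AsyncAction<T>): Promise<void[]> {
  return Promise.all(items.map(async (item: T) => await func(item)));
}

// The suggested form: a plain arrow function, so Array#map visibly just
// collects the promises that func returns.
function withPlainArrow<T>(items: T[], func: AsyncAction<T>): Promise<void[]> {
  return Promise.all(items.map(item => func(item)));
}

async function compareForms(): Promise<{ wrapped: number[]; plain: number[] }> {
  const wrapped: number[] = [];
  const plain: number[] = [];
  await withAsyncWrapper([1, 2, 3], async n => { wrapped.push(n * 2); });
  await withPlainArrow([1, 2, 3], async n => { plain.push(n * 2); });
  return { wrapped, plain };
}

compareForms().then(result => console.log(result));
// { wrapped: [ 2, 4, 6 ], plain: [ 2, 4, 6 ] }
```

For a func that always returns a promise, both forms process the same elements; the plain arrow simply makes it clearer that Array#map is not doing anything async-specific.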

Context

StackExchange Code Review Q#147455, answer score: 6
