diff options
Diffstat (limited to 'std/async/pool.ts')
-rw-r--r-- | std/async/pool.ts | 50 |
1 files changed, 36 insertions, 14 deletions
diff --git a/std/async/pool.ts b/std/async/pool.ts index 8aeb2671d..0b87f24ac 100644 --- a/std/async/pool.ts +++ b/std/async/pool.ts @@ -4,8 +4,13 @@ * pooledMap transforms values from an (async) iterable into another async * iterable. The transforms are done concurrently, with a max concurrency * defined by the poolLimit. - * - * @param poolLimit The maximum count of items being processed concurrently. + * + * If an error is thrown from `iterableFn`, no new transformations will begin. + * All currently executing transformations are allowed to finish and still + * yielded on success. After that, the rejections among them are gathered and + * thrown by the iterator in an `AggregateError`. + * + * @param poolLimit The maximum count of items being processed concurrently. * @param array The input array for mapping. * @param iteratorFn The function to call for every item of the array. */ @@ -27,20 +32,37 @@ export function pooledMap<T, R>( (async (): Promise<void> => { const writer = res.writable.getWriter(); const executing: Array<Promise<unknown>> = []; - for await (const item of array) { - const p = Promise.resolve().then(() => iteratorFn(item)); - writer.write(p); - const e: Promise<unknown> = p.then(() => - executing.splice(executing.indexOf(e), 1) - ); - executing.push(e); - if (executing.length >= poolLimit) { - await Promise.race(executing); + try { + for await (const item of array) { + const p = Promise.resolve().then(() => iteratorFn(item)); + // Only write on success. If we `writer.write()` a rejected promise, + // that will end the iteration. We don't want that yet. Instead let it + // fail the race, taking us to the catch block where all currently + // executing jobs are allowed to finish and all rejections among them + // can be reported together. + p.then((v) => writer.write(Promise.resolve(v))).catch(() => {}); + const e: Promise<unknown> = p.then(() => + executing.splice(executing.indexOf(e), 1) + ); + executing.push(e); + if (executing.length >= poolLimit) { + await Promise.race(executing); + } } + // Wait until all ongoing events have processed, then close the writer. + await Promise.all(executing); + writer.close(); + } catch { + const errors = []; + for (const result of await Promise.allSettled(executing)) { + if (result.status == "rejected") { + errors.push(result.reason); + } + } + writer.write(Promise.reject( + new AggregateError(errors, "Threw while mapping."), + )).catch(() => {}); } - // Wait until all ongoing events have processed, then close the writer. - await Promise.all(executing); - writer.close(); })(); return res.readable[Symbol.asyncIterator](); } |