diff options
author | Luca Casonato <lucacasonato@yahoo.com> | 2020-07-29 02:44:34 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-07-29 02:44:34 +0200 |
commit | 1b60840f286bc0203a3bd2900f67557a8ff2c3f6 (patch) | |
tree | dc11c98ca3eae7ee35697dc556f50acab7802b2d /std/async/pool.ts | |
parent | c6917133942c791480cd2aec7297b2a2ee623059 (diff) |
feat(std/async): add pooledMap utility (#6898)
Diffstat (limited to 'std/async/pool.ts')
-rw-r--r-- | std/async/pool.ts | 46 |
1 files changed, 46 insertions, 0 deletions
diff --git a/std/async/pool.ts b/std/async/pool.ts new file mode 100644 index 000000000..fc8517041 --- /dev/null +++ b/std/async/pool.ts @@ -0,0 +1,46 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. + +/** + * pooledMap transforms values from an (async) iterable into another async + * iterable. The transforms are done concurrently, with a max concurrency + * defined by the poolLimit. + * + * @param poolLimit The maximum count of items being processed concurrently. + * @param array The input array for mapping. + * @param iteratorFn The function to call for every item of the array. + */ +export function pooledMap<T, R>( + poolLimit: number, + array: Iterable<T> | AsyncIterable<T>, + iteratorFn: (data: T) => Promise<R>, +): AsyncIterableIterator<R> { + // Create the async iterable that is returned from this function. + const res = new TransformStream<Promise<R>, R>({ + async transform( + p: Promise<R>, + controller: TransformStreamDefaultController<R>, + ): Promise<void> { + controller.enqueue(await p); + }, + }); + // Start processing items from the iterator + (async (): Promise<void> => { + const writer = res.writable.getWriter(); + const executing: Array<Promise<unknown>> = []; + for await (const item of array) { + const p = Promise.resolve().then(() => iteratorFn(item)); + writer.write(p); + const e: Promise<unknown> = p.then(() => + executing.splice(executing.indexOf(e), 1) + ); + executing.push(e); + if (executing.length >= poolLimit) { + await Promise.race(executing); + } + } + // Wait until all ongoing events have processed, then close the writer. + await Promise.all(executing); + writer.close(); + })(); + return res.readable.getIterator(); +} |