diff options
author | Luca Casonato <lucacasonato@yahoo.com> | 2020-07-29 02:44:34 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-07-29 02:44:34 +0200 |
commit | 1b60840f286bc0203a3bd2900f67557a8ff2c3f6 (patch) | |
tree | dc11c98ca3eae7ee35697dc556f50acab7802b2d | |
parent | c6917133942c791480cd2aec7297b2a2ee623059 (diff) |
feat(std/async): add pooledMap utility (#6898)
-rw-r--r-- | std/async/mod.ts | 1 | ||||
-rw-r--r-- | std/async/pool.ts | 46 | ||||
-rw-r--r-- | std/async/pool_test.ts | 19 |
3 files changed, 66 insertions, 0 deletions
diff --git a/std/async/mod.ts b/std/async/mod.ts index 9efead91d..ad4ab5156 100644 --- a/std/async/mod.ts +++ b/std/async/mod.ts @@ -2,3 +2,4 @@ export * from "./deferred.ts"; export * from "./delay.ts"; export * from "./mux_async_iterator.ts"; +export * from "./pool.ts"; diff --git a/std/async/pool.ts b/std/async/pool.ts new file mode 100644 index 000000000..fc8517041 --- /dev/null +++ b/std/async/pool.ts @@ -0,0 +1,46 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. + +/** + * pooledMap transforms values from an (async) iterable into another async + * iterable. The transforms are done concurrently, with a max concurrency + * defined by the poolLimit. + * + * @param poolLimit The maximum count of items being processed concurrently. + * @param array The input array for mapping. + * @param iteratorFn The function to call for every item of the array. + */ +export function pooledMap<T, R>( + poolLimit: number, + array: Iterable<T> | AsyncIterable<T>, + iteratorFn: (data: T) => Promise<R>, +): AsyncIterableIterator<R> { + // Create the async iterable that is returned from this function. + const res = new TransformStream<Promise<R>, R>({ + async transform( + p: Promise<R>, + controller: TransformStreamDefaultController<R>, + ): Promise<void> { + controller.enqueue(await p); + }, + }); + // Start processing items from the iterator + (async (): Promise<void> => { + const writer = res.writable.getWriter(); + const executing: Array<Promise<unknown>> = []; + for await (const item of array) { + const p = Promise.resolve().then(() => iteratorFn(item)); + writer.write(p); + const e: Promise<unknown> = p.then(() => + executing.splice(executing.indexOf(e), 1) + ); + executing.push(e); + if (executing.length >= poolLimit) { + await Promise.race(executing); + } + } + // Wait until all ongoing events have processed, then close the writer. + await Promise.all(executing); + writer.close(); + })(); + return res.readable.getIterator(); +} diff --git a/std/async/pool_test.ts b/std/async/pool_test.ts new file mode 100644 index 000000000..1be4556be --- /dev/null +++ b/std/async/pool_test.ts @@ -0,0 +1,19 @@ +import { pooledMap } from "./pool.ts"; +import { assert } from "../testing/asserts.ts"; + +Deno.test("[async] pooledMap", async function (): Promise<void> { + const start = new Date(); + const results = pooledMap( + 2, + [1, 2, 3], + (i) => new Promise((r) => setTimeout(() => r(i), 1000)), + ); + for await (const value of results) { + console.log(value); + } + const diff = new Date().getTime() - start.getTime(); + assert(diff >= 2000); + assert(diff < 3000); +}); + +export {}; |