From 1b60840f286bc0203a3bd2900f67557a8ff2c3f6 Mon Sep 17 00:00:00 2001 From: Luca Casonato Date: Wed, 29 Jul 2020 02:44:34 +0200 Subject: feat(std/async): add pooledMap utility (#6898) --- std/async/mod.ts | 1 + std/async/pool.ts | 46 ++++++++++++++++++++++++++++++++++++++++++++++ std/async/pool_test.ts | 19 +++++++++++++++++++ 3 files changed, 66 insertions(+) create mode 100644 std/async/pool.ts create mode 100644 std/async/pool_test.ts (limited to 'std/async') diff --git a/std/async/mod.ts b/std/async/mod.ts index 9efead91d..ad4ab5156 100644 --- a/std/async/mod.ts +++ b/std/async/mod.ts @@ -2,3 +2,4 @@ export * from "./deferred.ts"; export * from "./delay.ts"; export * from "./mux_async_iterator.ts"; +export * from "./pool.ts"; diff --git a/std/async/pool.ts b/std/async/pool.ts new file mode 100644 index 000000000..fc8517041 --- /dev/null +++ b/std/async/pool.ts @@ -0,0 +1,46 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. + +/** + * pooledMap transforms values from an (async) iterable into another async + * iterable. The transforms are done concurrently, with a max concurrency + * defined by the poolLimit. + * + * @param poolLimit The maximum count of items being processed concurrently. + * @param array The input array for mapping. + * @param iteratorFn The function to call for every item of the array. + */ +export function pooledMap( + poolLimit: number, + array: Iterable | AsyncIterable, + iteratorFn: (data: T) => Promise, +): AsyncIterableIterator { + // Create the async iterable that is returned from this function. + const res = new TransformStream, R>({ + async transform( + p: Promise, + controller: TransformStreamDefaultController, + ): Promise { + controller.enqueue(await p); + }, + }); + // Start processing items from the iterator + (async (): Promise => { + const writer = res.writable.getWriter(); + const executing: Array> = []; + for await (const item of array) { + const p = Promise.resolve().then(() => iteratorFn(item)); + writer.write(p); + const e: Promise = p.then(() => + executing.splice(executing.indexOf(e), 1) + ); + executing.push(e); + if (executing.length >= poolLimit) { + await Promise.race(executing); + } + } + // Wait until all ongoing events have processed, then close the writer. + await Promise.all(executing); + writer.close(); + })(); + return res.readable.getIterator(); +} diff --git a/std/async/pool_test.ts b/std/async/pool_test.ts new file mode 100644 index 000000000..1be4556be --- /dev/null +++ b/std/async/pool_test.ts @@ -0,0 +1,19 @@ +import { pooledMap } from "./pool.ts"; +import { assert } from "../testing/asserts.ts"; + +Deno.test("[async] pooledMap", async function (): Promise { + const start = new Date(); + const results = pooledMap( + 2, + [1, 2, 3], + (i) => new Promise((r) => setTimeout(() => r(i), 1000)), + ); + for await (const value of results) { + console.log(value); + } + const diff = new Date().getTime() - start.getTime(); + assert(diff >= 2000); + assert(diff < 3000); +}); + +export {}; -- cgit v1.2.3