summaryrefslogtreecommitdiff
path: root/std/async
diff options
context:
space:
mode:
Diffstat (limited to 'std/async')
-rw-r--r--std/async/mod.ts1
-rw-r--r--std/async/pool.ts46
-rw-r--r--std/async/pool_test.ts19
3 files changed, 66 insertions, 0 deletions
diff --git a/std/async/mod.ts b/std/async/mod.ts
index 9efead91d..ad4ab5156 100644
--- a/std/async/mod.ts
+++ b/std/async/mod.ts
@@ -2,3 +2,4 @@
export * from "./deferred.ts";
export * from "./delay.ts";
export * from "./mux_async_iterator.ts";
+export * from "./pool.ts";
diff --git a/std/async/pool.ts b/std/async/pool.ts
new file mode 100644
index 000000000..fc8517041
--- /dev/null
+++ b/std/async/pool.ts
@@ -0,0 +1,46 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+
+/**
+ * pooledMap transforms values from an (async) iterable into another async
+ * iterable. The transforms are done concurrently, with a max concurrency
+ * defined by the poolLimit.
+ *
+ * @param poolLimit The maximum count of items being processed concurrently.
+ * @param array The input array for mapping.
+ * @param iteratorFn The function to call for every item of the array.
+ */
+export function pooledMap<T, R>(
+ poolLimit: number,
+ array: Iterable<T> | AsyncIterable<T>,
+ iteratorFn: (data: T) => Promise<R>,
+): AsyncIterableIterator<R> {
+ // Create the async iterable that is returned from this function.
+ const res = new TransformStream<Promise<R>, R>({
+ async transform(
+ p: Promise<R>,
+ controller: TransformStreamDefaultController<R>,
+ ): Promise<void> {
+ controller.enqueue(await p);
+ },
+ });
+ // Start processing items from the iterator
+ (async (): Promise<void> => {
+ const writer = res.writable.getWriter();
+ const executing: Array<Promise<unknown>> = [];
+ for await (const item of array) {
+ const p = Promise.resolve().then(() => iteratorFn(item));
+ writer.write(p);
+ const e: Promise<unknown> = p.then(() =>
+ executing.splice(executing.indexOf(e), 1)
+ );
+ executing.push(e);
+ if (executing.length >= poolLimit) {
+ await Promise.race(executing);
+ }
+ }
+ // Wait until all ongoing events have processed, then close the writer.
+ await Promise.all(executing);
+ writer.close();
+ })();
+ return res.readable.getIterator();
+}
diff --git a/std/async/pool_test.ts b/std/async/pool_test.ts
new file mode 100644
index 000000000..1be4556be
--- /dev/null
+++ b/std/async/pool_test.ts
@@ -0,0 +1,19 @@
+import { pooledMap } from "./pool.ts";
+import { assert } from "../testing/asserts.ts";
+
+Deno.test("[async] pooledMap", async function (): Promise<void> {
+ const start = new Date();
+ const results = pooledMap(
+ 2,
+ [1, 2, 3],
+ (i) => new Promise((r) => setTimeout(() => r(i), 1000)),
+ );
+ for await (const value of results) {
+ console.log(value);
+ }
+ const diff = new Date().getTime() - start.getTime();
+ assert(diff >= 2000);
+ assert(diff < 3000);
+});
+
+export {};