summaryrefslogtreecommitdiff
path: root/std/async/pool.ts
diff options
context:
space:
mode:
authorCasper Beyer <caspervonb@pm.me>2021-02-02 19:05:46 +0800
committerGitHub <noreply@github.com>2021-02-02 12:05:46 +0100
commit6abf126c2a7a451cded8c6b5e6ddf1b69c84055d (patch)
treefd94c013a19fcb38954844085821ec1601c20e18 /std/async/pool.ts
parenta2b5d44f1aa9d64f448a2a3cc2001272e2f60b98 (diff)
chore: remove std directory (#9361)
This removes the std folder from the tree. Various parts of the tests are pretty tightly dependent on std (47 direct imports and 75 indirect imports, not counting the cli tests that use them as fixtures) so I've added std as a submodule for now.
Diffstat (limited to 'std/async/pool.ts')
-rw-r--r--std/async/pool.ts68
1 files changed, 0 insertions, 68 deletions
diff --git a/std/async/pool.ts b/std/async/pool.ts
deleted file mode 100644
index 0b87f24ac..000000000
--- a/std/async/pool.ts
+++ /dev/null
@@ -1,68 +0,0 @@
-// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
-
-/**
- * pooledMap transforms values from an (async) iterable into another async
- * iterable. The transforms are done concurrently, with a max concurrency
- * defined by the poolLimit.
- *
- * If an error is thrown from `iterableFn`, no new transformations will begin.
- * All currently executing transformations are allowed to finish and still
- * yielded on success. After that, the rejections among them are gathered and
- * thrown by the iterator in an `AggregateError`.
- *
- * @param poolLimit The maximum count of items being processed concurrently.
- * @param array The input array for mapping.
- * @param iteratorFn The function to call for every item of the array.
- */
-export function pooledMap<T, R>(
- poolLimit: number,
- array: Iterable<T> | AsyncIterable<T>,
- iteratorFn: (data: T) => Promise<R>,
-): AsyncIterableIterator<R> {
- // Create the async iterable that is returned from this function.
- const res = new TransformStream<Promise<R>, R>({
- async transform(
- p: Promise<R>,
- controller: TransformStreamDefaultController<R>,
- ): Promise<void> {
- controller.enqueue(await p);
- },
- });
- // Start processing items from the iterator
- (async (): Promise<void> => {
- const writer = res.writable.getWriter();
- const executing: Array<Promise<unknown>> = [];
- try {
- for await (const item of array) {
- const p = Promise.resolve().then(() => iteratorFn(item));
- // Only write on success. If we `writer.write()` a rejected promise,
- // that will end the iteration. We don't want that yet. Instead let it
- // fail the race, taking us to the catch block where all currently
- // executing jobs are allowed to finish and all rejections among them
- // can be reported together.
- p.then((v) => writer.write(Promise.resolve(v))).catch(() => {});
- const e: Promise<unknown> = p.then(() =>
- executing.splice(executing.indexOf(e), 1)
- );
- executing.push(e);
- if (executing.length >= poolLimit) {
- await Promise.race(executing);
- }
- }
- // Wait until all ongoing events have processed, then close the writer.
- await Promise.all(executing);
- writer.close();
- } catch {
- const errors = [];
- for (const result of await Promise.allSettled(executing)) {
- if (result.status == "rejected") {
- errors.push(result.reason);
- }
- }
- writer.write(Promise.reject(
- new AggregateError(errors, "Threw while mapping."),
- )).catch(() => {});
- }
- })();
- return res.readable[Symbol.asyncIterator]();
-}