author     Casper Beyer <caspervonb@pm.me>    2021-02-02 19:05:46 +0800
committer  GitHub <noreply@github.com>        2021-02-02 12:05:46 +0100
commit     6abf126c2a7a451cded8c6b5e6ddf1b69c84055d (patch)
tree       fd94c013a19fcb38954844085821ec1601c20e18 /std/async/pool.ts
parent     a2b5d44f1aa9d64f448a2a3cc2001272e2f60b98 (diff)
chore: remove std directory (#9361)
This removes the std folder from the tree.
Various parts of the tests are pretty tightly dependent
on std (47 direct imports and 75 indirect imports, not
counting the cli tests that use them as fixtures), so I've
added std as a submodule for now.
Diffstat (limited to 'std/async/pool.ts')
-rw-r--r--  std/async/pool.ts  68
1 file changed, 0 insertions, 68 deletions
diff --git a/std/async/pool.ts b/std/async/pool.ts
deleted file mode 100644
index 0b87f24ac..000000000
--- a/std/async/pool.ts
+++ /dev/null
@@ -1,68 +0,0 @@
-// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
-
-/**
- * pooledMap transforms values from an (async) iterable into another async
- * iterable. The transforms are done concurrently, with a max concurrency
- * defined by the poolLimit.
- *
- * If an error is thrown from `iterableFn`, no new transformations will begin.
- * All currently executing transformations are allowed to finish and still
- * yielded on success. After that, the rejections among them are gathered and
- * thrown by the iterator in an `AggregateError`.
- *
- * @param poolLimit The maximum count of items being processed concurrently.
- * @param array The input array for mapping.
- * @param iteratorFn The function to call for every item of the array.
- */
-export function pooledMap<T, R>(
-  poolLimit: number,
-  array: Iterable<T> | AsyncIterable<T>,
-  iteratorFn: (data: T) => Promise<R>,
-): AsyncIterableIterator<R> {
-  // Create the async iterable that is returned from this function.
-  const res = new TransformStream<Promise<R>, R>({
-    async transform(
-      p: Promise<R>,
-      controller: TransformStreamDefaultController<R>,
-    ): Promise<void> {
-      controller.enqueue(await p);
-    },
-  });
-  // Start processing items from the iterator
-  (async (): Promise<void> => {
-    const writer = res.writable.getWriter();
-    const executing: Array<Promise<unknown>> = [];
-    try {
-      for await (const item of array) {
-        const p = Promise.resolve().then(() => iteratorFn(item));
-        // Only write on success. If we `writer.write()` a rejected promise,
-        // that will end the iteration. We don't want that yet. Instead let it
-        // fail the race, taking us to the catch block where all currently
-        // executing jobs are allowed to finish and all rejections among them
-        // can be reported together.
-        p.then((v) => writer.write(Promise.resolve(v))).catch(() => {});
-        const e: Promise<unknown> = p.then(() =>
-          executing.splice(executing.indexOf(e), 1)
-        );
-        executing.push(e);
-        if (executing.length >= poolLimit) {
-          await Promise.race(executing);
-        }
-      }
-      // Wait until all ongoing events have processed, then close the writer.
-      await Promise.all(executing);
-      writer.close();
-    } catch {
-      const errors = [];
-      for (const result of await Promise.allSettled(executing)) {
-        if (result.status == "rejected") {
-          errors.push(result.reason);
-        }
-      }
-      writer.write(Promise.reject(
-        new AggregateError(errors, "Threw while mapping."),
-      )).catch(() => {});
-    }
-  })();
-  return res.readable[Symbol.asyncIterator]();
-}
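For context on the API removed here, the following is a minimal usage sketch of `pooledMap` based on the doc comment above. The versioned import URL and the example inputs are assumptions, not part of the commit; after this change the module lives on in the deno_std submodule rather than in this tree.

```ts
// Minimal sketch, assuming pool.ts is consumed from deno.land/std
// (the exact version in the URL is an assumption).
import { pooledMap } from "https://deno.land/std@0.85.0/async/pool.ts";

const urls = [
  "https://example.com/a",
  "https://example.com/b",
  "https://example.com/c",
];

// At most 2 transforms run concurrently; each result is yielded
// once its transform resolves.
const statuses = pooledMap(2, urls, async (url) => {
  const res = await fetch(url);
  return res.status;
});

for await (const status of statuses) {
  console.log(status);
}
```

Per the doc comment, if any transform rejects, the in-flight transforms still run to completion and their successful results are yielded; the iterator then throws an `AggregateError` that collects the rejections.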