diff options
Diffstat (limited to 'std/async')
-rw-r--r-- | std/async/README.md | 85 | ||||
-rw-r--r-- | std/async/deferred.ts | 26 | ||||
-rw-r--r-- | std/async/deferred_test.ts | 17 | ||||
-rw-r--r-- | std/async/delay.ts | 9 | ||||
-rw-r--r-- | std/async/delay_test.ts | 12 | ||||
-rw-r--r-- | std/async/mod.ts | 5 | ||||
-rw-r--r-- | std/async/mux_async_iterator.ts | 69 | ||||
-rw-r--r-- | std/async/mux_async_iterator_test.ts | 50 | ||||
-rw-r--r-- | std/async/pool.ts | 68 | ||||
-rw-r--r-- | std/async/pool_test.ts | 44 | ||||
-rw-r--r-- | std/async/test.ts | 2 |
11 files changed, 0 insertions, 387 deletions
diff --git a/std/async/README.md b/std/async/README.md deleted file mode 100644 index 2c2ca1018..000000000 --- a/std/async/README.md +++ /dev/null @@ -1,85 +0,0 @@ -# async - -async is a module to provide help with asynchronous tasks. - -# Usage - -The following functions and class are exposed in `mod.ts`: - -## deferred - -Create a Promise with the `reject` and `resolve` functions. - -```typescript -import { deferred } from "https://deno.land/std/async/mod.ts"; - -const p = deferred<number>(); -// ... -p.resolve(42); -``` - -## delay - -Resolve a Promise after a given amount of milliseconds. - -```typescript -import { delay } from "https://deno.land/std/async/mod.ts"; - -// ... -const delayedPromise = delay(100); -const result = await delayedPromise; -// ... -``` - -## MuxAsyncIterator - -The MuxAsyncIterator class multiplexes multiple async iterators into a single -stream. - -The class makes an assumption that the final result (the value returned and not -yielded from the iterator) does not matter. If there is any result, it is -discarded. - -```typescript -import { MuxAsyncIterator } from "https://deno.land/std/async/mod.ts"; - -async function* gen123(): AsyncIterableIterator<number> { - yield 1; - yield 2; - yield 3; -} - -async function* gen456(): AsyncIterableIterator<number> { - yield 4; - yield 5; - yield 6; -} - -const mux = new MuxAsyncIterator<number>(); -mux.add(gen123()); -mux.add(gen456()); -for await (const value of mux) { - // ... -} -// .. -``` - -## pooledMap - -Transform values from an (async) iterable into another async iterable. The -transforms are done concurrently, with a max concurrency defined by the -poolLimit. - -```typescript -import { pooledMap } from "https://deno.land/std/async/mod.ts"; - -const results = pooledMap( - 2, - [1, 2, 3], - (i) => new Promise((r) => setTimeout(() => r(i), 1000)), -); - -for await (const value of results) { - // ... -} -``` diff --git a/std/async/deferred.ts b/std/async/deferred.ts deleted file mode 100644 index ca05a29bb..000000000 --- a/std/async/deferred.ts +++ /dev/null @@ -1,26 +0,0 @@ -// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. -// TODO(ry) It'd be better to make Deferred a class that inherits from -// Promise, rather than an interface. This is possible in ES2016, however -// typescript produces broken code when targeting ES5 code. -// See https://github.com/Microsoft/TypeScript/issues/15202 -// At the time of writing, the github issue is closed but the problem remains. -export interface Deferred<T> extends Promise<T> { - resolve: (value?: T | PromiseLike<T>) => void; - // deno-lint-ignore no-explicit-any - reject: (reason?: any) => void; -} - -/** Creates a Promise with the `reject` and `resolve` functions - * placed as methods on the promise object itself. It allows you to do: - * - * const p = deferred<number>(); - * // ... - * p.resolve(42); - */ -export function deferred<T>(): Deferred<T> { - let methods; - const promise = new Promise<T>((resolve, reject): void => { - methods = { resolve, reject }; - }); - return Object.assign(promise, methods) as Deferred<T>; -} diff --git a/std/async/deferred_test.ts b/std/async/deferred_test.ts deleted file mode 100644 index ba287f442..000000000 --- a/std/async/deferred_test.ts +++ /dev/null @@ -1,17 +0,0 @@ -// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. -import { assertEquals, assertThrowsAsync } from "../testing/asserts.ts"; -import { deferred } from "./deferred.ts"; - -Deno.test("[async] deferred: resolve", async function (): Promise<void> { - const d = deferred<string>(); - d.resolve("🦕"); - assertEquals(await d, "🦕"); -}); - -Deno.test("[async] deferred: reject", async function (): Promise<void> { - const d = deferred<number>(); - d.reject(new Error("A deno error 🦕")); - await assertThrowsAsync(async () => { - await d; - }); -}); diff --git a/std/async/delay.ts b/std/async/delay.ts deleted file mode 100644 index 0a9e1f529..000000000 --- a/std/async/delay.ts +++ /dev/null @@ -1,9 +0,0 @@ -// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. -/* Resolves after the given number of milliseconds. */ -export function delay(ms: number): Promise<void> { - return new Promise((res): number => - setTimeout((): void => { - res(); - }, ms) - ); -} diff --git a/std/async/delay_test.ts b/std/async/delay_test.ts deleted file mode 100644 index e5f08f110..000000000 --- a/std/async/delay_test.ts +++ /dev/null @@ -1,12 +0,0 @@ -// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. -import { delay } from "./delay.ts"; -import { assert } from "../testing/asserts.ts"; - -Deno.test("[async] delay", async function (): Promise<void> { - const start = new Date(); - const delayedPromise = delay(100); - const result = await delayedPromise; - const diff = new Date().getTime() - start.getTime(); - assert(result === undefined); - assert(diff >= 100); -}); diff --git a/std/async/mod.ts b/std/async/mod.ts deleted file mode 100644 index 0345d8caf..000000000 --- a/std/async/mod.ts +++ /dev/null @@ -1,5 +0,0 @@ -// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. -export * from "./deferred.ts"; -export * from "./delay.ts"; -export * from "./mux_async_iterator.ts"; -export * from "./pool.ts"; diff --git a/std/async/mux_async_iterator.ts b/std/async/mux_async_iterator.ts deleted file mode 100644 index 0bda4f579..000000000 --- a/std/async/mux_async_iterator.ts +++ /dev/null @@ -1,69 +0,0 @@ -// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. -import { Deferred, deferred } from "./deferred.ts"; - -interface TaggedYieldedValue<T> { - iterator: AsyncIterableIterator<T>; - value: T; -} - -/** The MuxAsyncIterator class multiplexes multiple async iterators into a - * single stream. It currently makes an assumption: - * - The final result (the value returned and not yielded from the iterator) - * does not matter; if there is any, it is discarded. - */ -export class MuxAsyncIterator<T> implements AsyncIterable<T> { - private iteratorCount = 0; - private yields: Array<TaggedYieldedValue<T>> = []; - // deno-lint-ignore no-explicit-any - private throws: any[] = []; - private signal: Deferred<void> = deferred(); - - add(iterator: AsyncIterableIterator<T>): void { - ++this.iteratorCount; - this.callIteratorNext(iterator); - } - - private async callIteratorNext( - iterator: AsyncIterableIterator<T>, - ): Promise<void> { - try { - const { value, done } = await iterator.next(); - if (done) { - --this.iteratorCount; - } else { - this.yields.push({ iterator, value }); - } - } catch (e) { - this.throws.push(e); - } - this.signal.resolve(); - } - - async *iterate(): AsyncIterableIterator<T> { - while (this.iteratorCount > 0) { - // Sleep until any of the wrapped iterators yields. - await this.signal; - - // Note that while we're looping over `yields`, new items may be added. - for (let i = 0; i < this.yields.length; i++) { - const { iterator, value } = this.yields[i]; - yield value; - this.callIteratorNext(iterator); - } - - if (this.throws.length) { - for (const e of this.throws) { - throw e; - } - this.throws.length = 0; - } - // Clear the `yields` list and reset the `signal` promise. - this.yields.length = 0; - this.signal = deferred(); - } - } - - [Symbol.asyncIterator](): AsyncIterableIterator<T> { - return this.iterate(); - } -} diff --git a/std/async/mux_async_iterator_test.ts b/std/async/mux_async_iterator_test.ts deleted file mode 100644 index e1bdb47b4..000000000 --- a/std/async/mux_async_iterator_test.ts +++ /dev/null @@ -1,50 +0,0 @@ -// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. -import { assertEquals, assertThrowsAsync } from "../testing/asserts.ts"; -import { MuxAsyncIterator } from "./mux_async_iterator.ts"; - -async function* gen123(): AsyncIterableIterator<number> { - yield 1; - yield 2; - yield 3; -} - -async function* gen456(): AsyncIterableIterator<number> { - yield 4; - yield 5; - yield 6; -} - -async function* genThrows(): AsyncIterableIterator<number> { - yield 7; - throw new Error("something went wrong"); -} - -Deno.test("[async] MuxAsyncIterator", async function (): Promise<void> { - const mux = new MuxAsyncIterator<number>(); - mux.add(gen123()); - mux.add(gen456()); - const results = new Set(); - for await (const value of mux) { - results.add(value); - } - assertEquals(results.size, 6); -}); - -Deno.test({ - name: "[async] MuxAsyncIterator throws", - async fn() { - const mux = new MuxAsyncIterator<number>(); - mux.add(gen123()); - mux.add(genThrows()); - const results = new Set(); - await assertThrowsAsync( - async () => { - for await (const value of mux) { - results.add(value); - } - }, - Error, - "something went wrong", - ); - }, -}); diff --git a/std/async/pool.ts b/std/async/pool.ts deleted file mode 100644 index 0b87f24ac..000000000 --- a/std/async/pool.ts +++ /dev/null @@ -1,68 +0,0 @@ -// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. - -/** - * pooledMap transforms values from an (async) iterable into another async - * iterable. The transforms are done concurrently, with a max concurrency - * defined by the poolLimit. - * - * If an error is thrown from `iterableFn`, no new transformations will begin. - * All currently executing transformations are allowed to finish and still - * yielded on success. After that, the rejections among them are gathered and - * thrown by the iterator in an `AggregateError`. - * - * @param poolLimit The maximum count of items being processed concurrently. - * @param array The input array for mapping. - * @param iteratorFn The function to call for every item of the array. - */ -export function pooledMap<T, R>( - poolLimit: number, - array: Iterable<T> | AsyncIterable<T>, - iteratorFn: (data: T) => Promise<R>, -): AsyncIterableIterator<R> { - // Create the async iterable that is returned from this function. - const res = new TransformStream<Promise<R>, R>({ - async transform( - p: Promise<R>, - controller: TransformStreamDefaultController<R>, - ): Promise<void> { - controller.enqueue(await p); - }, - }); - // Start processing items from the iterator - (async (): Promise<void> => { - const writer = res.writable.getWriter(); - const executing: Array<Promise<unknown>> = []; - try { - for await (const item of array) { - const p = Promise.resolve().then(() => iteratorFn(item)); - // Only write on success. If we `writer.write()` a rejected promise, - // that will end the iteration. We don't want that yet. Instead let it - // fail the race, taking us to the catch block where all currently - // executing jobs are allowed to finish and all rejections among them - // can be reported together. - p.then((v) => writer.write(Promise.resolve(v))).catch(() => {}); - const e: Promise<unknown> = p.then(() => - executing.splice(executing.indexOf(e), 1) - ); - executing.push(e); - if (executing.length >= poolLimit) { - await Promise.race(executing); - } - } - // Wait until all ongoing events have processed, then close the writer. - await Promise.all(executing); - writer.close(); - } catch { - const errors = []; - for (const result of await Promise.allSettled(executing)) { - if (result.status == "rejected") { - errors.push(result.reason); - } - } - writer.write(Promise.reject( - new AggregateError(errors, "Threw while mapping."), - )).catch(() => {}); - } - })(); - return res.readable[Symbol.asyncIterator](); -} diff --git a/std/async/pool_test.ts b/std/async/pool_test.ts deleted file mode 100644 index 81be903ed..000000000 --- a/std/async/pool_test.ts +++ /dev/null @@ -1,44 +0,0 @@ -// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. -import { delay } from "./delay.ts"; -import { pooledMap } from "./pool.ts"; -import { - assert, - assertEquals, - assertStringIncludes, - assertThrowsAsync, -} from "../testing/asserts.ts"; - -Deno.test("[async] pooledMap", async function (): Promise<void> { - const start = new Date(); - const results = pooledMap( - 2, - [1, 2, 3], - (i) => new Promise((r) => setTimeout(() => r(i), 1000)), - ); - for await (const value of results) { - console.log(value); - } - const diff = new Date().getTime() - start.getTime(); - assert(diff >= 2000); - assert(diff < 3000); -}); - -Deno.test("[async] pooledMap errors", async function (): Promise<void> { - async function mapNumber(n: number): Promise<number> { - if (n <= 2) { - throw new Error(`Bad number: ${n}`); - } - await delay(100); - return n; - } - const mappedNumbers: number[] = []; - const error = await assertThrowsAsync(async () => { - for await (const m of pooledMap(3, [1, 2, 3, 4], mapNumber)) { - mappedNumbers.push(m); - } - }, AggregateError) as AggregateError; - assertEquals(mappedNumbers, [3]); - assertEquals(error.errors.length, 2); - assertStringIncludes(error.errors[0].stack, "Error: Bad number: 1"); - assertStringIncludes(error.errors[1].stack, "Error: Bad number: 2"); -}); diff --git a/std/async/test.ts b/std/async/test.ts deleted file mode 100644 index 590417055..000000000 --- a/std/async/test.ts +++ /dev/null @@ -1,2 +0,0 @@ -// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. -import "./mod.ts"; |