diff options
author | Nayeem Rahman <nayeemrmn99@gmail.com> | 2020-05-09 13:34:47 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-05-09 08:34:47 -0400 |
commit | f184332c09c851faac50f598d29ebe4426e05464 (patch) | |
tree | 2659aba63702537fcde1bb64ddeafea1e5863f3e /std/async | |
parent | 2b02535028f868ea8dfc471c4921a237747ccd4a (diff) |
BREAKING(std): reorganization (#5087)
* Prepend underscores to private modules
* Remove collectUint8Arrays() It would be a misuse of Deno.iter()'s result.
* Move std/_util/async.ts to std/async
* Move std/util/sha*.ts to std/hash
Diffstat (limited to 'std/async')
-rw-r--r-- | std/async/deferred.ts | 26 | ||||
-rw-r--r-- | std/async/deferred_test.ts | 8 | ||||
-rw-r--r-- | std/async/delay.ts | 9 | ||||
-rw-r--r-- | std/async/mod.ts | 4 | ||||
-rw-r--r-- | std/async/mux_async_iterator.ts | 58 | ||||
-rw-r--r-- | std/async/mux_async_iterator_test.ts | 28 |
6 files changed, 133 insertions, 0 deletions
diff --git a/std/async/deferred.ts b/std/async/deferred.ts new file mode 100644 index 000000000..109a1a37e --- /dev/null +++ b/std/async/deferred.ts @@ -0,0 +1,26 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. +// TODO(ry) It'd be better to make Deferred a class that inherits from +// Promise, rather than an interface. This is possible in ES2016, however +// typescript produces broken code when targeting ES5 code. +// See https://github.com/Microsoft/TypeScript/issues/15202 +// At the time of writing, the github issue is closed but the problem remains. +export interface Deferred<T> extends Promise<T> { + resolve: (value?: T | PromiseLike<T>) => void; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + reject: (reason?: any) => void; +} + +/** Creates a Promise with the `reject` and `resolve` functions + * placed as methods on the promise object itself. It allows you to do: + * + * const p = deferred<number>(); + * // ... + * p.resolve(42); + */ +export function deferred<T>(): Deferred<T> { + let methods; + const promise = new Promise<T>((resolve, reject): void => { + methods = { resolve, reject }; + }); + return Object.assign(promise, methods) as Deferred<T>; +} diff --git a/std/async/deferred_test.ts b/std/async/deferred_test.ts new file mode 100644 index 000000000..83c317853 --- /dev/null +++ b/std/async/deferred_test.ts @@ -0,0 +1,8 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. +import { deferred } from "./deferred.ts"; + +Deno.test("[async] deferred", function (): Promise<void> { + const d = deferred<number>(); + d.resolve(12); + return Promise.resolve(); +}); diff --git a/std/async/delay.ts b/std/async/delay.ts new file mode 100644 index 000000000..e3aec368f --- /dev/null +++ b/std/async/delay.ts @@ -0,0 +1,9 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. +/* Resolves after the given number of milliseconds. */ +export function delay(ms: number): Promise<void> { + return new Promise((res): number => + setTimeout((): void => { + res(); + }, ms) + ); +} diff --git a/std/async/mod.ts b/std/async/mod.ts new file mode 100644 index 000000000..9efead91d --- /dev/null +++ b/std/async/mod.ts @@ -0,0 +1,4 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. +export * from "./deferred.ts"; +export * from "./delay.ts"; +export * from "./mux_async_iterator.ts"; diff --git a/std/async/mux_async_iterator.ts b/std/async/mux_async_iterator.ts new file mode 100644 index 000000000..b32689a29 --- /dev/null +++ b/std/async/mux_async_iterator.ts @@ -0,0 +1,58 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. +import { Deferred, deferred } from "./deferred.ts"; + +interface TaggedYieldedValue<T> { + iterator: AsyncIterableIterator<T>; + value: T; +} + +/** The MuxAsyncIterator class multiplexes multiple async iterators into a + * single stream. It currently makes a few assumptions: + * - The iterators do not throw. + * - The final result (the value returned and not yielded from the iterator) + * does not matter; if there is any, it is discarded. + */ +export class MuxAsyncIterator<T> implements AsyncIterable<T> { + private iteratorCount = 0; + private yields: Array<TaggedYieldedValue<T>> = []; + private signal: Deferred<void> = deferred(); + + add(iterator: AsyncIterableIterator<T>): void { + ++this.iteratorCount; + this.callIteratorNext(iterator); + } + + private async callIteratorNext( + iterator: AsyncIterableIterator<T> + ): Promise<void> { + const { value, done } = await iterator.next(); + if (done) { + --this.iteratorCount; + } else { + this.yields.push({ iterator, value }); + } + this.signal.resolve(); + } + + async *iterate(): AsyncIterableIterator<T> { + while (this.iteratorCount > 0) { + // Sleep until any of the wrapped iterators yields. + await this.signal; + + // Note that while we're looping over `yields`, new items may be added. + for (let i = 0; i < this.yields.length; i++) { + const { iterator, value } = this.yields[i]; + yield value; + this.callIteratorNext(iterator); + } + + // Clear the `yields` list and reset the `signal` promise. + this.yields.length = 0; + this.signal = deferred(); + } + } + + [Symbol.asyncIterator](): AsyncIterableIterator<T> { + return this.iterate(); + } +} diff --git a/std/async/mux_async_iterator_test.ts b/std/async/mux_async_iterator_test.ts new file mode 100644 index 000000000..7017a4eba --- /dev/null +++ b/std/async/mux_async_iterator_test.ts @@ -0,0 +1,28 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. +import { assertEquals } from "../testing/asserts.ts"; +import { MuxAsyncIterator } from "./mux_async_iterator.ts"; + +// eslint-disable-next-line require-await +async function* gen123(): AsyncIterableIterator<number> { + yield 1; + yield 2; + yield 3; +} + +// eslint-disable-next-line require-await +async function* gen456(): AsyncIterableIterator<number> { + yield 4; + yield 5; + yield 6; +} + +Deno.test("[async] MuxAsyncIterator", async function (): Promise<void> { + const mux = new MuxAsyncIterator<number>(); + mux.add(gen123()); + mux.add(gen456()); + const results = new Set(); + for await (const value of mux) { + results.add(value); + } + assertEquals(results.size, 6); +}); |