From f184332c09c851faac50f598d29ebe4426e05464 Mon Sep 17 00:00:00 2001 From: Nayeem Rahman Date: Sat, 9 May 2020 13:34:47 +0100 Subject: BREAKING(std): reorganization (#5087) * Prepend underscores to private modules * Remove collectUint8Arrays() It would be a misuse of Deno.iter()'s result. * Move std/_util/async.ts to std/async * Move std/util/sha*.ts to std/hash --- std/async/mux_async_iterator.ts | 58 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) create mode 100644 std/async/mux_async_iterator.ts (limited to 'std/async/mux_async_iterator.ts') diff --git a/std/async/mux_async_iterator.ts b/std/async/mux_async_iterator.ts new file mode 100644 index 000000000..b32689a29 --- /dev/null +++ b/std/async/mux_async_iterator.ts @@ -0,0 +1,58 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. +import { Deferred, deferred } from "./deferred.ts"; + +interface TaggedYieldedValue { + iterator: AsyncIterableIterator; + value: T; +} + +/** The MuxAsyncIterator class multiplexes multiple async iterators into a + * single stream. It currently makes a few assumptions: + * - The iterators do not throw. + * - The final result (the value returned and not yielded from the iterator) + * does not matter; if there is any, it is discarded. + */ +export class MuxAsyncIterator implements AsyncIterable { + private iteratorCount = 0; + private yields: Array> = []; + private signal: Deferred = deferred(); + + add(iterator: AsyncIterableIterator): void { + ++this.iteratorCount; + this.callIteratorNext(iterator); + } + + private async callIteratorNext( + iterator: AsyncIterableIterator + ): Promise { + const { value, done } = await iterator.next(); + if (done) { + --this.iteratorCount; + } else { + this.yields.push({ iterator, value }); + } + this.signal.resolve(); + } + + async *iterate(): AsyncIterableIterator { + while (this.iteratorCount > 0) { + // Sleep until any of the wrapped iterators yields. + await this.signal; + + // Note that while we're looping over `yields`, new items may be added. + for (let i = 0; i < this.yields.length; i++) { + const { iterator, value } = this.yields[i]; + yield value; + this.callIteratorNext(iterator); + } + + // Clear the `yields` list and reset the `signal` promise. + this.yields.length = 0; + this.signal = deferred(); + } + } + + [Symbol.asyncIterator](): AsyncIterableIterator { + return this.iterate(); + } +} -- cgit v1.2.3