diff options
Diffstat (limited to 'std/async/mux_async_iterator.ts')
-rw-r--r-- | std/async/mux_async_iterator.ts | 25 |
1 files changed, 18 insertions, 7 deletions
diff --git a/std/async/mux_async_iterator.ts b/std/async/mux_async_iterator.ts index b32689a29..ba6f22775 100644 --- a/std/async/mux_async_iterator.ts +++ b/std/async/mux_async_iterator.ts @@ -7,14 +7,15 @@ interface TaggedYieldedValue<T> { } /** The MuxAsyncIterator class multiplexes multiple async iterators into a - * single stream. It currently makes a few assumptions: - * - The iterators do not throw. + * single stream. It currently makes an assumption: * - The final result (the value returned and not yielded from the iterator) * does not matter; if there is any, it is discarded. */ export class MuxAsyncIterator<T> implements AsyncIterable<T> { private iteratorCount = 0; private yields: Array<TaggedYieldedValue<T>> = []; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + private throws: any[] = []; private signal: Deferred<void> = deferred(); add(iterator: AsyncIterableIterator<T>): void { @@ -25,11 +26,15 @@ export class MuxAsyncIterator<T> implements AsyncIterable<T> { private async callIteratorNext( iterator: AsyncIterableIterator<T> ): Promise<void> { - const { value, done } = await iterator.next(); - if (done) { - --this.iteratorCount; - } else { - this.yields.push({ iterator, value }); + try { + const { value, done } = await iterator.next(); + if (done) { + --this.iteratorCount; + } else { + this.yields.push({ iterator, value }); + } + } catch (e) { + this.throws.push(e); } this.signal.resolve(); } @@ -46,6 +51,12 @@ export class MuxAsyncIterator<T> implements AsyncIterable<T> { this.callIteratorNext(iterator); } + if (this.throws.length) { + for (const e of this.throws) { + throw e; + } + this.throws.length = 0; + } // Clear the `yields` list and reset the `signal` promise. this.yields.length = 0; this.signal = deferred(); |