diff options
Diffstat (limited to 'cli/js')
-rw-r--r-- | cli/js/globals.ts | 7 | ||||
-rw-r--r-- | cli/js/lib.deno.shared_globals.d.ts | 64 | ||||
-rw-r--r-- | cli/js/tests/streams_piping_test.ts | 131 | ||||
-rw-r--r-- | cli/js/tests/streams_writable_test.ts | 253 | ||||
-rw-r--r-- | cli/js/tests/unit_tests.ts | 2 | ||||
-rw-r--r-- | cli/js/util.ts | 12 | ||||
-rw-r--r-- | cli/js/web/dom_types.d.ts | 16 | ||||
-rw-r--r-- | cli/js/web/fetch.ts | 4 | ||||
-rw-r--r-- | cli/js/web/streams/internals.ts | 1224 | ||||
-rw-r--r-- | cli/js/web/streams/queuing_strategy.ts | 53 | ||||
-rw-r--r-- | cli/js/web/streams/readable_byte_stream_controller.ts | 10 | ||||
-rw-r--r-- | cli/js/web/streams/readable_stream.ts | 158 | ||||
-rw-r--r-- | cli/js/web/streams/readable_stream_default_controller.ts | 10 | ||||
-rw-r--r-- | cli/js/web/streams/readable_stream_default_reader.ts | 7 | ||||
-rw-r--r-- | cli/js/web/streams/symbols.ts | 16 | ||||
-rw-r--r-- | cli/js/web/streams/writable_stream.ts | 107 | ||||
-rw-r--r-- | cli/js/web/streams/writable_stream_default_controller.ts | 68 | ||||
-rw-r--r-- | cli/js/web/streams/writable_stream_default_writer.ts | 164 |
18 files changed, 2094 insertions, 212 deletions
diff --git a/cli/js/globals.ts b/cli/js/globals.ts index 87309a158..caf069ffd 100644 --- a/cli/js/globals.ts +++ b/cli/js/globals.ts @@ -23,6 +23,8 @@ import * as workers from "./web/workers.ts"; import * as performanceUtil from "./web/performance.ts"; import * as request from "./web/request.ts"; import * as readableStream from "./web/streams/readable_stream.ts"; +import * as queuingStrategy from "./web/streams/queuing_strategy.ts"; +import * as writableStream from "./web/streams/writable_stream.ts"; // These imports are not exposed and therefore are fine to just import the // symbols required. @@ -216,6 +218,10 @@ export const windowOrWorkerGlobalScopeProperties = { AbortController: nonEnumerable(abortController.AbortControllerImpl), AbortSignal: nonEnumerable(abortSignal.AbortSignalImpl), Blob: nonEnumerable(blob.DenoBlob), + ByteLengthQueuingStrategy: nonEnumerable( + queuingStrategy.ByteLengthQueuingStrategyImpl + ), + CountQueuingStrategy: nonEnumerable(queuingStrategy.CountQueuingStrategyImpl), File: nonEnumerable(domFile.DomFileImpl), CustomEvent: nonEnumerable(customEvent.CustomEventImpl), DOMException: nonEnumerable(domException.DOMExceptionImpl), @@ -232,6 +238,7 @@ export const windowOrWorkerGlobalScopeProperties = { Response: nonEnumerable(fetchTypes.Response), performance: writable(new performanceUtil.Performance()), Worker: nonEnumerable(workers.WorkerImpl), + WritableStream: nonEnumerable(writableStream.WritableStreamImpl), }; // eslint-disable-next-line @typescript-eslint/no-explicit-any diff --git a/cli/js/lib.deno.shared_globals.d.ts b/cli/js/lib.deno.shared_globals.d.ts index 635fa391e..f86279c27 100644 --- a/cli/js/lib.deno.shared_globals.d.ts +++ b/cli/js/lib.deno.shared_globals.d.ts @@ -309,6 +309,21 @@ interface QueuingStrategy<T = any> { size?: QueuingStrategySizeCallback<T>; } +/** This Streams API interface provides a built-in byte length queuing strategy + * that can be used when constructing streams. */ +declare class CountQueuingStrategy implements QueuingStrategy { + constructor(options: { highWaterMark: number }); + highWaterMark: number; + size(chunk: any): 1; +} + +declare class ByteLengthQueuingStrategy + implements QueuingStrategy<ArrayBufferView> { + constructor(options: { highWaterMark: number }); + highWaterMark: number; + size(chunk: ArrayBufferView): number; +} + /** This Streams API interface represents a readable stream of byte data. The * Fetch API offers a concrete instance of a ReadableStream through the body * property of a Response object. */ @@ -347,13 +362,58 @@ declare var ReadableStream: { ): ReadableStream<R>; }; -/** This Streams API interface provides a standard abstraction for writing streaming data to a destination, known as a sink. This object comes with built-in backpressure and queuing. */ -interface WritableStream<W = any> { +interface WritableStreamDefaultControllerCloseCallback { + (): void | PromiseLike<void>; +} + +interface WritableStreamDefaultControllerStartCallback { + (controller: WritableStreamDefaultController): void | PromiseLike<void>; +} + +interface WritableStreamDefaultControllerWriteCallback<W> { + (chunk: W, controller: WritableStreamDefaultController): void | PromiseLike< + void + >; +} + +interface WritableStreamErrorCallback { + (reason: any): void | PromiseLike<void>; +} + +interface UnderlyingSink<W = any> { + abort?: WritableStreamErrorCallback; + close?: WritableStreamDefaultControllerCloseCallback; + start?: WritableStreamDefaultControllerStartCallback; + type?: undefined; + write?: WritableStreamDefaultControllerWriteCallback<W>; +} + +/** This Streams API interface provides a standard abstraction for writing + * streaming data to a destination, known as a sink. This object comes with + * built-in backpressure and queuing. */ +declare class WritableStream<W = any> { + constructor( + underlyingSink?: UnderlyingSink<W>, + strategy?: QueuingStrategy<W> + ); readonly locked: boolean; abort(reason?: any): Promise<void>; + close(): Promise<void>; getWriter(): WritableStreamDefaultWriter<W>; } +/** This Streams API interface represents a controller allowing control of a + * WritableStream's state. When constructing a WritableStream, the underlying + * sink is given a corresponding WritableStreamDefaultController instance to + * manipulate. */ +interface WritableStreamDefaultController { + error(error?: any): void; +} + +/** This Streams API interface is the object returned by + * WritableStream.getWriter() and once created locks the < writer to the + * WritableStream ensuring that no other streams can write to the underlying + * sink. */ interface WritableStreamDefaultWriter<W = any> { readonly closed: Promise<void>; readonly desiredSize: number | null; diff --git a/cli/js/tests/streams_piping_test.ts b/cli/js/tests/streams_piping_test.ts new file mode 100644 index 000000000..a947b3821 --- /dev/null +++ b/cli/js/tests/streams_piping_test.ts @@ -0,0 +1,131 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. +import { unitTest, assert, assertEquals } from "./test_util.ts"; +import { assertThrowsAsync } from "../../../std/testing/asserts.ts"; + +unitTest(function streamPipeLocks() { + const rs = new ReadableStream(); + const ws = new WritableStream(); + + assertEquals(rs.locked, false); + assertEquals(ws.locked, false); + + rs.pipeTo(ws); + + assert(rs.locked); + assert(ws.locked); +}); + +unitTest(async function streamPipeFinishUnlocks() { + const rs = new ReadableStream({ + start(controller: ReadableStreamDefaultController): void { + controller.close(); + }, + }); + const ws = new WritableStream(); + + await rs.pipeTo(ws); + assertEquals(rs.locked, false); + assertEquals(ws.locked, false); +}); + +unitTest(async function streamPipeReadableStreamLocked() { + const rs = new ReadableStream(); + const ws = new WritableStream(); + + rs.getReader(); + + await assertThrowsAsync(async () => { + await rs.pipeTo(ws); + }, TypeError); +}); + +unitTest(async function streamPipeReadableStreamLocked() { + const rs = new ReadableStream(); + const ws = new WritableStream(); + + ws.getWriter(); + + await assertThrowsAsync(async () => { + await rs.pipeTo(ws); + }, TypeError); +}); + +unitTest(async function streamPipeLotsOfChunks() { + const CHUNKS = 10; + + const rs = new ReadableStream<number>({ + start(c: ReadableStreamDefaultController): void { + for (let i = 0; i < CHUNKS; ++i) { + c.enqueue(i); + } + c.close(); + }, + }); + + const written: Array<string | number> = []; + const ws = new WritableStream( + { + write(chunk: number): void { + written.push(chunk); + }, + close(): void { + written.push("closed"); + }, + }, + new CountQueuingStrategy({ highWaterMark: CHUNKS }) + ); + + await rs.pipeTo(ws); + const targetValues = []; + for (let i = 0; i < CHUNKS; ++i) { + targetValues.push(i); + } + targetValues.push("closed"); + + assertEquals(written, targetValues, "the correct values must be written"); + + // Ensure both readable and writable are closed by the time the pipe finishes. + await Promise.all([rs.getReader().closed, ws.getWriter().closed]); +}); + +for (const preventAbort of [true, false]) { + unitTest(function undefinedRejectionFromPull() { + const rs = new ReadableStream({ + pull(): Promise<void> { + return Promise.reject(undefined); + }, + }); + + return rs.pipeTo(new WritableStream(), { preventAbort }).then( + () => { + throw new Error("pipeTo promise should be rejected"); + }, + (value) => + assertEquals(value, undefined, "rejection value should be undefined") + ); + }); +} + +for (const preventCancel of [true, false]) { + unitTest(function undefinedRejectionWithPreventCancel() { + const rs = new ReadableStream({ + pull(controller: ReadableStreamDefaultController<number>): void { + controller.enqueue(0); + }, + }); + + const ws = new WritableStream({ + write(): Promise<void> { + return Promise.reject(undefined); + }, + }); + + return rs.pipeTo(ws, { preventCancel }).then( + () => { + throw new Error("pipeTo promise should be rejected"); + }, + (value) => + assertEquals(value, undefined, "rejection value should be undefined") + ); + }); +} diff --git a/cli/js/tests/streams_writable_test.ts b/cli/js/tests/streams_writable_test.ts new file mode 100644 index 000000000..54c1624af --- /dev/null +++ b/cli/js/tests/streams_writable_test.ts @@ -0,0 +1,253 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. +import { unitTest, assert, assertEquals, assertThrows } from "./test_util.ts"; + +unitTest(function writableStreamDesiredSizeOnReleasedWriter() { + const ws = new WritableStream(); + const writer = ws.getWriter(); + writer.releaseLock(); + assertThrows(() => { + writer.desiredSize; + }, TypeError); +}); + +unitTest(function writableStreamDesiredSizeInitialValue() { + const ws = new WritableStream(); + const writer = ws.getWriter(); + assertEquals(writer.desiredSize, 1); +}); + +unitTest(async function writableStreamDesiredSizeClosed() { + const ws = new WritableStream(); + const writer = ws.getWriter(); + await writer.close(); + assertEquals(writer.desiredSize, 0); +}); + +unitTest(function writableStreamStartThrowsDesiredSizeNull() { + const ws = new WritableStream({ + start(c): void { + c.error(); + }, + }); + + const writer = ws.getWriter(); + assertEquals(writer.desiredSize, null, "desiredSize should be null"); +}); + +unitTest(function getWriterOnClosingStream() { + const ws = new WritableStream({}); + + const writer = ws.getWriter(); + writer.close(); + writer.releaseLock(); + + ws.getWriter(); +}); + +unitTest(async function getWriterOnClosedStream() { + const ws = new WritableStream({}); + + const writer = ws.getWriter(); + await writer.close(); + writer.releaseLock(); + + ws.getWriter(); +}); + +unitTest(function getWriterOnAbortedStream() { + const ws = new WritableStream({}); + + const writer = ws.getWriter(); + writer.abort(); + writer.releaseLock(); + + ws.getWriter(); +}); + +unitTest(function getWriterOnErroredStream() { + const ws = new WritableStream({ + start(c): void { + c.error(); + }, + }); + + const writer = ws.getWriter(); + return writer.closed.then( + (v) => { + throw new Error(`writer.closed fulfilled unexpectedly with: ${v}`); + }, + () => { + writer.releaseLock(); + ws.getWriter(); + } + ); +}); + +unitTest(function closedAndReadyOnReleasedWriter() { + const ws = new WritableStream({}); + + const writer = ws.getWriter(); + writer.releaseLock(); + + return writer.closed.then( + (v) => { + throw new Error("writer.closed fulfilled unexpectedly with: " + v); + }, + (closedRejection) => { + assertEquals( + closedRejection.name, + "TypeError", + "closed promise should reject with a TypeError" + ); + return writer.ready.then( + (v) => { + throw new Error("writer.ready fulfilled unexpectedly with: " + v); + }, + (readyRejection) => + assertEquals( + readyRejection, + closedRejection, + "ready promise should reject with the same error" + ) + ); + } + ); +}); + +unitTest(function sinkMethodsCalledAsMethods() { + let thisObject: Sink | null = null; + // Calls to Sink methods after the first are implicitly ignored. Only the + // first value that is passed to the resolver is used. + class Sink { + start(): void { + assertEquals(this, thisObject, "start should be called as a method"); + } + + write(): void { + assertEquals(this, thisObject, "write should be called as a method"); + } + + close(): void { + assertEquals(this, thisObject, "close should be called as a method"); + } + + abort(): void { + assertEquals(this, thisObject, "abort should be called as a method"); + } + } + + const theSink = new Sink(); + thisObject = theSink; + const ws = new WritableStream(theSink); + + const writer = ws.getWriter(); + + writer.write("a"); + const closePromise = writer.close(); + + const ws2 = new WritableStream(theSink); + const writer2 = ws2.getWriter(); + const abortPromise = writer2.abort(); + + return Promise.all([closePromise, abortPromise]).then(undefined); +}); + +unitTest(function sizeShouldNotBeCalledAsMethod() { + const strategy = { + size(): number { + if (this !== undefined) { + throw new Error("size called as a method"); + } + return 1; + }, + }; + + const ws = new WritableStream({}, strategy); + const writer = ws.getWriter(); + return writer.write("a"); +}); + +unitTest(function redundantReleaseLockIsNoOp() { + const ws = new WritableStream(); + const writer1 = ws.getWriter(); + assertEquals( + undefined, + writer1.releaseLock(), + "releaseLock() should return undefined" + ); + const writer2 = ws.getWriter(); + assertEquals( + undefined, + writer1.releaseLock(), + "no-op releaseLock() should return undefined" + ); + // Calling releaseLock() on writer1 should not interfere with writer2. If it did, then the ready promise would be + // rejected. + return writer2.ready; +}); + +unitTest(function readyPromiseShouldFireBeforeReleaseLock() { + const events: string[] = []; + const ws = new WritableStream(); + const writer = ws.getWriter(); + return writer.ready.then(() => { + // Force the ready promise back to a pending state. + const writerPromise = writer.write("dummy"); + const readyPromise = writer.ready.catch(() => events.push("ready")); + const closedPromise = writer.closed.catch(() => events.push("closed")); + writer.releaseLock(); + return Promise.all([readyPromise, closedPromise]).then(() => { + assertEquals( + events, + ["ready", "closed"], + "ready promise should fire before closed promise" + ); + // Stop the writer promise hanging around after the test has finished. + return Promise.all([writerPromise, ws.abort()]).then(undefined); + }); + }); +}); + +unitTest(function subclassingWritableStream() { + class Subclass extends WritableStream { + extraFunction(): boolean { + return true; + } + } + assert( + Object.getPrototypeOf(Subclass.prototype) === WritableStream.prototype, + "Subclass.prototype's prototype should be WritableStream.prototype" + ); + assert( + Object.getPrototypeOf(Subclass) === WritableStream, + "Subclass's prototype should be WritableStream" + ); + const sub = new Subclass(); + assert( + sub instanceof WritableStream, + "Subclass object should be an instance of WritableStream" + ); + assert( + sub instanceof Subclass, + "Subclass object should be an instance of Subclass" + ); + const lockedGetter = Object.getOwnPropertyDescriptor( + WritableStream.prototype, + "locked" + )!.get!; + assert( + lockedGetter.call(sub) === sub.locked, + "Subclass object should pass brand check" + ); + assert( + sub.extraFunction(), + "extraFunction() should be present on Subclass object" + ); +}); + +unitTest(function lockedGetterShouldReturnTrue() { + const ws = new WritableStream(); + assert(!ws.locked, "stream should not be locked"); + ws.getWriter(); + assert(ws.locked, "stream should be locked"); +}); diff --git a/cli/js/tests/unit_tests.ts b/cli/js/tests/unit_tests.ts index fa168e6dd..db2df2216 100644 --- a/cli/js/tests/unit_tests.ts +++ b/cli/js/tests/unit_tests.ts @@ -53,6 +53,8 @@ import "./request_test.ts"; import "./resources_test.ts"; import "./signal_test.ts"; import "./stat_test.ts"; +import "./streams_piping_test.ts"; +import "./streams_writable_test.ts"; import "./symlink_test.ts"; import "./text_encoding_test.ts"; import "./testing_test.ts"; diff --git a/cli/js/util.ts b/cli/js/util.ts index 6db8ade7b..309bfcd0c 100644 --- a/cli/js/util.ts +++ b/cli/js/util.ts @@ -20,9 +20,17 @@ export function log(...args: unknown[]): void { } // @internal -export function assert(cond: unknown, msg = "assert"): asserts cond { +export class AssertionError extends Error { + constructor(msg?: string) { + super(msg); + this.name = "AssertionError"; + } +} + +// @internal +export function assert(cond: unknown, msg = "Assertion failed."): asserts cond { if (!cond) { - throw Error(msg); + throw new AssertionError(msg); } } diff --git a/cli/js/web/dom_types.d.ts b/cli/js/web/dom_types.d.ts index a78fd7d9e..b5b172ccd 100644 --- a/cli/js/web/dom_types.d.ts +++ b/cli/js/web/dom_types.d.ts @@ -253,22 +253,6 @@ export interface Body { text(): Promise<string>; } -export interface WritableStream<W = any> { - readonly locked: boolean; - abort(reason?: any): Promise<void>; - getWriter(): WritableStreamDefaultWriter<W>; -} - -export interface WritableStreamDefaultWriter<W = any> { - readonly closed: Promise<void>; - readonly desiredSize: number | null; - readonly ready: Promise<void>; - abort(reason?: any): Promise<void>; - close(): Promise<void>; - releaseLock(): void; - write(chunk: W): Promise<void>; -} - export interface RequestInit { body?: BodyInit | null; cache?: RequestCache; diff --git a/cli/js/web/fetch.ts b/cli/js/web/fetch.ts index cbedcabfd..38ca03aca 100644 --- a/cli/js/web/fetch.ts +++ b/cli/js/web/fetch.ts @@ -258,7 +258,7 @@ class Body pipeThrough<T>( _: { - writable: domTypes.WritableStream<Uint8Array>; + writable: WritableStream<Uint8Array>; readable: ReadableStream<T>; }, _options?: PipeOptions @@ -267,7 +267,7 @@ class Body } pipeTo( - _dest: domTypes.WritableStream<Uint8Array>, + _dest: WritableStream<Uint8Array>, _options?: PipeOptions ): Promise<void> { return notImplemented(); diff --git a/cli/js/web/streams/internals.ts b/cli/js/web/streams/internals.ts index 2559d9e5c..846db096e 100644 --- a/cli/js/web/streams/internals.ts +++ b/cli/js/web/streams/internals.ts @@ -13,13 +13,25 @@ import { ReadableStreamDefaultControllerImpl } from "./readable_stream_default_c import { ReadableStreamDefaultReaderImpl } from "./readable_stream_default_reader.ts"; import { ReadableStreamImpl } from "./readable_stream.ts"; import * as sym from "./symbols.ts"; +import { WritableStreamDefaultControllerImpl } from "./writable_stream_default_controller.ts"; +import { WritableStreamDefaultWriterImpl } from "./writable_stream_default_writer.ts"; +import { WritableStreamImpl } from "./writable_stream.ts"; +import { AbortSignalImpl } from "../abort_signal.ts"; +import { DOMExceptionImpl as DOMException } from "../dom_exception.ts"; import { cloneValue } from "../util.ts"; -import { assert } from "../../util.ts"; +import { assert, AssertionError } from "../../util.ts"; +export type AbortAlgorithm = (reason?: any) => PromiseLike<void>; +export interface AbortRequest { + promise: Deferred<void>; + reason?: any; + wasAlreadyErroring: boolean; +} export interface BufferQueueItem extends Pair<ArrayBuffer | SharedArrayBuffer> { offset: number; } export type CancelAlgorithm = (reason?: any) => PromiseLike<void>; +export type CloseAlgorithm = () => PromiseLike<void>; type Container<R = any> = { [sym.queue]: Array<Pair<R> | BufferQueueItem>; [sym.queueTotalSize]: number; @@ -28,11 +40,11 @@ export type Pair<R> = { value: R; size: number }; export type PullAlgorithm = () => PromiseLike<void>; export type SizeAlgorithm<T> = (chunk: T) => number; export type StartAlgorithm = () => void | PromiseLike<void>; +export type WriteAlgorithm<W> = (chunk: W) => Promise<void>; export interface Deferred<T> { promise: Promise<T>; resolve?: (value?: T | PromiseLike<T>) => void; reject?: (reason?: any) => void; - settled: boolean; } export interface ReadableStreamGenericReader<R = any> @@ -58,6 +70,12 @@ export function acquireReadableStreamDefaultReader<T>( return reader; } +export function acquireWritableStreamDefaultWriter<W>( + stream: WritableStreamImpl<W> +): WritableStreamDefaultWriterImpl<W> { + return new WritableStreamDefaultWriterImpl(stream); +} + function createAlgorithmFromUnderlyingMethod< O extends UnderlyingByteSource | UnderlyingSource, P extends keyof O @@ -157,14 +175,14 @@ function enqueueValueWithSize<R>( /** Non-spec mechanism to "unwrap" a promise and store it to be resolved * later. */ -function getDeferred<T>(): Deferred<T> { - let resolve = undefined; - let reject = undefined; +export function getDeferred<T>(): Required<Deferred<T>> { + let resolve: (value?: T | PromiseLike<T>) => void; + let reject: (reason?: any) => void; const promise = new Promise<T>((res, rej) => { resolve = res; reject = rej; }); - return { promise, resolve, reject, settled: false }; + return { promise, resolve: resolve!, reject: reject! }; } export function initializeReadableStream(stream: ReadableStreamImpl): void { @@ -173,6 +191,17 @@ export function initializeReadableStream(stream: ReadableStreamImpl): void { stream[sym.disturbed] = false; } +export function initializeWritableStream(stream: WritableStreamImpl): void { + stream[sym.state] = "writable"; + stream[sym.storedError] = stream[sym.writer] = stream[ + sym.writableStreamController + ] = stream[sym.inFlightWriteRequest] = stream[sym.closeRequest] = stream[ + sym.inFlightCloseRequest + ] = stream[sym.pendingAbortRequest] = undefined; + stream[sym.writeRequests] = []; + stream[sym.backpressure] = false; +} + function invokeOrNoop<O extends any, P extends keyof O>( o: O, p: P, @@ -278,6 +307,40 @@ export function isUnderlyingByteSource( return typeString === "bytes"; } +export function isWritableStream(x: unknown): x is WritableStreamImpl { + return typeof x !== "object" || + x === null || + !(sym.writableStreamController in x) + ? false + : true; +} + +export function isWritableStreamDefaultController( + x: unknown +): x is WritableStreamDefaultControllerImpl<any> { + return typeof x !== "object" || + x === null || + !(sym.controlledWritableStream in x) + ? false + : true; +} + +export function isWritableStreamDefaultWriter( + x: unknown +): x is WritableStreamDefaultWriterImpl<any> { + return typeof x !== "object" || x === null || !(sym.ownerWritableStream in x) + ? false + : true; +} + +export function isWritableStreamLocked(stream: WritableStreamImpl): boolean { + assert(isWritableStream(stream)); + if (stream[sym.writer] === undefined) { + return false; + } + return true; +} + export function makeSizeAlgorithmFromSizeFunction<T>( size: QueuingStrategySizeCallback<T> | undefined ): SizeAlgorithm<T> { @@ -292,6 +355,13 @@ export function makeSizeAlgorithmFromSizeFunction<T>( }; } +function peekQueueValue<T>(container: Container<T>): T | "close" { + assert(sym.queue in container && sym.queueTotalSize in container); + assert(container[sym.queue].length); + const [pair] = container[sym.queue]; + return pair.value as T; +} + function readableByteStreamControllerShouldCallPull( controller: ReadableByteStreamControllerImpl ): boolean { @@ -333,25 +403,27 @@ export function readableByteStreamControllerCallPullIfNeeded( assert(controller[sym.pullAgain] === false); controller[sym.pulling] = true; const pullPromise = controller[sym.pullAlgorithm](); - pullPromise.then( - () => { - controller[sym.pulling] = false; - if (controller[sym.pullAgain]) { - controller[sym.pullAgain]; - readableByteStreamControllerCallPullIfNeeded(controller); + setPromiseIsHandledToTrue( + pullPromise.then( + () => { + controller[sym.pulling] = false; + if (controller[sym.pullAgain]) { + controller[sym.pullAgain] = false; + readableByteStreamControllerCallPullIfNeeded(controller); + } + }, + (e) => { + readableByteStreamControllerError(controller, e); } - }, - (e) => { - readableByteStreamControllerError(controller, e); - } + ) ); } export function readableByteStreamControllerClearAlgorithms( controller: ReadableByteStreamControllerImpl ): void { - delete controller[sym.pullAlgorithm]; - delete controller[sym.cancelAlgorithm]; + (controller as any)[sym.pullAlgorithm] = undefined; + (controller as any)[sym.cancelAlgorithm] = undefined; } export function readableByteStreamControllerClose( @@ -476,7 +548,7 @@ export function readableStreamAddReadRequest<R>( return promise.promise; } -export async function readableStreamCancel<T>( +export function readableStreamCancel<T>( stream: ReadableStreamImpl<T>, reason: any ): Promise<void> { @@ -488,7 +560,9 @@ export async function readableStreamCancel<T>( return Promise.reject(stream[sym.storedError]); } readableStreamClose(stream); - await stream[sym.readableStreamController]; + return stream[sym.readableStreamController].then( + () => undefined + ) as Promise<void>; } export function readableStreamClose<T>(stream: ReadableStreamImpl<T>): void { @@ -514,7 +588,6 @@ export function readableStreamClose<T>(stream: ReadableStreamImpl<T>): void { const resolve = reader[sym.closedPromise].resolve; assert(resolve); resolve(); - reader[sym.closedPromise].settled = true; } export function readableStreamCreateReadResult<T>( @@ -573,9 +646,9 @@ export function readableStreamDefaultControllerCanCloseOrEnqueue<T>( export function readableStreamDefaultControllerClearAlgorithms<T>( controller: ReadableStreamDefaultControllerImpl<T> ): void { - delete controller[sym.pullAlgorithm]; - delete controller[sym.cancelAlgorithm]; - delete controller[sym.strategySizeAlgorithm]; + (controller as any)[sym.pullAlgorithm] = undefined; + (controller as any)[sym.cancelAlgorithm] = undefined; + (controller as any)[sym.strategySizeAlgorithm] = undefined; } export function readableStreamDefaultControllerClose<T>( @@ -703,17 +776,18 @@ export function readableStreamError(stream: ReadableStreamImpl, e: any): void { } if (isReadableStreamDefaultReader(reader)) { for (const readRequest of reader[sym.readRequests]) { - const { reject } = readRequest; - assert(reject); - reject(e); + assert(readRequest.reject); + readRequest.reject(e); + readRequest.reject = undefined; + readRequest.resolve = undefined; } reader[sym.readRequests] = []; } // 3.5.6.8 Otherwise, support BYOB Reader - const { reject } = reader[sym.closedPromise]; - assert(reject); - reject(e); - reader[sym.closedPromise].settled = true; + reader[sym.closedPromise].reject!(e); + reader[sym.closedPromise].reject = undefined; + reader[sym.closedPromise].resolve = undefined; + setPromiseIsHandledToTrue(reader[sym.closedPromise].promise); } export function readableStreamFulfillReadRequest<R>( @@ -744,6 +818,252 @@ export function readableStreamHasDefaultReader( : true; } +export function readableStreamPipeTo<T>( + source: ReadableStreamImpl<T>, + dest: WritableStreamImpl<T>, + preventClose: boolean, + preventAbort: boolean, + preventCancel: boolean, + signal: AbortSignalImpl | undefined +): Promise<void> { + assert(isReadableStream(source)); + assert(isWritableStream(dest)); + assert( + typeof preventClose === "boolean" && + typeof preventAbort === "boolean" && + typeof preventCancel === "boolean" + ); + assert(signal === undefined || signal instanceof AbortSignalImpl); + assert(!isReadableStreamLocked(source)); + assert(!isWritableStreamLocked(dest)); + const reader = acquireReadableStreamDefaultReader(source); + const writer = acquireWritableStreamDefaultWriter(dest); + source[sym.disturbed] = true; + let shuttingDown = false; + const promise = getDeferred<void>(); + let abortAlgorithm: () => void; + if (signal) { + abortAlgorithm = (): void => { + const error = new DOMException("Abort signal received.", "AbortSignal"); + const actions: Array<() => Promise<void>> = []; + if (!preventAbort) { + actions.push(() => { + if (dest[sym.state] === "writable") { + return writableStreamAbort(dest, error); + } else { + return Promise.resolve(undefined); + } + }); + } + if (!preventCancel) { + actions.push(() => { + if (source[sym.state] === "readable") { + return readableStreamCancel(source, error); + } else { + return Promise.resolve(undefined); + } + }); + } + shutdownWithAction( + () => Promise.all(actions.map((action) => action())), + true, + error + ); + }; + if (signal.aborted) { + abortAlgorithm(); + return promise.promise; + } + signal.addEventListener("abort", abortAlgorithm); + } + + let currentWrite = Promise.resolve(); + + // At this point, the spec becomes non-specific and vague. Most of the rest + // of this code is based on the reference implementation that is part of the + // specification. This is why the functions are only scoped to this function + // to ensure they don't leak into the spec compliant parts. + + function isOrBecomesClosed( + stream: ReadableStreamImpl | WritableStreamImpl, + promise: Promise<void>, + action: () => void + ): void { + if (stream[sym.state] === "closed") { + action(); + } else { + setPromiseIsHandledToTrue(promise.then(action)); + } + } + + function isOrBecomesErrored( + stream: ReadableStreamImpl | WritableStreamImpl, + promise: Promise<void>, + action: (error: any) => void + ): void { + if (stream[sym.state] === "errored") { + action(stream[sym.storedError]); + } else { + setPromiseIsHandledToTrue(promise.catch((error) => action(error))); + } + } + + function finalize(isError?: boolean, error?: any): void { + writableStreamDefaultWriterRelease(writer); + readableStreamReaderGenericRelease(reader); + + if (signal) { + signal.removeEventListener("abort", abortAlgorithm); + } + if (isError) { + promise.reject(error); + } else { + promise.resolve(); + } + } + + function waitForWritesToFinish(): Promise<void> { + const oldCurrentWrite = currentWrite; + return currentWrite.then(() => + oldCurrentWrite !== currentWrite ? waitForWritesToFinish() : undefined + ); + } + + function shutdownWithAction( + action: () => Promise<any>, + originalIsError?: boolean, + originalError?: any + ): void { + function doTheRest(): void { + setPromiseIsHandledToTrue( + action().then( + () => finalize(originalIsError, originalError), + (newError) => finalize(true, newError) + ) + ); + } + + if (shuttingDown) { + return; + } + shuttingDown = true; + + if ( + dest[sym.state] === "writable" && + writableStreamCloseQueuedOrInFlight(dest) === false + ) { + setPromiseIsHandledToTrue(waitForWritesToFinish().then(doTheRest)); + } else { + doTheRest(); + } + } + + function shutdown(isError: boolean, error?: any): void { + if (shuttingDown) { + return; + } + shuttingDown = true; + + if ( + dest[sym.state] === "writable" && + !writableStreamCloseQueuedOrInFlight(dest) + ) { + setPromiseIsHandledToTrue( + waitForWritesToFinish().then(() => finalize(isError, error)) + ); + } + finalize(isError, error); + } + + function pipeStep(): Promise<boolean> { + if (shuttingDown) { + return Promise.resolve(true); + } + return writer[sym.readyPromise].promise.then(() => { + return readableStreamDefaultReaderRead(reader).then(({ value, done }) => { + if (done === true) { + return true; + } + currentWrite = writableStreamDefaultWriterWrite( + writer, + value! + ).then(undefined, () => {}); + return false; + }); + }); + } + + function pipeLoop(): Promise<void> { + return new Promise((resolveLoop, rejectLoop) => { + function next(done: boolean): void { + if (done) { + resolveLoop(undefined); + } else { + setPromiseIsHandledToTrue(pipeStep().then(next, rejectLoop)); + } + } + next(false); + }); + } + + isOrBecomesErrored( + source, + reader[sym.closedPromise].promise, + (storedError) => { + if (!preventAbort) { + shutdownWithAction( + () => writableStreamAbort(dest, storedError), + true, + storedError + ); + } else { + shutdown(true, storedError); + } + } + ); + + isOrBecomesErrored(dest, writer[sym.closedPromise].promise, (storedError) => { + if (!preventCancel) { + shutdownWithAction( + () => readableStreamCancel(source, storedError), + true, + storedError + ); + } else { + shutdown(true, storedError); + } + }); + + isOrBecomesClosed(source, reader[sym.closedPromise].promise, () => { + if (!preventClose) { + shutdownWithAction(() => + writableStreamDefaultWriterCloseWithErrorPropagation(writer) + ); + } + }); + + if ( + writableStreamCloseQueuedOrInFlight(dest) || + dest[sym.state] === "closed" + ) { + const destClosed = new TypeError( + "The destination writable stream closed before all data could be piped to it." + ); + if (!preventCancel) { + shutdownWithAction( + () => readableStreamCancel(source, destClosed), + true, + destClosed + ); + } else { + shutdown(true, destClosed); + } + } + + setPromiseIsHandledToTrue(pipeLoop()); + return promise.promise; +} + export function readableStreamReaderGenericCancel<R = any>( reader: ReadableStreamGenericReader<R>, reason: any @@ -763,16 +1083,13 @@ export function readableStreamReaderGenericInitialize<R = any>( if (stream[sym.state] === "readable") { reader[sym.closedPromise] = getDeferred(); } else if (stream[sym.state] === "closed") { - reader[sym.closedPromise] = { - promise: Promise.resolve(), - settled: true, - }; + reader[sym.closedPromise] = { promise: Promise.resolve() }; } else { assert(stream[sym.state] === "errored"); reader[sym.closedPromise] = { promise: Promise.reject(stream[sym.storedError]), - settled: true, }; + setPromiseIsHandledToTrue(reader[sym.closedPromise].promise); } } @@ -790,9 +1107,9 @@ export function readableStreamReaderGenericRelease<R = any>( delete closedPromise.reject; delete closedPromise.resolve; } - closedPromise.settled = true; - delete reader[sym.ownerReadableStream][sym.reader]; - delete reader[sym.ownerReadableStream]; + setPromiseIsHandledToTrue(closedPromise.promise); + reader[sym.ownerReadableStream][sym.reader] = undefined; + (reader as any)[sym.ownerReadableStream] = undefined; } export function readableStreamTee<T>( @@ -817,51 +1134,54 @@ export function readableStreamTee<T>( return Promise.resolve(); } reading = true; - readableStreamDefaultReaderRead(reader).then((result) => { - reading = false; - assert(typeof result === "object"); - const { done } = result; - assert(typeof done === "boolean"); - if (done) { + const readPromise = readableStreamDefaultReaderRead(reader).then( + (result) => { + reading = false; + assert(typeof result === "object"); + const { done } = result; + assert(typeof done === "boolean"); + if (done) { + if (!canceled1) { + readableStreamDefaultControllerClose( + branch1[ + sym.readableStreamController + ] as ReadableStreamDefaultControllerImpl + ); + } + if (!canceled2) { + readableStreamDefaultControllerClose( + branch2[ + sym.readableStreamController + ] as ReadableStreamDefaultControllerImpl + ); + } + return; + } + const { value } = result; + const value1 = value!; + let value2 = value!; + if (!canceled2 && cloneForBranch2) { + value2 = cloneValue(value2); + } if (!canceled1) { - readableStreamDefaultControllerClose( + readableStreamDefaultControllerEnqueue( branch1[ sym.readableStreamController - ] as ReadableStreamDefaultControllerImpl + ] as ReadableStreamDefaultControllerImpl, + value1 ); } if (!canceled2) { - readableStreamDefaultControllerClose( + readableStreamDefaultControllerEnqueue( branch2[ sym.readableStreamController - ] as ReadableStreamDefaultControllerImpl + ] as ReadableStreamDefaultControllerImpl, + value2 ); } - return; - } - const { value } = result; - const value1 = value!; - let value2 = value!; - if (!canceled2 && cloneForBranch2) { - value2 = cloneValue(value2); } - if (!canceled1) { - readableStreamDefaultControllerEnqueue( - branch1[ - sym.readableStreamController - ] as ReadableStreamDefaultControllerImpl, - value1 - ); - } - if (!canceled2) { - readableStreamDefaultControllerEnqueue( - branch2[ - sym.readableStreamController - ] as ReadableStreamDefaultControllerImpl, - value2 - ); - } - }); + ); + setPromiseIsHandledToTrue(readPromise); return Promise.resolve(); }; const cancel1Algorithm = (reason?: any): PromiseLike<void> => { @@ -870,7 +1190,6 @@ export function readableStreamTee<T>( if (canceled2) { const compositeReason = [reason1, reason2]; const cancelResult = readableStreamCancel(stream, compositeReason); - assert(cancelPromise.resolve); cancelPromise.resolve(cancelResult); } return cancelPromise.promise; @@ -881,7 +1200,6 @@ export function readableStreamTee<T>( if (canceled1) { const compositeReason = [reason1, reason2]; const cancelResult = readableStreamCancel(stream, compositeReason); - assert(cancelPromise.resolve); cancelPromise.resolve(cancelResult); } return cancelPromise.promise; @@ -897,20 +1215,22 @@ export function readableStreamTee<T>( pullAlgorithm, cancel2Algorithm ); - reader[sym.closedPromise].promise.catch((r) => { - readableStreamDefaultControllerError( - branch1[ - sym.readableStreamController - ] as ReadableStreamDefaultControllerImpl, - r - ); - readableStreamDefaultControllerError( - branch2[ - sym.readableStreamController - ] as ReadableStreamDefaultControllerImpl, - r - ); - }); + setPromiseIsHandledToTrue( + reader[sym.closedPromise].promise.catch((r) => { + readableStreamDefaultControllerError( + branch1[ + sym.readableStreamController + ] as ReadableStreamDefaultControllerImpl, + r + ); + readableStreamDefaultControllerError( + branch2[ + sym.readableStreamController + ] as ReadableStreamDefaultControllerImpl, + r + ); + }) + ); return [branch1, branch2]; } @@ -920,6 +1240,25 @@ export function resetQueue<R>(container: Container<R>): void { container[sym.queueTotalSize] = 0; } +/** An internal function which provides a function name for some generated + * functions, so stack traces are a bit more readable. */ +export function setFunctionName(fn: Function, value: string): void { + Object.defineProperty(fn, "name", { value, configurable: true }); +} + +/** An internal function which mimics the behavior of setting the promise to + * handled in JavaScript. In this situation, an assertion failure, which + * shouldn't happen will get thrown, instead of swallowed. */ +export function setPromiseIsHandledToTrue(promise: PromiseLike<unknown>): void { + promise.then(undefined, (e) => { + if (e && e instanceof AssertionError) { + queueMicrotask(() => { + throw e; + }); + } + }); +} + function setUpReadableByteStreamController( stream: ReadableStreamImpl, controller: ReadableByteStreamControllerImpl, @@ -950,16 +1289,18 @@ function setUpReadableByteStreamController( stream[sym.readableStreamController] = controller; const startResult = startAlgorithm(); const startPromise = Promise.resolve(startResult); - startPromise.then( - () => { - controller[sym.started] = true; - assert(!controller[sym.pulling]); - assert(!controller[sym.pullAgain]); - readableByteStreamControllerCallPullIfNeeded(controller); - }, - (r) => { - readableByteStreamControllerError(controller, r); - } + setPromiseIsHandledToTrue( + startPromise.then( + () => { + controller[sym.started] = true; + assert(!controller[sym.pulling]); + assert(!controller[sym.pullAgain]); + readableByteStreamControllerCallPullIfNeeded(controller); + }, + (r) => { + readableByteStreamControllerError(controller, r); + } + ) ); } @@ -981,11 +1322,13 @@ export function setUpReadableByteStreamControllerFromUnderlyingSource( 0, controller ); + setFunctionName(pullAlgorithm, "[[pullAlgorithm]]"); const cancelAlgorithm = createAlgorithmFromUnderlyingMethod( underlyingByteSource, "cancel", 1 ); + setFunctionName(cancelAlgorithm, "[[cancelAlgorithm]]"); // 3.13.27.6 Let autoAllocateChunkSize be ? GetV(underlyingByteSource, "autoAllocateChunkSize"). const autoAllocateChunkSize = undefined; setUpReadableByteStreamController( @@ -1022,16 +1365,18 @@ function setUpReadableStreamDefaultController<T>( stream[sym.readableStreamController] = controller; const startResult = startAlgorithm(); const startPromise = Promise.resolve(startResult); - startPromise.then( - () => { - controller[sym.started] = true; - assert(controller[sym.pulling] === false); - assert(controller[sym.pullAgain] === false); - readableStreamDefaultControllerCallPullIfNeeded(controller); - }, - (r) => { - readableStreamDefaultControllerError(controller, r); - } + setPromiseIsHandledToTrue( + startPromise.then( + () => { + controller[sym.started] = true; + assert(controller[sym.pulling] === false); + assert(controller[sym.pullAgain] === false); + readableStreamDefaultControllerCallPullIfNeeded(controller); + }, + (r) => { + readableStreamDefaultControllerError(controller, r); + } + ) ); } @@ -1053,11 +1398,13 @@ export function setUpReadableStreamDefaultControllerFromUnderlyingSource<T>( 0, controller ); + setFunctionName(pullAlgorithm, "[[pullAlgorithm]]"); const cancelAlgorithm: CancelAlgorithm = createAlgorithmFromUnderlyingMethod( underlyingSource, "cancel", 1 ); + setFunctionName(cancelAlgorithm, "[[cancelAlgorithm]]"); setUpReadableStreamDefaultController( stream, controller, @@ -1069,6 +1416,98 @@ export function setUpReadableStreamDefaultControllerFromUnderlyingSource<T>( ); } +function setUpWritableStreamDefaultController<W>( + stream: WritableStreamImpl<W>, + controller: WritableStreamDefaultControllerImpl<W>, + startAlgorithm: StartAlgorithm, + writeAlgorithm: WriteAlgorithm<W>, + closeAlgorithm: CloseAlgorithm, + abortAlgorithm: AbortAlgorithm, + highWaterMark: number, + sizeAlgorithm: SizeAlgorithm<W> +): void { + assert(isWritableStream(stream)); + assert(stream[sym.writableStreamController] === undefined); + controller[sym.controlledWritableStream] = stream; + stream[sym.writableStreamController] = controller; + controller[sym.queue] = []; + controller[sym.queueTotalSize] = 0; + controller[sym.started] = false; + controller[sym.strategySizeAlgorithm] = sizeAlgorithm; + controller[sym.strategyHWM] = highWaterMark; + controller[sym.writeAlgorithm] = writeAlgorithm; + controller[sym.closeAlgorithm] = closeAlgorithm; + controller[sym.abortAlgorithm] = abortAlgorithm; + const backpressure = writableStreamDefaultControllerGetBackpressure( + controller + ); + writableStreamUpdateBackpressure(stream, backpressure); + const startResult = startAlgorithm(); + const startPromise = Promise.resolve(startResult); + setPromiseIsHandledToTrue( + startPromise.then( + () => { + assert( + stream[sym.state] === "writable" || stream[sym.state] === "erroring" + ); + controller[sym.started] = true; + writableStreamDefaultControllerAdvanceQueueIfNeeded(controller); + }, + (r) => { + assert( + stream[sym.state] === "writable" || stream[sym.state] === "erroring" + ); + controller[sym.started] = true; + writableStreamDealWithRejection(stream, r); + } + ) + ); +} + +export function setUpWritableStreamDefaultControllerFromUnderlyingSink<W>( + stream: WritableStreamImpl<W>, + underlyingSink: UnderlyingSink<W>, + highWaterMark: number, + sizeAlgorithm: SizeAlgorithm<W> +): void { + assert(underlyingSink); + const controller = Object.create( + WritableStreamDefaultControllerImpl.prototype + ); + const startAlgorithm = (): void | PromiseLike<void> => { + return invokeOrNoop(underlyingSink, "start", controller); + }; + const writeAlgorithm = createAlgorithmFromUnderlyingMethod( + underlyingSink, + "write", + 1, + controller + ); + setFunctionName(writeAlgorithm, "[[writeAlgorithm]]"); + const closeAlgorithm = createAlgorithmFromUnderlyingMethod( + underlyingSink, + "close", + 0 + ); + setFunctionName(closeAlgorithm, "[[closeAlgorithm]]"); + const abortAlgorithm = createAlgorithmFromUnderlyingMethod( + underlyingSink, + "abort", + 1 + ); + setFunctionName(abortAlgorithm, "[[abortAlgorithm]]"); + setUpWritableStreamDefaultController( + stream, + controller, + startAlgorithm, + writeAlgorithm, + closeAlgorithm, + abortAlgorithm, + highWaterMark, + sizeAlgorithm + ); +} + function transferArrayBuffer(buffer: ArrayBuffer): ArrayBuffer { assert(!isDetachedBuffer(buffer)); const transferredIshVersion = buffer.slice(0); @@ -1095,4 +1534,571 @@ export function validateAndNormalizeHighWaterMark( return highWaterMark; } +export function writableStreamAbort<W>( + stream: WritableStreamImpl<W>, + reason: any +): Promise<void> { + const state = stream[sym.state]; + if (state === "closed" || state === "errored") { + return Promise.resolve(undefined); + } + if (stream[sym.pendingAbortRequest]) { + return stream[sym.pendingAbortRequest]!.promise.promise; + } + assert(state === "writable" || state === "erroring"); + let wasAlreadyErroring = false; + if (state === "erroring") { + wasAlreadyErroring = true; + reason = undefined; + } + const promise = getDeferred<void>(); + stream[sym.pendingAbortRequest] = { promise, reason, wasAlreadyErroring }; + + if (wasAlreadyErroring === false) { + writableStreamStartErroring(stream, reason); + } + return promise.promise; +} + +function writableStreamAddWriteRequest<W>( + stream: WritableStreamImpl<W> +): Promise<void> { + assert(isWritableStream(stream)); + assert(stream[sym.state] === "writable"); + const promise = getDeferred<void>(); + stream[sym.writeRequests].push(promise); + return promise.promise; +} + +export function writableStreamClose<W>( + stream: WritableStreamImpl<W> +): Promise<void> { + const state = stream[sym.state]; + if (state === "closed" || state === "errored") { + return Promise.reject( + new TypeError("Cannot close an already closed or errored WritableStream.") + ); + } + assert(!writableStreamCloseQueuedOrInFlight(stream)); + const promise = getDeferred<void>(); + stream[sym.closeRequest] = promise; + const writer = stream[sym.writer]; + if (writer && stream[sym.backpressure] && state === "writable") { + writer[sym.readyPromise].resolve!(); + writer[sym.readyPromise].resolve = undefined; + writer[sym.readyPromise].reject = undefined; + } + writableStreamDefaultControllerClose(stream[sym.writableStreamController]!); + return promise.promise; +} + +export function writableStreamCloseQueuedOrInFlight<W>( + stream: WritableStreamImpl<W> +): boolean { + if ( + stream[sym.closeRequest] === undefined && + stream[sym.inFlightCloseRequest] === undefined + ) { + return false; + } + return true; +} + +function writableStreamDealWithRejection<W>( + stream: WritableStreamImpl<W>, + error: any +): void { + const state = stream[sym.state]; + if (state === "writable") { + writableStreamStartErroring(stream, error); + return; + } + assert(state === "erroring"); + writableStreamFinishErroring(stream); +} + +function writableStreamDefaultControllerAdvanceQueueIfNeeded<W>( + controller: WritableStreamDefaultControllerImpl<W> +): void { + const stream = controller[sym.controlledWritableStream]; + if (!controller[sym.started]) { + return; + } + if (stream[sym.inFlightWriteRequest]) { + return; + } + const state = stream[sym.state]; + assert(state !== "closed" && state !== "errored"); + if (state === "erroring") { + writableStreamFinishErroring(stream); + return; + } + if (!controller[sym.queue].length) { + return; + } + const writeRecord = peekQueueValue(controller); + if (writeRecord === "close") { + writableStreamDefaultControllerProcessClose(controller); + } else { + writableStreamDefaultControllerProcessWrite(controller, writeRecord.chunk); + } +} + +export function writableStreamDefaultControllerClearAlgorithms<W>( + controller: WritableStreamDefaultControllerImpl<W> +): void { + (controller as any)[sym.writeAlgorithm] = undefined; + (controller as any)[sym.closeAlgorithm] = undefined; + (controller as any)[sym.abortAlgorithm] = undefined; + (controller as any)[sym.strategySizeAlgorithm] = undefined; +} + +function writableStreamDefaultControllerClose<W>( + controller: WritableStreamDefaultControllerImpl<W> +): void { + enqueueValueWithSize(controller, "close", 0); + writableStreamDefaultControllerAdvanceQueueIfNeeded(controller); +} + +export function writableStreamDefaultControllerError<W>( + controller: WritableStreamDefaultControllerImpl<W>, + error: any +): void { + const stream = controller[sym.controlledWritableStream]; + assert(stream[sym.state] === "writable"); + writableStreamDefaultControllerClearAlgorithms(controller); + writableStreamStartErroring(stream, error); +} + +function writableStreamDefaultControllerErrorIfNeeded<W>( + controller: WritableStreamDefaultControllerImpl<W>, + error: any +): void { + if (controller[sym.controlledWritableStream][sym.state] === "writable") { + writableStreamDefaultControllerError(controller, error); + } +} + +function writableStreamDefaultControllerGetBackpressure<W>( + controller: WritableStreamDefaultControllerImpl<W> +): boolean { + const desiredSize = writableStreamDefaultControllerGetDesiredSize(controller); + return desiredSize <= 0; +} + +function writableStreamDefaultControllerGetChunkSize<W>( + controller: WritableStreamDefaultControllerImpl<W>, + chunk: W +): number { + let returnValue: number; + try { + returnValue = controller[sym.strategySizeAlgorithm](chunk); + } catch (e) { + writableStreamDefaultControllerErrorIfNeeded(controller, e); + return 1; + } + return returnValue; +} + +function writableStreamDefaultControllerGetDesiredSize<W>( + controller: WritableStreamDefaultControllerImpl<W> +): number { + return controller[sym.strategyHWM] - controller[sym.queueTotalSize]; +} + +function writableStreamDefaultControllerProcessClose<W>( + controller: WritableStreamDefaultControllerImpl<W> +): void { + const stream = controller[sym.controlledWritableStream]; + writableStreamMarkCloseRequestInFlight(stream); + dequeueValue(controller); + assert(controller[sym.queue].length === 0); + const sinkClosePromise = controller[sym.closeAlgorithm](); + writableStreamDefaultControllerClearAlgorithms(controller); + setPromiseIsHandledToTrue( + sinkClosePromise.then( + () => { + writableStreamFinishInFlightClose(stream); + }, + (reason) => { + writableStreamFinishInFlightCloseWithError(stream, reason); + } + ) + ); +} + +function writableStreamDefaultControllerProcessWrite<W>( + controller: WritableStreamDefaultControllerImpl<W>, + chunk: W +): void { + const stream = controller[sym.controlledWritableStream]; + writableStreamMarkFirstWriteRequestInFlight(stream); + const sinkWritePromise = controller[sym.writeAlgorithm](chunk); + setPromiseIsHandledToTrue( + sinkWritePromise.then( + () => { + writableStreamFinishInFlightWrite(stream); + const state = stream[sym.state]; + assert(state === "writable" || state === "erroring"); + dequeueValue(controller); + if ( + !writableStreamCloseQueuedOrInFlight(stream) && + state === "writable" + ) { + const backpressure = writableStreamDefaultControllerGetBackpressure( + controller + ); + writableStreamUpdateBackpressure(stream, backpressure); + } + writableStreamDefaultControllerAdvanceQueueIfNeeded(controller); + }, + (reason) => { + if (stream[sym.state] === "writable") { + writableStreamDefaultControllerClearAlgorithms(controller); + } + writableStreamFinishInFlightWriteWithError(stream, reason); + } + ) + ); +} + +function writableStreamDefaultControllerWrite<W>( + controller: WritableStreamDefaultControllerImpl<W>, + chunk: W, + chunkSize: number +): void { + const writeRecord = { chunk }; + try { + enqueueValueWithSize(controller, writeRecord, chunkSize); + } catch (e) { + writableStreamDefaultControllerErrorIfNeeded(controller, e); + return; + } + const stream = controller[sym.controlledWritableStream]; + if ( + !writableStreamCloseQueuedOrInFlight(stream) && + stream[sym.state] === "writable" + ) { + const backpressure = writableStreamDefaultControllerGetBackpressure( + controller + ); + writableStreamUpdateBackpressure(stream, backpressure); + } + writableStreamDefaultControllerAdvanceQueueIfNeeded(controller); +} + +export function writableStreamDefaultWriterAbort<W>( + writer: WritableStreamDefaultWriterImpl<W>, + reason: any +): Promise<void> { + const stream = writer[sym.ownerWritableStream]; + assert(stream); + return writableStreamAbort(stream, reason); +} + +export function writableStreamDefaultWriterClose<W>( + writer: WritableStreamDefaultWriterImpl<W> +): Promise<void> { + const stream = writer[sym.ownerWritableStream]; + assert(stream); + return writableStreamClose(stream); +} + +function writableStreamDefaultWriterCloseWithErrorPropagation<W>( + writer: WritableStreamDefaultWriterImpl<W> +): Promise<void> { + const stream = writer[sym.ownerWritableStream]; + assert(stream); + const state = stream[sym.state]; + if (writableStreamCloseQueuedOrInFlight(stream) || state === "closed") { + return Promise.resolve(); + } + if (state === "errored") { + return Promise.reject(stream[sym.storedError]); + } + assert(state === "writable" || state === "erroring"); + return writableStreamDefaultWriterClose(writer); +} + +function writableStreamDefaultWriterEnsureClosePromiseRejected<W>( + writer: WritableStreamDefaultWriterImpl<W>, + error: any +): void { + if (writer[sym.closedPromise].reject) { + writer[sym.closedPromise].reject!(error); + } else { + writer[sym.closedPromise] = { + promise: Promise.reject(error), + }; + } + setPromiseIsHandledToTrue(writer[sym.closedPromise].promise); +} + +function writableStreamDefaultWriterEnsureReadyPromiseRejected<W>( + writer: WritableStreamDefaultWriterImpl<W>, + error: any +): void { + if (writer[sym.readyPromise].reject) { + writer[sym.readyPromise].reject!(error); + writer[sym.readyPromise].reject = undefined; + writer[sym.readyPromise].resolve = undefined; + } else { + writer[sym.readyPromise] = { + promise: Promise.reject(error), + }; + } + setPromiseIsHandledToTrue(writer[sym.readyPromise].promise); +} + +export function writableStreamDefaultWriterWrite<W>( + writer: WritableStreamDefaultWriterImpl<W>, + chunk: W +): Promise<void> { + const stream = writer[sym.ownerWritableStream]; + assert(stream); + const controller = stream[sym.writableStreamController]; + assert(controller); + const chunkSize = writableStreamDefaultControllerGetChunkSize( + controller, + chunk + ); + if (stream !== writer[sym.ownerWritableStream]) { + return Promise.reject("Writer has incorrect WritableStream."); + } + const state = stream[sym.state]; + if (state === "errored") { + return Promise.reject(stream[sym.storedError]); + } + if (writableStreamCloseQueuedOrInFlight(stream) || state === "closed") { + return Promise.reject(new TypeError("The stream is closed or closing.")); + } + if (state === "erroring") { + return Promise.reject(stream[sym.storedError]); + } + assert(state === "writable"); + const promise = writableStreamAddWriteRequest(stream); + writableStreamDefaultControllerWrite(controller, chunk, chunkSize); + return promise; +} + +export function writableStreamDefaultWriterGetDesiredSize<W>( + writer: WritableStreamDefaultWriterImpl<W> +): number | null { + const stream = writer[sym.ownerWritableStream]; + const state = stream[sym.state]; + if (state === "errored" || state === "erroring") { + return null; + } + if (state === "closed") { + return 0; + } + return writableStreamDefaultControllerGetDesiredSize( + stream[sym.writableStreamController]! + ); +} + +export function writableStreamDefaultWriterRelease<W>( + writer: WritableStreamDefaultWriterImpl<W> +): void { + const stream = writer[sym.ownerWritableStream]; + assert(stream); + assert(stream[sym.writer] === writer); + const releasedError = new TypeError( + "Writer was released and can no longer be used to monitor the stream's closedness." + ); + writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, releasedError); + writableStreamDefaultWriterEnsureClosePromiseRejected(writer, releasedError); + stream[sym.writer] = undefined; + (writer as any)[sym.ownerWritableStream] = undefined; +} + +function writableStreamFinishErroring<W>(stream: WritableStreamImpl<W>): void { + assert(stream[sym.state] === "erroring"); + assert(!writableStreamHasOperationMarkedInFlight(stream)); + stream[sym.state] = "errored"; + stream[sym.writableStreamController]![sym.errorSteps](); + const storedError = stream[sym.storedError]; + for (const writeRequest of stream[sym.writeRequests]) { + assert(writeRequest.reject); + writeRequest.reject(storedError); + } + stream[sym.writeRequests] = []; + if (!stream[sym.pendingAbortRequest]) { + writableStreamRejectCloseAndClosedPromiseIfNeeded(stream); + return; + } + const abortRequest = stream[sym.pendingAbortRequest]; + assert(abortRequest); + stream[sym.pendingAbortRequest] = undefined; + if (abortRequest.wasAlreadyErroring) { + assert(abortRequest.promise.reject); + abortRequest.promise.reject(storedError); + writableStreamRejectCloseAndClosedPromiseIfNeeded(stream); + return; + } + const promise = stream[sym.writableStreamController]; + setPromiseIsHandledToTrue( + promise.then( + () => { + assert(abortRequest.promise.resolve); + abortRequest.promise.resolve(); + writableStreamRejectCloseAndClosedPromiseIfNeeded(stream); + }, + (reason) => { + assert(abortRequest.promise.reject); + abortRequest.promise.reject(reason); + writableStreamRejectCloseAndClosedPromiseIfNeeded(stream); + } + ) + ); +} + +function writableStreamFinishInFlightClose<W>( + stream: WritableStreamImpl<W> +): void { + assert(stream[sym.inFlightCloseRequest]); + stream[sym.inFlightCloseRequest]?.resolve!(); + stream[sym.inFlightCloseRequest] = undefined; + const state = stream[sym.state]; + assert(state === "writable" || state === "erroring"); + if (state === "erroring") { + stream[sym.storedError] = undefined; + if (stream[sym.pendingAbortRequest]) { + stream[sym.pendingAbortRequest]!.promise.resolve!(); + stream[sym.pendingAbortRequest] = undefined; + } + } + stream[sym.state] = "closed"; + const writer = stream[sym.writer]; + if (writer) { + writer[sym.closedPromise].resolve!(); + } + assert(stream[sym.pendingAbortRequest] === undefined); + assert(stream[sym.storedError] === undefined); +} + +function writableStreamFinishInFlightCloseWithError<W>( + stream: WritableStreamImpl<W>, + error: any +): void { + assert(stream[sym.inFlightCloseRequest]); + stream[sym.inFlightCloseRequest]?.reject!(error); + stream[sym.inFlightCloseRequest] = undefined; + assert(stream[sym.state] === "writable" || stream[sym.state] === "erroring"); + if (stream[sym.pendingAbortRequest]) { + stream[sym.pendingAbortRequest]?.promise.reject!(error); + stream[sym.pendingAbortRequest] = undefined; + } + writableStreamDealWithRejection(stream, error); +} + +function writableStreamFinishInFlightWrite<W>( + stream: WritableStreamImpl<W> +): void { + assert(stream[sym.inFlightWriteRequest]); + stream[sym.inFlightWriteRequest]!.resolve(); + stream[sym.inFlightWriteRequest] = undefined; +} + +function writableStreamFinishInFlightWriteWithError<W>( + stream: WritableStreamImpl<W>, + error: any +): void { + assert(stream[sym.inFlightWriteRequest]); + stream[sym.inFlightWriteRequest]!.reject!(error); + stream[sym.inFlightWriteRequest] = undefined; + assert(stream[sym.state] === "writable" || stream[sym.state] === "erroring"); + writableStreamDealWithRejection(stream, error); +} + +function writableStreamHasOperationMarkedInFlight<W>( + stream: WritableStreamImpl<W> +): boolean { + if ( + stream[sym.inFlightWriteRequest] === undefined && + stream[sym.inFlightCloseRequest] === undefined + ) { + return false; + } + return true; +} + +function writableStreamMarkCloseRequestInFlight<W>( + stream: WritableStreamImpl<W> +): void { + assert(stream[sym.inFlightCloseRequest] === undefined); + assert(stream[sym.closeRequest] !== undefined); + stream[sym.inFlightCloseRequest] = stream[sym.closeRequest]; + stream[sym.closeRequest] = undefined; +} + +function writableStreamMarkFirstWriteRequestInFlight<W>( + stream: WritableStreamImpl<W> +): void { + assert(stream[sym.inFlightWriteRequest] === undefined); + assert(stream[sym.writeRequests].length); + const writeRequest = stream[sym.writeRequests].shift(); + stream[sym.inFlightWriteRequest] = writeRequest; +} + +function writableStreamRejectCloseAndClosedPromiseIfNeeded<W>( + stream: WritableStreamImpl<W> +): void { + assert(stream[sym.state] === "errored"); + if (stream[sym.closeRequest]) { + assert(stream[sym.inFlightCloseRequest] === undefined); + stream[sym.closeRequest]!.reject!(stream[sym.storedError]); + stream[sym.closeRequest] = undefined; + } + const writer = stream[sym.writer]; + if (writer) { + writer[sym.closedPromise].reject!(stream[sym.storedError]); + setPromiseIsHandledToTrue(writer[sym.closedPromise].promise); + } +} + +function writableStreamStartErroring<W>( + stream: WritableStreamImpl<W>, + reason: any +): void { + assert(stream[sym.storedError] === undefined); + assert(stream[sym.state] === "writable"); + const controller = stream[sym.writableStreamController]; + assert(controller); + stream[sym.state] = "erroring"; + stream[sym.storedError] = reason; + const writer = stream[sym.writer]; + if (writer) { + writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, reason); + } + if ( + !writableStreamHasOperationMarkedInFlight(stream) && + controller[sym.started] + ) { + writableStreamFinishErroring(stream); + } +} + +function writableStreamUpdateBackpressure<W>( + stream: WritableStreamImpl<W>, + backpressure: boolean +): void { + assert(stream[sym.state] === "writable"); + assert(!writableStreamCloseQueuedOrInFlight(stream)); + const writer = stream[sym.writer]; + if (writer && backpressure !== stream[sym.backpressure]) { + if (backpressure) { + writer[sym.readyPromise] = getDeferred(); + } else { + assert(backpressure === false); + writer[sym.readyPromise].resolve!(); + writer[sym.readyPromise].resolve = undefined; + writer[sym.readyPromise].reject = undefined; + } + } + stream[sym.backpressure] = backpressure; +} + /* eslint-enable */ diff --git a/cli/js/web/streams/queuing_strategy.ts b/cli/js/web/streams/queuing_strategy.ts new file mode 100644 index 000000000..d2717874e --- /dev/null +++ b/cli/js/web/streams/queuing_strategy.ts @@ -0,0 +1,53 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. + +import { setFunctionName } from "./internals.ts"; +import { customInspect } from "../console.ts"; + +export class CountQueuingStrategyImpl implements CountQueuingStrategy { + highWaterMark: number; + + constructor({ highWaterMark }: { highWaterMark: number }) { + this.highWaterMark = highWaterMark; + } + + size(): 1 { + return 1; + } + + [customInspect](): string { + return `${this.constructor.name} { highWaterMark: ${String( + this.highWaterMark + )}, size: f }`; + } +} + +Object.defineProperty(CountQueuingStrategyImpl.prototype, "size", { + enumerable: true, +}); + +setFunctionName(CountQueuingStrategyImpl, "CountQueuingStrategy"); + +export class ByteLengthQueuingStrategyImpl + implements ByteLengthQueuingStrategy { + highWaterMark: number; + + constructor({ highWaterMark }: { highWaterMark: number }) { + this.highWaterMark = highWaterMark; + } + + size(chunk: ArrayBufferView): number { + return chunk.byteLength; + } + + [customInspect](): string { + return `${this.constructor.name} { highWaterMark: ${String( + this.highWaterMark + )}, size: f }`; + } +} + +Object.defineProperty(ByteLengthQueuingStrategyImpl.prototype, "size", { + enumerable: true, +}); + +setFunctionName(CountQueuingStrategyImpl, "CountQueuingStrategy"); diff --git a/cli/js/web/streams/readable_byte_stream_controller.ts b/cli/js/web/streams/readable_byte_stream_controller.ts index 80ba9ad85..cbef64357 100644 --- a/cli/js/web/streams/readable_byte_stream_controller.ts +++ b/cli/js/web/streams/readable_byte_stream_controller.ts @@ -18,11 +18,12 @@ import { readableStreamHasDefaultReader, readableStreamGetNumReadRequests, readableStreamCreateReadResult, + setFunctionName, } from "./internals.ts"; import { ReadableStreamImpl } from "./readable_stream.ts"; import * as sym from "./symbols.ts"; import { assert } from "../../util.ts"; -import { customInspect } from "../../web/console.ts"; +import { customInspect } from "../console.ts"; export class ReadableByteStreamControllerImpl implements ReadableByteStreamController { @@ -136,8 +137,13 @@ export class ReadableByteStreamControllerImpl } [customInspect](): string { - return `ReadableByteStreamController { byobRequest: ${String( + return `${this.constructor.name} { byobRequest: ${String( this.byobRequest )}, desiredSize: ${String(this.desiredSize)} }`; } } + +setFunctionName( + ReadableByteStreamControllerImpl, + "ReadableByteStreamController" +); diff --git a/cli/js/web/streams/readable_stream.ts b/cli/js/web/streams/readable_stream.ts index e234c909d..27a733d9d 100644 --- a/cli/js/web/streams/readable_stream.ts +++ b/cli/js/web/streams/readable_stream.ts @@ -6,9 +6,14 @@ import { isReadableStream, isReadableStreamLocked, isUnderlyingByteSource, + isWritableStream, + isWritableStreamLocked, makeSizeAlgorithmFromSizeFunction, + setFunctionName, + setPromiseIsHandledToTrue, readableStreamCancel, ReadableStreamGenericReader, + readableStreamPipeTo, readableStreamTee, setUpReadableByteStreamControllerFromUnderlyingSource, setUpReadableStreamDefaultControllerFromUnderlyingSource, @@ -18,8 +23,8 @@ import { ReadableByteStreamControllerImpl } from "./readable_byte_stream_control import { ReadableStreamAsyncIteratorPrototype } from "./readable_stream_async_iterator.ts"; import { ReadableStreamDefaultControllerImpl } from "./readable_stream_default_controller.ts"; import * as sym from "./symbols.ts"; -import { notImplemented } from "../../util.ts"; -import { customInspect } from "../../web/console.ts"; +import { customInspect } from "../console.ts"; +import { AbortSignalImpl } from "../abort_signal.ts"; // eslint-disable-next-line @typescript-eslint/no-explicit-any export class ReadableStreamImpl<R = any> implements ReadableStream<R> { @@ -119,80 +124,81 @@ export class ReadableStreamImpl<R = any> implements ReadableStream<R> { throw new RangeError(`Unsupported mode "${mode}"`); } - pipeThrough<T>(): // { - // writable, - // readable, - // }: { - // writable: WritableStream<R>; - // readable: ReadableStream<T>; - // }, - // { preventClose, preventAbort, preventCancel, signal }: PipeOptions = {}, - ReadableStream<T> { - return notImplemented(); - // if (!isReadableStream(this)) { - // throw new TypeError("Invalid ReadableStream."); - // } - // if (!isWritableStream(writable)) { - // throw new TypeError("writable is not a valid WritableStream."); - // } - // if (!isReadableStream(readable)) { - // throw new TypeError("readable is not a valid ReadableStream."); - // } - // preventClose = Boolean(preventClose); - // preventAbort = Boolean(preventAbort); - // preventCancel = Boolean(preventCancel); - // if (signal && !(signal instanceof AbortSignalImpl)) { - // throw new TypeError("Invalid signal."); - // } - // if (isReadableStreamLocked(this)) { - // throw new TypeError("ReadableStream is locked."); - // } - // if (isWritableStreamLocked(writable)) { - // throw new TypeError("writable is locked."); - // } - // readableStreamPipeTo( - // this, - // writable, - // preventClose, - // preventAbort, - // preventCancel, - // signal, - // ); - // return readable; + pipeThrough<T>( + { + writable, + readable, + }: { + writable: WritableStream<R>; + readable: ReadableStream<T>; + }, + { preventClose, preventAbort, preventCancel, signal }: PipeOptions = {} + ): ReadableStream<T> { + if (!isReadableStream(this)) { + throw new TypeError("Invalid ReadableStream."); + } + if (!isWritableStream(writable)) { + throw new TypeError("writable is not a valid WritableStream."); + } + if (!isReadableStream(readable)) { + throw new TypeError("readable is not a valid ReadableStream."); + } + preventClose = Boolean(preventClose); + preventAbort = Boolean(preventAbort); + preventCancel = Boolean(preventCancel); + if (signal && !(signal instanceof AbortSignalImpl)) { + throw new TypeError("Invalid signal."); + } + if (isReadableStreamLocked(this)) { + throw new TypeError("ReadableStream is locked."); + } + if (isWritableStreamLocked(writable)) { + throw new TypeError("writable is locked."); + } + const promise = readableStreamPipeTo( + this, + writable, + preventClose, + preventAbort, + preventCancel, + signal + ); + setPromiseIsHandledToTrue(promise); + return readable; } - pipeTo(): // dest: WritableStream<R>, - // { preventClose, preventAbort, preventCancel, signal }: PipeOptions = {}, - Promise<void> { - return notImplemented(); - // if (!isReadableStream(this)) { - // return Promise.reject(new TypeError("Invalid ReadableStream.")); - // } - // if (!isWritableStream(dest)) { - // return Promise.reject( - // new TypeError("dest is not a valid WritableStream."), - // ); - // } - // preventClose = Boolean(preventClose); - // preventAbort = Boolean(preventAbort); - // preventCancel = Boolean(preventCancel); - // if (signal && !(signal instanceof AbortSignalImpl)) { - // return Promise.reject(new TypeError("Invalid signal.")); - // } - // if (isReadableStreamLocked(this)) { - // return Promise.reject(new TypeError("ReadableStream is locked.")); - // } - // if (isWritableStreamLocked(this)) { - // return Promise.reject(new TypeError("dest is locked.")); - // } - // return readableStreamPipeTo( - // this, - // dest, - // preventClose, - // preventAbort, - // preventCancel, - // signal, - // ); + pipeTo( + dest: WritableStream<R>, + { preventClose, preventAbort, preventCancel, signal }: PipeOptions = {} + ): Promise<void> { + if (!isReadableStream(this)) { + return Promise.reject(new TypeError("Invalid ReadableStream.")); + } + if (!isWritableStream(dest)) { + return Promise.reject( + new TypeError("dest is not a valid WritableStream.") + ); + } + preventClose = Boolean(preventClose); + preventAbort = Boolean(preventAbort); + preventCancel = Boolean(preventCancel); + if (signal && !(signal instanceof AbortSignalImpl)) { + return Promise.reject(new TypeError("Invalid signal.")); + } + if (isReadableStreamLocked(this)) { + return Promise.reject(new TypeError("ReadableStream is locked.")); + } + if (isWritableStreamLocked(dest)) { + return Promise.reject(new TypeError("dest is locked.")); + } + return readableStreamPipeTo( + this, + dest, + preventClose, + preventAbort, + preventCancel, + signal + ); } tee(): [ReadableStreamImpl<R>, ReadableStreamImpl<R>] { @@ -203,7 +209,7 @@ export class ReadableStreamImpl<R = any> implements ReadableStream<R> { } [customInspect](): string { - return `ReadableStream { locked: ${String(this.locked)} }`; + return `${this.constructor.name} { locked: ${String(this.locked)} }`; } [Symbol.asyncIterator]( @@ -214,3 +220,5 @@ export class ReadableStreamImpl<R = any> implements ReadableStream<R> { return this.getIterator(options); } } + +setFunctionName(ReadableStreamImpl, "ReadableStream"); diff --git a/cli/js/web/streams/readable_stream_default_controller.ts b/cli/js/web/streams/readable_stream_default_controller.ts index bbdfa1f0d..0755e7765 100644 --- a/cli/js/web/streams/readable_stream_default_controller.ts +++ b/cli/js/web/streams/readable_stream_default_controller.ts @@ -18,10 +18,11 @@ import { readableStreamDefaultControllerGetDesiredSize, resetQueue, SizeAlgorithm, + setFunctionName, } from "./internals.ts"; import { ReadableStreamImpl } from "./readable_stream.ts"; import * as sym from "./symbols.ts"; -import { customInspect } from "../../web/console.ts"; +import { customInspect } from "../console.ts"; // eslint-disable-next-line @typescript-eslint/no-explicit-any export class ReadableStreamDefaultControllerImpl<R = any> @@ -113,8 +114,13 @@ export class ReadableStreamDefaultControllerImpl<R = any> } [customInspect](): string { - return `ReadableStreamDefaultController { desiredSize: ${String( + return `${this.constructor.name} { desiredSize: ${String( this.desiredSize )} }`; } } + +setFunctionName( + ReadableStreamDefaultControllerImpl, + "ReadableStreamDefaultController" +); diff --git a/cli/js/web/streams/readable_stream_default_reader.ts b/cli/js/web/streams/readable_stream_default_reader.ts index 20fb21208..a0d5901dc 100644 --- a/cli/js/web/streams/readable_stream_default_reader.ts +++ b/cli/js/web/streams/readable_stream_default_reader.ts @@ -9,10 +9,11 @@ import { readableStreamReaderGenericCancel, readableStreamReaderGenericInitialize, readableStreamReaderGenericRelease, + setFunctionName, } from "./internals.ts"; import { ReadableStreamImpl } from "./readable_stream.ts"; import * as sym from "./symbols.ts"; -import { customInspect } from "../../web/console.ts"; +import { customInspect } from "../console.ts"; // eslint-disable-next-line @typescript-eslint/no-explicit-any export class ReadableStreamDefaultReaderImpl<R = any> @@ -84,6 +85,8 @@ export class ReadableStreamDefaultReaderImpl<R = any> } [customInspect](): string { - return `ReadableStreamDefaultReader { closed: Promise }`; + return `${this.constructor.name} { closed: Promise }`; } } + +setFunctionName(ReadableStreamDefaultReaderImpl, "ReadableStreamDefaultReader"); diff --git a/cli/js/web/streams/symbols.ts b/cli/js/web/streams/symbols.ts index 9d6335ef0..9e5cb5715 100644 --- a/cli/js/web/streams/symbols.ts +++ b/cli/js/web/streams/symbols.ts @@ -6,21 +6,32 @@ // this data from the public, therefore we will use unique symbols which are // not available in the runtime. +export const abortAlgorithm = Symbol("abortAlgorithm"); +export const abortSteps = Symbol("abortSteps"); export const asyncIteratorReader = Symbol("asyncIteratorReader"); export const autoAllocateChunkSize = Symbol("autoAllocateChunkSize"); +export const backpressure = Symbol("backpressure"); export const byobRequest = Symbol("byobRequest"); export const cancelAlgorithm = Symbol("cancelAlgorithm"); export const cancelSteps = Symbol("cancelSteps"); +export const closeAlgorithm = Symbol("closeAlgorithm"); export const closedPromise = Symbol("closedPromise"); +export const closeRequest = Symbol("closeRequest"); export const closeRequested = Symbol("closeRequested"); export const controlledReadableByteStream = Symbol( "controlledReadableByteStream" ); export const controlledReadableStream = Symbol("controlledReadableStream"); +export const controlledWritableStream = Symbol("controlledWritableStream"); export const disturbed = Symbol("disturbed"); +export const errorSteps = Symbol("errorSteps"); export const forAuthorCode = Symbol("forAuthorCode"); +export const inFlightWriteRequest = Symbol("inFlightWriteRequest"); +export const inFlightCloseRequest = Symbol("inFlightCloseRequest"); export const isFakeDetached = Symbol("isFakeDetached"); export const ownerReadableStream = Symbol("ownerReadableStream"); +export const ownerWritableStream = Symbol("ownerWritableStream"); +export const pendingAbortRequest = Symbol("pendingAbortRequest"); export const preventCancel = Symbol("preventCancel"); export const pullAgain = Symbol("pullAgain"); export const pullAlgorithm = Symbol("pullAlgorithm"); @@ -31,8 +42,13 @@ export const queueTotalSize = Symbol("queueTotalSize"); export const readableStreamController = Symbol("readableStreamController"); export const reader = Symbol("reader"); export const readRequests = Symbol("readRequests"); +export const readyPromise = Symbol("readyPromise"); export const started = Symbol("started"); export const state = Symbol("state"); export const storedError = Symbol("storedError"); export const strategyHWM = Symbol("strategyHWM"); export const strategySizeAlgorithm = Symbol("strategySizeAlgorithm"); +export const writableStreamController = Symbol("writableStreamController"); +export const writeAlgorithm = Symbol("writeAlgorithm"); +export const writer = Symbol("writer"); +export const writeRequests = Symbol("writeRequests"); diff --git a/cli/js/web/streams/writable_stream.ts b/cli/js/web/streams/writable_stream.ts new file mode 100644 index 000000000..4bbf339da --- /dev/null +++ b/cli/js/web/streams/writable_stream.ts @@ -0,0 +1,107 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. + +import { + AbortRequest, + acquireWritableStreamDefaultWriter, + Deferred, + initializeWritableStream, + isWritableStream, + isWritableStreamLocked, + makeSizeAlgorithmFromSizeFunction, + setFunctionName, + setUpWritableStreamDefaultControllerFromUnderlyingSink, + writableStreamAbort, + writableStreamClose, + writableStreamCloseQueuedOrInFlight, + validateAndNormalizeHighWaterMark, +} from "./internals.ts"; +import * as sym from "./symbols.ts"; +import { WritableStreamDefaultControllerImpl } from "./writable_stream_default_controller.ts"; +import { WritableStreamDefaultWriterImpl } from "./writable_stream_default_writer.ts"; +import { customInspect } from "../console.ts"; + +// eslint-disable-next-line @typescript-eslint/no-explicit-any +export class WritableStreamImpl<W = any> implements WritableStream<W> { + [sym.backpressure]: boolean; + [sym.closeRequest]?: Deferred<void>; + [sym.inFlightWriteRequest]?: Required<Deferred<void>>; + [sym.inFlightCloseRequest]?: Deferred<void>; + [sym.pendingAbortRequest]?: AbortRequest; + [sym.state]: "writable" | "closed" | "erroring" | "errored"; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + [sym.storedError]?: any; + [sym.writableStreamController]?: WritableStreamDefaultControllerImpl<W>; + [sym.writer]?: WritableStreamDefaultWriterImpl<W>; + [sym.writeRequests]: Array<Required<Deferred<void>>>; + + constructor( + underlyingSink: UnderlyingSink = {}, + strategy: QueuingStrategy = {} + ) { + initializeWritableStream(this); + const size = strategy.size; + let highWaterMark = strategy.highWaterMark ?? 1; + const { type } = underlyingSink; + if (type !== undefined) { + throw new RangeError(`Sink type of "${String(type)}" not supported.`); + } + const sizeAlgorithm = makeSizeAlgorithmFromSizeFunction(size); + highWaterMark = validateAndNormalizeHighWaterMark(highWaterMark); + setUpWritableStreamDefaultControllerFromUnderlyingSink( + this, + underlyingSink, + highWaterMark, + sizeAlgorithm + ); + } + + get locked(): boolean { + if (!isWritableStream(this)) { + throw new TypeError("Invalid WritableStream."); + } + return isWritableStreamLocked(this); + } + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + abort(reason: any): Promise<void> { + if (!isWritableStream(this)) { + return Promise.reject(new TypeError("Invalid WritableStream.")); + } + if (isWritableStreamLocked(this)) { + return Promise.reject( + new TypeError("Cannot abort a locked WritableStream.") + ); + } + return writableStreamAbort(this, reason); + } + + close(): Promise<void> { + if (!isWritableStream(this)) { + return Promise.reject(new TypeError("Invalid WritableStream.")); + } + if (isWritableStreamLocked(this)) { + return Promise.reject( + new TypeError("Cannot abort a locked WritableStream.") + ); + } + if (writableStreamCloseQueuedOrInFlight(this)) { + return Promise.reject( + new TypeError("Cannot close an already closing WritableStream.") + ); + } + return writableStreamClose(this); + } + + getWriter(): WritableStreamDefaultWriter<W> { + if (!isWritableStream(this)) { + throw new TypeError("Invalid WritableStream."); + } + return acquireWritableStreamDefaultWriter(this); + } + + [customInspect](): string { + return `${this.constructor.name} { locked: ${String(this.locked)} }`; + } +} + +setFunctionName(WritableStreamImpl, "WritableStream"); diff --git a/cli/js/web/streams/writable_stream_default_controller.ts b/cli/js/web/streams/writable_stream_default_controller.ts new file mode 100644 index 000000000..040d0eefc --- /dev/null +++ b/cli/js/web/streams/writable_stream_default_controller.ts @@ -0,0 +1,68 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. + +import { + AbortAlgorithm, + CloseAlgorithm, + isWritableStreamDefaultController, + Pair, + resetQueue, + setFunctionName, + SizeAlgorithm, + WriteAlgorithm, + writableStreamDefaultControllerClearAlgorithms, + writableStreamDefaultControllerError, +} from "./internals.ts"; +import * as sym from "./symbols.ts"; +import { WritableStreamImpl } from "./writable_stream.ts"; +import { customInspect } from "../console.ts"; + +export class WritableStreamDefaultControllerImpl<W> + implements WritableStreamDefaultController { + [sym.abortAlgorithm]: AbortAlgorithm; + [sym.closeAlgorithm]: CloseAlgorithm; + [sym.controlledWritableStream]: WritableStreamImpl; + [sym.queue]: Array<Pair<{ chunk: W } | "close">>; + [sym.queueTotalSize]: number; + [sym.started]: boolean; + [sym.strategyHWM]: number; + [sym.strategySizeAlgorithm]: SizeAlgorithm<W>; + [sym.writeAlgorithm]: WriteAlgorithm<W>; + + private constructor() { + throw new TypeError( + "WritableStreamDefaultController's constructor cannot be called." + ); + } + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + error(e: any): void { + if (!isWritableStreamDefaultController(this)) { + throw new TypeError("Invalid WritableStreamDefaultController."); + } + const state = this[sym.controlledWritableStream][sym.state]; + if (state !== "writable") { + return; + } + writableStreamDefaultControllerError(this, e); + } + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + [sym.abortSteps](reason: any): PromiseLike<void> { + const result = this[sym.abortAlgorithm](reason); + writableStreamDefaultControllerClearAlgorithms(this); + return result; + } + + [sym.errorSteps](): void { + resetQueue(this); + } + + [customInspect](): string { + return `${this.constructor.name} { }`; + } +} + +setFunctionName( + WritableStreamDefaultControllerImpl, + "WritableStreamDefaultController" +); diff --git a/cli/js/web/streams/writable_stream_default_writer.ts b/cli/js/web/streams/writable_stream_default_writer.ts new file mode 100644 index 000000000..cd6b71044 --- /dev/null +++ b/cli/js/web/streams/writable_stream_default_writer.ts @@ -0,0 +1,164 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. + +import { + Deferred, + getDeferred, + isWritableStream, + isWritableStreamDefaultWriter, + isWritableStreamLocked, + setFunctionName, + setPromiseIsHandledToTrue, + writableStreamCloseQueuedOrInFlight, + writableStreamDefaultWriterAbort, + writableStreamDefaultWriterClose, + writableStreamDefaultWriterGetDesiredSize, + writableStreamDefaultWriterRelease, + writableStreamDefaultWriterWrite, +} from "./internals.ts"; +import * as sym from "./symbols.ts"; +import { WritableStreamImpl } from "./writable_stream.ts"; +import { customInspect } from "../console.ts"; +import { assert } from "../../util.ts"; + +export class WritableStreamDefaultWriterImpl<W> + implements WritableStreamDefaultWriter<W> { + [sym.closedPromise]: Deferred<void>; + [sym.ownerWritableStream]: WritableStreamImpl<W>; + [sym.readyPromise]: Deferred<void>; + + constructor(stream: WritableStreamImpl<W>) { + if (!isWritableStream(stream)) { + throw new TypeError("Invalid stream."); + } + if (isWritableStreamLocked(stream)) { + throw new TypeError("Cannot create a reader for a locked stream."); + } + this[sym.ownerWritableStream] = stream; + stream[sym.writer] = this; + const state = stream[sym.state]; + if (state === "writable") { + if ( + !writableStreamCloseQueuedOrInFlight(stream) && + stream[sym.backpressure] + ) { + this[sym.readyPromise] = getDeferred(); + } else { + this[sym.readyPromise] = { promise: Promise.resolve() }; + } + this[sym.closedPromise] = getDeferred(); + } else if (state === "erroring") { + this[sym.readyPromise] = { + promise: Promise.reject(stream[sym.storedError]), + }; + setPromiseIsHandledToTrue(this[sym.readyPromise].promise); + this[sym.closedPromise] = getDeferred(); + } else if (state === "closed") { + this[sym.readyPromise] = { promise: Promise.resolve() }; + this[sym.closedPromise] = { promise: Promise.resolve() }; + } else { + assert(state === "errored"); + const storedError = stream[sym.storedError]; + this[sym.readyPromise] = { promise: Promise.reject(storedError) }; + setPromiseIsHandledToTrue(this[sym.readyPromise].promise); + this[sym.closedPromise] = { promise: Promise.reject(storedError) }; + setPromiseIsHandledToTrue(this[sym.closedPromise].promise); + } + } + + get closed(): Promise<void> { + if (!isWritableStreamDefaultWriter(this)) { + return Promise.reject( + new TypeError("Invalid WritableStreamDefaultWriter.") + ); + } + return this[sym.closedPromise].promise; + } + + get desiredSize(): number | null { + if (!isWritableStreamDefaultWriter(this)) { + throw new TypeError("Invalid WritableStreamDefaultWriter."); + } + if (!this[sym.ownerWritableStream]) { + throw new TypeError("WritableStreamDefaultWriter has no owner."); + } + return writableStreamDefaultWriterGetDesiredSize(this); + } + + get ready(): Promise<void> { + if (!isWritableStreamDefaultWriter(this)) { + return Promise.reject( + new TypeError("Invalid WritableStreamDefaultWriter.") + ); + } + return this[sym.readyPromise].promise; + } + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + abort(reason: any): Promise<void> { + if (!isWritableStreamDefaultWriter(this)) { + return Promise.reject( + new TypeError("Invalid WritableStreamDefaultWriter.") + ); + } + if (!this[sym.ownerWritableStream]) { + Promise.reject( + new TypeError("WritableStreamDefaultWriter has no owner.") + ); + } + return writableStreamDefaultWriterAbort(this, reason); + } + + close(): Promise<void> { + if (!isWritableStreamDefaultWriter(this)) { + return Promise.reject( + new TypeError("Invalid WritableStreamDefaultWriter.") + ); + } + const stream = this[sym.ownerWritableStream]; + if (!stream) { + Promise.reject( + new TypeError("WritableStreamDefaultWriter has no owner.") + ); + } + if (writableStreamCloseQueuedOrInFlight(stream)) { + Promise.reject( + new TypeError("Stream is in an invalid state to be closed.") + ); + } + return writableStreamDefaultWriterClose(this); + } + + releaseLock(): void { + if (!isWritableStreamDefaultWriter(this)) { + throw new TypeError("Invalid WritableStreamDefaultWriter."); + } + const stream = this[sym.ownerWritableStream]; + if (!stream) { + return; + } + assert(stream[sym.writer]); + writableStreamDefaultWriterRelease(this); + } + + write(chunk: W): Promise<void> { + if (!isWritableStreamDefaultWriter(this)) { + return Promise.reject( + new TypeError("Invalid WritableStreamDefaultWriter.") + ); + } + if (!this[sym.ownerWritableStream]) { + Promise.reject( + new TypeError("WritableStreamDefaultWriter has no owner.") + ); + } + return writableStreamDefaultWriterWrite(this, chunk); + } + + [customInspect](): string { + return `${this.constructor.name} { closed: Promise, desiredSize: ${String( + this.desiredSize + )}, ready: Promise }`; + } +} + +setFunctionName(WritableStreamDefaultWriterImpl, "WritableStreamDefaultWriter"); |