diff options
Diffstat (limited to 'cli/js/web/streams/internals.ts')
-rw-r--r-- | cli/js/web/streams/internals.ts | 1224 |
1 files changed, 1115 insertions, 109 deletions
diff --git a/cli/js/web/streams/internals.ts b/cli/js/web/streams/internals.ts index 2559d9e5c..846db096e 100644 --- a/cli/js/web/streams/internals.ts +++ b/cli/js/web/streams/internals.ts @@ -13,13 +13,25 @@ import { ReadableStreamDefaultControllerImpl } from "./readable_stream_default_c import { ReadableStreamDefaultReaderImpl } from "./readable_stream_default_reader.ts"; import { ReadableStreamImpl } from "./readable_stream.ts"; import * as sym from "./symbols.ts"; +import { WritableStreamDefaultControllerImpl } from "./writable_stream_default_controller.ts"; +import { WritableStreamDefaultWriterImpl } from "./writable_stream_default_writer.ts"; +import { WritableStreamImpl } from "./writable_stream.ts"; +import { AbortSignalImpl } from "../abort_signal.ts"; +import { DOMExceptionImpl as DOMException } from "../dom_exception.ts"; import { cloneValue } from "../util.ts"; -import { assert } from "../../util.ts"; +import { assert, AssertionError } from "../../util.ts"; +export type AbortAlgorithm = (reason?: any) => PromiseLike<void>; +export interface AbortRequest { + promise: Deferred<void>; + reason?: any; + wasAlreadyErroring: boolean; +} export interface BufferQueueItem extends Pair<ArrayBuffer | SharedArrayBuffer> { offset: number; } export type CancelAlgorithm = (reason?: any) => PromiseLike<void>; +export type CloseAlgorithm = () => PromiseLike<void>; type Container<R = any> = { [sym.queue]: Array<Pair<R> | BufferQueueItem>; [sym.queueTotalSize]: number; @@ -28,11 +40,11 @@ export type Pair<R> = { value: R; size: number }; export type PullAlgorithm = () => PromiseLike<void>; export type SizeAlgorithm<T> = (chunk: T) => number; export type StartAlgorithm = () => void | PromiseLike<void>; +export type WriteAlgorithm<W> = (chunk: W) => Promise<void>; export interface Deferred<T> { promise: Promise<T>; resolve?: (value?: T | PromiseLike<T>) => void; reject?: (reason?: any) => void; - settled: boolean; } export interface ReadableStreamGenericReader<R = any> @@ -58,6 +70,12 @@ export function acquireReadableStreamDefaultReader<T>( return reader; } +export function acquireWritableStreamDefaultWriter<W>( + stream: WritableStreamImpl<W> +): WritableStreamDefaultWriterImpl<W> { + return new WritableStreamDefaultWriterImpl(stream); +} + function createAlgorithmFromUnderlyingMethod< O extends UnderlyingByteSource | UnderlyingSource, P extends keyof O @@ -157,14 +175,14 @@ function enqueueValueWithSize<R>( /** Non-spec mechanism to "unwrap" a promise and store it to be resolved * later. */ -function getDeferred<T>(): Deferred<T> { - let resolve = undefined; - let reject = undefined; +export function getDeferred<T>(): Required<Deferred<T>> { + let resolve: (value?: T | PromiseLike<T>) => void; + let reject: (reason?: any) => void; const promise = new Promise<T>((res, rej) => { resolve = res; reject = rej; }); - return { promise, resolve, reject, settled: false }; + return { promise, resolve: resolve!, reject: reject! }; } export function initializeReadableStream(stream: ReadableStreamImpl): void { @@ -173,6 +191,17 @@ export function initializeReadableStream(stream: ReadableStreamImpl): void { stream[sym.disturbed] = false; } +export function initializeWritableStream(stream: WritableStreamImpl): void { + stream[sym.state] = "writable"; + stream[sym.storedError] = stream[sym.writer] = stream[ + sym.writableStreamController + ] = stream[sym.inFlightWriteRequest] = stream[sym.closeRequest] = stream[ + sym.inFlightCloseRequest + ] = stream[sym.pendingAbortRequest] = undefined; + stream[sym.writeRequests] = []; + stream[sym.backpressure] = false; +} + function invokeOrNoop<O extends any, P extends keyof O>( o: O, p: P, @@ -278,6 +307,40 @@ export function isUnderlyingByteSource( return typeString === "bytes"; } +export function isWritableStream(x: unknown): x is WritableStreamImpl { + return typeof x !== "object" || + x === null || + !(sym.writableStreamController in x) + ? false + : true; +} + +export function isWritableStreamDefaultController( + x: unknown +): x is WritableStreamDefaultControllerImpl<any> { + return typeof x !== "object" || + x === null || + !(sym.controlledWritableStream in x) + ? false + : true; +} + +export function isWritableStreamDefaultWriter( + x: unknown +): x is WritableStreamDefaultWriterImpl<any> { + return typeof x !== "object" || x === null || !(sym.ownerWritableStream in x) + ? false + : true; +} + +export function isWritableStreamLocked(stream: WritableStreamImpl): boolean { + assert(isWritableStream(stream)); + if (stream[sym.writer] === undefined) { + return false; + } + return true; +} + export function makeSizeAlgorithmFromSizeFunction<T>( size: QueuingStrategySizeCallback<T> | undefined ): SizeAlgorithm<T> { @@ -292,6 +355,13 @@ export function makeSizeAlgorithmFromSizeFunction<T>( }; } +function peekQueueValue<T>(container: Container<T>): T | "close" { + assert(sym.queue in container && sym.queueTotalSize in container); + assert(container[sym.queue].length); + const [pair] = container[sym.queue]; + return pair.value as T; +} + function readableByteStreamControllerShouldCallPull( controller: ReadableByteStreamControllerImpl ): boolean { @@ -333,25 +403,27 @@ export function readableByteStreamControllerCallPullIfNeeded( assert(controller[sym.pullAgain] === false); controller[sym.pulling] = true; const pullPromise = controller[sym.pullAlgorithm](); - pullPromise.then( - () => { - controller[sym.pulling] = false; - if (controller[sym.pullAgain]) { - controller[sym.pullAgain]; - readableByteStreamControllerCallPullIfNeeded(controller); + setPromiseIsHandledToTrue( + pullPromise.then( + () => { + controller[sym.pulling] = false; + if (controller[sym.pullAgain]) { + controller[sym.pullAgain] = false; + readableByteStreamControllerCallPullIfNeeded(controller); + } + }, + (e) => { + readableByteStreamControllerError(controller, e); } - }, - (e) => { - readableByteStreamControllerError(controller, e); - } + ) ); } export function readableByteStreamControllerClearAlgorithms( controller: ReadableByteStreamControllerImpl ): void { - delete controller[sym.pullAlgorithm]; - delete controller[sym.cancelAlgorithm]; + (controller as any)[sym.pullAlgorithm] = undefined; + (controller as any)[sym.cancelAlgorithm] = undefined; } export function readableByteStreamControllerClose( @@ -476,7 +548,7 @@ export function readableStreamAddReadRequest<R>( return promise.promise; } -export async function readableStreamCancel<T>( +export function readableStreamCancel<T>( stream: ReadableStreamImpl<T>, reason: any ): Promise<void> { @@ -488,7 +560,9 @@ export async function readableStreamCancel<T>( return Promise.reject(stream[sym.storedError]); } readableStreamClose(stream); - await stream[sym.readableStreamController]; + return stream[sym.readableStreamController].then( + () => undefined + ) as Promise<void>; } export function readableStreamClose<T>(stream: ReadableStreamImpl<T>): void { @@ -514,7 +588,6 @@ export function readableStreamClose<T>(stream: ReadableStreamImpl<T>): void { const resolve = reader[sym.closedPromise].resolve; assert(resolve); resolve(); - reader[sym.closedPromise].settled = true; } export function readableStreamCreateReadResult<T>( @@ -573,9 +646,9 @@ export function readableStreamDefaultControllerCanCloseOrEnqueue<T>( export function readableStreamDefaultControllerClearAlgorithms<T>( controller: ReadableStreamDefaultControllerImpl<T> ): void { - delete controller[sym.pullAlgorithm]; - delete controller[sym.cancelAlgorithm]; - delete controller[sym.strategySizeAlgorithm]; + (controller as any)[sym.pullAlgorithm] = undefined; + (controller as any)[sym.cancelAlgorithm] = undefined; + (controller as any)[sym.strategySizeAlgorithm] = undefined; } export function readableStreamDefaultControllerClose<T>( @@ -703,17 +776,18 @@ export function readableStreamError(stream: ReadableStreamImpl, e: any): void { } if (isReadableStreamDefaultReader(reader)) { for (const readRequest of reader[sym.readRequests]) { - const { reject } = readRequest; - assert(reject); - reject(e); + assert(readRequest.reject); + readRequest.reject(e); + readRequest.reject = undefined; + readRequest.resolve = undefined; } reader[sym.readRequests] = []; } // 3.5.6.8 Otherwise, support BYOB Reader - const { reject } = reader[sym.closedPromise]; - assert(reject); - reject(e); - reader[sym.closedPromise].settled = true; + reader[sym.closedPromise].reject!(e); + reader[sym.closedPromise].reject = undefined; + reader[sym.closedPromise].resolve = undefined; + setPromiseIsHandledToTrue(reader[sym.closedPromise].promise); } export function readableStreamFulfillReadRequest<R>( @@ -744,6 +818,252 @@ export function readableStreamHasDefaultReader( : true; } +export function readableStreamPipeTo<T>( + source: ReadableStreamImpl<T>, + dest: WritableStreamImpl<T>, + preventClose: boolean, + preventAbort: boolean, + preventCancel: boolean, + signal: AbortSignalImpl | undefined +): Promise<void> { + assert(isReadableStream(source)); + assert(isWritableStream(dest)); + assert( + typeof preventClose === "boolean" && + typeof preventAbort === "boolean" && + typeof preventCancel === "boolean" + ); + assert(signal === undefined || signal instanceof AbortSignalImpl); + assert(!isReadableStreamLocked(source)); + assert(!isWritableStreamLocked(dest)); + const reader = acquireReadableStreamDefaultReader(source); + const writer = acquireWritableStreamDefaultWriter(dest); + source[sym.disturbed] = true; + let shuttingDown = false; + const promise = getDeferred<void>(); + let abortAlgorithm: () => void; + if (signal) { + abortAlgorithm = (): void => { + const error = new DOMException("Abort signal received.", "AbortSignal"); + const actions: Array<() => Promise<void>> = []; + if (!preventAbort) { + actions.push(() => { + if (dest[sym.state] === "writable") { + return writableStreamAbort(dest, error); + } else { + return Promise.resolve(undefined); + } + }); + } + if (!preventCancel) { + actions.push(() => { + if (source[sym.state] === "readable") { + return readableStreamCancel(source, error); + } else { + return Promise.resolve(undefined); + } + }); + } + shutdownWithAction( + () => Promise.all(actions.map((action) => action())), + true, + error + ); + }; + if (signal.aborted) { + abortAlgorithm(); + return promise.promise; + } + signal.addEventListener("abort", abortAlgorithm); + } + + let currentWrite = Promise.resolve(); + + // At this point, the spec becomes non-specific and vague. Most of the rest + // of this code is based on the reference implementation that is part of the + // specification. This is why the functions are only scoped to this function + // to ensure they don't leak into the spec compliant parts. + + function isOrBecomesClosed( + stream: ReadableStreamImpl | WritableStreamImpl, + promise: Promise<void>, + action: () => void + ): void { + if (stream[sym.state] === "closed") { + action(); + } else { + setPromiseIsHandledToTrue(promise.then(action)); + } + } + + function isOrBecomesErrored( + stream: ReadableStreamImpl | WritableStreamImpl, + promise: Promise<void>, + action: (error: any) => void + ): void { + if (stream[sym.state] === "errored") { + action(stream[sym.storedError]); + } else { + setPromiseIsHandledToTrue(promise.catch((error) => action(error))); + } + } + + function finalize(isError?: boolean, error?: any): void { + writableStreamDefaultWriterRelease(writer); + readableStreamReaderGenericRelease(reader); + + if (signal) { + signal.removeEventListener("abort", abortAlgorithm); + } + if (isError) { + promise.reject(error); + } else { + promise.resolve(); + } + } + + function waitForWritesToFinish(): Promise<void> { + const oldCurrentWrite = currentWrite; + return currentWrite.then(() => + oldCurrentWrite !== currentWrite ? waitForWritesToFinish() : undefined + ); + } + + function shutdownWithAction( + action: () => Promise<any>, + originalIsError?: boolean, + originalError?: any + ): void { + function doTheRest(): void { + setPromiseIsHandledToTrue( + action().then( + () => finalize(originalIsError, originalError), + (newError) => finalize(true, newError) + ) + ); + } + + if (shuttingDown) { + return; + } + shuttingDown = true; + + if ( + dest[sym.state] === "writable" && + writableStreamCloseQueuedOrInFlight(dest) === false + ) { + setPromiseIsHandledToTrue(waitForWritesToFinish().then(doTheRest)); + } else { + doTheRest(); + } + } + + function shutdown(isError: boolean, error?: any): void { + if (shuttingDown) { + return; + } + shuttingDown = true; + + if ( + dest[sym.state] === "writable" && + !writableStreamCloseQueuedOrInFlight(dest) + ) { + setPromiseIsHandledToTrue( + waitForWritesToFinish().then(() => finalize(isError, error)) + ); + } + finalize(isError, error); + } + + function pipeStep(): Promise<boolean> { + if (shuttingDown) { + return Promise.resolve(true); + } + return writer[sym.readyPromise].promise.then(() => { + return readableStreamDefaultReaderRead(reader).then(({ value, done }) => { + if (done === true) { + return true; + } + currentWrite = writableStreamDefaultWriterWrite( + writer, + value! + ).then(undefined, () => {}); + return false; + }); + }); + } + + function pipeLoop(): Promise<void> { + return new Promise((resolveLoop, rejectLoop) => { + function next(done: boolean): void { + if (done) { + resolveLoop(undefined); + } else { + setPromiseIsHandledToTrue(pipeStep().then(next, rejectLoop)); + } + } + next(false); + }); + } + + isOrBecomesErrored( + source, + reader[sym.closedPromise].promise, + (storedError) => { + if (!preventAbort) { + shutdownWithAction( + () => writableStreamAbort(dest, storedError), + true, + storedError + ); + } else { + shutdown(true, storedError); + } + } + ); + + isOrBecomesErrored(dest, writer[sym.closedPromise].promise, (storedError) => { + if (!preventCancel) { + shutdownWithAction( + () => readableStreamCancel(source, storedError), + true, + storedError + ); + } else { + shutdown(true, storedError); + } + }); + + isOrBecomesClosed(source, reader[sym.closedPromise].promise, () => { + if (!preventClose) { + shutdownWithAction(() => + writableStreamDefaultWriterCloseWithErrorPropagation(writer) + ); + } + }); + + if ( + writableStreamCloseQueuedOrInFlight(dest) || + dest[sym.state] === "closed" + ) { + const destClosed = new TypeError( + "The destination writable stream closed before all data could be piped to it." + ); + if (!preventCancel) { + shutdownWithAction( + () => readableStreamCancel(source, destClosed), + true, + destClosed + ); + } else { + shutdown(true, destClosed); + } + } + + setPromiseIsHandledToTrue(pipeLoop()); + return promise.promise; +} + export function readableStreamReaderGenericCancel<R = any>( reader: ReadableStreamGenericReader<R>, reason: any @@ -763,16 +1083,13 @@ export function readableStreamReaderGenericInitialize<R = any>( if (stream[sym.state] === "readable") { reader[sym.closedPromise] = getDeferred(); } else if (stream[sym.state] === "closed") { - reader[sym.closedPromise] = { - promise: Promise.resolve(), - settled: true, - }; + reader[sym.closedPromise] = { promise: Promise.resolve() }; } else { assert(stream[sym.state] === "errored"); reader[sym.closedPromise] = { promise: Promise.reject(stream[sym.storedError]), - settled: true, }; + setPromiseIsHandledToTrue(reader[sym.closedPromise].promise); } } @@ -790,9 +1107,9 @@ export function readableStreamReaderGenericRelease<R = any>( delete closedPromise.reject; delete closedPromise.resolve; } - closedPromise.settled = true; - delete reader[sym.ownerReadableStream][sym.reader]; - delete reader[sym.ownerReadableStream]; + setPromiseIsHandledToTrue(closedPromise.promise); + reader[sym.ownerReadableStream][sym.reader] = undefined; + (reader as any)[sym.ownerReadableStream] = undefined; } export function readableStreamTee<T>( @@ -817,51 +1134,54 @@ export function readableStreamTee<T>( return Promise.resolve(); } reading = true; - readableStreamDefaultReaderRead(reader).then((result) => { - reading = false; - assert(typeof result === "object"); - const { done } = result; - assert(typeof done === "boolean"); - if (done) { + const readPromise = readableStreamDefaultReaderRead(reader).then( + (result) => { + reading = false; + assert(typeof result === "object"); + const { done } = result; + assert(typeof done === "boolean"); + if (done) { + if (!canceled1) { + readableStreamDefaultControllerClose( + branch1[ + sym.readableStreamController + ] as ReadableStreamDefaultControllerImpl + ); + } + if (!canceled2) { + readableStreamDefaultControllerClose( + branch2[ + sym.readableStreamController + ] as ReadableStreamDefaultControllerImpl + ); + } + return; + } + const { value } = result; + const value1 = value!; + let value2 = value!; + if (!canceled2 && cloneForBranch2) { + value2 = cloneValue(value2); + } if (!canceled1) { - readableStreamDefaultControllerClose( + readableStreamDefaultControllerEnqueue( branch1[ sym.readableStreamController - ] as ReadableStreamDefaultControllerImpl + ] as ReadableStreamDefaultControllerImpl, + value1 ); } if (!canceled2) { - readableStreamDefaultControllerClose( + readableStreamDefaultControllerEnqueue( branch2[ sym.readableStreamController - ] as ReadableStreamDefaultControllerImpl + ] as ReadableStreamDefaultControllerImpl, + value2 ); } - return; - } - const { value } = result; - const value1 = value!; - let value2 = value!; - if (!canceled2 && cloneForBranch2) { - value2 = cloneValue(value2); } - if (!canceled1) { - readableStreamDefaultControllerEnqueue( - branch1[ - sym.readableStreamController - ] as ReadableStreamDefaultControllerImpl, - value1 - ); - } - if (!canceled2) { - readableStreamDefaultControllerEnqueue( - branch2[ - sym.readableStreamController - ] as ReadableStreamDefaultControllerImpl, - value2 - ); - } - }); + ); + setPromiseIsHandledToTrue(readPromise); return Promise.resolve(); }; const cancel1Algorithm = (reason?: any): PromiseLike<void> => { @@ -870,7 +1190,6 @@ export function readableStreamTee<T>( if (canceled2) { const compositeReason = [reason1, reason2]; const cancelResult = readableStreamCancel(stream, compositeReason); - assert(cancelPromise.resolve); cancelPromise.resolve(cancelResult); } return cancelPromise.promise; @@ -881,7 +1200,6 @@ export function readableStreamTee<T>( if (canceled1) { const compositeReason = [reason1, reason2]; const cancelResult = readableStreamCancel(stream, compositeReason); - assert(cancelPromise.resolve); cancelPromise.resolve(cancelResult); } return cancelPromise.promise; @@ -897,20 +1215,22 @@ export function readableStreamTee<T>( pullAlgorithm, cancel2Algorithm ); - reader[sym.closedPromise].promise.catch((r) => { - readableStreamDefaultControllerError( - branch1[ - sym.readableStreamController - ] as ReadableStreamDefaultControllerImpl, - r - ); - readableStreamDefaultControllerError( - branch2[ - sym.readableStreamController - ] as ReadableStreamDefaultControllerImpl, - r - ); - }); + setPromiseIsHandledToTrue( + reader[sym.closedPromise].promise.catch((r) => { + readableStreamDefaultControllerError( + branch1[ + sym.readableStreamController + ] as ReadableStreamDefaultControllerImpl, + r + ); + readableStreamDefaultControllerError( + branch2[ + sym.readableStreamController + ] as ReadableStreamDefaultControllerImpl, + r + ); + }) + ); return [branch1, branch2]; } @@ -920,6 +1240,25 @@ export function resetQueue<R>(container: Container<R>): void { container[sym.queueTotalSize] = 0; } +/** An internal function which provides a function name for some generated + * functions, so stack traces are a bit more readable. */ +export function setFunctionName(fn: Function, value: string): void { + Object.defineProperty(fn, "name", { value, configurable: true }); +} + +/** An internal function which mimics the behavior of setting the promise to + * handled in JavaScript. In this situation, an assertion failure, which + * shouldn't happen will get thrown, instead of swallowed. */ +export function setPromiseIsHandledToTrue(promise: PromiseLike<unknown>): void { + promise.then(undefined, (e) => { + if (e && e instanceof AssertionError) { + queueMicrotask(() => { + throw e; + }); + } + }); +} + function setUpReadableByteStreamController( stream: ReadableStreamImpl, controller: ReadableByteStreamControllerImpl, @@ -950,16 +1289,18 @@ function setUpReadableByteStreamController( stream[sym.readableStreamController] = controller; const startResult = startAlgorithm(); const startPromise = Promise.resolve(startResult); - startPromise.then( - () => { - controller[sym.started] = true; - assert(!controller[sym.pulling]); - assert(!controller[sym.pullAgain]); - readableByteStreamControllerCallPullIfNeeded(controller); - }, - (r) => { - readableByteStreamControllerError(controller, r); - } + setPromiseIsHandledToTrue( + startPromise.then( + () => { + controller[sym.started] = true; + assert(!controller[sym.pulling]); + assert(!controller[sym.pullAgain]); + readableByteStreamControllerCallPullIfNeeded(controller); + }, + (r) => { + readableByteStreamControllerError(controller, r); + } + ) ); } @@ -981,11 +1322,13 @@ export function setUpReadableByteStreamControllerFromUnderlyingSource( 0, controller ); + setFunctionName(pullAlgorithm, "[[pullAlgorithm]]"); const cancelAlgorithm = createAlgorithmFromUnderlyingMethod( underlyingByteSource, "cancel", 1 ); + setFunctionName(cancelAlgorithm, "[[cancelAlgorithm]]"); // 3.13.27.6 Let autoAllocateChunkSize be ? GetV(underlyingByteSource, "autoAllocateChunkSize"). const autoAllocateChunkSize = undefined; setUpReadableByteStreamController( @@ -1022,16 +1365,18 @@ function setUpReadableStreamDefaultController<T>( stream[sym.readableStreamController] = controller; const startResult = startAlgorithm(); const startPromise = Promise.resolve(startResult); - startPromise.then( - () => { - controller[sym.started] = true; - assert(controller[sym.pulling] === false); - assert(controller[sym.pullAgain] === false); - readableStreamDefaultControllerCallPullIfNeeded(controller); - }, - (r) => { - readableStreamDefaultControllerError(controller, r); - } + setPromiseIsHandledToTrue( + startPromise.then( + () => { + controller[sym.started] = true; + assert(controller[sym.pulling] === false); + assert(controller[sym.pullAgain] === false); + readableStreamDefaultControllerCallPullIfNeeded(controller); + }, + (r) => { + readableStreamDefaultControllerError(controller, r); + } + ) ); } @@ -1053,11 +1398,13 @@ export function setUpReadableStreamDefaultControllerFromUnderlyingSource<T>( 0, controller ); + setFunctionName(pullAlgorithm, "[[pullAlgorithm]]"); const cancelAlgorithm: CancelAlgorithm = createAlgorithmFromUnderlyingMethod( underlyingSource, "cancel", 1 ); + setFunctionName(cancelAlgorithm, "[[cancelAlgorithm]]"); setUpReadableStreamDefaultController( stream, controller, @@ -1069,6 +1416,98 @@ export function setUpReadableStreamDefaultControllerFromUnderlyingSource<T>( ); } +function setUpWritableStreamDefaultController<W>( + stream: WritableStreamImpl<W>, + controller: WritableStreamDefaultControllerImpl<W>, + startAlgorithm: StartAlgorithm, + writeAlgorithm: WriteAlgorithm<W>, + closeAlgorithm: CloseAlgorithm, + abortAlgorithm: AbortAlgorithm, + highWaterMark: number, + sizeAlgorithm: SizeAlgorithm<W> +): void { + assert(isWritableStream(stream)); + assert(stream[sym.writableStreamController] === undefined); + controller[sym.controlledWritableStream] = stream; + stream[sym.writableStreamController] = controller; + controller[sym.queue] = []; + controller[sym.queueTotalSize] = 0; + controller[sym.started] = false; + controller[sym.strategySizeAlgorithm] = sizeAlgorithm; + controller[sym.strategyHWM] = highWaterMark; + controller[sym.writeAlgorithm] = writeAlgorithm; + controller[sym.closeAlgorithm] = closeAlgorithm; + controller[sym.abortAlgorithm] = abortAlgorithm; + const backpressure = writableStreamDefaultControllerGetBackpressure( + controller + ); + writableStreamUpdateBackpressure(stream, backpressure); + const startResult = startAlgorithm(); + const startPromise = Promise.resolve(startResult); + setPromiseIsHandledToTrue( + startPromise.then( + () => { + assert( + stream[sym.state] === "writable" || stream[sym.state] === "erroring" + ); + controller[sym.started] = true; + writableStreamDefaultControllerAdvanceQueueIfNeeded(controller); + }, + (r) => { + assert( + stream[sym.state] === "writable" || stream[sym.state] === "erroring" + ); + controller[sym.started] = true; + writableStreamDealWithRejection(stream, r); + } + ) + ); +} + +export function setUpWritableStreamDefaultControllerFromUnderlyingSink<W>( + stream: WritableStreamImpl<W>, + underlyingSink: UnderlyingSink<W>, + highWaterMark: number, + sizeAlgorithm: SizeAlgorithm<W> +): void { + assert(underlyingSink); + const controller = Object.create( + WritableStreamDefaultControllerImpl.prototype + ); + const startAlgorithm = (): void | PromiseLike<void> => { + return invokeOrNoop(underlyingSink, "start", controller); + }; + const writeAlgorithm = createAlgorithmFromUnderlyingMethod( + underlyingSink, + "write", + 1, + controller + ); + setFunctionName(writeAlgorithm, "[[writeAlgorithm]]"); + const closeAlgorithm = createAlgorithmFromUnderlyingMethod( + underlyingSink, + "close", + 0 + ); + setFunctionName(closeAlgorithm, "[[closeAlgorithm]]"); + const abortAlgorithm = createAlgorithmFromUnderlyingMethod( + underlyingSink, + "abort", + 1 + ); + setFunctionName(abortAlgorithm, "[[abortAlgorithm]]"); + setUpWritableStreamDefaultController( + stream, + controller, + startAlgorithm, + writeAlgorithm, + closeAlgorithm, + abortAlgorithm, + highWaterMark, + sizeAlgorithm + ); +} + function transferArrayBuffer(buffer: ArrayBuffer): ArrayBuffer { assert(!isDetachedBuffer(buffer)); const transferredIshVersion = buffer.slice(0); @@ -1095,4 +1534,571 @@ export function validateAndNormalizeHighWaterMark( return highWaterMark; } +export function writableStreamAbort<W>( + stream: WritableStreamImpl<W>, + reason: any +): Promise<void> { + const state = stream[sym.state]; + if (state === "closed" || state === "errored") { + return Promise.resolve(undefined); + } + if (stream[sym.pendingAbortRequest]) { + return stream[sym.pendingAbortRequest]!.promise.promise; + } + assert(state === "writable" || state === "erroring"); + let wasAlreadyErroring = false; + if (state === "erroring") { + wasAlreadyErroring = true; + reason = undefined; + } + const promise = getDeferred<void>(); + stream[sym.pendingAbortRequest] = { promise, reason, wasAlreadyErroring }; + + if (wasAlreadyErroring === false) { + writableStreamStartErroring(stream, reason); + } + return promise.promise; +} + +function writableStreamAddWriteRequest<W>( + stream: WritableStreamImpl<W> +): Promise<void> { + assert(isWritableStream(stream)); + assert(stream[sym.state] === "writable"); + const promise = getDeferred<void>(); + stream[sym.writeRequests].push(promise); + return promise.promise; +} + +export function writableStreamClose<W>( + stream: WritableStreamImpl<W> +): Promise<void> { + const state = stream[sym.state]; + if (state === "closed" || state === "errored") { + return Promise.reject( + new TypeError("Cannot close an already closed or errored WritableStream.") + ); + } + assert(!writableStreamCloseQueuedOrInFlight(stream)); + const promise = getDeferred<void>(); + stream[sym.closeRequest] = promise; + const writer = stream[sym.writer]; + if (writer && stream[sym.backpressure] && state === "writable") { + writer[sym.readyPromise].resolve!(); + writer[sym.readyPromise].resolve = undefined; + writer[sym.readyPromise].reject = undefined; + } + writableStreamDefaultControllerClose(stream[sym.writableStreamController]!); + return promise.promise; +} + +export function writableStreamCloseQueuedOrInFlight<W>( + stream: WritableStreamImpl<W> +): boolean { + if ( + stream[sym.closeRequest] === undefined && + stream[sym.inFlightCloseRequest] === undefined + ) { + return false; + } + return true; +} + +function writableStreamDealWithRejection<W>( + stream: WritableStreamImpl<W>, + error: any +): void { + const state = stream[sym.state]; + if (state === "writable") { + writableStreamStartErroring(stream, error); + return; + } + assert(state === "erroring"); + writableStreamFinishErroring(stream); +} + +function writableStreamDefaultControllerAdvanceQueueIfNeeded<W>( + controller: WritableStreamDefaultControllerImpl<W> +): void { + const stream = controller[sym.controlledWritableStream]; + if (!controller[sym.started]) { + return; + } + if (stream[sym.inFlightWriteRequest]) { + return; + } + const state = stream[sym.state]; + assert(state !== "closed" && state !== "errored"); + if (state === "erroring") { + writableStreamFinishErroring(stream); + return; + } + if (!controller[sym.queue].length) { + return; + } + const writeRecord = peekQueueValue(controller); + if (writeRecord === "close") { + writableStreamDefaultControllerProcessClose(controller); + } else { + writableStreamDefaultControllerProcessWrite(controller, writeRecord.chunk); + } +} + +export function writableStreamDefaultControllerClearAlgorithms<W>( + controller: WritableStreamDefaultControllerImpl<W> +): void { + (controller as any)[sym.writeAlgorithm] = undefined; + (controller as any)[sym.closeAlgorithm] = undefined; + (controller as any)[sym.abortAlgorithm] = undefined; + (controller as any)[sym.strategySizeAlgorithm] = undefined; +} + +function writableStreamDefaultControllerClose<W>( + controller: WritableStreamDefaultControllerImpl<W> +): void { + enqueueValueWithSize(controller, "close", 0); + writableStreamDefaultControllerAdvanceQueueIfNeeded(controller); +} + +export function writableStreamDefaultControllerError<W>( + controller: WritableStreamDefaultControllerImpl<W>, + error: any +): void { + const stream = controller[sym.controlledWritableStream]; + assert(stream[sym.state] === "writable"); + writableStreamDefaultControllerClearAlgorithms(controller); + writableStreamStartErroring(stream, error); +} + +function writableStreamDefaultControllerErrorIfNeeded<W>( + controller: WritableStreamDefaultControllerImpl<W>, + error: any +): void { + if (controller[sym.controlledWritableStream][sym.state] === "writable") { + writableStreamDefaultControllerError(controller, error); + } +} + +function writableStreamDefaultControllerGetBackpressure<W>( + controller: WritableStreamDefaultControllerImpl<W> +): boolean { + const desiredSize = writableStreamDefaultControllerGetDesiredSize(controller); + return desiredSize <= 0; +} + +function writableStreamDefaultControllerGetChunkSize<W>( + controller: WritableStreamDefaultControllerImpl<W>, + chunk: W +): number { + let returnValue: number; + try { + returnValue = controller[sym.strategySizeAlgorithm](chunk); + } catch (e) { + writableStreamDefaultControllerErrorIfNeeded(controller, e); + return 1; + } + return returnValue; +} + +function writableStreamDefaultControllerGetDesiredSize<W>( + controller: WritableStreamDefaultControllerImpl<W> +): number { + return controller[sym.strategyHWM] - controller[sym.queueTotalSize]; +} + +function writableStreamDefaultControllerProcessClose<W>( + controller: WritableStreamDefaultControllerImpl<W> +): void { + const stream = controller[sym.controlledWritableStream]; + writableStreamMarkCloseRequestInFlight(stream); + dequeueValue(controller); + assert(controller[sym.queue].length === 0); + const sinkClosePromise = controller[sym.closeAlgorithm](); + writableStreamDefaultControllerClearAlgorithms(controller); + setPromiseIsHandledToTrue( + sinkClosePromise.then( + () => { + writableStreamFinishInFlightClose(stream); + }, + (reason) => { + writableStreamFinishInFlightCloseWithError(stream, reason); + } + ) + ); +} + +function writableStreamDefaultControllerProcessWrite<W>( + controller: WritableStreamDefaultControllerImpl<W>, + chunk: W +): void { + const stream = controller[sym.controlledWritableStream]; + writableStreamMarkFirstWriteRequestInFlight(stream); + const sinkWritePromise = controller[sym.writeAlgorithm](chunk); + setPromiseIsHandledToTrue( + sinkWritePromise.then( + () => { + writableStreamFinishInFlightWrite(stream); + const state = stream[sym.state]; + assert(state === "writable" || state === "erroring"); + dequeueValue(controller); + if ( + !writableStreamCloseQueuedOrInFlight(stream) && + state === "writable" + ) { + const backpressure = writableStreamDefaultControllerGetBackpressure( + controller + ); + writableStreamUpdateBackpressure(stream, backpressure); + } + writableStreamDefaultControllerAdvanceQueueIfNeeded(controller); + }, + (reason) => { + if (stream[sym.state] === "writable") { + writableStreamDefaultControllerClearAlgorithms(controller); + } + writableStreamFinishInFlightWriteWithError(stream, reason); + } + ) + ); +} + +function writableStreamDefaultControllerWrite<W>( + controller: WritableStreamDefaultControllerImpl<W>, + chunk: W, + chunkSize: number +): void { + const writeRecord = { chunk }; + try { + enqueueValueWithSize(controller, writeRecord, chunkSize); + } catch (e) { + writableStreamDefaultControllerErrorIfNeeded(controller, e); + return; + } + const stream = controller[sym.controlledWritableStream]; + if ( + !writableStreamCloseQueuedOrInFlight(stream) && + stream[sym.state] === "writable" + ) { + const backpressure = writableStreamDefaultControllerGetBackpressure( + controller + ); + writableStreamUpdateBackpressure(stream, backpressure); + } + writableStreamDefaultControllerAdvanceQueueIfNeeded(controller); +} + +export function writableStreamDefaultWriterAbort<W>( + writer: WritableStreamDefaultWriterImpl<W>, + reason: any +): Promise<void> { + const stream = writer[sym.ownerWritableStream]; + assert(stream); + return writableStreamAbort(stream, reason); +} + +export function writableStreamDefaultWriterClose<W>( + writer: WritableStreamDefaultWriterImpl<W> +): Promise<void> { + const stream = writer[sym.ownerWritableStream]; + assert(stream); + return writableStreamClose(stream); +} + +function writableStreamDefaultWriterCloseWithErrorPropagation<W>( + writer: WritableStreamDefaultWriterImpl<W> +): Promise<void> { + const stream = writer[sym.ownerWritableStream]; + assert(stream); + const state = stream[sym.state]; + if (writableStreamCloseQueuedOrInFlight(stream) || state === "closed") { + return Promise.resolve(); + } + if (state === "errored") { + return Promise.reject(stream[sym.storedError]); + } + assert(state === "writable" || state === "erroring"); + return writableStreamDefaultWriterClose(writer); +} + +function writableStreamDefaultWriterEnsureClosePromiseRejected<W>( + writer: WritableStreamDefaultWriterImpl<W>, + error: any +): void { + if (writer[sym.closedPromise].reject) { + writer[sym.closedPromise].reject!(error); + } else { + writer[sym.closedPromise] = { + promise: Promise.reject(error), + }; + } + setPromiseIsHandledToTrue(writer[sym.closedPromise].promise); +} + +function writableStreamDefaultWriterEnsureReadyPromiseRejected<W>( + writer: WritableStreamDefaultWriterImpl<W>, + error: any +): void { + if (writer[sym.readyPromise].reject) { + writer[sym.readyPromise].reject!(error); + writer[sym.readyPromise].reject = undefined; + writer[sym.readyPromise].resolve = undefined; + } else { + writer[sym.readyPromise] = { + promise: Promise.reject(error), + }; + } + setPromiseIsHandledToTrue(writer[sym.readyPromise].promise); +} + +export function writableStreamDefaultWriterWrite<W>( + writer: WritableStreamDefaultWriterImpl<W>, + chunk: W +): Promise<void> { + const stream = writer[sym.ownerWritableStream]; + assert(stream); + const controller = stream[sym.writableStreamController]; + assert(controller); + const chunkSize = writableStreamDefaultControllerGetChunkSize( + controller, + chunk + ); + if (stream !== writer[sym.ownerWritableStream]) { + return Promise.reject("Writer has incorrect WritableStream."); + } + const state = stream[sym.state]; + if (state === "errored") { + return Promise.reject(stream[sym.storedError]); + } + if (writableStreamCloseQueuedOrInFlight(stream) || state === "closed") { + return Promise.reject(new TypeError("The stream is closed or closing.")); + } + if (state === "erroring") { + return Promise.reject(stream[sym.storedError]); + } + assert(state === "writable"); + const promise = writableStreamAddWriteRequest(stream); + writableStreamDefaultControllerWrite(controller, chunk, chunkSize); + return promise; +} + +export function writableStreamDefaultWriterGetDesiredSize<W>( + writer: WritableStreamDefaultWriterImpl<W> +): number | null { + const stream = writer[sym.ownerWritableStream]; + const state = stream[sym.state]; + if (state === "errored" || state === "erroring") { + return null; + } + if (state === "closed") { + return 0; + } + return writableStreamDefaultControllerGetDesiredSize( + stream[sym.writableStreamController]! + ); +} + +export function writableStreamDefaultWriterRelease<W>( + writer: WritableStreamDefaultWriterImpl<W> +): void { + const stream = writer[sym.ownerWritableStream]; + assert(stream); + assert(stream[sym.writer] === writer); + const releasedError = new TypeError( + "Writer was released and can no longer be used to monitor the stream's closedness." + ); + writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, releasedError); + writableStreamDefaultWriterEnsureClosePromiseRejected(writer, releasedError); + stream[sym.writer] = undefined; + (writer as any)[sym.ownerWritableStream] = undefined; +} + +function writableStreamFinishErroring<W>(stream: WritableStreamImpl<W>): void { + assert(stream[sym.state] === "erroring"); + assert(!writableStreamHasOperationMarkedInFlight(stream)); + stream[sym.state] = "errored"; + stream[sym.writableStreamController]![sym.errorSteps](); + const storedError = stream[sym.storedError]; + for (const writeRequest of stream[sym.writeRequests]) { + assert(writeRequest.reject); + writeRequest.reject(storedError); + } + stream[sym.writeRequests] = []; + if (!stream[sym.pendingAbortRequest]) { + writableStreamRejectCloseAndClosedPromiseIfNeeded(stream); + return; + } + const abortRequest = stream[sym.pendingAbortRequest]; + assert(abortRequest); + stream[sym.pendingAbortRequest] = undefined; + if (abortRequest.wasAlreadyErroring) { + assert(abortRequest.promise.reject); + abortRequest.promise.reject(storedError); + writableStreamRejectCloseAndClosedPromiseIfNeeded(stream); + return; + } + const promise = stream[sym.writableStreamController]; + setPromiseIsHandledToTrue( + promise.then( + () => { + assert(abortRequest.promise.resolve); + abortRequest.promise.resolve(); + writableStreamRejectCloseAndClosedPromiseIfNeeded(stream); + }, + (reason) => { + assert(abortRequest.promise.reject); + abortRequest.promise.reject(reason); + writableStreamRejectCloseAndClosedPromiseIfNeeded(stream); + } + ) + ); +} + +function writableStreamFinishInFlightClose<W>( + stream: WritableStreamImpl<W> +): void { + assert(stream[sym.inFlightCloseRequest]); + stream[sym.inFlightCloseRequest]?.resolve!(); + stream[sym.inFlightCloseRequest] = undefined; + const state = stream[sym.state]; + assert(state === "writable" || state === "erroring"); + if (state === "erroring") { + stream[sym.storedError] = undefined; + if (stream[sym.pendingAbortRequest]) { + stream[sym.pendingAbortRequest]!.promise.resolve!(); + stream[sym.pendingAbortRequest] = undefined; + } + } + stream[sym.state] = "closed"; + const writer = stream[sym.writer]; + if (writer) { + writer[sym.closedPromise].resolve!(); + } + assert(stream[sym.pendingAbortRequest] === undefined); + assert(stream[sym.storedError] === undefined); +} + +function writableStreamFinishInFlightCloseWithError<W>( + stream: WritableStreamImpl<W>, + error: any +): void { + assert(stream[sym.inFlightCloseRequest]); + stream[sym.inFlightCloseRequest]?.reject!(error); + stream[sym.inFlightCloseRequest] = undefined; + assert(stream[sym.state] === "writable" || stream[sym.state] === "erroring"); + if (stream[sym.pendingAbortRequest]) { + stream[sym.pendingAbortRequest]?.promise.reject!(error); + stream[sym.pendingAbortRequest] = undefined; + } + writableStreamDealWithRejection(stream, error); +} + +function writableStreamFinishInFlightWrite<W>( + stream: WritableStreamImpl<W> +): void { + assert(stream[sym.inFlightWriteRequest]); + stream[sym.inFlightWriteRequest]!.resolve(); + stream[sym.inFlightWriteRequest] = undefined; +} + +function writableStreamFinishInFlightWriteWithError<W>( + stream: WritableStreamImpl<W>, + error: any +): void { + assert(stream[sym.inFlightWriteRequest]); + stream[sym.inFlightWriteRequest]!.reject!(error); + stream[sym.inFlightWriteRequest] = undefined; + assert(stream[sym.state] === "writable" || stream[sym.state] === "erroring"); + writableStreamDealWithRejection(stream, error); +} + +function writableStreamHasOperationMarkedInFlight<W>( + stream: WritableStreamImpl<W> +): boolean { + if ( + stream[sym.inFlightWriteRequest] === undefined && + stream[sym.inFlightCloseRequest] === undefined + ) { + return false; + } + return true; +} + +function writableStreamMarkCloseRequestInFlight<W>( + stream: WritableStreamImpl<W> +): void { + assert(stream[sym.inFlightCloseRequest] === undefined); + assert(stream[sym.closeRequest] !== undefined); + stream[sym.inFlightCloseRequest] = stream[sym.closeRequest]; + stream[sym.closeRequest] = undefined; +} + +function writableStreamMarkFirstWriteRequestInFlight<W>( + stream: WritableStreamImpl<W> +): void { + assert(stream[sym.inFlightWriteRequest] === undefined); + assert(stream[sym.writeRequests].length); + const writeRequest = stream[sym.writeRequests].shift(); + stream[sym.inFlightWriteRequest] = writeRequest; +} + +function writableStreamRejectCloseAndClosedPromiseIfNeeded<W>( + stream: WritableStreamImpl<W> +): void { + assert(stream[sym.state] === "errored"); + if (stream[sym.closeRequest]) { + assert(stream[sym.inFlightCloseRequest] === undefined); + stream[sym.closeRequest]!.reject!(stream[sym.storedError]); + stream[sym.closeRequest] = undefined; + } + const writer = stream[sym.writer]; + if (writer) { + writer[sym.closedPromise].reject!(stream[sym.storedError]); + setPromiseIsHandledToTrue(writer[sym.closedPromise].promise); + } +} + +function writableStreamStartErroring<W>( + stream: WritableStreamImpl<W>, + reason: any +): void { + assert(stream[sym.storedError] === undefined); + assert(stream[sym.state] === "writable"); + const controller = stream[sym.writableStreamController]; + assert(controller); + stream[sym.state] = "erroring"; + stream[sym.storedError] = reason; + const writer = stream[sym.writer]; + if (writer) { + writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, reason); + } + if ( + !writableStreamHasOperationMarkedInFlight(stream) && + controller[sym.started] + ) { + writableStreamFinishErroring(stream); + } +} + +function writableStreamUpdateBackpressure<W>( + stream: WritableStreamImpl<W>, + backpressure: boolean +): void { + assert(stream[sym.state] === "writable"); + assert(!writableStreamCloseQueuedOrInFlight(stream)); + const writer = stream[sym.writer]; + if (writer && backpressure !== stream[sym.backpressure]) { + if (backpressure) { + writer[sym.readyPromise] = getDeferred(); + } else { + assert(backpressure === false); + writer[sym.readyPromise].resolve!(); + writer[sym.readyPromise].resolve = undefined; + writer[sym.readyPromise].reject = undefined; + } + } + stream[sym.backpressure] = backpressure; +} + /* eslint-enable */ |