diff options
Diffstat (limited to 'cli/js/web/streams')
-rw-r--r-- | cli/js/web/streams/internals.ts | 2405 | ||||
-rw-r--r-- | cli/js/web/streams/queuing_strategy.ts | 53 | ||||
-rw-r--r-- | cli/js/web/streams/readable_byte_stream_controller.ts | 149 | ||||
-rw-r--r-- | cli/js/web/streams/readable_stream.ts | 224 | ||||
-rw-r--r-- | cli/js/web/streams/readable_stream_async_iterator.ts | 80 | ||||
-rw-r--r-- | cli/js/web/streams/readable_stream_default_controller.ts | 126 | ||||
-rw-r--r-- | cli/js/web/streams/readable_stream_default_reader.ts | 92 | ||||
-rw-r--r-- | cli/js/web/streams/symbols.ts | 61 | ||||
-rw-r--r-- | cli/js/web/streams/transform_stream.ts | 118 | ||||
-rw-r--r-- | cli/js/web/streams/transform_stream_default_controller.ts | 75 | ||||
-rw-r--r-- | cli/js/web/streams/writable_stream.ts | 107 | ||||
-rw-r--r-- | cli/js/web/streams/writable_stream_default_controller.ts | 68 | ||||
-rw-r--r-- | cli/js/web/streams/writable_stream_default_writer.ts | 164 |
13 files changed, 0 insertions, 3722 deletions
diff --git a/cli/js/web/streams/internals.ts b/cli/js/web/streams/internals.ts deleted file mode 100644 index 06c5e304d..000000000 --- a/cli/js/web/streams/internals.ts +++ /dev/null @@ -1,2405 +0,0 @@ -// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. - -// This code closely follows the WHATWG Stream Specification -// See: https://streams.spec.whatwg.org/ -// -// There are some parts that are not fully implemented, and there are some -// comments which point to steps of the specification that are not implemented. - -/* eslint-disable @typescript-eslint/no-explicit-any,require-await */ -import { ReadableByteStreamControllerImpl } from "./readable_byte_stream_controller.ts"; -import { ReadableStreamDefaultControllerImpl } from "./readable_stream_default_controller.ts"; -import { ReadableStreamDefaultReaderImpl } from "./readable_stream_default_reader.ts"; -import { ReadableStreamImpl } from "./readable_stream.ts"; -import * as sym from "./symbols.ts"; -import type { TransformStreamImpl } from "./transform_stream.ts"; -import { TransformStreamDefaultControllerImpl } from "./transform_stream_default_controller.ts"; -import { WritableStreamDefaultControllerImpl } from "./writable_stream_default_controller.ts"; -import { WritableStreamDefaultWriterImpl } from "./writable_stream_default_writer.ts"; -import { WritableStreamImpl } from "./writable_stream.ts"; -import { AbortSignalImpl } from "../abort_signal.ts"; -import { DOMExceptionImpl as DOMException } from "../dom_exception.ts"; -import { cloneValue, setFunctionName } from "../util.ts"; -import { assert, AssertionError } from "../../util.ts"; - -export type AbortAlgorithm = (reason?: any) => PromiseLike<void>; -export interface AbortRequest { - promise: Deferred<void>; - reason?: any; - wasAlreadyErroring: boolean; -} -export interface BufferQueueItem extends Pair<ArrayBuffer | SharedArrayBuffer> { - offset: number; -} -export type CancelAlgorithm = (reason?: any) => PromiseLike<void>; -export type CloseAlgorithm = () => PromiseLike<void>; -type Container<R = any> = { - [sym.queue]: Array<Pair<R> | BufferQueueItem>; - [sym.queueTotalSize]: number; -}; -export type FlushAlgorithm = () => Promise<void>; -export type Pair<R> = { value: R; size: number }; -export type PullAlgorithm = () => PromiseLike<void>; -export type SizeAlgorithm<T> = (chunk: T) => number; -export type StartAlgorithm = () => void | PromiseLike<void>; -export type TransformAlgorithm<I> = (chunk: I) => Promise<void>; -export type WriteAlgorithm<W> = (chunk: W) => Promise<void>; -export interface Deferred<T> { - promise: Promise<T>; - resolve?: (value?: T | PromiseLike<T>) => void; - reject?: (reason?: any) => void; -} - -export interface ReadableStreamGenericReader<R = any> - extends ReadableStreamReader<R> { - [sym.closedPromise]: Deferred<void>; - [sym.forAuthorCode]: boolean; - [sym.ownerReadableStream]: ReadableStreamImpl<R>; - [sym.readRequests]: Array<Deferred<ReadableStreamReadResult<R>>>; -} - -export interface ReadableStreamAsyncIterator<T = any> extends AsyncIterator<T> { - [sym.asyncIteratorReader]: ReadableStreamDefaultReaderImpl<T>; - [sym.preventCancel]: boolean; - return(value?: any | PromiseLike<any>): Promise<IteratorResult<T>>; -} - -export function acquireReadableStreamDefaultReader<T>( - stream: ReadableStreamImpl<T>, - forAuthorCode = false, -): ReadableStreamDefaultReaderImpl<T> { - const reader = new ReadableStreamDefaultReaderImpl(stream); - reader[sym.forAuthorCode] = forAuthorCode; - return reader; -} - -export function acquireWritableStreamDefaultWriter<W>( - stream: WritableStreamImpl<W>, -): WritableStreamDefaultWriterImpl<W> { - return new WritableStreamDefaultWriterImpl(stream); -} - -export function call<F extends (...args: any[]) => any>( - fn: F, - v: ThisType<F>, - args: Parameters<F>, -): ReturnType<F> { - return Function.prototype.apply.call(fn, v, args); -} - -function createAlgorithmFromUnderlyingMethod< - O extends UnderlyingByteSource | UnderlyingSource | Transformer, - P extends keyof O, ->( - underlyingObject: O, - methodName: P, - algoArgCount: 0, - ...extraArgs: any[] -): () => Promise<void>; - -function createAlgorithmFromUnderlyingMethod< - O extends UnderlyingByteSource | UnderlyingSource | Transformer, - P extends keyof O, ->( - underlyingObject: O, - methodName: P, - algoArgCount: 1, - ...extraArgs: any[] -): (arg: any) => Promise<void>; -function createAlgorithmFromUnderlyingMethod< - O extends UnderlyingByteSource | UnderlyingSource | Transformer, - P extends keyof O, ->( - underlyingObject: O, - methodName: P, - algoArgCount: 0 | 1, - ...extraArgs: any[] -): (() => Promise<void>) | ((arg: any) => Promise<void>) { - const method = underlyingObject[methodName]; - if (method) { - if (!isCallable(method)) { - throw new TypeError("method is not callable"); - } - if (algoArgCount === 0) { - return async (): Promise<void> => - call(method, underlyingObject, extraArgs as any); - } else { - return async (arg: any): Promise<void> => { - const fullArgs = [arg, ...extraArgs]; - return call(method, underlyingObject, fullArgs as any); - }; - } - } - return async (): Promise<void> => undefined; -} - -function createReadableStream<T>( - startAlgorithm: StartAlgorithm, - pullAlgorithm: PullAlgorithm, - cancelAlgorithm: CancelAlgorithm, - highWaterMark = 1, - sizeAlgorithm: SizeAlgorithm<T> = (): number => 1, -): ReadableStreamImpl<T> { - highWaterMark = validateAndNormalizeHighWaterMark(highWaterMark); - const stream: ReadableStreamImpl<T> = Object.create( - ReadableStreamImpl.prototype, - ); - initializeReadableStream(stream); - const controller: ReadableStreamDefaultControllerImpl<T> = Object.create( - ReadableStreamDefaultControllerImpl.prototype, - ); - setUpReadableStreamDefaultController( - stream, - controller, - startAlgorithm, - pullAlgorithm, - cancelAlgorithm, - highWaterMark, - sizeAlgorithm, - ); - return stream; -} - -function createWritableStream<W>( - startAlgorithm: StartAlgorithm, - writeAlgorithm: WriteAlgorithm<W>, - closeAlgorithm: CloseAlgorithm, - abortAlgorithm: AbortAlgorithm, - highWaterMark = 1, - sizeAlgorithm: SizeAlgorithm<W> = (): number => 1, -): WritableStreamImpl<W> { - highWaterMark = validateAndNormalizeHighWaterMark(highWaterMark); - const stream = Object.create(WritableStreamImpl.prototype); - initializeWritableStream(stream); - const controller = Object.create( - WritableStreamDefaultControllerImpl.prototype, - ); - setUpWritableStreamDefaultController( - stream, - controller, - startAlgorithm, - writeAlgorithm, - closeAlgorithm, - abortAlgorithm, - highWaterMark, - sizeAlgorithm, - ); - return stream; -} - -export function dequeueValue<R>(container: Container<R>): R { - assert(sym.queue in container && sym.queueTotalSize in container); - assert(container[sym.queue].length); - const pair = container[sym.queue].shift()!; - container[sym.queueTotalSize] -= pair.size; - if (container[sym.queueTotalSize] <= 0) { - container[sym.queueTotalSize] = 0; - } - return pair.value as R; -} - -function enqueueValueWithSize<R>( - container: Container<R>, - value: R, - size: number, -): void { - assert(sym.queue in container && sym.queueTotalSize in container); - size = Number(size); - if (!isFiniteNonNegativeNumber(size)) { - throw new RangeError("size must be a finite non-negative number."); - } - container[sym.queue].push({ value, size }); - container[sym.queueTotalSize] += size; -} - -/** Non-spec mechanism to "unwrap" a promise and store it to be resolved - * later. */ -export function getDeferred<T>(): Required<Deferred<T>> { - let resolve: (value?: T | PromiseLike<T>) => void; - let reject: (reason?: any) => void; - const promise = new Promise<T>((res, rej) => { - resolve = res; - reject = rej; - }); - return { promise, resolve: resolve!, reject: reject! }; -} - -export function initializeReadableStream<R>( - stream: ReadableStreamImpl<R>, -): void { - stream[sym.state] = "readable"; - stream[sym.reader] = stream[sym.storedError] = undefined; - stream[sym.disturbed] = false; -} - -export function initializeTransformStream<I, O>( - stream: TransformStreamImpl<I, O>, - startPromise: Promise<void>, - writableHighWaterMark: number, - writableSizeAlgorithm: SizeAlgorithm<I>, - readableHighWaterMark: number, - readableSizeAlgorithm: SizeAlgorithm<O>, -): void { - const startAlgorithm = (): Promise<void> => startPromise; - const writeAlgorithm = (chunk: any): Promise<void> => - transformStreamDefaultSinkWriteAlgorithm(stream, chunk); - const abortAlgorithm = (reason: any): Promise<void> => - transformStreamDefaultSinkAbortAlgorithm(stream, reason); - const closeAlgorithm = (): Promise<void> => - transformStreamDefaultSinkCloseAlgorithm(stream); - stream[sym.writable] = createWritableStream( - startAlgorithm, - writeAlgorithm, - closeAlgorithm, - abortAlgorithm, - writableHighWaterMark, - writableSizeAlgorithm, - ); - const pullAlgorithm = (): PromiseLike<void> => - transformStreamDefaultSourcePullAlgorithm(stream); - const cancelAlgorithm = (reason: any): Promise<void> => { - transformStreamErrorWritableAndUnblockWrite(stream, reason); - return Promise.resolve(undefined); - }; - stream[sym.readable] = createReadableStream( - startAlgorithm, - pullAlgorithm, - cancelAlgorithm, - readableHighWaterMark, - readableSizeAlgorithm, - ); - stream[sym.backpressure] = stream[sym.backpressureChangePromise] = undefined; - transformStreamSetBackpressure(stream, true); - Object.defineProperty(stream, sym.transformStreamController, { - value: undefined, - configurable: true, - }); -} - -export function initializeWritableStream<W>( - stream: WritableStreamImpl<W>, -): void { - stream[sym.state] = "writable"; - stream[sym.storedError] = stream[sym.writer] = stream[ - sym.writableStreamController - ] = stream[sym.inFlightWriteRequest] = stream[sym.closeRequest] = stream[ - sym.inFlightCloseRequest - ] = stream[sym.pendingAbortRequest] = undefined; - stream[sym.writeRequests] = []; - stream[sym.backpressure] = false; -} - -export function invokeOrNoop<O extends Record<string, any>, P extends keyof O>( - o: O, - p: P, - ...args: Parameters<O[P]> -): ReturnType<O[P]> | undefined { - assert(o); - const method = o[p]; - if (!method) { - return undefined; - } - return call(method, o, args); -} - -function isCallable(value: unknown): value is (...args: any) => any { - return typeof value === "function"; -} - -export function isDetachedBuffer(value: object): boolean { - return sym.isFakeDetached in value; -} - -function isFiniteNonNegativeNumber(v: unknown): v is number { - return Number.isFinite(v) && (v as number) >= 0; -} - -export function isReadableByteStreamController( - x: unknown, -): x is ReadableByteStreamControllerImpl { - return !( - typeof x !== "object" || - x === null || - !(sym.controlledReadableByteStream in x) - ); -} - -export function isReadableStream(x: unknown): x is ReadableStreamImpl { - return !( - typeof x !== "object" || - x === null || - !(sym.readableStreamController in x) - ); -} - -export function isReadableStreamAsyncIterator( - x: unknown, -): x is ReadableStreamAsyncIterator { - if (typeof x !== "object" || x === null) { - return false; - } - return sym.asyncIteratorReader in x; -} - -export function isReadableStreamDefaultController( - x: unknown, -): x is ReadableStreamDefaultControllerImpl { - return !( - typeof x !== "object" || - x === null || - !(sym.controlledReadableStream in x) - ); -} - -export function isReadableStreamDefaultReader<T>( - x: unknown, -): x is ReadableStreamDefaultReaderImpl<T> { - return !(typeof x !== "object" || x === null || !(sym.readRequests in x)); -} - -export function isReadableStreamLocked(stream: ReadableStreamImpl): boolean { - assert(isReadableStream(stream)); - return !!stream[sym.reader]; -} - -export function isReadableStreamDisturbed(stream: ReadableStream): boolean { - assert(isReadableStream(stream)); - return !!stream[sym.disturbed]; -} - -export function isTransformStream(x: unknown): x is TransformStreamImpl { - return !( - typeof x !== "object" || - x === null || - !(sym.transformStreamController in x) - ); -} - -export function isTransformStreamDefaultController( - x: unknown, -): x is TransformStreamDefaultControllerImpl { - return !( - typeof x !== "object" || - x === null || - !(sym.controlledTransformStream in x) - ); -} - -export function isUnderlyingByteSource( - underlyingSource: UnderlyingByteSource | UnderlyingSource, -): underlyingSource is UnderlyingByteSource { - const { type } = underlyingSource; - const typeString = String(type); - return typeString === "bytes"; -} - -export function isWritableStream(x: unknown): x is WritableStreamImpl { - return !( - typeof x !== "object" || - x === null || - !(sym.writableStreamController in x) - ); -} - -export function isWritableStreamDefaultController( - x: unknown, -): x is WritableStreamDefaultControllerImpl<any> { - return !( - typeof x !== "object" || - x === null || - !(sym.controlledWritableStream in x) - ); -} - -export function isWritableStreamDefaultWriter( - x: unknown, -): x is WritableStreamDefaultWriterImpl<any> { - return !( - typeof x !== "object" || - x === null || - !(sym.ownerWritableStream in x) - ); -} - -export function isWritableStreamLocked(stream: WritableStreamImpl): boolean { - assert(isWritableStream(stream)); - return stream[sym.writer] !== undefined; -} - -export function makeSizeAlgorithmFromSizeFunction<T>( - size: QueuingStrategySizeCallback<T> | undefined, -): SizeAlgorithm<T> { - if (size === undefined) { - return (): number => 1; - } - if (typeof size !== "function") { - throw new TypeError("size must be callable."); - } - return (chunk: T): number => { - return size.call(undefined, chunk); - }; -} - -function peekQueueValue<T>(container: Container<T>): T | "close" { - assert(sym.queue in container && sym.queueTotalSize in container); - assert(container[sym.queue].length); - const [pair] = container[sym.queue]; - return pair.value as T; -} - -function readableByteStreamControllerShouldCallPull( - controller: ReadableByteStreamControllerImpl, -): boolean { - const stream = controller[sym.controlledReadableByteStream]; - if ( - stream[sym.state] !== "readable" || - controller[sym.closeRequested] || - !controller[sym.started] - ) { - return false; - } - if ( - readableStreamHasDefaultReader(stream) && - readableStreamGetNumReadRequests(stream) > 0 - ) { - return true; - } - // 3.13.25.6 If ! ReadableStreamHasBYOBReader(stream) is true and ! - // ReadableStreamGetNumReadIntoRequests(stream) > 0, return true. - const desiredSize = readableByteStreamControllerGetDesiredSize(controller); - assert(desiredSize !== null); - return desiredSize > 0; -} - -export function readableByteStreamControllerCallPullIfNeeded( - controller: ReadableByteStreamControllerImpl, -): void { - const shouldPull = readableByteStreamControllerShouldCallPull(controller); - if (!shouldPull) { - return; - } - if (controller[sym.pulling]) { - controller[sym.pullAgain] = true; - return; - } - assert(controller[sym.pullAgain] === false); - controller[sym.pulling] = true; - const pullPromise = controller[sym.pullAlgorithm](); - setPromiseIsHandledToTrue( - pullPromise.then( - () => { - controller[sym.pulling] = false; - if (controller[sym.pullAgain]) { - controller[sym.pullAgain] = false; - readableByteStreamControllerCallPullIfNeeded(controller); - } - }, - (e) => { - readableByteStreamControllerError(controller, e); - }, - ), - ); -} - -export function readableByteStreamControllerClearAlgorithms( - controller: ReadableByteStreamControllerImpl, -): void { - (controller as any)[sym.pullAlgorithm] = undefined; - (controller as any)[sym.cancelAlgorithm] = undefined; -} - -export function readableByteStreamControllerClose( - controller: ReadableByteStreamControllerImpl, -): void { - const stream = controller[sym.controlledReadableByteStream]; - if (controller[sym.closeRequested] || stream[sym.state] !== "readable") { - return; - } - if (controller[sym.queueTotalSize] > 0) { - controller[sym.closeRequested] = true; - return; - } - // 3.13.6.4 If controller.[[pendingPullIntos]] is not empty, (BYOB Support) - readableByteStreamControllerClearAlgorithms(controller); - readableStreamClose(stream); -} - -export function readableByteStreamControllerEnqueue( - controller: ReadableByteStreamControllerImpl, - chunk: ArrayBufferView, -): void { - const stream = controller[sym.controlledReadableByteStream]; - if (controller[sym.closeRequested] || stream[sym.state] !== "readable") { - return; - } - const { buffer, byteOffset, byteLength } = chunk; - const transferredBuffer = transferArrayBuffer(buffer); - if (readableStreamHasDefaultReader(stream)) { - if (readableStreamGetNumReadRequests(stream) === 0) { - readableByteStreamControllerEnqueueChunkToQueue( - controller, - transferredBuffer, - byteOffset, - byteLength, - ); - } else { - assert(controller[sym.queue].length === 0); - const transferredView = new Uint8Array( - transferredBuffer, - byteOffset, - byteLength, - ); - readableStreamFulfillReadRequest(stream, transferredView, false); - } - // 3.13.9.8 Otherwise, if ! ReadableStreamHasBYOBReader(stream) is true - } else { - assert(!isReadableStreamLocked(stream)); - readableByteStreamControllerEnqueueChunkToQueue( - controller, - transferredBuffer, - byteOffset, - byteLength, - ); - } - readableByteStreamControllerCallPullIfNeeded(controller); -} - -function readableByteStreamControllerEnqueueChunkToQueue( - controller: ReadableByteStreamControllerImpl, - buffer: ArrayBuffer | SharedArrayBuffer, - byteOffset: number, - byteLength: number, -): void { - controller[sym.queue].push({ - value: buffer, - offset: byteOffset, - size: byteLength, - }); - controller[sym.queueTotalSize] += byteLength; -} - -export function readableByteStreamControllerError( - controller: ReadableByteStreamControllerImpl, - e: any, -): void { - const stream = controller[sym.controlledReadableByteStream]; - if (stream[sym.state] !== "readable") { - return; - } - // 3.13.11.3 Perform ! ReadableByteStreamControllerClearPendingPullIntos(controller). - resetQueue(controller); - readableByteStreamControllerClearAlgorithms(controller); - readableStreamError(stream, e); -} - -export function readableByteStreamControllerGetDesiredSize( - controller: ReadableByteStreamControllerImpl, -): number | null { - const stream = controller[sym.controlledReadableByteStream]; - const state = stream[sym.state]; - if (state === "errored") { - return null; - } - if (state === "closed") { - return 0; - } - return controller[sym.strategyHWM] - controller[sym.queueTotalSize]; -} - -export function readableByteStreamControllerHandleQueueDrain( - controller: ReadableByteStreamControllerImpl, -): void { - assert( - controller[sym.controlledReadableByteStream][sym.state] === "readable", - ); - if (controller[sym.queueTotalSize] === 0 && controller[sym.closeRequested]) { - readableByteStreamControllerClearAlgorithms(controller); - readableStreamClose(controller[sym.controlledReadableByteStream]); - } else { - readableByteStreamControllerCallPullIfNeeded(controller); - } -} - -export function readableStreamAddReadRequest<R>( - stream: ReadableStreamImpl<R>, -): Promise<ReadableStreamReadResult<R>> { - assert(isReadableStreamDefaultReader(stream[sym.reader])); - assert(stream[sym.state] === "readable"); - const promise = getDeferred<ReadableStreamReadResult<R>>(); - stream[sym.reader]![sym.readRequests].push(promise); - return promise.promise; -} - -export function readableStreamCancel<T>( - stream: ReadableStreamImpl<T>, - reason: any, -): Promise<void> { - stream[sym.disturbed] = true; - if (stream[sym.state] === "closed") { - return Promise.resolve(); - } - if (stream[sym.state] === "errored") { - return Promise.reject(stream[sym.storedError]); - } - readableStreamClose(stream); - return stream[sym.readableStreamController].then( - () => undefined, - ) as Promise<void>; -} - -export function readableStreamClose<T>(stream: ReadableStreamImpl<T>): void { - assert(stream[sym.state] === "readable"); - stream[sym.state] = "closed"; - const reader = stream[sym.reader]; - if (!reader) { - return; - } - if (isReadableStreamDefaultReader<T>(reader)) { - for (const readRequest of reader[sym.readRequests]) { - assert(readRequest.resolve); - readRequest.resolve( - readableStreamCreateReadResult<T>( - undefined, - true, - reader[sym.forAuthorCode], - ), - ); - } - reader[sym.readRequests] = []; - } - const resolve = reader[sym.closedPromise].resolve; - assert(resolve); - resolve(); -} - -export function readableStreamCreateReadResult<T>( - value: T | undefined, - done: boolean, - forAuthorCode: boolean, -): ReadableStreamReadResult<T> { - const prototype = forAuthorCode ? Object.prototype : null; - assert(typeof done === "boolean"); - const obj: ReadableStreamReadResult<T> = Object.create(prototype); - Object.defineProperties(obj, { - value: { value, writable: true, enumerable: true, configurable: true }, - done: { value: done, writable: true, enumerable: true, configurable: true }, - }); - return obj; -} - -export function readableStreamDefaultControllerCallPullIfNeeded<T>( - controller: ReadableStreamDefaultControllerImpl<T>, -): void { - const shouldPull = readableStreamDefaultControllerShouldCallPull(controller); - if (!shouldPull) { - return; - } - if (controller[sym.pulling]) { - controller[sym.pullAgain] = true; - return; - } - assert(controller[sym.pullAgain] === false); - controller[sym.pulling] = true; - const pullPromise = controller[sym.pullAlgorithm](); - pullPromise.then( - () => { - controller[sym.pulling] = false; - if (controller[sym.pullAgain]) { - controller[sym.pullAgain] = false; - readableStreamDefaultControllerCallPullIfNeeded(controller); - } - }, - (e) => { - readableStreamDefaultControllerError(controller, e); - }, - ); -} - -export function readableStreamDefaultControllerCanCloseOrEnqueue<T>( - controller: ReadableStreamDefaultControllerImpl<T>, -): boolean { - const state = controller[sym.controlledReadableStream][sym.state]; - return !controller[sym.closeRequested] && state === "readable"; -} - -export function readableStreamDefaultControllerClearAlgorithms<T>( - controller: ReadableStreamDefaultControllerImpl<T>, -): void { - (controller as any)[sym.pullAlgorithm] = undefined; - (controller as any)[sym.cancelAlgorithm] = undefined; - (controller as any)[sym.strategySizeAlgorithm] = undefined; -} - -export function readableStreamDefaultControllerClose<T>( - controller: ReadableStreamDefaultControllerImpl<T>, -): void { - if (!readableStreamDefaultControllerCanCloseOrEnqueue(controller)) { - return; - } - const stream = controller[sym.controlledReadableStream]; - controller[sym.closeRequested] = true; - if (controller[sym.queue].length === 0) { - readableStreamDefaultControllerClearAlgorithms(controller); - readableStreamClose(stream); - } -} - -export function readableStreamDefaultControllerEnqueue<T>( - controller: ReadableStreamDefaultControllerImpl<T>, - chunk: T, -): void { - if (!readableStreamDefaultControllerCanCloseOrEnqueue(controller)) { - return; - } - const stream = controller[sym.controlledReadableStream]; - if ( - isReadableStreamLocked(stream) && - readableStreamGetNumReadRequests(stream) > 0 - ) { - readableStreamFulfillReadRequest(stream, chunk, false); - } else { - try { - const chunkSize = controller[sym.strategySizeAlgorithm](chunk); - enqueueValueWithSize(controller, chunk, chunkSize); - } catch (err) { - readableStreamDefaultControllerError(controller, err); - throw err; - } - } - readableStreamDefaultControllerCallPullIfNeeded(controller); -} - -export function readableStreamDefaultControllerGetDesiredSize<T>( - controller: ReadableStreamDefaultControllerImpl<T>, -): number | null { - const stream = controller[sym.controlledReadableStream]; - const state = stream[sym.state]; - if (state === "errored") { - return null; - } - if (state === "closed") { - return 0; - } - return controller[sym.strategyHWM] - controller[sym.queueTotalSize]; -} - -export function readableStreamDefaultControllerError<T>( - controller: ReadableStreamDefaultControllerImpl<T>, - e: any, -): void { - const stream = controller[sym.controlledReadableStream]; - if (stream[sym.state] !== "readable") { - return; - } - resetQueue(controller); - readableStreamDefaultControllerClearAlgorithms(controller); - readableStreamError(stream, e); -} - -function readableStreamDefaultControllerHasBackpressure<T>( - controller: ReadableStreamDefaultControllerImpl<T>, -): boolean { - return readableStreamDefaultControllerShouldCallPull(controller); -} - -function readableStreamDefaultControllerShouldCallPull<T>( - controller: ReadableStreamDefaultControllerImpl<T>, -): boolean { - const stream = controller[sym.controlledReadableStream]; - if ( - !readableStreamDefaultControllerCanCloseOrEnqueue(controller) || - controller[sym.started] === false - ) { - return false; - } - if ( - isReadableStreamLocked(stream) && - readableStreamGetNumReadRequests(stream) > 0 - ) { - return true; - } - const desiredSize = readableStreamDefaultControllerGetDesiredSize(controller); - assert(desiredSize !== null); - return desiredSize > 0; -} - -export function readableStreamDefaultReaderRead<R>( - reader: ReadableStreamDefaultReaderImpl<R>, -): Promise<ReadableStreamReadResult<R>> { - const stream = reader[sym.ownerReadableStream]; - assert(stream); - stream[sym.disturbed] = true; - if (stream[sym.state] === "closed") { - return Promise.resolve( - readableStreamCreateReadResult<R>( - undefined, - true, - reader[sym.forAuthorCode], - ), - ); - } - if (stream[sym.state] === "errored") { - return Promise.reject(stream[sym.storedError]); - } - assert(stream[sym.state] === "readable"); - return (stream[ - sym.readableStreamController - ] as ReadableStreamDefaultControllerImpl)[sym.pullSteps](); -} - -export function readableStreamError(stream: ReadableStreamImpl, e: any): void { - assert(isReadableStream(stream)); - assert(stream[sym.state] === "readable"); - stream[sym.state] = "errored"; - stream[sym.storedError] = e; - const reader = stream[sym.reader]; - if (reader === undefined) { - return; - } - if (isReadableStreamDefaultReader(reader)) { - for (const readRequest of reader[sym.readRequests]) { - assert(readRequest.reject); - readRequest.reject(e); - readRequest.reject = undefined; - readRequest.resolve = undefined; - } - reader[sym.readRequests] = []; - } - // 3.5.6.8 Otherwise, support BYOB Reader - reader[sym.closedPromise].reject!(e); - reader[sym.closedPromise].reject = undefined; - reader[sym.closedPromise].resolve = undefined; - setPromiseIsHandledToTrue(reader[sym.closedPromise].promise); -} - -export function readableStreamFulfillReadRequest<R>( - stream: ReadableStreamImpl<R>, - chunk: R, - done: boolean, -): void { - const reader = stream[sym.reader]!; - const readRequest = reader[sym.readRequests].shift()!; - assert(readRequest.resolve); - readRequest.resolve( - readableStreamCreateReadResult(chunk, done, reader[sym.forAuthorCode]), - ); -} - -export function readableStreamGetNumReadRequests( - stream: ReadableStreamImpl, -): number { - return stream[sym.reader]?.[sym.readRequests].length ?? 0; -} - -export function readableStreamHasDefaultReader( - stream: ReadableStreamImpl, -): boolean { - const reader = stream[sym.reader]; - return !(reader === undefined || !isReadableStreamDefaultReader(reader)); -} - -export function readableStreamPipeTo<T>( - source: ReadableStreamImpl<T>, - dest: WritableStreamImpl<T>, - preventClose: boolean, - preventAbort: boolean, - preventCancel: boolean, - signal: AbortSignalImpl | undefined, -): Promise<void> { - assert(isReadableStream(source)); - assert(isWritableStream(dest)); - assert( - typeof preventClose === "boolean" && - typeof preventAbort === "boolean" && - typeof preventCancel === "boolean", - ); - assert(signal === undefined || signal instanceof AbortSignalImpl); - assert(!isReadableStreamLocked(source)); - assert(!isWritableStreamLocked(dest)); - const reader = acquireReadableStreamDefaultReader(source); - const writer = acquireWritableStreamDefaultWriter(dest); - source[sym.disturbed] = true; - let shuttingDown = false; - const promise = getDeferred<void>(); - let abortAlgorithm: () => void; - if (signal) { - abortAlgorithm = (): void => { - const error = new DOMException("Abort signal received.", "AbortSignal"); - const actions: Array<() => Promise<void>> = []; - if (!preventAbort) { - actions.push(() => { - if (dest[sym.state] === "writable") { - return writableStreamAbort(dest, error); - } else { - return Promise.resolve(undefined); - } - }); - } - if (!preventCancel) { - actions.push(() => { - if (source[sym.state] === "readable") { - return readableStreamCancel(source, error); - } else { - return Promise.resolve(undefined); - } - }); - } - shutdownWithAction( - () => Promise.all(actions.map((action) => action())), - true, - error, - ); - }; - if (signal.aborted) { - abortAlgorithm(); - return promise.promise; - } - signal.addEventListener("abort", abortAlgorithm); - } - - let currentWrite = Promise.resolve(); - - // At this point, the spec becomes non-specific and vague. Most of the rest - // of this code is based on the reference implementation that is part of the - // specification. This is why the functions are only scoped to this function - // to ensure they don't leak into the spec compliant parts. - - function isOrBecomesClosed( - stream: ReadableStreamImpl | WritableStreamImpl, - promise: Promise<void>, - action: () => void, - ): void { - if (stream[sym.state] === "closed") { - action(); - } else { - setPromiseIsHandledToTrue(promise.then(action)); - } - } - - function isOrBecomesErrored( - stream: ReadableStreamImpl | WritableStreamImpl, - promise: Promise<void>, - action: (error: any) => void, - ): void { - if (stream[sym.state] === "errored") { - action(stream[sym.storedError]); - } else { - setPromiseIsHandledToTrue(promise.catch((error) => action(error))); - } - } - - function finalize(isError?: boolean, error?: any): void { - writableStreamDefaultWriterRelease(writer); - readableStreamReaderGenericRelease(reader); - - if (signal) { - signal.removeEventListener("abort", abortAlgorithm); - } - if (isError) { - promise.reject(error); - } else { - promise.resolve(); - } - } - - function waitForWritesToFinish(): Promise<void> { - const oldCurrentWrite = currentWrite; - return currentWrite.then(() => - oldCurrentWrite !== currentWrite ? waitForWritesToFinish() : undefined - ); - } - - function shutdownWithAction( - action: () => Promise<any>, - originalIsError?: boolean, - originalError?: any, - ): void { - function doTheRest(): void { - setPromiseIsHandledToTrue( - action().then( - () => finalize(originalIsError, originalError), - (newError) => finalize(true, newError), - ), - ); - } - - if (shuttingDown) { - return; - } - shuttingDown = true; - - if ( - dest[sym.state] === "writable" && - writableStreamCloseQueuedOrInFlight(dest) === false - ) { - setPromiseIsHandledToTrue(waitForWritesToFinish().then(doTheRest)); - } else { - doTheRest(); - } - } - - function shutdown(isError: boolean, error?: any): void { - if (shuttingDown) { - return; - } - shuttingDown = true; - - if ( - dest[sym.state] === "writable" && - !writableStreamCloseQueuedOrInFlight(dest) - ) { - setPromiseIsHandledToTrue( - waitForWritesToFinish().then(() => finalize(isError, error)), - ); - } - finalize(isError, error); - } - - function pipeStep(): Promise<boolean> { - if (shuttingDown) { - return Promise.resolve(true); - } - return writer[sym.readyPromise].promise.then(() => { - return readableStreamDefaultReaderRead(reader).then(({ value, done }) => { - if (done === true) { - return true; - } - currentWrite = writableStreamDefaultWriterWrite( - writer, - value!, - ).then(undefined, () => {}); - return false; - }); - }); - } - - function pipeLoop(): Promise<void> { - return new Promise((resolveLoop, rejectLoop) => { - function next(done: boolean): void { - if (done) { - resolveLoop(undefined); - } else { - setPromiseIsHandledToTrue(pipeStep().then(next, rejectLoop)); - } - } - next(false); - }); - } - - isOrBecomesErrored( - source, - reader[sym.closedPromise].promise, - (storedError) => { - if (!preventAbort) { - shutdownWithAction( - () => writableStreamAbort(dest, storedError), - true, - storedError, - ); - } else { - shutdown(true, storedError); - } - }, - ); - - isOrBecomesErrored(dest, writer[sym.closedPromise].promise, (storedError) => { - if (!preventCancel) { - shutdownWithAction( - () => readableStreamCancel(source, storedError), - true, - storedError, - ); - } else { - shutdown(true, storedError); - } - }); - - isOrBecomesClosed(source, reader[sym.closedPromise].promise, () => { - if (!preventClose) { - shutdownWithAction(() => - writableStreamDefaultWriterCloseWithErrorPropagation(writer) - ); - } - }); - - if ( - writableStreamCloseQueuedOrInFlight(dest) || - dest[sym.state] === "closed" - ) { - const destClosed = new TypeError( - "The destination writable stream closed before all data could be piped to it.", - ); - if (!preventCancel) { - shutdownWithAction( - () => readableStreamCancel(source, destClosed), - true, - destClosed, - ); - } else { - shutdown(true, destClosed); - } - } - - setPromiseIsHandledToTrue(pipeLoop()); - return promise.promise; -} - -export function readableStreamReaderGenericCancel<R = any>( - reader: ReadableStreamGenericReader<R>, - reason: any, -): Promise<void> { - const stream = reader[sym.ownerReadableStream]; - assert(stream); - return readableStreamCancel(stream, reason); -} - -export function readableStreamReaderGenericInitialize<R = any>( - reader: ReadableStreamGenericReader<R>, - stream: ReadableStreamImpl<R>, -): void { - reader[sym.forAuthorCode] = true; - reader[sym.ownerReadableStream] = stream; - stream[sym.reader] = reader; - if (stream[sym.state] === "readable") { - reader[sym.closedPromise] = getDeferred(); - } else if (stream[sym.state] === "closed") { - reader[sym.closedPromise] = { promise: Promise.resolve() }; - } else { - assert(stream[sym.state] === "errored"); - reader[sym.closedPromise] = { - promise: Promise.reject(stream[sym.storedError]), - }; - setPromiseIsHandledToTrue(reader[sym.closedPromise].promise); - } -} - -export function readableStreamReaderGenericRelease<R = any>( - reader: ReadableStreamGenericReader<R>, -): void { - assert(reader[sym.ownerReadableStream]); - assert(reader[sym.ownerReadableStream][sym.reader] === reader); - const closedPromise = reader[sym.closedPromise]; - if (reader[sym.ownerReadableStream][sym.state] === "readable") { - assert(closedPromise.reject); - closedPromise.reject(new TypeError("ReadableStream state is readable.")); - } else { - closedPromise.promise = Promise.reject(new TypeError("Reading is closed.")); - delete closedPromise.reject; - delete closedPromise.resolve; - } - setPromiseIsHandledToTrue(closedPromise.promise); - reader[sym.ownerReadableStream][sym.reader] = undefined; - (reader as any)[sym.ownerReadableStream] = undefined; -} - -export function readableStreamTee<T>( - stream: ReadableStreamImpl<T>, - cloneForBranch2: boolean, -): [ReadableStreamImpl<T>, ReadableStreamImpl<T>] { - assert(isReadableStream(stream)); - assert(typeof cloneForBranch2 === "boolean"); - const reader = acquireReadableStreamDefaultReader(stream); - let reading = false; - let canceled1 = false; - let canceled2 = false; - let reason1: any = undefined; - let reason2: any = undefined; - /* eslint-disable prefer-const */ - let branch1: ReadableStreamImpl<T>; - let branch2: ReadableStreamImpl<T>; - /* eslint-enable prefer-const */ - const cancelPromise = getDeferred<void>(); - const pullAlgorithm = (): PromiseLike<void> => { - if (reading) { - return Promise.resolve(); - } - reading = true; - const readPromise = readableStreamDefaultReaderRead(reader).then( - (result) => { - reading = false; - assert(typeof result === "object"); - const { done } = result; - assert(typeof done === "boolean"); - if (done) { - if (!canceled1) { - readableStreamDefaultControllerClose( - branch1[ - sym.readableStreamController - ] as ReadableStreamDefaultControllerImpl, - ); - } - if (!canceled2) { - readableStreamDefaultControllerClose( - branch2[ - sym.readableStreamController - ] as ReadableStreamDefaultControllerImpl, - ); - } - return; - } - const { value } = result; - const value1 = value!; - let value2 = value!; - if (!canceled2 && cloneForBranch2) { - value2 = cloneValue(value2); - } - if (!canceled1) { - readableStreamDefaultControllerEnqueue( - branch1[ - sym.readableStreamController - ] as ReadableStreamDefaultControllerImpl, - value1, - ); - } - if (!canceled2) { - readableStreamDefaultControllerEnqueue( - branch2[ - sym.readableStreamController - ] as ReadableStreamDefaultControllerImpl, - value2, - ); - } - }, - ); - setPromiseIsHandledToTrue(readPromise); - return Promise.resolve(); - }; - const cancel1Algorithm = (reason?: any): PromiseLike<void> => { - canceled1 = true; - reason1 = reason; - if (canceled2) { - const compositeReason = [reason1, reason2]; - const cancelResult = readableStreamCancel(stream, compositeReason); - cancelPromise.resolve(cancelResult); - } - return cancelPromise.promise; - }; - const cancel2Algorithm = (reason?: any): PromiseLike<void> => { - canceled2 = true; - reason2 = reason; - if (canceled1) { - const compositeReason = [reason1, reason2]; - const cancelResult = readableStreamCancel(stream, compositeReason); - cancelPromise.resolve(cancelResult); - } - return cancelPromise.promise; - }; - const startAlgorithm = (): void => undefined; - branch1 = createReadableStream( - startAlgorithm, - pullAlgorithm, - cancel1Algorithm, - ); - branch2 = createReadableStream( - startAlgorithm, - pullAlgorithm, - cancel2Algorithm, - ); - setPromiseIsHandledToTrue( - reader[sym.closedPromise].promise.catch((r) => { - readableStreamDefaultControllerError( - branch1[ - sym.readableStreamController - ] as ReadableStreamDefaultControllerImpl, - r, - ); - readableStreamDefaultControllerError( - branch2[ - sym.readableStreamController - ] as ReadableStreamDefaultControllerImpl, - r, - ); - }), - ); - return [branch1, branch2]; -} - -export function resetQueue<R>(container: Container<R>): void { - assert(sym.queue in container && sym.queueTotalSize in container); - container[sym.queue] = []; - container[sym.queueTotalSize] = 0; -} - -/** An internal function which mimics the behavior of setting the promise to - * handled in JavaScript. In this situation, an assertion failure, which - * shouldn't happen will get thrown, instead of swallowed. */ -export function setPromiseIsHandledToTrue(promise: PromiseLike<unknown>): void { - promise.then(undefined, (e) => { - if (e && e instanceof AssertionError) { - queueMicrotask(() => { - throw e; - }); - } - }); -} - -function setUpReadableByteStreamController( - stream: ReadableStreamImpl, - controller: ReadableByteStreamControllerImpl, - startAlgorithm: StartAlgorithm, - pullAlgorithm: PullAlgorithm, - cancelAlgorithm: CancelAlgorithm, - highWaterMark: number, - autoAllocateChunkSize: number | undefined, -): void { - assert(stream[sym.readableStreamController] === undefined); - if (autoAllocateChunkSize !== undefined) { - assert(Number.isInteger(autoAllocateChunkSize)); - assert(autoAllocateChunkSize >= 0); - } - controller[sym.controlledReadableByteStream] = stream; - controller[sym.pulling] = controller[sym.pullAgain] = false; - controller[sym.byobRequest] = undefined; - controller[sym.queue] = []; - controller[sym.queueTotalSize] = 0; - controller[sym.closeRequested] = controller[sym.started] = false; - controller[sym.strategyHWM] = validateAndNormalizeHighWaterMark( - highWaterMark, - ); - controller[sym.pullAlgorithm] = pullAlgorithm; - controller[sym.cancelAlgorithm] = cancelAlgorithm; - controller[sym.autoAllocateChunkSize] = autoAllocateChunkSize; - // 3.13.26.12 Set controller.[[pendingPullIntos]] to a new empty List. - stream[sym.readableStreamController] = controller; - const startResult = startAlgorithm(); - const startPromise = Promise.resolve(startResult); - setPromiseIsHandledToTrue( - startPromise.then( - () => { - controller[sym.started] = true; - assert(!controller[sym.pulling]); - assert(!controller[sym.pullAgain]); - readableByteStreamControllerCallPullIfNeeded(controller); - }, - (r) => { - readableByteStreamControllerError(controller, r); - }, - ), - ); -} - -export function setUpReadableByteStreamControllerFromUnderlyingSource( - stream: ReadableStreamImpl, - underlyingByteSource: UnderlyingByteSource, - highWaterMark: number, -): void { - assert(underlyingByteSource); - const controller: ReadableByteStreamControllerImpl = Object.create( - ReadableByteStreamControllerImpl.prototype, - ); - const startAlgorithm: StartAlgorithm = () => { - return invokeOrNoop(underlyingByteSource, "start", controller); - }; - const pullAlgorithm = createAlgorithmFromUnderlyingMethod( - underlyingByteSource, - "pull", - 0, - controller, - ); - setFunctionName(pullAlgorithm, "[[pullAlgorithm]]"); - const cancelAlgorithm = createAlgorithmFromUnderlyingMethod( - underlyingByteSource, - "cancel", - 1, - ); - setFunctionName(cancelAlgorithm, "[[cancelAlgorithm]]"); - // 3.13.27.6 Let autoAllocateChunkSize be ? GetV(underlyingByteSource, "autoAllocateChunkSize"). - const autoAllocateChunkSize = undefined; - setUpReadableByteStreamController( - stream, - controller, - startAlgorithm, - pullAlgorithm, - cancelAlgorithm, - highWaterMark, - autoAllocateChunkSize, - ); -} - -function setUpReadableStreamDefaultController<T>( - stream: ReadableStreamImpl<T>, - controller: ReadableStreamDefaultControllerImpl<T>, - startAlgorithm: StartAlgorithm, - pullAlgorithm: PullAlgorithm, - cancelAlgorithm: CancelAlgorithm, - highWaterMark: number, - sizeAlgorithm: SizeAlgorithm<T>, -): void { - assert(stream[sym.readableStreamController] === undefined); - controller[sym.controlledReadableStream] = stream; - controller[sym.queue] = []; - controller[sym.queueTotalSize] = 0; - controller[sym.started] = controller[sym.closeRequested] = controller[ - sym.pullAgain - ] = controller[sym.pulling] = false; - controller[sym.strategySizeAlgorithm] = sizeAlgorithm; - controller[sym.strategyHWM] = highWaterMark; - controller[sym.pullAlgorithm] = pullAlgorithm; - controller[sym.cancelAlgorithm] = cancelAlgorithm; - stream[sym.readableStreamController] = controller; - const startResult = startAlgorithm(); - const startPromise = Promise.resolve(startResult); - setPromiseIsHandledToTrue( - startPromise.then( - () => { - controller[sym.started] = true; - assert(controller[sym.pulling] === false); - assert(controller[sym.pullAgain] === false); - readableStreamDefaultControllerCallPullIfNeeded(controller); - }, - (r) => { - readableStreamDefaultControllerError(controller, r); - }, - ), - ); -} - -export function setUpReadableStreamDefaultControllerFromUnderlyingSource<T>( - stream: ReadableStreamImpl<T>, - underlyingSource: UnderlyingSource<T>, - highWaterMark: number, - sizeAlgorithm: SizeAlgorithm<T>, -): void { - assert(underlyingSource); - const controller: ReadableStreamDefaultControllerImpl<T> = Object.create( - ReadableStreamDefaultControllerImpl.prototype, - ); - const startAlgorithm: StartAlgorithm = (): void | PromiseLike<void> => - invokeOrNoop(underlyingSource, "start", controller); - const pullAlgorithm: PullAlgorithm = createAlgorithmFromUnderlyingMethod( - underlyingSource, - "pull", - 0, - controller, - ); - setFunctionName(pullAlgorithm, "[[pullAlgorithm]]"); - const cancelAlgorithm: CancelAlgorithm = createAlgorithmFromUnderlyingMethod( - underlyingSource, - "cancel", - 1, - ); - setFunctionName(cancelAlgorithm, "[[cancelAlgorithm]]"); - setUpReadableStreamDefaultController( - stream, - controller, - startAlgorithm, - pullAlgorithm, - cancelAlgorithm, - highWaterMark, - sizeAlgorithm, - ); -} - -function setUpTransformStreamDefaultController<I, O>( - stream: TransformStreamImpl<I, O>, - controller: TransformStreamDefaultControllerImpl<I, O>, - transformAlgorithm: TransformAlgorithm<I>, - flushAlgorithm: FlushAlgorithm, -): void { - assert(isTransformStream(stream)); - assert(stream[sym.transformStreamController] === undefined); - controller[sym.controlledTransformStream] = stream; - stream[sym.transformStreamController] = controller; - controller[sym.transformAlgorithm] = transformAlgorithm; - controller[sym.flushAlgorithm] = flushAlgorithm; -} - -export function setUpTransformStreamDefaultControllerFromTransformer<I, O>( - stream: TransformStreamImpl<I, O>, - transformer: Transformer<I, O>, -): void { - assert(transformer); - const controller = Object.create( - TransformStreamDefaultControllerImpl.prototype, - ) as TransformStreamDefaultControllerImpl<I, O>; - let transformAlgorithm: TransformAlgorithm<I> = (chunk) => { - try { - transformStreamDefaultControllerEnqueue( - controller, - // it defaults to no tranformation, so I is assumed to be O - (chunk as unknown) as O, - ); - } catch (e) { - return Promise.reject(e); - } - return Promise.resolve(); - }; - const transformMethod = transformer.transform; - if (transformMethod) { - if (typeof transformMethod !== "function") { - throw new TypeError("tranformer.transform must be callable."); - } - transformAlgorithm = async (chunk): Promise<void> => - call(transformMethod, transformer, [chunk, controller]); - } - const flushAlgorithm = createAlgorithmFromUnderlyingMethod( - transformer, - "flush", - 0, - controller, - ); - setUpTransformStreamDefaultController( - stream, - controller, - transformAlgorithm, - flushAlgorithm, - ); -} - -function setUpWritableStreamDefaultController<W>( - stream: WritableStreamImpl<W>, - controller: WritableStreamDefaultControllerImpl<W>, - startAlgorithm: StartAlgorithm, - writeAlgorithm: WriteAlgorithm<W>, - closeAlgorithm: CloseAlgorithm, - abortAlgorithm: AbortAlgorithm, - highWaterMark: number, - sizeAlgorithm: SizeAlgorithm<W>, -): void { - assert(isWritableStream(stream)); - assert(stream[sym.writableStreamController] === undefined); - controller[sym.controlledWritableStream] = stream; - stream[sym.writableStreamController] = controller; - controller[sym.queue] = []; - controller[sym.queueTotalSize] = 0; - controller[sym.started] = false; - controller[sym.strategySizeAlgorithm] = sizeAlgorithm; - controller[sym.strategyHWM] = highWaterMark; - controller[sym.writeAlgorithm] = writeAlgorithm; - controller[sym.closeAlgorithm] = closeAlgorithm; - controller[sym.abortAlgorithm] = abortAlgorithm; - const backpressure = writableStreamDefaultControllerGetBackpressure( - controller, - ); - writableStreamUpdateBackpressure(stream, backpressure); - const startResult = startAlgorithm(); - const startPromise = Promise.resolve(startResult); - setPromiseIsHandledToTrue( - startPromise.then( - () => { - assert( - stream[sym.state] === "writable" || stream[sym.state] === "erroring", - ); - controller[sym.started] = true; - writableStreamDefaultControllerAdvanceQueueIfNeeded(controller); - }, - (r) => { - assert( - stream[sym.state] === "writable" || stream[sym.state] === "erroring", - ); - controller[sym.started] = true; - writableStreamDealWithRejection(stream, r); - }, - ), - ); -} - -export function setUpWritableStreamDefaultControllerFromUnderlyingSink<W>( - stream: WritableStreamImpl<W>, - underlyingSink: UnderlyingSink<W>, - highWaterMark: number, - sizeAlgorithm: SizeAlgorithm<W>, -): void { - assert(underlyingSink); - const controller = Object.create( - WritableStreamDefaultControllerImpl.prototype, - ); - const startAlgorithm = (): void | PromiseLike<void> => { - return invokeOrNoop(underlyingSink, "start", controller); - }; - const writeAlgorithm = createAlgorithmFromUnderlyingMethod( - underlyingSink, - "write", - 1, - controller, - ); - setFunctionName(writeAlgorithm, "[[writeAlgorithm]]"); - const closeAlgorithm = createAlgorithmFromUnderlyingMethod( - underlyingSink, - "close", - 0, - ); - setFunctionName(closeAlgorithm, "[[closeAlgorithm]]"); - const abortAlgorithm = createAlgorithmFromUnderlyingMethod( - underlyingSink, - "abort", - 1, - ); - setFunctionName(abortAlgorithm, "[[abortAlgorithm]]"); - setUpWritableStreamDefaultController( - stream, - controller, - startAlgorithm, - writeAlgorithm, - closeAlgorithm, - abortAlgorithm, - highWaterMark, - sizeAlgorithm, - ); -} - -function transformStreamDefaultControllerClearAlgorithms<I, O>( - controller: TransformStreamDefaultControllerImpl<I, O>, -): void { - (controller as any)[sym.transformAlgorithm] = undefined; - (controller as any)[sym.flushAlgorithm] = undefined; -} - -export function transformStreamDefaultControllerEnqueue<I, O>( - controller: TransformStreamDefaultControllerImpl<I, O>, - chunk: O, -): void { - const stream = controller[sym.controlledTransformStream]; - const readableController = stream[sym.readable][ - sym.readableStreamController - ] as ReadableStreamDefaultControllerImpl<O>; - if (!readableStreamDefaultControllerCanCloseOrEnqueue(readableController)) { - throw new TypeError( - "TransformStream's readable controller cannot be closed or enqueued.", - ); - } - try { - readableStreamDefaultControllerEnqueue(readableController, chunk); - } catch (e) { - transformStreamErrorWritableAndUnblockWrite(stream, e); - throw stream[sym.readable][sym.storedError]; - } - const backpressure = readableStreamDefaultControllerHasBackpressure( - readableController, - ); - if (backpressure) { - transformStreamSetBackpressure(stream, true); - } -} - -export function transformStreamDefaultControllerError<I, O>( - controller: TransformStreamDefaultControllerImpl<I, O>, - e: any, -): void { - transformStreamError(controller[sym.controlledTransformStream], e); -} - -function transformStreamDefaultControllerPerformTransform<I, O>( - controller: TransformStreamDefaultControllerImpl<I, O>, - chunk: I, -): Promise<void> { - const transformPromise = controller[sym.transformAlgorithm](chunk); - return transformPromise.then(undefined, (r) => { - transformStreamError(controller[sym.controlledTransformStream], r); - throw r; - }); -} - -function transformStreamDefaultSinkAbortAlgorithm<I, O>( - stream: TransformStreamImpl<I, O>, - reason: any, -): Promise<void> { - transformStreamError(stream, reason); - return Promise.resolve(undefined); -} - -function transformStreamDefaultSinkCloseAlgorithm<I, O>( - stream: TransformStreamImpl<I, O>, -): Promise<void> { - const readable = stream[sym.readable]; - const controller = stream[sym.transformStreamController]; - const flushPromise = controller[sym.flushAlgorithm](); - transformStreamDefaultControllerClearAlgorithms(controller); - return flushPromise.then( - () => { - if (readable[sym.state] === "errored") { - throw readable[sym.storedError]; - } - const readableController = readable[ - sym.readableStreamController - ] as ReadableStreamDefaultControllerImpl<O>; - if ( - readableStreamDefaultControllerCanCloseOrEnqueue(readableController) - ) { - readableStreamDefaultControllerClose(readableController); - } - }, - (r) => { - transformStreamError(stream, r); - throw readable[sym.storedError]; - }, - ); -} - -function transformStreamDefaultSinkWriteAlgorithm<I, O>( - stream: TransformStreamImpl<I, O>, - chunk: I, -): Promise<void> { - assert(stream[sym.writable][sym.state] === "writable"); - const controller = stream[sym.transformStreamController]; - if (stream[sym.backpressure]) { - const backpressureChangePromise = stream[sym.backpressureChangePromise]; - assert(backpressureChangePromise); - return backpressureChangePromise.promise.then(() => { - const writable = stream[sym.writable]; - const state = writable[sym.state]; - if (state === "erroring") { - throw writable[sym.storedError]; - } - assert(state === "writable"); - return transformStreamDefaultControllerPerformTransform( - controller, - chunk, - ); - }); - } - return transformStreamDefaultControllerPerformTransform(controller, chunk); -} - -function transformStreamDefaultSourcePullAlgorithm<I, O>( - stream: TransformStreamImpl<I, O>, -): Promise<void> { - assert(stream[sym.backpressure] === true); - assert(stream[sym.backpressureChangePromise] !== undefined); - transformStreamSetBackpressure(stream, false); - return stream[sym.backpressureChangePromise]!.promise; -} - -function transformStreamError<I, O>( - stream: TransformStreamImpl<I, O>, - e: any, -): void { - readableStreamDefaultControllerError( - stream[sym.readable][ - sym.readableStreamController - ] as ReadableStreamDefaultControllerImpl<O>, - e, - ); - transformStreamErrorWritableAndUnblockWrite(stream, e); -} - -export function transformStreamDefaultControllerTerminate<I, O>( - controller: TransformStreamDefaultControllerImpl<I, O>, -): void { - const stream = controller[sym.controlledTransformStream]; - const readableController = stream[sym.readable][ - sym.readableStreamController - ] as ReadableStreamDefaultControllerImpl<O>; - readableStreamDefaultControllerClose(readableController); - const error = new TypeError("TransformStream is closed."); - transformStreamErrorWritableAndUnblockWrite(stream, error); -} - -function transformStreamErrorWritableAndUnblockWrite<I, O>( - stream: TransformStreamImpl<I, O>, - e: any, -): void { - transformStreamDefaultControllerClearAlgorithms( - stream[sym.transformStreamController], - ); - writableStreamDefaultControllerErrorIfNeeded( - stream[sym.writable][sym.writableStreamController]!, - e, - ); - if (stream[sym.backpressure]) { - transformStreamSetBackpressure(stream, false); - } -} - -function transformStreamSetBackpressure<I, O>( - stream: TransformStreamImpl<I, O>, - backpressure: boolean, -): void { - assert(stream[sym.backpressure] !== backpressure); - if (stream[sym.backpressureChangePromise] !== undefined) { - stream[sym.backpressureChangePromise]!.resolve!(undefined); - } - stream[sym.backpressureChangePromise] = getDeferred<void>(); - stream[sym.backpressure] = backpressure; -} - -function transferArrayBuffer(buffer: ArrayBuffer): ArrayBuffer { - assert(!isDetachedBuffer(buffer)); - const transferredIshVersion = buffer.slice(0); - - Object.defineProperty(buffer, "byteLength", { - get(): number { - return 0; - }, - }); - (buffer as any)[sym.isFakeDetached] = true; - - return transferredIshVersion; -} - -export function validateAndNormalizeHighWaterMark( - highWaterMark: number, -): number { - highWaterMark = Number(highWaterMark); - if (Number.isNaN(highWaterMark) || highWaterMark < 0) { - throw new RangeError( - `highWaterMark must be a positive number or Infinity. Received: ${highWaterMark}.`, - ); - } - return highWaterMark; -} - -export function writableStreamAbort<W>( - stream: WritableStreamImpl<W>, - reason: any, -): Promise<void> { - const state = stream[sym.state]; - if (state === "closed" || state === "errored") { - return Promise.resolve(undefined); - } - if (stream[sym.pendingAbortRequest]) { - return stream[sym.pendingAbortRequest]!.promise.promise; - } - assert(state === "writable" || state === "erroring"); - let wasAlreadyErroring = false; - if (state === "erroring") { - wasAlreadyErroring = true; - reason = undefined; - } - const promise = getDeferred<void>(); - stream[sym.pendingAbortRequest] = { promise, reason, wasAlreadyErroring }; - - if (wasAlreadyErroring === false) { - writableStreamStartErroring(stream, reason); - } - return promise.promise; -} - -function writableStreamAddWriteRequest<W>( - stream: WritableStreamImpl<W>, -): Promise<void> { - assert(isWritableStream(stream)); - assert(stream[sym.state] === "writable"); - const promise = getDeferred<void>(); - stream[sym.writeRequests].push(promise); - return promise.promise; -} - -export function writableStreamClose<W>( - stream: WritableStreamImpl<W>, -): Promise<void> { - const state = stream[sym.state]; - if (state === "closed" || state === "errored") { - return Promise.reject( - new TypeError( - "Cannot close an already closed or errored WritableStream.", - ), - ); - } - assert(!writableStreamCloseQueuedOrInFlight(stream)); - const promise = getDeferred<void>(); - stream[sym.closeRequest] = promise; - const writer = stream[sym.writer]; - if (writer && stream[sym.backpressure] && state === "writable") { - writer[sym.readyPromise].resolve!(); - writer[sym.readyPromise].resolve = undefined; - writer[sym.readyPromise].reject = undefined; - } - writableStreamDefaultControllerClose(stream[sym.writableStreamController]!); - return promise.promise; -} - -export function writableStreamCloseQueuedOrInFlight<W>( - stream: WritableStreamImpl<W>, -): boolean { - return !( - stream[sym.closeRequest] === undefined && - stream[sym.inFlightCloseRequest] === undefined - ); -} - -function writableStreamDealWithRejection<W>( - stream: WritableStreamImpl<W>, - error: any, -): void { - const state = stream[sym.state]; - if (state === "writable") { - writableStreamStartErroring(stream, error); - return; - } - assert(state === "erroring"); - writableStreamFinishErroring(stream); -} - -function writableStreamDefaultControllerAdvanceQueueIfNeeded<W>( - controller: WritableStreamDefaultControllerImpl<W>, -): void { - const stream = controller[sym.controlledWritableStream]; - if (!controller[sym.started]) { - return; - } - if (stream[sym.inFlightWriteRequest]) { - return; - } - const state = stream[sym.state]; - assert(state !== "closed" && state !== "errored"); - if (state === "erroring") { - writableStreamFinishErroring(stream); - return; - } - if (!controller[sym.queue].length) { - return; - } - const writeRecord = peekQueueValue(controller); - if (writeRecord === "close") { - writableStreamDefaultControllerProcessClose(controller); - } else { - writableStreamDefaultControllerProcessWrite(controller, writeRecord.chunk); - } -} - -export function writableStreamDefaultControllerClearAlgorithms<W>( - controller: WritableStreamDefaultControllerImpl<W>, -): void { - (controller as any)[sym.writeAlgorithm] = undefined; - (controller as any)[sym.closeAlgorithm] = undefined; - (controller as any)[sym.abortAlgorithm] = undefined; - (controller as any)[sym.strategySizeAlgorithm] = undefined; -} - -function writableStreamDefaultControllerClose<W>( - controller: WritableStreamDefaultControllerImpl<W>, -): void { - enqueueValueWithSize(controller, "close", 0); - writableStreamDefaultControllerAdvanceQueueIfNeeded(controller); -} - -export function writableStreamDefaultControllerError<W>( - controller: WritableStreamDefaultControllerImpl<W>, - error: any, -): void { - const stream = controller[sym.controlledWritableStream]; - assert(stream[sym.state] === "writable"); - writableStreamDefaultControllerClearAlgorithms(controller); - writableStreamStartErroring(stream, error); -} - -function writableStreamDefaultControllerErrorIfNeeded<W>( - controller: WritableStreamDefaultControllerImpl<W>, - error: any, -): void { - if (controller[sym.controlledWritableStream][sym.state] === "writable") { - writableStreamDefaultControllerError(controller, error); - } -} - -function writableStreamDefaultControllerGetBackpressure<W>( - controller: WritableStreamDefaultControllerImpl<W>, -): boolean { - const desiredSize = writableStreamDefaultControllerGetDesiredSize(controller); - return desiredSize <= 0; -} - -function writableStreamDefaultControllerGetChunkSize<W>( - controller: WritableStreamDefaultControllerImpl<W>, - chunk: W, -): number { - let returnValue: number; - try { - returnValue = controller[sym.strategySizeAlgorithm](chunk); - } catch (e) { - writableStreamDefaultControllerErrorIfNeeded(controller, e); - return 1; - } - return returnValue; -} - -function writableStreamDefaultControllerGetDesiredSize<W>( - controller: WritableStreamDefaultControllerImpl<W>, -): number { - return controller[sym.strategyHWM] - controller[sym.queueTotalSize]; -} - -function writableStreamDefaultControllerProcessClose<W>( - controller: WritableStreamDefaultControllerImpl<W>, -): void { - const stream = controller[sym.controlledWritableStream]; - writableStreamMarkCloseRequestInFlight(stream); - dequeueValue(controller); - assert(controller[sym.queue].length === 0); - const sinkClosePromise = controller[sym.closeAlgorithm](); - writableStreamDefaultControllerClearAlgorithms(controller); - setPromiseIsHandledToTrue( - sinkClosePromise.then( - () => { - writableStreamFinishInFlightClose(stream); - }, - (reason) => { - writableStreamFinishInFlightCloseWithError(stream, reason); - }, - ), - ); -} - -function writableStreamDefaultControllerProcessWrite<W>( - controller: WritableStreamDefaultControllerImpl<W>, - chunk: W, -): void { - const stream = controller[sym.controlledWritableStream]; - writableStreamMarkFirstWriteRequestInFlight(stream); - const sinkWritePromise = controller[sym.writeAlgorithm](chunk); - setPromiseIsHandledToTrue( - sinkWritePromise.then( - () => { - writableStreamFinishInFlightWrite(stream); - const state = stream[sym.state]; - assert(state === "writable" || state === "erroring"); - dequeueValue(controller); - if ( - !writableStreamCloseQueuedOrInFlight(stream) && - state === "writable" - ) { - const backpressure = writableStreamDefaultControllerGetBackpressure( - controller, - ); - writableStreamUpdateBackpressure(stream, backpressure); - } - writableStreamDefaultControllerAdvanceQueueIfNeeded(controller); - }, - (reason) => { - if (stream[sym.state] === "writable") { - writableStreamDefaultControllerClearAlgorithms(controller); - } - writableStreamFinishInFlightWriteWithError(stream, reason); - }, - ), - ); -} - -function writableStreamDefaultControllerWrite<W>( - controller: WritableStreamDefaultControllerImpl<W>, - chunk: W, - chunkSize: number, -): void { - const writeRecord = { chunk }; - try { - enqueueValueWithSize(controller, writeRecord, chunkSize); - } catch (e) { - writableStreamDefaultControllerErrorIfNeeded(controller, e); - return; - } - const stream = controller[sym.controlledWritableStream]; - if ( - !writableStreamCloseQueuedOrInFlight(stream) && - stream[sym.state] === "writable" - ) { - const backpressure = writableStreamDefaultControllerGetBackpressure( - controller, - ); - writableStreamUpdateBackpressure(stream, backpressure); - } - writableStreamDefaultControllerAdvanceQueueIfNeeded(controller); -} - -export function writableStreamDefaultWriterAbort<W>( - writer: WritableStreamDefaultWriterImpl<W>, - reason: any, -): Promise<void> { - const stream = writer[sym.ownerWritableStream]; - assert(stream); - return writableStreamAbort(stream, reason); -} - -export function writableStreamDefaultWriterClose<W>( - writer: WritableStreamDefaultWriterImpl<W>, -): Promise<void> { - const stream = writer[sym.ownerWritableStream]; - assert(stream); - return writableStreamClose(stream); -} - -function writableStreamDefaultWriterCloseWithErrorPropagation<W>( - writer: WritableStreamDefaultWriterImpl<W>, -): Promise<void> { - const stream = writer[sym.ownerWritableStream]; - assert(stream); - const state = stream[sym.state]; - if (writableStreamCloseQueuedOrInFlight(stream) || state === "closed") { - return Promise.resolve(); - } - if (state === "errored") { - return Promise.reject(stream[sym.storedError]); - } - assert(state === "writable" || state === "erroring"); - return writableStreamDefaultWriterClose(writer); -} - -function writableStreamDefaultWriterEnsureClosePromiseRejected<W>( - writer: WritableStreamDefaultWriterImpl<W>, - error: any, -): void { - if (writer[sym.closedPromise].reject) { - writer[sym.closedPromise].reject!(error); - } else { - writer[sym.closedPromise] = { - promise: Promise.reject(error), - }; - } - setPromiseIsHandledToTrue(writer[sym.closedPromise].promise); -} - -function writableStreamDefaultWriterEnsureReadyPromiseRejected<W>( - writer: WritableStreamDefaultWriterImpl<W>, - error: any, -): void { - if (writer[sym.readyPromise].reject) { - writer[sym.readyPromise].reject!(error); - writer[sym.readyPromise].reject = undefined; - writer[sym.readyPromise].resolve = undefined; - } else { - writer[sym.readyPromise] = { - promise: Promise.reject(error), - }; - } - setPromiseIsHandledToTrue(writer[sym.readyPromise].promise); -} - -export function writableStreamDefaultWriterWrite<W>( - writer: WritableStreamDefaultWriterImpl<W>, - chunk: W, -): Promise<void> { - const stream = writer[sym.ownerWritableStream]; - assert(stream); - const controller = stream[sym.writableStreamController]; - assert(controller); - const chunkSize = writableStreamDefaultControllerGetChunkSize( - controller, - chunk, - ); - if (stream !== writer[sym.ownerWritableStream]) { - return Promise.reject("Writer has incorrect WritableStream."); - } - const state = stream[sym.state]; - if (state === "errored") { - return Promise.reject(stream[sym.storedError]); - } - if (writableStreamCloseQueuedOrInFlight(stream) || state === "closed") { - return Promise.reject(new TypeError("The stream is closed or closing.")); - } - if (state === "erroring") { - return Promise.reject(stream[sym.storedError]); - } - assert(state === "writable"); - const promise = writableStreamAddWriteRequest(stream); - writableStreamDefaultControllerWrite(controller, chunk, chunkSize); - return promise; -} - -export function writableStreamDefaultWriterGetDesiredSize<W>( - writer: WritableStreamDefaultWriterImpl<W>, -): number | null { - const stream = writer[sym.ownerWritableStream]; - const state = stream[sym.state]; - if (state === "errored" || state === "erroring") { - return null; - } - if (state === "closed") { - return 0; - } - return writableStreamDefaultControllerGetDesiredSize( - stream[sym.writableStreamController]!, - ); -} - -export function writableStreamDefaultWriterRelease<W>( - writer: WritableStreamDefaultWriterImpl<W>, -): void { - const stream = writer[sym.ownerWritableStream]; - assert(stream); - assert(stream[sym.writer] === writer); - const releasedError = new TypeError( - "Writer was released and can no longer be used to monitor the stream's closedness.", - ); - writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, releasedError); - writableStreamDefaultWriterEnsureClosePromiseRejected(writer, releasedError); - stream[sym.writer] = undefined; - (writer as any)[sym.ownerWritableStream] = undefined; -} - -function writableStreamFinishErroring<W>(stream: WritableStreamImpl<W>): void { - assert(stream[sym.state] === "erroring"); - assert(!writableStreamHasOperationMarkedInFlight(stream)); - stream[sym.state] = "errored"; - stream[sym.writableStreamController]![sym.errorSteps](); - const storedError = stream[sym.storedError]; - for (const writeRequest of stream[sym.writeRequests]) { - assert(writeRequest.reject); - writeRequest.reject(storedError); - } - stream[sym.writeRequests] = []; - if (!stream[sym.pendingAbortRequest]) { - writableStreamRejectCloseAndClosedPromiseIfNeeded(stream); - return; - } - const abortRequest = stream[sym.pendingAbortRequest]; - assert(abortRequest); - stream[sym.pendingAbortRequest] = undefined; - if (abortRequest.wasAlreadyErroring) { - assert(abortRequest.promise.reject); - abortRequest.promise.reject(storedError); - writableStreamRejectCloseAndClosedPromiseIfNeeded(stream); - return; - } - const promise = stream[sym.writableStreamController]; - setPromiseIsHandledToTrue( - promise.then( - () => { - assert(abortRequest.promise.resolve); - abortRequest.promise.resolve(); - writableStreamRejectCloseAndClosedPromiseIfNeeded(stream); - }, - (reason) => { - assert(abortRequest.promise.reject); - abortRequest.promise.reject(reason); - writableStreamRejectCloseAndClosedPromiseIfNeeded(stream); - }, - ), - ); -} - -function writableStreamFinishInFlightClose<W>( - stream: WritableStreamImpl<W>, -): void { - assert(stream[sym.inFlightCloseRequest]); - stream[sym.inFlightCloseRequest]?.resolve!(); - stream[sym.inFlightCloseRequest] = undefined; - const state = stream[sym.state]; - assert(state === "writable" || state === "erroring"); - if (state === "erroring") { - stream[sym.storedError] = undefined; - if (stream[sym.pendingAbortRequest]) { - stream[sym.pendingAbortRequest]!.promise.resolve!(); - stream[sym.pendingAbortRequest] = undefined; - } - } - stream[sym.state] = "closed"; - const writer = stream[sym.writer]; - if (writer) { - writer[sym.closedPromise].resolve!(); - } - assert(stream[sym.pendingAbortRequest] === undefined); - assert(stream[sym.storedError] === undefined); -} - -function writableStreamFinishInFlightCloseWithError<W>( - stream: WritableStreamImpl<W>, - error: any, -): void { - assert(stream[sym.inFlightCloseRequest]); - stream[sym.inFlightCloseRequest]?.reject!(error); - stream[sym.inFlightCloseRequest] = undefined; - assert(stream[sym.state] === "writable" || stream[sym.state] === "erroring"); - if (stream[sym.pendingAbortRequest]) { - stream[sym.pendingAbortRequest]?.promise.reject!(error); - stream[sym.pendingAbortRequest] = undefined; - } - writableStreamDealWithRejection(stream, error); -} - -function writableStreamFinishInFlightWrite<W>( - stream: WritableStreamImpl<W>, -): void { - assert(stream[sym.inFlightWriteRequest]); - stream[sym.inFlightWriteRequest]!.resolve(); - stream[sym.inFlightWriteRequest] = undefined; -} - -function writableStreamFinishInFlightWriteWithError<W>( - stream: WritableStreamImpl<W>, - error: any, -): void { - assert(stream[sym.inFlightWriteRequest]); - stream[sym.inFlightWriteRequest]!.reject!(error); - stream[sym.inFlightWriteRequest] = undefined; - assert(stream[sym.state] === "writable" || stream[sym.state] === "erroring"); - writableStreamDealWithRejection(stream, error); -} - -function writableStreamHasOperationMarkedInFlight<W>( - stream: WritableStreamImpl<W>, -): boolean { - return !( - stream[sym.inFlightWriteRequest] === undefined && - stream[sym.inFlightCloseRequest] === undefined - ); -} - -function writableStreamMarkCloseRequestInFlight<W>( - stream: WritableStreamImpl<W>, -): void { - assert(stream[sym.inFlightCloseRequest] === undefined); - assert(stream[sym.closeRequest] !== undefined); - stream[sym.inFlightCloseRequest] = stream[sym.closeRequest]; - stream[sym.closeRequest] = undefined; -} - -function writableStreamMarkFirstWriteRequestInFlight<W>( - stream: WritableStreamImpl<W>, -): void { - assert(stream[sym.inFlightWriteRequest] === undefined); - assert(stream[sym.writeRequests].length); - const writeRequest = stream[sym.writeRequests].shift(); - stream[sym.inFlightWriteRequest] = writeRequest; -} - -function writableStreamRejectCloseAndClosedPromiseIfNeeded<W>( - stream: WritableStreamImpl<W>, -): void { - assert(stream[sym.state] === "errored"); - if (stream[sym.closeRequest]) { - assert(stream[sym.inFlightCloseRequest] === undefined); - stream[sym.closeRequest]!.reject!(stream[sym.storedError]); - stream[sym.closeRequest] = undefined; - } - const writer = stream[sym.writer]; - if (writer) { - writer[sym.closedPromise].reject!(stream[sym.storedError]); - setPromiseIsHandledToTrue(writer[sym.closedPromise].promise); - } -} - -function writableStreamStartErroring<W>( - stream: WritableStreamImpl<W>, - reason: any, -): void { - assert(stream[sym.storedError] === undefined); - assert(stream[sym.state] === "writable"); - const controller = stream[sym.writableStreamController]; - assert(controller); - stream[sym.state] = "erroring"; - stream[sym.storedError] = reason; - const writer = stream[sym.writer]; - if (writer) { - writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, reason); - } - if ( - !writableStreamHasOperationMarkedInFlight(stream) && - controller[sym.started] - ) { - writableStreamFinishErroring(stream); - } -} - -function writableStreamUpdateBackpressure<W>( - stream: WritableStreamImpl<W>, - backpressure: boolean, -): void { - assert(stream[sym.state] === "writable"); - assert(!writableStreamCloseQueuedOrInFlight(stream)); - const writer = stream[sym.writer]; - if (writer && backpressure !== stream[sym.backpressure]) { - if (backpressure) { - writer[sym.readyPromise] = getDeferred(); - } else { - assert(backpressure === false); - writer[sym.readyPromise].resolve!(); - writer[sym.readyPromise].resolve = undefined; - writer[sym.readyPromise].reject = undefined; - } - } - stream[sym.backpressure] = backpressure; -} - -/* eslint-enable */ diff --git a/cli/js/web/streams/queuing_strategy.ts b/cli/js/web/streams/queuing_strategy.ts deleted file mode 100644 index 818f0d51b..000000000 --- a/cli/js/web/streams/queuing_strategy.ts +++ /dev/null @@ -1,53 +0,0 @@ -// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. - -import { customInspect } from "../console.ts"; -import { setFunctionName } from "../util.ts"; - -export class CountQueuingStrategyImpl implements CountQueuingStrategy { - highWaterMark: number; - - constructor({ highWaterMark }: { highWaterMark: number }) { - this.highWaterMark = highWaterMark; - } - - size(): 1 { - return 1; - } - - [customInspect](): string { - return `${this.constructor.name} { highWaterMark: ${ - String(this.highWaterMark) - }, size: f }`; - } -} - -Object.defineProperty(CountQueuingStrategyImpl.prototype, "size", { - enumerable: true, -}); - -setFunctionName(CountQueuingStrategyImpl, "CountQueuingStrategy"); - -export class ByteLengthQueuingStrategyImpl - implements ByteLengthQueuingStrategy { - highWaterMark: number; - - constructor({ highWaterMark }: { highWaterMark: number }) { - this.highWaterMark = highWaterMark; - } - - size(chunk: ArrayBufferView): number { - return chunk.byteLength; - } - - [customInspect](): string { - return `${this.constructor.name} { highWaterMark: ${ - String(this.highWaterMark) - }, size: f }`; - } -} - -Object.defineProperty(ByteLengthQueuingStrategyImpl.prototype, "size", { - enumerable: true, -}); - -setFunctionName(CountQueuingStrategyImpl, "CountQueuingStrategy"); diff --git a/cli/js/web/streams/readable_byte_stream_controller.ts b/cli/js/web/streams/readable_byte_stream_controller.ts deleted file mode 100644 index eb4374604..000000000 --- a/cli/js/web/streams/readable_byte_stream_controller.ts +++ /dev/null @@ -1,149 +0,0 @@ -// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. - -import { - BufferQueueItem, - CancelAlgorithm, - isDetachedBuffer, - isReadableByteStreamController, - PullAlgorithm, - resetQueue, - readableByteStreamControllerCallPullIfNeeded, - readableByteStreamControllerClearAlgorithms, - readableByteStreamControllerClose, - readableByteStreamControllerEnqueue, - readableByteStreamControllerError, - readableByteStreamControllerGetDesiredSize, - readableByteStreamControllerHandleQueueDrain, - readableStreamAddReadRequest, - readableStreamHasDefaultReader, - readableStreamGetNumReadRequests, - readableStreamCreateReadResult, -} from "./internals.ts"; -import type { ReadableStreamImpl } from "./readable_stream.ts"; -import * as sym from "./symbols.ts"; -import { assert } from "../../util.ts"; -import { customInspect } from "../console.ts"; -import { setFunctionName } from "../util.ts"; - -export class ReadableByteStreamControllerImpl - implements ReadableByteStreamController { - [sym.autoAllocateChunkSize]: number | undefined; - [sym.byobRequest]: undefined; - [sym.cancelAlgorithm]: CancelAlgorithm; - [sym.closeRequested]: boolean; - [sym.controlledReadableByteStream]: ReadableStreamImpl<Uint8Array>; - [sym.pullAgain]: boolean; - [sym.pullAlgorithm]: PullAlgorithm; - [sym.pulling]: boolean; - [sym.queue]: BufferQueueItem[]; - [sym.queueTotalSize]: number; - [sym.started]: boolean; - [sym.strategyHWM]: number; - - private constructor() { - throw new TypeError( - "ReadableByteStreamController's constructor cannot be called.", - ); - } - - get byobRequest(): undefined { - return undefined; - } - - get desiredSize(): number | null { - if (!isReadableByteStreamController(this)) { - throw new TypeError("Invalid ReadableByteStreamController."); - } - return readableByteStreamControllerGetDesiredSize(this); - } - - close(): void { - if (!isReadableByteStreamController(this)) { - throw new TypeError("Invalid ReadableByteStreamController."); - } - if (this[sym.closeRequested]) { - throw new TypeError("Closed already requested."); - } - if (this[sym.controlledReadableByteStream][sym.state] !== "readable") { - throw new TypeError( - "ReadableByteStreamController's stream is not in a readable state.", - ); - } - readableByteStreamControllerClose(this); - } - - enqueue(chunk: ArrayBufferView): void { - if (!isReadableByteStreamController(this)) { - throw new TypeError("Invalid ReadableByteStreamController."); - } - if (this[sym.closeRequested]) { - throw new TypeError("Closed already requested."); - } - if (this[sym.controlledReadableByteStream][sym.state] !== "readable") { - throw new TypeError( - "ReadableByteStreamController's stream is not in a readable state.", - ); - } - if (!ArrayBuffer.isView(chunk)) { - throw new TypeError( - "You can only enqueue array buffer views when using a ReadableByteStreamController", - ); - } - if (isDetachedBuffer(chunk.buffer)) { - throw new TypeError("Cannot enqueue a view onto a detached ArrayBuffer"); - } - readableByteStreamControllerEnqueue(this, chunk); - } - - // eslint-disable-next-line @typescript-eslint/no-explicit-any - error(error?: any): void { - if (!isReadableByteStreamController(this)) { - throw new TypeError("Invalid ReadableByteStreamController."); - } - readableByteStreamControllerError(this, error); - } - - // eslint-disable-next-line @typescript-eslint/no-explicit-any - [sym.cancelSteps](reason: any): PromiseLike<void> { - // 3.11.5.1.1 If this.[[pendingPullIntos]] is not empty, - resetQueue(this); - const result = this[sym.cancelAlgorithm](reason); - readableByteStreamControllerClearAlgorithms(this); - return result; - } - - [sym.pullSteps](): Promise<ReadableStreamReadResult<Uint8Array>> { - const stream = this[sym.controlledReadableByteStream]; - assert(readableStreamHasDefaultReader(stream)); - if (this[sym.queueTotalSize] > 0) { - assert(readableStreamGetNumReadRequests(stream) === 0); - const entry = this[sym.queue].shift(); - assert(entry); - this[sym.queueTotalSize] -= entry.size; - readableByteStreamControllerHandleQueueDrain(this); - const view = new Uint8Array(entry.value, entry.offset, entry.size); - return Promise.resolve( - readableStreamCreateReadResult( - view, - false, - stream[sym.reader]![sym.forAuthorCode], - ), - ); - } - // 3.11.5.2.5 If autoAllocateChunkSize is not undefined, - const promise = readableStreamAddReadRequest(stream); - readableByteStreamControllerCallPullIfNeeded(this); - return promise; - } - - [customInspect](): string { - return `${this.constructor.name} { byobRequest: ${ - String(this.byobRequest) - }, desiredSize: ${String(this.desiredSize)} }`; - } -} - -setFunctionName( - ReadableByteStreamControllerImpl, - "ReadableByteStreamController", -); diff --git a/cli/js/web/streams/readable_stream.ts b/cli/js/web/streams/readable_stream.ts deleted file mode 100644 index 086535bd2..000000000 --- a/cli/js/web/streams/readable_stream.ts +++ /dev/null @@ -1,224 +0,0 @@ -// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. - -import { - acquireReadableStreamDefaultReader, - initializeReadableStream, - isReadableStream, - isReadableStreamLocked, - isUnderlyingByteSource, - isWritableStream, - isWritableStreamLocked, - makeSizeAlgorithmFromSizeFunction, - setPromiseIsHandledToTrue, - readableStreamCancel, - ReadableStreamGenericReader, - readableStreamPipeTo, - readableStreamTee, - setUpReadableByteStreamControllerFromUnderlyingSource, - setUpReadableStreamDefaultControllerFromUnderlyingSource, - validateAndNormalizeHighWaterMark, -} from "./internals.ts"; -import type { ReadableByteStreamControllerImpl } from "./readable_byte_stream_controller.ts"; -import { ReadableStreamAsyncIteratorPrototype } from "./readable_stream_async_iterator.ts"; -import type { ReadableStreamDefaultControllerImpl } from "./readable_stream_default_controller.ts"; -import * as sym from "./symbols.ts"; -import { customInspect } from "../console.ts"; -import { AbortSignalImpl } from "../abort_signal.ts"; -import { setFunctionName } from "../util.ts"; - -// eslint-disable-next-line @typescript-eslint/no-explicit-any -export class ReadableStreamImpl<R = any> implements ReadableStream<R> { - [sym.disturbed]: boolean; - [sym.readableStreamController]: - | ReadableStreamDefaultControllerImpl<R> - | ReadableByteStreamControllerImpl; - [sym.reader]: ReadableStreamGenericReader<R> | undefined; - [sym.state]: "readable" | "closed" | "errored"; - // eslint-disable-next-line @typescript-eslint/no-explicit-any - [sym.storedError]: any; - - constructor( - underlyingSource: UnderlyingByteSource | UnderlyingSource<R> = {}, - strategy: - | { - highWaterMark?: number; - size?: undefined; - } - | QueuingStrategy<R> = {}, - ) { - initializeReadableStream(this); - const { size } = strategy; - let { highWaterMark } = strategy; - const { type } = underlyingSource; - - if (isUnderlyingByteSource(underlyingSource)) { - if (size !== undefined) { - throw new RangeError( - `When underlying source is "bytes", strategy.size must be undefined.`, - ); - } - highWaterMark = validateAndNormalizeHighWaterMark(highWaterMark ?? 0); - setUpReadableByteStreamControllerFromUnderlyingSource( - this, - underlyingSource, - highWaterMark, - ); - } else if (type === undefined) { - const sizeAlgorithm = makeSizeAlgorithmFromSizeFunction(size); - highWaterMark = validateAndNormalizeHighWaterMark(highWaterMark ?? 1); - setUpReadableStreamDefaultControllerFromUnderlyingSource( - this, - underlyingSource, - highWaterMark, - sizeAlgorithm, - ); - } else { - throw new RangeError( - `Valid values for underlyingSource are "bytes" or undefined. Received: "${type}".`, - ); - } - } - - get locked(): boolean { - if (!isReadableStream(this)) { - throw new TypeError("Invalid ReadableStream."); - } - return isReadableStreamLocked(this); - } - - // eslint-disable-next-line @typescript-eslint/no-explicit-any - cancel(reason?: any): Promise<void> { - if (!isReadableStream(this)) { - return Promise.reject(new TypeError("Invalid ReadableStream.")); - } - if (isReadableStreamLocked(this)) { - return Promise.reject( - new TypeError("Cannot cancel a locked ReadableStream."), - ); - } - return readableStreamCancel(this, reason); - } - - getIterator({ - preventCancel, - }: { preventCancel?: boolean } = {}): AsyncIterableIterator<R> { - if (!isReadableStream(this)) { - throw new TypeError("Invalid ReadableStream."); - } - const reader = acquireReadableStreamDefaultReader(this); - const iterator = Object.create(ReadableStreamAsyncIteratorPrototype); - iterator[sym.asyncIteratorReader] = reader; - iterator[sym.preventCancel] = Boolean(preventCancel); - return iterator; - } - - getReader({ mode }: { mode?: string } = {}): ReadableStreamDefaultReader<R> { - if (!isReadableStream(this)) { - throw new TypeError("Invalid ReadableStream."); - } - if (mode === undefined) { - return acquireReadableStreamDefaultReader(this, true); - } - mode = String(mode); - // 3.2.5.4.4 If mode is "byob", return ? AcquireReadableStreamBYOBReader(this, true). - throw new RangeError(`Unsupported mode "${mode}"`); - } - - pipeThrough<T>( - { - writable, - readable, - }: { - writable: WritableStream<R>; - readable: ReadableStream<T>; - }, - { preventClose, preventAbort, preventCancel, signal }: PipeOptions = {}, - ): ReadableStream<T> { - if (!isReadableStream(this)) { - throw new TypeError("Invalid ReadableStream."); - } - if (!isWritableStream(writable)) { - throw new TypeError("writable is not a valid WritableStream."); - } - if (!isReadableStream(readable)) { - throw new TypeError("readable is not a valid ReadableStream."); - } - preventClose = Boolean(preventClose); - preventAbort = Boolean(preventAbort); - preventCancel = Boolean(preventCancel); - if (signal && !(signal instanceof AbortSignalImpl)) { - throw new TypeError("Invalid signal."); - } - if (isReadableStreamLocked(this)) { - throw new TypeError("ReadableStream is locked."); - } - if (isWritableStreamLocked(writable)) { - throw new TypeError("writable is locked."); - } - const promise = readableStreamPipeTo( - this, - writable, - preventClose, - preventAbort, - preventCancel, - signal, - ); - setPromiseIsHandledToTrue(promise); - return readable; - } - - pipeTo( - dest: WritableStream<R>, - { preventClose, preventAbort, preventCancel, signal }: PipeOptions = {}, - ): Promise<void> { - if (!isReadableStream(this)) { - return Promise.reject(new TypeError("Invalid ReadableStream.")); - } - if (!isWritableStream(dest)) { - return Promise.reject( - new TypeError("dest is not a valid WritableStream."), - ); - } - preventClose = Boolean(preventClose); - preventAbort = Boolean(preventAbort); - preventCancel = Boolean(preventCancel); - if (signal && !(signal instanceof AbortSignalImpl)) { - return Promise.reject(new TypeError("Invalid signal.")); - } - if (isReadableStreamLocked(this)) { - return Promise.reject(new TypeError("ReadableStream is locked.")); - } - if (isWritableStreamLocked(dest)) { - return Promise.reject(new TypeError("dest is locked.")); - } - return readableStreamPipeTo( - this, - dest, - preventClose, - preventAbort, - preventCancel, - signal, - ); - } - - tee(): [ReadableStreamImpl<R>, ReadableStreamImpl<R>] { - if (!isReadableStream(this)) { - throw new TypeError("Invalid ReadableStream."); - } - return readableStreamTee(this, false); - } - - [customInspect](): string { - return `${this.constructor.name} { locked: ${String(this.locked)} }`; - } - - [Symbol.asyncIterator]( - options: { - preventCancel?: boolean; - } = {}, - ): AsyncIterableIterator<R> { - return this.getIterator(options); - } -} - -setFunctionName(ReadableStreamImpl, "ReadableStream"); diff --git a/cli/js/web/streams/readable_stream_async_iterator.ts b/cli/js/web/streams/readable_stream_async_iterator.ts deleted file mode 100644 index c6b9759a5..000000000 --- a/cli/js/web/streams/readable_stream_async_iterator.ts +++ /dev/null @@ -1,80 +0,0 @@ -// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. - -import * as sym from "./symbols.ts"; -import { - isReadableStreamAsyncIterator, - ReadableStreamAsyncIterator, - readableStreamCreateReadResult, - readableStreamReaderGenericCancel, - readableStreamReaderGenericRelease, - readableStreamDefaultReaderRead, -} from "./internals.ts"; -import { assert } from "../../util.ts"; - -// eslint-disable-next-line @typescript-eslint/no-explicit-any -const AsyncIteratorPrototype: AsyncIterableIterator<any> = Object - .getPrototypeOf(Object.getPrototypeOf(async function* () {}).prototype); - -export const ReadableStreamAsyncIteratorPrototype: ReadableStreamAsyncIterator = - Object.setPrototypeOf({ - next( - this: ReadableStreamAsyncIterator, - // eslint-disable-next-line @typescript-eslint/no-explicit-any - ): Promise<ReadableStreamReadResult<any>> { - if (!isReadableStreamAsyncIterator(this)) { - return Promise.reject( - new TypeError("invalid ReadableStreamAsyncIterator."), - ); - } - const reader = this[sym.asyncIteratorReader]; - if (!reader[sym.ownerReadableStream]) { - return Promise.reject( - new TypeError("reader owner ReadableStream is undefined."), - ); - } - return readableStreamDefaultReaderRead(reader).then((result) => { - assert(typeof result === "object"); - const { done } = result; - assert(typeof done === "boolean"); - if (done) { - readableStreamReaderGenericRelease(reader); - } - const { value } = result; - return readableStreamCreateReadResult(value, done, true); - }); - }, - return( - this: ReadableStreamAsyncIterator, - // eslint-disable-next-line @typescript-eslint/no-explicit-any - value?: any | PromiseLike<any>, - // eslint-disable-next-line @typescript-eslint/no-explicit-any - ): Promise<ReadableStreamReadResult<any>> { - if (!isReadableStreamAsyncIterator(this)) { - return Promise.reject( - new TypeError("invalid ReadableStreamAsyncIterator."), - ); - } - const reader = this[sym.asyncIteratorReader]; - if (!reader[sym.ownerReadableStream]) { - return Promise.reject( - new TypeError("reader owner ReadableStream is undefined."), - ); - } - if (reader[sym.readRequests].length) { - return Promise.reject( - new TypeError("reader has outstanding read requests."), - ); - } - if (!this[sym.preventCancel]) { - const result = readableStreamReaderGenericCancel(reader, value); - readableStreamReaderGenericRelease(reader); - return result.then(() => - readableStreamCreateReadResult(value, true, true) - ); - } - readableStreamReaderGenericRelease(reader); - return Promise.resolve( - readableStreamCreateReadResult(value, true, true), - ); - }, - }, AsyncIteratorPrototype); diff --git a/cli/js/web/streams/readable_stream_default_controller.ts b/cli/js/web/streams/readable_stream_default_controller.ts deleted file mode 100644 index 8536712b9..000000000 --- a/cli/js/web/streams/readable_stream_default_controller.ts +++ /dev/null @@ -1,126 +0,0 @@ -// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. - -import { - CancelAlgorithm, - dequeueValue, - isReadableStreamDefaultController, - Pair, - PullAlgorithm, - readableStreamAddReadRequest, - readableStreamClose, - readableStreamCreateReadResult, - readableStreamDefaultControllerCallPullIfNeeded, - readableStreamDefaultControllerCanCloseOrEnqueue, - readableStreamDefaultControllerClearAlgorithms, - readableStreamDefaultControllerClose, - readableStreamDefaultControllerEnqueue, - readableStreamDefaultControllerError, - readableStreamDefaultControllerGetDesiredSize, - resetQueue, - SizeAlgorithm, -} from "./internals.ts"; -import type { ReadableStreamImpl } from "./readable_stream.ts"; -import * as sym from "./symbols.ts"; -import { customInspect } from "../console.ts"; -import { setFunctionName } from "../util.ts"; - -// eslint-disable-next-line @typescript-eslint/no-explicit-any -export class ReadableStreamDefaultControllerImpl<R = any> - implements ReadableStreamDefaultController<R> { - [sym.cancelAlgorithm]: CancelAlgorithm; - [sym.closeRequested]: boolean; - [sym.controlledReadableStream]: ReadableStreamImpl<R>; - [sym.pullAgain]: boolean; - [sym.pullAlgorithm]: PullAlgorithm; - [sym.pulling]: boolean; - [sym.queue]: Array<Pair<R>>; - [sym.queueTotalSize]: number; - [sym.started]: boolean; - [sym.strategyHWM]: number; - [sym.strategySizeAlgorithm]: SizeAlgorithm<R>; - - private constructor() { - throw new TypeError( - "ReadableStreamDefaultController's constructor cannot be called.", - ); - } - - get desiredSize(): number | null { - if (!isReadableStreamDefaultController(this)) { - throw new TypeError("Invalid ReadableStreamDefaultController."); - } - return readableStreamDefaultControllerGetDesiredSize(this); - } - - close(): void { - if (!isReadableStreamDefaultController(this)) { - throw new TypeError("Invalid ReadableStreamDefaultController."); - } - if (!readableStreamDefaultControllerCanCloseOrEnqueue(this)) { - throw new TypeError( - "ReadableStreamDefaultController cannot close or enqueue.", - ); - } - readableStreamDefaultControllerClose(this); - } - - enqueue(chunk: R): void { - if (!isReadableStreamDefaultController(this)) { - throw new TypeError("Invalid ReadableStreamDefaultController."); - } - if (!readableStreamDefaultControllerCanCloseOrEnqueue(this)) { - throw new TypeError("ReadableSteamController cannot enqueue."); - } - return readableStreamDefaultControllerEnqueue(this, chunk); - } - - // eslint-disable-next-line @typescript-eslint/no-explicit-any - error(error?: any): void { - if (!isReadableStreamDefaultController(this)) { - throw new TypeError("Invalid ReadableStreamDefaultController."); - } - readableStreamDefaultControllerError(this, error); - } - - // eslint-disable-next-line @typescript-eslint/no-explicit-any - [sym.cancelSteps](reason?: any): PromiseLike<void> { - resetQueue(this); - const result = this[sym.cancelAlgorithm](reason); - readableStreamDefaultControllerClearAlgorithms(this); - return result; - } - - [sym.pullSteps](): Promise<ReadableStreamReadResult<R>> { - const stream = this[sym.controlledReadableStream]; - if (this[sym.queue].length) { - const chunk = dequeueValue<R>(this); - if (this[sym.closeRequested] && this[sym.queue].length === 0) { - readableStreamDefaultControllerClearAlgorithms(this); - readableStreamClose(stream); - } else { - readableStreamDefaultControllerCallPullIfNeeded(this); - } - return Promise.resolve( - readableStreamCreateReadResult( - chunk, - false, - stream[sym.reader]![sym.forAuthorCode], - ), - ); - } - const pendingPromise = readableStreamAddReadRequest(stream); - readableStreamDefaultControllerCallPullIfNeeded(this); - return pendingPromise; - } - - [customInspect](): string { - return `${this.constructor.name} { desiredSize: ${ - String(this.desiredSize) - } }`; - } -} - -setFunctionName( - ReadableStreamDefaultControllerImpl, - "ReadableStreamDefaultController", -); diff --git a/cli/js/web/streams/readable_stream_default_reader.ts b/cli/js/web/streams/readable_stream_default_reader.ts deleted file mode 100644 index 88260effb..000000000 --- a/cli/js/web/streams/readable_stream_default_reader.ts +++ /dev/null @@ -1,92 +0,0 @@ -// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. - -import { - Deferred, - isReadableStream, - isReadableStreamDefaultReader, - isReadableStreamLocked, - readableStreamDefaultReaderRead, - readableStreamReaderGenericCancel, - readableStreamReaderGenericInitialize, - readableStreamReaderGenericRelease, -} from "./internals.ts"; -import type { ReadableStreamImpl } from "./readable_stream.ts"; -import * as sym from "./symbols.ts"; -import { customInspect } from "../console.ts"; -import { setFunctionName } from "../util.ts"; - -// eslint-disable-next-line @typescript-eslint/no-explicit-any -export class ReadableStreamDefaultReaderImpl<R = any> - implements ReadableStreamDefaultReader<R> { - [sym.closedPromise]: Deferred<void>; - [sym.forAuthorCode]: boolean; - [sym.ownerReadableStream]: ReadableStreamImpl<R>; - [sym.readRequests]: Array<Deferred<ReadableStreamReadResult<R>>>; - - constructor(stream: ReadableStream<R>) { - if (!isReadableStream(stream)) { - throw new TypeError("stream is not a ReadableStream."); - } - if (isReadableStreamLocked(stream)) { - throw new TypeError("stream is locked."); - } - readableStreamReaderGenericInitialize(this, stream); - this[sym.readRequests] = []; - } - - get closed(): Promise<void> { - if (!isReadableStreamDefaultReader(this)) { - return Promise.reject( - new TypeError("Invalid ReadableStreamDefaultReader."), - ); - } - return ( - this[sym.closedPromise].promise ?? - Promise.reject(new TypeError("Invalid reader.")) - ); - } - - // eslint-disable-next-line @typescript-eslint/no-explicit-any - cancel(reason?: any): Promise<void> { - if (!isReadableStreamDefaultReader(this)) { - return Promise.reject( - new TypeError("Invalid ReadableStreamDefaultReader."), - ); - } - if (!this[sym.ownerReadableStream]) { - return Promise.reject(new TypeError("Invalid reader.")); - } - return readableStreamReaderGenericCancel(this, reason); - } - - read(): Promise<ReadableStreamReadResult<R>> { - if (!isReadableStreamDefaultReader(this)) { - return Promise.reject( - new TypeError("Invalid ReadableStreamDefaultReader."), - ); - } - if (!this[sym.ownerReadableStream]) { - return Promise.reject(new TypeError("Invalid reader.")); - } - return readableStreamDefaultReaderRead(this); - } - - releaseLock(): void { - if (!isReadableStreamDefaultReader(this)) { - throw new TypeError("Invalid ReadableStreamDefaultReader."); - } - if (this[sym.ownerReadableStream] === undefined) { - return; - } - if (this[sym.readRequests].length) { - throw new TypeError("Cannot release lock with pending read requests."); - } - readableStreamReaderGenericRelease(this); - } - - [customInspect](): string { - return `${this.constructor.name} { closed: Promise }`; - } -} - -setFunctionName(ReadableStreamDefaultReaderImpl, "ReadableStreamDefaultReader"); diff --git a/cli/js/web/streams/symbols.ts b/cli/js/web/streams/symbols.ts deleted file mode 100644 index 82d668598..000000000 --- a/cli/js/web/streams/symbols.ts +++ /dev/null @@ -1,61 +0,0 @@ -// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. - -// The specification refers to internal slots. In most cases, ECMAScript -// Private Fields are not sufficient for these, as they are often accessed -// outside of the class itself and using a WeakMap gets really complex to hide -// this data from the public, therefore we will use unique symbols which are -// not available in the runtime. - -export const abortAlgorithm = Symbol("abortAlgorithm"); -export const abortSteps = Symbol("abortSteps"); -export const asyncIteratorReader = Symbol("asyncIteratorReader"); -export const autoAllocateChunkSize = Symbol("autoAllocateChunkSize"); -export const backpressure = Symbol("backpressure"); -export const backpressureChangePromise = Symbol("backpressureChangePromise"); -export const byobRequest = Symbol("byobRequest"); -export const cancelAlgorithm = Symbol("cancelAlgorithm"); -export const cancelSteps = Symbol("cancelSteps"); -export const closeAlgorithm = Symbol("closeAlgorithm"); -export const closedPromise = Symbol("closedPromise"); -export const closeRequest = Symbol("closeRequest"); -export const closeRequested = Symbol("closeRequested"); -export const controlledReadableByteStream = Symbol( - "controlledReadableByteStream", -); -export const controlledReadableStream = Symbol("controlledReadableStream"); -export const controlledTransformStream = Symbol("controlledTransformStream"); -export const controlledWritableStream = Symbol("controlledWritableStream"); -export const disturbed = Symbol("disturbed"); -export const errorSteps = Symbol("errorSteps"); -export const flushAlgorithm = Symbol("flushAlgorithm"); -export const forAuthorCode = Symbol("forAuthorCode"); -export const inFlightWriteRequest = Symbol("inFlightWriteRequest"); -export const inFlightCloseRequest = Symbol("inFlightCloseRequest"); -export const isFakeDetached = Symbol("isFakeDetached"); -export const ownerReadableStream = Symbol("ownerReadableStream"); -export const ownerWritableStream = Symbol("ownerWritableStream"); -export const pendingAbortRequest = Symbol("pendingAbortRequest"); -export const preventCancel = Symbol("preventCancel"); -export const pullAgain = Symbol("pullAgain"); -export const pullAlgorithm = Symbol("pullAlgorithm"); -export const pulling = Symbol("pulling"); -export const pullSteps = Symbol("pullSteps"); -export const queue = Symbol("queue"); -export const queueTotalSize = Symbol("queueTotalSize"); -export const readable = Symbol("readable"); -export const readableStreamController = Symbol("readableStreamController"); -export const reader = Symbol("reader"); -export const readRequests = Symbol("readRequests"); -export const readyPromise = Symbol("readyPromise"); -export const started = Symbol("started"); -export const state = Symbol("state"); -export const storedError = Symbol("storedError"); -export const strategyHWM = Symbol("strategyHWM"); -export const strategySizeAlgorithm = Symbol("strategySizeAlgorithm"); -export const transformAlgorithm = Symbol("transformAlgorithm"); -export const transformStreamController = Symbol("transformStreamController"); -export const writableStreamController = Symbol("writableStreamController"); -export const writeAlgorithm = Symbol("writeAlgorithm"); -export const writable = Symbol("writable"); -export const writer = Symbol("writer"); -export const writeRequests = Symbol("writeRequests"); diff --git a/cli/js/web/streams/transform_stream.ts b/cli/js/web/streams/transform_stream.ts deleted file mode 100644 index f6924aead..000000000 --- a/cli/js/web/streams/transform_stream.ts +++ /dev/null @@ -1,118 +0,0 @@ -// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. - -import { - Deferred, - getDeferred, - initializeTransformStream, - invokeOrNoop, - isTransformStream, - makeSizeAlgorithmFromSizeFunction, - setUpTransformStreamDefaultControllerFromTransformer, - validateAndNormalizeHighWaterMark, -} from "./internals.ts"; -import type { ReadableStreamImpl } from "./readable_stream.ts"; -import * as sym from "./symbols.ts"; -import type { TransformStreamDefaultControllerImpl } from "./transform_stream_default_controller.ts"; -import type { WritableStreamImpl } from "./writable_stream.ts"; -import { customInspect, inspect } from "../console.ts"; -import { setFunctionName } from "../util.ts"; - -// eslint-disable-next-line @typescript-eslint/no-explicit-any -export class TransformStreamImpl<I = any, O = any> - implements TransformStream<I, O> { - [sym.backpressure]?: boolean; - [sym.backpressureChangePromise]?: Deferred<void>; - [sym.readable]: ReadableStreamImpl<O>; - [sym.transformStreamController]: TransformStreamDefaultControllerImpl<I, O>; - [sym.writable]: WritableStreamImpl<I>; - - constructor( - transformer: Transformer<I, O> = {}, - writableStrategy: QueuingStrategy<I> = {}, - readableStrategy: QueuingStrategy<O> = {}, - ) { - const writableSizeFunction = writableStrategy.size; - let writableHighWaterMark = writableStrategy.highWaterMark; - const readableSizeFunction = readableStrategy.size; - let readableHighWaterMark = readableStrategy.highWaterMark; - const writableType = transformer.writableType; - if (writableType !== undefined) { - throw new RangeError( - `Expected transformer writableType to be undefined, received "${ - String(writableType) - }"`, - ); - } - const writableSizeAlgorithm = makeSizeAlgorithmFromSizeFunction( - writableSizeFunction, - ); - if (writableHighWaterMark === undefined) { - writableHighWaterMark = 1; - } - writableHighWaterMark = validateAndNormalizeHighWaterMark( - writableHighWaterMark, - ); - const readableType = transformer.readableType; - if (readableType !== undefined) { - throw new RangeError( - `Expected transformer readableType to be undefined, received "${ - String(readableType) - }"`, - ); - } - const readableSizeAlgorithm = makeSizeAlgorithmFromSizeFunction( - readableSizeFunction, - ); - if (readableHighWaterMark === undefined) { - readableHighWaterMark = 1; - } - readableHighWaterMark = validateAndNormalizeHighWaterMark( - readableHighWaterMark, - ); - const startPromise = getDeferred<void>(); - initializeTransformStream( - this, - startPromise.promise, - writableHighWaterMark, - writableSizeAlgorithm, - readableHighWaterMark, - readableSizeAlgorithm, - ); - // the brand check expects this, and the brand check occurs in the following - // but the property hasn't been defined. - Object.defineProperty(this, sym.transformStreamController, { - value: undefined, - writable: true, - configurable: true, - }); - setUpTransformStreamDefaultControllerFromTransformer(this, transformer); - const startResult: void | PromiseLike<void> = invokeOrNoop( - transformer, - "start", - this[sym.transformStreamController], - ); - startPromise.resolve(startResult); - } - - get readable(): ReadableStream<O> { - if (!isTransformStream(this)) { - throw new TypeError("Invalid TransformStream."); - } - return this[sym.readable]; - } - - get writable(): WritableStream<I> { - if (!isTransformStream(this)) { - throw new TypeError("Invalid TransformStream."); - } - return this[sym.writable]; - } - - [customInspect](): string { - return `${this.constructor.name} {\n readable: ${ - inspect(this.readable) - }\n writable: ${inspect(this.writable)}\n}`; - } -} - -setFunctionName(TransformStreamImpl, "TransformStream"); diff --git a/cli/js/web/streams/transform_stream_default_controller.ts b/cli/js/web/streams/transform_stream_default_controller.ts deleted file mode 100644 index 54f1e9f2d..000000000 --- a/cli/js/web/streams/transform_stream_default_controller.ts +++ /dev/null @@ -1,75 +0,0 @@ -// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. - -import { - FlushAlgorithm, - isTransformStreamDefaultController, - readableStreamDefaultControllerGetDesiredSize, - TransformAlgorithm, - transformStreamDefaultControllerEnqueue, - transformStreamDefaultControllerError, - transformStreamDefaultControllerTerminate, -} from "./internals.ts"; -import type { ReadableStreamDefaultControllerImpl } from "./readable_stream_default_controller.ts"; -import * as sym from "./symbols.ts"; -import type { TransformStreamImpl } from "./transform_stream.ts"; -import { customInspect } from "../console.ts"; -import { setFunctionName } from "../util.ts"; - -// eslint-disable-next-line @typescript-eslint/no-explicit-any -export class TransformStreamDefaultControllerImpl<I = any, O = any> - implements TransformStreamDefaultController<O> { - [sym.controlledTransformStream]: TransformStreamImpl<I, O>; - [sym.flushAlgorithm]: FlushAlgorithm; - [sym.transformAlgorithm]: TransformAlgorithm<I>; - - private constructor() { - throw new TypeError( - "TransformStreamDefaultController's constructor cannot be called.", - ); - } - - get desiredSize(): number | null { - if (!isTransformStreamDefaultController(this)) { - throw new TypeError("Invalid TransformStreamDefaultController."); - } - const readableController = this[sym.controlledTransformStream][ - sym.readable - ][sym.readableStreamController]; - return readableStreamDefaultControllerGetDesiredSize( - readableController as ReadableStreamDefaultControllerImpl<O>, - ); - } - - enqueue(chunk: O): void { - if (!isTransformStreamDefaultController(this)) { - throw new TypeError("Invalid TransformStreamDefaultController."); - } - transformStreamDefaultControllerEnqueue(this, chunk); - } - - // eslint-disable-next-line @typescript-eslint/no-explicit-any - error(reason?: any): void { - if (!isTransformStreamDefaultController(this)) { - throw new TypeError("Invalid TransformStreamDefaultController."); - } - transformStreamDefaultControllerError(this, reason); - } - - terminate(): void { - if (!isTransformStreamDefaultController(this)) { - throw new TypeError("Invalid TransformStreamDefaultController."); - } - transformStreamDefaultControllerTerminate(this); - } - - [customInspect](): string { - return `${this.constructor.name} { desiredSize: ${ - String(this.desiredSize) - } }`; - } -} - -setFunctionName( - TransformStreamDefaultControllerImpl, - "TransformStreamDefaultController", -); diff --git a/cli/js/web/streams/writable_stream.ts b/cli/js/web/streams/writable_stream.ts deleted file mode 100644 index 94bedb941..000000000 --- a/cli/js/web/streams/writable_stream.ts +++ /dev/null @@ -1,107 +0,0 @@ -// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. - -import { - AbortRequest, - acquireWritableStreamDefaultWriter, - Deferred, - initializeWritableStream, - isWritableStream, - isWritableStreamLocked, - makeSizeAlgorithmFromSizeFunction, - setUpWritableStreamDefaultControllerFromUnderlyingSink, - writableStreamAbort, - writableStreamClose, - writableStreamCloseQueuedOrInFlight, - validateAndNormalizeHighWaterMark, -} from "./internals.ts"; -import * as sym from "./symbols.ts"; -import type { WritableStreamDefaultControllerImpl } from "./writable_stream_default_controller.ts"; -import type { WritableStreamDefaultWriterImpl } from "./writable_stream_default_writer.ts"; -import { customInspect } from "../console.ts"; -import { setFunctionName } from "../util.ts"; - -// eslint-disable-next-line @typescript-eslint/no-explicit-any -export class WritableStreamImpl<W = any> implements WritableStream<W> { - [sym.backpressure]: boolean; - [sym.closeRequest]?: Deferred<void>; - [sym.inFlightWriteRequest]?: Required<Deferred<void>>; - [sym.inFlightCloseRequest]?: Deferred<void>; - [sym.pendingAbortRequest]?: AbortRequest; - [sym.state]: "writable" | "closed" | "erroring" | "errored"; - // eslint-disable-next-line @typescript-eslint/no-explicit-any - [sym.storedError]?: any; - [sym.writableStreamController]?: WritableStreamDefaultControllerImpl<W>; - [sym.writer]?: WritableStreamDefaultWriterImpl<W>; - [sym.writeRequests]: Array<Required<Deferred<void>>>; - - constructor( - underlyingSink: UnderlyingSink = {}, - strategy: QueuingStrategy = {}, - ) { - initializeWritableStream(this); - const size = strategy.size; - let highWaterMark = strategy.highWaterMark ?? 1; - const { type } = underlyingSink; - if (type !== undefined) { - throw new RangeError(`Sink type of "${String(type)}" not supported.`); - } - const sizeAlgorithm = makeSizeAlgorithmFromSizeFunction(size); - highWaterMark = validateAndNormalizeHighWaterMark(highWaterMark); - setUpWritableStreamDefaultControllerFromUnderlyingSink( - this, - underlyingSink, - highWaterMark, - sizeAlgorithm, - ); - } - - get locked(): boolean { - if (!isWritableStream(this)) { - throw new TypeError("Invalid WritableStream."); - } - return isWritableStreamLocked(this); - } - - // eslint-disable-next-line @typescript-eslint/no-explicit-any - abort(reason: any): Promise<void> { - if (!isWritableStream(this)) { - return Promise.reject(new TypeError("Invalid WritableStream.")); - } - if (isWritableStreamLocked(this)) { - return Promise.reject( - new TypeError("Cannot abort a locked WritableStream."), - ); - } - return writableStreamAbort(this, reason); - } - - close(): Promise<void> { - if (!isWritableStream(this)) { - return Promise.reject(new TypeError("Invalid WritableStream.")); - } - if (isWritableStreamLocked(this)) { - return Promise.reject( - new TypeError("Cannot abort a locked WritableStream."), - ); - } - if (writableStreamCloseQueuedOrInFlight(this)) { - return Promise.reject( - new TypeError("Cannot close an already closing WritableStream."), - ); - } - return writableStreamClose(this); - } - - getWriter(): WritableStreamDefaultWriter<W> { - if (!isWritableStream(this)) { - throw new TypeError("Invalid WritableStream."); - } - return acquireWritableStreamDefaultWriter(this); - } - - [customInspect](): string { - return `${this.constructor.name} { locked: ${String(this.locked)} }`; - } -} - -setFunctionName(WritableStreamImpl, "WritableStream"); diff --git a/cli/js/web/streams/writable_stream_default_controller.ts b/cli/js/web/streams/writable_stream_default_controller.ts deleted file mode 100644 index 960060b8f..000000000 --- a/cli/js/web/streams/writable_stream_default_controller.ts +++ /dev/null @@ -1,68 +0,0 @@ -// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. - -import { - AbortAlgorithm, - CloseAlgorithm, - isWritableStreamDefaultController, - Pair, - resetQueue, - SizeAlgorithm, - WriteAlgorithm, - writableStreamDefaultControllerClearAlgorithms, - writableStreamDefaultControllerError, -} from "./internals.ts"; -import * as sym from "./symbols.ts"; -import type { WritableStreamImpl } from "./writable_stream.ts"; -import { customInspect } from "../console.ts"; -import { setFunctionName } from "../util.ts"; - -export class WritableStreamDefaultControllerImpl<W> - implements WritableStreamDefaultController { - [sym.abortAlgorithm]: AbortAlgorithm; - [sym.closeAlgorithm]: CloseAlgorithm; - [sym.controlledWritableStream]: WritableStreamImpl; - [sym.queue]: Array<Pair<{ chunk: W } | "close">>; - [sym.queueTotalSize]: number; - [sym.started]: boolean; - [sym.strategyHWM]: number; - [sym.strategySizeAlgorithm]: SizeAlgorithm<W>; - [sym.writeAlgorithm]: WriteAlgorithm<W>; - - private constructor() { - throw new TypeError( - "WritableStreamDefaultController's constructor cannot be called.", - ); - } - - // eslint-disable-next-line @typescript-eslint/no-explicit-any - error(e: any): void { - if (!isWritableStreamDefaultController(this)) { - throw new TypeError("Invalid WritableStreamDefaultController."); - } - const state = this[sym.controlledWritableStream][sym.state]; - if (state !== "writable") { - return; - } - writableStreamDefaultControllerError(this, e); - } - - // eslint-disable-next-line @typescript-eslint/no-explicit-any - [sym.abortSteps](reason: any): PromiseLike<void> { - const result = this[sym.abortAlgorithm](reason); - writableStreamDefaultControllerClearAlgorithms(this); - return result; - } - - [sym.errorSteps](): void { - resetQueue(this); - } - - [customInspect](): string { - return `${this.constructor.name} { }`; - } -} - -setFunctionName( - WritableStreamDefaultControllerImpl, - "WritableStreamDefaultController", -); diff --git a/cli/js/web/streams/writable_stream_default_writer.ts b/cli/js/web/streams/writable_stream_default_writer.ts deleted file mode 100644 index 34e664e95..000000000 --- a/cli/js/web/streams/writable_stream_default_writer.ts +++ /dev/null @@ -1,164 +0,0 @@ -// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. - -import { - Deferred, - getDeferred, - isWritableStream, - isWritableStreamDefaultWriter, - isWritableStreamLocked, - setPromiseIsHandledToTrue, - writableStreamCloseQueuedOrInFlight, - writableStreamDefaultWriterAbort, - writableStreamDefaultWriterClose, - writableStreamDefaultWriterGetDesiredSize, - writableStreamDefaultWriterRelease, - writableStreamDefaultWriterWrite, -} from "./internals.ts"; -import * as sym from "./symbols.ts"; -import type { WritableStreamImpl } from "./writable_stream.ts"; -import { customInspect } from "../console.ts"; -import { assert } from "../../util.ts"; -import { setFunctionName } from "../util.ts"; - -export class WritableStreamDefaultWriterImpl<W> - implements WritableStreamDefaultWriter<W> { - [sym.closedPromise]: Deferred<void>; - [sym.ownerWritableStream]: WritableStreamImpl<W>; - [sym.readyPromise]: Deferred<void>; - - constructor(stream: WritableStreamImpl<W>) { - if (!isWritableStream(stream)) { - throw new TypeError("Invalid stream."); - } - if (isWritableStreamLocked(stream)) { - throw new TypeError("Cannot create a writer for a locked stream."); - } - this[sym.ownerWritableStream] = stream; - stream[sym.writer] = this; - const state = stream[sym.state]; - if (state === "writable") { - if ( - !writableStreamCloseQueuedOrInFlight(stream) && - stream[sym.backpressure] - ) { - this[sym.readyPromise] = getDeferred(); - } else { - this[sym.readyPromise] = { promise: Promise.resolve() }; - } - this[sym.closedPromise] = getDeferred(); - } else if (state === "erroring") { - this[sym.readyPromise] = { - promise: Promise.reject(stream[sym.storedError]), - }; - setPromiseIsHandledToTrue(this[sym.readyPromise].promise); - this[sym.closedPromise] = getDeferred(); - } else if (state === "closed") { - this[sym.readyPromise] = { promise: Promise.resolve() }; - this[sym.closedPromise] = { promise: Promise.resolve() }; - } else { - assert(state === "errored"); - const storedError = stream[sym.storedError]; - this[sym.readyPromise] = { promise: Promise.reject(storedError) }; - setPromiseIsHandledToTrue(this[sym.readyPromise].promise); - this[sym.closedPromise] = { promise: Promise.reject(storedError) }; - setPromiseIsHandledToTrue(this[sym.closedPromise].promise); - } - } - - get closed(): Promise<void> { - if (!isWritableStreamDefaultWriter(this)) { - return Promise.reject( - new TypeError("Invalid WritableStreamDefaultWriter."), - ); - } - return this[sym.closedPromise].promise; - } - - get desiredSize(): number | null { - if (!isWritableStreamDefaultWriter(this)) { - throw new TypeError("Invalid WritableStreamDefaultWriter."); - } - if (!this[sym.ownerWritableStream]) { - throw new TypeError("WritableStreamDefaultWriter has no owner."); - } - return writableStreamDefaultWriterGetDesiredSize(this); - } - - get ready(): Promise<void> { - if (!isWritableStreamDefaultWriter(this)) { - return Promise.reject( - new TypeError("Invalid WritableStreamDefaultWriter."), - ); - } - return this[sym.readyPromise].promise; - } - - // eslint-disable-next-line @typescript-eslint/no-explicit-any - abort(reason: any): Promise<void> { - if (!isWritableStreamDefaultWriter(this)) { - return Promise.reject( - new TypeError("Invalid WritableStreamDefaultWriter."), - ); - } - if (!this[sym.ownerWritableStream]) { - Promise.reject( - new TypeError("WritableStreamDefaultWriter has no owner."), - ); - } - return writableStreamDefaultWriterAbort(this, reason); - } - - close(): Promise<void> { - if (!isWritableStreamDefaultWriter(this)) { - return Promise.reject( - new TypeError("Invalid WritableStreamDefaultWriter."), - ); - } - const stream = this[sym.ownerWritableStream]; - if (!stream) { - Promise.reject( - new TypeError("WritableStreamDefaultWriter has no owner."), - ); - } - if (writableStreamCloseQueuedOrInFlight(stream)) { - Promise.reject( - new TypeError("Stream is in an invalid state to be closed."), - ); - } - return writableStreamDefaultWriterClose(this); - } - - releaseLock(): void { - if (!isWritableStreamDefaultWriter(this)) { - throw new TypeError("Invalid WritableStreamDefaultWriter."); - } - const stream = this[sym.ownerWritableStream]; - if (!stream) { - return; - } - assert(stream[sym.writer]); - writableStreamDefaultWriterRelease(this); - } - - write(chunk: W): Promise<void> { - if (!isWritableStreamDefaultWriter(this)) { - return Promise.reject( - new TypeError("Invalid WritableStreamDefaultWriter."), - ); - } - if (!this[sym.ownerWritableStream]) { - Promise.reject( - new TypeError("WritableStreamDefaultWriter has no owner."), - ); - } - return writableStreamDefaultWriterWrite(this, chunk); - } - - [customInspect](): string { - return `${this.constructor.name} { closed: Promise, desiredSize: ${ - String(this.desiredSize) - }, ready: Promise }`; - } -} - -setFunctionName(WritableStreamDefaultWriterImpl, "WritableStreamDefaultWriter"); |