diff options
Diffstat (limited to 'cli/js/streams/readable-internals.ts')
-rw-r--r-- | cli/js/streams/readable-internals.ts | 1357 |
1 files changed, 1357 insertions, 0 deletions
diff --git a/cli/js/streams/readable-internals.ts b/cli/js/streams/readable-internals.ts new file mode 100644 index 000000000..36f4223d7 --- /dev/null +++ b/cli/js/streams/readable-internals.ts @@ -0,0 +1,1357 @@ +// Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546 +// Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT + +/** + * streams/readable-internals - internal types and functions for readable streams + * Part of Stardazed + * (c) 2018-Present by Arthur Langereis - @zenmumbler + * https://github.com/stardazed/sd-streams + */ + +/* eslint-disable @typescript-eslint/no-explicit-any */ +// TODO reenable this lint here + +import * as shared from "./shared-internals.ts"; +import * as q from "./queue-mixin.ts"; +import { + QueuingStrategy, + QueuingStrategySizeCallback, + UnderlyingSource, + UnderlyingByteSource +} from "../dom_types.ts"; + +// ReadableStreamDefaultController +export const controlledReadableStream_ = Symbol("controlledReadableStream_"); +export const pullAlgorithm_ = Symbol("pullAlgorithm_"); +export const cancelAlgorithm_ = Symbol("cancelAlgorithm_"); +export const strategySizeAlgorithm_ = Symbol("strategySizeAlgorithm_"); +export const strategyHWM_ = Symbol("strategyHWM_"); +export const started_ = Symbol("started_"); +export const closeRequested_ = Symbol("closeRequested_"); +export const pullAgain_ = Symbol("pullAgain_"); +export const pulling_ = Symbol("pulling_"); +export const cancelSteps_ = Symbol("cancelSteps_"); +export const pullSteps_ = Symbol("pullSteps_"); + +// ReadableByteStreamController +export const autoAllocateChunkSize_ = Symbol("autoAllocateChunkSize_"); +export const byobRequest_ = Symbol("byobRequest_"); +export const controlledReadableByteStream_ = Symbol( + "controlledReadableByteStream_" +); +export const pendingPullIntos_ = Symbol("pendingPullIntos_"); + +// ReadableStreamDefaultReader +export const closedPromise_ = Symbol("closedPromise_"); +export const ownerReadableStream_ = Symbol("ownerReadableStream_"); +export const readRequests_ = Symbol("readRequests_"); +export const readIntoRequests_ = Symbol("readIntoRequests_"); + +// ReadableStreamBYOBRequest +export const associatedReadableByteStreamController_ = Symbol( + "associatedReadableByteStreamController_" +); +export const view_ = Symbol("view_"); + +// ReadableStreamBYOBReader + +// ReadableStream +export const reader_ = Symbol("reader_"); +export const readableStreamController_ = Symbol("readableStreamController_"); + +export type StartFunction<OutputType> = ( + controller: SDReadableStreamControllerBase<OutputType> +) => void | PromiseLike<void>; +export type StartAlgorithm = () => Promise<void> | void; +export type PullFunction<OutputType> = ( + controller: SDReadableStreamControllerBase<OutputType> +) => void | PromiseLike<void>; +export type PullAlgorithm<OutputType> = ( + controller: SDReadableStreamControllerBase<OutputType> +) => PromiseLike<void>; +export type CancelAlgorithm = (reason?: shared.ErrorResult) => Promise<void>; + +// ---- + +export interface SDReadableStreamControllerBase<OutputType> { + readonly desiredSize: number | null; + close(): void; + error(e?: shared.ErrorResult): void; + + [cancelSteps_](reason: shared.ErrorResult): Promise<void>; + [pullSteps_](forAuthorCode: boolean): Promise<IteratorResult<OutputType>>; +} + +export interface SDReadableStreamBYOBRequest { + readonly view: ArrayBufferView; + respond(bytesWritten: number): void; + respondWithNewView(view: ArrayBufferView): void; + + [associatedReadableByteStreamController_]: + | SDReadableByteStreamController + | undefined; + [view_]: ArrayBufferView | undefined; +} + +interface ArrayBufferViewCtor { + new ( + buffer: ArrayBufferLike, + byteOffset?: number, + byteLength?: number + ): ArrayBufferView; +} + +export interface PullIntoDescriptor { + readerType: "default" | "byob"; + ctor: ArrayBufferViewCtor; + buffer: ArrayBufferLike; + byteOffset: number; + byteLength: number; + bytesFilled: number; + elementSize: number; +} + +export interface SDReadableByteStreamController + extends SDReadableStreamControllerBase<ArrayBufferView>, + q.ByteQueueContainer { + readonly byobRequest: SDReadableStreamBYOBRequest | undefined; + enqueue(chunk: ArrayBufferView): void; + + [autoAllocateChunkSize_]: number | undefined; // A positive integer, when the automatic buffer allocation feature is enabled. In that case, this value specifies the size of buffer to allocate. It is undefined otherwise. + [byobRequest_]: SDReadableStreamBYOBRequest | undefined; // A ReadableStreamBYOBRequest instance representing the current BYOB pull request + [cancelAlgorithm_]: CancelAlgorithm; // A promise-returning algorithm, taking one argument (the cancel reason), which communicates a requested cancelation to the underlying source + [closeRequested_]: boolean; // A boolean flag indicating whether the stream has been closed by its underlying byte source, but still has chunks in its internal queue that have not yet been read + [controlledReadableByteStream_]: SDReadableStream<ArrayBufferView>; // The ReadableStream instance controlled + [pullAgain_]: boolean; // A boolean flag set to true if the stream’s mechanisms requested a call to the underlying byte source’s pull() method to pull more data, but the pull could not yet be done since a previous call is still executing + [pullAlgorithm_]: PullAlgorithm<ArrayBufferView>; // A promise-returning algorithm that pulls data from the underlying source + [pulling_]: boolean; // A boolean flag set to true while the underlying byte source’s pull() method is executing and has not yet fulfilled, used to prevent reentrant calls + [pendingPullIntos_]: PullIntoDescriptor[]; // A List of descriptors representing pending BYOB pull requests + [started_]: boolean; // A boolean flag indicating whether the underlying source has finished starting + [strategyHWM_]: number; // A number supplied to the constructor as part of the stream’s queuing strategy, indicating the point at which the stream will apply backpressure to its underlying byte source +} + +export interface SDReadableStreamDefaultController<OutputType> + extends SDReadableStreamControllerBase<OutputType>, + q.QueueContainer<OutputType> { + enqueue(chunk?: OutputType): void; + + [controlledReadableStream_]: SDReadableStream<OutputType>; + [pullAlgorithm_]: PullAlgorithm<OutputType>; + [cancelAlgorithm_]: CancelAlgorithm; + [strategySizeAlgorithm_]: QueuingStrategySizeCallback<OutputType>; + [strategyHWM_]: number; + + [started_]: boolean; + [closeRequested_]: boolean; + [pullAgain_]: boolean; + [pulling_]: boolean; +} + +// ---- + +export interface SDReadableStreamReader<OutputType> { + readonly closed: Promise<void>; + cancel(reason: shared.ErrorResult): Promise<void>; + releaseLock(): void; + + [ownerReadableStream_]: SDReadableStream<OutputType> | undefined; + [closedPromise_]: shared.ControlledPromise<void>; +} + +export interface ReadRequest<V> extends shared.ControlledPromise<V> { + forAuthorCode: boolean; +} + +export declare class SDReadableStreamDefaultReader<OutputType> + implements SDReadableStreamReader<OutputType> { + constructor(stream: SDReadableStream<OutputType>); + + readonly closed: Promise<void>; + cancel(reason: shared.ErrorResult): Promise<void>; + releaseLock(): void; + read(): Promise<IteratorResult<OutputType | undefined>>; + + [ownerReadableStream_]: SDReadableStream<OutputType> | undefined; + [closedPromise_]: shared.ControlledPromise<void>; + [readRequests_]: Array<ReadRequest<IteratorResult<OutputType>>>; +} + +export declare class SDReadableStreamBYOBReader + implements SDReadableStreamReader<ArrayBufferView> { + constructor(stream: SDReadableStream<ArrayBufferView>); + + readonly closed: Promise<void>; + cancel(reason: shared.ErrorResult): Promise<void>; + releaseLock(): void; + read(view: ArrayBufferView): Promise<IteratorResult<ArrayBufferView>>; + + [ownerReadableStream_]: SDReadableStream<ArrayBufferView> | undefined; + [closedPromise_]: shared.ControlledPromise<void>; + [readIntoRequests_]: Array<ReadRequest<IteratorResult<ArrayBufferView>>>; +} + +/* TODO reenable this when we add WritableStreams and Transforms +export interface GenericTransformStream<InputType, OutputType> { + readable: SDReadableStream<OutputType>; + writable: ws.WritableStream<InputType>; +} +*/ + +export type ReadableStreamState = "readable" | "closed" | "errored"; + +export declare class SDReadableStream<OutputType> { + constructor( + underlyingSource: UnderlyingByteSource, + strategy?: { highWaterMark?: number; size?: undefined } + ); + constructor( + underlyingSource?: UnderlyingSource<OutputType>, + strategy?: QueuingStrategy<OutputType> + ); + + readonly locked: boolean; + cancel(reason?: shared.ErrorResult): Promise<void>; + getReader(): SDReadableStreamReader<OutputType>; + getReader(options: { mode: "byob" }): SDReadableStreamBYOBReader; + tee(): Array<SDReadableStream<OutputType>>; + + /* TODO reenable these methods when we bring in writableStreams and transport types + pipeThrough<ResultType>( + transform: GenericTransformStream<OutputType, ResultType>, + options?: PipeOptions + ): SDReadableStream<ResultType>; + pipeTo( + dest: ws.WritableStream<OutputType>, + options?: PipeOptions + ): Promise<void>; + */ + [shared.state_]: ReadableStreamState; + [shared.storedError_]: shared.ErrorResult; + [reader_]: SDReadableStreamReader<OutputType> | undefined; + [readableStreamController_]: SDReadableStreamControllerBase<OutputType>; +} + +// ---- Stream + +export function initializeReadableStream<OutputType>( + stream: SDReadableStream<OutputType> +): void { + stream[shared.state_] = "readable"; + stream[reader_] = undefined; + stream[shared.storedError_] = undefined; + stream[readableStreamController_] = undefined!; // mark slot as used for brand check +} + +export function isReadableStream( + value: unknown +): value is SDReadableStream<any> { + if (typeof value !== "object" || value === null) { + return false; + } + return readableStreamController_ in value; +} + +export function isReadableStreamLocked<OutputType>( + stream: SDReadableStream<OutputType> +): boolean { + return stream[reader_] !== undefined; +} + +export function readableStreamGetNumReadIntoRequests<OutputType>( + stream: SDReadableStream<OutputType> +): number | undefined { + // TODO remove the "as unknown" cast + // This is in to workaround a compiler error + // error TS2352: Conversion of type 'SDReadableStreamReader<OutputType>' to type 'SDReadableStreamBYOBReader' may be a mistake because neither type sufficiently overlaps with the other. If this was intentional, convert the expression to 'unknown' first. + // Type 'SDReadableStreamReader<OutputType>' is missing the following properties from type 'SDReadableStreamBYOBReader': read, [readIntoRequests_] + const reader = (stream[reader_] as unknown) as SDReadableStreamBYOBReader; + if (reader === undefined) { + return 0; + } + return reader[readIntoRequests_].length; +} + +export function readableStreamGetNumReadRequests<OutputType>( + stream: SDReadableStream<OutputType> +): number { + const reader = stream[reader_] as SDReadableStreamDefaultReader<OutputType>; + if (reader === undefined) { + return 0; + } + return reader[readRequests_].length; +} + +export function readableStreamCreateReadResult<T>( + value: T, + done: boolean, + forAuthorCode: boolean +): IteratorResult<T> { + const prototype = forAuthorCode ? Object.prototype : null; + const result = Object.create(prototype); + result.value = value; + result.done = done; + return result; +} + +export function readableStreamAddReadIntoRequest( + stream: SDReadableStream<ArrayBufferView>, + forAuthorCode: boolean +): Promise<IteratorResult<ArrayBufferView, any>> { + // Assert: ! IsReadableStreamBYOBReader(stream.[[reader]]) is true. + // Assert: stream.[[state]] is "readable" or "closed". + const reader = stream[reader_] as SDReadableStreamBYOBReader; + const conProm = shared.createControlledPromise< + IteratorResult<ArrayBufferView> + >() as ReadRequest<IteratorResult<ArrayBufferView>>; + conProm.forAuthorCode = forAuthorCode; + reader[readIntoRequests_].push(conProm); + return conProm.promise; +} + +export function readableStreamAddReadRequest<OutputType>( + stream: SDReadableStream<OutputType>, + forAuthorCode: boolean +): Promise<IteratorResult<OutputType, any>> { + // Assert: ! IsReadableStreamDefaultReader(stream.[[reader]]) is true. + // Assert: stream.[[state]] is "readable". + const reader = stream[reader_] as SDReadableStreamDefaultReader<OutputType>; + const conProm = shared.createControlledPromise< + IteratorResult<OutputType> + >() as ReadRequest<IteratorResult<OutputType>>; + conProm.forAuthorCode = forAuthorCode; + reader[readRequests_].push(conProm); + return conProm.promise; +} + +export function readableStreamHasBYOBReader<OutputType>( + stream: SDReadableStream<OutputType> +): boolean { + const reader = stream[reader_]; + return isReadableStreamBYOBReader(reader); +} + +export function readableStreamHasDefaultReader<OutputType>( + stream: SDReadableStream<OutputType> +): boolean { + const reader = stream[reader_]; + return isReadableStreamDefaultReader(reader); +} + +export function readableStreamCancel<OutputType>( + stream: SDReadableStream<OutputType>, + reason: shared.ErrorResult +): Promise<undefined> { + if (stream[shared.state_] === "closed") { + return Promise.resolve(undefined); + } + if (stream[shared.state_] === "errored") { + return Promise.reject(stream[shared.storedError_]); + } + readableStreamClose(stream); + + const sourceCancelPromise = stream[readableStreamController_][cancelSteps_]( + reason + ); + return sourceCancelPromise.then(_ => undefined); +} + +export function readableStreamClose<OutputType>( + stream: SDReadableStream<OutputType> +): void { + // Assert: stream.[[state]] is "readable". + stream[shared.state_] = "closed"; + const reader = stream[reader_]; + if (reader === undefined) { + return; + } + + if (isReadableStreamDefaultReader(reader)) { + for (const readRequest of reader[readRequests_]) { + readRequest.resolve( + readableStreamCreateReadResult( + undefined, + true, + readRequest.forAuthorCode + ) + ); + } + reader[readRequests_] = []; + } + reader[closedPromise_].resolve(); + reader[closedPromise_].promise.catch(() => {}); +} + +export function readableStreamError<OutputType>( + stream: SDReadableStream<OutputType>, + error: shared.ErrorResult +): void { + if (stream[shared.state_] !== "readable") { + throw new RangeError("Stream is in an invalid state"); + } + stream[shared.state_] = "errored"; + stream[shared.storedError_] = error; + + const reader = stream[reader_]; + if (reader === undefined) { + return; + } + if (isReadableStreamDefaultReader(reader)) { + for (const readRequest of reader[readRequests_]) { + readRequest.reject(error); + } + reader[readRequests_] = []; + } else { + // Assert: IsReadableStreamBYOBReader(reader). + // TODO remove the "as unknown" cast + const readIntoRequests = ((reader as unknown) as SDReadableStreamBYOBReader)[ + readIntoRequests_ + ]; + for (const readIntoRequest of readIntoRequests) { + readIntoRequest.reject(error); + } + // TODO remove the "as unknown" cast + ((reader as unknown) as SDReadableStreamBYOBReader)[readIntoRequests_] = []; + } + + reader[closedPromise_].reject(error); +} + +// ---- Readers + +export function isReadableStreamDefaultReader( + reader: unknown +): reader is SDReadableStreamDefaultReader<any> { + if (typeof reader !== "object" || reader === null) { + return false; + } + return readRequests_ in reader; +} + +export function isReadableStreamBYOBReader( + reader: unknown +): reader is SDReadableStreamBYOBReader { + if (typeof reader !== "object" || reader === null) { + return false; + } + return readIntoRequests_ in reader; +} + +export function readableStreamReaderGenericInitialize<OutputType>( + reader: SDReadableStreamReader<OutputType>, + stream: SDReadableStream<OutputType> +): void { + reader[ownerReadableStream_] = stream; + stream[reader_] = reader; + const streamState = stream[shared.state_]; + + reader[closedPromise_] = shared.createControlledPromise<void>(); + if (streamState === "readable") { + // leave as is + } else if (streamState === "closed") { + reader[closedPromise_].resolve(undefined); + } else { + reader[closedPromise_].reject(stream[shared.storedError_]); + reader[closedPromise_].promise.catch(() => {}); + } +} + +export function readableStreamReaderGenericRelease<OutputType>( + reader: SDReadableStreamReader<OutputType> +): void { + // Assert: reader.[[ownerReadableStream]] is not undefined. + // Assert: reader.[[ownerReadableStream]].[[reader]] is reader. + const stream = reader[ownerReadableStream_]; + if (stream === undefined) { + throw new TypeError("Reader is in an inconsistent state"); + } + + if (stream[shared.state_] === "readable") { + // code moved out + } else { + reader[closedPromise_] = shared.createControlledPromise<void>(); + } + reader[closedPromise_].reject(new TypeError()); + reader[closedPromise_].promise.catch(() => {}); + + stream[reader_] = undefined; + reader[ownerReadableStream_] = undefined; +} + +export function readableStreamBYOBReaderRead( + reader: SDReadableStreamBYOBReader, + view: ArrayBufferView, + forAuthorCode = false +): Promise<IteratorResult<ArrayBufferView, any>> { + const stream = reader[ownerReadableStream_]!; + // Assert: stream is not undefined. + + if (stream[shared.state_] === "errored") { + return Promise.reject(stream[shared.storedError_]); + } + return readableByteStreamControllerPullInto( + stream[readableStreamController_] as SDReadableByteStreamController, + view, + forAuthorCode + ); +} + +export function readableStreamDefaultReaderRead<OutputType>( + reader: SDReadableStreamDefaultReader<OutputType>, + forAuthorCode = false +): Promise<IteratorResult<OutputType | undefined>> { + const stream = reader[ownerReadableStream_]!; + // Assert: stream is not undefined. + + if (stream[shared.state_] === "closed") { + return Promise.resolve( + readableStreamCreateReadResult(undefined, true, forAuthorCode) + ); + } + if (stream[shared.state_] === "errored") { + return Promise.reject(stream[shared.storedError_]); + } + // Assert: stream.[[state]] is "readable". + return stream[readableStreamController_][pullSteps_](forAuthorCode); +} + +export function readableStreamFulfillReadIntoRequest<OutputType>( + stream: SDReadableStream<OutputType>, + chunk: ArrayBufferView, + done: boolean +): void { + // TODO remove the "as unknown" cast + const reader = (stream[reader_] as unknown) as SDReadableStreamBYOBReader; + const readIntoRequest = reader[readIntoRequests_].shift()!; // <-- length check done in caller + readIntoRequest.resolve( + readableStreamCreateReadResult(chunk, done, readIntoRequest.forAuthorCode) + ); +} + +export function readableStreamFulfillReadRequest<OutputType>( + stream: SDReadableStream<OutputType>, + chunk: OutputType, + done: boolean +): void { + const reader = stream[reader_] as SDReadableStreamDefaultReader<OutputType>; + const readRequest = reader[readRequests_].shift()!; // <-- length check done in caller + readRequest.resolve( + readableStreamCreateReadResult(chunk, done, readRequest.forAuthorCode) + ); +} + +// ---- DefaultController + +export function setUpReadableStreamDefaultController<OutputType>( + stream: SDReadableStream<OutputType>, + controller: SDReadableStreamDefaultController<OutputType>, + startAlgorithm: StartAlgorithm, + pullAlgorithm: PullAlgorithm<OutputType>, + cancelAlgorithm: CancelAlgorithm, + highWaterMark: number, + sizeAlgorithm: QueuingStrategySizeCallback<OutputType> +): void { + // Assert: stream.[[readableStreamController]] is undefined. + controller[controlledReadableStream_] = stream; + q.resetQueue(controller); + controller[started_] = false; + controller[closeRequested_] = false; + controller[pullAgain_] = false; + controller[pulling_] = false; + controller[strategySizeAlgorithm_] = sizeAlgorithm; + controller[strategyHWM_] = highWaterMark; + controller[pullAlgorithm_] = pullAlgorithm; + controller[cancelAlgorithm_] = cancelAlgorithm; + stream[readableStreamController_] = controller; + + const startResult = startAlgorithm(); + Promise.resolve(startResult).then( + _ => { + controller[started_] = true; + // Assert: controller.[[pulling]] is false. + // Assert: controller.[[pullAgain]] is false. + readableStreamDefaultControllerCallPullIfNeeded(controller); + }, + error => { + readableStreamDefaultControllerError(controller, error); + } + ); +} + +export function isReadableStreamDefaultController( + value: unknown +): value is SDReadableStreamDefaultController<any> { + if (typeof value !== "object" || value === null) { + return false; + } + return controlledReadableStream_ in value; +} + +export function readableStreamDefaultControllerHasBackpressure<OutputType>( + controller: SDReadableStreamDefaultController<OutputType> +): boolean { + return !readableStreamDefaultControllerShouldCallPull(controller); +} + +export function readableStreamDefaultControllerCanCloseOrEnqueue<OutputType>( + controller: SDReadableStreamDefaultController<OutputType> +): boolean { + const state = controller[controlledReadableStream_][shared.state_]; + return controller[closeRequested_] === false && state === "readable"; +} + +export function readableStreamDefaultControllerGetDesiredSize<OutputType>( + controller: SDReadableStreamDefaultController<OutputType> +): number | null { + const state = controller[controlledReadableStream_][shared.state_]; + if (state === "errored") { + return null; + } + if (state === "closed") { + return 0; + } + return controller[strategyHWM_] - controller[q.queueTotalSize_]; +} + +export function readableStreamDefaultControllerClose<OutputType>( + controller: SDReadableStreamDefaultController<OutputType> +): void { + // Assert: !ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) is true. + controller[closeRequested_] = true; + const stream = controller[controlledReadableStream_]; + if (controller[q.queue_].length === 0) { + readableStreamDefaultControllerClearAlgorithms(controller); + readableStreamClose(stream); + } +} + +export function readableStreamDefaultControllerEnqueue<OutputType>( + controller: SDReadableStreamDefaultController<OutputType>, + chunk: OutputType +): void { + const stream = controller[controlledReadableStream_]; + // Assert: !ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) is true. + if ( + isReadableStreamLocked(stream) && + readableStreamGetNumReadRequests(stream) > 0 + ) { + readableStreamFulfillReadRequest(stream, chunk, false); + } else { + // Let result be the result of performing controller.[[strategySizeAlgorithm]], passing in chunk, + // and interpreting the result as an ECMAScript completion value. + // impl note: assuming that in JS land this just means try/catch with rethrow + let chunkSize: number; + try { + chunkSize = controller[strategySizeAlgorithm_](chunk); + } catch (error) { + readableStreamDefaultControllerError(controller, error); + throw error; + } + try { + q.enqueueValueWithSize(controller, chunk, chunkSize); + } catch (error) { + readableStreamDefaultControllerError(controller, error); + throw error; + } + } + readableStreamDefaultControllerCallPullIfNeeded(controller); +} + +export function readableStreamDefaultControllerError<OutputType>( + controller: SDReadableStreamDefaultController<OutputType>, + error: shared.ErrorResult +): void { + const stream = controller[controlledReadableStream_]; + if (stream[shared.state_] !== "readable") { + return; + } + q.resetQueue(controller); + readableStreamDefaultControllerClearAlgorithms(controller); + readableStreamError(stream, error); +} + +export function readableStreamDefaultControllerCallPullIfNeeded<OutputType>( + controller: SDReadableStreamDefaultController<OutputType> +): void { + if (!readableStreamDefaultControllerShouldCallPull(controller)) { + return; + } + if (controller[pulling_]) { + controller[pullAgain_] = true; + return; + } + if (controller[pullAgain_]) { + throw new RangeError("Stream controller is in an invalid state."); + } + + controller[pulling_] = true; + controller[pullAlgorithm_](controller).then( + _ => { + controller[pulling_] = false; + if (controller[pullAgain_]) { + controller[pullAgain_] = false; + readableStreamDefaultControllerCallPullIfNeeded(controller); + } + }, + error => { + readableStreamDefaultControllerError(controller, error); + } + ); +} + +export function readableStreamDefaultControllerShouldCallPull<OutputType>( + controller: SDReadableStreamDefaultController<OutputType> +): boolean { + const stream = controller[controlledReadableStream_]; + if (!readableStreamDefaultControllerCanCloseOrEnqueue(controller)) { + return false; + } + if (controller[started_] === false) { + return false; + } + if ( + isReadableStreamLocked(stream) && + readableStreamGetNumReadRequests(stream) > 0 + ) { + return true; + } + const desiredSize = readableStreamDefaultControllerGetDesiredSize(controller); + if (desiredSize === null) { + throw new RangeError("Stream is in an invalid state."); + } + return desiredSize > 0; +} + +export function readableStreamDefaultControllerClearAlgorithms<OutputType>( + controller: SDReadableStreamDefaultController<OutputType> +): void { + controller[pullAlgorithm_] = undefined!; + controller[cancelAlgorithm_] = undefined!; + controller[strategySizeAlgorithm_] = undefined!; +} + +// ---- BYOBController + +export function setUpReadableByteStreamController( + stream: SDReadableStream<ArrayBufferView>, + controller: SDReadableByteStreamController, + startAlgorithm: StartAlgorithm, + pullAlgorithm: PullAlgorithm<ArrayBufferView>, + cancelAlgorithm: CancelAlgorithm, + highWaterMark: number, + autoAllocateChunkSize: number | undefined +): void { + // Assert: stream.[[readableStreamController]] is undefined. + if (stream[readableStreamController_] !== undefined) { + throw new TypeError("Cannot reuse streams"); + } + if (autoAllocateChunkSize !== undefined) { + if ( + !shared.isInteger(autoAllocateChunkSize) || + autoAllocateChunkSize <= 0 + ) { + throw new RangeError( + "autoAllocateChunkSize must be a positive, finite integer" + ); + } + } + // Set controller.[[controlledReadableByteStream]] to stream. + controller[controlledReadableByteStream_] = stream; + // Set controller.[[pullAgain]] and controller.[[pulling]] to false. + controller[pullAgain_] = false; + controller[pulling_] = false; + readableByteStreamControllerClearPendingPullIntos(controller); + q.resetQueue(controller); + controller[closeRequested_] = false; + controller[started_] = false; + controller[strategyHWM_] = shared.validateAndNormalizeHighWaterMark( + highWaterMark + ); + controller[pullAlgorithm_] = pullAlgorithm; + controller[cancelAlgorithm_] = cancelAlgorithm; + controller[autoAllocateChunkSize_] = autoAllocateChunkSize; + controller[pendingPullIntos_] = []; + stream[readableStreamController_] = controller; + + // Let startResult be the result of performing startAlgorithm. + const startResult = startAlgorithm(); + Promise.resolve(startResult).then( + _ => { + controller[started_] = true; + // Assert: controller.[[pulling]] is false. + // Assert: controller.[[pullAgain]] is false. + readableByteStreamControllerCallPullIfNeeded(controller); + }, + error => { + readableByteStreamControllerError(controller, error); + } + ); +} + +export function isReadableStreamBYOBRequest( + value: unknown +): value is SDReadableStreamBYOBRequest { + if (typeof value !== "object" || value === null) { + return false; + } + return associatedReadableByteStreamController_ in value; +} + +export function isReadableByteStreamController( + value: unknown +): value is SDReadableByteStreamController { + if (typeof value !== "object" || value === null) { + return false; + } + return controlledReadableByteStream_ in value; +} + +export function readableByteStreamControllerCallPullIfNeeded( + controller: SDReadableByteStreamController +): void { + if (!readableByteStreamControllerShouldCallPull(controller)) { + return; + } + if (controller[pulling_]) { + controller[pullAgain_] = true; + return; + } + // Assert: controller.[[pullAgain]] is false. + controller[pulling_] = true; + controller[pullAlgorithm_](controller).then( + _ => { + controller[pulling_] = false; + if (controller[pullAgain_]) { + controller[pullAgain_] = false; + readableByteStreamControllerCallPullIfNeeded(controller); + } + }, + error => { + readableByteStreamControllerError(controller, error); + } + ); +} + +export function readableByteStreamControllerClearAlgorithms( + controller: SDReadableByteStreamController +): void { + controller[pullAlgorithm_] = undefined!; + controller[cancelAlgorithm_] = undefined!; +} + +export function readableByteStreamControllerClearPendingPullIntos( + controller: SDReadableByteStreamController +): void { + readableByteStreamControllerInvalidateBYOBRequest(controller); + controller[pendingPullIntos_] = []; +} + +export function readableByteStreamControllerClose( + controller: SDReadableByteStreamController +): void { + const stream = controller[controlledReadableByteStream_]; + // Assert: controller.[[closeRequested]] is false. + // Assert: stream.[[state]] is "readable". + if (controller[q.queueTotalSize_] > 0) { + controller[closeRequested_] = true; + return; + } + if (controller[pendingPullIntos_].length > 0) { + const firstPendingPullInto = controller[pendingPullIntos_][0]; + if (firstPendingPullInto.bytesFilled > 0) { + const error = new TypeError(); + readableByteStreamControllerError(controller, error); + throw error; + } + } + readableByteStreamControllerClearAlgorithms(controller); + readableStreamClose(stream); +} + +export function readableByteStreamControllerCommitPullIntoDescriptor( + stream: SDReadableStream<ArrayBufferView>, + pullIntoDescriptor: PullIntoDescriptor +): void { + // Assert: stream.[[state]] is not "errored". + let done = false; + if (stream[shared.state_] === "closed") { + // Assert: pullIntoDescriptor.[[bytesFilled]] is 0. + done = true; + } + const filledView = readableByteStreamControllerConvertPullIntoDescriptor( + pullIntoDescriptor + ); + if (pullIntoDescriptor.readerType === "default") { + readableStreamFulfillReadRequest(stream, filledView, done); + } else { + // Assert: pullIntoDescriptor.[[readerType]] is "byob". + readableStreamFulfillReadIntoRequest(stream, filledView, done); + } +} + +export function readableByteStreamControllerConvertPullIntoDescriptor( + pullIntoDescriptor: PullIntoDescriptor +): ArrayBufferView { + const { bytesFilled, elementSize } = pullIntoDescriptor; + // Assert: bytesFilled <= pullIntoDescriptor.byteLength + // Assert: bytesFilled mod elementSize is 0 + return new pullIntoDescriptor.ctor( + pullIntoDescriptor.buffer, + pullIntoDescriptor.byteOffset, + bytesFilled / elementSize + ); +} + +export function readableByteStreamControllerEnqueue( + controller: SDReadableByteStreamController, + chunk: ArrayBufferView +): void { + const stream = controller[controlledReadableByteStream_]; + // Assert: controller.[[closeRequested]] is false. + // Assert: stream.[[state]] is "readable". + const { buffer, byteOffset, byteLength } = chunk; + + const transferredBuffer = shared.transferArrayBuffer(buffer); + + if (readableStreamHasDefaultReader(stream)) { + if (readableStreamGetNumReadRequests(stream) === 0) { + readableByteStreamControllerEnqueueChunkToQueue( + controller, + transferredBuffer, + byteOffset, + byteLength + ); + } else { + // Assert: controller.[[queue]] is empty. + const transferredView = new Uint8Array( + transferredBuffer, + byteOffset, + byteLength + ); + readableStreamFulfillReadRequest(stream, transferredView, false); + } + } else if (readableStreamHasBYOBReader(stream)) { + readableByteStreamControllerEnqueueChunkToQueue( + controller, + transferredBuffer, + byteOffset, + byteLength + ); + readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue( + controller + ); + } else { + // Assert: !IsReadableStreamLocked(stream) is false. + readableByteStreamControllerEnqueueChunkToQueue( + controller, + transferredBuffer, + byteOffset, + byteLength + ); + } + readableByteStreamControllerCallPullIfNeeded(controller); +} + +export function readableByteStreamControllerEnqueueChunkToQueue( + controller: SDReadableByteStreamController, + buffer: ArrayBufferLike, + byteOffset: number, + byteLength: number +): void { + controller[q.queue_].push({ buffer, byteOffset, byteLength }); + controller[q.queueTotalSize_] += byteLength; +} + +export function readableByteStreamControllerError( + controller: SDReadableByteStreamController, + error: shared.ErrorResult +): void { + const stream = controller[controlledReadableByteStream_]; + if (stream[shared.state_] !== "readable") { + return; + } + readableByteStreamControllerClearPendingPullIntos(controller); + q.resetQueue(controller); + readableByteStreamControllerClearAlgorithms(controller); + readableStreamError(stream, error); +} + +export function readableByteStreamControllerFillHeadPullIntoDescriptor( + controller: SDReadableByteStreamController, + size: number, + pullIntoDescriptor: PullIntoDescriptor +): void { + // Assert: either controller.[[pendingPullIntos]] is empty, or the first element of controller.[[pendingPullIntos]] is pullIntoDescriptor. + readableByteStreamControllerInvalidateBYOBRequest(controller); + pullIntoDescriptor.bytesFilled += size; +} + +export function readableByteStreamControllerFillPullIntoDescriptorFromQueue( + controller: SDReadableByteStreamController, + pullIntoDescriptor: PullIntoDescriptor +): boolean { + const elementSize = pullIntoDescriptor.elementSize; + const currentAlignedBytes = + pullIntoDescriptor.bytesFilled - + (pullIntoDescriptor.bytesFilled % elementSize); + const maxBytesToCopy = Math.min( + controller[q.queueTotalSize_], + pullIntoDescriptor.byteLength - pullIntoDescriptor.bytesFilled + ); + const maxBytesFilled = pullIntoDescriptor.bytesFilled + maxBytesToCopy; + const maxAlignedBytes = maxBytesFilled - (maxBytesFilled % elementSize); + let totalBytesToCopyRemaining = maxBytesToCopy; + let ready = false; + + if (maxAlignedBytes > currentAlignedBytes) { + totalBytesToCopyRemaining = + maxAlignedBytes - pullIntoDescriptor.bytesFilled; + ready = true; + } + const queue = controller[q.queue_]; + + while (totalBytesToCopyRemaining > 0) { + const headOfQueue = queue.front()!; + const bytesToCopy = Math.min( + totalBytesToCopyRemaining, + headOfQueue.byteLength + ); + const destStart = + pullIntoDescriptor.byteOffset + pullIntoDescriptor.bytesFilled; + shared.copyDataBlockBytes( + pullIntoDescriptor.buffer, + destStart, + headOfQueue.buffer, + headOfQueue.byteOffset, + bytesToCopy + ); + if (headOfQueue.byteLength === bytesToCopy) { + queue.shift(); + } else { + headOfQueue.byteOffset += bytesToCopy; + headOfQueue.byteLength -= bytesToCopy; + } + controller[q.queueTotalSize_] -= bytesToCopy; + readableByteStreamControllerFillHeadPullIntoDescriptor( + controller, + bytesToCopy, + pullIntoDescriptor + ); + totalBytesToCopyRemaining -= bytesToCopy; + } + if (!ready) { + // Assert: controller[queueTotalSize_] === 0 + // Assert: pullIntoDescriptor.bytesFilled > 0 + // Assert: pullIntoDescriptor.bytesFilled < pullIntoDescriptor.elementSize + } + return ready; +} + +export function readableByteStreamControllerGetDesiredSize( + controller: SDReadableByteStreamController +): number | null { + const stream = controller[controlledReadableByteStream_]; + const state = stream[shared.state_]; + if (state === "errored") { + return null; + } + if (state === "closed") { + return 0; + } + return controller[strategyHWM_] - controller[q.queueTotalSize_]; +} + +export function readableByteStreamControllerHandleQueueDrain( + controller: SDReadableByteStreamController +): void { + // Assert: controller.[[controlledReadableByteStream]].[[state]] is "readable". + if (controller[q.queueTotalSize_] === 0 && controller[closeRequested_]) { + readableByteStreamControllerClearAlgorithms(controller); + readableStreamClose(controller[controlledReadableByteStream_]); + } else { + readableByteStreamControllerCallPullIfNeeded(controller); + } +} + +export function readableByteStreamControllerInvalidateBYOBRequest( + controller: SDReadableByteStreamController +): void { + const byobRequest = controller[byobRequest_]; + if (byobRequest === undefined) { + return; + } + byobRequest[associatedReadableByteStreamController_] = undefined; + byobRequest[view_] = undefined; + controller[byobRequest_] = undefined; +} + +export function readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue( + controller: SDReadableByteStreamController +): void { + // Assert: controller.[[closeRequested]] is false. + const pendingPullIntos = controller[pendingPullIntos_]; + while (pendingPullIntos.length > 0) { + if (controller[q.queueTotalSize_] === 0) { + return; + } + const pullIntoDescriptor = pendingPullIntos[0]; + if ( + readableByteStreamControllerFillPullIntoDescriptorFromQueue( + controller, + pullIntoDescriptor + ) + ) { + readableByteStreamControllerShiftPendingPullInto(controller); + readableByteStreamControllerCommitPullIntoDescriptor( + controller[controlledReadableByteStream_], + pullIntoDescriptor + ); + } + } +} + +export function readableByteStreamControllerPullInto( + controller: SDReadableByteStreamController, + view: ArrayBufferView, + forAuthorCode: boolean +): Promise<IteratorResult<ArrayBufferView, any>> { + const stream = controller[controlledReadableByteStream_]; + + const elementSize = (view as Uint8Array).BYTES_PER_ELEMENT || 1; // DataView exposes this in Webkit as 1, is not present in FF or Blink + const ctor = view.constructor as Uint8ArrayConstructor; // the typecast here is just for TS typing, it does not influence buffer creation + + const byteOffset = view.byteOffset; + const byteLength = view.byteLength; + const buffer = shared.transferArrayBuffer(view.buffer); + const pullIntoDescriptor: PullIntoDescriptor = { + buffer, + byteOffset, + byteLength, + bytesFilled: 0, + elementSize, + ctor, + readerType: "byob" + }; + + if (controller[pendingPullIntos_].length > 0) { + controller[pendingPullIntos_].push(pullIntoDescriptor); + return readableStreamAddReadIntoRequest(stream, forAuthorCode); + } + if (stream[shared.state_] === "closed") { + const emptyView = new ctor( + pullIntoDescriptor.buffer, + pullIntoDescriptor.byteOffset, + 0 + ); + return Promise.resolve( + readableStreamCreateReadResult(emptyView, true, forAuthorCode) + ); + } + + if (controller[q.queueTotalSize_] > 0) { + if ( + readableByteStreamControllerFillPullIntoDescriptorFromQueue( + controller, + pullIntoDescriptor + ) + ) { + const filledView = readableByteStreamControllerConvertPullIntoDescriptor( + pullIntoDescriptor + ); + readableByteStreamControllerHandleQueueDrain(controller); + return Promise.resolve( + readableStreamCreateReadResult(filledView, false, forAuthorCode) + ); + } + if (controller[closeRequested_]) { + const error = new TypeError(); + readableByteStreamControllerError(controller, error); + return Promise.reject(error); + } + } + + controller[pendingPullIntos_].push(pullIntoDescriptor); + const promise = readableStreamAddReadIntoRequest(stream, forAuthorCode); + readableByteStreamControllerCallPullIfNeeded(controller); + return promise; +} + +export function readableByteStreamControllerRespond( + controller: SDReadableByteStreamController, + bytesWritten: number +): void { + bytesWritten = Number(bytesWritten); + if (!shared.isFiniteNonNegativeNumber(bytesWritten)) { + throw new RangeError("bytesWritten must be a finite, non-negative number"); + } + // Assert: controller.[[pendingPullIntos]] is not empty. + readableByteStreamControllerRespondInternal(controller, bytesWritten); +} + +export function readableByteStreamControllerRespondInClosedState( + controller: SDReadableByteStreamController, + firstDescriptor: PullIntoDescriptor +): void { + firstDescriptor.buffer = shared.transferArrayBuffer(firstDescriptor.buffer); + // Assert: firstDescriptor.[[bytesFilled]] is 0. + const stream = controller[controlledReadableByteStream_]; + if (readableStreamHasBYOBReader(stream)) { + while (readableStreamGetNumReadIntoRequests(stream) > 0) { + const pullIntoDescriptor = readableByteStreamControllerShiftPendingPullInto( + controller + )!; + readableByteStreamControllerCommitPullIntoDescriptor( + stream, + pullIntoDescriptor + ); + } + } +} + +export function readableByteStreamControllerRespondInReadableState( + controller: SDReadableByteStreamController, + bytesWritten: number, + pullIntoDescriptor: PullIntoDescriptor +): void { + if ( + pullIntoDescriptor.bytesFilled + bytesWritten > + pullIntoDescriptor.byteLength + ) { + throw new RangeError(); + } + readableByteStreamControllerFillHeadPullIntoDescriptor( + controller, + bytesWritten, + pullIntoDescriptor + ); + if (pullIntoDescriptor.bytesFilled < pullIntoDescriptor.elementSize) { + return; + } + readableByteStreamControllerShiftPendingPullInto(controller); + const remainderSize = + pullIntoDescriptor.bytesFilled % pullIntoDescriptor.elementSize; + if (remainderSize > 0) { + const end = pullIntoDescriptor.byteOffset + pullIntoDescriptor.bytesFilled; + const remainder = shared.cloneArrayBuffer( + pullIntoDescriptor.buffer, + end - remainderSize, + remainderSize, + ArrayBuffer + ); + readableByteStreamControllerEnqueueChunkToQueue( + controller, + remainder, + 0, + remainder.byteLength + ); + } + pullIntoDescriptor.buffer = shared.transferArrayBuffer( + pullIntoDescriptor.buffer + ); + pullIntoDescriptor.bytesFilled = + pullIntoDescriptor.bytesFilled - remainderSize; + readableByteStreamControllerCommitPullIntoDescriptor( + controller[controlledReadableByteStream_], + pullIntoDescriptor + ); + readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller); +} + +export function readableByteStreamControllerRespondInternal( + controller: SDReadableByteStreamController, + bytesWritten: number +): void { + const firstDescriptor = controller[pendingPullIntos_][0]; + const stream = controller[controlledReadableByteStream_]; + if (stream[shared.state_] === "closed") { + if (bytesWritten !== 0) { + throw new TypeError(); + } + readableByteStreamControllerRespondInClosedState( + controller, + firstDescriptor + ); + } else { + // Assert: stream.[[state]] is "readable". + readableByteStreamControllerRespondInReadableState( + controller, + bytesWritten, + firstDescriptor + ); + } + readableByteStreamControllerCallPullIfNeeded(controller); +} + +export function readableByteStreamControllerRespondWithNewView( + controller: SDReadableByteStreamController, + view: ArrayBufferView +): void { + // Assert: controller.[[pendingPullIntos]] is not empty. + const firstDescriptor = controller[pendingPullIntos_][0]; + if ( + firstDescriptor.byteOffset + firstDescriptor.bytesFilled !== + view.byteOffset + ) { + throw new RangeError(); + } + if (firstDescriptor.byteLength !== view.byteLength) { + throw new RangeError(); + } + firstDescriptor.buffer = view.buffer; + readableByteStreamControllerRespondInternal(controller, view.byteLength); +} + +export function readableByteStreamControllerShiftPendingPullInto( + controller: SDReadableByteStreamController +): PullIntoDescriptor | undefined { + const descriptor = controller[pendingPullIntos_].shift(); + readableByteStreamControllerInvalidateBYOBRequest(controller); + return descriptor; +} + +export function readableByteStreamControllerShouldCallPull( + controller: SDReadableByteStreamController +): boolean { + // Let stream be controller.[[controlledReadableByteStream]]. + const stream = controller[controlledReadableByteStream_]; + if (stream[shared.state_] !== "readable") { + return false; + } + if (controller[closeRequested_]) { + return false; + } + if (!controller[started_]) { + return false; + } + if ( + readableStreamHasDefaultReader(stream) && + readableStreamGetNumReadRequests(stream) > 0 + ) { + return true; + } + if ( + readableStreamHasBYOBReader(stream) && + readableStreamGetNumReadIntoRequests(stream) > 0 + ) { + return true; + } + const desiredSize = readableByteStreamControllerGetDesiredSize(controller); + // Assert: desiredSize is not null. + return desiredSize! > 0; +} + +export function setUpReadableStreamBYOBRequest( + request: SDReadableStreamBYOBRequest, + controller: SDReadableByteStreamController, + view: ArrayBufferView +): void { + if (!isReadableByteStreamController(controller)) { + throw new TypeError(); + } + if (!ArrayBuffer.isView(view)) { + throw new TypeError(); + } + // Assert: !IsDetachedBuffer(view.[[ViewedArrayBuffer]]) is false. + + request[associatedReadableByteStreamController_] = controller; + request[view_] = view; +} |