summaryrefslogtreecommitdiff
path: root/cli/js/web/streams/readable-internals.ts
diff options
context:
space:
mode:
authorKitson Kelly <me@kitsonkelly.com>2020-04-23 00:06:51 +1000
committerGitHub <noreply@github.com>2020-04-22 10:06:51 -0400
commit8bcfc03d71cbd2cfd7ab68035ec0968d9f93b5b8 (patch)
treee1769ca51d2afde57ae18eb25b7a91388fcbf00a /cli/js/web/streams/readable-internals.ts
parentb270d6c8d090669601465f8c9c94512d6c6a07d4 (diff)
Rewrite streams (#4842)
Diffstat (limited to 'cli/js/web/streams/readable-internals.ts')
-rw-r--r--cli/js/web/streams/readable-internals.ts1350
1 files changed, 0 insertions, 1350 deletions
diff --git a/cli/js/web/streams/readable-internals.ts b/cli/js/web/streams/readable-internals.ts
deleted file mode 100644
index 571ce50ed..000000000
--- a/cli/js/web/streams/readable-internals.ts
+++ /dev/null
@@ -1,1350 +0,0 @@
-// Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
-// Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
-
-/* eslint-disable @typescript-eslint/no-explicit-any */
-// TODO reenable this lint here
-
-import * as shared from "./shared-internals.ts";
-import * as q from "./queue-mixin.ts";
-import {
- QueuingStrategy,
- QueuingStrategySizeCallback,
- UnderlyingSource,
- UnderlyingByteSource,
-} from "../dom_types.d.ts";
-
-// ReadableStreamDefaultController
-export const controlledReadableStream_ = Symbol("controlledReadableStream_");
-export const pullAlgorithm_ = Symbol("pullAlgorithm_");
-export const cancelAlgorithm_ = Symbol("cancelAlgorithm_");
-export const strategySizeAlgorithm_ = Symbol("strategySizeAlgorithm_");
-export const strategyHWM_ = Symbol("strategyHWM_");
-export const started_ = Symbol("started_");
-export const closeRequested_ = Symbol("closeRequested_");
-export const pullAgain_ = Symbol("pullAgain_");
-export const pulling_ = Symbol("pulling_");
-export const cancelSteps_ = Symbol("cancelSteps_");
-export const pullSteps_ = Symbol("pullSteps_");
-
-// ReadableByteStreamController
-export const autoAllocateChunkSize_ = Symbol("autoAllocateChunkSize_");
-export const byobRequest_ = Symbol("byobRequest_");
-export const controlledReadableByteStream_ = Symbol(
- "controlledReadableByteStream_"
-);
-export const pendingPullIntos_ = Symbol("pendingPullIntos_");
-
-// ReadableStreamDefaultReader
-export const closedPromise_ = Symbol("closedPromise_");
-export const ownerReadableStream_ = Symbol("ownerReadableStream_");
-export const readRequests_ = Symbol("readRequests_");
-export const readIntoRequests_ = Symbol("readIntoRequests_");
-
-// ReadableStreamBYOBRequest
-export const associatedReadableByteStreamController_ = Symbol(
- "associatedReadableByteStreamController_"
-);
-export const view_ = Symbol("view_");
-
-// ReadableStreamBYOBReader
-
-// ReadableStream
-export const reader_ = Symbol("reader_");
-export const readableStreamController_ = Symbol("readableStreamController_");
-
-export type StartFunction<OutputType> = (
- controller: SDReadableStreamControllerBase<OutputType>
-) => void | PromiseLike<void>;
-export type StartAlgorithm = () => Promise<void> | void;
-export type PullFunction<OutputType> = (
- controller: SDReadableStreamControllerBase<OutputType>
-) => void | PromiseLike<void>;
-export type PullAlgorithm<OutputType> = (
- controller: SDReadableStreamControllerBase<OutputType>
-) => PromiseLike<void>;
-export type CancelAlgorithm = (reason?: shared.ErrorResult) => Promise<void>;
-
-// ----
-
-export interface SDReadableStreamControllerBase<OutputType> {
- readonly desiredSize: number | null;
- close(): void;
- error(e?: shared.ErrorResult): void;
-
- [cancelSteps_](reason: shared.ErrorResult): Promise<void>;
- [pullSteps_](forAuthorCode: boolean): Promise<IteratorResult<OutputType>>;
-}
-
-export interface SDReadableStreamBYOBRequest {
- readonly view: ArrayBufferView;
- respond(bytesWritten: number): void;
- respondWithNewView(view: ArrayBufferView): void;
-
- [associatedReadableByteStreamController_]:
- | SDReadableByteStreamController
- | undefined;
- [view_]: ArrayBufferView | undefined;
-}
-
-interface ArrayBufferViewCtor {
- new (
- buffer: ArrayBufferLike,
- byteOffset?: number,
- byteLength?: number
- ): ArrayBufferView;
-}
-
-export interface PullIntoDescriptor {
- readerType: "default" | "byob";
- ctor: ArrayBufferViewCtor;
- buffer: ArrayBufferLike;
- byteOffset: number;
- byteLength: number;
- bytesFilled: number;
- elementSize: number;
-}
-
-export interface SDReadableByteStreamController
- extends SDReadableStreamControllerBase<ArrayBufferView>,
- q.ByteQueueContainer {
- readonly byobRequest: SDReadableStreamBYOBRequest | undefined;
- enqueue(chunk: ArrayBufferView): void;
-
- [autoAllocateChunkSize_]: number | undefined; // A positive integer, when the automatic buffer allocation feature is enabled. In that case, this value specifies the size of buffer to allocate. It is undefined otherwise.
- [byobRequest_]: SDReadableStreamBYOBRequest | undefined; // A ReadableStreamBYOBRequest instance representing the current BYOB pull request
- [cancelAlgorithm_]: CancelAlgorithm; // A promise-returning algorithm, taking one argument (the cancel reason), which communicates a requested cancelation to the underlying source
- [closeRequested_]: boolean; // A boolean flag indicating whether the stream has been closed by its underlying byte source, but still has chunks in its internal queue that have not yet been read
- [controlledReadableByteStream_]: SDReadableStream<ArrayBufferView>; // The ReadableStream instance controlled
- [pullAgain_]: boolean; // A boolean flag set to true if the stream’s mechanisms requested a call to the underlying byte source’s pull() method to pull more data, but the pull could not yet be done since a previous call is still executing
- [pullAlgorithm_]: PullAlgorithm<ArrayBufferView>; // A promise-returning algorithm that pulls data from the underlying source
- [pulling_]: boolean; // A boolean flag set to true while the underlying byte source’s pull() method is executing and has not yet fulfilled, used to prevent reentrant calls
- [pendingPullIntos_]: PullIntoDescriptor[]; // A List of descriptors representing pending BYOB pull requests
- [started_]: boolean; // A boolean flag indicating whether the underlying source has finished starting
- [strategyHWM_]: number; // A number supplied to the constructor as part of the stream’s queuing strategy, indicating the point at which the stream will apply backpressure to its underlying byte source
-}
-
-export interface SDReadableStreamDefaultController<OutputType>
- extends SDReadableStreamControllerBase<OutputType>,
- q.QueueContainer<OutputType> {
- enqueue(chunk?: OutputType): void;
-
- [controlledReadableStream_]: SDReadableStream<OutputType>;
- [pullAlgorithm_]: PullAlgorithm<OutputType>;
- [cancelAlgorithm_]: CancelAlgorithm;
- [strategySizeAlgorithm_]: QueuingStrategySizeCallback<OutputType>;
- [strategyHWM_]: number;
-
- [started_]: boolean;
- [closeRequested_]: boolean;
- [pullAgain_]: boolean;
- [pulling_]: boolean;
-}
-
-// ----
-
-export interface SDReadableStreamReader<OutputType> {
- readonly closed: Promise<void>;
- cancel(reason: shared.ErrorResult): Promise<void>;
- releaseLock(): void;
-
- [ownerReadableStream_]: SDReadableStream<OutputType> | undefined;
- [closedPromise_]: shared.ControlledPromise<void>;
-}
-
-export interface ReadRequest<V> extends shared.ControlledPromise<V> {
- forAuthorCode: boolean;
-}
-
-export declare class SDReadableStreamDefaultReader<OutputType>
- implements SDReadableStreamReader<OutputType> {
- constructor(stream: SDReadableStream<OutputType>);
-
- readonly closed: Promise<void>;
- cancel(reason: shared.ErrorResult): Promise<void>;
- releaseLock(): void;
- read(): Promise<IteratorResult<OutputType | undefined>>;
-
- [ownerReadableStream_]: SDReadableStream<OutputType> | undefined;
- [closedPromise_]: shared.ControlledPromise<void>;
- [readRequests_]: Array<ReadRequest<IteratorResult<OutputType>>>;
-}
-
-export declare class SDReadableStreamBYOBReader
- implements SDReadableStreamReader<ArrayBufferView> {
- constructor(stream: SDReadableStream<ArrayBufferView>);
-
- readonly closed: Promise<void>;
- cancel(reason: shared.ErrorResult): Promise<void>;
- releaseLock(): void;
- read(view: ArrayBufferView): Promise<IteratorResult<ArrayBufferView>>;
-
- [ownerReadableStream_]: SDReadableStream<ArrayBufferView> | undefined;
- [closedPromise_]: shared.ControlledPromise<void>;
- [readIntoRequests_]: Array<ReadRequest<IteratorResult<ArrayBufferView>>>;
-}
-
-/* TODO reenable this when we add WritableStreams and Transforms
-export interface GenericTransformStream<InputType, OutputType> {
- readable: SDReadableStream<OutputType>;
- writable: ws.WritableStream<InputType>;
-}
-*/
-
-export type ReadableStreamState = "readable" | "closed" | "errored";
-
-export declare class SDReadableStream<OutputType> {
- constructor(
- underlyingSource: UnderlyingByteSource,
- strategy?: { highWaterMark?: number; size?: undefined }
- );
- constructor(
- underlyingSource?: UnderlyingSource<OutputType>,
- strategy?: QueuingStrategy<OutputType>
- );
-
- readonly locked: boolean;
- cancel(reason?: shared.ErrorResult): Promise<void>;
- getReader(): SDReadableStreamReader<OutputType>;
- getReader(options: { mode: "byob" }): SDReadableStreamBYOBReader;
- tee(): Array<SDReadableStream<OutputType>>;
-
- /* TODO reenable these methods when we bring in writableStreams and transport types
- pipeThrough<ResultType>(
- transform: GenericTransformStream<OutputType, ResultType>,
- options?: PipeOptions
- ): SDReadableStream<ResultType>;
- pipeTo(
- dest: ws.WritableStream<OutputType>,
- options?: PipeOptions
- ): Promise<void>;
- */
- [shared.state_]: ReadableStreamState;
- [shared.storedError_]: shared.ErrorResult;
- [reader_]: SDReadableStreamReader<OutputType> | undefined;
- [readableStreamController_]: SDReadableStreamControllerBase<OutputType>;
-}
-
-// ---- Stream
-
-export function initializeReadableStream<OutputType>(
- stream: SDReadableStream<OutputType>
-): void {
- stream[shared.state_] = "readable";
- stream[reader_] = undefined;
- stream[shared.storedError_] = undefined;
- stream[readableStreamController_] = undefined!; // mark slot as used for brand check
-}
-
-export function isReadableStream(
- value: unknown
-): value is SDReadableStream<any> {
- if (typeof value !== "object" || value === null) {
- return false;
- }
- return readableStreamController_ in value;
-}
-
-export function isReadableStreamLocked<OutputType>(
- stream: SDReadableStream<OutputType>
-): boolean {
- return stream[reader_] !== undefined;
-}
-
-export function readableStreamGetNumReadIntoRequests<OutputType>(
- stream: SDReadableStream<OutputType>
-): number {
- // TODO remove the "as unknown" cast
- // This is in to workaround a compiler error
- // error TS2352: Conversion of type 'SDReadableStreamReader<OutputType>' to type 'SDReadableStreamBYOBReader' may be a mistake because neither type sufficiently overlaps with the other. If this was intentional, convert the expression to 'unknown' first.
- // Type 'SDReadableStreamReader<OutputType>' is missing the following properties from type 'SDReadableStreamBYOBReader': read, [readIntoRequests_]
- const reader = (stream[reader_] as unknown) as SDReadableStreamBYOBReader;
- if (reader === undefined) {
- return 0;
- }
- return reader[readIntoRequests_].length;
-}
-
-export function readableStreamGetNumReadRequests<OutputType>(
- stream: SDReadableStream<OutputType>
-): number {
- const reader = stream[reader_] as SDReadableStreamDefaultReader<OutputType>;
- if (reader === undefined) {
- return 0;
- }
- return reader[readRequests_].length;
-}
-
-export function readableStreamCreateReadResult<T>(
- value: T,
- done: boolean,
- forAuthorCode: boolean
-): IteratorResult<T> {
- const prototype = forAuthorCode ? Object.prototype : null;
- const result = Object.create(prototype);
- result.value = value;
- result.done = done;
- return result;
-}
-
-export function readableStreamAddReadIntoRequest(
- stream: SDReadableStream<ArrayBufferView>,
- forAuthorCode: boolean
-): Promise<IteratorResult<ArrayBufferView, any>> {
- // Assert: ! IsReadableStreamBYOBReader(stream.[[reader]]) is true.
- // Assert: stream.[[state]] is "readable" or "closed".
- const reader = stream[reader_] as SDReadableStreamBYOBReader;
- const conProm = shared.createControlledPromise<
- IteratorResult<ArrayBufferView>
- >() as ReadRequest<IteratorResult<ArrayBufferView>>;
- conProm.forAuthorCode = forAuthorCode;
- reader[readIntoRequests_].push(conProm);
- return conProm.promise;
-}
-
-export function readableStreamAddReadRequest<OutputType>(
- stream: SDReadableStream<OutputType>,
- forAuthorCode: boolean
-): Promise<IteratorResult<OutputType, any>> {
- // Assert: ! IsReadableStreamDefaultReader(stream.[[reader]]) is true.
- // Assert: stream.[[state]] is "readable".
- const reader = stream[reader_] as SDReadableStreamDefaultReader<OutputType>;
- const conProm = shared.createControlledPromise<
- IteratorResult<OutputType>
- >() as ReadRequest<IteratorResult<OutputType>>;
- conProm.forAuthorCode = forAuthorCode;
- reader[readRequests_].push(conProm);
- return conProm.promise;
-}
-
-export function readableStreamHasBYOBReader<OutputType>(
- stream: SDReadableStream<OutputType>
-): boolean {
- const reader = stream[reader_];
- return isReadableStreamBYOBReader(reader);
-}
-
-export function readableStreamHasDefaultReader<OutputType>(
- stream: SDReadableStream<OutputType>
-): boolean {
- const reader = stream[reader_];
- return isReadableStreamDefaultReader(reader);
-}
-
-export function readableStreamCancel<OutputType>(
- stream: SDReadableStream<OutputType>,
- reason: shared.ErrorResult
-): Promise<undefined> {
- if (stream[shared.state_] === "closed") {
- return Promise.resolve(undefined);
- }
- if (stream[shared.state_] === "errored") {
- return Promise.reject(stream[shared.storedError_]);
- }
- readableStreamClose(stream);
-
- const sourceCancelPromise = stream[readableStreamController_][cancelSteps_](
- reason
- );
- return sourceCancelPromise.then((_) => undefined);
-}
-
-export function readableStreamClose<OutputType>(
- stream: SDReadableStream<OutputType>
-): void {
- // Assert: stream.[[state]] is "readable".
- stream[shared.state_] = "closed";
- const reader = stream[reader_];
- if (reader === undefined) {
- return;
- }
-
- if (isReadableStreamDefaultReader(reader)) {
- for (const readRequest of reader[readRequests_]) {
- readRequest.resolve(
- readableStreamCreateReadResult(
- undefined,
- true,
- readRequest.forAuthorCode
- )
- );
- }
- reader[readRequests_] = [];
- }
- reader[closedPromise_].resolve();
- reader[closedPromise_].promise.catch(() => {});
-}
-
-export function readableStreamError<OutputType>(
- stream: SDReadableStream<OutputType>,
- error: shared.ErrorResult
-): void {
- if (stream[shared.state_] !== "readable") {
- throw new RangeError("Stream is in an invalid state");
- }
- stream[shared.state_] = "errored";
- stream[shared.storedError_] = error;
-
- const reader = stream[reader_];
- if (reader === undefined) {
- return;
- }
- if (isReadableStreamDefaultReader(reader)) {
- for (const readRequest of reader[readRequests_]) {
- readRequest.reject(error);
- }
- reader[readRequests_] = [];
- } else {
- // Assert: IsReadableStreamBYOBReader(reader).
- // TODO remove the "as unknown" cast
- const readIntoRequests = ((reader as unknown) as SDReadableStreamBYOBReader)[
- readIntoRequests_
- ];
- for (const readIntoRequest of readIntoRequests) {
- readIntoRequest.reject(error);
- }
- // TODO remove the "as unknown" cast
- ((reader as unknown) as SDReadableStreamBYOBReader)[readIntoRequests_] = [];
- }
-
- reader[closedPromise_].reject(error);
-}
-
-// ---- Readers
-
-export function isReadableStreamDefaultReader(
- reader: unknown
-): reader is SDReadableStreamDefaultReader<any> {
- if (typeof reader !== "object" || reader === null) {
- return false;
- }
- return readRequests_ in reader;
-}
-
-export function isReadableStreamBYOBReader(
- reader: unknown
-): reader is SDReadableStreamBYOBReader {
- if (typeof reader !== "object" || reader === null) {
- return false;
- }
- return readIntoRequests_ in reader;
-}
-
-export function readableStreamReaderGenericInitialize<OutputType>(
- reader: SDReadableStreamReader<OutputType>,
- stream: SDReadableStream<OutputType>
-): void {
- reader[ownerReadableStream_] = stream;
- stream[reader_] = reader;
- const streamState = stream[shared.state_];
-
- reader[closedPromise_] = shared.createControlledPromise<void>();
- if (streamState === "readable") {
- // leave as is
- } else if (streamState === "closed") {
- reader[closedPromise_].resolve(undefined);
- } else {
- reader[closedPromise_].reject(stream[shared.storedError_]);
- reader[closedPromise_].promise.catch(() => {});
- }
-}
-
-export function readableStreamReaderGenericRelease<OutputType>(
- reader: SDReadableStreamReader<OutputType>
-): void {
- // Assert: reader.[[ownerReadableStream]] is not undefined.
- // Assert: reader.[[ownerReadableStream]].[[reader]] is reader.
- const stream = reader[ownerReadableStream_];
- if (stream === undefined) {
- throw new TypeError("Reader is in an inconsistent state");
- }
-
- if (stream[shared.state_] === "readable") {
- // code moved out
- } else {
- reader[closedPromise_] = shared.createControlledPromise<void>();
- }
- reader[closedPromise_].reject(new TypeError());
- reader[closedPromise_].promise.catch(() => {});
-
- stream[reader_] = undefined;
- reader[ownerReadableStream_] = undefined;
-}
-
-export function readableStreamBYOBReaderRead(
- reader: SDReadableStreamBYOBReader,
- view: ArrayBufferView,
- forAuthorCode = false
-): Promise<IteratorResult<ArrayBufferView, any>> {
- const stream = reader[ownerReadableStream_]!;
- // Assert: stream is not undefined.
-
- if (stream[shared.state_] === "errored") {
- return Promise.reject(stream[shared.storedError_]);
- }
- return readableByteStreamControllerPullInto(
- stream[readableStreamController_] as SDReadableByteStreamController,
- view,
- forAuthorCode
- );
-}
-
-export function readableStreamDefaultReaderRead<OutputType>(
- reader: SDReadableStreamDefaultReader<OutputType>,
- forAuthorCode = false
-): Promise<IteratorResult<OutputType | undefined>> {
- const stream = reader[ownerReadableStream_]!;
- // Assert: stream is not undefined.
-
- if (stream[shared.state_] === "closed") {
- return Promise.resolve(
- readableStreamCreateReadResult(undefined, true, forAuthorCode)
- );
- }
- if (stream[shared.state_] === "errored") {
- return Promise.reject(stream[shared.storedError_]);
- }
- // Assert: stream.[[state]] is "readable".
- return stream[readableStreamController_][pullSteps_](forAuthorCode);
-}
-
-export function readableStreamFulfillReadIntoRequest<OutputType>(
- stream: SDReadableStream<OutputType>,
- chunk: ArrayBufferView,
- done: boolean
-): void {
- // TODO remove the "as unknown" cast
- const reader = (stream[reader_] as unknown) as SDReadableStreamBYOBReader;
- const readIntoRequest = reader[readIntoRequests_].shift()!; // <-- length check done in caller
- readIntoRequest.resolve(
- readableStreamCreateReadResult(chunk, done, readIntoRequest.forAuthorCode)
- );
-}
-
-export function readableStreamFulfillReadRequest<OutputType>(
- stream: SDReadableStream<OutputType>,
- chunk: OutputType,
- done: boolean
-): void {
- const reader = stream[reader_] as SDReadableStreamDefaultReader<OutputType>;
- const readRequest = reader[readRequests_].shift()!; // <-- length check done in caller
- readRequest.resolve(
- readableStreamCreateReadResult(chunk, done, readRequest.forAuthorCode)
- );
-}
-
-// ---- DefaultController
-
-export function setUpReadableStreamDefaultController<OutputType>(
- stream: SDReadableStream<OutputType>,
- controller: SDReadableStreamDefaultController<OutputType>,
- startAlgorithm: StartAlgorithm,
- pullAlgorithm: PullAlgorithm<OutputType>,
- cancelAlgorithm: CancelAlgorithm,
- highWaterMark: number,
- sizeAlgorithm: QueuingStrategySizeCallback<OutputType>
-): void {
- // Assert: stream.[[readableStreamController]] is undefined.
- controller[controlledReadableStream_] = stream;
- q.resetQueue(controller);
- controller[started_] = false;
- controller[closeRequested_] = false;
- controller[pullAgain_] = false;
- controller[pulling_] = false;
- controller[strategySizeAlgorithm_] = sizeAlgorithm;
- controller[strategyHWM_] = highWaterMark;
- controller[pullAlgorithm_] = pullAlgorithm;
- controller[cancelAlgorithm_] = cancelAlgorithm;
- stream[readableStreamController_] = controller;
-
- const startResult = startAlgorithm();
- Promise.resolve(startResult).then(
- (_) => {
- controller[started_] = true;
- // Assert: controller.[[pulling]] is false.
- // Assert: controller.[[pullAgain]] is false.
- readableStreamDefaultControllerCallPullIfNeeded(controller);
- },
- (error) => {
- readableStreamDefaultControllerError(controller, error);
- }
- );
-}
-
-export function isReadableStreamDefaultController(
- value: unknown
-): value is SDReadableStreamDefaultController<any> {
- if (typeof value !== "object" || value === null) {
- return false;
- }
- return controlledReadableStream_ in value;
-}
-
-export function readableStreamDefaultControllerHasBackpressure<OutputType>(
- controller: SDReadableStreamDefaultController<OutputType>
-): boolean {
- return !readableStreamDefaultControllerShouldCallPull(controller);
-}
-
-export function readableStreamDefaultControllerCanCloseOrEnqueue<OutputType>(
- controller: SDReadableStreamDefaultController<OutputType>
-): boolean {
- const state = controller[controlledReadableStream_][shared.state_];
- return controller[closeRequested_] === false && state === "readable";
-}
-
-export function readableStreamDefaultControllerGetDesiredSize<OutputType>(
- controller: SDReadableStreamDefaultController<OutputType>
-): number | null {
- const state = controller[controlledReadableStream_][shared.state_];
- if (state === "errored") {
- return null;
- }
- if (state === "closed") {
- return 0;
- }
- return controller[strategyHWM_] - controller[q.queueTotalSize_];
-}
-
-export function readableStreamDefaultControllerClose<OutputType>(
- controller: SDReadableStreamDefaultController<OutputType>
-): void {
- // Assert: !ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) is true.
- controller[closeRequested_] = true;
- const stream = controller[controlledReadableStream_];
- if (controller[q.queue_].length === 0) {
- readableStreamDefaultControllerClearAlgorithms(controller);
- readableStreamClose(stream);
- }
-}
-
-export function readableStreamDefaultControllerEnqueue<OutputType>(
- controller: SDReadableStreamDefaultController<OutputType>,
- chunk: OutputType
-): void {
- const stream = controller[controlledReadableStream_];
- // Assert: !ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) is true.
- if (
- isReadableStreamLocked(stream) &&
- readableStreamGetNumReadRequests(stream) > 0
- ) {
- readableStreamFulfillReadRequest(stream, chunk, false);
- } else {
- // Let result be the result of performing controller.[[strategySizeAlgorithm]], passing in chunk,
- // and interpreting the result as an ECMAScript completion value.
- // impl note: assuming that in JS land this just means try/catch with rethrow
- let chunkSize: number;
- try {
- chunkSize = controller[strategySizeAlgorithm_](chunk);
- } catch (error) {
- readableStreamDefaultControllerError(controller, error);
- throw error;
- }
- try {
- q.enqueueValueWithSize(controller, chunk, chunkSize);
- } catch (error) {
- readableStreamDefaultControllerError(controller, error);
- throw error;
- }
- }
- readableStreamDefaultControllerCallPullIfNeeded(controller);
-}
-
-export function readableStreamDefaultControllerError<OutputType>(
- controller: SDReadableStreamDefaultController<OutputType>,
- error: shared.ErrorResult
-): void {
- const stream = controller[controlledReadableStream_];
- if (stream[shared.state_] !== "readable") {
- return;
- }
- q.resetQueue(controller);
- readableStreamDefaultControllerClearAlgorithms(controller);
- readableStreamError(stream, error);
-}
-
-export function readableStreamDefaultControllerCallPullIfNeeded<OutputType>(
- controller: SDReadableStreamDefaultController<OutputType>
-): void {
- if (!readableStreamDefaultControllerShouldCallPull(controller)) {
- return;
- }
- if (controller[pulling_]) {
- controller[pullAgain_] = true;
- return;
- }
- if (controller[pullAgain_]) {
- throw new RangeError("Stream controller is in an invalid state.");
- }
-
- controller[pulling_] = true;
- controller[pullAlgorithm_](controller).then(
- (_) => {
- controller[pulling_] = false;
- if (controller[pullAgain_]) {
- controller[pullAgain_] = false;
- readableStreamDefaultControllerCallPullIfNeeded(controller);
- }
- },
- (error) => {
- readableStreamDefaultControllerError(controller, error);
- }
- );
-}
-
-export function readableStreamDefaultControllerShouldCallPull<OutputType>(
- controller: SDReadableStreamDefaultController<OutputType>
-): boolean {
- const stream = controller[controlledReadableStream_];
- if (!readableStreamDefaultControllerCanCloseOrEnqueue(controller)) {
- return false;
- }
- if (controller[started_] === false) {
- return false;
- }
- if (
- isReadableStreamLocked(stream) &&
- readableStreamGetNumReadRequests(stream) > 0
- ) {
- return true;
- }
- const desiredSize = readableStreamDefaultControllerGetDesiredSize(controller);
- if (desiredSize === null) {
- throw new RangeError("Stream is in an invalid state.");
- }
- return desiredSize > 0;
-}
-
-export function readableStreamDefaultControllerClearAlgorithms<OutputType>(
- controller: SDReadableStreamDefaultController<OutputType>
-): void {
- controller[pullAlgorithm_] = undefined!;
- controller[cancelAlgorithm_] = undefined!;
- controller[strategySizeAlgorithm_] = undefined!;
-}
-
-// ---- BYOBController
-
-export function setUpReadableByteStreamController(
- stream: SDReadableStream<ArrayBufferView>,
- controller: SDReadableByteStreamController,
- startAlgorithm: StartAlgorithm,
- pullAlgorithm: PullAlgorithm<ArrayBufferView>,
- cancelAlgorithm: CancelAlgorithm,
- highWaterMark: number,
- autoAllocateChunkSize: number | undefined
-): void {
- // Assert: stream.[[readableStreamController]] is undefined.
- if (stream[readableStreamController_] !== undefined) {
- throw new TypeError("Cannot reuse streams");
- }
- if (autoAllocateChunkSize !== undefined) {
- if (
- !shared.isInteger(autoAllocateChunkSize) ||
- autoAllocateChunkSize <= 0
- ) {
- throw new RangeError(
- "autoAllocateChunkSize must be a positive, finite integer"
- );
- }
- }
- // Set controller.[[controlledReadableByteStream]] to stream.
- controller[controlledReadableByteStream_] = stream;
- // Set controller.[[pullAgain]] and controller.[[pulling]] to false.
- controller[pullAgain_] = false;
- controller[pulling_] = false;
- readableByteStreamControllerClearPendingPullIntos(controller);
- q.resetQueue(controller);
- controller[closeRequested_] = false;
- controller[started_] = false;
- controller[strategyHWM_] = shared.validateAndNormalizeHighWaterMark(
- highWaterMark
- );
- controller[pullAlgorithm_] = pullAlgorithm;
- controller[cancelAlgorithm_] = cancelAlgorithm;
- controller[autoAllocateChunkSize_] = autoAllocateChunkSize;
- controller[pendingPullIntos_] = [];
- stream[readableStreamController_] = controller;
-
- // Let startResult be the result of performing startAlgorithm.
- const startResult = startAlgorithm();
- Promise.resolve(startResult).then(
- (_) => {
- controller[started_] = true;
- // Assert: controller.[[pulling]] is false.
- // Assert: controller.[[pullAgain]] is false.
- readableByteStreamControllerCallPullIfNeeded(controller);
- },
- (error) => {
- readableByteStreamControllerError(controller, error);
- }
- );
-}
-
-export function isReadableStreamBYOBRequest(
- value: unknown
-): value is SDReadableStreamBYOBRequest {
- if (typeof value !== "object" || value === null) {
- return false;
- }
- return associatedReadableByteStreamController_ in value;
-}
-
-export function isReadableByteStreamController(
- value: unknown
-): value is SDReadableByteStreamController {
- if (typeof value !== "object" || value === null) {
- return false;
- }
- return controlledReadableByteStream_ in value;
-}
-
-export function readableByteStreamControllerCallPullIfNeeded(
- controller: SDReadableByteStreamController
-): void {
- if (!readableByteStreamControllerShouldCallPull(controller)) {
- return;
- }
- if (controller[pulling_]) {
- controller[pullAgain_] = true;
- return;
- }
- // Assert: controller.[[pullAgain]] is false.
- controller[pulling_] = true;
- controller[pullAlgorithm_](controller).then(
- (_) => {
- controller[pulling_] = false;
- if (controller[pullAgain_]) {
- controller[pullAgain_] = false;
- readableByteStreamControllerCallPullIfNeeded(controller);
- }
- },
- (error) => {
- readableByteStreamControllerError(controller, error);
- }
- );
-}
-
-export function readableByteStreamControllerClearAlgorithms(
- controller: SDReadableByteStreamController
-): void {
- controller[pullAlgorithm_] = undefined!;
- controller[cancelAlgorithm_] = undefined!;
-}
-
-export function readableByteStreamControllerClearPendingPullIntos(
- controller: SDReadableByteStreamController
-): void {
- readableByteStreamControllerInvalidateBYOBRequest(controller);
- controller[pendingPullIntos_] = [];
-}
-
-export function readableByteStreamControllerClose(
- controller: SDReadableByteStreamController
-): void {
- const stream = controller[controlledReadableByteStream_];
- // Assert: controller.[[closeRequested]] is false.
- // Assert: stream.[[state]] is "readable".
- if (controller[q.queueTotalSize_] > 0) {
- controller[closeRequested_] = true;
- return;
- }
- if (controller[pendingPullIntos_].length > 0) {
- const firstPendingPullInto = controller[pendingPullIntos_][0];
- if (firstPendingPullInto.bytesFilled > 0) {
- const error = new TypeError();
- readableByteStreamControllerError(controller, error);
- throw error;
- }
- }
- readableByteStreamControllerClearAlgorithms(controller);
- readableStreamClose(stream);
-}
-
-export function readableByteStreamControllerCommitPullIntoDescriptor(
- stream: SDReadableStream<ArrayBufferView>,
- pullIntoDescriptor: PullIntoDescriptor
-): void {
- // Assert: stream.[[state]] is not "errored".
- let done = false;
- if (stream[shared.state_] === "closed") {
- // Assert: pullIntoDescriptor.[[bytesFilled]] is 0.
- done = true;
- }
- const filledView = readableByteStreamControllerConvertPullIntoDescriptor(
- pullIntoDescriptor
- );
- if (pullIntoDescriptor.readerType === "default") {
- readableStreamFulfillReadRequest(stream, filledView, done);
- } else {
- // Assert: pullIntoDescriptor.[[readerType]] is "byob".
- readableStreamFulfillReadIntoRequest(stream, filledView, done);
- }
-}
-
-export function readableByteStreamControllerConvertPullIntoDescriptor(
- pullIntoDescriptor: PullIntoDescriptor
-): ArrayBufferView {
- const { bytesFilled, elementSize } = pullIntoDescriptor;
- // Assert: bytesFilled <= pullIntoDescriptor.byteLength
- // Assert: bytesFilled mod elementSize is 0
- return new pullIntoDescriptor.ctor(
- pullIntoDescriptor.buffer,
- pullIntoDescriptor.byteOffset,
- bytesFilled / elementSize
- );
-}
-
-export function readableByteStreamControllerEnqueue(
- controller: SDReadableByteStreamController,
- chunk: ArrayBufferView
-): void {
- const stream = controller[controlledReadableByteStream_];
- // Assert: controller.[[closeRequested]] is false.
- // Assert: stream.[[state]] is "readable".
- const { buffer, byteOffset, byteLength } = chunk;
-
- const transferredBuffer = shared.transferArrayBuffer(buffer);
-
- if (readableStreamHasDefaultReader(stream)) {
- if (readableStreamGetNumReadRequests(stream) === 0) {
- readableByteStreamControllerEnqueueChunkToQueue(
- controller,
- transferredBuffer,
- byteOffset,
- byteLength
- );
- } else {
- // Assert: controller.[[queue]] is empty.
- const transferredView = new Uint8Array(
- transferredBuffer,
- byteOffset,
- byteLength
- );
- readableStreamFulfillReadRequest(stream, transferredView, false);
- }
- } else if (readableStreamHasBYOBReader(stream)) {
- readableByteStreamControllerEnqueueChunkToQueue(
- controller,
- transferredBuffer,
- byteOffset,
- byteLength
- );
- readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(
- controller
- );
- } else {
- // Assert: !IsReadableStreamLocked(stream) is false.
- readableByteStreamControllerEnqueueChunkToQueue(
- controller,
- transferredBuffer,
- byteOffset,
- byteLength
- );
- }
- readableByteStreamControllerCallPullIfNeeded(controller);
-}
-
-export function readableByteStreamControllerEnqueueChunkToQueue(
- controller: SDReadableByteStreamController,
- buffer: ArrayBufferLike,
- byteOffset: number,
- byteLength: number
-): void {
- controller[q.queue_].push({ buffer, byteOffset, byteLength });
- controller[q.queueTotalSize_] += byteLength;
-}
-
-export function readableByteStreamControllerError(
- controller: SDReadableByteStreamController,
- error: shared.ErrorResult
-): void {
- const stream = controller[controlledReadableByteStream_];
- if (stream[shared.state_] !== "readable") {
- return;
- }
- readableByteStreamControllerClearPendingPullIntos(controller);
- q.resetQueue(controller);
- readableByteStreamControllerClearAlgorithms(controller);
- readableStreamError(stream, error);
-}
-
-export function readableByteStreamControllerFillHeadPullIntoDescriptor(
- controller: SDReadableByteStreamController,
- size: number,
- pullIntoDescriptor: PullIntoDescriptor
-): void {
- // Assert: either controller.[[pendingPullIntos]] is empty, or the first element of controller.[[pendingPullIntos]] is pullIntoDescriptor.
- readableByteStreamControllerInvalidateBYOBRequest(controller);
- pullIntoDescriptor.bytesFilled += size;
-}
-
-export function readableByteStreamControllerFillPullIntoDescriptorFromQueue(
- controller: SDReadableByteStreamController,
- pullIntoDescriptor: PullIntoDescriptor
-): boolean {
- const elementSize = pullIntoDescriptor.elementSize;
- const currentAlignedBytes =
- pullIntoDescriptor.bytesFilled -
- (pullIntoDescriptor.bytesFilled % elementSize);
- const maxBytesToCopy = Math.min(
- controller[q.queueTotalSize_],
- pullIntoDescriptor.byteLength - pullIntoDescriptor.bytesFilled
- );
- const maxBytesFilled = pullIntoDescriptor.bytesFilled + maxBytesToCopy;
- const maxAlignedBytes = maxBytesFilled - (maxBytesFilled % elementSize);
- let totalBytesToCopyRemaining = maxBytesToCopy;
- let ready = false;
-
- if (maxAlignedBytes > currentAlignedBytes) {
- totalBytesToCopyRemaining =
- maxAlignedBytes - pullIntoDescriptor.bytesFilled;
- ready = true;
- }
- const queue = controller[q.queue_];
-
- while (totalBytesToCopyRemaining > 0) {
- const headOfQueue = queue.front()!;
- const bytesToCopy = Math.min(
- totalBytesToCopyRemaining,
- headOfQueue.byteLength
- );
- const destStart =
- pullIntoDescriptor.byteOffset + pullIntoDescriptor.bytesFilled;
- shared.copyDataBlockBytes(
- pullIntoDescriptor.buffer,
- destStart,
- headOfQueue.buffer,
- headOfQueue.byteOffset,
- bytesToCopy
- );
- if (headOfQueue.byteLength === bytesToCopy) {
- queue.shift();
- } else {
- headOfQueue.byteOffset += bytesToCopy;
- headOfQueue.byteLength -= bytesToCopy;
- }
- controller[q.queueTotalSize_] -= bytesToCopy;
- readableByteStreamControllerFillHeadPullIntoDescriptor(
- controller,
- bytesToCopy,
- pullIntoDescriptor
- );
- totalBytesToCopyRemaining -= bytesToCopy;
- }
- if (!ready) {
- // Assert: controller[queueTotalSize_] === 0
- // Assert: pullIntoDescriptor.bytesFilled > 0
- // Assert: pullIntoDescriptor.bytesFilled < pullIntoDescriptor.elementSize
- }
- return ready;
-}
-
-export function readableByteStreamControllerGetDesiredSize(
- controller: SDReadableByteStreamController
-): number | null {
- const stream = controller[controlledReadableByteStream_];
- const state = stream[shared.state_];
- if (state === "errored") {
- return null;
- }
- if (state === "closed") {
- return 0;
- }
- return controller[strategyHWM_] - controller[q.queueTotalSize_];
-}
-
-export function readableByteStreamControllerHandleQueueDrain(
- controller: SDReadableByteStreamController
-): void {
- // Assert: controller.[[controlledReadableByteStream]].[[state]] is "readable".
- if (controller[q.queueTotalSize_] === 0 && controller[closeRequested_]) {
- readableByteStreamControllerClearAlgorithms(controller);
- readableStreamClose(controller[controlledReadableByteStream_]);
- } else {
- readableByteStreamControllerCallPullIfNeeded(controller);
- }
-}
-
-export function readableByteStreamControllerInvalidateBYOBRequest(
- controller: SDReadableByteStreamController
-): void {
- const byobRequest = controller[byobRequest_];
- if (byobRequest === undefined) {
- return;
- }
- byobRequest[associatedReadableByteStreamController_] = undefined;
- byobRequest[view_] = undefined;
- controller[byobRequest_] = undefined;
-}
-
-export function readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(
- controller: SDReadableByteStreamController
-): void {
- // Assert: controller.[[closeRequested]] is false.
- const pendingPullIntos = controller[pendingPullIntos_];
- while (pendingPullIntos.length > 0) {
- if (controller[q.queueTotalSize_] === 0) {
- return;
- }
- const pullIntoDescriptor = pendingPullIntos[0];
- if (
- readableByteStreamControllerFillPullIntoDescriptorFromQueue(
- controller,
- pullIntoDescriptor
- )
- ) {
- readableByteStreamControllerShiftPendingPullInto(controller);
- readableByteStreamControllerCommitPullIntoDescriptor(
- controller[controlledReadableByteStream_],
- pullIntoDescriptor
- );
- }
- }
-}
-
-export function readableByteStreamControllerPullInto(
- controller: SDReadableByteStreamController,
- view: ArrayBufferView,
- forAuthorCode: boolean
-): Promise<IteratorResult<ArrayBufferView, any>> {
- const stream = controller[controlledReadableByteStream_];
-
- const elementSize = (view as Uint8Array).BYTES_PER_ELEMENT || 1; // DataView exposes this in Webkit as 1, is not present in FF or Blink
- const ctor = view.constructor as Uint8ArrayConstructor; // the typecast here is just for TS typing, it does not influence buffer creation
-
- const byteOffset = view.byteOffset;
- const byteLength = view.byteLength;
- const buffer = shared.transferArrayBuffer(view.buffer);
- const pullIntoDescriptor: PullIntoDescriptor = {
- buffer,
- byteOffset,
- byteLength,
- bytesFilled: 0,
- elementSize,
- ctor,
- readerType: "byob",
- };
-
- if (controller[pendingPullIntos_].length > 0) {
- controller[pendingPullIntos_].push(pullIntoDescriptor);
- return readableStreamAddReadIntoRequest(stream, forAuthorCode);
- }
- if (stream[shared.state_] === "closed") {
- const emptyView = new ctor(
- pullIntoDescriptor.buffer,
- pullIntoDescriptor.byteOffset,
- 0
- );
- return Promise.resolve(
- readableStreamCreateReadResult(emptyView, true, forAuthorCode)
- );
- }
-
- if (controller[q.queueTotalSize_] > 0) {
- if (
- readableByteStreamControllerFillPullIntoDescriptorFromQueue(
- controller,
- pullIntoDescriptor
- )
- ) {
- const filledView = readableByteStreamControllerConvertPullIntoDescriptor(
- pullIntoDescriptor
- );
- readableByteStreamControllerHandleQueueDrain(controller);
- return Promise.resolve(
- readableStreamCreateReadResult(filledView, false, forAuthorCode)
- );
- }
- if (controller[closeRequested_]) {
- const error = new TypeError();
- readableByteStreamControllerError(controller, error);
- return Promise.reject(error);
- }
- }
-
- controller[pendingPullIntos_].push(pullIntoDescriptor);
- const promise = readableStreamAddReadIntoRequest(stream, forAuthorCode);
- readableByteStreamControllerCallPullIfNeeded(controller);
- return promise;
-}
-
-export function readableByteStreamControllerRespond(
- controller: SDReadableByteStreamController,
- bytesWritten: number
-): void {
- bytesWritten = Number(bytesWritten);
- if (!shared.isFiniteNonNegativeNumber(bytesWritten)) {
- throw new RangeError("bytesWritten must be a finite, non-negative number");
- }
- // Assert: controller.[[pendingPullIntos]] is not empty.
- readableByteStreamControllerRespondInternal(controller, bytesWritten);
-}
-
-export function readableByteStreamControllerRespondInClosedState(
- controller: SDReadableByteStreamController,
- firstDescriptor: PullIntoDescriptor
-): void {
- firstDescriptor.buffer = shared.transferArrayBuffer(firstDescriptor.buffer);
- // Assert: firstDescriptor.[[bytesFilled]] is 0.
- const stream = controller[controlledReadableByteStream_];
- if (readableStreamHasBYOBReader(stream)) {
- while (readableStreamGetNumReadIntoRequests(stream) > 0) {
- const pullIntoDescriptor = readableByteStreamControllerShiftPendingPullInto(
- controller
- )!;
- readableByteStreamControllerCommitPullIntoDescriptor(
- stream,
- pullIntoDescriptor
- );
- }
- }
-}
-
-export function readableByteStreamControllerRespondInReadableState(
- controller: SDReadableByteStreamController,
- bytesWritten: number,
- pullIntoDescriptor: PullIntoDescriptor
-): void {
- if (
- pullIntoDescriptor.bytesFilled + bytesWritten >
- pullIntoDescriptor.byteLength
- ) {
- throw new RangeError();
- }
- readableByteStreamControllerFillHeadPullIntoDescriptor(
- controller,
- bytesWritten,
- pullIntoDescriptor
- );
- if (pullIntoDescriptor.bytesFilled < pullIntoDescriptor.elementSize) {
- return;
- }
- readableByteStreamControllerShiftPendingPullInto(controller);
- const remainderSize =
- pullIntoDescriptor.bytesFilled % pullIntoDescriptor.elementSize;
- if (remainderSize > 0) {
- const end = pullIntoDescriptor.byteOffset + pullIntoDescriptor.bytesFilled;
- const remainder = shared.cloneArrayBuffer(
- pullIntoDescriptor.buffer,
- end - remainderSize,
- remainderSize,
- ArrayBuffer
- );
- readableByteStreamControllerEnqueueChunkToQueue(
- controller,
- remainder,
- 0,
- remainder.byteLength
- );
- }
- pullIntoDescriptor.buffer = shared.transferArrayBuffer(
- pullIntoDescriptor.buffer
- );
- pullIntoDescriptor.bytesFilled =
- pullIntoDescriptor.bytesFilled - remainderSize;
- readableByteStreamControllerCommitPullIntoDescriptor(
- controller[controlledReadableByteStream_],
- pullIntoDescriptor
- );
- readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller);
-}
-
-export function readableByteStreamControllerRespondInternal(
- controller: SDReadableByteStreamController,
- bytesWritten: number
-): void {
- const firstDescriptor = controller[pendingPullIntos_][0];
- const stream = controller[controlledReadableByteStream_];
- if (stream[shared.state_] === "closed") {
- if (bytesWritten !== 0) {
- throw new TypeError();
- }
- readableByteStreamControllerRespondInClosedState(
- controller,
- firstDescriptor
- );
- } else {
- // Assert: stream.[[state]] is "readable".
- readableByteStreamControllerRespondInReadableState(
- controller,
- bytesWritten,
- firstDescriptor
- );
- }
- readableByteStreamControllerCallPullIfNeeded(controller);
-}
-
-export function readableByteStreamControllerRespondWithNewView(
- controller: SDReadableByteStreamController,
- view: ArrayBufferView
-): void {
- // Assert: controller.[[pendingPullIntos]] is not empty.
- const firstDescriptor = controller[pendingPullIntos_][0];
- if (
- firstDescriptor.byteOffset + firstDescriptor.bytesFilled !==
- view.byteOffset
- ) {
- throw new RangeError();
- }
- if (firstDescriptor.byteLength !== view.byteLength) {
- throw new RangeError();
- }
- firstDescriptor.buffer = view.buffer;
- readableByteStreamControllerRespondInternal(controller, view.byteLength);
-}
-
-export function readableByteStreamControllerShiftPendingPullInto(
- controller: SDReadableByteStreamController
-): PullIntoDescriptor | undefined {
- const descriptor = controller[pendingPullIntos_].shift();
- readableByteStreamControllerInvalidateBYOBRequest(controller);
- return descriptor;
-}
-
-export function readableByteStreamControllerShouldCallPull(
- controller: SDReadableByteStreamController
-): boolean {
- // Let stream be controller.[[controlledReadableByteStream]].
- const stream = controller[controlledReadableByteStream_];
- if (stream[shared.state_] !== "readable") {
- return false;
- }
- if (controller[closeRequested_]) {
- return false;
- }
- if (!controller[started_]) {
- return false;
- }
- if (
- readableStreamHasDefaultReader(stream) &&
- readableStreamGetNumReadRequests(stream) > 0
- ) {
- return true;
- }
- if (
- readableStreamHasBYOBReader(stream) &&
- readableStreamGetNumReadIntoRequests(stream) > 0
- ) {
- return true;
- }
- const desiredSize = readableByteStreamControllerGetDesiredSize(controller);
- // Assert: desiredSize is not null.
- return desiredSize! > 0;
-}
-
-export function setUpReadableStreamBYOBRequest(
- request: SDReadableStreamBYOBRequest,
- controller: SDReadableByteStreamController,
- view: ArrayBufferView
-): void {
- if (!isReadableByteStreamController(controller)) {
- throw new TypeError();
- }
- if (!ArrayBuffer.isView(view)) {
- throw new TypeError();
- }
- // Assert: !IsDetachedBuffer(view.[[ViewedArrayBuffer]]) is false.
-
- request[associatedReadableByteStreamController_] = controller;
- request[view_] = view;
-}