summaryrefslogtreecommitdiff
path: root/cli/js/streams/readable-internals.ts
diff options
context:
space:
mode:
authorNick Stott <nick@nickstott.com>2019-10-28 12:41:36 -0400
committerRy Dahl <ry@tinyclouds.org>2019-10-28 12:41:36 -0400
commit65d9286203cf239f68c6015818e82e8521e600a1 (patch)
tree0af1a7be449036f2f4ae9d3ecf06b7d645c8bddc /cli/js/streams/readable-internals.ts
parent967c236fa5fb1e87e1b5ee788fe77d3a07361da1 (diff)
Re-enable basic stream support for fetch bodies (#3192)
* Add sd-streams from https://github.com/stardazed/sd-streams/blob/master/packages/streams/src/ * change the interfaces in dom_types to match what sd-streams expects
Diffstat (limited to 'cli/js/streams/readable-internals.ts')
-rw-r--r--cli/js/streams/readable-internals.ts1357
1 files changed, 1357 insertions, 0 deletions
diff --git a/cli/js/streams/readable-internals.ts b/cli/js/streams/readable-internals.ts
new file mode 100644
index 000000000..36f4223d7
--- /dev/null
+++ b/cli/js/streams/readable-internals.ts
@@ -0,0 +1,1357 @@
+// Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
+// Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
+
+/**
+ * streams/readable-internals - internal types and functions for readable streams
+ * Part of Stardazed
+ * (c) 2018-Present by Arthur Langereis - @zenmumbler
+ * https://github.com/stardazed/sd-streams
+ */
+
+/* eslint-disable @typescript-eslint/no-explicit-any */
+// TODO reenable this lint here
+
+import * as shared from "./shared-internals.ts";
+import * as q from "./queue-mixin.ts";
+import {
+ QueuingStrategy,
+ QueuingStrategySizeCallback,
+ UnderlyingSource,
+ UnderlyingByteSource
+} from "../dom_types.ts";
+
+// ReadableStreamDefaultController
+export const controlledReadableStream_ = Symbol("controlledReadableStream_");
+export const pullAlgorithm_ = Symbol("pullAlgorithm_");
+export const cancelAlgorithm_ = Symbol("cancelAlgorithm_");
+export const strategySizeAlgorithm_ = Symbol("strategySizeAlgorithm_");
+export const strategyHWM_ = Symbol("strategyHWM_");
+export const started_ = Symbol("started_");
+export const closeRequested_ = Symbol("closeRequested_");
+export const pullAgain_ = Symbol("pullAgain_");
+export const pulling_ = Symbol("pulling_");
+export const cancelSteps_ = Symbol("cancelSteps_");
+export const pullSteps_ = Symbol("pullSteps_");
+
+// ReadableByteStreamController
+export const autoAllocateChunkSize_ = Symbol("autoAllocateChunkSize_");
+export const byobRequest_ = Symbol("byobRequest_");
+export const controlledReadableByteStream_ = Symbol(
+ "controlledReadableByteStream_"
+);
+export const pendingPullIntos_ = Symbol("pendingPullIntos_");
+
+// ReadableStreamDefaultReader
+export const closedPromise_ = Symbol("closedPromise_");
+export const ownerReadableStream_ = Symbol("ownerReadableStream_");
+export const readRequests_ = Symbol("readRequests_");
+export const readIntoRequests_ = Symbol("readIntoRequests_");
+
+// ReadableStreamBYOBRequest
+export const associatedReadableByteStreamController_ = Symbol(
+ "associatedReadableByteStreamController_"
+);
+export const view_ = Symbol("view_");
+
+// ReadableStreamBYOBReader
+
+// ReadableStream
+export const reader_ = Symbol("reader_");
+export const readableStreamController_ = Symbol("readableStreamController_");
+
+export type StartFunction<OutputType> = (
+ controller: SDReadableStreamControllerBase<OutputType>
+) => void | PromiseLike<void>;
+export type StartAlgorithm = () => Promise<void> | void;
+export type PullFunction<OutputType> = (
+ controller: SDReadableStreamControllerBase<OutputType>
+) => void | PromiseLike<void>;
+export type PullAlgorithm<OutputType> = (
+ controller: SDReadableStreamControllerBase<OutputType>
+) => PromiseLike<void>;
+export type CancelAlgorithm = (reason?: shared.ErrorResult) => Promise<void>;
+
+// ----
+
+export interface SDReadableStreamControllerBase<OutputType> {
+ readonly desiredSize: number | null;
+ close(): void;
+ error(e?: shared.ErrorResult): void;
+
+ [cancelSteps_](reason: shared.ErrorResult): Promise<void>;
+ [pullSteps_](forAuthorCode: boolean): Promise<IteratorResult<OutputType>>;
+}
+
+export interface SDReadableStreamBYOBRequest {
+ readonly view: ArrayBufferView;
+ respond(bytesWritten: number): void;
+ respondWithNewView(view: ArrayBufferView): void;
+
+ [associatedReadableByteStreamController_]:
+ | SDReadableByteStreamController
+ | undefined;
+ [view_]: ArrayBufferView | undefined;
+}
+
+interface ArrayBufferViewCtor {
+ new (
+ buffer: ArrayBufferLike,
+ byteOffset?: number,
+ byteLength?: number
+ ): ArrayBufferView;
+}
+
+export interface PullIntoDescriptor {
+ readerType: "default" | "byob";
+ ctor: ArrayBufferViewCtor;
+ buffer: ArrayBufferLike;
+ byteOffset: number;
+ byteLength: number;
+ bytesFilled: number;
+ elementSize: number;
+}
+
+export interface SDReadableByteStreamController
+ extends SDReadableStreamControllerBase<ArrayBufferView>,
+ q.ByteQueueContainer {
+ readonly byobRequest: SDReadableStreamBYOBRequest | undefined;
+ enqueue(chunk: ArrayBufferView): void;
+
+ [autoAllocateChunkSize_]: number | undefined; // A positive integer, when the automatic buffer allocation feature is enabled. In that case, this value specifies the size of buffer to allocate. It is undefined otherwise.
+ [byobRequest_]: SDReadableStreamBYOBRequest | undefined; // A ReadableStreamBYOBRequest instance representing the current BYOB pull request
+ [cancelAlgorithm_]: CancelAlgorithm; // A promise-returning algorithm, taking one argument (the cancel reason), which communicates a requested cancelation to the underlying source
+ [closeRequested_]: boolean; // A boolean flag indicating whether the stream has been closed by its underlying byte source, but still has chunks in its internal queue that have not yet been read
+ [controlledReadableByteStream_]: SDReadableStream<ArrayBufferView>; // The ReadableStream instance controlled
+ [pullAgain_]: boolean; // A boolean flag set to true if the stream’s mechanisms requested a call to the underlying byte source’s pull() method to pull more data, but the pull could not yet be done since a previous call is still executing
+ [pullAlgorithm_]: PullAlgorithm<ArrayBufferView>; // A promise-returning algorithm that pulls data from the underlying source
+ [pulling_]: boolean; // A boolean flag set to true while the underlying byte source’s pull() method is executing and has not yet fulfilled, used to prevent reentrant calls
+ [pendingPullIntos_]: PullIntoDescriptor[]; // A List of descriptors representing pending BYOB pull requests
+ [started_]: boolean; // A boolean flag indicating whether the underlying source has finished starting
+ [strategyHWM_]: number; // A number supplied to the constructor as part of the stream’s queuing strategy, indicating the point at which the stream will apply backpressure to its underlying byte source
+}
+
+export interface SDReadableStreamDefaultController<OutputType>
+ extends SDReadableStreamControllerBase<OutputType>,
+ q.QueueContainer<OutputType> {
+ enqueue(chunk?: OutputType): void;
+
+ [controlledReadableStream_]: SDReadableStream<OutputType>;
+ [pullAlgorithm_]: PullAlgorithm<OutputType>;
+ [cancelAlgorithm_]: CancelAlgorithm;
+ [strategySizeAlgorithm_]: QueuingStrategySizeCallback<OutputType>;
+ [strategyHWM_]: number;
+
+ [started_]: boolean;
+ [closeRequested_]: boolean;
+ [pullAgain_]: boolean;
+ [pulling_]: boolean;
+}
+
+// ----
+
+export interface SDReadableStreamReader<OutputType> {
+ readonly closed: Promise<void>;
+ cancel(reason: shared.ErrorResult): Promise<void>;
+ releaseLock(): void;
+
+ [ownerReadableStream_]: SDReadableStream<OutputType> | undefined;
+ [closedPromise_]: shared.ControlledPromise<void>;
+}
+
+export interface ReadRequest<V> extends shared.ControlledPromise<V> {
+ forAuthorCode: boolean;
+}
+
+export declare class SDReadableStreamDefaultReader<OutputType>
+ implements SDReadableStreamReader<OutputType> {
+ constructor(stream: SDReadableStream<OutputType>);
+
+ readonly closed: Promise<void>;
+ cancel(reason: shared.ErrorResult): Promise<void>;
+ releaseLock(): void;
+ read(): Promise<IteratorResult<OutputType | undefined>>;
+
+ [ownerReadableStream_]: SDReadableStream<OutputType> | undefined;
+ [closedPromise_]: shared.ControlledPromise<void>;
+ [readRequests_]: Array<ReadRequest<IteratorResult<OutputType>>>;
+}
+
+export declare class SDReadableStreamBYOBReader
+ implements SDReadableStreamReader<ArrayBufferView> {
+ constructor(stream: SDReadableStream<ArrayBufferView>);
+
+ readonly closed: Promise<void>;
+ cancel(reason: shared.ErrorResult): Promise<void>;
+ releaseLock(): void;
+ read(view: ArrayBufferView): Promise<IteratorResult<ArrayBufferView>>;
+
+ [ownerReadableStream_]: SDReadableStream<ArrayBufferView> | undefined;
+ [closedPromise_]: shared.ControlledPromise<void>;
+ [readIntoRequests_]: Array<ReadRequest<IteratorResult<ArrayBufferView>>>;
+}
+
+/* TODO reenable this when we add WritableStreams and Transforms
+export interface GenericTransformStream<InputType, OutputType> {
+ readable: SDReadableStream<OutputType>;
+ writable: ws.WritableStream<InputType>;
+}
+*/
+
+export type ReadableStreamState = "readable" | "closed" | "errored";
+
+export declare class SDReadableStream<OutputType> {
+ constructor(
+ underlyingSource: UnderlyingByteSource,
+ strategy?: { highWaterMark?: number; size?: undefined }
+ );
+ constructor(
+ underlyingSource?: UnderlyingSource<OutputType>,
+ strategy?: QueuingStrategy<OutputType>
+ );
+
+ readonly locked: boolean;
+ cancel(reason?: shared.ErrorResult): Promise<void>;
+ getReader(): SDReadableStreamReader<OutputType>;
+ getReader(options: { mode: "byob" }): SDReadableStreamBYOBReader;
+ tee(): Array<SDReadableStream<OutputType>>;
+
+ /* TODO reenable these methods when we bring in writableStreams and transport types
+ pipeThrough<ResultType>(
+ transform: GenericTransformStream<OutputType, ResultType>,
+ options?: PipeOptions
+ ): SDReadableStream<ResultType>;
+ pipeTo(
+ dest: ws.WritableStream<OutputType>,
+ options?: PipeOptions
+ ): Promise<void>;
+ */
+ [shared.state_]: ReadableStreamState;
+ [shared.storedError_]: shared.ErrorResult;
+ [reader_]: SDReadableStreamReader<OutputType> | undefined;
+ [readableStreamController_]: SDReadableStreamControllerBase<OutputType>;
+}
+
+// ---- Stream
+
+export function initializeReadableStream<OutputType>(
+ stream: SDReadableStream<OutputType>
+): void {
+ stream[shared.state_] = "readable";
+ stream[reader_] = undefined;
+ stream[shared.storedError_] = undefined;
+ stream[readableStreamController_] = undefined!; // mark slot as used for brand check
+}
+
+export function isReadableStream(
+ value: unknown
+): value is SDReadableStream<any> {
+ if (typeof value !== "object" || value === null) {
+ return false;
+ }
+ return readableStreamController_ in value;
+}
+
+export function isReadableStreamLocked<OutputType>(
+ stream: SDReadableStream<OutputType>
+): boolean {
+ return stream[reader_] !== undefined;
+}
+
+export function readableStreamGetNumReadIntoRequests<OutputType>(
+ stream: SDReadableStream<OutputType>
+): number | undefined {
+ // TODO remove the "as unknown" cast
+ // This is in to workaround a compiler error
+ // error TS2352: Conversion of type 'SDReadableStreamReader<OutputType>' to type 'SDReadableStreamBYOBReader' may be a mistake because neither type sufficiently overlaps with the other. If this was intentional, convert the expression to 'unknown' first.
+ // Type 'SDReadableStreamReader<OutputType>' is missing the following properties from type 'SDReadableStreamBYOBReader': read, [readIntoRequests_]
+ const reader = (stream[reader_] as unknown) as SDReadableStreamBYOBReader;
+ if (reader === undefined) {
+ return 0;
+ }
+ return reader[readIntoRequests_].length;
+}
+
+export function readableStreamGetNumReadRequests<OutputType>(
+ stream: SDReadableStream<OutputType>
+): number {
+ const reader = stream[reader_] as SDReadableStreamDefaultReader<OutputType>;
+ if (reader === undefined) {
+ return 0;
+ }
+ return reader[readRequests_].length;
+}
+
+export function readableStreamCreateReadResult<T>(
+ value: T,
+ done: boolean,
+ forAuthorCode: boolean
+): IteratorResult<T> {
+ const prototype = forAuthorCode ? Object.prototype : null;
+ const result = Object.create(prototype);
+ result.value = value;
+ result.done = done;
+ return result;
+}
+
+export function readableStreamAddReadIntoRequest(
+ stream: SDReadableStream<ArrayBufferView>,
+ forAuthorCode: boolean
+): Promise<IteratorResult<ArrayBufferView, any>> {
+ // Assert: ! IsReadableStreamBYOBReader(stream.[[reader]]) is true.
+ // Assert: stream.[[state]] is "readable" or "closed".
+ const reader = stream[reader_] as SDReadableStreamBYOBReader;
+ const conProm = shared.createControlledPromise<
+ IteratorResult<ArrayBufferView>
+ >() as ReadRequest<IteratorResult<ArrayBufferView>>;
+ conProm.forAuthorCode = forAuthorCode;
+ reader[readIntoRequests_].push(conProm);
+ return conProm.promise;
+}
+
+export function readableStreamAddReadRequest<OutputType>(
+ stream: SDReadableStream<OutputType>,
+ forAuthorCode: boolean
+): Promise<IteratorResult<OutputType, any>> {
+ // Assert: ! IsReadableStreamDefaultReader(stream.[[reader]]) is true.
+ // Assert: stream.[[state]] is "readable".
+ const reader = stream[reader_] as SDReadableStreamDefaultReader<OutputType>;
+ const conProm = shared.createControlledPromise<
+ IteratorResult<OutputType>
+ >() as ReadRequest<IteratorResult<OutputType>>;
+ conProm.forAuthorCode = forAuthorCode;
+ reader[readRequests_].push(conProm);
+ return conProm.promise;
+}
+
+export function readableStreamHasBYOBReader<OutputType>(
+ stream: SDReadableStream<OutputType>
+): boolean {
+ const reader = stream[reader_];
+ return isReadableStreamBYOBReader(reader);
+}
+
+export function readableStreamHasDefaultReader<OutputType>(
+ stream: SDReadableStream<OutputType>
+): boolean {
+ const reader = stream[reader_];
+ return isReadableStreamDefaultReader(reader);
+}
+
+export function readableStreamCancel<OutputType>(
+ stream: SDReadableStream<OutputType>,
+ reason: shared.ErrorResult
+): Promise<undefined> {
+ if (stream[shared.state_] === "closed") {
+ return Promise.resolve(undefined);
+ }
+ if (stream[shared.state_] === "errored") {
+ return Promise.reject(stream[shared.storedError_]);
+ }
+ readableStreamClose(stream);
+
+ const sourceCancelPromise = stream[readableStreamController_][cancelSteps_](
+ reason
+ );
+ return sourceCancelPromise.then(_ => undefined);
+}
+
+export function readableStreamClose<OutputType>(
+ stream: SDReadableStream<OutputType>
+): void {
+ // Assert: stream.[[state]] is "readable".
+ stream[shared.state_] = "closed";
+ const reader = stream[reader_];
+ if (reader === undefined) {
+ return;
+ }
+
+ if (isReadableStreamDefaultReader(reader)) {
+ for (const readRequest of reader[readRequests_]) {
+ readRequest.resolve(
+ readableStreamCreateReadResult(
+ undefined,
+ true,
+ readRequest.forAuthorCode
+ )
+ );
+ }
+ reader[readRequests_] = [];
+ }
+ reader[closedPromise_].resolve();
+ reader[closedPromise_].promise.catch(() => {});
+}
+
+export function readableStreamError<OutputType>(
+ stream: SDReadableStream<OutputType>,
+ error: shared.ErrorResult
+): void {
+ if (stream[shared.state_] !== "readable") {
+ throw new RangeError("Stream is in an invalid state");
+ }
+ stream[shared.state_] = "errored";
+ stream[shared.storedError_] = error;
+
+ const reader = stream[reader_];
+ if (reader === undefined) {
+ return;
+ }
+ if (isReadableStreamDefaultReader(reader)) {
+ for (const readRequest of reader[readRequests_]) {
+ readRequest.reject(error);
+ }
+ reader[readRequests_] = [];
+ } else {
+ // Assert: IsReadableStreamBYOBReader(reader).
+ // TODO remove the "as unknown" cast
+ const readIntoRequests = ((reader as unknown) as SDReadableStreamBYOBReader)[
+ readIntoRequests_
+ ];
+ for (const readIntoRequest of readIntoRequests) {
+ readIntoRequest.reject(error);
+ }
+ // TODO remove the "as unknown" cast
+ ((reader as unknown) as SDReadableStreamBYOBReader)[readIntoRequests_] = [];
+ }
+
+ reader[closedPromise_].reject(error);
+}
+
+// ---- Readers
+
+export function isReadableStreamDefaultReader(
+ reader: unknown
+): reader is SDReadableStreamDefaultReader<any> {
+ if (typeof reader !== "object" || reader === null) {
+ return false;
+ }
+ return readRequests_ in reader;
+}
+
+export function isReadableStreamBYOBReader(
+ reader: unknown
+): reader is SDReadableStreamBYOBReader {
+ if (typeof reader !== "object" || reader === null) {
+ return false;
+ }
+ return readIntoRequests_ in reader;
+}
+
+export function readableStreamReaderGenericInitialize<OutputType>(
+ reader: SDReadableStreamReader<OutputType>,
+ stream: SDReadableStream<OutputType>
+): void {
+ reader[ownerReadableStream_] = stream;
+ stream[reader_] = reader;
+ const streamState = stream[shared.state_];
+
+ reader[closedPromise_] = shared.createControlledPromise<void>();
+ if (streamState === "readable") {
+ // leave as is
+ } else if (streamState === "closed") {
+ reader[closedPromise_].resolve(undefined);
+ } else {
+ reader[closedPromise_].reject(stream[shared.storedError_]);
+ reader[closedPromise_].promise.catch(() => {});
+ }
+}
+
+export function readableStreamReaderGenericRelease<OutputType>(
+ reader: SDReadableStreamReader<OutputType>
+): void {
+ // Assert: reader.[[ownerReadableStream]] is not undefined.
+ // Assert: reader.[[ownerReadableStream]].[[reader]] is reader.
+ const stream = reader[ownerReadableStream_];
+ if (stream === undefined) {
+ throw new TypeError("Reader is in an inconsistent state");
+ }
+
+ if (stream[shared.state_] === "readable") {
+ // code moved out
+ } else {
+ reader[closedPromise_] = shared.createControlledPromise<void>();
+ }
+ reader[closedPromise_].reject(new TypeError());
+ reader[closedPromise_].promise.catch(() => {});
+
+ stream[reader_] = undefined;
+ reader[ownerReadableStream_] = undefined;
+}
+
+export function readableStreamBYOBReaderRead(
+ reader: SDReadableStreamBYOBReader,
+ view: ArrayBufferView,
+ forAuthorCode = false
+): Promise<IteratorResult<ArrayBufferView, any>> {
+ const stream = reader[ownerReadableStream_]!;
+ // Assert: stream is not undefined.
+
+ if (stream[shared.state_] === "errored") {
+ return Promise.reject(stream[shared.storedError_]);
+ }
+ return readableByteStreamControllerPullInto(
+ stream[readableStreamController_] as SDReadableByteStreamController,
+ view,
+ forAuthorCode
+ );
+}
+
+export function readableStreamDefaultReaderRead<OutputType>(
+ reader: SDReadableStreamDefaultReader<OutputType>,
+ forAuthorCode = false
+): Promise<IteratorResult<OutputType | undefined>> {
+ const stream = reader[ownerReadableStream_]!;
+ // Assert: stream is not undefined.
+
+ if (stream[shared.state_] === "closed") {
+ return Promise.resolve(
+ readableStreamCreateReadResult(undefined, true, forAuthorCode)
+ );
+ }
+ if (stream[shared.state_] === "errored") {
+ return Promise.reject(stream[shared.storedError_]);
+ }
+ // Assert: stream.[[state]] is "readable".
+ return stream[readableStreamController_][pullSteps_](forAuthorCode);
+}
+
+export function readableStreamFulfillReadIntoRequest<OutputType>(
+ stream: SDReadableStream<OutputType>,
+ chunk: ArrayBufferView,
+ done: boolean
+): void {
+ // TODO remove the "as unknown" cast
+ const reader = (stream[reader_] as unknown) as SDReadableStreamBYOBReader;
+ const readIntoRequest = reader[readIntoRequests_].shift()!; // <-- length check done in caller
+ readIntoRequest.resolve(
+ readableStreamCreateReadResult(chunk, done, readIntoRequest.forAuthorCode)
+ );
+}
+
+export function readableStreamFulfillReadRequest<OutputType>(
+ stream: SDReadableStream<OutputType>,
+ chunk: OutputType,
+ done: boolean
+): void {
+ const reader = stream[reader_] as SDReadableStreamDefaultReader<OutputType>;
+ const readRequest = reader[readRequests_].shift()!; // <-- length check done in caller
+ readRequest.resolve(
+ readableStreamCreateReadResult(chunk, done, readRequest.forAuthorCode)
+ );
+}
+
+// ---- DefaultController
+
+export function setUpReadableStreamDefaultController<OutputType>(
+ stream: SDReadableStream<OutputType>,
+ controller: SDReadableStreamDefaultController<OutputType>,
+ startAlgorithm: StartAlgorithm,
+ pullAlgorithm: PullAlgorithm<OutputType>,
+ cancelAlgorithm: CancelAlgorithm,
+ highWaterMark: number,
+ sizeAlgorithm: QueuingStrategySizeCallback<OutputType>
+): void {
+ // Assert: stream.[[readableStreamController]] is undefined.
+ controller[controlledReadableStream_] = stream;
+ q.resetQueue(controller);
+ controller[started_] = false;
+ controller[closeRequested_] = false;
+ controller[pullAgain_] = false;
+ controller[pulling_] = false;
+ controller[strategySizeAlgorithm_] = sizeAlgorithm;
+ controller[strategyHWM_] = highWaterMark;
+ controller[pullAlgorithm_] = pullAlgorithm;
+ controller[cancelAlgorithm_] = cancelAlgorithm;
+ stream[readableStreamController_] = controller;
+
+ const startResult = startAlgorithm();
+ Promise.resolve(startResult).then(
+ _ => {
+ controller[started_] = true;
+ // Assert: controller.[[pulling]] is false.
+ // Assert: controller.[[pullAgain]] is false.
+ readableStreamDefaultControllerCallPullIfNeeded(controller);
+ },
+ error => {
+ readableStreamDefaultControllerError(controller, error);
+ }
+ );
+}
+
+export function isReadableStreamDefaultController(
+ value: unknown
+): value is SDReadableStreamDefaultController<any> {
+ if (typeof value !== "object" || value === null) {
+ return false;
+ }
+ return controlledReadableStream_ in value;
+}
+
+export function readableStreamDefaultControllerHasBackpressure<OutputType>(
+ controller: SDReadableStreamDefaultController<OutputType>
+): boolean {
+ return !readableStreamDefaultControllerShouldCallPull(controller);
+}
+
+export function readableStreamDefaultControllerCanCloseOrEnqueue<OutputType>(
+ controller: SDReadableStreamDefaultController<OutputType>
+): boolean {
+ const state = controller[controlledReadableStream_][shared.state_];
+ return controller[closeRequested_] === false && state === "readable";
+}
+
+export function readableStreamDefaultControllerGetDesiredSize<OutputType>(
+ controller: SDReadableStreamDefaultController<OutputType>
+): number | null {
+ const state = controller[controlledReadableStream_][shared.state_];
+ if (state === "errored") {
+ return null;
+ }
+ if (state === "closed") {
+ return 0;
+ }
+ return controller[strategyHWM_] - controller[q.queueTotalSize_];
+}
+
+export function readableStreamDefaultControllerClose<OutputType>(
+ controller: SDReadableStreamDefaultController<OutputType>
+): void {
+ // Assert: !ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) is true.
+ controller[closeRequested_] = true;
+ const stream = controller[controlledReadableStream_];
+ if (controller[q.queue_].length === 0) {
+ readableStreamDefaultControllerClearAlgorithms(controller);
+ readableStreamClose(stream);
+ }
+}
+
+export function readableStreamDefaultControllerEnqueue<OutputType>(
+ controller: SDReadableStreamDefaultController<OutputType>,
+ chunk: OutputType
+): void {
+ const stream = controller[controlledReadableStream_];
+ // Assert: !ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) is true.
+ if (
+ isReadableStreamLocked(stream) &&
+ readableStreamGetNumReadRequests(stream) > 0
+ ) {
+ readableStreamFulfillReadRequest(stream, chunk, false);
+ } else {
+ // Let result be the result of performing controller.[[strategySizeAlgorithm]], passing in chunk,
+ // and interpreting the result as an ECMAScript completion value.
+ // impl note: assuming that in JS land this just means try/catch with rethrow
+ let chunkSize: number;
+ try {
+ chunkSize = controller[strategySizeAlgorithm_](chunk);
+ } catch (error) {
+ readableStreamDefaultControllerError(controller, error);
+ throw error;
+ }
+ try {
+ q.enqueueValueWithSize(controller, chunk, chunkSize);
+ } catch (error) {
+ readableStreamDefaultControllerError(controller, error);
+ throw error;
+ }
+ }
+ readableStreamDefaultControllerCallPullIfNeeded(controller);
+}
+
+export function readableStreamDefaultControllerError<OutputType>(
+ controller: SDReadableStreamDefaultController<OutputType>,
+ error: shared.ErrorResult
+): void {
+ const stream = controller[controlledReadableStream_];
+ if (stream[shared.state_] !== "readable") {
+ return;
+ }
+ q.resetQueue(controller);
+ readableStreamDefaultControllerClearAlgorithms(controller);
+ readableStreamError(stream, error);
+}
+
+export function readableStreamDefaultControllerCallPullIfNeeded<OutputType>(
+ controller: SDReadableStreamDefaultController<OutputType>
+): void {
+ if (!readableStreamDefaultControllerShouldCallPull(controller)) {
+ return;
+ }
+ if (controller[pulling_]) {
+ controller[pullAgain_] = true;
+ return;
+ }
+ if (controller[pullAgain_]) {
+ throw new RangeError("Stream controller is in an invalid state.");
+ }
+
+ controller[pulling_] = true;
+ controller[pullAlgorithm_](controller).then(
+ _ => {
+ controller[pulling_] = false;
+ if (controller[pullAgain_]) {
+ controller[pullAgain_] = false;
+ readableStreamDefaultControllerCallPullIfNeeded(controller);
+ }
+ },
+ error => {
+ readableStreamDefaultControllerError(controller, error);
+ }
+ );
+}
+
+export function readableStreamDefaultControllerShouldCallPull<OutputType>(
+ controller: SDReadableStreamDefaultController<OutputType>
+): boolean {
+ const stream = controller[controlledReadableStream_];
+ if (!readableStreamDefaultControllerCanCloseOrEnqueue(controller)) {
+ return false;
+ }
+ if (controller[started_] === false) {
+ return false;
+ }
+ if (
+ isReadableStreamLocked(stream) &&
+ readableStreamGetNumReadRequests(stream) > 0
+ ) {
+ return true;
+ }
+ const desiredSize = readableStreamDefaultControllerGetDesiredSize(controller);
+ if (desiredSize === null) {
+ throw new RangeError("Stream is in an invalid state.");
+ }
+ return desiredSize > 0;
+}
+
+export function readableStreamDefaultControllerClearAlgorithms<OutputType>(
+ controller: SDReadableStreamDefaultController<OutputType>
+): void {
+ controller[pullAlgorithm_] = undefined!;
+ controller[cancelAlgorithm_] = undefined!;
+ controller[strategySizeAlgorithm_] = undefined!;
+}
+
+// ---- BYOBController
+
+export function setUpReadableByteStreamController(
+ stream: SDReadableStream<ArrayBufferView>,
+ controller: SDReadableByteStreamController,
+ startAlgorithm: StartAlgorithm,
+ pullAlgorithm: PullAlgorithm<ArrayBufferView>,
+ cancelAlgorithm: CancelAlgorithm,
+ highWaterMark: number,
+ autoAllocateChunkSize: number | undefined
+): void {
+ // Assert: stream.[[readableStreamController]] is undefined.
+ if (stream[readableStreamController_] !== undefined) {
+ throw new TypeError("Cannot reuse streams");
+ }
+ if (autoAllocateChunkSize !== undefined) {
+ if (
+ !shared.isInteger(autoAllocateChunkSize) ||
+ autoAllocateChunkSize <= 0
+ ) {
+ throw new RangeError(
+ "autoAllocateChunkSize must be a positive, finite integer"
+ );
+ }
+ }
+ // Set controller.[[controlledReadableByteStream]] to stream.
+ controller[controlledReadableByteStream_] = stream;
+ // Set controller.[[pullAgain]] and controller.[[pulling]] to false.
+ controller[pullAgain_] = false;
+ controller[pulling_] = false;
+ readableByteStreamControllerClearPendingPullIntos(controller);
+ q.resetQueue(controller);
+ controller[closeRequested_] = false;
+ controller[started_] = false;
+ controller[strategyHWM_] = shared.validateAndNormalizeHighWaterMark(
+ highWaterMark
+ );
+ controller[pullAlgorithm_] = pullAlgorithm;
+ controller[cancelAlgorithm_] = cancelAlgorithm;
+ controller[autoAllocateChunkSize_] = autoAllocateChunkSize;
+ controller[pendingPullIntos_] = [];
+ stream[readableStreamController_] = controller;
+
+ // Let startResult be the result of performing startAlgorithm.
+ const startResult = startAlgorithm();
+ Promise.resolve(startResult).then(
+ _ => {
+ controller[started_] = true;
+ // Assert: controller.[[pulling]] is false.
+ // Assert: controller.[[pullAgain]] is false.
+ readableByteStreamControllerCallPullIfNeeded(controller);
+ },
+ error => {
+ readableByteStreamControllerError(controller, error);
+ }
+ );
+}
+
+export function isReadableStreamBYOBRequest(
+ value: unknown
+): value is SDReadableStreamBYOBRequest {
+ if (typeof value !== "object" || value === null) {
+ return false;
+ }
+ return associatedReadableByteStreamController_ in value;
+}
+
+export function isReadableByteStreamController(
+ value: unknown
+): value is SDReadableByteStreamController {
+ if (typeof value !== "object" || value === null) {
+ return false;
+ }
+ return controlledReadableByteStream_ in value;
+}
+
+export function readableByteStreamControllerCallPullIfNeeded(
+ controller: SDReadableByteStreamController
+): void {
+ if (!readableByteStreamControllerShouldCallPull(controller)) {
+ return;
+ }
+ if (controller[pulling_]) {
+ controller[pullAgain_] = true;
+ return;
+ }
+ // Assert: controller.[[pullAgain]] is false.
+ controller[pulling_] = true;
+ controller[pullAlgorithm_](controller).then(
+ _ => {
+ controller[pulling_] = false;
+ if (controller[pullAgain_]) {
+ controller[pullAgain_] = false;
+ readableByteStreamControllerCallPullIfNeeded(controller);
+ }
+ },
+ error => {
+ readableByteStreamControllerError(controller, error);
+ }
+ );
+}
+
+export function readableByteStreamControllerClearAlgorithms(
+ controller: SDReadableByteStreamController
+): void {
+ controller[pullAlgorithm_] = undefined!;
+ controller[cancelAlgorithm_] = undefined!;
+}
+
+export function readableByteStreamControllerClearPendingPullIntos(
+ controller: SDReadableByteStreamController
+): void {
+ readableByteStreamControllerInvalidateBYOBRequest(controller);
+ controller[pendingPullIntos_] = [];
+}
+
+export function readableByteStreamControllerClose(
+ controller: SDReadableByteStreamController
+): void {
+ const stream = controller[controlledReadableByteStream_];
+ // Assert: controller.[[closeRequested]] is false.
+ // Assert: stream.[[state]] is "readable".
+ if (controller[q.queueTotalSize_] > 0) {
+ controller[closeRequested_] = true;
+ return;
+ }
+ if (controller[pendingPullIntos_].length > 0) {
+ const firstPendingPullInto = controller[pendingPullIntos_][0];
+ if (firstPendingPullInto.bytesFilled > 0) {
+ const error = new TypeError();
+ readableByteStreamControllerError(controller, error);
+ throw error;
+ }
+ }
+ readableByteStreamControllerClearAlgorithms(controller);
+ readableStreamClose(stream);
+}
+
+export function readableByteStreamControllerCommitPullIntoDescriptor(
+ stream: SDReadableStream<ArrayBufferView>,
+ pullIntoDescriptor: PullIntoDescriptor
+): void {
+ // Assert: stream.[[state]] is not "errored".
+ let done = false;
+ if (stream[shared.state_] === "closed") {
+ // Assert: pullIntoDescriptor.[[bytesFilled]] is 0.
+ done = true;
+ }
+ const filledView = readableByteStreamControllerConvertPullIntoDescriptor(
+ pullIntoDescriptor
+ );
+ if (pullIntoDescriptor.readerType === "default") {
+ readableStreamFulfillReadRequest(stream, filledView, done);
+ } else {
+ // Assert: pullIntoDescriptor.[[readerType]] is "byob".
+ readableStreamFulfillReadIntoRequest(stream, filledView, done);
+ }
+}
+
+export function readableByteStreamControllerConvertPullIntoDescriptor(
+ pullIntoDescriptor: PullIntoDescriptor
+): ArrayBufferView {
+ const { bytesFilled, elementSize } = pullIntoDescriptor;
+ // Assert: bytesFilled <= pullIntoDescriptor.byteLength
+ // Assert: bytesFilled mod elementSize is 0
+ return new pullIntoDescriptor.ctor(
+ pullIntoDescriptor.buffer,
+ pullIntoDescriptor.byteOffset,
+ bytesFilled / elementSize
+ );
+}
+
+export function readableByteStreamControllerEnqueue(
+ controller: SDReadableByteStreamController,
+ chunk: ArrayBufferView
+): void {
+ const stream = controller[controlledReadableByteStream_];
+ // Assert: controller.[[closeRequested]] is false.
+ // Assert: stream.[[state]] is "readable".
+ const { buffer, byteOffset, byteLength } = chunk;
+
+ const transferredBuffer = shared.transferArrayBuffer(buffer);
+
+ if (readableStreamHasDefaultReader(stream)) {
+ if (readableStreamGetNumReadRequests(stream) === 0) {
+ readableByteStreamControllerEnqueueChunkToQueue(
+ controller,
+ transferredBuffer,
+ byteOffset,
+ byteLength
+ );
+ } else {
+ // Assert: controller.[[queue]] is empty.
+ const transferredView = new Uint8Array(
+ transferredBuffer,
+ byteOffset,
+ byteLength
+ );
+ readableStreamFulfillReadRequest(stream, transferredView, false);
+ }
+ } else if (readableStreamHasBYOBReader(stream)) {
+ readableByteStreamControllerEnqueueChunkToQueue(
+ controller,
+ transferredBuffer,
+ byteOffset,
+ byteLength
+ );
+ readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(
+ controller
+ );
+ } else {
+ // Assert: !IsReadableStreamLocked(stream) is false.
+ readableByteStreamControllerEnqueueChunkToQueue(
+ controller,
+ transferredBuffer,
+ byteOffset,
+ byteLength
+ );
+ }
+ readableByteStreamControllerCallPullIfNeeded(controller);
+}
+
+export function readableByteStreamControllerEnqueueChunkToQueue(
+ controller: SDReadableByteStreamController,
+ buffer: ArrayBufferLike,
+ byteOffset: number,
+ byteLength: number
+): void {
+ controller[q.queue_].push({ buffer, byteOffset, byteLength });
+ controller[q.queueTotalSize_] += byteLength;
+}
+
+export function readableByteStreamControllerError(
+ controller: SDReadableByteStreamController,
+ error: shared.ErrorResult
+): void {
+ const stream = controller[controlledReadableByteStream_];
+ if (stream[shared.state_] !== "readable") {
+ return;
+ }
+ readableByteStreamControllerClearPendingPullIntos(controller);
+ q.resetQueue(controller);
+ readableByteStreamControllerClearAlgorithms(controller);
+ readableStreamError(stream, error);
+}
+
+export function readableByteStreamControllerFillHeadPullIntoDescriptor(
+ controller: SDReadableByteStreamController,
+ size: number,
+ pullIntoDescriptor: PullIntoDescriptor
+): void {
+ // Assert: either controller.[[pendingPullIntos]] is empty, or the first element of controller.[[pendingPullIntos]] is pullIntoDescriptor.
+ readableByteStreamControllerInvalidateBYOBRequest(controller);
+ pullIntoDescriptor.bytesFilled += size;
+}
+
+export function readableByteStreamControllerFillPullIntoDescriptorFromQueue(
+ controller: SDReadableByteStreamController,
+ pullIntoDescriptor: PullIntoDescriptor
+): boolean {
+ const elementSize = pullIntoDescriptor.elementSize;
+ const currentAlignedBytes =
+ pullIntoDescriptor.bytesFilled -
+ (pullIntoDescriptor.bytesFilled % elementSize);
+ const maxBytesToCopy = Math.min(
+ controller[q.queueTotalSize_],
+ pullIntoDescriptor.byteLength - pullIntoDescriptor.bytesFilled
+ );
+ const maxBytesFilled = pullIntoDescriptor.bytesFilled + maxBytesToCopy;
+ const maxAlignedBytes = maxBytesFilled - (maxBytesFilled % elementSize);
+ let totalBytesToCopyRemaining = maxBytesToCopy;
+ let ready = false;
+
+ if (maxAlignedBytes > currentAlignedBytes) {
+ totalBytesToCopyRemaining =
+ maxAlignedBytes - pullIntoDescriptor.bytesFilled;
+ ready = true;
+ }
+ const queue = controller[q.queue_];
+
+ while (totalBytesToCopyRemaining > 0) {
+ const headOfQueue = queue.front()!;
+ const bytesToCopy = Math.min(
+ totalBytesToCopyRemaining,
+ headOfQueue.byteLength
+ );
+ const destStart =
+ pullIntoDescriptor.byteOffset + pullIntoDescriptor.bytesFilled;
+ shared.copyDataBlockBytes(
+ pullIntoDescriptor.buffer,
+ destStart,
+ headOfQueue.buffer,
+ headOfQueue.byteOffset,
+ bytesToCopy
+ );
+ if (headOfQueue.byteLength === bytesToCopy) {
+ queue.shift();
+ } else {
+ headOfQueue.byteOffset += bytesToCopy;
+ headOfQueue.byteLength -= bytesToCopy;
+ }
+ controller[q.queueTotalSize_] -= bytesToCopy;
+ readableByteStreamControllerFillHeadPullIntoDescriptor(
+ controller,
+ bytesToCopy,
+ pullIntoDescriptor
+ );
+ totalBytesToCopyRemaining -= bytesToCopy;
+ }
+ if (!ready) {
+ // Assert: controller[queueTotalSize_] === 0
+ // Assert: pullIntoDescriptor.bytesFilled > 0
+ // Assert: pullIntoDescriptor.bytesFilled < pullIntoDescriptor.elementSize
+ }
+ return ready;
+}
+
+export function readableByteStreamControllerGetDesiredSize(
+ controller: SDReadableByteStreamController
+): number | null {
+ const stream = controller[controlledReadableByteStream_];
+ const state = stream[shared.state_];
+ if (state === "errored") {
+ return null;
+ }
+ if (state === "closed") {
+ return 0;
+ }
+ return controller[strategyHWM_] - controller[q.queueTotalSize_];
+}
+
+export function readableByteStreamControllerHandleQueueDrain(
+ controller: SDReadableByteStreamController
+): void {
+ // Assert: controller.[[controlledReadableByteStream]].[[state]] is "readable".
+ if (controller[q.queueTotalSize_] === 0 && controller[closeRequested_]) {
+ readableByteStreamControllerClearAlgorithms(controller);
+ readableStreamClose(controller[controlledReadableByteStream_]);
+ } else {
+ readableByteStreamControllerCallPullIfNeeded(controller);
+ }
+}
+
+export function readableByteStreamControllerInvalidateBYOBRequest(
+ controller: SDReadableByteStreamController
+): void {
+ const byobRequest = controller[byobRequest_];
+ if (byobRequest === undefined) {
+ return;
+ }
+ byobRequest[associatedReadableByteStreamController_] = undefined;
+ byobRequest[view_] = undefined;
+ controller[byobRequest_] = undefined;
+}
+
+export function readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(
+ controller: SDReadableByteStreamController
+): void {
+ // Assert: controller.[[closeRequested]] is false.
+ const pendingPullIntos = controller[pendingPullIntos_];
+ while (pendingPullIntos.length > 0) {
+ if (controller[q.queueTotalSize_] === 0) {
+ return;
+ }
+ const pullIntoDescriptor = pendingPullIntos[0];
+ if (
+ readableByteStreamControllerFillPullIntoDescriptorFromQueue(
+ controller,
+ pullIntoDescriptor
+ )
+ ) {
+ readableByteStreamControllerShiftPendingPullInto(controller);
+ readableByteStreamControllerCommitPullIntoDescriptor(
+ controller[controlledReadableByteStream_],
+ pullIntoDescriptor
+ );
+ }
+ }
+}
+
+export function readableByteStreamControllerPullInto(
+ controller: SDReadableByteStreamController,
+ view: ArrayBufferView,
+ forAuthorCode: boolean
+): Promise<IteratorResult<ArrayBufferView, any>> {
+ const stream = controller[controlledReadableByteStream_];
+
+ const elementSize = (view as Uint8Array).BYTES_PER_ELEMENT || 1; // DataView exposes this in Webkit as 1, is not present in FF or Blink
+ const ctor = view.constructor as Uint8ArrayConstructor; // the typecast here is just for TS typing, it does not influence buffer creation
+
+ const byteOffset = view.byteOffset;
+ const byteLength = view.byteLength;
+ const buffer = shared.transferArrayBuffer(view.buffer);
+ const pullIntoDescriptor: PullIntoDescriptor = {
+ buffer,
+ byteOffset,
+ byteLength,
+ bytesFilled: 0,
+ elementSize,
+ ctor,
+ readerType: "byob"
+ };
+
+ if (controller[pendingPullIntos_].length > 0) {
+ controller[pendingPullIntos_].push(pullIntoDescriptor);
+ return readableStreamAddReadIntoRequest(stream, forAuthorCode);
+ }
+ if (stream[shared.state_] === "closed") {
+ const emptyView = new ctor(
+ pullIntoDescriptor.buffer,
+ pullIntoDescriptor.byteOffset,
+ 0
+ );
+ return Promise.resolve(
+ readableStreamCreateReadResult(emptyView, true, forAuthorCode)
+ );
+ }
+
+ if (controller[q.queueTotalSize_] > 0) {
+ if (
+ readableByteStreamControllerFillPullIntoDescriptorFromQueue(
+ controller,
+ pullIntoDescriptor
+ )
+ ) {
+ const filledView = readableByteStreamControllerConvertPullIntoDescriptor(
+ pullIntoDescriptor
+ );
+ readableByteStreamControllerHandleQueueDrain(controller);
+ return Promise.resolve(
+ readableStreamCreateReadResult(filledView, false, forAuthorCode)
+ );
+ }
+ if (controller[closeRequested_]) {
+ const error = new TypeError();
+ readableByteStreamControllerError(controller, error);
+ return Promise.reject(error);
+ }
+ }
+
+ controller[pendingPullIntos_].push(pullIntoDescriptor);
+ const promise = readableStreamAddReadIntoRequest(stream, forAuthorCode);
+ readableByteStreamControllerCallPullIfNeeded(controller);
+ return promise;
+}
+
+export function readableByteStreamControllerRespond(
+ controller: SDReadableByteStreamController,
+ bytesWritten: number
+): void {
+ bytesWritten = Number(bytesWritten);
+ if (!shared.isFiniteNonNegativeNumber(bytesWritten)) {
+ throw new RangeError("bytesWritten must be a finite, non-negative number");
+ }
+ // Assert: controller.[[pendingPullIntos]] is not empty.
+ readableByteStreamControllerRespondInternal(controller, bytesWritten);
+}
+
+export function readableByteStreamControllerRespondInClosedState(
+ controller: SDReadableByteStreamController,
+ firstDescriptor: PullIntoDescriptor
+): void {
+ firstDescriptor.buffer = shared.transferArrayBuffer(firstDescriptor.buffer);
+ // Assert: firstDescriptor.[[bytesFilled]] is 0.
+ const stream = controller[controlledReadableByteStream_];
+ if (readableStreamHasBYOBReader(stream)) {
+ while (readableStreamGetNumReadIntoRequests(stream) > 0) {
+ const pullIntoDescriptor = readableByteStreamControllerShiftPendingPullInto(
+ controller
+ )!;
+ readableByteStreamControllerCommitPullIntoDescriptor(
+ stream,
+ pullIntoDescriptor
+ );
+ }
+ }
+}
+
+export function readableByteStreamControllerRespondInReadableState(
+ controller: SDReadableByteStreamController,
+ bytesWritten: number,
+ pullIntoDescriptor: PullIntoDescriptor
+): void {
+ if (
+ pullIntoDescriptor.bytesFilled + bytesWritten >
+ pullIntoDescriptor.byteLength
+ ) {
+ throw new RangeError();
+ }
+ readableByteStreamControllerFillHeadPullIntoDescriptor(
+ controller,
+ bytesWritten,
+ pullIntoDescriptor
+ );
+ if (pullIntoDescriptor.bytesFilled < pullIntoDescriptor.elementSize) {
+ return;
+ }
+ readableByteStreamControllerShiftPendingPullInto(controller);
+ const remainderSize =
+ pullIntoDescriptor.bytesFilled % pullIntoDescriptor.elementSize;
+ if (remainderSize > 0) {
+ const end = pullIntoDescriptor.byteOffset + pullIntoDescriptor.bytesFilled;
+ const remainder = shared.cloneArrayBuffer(
+ pullIntoDescriptor.buffer,
+ end - remainderSize,
+ remainderSize,
+ ArrayBuffer
+ );
+ readableByteStreamControllerEnqueueChunkToQueue(
+ controller,
+ remainder,
+ 0,
+ remainder.byteLength
+ );
+ }
+ pullIntoDescriptor.buffer = shared.transferArrayBuffer(
+ pullIntoDescriptor.buffer
+ );
+ pullIntoDescriptor.bytesFilled =
+ pullIntoDescriptor.bytesFilled - remainderSize;
+ readableByteStreamControllerCommitPullIntoDescriptor(
+ controller[controlledReadableByteStream_],
+ pullIntoDescriptor
+ );
+ readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller);
+}
+
+export function readableByteStreamControllerRespondInternal(
+ controller: SDReadableByteStreamController,
+ bytesWritten: number
+): void {
+ const firstDescriptor = controller[pendingPullIntos_][0];
+ const stream = controller[controlledReadableByteStream_];
+ if (stream[shared.state_] === "closed") {
+ if (bytesWritten !== 0) {
+ throw new TypeError();
+ }
+ readableByteStreamControllerRespondInClosedState(
+ controller,
+ firstDescriptor
+ );
+ } else {
+ // Assert: stream.[[state]] is "readable".
+ readableByteStreamControllerRespondInReadableState(
+ controller,
+ bytesWritten,
+ firstDescriptor
+ );
+ }
+ readableByteStreamControllerCallPullIfNeeded(controller);
+}
+
+export function readableByteStreamControllerRespondWithNewView(
+ controller: SDReadableByteStreamController,
+ view: ArrayBufferView
+): void {
+ // Assert: controller.[[pendingPullIntos]] is not empty.
+ const firstDescriptor = controller[pendingPullIntos_][0];
+ if (
+ firstDescriptor.byteOffset + firstDescriptor.bytesFilled !==
+ view.byteOffset
+ ) {
+ throw new RangeError();
+ }
+ if (firstDescriptor.byteLength !== view.byteLength) {
+ throw new RangeError();
+ }
+ firstDescriptor.buffer = view.buffer;
+ readableByteStreamControllerRespondInternal(controller, view.byteLength);
+}
+
+export function readableByteStreamControllerShiftPendingPullInto(
+ controller: SDReadableByteStreamController
+): PullIntoDescriptor | undefined {
+ const descriptor = controller[pendingPullIntos_].shift();
+ readableByteStreamControllerInvalidateBYOBRequest(controller);
+ return descriptor;
+}
+
+export function readableByteStreamControllerShouldCallPull(
+ controller: SDReadableByteStreamController
+): boolean {
+ // Let stream be controller.[[controlledReadableByteStream]].
+ const stream = controller[controlledReadableByteStream_];
+ if (stream[shared.state_] !== "readable") {
+ return false;
+ }
+ if (controller[closeRequested_]) {
+ return false;
+ }
+ if (!controller[started_]) {
+ return false;
+ }
+ if (
+ readableStreamHasDefaultReader(stream) &&
+ readableStreamGetNumReadRequests(stream) > 0
+ ) {
+ return true;
+ }
+ if (
+ readableStreamHasBYOBReader(stream) &&
+ readableStreamGetNumReadIntoRequests(stream) > 0
+ ) {
+ return true;
+ }
+ const desiredSize = readableByteStreamControllerGetDesiredSize(controller);
+ // Assert: desiredSize is not null.
+ return desiredSize! > 0;
+}
+
+export function setUpReadableStreamBYOBRequest(
+ request: SDReadableStreamBYOBRequest,
+ controller: SDReadableByteStreamController,
+ view: ArrayBufferView
+): void {
+ if (!isReadableByteStreamController(controller)) {
+ throw new TypeError();
+ }
+ if (!ArrayBuffer.isView(view)) {
+ throw new TypeError();
+ }
+ // Assert: !IsDetachedBuffer(view.[[ViewedArrayBuffer]]) is false.
+
+ request[associatedReadableByteStreamController_] = controller;
+ request[view_] = view;
+}