summaryrefslogtreecommitdiff
path: root/cli/js/web/streams/internals.ts
diff options
context:
space:
mode:
authorKitson Kelly <me@kitsonkelly.com>2020-04-23 00:06:51 +1000
committerGitHub <noreply@github.com>2020-04-22 10:06:51 -0400
commit8bcfc03d71cbd2cfd7ab68035ec0968d9f93b5b8 (patch)
treee1769ca51d2afde57ae18eb25b7a91388fcbf00a /cli/js/web/streams/internals.ts
parentb270d6c8d090669601465f8c9c94512d6c6a07d4 (diff)
Rewrite streams (#4842)
Diffstat (limited to 'cli/js/web/streams/internals.ts')
-rw-r--r--cli/js/web/streams/internals.ts1098
1 files changed, 1098 insertions, 0 deletions
diff --git a/cli/js/web/streams/internals.ts b/cli/js/web/streams/internals.ts
new file mode 100644
index 000000000..2559d9e5c
--- /dev/null
+++ b/cli/js/web/streams/internals.ts
@@ -0,0 +1,1098 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+
+// This code closely follows the WHATWG Stream Specification
+// See: https://streams.spec.whatwg.org/
+//
+// There are some parts that are not fully implemented, and there are some
+// comments which point to steps of the specification that are not implemented.
+//
+
+/* eslint-disable @typescript-eslint/no-explicit-any,require-await */
+import { ReadableByteStreamControllerImpl } from "./readable_byte_stream_controller.ts";
+import { ReadableStreamDefaultControllerImpl } from "./readable_stream_default_controller.ts";
+import { ReadableStreamDefaultReaderImpl } from "./readable_stream_default_reader.ts";
+import { ReadableStreamImpl } from "./readable_stream.ts";
+import * as sym from "./symbols.ts";
+import { cloneValue } from "../util.ts";
+import { assert } from "../../util.ts";
+
+export interface BufferQueueItem extends Pair<ArrayBuffer | SharedArrayBuffer> {
+ offset: number;
+}
+export type CancelAlgorithm = (reason?: any) => PromiseLike<void>;
+type Container<R = any> = {
+ [sym.queue]: Array<Pair<R> | BufferQueueItem>;
+ [sym.queueTotalSize]: number;
+};
+export type Pair<R> = { value: R; size: number };
+export type PullAlgorithm = () => PromiseLike<void>;
+export type SizeAlgorithm<T> = (chunk: T) => number;
+export type StartAlgorithm = () => void | PromiseLike<void>;
+export interface Deferred<T> {
+ promise: Promise<T>;
+ resolve?: (value?: T | PromiseLike<T>) => void;
+ reject?: (reason?: any) => void;
+ settled: boolean;
+}
+
+export interface ReadableStreamGenericReader<R = any>
+ extends ReadableStreamReader<R> {
+ [sym.closedPromise]: Deferred<void>;
+ [sym.forAuthorCode]: boolean;
+ [sym.ownerReadableStream]: ReadableStreamImpl<R>;
+ [sym.readRequests]: Array<Deferred<ReadableStreamReadResult<R>>>;
+}
+
+export interface ReadableStreamAsyncIterator<T = any> extends AsyncIterator<T> {
+ [sym.asyncIteratorReader]: ReadableStreamDefaultReaderImpl<T>;
+ [sym.preventCancel]: boolean;
+ return(value?: any | PromiseLike<any>): Promise<IteratorResult<T, any>>;
+}
+
+export function acquireReadableStreamDefaultReader<T>(
+ stream: ReadableStreamImpl<T>,
+ forAuthorCode = false
+): ReadableStreamDefaultReaderImpl<T> {
+ const reader = new ReadableStreamDefaultReaderImpl(stream);
+ reader[sym.forAuthorCode] = forAuthorCode;
+ return reader;
+}
+
+function createAlgorithmFromUnderlyingMethod<
+ O extends UnderlyingByteSource | UnderlyingSource,
+ P extends keyof O
+>(
+ underlyingObject: O,
+ methodName: P,
+ algoArgCount: 0,
+ ...extraArgs: any[]
+): () => Promise<void>;
+function createAlgorithmFromUnderlyingMethod<
+ O extends UnderlyingByteSource | UnderlyingSource,
+ P extends keyof O
+>(
+ underlyingObject: O,
+ methodName: P,
+ algoArgCount: 1,
+ ...extraArgs: any[]
+): (arg: any) => Promise<void>;
+function createAlgorithmFromUnderlyingMethod<
+ O extends UnderlyingByteSource | UnderlyingSource,
+ P extends keyof O
+>(
+ underlyingObject: O,
+ methodName: P,
+ algoArgCount: 0 | 1,
+ ...extraArgs: any[]
+): (() => Promise<void>) | ((arg: any) => Promise<void>) {
+ const method = underlyingObject[methodName];
+ if (method) {
+ if (!isCallable(method)) {
+ throw new TypeError("method is not callable");
+ }
+ if (algoArgCount === 0) {
+ return async (): Promise<void> =>
+ method.call(underlyingObject, ...extraArgs);
+ } else {
+ return async (arg: any): Promise<void> => {
+ const fullArgs = [arg, ...extraArgs];
+ return method.call(underlyingObject, ...fullArgs);
+ };
+ }
+ }
+ return async (): Promise<void> => undefined;
+}
+
+function createReadableStream<T>(
+ startAlgorithm: StartAlgorithm,
+ pullAlgorithm: PullAlgorithm,
+ cancelAlgorithm: CancelAlgorithm,
+ highWaterMark = 1,
+ sizeAlgorithm: SizeAlgorithm<T> = (): number => 1
+): ReadableStreamImpl<T> {
+ assert(isNonNegativeNumber(highWaterMark));
+ const stream: ReadableStreamImpl<T> = Object.create(
+ ReadableStreamImpl.prototype
+ );
+ initializeReadableStream(stream);
+ const controller: ReadableStreamDefaultControllerImpl<T> = Object.create(
+ ReadableStreamDefaultControllerImpl.prototype
+ );
+ setUpReadableStreamDefaultController(
+ stream,
+ controller,
+ startAlgorithm,
+ pullAlgorithm,
+ cancelAlgorithm,
+ highWaterMark,
+ sizeAlgorithm
+ );
+ return stream;
+}
+
+export function dequeueValue<R>(container: Container<R>): R {
+ assert(sym.queue in container && sym.queueTotalSize in container);
+ assert(container[sym.queue].length);
+ const pair = container[sym.queue].shift()!;
+ container[sym.queueTotalSize] -= pair.size;
+ if (container[sym.queueTotalSize] <= 0) {
+ container[sym.queueTotalSize] = 0;
+ }
+ return pair.value as R;
+}
+
+function enqueueValueWithSize<R>(
+ container: Container<R>,
+ value: R,
+ size: number
+): void {
+ assert(sym.queue in container && sym.queueTotalSize in container);
+ size = Number(size);
+ if (!isFiniteNonNegativeNumber(size)) {
+ throw new RangeError("size must be a finite non-negative number.");
+ }
+ container[sym.queue].push({ value, size });
+ container[sym.queueTotalSize] += size;
+}
+
+/** Non-spec mechanism to "unwrap" a promise and store it to be resolved
+ * later. */
+function getDeferred<T>(): Deferred<T> {
+ let resolve = undefined;
+ let reject = undefined;
+ const promise = new Promise<T>((res, rej) => {
+ resolve = res;
+ reject = rej;
+ });
+ return { promise, resolve, reject, settled: false };
+}
+
+export function initializeReadableStream(stream: ReadableStreamImpl): void {
+ stream[sym.state] = "readable";
+ stream[sym.reader] = stream[sym.storedError] = undefined;
+ stream[sym.disturbed] = false;
+}
+
+function invokeOrNoop<O extends any, P extends keyof O>(
+ o: O,
+ p: P,
+ ...args: Parameters<O[P]>
+): ReturnType<O[P]> | undefined {
+ assert(o);
+ const method = o[p];
+ if (!method) {
+ return undefined;
+ }
+ return method.call(o, ...args);
+}
+
+function isCallable(value: unknown): value is (...args: any) => any {
+ return typeof value === "function";
+}
+
+export function isDetachedBuffer(value: object): boolean {
+ return sym.isFakeDetached in value;
+}
+
+function isFiniteNonNegativeNumber(v: unknown): v is number {
+ if (!isNonNegativeNumber(v)) {
+ return false;
+ }
+ if (v === Infinity) {
+ return false;
+ }
+ return true;
+}
+
+function isNonNegativeNumber(v: unknown): v is number {
+ if (typeof v !== "number") {
+ return false;
+ }
+ if (v === NaN) {
+ return false;
+ }
+ if (v < 0) {
+ return false;
+ }
+ return true;
+}
+
+export function isReadableByteStreamController(
+ x: unknown
+): x is ReadableByteStreamControllerImpl {
+ return typeof x !== "object" ||
+ x === null ||
+ !(sym.controlledReadableByteStream in x)
+ ? false
+ : true;
+}
+
+export function isReadableStream(x: unknown): x is ReadableStreamImpl {
+ return typeof x !== "object" ||
+ x === null ||
+ !(sym.readableStreamController in x)
+ ? false
+ : true;
+}
+
+export function isReadableStreamAsyncIterator(
+ x: unknown
+): x is ReadableStreamAsyncIterator<any> {
+ if (typeof x !== "object" || x === null) {
+ return false;
+ }
+ if (!(sym.asyncIteratorReader in x)) {
+ return false;
+ }
+ return true;
+}
+
+export function isReadableStreamDefaultController(
+ x: unknown
+): x is ReadableStreamDefaultControllerImpl {
+ return typeof x !== "object" ||
+ x === null ||
+ !(sym.controlledReadableStream in x)
+ ? false
+ : true;
+}
+
+export function isReadableStreamDefaultReader<T>(
+ x: unknown
+): x is ReadableStreamDefaultReaderImpl<T> {
+ return typeof x !== "object" || x === null || !(sym.readRequests in x)
+ ? false
+ : true;
+}
+
+export function isReadableStreamLocked(stream: ReadableStreamImpl): boolean {
+ assert(isReadableStream(stream));
+ return stream[sym.reader] ? true : false;
+}
+
+export function isUnderlyingByteSource(
+ underlyingSource: UnderlyingByteSource | UnderlyingSource
+): underlyingSource is UnderlyingByteSource {
+ const { type } = underlyingSource;
+ const typeString = String(type);
+ return typeString === "bytes";
+}
+
+export function makeSizeAlgorithmFromSizeFunction<T>(
+ size: QueuingStrategySizeCallback<T> | undefined
+): SizeAlgorithm<T> {
+ if (size === undefined) {
+ return (): number => 1;
+ }
+ if (typeof size !== "function") {
+ throw new TypeError("size must be callable.");
+ }
+ return (chunk: T): number => {
+ return size.call(undefined, chunk);
+ };
+}
+
+function readableByteStreamControllerShouldCallPull(
+ controller: ReadableByteStreamControllerImpl
+): boolean {
+ const stream = controller[sym.controlledReadableByteStream];
+ if (
+ stream[sym.state] !== "readable" ||
+ controller[sym.closeRequested] ||
+ !controller[sym.started]
+ ) {
+ return false;
+ }
+ if (
+ readableStreamHasDefaultReader(stream) &&
+ readableStreamGetNumReadRequests(stream) > 0
+ ) {
+ return true;
+ }
+ // 3.13.25.6 If ! ReadableStreamHasBYOBReader(stream) is true and !
+ // ReadableStreamGetNumReadIntoRequests(stream) > 0, return true.
+ const desiredSize = readableByteStreamControllerGetDesiredSize(controller);
+ assert(desiredSize !== null);
+ if (desiredSize > 0) {
+ return true;
+ }
+ return false;
+}
+
+export function readableByteStreamControllerCallPullIfNeeded(
+ controller: ReadableByteStreamControllerImpl
+): void {
+ const shouldPull = readableByteStreamControllerShouldCallPull(controller);
+ if (!shouldPull) {
+ return;
+ }
+ if (controller[sym.pulling]) {
+ controller[sym.pullAgain] = true;
+ return;
+ }
+ assert(controller[sym.pullAgain] === false);
+ controller[sym.pulling] = true;
+ const pullPromise = controller[sym.pullAlgorithm]();
+ pullPromise.then(
+ () => {
+ controller[sym.pulling] = false;
+ if (controller[sym.pullAgain]) {
+ controller[sym.pullAgain];
+ readableByteStreamControllerCallPullIfNeeded(controller);
+ }
+ },
+ (e) => {
+ readableByteStreamControllerError(controller, e);
+ }
+ );
+}
+
+export function readableByteStreamControllerClearAlgorithms(
+ controller: ReadableByteStreamControllerImpl
+): void {
+ delete controller[sym.pullAlgorithm];
+ delete controller[sym.cancelAlgorithm];
+}
+
+export function readableByteStreamControllerClose(
+ controller: ReadableByteStreamControllerImpl
+): void {
+ const stream = controller[sym.controlledReadableByteStream];
+ if (controller[sym.closeRequested] || stream[sym.state] !== "readable") {
+ return;
+ }
+ if (controller[sym.queueTotalSize] > 0) {
+ controller[sym.closeRequested] = true;
+ return;
+ }
+ // 3.13.6.4 If controller.[[pendingPullIntos]] is not empty, (BYOB Support)
+ readableByteStreamControllerClearAlgorithms(controller);
+ readableStreamClose(stream);
+}
+
+export function readableByteStreamControllerEnqueue(
+ controller: ReadableByteStreamControllerImpl,
+ chunk: ArrayBufferView
+): void {
+ const stream = controller[sym.controlledReadableByteStream];
+ if (controller[sym.closeRequested] || stream[sym.state] !== "readable") {
+ return;
+ }
+ const { buffer, byteOffset, byteLength } = chunk;
+ const transferredBuffer = transferArrayBuffer(buffer);
+ if (readableStreamHasDefaultReader(stream)) {
+ if (readableStreamGetNumReadRequests(stream) === 0) {
+ readableByteStreamControllerEnqueueChunkToQueue(
+ controller,
+ transferredBuffer,
+ byteOffset,
+ byteLength
+ );
+ } else {
+ assert(controller[sym.queue].length === 0);
+ const transferredView = new Uint8Array(
+ transferredBuffer,
+ byteOffset,
+ byteLength
+ );
+ readableStreamFulfillReadRequest(stream, transferredView, false);
+ }
+ // 3.13.9.8 Otherwise, if ! ReadableStreamHasBYOBReader(stream) is true
+ } else {
+ assert(!isReadableStreamLocked(stream));
+ readableByteStreamControllerEnqueueChunkToQueue(
+ controller,
+ transferredBuffer,
+ byteOffset,
+ byteLength
+ );
+ }
+ readableByteStreamControllerCallPullIfNeeded(controller);
+}
+
+function readableByteStreamControllerEnqueueChunkToQueue(
+ controller: ReadableByteStreamControllerImpl,
+ buffer: ArrayBuffer | SharedArrayBuffer,
+ byteOffset: number,
+ byteLength: number
+): void {
+ controller[sym.queue].push({
+ value: buffer,
+ offset: byteOffset,
+ size: byteLength,
+ });
+ controller[sym.queueTotalSize] += byteLength;
+}
+
+export function readableByteStreamControllerError(
+ controller: ReadableByteStreamControllerImpl,
+ e: any
+): void {
+ const stream = controller[sym.controlledReadableByteStream];
+ if (stream[sym.state] !== "readable") {
+ return;
+ }
+ // 3.13.11.3 Perform ! ReadableByteStreamControllerClearPendingPullIntos(controller).
+ resetQueue(controller);
+ readableByteStreamControllerClearAlgorithms(controller);
+ readableStreamError(stream, e);
+}
+
+export function readableByteStreamControllerGetDesiredSize(
+ controller: ReadableByteStreamControllerImpl
+): number | null {
+ const stream = controller[sym.controlledReadableByteStream];
+ const state = stream[sym.state];
+ if (state === "errored") {
+ return null;
+ }
+ if (state === "closed") {
+ return 0;
+ }
+ return controller[sym.strategyHWM] - controller[sym.queueTotalSize];
+}
+
+export function readableByteStreamControllerHandleQueueDrain(
+ controller: ReadableByteStreamControllerImpl
+): void {
+ assert(
+ controller[sym.controlledReadableByteStream][sym.state] === "readable"
+ );
+ if (controller[sym.queueTotalSize] === 0 && controller[sym.closeRequested]) {
+ readableByteStreamControllerClearAlgorithms(controller);
+ readableStreamClose(controller[sym.controlledReadableByteStream]);
+ } else {
+ readableByteStreamControllerCallPullIfNeeded(controller);
+ }
+}
+
+export function readableStreamAddReadRequest<R>(
+ stream: ReadableStreamImpl<R>
+): Promise<ReadableStreamReadResult<R>> {
+ assert(isReadableStreamDefaultReader(stream[sym.reader]));
+ assert(stream[sym.state] === "readable");
+ const promise = getDeferred<ReadableStreamReadResult<R>>();
+ stream[sym.reader]![sym.readRequests].push(promise);
+ return promise.promise;
+}
+
+export async function readableStreamCancel<T>(
+ stream: ReadableStreamImpl<T>,
+ reason: any
+): Promise<void> {
+ stream[sym.disturbed] = true;
+ if (stream[sym.state] === "closed") {
+ return Promise.resolve();
+ }
+ if (stream[sym.state] === "errored") {
+ return Promise.reject(stream[sym.storedError]);
+ }
+ readableStreamClose(stream);
+ await stream[sym.readableStreamController]![sym.cancelSteps](reason);
+}
+
+export function readableStreamClose<T>(stream: ReadableStreamImpl<T>): void {
+ assert(stream[sym.state] === "readable");
+ stream[sym.state] = "closed";
+ const reader = stream[sym.reader];
+ if (!reader) {
+ return;
+ }
+ if (isReadableStreamDefaultReader<T>(reader)) {
+ for (const readRequest of reader[sym.readRequests]) {
+ assert(readRequest.resolve);
+ readRequest.resolve(
+ readableStreamCreateReadResult<T>(
+ undefined,
+ true,
+ reader[sym.forAuthorCode]
+ )
+ );
+ }
+ reader[sym.readRequests] = [];
+ }
+ const resolve = reader[sym.closedPromise].resolve;
+ assert(resolve);
+ resolve();
+ reader[sym.closedPromise].settled = true;
+}
+
+export function readableStreamCreateReadResult<T>(
+ value: T | undefined,
+ done: boolean,
+ forAuthorCode: boolean
+): ReadableStreamReadResult<T> {
+ const prototype = forAuthorCode ? Object.prototype : null;
+ assert(typeof done === "boolean");
+ const obj: ReadableStreamReadResult<T> = Object.create(prototype);
+ Object.defineProperties(obj, {
+ value: { value, writable: true, enumerable: true, configurable: true },
+ done: { value: done, writable: true, enumerable: true, configurable: true },
+ });
+ return obj;
+}
+
+export function readableStreamDefaultControllerCallPullIfNeeded<T>(
+ controller: ReadableStreamDefaultControllerImpl<T>
+): void {
+ const shouldPull = readableStreamDefaultControllerShouldCallPull(controller);
+ if (!shouldPull) {
+ return;
+ }
+ if (controller[sym.pulling]) {
+ controller[sym.pullAgain] = true;
+ return;
+ }
+ assert(controller[sym.pullAgain] === false);
+ controller[sym.pulling] = true;
+ const pullPromise = controller[sym.pullAlgorithm]();
+ pullPromise.then(
+ () => {
+ controller[sym.pulling] = false;
+ if (controller[sym.pullAgain]) {
+ controller[sym.pullAgain] = false;
+ readableStreamDefaultControllerCallPullIfNeeded(controller);
+ }
+ },
+ (e) => {
+ readableStreamDefaultControllerError(controller, e);
+ }
+ );
+}
+
+export function readableStreamDefaultControllerCanCloseOrEnqueue<T>(
+ controller: ReadableStreamDefaultControllerImpl<T>
+): boolean {
+ const state = controller[sym.controlledReadableStream][sym.state];
+ if (!controller[sym.closeRequested] && state === "readable") {
+ return true;
+ }
+ return false;
+}
+
+export function readableStreamDefaultControllerClearAlgorithms<T>(
+ controller: ReadableStreamDefaultControllerImpl<T>
+): void {
+ delete controller[sym.pullAlgorithm];
+ delete controller[sym.cancelAlgorithm];
+ delete controller[sym.strategySizeAlgorithm];
+}
+
+export function readableStreamDefaultControllerClose<T>(
+ controller: ReadableStreamDefaultControllerImpl<T>
+): void {
+ if (!readableStreamDefaultControllerCanCloseOrEnqueue(controller)) {
+ return;
+ }
+ const stream = controller[sym.controlledReadableStream];
+ controller[sym.closeRequested] = true;
+ if (controller[sym.queue].length === 0) {
+ readableStreamDefaultControllerClearAlgorithms(controller);
+ readableStreamClose(stream);
+ }
+}
+
+export function readableStreamDefaultControllerEnqueue<T>(
+ controller: ReadableStreamDefaultControllerImpl<T>,
+ chunk: T
+): void {
+ if (!readableStreamDefaultControllerCanCloseOrEnqueue(controller)) {
+ return;
+ }
+ const stream = controller[sym.controlledReadableStream];
+ if (
+ isReadableStreamLocked(stream) &&
+ readableStreamGetNumReadRequests(stream) > 0
+ ) {
+ readableStreamFulfillReadRequest(stream, chunk, false);
+ } else {
+ try {
+ const chunkSize = controller[sym.strategySizeAlgorithm](chunk);
+ enqueueValueWithSize(controller, chunk, chunkSize);
+ } catch (err) {
+ readableStreamDefaultControllerError(controller, err);
+ throw err;
+ }
+ }
+ readableStreamDefaultControllerCallPullIfNeeded(controller);
+}
+
+export function readableStreamDefaultControllerGetDesiredSize<T>(
+ controller: ReadableStreamDefaultControllerImpl<T>
+): number | null {
+ const stream = controller[sym.controlledReadableStream];
+ const state = stream[sym.state];
+ if (state === "errored") {
+ return null;
+ }
+ if (state === "closed") {
+ return 0;
+ }
+ return controller[sym.strategyHWM] - controller[sym.queueTotalSize];
+}
+
+export function readableStreamDefaultControllerError<T>(
+ controller: ReadableStreamDefaultControllerImpl<T>,
+ e: any
+): void {
+ const stream = controller[sym.controlledReadableStream];
+ if (stream[sym.state] !== "readable") {
+ return;
+ }
+ resetQueue(controller);
+ readableStreamDefaultControllerClearAlgorithms(controller);
+ readableStreamError(stream, e);
+}
+
+function readableStreamDefaultControllerShouldCallPull<T>(
+ controller: ReadableStreamDefaultControllerImpl<T>
+): boolean {
+ const stream = controller[sym.controlledReadableStream];
+ if (
+ !readableStreamDefaultControllerCanCloseOrEnqueue(controller) ||
+ controller[sym.started] === false
+ ) {
+ return false;
+ }
+ if (
+ isReadableStreamLocked(stream) &&
+ readableStreamGetNumReadRequests(stream) > 0
+ ) {
+ return true;
+ }
+ const desiredSize = readableStreamDefaultControllerGetDesiredSize(controller);
+ assert(desiredSize !== null);
+ if (desiredSize > 0) {
+ return true;
+ }
+ return false;
+}
+
+export function readableStreamDefaultReaderRead<R>(
+ reader: ReadableStreamDefaultReaderImpl<R>
+): Promise<ReadableStreamReadResult<R>> {
+ const stream = reader[sym.ownerReadableStream];
+ assert(stream);
+ stream[sym.disturbed] = true;
+ if (stream[sym.state] === "closed") {
+ return Promise.resolve(
+ readableStreamCreateReadResult<R>(
+ undefined,
+ true,
+ reader[sym.forAuthorCode]
+ )
+ );
+ }
+ if (stream[sym.state] === "errored") {
+ return Promise.reject(stream[sym.storedError]);
+ }
+ assert(stream[sym.state] === "readable");
+ return (stream[
+ sym.readableStreamController
+ ] as ReadableStreamDefaultControllerImpl)[sym.pullSteps]();
+}
+
+export function readableStreamError(stream: ReadableStreamImpl, e: any): void {
+ assert(isReadableStream(stream));
+ assert(stream[sym.state] === "readable");
+ stream[sym.state] = "errored";
+ stream[sym.storedError] = e;
+ const reader = stream[sym.reader];
+ if (reader === undefined) {
+ return;
+ }
+ if (isReadableStreamDefaultReader(reader)) {
+ for (const readRequest of reader[sym.readRequests]) {
+ const { reject } = readRequest;
+ assert(reject);
+ reject(e);
+ }
+ reader[sym.readRequests] = [];
+ }
+ // 3.5.6.8 Otherwise, support BYOB Reader
+ const { reject } = reader[sym.closedPromise];
+ assert(reject);
+ reject(e);
+ reader[sym.closedPromise].settled = true;
+}
+
+export function readableStreamFulfillReadRequest<R>(
+ stream: ReadableStreamImpl<R>,
+ chunk: R,
+ done: boolean
+): void {
+ const reader = stream[sym.reader]!;
+ const readRequest = reader[sym.readRequests].shift()!;
+ assert(readRequest.resolve);
+ readRequest.resolve(
+ readableStreamCreateReadResult(chunk, done, reader[sym.forAuthorCode])
+ );
+}
+
+export function readableStreamGetNumReadRequests(
+ stream: ReadableStreamImpl
+): number {
+ return stream[sym.reader]?.[sym.readRequests].length ?? 0;
+}
+
+export function readableStreamHasDefaultReader(
+ stream: ReadableStreamImpl
+): boolean {
+ const reader = stream[sym.reader];
+ return reader === undefined || !isReadableStreamDefaultReader(reader)
+ ? false
+ : true;
+}
+
+export function readableStreamReaderGenericCancel<R = any>(
+ reader: ReadableStreamGenericReader<R>,
+ reason: any
+): Promise<void> {
+ const stream = reader[sym.ownerReadableStream];
+ assert(stream);
+ return readableStreamCancel(stream, reason);
+}
+
+export function readableStreamReaderGenericInitialize<R = any>(
+ reader: ReadableStreamGenericReader<R>,
+ stream: ReadableStreamImpl<R>
+): void {
+ reader[sym.forAuthorCode] = true;
+ reader[sym.ownerReadableStream] = stream;
+ stream[sym.reader] = reader;
+ if (stream[sym.state] === "readable") {
+ reader[sym.closedPromise] = getDeferred();
+ } else if (stream[sym.state] === "closed") {
+ reader[sym.closedPromise] = {
+ promise: Promise.resolve(),
+ settled: true,
+ };
+ } else {
+ assert(stream[sym.state] === "errored");
+ reader[sym.closedPromise] = {
+ promise: Promise.reject(stream[sym.storedError]),
+ settled: true,
+ };
+ }
+}
+
+export function readableStreamReaderGenericRelease<R = any>(
+ reader: ReadableStreamGenericReader<R>
+): void {
+ assert(reader[sym.ownerReadableStream]);
+ assert(reader[sym.ownerReadableStream][sym.reader] === reader);
+ const closedPromise = reader[sym.closedPromise];
+ if (reader[sym.ownerReadableStream][sym.state] === "readable") {
+ assert(closedPromise.reject);
+ closedPromise.reject(new TypeError("ReadableStream state is readable."));
+ } else {
+ closedPromise.promise = Promise.reject(new TypeError("Reading is closed."));
+ delete closedPromise.reject;
+ delete closedPromise.resolve;
+ }
+ closedPromise.settled = true;
+ delete reader[sym.ownerReadableStream][sym.reader];
+ delete reader[sym.ownerReadableStream];
+}
+
+export function readableStreamTee<T>(
+ stream: ReadableStreamImpl<T>,
+ cloneForBranch2: boolean
+): [ReadableStreamImpl<T>, ReadableStreamImpl<T>] {
+ assert(isReadableStream(stream));
+ assert(typeof cloneForBranch2 === "boolean");
+ const reader = acquireReadableStreamDefaultReader(stream);
+ let reading = false;
+ let canceled1 = false;
+ let canceled2 = false;
+ let reason1: any = undefined;
+ let reason2: any = undefined;
+ /* eslint-disable prefer-const */
+ let branch1: ReadableStreamImpl<T>;
+ let branch2: ReadableStreamImpl<T>;
+ /* eslint-enable prefer-const */
+ const cancelPromise = getDeferred<void>();
+ const pullAlgorithm = (): PromiseLike<void> => {
+ if (reading) {
+ return Promise.resolve();
+ }
+ reading = true;
+ readableStreamDefaultReaderRead(reader).then((result) => {
+ reading = false;
+ assert(typeof result === "object");
+ const { done } = result;
+ assert(typeof done === "boolean");
+ if (done) {
+ if (!canceled1) {
+ readableStreamDefaultControllerClose(
+ branch1[
+ sym.readableStreamController
+ ] as ReadableStreamDefaultControllerImpl
+ );
+ }
+ if (!canceled2) {
+ readableStreamDefaultControllerClose(
+ branch2[
+ sym.readableStreamController
+ ] as ReadableStreamDefaultControllerImpl
+ );
+ }
+ return;
+ }
+ const { value } = result;
+ const value1 = value!;
+ let value2 = value!;
+ if (!canceled2 && cloneForBranch2) {
+ value2 = cloneValue(value2);
+ }
+ if (!canceled1) {
+ readableStreamDefaultControllerEnqueue(
+ branch1[
+ sym.readableStreamController
+ ] as ReadableStreamDefaultControllerImpl,
+ value1
+ );
+ }
+ if (!canceled2) {
+ readableStreamDefaultControllerEnqueue(
+ branch2[
+ sym.readableStreamController
+ ] as ReadableStreamDefaultControllerImpl,
+ value2
+ );
+ }
+ });
+ return Promise.resolve();
+ };
+ const cancel1Algorithm = (reason?: any): PromiseLike<void> => {
+ canceled1 = true;
+ reason1 = reason;
+ if (canceled2) {
+ const compositeReason = [reason1, reason2];
+ const cancelResult = readableStreamCancel(stream, compositeReason);
+ assert(cancelPromise.resolve);
+ cancelPromise.resolve(cancelResult);
+ }
+ return cancelPromise.promise;
+ };
+ const cancel2Algorithm = (reason?: any): PromiseLike<void> => {
+ canceled2 = true;
+ reason2 = reason;
+ if (canceled1) {
+ const compositeReason = [reason1, reason2];
+ const cancelResult = readableStreamCancel(stream, compositeReason);
+ assert(cancelPromise.resolve);
+ cancelPromise.resolve(cancelResult);
+ }
+ return cancelPromise.promise;
+ };
+ const startAlgorithm = (): void => undefined;
+ branch1 = createReadableStream(
+ startAlgorithm,
+ pullAlgorithm,
+ cancel1Algorithm
+ );
+ branch2 = createReadableStream(
+ startAlgorithm,
+ pullAlgorithm,
+ cancel2Algorithm
+ );
+ reader[sym.closedPromise].promise.catch((r) => {
+ readableStreamDefaultControllerError(
+ branch1[
+ sym.readableStreamController
+ ] as ReadableStreamDefaultControllerImpl,
+ r
+ );
+ readableStreamDefaultControllerError(
+ branch2[
+ sym.readableStreamController
+ ] as ReadableStreamDefaultControllerImpl,
+ r
+ );
+ });
+ return [branch1, branch2];
+}
+
+export function resetQueue<R>(container: Container<R>): void {
+ assert(sym.queue in container && sym.queueTotalSize in container);
+ container[sym.queue] = [];
+ container[sym.queueTotalSize] = 0;
+}
+
+function setUpReadableByteStreamController(
+ stream: ReadableStreamImpl,
+ controller: ReadableByteStreamControllerImpl,
+ startAlgorithm: StartAlgorithm,
+ pullAlgorithm: PullAlgorithm,
+ cancelAlgorithm: CancelAlgorithm,
+ highWaterMark: number,
+ autoAllocateChunkSize: number | undefined
+): void {
+ assert(stream[sym.readableStreamController] === undefined);
+ if (autoAllocateChunkSize !== undefined) {
+ assert(Number.isInteger(autoAllocateChunkSize));
+ assert(autoAllocateChunkSize >= 0);
+ }
+ controller[sym.controlledReadableByteStream] = stream;
+ controller[sym.pulling] = controller[sym.pullAgain] = false;
+ controller[sym.byobRequest] = undefined;
+ controller[sym.queue] = [];
+ controller[sym.queueTotalSize] = 0;
+ controller[sym.closeRequested] = controller[sym.started] = false;
+ controller[sym.strategyHWM] = validateAndNormalizeHighWaterMark(
+ highWaterMark
+ );
+ controller[sym.pullAlgorithm] = pullAlgorithm;
+ controller[sym.cancelAlgorithm] = cancelAlgorithm;
+ controller[sym.autoAllocateChunkSize] = autoAllocateChunkSize;
+ // 3.13.26.12 Set controller.[[pendingPullIntos]] to a new empty List.
+ stream[sym.readableStreamController] = controller;
+ const startResult = startAlgorithm();
+ const startPromise = Promise.resolve(startResult);
+ startPromise.then(
+ () => {
+ controller[sym.started] = true;
+ assert(!controller[sym.pulling]);
+ assert(!controller[sym.pullAgain]);
+ readableByteStreamControllerCallPullIfNeeded(controller);
+ },
+ (r) => {
+ readableByteStreamControllerError(controller, r);
+ }
+ );
+}
+
+export function setUpReadableByteStreamControllerFromUnderlyingSource(
+ stream: ReadableStreamImpl,
+ underlyingByteSource: UnderlyingByteSource,
+ highWaterMark: number
+): void {
+ assert(underlyingByteSource);
+ const controller: ReadableByteStreamControllerImpl = Object.create(
+ ReadableByteStreamControllerImpl.prototype
+ );
+ const startAlgorithm: StartAlgorithm = () => {
+ return invokeOrNoop(underlyingByteSource, "start", controller);
+ };
+ const pullAlgorithm = createAlgorithmFromUnderlyingMethod(
+ underlyingByteSource,
+ "pull",
+ 0,
+ controller
+ );
+ const cancelAlgorithm = createAlgorithmFromUnderlyingMethod(
+ underlyingByteSource,
+ "cancel",
+ 1
+ );
+ // 3.13.27.6 Let autoAllocateChunkSize be ? GetV(underlyingByteSource, "autoAllocateChunkSize").
+ const autoAllocateChunkSize = undefined;
+ setUpReadableByteStreamController(
+ stream,
+ controller,
+ startAlgorithm,
+ pullAlgorithm,
+ cancelAlgorithm,
+ highWaterMark,
+ autoAllocateChunkSize
+ );
+}
+
+function setUpReadableStreamDefaultController<T>(
+ stream: ReadableStreamImpl<T>,
+ controller: ReadableStreamDefaultControllerImpl<T>,
+ startAlgorithm: StartAlgorithm,
+ pullAlgorithm: PullAlgorithm,
+ cancelAlgorithm: CancelAlgorithm,
+ highWaterMark: number,
+ sizeAlgorithm: SizeAlgorithm<T>
+): void {
+ assert(stream[sym.readableStreamController] === undefined);
+ controller[sym.controlledReadableStream] = stream;
+ controller[sym.queue] = [];
+ controller[sym.queueTotalSize] = 0;
+ controller[sym.started] = controller[sym.closeRequested] = controller[
+ sym.pullAgain
+ ] = controller[sym.pulling] = false;
+ controller[sym.strategySizeAlgorithm] = sizeAlgorithm;
+ controller[sym.strategyHWM] = highWaterMark;
+ controller[sym.pullAlgorithm] = pullAlgorithm;
+ controller[sym.cancelAlgorithm] = cancelAlgorithm;
+ stream[sym.readableStreamController] = controller;
+ const startResult = startAlgorithm();
+ const startPromise = Promise.resolve(startResult);
+ startPromise.then(
+ () => {
+ controller[sym.started] = true;
+ assert(controller[sym.pulling] === false);
+ assert(controller[sym.pullAgain] === false);
+ readableStreamDefaultControllerCallPullIfNeeded(controller);
+ },
+ (r) => {
+ readableStreamDefaultControllerError(controller, r);
+ }
+ );
+}
+
+export function setUpReadableStreamDefaultControllerFromUnderlyingSource<T>(
+ stream: ReadableStreamImpl<T>,
+ underlyingSource: UnderlyingSource<T>,
+ highWaterMark: number,
+ sizeAlgorithm: SizeAlgorithm<T>
+): void {
+ assert(underlyingSource);
+ const controller: ReadableStreamDefaultControllerImpl<T> = Object.create(
+ ReadableStreamDefaultControllerImpl.prototype
+ );
+ const startAlgorithm: StartAlgorithm = (): void | PromiseLike<void> =>
+ invokeOrNoop(underlyingSource, "start", controller);
+ const pullAlgorithm: PullAlgorithm = createAlgorithmFromUnderlyingMethod(
+ underlyingSource,
+ "pull",
+ 0,
+ controller
+ );
+ const cancelAlgorithm: CancelAlgorithm = createAlgorithmFromUnderlyingMethod(
+ underlyingSource,
+ "cancel",
+ 1
+ );
+ setUpReadableStreamDefaultController(
+ stream,
+ controller,
+ startAlgorithm,
+ pullAlgorithm,
+ cancelAlgorithm,
+ highWaterMark,
+ sizeAlgorithm
+ );
+}
+
+function transferArrayBuffer(buffer: ArrayBuffer): ArrayBuffer {
+ assert(!isDetachedBuffer(buffer));
+ const transferredIshVersion = buffer.slice(0);
+
+ Object.defineProperty(buffer, "byteLength", {
+ get(): number {
+ return 0;
+ },
+ });
+ (buffer as any)[sym.isFakeDetached] = true;
+
+ return transferredIshVersion;
+}
+
+export function validateAndNormalizeHighWaterMark(
+ highWaterMark: number
+): number {
+ highWaterMark = Number(highWaterMark);
+ if (highWaterMark === NaN || highWaterMark < 0) {
+ throw new RangeError(
+ `highWaterMark must be a positive number or Infinity. Received: ${highWaterMark}.`
+ );
+ }
+ return highWaterMark;
+}
+
+/* eslint-enable */