summaryrefslogtreecommitdiff
path: root/cli/js/web/streams/internals.ts
diff options
context:
space:
mode:
authorBartek IwaƄczuk <biwanczuk@gmail.com>2020-07-19 19:49:44 +0200
committerGitHub <noreply@github.com>2020-07-19 19:49:44 +0200
commitfa61956f03491101b6ef64423ea2f1f73af26a73 (patch)
treec3800702071ca78aa4dd71bdd0a59a9bbe460bdd /cli/js/web/streams/internals.ts
parent53adde866dd399aa2509d14508642fce37afb8f5 (diff)
Port internal TS code to JS (#6793)
Co-authored-by: Ryan Dahl <ry@tinyclouds.org>
Diffstat (limited to 'cli/js/web/streams/internals.ts')
-rw-r--r--cli/js/web/streams/internals.ts2405
1 files changed, 0 insertions, 2405 deletions
diff --git a/cli/js/web/streams/internals.ts b/cli/js/web/streams/internals.ts
deleted file mode 100644
index 06c5e304d..000000000
--- a/cli/js/web/streams/internals.ts
+++ /dev/null
@@ -1,2405 +0,0 @@
-// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
-
-// This code closely follows the WHATWG Stream Specification
-// See: https://streams.spec.whatwg.org/
-//
-// There are some parts that are not fully implemented, and there are some
-// comments which point to steps of the specification that are not implemented.
-
-/* eslint-disable @typescript-eslint/no-explicit-any,require-await */
-import { ReadableByteStreamControllerImpl } from "./readable_byte_stream_controller.ts";
-import { ReadableStreamDefaultControllerImpl } from "./readable_stream_default_controller.ts";
-import { ReadableStreamDefaultReaderImpl } from "./readable_stream_default_reader.ts";
-import { ReadableStreamImpl } from "./readable_stream.ts";
-import * as sym from "./symbols.ts";
-import type { TransformStreamImpl } from "./transform_stream.ts";
-import { TransformStreamDefaultControllerImpl } from "./transform_stream_default_controller.ts";
-import { WritableStreamDefaultControllerImpl } from "./writable_stream_default_controller.ts";
-import { WritableStreamDefaultWriterImpl } from "./writable_stream_default_writer.ts";
-import { WritableStreamImpl } from "./writable_stream.ts";
-import { AbortSignalImpl } from "../abort_signal.ts";
-import { DOMExceptionImpl as DOMException } from "../dom_exception.ts";
-import { cloneValue, setFunctionName } from "../util.ts";
-import { assert, AssertionError } from "../../util.ts";
-
-export type AbortAlgorithm = (reason?: any) => PromiseLike<void>;
-export interface AbortRequest {
- promise: Deferred<void>;
- reason?: any;
- wasAlreadyErroring: boolean;
-}
-export interface BufferQueueItem extends Pair<ArrayBuffer | SharedArrayBuffer> {
- offset: number;
-}
-export type CancelAlgorithm = (reason?: any) => PromiseLike<void>;
-export type CloseAlgorithm = () => PromiseLike<void>;
-type Container<R = any> = {
- [sym.queue]: Array<Pair<R> | BufferQueueItem>;
- [sym.queueTotalSize]: number;
-};
-export type FlushAlgorithm = () => Promise<void>;
-export type Pair<R> = { value: R; size: number };
-export type PullAlgorithm = () => PromiseLike<void>;
-export type SizeAlgorithm<T> = (chunk: T) => number;
-export type StartAlgorithm = () => void | PromiseLike<void>;
-export type TransformAlgorithm<I> = (chunk: I) => Promise<void>;
-export type WriteAlgorithm<W> = (chunk: W) => Promise<void>;
-export interface Deferred<T> {
- promise: Promise<T>;
- resolve?: (value?: T | PromiseLike<T>) => void;
- reject?: (reason?: any) => void;
-}
-
-export interface ReadableStreamGenericReader<R = any>
- extends ReadableStreamReader<R> {
- [sym.closedPromise]: Deferred<void>;
- [sym.forAuthorCode]: boolean;
- [sym.ownerReadableStream]: ReadableStreamImpl<R>;
- [sym.readRequests]: Array<Deferred<ReadableStreamReadResult<R>>>;
-}
-
-export interface ReadableStreamAsyncIterator<T = any> extends AsyncIterator<T> {
- [sym.asyncIteratorReader]: ReadableStreamDefaultReaderImpl<T>;
- [sym.preventCancel]: boolean;
- return(value?: any | PromiseLike<any>): Promise<IteratorResult<T>>;
-}
-
-export function acquireReadableStreamDefaultReader<T>(
- stream: ReadableStreamImpl<T>,
- forAuthorCode = false,
-): ReadableStreamDefaultReaderImpl<T> {
- const reader = new ReadableStreamDefaultReaderImpl(stream);
- reader[sym.forAuthorCode] = forAuthorCode;
- return reader;
-}
-
-export function acquireWritableStreamDefaultWriter<W>(
- stream: WritableStreamImpl<W>,
-): WritableStreamDefaultWriterImpl<W> {
- return new WritableStreamDefaultWriterImpl(stream);
-}
-
-export function call<F extends (...args: any[]) => any>(
- fn: F,
- v: ThisType<F>,
- args: Parameters<F>,
-): ReturnType<F> {
- return Function.prototype.apply.call(fn, v, args);
-}
-
-function createAlgorithmFromUnderlyingMethod<
- O extends UnderlyingByteSource | UnderlyingSource | Transformer,
- P extends keyof O,
->(
- underlyingObject: O,
- methodName: P,
- algoArgCount: 0,
- ...extraArgs: any[]
-): () => Promise<void>;
-
-function createAlgorithmFromUnderlyingMethod<
- O extends UnderlyingByteSource | UnderlyingSource | Transformer,
- P extends keyof O,
->(
- underlyingObject: O,
- methodName: P,
- algoArgCount: 1,
- ...extraArgs: any[]
-): (arg: any) => Promise<void>;
-function createAlgorithmFromUnderlyingMethod<
- O extends UnderlyingByteSource | UnderlyingSource | Transformer,
- P extends keyof O,
->(
- underlyingObject: O,
- methodName: P,
- algoArgCount: 0 | 1,
- ...extraArgs: any[]
-): (() => Promise<void>) | ((arg: any) => Promise<void>) {
- const method = underlyingObject[methodName];
- if (method) {
- if (!isCallable(method)) {
- throw new TypeError("method is not callable");
- }
- if (algoArgCount === 0) {
- return async (): Promise<void> =>
- call(method, underlyingObject, extraArgs as any);
- } else {
- return async (arg: any): Promise<void> => {
- const fullArgs = [arg, ...extraArgs];
- return call(method, underlyingObject, fullArgs as any);
- };
- }
- }
- return async (): Promise<void> => undefined;
-}
-
-function createReadableStream<T>(
- startAlgorithm: StartAlgorithm,
- pullAlgorithm: PullAlgorithm,
- cancelAlgorithm: CancelAlgorithm,
- highWaterMark = 1,
- sizeAlgorithm: SizeAlgorithm<T> = (): number => 1,
-): ReadableStreamImpl<T> {
- highWaterMark = validateAndNormalizeHighWaterMark(highWaterMark);
- const stream: ReadableStreamImpl<T> = Object.create(
- ReadableStreamImpl.prototype,
- );
- initializeReadableStream(stream);
- const controller: ReadableStreamDefaultControllerImpl<T> = Object.create(
- ReadableStreamDefaultControllerImpl.prototype,
- );
- setUpReadableStreamDefaultController(
- stream,
- controller,
- startAlgorithm,
- pullAlgorithm,
- cancelAlgorithm,
- highWaterMark,
- sizeAlgorithm,
- );
- return stream;
-}
-
-function createWritableStream<W>(
- startAlgorithm: StartAlgorithm,
- writeAlgorithm: WriteAlgorithm<W>,
- closeAlgorithm: CloseAlgorithm,
- abortAlgorithm: AbortAlgorithm,
- highWaterMark = 1,
- sizeAlgorithm: SizeAlgorithm<W> = (): number => 1,
-): WritableStreamImpl<W> {
- highWaterMark = validateAndNormalizeHighWaterMark(highWaterMark);
- const stream = Object.create(WritableStreamImpl.prototype);
- initializeWritableStream(stream);
- const controller = Object.create(
- WritableStreamDefaultControllerImpl.prototype,
- );
- setUpWritableStreamDefaultController(
- stream,
- controller,
- startAlgorithm,
- writeAlgorithm,
- closeAlgorithm,
- abortAlgorithm,
- highWaterMark,
- sizeAlgorithm,
- );
- return stream;
-}
-
-export function dequeueValue<R>(container: Container<R>): R {
- assert(sym.queue in container && sym.queueTotalSize in container);
- assert(container[sym.queue].length);
- const pair = container[sym.queue].shift()!;
- container[sym.queueTotalSize] -= pair.size;
- if (container[sym.queueTotalSize] <= 0) {
- container[sym.queueTotalSize] = 0;
- }
- return pair.value as R;
-}
-
-function enqueueValueWithSize<R>(
- container: Container<R>,
- value: R,
- size: number,
-): void {
- assert(sym.queue in container && sym.queueTotalSize in container);
- size = Number(size);
- if (!isFiniteNonNegativeNumber(size)) {
- throw new RangeError("size must be a finite non-negative number.");
- }
- container[sym.queue].push({ value, size });
- container[sym.queueTotalSize] += size;
-}
-
-/** Non-spec mechanism to "unwrap" a promise and store it to be resolved
- * later. */
-export function getDeferred<T>(): Required<Deferred<T>> {
- let resolve: (value?: T | PromiseLike<T>) => void;
- let reject: (reason?: any) => void;
- const promise = new Promise<T>((res, rej) => {
- resolve = res;
- reject = rej;
- });
- return { promise, resolve: resolve!, reject: reject! };
-}
-
-export function initializeReadableStream<R>(
- stream: ReadableStreamImpl<R>,
-): void {
- stream[sym.state] = "readable";
- stream[sym.reader] = stream[sym.storedError] = undefined;
- stream[sym.disturbed] = false;
-}
-
-export function initializeTransformStream<I, O>(
- stream: TransformStreamImpl<I, O>,
- startPromise: Promise<void>,
- writableHighWaterMark: number,
- writableSizeAlgorithm: SizeAlgorithm<I>,
- readableHighWaterMark: number,
- readableSizeAlgorithm: SizeAlgorithm<O>,
-): void {
- const startAlgorithm = (): Promise<void> => startPromise;
- const writeAlgorithm = (chunk: any): Promise<void> =>
- transformStreamDefaultSinkWriteAlgorithm(stream, chunk);
- const abortAlgorithm = (reason: any): Promise<void> =>
- transformStreamDefaultSinkAbortAlgorithm(stream, reason);
- const closeAlgorithm = (): Promise<void> =>
- transformStreamDefaultSinkCloseAlgorithm(stream);
- stream[sym.writable] = createWritableStream(
- startAlgorithm,
- writeAlgorithm,
- closeAlgorithm,
- abortAlgorithm,
- writableHighWaterMark,
- writableSizeAlgorithm,
- );
- const pullAlgorithm = (): PromiseLike<void> =>
- transformStreamDefaultSourcePullAlgorithm(stream);
- const cancelAlgorithm = (reason: any): Promise<void> => {
- transformStreamErrorWritableAndUnblockWrite(stream, reason);
- return Promise.resolve(undefined);
- };
- stream[sym.readable] = createReadableStream(
- startAlgorithm,
- pullAlgorithm,
- cancelAlgorithm,
- readableHighWaterMark,
- readableSizeAlgorithm,
- );
- stream[sym.backpressure] = stream[sym.backpressureChangePromise] = undefined;
- transformStreamSetBackpressure(stream, true);
- Object.defineProperty(stream, sym.transformStreamController, {
- value: undefined,
- configurable: true,
- });
-}
-
-export function initializeWritableStream<W>(
- stream: WritableStreamImpl<W>,
-): void {
- stream[sym.state] = "writable";
- stream[sym.storedError] = stream[sym.writer] = stream[
- sym.writableStreamController
- ] = stream[sym.inFlightWriteRequest] = stream[sym.closeRequest] = stream[
- sym.inFlightCloseRequest
- ] = stream[sym.pendingAbortRequest] = undefined;
- stream[sym.writeRequests] = [];
- stream[sym.backpressure] = false;
-}
-
-export function invokeOrNoop<O extends Record<string, any>, P extends keyof O>(
- o: O,
- p: P,
- ...args: Parameters<O[P]>
-): ReturnType<O[P]> | undefined {
- assert(o);
- const method = o[p];
- if (!method) {
- return undefined;
- }
- return call(method, o, args);
-}
-
-function isCallable(value: unknown): value is (...args: any) => any {
- return typeof value === "function";
-}
-
-export function isDetachedBuffer(value: object): boolean {
- return sym.isFakeDetached in value;
-}
-
-function isFiniteNonNegativeNumber(v: unknown): v is number {
- return Number.isFinite(v) && (v as number) >= 0;
-}
-
-export function isReadableByteStreamController(
- x: unknown,
-): x is ReadableByteStreamControllerImpl {
- return !(
- typeof x !== "object" ||
- x === null ||
- !(sym.controlledReadableByteStream in x)
- );
-}
-
-export function isReadableStream(x: unknown): x is ReadableStreamImpl {
- return !(
- typeof x !== "object" ||
- x === null ||
- !(sym.readableStreamController in x)
- );
-}
-
-export function isReadableStreamAsyncIterator(
- x: unknown,
-): x is ReadableStreamAsyncIterator {
- if (typeof x !== "object" || x === null) {
- return false;
- }
- return sym.asyncIteratorReader in x;
-}
-
-export function isReadableStreamDefaultController(
- x: unknown,
-): x is ReadableStreamDefaultControllerImpl {
- return !(
- typeof x !== "object" ||
- x === null ||
- !(sym.controlledReadableStream in x)
- );
-}
-
-export function isReadableStreamDefaultReader<T>(
- x: unknown,
-): x is ReadableStreamDefaultReaderImpl<T> {
- return !(typeof x !== "object" || x === null || !(sym.readRequests in x));
-}
-
-export function isReadableStreamLocked(stream: ReadableStreamImpl): boolean {
- assert(isReadableStream(stream));
- return !!stream[sym.reader];
-}
-
-export function isReadableStreamDisturbed(stream: ReadableStream): boolean {
- assert(isReadableStream(stream));
- return !!stream[sym.disturbed];
-}
-
-export function isTransformStream(x: unknown): x is TransformStreamImpl {
- return !(
- typeof x !== "object" ||
- x === null ||
- !(sym.transformStreamController in x)
- );
-}
-
-export function isTransformStreamDefaultController(
- x: unknown,
-): x is TransformStreamDefaultControllerImpl {
- return !(
- typeof x !== "object" ||
- x === null ||
- !(sym.controlledTransformStream in x)
- );
-}
-
-export function isUnderlyingByteSource(
- underlyingSource: UnderlyingByteSource | UnderlyingSource,
-): underlyingSource is UnderlyingByteSource {
- const { type } = underlyingSource;
- const typeString = String(type);
- return typeString === "bytes";
-}
-
-export function isWritableStream(x: unknown): x is WritableStreamImpl {
- return !(
- typeof x !== "object" ||
- x === null ||
- !(sym.writableStreamController in x)
- );
-}
-
-export function isWritableStreamDefaultController(
- x: unknown,
-): x is WritableStreamDefaultControllerImpl<any> {
- return !(
- typeof x !== "object" ||
- x === null ||
- !(sym.controlledWritableStream in x)
- );
-}
-
-export function isWritableStreamDefaultWriter(
- x: unknown,
-): x is WritableStreamDefaultWriterImpl<any> {
- return !(
- typeof x !== "object" ||
- x === null ||
- !(sym.ownerWritableStream in x)
- );
-}
-
-export function isWritableStreamLocked(stream: WritableStreamImpl): boolean {
- assert(isWritableStream(stream));
- return stream[sym.writer] !== undefined;
-}
-
-export function makeSizeAlgorithmFromSizeFunction<T>(
- size: QueuingStrategySizeCallback<T> | undefined,
-): SizeAlgorithm<T> {
- if (size === undefined) {
- return (): number => 1;
- }
- if (typeof size !== "function") {
- throw new TypeError("size must be callable.");
- }
- return (chunk: T): number => {
- return size.call(undefined, chunk);
- };
-}
-
-function peekQueueValue<T>(container: Container<T>): T | "close" {
- assert(sym.queue in container && sym.queueTotalSize in container);
- assert(container[sym.queue].length);
- const [pair] = container[sym.queue];
- return pair.value as T;
-}
-
-function readableByteStreamControllerShouldCallPull(
- controller: ReadableByteStreamControllerImpl,
-): boolean {
- const stream = controller[sym.controlledReadableByteStream];
- if (
- stream[sym.state] !== "readable" ||
- controller[sym.closeRequested] ||
- !controller[sym.started]
- ) {
- return false;
- }
- if (
- readableStreamHasDefaultReader(stream) &&
- readableStreamGetNumReadRequests(stream) > 0
- ) {
- return true;
- }
- // 3.13.25.6 If ! ReadableStreamHasBYOBReader(stream) is true and !
- // ReadableStreamGetNumReadIntoRequests(stream) > 0, return true.
- const desiredSize = readableByteStreamControllerGetDesiredSize(controller);
- assert(desiredSize !== null);
- return desiredSize > 0;
-}
-
-export function readableByteStreamControllerCallPullIfNeeded(
- controller: ReadableByteStreamControllerImpl,
-): void {
- const shouldPull = readableByteStreamControllerShouldCallPull(controller);
- if (!shouldPull) {
- return;
- }
- if (controller[sym.pulling]) {
- controller[sym.pullAgain] = true;
- return;
- }
- assert(controller[sym.pullAgain] === false);
- controller[sym.pulling] = true;
- const pullPromise = controller[sym.pullAlgorithm]();
- setPromiseIsHandledToTrue(
- pullPromise.then(
- () => {
- controller[sym.pulling] = false;
- if (controller[sym.pullAgain]) {
- controller[sym.pullAgain] = false;
- readableByteStreamControllerCallPullIfNeeded(controller);
- }
- },
- (e) => {
- readableByteStreamControllerError(controller, e);
- },
- ),
- );
-}
-
-export function readableByteStreamControllerClearAlgorithms(
- controller: ReadableByteStreamControllerImpl,
-): void {
- (controller as any)[sym.pullAlgorithm] = undefined;
- (controller as any)[sym.cancelAlgorithm] = undefined;
-}
-
-export function readableByteStreamControllerClose(
- controller: ReadableByteStreamControllerImpl,
-): void {
- const stream = controller[sym.controlledReadableByteStream];
- if (controller[sym.closeRequested] || stream[sym.state] !== "readable") {
- return;
- }
- if (controller[sym.queueTotalSize] > 0) {
- controller[sym.closeRequested] = true;
- return;
- }
- // 3.13.6.4 If controller.[[pendingPullIntos]] is not empty, (BYOB Support)
- readableByteStreamControllerClearAlgorithms(controller);
- readableStreamClose(stream);
-}
-
-export function readableByteStreamControllerEnqueue(
- controller: ReadableByteStreamControllerImpl,
- chunk: ArrayBufferView,
-): void {
- const stream = controller[sym.controlledReadableByteStream];
- if (controller[sym.closeRequested] || stream[sym.state] !== "readable") {
- return;
- }
- const { buffer, byteOffset, byteLength } = chunk;
- const transferredBuffer = transferArrayBuffer(buffer);
- if (readableStreamHasDefaultReader(stream)) {
- if (readableStreamGetNumReadRequests(stream) === 0) {
- readableByteStreamControllerEnqueueChunkToQueue(
- controller,
- transferredBuffer,
- byteOffset,
- byteLength,
- );
- } else {
- assert(controller[sym.queue].length === 0);
- const transferredView = new Uint8Array(
- transferredBuffer,
- byteOffset,
- byteLength,
- );
- readableStreamFulfillReadRequest(stream, transferredView, false);
- }
- // 3.13.9.8 Otherwise, if ! ReadableStreamHasBYOBReader(stream) is true
- } else {
- assert(!isReadableStreamLocked(stream));
- readableByteStreamControllerEnqueueChunkToQueue(
- controller,
- transferredBuffer,
- byteOffset,
- byteLength,
- );
- }
- readableByteStreamControllerCallPullIfNeeded(controller);
-}
-
-function readableByteStreamControllerEnqueueChunkToQueue(
- controller: ReadableByteStreamControllerImpl,
- buffer: ArrayBuffer | SharedArrayBuffer,
- byteOffset: number,
- byteLength: number,
-): void {
- controller[sym.queue].push({
- value: buffer,
- offset: byteOffset,
- size: byteLength,
- });
- controller[sym.queueTotalSize] += byteLength;
-}
-
-export function readableByteStreamControllerError(
- controller: ReadableByteStreamControllerImpl,
- e: any,
-): void {
- const stream = controller[sym.controlledReadableByteStream];
- if (stream[sym.state] !== "readable") {
- return;
- }
- // 3.13.11.3 Perform ! ReadableByteStreamControllerClearPendingPullIntos(controller).
- resetQueue(controller);
- readableByteStreamControllerClearAlgorithms(controller);
- readableStreamError(stream, e);
-}
-
-export function readableByteStreamControllerGetDesiredSize(
- controller: ReadableByteStreamControllerImpl,
-): number | null {
- const stream = controller[sym.controlledReadableByteStream];
- const state = stream[sym.state];
- if (state === "errored") {
- return null;
- }
- if (state === "closed") {
- return 0;
- }
- return controller[sym.strategyHWM] - controller[sym.queueTotalSize];
-}
-
-export function readableByteStreamControllerHandleQueueDrain(
- controller: ReadableByteStreamControllerImpl,
-): void {
- assert(
- controller[sym.controlledReadableByteStream][sym.state] === "readable",
- );
- if (controller[sym.queueTotalSize] === 0 && controller[sym.closeRequested]) {
- readableByteStreamControllerClearAlgorithms(controller);
- readableStreamClose(controller[sym.controlledReadableByteStream]);
- } else {
- readableByteStreamControllerCallPullIfNeeded(controller);
- }
-}
-
-export function readableStreamAddReadRequest<R>(
- stream: ReadableStreamImpl<R>,
-): Promise<ReadableStreamReadResult<R>> {
- assert(isReadableStreamDefaultReader(stream[sym.reader]));
- assert(stream[sym.state] === "readable");
- const promise = getDeferred<ReadableStreamReadResult<R>>();
- stream[sym.reader]![sym.readRequests].push(promise);
- return promise.promise;
-}
-
-export function readableStreamCancel<T>(
- stream: ReadableStreamImpl<T>,
- reason: any,
-): Promise<void> {
- stream[sym.disturbed] = true;
- if (stream[sym.state] === "closed") {
- return Promise.resolve();
- }
- if (stream[sym.state] === "errored") {
- return Promise.reject(stream[sym.storedError]);
- }
- readableStreamClose(stream);
- return stream[sym.readableStreamController]![sym.cancelSteps](reason).then(
- () => undefined,
- ) as Promise<void>;
-}
-
-export function readableStreamClose<T>(stream: ReadableStreamImpl<T>): void {
- assert(stream[sym.state] === "readable");
- stream[sym.state] = "closed";
- const reader = stream[sym.reader];
- if (!reader) {
- return;
- }
- if (isReadableStreamDefaultReader<T>(reader)) {
- for (const readRequest of reader[sym.readRequests]) {
- assert(readRequest.resolve);
- readRequest.resolve(
- readableStreamCreateReadResult<T>(
- undefined,
- true,
- reader[sym.forAuthorCode],
- ),
- );
- }
- reader[sym.readRequests] = [];
- }
- const resolve = reader[sym.closedPromise].resolve;
- assert(resolve);
- resolve();
-}
-
-export function readableStreamCreateReadResult<T>(
- value: T | undefined,
- done: boolean,
- forAuthorCode: boolean,
-): ReadableStreamReadResult<T> {
- const prototype = forAuthorCode ? Object.prototype : null;
- assert(typeof done === "boolean");
- const obj: ReadableStreamReadResult<T> = Object.create(prototype);
- Object.defineProperties(obj, {
- value: { value, writable: true, enumerable: true, configurable: true },
- done: { value: done, writable: true, enumerable: true, configurable: true },
- });
- return obj;
-}
-
-export function readableStreamDefaultControllerCallPullIfNeeded<T>(
- controller: ReadableStreamDefaultControllerImpl<T>,
-): void {
- const shouldPull = readableStreamDefaultControllerShouldCallPull(controller);
- if (!shouldPull) {
- return;
- }
- if (controller[sym.pulling]) {
- controller[sym.pullAgain] = true;
- return;
- }
- assert(controller[sym.pullAgain] === false);
- controller[sym.pulling] = true;
- const pullPromise = controller[sym.pullAlgorithm]();
- pullPromise.then(
- () => {
- controller[sym.pulling] = false;
- if (controller[sym.pullAgain]) {
- controller[sym.pullAgain] = false;
- readableStreamDefaultControllerCallPullIfNeeded(controller);
- }
- },
- (e) => {
- readableStreamDefaultControllerError(controller, e);
- },
- );
-}
-
-export function readableStreamDefaultControllerCanCloseOrEnqueue<T>(
- controller: ReadableStreamDefaultControllerImpl<T>,
-): boolean {
- const state = controller[sym.controlledReadableStream][sym.state];
- return !controller[sym.closeRequested] && state === "readable";
-}
-
-export function readableStreamDefaultControllerClearAlgorithms<T>(
- controller: ReadableStreamDefaultControllerImpl<T>,
-): void {
- (controller as any)[sym.pullAlgorithm] = undefined;
- (controller as any)[sym.cancelAlgorithm] = undefined;
- (controller as any)[sym.strategySizeAlgorithm] = undefined;
-}
-
-export function readableStreamDefaultControllerClose<T>(
- controller: ReadableStreamDefaultControllerImpl<T>,
-): void {
- if (!readableStreamDefaultControllerCanCloseOrEnqueue(controller)) {
- return;
- }
- const stream = controller[sym.controlledReadableStream];
- controller[sym.closeRequested] = true;
- if (controller[sym.queue].length === 0) {
- readableStreamDefaultControllerClearAlgorithms(controller);
- readableStreamClose(stream);
- }
-}
-
-export function readableStreamDefaultControllerEnqueue<T>(
- controller: ReadableStreamDefaultControllerImpl<T>,
- chunk: T,
-): void {
- if (!readableStreamDefaultControllerCanCloseOrEnqueue(controller)) {
- return;
- }
- const stream = controller[sym.controlledReadableStream];
- if (
- isReadableStreamLocked(stream) &&
- readableStreamGetNumReadRequests(stream) > 0
- ) {
- readableStreamFulfillReadRequest(stream, chunk, false);
- } else {
- try {
- const chunkSize = controller[sym.strategySizeAlgorithm](chunk);
- enqueueValueWithSize(controller, chunk, chunkSize);
- } catch (err) {
- readableStreamDefaultControllerError(controller, err);
- throw err;
- }
- }
- readableStreamDefaultControllerCallPullIfNeeded(controller);
-}
-
-export function readableStreamDefaultControllerGetDesiredSize<T>(
- controller: ReadableStreamDefaultControllerImpl<T>,
-): number | null {
- const stream = controller[sym.controlledReadableStream];
- const state = stream[sym.state];
- if (state === "errored") {
- return null;
- }
- if (state === "closed") {
- return 0;
- }
- return controller[sym.strategyHWM] - controller[sym.queueTotalSize];
-}
-
-export function readableStreamDefaultControllerError<T>(
- controller: ReadableStreamDefaultControllerImpl<T>,
- e: any,
-): void {
- const stream = controller[sym.controlledReadableStream];
- if (stream[sym.state] !== "readable") {
- return;
- }
- resetQueue(controller);
- readableStreamDefaultControllerClearAlgorithms(controller);
- readableStreamError(stream, e);
-}
-
-function readableStreamDefaultControllerHasBackpressure<T>(
- controller: ReadableStreamDefaultControllerImpl<T>,
-): boolean {
- return readableStreamDefaultControllerShouldCallPull(controller);
-}
-
-function readableStreamDefaultControllerShouldCallPull<T>(
- controller: ReadableStreamDefaultControllerImpl<T>,
-): boolean {
- const stream = controller[sym.controlledReadableStream];
- if (
- !readableStreamDefaultControllerCanCloseOrEnqueue(controller) ||
- controller[sym.started] === false
- ) {
- return false;
- }
- if (
- isReadableStreamLocked(stream) &&
- readableStreamGetNumReadRequests(stream) > 0
- ) {
- return true;
- }
- const desiredSize = readableStreamDefaultControllerGetDesiredSize(controller);
- assert(desiredSize !== null);
- return desiredSize > 0;
-}
-
-export function readableStreamDefaultReaderRead<R>(
- reader: ReadableStreamDefaultReaderImpl<R>,
-): Promise<ReadableStreamReadResult<R>> {
- const stream = reader[sym.ownerReadableStream];
- assert(stream);
- stream[sym.disturbed] = true;
- if (stream[sym.state] === "closed") {
- return Promise.resolve(
- readableStreamCreateReadResult<R>(
- undefined,
- true,
- reader[sym.forAuthorCode],
- ),
- );
- }
- if (stream[sym.state] === "errored") {
- return Promise.reject(stream[sym.storedError]);
- }
- assert(stream[sym.state] === "readable");
- return (stream[
- sym.readableStreamController
- ] as ReadableStreamDefaultControllerImpl)[sym.pullSteps]();
-}
-
-export function readableStreamError(stream: ReadableStreamImpl, e: any): void {
- assert(isReadableStream(stream));
- assert(stream[sym.state] === "readable");
- stream[sym.state] = "errored";
- stream[sym.storedError] = e;
- const reader = stream[sym.reader];
- if (reader === undefined) {
- return;
- }
- if (isReadableStreamDefaultReader(reader)) {
- for (const readRequest of reader[sym.readRequests]) {
- assert(readRequest.reject);
- readRequest.reject(e);
- readRequest.reject = undefined;
- readRequest.resolve = undefined;
- }
- reader[sym.readRequests] = [];
- }
- // 3.5.6.8 Otherwise, support BYOB Reader
- reader[sym.closedPromise].reject!(e);
- reader[sym.closedPromise].reject = undefined;
- reader[sym.closedPromise].resolve = undefined;
- setPromiseIsHandledToTrue(reader[sym.closedPromise].promise);
-}
-
-export function readableStreamFulfillReadRequest<R>(
- stream: ReadableStreamImpl<R>,
- chunk: R,
- done: boolean,
-): void {
- const reader = stream[sym.reader]!;
- const readRequest = reader[sym.readRequests].shift()!;
- assert(readRequest.resolve);
- readRequest.resolve(
- readableStreamCreateReadResult(chunk, done, reader[sym.forAuthorCode]),
- );
-}
-
-export function readableStreamGetNumReadRequests(
- stream: ReadableStreamImpl,
-): number {
- return stream[sym.reader]?.[sym.readRequests].length ?? 0;
-}
-
-export function readableStreamHasDefaultReader(
- stream: ReadableStreamImpl,
-): boolean {
- const reader = stream[sym.reader];
- return !(reader === undefined || !isReadableStreamDefaultReader(reader));
-}
-
-export function readableStreamPipeTo<T>(
- source: ReadableStreamImpl<T>,
- dest: WritableStreamImpl<T>,
- preventClose: boolean,
- preventAbort: boolean,
- preventCancel: boolean,
- signal: AbortSignalImpl | undefined,
-): Promise<void> {
- assert(isReadableStream(source));
- assert(isWritableStream(dest));
- assert(
- typeof preventClose === "boolean" &&
- typeof preventAbort === "boolean" &&
- typeof preventCancel === "boolean",
- );
- assert(signal === undefined || signal instanceof AbortSignalImpl);
- assert(!isReadableStreamLocked(source));
- assert(!isWritableStreamLocked(dest));
- const reader = acquireReadableStreamDefaultReader(source);
- const writer = acquireWritableStreamDefaultWriter(dest);
- source[sym.disturbed] = true;
- let shuttingDown = false;
- const promise = getDeferred<void>();
- let abortAlgorithm: () => void;
- if (signal) {
- abortAlgorithm = (): void => {
- const error = new DOMException("Abort signal received.", "AbortSignal");
- const actions: Array<() => Promise<void>> = [];
- if (!preventAbort) {
- actions.push(() => {
- if (dest[sym.state] === "writable") {
- return writableStreamAbort(dest, error);
- } else {
- return Promise.resolve(undefined);
- }
- });
- }
- if (!preventCancel) {
- actions.push(() => {
- if (source[sym.state] === "readable") {
- return readableStreamCancel(source, error);
- } else {
- return Promise.resolve(undefined);
- }
- });
- }
- shutdownWithAction(
- () => Promise.all(actions.map((action) => action())),
- true,
- error,
- );
- };
- if (signal.aborted) {
- abortAlgorithm();
- return promise.promise;
- }
- signal.addEventListener("abort", abortAlgorithm);
- }
-
- let currentWrite = Promise.resolve();
-
- // At this point, the spec becomes non-specific and vague. Most of the rest
- // of this code is based on the reference implementation that is part of the
- // specification. This is why the functions are only scoped to this function
- // to ensure they don't leak into the spec compliant parts.
-
- function isOrBecomesClosed(
- stream: ReadableStreamImpl | WritableStreamImpl,
- promise: Promise<void>,
- action: () => void,
- ): void {
- if (stream[sym.state] === "closed") {
- action();
- } else {
- setPromiseIsHandledToTrue(promise.then(action));
- }
- }
-
- function isOrBecomesErrored(
- stream: ReadableStreamImpl | WritableStreamImpl,
- promise: Promise<void>,
- action: (error: any) => void,
- ): void {
- if (stream[sym.state] === "errored") {
- action(stream[sym.storedError]);
- } else {
- setPromiseIsHandledToTrue(promise.catch((error) => action(error)));
- }
- }
-
- function finalize(isError?: boolean, error?: any): void {
- writableStreamDefaultWriterRelease(writer);
- readableStreamReaderGenericRelease(reader);
-
- if (signal) {
- signal.removeEventListener("abort", abortAlgorithm);
- }
- if (isError) {
- promise.reject(error);
- } else {
- promise.resolve();
- }
- }
-
- function waitForWritesToFinish(): Promise<void> {
- const oldCurrentWrite = currentWrite;
- return currentWrite.then(() =>
- oldCurrentWrite !== currentWrite ? waitForWritesToFinish() : undefined
- );
- }
-
- function shutdownWithAction(
- action: () => Promise<any>,
- originalIsError?: boolean,
- originalError?: any,
- ): void {
- function doTheRest(): void {
- setPromiseIsHandledToTrue(
- action().then(
- () => finalize(originalIsError, originalError),
- (newError) => finalize(true, newError),
- ),
- );
- }
-
- if (shuttingDown) {
- return;
- }
- shuttingDown = true;
-
- if (
- dest[sym.state] === "writable" &&
- writableStreamCloseQueuedOrInFlight(dest) === false
- ) {
- setPromiseIsHandledToTrue(waitForWritesToFinish().then(doTheRest));
- } else {
- doTheRest();
- }
- }
-
- function shutdown(isError: boolean, error?: any): void {
- if (shuttingDown) {
- return;
- }
- shuttingDown = true;
-
- if (
- dest[sym.state] === "writable" &&
- !writableStreamCloseQueuedOrInFlight(dest)
- ) {
- setPromiseIsHandledToTrue(
- waitForWritesToFinish().then(() => finalize(isError, error)),
- );
- }
- finalize(isError, error);
- }
-
- function pipeStep(): Promise<boolean> {
- if (shuttingDown) {
- return Promise.resolve(true);
- }
- return writer[sym.readyPromise].promise.then(() => {
- return readableStreamDefaultReaderRead(reader).then(({ value, done }) => {
- if (done === true) {
- return true;
- }
- currentWrite = writableStreamDefaultWriterWrite(
- writer,
- value!,
- ).then(undefined, () => {});
- return false;
- });
- });
- }
-
- function pipeLoop(): Promise<void> {
- return new Promise((resolveLoop, rejectLoop) => {
- function next(done: boolean): void {
- if (done) {
- resolveLoop(undefined);
- } else {
- setPromiseIsHandledToTrue(pipeStep().then(next, rejectLoop));
- }
- }
- next(false);
- });
- }
-
- isOrBecomesErrored(
- source,
- reader[sym.closedPromise].promise,
- (storedError) => {
- if (!preventAbort) {
- shutdownWithAction(
- () => writableStreamAbort(dest, storedError),
- true,
- storedError,
- );
- } else {
- shutdown(true, storedError);
- }
- },
- );
-
- isOrBecomesErrored(dest, writer[sym.closedPromise].promise, (storedError) => {
- if (!preventCancel) {
- shutdownWithAction(
- () => readableStreamCancel(source, storedError),
- true,
- storedError,
- );
- } else {
- shutdown(true, storedError);
- }
- });
-
- isOrBecomesClosed(source, reader[sym.closedPromise].promise, () => {
- if (!preventClose) {
- shutdownWithAction(() =>
- writableStreamDefaultWriterCloseWithErrorPropagation(writer)
- );
- }
- });
-
- if (
- writableStreamCloseQueuedOrInFlight(dest) ||
- dest[sym.state] === "closed"
- ) {
- const destClosed = new TypeError(
- "The destination writable stream closed before all data could be piped to it.",
- );
- if (!preventCancel) {
- shutdownWithAction(
- () => readableStreamCancel(source, destClosed),
- true,
- destClosed,
- );
- } else {
- shutdown(true, destClosed);
- }
- }
-
- setPromiseIsHandledToTrue(pipeLoop());
- return promise.promise;
-}
-
-export function readableStreamReaderGenericCancel<R = any>(
- reader: ReadableStreamGenericReader<R>,
- reason: any,
-): Promise<void> {
- const stream = reader[sym.ownerReadableStream];
- assert(stream);
- return readableStreamCancel(stream, reason);
-}
-
-export function readableStreamReaderGenericInitialize<R = any>(
- reader: ReadableStreamGenericReader<R>,
- stream: ReadableStreamImpl<R>,
-): void {
- reader[sym.forAuthorCode] = true;
- reader[sym.ownerReadableStream] = stream;
- stream[sym.reader] = reader;
- if (stream[sym.state] === "readable") {
- reader[sym.closedPromise] = getDeferred();
- } else if (stream[sym.state] === "closed") {
- reader[sym.closedPromise] = { promise: Promise.resolve() };
- } else {
- assert(stream[sym.state] === "errored");
- reader[sym.closedPromise] = {
- promise: Promise.reject(stream[sym.storedError]),
- };
- setPromiseIsHandledToTrue(reader[sym.closedPromise].promise);
- }
-}
-
-export function readableStreamReaderGenericRelease<R = any>(
- reader: ReadableStreamGenericReader<R>,
-): void {
- assert(reader[sym.ownerReadableStream]);
- assert(reader[sym.ownerReadableStream][sym.reader] === reader);
- const closedPromise = reader[sym.closedPromise];
- if (reader[sym.ownerReadableStream][sym.state] === "readable") {
- assert(closedPromise.reject);
- closedPromise.reject(new TypeError("ReadableStream state is readable."));
- } else {
- closedPromise.promise = Promise.reject(new TypeError("Reading is closed."));
- delete closedPromise.reject;
- delete closedPromise.resolve;
- }
- setPromiseIsHandledToTrue(closedPromise.promise);
- reader[sym.ownerReadableStream][sym.reader] = undefined;
- (reader as any)[sym.ownerReadableStream] = undefined;
-}
-
-export function readableStreamTee<T>(
- stream: ReadableStreamImpl<T>,
- cloneForBranch2: boolean,
-): [ReadableStreamImpl<T>, ReadableStreamImpl<T>] {
- assert(isReadableStream(stream));
- assert(typeof cloneForBranch2 === "boolean");
- const reader = acquireReadableStreamDefaultReader(stream);
- let reading = false;
- let canceled1 = false;
- let canceled2 = false;
- let reason1: any = undefined;
- let reason2: any = undefined;
- /* eslint-disable prefer-const */
- let branch1: ReadableStreamImpl<T>;
- let branch2: ReadableStreamImpl<T>;
- /* eslint-enable prefer-const */
- const cancelPromise = getDeferred<void>();
- const pullAlgorithm = (): PromiseLike<void> => {
- if (reading) {
- return Promise.resolve();
- }
- reading = true;
- const readPromise = readableStreamDefaultReaderRead(reader).then(
- (result) => {
- reading = false;
- assert(typeof result === "object");
- const { done } = result;
- assert(typeof done === "boolean");
- if (done) {
- if (!canceled1) {
- readableStreamDefaultControllerClose(
- branch1[
- sym.readableStreamController
- ] as ReadableStreamDefaultControllerImpl,
- );
- }
- if (!canceled2) {
- readableStreamDefaultControllerClose(
- branch2[
- sym.readableStreamController
- ] as ReadableStreamDefaultControllerImpl,
- );
- }
- return;
- }
- const { value } = result;
- const value1 = value!;
- let value2 = value!;
- if (!canceled2 && cloneForBranch2) {
- value2 = cloneValue(value2);
- }
- if (!canceled1) {
- readableStreamDefaultControllerEnqueue(
- branch1[
- sym.readableStreamController
- ] as ReadableStreamDefaultControllerImpl,
- value1,
- );
- }
- if (!canceled2) {
- readableStreamDefaultControllerEnqueue(
- branch2[
- sym.readableStreamController
- ] as ReadableStreamDefaultControllerImpl,
- value2,
- );
- }
- },
- );
- setPromiseIsHandledToTrue(readPromise);
- return Promise.resolve();
- };
- const cancel1Algorithm = (reason?: any): PromiseLike<void> => {
- canceled1 = true;
- reason1 = reason;
- if (canceled2) {
- const compositeReason = [reason1, reason2];
- const cancelResult = readableStreamCancel(stream, compositeReason);
- cancelPromise.resolve(cancelResult);
- }
- return cancelPromise.promise;
- };
- const cancel2Algorithm = (reason?: any): PromiseLike<void> => {
- canceled2 = true;
- reason2 = reason;
- if (canceled1) {
- const compositeReason = [reason1, reason2];
- const cancelResult = readableStreamCancel(stream, compositeReason);
- cancelPromise.resolve(cancelResult);
- }
- return cancelPromise.promise;
- };
- const startAlgorithm = (): void => undefined;
- branch1 = createReadableStream(
- startAlgorithm,
- pullAlgorithm,
- cancel1Algorithm,
- );
- branch2 = createReadableStream(
- startAlgorithm,
- pullAlgorithm,
- cancel2Algorithm,
- );
- setPromiseIsHandledToTrue(
- reader[sym.closedPromise].promise.catch((r) => {
- readableStreamDefaultControllerError(
- branch1[
- sym.readableStreamController
- ] as ReadableStreamDefaultControllerImpl,
- r,
- );
- readableStreamDefaultControllerError(
- branch2[
- sym.readableStreamController
- ] as ReadableStreamDefaultControllerImpl,
- r,
- );
- }),
- );
- return [branch1, branch2];
-}
-
-export function resetQueue<R>(container: Container<R>): void {
- assert(sym.queue in container && sym.queueTotalSize in container);
- container[sym.queue] = [];
- container[sym.queueTotalSize] = 0;
-}
-
-/** An internal function which mimics the behavior of setting the promise to
- * handled in JavaScript. In this situation, an assertion failure, which
- * shouldn't happen will get thrown, instead of swallowed. */
-export function setPromiseIsHandledToTrue(promise: PromiseLike<unknown>): void {
- promise.then(undefined, (e) => {
- if (e && e instanceof AssertionError) {
- queueMicrotask(() => {
- throw e;
- });
- }
- });
-}
-
-function setUpReadableByteStreamController(
- stream: ReadableStreamImpl,
- controller: ReadableByteStreamControllerImpl,
- startAlgorithm: StartAlgorithm,
- pullAlgorithm: PullAlgorithm,
- cancelAlgorithm: CancelAlgorithm,
- highWaterMark: number,
- autoAllocateChunkSize: number | undefined,
-): void {
- assert(stream[sym.readableStreamController] === undefined);
- if (autoAllocateChunkSize !== undefined) {
- assert(Number.isInteger(autoAllocateChunkSize));
- assert(autoAllocateChunkSize >= 0);
- }
- controller[sym.controlledReadableByteStream] = stream;
- controller[sym.pulling] = controller[sym.pullAgain] = false;
- controller[sym.byobRequest] = undefined;
- controller[sym.queue] = [];
- controller[sym.queueTotalSize] = 0;
- controller[sym.closeRequested] = controller[sym.started] = false;
- controller[sym.strategyHWM] = validateAndNormalizeHighWaterMark(
- highWaterMark,
- );
- controller[sym.pullAlgorithm] = pullAlgorithm;
- controller[sym.cancelAlgorithm] = cancelAlgorithm;
- controller[sym.autoAllocateChunkSize] = autoAllocateChunkSize;
- // 3.13.26.12 Set controller.[[pendingPullIntos]] to a new empty List.
- stream[sym.readableStreamController] = controller;
- const startResult = startAlgorithm();
- const startPromise = Promise.resolve(startResult);
- setPromiseIsHandledToTrue(
- startPromise.then(
- () => {
- controller[sym.started] = true;
- assert(!controller[sym.pulling]);
- assert(!controller[sym.pullAgain]);
- readableByteStreamControllerCallPullIfNeeded(controller);
- },
- (r) => {
- readableByteStreamControllerError(controller, r);
- },
- ),
- );
-}
-
-export function setUpReadableByteStreamControllerFromUnderlyingSource(
- stream: ReadableStreamImpl,
- underlyingByteSource: UnderlyingByteSource,
- highWaterMark: number,
-): void {
- assert(underlyingByteSource);
- const controller: ReadableByteStreamControllerImpl = Object.create(
- ReadableByteStreamControllerImpl.prototype,
- );
- const startAlgorithm: StartAlgorithm = () => {
- return invokeOrNoop(underlyingByteSource, "start", controller);
- };
- const pullAlgorithm = createAlgorithmFromUnderlyingMethod(
- underlyingByteSource,
- "pull",
- 0,
- controller,
- );
- setFunctionName(pullAlgorithm, "[[pullAlgorithm]]");
- const cancelAlgorithm = createAlgorithmFromUnderlyingMethod(
- underlyingByteSource,
- "cancel",
- 1,
- );
- setFunctionName(cancelAlgorithm, "[[cancelAlgorithm]]");
- // 3.13.27.6 Let autoAllocateChunkSize be ? GetV(underlyingByteSource, "autoAllocateChunkSize").
- const autoAllocateChunkSize = undefined;
- setUpReadableByteStreamController(
- stream,
- controller,
- startAlgorithm,
- pullAlgorithm,
- cancelAlgorithm,
- highWaterMark,
- autoAllocateChunkSize,
- );
-}
-
-function setUpReadableStreamDefaultController<T>(
- stream: ReadableStreamImpl<T>,
- controller: ReadableStreamDefaultControllerImpl<T>,
- startAlgorithm: StartAlgorithm,
- pullAlgorithm: PullAlgorithm,
- cancelAlgorithm: CancelAlgorithm,
- highWaterMark: number,
- sizeAlgorithm: SizeAlgorithm<T>,
-): void {
- assert(stream[sym.readableStreamController] === undefined);
- controller[sym.controlledReadableStream] = stream;
- controller[sym.queue] = [];
- controller[sym.queueTotalSize] = 0;
- controller[sym.started] = controller[sym.closeRequested] = controller[
- sym.pullAgain
- ] = controller[sym.pulling] = false;
- controller[sym.strategySizeAlgorithm] = sizeAlgorithm;
- controller[sym.strategyHWM] = highWaterMark;
- controller[sym.pullAlgorithm] = pullAlgorithm;
- controller[sym.cancelAlgorithm] = cancelAlgorithm;
- stream[sym.readableStreamController] = controller;
- const startResult = startAlgorithm();
- const startPromise = Promise.resolve(startResult);
- setPromiseIsHandledToTrue(
- startPromise.then(
- () => {
- controller[sym.started] = true;
- assert(controller[sym.pulling] === false);
- assert(controller[sym.pullAgain] === false);
- readableStreamDefaultControllerCallPullIfNeeded(controller);
- },
- (r) => {
- readableStreamDefaultControllerError(controller, r);
- },
- ),
- );
-}
-
-export function setUpReadableStreamDefaultControllerFromUnderlyingSource<T>(
- stream: ReadableStreamImpl<T>,
- underlyingSource: UnderlyingSource<T>,
- highWaterMark: number,
- sizeAlgorithm: SizeAlgorithm<T>,
-): void {
- assert(underlyingSource);
- const controller: ReadableStreamDefaultControllerImpl<T> = Object.create(
- ReadableStreamDefaultControllerImpl.prototype,
- );
- const startAlgorithm: StartAlgorithm = (): void | PromiseLike<void> =>
- invokeOrNoop(underlyingSource, "start", controller);
- const pullAlgorithm: PullAlgorithm = createAlgorithmFromUnderlyingMethod(
- underlyingSource,
- "pull",
- 0,
- controller,
- );
- setFunctionName(pullAlgorithm, "[[pullAlgorithm]]");
- const cancelAlgorithm: CancelAlgorithm = createAlgorithmFromUnderlyingMethod(
- underlyingSource,
- "cancel",
- 1,
- );
- setFunctionName(cancelAlgorithm, "[[cancelAlgorithm]]");
- setUpReadableStreamDefaultController(
- stream,
- controller,
- startAlgorithm,
- pullAlgorithm,
- cancelAlgorithm,
- highWaterMark,
- sizeAlgorithm,
- );
-}
-
-function setUpTransformStreamDefaultController<I, O>(
- stream: TransformStreamImpl<I, O>,
- controller: TransformStreamDefaultControllerImpl<I, O>,
- transformAlgorithm: TransformAlgorithm<I>,
- flushAlgorithm: FlushAlgorithm,
-): void {
- assert(isTransformStream(stream));
- assert(stream[sym.transformStreamController] === undefined);
- controller[sym.controlledTransformStream] = stream;
- stream[sym.transformStreamController] = controller;
- controller[sym.transformAlgorithm] = transformAlgorithm;
- controller[sym.flushAlgorithm] = flushAlgorithm;
-}
-
-export function setUpTransformStreamDefaultControllerFromTransformer<I, O>(
- stream: TransformStreamImpl<I, O>,
- transformer: Transformer<I, O>,
-): void {
- assert(transformer);
- const controller = Object.create(
- TransformStreamDefaultControllerImpl.prototype,
- ) as TransformStreamDefaultControllerImpl<I, O>;
- let transformAlgorithm: TransformAlgorithm<I> = (chunk) => {
- try {
- transformStreamDefaultControllerEnqueue(
- controller,
- // it defaults to no tranformation, so I is assumed to be O
- (chunk as unknown) as O,
- );
- } catch (e) {
- return Promise.reject(e);
- }
- return Promise.resolve();
- };
- const transformMethod = transformer.transform;
- if (transformMethod) {
- if (typeof transformMethod !== "function") {
- throw new TypeError("tranformer.transform must be callable.");
- }
- transformAlgorithm = async (chunk): Promise<void> =>
- call(transformMethod, transformer, [chunk, controller]);
- }
- const flushAlgorithm = createAlgorithmFromUnderlyingMethod(
- transformer,
- "flush",
- 0,
- controller,
- );
- setUpTransformStreamDefaultController(
- stream,
- controller,
- transformAlgorithm,
- flushAlgorithm,
- );
-}
-
-function setUpWritableStreamDefaultController<W>(
- stream: WritableStreamImpl<W>,
- controller: WritableStreamDefaultControllerImpl<W>,
- startAlgorithm: StartAlgorithm,
- writeAlgorithm: WriteAlgorithm<W>,
- closeAlgorithm: CloseAlgorithm,
- abortAlgorithm: AbortAlgorithm,
- highWaterMark: number,
- sizeAlgorithm: SizeAlgorithm<W>,
-): void {
- assert(isWritableStream(stream));
- assert(stream[sym.writableStreamController] === undefined);
- controller[sym.controlledWritableStream] = stream;
- stream[sym.writableStreamController] = controller;
- controller[sym.queue] = [];
- controller[sym.queueTotalSize] = 0;
- controller[sym.started] = false;
- controller[sym.strategySizeAlgorithm] = sizeAlgorithm;
- controller[sym.strategyHWM] = highWaterMark;
- controller[sym.writeAlgorithm] = writeAlgorithm;
- controller[sym.closeAlgorithm] = closeAlgorithm;
- controller[sym.abortAlgorithm] = abortAlgorithm;
- const backpressure = writableStreamDefaultControllerGetBackpressure(
- controller,
- );
- writableStreamUpdateBackpressure(stream, backpressure);
- const startResult = startAlgorithm();
- const startPromise = Promise.resolve(startResult);
- setPromiseIsHandledToTrue(
- startPromise.then(
- () => {
- assert(
- stream[sym.state] === "writable" || stream[sym.state] === "erroring",
- );
- controller[sym.started] = true;
- writableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
- },
- (r) => {
- assert(
- stream[sym.state] === "writable" || stream[sym.state] === "erroring",
- );
- controller[sym.started] = true;
- writableStreamDealWithRejection(stream, r);
- },
- ),
- );
-}
-
-export function setUpWritableStreamDefaultControllerFromUnderlyingSink<W>(
- stream: WritableStreamImpl<W>,
- underlyingSink: UnderlyingSink<W>,
- highWaterMark: number,
- sizeAlgorithm: SizeAlgorithm<W>,
-): void {
- assert(underlyingSink);
- const controller = Object.create(
- WritableStreamDefaultControllerImpl.prototype,
- );
- const startAlgorithm = (): void | PromiseLike<void> => {
- return invokeOrNoop(underlyingSink, "start", controller);
- };
- const writeAlgorithm = createAlgorithmFromUnderlyingMethod(
- underlyingSink,
- "write",
- 1,
- controller,
- );
- setFunctionName(writeAlgorithm, "[[writeAlgorithm]]");
- const closeAlgorithm = createAlgorithmFromUnderlyingMethod(
- underlyingSink,
- "close",
- 0,
- );
- setFunctionName(closeAlgorithm, "[[closeAlgorithm]]");
- const abortAlgorithm = createAlgorithmFromUnderlyingMethod(
- underlyingSink,
- "abort",
- 1,
- );
- setFunctionName(abortAlgorithm, "[[abortAlgorithm]]");
- setUpWritableStreamDefaultController(
- stream,
- controller,
- startAlgorithm,
- writeAlgorithm,
- closeAlgorithm,
- abortAlgorithm,
- highWaterMark,
- sizeAlgorithm,
- );
-}
-
-function transformStreamDefaultControllerClearAlgorithms<I, O>(
- controller: TransformStreamDefaultControllerImpl<I, O>,
-): void {
- (controller as any)[sym.transformAlgorithm] = undefined;
- (controller as any)[sym.flushAlgorithm] = undefined;
-}
-
-export function transformStreamDefaultControllerEnqueue<I, O>(
- controller: TransformStreamDefaultControllerImpl<I, O>,
- chunk: O,
-): void {
- const stream = controller[sym.controlledTransformStream];
- const readableController = stream[sym.readable][
- sym.readableStreamController
- ] as ReadableStreamDefaultControllerImpl<O>;
- if (!readableStreamDefaultControllerCanCloseOrEnqueue(readableController)) {
- throw new TypeError(
- "TransformStream's readable controller cannot be closed or enqueued.",
- );
- }
- try {
- readableStreamDefaultControllerEnqueue(readableController, chunk);
- } catch (e) {
- transformStreamErrorWritableAndUnblockWrite(stream, e);
- throw stream[sym.readable][sym.storedError];
- }
- const backpressure = readableStreamDefaultControllerHasBackpressure(
- readableController,
- );
- if (backpressure) {
- transformStreamSetBackpressure(stream, true);
- }
-}
-
-export function transformStreamDefaultControllerError<I, O>(
- controller: TransformStreamDefaultControllerImpl<I, O>,
- e: any,
-): void {
- transformStreamError(controller[sym.controlledTransformStream], e);
-}
-
-function transformStreamDefaultControllerPerformTransform<I, O>(
- controller: TransformStreamDefaultControllerImpl<I, O>,
- chunk: I,
-): Promise<void> {
- const transformPromise = controller[sym.transformAlgorithm](chunk);
- return transformPromise.then(undefined, (r) => {
- transformStreamError(controller[sym.controlledTransformStream], r);
- throw r;
- });
-}
-
-function transformStreamDefaultSinkAbortAlgorithm<I, O>(
- stream: TransformStreamImpl<I, O>,
- reason: any,
-): Promise<void> {
- transformStreamError(stream, reason);
- return Promise.resolve(undefined);
-}
-
-function transformStreamDefaultSinkCloseAlgorithm<I, O>(
- stream: TransformStreamImpl<I, O>,
-): Promise<void> {
- const readable = stream[sym.readable];
- const controller = stream[sym.transformStreamController];
- const flushPromise = controller[sym.flushAlgorithm]();
- transformStreamDefaultControllerClearAlgorithms(controller);
- return flushPromise.then(
- () => {
- if (readable[sym.state] === "errored") {
- throw readable[sym.storedError];
- }
- const readableController = readable[
- sym.readableStreamController
- ] as ReadableStreamDefaultControllerImpl<O>;
- if (
- readableStreamDefaultControllerCanCloseOrEnqueue(readableController)
- ) {
- readableStreamDefaultControllerClose(readableController);
- }
- },
- (r) => {
- transformStreamError(stream, r);
- throw readable[sym.storedError];
- },
- );
-}
-
-function transformStreamDefaultSinkWriteAlgorithm<I, O>(
- stream: TransformStreamImpl<I, O>,
- chunk: I,
-): Promise<void> {
- assert(stream[sym.writable][sym.state] === "writable");
- const controller = stream[sym.transformStreamController];
- if (stream[sym.backpressure]) {
- const backpressureChangePromise = stream[sym.backpressureChangePromise];
- assert(backpressureChangePromise);
- return backpressureChangePromise.promise.then(() => {
- const writable = stream[sym.writable];
- const state = writable[sym.state];
- if (state === "erroring") {
- throw writable[sym.storedError];
- }
- assert(state === "writable");
- return transformStreamDefaultControllerPerformTransform(
- controller,
- chunk,
- );
- });
- }
- return transformStreamDefaultControllerPerformTransform(controller, chunk);
-}
-
-function transformStreamDefaultSourcePullAlgorithm<I, O>(
- stream: TransformStreamImpl<I, O>,
-): Promise<void> {
- assert(stream[sym.backpressure] === true);
- assert(stream[sym.backpressureChangePromise] !== undefined);
- transformStreamSetBackpressure(stream, false);
- return stream[sym.backpressureChangePromise]!.promise;
-}
-
-function transformStreamError<I, O>(
- stream: TransformStreamImpl<I, O>,
- e: any,
-): void {
- readableStreamDefaultControllerError(
- stream[sym.readable][
- sym.readableStreamController
- ] as ReadableStreamDefaultControllerImpl<O>,
- e,
- );
- transformStreamErrorWritableAndUnblockWrite(stream, e);
-}
-
-export function transformStreamDefaultControllerTerminate<I, O>(
- controller: TransformStreamDefaultControllerImpl<I, O>,
-): void {
- const stream = controller[sym.controlledTransformStream];
- const readableController = stream[sym.readable][
- sym.readableStreamController
- ] as ReadableStreamDefaultControllerImpl<O>;
- readableStreamDefaultControllerClose(readableController);
- const error = new TypeError("TransformStream is closed.");
- transformStreamErrorWritableAndUnblockWrite(stream, error);
-}
-
-function transformStreamErrorWritableAndUnblockWrite<I, O>(
- stream: TransformStreamImpl<I, O>,
- e: any,
-): void {
- transformStreamDefaultControllerClearAlgorithms(
- stream[sym.transformStreamController],
- );
- writableStreamDefaultControllerErrorIfNeeded(
- stream[sym.writable][sym.writableStreamController]!,
- e,
- );
- if (stream[sym.backpressure]) {
- transformStreamSetBackpressure(stream, false);
- }
-}
-
-function transformStreamSetBackpressure<I, O>(
- stream: TransformStreamImpl<I, O>,
- backpressure: boolean,
-): void {
- assert(stream[sym.backpressure] !== backpressure);
- if (stream[sym.backpressureChangePromise] !== undefined) {
- stream[sym.backpressureChangePromise]!.resolve!(undefined);
- }
- stream[sym.backpressureChangePromise] = getDeferred<void>();
- stream[sym.backpressure] = backpressure;
-}
-
-function transferArrayBuffer(buffer: ArrayBuffer): ArrayBuffer {
- assert(!isDetachedBuffer(buffer));
- const transferredIshVersion = buffer.slice(0);
-
- Object.defineProperty(buffer, "byteLength", {
- get(): number {
- return 0;
- },
- });
- (buffer as any)[sym.isFakeDetached] = true;
-
- return transferredIshVersion;
-}
-
-export function validateAndNormalizeHighWaterMark(
- highWaterMark: number,
-): number {
- highWaterMark = Number(highWaterMark);
- if (Number.isNaN(highWaterMark) || highWaterMark < 0) {
- throw new RangeError(
- `highWaterMark must be a positive number or Infinity. Received: ${highWaterMark}.`,
- );
- }
- return highWaterMark;
-}
-
-export function writableStreamAbort<W>(
- stream: WritableStreamImpl<W>,
- reason: any,
-): Promise<void> {
- const state = stream[sym.state];
- if (state === "closed" || state === "errored") {
- return Promise.resolve(undefined);
- }
- if (stream[sym.pendingAbortRequest]) {
- return stream[sym.pendingAbortRequest]!.promise.promise;
- }
- assert(state === "writable" || state === "erroring");
- let wasAlreadyErroring = false;
- if (state === "erroring") {
- wasAlreadyErroring = true;
- reason = undefined;
- }
- const promise = getDeferred<void>();
- stream[sym.pendingAbortRequest] = { promise, reason, wasAlreadyErroring };
-
- if (wasAlreadyErroring === false) {
- writableStreamStartErroring(stream, reason);
- }
- return promise.promise;
-}
-
-function writableStreamAddWriteRequest<W>(
- stream: WritableStreamImpl<W>,
-): Promise<void> {
- assert(isWritableStream(stream));
- assert(stream[sym.state] === "writable");
- const promise = getDeferred<void>();
- stream[sym.writeRequests].push(promise);
- return promise.promise;
-}
-
-export function writableStreamClose<W>(
- stream: WritableStreamImpl<W>,
-): Promise<void> {
- const state = stream[sym.state];
- if (state === "closed" || state === "errored") {
- return Promise.reject(
- new TypeError(
- "Cannot close an already closed or errored WritableStream.",
- ),
- );
- }
- assert(!writableStreamCloseQueuedOrInFlight(stream));
- const promise = getDeferred<void>();
- stream[sym.closeRequest] = promise;
- const writer = stream[sym.writer];
- if (writer && stream[sym.backpressure] && state === "writable") {
- writer[sym.readyPromise].resolve!();
- writer[sym.readyPromise].resolve = undefined;
- writer[sym.readyPromise].reject = undefined;
- }
- writableStreamDefaultControllerClose(stream[sym.writableStreamController]!);
- return promise.promise;
-}
-
-export function writableStreamCloseQueuedOrInFlight<W>(
- stream: WritableStreamImpl<W>,
-): boolean {
- return !(
- stream[sym.closeRequest] === undefined &&
- stream[sym.inFlightCloseRequest] === undefined
- );
-}
-
-function writableStreamDealWithRejection<W>(
- stream: WritableStreamImpl<W>,
- error: any,
-): void {
- const state = stream[sym.state];
- if (state === "writable") {
- writableStreamStartErroring(stream, error);
- return;
- }
- assert(state === "erroring");
- writableStreamFinishErroring(stream);
-}
-
-function writableStreamDefaultControllerAdvanceQueueIfNeeded<W>(
- controller: WritableStreamDefaultControllerImpl<W>,
-): void {
- const stream = controller[sym.controlledWritableStream];
- if (!controller[sym.started]) {
- return;
- }
- if (stream[sym.inFlightWriteRequest]) {
- return;
- }
- const state = stream[sym.state];
- assert(state !== "closed" && state !== "errored");
- if (state === "erroring") {
- writableStreamFinishErroring(stream);
- return;
- }
- if (!controller[sym.queue].length) {
- return;
- }
- const writeRecord = peekQueueValue(controller);
- if (writeRecord === "close") {
- writableStreamDefaultControllerProcessClose(controller);
- } else {
- writableStreamDefaultControllerProcessWrite(controller, writeRecord.chunk);
- }
-}
-
-export function writableStreamDefaultControllerClearAlgorithms<W>(
- controller: WritableStreamDefaultControllerImpl<W>,
-): void {
- (controller as any)[sym.writeAlgorithm] = undefined;
- (controller as any)[sym.closeAlgorithm] = undefined;
- (controller as any)[sym.abortAlgorithm] = undefined;
- (controller as any)[sym.strategySizeAlgorithm] = undefined;
-}
-
-function writableStreamDefaultControllerClose<W>(
- controller: WritableStreamDefaultControllerImpl<W>,
-): void {
- enqueueValueWithSize(controller, "close", 0);
- writableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
-}
-
-export function writableStreamDefaultControllerError<W>(
- controller: WritableStreamDefaultControllerImpl<W>,
- error: any,
-): void {
- const stream = controller[sym.controlledWritableStream];
- assert(stream[sym.state] === "writable");
- writableStreamDefaultControllerClearAlgorithms(controller);
- writableStreamStartErroring(stream, error);
-}
-
-function writableStreamDefaultControllerErrorIfNeeded<W>(
- controller: WritableStreamDefaultControllerImpl<W>,
- error: any,
-): void {
- if (controller[sym.controlledWritableStream][sym.state] === "writable") {
- writableStreamDefaultControllerError(controller, error);
- }
-}
-
-function writableStreamDefaultControllerGetBackpressure<W>(
- controller: WritableStreamDefaultControllerImpl<W>,
-): boolean {
- const desiredSize = writableStreamDefaultControllerGetDesiredSize(controller);
- return desiredSize <= 0;
-}
-
-function writableStreamDefaultControllerGetChunkSize<W>(
- controller: WritableStreamDefaultControllerImpl<W>,
- chunk: W,
-): number {
- let returnValue: number;
- try {
- returnValue = controller[sym.strategySizeAlgorithm](chunk);
- } catch (e) {
- writableStreamDefaultControllerErrorIfNeeded(controller, e);
- return 1;
- }
- return returnValue;
-}
-
-function writableStreamDefaultControllerGetDesiredSize<W>(
- controller: WritableStreamDefaultControllerImpl<W>,
-): number {
- return controller[sym.strategyHWM] - controller[sym.queueTotalSize];
-}
-
-function writableStreamDefaultControllerProcessClose<W>(
- controller: WritableStreamDefaultControllerImpl<W>,
-): void {
- const stream = controller[sym.controlledWritableStream];
- writableStreamMarkCloseRequestInFlight(stream);
- dequeueValue(controller);
- assert(controller[sym.queue].length === 0);
- const sinkClosePromise = controller[sym.closeAlgorithm]();
- writableStreamDefaultControllerClearAlgorithms(controller);
- setPromiseIsHandledToTrue(
- sinkClosePromise.then(
- () => {
- writableStreamFinishInFlightClose(stream);
- },
- (reason) => {
- writableStreamFinishInFlightCloseWithError(stream, reason);
- },
- ),
- );
-}
-
-function writableStreamDefaultControllerProcessWrite<W>(
- controller: WritableStreamDefaultControllerImpl<W>,
- chunk: W,
-): void {
- const stream = controller[sym.controlledWritableStream];
- writableStreamMarkFirstWriteRequestInFlight(stream);
- const sinkWritePromise = controller[sym.writeAlgorithm](chunk);
- setPromiseIsHandledToTrue(
- sinkWritePromise.then(
- () => {
- writableStreamFinishInFlightWrite(stream);
- const state = stream[sym.state];
- assert(state === "writable" || state === "erroring");
- dequeueValue(controller);
- if (
- !writableStreamCloseQueuedOrInFlight(stream) &&
- state === "writable"
- ) {
- const backpressure = writableStreamDefaultControllerGetBackpressure(
- controller,
- );
- writableStreamUpdateBackpressure(stream, backpressure);
- }
- writableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
- },
- (reason) => {
- if (stream[sym.state] === "writable") {
- writableStreamDefaultControllerClearAlgorithms(controller);
- }
- writableStreamFinishInFlightWriteWithError(stream, reason);
- },
- ),
- );
-}
-
-function writableStreamDefaultControllerWrite<W>(
- controller: WritableStreamDefaultControllerImpl<W>,
- chunk: W,
- chunkSize: number,
-): void {
- const writeRecord = { chunk };
- try {
- enqueueValueWithSize(controller, writeRecord, chunkSize);
- } catch (e) {
- writableStreamDefaultControllerErrorIfNeeded(controller, e);
- return;
- }
- const stream = controller[sym.controlledWritableStream];
- if (
- !writableStreamCloseQueuedOrInFlight(stream) &&
- stream[sym.state] === "writable"
- ) {
- const backpressure = writableStreamDefaultControllerGetBackpressure(
- controller,
- );
- writableStreamUpdateBackpressure(stream, backpressure);
- }
- writableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
-}
-
-export function writableStreamDefaultWriterAbort<W>(
- writer: WritableStreamDefaultWriterImpl<W>,
- reason: any,
-): Promise<void> {
- const stream = writer[sym.ownerWritableStream];
- assert(stream);
- return writableStreamAbort(stream, reason);
-}
-
-export function writableStreamDefaultWriterClose<W>(
- writer: WritableStreamDefaultWriterImpl<W>,
-): Promise<void> {
- const stream = writer[sym.ownerWritableStream];
- assert(stream);
- return writableStreamClose(stream);
-}
-
-function writableStreamDefaultWriterCloseWithErrorPropagation<W>(
- writer: WritableStreamDefaultWriterImpl<W>,
-): Promise<void> {
- const stream = writer[sym.ownerWritableStream];
- assert(stream);
- const state = stream[sym.state];
- if (writableStreamCloseQueuedOrInFlight(stream) || state === "closed") {
- return Promise.resolve();
- }
- if (state === "errored") {
- return Promise.reject(stream[sym.storedError]);
- }
- assert(state === "writable" || state === "erroring");
- return writableStreamDefaultWriterClose(writer);
-}
-
-function writableStreamDefaultWriterEnsureClosePromiseRejected<W>(
- writer: WritableStreamDefaultWriterImpl<W>,
- error: any,
-): void {
- if (writer[sym.closedPromise].reject) {
- writer[sym.closedPromise].reject!(error);
- } else {
- writer[sym.closedPromise] = {
- promise: Promise.reject(error),
- };
- }
- setPromiseIsHandledToTrue(writer[sym.closedPromise].promise);
-}
-
-function writableStreamDefaultWriterEnsureReadyPromiseRejected<W>(
- writer: WritableStreamDefaultWriterImpl<W>,
- error: any,
-): void {
- if (writer[sym.readyPromise].reject) {
- writer[sym.readyPromise].reject!(error);
- writer[sym.readyPromise].reject = undefined;
- writer[sym.readyPromise].resolve = undefined;
- } else {
- writer[sym.readyPromise] = {
- promise: Promise.reject(error),
- };
- }
- setPromiseIsHandledToTrue(writer[sym.readyPromise].promise);
-}
-
-export function writableStreamDefaultWriterWrite<W>(
- writer: WritableStreamDefaultWriterImpl<W>,
- chunk: W,
-): Promise<void> {
- const stream = writer[sym.ownerWritableStream];
- assert(stream);
- const controller = stream[sym.writableStreamController];
- assert(controller);
- const chunkSize = writableStreamDefaultControllerGetChunkSize(
- controller,
- chunk,
- );
- if (stream !== writer[sym.ownerWritableStream]) {
- return Promise.reject("Writer has incorrect WritableStream.");
- }
- const state = stream[sym.state];
- if (state === "errored") {
- return Promise.reject(stream[sym.storedError]);
- }
- if (writableStreamCloseQueuedOrInFlight(stream) || state === "closed") {
- return Promise.reject(new TypeError("The stream is closed or closing."));
- }
- if (state === "erroring") {
- return Promise.reject(stream[sym.storedError]);
- }
- assert(state === "writable");
- const promise = writableStreamAddWriteRequest(stream);
- writableStreamDefaultControllerWrite(controller, chunk, chunkSize);
- return promise;
-}
-
-export function writableStreamDefaultWriterGetDesiredSize<W>(
- writer: WritableStreamDefaultWriterImpl<W>,
-): number | null {
- const stream = writer[sym.ownerWritableStream];
- const state = stream[sym.state];
- if (state === "errored" || state === "erroring") {
- return null;
- }
- if (state === "closed") {
- return 0;
- }
- return writableStreamDefaultControllerGetDesiredSize(
- stream[sym.writableStreamController]!,
- );
-}
-
-export function writableStreamDefaultWriterRelease<W>(
- writer: WritableStreamDefaultWriterImpl<W>,
-): void {
- const stream = writer[sym.ownerWritableStream];
- assert(stream);
- assert(stream[sym.writer] === writer);
- const releasedError = new TypeError(
- "Writer was released and can no longer be used to monitor the stream's closedness.",
- );
- writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, releasedError);
- writableStreamDefaultWriterEnsureClosePromiseRejected(writer, releasedError);
- stream[sym.writer] = undefined;
- (writer as any)[sym.ownerWritableStream] = undefined;
-}
-
-function writableStreamFinishErroring<W>(stream: WritableStreamImpl<W>): void {
- assert(stream[sym.state] === "erroring");
- assert(!writableStreamHasOperationMarkedInFlight(stream));
- stream[sym.state] = "errored";
- stream[sym.writableStreamController]![sym.errorSteps]();
- const storedError = stream[sym.storedError];
- for (const writeRequest of stream[sym.writeRequests]) {
- assert(writeRequest.reject);
- writeRequest.reject(storedError);
- }
- stream[sym.writeRequests] = [];
- if (!stream[sym.pendingAbortRequest]) {
- writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
- return;
- }
- const abortRequest = stream[sym.pendingAbortRequest];
- assert(abortRequest);
- stream[sym.pendingAbortRequest] = undefined;
- if (abortRequest.wasAlreadyErroring) {
- assert(abortRequest.promise.reject);
- abortRequest.promise.reject(storedError);
- writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
- return;
- }
- const promise = stream[sym.writableStreamController]![sym.abortSteps](
- abortRequest.reason,
- );
- setPromiseIsHandledToTrue(
- promise.then(
- () => {
- assert(abortRequest.promise.resolve);
- abortRequest.promise.resolve();
- writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
- },
- (reason) => {
- assert(abortRequest.promise.reject);
- abortRequest.promise.reject(reason);
- writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
- },
- ),
- );
-}
-
-function writableStreamFinishInFlightClose<W>(
- stream: WritableStreamImpl<W>,
-): void {
- assert(stream[sym.inFlightCloseRequest]);
- stream[sym.inFlightCloseRequest]?.resolve!();
- stream[sym.inFlightCloseRequest] = undefined;
- const state = stream[sym.state];
- assert(state === "writable" || state === "erroring");
- if (state === "erroring") {
- stream[sym.storedError] = undefined;
- if (stream[sym.pendingAbortRequest]) {
- stream[sym.pendingAbortRequest]!.promise.resolve!();
- stream[sym.pendingAbortRequest] = undefined;
- }
- }
- stream[sym.state] = "closed";
- const writer = stream[sym.writer];
- if (writer) {
- writer[sym.closedPromise].resolve!();
- }
- assert(stream[sym.pendingAbortRequest] === undefined);
- assert(stream[sym.storedError] === undefined);
-}
-
-function writableStreamFinishInFlightCloseWithError<W>(
- stream: WritableStreamImpl<W>,
- error: any,
-): void {
- assert(stream[sym.inFlightCloseRequest]);
- stream[sym.inFlightCloseRequest]?.reject!(error);
- stream[sym.inFlightCloseRequest] = undefined;
- assert(stream[sym.state] === "writable" || stream[sym.state] === "erroring");
- if (stream[sym.pendingAbortRequest]) {
- stream[sym.pendingAbortRequest]?.promise.reject!(error);
- stream[sym.pendingAbortRequest] = undefined;
- }
- writableStreamDealWithRejection(stream, error);
-}
-
-function writableStreamFinishInFlightWrite<W>(
- stream: WritableStreamImpl<W>,
-): void {
- assert(stream[sym.inFlightWriteRequest]);
- stream[sym.inFlightWriteRequest]!.resolve();
- stream[sym.inFlightWriteRequest] = undefined;
-}
-
-function writableStreamFinishInFlightWriteWithError<W>(
- stream: WritableStreamImpl<W>,
- error: any,
-): void {
- assert(stream[sym.inFlightWriteRequest]);
- stream[sym.inFlightWriteRequest]!.reject!(error);
- stream[sym.inFlightWriteRequest] = undefined;
- assert(stream[sym.state] === "writable" || stream[sym.state] === "erroring");
- writableStreamDealWithRejection(stream, error);
-}
-
-function writableStreamHasOperationMarkedInFlight<W>(
- stream: WritableStreamImpl<W>,
-): boolean {
- return !(
- stream[sym.inFlightWriteRequest] === undefined &&
- stream[sym.inFlightCloseRequest] === undefined
- );
-}
-
-function writableStreamMarkCloseRequestInFlight<W>(
- stream: WritableStreamImpl<W>,
-): void {
- assert(stream[sym.inFlightCloseRequest] === undefined);
- assert(stream[sym.closeRequest] !== undefined);
- stream[sym.inFlightCloseRequest] = stream[sym.closeRequest];
- stream[sym.closeRequest] = undefined;
-}
-
-function writableStreamMarkFirstWriteRequestInFlight<W>(
- stream: WritableStreamImpl<W>,
-): void {
- assert(stream[sym.inFlightWriteRequest] === undefined);
- assert(stream[sym.writeRequests].length);
- const writeRequest = stream[sym.writeRequests].shift();
- stream[sym.inFlightWriteRequest] = writeRequest;
-}
-
-function writableStreamRejectCloseAndClosedPromiseIfNeeded<W>(
- stream: WritableStreamImpl<W>,
-): void {
- assert(stream[sym.state] === "errored");
- if (stream[sym.closeRequest]) {
- assert(stream[sym.inFlightCloseRequest] === undefined);
- stream[sym.closeRequest]!.reject!(stream[sym.storedError]);
- stream[sym.closeRequest] = undefined;
- }
- const writer = stream[sym.writer];
- if (writer) {
- writer[sym.closedPromise].reject!(stream[sym.storedError]);
- setPromiseIsHandledToTrue(writer[sym.closedPromise].promise);
- }
-}
-
-function writableStreamStartErroring<W>(
- stream: WritableStreamImpl<W>,
- reason: any,
-): void {
- assert(stream[sym.storedError] === undefined);
- assert(stream[sym.state] === "writable");
- const controller = stream[sym.writableStreamController];
- assert(controller);
- stream[sym.state] = "erroring";
- stream[sym.storedError] = reason;
- const writer = stream[sym.writer];
- if (writer) {
- writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, reason);
- }
- if (
- !writableStreamHasOperationMarkedInFlight(stream) &&
- controller[sym.started]
- ) {
- writableStreamFinishErroring(stream);
- }
-}
-
-function writableStreamUpdateBackpressure<W>(
- stream: WritableStreamImpl<W>,
- backpressure: boolean,
-): void {
- assert(stream[sym.state] === "writable");
- assert(!writableStreamCloseQueuedOrInFlight(stream));
- const writer = stream[sym.writer];
- if (writer && backpressure !== stream[sym.backpressure]) {
- if (backpressure) {
- writer[sym.readyPromise] = getDeferred();
- } else {
- assert(backpressure === false);
- writer[sym.readyPromise].resolve!();
- writer[sym.readyPromise].resolve = undefined;
- writer[sym.readyPromise].reject = undefined;
- }
- }
- stream[sym.backpressure] = backpressure;
-}
-
-/* eslint-enable */