summaryrefslogtreecommitdiff
path: root/cli/js/web/streams/internals.ts
diff options
context:
space:
mode:
Diffstat (limited to 'cli/js/web/streams/internals.ts')
-rw-r--r--cli/js/web/streams/internals.ts1224
1 files changed, 1115 insertions, 109 deletions
diff --git a/cli/js/web/streams/internals.ts b/cli/js/web/streams/internals.ts
index 2559d9e5c..846db096e 100644
--- a/cli/js/web/streams/internals.ts
+++ b/cli/js/web/streams/internals.ts
@@ -13,13 +13,25 @@ import { ReadableStreamDefaultControllerImpl } from "./readable_stream_default_c
import { ReadableStreamDefaultReaderImpl } from "./readable_stream_default_reader.ts";
import { ReadableStreamImpl } from "./readable_stream.ts";
import * as sym from "./symbols.ts";
+import { WritableStreamDefaultControllerImpl } from "./writable_stream_default_controller.ts";
+import { WritableStreamDefaultWriterImpl } from "./writable_stream_default_writer.ts";
+import { WritableStreamImpl } from "./writable_stream.ts";
+import { AbortSignalImpl } from "../abort_signal.ts";
+import { DOMExceptionImpl as DOMException } from "../dom_exception.ts";
import { cloneValue } from "../util.ts";
-import { assert } from "../../util.ts";
+import { assert, AssertionError } from "../../util.ts";
+export type AbortAlgorithm = (reason?: any) => PromiseLike<void>;
+export interface AbortRequest {
+ promise: Deferred<void>;
+ reason?: any;
+ wasAlreadyErroring: boolean;
+}
export interface BufferQueueItem extends Pair<ArrayBuffer | SharedArrayBuffer> {
offset: number;
}
export type CancelAlgorithm = (reason?: any) => PromiseLike<void>;
+export type CloseAlgorithm = () => PromiseLike<void>;
type Container<R = any> = {
[sym.queue]: Array<Pair<R> | BufferQueueItem>;
[sym.queueTotalSize]: number;
@@ -28,11 +40,11 @@ export type Pair<R> = { value: R; size: number };
export type PullAlgorithm = () => PromiseLike<void>;
export type SizeAlgorithm<T> = (chunk: T) => number;
export type StartAlgorithm = () => void | PromiseLike<void>;
+export type WriteAlgorithm<W> = (chunk: W) => Promise<void>;
export interface Deferred<T> {
promise: Promise<T>;
resolve?: (value?: T | PromiseLike<T>) => void;
reject?: (reason?: any) => void;
- settled: boolean;
}
export interface ReadableStreamGenericReader<R = any>
@@ -58,6 +70,12 @@ export function acquireReadableStreamDefaultReader<T>(
return reader;
}
+export function acquireWritableStreamDefaultWriter<W>(
+ stream: WritableStreamImpl<W>
+): WritableStreamDefaultWriterImpl<W> {
+ return new WritableStreamDefaultWriterImpl(stream);
+}
+
function createAlgorithmFromUnderlyingMethod<
O extends UnderlyingByteSource | UnderlyingSource,
P extends keyof O
@@ -157,14 +175,14 @@ function enqueueValueWithSize<R>(
/** Non-spec mechanism to "unwrap" a promise and store it to be resolved
* later. */
-function getDeferred<T>(): Deferred<T> {
- let resolve = undefined;
- let reject = undefined;
+export function getDeferred<T>(): Required<Deferred<T>> {
+ let resolve: (value?: T | PromiseLike<T>) => void;
+ let reject: (reason?: any) => void;
const promise = new Promise<T>((res, rej) => {
resolve = res;
reject = rej;
});
- return { promise, resolve, reject, settled: false };
+ return { promise, resolve: resolve!, reject: reject! };
}
export function initializeReadableStream(stream: ReadableStreamImpl): void {
@@ -173,6 +191,17 @@ export function initializeReadableStream(stream: ReadableStreamImpl): void {
stream[sym.disturbed] = false;
}
+export function initializeWritableStream(stream: WritableStreamImpl): void {
+ stream[sym.state] = "writable";
+ stream[sym.storedError] = stream[sym.writer] = stream[
+ sym.writableStreamController
+ ] = stream[sym.inFlightWriteRequest] = stream[sym.closeRequest] = stream[
+ sym.inFlightCloseRequest
+ ] = stream[sym.pendingAbortRequest] = undefined;
+ stream[sym.writeRequests] = [];
+ stream[sym.backpressure] = false;
+}
+
function invokeOrNoop<O extends any, P extends keyof O>(
o: O,
p: P,
@@ -278,6 +307,40 @@ export function isUnderlyingByteSource(
return typeString === "bytes";
}
+export function isWritableStream(x: unknown): x is WritableStreamImpl {
+ return typeof x !== "object" ||
+ x === null ||
+ !(sym.writableStreamController in x)
+ ? false
+ : true;
+}
+
+export function isWritableStreamDefaultController(
+ x: unknown
+): x is WritableStreamDefaultControllerImpl<any> {
+ return typeof x !== "object" ||
+ x === null ||
+ !(sym.controlledWritableStream in x)
+ ? false
+ : true;
+}
+
+export function isWritableStreamDefaultWriter(
+ x: unknown
+): x is WritableStreamDefaultWriterImpl<any> {
+ return typeof x !== "object" || x === null || !(sym.ownerWritableStream in x)
+ ? false
+ : true;
+}
+
+export function isWritableStreamLocked(stream: WritableStreamImpl): boolean {
+ assert(isWritableStream(stream));
+ if (stream[sym.writer] === undefined) {
+ return false;
+ }
+ return true;
+}
+
export function makeSizeAlgorithmFromSizeFunction<T>(
size: QueuingStrategySizeCallback<T> | undefined
): SizeAlgorithm<T> {
@@ -292,6 +355,13 @@ export function makeSizeAlgorithmFromSizeFunction<T>(
};
}
+function peekQueueValue<T>(container: Container<T>): T | "close" {
+ assert(sym.queue in container && sym.queueTotalSize in container);
+ assert(container[sym.queue].length);
+ const [pair] = container[sym.queue];
+ return pair.value as T;
+}
+
function readableByteStreamControllerShouldCallPull(
controller: ReadableByteStreamControllerImpl
): boolean {
@@ -333,25 +403,27 @@ export function readableByteStreamControllerCallPullIfNeeded(
assert(controller[sym.pullAgain] === false);
controller[sym.pulling] = true;
const pullPromise = controller[sym.pullAlgorithm]();
- pullPromise.then(
- () => {
- controller[sym.pulling] = false;
- if (controller[sym.pullAgain]) {
- controller[sym.pullAgain];
- readableByteStreamControllerCallPullIfNeeded(controller);
+ setPromiseIsHandledToTrue(
+ pullPromise.then(
+ () => {
+ controller[sym.pulling] = false;
+ if (controller[sym.pullAgain]) {
+ controller[sym.pullAgain] = false;
+ readableByteStreamControllerCallPullIfNeeded(controller);
+ }
+ },
+ (e) => {
+ readableByteStreamControllerError(controller, e);
}
- },
- (e) => {
- readableByteStreamControllerError(controller, e);
- }
+ )
);
}
export function readableByteStreamControllerClearAlgorithms(
controller: ReadableByteStreamControllerImpl
): void {
- delete controller[sym.pullAlgorithm];
- delete controller[sym.cancelAlgorithm];
+ (controller as any)[sym.pullAlgorithm] = undefined;
+ (controller as any)[sym.cancelAlgorithm] = undefined;
}
export function readableByteStreamControllerClose(
@@ -476,7 +548,7 @@ export function readableStreamAddReadRequest<R>(
return promise.promise;
}
-export async function readableStreamCancel<T>(
+export function readableStreamCancel<T>(
stream: ReadableStreamImpl<T>,
reason: any
): Promise<void> {
@@ -488,7 +560,9 @@ export async function readableStreamCancel<T>(
return Promise.reject(stream[sym.storedError]);
}
readableStreamClose(stream);
- await stream[sym.readableStreamController]![sym.cancelSteps](reason);
+ return stream[sym.readableStreamController]![sym.cancelSteps](reason).then(
+ () => undefined
+ ) as Promise<void>;
}
export function readableStreamClose<T>(stream: ReadableStreamImpl<T>): void {
@@ -514,7 +588,6 @@ export function readableStreamClose<T>(stream: ReadableStreamImpl<T>): void {
const resolve = reader[sym.closedPromise].resolve;
assert(resolve);
resolve();
- reader[sym.closedPromise].settled = true;
}
export function readableStreamCreateReadResult<T>(
@@ -573,9 +646,9 @@ export function readableStreamDefaultControllerCanCloseOrEnqueue<T>(
export function readableStreamDefaultControllerClearAlgorithms<T>(
controller: ReadableStreamDefaultControllerImpl<T>
): void {
- delete controller[sym.pullAlgorithm];
- delete controller[sym.cancelAlgorithm];
- delete controller[sym.strategySizeAlgorithm];
+ (controller as any)[sym.pullAlgorithm] = undefined;
+ (controller as any)[sym.cancelAlgorithm] = undefined;
+ (controller as any)[sym.strategySizeAlgorithm] = undefined;
}
export function readableStreamDefaultControllerClose<T>(
@@ -703,17 +776,18 @@ export function readableStreamError(stream: ReadableStreamImpl, e: any): void {
}
if (isReadableStreamDefaultReader(reader)) {
for (const readRequest of reader[sym.readRequests]) {
- const { reject } = readRequest;
- assert(reject);
- reject(e);
+ assert(readRequest.reject);
+ readRequest.reject(e);
+ readRequest.reject = undefined;
+ readRequest.resolve = undefined;
}
reader[sym.readRequests] = [];
}
// 3.5.6.8 Otherwise, support BYOB Reader
- const { reject } = reader[sym.closedPromise];
- assert(reject);
- reject(e);
- reader[sym.closedPromise].settled = true;
+ reader[sym.closedPromise].reject!(e);
+ reader[sym.closedPromise].reject = undefined;
+ reader[sym.closedPromise].resolve = undefined;
+ setPromiseIsHandledToTrue(reader[sym.closedPromise].promise);
}
export function readableStreamFulfillReadRequest<R>(
@@ -744,6 +818,252 @@ export function readableStreamHasDefaultReader(
: true;
}
+export function readableStreamPipeTo<T>(
+ source: ReadableStreamImpl<T>,
+ dest: WritableStreamImpl<T>,
+ preventClose: boolean,
+ preventAbort: boolean,
+ preventCancel: boolean,
+ signal: AbortSignalImpl | undefined
+): Promise<void> {
+ assert(isReadableStream(source));
+ assert(isWritableStream(dest));
+ assert(
+ typeof preventClose === "boolean" &&
+ typeof preventAbort === "boolean" &&
+ typeof preventCancel === "boolean"
+ );
+ assert(signal === undefined || signal instanceof AbortSignalImpl);
+ assert(!isReadableStreamLocked(source));
+ assert(!isWritableStreamLocked(dest));
+ const reader = acquireReadableStreamDefaultReader(source);
+ const writer = acquireWritableStreamDefaultWriter(dest);
+ source[sym.disturbed] = true;
+ let shuttingDown = false;
+ const promise = getDeferred<void>();
+ let abortAlgorithm: () => void;
+ if (signal) {
+ abortAlgorithm = (): void => {
+ const error = new DOMException("Abort signal received.", "AbortSignal");
+ const actions: Array<() => Promise<void>> = [];
+ if (!preventAbort) {
+ actions.push(() => {
+ if (dest[sym.state] === "writable") {
+ return writableStreamAbort(dest, error);
+ } else {
+ return Promise.resolve(undefined);
+ }
+ });
+ }
+ if (!preventCancel) {
+ actions.push(() => {
+ if (source[sym.state] === "readable") {
+ return readableStreamCancel(source, error);
+ } else {
+ return Promise.resolve(undefined);
+ }
+ });
+ }
+ shutdownWithAction(
+ () => Promise.all(actions.map((action) => action())),
+ true,
+ error
+ );
+ };
+ if (signal.aborted) {
+ abortAlgorithm();
+ return promise.promise;
+ }
+ signal.addEventListener("abort", abortAlgorithm);
+ }
+
+ let currentWrite = Promise.resolve();
+
+ // At this point, the spec becomes non-specific and vague. Most of the rest
+ // of this code is based on the reference implementation that is part of the
+ // specification. This is why the functions are only scoped to this function
+ // to ensure they don't leak into the spec compliant parts.
+
+ function isOrBecomesClosed(
+ stream: ReadableStreamImpl | WritableStreamImpl,
+ promise: Promise<void>,
+ action: () => void
+ ): void {
+ if (stream[sym.state] === "closed") {
+ action();
+ } else {
+ setPromiseIsHandledToTrue(promise.then(action));
+ }
+ }
+
+ function isOrBecomesErrored(
+ stream: ReadableStreamImpl | WritableStreamImpl,
+ promise: Promise<void>,
+ action: (error: any) => void
+ ): void {
+ if (stream[sym.state] === "errored") {
+ action(stream[sym.storedError]);
+ } else {
+ setPromiseIsHandledToTrue(promise.catch((error) => action(error)));
+ }
+ }
+
+ function finalize(isError?: boolean, error?: any): void {
+ writableStreamDefaultWriterRelease(writer);
+ readableStreamReaderGenericRelease(reader);
+
+ if (signal) {
+ signal.removeEventListener("abort", abortAlgorithm);
+ }
+ if (isError) {
+ promise.reject(error);
+ } else {
+ promise.resolve();
+ }
+ }
+
+ function waitForWritesToFinish(): Promise<void> {
+ const oldCurrentWrite = currentWrite;
+ return currentWrite.then(() =>
+ oldCurrentWrite !== currentWrite ? waitForWritesToFinish() : undefined
+ );
+ }
+
+ function shutdownWithAction(
+ action: () => Promise<any>,
+ originalIsError?: boolean,
+ originalError?: any
+ ): void {
+ function doTheRest(): void {
+ setPromiseIsHandledToTrue(
+ action().then(
+ () => finalize(originalIsError, originalError),
+ (newError) => finalize(true, newError)
+ )
+ );
+ }
+
+ if (shuttingDown) {
+ return;
+ }
+ shuttingDown = true;
+
+ if (
+ dest[sym.state] === "writable" &&
+ writableStreamCloseQueuedOrInFlight(dest) === false
+ ) {
+ setPromiseIsHandledToTrue(waitForWritesToFinish().then(doTheRest));
+ } else {
+ doTheRest();
+ }
+ }
+
+ function shutdown(isError: boolean, error?: any): void {
+ if (shuttingDown) {
+ return;
+ }
+ shuttingDown = true;
+
+ if (
+ dest[sym.state] === "writable" &&
+ !writableStreamCloseQueuedOrInFlight(dest)
+ ) {
+ setPromiseIsHandledToTrue(
+ waitForWritesToFinish().then(() => finalize(isError, error))
+ );
+ }
+ finalize(isError, error);
+ }
+
+ function pipeStep(): Promise<boolean> {
+ if (shuttingDown) {
+ return Promise.resolve(true);
+ }
+ return writer[sym.readyPromise].promise.then(() => {
+ return readableStreamDefaultReaderRead(reader).then(({ value, done }) => {
+ if (done === true) {
+ return true;
+ }
+ currentWrite = writableStreamDefaultWriterWrite(
+ writer,
+ value!
+ ).then(undefined, () => {});
+ return false;
+ });
+ });
+ }
+
+ function pipeLoop(): Promise<void> {
+ return new Promise((resolveLoop, rejectLoop) => {
+ function next(done: boolean): void {
+ if (done) {
+ resolveLoop(undefined);
+ } else {
+ setPromiseIsHandledToTrue(pipeStep().then(next, rejectLoop));
+ }
+ }
+ next(false);
+ });
+ }
+
+ isOrBecomesErrored(
+ source,
+ reader[sym.closedPromise].promise,
+ (storedError) => {
+ if (!preventAbort) {
+ shutdownWithAction(
+ () => writableStreamAbort(dest, storedError),
+ true,
+ storedError
+ );
+ } else {
+ shutdown(true, storedError);
+ }
+ }
+ );
+
+ isOrBecomesErrored(dest, writer[sym.closedPromise].promise, (storedError) => {
+ if (!preventCancel) {
+ shutdownWithAction(
+ () => readableStreamCancel(source, storedError),
+ true,
+ storedError
+ );
+ } else {
+ shutdown(true, storedError);
+ }
+ });
+
+ isOrBecomesClosed(source, reader[sym.closedPromise].promise, () => {
+ if (!preventClose) {
+ shutdownWithAction(() =>
+ writableStreamDefaultWriterCloseWithErrorPropagation(writer)
+ );
+ }
+ });
+
+ if (
+ writableStreamCloseQueuedOrInFlight(dest) ||
+ dest[sym.state] === "closed"
+ ) {
+ const destClosed = new TypeError(
+ "The destination writable stream closed before all data could be piped to it."
+ );
+ if (!preventCancel) {
+ shutdownWithAction(
+ () => readableStreamCancel(source, destClosed),
+ true,
+ destClosed
+ );
+ } else {
+ shutdown(true, destClosed);
+ }
+ }
+
+ setPromiseIsHandledToTrue(pipeLoop());
+ return promise.promise;
+}
+
export function readableStreamReaderGenericCancel<R = any>(
reader: ReadableStreamGenericReader<R>,
reason: any
@@ -763,16 +1083,13 @@ export function readableStreamReaderGenericInitialize<R = any>(
if (stream[sym.state] === "readable") {
reader[sym.closedPromise] = getDeferred();
} else if (stream[sym.state] === "closed") {
- reader[sym.closedPromise] = {
- promise: Promise.resolve(),
- settled: true,
- };
+ reader[sym.closedPromise] = { promise: Promise.resolve() };
} else {
assert(stream[sym.state] === "errored");
reader[sym.closedPromise] = {
promise: Promise.reject(stream[sym.storedError]),
- settled: true,
};
+ setPromiseIsHandledToTrue(reader[sym.closedPromise].promise);
}
}
@@ -790,9 +1107,9 @@ export function readableStreamReaderGenericRelease<R = any>(
delete closedPromise.reject;
delete closedPromise.resolve;
}
- closedPromise.settled = true;
- delete reader[sym.ownerReadableStream][sym.reader];
- delete reader[sym.ownerReadableStream];
+ setPromiseIsHandledToTrue(closedPromise.promise);
+ reader[sym.ownerReadableStream][sym.reader] = undefined;
+ (reader as any)[sym.ownerReadableStream] = undefined;
}
export function readableStreamTee<T>(
@@ -817,51 +1134,54 @@ export function readableStreamTee<T>(
return Promise.resolve();
}
reading = true;
- readableStreamDefaultReaderRead(reader).then((result) => {
- reading = false;
- assert(typeof result === "object");
- const { done } = result;
- assert(typeof done === "boolean");
- if (done) {
+ const readPromise = readableStreamDefaultReaderRead(reader).then(
+ (result) => {
+ reading = false;
+ assert(typeof result === "object");
+ const { done } = result;
+ assert(typeof done === "boolean");
+ if (done) {
+ if (!canceled1) {
+ readableStreamDefaultControllerClose(
+ branch1[
+ sym.readableStreamController
+ ] as ReadableStreamDefaultControllerImpl
+ );
+ }
+ if (!canceled2) {
+ readableStreamDefaultControllerClose(
+ branch2[
+ sym.readableStreamController
+ ] as ReadableStreamDefaultControllerImpl
+ );
+ }
+ return;
+ }
+ const { value } = result;
+ const value1 = value!;
+ let value2 = value!;
+ if (!canceled2 && cloneForBranch2) {
+ value2 = cloneValue(value2);
+ }
if (!canceled1) {
- readableStreamDefaultControllerClose(
+ readableStreamDefaultControllerEnqueue(
branch1[
sym.readableStreamController
- ] as ReadableStreamDefaultControllerImpl
+ ] as ReadableStreamDefaultControllerImpl,
+ value1
);
}
if (!canceled2) {
- readableStreamDefaultControllerClose(
+ readableStreamDefaultControllerEnqueue(
branch2[
sym.readableStreamController
- ] as ReadableStreamDefaultControllerImpl
+ ] as ReadableStreamDefaultControllerImpl,
+ value2
);
}
- return;
- }
- const { value } = result;
- const value1 = value!;
- let value2 = value!;
- if (!canceled2 && cloneForBranch2) {
- value2 = cloneValue(value2);
}
- if (!canceled1) {
- readableStreamDefaultControllerEnqueue(
- branch1[
- sym.readableStreamController
- ] as ReadableStreamDefaultControllerImpl,
- value1
- );
- }
- if (!canceled2) {
- readableStreamDefaultControllerEnqueue(
- branch2[
- sym.readableStreamController
- ] as ReadableStreamDefaultControllerImpl,
- value2
- );
- }
- });
+ );
+ setPromiseIsHandledToTrue(readPromise);
return Promise.resolve();
};
const cancel1Algorithm = (reason?: any): PromiseLike<void> => {
@@ -870,7 +1190,6 @@ export function readableStreamTee<T>(
if (canceled2) {
const compositeReason = [reason1, reason2];
const cancelResult = readableStreamCancel(stream, compositeReason);
- assert(cancelPromise.resolve);
cancelPromise.resolve(cancelResult);
}
return cancelPromise.promise;
@@ -881,7 +1200,6 @@ export function readableStreamTee<T>(
if (canceled1) {
const compositeReason = [reason1, reason2];
const cancelResult = readableStreamCancel(stream, compositeReason);
- assert(cancelPromise.resolve);
cancelPromise.resolve(cancelResult);
}
return cancelPromise.promise;
@@ -897,20 +1215,22 @@ export function readableStreamTee<T>(
pullAlgorithm,
cancel2Algorithm
);
- reader[sym.closedPromise].promise.catch((r) => {
- readableStreamDefaultControllerError(
- branch1[
- sym.readableStreamController
- ] as ReadableStreamDefaultControllerImpl,
- r
- );
- readableStreamDefaultControllerError(
- branch2[
- sym.readableStreamController
- ] as ReadableStreamDefaultControllerImpl,
- r
- );
- });
+ setPromiseIsHandledToTrue(
+ reader[sym.closedPromise].promise.catch((r) => {
+ readableStreamDefaultControllerError(
+ branch1[
+ sym.readableStreamController
+ ] as ReadableStreamDefaultControllerImpl,
+ r
+ );
+ readableStreamDefaultControllerError(
+ branch2[
+ sym.readableStreamController
+ ] as ReadableStreamDefaultControllerImpl,
+ r
+ );
+ })
+ );
return [branch1, branch2];
}
@@ -920,6 +1240,25 @@ export function resetQueue<R>(container: Container<R>): void {
container[sym.queueTotalSize] = 0;
}
+/** An internal function which provides a function name for some generated
+ * functions, so stack traces are a bit more readable. */
+export function setFunctionName(fn: Function, value: string): void {
+ Object.defineProperty(fn, "name", { value, configurable: true });
+}
+
+/** An internal function which mimics the behavior of setting the promise to
+ * handled in JavaScript. In this situation, an assertion failure, which
+ * shouldn't happen will get thrown, instead of swallowed. */
+export function setPromiseIsHandledToTrue(promise: PromiseLike<unknown>): void {
+ promise.then(undefined, (e) => {
+ if (e && e instanceof AssertionError) {
+ queueMicrotask(() => {
+ throw e;
+ });
+ }
+ });
+}
+
function setUpReadableByteStreamController(
stream: ReadableStreamImpl,
controller: ReadableByteStreamControllerImpl,
@@ -950,16 +1289,18 @@ function setUpReadableByteStreamController(
stream[sym.readableStreamController] = controller;
const startResult = startAlgorithm();
const startPromise = Promise.resolve(startResult);
- startPromise.then(
- () => {
- controller[sym.started] = true;
- assert(!controller[sym.pulling]);
- assert(!controller[sym.pullAgain]);
- readableByteStreamControllerCallPullIfNeeded(controller);
- },
- (r) => {
- readableByteStreamControllerError(controller, r);
- }
+ setPromiseIsHandledToTrue(
+ startPromise.then(
+ () => {
+ controller[sym.started] = true;
+ assert(!controller[sym.pulling]);
+ assert(!controller[sym.pullAgain]);
+ readableByteStreamControllerCallPullIfNeeded(controller);
+ },
+ (r) => {
+ readableByteStreamControllerError(controller, r);
+ }
+ )
);
}
@@ -981,11 +1322,13 @@ export function setUpReadableByteStreamControllerFromUnderlyingSource(
0,
controller
);
+ setFunctionName(pullAlgorithm, "[[pullAlgorithm]]");
const cancelAlgorithm = createAlgorithmFromUnderlyingMethod(
underlyingByteSource,
"cancel",
1
);
+ setFunctionName(cancelAlgorithm, "[[cancelAlgorithm]]");
// 3.13.27.6 Let autoAllocateChunkSize be ? GetV(underlyingByteSource, "autoAllocateChunkSize").
const autoAllocateChunkSize = undefined;
setUpReadableByteStreamController(
@@ -1022,16 +1365,18 @@ function setUpReadableStreamDefaultController<T>(
stream[sym.readableStreamController] = controller;
const startResult = startAlgorithm();
const startPromise = Promise.resolve(startResult);
- startPromise.then(
- () => {
- controller[sym.started] = true;
- assert(controller[sym.pulling] === false);
- assert(controller[sym.pullAgain] === false);
- readableStreamDefaultControllerCallPullIfNeeded(controller);
- },
- (r) => {
- readableStreamDefaultControllerError(controller, r);
- }
+ setPromiseIsHandledToTrue(
+ startPromise.then(
+ () => {
+ controller[sym.started] = true;
+ assert(controller[sym.pulling] === false);
+ assert(controller[sym.pullAgain] === false);
+ readableStreamDefaultControllerCallPullIfNeeded(controller);
+ },
+ (r) => {
+ readableStreamDefaultControllerError(controller, r);
+ }
+ )
);
}
@@ -1053,11 +1398,13 @@ export function setUpReadableStreamDefaultControllerFromUnderlyingSource<T>(
0,
controller
);
+ setFunctionName(pullAlgorithm, "[[pullAlgorithm]]");
const cancelAlgorithm: CancelAlgorithm = createAlgorithmFromUnderlyingMethod(
underlyingSource,
"cancel",
1
);
+ setFunctionName(cancelAlgorithm, "[[cancelAlgorithm]]");
setUpReadableStreamDefaultController(
stream,
controller,
@@ -1069,6 +1416,98 @@ export function setUpReadableStreamDefaultControllerFromUnderlyingSource<T>(
);
}
+function setUpWritableStreamDefaultController<W>(
+ stream: WritableStreamImpl<W>,
+ controller: WritableStreamDefaultControllerImpl<W>,
+ startAlgorithm: StartAlgorithm,
+ writeAlgorithm: WriteAlgorithm<W>,
+ closeAlgorithm: CloseAlgorithm,
+ abortAlgorithm: AbortAlgorithm,
+ highWaterMark: number,
+ sizeAlgorithm: SizeAlgorithm<W>
+): void {
+ assert(isWritableStream(stream));
+ assert(stream[sym.writableStreamController] === undefined);
+ controller[sym.controlledWritableStream] = stream;
+ stream[sym.writableStreamController] = controller;
+ controller[sym.queue] = [];
+ controller[sym.queueTotalSize] = 0;
+ controller[sym.started] = false;
+ controller[sym.strategySizeAlgorithm] = sizeAlgorithm;
+ controller[sym.strategyHWM] = highWaterMark;
+ controller[sym.writeAlgorithm] = writeAlgorithm;
+ controller[sym.closeAlgorithm] = closeAlgorithm;
+ controller[sym.abortAlgorithm] = abortAlgorithm;
+ const backpressure = writableStreamDefaultControllerGetBackpressure(
+ controller
+ );
+ writableStreamUpdateBackpressure(stream, backpressure);
+ const startResult = startAlgorithm();
+ const startPromise = Promise.resolve(startResult);
+ setPromiseIsHandledToTrue(
+ startPromise.then(
+ () => {
+ assert(
+ stream[sym.state] === "writable" || stream[sym.state] === "erroring"
+ );
+ controller[sym.started] = true;
+ writableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
+ },
+ (r) => {
+ assert(
+ stream[sym.state] === "writable" || stream[sym.state] === "erroring"
+ );
+ controller[sym.started] = true;
+ writableStreamDealWithRejection(stream, r);
+ }
+ )
+ );
+}
+
+export function setUpWritableStreamDefaultControllerFromUnderlyingSink<W>(
+ stream: WritableStreamImpl<W>,
+ underlyingSink: UnderlyingSink<W>,
+ highWaterMark: number,
+ sizeAlgorithm: SizeAlgorithm<W>
+): void {
+ assert(underlyingSink);
+ const controller = Object.create(
+ WritableStreamDefaultControllerImpl.prototype
+ );
+ const startAlgorithm = (): void | PromiseLike<void> => {
+ return invokeOrNoop(underlyingSink, "start", controller);
+ };
+ const writeAlgorithm = createAlgorithmFromUnderlyingMethod(
+ underlyingSink,
+ "write",
+ 1,
+ controller
+ );
+ setFunctionName(writeAlgorithm, "[[writeAlgorithm]]");
+ const closeAlgorithm = createAlgorithmFromUnderlyingMethod(
+ underlyingSink,
+ "close",
+ 0
+ );
+ setFunctionName(closeAlgorithm, "[[closeAlgorithm]]");
+ const abortAlgorithm = createAlgorithmFromUnderlyingMethod(
+ underlyingSink,
+ "abort",
+ 1
+ );
+ setFunctionName(abortAlgorithm, "[[abortAlgorithm]]");
+ setUpWritableStreamDefaultController(
+ stream,
+ controller,
+ startAlgorithm,
+ writeAlgorithm,
+ closeAlgorithm,
+ abortAlgorithm,
+ highWaterMark,
+ sizeAlgorithm
+ );
+}
+
function transferArrayBuffer(buffer: ArrayBuffer): ArrayBuffer {
assert(!isDetachedBuffer(buffer));
const transferredIshVersion = buffer.slice(0);
@@ -1095,4 +1534,571 @@ export function validateAndNormalizeHighWaterMark(
return highWaterMark;
}
+export function writableStreamAbort<W>(
+ stream: WritableStreamImpl<W>,
+ reason: any
+): Promise<void> {
+ const state = stream[sym.state];
+ if (state === "closed" || state === "errored") {
+ return Promise.resolve(undefined);
+ }
+ if (stream[sym.pendingAbortRequest]) {
+ return stream[sym.pendingAbortRequest]!.promise.promise;
+ }
+ assert(state === "writable" || state === "erroring");
+ let wasAlreadyErroring = false;
+ if (state === "erroring") {
+ wasAlreadyErroring = true;
+ reason = undefined;
+ }
+ const promise = getDeferred<void>();
+ stream[sym.pendingAbortRequest] = { promise, reason, wasAlreadyErroring };
+
+ if (wasAlreadyErroring === false) {
+ writableStreamStartErroring(stream, reason);
+ }
+ return promise.promise;
+}
+
+function writableStreamAddWriteRequest<W>(
+ stream: WritableStreamImpl<W>
+): Promise<void> {
+ assert(isWritableStream(stream));
+ assert(stream[sym.state] === "writable");
+ const promise = getDeferred<void>();
+ stream[sym.writeRequests].push(promise);
+ return promise.promise;
+}
+
+export function writableStreamClose<W>(
+ stream: WritableStreamImpl<W>
+): Promise<void> {
+ const state = stream[sym.state];
+ if (state === "closed" || state === "errored") {
+ return Promise.reject(
+ new TypeError("Cannot close an already closed or errored WritableStream.")
+ );
+ }
+ assert(!writableStreamCloseQueuedOrInFlight(stream));
+ const promise = getDeferred<void>();
+ stream[sym.closeRequest] = promise;
+ const writer = stream[sym.writer];
+ if (writer && stream[sym.backpressure] && state === "writable") {
+ writer[sym.readyPromise].resolve!();
+ writer[sym.readyPromise].resolve = undefined;
+ writer[sym.readyPromise].reject = undefined;
+ }
+ writableStreamDefaultControllerClose(stream[sym.writableStreamController]!);
+ return promise.promise;
+}
+
+export function writableStreamCloseQueuedOrInFlight<W>(
+ stream: WritableStreamImpl<W>
+): boolean {
+ if (
+ stream[sym.closeRequest] === undefined &&
+ stream[sym.inFlightCloseRequest] === undefined
+ ) {
+ return false;
+ }
+ return true;
+}
+
+function writableStreamDealWithRejection<W>(
+ stream: WritableStreamImpl<W>,
+ error: any
+): void {
+ const state = stream[sym.state];
+ if (state === "writable") {
+ writableStreamStartErroring(stream, error);
+ return;
+ }
+ assert(state === "erroring");
+ writableStreamFinishErroring(stream);
+}
+
+function writableStreamDefaultControllerAdvanceQueueIfNeeded<W>(
+ controller: WritableStreamDefaultControllerImpl<W>
+): void {
+ const stream = controller[sym.controlledWritableStream];
+ if (!controller[sym.started]) {
+ return;
+ }
+ if (stream[sym.inFlightWriteRequest]) {
+ return;
+ }
+ const state = stream[sym.state];
+ assert(state !== "closed" && state !== "errored");
+ if (state === "erroring") {
+ writableStreamFinishErroring(stream);
+ return;
+ }
+ if (!controller[sym.queue].length) {
+ return;
+ }
+ const writeRecord = peekQueueValue(controller);
+ if (writeRecord === "close") {
+ writableStreamDefaultControllerProcessClose(controller);
+ } else {
+ writableStreamDefaultControllerProcessWrite(controller, writeRecord.chunk);
+ }
+}
+
+export function writableStreamDefaultControllerClearAlgorithms<W>(
+ controller: WritableStreamDefaultControllerImpl<W>
+): void {
+ (controller as any)[sym.writeAlgorithm] = undefined;
+ (controller as any)[sym.closeAlgorithm] = undefined;
+ (controller as any)[sym.abortAlgorithm] = undefined;
+ (controller as any)[sym.strategySizeAlgorithm] = undefined;
+}
+
+function writableStreamDefaultControllerClose<W>(
+ controller: WritableStreamDefaultControllerImpl<W>
+): void {
+ enqueueValueWithSize(controller, "close", 0);
+ writableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
+}
+
+export function writableStreamDefaultControllerError<W>(
+ controller: WritableStreamDefaultControllerImpl<W>,
+ error: any
+): void {
+ const stream = controller[sym.controlledWritableStream];
+ assert(stream[sym.state] === "writable");
+ writableStreamDefaultControllerClearAlgorithms(controller);
+ writableStreamStartErroring(stream, error);
+}
+
+function writableStreamDefaultControllerErrorIfNeeded<W>(
+ controller: WritableStreamDefaultControllerImpl<W>,
+ error: any
+): void {
+ if (controller[sym.controlledWritableStream][sym.state] === "writable") {
+ writableStreamDefaultControllerError(controller, error);
+ }
+}
+
+function writableStreamDefaultControllerGetBackpressure<W>(
+ controller: WritableStreamDefaultControllerImpl<W>
+): boolean {
+ const desiredSize = writableStreamDefaultControllerGetDesiredSize(controller);
+ return desiredSize <= 0;
+}
+
+function writableStreamDefaultControllerGetChunkSize<W>(
+ controller: WritableStreamDefaultControllerImpl<W>,
+ chunk: W
+): number {
+ let returnValue: number;
+ try {
+ returnValue = controller[sym.strategySizeAlgorithm](chunk);
+ } catch (e) {
+ writableStreamDefaultControllerErrorIfNeeded(controller, e);
+ return 1;
+ }
+ return returnValue;
+}
+
+function writableStreamDefaultControllerGetDesiredSize<W>(
+ controller: WritableStreamDefaultControllerImpl<W>
+): number {
+ return controller[sym.strategyHWM] - controller[sym.queueTotalSize];
+}
+
+function writableStreamDefaultControllerProcessClose<W>(
+ controller: WritableStreamDefaultControllerImpl<W>
+): void {
+ const stream = controller[sym.controlledWritableStream];
+ writableStreamMarkCloseRequestInFlight(stream);
+ dequeueValue(controller);
+ assert(controller[sym.queue].length === 0);
+ const sinkClosePromise = controller[sym.closeAlgorithm]();
+ writableStreamDefaultControllerClearAlgorithms(controller);
+ setPromiseIsHandledToTrue(
+ sinkClosePromise.then(
+ () => {
+ writableStreamFinishInFlightClose(stream);
+ },
+ (reason) => {
+ writableStreamFinishInFlightCloseWithError(stream, reason);
+ }
+ )
+ );
+}
+
+function writableStreamDefaultControllerProcessWrite<W>(
+ controller: WritableStreamDefaultControllerImpl<W>,
+ chunk: W
+): void {
+ const stream = controller[sym.controlledWritableStream];
+ writableStreamMarkFirstWriteRequestInFlight(stream);
+ const sinkWritePromise = controller[sym.writeAlgorithm](chunk);
+ setPromiseIsHandledToTrue(
+ sinkWritePromise.then(
+ () => {
+ writableStreamFinishInFlightWrite(stream);
+ const state = stream[sym.state];
+ assert(state === "writable" || state === "erroring");
+ dequeueValue(controller);
+ if (
+ !writableStreamCloseQueuedOrInFlight(stream) &&
+ state === "writable"
+ ) {
+ const backpressure = writableStreamDefaultControllerGetBackpressure(
+ controller
+ );
+ writableStreamUpdateBackpressure(stream, backpressure);
+ }
+ writableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
+ },
+ (reason) => {
+ if (stream[sym.state] === "writable") {
+ writableStreamDefaultControllerClearAlgorithms(controller);
+ }
+ writableStreamFinishInFlightWriteWithError(stream, reason);
+ }
+ )
+ );
+}
+
+function writableStreamDefaultControllerWrite<W>(
+ controller: WritableStreamDefaultControllerImpl<W>,
+ chunk: W,
+ chunkSize: number
+): void {
+ const writeRecord = { chunk };
+ try {
+ enqueueValueWithSize(controller, writeRecord, chunkSize);
+ } catch (e) {
+ writableStreamDefaultControllerErrorIfNeeded(controller, e);
+ return;
+ }
+ const stream = controller[sym.controlledWritableStream];
+ if (
+ !writableStreamCloseQueuedOrInFlight(stream) &&
+ stream[sym.state] === "writable"
+ ) {
+ const backpressure = writableStreamDefaultControllerGetBackpressure(
+ controller
+ );
+ writableStreamUpdateBackpressure(stream, backpressure);
+ }
+ writableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
+}
+
+export function writableStreamDefaultWriterAbort<W>(
+ writer: WritableStreamDefaultWriterImpl<W>,
+ reason: any
+): Promise<void> {
+ const stream = writer[sym.ownerWritableStream];
+ assert(stream);
+ return writableStreamAbort(stream, reason);
+}
+
+export function writableStreamDefaultWriterClose<W>(
+ writer: WritableStreamDefaultWriterImpl<W>
+): Promise<void> {
+ const stream = writer[sym.ownerWritableStream];
+ assert(stream);
+ return writableStreamClose(stream);
+}
+
+function writableStreamDefaultWriterCloseWithErrorPropagation<W>(
+ writer: WritableStreamDefaultWriterImpl<W>
+): Promise<void> {
+ const stream = writer[sym.ownerWritableStream];
+ assert(stream);
+ const state = stream[sym.state];
+ if (writableStreamCloseQueuedOrInFlight(stream) || state === "closed") {
+ return Promise.resolve();
+ }
+ if (state === "errored") {
+ return Promise.reject(stream[sym.storedError]);
+ }
+ assert(state === "writable" || state === "erroring");
+ return writableStreamDefaultWriterClose(writer);
+}
+
+function writableStreamDefaultWriterEnsureClosePromiseRejected<W>(
+ writer: WritableStreamDefaultWriterImpl<W>,
+ error: any
+): void {
+ if (writer[sym.closedPromise].reject) {
+ writer[sym.closedPromise].reject!(error);
+ } else {
+ writer[sym.closedPromise] = {
+ promise: Promise.reject(error),
+ };
+ }
+ setPromiseIsHandledToTrue(writer[sym.closedPromise].promise);
+}
+
+function writableStreamDefaultWriterEnsureReadyPromiseRejected<W>(
+ writer: WritableStreamDefaultWriterImpl<W>,
+ error: any
+): void {
+ if (writer[sym.readyPromise].reject) {
+ writer[sym.readyPromise].reject!(error);
+ writer[sym.readyPromise].reject = undefined;
+ writer[sym.readyPromise].resolve = undefined;
+ } else {
+ writer[sym.readyPromise] = {
+ promise: Promise.reject(error),
+ };
+ }
+ setPromiseIsHandledToTrue(writer[sym.readyPromise].promise);
+}
+
+export function writableStreamDefaultWriterWrite<W>(
+ writer: WritableStreamDefaultWriterImpl<W>,
+ chunk: W
+): Promise<void> {
+ const stream = writer[sym.ownerWritableStream];
+ assert(stream);
+ const controller = stream[sym.writableStreamController];
+ assert(controller);
+ const chunkSize = writableStreamDefaultControllerGetChunkSize(
+ controller,
+ chunk
+ );
+ if (stream !== writer[sym.ownerWritableStream]) {
+ return Promise.reject("Writer has incorrect WritableStream.");
+ }
+ const state = stream[sym.state];
+ if (state === "errored") {
+ return Promise.reject(stream[sym.storedError]);
+ }
+ if (writableStreamCloseQueuedOrInFlight(stream) || state === "closed") {
+ return Promise.reject(new TypeError("The stream is closed or closing."));
+ }
+ if (state === "erroring") {
+ return Promise.reject(stream[sym.storedError]);
+ }
+ assert(state === "writable");
+ const promise = writableStreamAddWriteRequest(stream);
+ writableStreamDefaultControllerWrite(controller, chunk, chunkSize);
+ return promise;
+}
+
+export function writableStreamDefaultWriterGetDesiredSize<W>(
+ writer: WritableStreamDefaultWriterImpl<W>
+): number | null {
+ const stream = writer[sym.ownerWritableStream];
+ const state = stream[sym.state];
+ if (state === "errored" || state === "erroring") {
+ return null;
+ }
+ if (state === "closed") {
+ return 0;
+ }
+ return writableStreamDefaultControllerGetDesiredSize(
+ stream[sym.writableStreamController]!
+ );
+}
+
+export function writableStreamDefaultWriterRelease<W>(
+ writer: WritableStreamDefaultWriterImpl<W>
+): void {
+ const stream = writer[sym.ownerWritableStream];
+ assert(stream);
+ assert(stream[sym.writer] === writer);
+ const releasedError = new TypeError(
+ "Writer was released and can no longer be used to monitor the stream's closedness."
+ );
+ writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, releasedError);
+ writableStreamDefaultWriterEnsureClosePromiseRejected(writer, releasedError);
+ stream[sym.writer] = undefined;
+ (writer as any)[sym.ownerWritableStream] = undefined;
+}
+
+function writableStreamFinishErroring<W>(stream: WritableStreamImpl<W>): void {
+ assert(stream[sym.state] === "erroring");
+ assert(!writableStreamHasOperationMarkedInFlight(stream));
+ stream[sym.state] = "errored";
+ stream[sym.writableStreamController]![sym.errorSteps]();
+ const storedError = stream[sym.storedError];
+ for (const writeRequest of stream[sym.writeRequests]) {
+ assert(writeRequest.reject);
+ writeRequest.reject(storedError);
+ }
+ stream[sym.writeRequests] = [];
+ if (!stream[sym.pendingAbortRequest]) {
+ writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
+ return;
+ }
+ const abortRequest = stream[sym.pendingAbortRequest];
+ assert(abortRequest);
+ stream[sym.pendingAbortRequest] = undefined;
+ if (abortRequest.wasAlreadyErroring) {
+ assert(abortRequest.promise.reject);
+ abortRequest.promise.reject(storedError);
+ writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
+ return;
+ }
+ const promise = stream[sym.writableStreamController]![sym.abortSteps](
+ abortRequest.reason
+ );
+ setPromiseIsHandledToTrue(
+ promise.then(
+ () => {
+ assert(abortRequest.promise.resolve);
+ abortRequest.promise.resolve();
+ writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
+ },
+ (reason) => {
+ assert(abortRequest.promise.reject);
+ abortRequest.promise.reject(reason);
+ writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
+ }
+ )
+ );
+}
+
+function writableStreamFinishInFlightClose<W>(
+ stream: WritableStreamImpl<W>
+): void {
+ assert(stream[sym.inFlightCloseRequest]);
+ stream[sym.inFlightCloseRequest]?.resolve!();
+ stream[sym.inFlightCloseRequest] = undefined;
+ const state = stream[sym.state];
+ assert(state === "writable" || state === "erroring");
+ if (state === "erroring") {
+ stream[sym.storedError] = undefined;
+ if (stream[sym.pendingAbortRequest]) {
+ stream[sym.pendingAbortRequest]!.promise.resolve!();
+ stream[sym.pendingAbortRequest] = undefined;
+ }
+ }
+ stream[sym.state] = "closed";
+ const writer = stream[sym.writer];
+ if (writer) {
+ writer[sym.closedPromise].resolve!();
+ }
+ assert(stream[sym.pendingAbortRequest] === undefined);
+ assert(stream[sym.storedError] === undefined);
+}
+
+function writableStreamFinishInFlightCloseWithError<W>(
+ stream: WritableStreamImpl<W>,
+ error: any
+): void {
+ assert(stream[sym.inFlightCloseRequest]);
+ stream[sym.inFlightCloseRequest]?.reject!(error);
+ stream[sym.inFlightCloseRequest] = undefined;
+ assert(stream[sym.state] === "writable" || stream[sym.state] === "erroring");
+ if (stream[sym.pendingAbortRequest]) {
+ stream[sym.pendingAbortRequest]?.promise.reject!(error);
+ stream[sym.pendingAbortRequest] = undefined;
+ }
+ writableStreamDealWithRejection(stream, error);
+}
+
+function writableStreamFinishInFlightWrite<W>(
+ stream: WritableStreamImpl<W>
+): void {
+ assert(stream[sym.inFlightWriteRequest]);
+ stream[sym.inFlightWriteRequest]!.resolve();
+ stream[sym.inFlightWriteRequest] = undefined;
+}
+
+function writableStreamFinishInFlightWriteWithError<W>(
+ stream: WritableStreamImpl<W>,
+ error: any
+): void {
+ assert(stream[sym.inFlightWriteRequest]);
+ stream[sym.inFlightWriteRequest]!.reject!(error);
+ stream[sym.inFlightWriteRequest] = undefined;
+ assert(stream[sym.state] === "writable" || stream[sym.state] === "erroring");
+ writableStreamDealWithRejection(stream, error);
+}
+
+function writableStreamHasOperationMarkedInFlight<W>(
+ stream: WritableStreamImpl<W>
+): boolean {
+ if (
+ stream[sym.inFlightWriteRequest] === undefined &&
+ stream[sym.inFlightCloseRequest] === undefined
+ ) {
+ return false;
+ }
+ return true;
+}
+
+function writableStreamMarkCloseRequestInFlight<W>(
+ stream: WritableStreamImpl<W>
+): void {
+ assert(stream[sym.inFlightCloseRequest] === undefined);
+ assert(stream[sym.closeRequest] !== undefined);
+ stream[sym.inFlightCloseRequest] = stream[sym.closeRequest];
+ stream[sym.closeRequest] = undefined;
+}
+
+function writableStreamMarkFirstWriteRequestInFlight<W>(
+ stream: WritableStreamImpl<W>
+): void {
+ assert(stream[sym.inFlightWriteRequest] === undefined);
+ assert(stream[sym.writeRequests].length);
+ const writeRequest = stream[sym.writeRequests].shift();
+ stream[sym.inFlightWriteRequest] = writeRequest;
+}
+
+function writableStreamRejectCloseAndClosedPromiseIfNeeded<W>(
+ stream: WritableStreamImpl<W>
+): void {
+ assert(stream[sym.state] === "errored");
+ if (stream[sym.closeRequest]) {
+ assert(stream[sym.inFlightCloseRequest] === undefined);
+ stream[sym.closeRequest]!.reject!(stream[sym.storedError]);
+ stream[sym.closeRequest] = undefined;
+ }
+ const writer = stream[sym.writer];
+ if (writer) {
+ writer[sym.closedPromise].reject!(stream[sym.storedError]);
+ setPromiseIsHandledToTrue(writer[sym.closedPromise].promise);
+ }
+}
+
+function writableStreamStartErroring<W>(
+ stream: WritableStreamImpl<W>,
+ reason: any
+): void {
+ assert(stream[sym.storedError] === undefined);
+ assert(stream[sym.state] === "writable");
+ const controller = stream[sym.writableStreamController];
+ assert(controller);
+ stream[sym.state] = "erroring";
+ stream[sym.storedError] = reason;
+ const writer = stream[sym.writer];
+ if (writer) {
+ writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, reason);
+ }
+ if (
+ !writableStreamHasOperationMarkedInFlight(stream) &&
+ controller[sym.started]
+ ) {
+ writableStreamFinishErroring(stream);
+ }
+}
+
+function writableStreamUpdateBackpressure<W>(
+ stream: WritableStreamImpl<W>,
+ backpressure: boolean
+): void {
+ assert(stream[sym.state] === "writable");
+ assert(!writableStreamCloseQueuedOrInFlight(stream));
+ const writer = stream[sym.writer];
+ if (writer && backpressure !== stream[sym.backpressure]) {
+ if (backpressure) {
+ writer[sym.readyPromise] = getDeferred();
+ } else {
+ assert(backpressure === false);
+ writer[sym.readyPromise].resolve!();
+ writer[sym.readyPromise].resolve = undefined;
+ writer[sym.readyPromise].reject = undefined;
+ }
+ }
+ stream[sym.backpressure] = backpressure;
+}
+
/* eslint-enable */