summaryrefslogtreecommitdiff
path: root/op_crates/fetch/11_streams.js
diff options
context:
space:
mode:
Diffstat (limited to 'op_crates/fetch/11_streams.js')
-rw-r--r--op_crates/fetch/11_streams.js5488
1 files changed, 2961 insertions, 2527 deletions
diff --git a/op_crates/fetch/11_streams.js b/op_crates/fetch/11_streams.js
index 0704465cc..6031fa3ef 100644
--- a/op_crates/fetch/11_streams.js
+++ b/op_crates/fetch/11_streams.js
@@ -1,118 +1,13 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
-// This code closely follows the WHATWG Stream Specification
-// See: https://streams.spec.whatwg.org/
-//
-// There are some parts that are not fully implemented, and there are some
-// comments which point to steps of the specification that are not implemented.
+// @ts-check
+/// <reference path="./11_streams_types.d.ts" />
+/// <reference path="./lib.deno_fetch.d.ts" />
+/// <reference lib="esnext" />
((window) => {
const customInspect = Symbol.for("Deno.customInspect");
- function cloneArrayBuffer(
- srcBuffer,
- srcByteOffset,
- srcLength,
- _cloneConstructor,
- ) {
- // this function fudges the return type but SharedArrayBuffer is disabled for a while anyway
- return srcBuffer.slice(
- srcByteOffset,
- srcByteOffset + srcLength,
- );
- }
-
- const objectCloneMemo = new WeakMap();
-
- /** Clone a value in a similar way to structured cloning. It is similar to a
- * StructureDeserialize(StructuredSerialize(...)). */
- function cloneValue(value) {
- switch (typeof value) {
- case "number":
- case "string":
- case "boolean":
- case "undefined":
- case "bigint":
- return value;
- case "object": {
- if (objectCloneMemo.has(value)) {
- return objectCloneMemo.get(value);
- }
- if (value === null) {
- return value;
- }
- if (value instanceof Date) {
- return new Date(value.valueOf());
- }
- if (value instanceof RegExp) {
- return new RegExp(value);
- }
- if (value instanceof SharedArrayBuffer) {
- return value;
- }
- if (value instanceof ArrayBuffer) {
- const cloned = cloneArrayBuffer(
- value,
- 0,
- value.byteLength,
- ArrayBuffer,
- );
- objectCloneMemo.set(value, cloned);
- return cloned;
- }
- if (ArrayBuffer.isView(value)) {
- const clonedBuffer = cloneValue(value.buffer);
- // Use DataViewConstructor type purely for type-checking, can be a
- // DataView or TypedArray. They use the same constructor signature,
- // only DataView has a length in bytes and TypedArrays use a length in
- // terms of elements, so we adjust for that.
- let length;
- if (value instanceof DataView) {
- length = value.byteLength;
- } else {
- length = value.length;
- }
- return new (value.constructor)(
- clonedBuffer,
- value.byteOffset,
- length,
- );
- }
- if (value instanceof Map) {
- const clonedMap = new Map();
- objectCloneMemo.set(value, clonedMap);
- value.forEach((v, k) => {
- clonedMap.set(cloneValue(k), cloneValue(v));
- });
- return clonedMap;
- }
- if (value instanceof Set) {
- const clonedSet = new Set([...value].map(cloneValue));
- objectCloneMemo.set(value, clonedSet);
- return clonedSet;
- }
-
- const clonedObj = {};
- objectCloneMemo.set(value, clonedObj);
- const sourceKeys = Object.getOwnPropertyNames(value);
- for (const key of sourceKeys) {
- clonedObj[key] = cloneValue(value[key]);
- }
- Reflect.setPrototypeOf(clonedObj, Reflect.getPrototypeOf(value));
- return clonedObj;
- }
- case "symbol":
- case "function":
- // fallthrough
- default:
- throw new DOMException("Uncloneable value in stream", "DataCloneError");
- }
- }
-
- function setFunctionName(fn, value) {
- Object.defineProperty(fn, "name", { value, configurable: true });
- }
-
class AssertionError extends Error {
constructor(msg) {
super(msg);
@@ -120,973 +15,368 @@
}
}
+ /**
+ * @param {unknown} cond
+ * @param {string=} msg
+ * @returns {asserts cond}
+ */
function assert(cond, msg = "Assertion failed.") {
if (!cond) {
throw new AssertionError(msg);
}
}
- const sym = {
- abortAlgorithm: Symbol("abortAlgorithm"),
- abortSteps: Symbol("abortSteps"),
- asyncIteratorReader: Symbol("asyncIteratorReader"),
- autoAllocateChunkSize: Symbol("autoAllocateChunkSize"),
- backpressure: Symbol("backpressure"),
- backpressureChangePromise: Symbol("backpressureChangePromise"),
- byobRequest: Symbol("byobRequest"),
- cancelAlgorithm: Symbol("cancelAlgorithm"),
- cancelSteps: Symbol("cancelSteps"),
- closeAlgorithm: Symbol("closeAlgorithm"),
- closedPromise: Symbol("closedPromise"),
- closeRequest: Symbol("closeRequest"),
- closeRequested: Symbol("closeRequested"),
- controlledReadableByteStream: Symbol(
- "controlledReadableByteStream",
- ),
- controlledReadableStream: Symbol("controlledReadableStream"),
- controlledTransformStream: Symbol("controlledTransformStream"),
- controlledWritableStream: Symbol("controlledWritableStream"),
- disturbed: Symbol("disturbed"),
- errorSteps: Symbol("errorSteps"),
- flushAlgorithm: Symbol("flushAlgorithm"),
- forAuthorCode: Symbol("forAuthorCode"),
- inFlightWriteRequest: Symbol("inFlightWriteRequest"),
- inFlightCloseRequest: Symbol("inFlightCloseRequest"),
- isFakeDetached: Symbol("isFakeDetached"),
- ownerReadableStream: Symbol("ownerReadableStream"),
- ownerWritableStream: Symbol("ownerWritableStream"),
- pendingAbortRequest: Symbol("pendingAbortRequest"),
- preventCancel: Symbol("preventCancel"),
- pullAgain: Symbol("pullAgain"),
- pullAlgorithm: Symbol("pullAlgorithm"),
- pulling: Symbol("pulling"),
- pullSteps: Symbol("pullSteps"),
- queue: Symbol("queue"),
- queueTotalSize: Symbol("queueTotalSize"),
- readable: Symbol("readable"),
- readableStreamController: Symbol("readableStreamController"),
- reader: Symbol("reader"),
- readRequests: Symbol("readRequests"),
- readyPromise: Symbol("readyPromise"),
- started: Symbol("started"),
- state: Symbol("state"),
- storedError: Symbol("storedError"),
- strategyHWM: Symbol("strategyHWM"),
- strategySizeAlgorithm: Symbol("strategySizeAlgorithm"),
- transformAlgorithm: Symbol("transformAlgorithm"),
- transformStreamController: Symbol("transformStreamController"),
- writableStreamController: Symbol("writableStreamController"),
- writeAlgorithm: Symbol("writeAlgorithm"),
- writable: Symbol("writable"),
- writer: Symbol("writer"),
- writeRequests: Symbol("writeRequests"),
- };
- class ReadableByteStreamController {
- constructor() {
- throw new TypeError(
- "ReadableByteStreamController's constructor cannot be called.",
- );
- }
-
- get byobRequest() {
- return undefined;
- }
-
- get desiredSize() {
- if (!isReadableByteStreamController(this)) {
- throw new TypeError("Invalid ReadableByteStreamController.");
- }
- return readableByteStreamControllerGetDesiredSize(this);
- }
-
- close() {
- if (!isReadableByteStreamController(this)) {
- throw new TypeError("Invalid ReadableByteStreamController.");
- }
- if (this[sym.closeRequested]) {
- throw new TypeError("Closed already requested.");
- }
- if (this[sym.controlledReadableByteStream][sym.state] !== "readable") {
- throw new TypeError(
- "ReadableByteStreamController's stream is not in a readable state.",
- );
- }
- readableByteStreamControllerClose(this);
- }
-
- enqueue(chunk) {
- if (!isReadableByteStreamController(this)) {
- throw new TypeError("Invalid ReadableByteStreamController.");
- }
- if (this[sym.closeRequested]) {
- throw new TypeError("Closed already requested.");
- }
- if (this[sym.controlledReadableByteStream][sym.state] !== "readable") {
- throw new TypeError(
- "ReadableByteStreamController's stream is not in a readable state.",
- );
- }
- if (!ArrayBuffer.isView(chunk)) {
- throw new TypeError(
- "You can only enqueue array buffer views when using a ReadableByteStreamController",
- );
- }
- if (isDetachedBuffer(chunk.buffer)) {
- throw new TypeError(
- "Cannot enqueue a view onto a detached ArrayBuffer",
- );
- }
- readableByteStreamControllerEnqueue(this, chunk);
- }
-
- error(error) {
- if (!isReadableByteStreamController(this)) {
- throw new TypeError("Invalid ReadableByteStreamController.");
- }
- readableByteStreamControllerError(this, error);
- }
-
- [sym.cancelSteps](reason) {
- // 3.11.5.1.1 If this.[[pendingPullIntos]] is not empty,
- resetQueue(this);
- const result = this[sym.cancelAlgorithm](reason);
- readableByteStreamControllerClearAlgorithms(this);
- return result;
- }
-
- [sym.pullSteps]() {
- const stream = this[sym.controlledReadableByteStream];
- assert(readableStreamHasDefaultReader(stream));
- if (this[sym.queueTotalSize] > 0) {
- assert(readableStreamGetNumReadRequests(stream) === 0);
- const entry = this[sym.queue].shift();
- assert(entry);
- this[sym.queueTotalSize] -= entry.size;
- readableByteStreamControllerHandleQueueDrain(this);
- const view = new Uint8Array(entry.value, entry.offset, entry.size);
- return Promise.resolve(
- readableStreamCreateReadResult(
- view,
- false,
- stream[sym.reader][sym.forAuthorCode],
- ),
- );
- }
- // 3.11.5.2.5 If autoAllocateChunkSize is not undefined,
- const promise = readableStreamAddReadRequest(stream);
- readableByteStreamControllerCallPullIfNeeded(this);
- return promise;
- }
-
- [customInspect]() {
- return `${this.constructor.name} { byobRequest: ${
- String(this.byobRequest)
- }, desiredSize: ${String(this.desiredSize)} }`;
- }
- }
+ /** @template T */
+ class Deferred {
+ /** @type {Promise<T>} */
+ #promise;
+ /** @type {(reject?: any) => void} */
+ #reject;
+ /** @type {(value: T | PromiseLike<T>) => void} */
+ #resolve;
+ /** @type {"pending" | "fulfilled"} */
+ #state = "pending";
- class ReadableStreamDefaultController {
constructor() {
- throw new TypeError(
- "ReadableStreamDefaultController's constructor cannot be called.",
- );
+ this.#promise = new Promise((resolve, reject) => {
+ this.#resolve = resolve;
+ this.#reject = reject;
+ });
}
- get desiredSize() {
- if (!isReadableStreamDefaultController(this)) {
- throw new TypeError("Invalid ReadableStreamDefaultController.");
- }
- return readableStreamDefaultControllerGetDesiredSize(this);
+ /** @returns {Promise<T>} */
+ get promise() {
+ return this.#promise;
}
- close() {
- if (!isReadableStreamDefaultController(this)) {
- throw new TypeError("Invalid ReadableStreamDefaultController.");
- }
- if (!readableStreamDefaultControllerCanCloseOrEnqueue(this)) {
- throw new TypeError(
- "ReadableStreamDefaultController cannot close or enqueue.",
- );
- }
- readableStreamDefaultControllerClose(this);
+ /** @returns {"pending" | "fulfilled"} */
+ get state() {
+ return this.#state;
}
- enqueue(chunk) {
- if (!isReadableStreamDefaultController(this)) {
- throw new TypeError("Invalid ReadableStreamDefaultController.");
- }
- if (!readableStreamDefaultControllerCanCloseOrEnqueue(this)) {
- throw new TypeError("ReadableSteamController cannot enqueue.");
+ /** @param {any=} reason */
+ reject(reason) {
+ // already settled promises are a no-op
+ if (this.#state !== "pending") {
+ return;
}
- return readableStreamDefaultControllerEnqueue(this, chunk);
+ this.#state = "fulfilled";
+ this.#reject(reason);
}
- error(error) {
- if (!isReadableStreamDefaultController(this)) {
- throw new TypeError("Invalid ReadableStreamDefaultController.");
+ /** @param {T | PromiseLike<T>} value */
+ resolve(value) {
+ // already settled promises are a no-op
+ if (this.#state !== "pending") {
+ return;
}
- readableStreamDefaultControllerError(this, error);
+ this.#state = "fulfilled";
+ this.#resolve(value);
}
+ }
- [sym.cancelSteps](reason) {
- resetQueue(this);
- const result = this[sym.cancelAlgorithm](reason);
- readableStreamDefaultControllerClearAlgorithms(this);
- return result;
+ /**
+ * @param {(...args: any[]) => any} fn
+ * @param {boolean} enforcePromise
+ * @returns {(...args: any[]) => any}
+ */
+ function reflectApply(fn, enforcePromise) {
+ if (typeof fn !== "function") {
+ throw new TypeError("The property must be a function.");
}
-
- [sym.pullSteps]() {
- const stream = this[sym.controlledReadableStream];
- if (this[sym.queue].length) {
- const chunk = dequeueValue(this);
- if (this[sym.closeRequested] && this[sym.queue].length === 0) {
- readableStreamDefaultControllerClearAlgorithms(this);
- readableStreamClose(stream);
- } else {
- readableStreamDefaultControllerCallPullIfNeeded(this);
+ return function (...args) {
+ if (enforcePromise) {
+ try {
+ return resolvePromiseWith(Reflect.apply(fn, this, args));
+ } catch (err) {
+ return Promise.reject(err);
}
- return Promise.resolve(
- readableStreamCreateReadResult(
- chunk,
- false,
- stream[sym.reader][sym.forAuthorCode],
- ),
- );
}
- const pendingPromise = readableStreamAddReadRequest(stream);
- readableStreamDefaultControllerCallPullIfNeeded(this);
- return pendingPromise;
- }
-
- [customInspect]() {
- return `${this.constructor.name} { desiredSize: ${
- String(this.desiredSize)
- } }`;
- }
+ return Reflect.apply(fn, this, args);
+ };
}
- class ReadableStreamDefaultReader {
- constructor(stream) {
- if (!isReadableStream(stream)) {
- throw new TypeError("stream is not a ReadableStream.");
- }
- if (isReadableStreamLocked(stream)) {
- throw new TypeError("stream is locked.");
- }
- readableStreamReaderGenericInitialize(this, stream);
- this[sym.readRequests] = [];
+ /**
+ * @template I
+ * @template O
+ * @param {Transformer<I, O>} transformer
+ * @returns {Transformer<I, O>}
+ */
+ function convertTransformer(transformer) {
+ const transformerDict = Object.create(null);
+ if (transformer === null) {
+ return transformerDict;
}
-
- get closed() {
- if (!isReadableStreamDefaultReader(this)) {
- return Promise.reject(
- new TypeError("Invalid ReadableStreamDefaultReader."),
- );
- }
- return (
- this[sym.closedPromise].promise ??
- Promise.reject(new TypeError("Invalid reader."))
- );
+ if ("flush" in transformer) {
+ transformerDict.flush = reflectApply(transformer.flush, true);
}
-
- cancel(reason) {
- if (!isReadableStreamDefaultReader(this)) {
- return Promise.reject(
- new TypeError("Invalid ReadableStreamDefaultReader."),
- );
- }
- if (!this[sym.ownerReadableStream]) {
- return Promise.reject(new TypeError("Invalid reader."));
- }
- return readableStreamReaderGenericCancel(this, reason);
+ if ("readableType" in transformer) {
+ transformerDict.readableType = transformer.readableType;
}
-
- read() {
- if (!isReadableStreamDefaultReader(this)) {
- return Promise.reject(
- new TypeError("Invalid ReadableStreamDefaultReader."),
- );
- }
- if (!this[sym.ownerReadableStream]) {
- return Promise.reject(new TypeError("Invalid reader."));
- }
- return readableStreamDefaultReaderRead(this);
+ if ("start" in transformer) {
+ transformerDict.start = reflectApply(transformer.start, false);
}
-
- releaseLock() {
- if (!isReadableStreamDefaultReader(this)) {
- throw new TypeError("Invalid ReadableStreamDefaultReader.");
- }
- if (this[sym.ownerReadableStream] === undefined) {
- return;
- }
- if (this[sym.readRequests].length) {
- throw new TypeError("Cannot release lock with pending read requests.");
- }
- readableStreamReaderGenericRelease(this);
+ if ("transform" in transformer) {
+ transformerDict.transform = reflectApply(transformer.transform, true);
}
-
- [customInspect]() {
- return `${this.constructor.name} { closed: Promise }`;
+ if ("writableType" in transformer) {
+ transformerDict.writableType = transformer.writableType;
}
+ return transformerDict;
}
- const AsyncIteratorPrototype = Object
- .getPrototypeOf(Object.getPrototypeOf(async function* () {}).prototype);
-
- const ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf({
- next() {
- if (!isReadableStreamAsyncIterator(this)) {
- return Promise.reject(
- new TypeError("invalid ReadableStreamAsyncIterator."),
- );
- }
- const reader = this[sym.asyncIteratorReader];
- if (!reader[sym.ownerReadableStream]) {
- return Promise.reject(
- new TypeError("reader owner ReadableStream is undefined."),
- );
- }
- return readableStreamDefaultReaderRead(reader).then((result) => {
- assert(typeof result === "object");
- const { done } = result;
- assert(typeof done === "boolean");
- if (done) {
- readableStreamReaderGenericRelease(reader);
- }
- const { value } = result;
- return readableStreamCreateReadResult(value, done, true);
- });
- },
- return(
- value,
- ) {
- if (!isReadableStreamAsyncIterator(this)) {
- return Promise.reject(
- new TypeError("invalid ReadableStreamAsyncIterator."),
- );
- }
- const reader = this[sym.asyncIteratorReader];
- if (!reader[sym.ownerReadableStream]) {
- return Promise.reject(
- new TypeError("reader owner ReadableStream is undefined."),
- );
- }
- if (reader[sym.readRequests].length) {
- return Promise.reject(
- new TypeError("reader has outstanding read requests."),
- );
- }
- if (!this[sym.preventCancel]) {
- const result = readableStreamReaderGenericCancel(reader, value);
- readableStreamReaderGenericRelease(reader);
- return result.then(() =>
- readableStreamCreateReadResult(value, true, true)
- );
- }
- readableStreamReaderGenericRelease(reader);
- return Promise.resolve(
- readableStreamCreateReadResult(value, true, true),
- );
- },
- }, AsyncIteratorPrototype);
-
- class ReadableStream {
- constructor(
- underlyingSource = {},
- strategy = {},
- ) {
- initializeReadableStream(this);
- const { size } = strategy;
- let { highWaterMark } = strategy;
- const { type } = underlyingSource;
-
- if (underlyingSource.type == "bytes") {
- if (size !== undefined) {
- throw new RangeError(
- `When underlying source is "bytes", strategy.size must be undefined.`,
- );
- }
- highWaterMark = validateAndNormalizeHighWaterMark(highWaterMark ?? 0);
- setUpReadableByteStreamControllerFromUnderlyingSource(
- this,
- underlyingSource,
- highWaterMark,
- );
- } else if (type === undefined) {
- const sizeAlgorithm = makeSizeAlgorithmFromSizeFunction(size);
- highWaterMark = validateAndNormalizeHighWaterMark(highWaterMark ?? 1);
- setUpReadableStreamDefaultControllerFromUnderlyingSource(
- this,
- underlyingSource,
- highWaterMark,
- sizeAlgorithm,
- );
- } else {
- throw new RangeError(
- `Valid values for underlyingSource are "bytes" or undefined. Received: "${type}".`,
- );
- }
+ /**
+ * @template W
+ * @param {UnderlyingSink<W>} underlyingSink
+ * @returns {UnderlyingSink<W>}
+ */
+ function convertUnderlyingSink(underlyingSink) {
+ const underlyingSinkDict = Object.create(null);
+ if (underlyingSink === null) {
+ return underlyingSinkDict;
}
-
- get locked() {
- if (!isReadableStream(this)) {
- throw new TypeError("Invalid ReadableStream.");
- }
- return isReadableStreamLocked(this);
+ if ("abort" in underlyingSink) {
+ underlyingSinkDict.abort = reflectApply(underlyingSink.abort, true);
}
-
- cancel(reason) {
- if (!isReadableStream(this)) {
- return Promise.reject(new TypeError("Invalid ReadableStream."));
- }
- if (isReadableStreamLocked(this)) {
- return Promise.reject(
- new TypeError("Cannot cancel a locked ReadableStream."),
- );
- }
- return readableStreamCancel(this, reason);
+ if ("close" in underlyingSink) {
+ underlyingSinkDict.close = reflectApply(underlyingSink.close, true);
}
-
- getIterator({
- preventCancel,
- } = {}) {
- if (!isReadableStream(this)) {
- throw new TypeError("Invalid ReadableStream.");
- }
- const reader = acquireReadableStreamDefaultReader(this);
- const iterator = Object.create(ReadableStreamAsyncIteratorPrototype);
- iterator[sym.asyncIteratorReader] = reader;
- iterator[sym.preventCancel] = Boolean(preventCancel);
- return iterator;
+ if ("start" in underlyingSink) {
+ underlyingSinkDict.start = reflectApply(underlyingSink.start, false);
}
-
- getReader({ mode } = {}) {
- if (!isReadableStream(this)) {
- throw new TypeError("Invalid ReadableStream.");
- }
- if (mode === undefined) {
- return acquireReadableStreamDefaultReader(this, true);
- }
- mode = String(mode);
- // 3.2.5.4.4 If mode is "byob", return ? AcquireReadableStreamBYOBReader(this, true).
- throw new RangeError(`Unsupported mode "${mode}"`);
+ if (underlyingSink.type) {
+ underlyingSinkDict.type = underlyingSink.type;
}
-
- pipeThrough(
- {
- writable,
- readable,
- },
- { preventClose, preventAbort, preventCancel, signal } = {},
- ) {
- if (!isReadableStream(this)) {
- throw new TypeError("Invalid ReadableStream.");
- }
- if (!isWritableStream(writable)) {
- throw new TypeError("writable is not a valid WritableStream.");
- }
- if (!isReadableStream(readable)) {
- throw new TypeError("readable is not a valid ReadableStream.");
- }
- preventClose = Boolean(preventClose);
- preventAbort = Boolean(preventAbort);
- preventCancel = Boolean(preventCancel);
- if (signal && !(signal instanceof AbortSignal)) {
- throw new TypeError("Invalid signal.");
- }
- if (isReadableStreamLocked(this)) {
- throw new TypeError("ReadableStream is locked.");
- }
- if (isWritableStreamLocked(writable)) {
- throw new TypeError("writable is locked.");
- }
- const promise = readableStreamPipeTo(
- this,
- writable,
- preventClose,
- preventAbort,
- preventCancel,
- signal,
- );
- setPromiseIsHandledToTrue(promise);
- return readable;
- }
-
- pipeTo(
- dest,
- { preventClose, preventAbort, preventCancel, signal } = {},
- ) {
- if (!isReadableStream(this)) {
- return Promise.reject(new TypeError("Invalid ReadableStream."));
- }
- if (!isWritableStream(dest)) {
- return Promise.reject(
- new TypeError("dest is not a valid WritableStream."),
- );
- }
- preventClose = Boolean(preventClose);
- preventAbort = Boolean(preventAbort);
- preventCancel = Boolean(preventCancel);
- if (signal && !(signal instanceof AbortSignal)) {
- return Promise.reject(new TypeError("Invalid signal."));
- }
- if (isReadableStreamLocked(this)) {
- return Promise.reject(new TypeError("ReadableStream is locked."));
- }
- if (isWritableStreamLocked(dest)) {
- return Promise.reject(new TypeError("dest is locked."));
- }
- return readableStreamPipeTo(
- this,
- dest,
- preventClose,
- preventAbort,
- preventCancel,
- signal,
- );
- }
-
- tee() {
- if (!isReadableStream(this)) {
- throw new TypeError("Invalid ReadableStream.");
- }
- return readableStreamTee(this, false);
- }
-
- [customInspect]() {
- return `${this.constructor.name} { locked: ${String(this.locked)} }`;
- }
-
- [Symbol.asyncIterator](
- options = {},
- ) {
- return this.getIterator(options);
+ if ("write" in underlyingSink) {
+ underlyingSinkDict.write = reflectApply(underlyingSink.write, true);
}
+ return underlyingSinkDict;
}
- class TransformStream {
- constructor(
- transformer = {},
- writableStrategy = {},
- readableStrategy = {},
- ) {
- const writableSizeFunction = writableStrategy.size;
- let writableHighWaterMark = writableStrategy.highWaterMark;
- const readableSizeFunction = readableStrategy.size;
- let readableHighWaterMark = readableStrategy.highWaterMark;
- const writableType = transformer.writableType;
- if (writableType !== undefined) {
- throw new RangeError(
- `Expected transformer writableType to be undefined, received "${
- String(writableType)
- }"`,
- );
- }
- const writableSizeAlgorithm = makeSizeAlgorithmFromSizeFunction(
- writableSizeFunction,
- );
- if (writableHighWaterMark === undefined) {
- writableHighWaterMark = 1;
- }
- writableHighWaterMark = validateAndNormalizeHighWaterMark(
- writableHighWaterMark,
- );
- const readableType = transformer.readableType;
- if (readableType !== undefined) {
- throw new RangeError(
- `Expected transformer readableType to be undefined, received "${
- String(readableType)
- }"`,
- );
- }
- const readableSizeAlgorithm = makeSizeAlgorithmFromSizeFunction(
- readableSizeFunction,
- );
- if (readableHighWaterMark === undefined) {
- readableHighWaterMark = 1;
- }
- readableHighWaterMark = validateAndNormalizeHighWaterMark(
- readableHighWaterMark,
- );
- const startPromise = getDeferred();
- initializeTransformStream(
- this,
- startPromise.promise,
- writableHighWaterMark,
- writableSizeAlgorithm,
- readableHighWaterMark,
- readableSizeAlgorithm,
- );
- // the brand check expects this, and the brand check occurs in the following
- // but the property hasn't been defined.
- Object.defineProperty(this, sym.transformStreamController, {
- value: undefined,
- writable: true,
- configurable: true,
- });
- setUpTransformStreamDefaultControllerFromTransformer(this, transformer);
- const startResult = invokeOrNoop(
- transformer,
- "start",
- this[sym.transformStreamController],
- );
- startPromise.resolve(startResult);
+ /**
+ * @template R
+ * @param {UnderlyingSource<R>} underlyingSource
+ * @returns {UnderlyingSource<R>}
+ */
+ function convertUnderlyingSource(underlyingSource) {
+ const underlyingSourceDict = Object.create(null);
+ if (underlyingSource === null) {
+ throw new TypeError("Underlying source cannot be null");
}
-
- get readable() {
- if (!isTransformStream(this)) {
- throw new TypeError("Invalid TransformStream.");
- }
- return this[sym.readable];
- }
-
- get writable() {
- if (!isTransformStream(this)) {
- throw new TypeError("Invalid TransformStream.");
- }
- return this[sym.writable];
- }
-
- [customInspect]() {
- return this.constructor.name;
+ if (underlyingSource === undefined) {
+ return underlyingSourceDict;
}
- }
-
- class TransformStreamDefaultController {
- constructor() {
- throw new TypeError(
- "TransformStreamDefaultController's constructor cannot be called.",
- );
+ if ("cancel" in underlyingSource) {
+ underlyingSourceDict.cancel = reflectApply(underlyingSource.cancel, true);
}
-
- get desiredSize() {
- if (!isTransformStreamDefaultController(this)) {
- throw new TypeError("Invalid TransformStreamDefaultController.");
- }
- const readableController = this[sym.controlledTransformStream][
- sym.readable
- ][sym.readableStreamController];
- return readableStreamDefaultControllerGetDesiredSize(
- readableController,
- );
+ if ("pull" in underlyingSource) {
+ underlyingSourceDict.pull = reflectApply(underlyingSource.pull, true);
}
-
- enqueue(chunk) {
- if (!isTransformStreamDefaultController(this)) {
- throw new TypeError("Invalid TransformStreamDefaultController.");
- }
- transformStreamDefaultControllerEnqueue(this, chunk);
+ if ("start" in underlyingSource) {
+ underlyingSourceDict.start = reflectApply(underlyingSource.start, false);
}
-
- error(reason) {
- if (!isTransformStreamDefaultController(this)) {
- throw new TypeError("Invalid TransformStreamDefaultController.");
+ if (underlyingSource.type !== undefined) {
+ if (underlyingSourceDict.type === null) {
+ throw new TypeError("type cannot be null");
}
- transformStreamDefaultControllerError(this, reason);
- }
-
- terminate() {
- if (!isTransformStreamDefaultController(this)) {
- throw new TypeError("Invalid TransformStreamDefaultController.");
+ const type = String(underlyingSource.type);
+ if (type !== "bytes") {
+ throw new TypeError("invalid underlying source type");
}
- transformStreamDefaultControllerTerminate(this);
- }
-
- [customInspect]() {
- return `${this.constructor.name} { desiredSize: ${
- String(this.desiredSize)
- } }`;
+ underlyingSourceDict.type = type;
}
+ return underlyingSourceDict;
}
- class WritableStreamDefaultController {
- constructor() {
- throw new TypeError(
- "WritableStreamDefaultController's constructor cannot be called.",
- );
- }
-
- error(e) {
- if (!isWritableStreamDefaultController(this)) {
- throw new TypeError("Invalid WritableStreamDefaultController.");
- }
- const state = this[sym.controlledWritableStream][sym.state];
- if (state !== "writable") {
- return;
- }
- writableStreamDefaultControllerError(this, e);
- }
-
- [sym.abortSteps](reason) {
- const result = this[sym.abortAlgorithm](reason);
- writableStreamDefaultControllerClearAlgorithms(this);
- return result;
- }
+ const originalPromise = Promise;
+ const originalPromiseThen = Promise.prototype.then;
- [sym.errorSteps]() {
- resetQueue(this);
- }
-
- [customInspect]() {
- return `${this.constructor.name} { }`;
- }
+ /**
+ * @template T
+ * @template TResult1
+ * @template TResult2
+ * @param {Promise<T>} promise
+ * @param {(value: T) => TResult1 | PromiseLike<TResult1>} onFulfilled
+ * @param {(reason: any) => TResult2 | PromiseLike<TResult2>=} onRejected
+ * @returns {Promise<TResult1 | TResult2>}
+ */
+ function performPromiseThen(promise, onFulfilled, onRejected) {
+ return originalPromiseThen.call(promise, onFulfilled, onRejected);
}
- class WritableStreamDefaultWriter {
- constructor(stream) {
- if (!isWritableStream(stream)) {
- throw new TypeError("Invalid stream.");
- }
- if (isWritableStreamLocked(stream)) {
- throw new TypeError("Cannot create a writer for a locked stream.");
- }
- this[sym.ownerWritableStream] = stream;
- stream[sym.writer] = this;
- const state = stream[sym.state];
- if (state === "writable") {
- if (
- !writableStreamCloseQueuedOrInFlight(stream) &&
- stream[sym.backpressure]
- ) {
- this[sym.readyPromise] = getDeferred();
- } else {
- this[sym.readyPromise] = { promise: Promise.resolve() };
- }
- this[sym.closedPromise] = getDeferred();
- } else if (state === "erroring") {
- this[sym.readyPromise] = {
- promise: Promise.reject(stream[sym.storedError]),
- };
- setPromiseIsHandledToTrue(this[sym.readyPromise].promise);
- this[sym.closedPromise] = getDeferred();
- } else if (state === "closed") {
- this[sym.readyPromise] = { promise: Promise.resolve() };
- this[sym.closedPromise] = { promise: Promise.resolve() };
- } else {
- assert(state === "errored");
- const storedError = stream[sym.storedError];
- this[sym.readyPromise] = { promise: Promise.reject(storedError) };
- setPromiseIsHandledToTrue(this[sym.readyPromise].promise);
- this[sym.closedPromise] = { promise: Promise.reject(storedError) };
- setPromiseIsHandledToTrue(this[sym.closedPromise].promise);
- }
- }
-
- get closed() {
- if (!isWritableStreamDefaultWriter(this)) {
- return Promise.reject(
- new TypeError("Invalid WritableStreamDefaultWriter."),
- );
- }
- return this[sym.closedPromise].promise;
- }
-
- get desiredSize() {
- if (!isWritableStreamDefaultWriter(this)) {
- throw new TypeError("Invalid WritableStreamDefaultWriter.");
- }
- if (!this[sym.ownerWritableStream]) {
- throw new TypeError("WritableStreamDefaultWriter has no owner.");
- }
- return writableStreamDefaultWriterGetDesiredSize(this);
- }
-
- get ready() {
- if (!isWritableStreamDefaultWriter(this)) {
- return Promise.reject(
- new TypeError("Invalid WritableStreamDefaultWriter."),
- );
- }
- return this[sym.readyPromise].promise;
- }
-
- abort(reason) {
- if (!isWritableStreamDefaultWriter(this)) {
- return Promise.reject(
- new TypeError("Invalid WritableStreamDefaultWriter."),
- );
- }
- if (!this[sym.ownerWritableStream]) {
- Promise.reject(
- new TypeError("WritableStreamDefaultWriter has no owner."),
- );
- }
- return writableStreamDefaultWriterAbort(this, reason);
- }
-
- close() {
- if (!isWritableStreamDefaultWriter(this)) {
- return Promise.reject(
- new TypeError("Invalid WritableStreamDefaultWriter."),
- );
- }
- const stream = this[sym.ownerWritableStream];
- if (!stream) {
- Promise.reject(
- new TypeError("WritableStreamDefaultWriter has no owner."),
- );
- }
- if (writableStreamCloseQueuedOrInFlight(stream)) {
- Promise.reject(
- new TypeError("Stream is in an invalid state to be closed."),
- );
- }
- return writableStreamDefaultWriterClose(this);
- }
-
- releaseLock() {
- if (!isWritableStreamDefaultWriter(this)) {
- throw new TypeError("Invalid WritableStreamDefaultWriter.");
- }
- const stream = this[sym.ownerWritableStream];
- if (!stream) {
- return;
- }
- assert(stream[sym.writer]);
- writableStreamDefaultWriterRelease(this);
- }
-
- write(chunk) {
- if (!isWritableStreamDefaultWriter(this)) {
- return Promise.reject(
- new TypeError("Invalid WritableStreamDefaultWriter."),
- );
- }
- if (!this[sym.ownerWritableStream]) {
- Promise.reject(
- new TypeError("WritableStreamDefaultWriter has no owner."),
- );
- }
- return writableStreamDefaultWriterWrite(this, chunk);
- }
-
- [customInspect]() {
- return `${this.constructor.name} { closed: Promise, desiredSize: ${
- String(this.desiredSize)
- }, ready: Promise }`;
- }
+ /**
+ * @template T
+ * @param {T | PromiseLike<T>} value
+ * @returns {Promise<T>}
+ */
+ function resolvePromiseWith(value) {
+ return new originalPromise((resolve) => resolve(value));
}
- class WritableStream {
- constructor(
- underlyingSink = {},
- strategy = {},
- ) {
- initializeWritableStream(this);
- const size = strategy.size;
- let highWaterMark = strategy.highWaterMark ?? 1;
- const { type } = underlyingSink;
- if (type !== undefined) {
- throw new RangeError(`Sink type of "${String(type)}" not supported.`);
- }
- const sizeAlgorithm = makeSizeAlgorithmFromSizeFunction(size);
- highWaterMark = validateAndNormalizeHighWaterMark(highWaterMark);
- setUpWritableStreamDefaultControllerFromUnderlyingSink(
- this,
- underlyingSink,
- highWaterMark,
- sizeAlgorithm,
- );
- }
-
- get locked() {
- if (!isWritableStream(this)) {
- throw new TypeError("Invalid WritableStream.");
- }
- return isWritableStreamLocked(this);
- }
-
- abort(reason) {
- if (!isWritableStream(this)) {
- return Promise.reject(new TypeError("Invalid WritableStream."));
- }
- if (isWritableStreamLocked(this)) {
- return Promise.reject(
- new TypeError("Cannot abort a locked WritableStream."),
- );
- }
- return writableStreamAbort(this, reason);
- }
-
- close() {
- if (!isWritableStream(this)) {
- return Promise.reject(new TypeError("Invalid WritableStream."));
- }
- if (isWritableStreamLocked(this)) {
- return Promise.reject(
- new TypeError("Cannot abort a locked WritableStream."),
- );
- }
- if (writableStreamCloseQueuedOrInFlight(this)) {
- return Promise.reject(
- new TypeError("Cannot close an already closing WritableStream."),
- );
- }
- return writableStreamClose(this);
- }
-
- getWriter() {
- if (!isWritableStream(this)) {
- throw new TypeError("Invalid WritableStream.");
- }
- return acquireWritableStreamDefaultWriter(this);
- }
-
- [customInspect]() {
- return `${this.constructor.name} { locked: ${String(this.locked)} }`;
+ /** @param {any} e */
+ function rethrowAssertionErrorRejection(e) {
+ if (e && e instanceof AssertionError) {
+ queueMicrotask(() => {
+ console.error(`Internal Error: ${e.stack}`);
+ });
}
}
- function acquireReadableStreamDefaultReader(
- stream,
- forAuthorCode = false,
- ) {
- const reader = new ReadableStreamDefaultReader(stream);
- reader[sym.forAuthorCode] = forAuthorCode;
- return reader;
+ /** @param {Promise<any>} promise */
+ function setPromiseIsHandledToTrue(promise) {
+ performPromiseThen(promise, undefined, rethrowAssertionErrorRejection);
+ }
+
+ /**
+ * @template T
+ * @template TResult1
+ * @template TResult2
+ * @param {Promise<T>} promise
+ * @param {(value: T) => TResult1 | PromiseLike<TResult1>} fulfillmentHandler
+ * @param {(reason: any) => TResult2 | PromiseLike<TResult2>=} rejectionHandler
+ * @returns {Promise<TResult1 | TResult2>}
+ */
+ function transformPromiseWith(promise, fulfillmentHandler, rejectionHandler) {
+ return performPromiseThen(promise, fulfillmentHandler, rejectionHandler);
+ }
+
+ /**
+ * @template T
+ * @template TResult
+ * @param {Promise<T>} promise
+ * @param {(value: T) => TResult | PromiseLike<TResult>} onFulfilled
+ * @returns {void}
+ */
+ function uponFulfillment(promise, onFulfilled) {
+ uponPromise(promise, onFulfilled);
+ }
+
+ /**
+ * @template T
+ * @template TResult
+ * @param {Promise<T>} promise
+ * @param {(value: T) => TResult | PromiseLike<TResult>} onRejected
+ * @returns {void}
+ */
+ function uponRejection(promise, onRejected) {
+ uponPromise(promise, undefined, onRejected);
+ }
+
+ /**
+ * @template T
+ * @template TResult1
+ * @template TResult2
+ * @param {Promise<T>} promise
+ * @param {(value: T) => TResult1 | PromiseLike<TResult1>} onFulfilled
+ * @param {(reason: any) => TResult2 | PromiseLike<TResult2>=} onRejected
+ * @returns {void}
+ */
+ function uponPromise(promise, onFulfilled, onRejected) {
+ performPromiseThen(
+ performPromiseThen(promise, onFulfilled, onRejected),
+ undefined,
+ rethrowAssertionErrorRejection,
+ );
+ }
+
+ const isFakeDetached = Symbol("<<detached>>");
+
+ /**
+ * @param {ArrayBufferLike} O
+ * @returns {boolean}
+ */
+ function isDetachedBuffer(O) {
+ return isFakeDetached in O;
+ }
+
+ /**
+ * @param {ArrayBufferLike} O
+ * @returns {ArrayBufferLike}
+ */
+ function transferArrayBuffer(O) {
+ assert(!isDetachedBuffer(O));
+ const transferredIshVersion = O.slice(0);
+ Object.defineProperty(O, "byteLength", {
+ get() {
+ return 0;
+ },
+ });
+ O[isFakeDetached] = true;
+ return transferredIshVersion;
}
- function acquireWritableStreamDefaultWriter(
- stream,
- ) {
+ const _abortAlgorithm = Symbol("[[abortAlgorithm]]");
+ const _abortSteps = Symbol("[[AbortSteps]]");
+ const _autoAllocateChunkSize = Symbol("[[autoAllocateChunkSize]]");
+ const _backpressure = Symbol("[[backpressure]]");
+ const _backpressureChangePromise = Symbol("[[backpressureChangePromise]]");
+ const _byobRequest = Symbol("[[byobRequest]]");
+ const _cancelAlgorithm = Symbol("[[cancelAlgorithm]]");
+ const _cancelSteps = Symbol("[[CancelSteps]]");
+ const _close = Symbol("close sentinel");
+ const _closeAlgorithm = Symbol("[[closeAlgorithm]]");
+ const _closedPromise = Symbol("[[closedPromise]]");
+ const _closeRequest = Symbol("[[closeRequest]]");
+ const _closeRequested = Symbol("[[closeRequested]]");
+ const _controller = Symbol("[[controller]]");
+ const _detached = Symbol("[[Detached]]");
+ const _disturbed = Symbol("[[disturbed]]");
+ const _errorSteps = Symbol("[[ErrorSteps]]");
+ const _flushAlgorithm = Symbol("[[flushAlgorithm]]");
+ const _globalObject = Symbol("[[globalObject]]");
+ const _inFlightCloseRequest = Symbol("[[inFlightCloseRequest]]");
+ const _inFlightWriteRequest = Symbol("[[inFlightWriteRequest]]");
+ const _pendingAbortRequest = Symbol("[pendingAbortRequest]");
+ const _preventCancel = Symbol("[[preventCancel]]");
+ const _pullAgain = Symbol("[[pullAgain]]");
+ const _pullAlgorithm = Symbol("[[pullAlgorithm]]");
+ const _pulling = Symbol("[[pulling]]");
+ const _pullSteps = Symbol("[[PullSteps]]");
+ const _queue = Symbol("[[queue]]");
+ const _queueTotalSize = Symbol("[[queueTotalSize]]");
+ const _readable = Symbol("[[readable]]");
+ const _reader = Symbol("[[reader]]");
+ const _readRequests = Symbol("[[readRequests]]");
+ const _readyPromise = Symbol("[[readyPromise]]");
+ const _started = Symbol("[[started]]");
+ const _state = Symbol("[[state]]");
+ const _storedError = Symbol("[[storedError]]");
+ const _strategyHWM = Symbol("[[strategyHWM]]");
+ const _strategySizeAlgorithm = Symbol("[[strategySizeAlgorithm]]");
+ const _stream = Symbol("[[stream]]");
+ const _transformAlgorithm = Symbol("[[transformAlgorithm]]");
+ const _writable = Symbol("[[writable]]");
+ const _writeAlgorithm = Symbol("[[writeAlgorithm]]");
+ const _writer = Symbol("[[writer]]");
+ const _writeRequests = Symbol("[[writeRequests]]");
+
+ /**
+ * @template R
+ * @param {ReadableStream<R>} stream
+ * @returns {ReadableStreamDefaultReader<R>}
+ */
+ function acquireReadableStreamDefaultReader(stream) {
+ return new ReadableStreamDefaultReader(stream);
+ }
+
+ /**
+ * @template W
+ * @param {WritableStream<W>} stream
+ * @returns {WritableStreamDefaultWriter<W>}
+ */
+ function acquireWritableStreamDefaultWriter(stream) {
return new WritableStreamDefaultWriter(stream);
}
- function call(
- fn,
- v,
- args,
- ) {
- return Function.prototype.apply.call(fn, v, args);
- }
-
- function createAlgorithmFromUnderlyingMethod(
- underlyingObject,
- methodName,
- algoArgCount,
- ...extraArgs
- ) {
- const method = underlyingObject[methodName];
- if (method) {
- if (!isCallable(method)) {
- throw new TypeError("method is not callable");
- }
- if (algoArgCount === 0) {
- // deno-lint-ignore require-await
- return async () => call(method, underlyingObject, extraArgs);
- } else {
- // deno-lint-ignore require-await
- return async (arg) => {
- const fullArgs = [arg, ...extraArgs];
- return call(method, underlyingObject, fullArgs);
- };
- }
- }
- // deno-lint-ignore require-await
- return async () => undefined;
- }
-
+ /**
+ * @template R
+ * @param {() => void} startAlgorithm
+ * @param {() => Promise<void>} pullAlgorithm
+ * @param {(reason: any) => Promise<void>} cancelAlgorithm
+ * @param {number=} highWaterMark
+ * @param {((chunk: R) => number)=} sizeAlgorithm
+ * @returns {ReadableStream<R>}
+ */
function createReadableStream(
startAlgorithm,
pullAlgorithm,
@@ -1094,14 +384,11 @@
highWaterMark = 1,
sizeAlgorithm = () => 1,
) {
- highWaterMark = validateAndNormalizeHighWaterMark(highWaterMark);
- const stream = Object.create(
- ReadableStream.prototype,
- );
+ assert(isNonNegativeNumber(highWaterMark));
+ /** @type {ReadableStream} */
+ const stream = Object.create(ReadableStream.prototype);
initializeReadableStream(stream);
- const controller = Object.create(
- ReadableStreamDefaultController.prototype,
- );
+ const controller = Object.create(ReadableStreamDefaultController.prototype);
setUpReadableStreamDefaultController(
stream,
controller,
@@ -1114,20 +401,28 @@
return stream;
}
+ /**
+ * @template W
+ * @param {(controller: WritableStreamDefaultController<W>) => Promise<void>} startAlgorithm
+ * @param {(chunk: W) => Promise<void>} writeAlgorithm
+ * @param {() => Promise<void>} closeAlgorithm
+ * @param {(reason: any) => Promise<void>} abortAlgorithm
+ * @param {number} highWaterMark
+ * @param {(chunk: W) => number} sizeAlgorithm
+ * @returns {WritableStream<W>}
+ */
function createWritableStream(
startAlgorithm,
writeAlgorithm,
closeAlgorithm,
abortAlgorithm,
- highWaterMark = 1,
- sizeAlgorithm = () => 1,
+ highWaterMark,
+ sizeAlgorithm,
) {
- highWaterMark = validateAndNormalizeHighWaterMark(highWaterMark);
+ assert(isNonNegativeNumber(highWaterMark));
const stream = Object.create(WritableStream.prototype);
initializeWritableStream(stream);
- const controller = Object.create(
- WritableStreamDefaultController.prototype,
- );
+ const controller = Object.create(WritableStreamDefaultController.prototype);
setUpWritableStreamDefaultController(
stream,
controller,
@@ -1141,51 +436,92 @@
return stream;
}
+ /**
+ * @template T
+ * @param {{ [_queue]: Array<ValueWithSize<T>>, [_queueTotalSize]: number }} container
+ * @returns {T}
+ */
function dequeueValue(container) {
- assert(sym.queue in container && sym.queueTotalSize in container);
- assert(container[sym.queue].length);
- const pair = container[sym.queue].shift();
- container[sym.queueTotalSize] -= pair.size;
- if (container[sym.queueTotalSize] <= 0) {
- container[sym.queueTotalSize] = 0;
+ assert(_queue in container && _queueTotalSize in container);
+ assert(container[_queue].length);
+ const valueWithSize = container[_queue].shift();
+ container[_queueTotalSize] -= valueWithSize.size;
+ if (container[_queueTotalSize] < 0) {
+ container[_queueTotalSize] = 0;
+ }
+ return valueWithSize.value;
+ }
+
+ /**
+ * @template T
+ * @param {{ [_queue]: Array<ValueWithSize<T | _close>>, [_queueTotalSize]: number }} container
+ * @param {T} value
+ * @param {number} size
+ * @returns {void}
+ */
+ function enqueueValueWithSize(container, value, size) {
+ assert(_queue in container && _queueTotalSize in container);
+ if (isNonNegativeNumber(size) === false) {
+ throw RangeError("chunk size isn't a positive number");
+ }
+ if (size === Infinity) {
+ throw RangeError("chunk size is invalid");
+ }
+ container[_queue].push({ value, size });
+ container[_queueTotalSize] += size;
+ }
+
+ /**
+ * @param {QueuingStrategy} strategy
+ * @param {number} defaultHWM
+ */
+ function extractHighWaterMark(strategy, defaultHWM) {
+ if (!("highWaterMark" in strategy)) {
+ return defaultHWM;
+ }
+ const highWaterMark = Number(strategy.highWaterMark);
+ if (Number.isNaN(highWaterMark) || highWaterMark < 0) {
+ throw RangeError(
+ `Expected highWaterMark to be a positive number or Infinity, got "${highWaterMark}".`,
+ );
}
- return pair.value;
- }
-
- function enqueueValueWithSize(
- container,
- value,
- size,
- ) {
- assert(sym.queue in container && sym.queueTotalSize in container);
- size = Number(size);
- if (!isFiniteNonNegativeNumber(size)) {
- throw new RangeError("size must be a finite non-negative number.");
- }
- container[sym.queue].push({ value, size });
- container[sym.queueTotalSize] += size;
- }
-
- /** Non-spec mechanism to "unwrap" a promise and store it to be resolved
- * later. */
- function getDeferred() {
- let resolve;
- let reject;
- const promise = new Promise((res, rej) => {
- resolve = res;
- reject = rej;
- });
- return { promise, resolve: resolve, reject: reject };
+ return highWaterMark;
}
- function initializeReadableStream(
- stream,
- ) {
- stream[sym.state] = "readable";
- stream[sym.reader] = stream[sym.storedError] = undefined;
- stream[sym.disturbed] = false;
- }
+ /**
+ * @template T
+ * @param {QueuingStrategy<T>} strategy
+ * @return {(chunk: T) => number}
+ */
+ function extractSizeAlgorithm(strategy) {
+ const { size } = strategy;
+ if (!size) {
+ return () => 1;
+ }
+ return (chunk) => size(chunk);
+ }
+
+ /**
+ * @param {ReadableStream} stream
+ * @returns {void}
+ */
+ function initializeReadableStream(stream) {
+ stream[_state] = "readable";
+ stream[_reader] = stream[_storedError] = undefined;
+ stream[_disturbed] = false;
+ }
+
+ /**
+ * @template I
+ * @template O
+ * @param {TransformStream<I, O>} stream
+ * @param {Deferred<void>} startPromise
+ * @param {number} writableHighWaterMark
+ * @param {(chunk: I) => number} writableSizeAlgorithm
+ * @param {number} readableHighWaterMark
+ * @param {(chunk: O) => number} readableSizeAlgorithm
+ */
function initializeTransformStream(
stream,
startPromise,
@@ -1194,14 +530,23 @@
readableHighWaterMark,
readableSizeAlgorithm,
) {
- const startAlgorithm = () => startPromise;
- const writeAlgorithm = (chunk) =>
- transformStreamDefaultSinkWriteAlgorithm(stream, chunk);
- const abortAlgorithm = (reason) =>
- transformStreamDefaultSinkAbortAlgorithm(stream, reason);
- const closeAlgorithm = () =>
- transformStreamDefaultSinkCloseAlgorithm(stream);
- stream[sym.writable] = createWritableStream(
+ function startAlgorithm() {
+ return startPromise.promise;
+ }
+
+ function writeAlgorithm(chunk) {
+ return transformStreamDefaultSinkWriteAlgorithm(stream, chunk);
+ }
+
+ function abortAlgorithm(reason) {
+ return transformStreamDefaultSinkAbortAlgorithm(stream, reason);
+ }
+
+ function closeAlgorithm() {
+ return transformStreamDefaultSinkCloseAlgorithm(stream);
+ }
+
+ stream[_writable] = createWritableStream(
startAlgorithm,
writeAlgorithm,
closeAlgorithm,
@@ -1209,235 +554,150 @@
writableHighWaterMark,
writableSizeAlgorithm,
);
- const pullAlgorithm = () =>
- transformStreamDefaultSourcePullAlgorithm(stream);
- const cancelAlgorithm = (reason) => {
+
+ function pullAlgorithm() {
+ return transformStreamDefaultSourcePullAlgorithm(stream);
+ }
+
+ function cancelAlgorithm(reason) {
transformStreamErrorWritableAndUnblockWrite(stream, reason);
- return Promise.resolve(undefined);
- };
- stream[sym.readable] = createReadableStream(
+ return resolvePromiseWith(undefined);
+ }
+
+ stream[_readable] = createReadableStream(
startAlgorithm,
pullAlgorithm,
cancelAlgorithm,
readableHighWaterMark,
readableSizeAlgorithm,
);
- stream[sym.backpressure] = stream[sym.backpressureChangePromise] =
- undefined;
- transformStreamSetBackpressure(stream, true);
- Object.defineProperty(stream, sym.transformStreamController, {
- value: undefined,
- configurable: true,
- });
- }
- function initializeWritableStream(
- stream,
- ) {
- stream[sym.state] = "writable";
- stream[sym.storedError] = stream[sym.writer] = stream[
- sym.writableStreamController
- ] = stream[sym.inFlightWriteRequest] = stream[sym.closeRequest] = stream[
- sym.inFlightCloseRequest
- ] = stream[sym.pendingAbortRequest] = undefined;
- stream[sym.writeRequests] = [];
- stream[sym.backpressure] = false;
- }
-
- function invokeOrNoop(
- o,
- p,
- ...args
- ) {
- assert(o);
- const method = o[p];
- if (!method) {
- return undefined;
+ stream[_backpressure] = stream[_backpressureChangePromise] = undefined;
+ transformStreamSetBackpressure(stream, true);
+ stream[_controller] = undefined;
+ }
+
+ /** @param {WritableStream} stream */
+ function initializeWritableStream(stream) {
+ stream[_state] = "writable";
+ stream[_storedError] = stream[_writer] = stream[_controller] =
+ stream[_inFlightWriteRequest] = stream[_closeRequest] =
+ stream[_inFlightCloseRequest] = stream[_pendingAbortRequest] =
+ undefined;
+ stream[_writeRequests] = [];
+ stream[_backpressure] = false;
+ }
+
+ /**
+ * @param {unknown} v
+ * @returns {v is number}
+ */
+ function isNonNegativeNumber(v) {
+ if (typeof v !== "number") {
+ return false;
}
- return call(method, o, args);
- }
-
- function isCallable(value) {
- return typeof value === "function";
- }
-
- function isDetachedBuffer(value) {
- return sym.isFakeDetached in value;
- }
-
- function isFiniteNonNegativeNumber(v) {
- return Number.isFinite(v) && (v) >= 0;
- }
-
- function isReadableByteStreamController(
- x,
- ) {
- return !(
- typeof x !== "object" ||
- x === null ||
- !(sym.controlledReadableByteStream in x)
- );
- }
-
- function isReadableStream(x) {
- return !(
- typeof x !== "object" ||
- x === null ||
- !(sym.readableStreamController in x)
- );
- }
-
- function isReadableStreamAsyncIterator(
- x,
- ) {
- if (typeof x !== "object" || x === null) {
+ if (Number.isNaN(v)) {
return false;
}
- return sym.asyncIteratorReader in x;
+ if (v < 0) {
+ return false;
+ }
+ return true;
}
- function isReadableStreamDefaultController(
- x,
- ) {
- return !(
- typeof x !== "object" ||
- x === null ||
- !(sym.controlledReadableStream in x)
- );
+ /**
+ * @param {unknown} value
+ * @returns {value is ReadableStream}
+ */
+ function isReadableStream(value) {
+ return !(typeof value !== "object" || value === null ||
+ !(_controller in value));
}
- function isReadableStreamDefaultReader(
- x,
- ) {
- return !(typeof x !== "object" || x === null || !(sym.readRequests in x));
+ /**
+ * @param {ReadableStream} stream
+ * @returns {boolean}
+ */
+ function isReadableStreamLocked(stream) {
+ if (stream[_reader] === undefined) {
+ return false;
+ }
+ return true;
}
- function isReadableStreamLocked(stream) {
- assert(isReadableStream(stream));
- return !!stream[sym.reader];
+ /**
+ * @param {unknown} value
+ * @returns {value is ReadableStreamDefaultReader}
+ */
+ function isReadableStreamDefaultReader(value) {
+ return !(typeof value !== "object" || value === null ||
+ !(_readRequests in value));
}
+ /**
+ * @param {ReadableStream} stream
+ * @returns {boolean}
+ */
function isReadableStreamDisturbed(stream) {
assert(isReadableStream(stream));
- return !!stream[sym.disturbed];
+ return stream[_disturbed];
}
- function isTransformStream(x) {
- return !(
- typeof x !== "object" ||
- x === null ||
- !(sym.transformStreamController in x)
- );
- }
-
- function isTransformStreamDefaultController(
- x,
- ) {
- return !(
- typeof x !== "object" ||
- x === null ||
- !(sym.controlledTransformStream in x)
- );
- }
-
- function isWritableStream(x) {
- return !(
- typeof x !== "object" ||
- x === null ||
- !(sym.writableStreamController in x)
- );
- }
-
- function isWritableStreamDefaultController(
- x,
- ) {
- return !(
- typeof x !== "object" ||
- x === null ||
- !(sym.controlledWritableStream in x)
- );
- }
-
- function isWritableStreamDefaultWriter(
- x,
- ) {
- return !(
- typeof x !== "object" ||
- x === null ||
- !(sym.ownerWritableStream in x)
- );
+ /**
+ * @param {unknown} value
+ * @returns {value is WritableStream}
+ */
+ function isWritableStream(value) {
+ return !(typeof value !== "object" || value === null ||
+ !(_controller in value));
}
+ /**
+ * @param {WritableStream} stream
+ * @returns {boolean}
+ */
function isWritableStreamLocked(stream) {
- assert(isWritableStream(stream));
- return stream[sym.writer] !== undefined;
- }
-
- function makeSizeAlgorithmFromSizeFunction(
- size,
- ) {
- if (size === undefined) {
- return () => 1;
- }
- if (typeof size !== "function") {
- throw new TypeError("size must be callable.");
+ if (stream[_writer] === undefined) {
+ return false;
}
- return (chunk) => {
- return size.call(undefined, chunk);
- };
+ return true;
}
+ /**
+ * @template T
+ * @param {{ [_queue]: Array<ValueWithSize<T | _close>>, [_queueTotalSize]: number }} container
+ * @returns {T | _close}
+ */
function peekQueueValue(container) {
- assert(sym.queue in container && sym.queueTotalSize in container);
- assert(container[sym.queue].length);
- const [pair] = container[sym.queue];
- return pair.value;
+ assert(_queue in container && _queueTotalSize in container);
+ assert(container[_queue].length);
+ const valueWithSize = container[_queue][0];
+ return valueWithSize.value;
}
- function readableByteStreamControllerShouldCallPull(
- controller,
- ) {
- const stream = controller[sym.controlledReadableByteStream];
- if (
- stream[sym.state] !== "readable" ||
- controller[sym.closeRequested] ||
- !controller[sym.started]
- ) {
- return false;
- }
- if (
- readableStreamHasDefaultReader(stream) &&
- readableStreamGetNumReadRequests(stream) > 0
- ) {
- return true;
- }
- // 3.13.25.6 If ! ReadableStreamHasBYOBReader(stream) is true and !
- // ReadableStreamGetNumReadIntoRequests(stream) > 0, return true.
- const desiredSize = readableByteStreamControllerGetDesiredSize(controller);
- assert(desiredSize !== null);
- return desiredSize > 0;
- }
-
- function readableByteStreamControllerCallPullIfNeeded(
- controller,
- ) {
+ /**
+ * @param {ReadableByteStreamController} controller
+ * @returns {void}
+ */
+ function readableByteStreamControllerCallPullIfNeeded(controller) {
const shouldPull = readableByteStreamControllerShouldCallPull(controller);
if (!shouldPull) {
return;
}
- if (controller[sym.pulling]) {
- controller[sym.pullAgain] = true;
+ if (controller[_pulling]) {
+ controller[_pullAgain] = true;
return;
}
- assert(controller[sym.pullAgain] === false);
- controller[sym.pulling] = true;
- const pullPromise = controller[sym.pullAlgorithm]();
+ assert(controller[_pullAgain] === false);
+ controller[_pulling] = true;
+ /** @type {Promise<void>} */
+ const pullPromise = controller[_pullAlgorithm](controller);
setPromiseIsHandledToTrue(
pullPromise.then(
() => {
- controller[sym.pulling] = false;
- if (controller[sym.pullAgain]) {
- controller[sym.pullAgain] = false;
+ controller[_pulling] = false;
+ if (controller[_pullAgain]) {
+ controller[_pullAgain] = false;
readableByteStreamControllerCallPullIfNeeded(controller);
}
},
@@ -1448,22 +708,43 @@
);
}
- function readableByteStreamControllerClearAlgorithms(
- controller,
- ) {
- controller[sym.pullAlgorithm] = undefined;
- controller[sym.cancelAlgorithm] = undefined;
+ /**
+ * @param {ReadableByteStreamController} controller
+ * @returns {void}
+ */
+ function readableByteStreamControllerClearAlgorithms(controller) {
+ controller[_pullAlgorithm] = undefined;
+ controller[_cancelAlgorithm] = undefined;
}
- function readableByteStreamControllerClose(
- controller,
- ) {
- const stream = controller[sym.controlledReadableByteStream];
- if (controller[sym.closeRequested] || stream[sym.state] !== "readable") {
+ /**
+ * @param {ReadableByteStreamController} controller
+ * @param {any} e
+ */
+ function readableByteStreamControllerError(controller, e) {
+ /** @type {ReadableStream<ArrayBuffer>} */
+ const stream = controller[_stream];
+ if (stream[_state] !== "readable") {
return;
}
- if (controller[sym.queueTotalSize] > 0) {
- controller[sym.closeRequested] = true;
+ // 3. Perform ! ReadableByteStreamControllerClearPendingPullIntos(controller).
+ resetQueue(controller);
+ readableByteStreamControllerClearAlgorithms(controller);
+ readableStreamError(stream, e);
+ }
+
+ /**
+ * @param {ReadableByteStreamController} controller
+ * @returns {void}
+ */
+ function readableByteStreamControllerClose(controller) {
+ /** @type {ReadableStream<ArrayBuffer>} */
+ const stream = controller[_stream];
+ if (controller[_closeRequested] || stream[_state] !== "readable") {
+ return;
+ }
+ if (controller[_queueTotalSize] > 0) {
+ controller[_closeRequested] = true;
return;
}
// 3.13.6.4 If controller.[[pendingPullIntos]] is not empty, (BYOB Support)
@@ -1471,14 +752,20 @@
readableStreamClose(stream);
}
- function readableByteStreamControllerEnqueue(
- controller,
- chunk,
- ) {
- const stream = controller[sym.controlledReadableByteStream];
- if (controller[sym.closeRequested] || stream[sym.state] !== "readable") {
+ /**
+ * @param {ReadableByteStreamController} controller
+ * @param {ArrayBufferView} chunk
+ */
+ function readableByteStreamControllerEnqueue(controller, chunk) {
+ /** @type {ReadableStream<ArrayBuffer>} */
+ const stream = controller[_stream];
+ if (
+ controller[_closeRequested] ||
+ controller[_stream][_state] !== "readable"
+ ) {
return;
}
+
const { buffer, byteOffset, byteLength } = chunk;
const transferredBuffer = transferArrayBuffer(buffer);
if (readableStreamHasDefaultReader(stream)) {
@@ -1490,7 +777,7 @@
byteLength,
);
} else {
- assert(controller[sym.queue].length === 0);
+ assert(controller[_queue].length === 0);
const transferredView = new Uint8Array(
transferredBuffer,
byteOffset,
@@ -1498,9 +785,9 @@
);
readableStreamFulfillReadRequest(stream, transferredView, false);
}
- // 3.13.9.8 Otherwise, if ! ReadableStreamHasBYOBReader(stream) is true
+ // 8 Otherwise, if ! ReadableStreamHasBYOBReader(stream) is true,
} else {
- assert(!isReadableStreamLocked(stream));
+ assert(isReadableStreamLocked(stream) === false);
readableByteStreamControllerEnqueueChunkToQueue(
controller,
transferredBuffer,
@@ -1511,263 +798,299 @@
readableByteStreamControllerCallPullIfNeeded(controller);
}
+ /**
+ * @param {ReadableByteStreamController} controller
+ * @param {ArrayBufferLike} buffer
+ * @param {number} byteOffset
+ * @param {number} byteLength
+ * @returns {void}
+ */
function readableByteStreamControllerEnqueueChunkToQueue(
controller,
buffer,
byteOffset,
byteLength,
) {
- controller[sym.queue].push({
- value: buffer,
- offset: byteOffset,
- size: byteLength,
- });
- controller[sym.queueTotalSize] += byteLength;
- }
-
- function readableByteStreamControllerError(
- controller,
- e,
- ) {
- const stream = controller[sym.controlledReadableByteStream];
- if (stream[sym.state] !== "readable") {
- return;
- }
- // 3.13.11.3 Perform ! ReadableByteStreamControllerClearPendingPullIntos(controller).
- resetQueue(controller);
- readableByteStreamControllerClearAlgorithms(controller);
- readableStreamError(stream, e);
+ controller[_queue].push({ buffer, byteOffset, byteLength });
+ controller[_queueTotalSize] += byteLength;
}
- function readableByteStreamControllerGetDesiredSize(
- controller,
- ) {
- const stream = controller[sym.controlledReadableByteStream];
- const state = stream[sym.state];
+ /**
+ * @param {ReadableByteStreamController} controller
+ * @returns {number | null}
+ */
+ function readableByteStreamControllerGetDesiredSize(controller) {
+ const state = controller[_stream][_state];
if (state === "errored") {
return null;
}
if (state === "closed") {
return 0;
}
- return controller[sym.strategyHWM] - controller[sym.queueTotalSize];
+ return controller[_strategyHWM] - controller[_queueTotalSize];
}
- function readableByteStreamControllerHandleQueueDrain(
- controller,
- ) {
- assert(
- controller[sym.controlledReadableByteStream][sym.state] === "readable",
- );
+ /**
+ * @param {{ [_queue]: any[], [_queueTotalSize]: number }} container
+ * @returns {void}
+ */
+ function resetQueue(container) {
+ container[_queue] = [];
+ container[_queueTotalSize] = 0;
+ }
+
+ /**
+ * @param {ReadableByteStreamController} controller
+ * @returns {void}
+ */
+ function readableByteStreamControllerHandleQueueDrain(controller) {
+ assert(controller[_stream][_state] === "readable");
if (
- controller[sym.queueTotalSize] === 0 && controller[sym.closeRequested]
+ controller[_queueTotalSize] === 0 && controller[_closeRequested]
) {
readableByteStreamControllerClearAlgorithms(controller);
- readableStreamClose(controller[sym.controlledReadableByteStream]);
+ readableStreamClose(controller[_stream]);
} else {
readableByteStreamControllerCallPullIfNeeded(controller);
}
}
- function readableStreamAddReadRequest(
- stream,
- ) {
- assert(isReadableStreamDefaultReader(stream[sym.reader]));
- assert(stream[sym.state] === "readable");
- const promise = getDeferred();
- stream[sym.reader][sym.readRequests].push(promise);
- return promise.promise;
+ /**
+ * @param {ReadableByteStreamController} controller
+ * @returns {boolean}
+ */
+ function readableByteStreamControllerShouldCallPull(controller) {
+ /** @type {ReadableStream<ArrayBuffer>} */
+ const stream = controller[_stream];
+ if (
+ stream[_state] !== "readable" ||
+ controller[_closeRequested] ||
+ !controller[_started]
+ ) {
+ return false;
+ }
+ if (
+ readableStreamHasDefaultReader(stream) &&
+ readableStreamGetNumReadRequests(stream) > 0
+ ) {
+ return true;
+ }
+ // 3.13.25.6 If ! ReadableStreamHasBYOBReader(stream) is true and !
+ // ReadableStreamGetNumReadIntoRequests(stream) > 0, return true.
+ const desiredSize = readableByteStreamControllerGetDesiredSize(controller);
+ assert(desiredSize !== null);
+ return desiredSize > 0;
}
- function readableStreamCancel(
- stream,
- reason,
- ) {
- stream[sym.disturbed] = true;
- if (stream[sym.state] === "closed") {
- return Promise.resolve();
- }
- if (stream[sym.state] === "errored") {
- return Promise.reject(stream[sym.storedError]);
+ /**
+ * @template R
+ * @param {ReadableStream<R>} stream
+ * @param {ReadRequest<R>} readRequest
+ * @returns {void}
+ */
+ function readableStreamAddReadRequest(stream, readRequest) {
+ assert(isReadableStreamDefaultReader(stream[_reader]));
+ assert(stream[_state] === "readable");
+ stream[_reader][_readRequests].push(readRequest);
+ }
+
+ /**
+ * @template R
+ * @param {ReadableStream<R>} stream
+ * @param {any=} reason
+ * @returns {Promise<void>}
+ */
+ function readableStreamCancel(stream, reason) {
+ stream[_disturbed] = true;
+ if (stream[_state] === "closed") {
+ return resolvePromiseWith(undefined);
+ }
+ if (stream[_state] === "errored") {
+ return Promise.reject(stream[_storedError]);
}
readableStreamClose(stream);
- return stream[sym.readableStreamController][sym.cancelSteps](reason).then(
- () => undefined,
- );
+ /** @type {Promise<void>} */
+ const sourceCancelPromise = stream[_controller][_cancelSteps](reason);
+ return sourceCancelPromise.then(() => undefined);
}
+ /**
+ * @template R
+ * @param {ReadableStream<R>} stream
+ * @returns {void}
+ */
function readableStreamClose(stream) {
- assert(stream[sym.state] === "readable");
- stream[sym.state] = "closed";
- const reader = stream[sym.reader];
+ assert(stream[_state] === "readable");
+ stream[_state] = "closed";
+ /** @type {ReadableStreamDefaultReader<R> | undefined} */
+ const reader = stream[_reader];
if (!reader) {
return;
}
if (isReadableStreamDefaultReader(reader)) {
- for (const readRequest of reader[sym.readRequests]) {
- assert(readRequest.resolve);
- readRequest.resolve(
- readableStreamCreateReadResult(
- undefined,
- true,
- reader[sym.forAuthorCode],
- ),
- );
+ /** @type {Array<ReadRequest<R>>} */
+ const readRequests = reader[_readRequests];
+ for (const readRequest of readRequests) {
+ readRequest.closeSteps();
}
- reader[sym.readRequests] = [];
+ reader[_readRequests] = [];
}
- const resolve = reader[sym.closedPromise].resolve;
- assert(resolve);
- resolve();
- }
-
- function readableStreamCreateReadResult(
- value,
- done,
- forAuthorCode,
- ) {
- const prototype = forAuthorCode ? Object.prototype : null;
- assert(typeof done === "boolean");
- const obj = Object.create(prototype);
- Object.defineProperties(obj, {
- value: { value, writable: true, enumerable: true, configurable: true },
- done: {
- value: done,
- writable: true,
- enumerable: true,
- configurable: true,
- },
- });
- return obj;
+ // This promise can be double resolved.
+ // See: https://github.com/whatwg/streams/issues/1100
+ reader[_closedPromise].resolve(undefined);
}
- function readableStreamDefaultControllerCallPullIfNeeded(
- controller,
- ) {
- const shouldPull = readableStreamDefaultControllerShouldCallPull(
+ /** @param {ReadableStreamDefaultController<any>} controller */
+ function readableStreamDefaultControllerCallPullIfNeeded(controller) {
+ const shouldPull = readableStreamDefaultcontrollerShouldCallPull(
controller,
);
- if (!shouldPull) {
+ if (shouldPull === false) {
return;
}
- if (controller[sym.pulling]) {
- controller[sym.pullAgain] = true;
+ if (controller[_pulling] === true) {
+ controller[_pullAgain] = true;
return;
}
- assert(controller[sym.pullAgain] === false);
- controller[sym.pulling] = true;
- const pullPromise = controller[sym.pullAlgorithm]();
- pullPromise.then(
- () => {
- controller[sym.pulling] = false;
- if (controller[sym.pullAgain]) {
- controller[sym.pullAgain] = false;
- readableStreamDefaultControllerCallPullIfNeeded(controller);
- }
- },
- (e) => {
- readableStreamDefaultControllerError(controller, e);
- },
- );
+ assert(controller[_pullAgain] === false);
+ controller[_pulling] = true;
+ const pullPromise = controller[_pullAlgorithm](controller);
+ uponFulfillment(pullPromise, () => {
+ controller[_pulling] = false;
+ if (controller[_pullAgain] === true) {
+ controller[_pullAgain] = false;
+ readableStreamDefaultControllerCallPullIfNeeded(controller);
+ }
+ });
+ uponRejection(pullPromise, (e) => {
+ readableStreamDefaultControllerError(controller, e);
+ });
}
- function readableStreamDefaultControllerCanCloseOrEnqueue(
- controller,
- ) {
- const state = controller[sym.controlledReadableStream][sym.state];
- return !controller[sym.closeRequested] && state === "readable";
+ /**
+ * @param {ReadableStreamDefaultController<any>} controller
+ * @returns {boolean}
+ */
+ function readableStreamDefaultControllerCanCloseOrEnqueue(controller) {
+ const state = controller[_stream][_state];
+ if (controller[_closeRequested] === false && state === "readable") {
+ return true;
+ } else {
+ return false;
+ }
}
- function readableStreamDefaultControllerClearAlgorithms(
- controller,
- ) {
- controller[sym.pullAlgorithm] = undefined;
- controller[sym.cancelAlgorithm] = undefined;
- controller[sym.strategySizeAlgorithm] = undefined;
+ /** @param {ReadableStreamDefaultController<any>} controller */
+ function readableStreamDefaultControllerClearAlgorithms(controller) {
+ controller[_pullAlgorithm] = undefined;
+ controller[_cancelAlgorithm] = undefined;
+ controller[_strategySizeAlgorithm] = undefined;
}
- function readableStreamDefaultControllerClose(
- controller,
- ) {
- if (!readableStreamDefaultControllerCanCloseOrEnqueue(controller)) {
+ /** @param {ReadableStreamDefaultController<any>} controller */
+ function readableStreamDefaultControllerClose(controller) {
+ if (
+ readableStreamDefaultControllerCanCloseOrEnqueue(controller) === false
+ ) {
return;
}
- const stream = controller[sym.controlledReadableStream];
- controller[sym.closeRequested] = true;
- if (controller[sym.queue].length === 0) {
+ const stream = controller[_stream];
+ controller[_closeRequested] = true;
+ if (controller[_queue].length === 0) {
readableStreamDefaultControllerClearAlgorithms(controller);
readableStreamClose(stream);
}
}
- function readableStreamDefaultControllerEnqueue(
- controller,
- chunk,
- ) {
- if (!readableStreamDefaultControllerCanCloseOrEnqueue(controller)) {
+ /**
+ * @template R
+ * @param {ReadableStreamDefaultController<R>} controller
+ * @param {R} chunk
+ * @returns {void}
+ */
+ function readableStreamDefaultControllerEnqueue(controller, chunk) {
+ if (
+ readableStreamDefaultControllerCanCloseOrEnqueue(controller) === false
+ ) {
return;
}
- const stream = controller[sym.controlledReadableStream];
+ const stream = controller[_stream];
if (
- isReadableStreamLocked(stream) &&
+ isReadableStreamLocked(stream) === true &&
readableStreamGetNumReadRequests(stream) > 0
) {
readableStreamFulfillReadRequest(stream, chunk, false);
} else {
+ let chunkSize;
+ try {
+ chunkSize = controller[_strategySizeAlgorithm](chunk);
+ } catch (e) {
+ readableStreamDefaultControllerError(controller, e);
+ throw e;
+ }
+
try {
- const chunkSize = controller[sym.strategySizeAlgorithm](chunk);
enqueueValueWithSize(controller, chunk, chunkSize);
- } catch (err) {
- readableStreamDefaultControllerError(controller, err);
- throw err;
+ } catch (e) {
+ readableStreamDefaultControllerError(controller, e);
+ throw e;
}
}
readableStreamDefaultControllerCallPullIfNeeded(controller);
}
- function readableStreamDefaultControllerGetDesiredSize(
- controller,
- ) {
- const stream = controller[sym.controlledReadableStream];
- const state = stream[sym.state];
+ /**
+ * @param {ReadableStreamDefaultController<any>} controller
+ * @param {any} e
+ */
+ function readableStreamDefaultControllerError(controller, e) {
+ const stream = controller[_stream];
+ if (stream[_state] !== "readable") {
+ return;
+ }
+ resetQueue(controller);
+ readableStreamDefaultControllerClearAlgorithms(controller);
+ readableStreamError(stream, e);
+ }
+
+ /**
+ * @param {ReadableStreamDefaultController<any>} controller
+ * @returns {number | null}
+ */
+ function readableStreamDefaultControllerGetDesiredSize(controller) {
+ const state = controller[_stream][_state];
if (state === "errored") {
return null;
}
if (state === "closed") {
return 0;
}
- return controller[sym.strategyHWM] - controller[sym.queueTotalSize];
+ return controller[_strategyHWM] - controller[_queueTotalSize];
}
- function readableStreamDefaultControllerError(
- controller,
- e,
- ) {
- const stream = controller[sym.controlledReadableStream];
- if (stream[sym.state] !== "readable") {
- return;
+ /** @param {ReadableStreamDefaultController} controller */
+ function readableStreamDefaultcontrollerHasBackpressure(controller) {
+ if (readableStreamDefaultcontrollerShouldCallPull(controller) === true) {
+ return false;
+ } else {
+ return true;
}
- resetQueue(controller);
- readableStreamDefaultControllerClearAlgorithms(controller);
- readableStreamError(stream, e);
- }
-
- function readableStreamDefaultControllerHasBackpressure(
- controller,
- ) {
- return readableStreamDefaultControllerShouldCallPull(controller);
}
- function readableStreamDefaultControllerShouldCallPull(
- controller,
- ) {
- const stream = controller[sym.controlledReadableStream];
+ /**
+ * @param {ReadableStreamDefaultController<any>} controller
+ * @returns {boolean}
+ */
+ function readableStreamDefaultcontrollerShouldCallPull(controller) {
+ const stream = controller[_stream];
if (
- !readableStreamDefaultControllerCanCloseOrEnqueue(controller) ||
- controller[sym.started] === false
+ readableStreamDefaultControllerCanCloseOrEnqueue(controller) === false
) {
return false;
}
+ if (controller[_started] === false) {
+ return false;
+ }
if (
isReadableStreamLocked(stream) &&
readableStreamGetNumReadRequests(stream) > 0
@@ -1778,84 +1101,116 @@
controller,
);
assert(desiredSize !== null);
- return desiredSize > 0;
+ if (desiredSize > 0) {
+ return true;
+ }
+ return false;
}
- function readableStreamDefaultReaderRead(
- reader,
- ) {
- const stream = reader[sym.ownerReadableStream];
+ /**
+ * @template R
+ * @param {ReadableStreamDefaultReader<R>} reader
+ * @param {ReadRequest<R>} readRequest
+ * @returns {void}
+ */
+ function readableStreamDefaultReaderRead(reader, readRequest) {
+ const stream = reader[_stream];
assert(stream);
- stream[sym.disturbed] = true;
- if (stream[sym.state] === "closed") {
- return Promise.resolve(
- readableStreamCreateReadResult(
- undefined,
- true,
- reader[sym.forAuthorCode],
- ),
- );
- }
- if (stream[sym.state] === "errored") {
- return Promise.reject(stream[sym.storedError]);
+ stream[_disturbed] = true;
+ if (stream[_state] === "closed") {
+ readRequest.closeSteps();
+ } else if (stream[_state] === "errored") {
+ readRequest.errorSteps(stream[_storedError]);
+ } else {
+ assert(stream[_state] === "readable");
+ stream[_controller][_pullSteps](readRequest);
}
- assert(stream[sym.state] === "readable");
- return (stream[
- sym.readableStreamController
- ])[sym.pullSteps]();
}
+ /**
+ * @template R
+ * @param {ReadableStream<R>} stream
+ * @param {any} e
+ */
function readableStreamError(stream, e) {
- assert(isReadableStream(stream));
- assert(stream[sym.state] === "readable");
- stream[sym.state] = "errored";
- stream[sym.storedError] = e;
- const reader = stream[sym.reader];
+ assert(stream[_state] === "readable");
+ stream[_state] = "errored";
+ stream[_storedError] = e;
+ /** @type {ReadableStreamDefaultReader<R> | undefined} */
+ const reader = stream[_reader];
if (reader === undefined) {
return;
}
if (isReadableStreamDefaultReader(reader)) {
- for (const readRequest of reader[sym.readRequests]) {
- assert(readRequest.reject);
- readRequest.reject(e);
- readRequest.reject = undefined;
- readRequest.resolve = undefined;
+ /** @type {Array<ReadRequest<R>>} */
+ const readRequests = reader[_readRequests];
+ for (const readRequest of readRequests) {
+ readRequest.errorSteps(e);
}
- reader[sym.readRequests] = [];
+ reader[_readRequests] = [];
}
// 3.5.6.8 Otherwise, support BYOB Reader
- reader[sym.closedPromise].reject(e);
- reader[sym.closedPromise].reject = undefined;
- reader[sym.closedPromise].resolve = undefined;
- setPromiseIsHandledToTrue(reader[sym.closedPromise].promise);
+ /** @type {Deferred<void>} */
+ const closedPromise = reader[_closedPromise];
+ console.log("closedPromise rejected");
+ closedPromise.reject(e);
+ setPromiseIsHandledToTrue(closedPromise.promise);
}
- function readableStreamFulfillReadRequest(
- stream,
- chunk,
- done,
- ) {
- const reader = stream[sym.reader];
- const readRequest = reader[sym.readRequests].shift();
- assert(readRequest.resolve);
- readRequest.resolve(
- readableStreamCreateReadResult(chunk, done, reader[sym.forAuthorCode]),
- );
+ /**
+ * @template R
+ * @param {ReadableStream<R>} stream
+ * @param {R} chunk
+ * @param {boolean} done
+ */
+ function readableStreamFulfillReadRequest(stream, chunk, done) {
+ assert(readableStreamHasDefaultReader(stream) === true);
+ /** @type {ReadableStreamDefaultReader<R>} */
+ const reader = stream[_reader];
+ assert(reader[_readRequests].length);
+ /** @type {ReadRequest<R>} */
+ const readRequest = reader[_readRequests].shift();
+ if (done) {
+ readRequest.closeSteps();
+ } else {
+ readRequest.chunkSteps(chunk);
+ }
}
- function readableStreamGetNumReadRequests(
- stream,
- ) {
- return stream[sym.reader]?.[sym.readRequests].length ?? 0;
+ /**
+ * @param {ReadableStream} stream
+ * @return {number}
+ */
+ function readableStreamGetNumReadRequests(stream) {
+ assert(readableStreamHasDefaultReader(stream) === true);
+ return stream[_reader][_readRequests].length;
}
- function readableStreamHasDefaultReader(
- stream,
- ) {
- const reader = stream[sym.reader];
- return !(reader === undefined || !isReadableStreamDefaultReader(reader));
+ /**
+ * @param {ReadableStream} stream
+ * @returns {boolean}
+ */
+ function readableStreamHasDefaultReader(stream) {
+ const reader = stream[_reader];
+ if (reader === undefined) {
+ return false;
+ }
+ if (isReadableStreamDefaultReader(reader)) {
+ return true;
+ }
+ return false;
}
+ /**
+ * @template T
+ * @param {ReadableStream<T>} source
+ * @param {WritableStream<T>} dest
+ * @param {boolean} preventClose
+ * @param {boolean} preventAbort
+ * @param {boolean} preventCancel
+ * @param {AbortSignal=} signal
+ * @returns {Promise<void>}
+ */
function readableStreamPipeTo(
source,
dest,
@@ -1867,8 +1222,7 @@
assert(isReadableStream(source));
assert(isWritableStream(dest));
assert(
- typeof preventClose === "boolean" &&
- typeof preventAbort === "boolean" &&
+ typeof preventClose === "boolean" && typeof preventAbort === "boolean" &&
typeof preventCancel === "boolean",
);
assert(signal === undefined || signal instanceof AbortSignal);
@@ -1876,29 +1230,33 @@
assert(!isWritableStreamLocked(dest));
const reader = acquireReadableStreamDefaultReader(source);
const writer = acquireWritableStreamDefaultWriter(dest);
- source[sym.disturbed] = true;
+ source[_disturbed] = true;
let shuttingDown = false;
- const promise = getDeferred();
+ let currentWrite = resolvePromiseWith(undefined);
+ /** @type {Deferred<void>} */
+ const promise = new Deferred();
+ /** @type {() => void} */
let abortAlgorithm;
if (signal) {
abortAlgorithm = () => {
- const error = new DOMException("Abort signal received.", "AbortSignal");
+ const error = new DOMException("Aborted", "AbortError");
+ /** @type {Array<() => Promise<void>>} */
const actions = [];
- if (!preventAbort) {
+ if (preventAbort === false) {
actions.push(() => {
- if (dest[sym.state] === "writable") {
+ if (dest[_state] === "writable") {
return writableStreamAbort(dest, error);
} else {
- return Promise.resolve(undefined);
+ return resolvePromiseWith(undefined);
}
});
}
- if (!preventCancel) {
+ if (preventCancel === false) {
actions.push(() => {
- if (source[sym.state] === "readable") {
+ if (source[_state] === "readable") {
return readableStreamCancel(source, error);
} else {
- return Promise.resolve(undefined);
+ return resolvePromiseWith(undefined);
}
});
}
@@ -1908,6 +1266,7 @@
error,
);
};
+
if (signal.aborted) {
abortAlgorithm();
return promise.promise;
@@ -1915,342 +1274,391 @@
signal.addEventListener("abort", abortAlgorithm);
}
- let currentWrite = Promise.resolve();
+ function pipeLoop() {
+ return new Promise((resolveLoop, rejectLoop) => {
+ /** @param {boolean} done */
+ function next(done) {
+ if (done) {
+ resolveLoop();
+ } else {
+ uponPromise(pipeStep(), next, rejectLoop);
+ }
+ }
+ next(false);
+ });
+ }
+
+ /** @returns {Promise<boolean>} */
+ function pipeStep() {
+ if (shuttingDown === true) {
+ return resolvePromiseWith(true);
+ }
+
+ return transformPromiseWith(writer[_readyPromise].promise, () => {
+ return new Promise((resolveRead, rejectRead) => {
+ readableStreamDefaultReaderRead(
+ reader,
+ {
+ chunkSteps(chunk) {
+ currentWrite = transformPromiseWith(
+ writableStreamDefaultWriterWrite(writer, chunk),
+ undefined,
+ () => {},
+ );
+ resolveRead(false);
+ },
+ closeSteps() {
+ resolveRead(true);
+ },
+ errorSteps: rejectRead,
+ },
+ );
+ });
+ });
+ }
- // At this point, the spec becomes non-specific and vague. Most of the rest
- // of this code is based on the reference implementation that is part of the
- // specification. This is why the functions are only scoped to this function
- // to ensure they don't leak into the spec compliant parts.
+ isOrBecomesErrored(
+ source,
+ reader[_closedPromise].promise,
+ (storedError) => {
+ if (preventAbort === false) {
+ shutdownWithAction(
+ () => writableStreamAbort(dest, storedError),
+ true,
+ storedError,
+ );
+ } else {
+ shutdown(true, storedError);
+ }
+ },
+ );
- function isOrBecomesClosed(
- stream,
- promise,
- action,
- ) {
- if (stream[sym.state] === "closed") {
- action();
+ isOrBecomesErrored(dest, writer[_closedPromise].promise, (storedError) => {
+ if (preventCancel === false) {
+ shutdownWithAction(
+ () => readableStreamCancel(source, storedError),
+ true,
+ storedError,
+ );
} else {
- setPromiseIsHandledToTrue(promise.then(action));
+ shutdown(true, storedError);
}
- }
+ });
- function isOrBecomesErrored(
- stream,
- promise,
- action,
- ) {
- if (stream[sym.state] === "errored") {
- action(stream[sym.storedError]);
+ isOrBecomesClosed(source, reader[_closedPromise].promise, () => {
+ if (preventClose === false) {
+ shutdownWithAction(() =>
+ writableStreamDefaultWriterCloseWithErrorPropagation(writer)
+ );
} else {
- setPromiseIsHandledToTrue(promise.catch((error) => action(error)));
+ shutdown();
}
- }
-
- function finalize(isError, error) {
- writableStreamDefaultWriterRelease(writer);
- readableStreamReaderGenericRelease(reader);
+ });
- if (signal) {
- signal.removeEventListener("abort", abortAlgorithm);
- }
- if (isError) {
- promise.reject(error);
+ if (
+ writableStreamCloseQueuedOrInFlight(dest) === true ||
+ dest[_state] === "closed"
+ ) {
+ const destClosed = new TypeError(
+ "The destination writable stream closed before all the data could be piped to it.",
+ );
+ if (preventCancel === false) {
+ shutdownWithAction(
+ () => readableStreamCancel(source, destClosed),
+ true,
+ destClosed,
+ );
} else {
- promise.resolve();
+ shutdown(true, destClosed);
}
}
+ setPromiseIsHandledToTrue(pipeLoop());
+
+ return promise.promise;
+
+ /** @returns {Promise<void>} */
function waitForWritesToFinish() {
const oldCurrentWrite = currentWrite;
- return currentWrite.then(() =>
- oldCurrentWrite !== currentWrite ? waitForWritesToFinish() : undefined
+ return transformPromiseWith(
+ currentWrite,
+ () =>
+ oldCurrentWrite !== currentWrite
+ ? waitForWritesToFinish()
+ : undefined,
);
}
- function shutdownWithAction(
- action,
- originalIsError,
- originalError,
- ) {
+ /**
+ * @param {ReadableStream | WritableStream} stream
+ * @param {Promise<any>} promise
+ * @param {(e: any) => void} action
+ */
+ function isOrBecomesErrored(stream, promise, action) {
+ if (stream[_state] === "errored") {
+ action(stream[_storedError]);
+ } else {
+ uponRejection(promise, action);
+ }
+ }
+
+ /**
+ * @param {ReadableStream} stream
+ * @param {Promise<any>} promise
+ * @param {() => void} action
+ */
+ function isOrBecomesClosed(stream, promise, action) {
+ if (stream[_state] === "closed") {
+ action();
+ } else {
+ uponFulfillment(promise, action);
+ }
+ }
+
+ /**
+ * @param {() => Promise<void[] | void>} action
+ * @param {boolean=} originalIsError
+ * @param {any=} originalError
+ */
+ function shutdownWithAction(action, originalIsError, originalError) {
function doTheRest() {
- setPromiseIsHandledToTrue(
- action().then(
- () => finalize(originalIsError, originalError),
- (newError) => finalize(true, newError),
- ),
+ uponPromise(
+ action(),
+ () => finalize(originalIsError, originalError),
+ (newError) => finalize(true, newError),
);
}
- if (shuttingDown) {
+ if (shuttingDown === true) {
return;
}
shuttingDown = true;
if (
- dest[sym.state] === "writable" &&
+ dest[_state] === "writable" &&
writableStreamCloseQueuedOrInFlight(dest) === false
) {
- setPromiseIsHandledToTrue(waitForWritesToFinish().then(doTheRest));
+ uponFulfillment(waitForWritesToFinish(), doTheRest);
} else {
doTheRest();
}
}
+ /**
+ * @param {boolean=} isError
+ * @param {any=} error
+ */
function shutdown(isError, error) {
if (shuttingDown) {
return;
}
shuttingDown = true;
-
if (
- dest[sym.state] === "writable" &&
- !writableStreamCloseQueuedOrInFlight(dest)
+ dest[_state] === "writable" &&
+ writableStreamCloseQueuedOrInFlight(dest) === false
) {
- setPromiseIsHandledToTrue(
- waitForWritesToFinish().then(() => finalize(isError, error)),
+ uponFulfillment(
+ waitForWritesToFinish(),
+ () => finalize(isError, error),
);
+ } else {
+ finalize(isError, error);
}
- finalize(isError, error);
- }
-
- function pipeStep() {
- if (shuttingDown) {
- return Promise.resolve(true);
- }
- return writer[sym.readyPromise].promise.then(() => {
- return readableStreamDefaultReaderRead(reader).then(
- ({ value, done }) => {
- if (done === true) {
- return true;
- }
- currentWrite = writableStreamDefaultWriterWrite(
- writer,
- value,
- ).then(undefined, () => {});
- return false;
- },
- );
- });
}
- function pipeLoop() {
- return new Promise((resolveLoop, rejectLoop) => {
- function next(done) {
- if (done) {
- resolveLoop(undefined);
- } else {
- setPromiseIsHandledToTrue(pipeStep().then(next, rejectLoop));
- }
- }
- next(false);
- });
- }
-
- isOrBecomesErrored(
- source,
- reader[sym.closedPromise].promise,
- (storedError) => {
- if (!preventAbort) {
- shutdownWithAction(
- () => writableStreamAbort(dest, storedError),
- true,
- storedError,
- );
- } else {
- shutdown(true, storedError);
- }
- },
- );
-
- isOrBecomesErrored(
- dest,
- writer[sym.closedPromise].promise,
- (storedError) => {
- if (!preventCancel) {
- shutdownWithAction(
- () => readableStreamCancel(source, storedError),
- true,
- storedError,
- );
- } else {
- shutdown(true, storedError);
- }
- },
- );
+ /**
+ * @param {boolean=} isError
+ * @param {any=} error
+ */
+ function finalize(isError, error) {
+ writableStreamDefaultWriterRelease(writer);
+ readableStreamReaderGenericRelease(reader);
- isOrBecomesClosed(source, reader[sym.closedPromise].promise, () => {
- if (!preventClose) {
- shutdownWithAction(() =>
- writableStreamDefaultWriterCloseWithErrorPropagation(writer)
- );
+ if (signal !== undefined) {
+ signal.removeEventListener("abort", abortAlgorithm);
}
- });
-
- if (
- writableStreamCloseQueuedOrInFlight(dest) ||
- dest[sym.state] === "closed"
- ) {
- const destClosed = new TypeError(
- "The destination writable stream closed before all data could be piped to it.",
- );
- if (!preventCancel) {
- shutdownWithAction(
- () => readableStreamCancel(source, destClosed),
- true,
- destClosed,
- );
+ if (isError) {
+ promise.reject(error);
} else {
- shutdown(true, destClosed);
+ promise.resolve(undefined);
}
}
-
- setPromiseIsHandledToTrue(pipeLoop());
- return promise.promise;
}
- function readableStreamReaderGenericCancel(
- reader,
- reason,
- ) {
- const stream = reader[sym.ownerReadableStream];
- assert(stream);
+ /**
+ * @param {ReadableStreamGenericReader<any>} reader
+ * @param {any} reason
+ * @returns {Promise<void>}
+ */
+ function readableStreamReaderGenericCancel(reader, reason) {
+ const stream = reader[_stream];
+ assert(stream !== undefined);
return readableStreamCancel(stream, reason);
}
- function readableStreamReaderGenericInitialize(
- reader,
- stream,
- ) {
- reader[sym.forAuthorCode] = true;
- reader[sym.ownerReadableStream] = stream;
- stream[sym.reader] = reader;
- if (stream[sym.state] === "readable") {
- reader[sym.closedPromise] = getDeferred();
- } else if (stream[sym.state] === "closed") {
- reader[sym.closedPromise] = { promise: Promise.resolve() };
+ /**
+ * @template R
+ * @param {ReadableStreamDefaultReader<R>} reader
+ * @param {ReadableStream<R>} stream
+ */
+ function readableStreamReaderGenericInitialize(reader, stream) {
+ reader[_stream] = stream;
+ stream[_reader] = reader;
+ if (stream[_state] === "readable") {
+ reader[_closedPromise] = new Deferred();
+ } else if (stream[_state] === "closed") {
+ reader[_closedPromise] = new Deferred();
+ reader[_closedPromise].resolve(undefined);
} else {
- assert(stream[sym.state] === "errored");
- reader[sym.closedPromise] = {
- promise: Promise.reject(stream[sym.storedError]),
- };
- setPromiseIsHandledToTrue(reader[sym.closedPromise].promise);
- }
- }
-
- function readableStreamReaderGenericRelease(
- reader,
- ) {
- assert(reader[sym.ownerReadableStream]);
- assert(reader[sym.ownerReadableStream][sym.reader] === reader);
- const closedPromise = reader[sym.closedPromise];
- if (reader[sym.ownerReadableStream][sym.state] === "readable") {
- assert(closedPromise.reject);
- closedPromise.reject(new TypeError("ReadableStream state is readable."));
+ assert(stream[_state] === "errored");
+ reader[_closedPromise] = new Deferred();
+ reader[_closedPromise].reject(stream[_storedError]);
+ setPromiseIsHandledToTrue(reader[_closedPromise].promise);
+ }
+ }
+
+ /**
+ * @template R
+ * @param {ReadableStreamGenericReader<R>} reader
+ */
+ function readableStreamReaderGenericRelease(reader) {
+ assert(reader[_stream] !== undefined);
+ assert(reader[_stream][_reader] === reader);
+ if (reader[_stream][_state] === "readable") {
+ reader[_closedPromise].reject(
+ new TypeError(
+ "Reader was released and can no longer be used to monitor the stream's closedness.",
+ ),
+ );
} else {
- closedPromise.promise = Promise.reject(
- new TypeError("Reading is closed."),
+ reader[_closedPromise] = new Deferred();
+ reader[_closedPromise].reject(
+ new TypeError(
+ "Reader was released and can no longer be used to monitor the stream's closedness.",
+ ),
);
- delete closedPromise.reject;
- delete closedPromise.resolve;
}
- setPromiseIsHandledToTrue(closedPromise.promise);
- reader[sym.ownerReadableStream][sym.reader] = undefined;
- reader[sym.ownerReadableStream] = undefined;
+ setPromiseIsHandledToTrue(reader[_closedPromise].promise);
+ reader[_stream][_reader] = undefined;
+ reader[_stream] = undefined;
}
- function readableStreamTee(
- stream,
- cloneForBranch2,
- ) {
+ /**
+ * @template R
+ * @param {ReadableStream<R>} stream
+ * @param {boolean} cloneForBranch2
+ * @returns {[ReadableStream<R>, ReadableStream<R>]}
+ */
+ function readableStreamTee(stream, cloneForBranch2) {
assert(isReadableStream(stream));
assert(typeof cloneForBranch2 === "boolean");
const reader = acquireReadableStreamDefaultReader(stream);
let reading = false;
let canceled1 = false;
let canceled2 = false;
- let reason1 = undefined;
- let reason2 = undefined;
+ /** @type {any} */
+ let reason1;
+ /** @type {any} */
+ let reason2;
+ /** @type {ReadableStream<R>} */
// deno-lint-ignore prefer-const
let branch1;
+ /** @type {ReadableStream<R>} */
// deno-lint-ignore prefer-const
let branch2;
- const cancelPromise = getDeferred();
- const pullAlgorithm = () => {
- if (reading) {
- return Promise.resolve();
+
+ /** @type {Deferred<void>} */
+ const cancelPromise = new Deferred();
+
+ function pullAlgorithm() {
+ if (reading === true) {
+ return resolvePromiseWith(undefined);
}
reading = true;
- const readPromise = readableStreamDefaultReaderRead(reader).then(
- (result) => {
- reading = false;
- assert(typeof result === "object");
- const { done } = result;
- assert(typeof done === "boolean");
- if (done) {
- if (!canceled1) {
- readableStreamDefaultControllerClose(
- branch1[
- sym.readableStreamController
- ],
+ /** @type {ReadRequest<R>} */
+ const readRequest = {
+ chunkSteps(value) {
+ queueMicrotask(() => {
+ reading = false;
+ const value1 = value;
+ const value2 = value;
+
+ if (canceled1 === false) {
+ readableStreamDefaultControllerEnqueue(
+ /** @type {ReadableStreamDefaultController<any>} */ (branch1[
+ _controller
+ ]),
+ value1,
);
}
- if (!canceled2) {
- readableStreamDefaultControllerClose(
- branch2[
- sym.readableStreamController
- ],
+ if (canceled2 === false) {
+ readableStreamDefaultControllerEnqueue(
+ /** @type {ReadableStreamDefaultController<any>} */ (branch2[
+ _controller
+ ]),
+ value2,
);
}
- return;
- }
- const { value } = result;
- const value1 = value;
- let value2 = value;
- if (!canceled2 && cloneForBranch2) {
- value2 = cloneValue(value2);
- }
- if (!canceled1) {
- readableStreamDefaultControllerEnqueue(
- branch1[
- sym.readableStreamController
- ],
- value1,
+ });
+ },
+ closeSteps() {
+ reading = false;
+ if (canceled1 === false) {
+ readableStreamDefaultControllerClose(
+ /** @type {ReadableStreamDefaultController<any>} */ (branch1[
+ _controller
+ ]),
);
}
- if (!canceled2) {
- readableStreamDefaultControllerEnqueue(
- branch2[
- sym.readableStreamController
- ],
- value2,
+ if (canceled2 === false) {
+ readableStreamDefaultControllerClose(
+ /** @type {ReadableStreamDefaultController<any>} */ (branch2[
+ _controller
+ ]),
);
}
+ cancelPromise.resolve(undefined);
},
- );
- setPromiseIsHandledToTrue(readPromise);
- return Promise.resolve();
- };
- const cancel1Algorithm = (reason) => {
+ errorSteps() {
+ reading = false;
+ },
+ };
+ readableStreamDefaultReaderRead(reader, readRequest);
+ return resolvePromiseWith(undefined);
+ }
+
+ /**
+ * @param {any} reason
+ * @returns {Promise<void>}
+ */
+ function cancel1Algorithm(reason) {
canceled1 = true;
reason1 = reason;
- if (canceled2) {
+ if (canceled2 === true) {
const compositeReason = [reason1, reason2];
const cancelResult = readableStreamCancel(stream, compositeReason);
cancelPromise.resolve(cancelResult);
}
return cancelPromise.promise;
- };
- const cancel2Algorithm = (reason) => {
+ }
+
+ /**
+ * @param {any} reason
+ * @returns {Promise<void>}
+ */
+ function cancel2Algorithm(reason) {
canceled2 = true;
reason2 = reason;
- if (canceled1) {
+ if (canceled1 === true) {
const compositeReason = [reason1, reason2];
const cancelResult = readableStreamCancel(stream, compositeReason);
cancelPromise.resolve(cancelResult);
}
return cancelPromise.promise;
- };
- const startAlgorithm = () => undefined;
+ }
+
+ function startAlgorithm() {}
+
branch1 = createReadableStream(
startAlgorithm,
pullAlgorithm,
@@ -2261,44 +1669,35 @@
pullAlgorithm,
cancel2Algorithm,
);
- setPromiseIsHandledToTrue(
- reader[sym.closedPromise].promise.catch((r) => {
- readableStreamDefaultControllerError(
- branch1[
- sym.readableStreamController
- ],
- r,
- );
- readableStreamDefaultControllerError(
- branch2[
- sym.readableStreamController
- ],
- r,
- );
- }),
- );
- return [branch1, branch2];
- }
-
- function resetQueue(container) {
- assert(sym.queue in container && sym.queueTotalSize in container);
- container[sym.queue] = [];
- container[sym.queueTotalSize] = 0;
- }
- /** An internal function which mimics the behavior of setting the promise to
- * handled in JavaScript. In this situation, an assertion failure, which
- * shouldn't happen will get thrown, instead of swallowed. */
- function setPromiseIsHandledToTrue(promise) {
- promise.then(undefined, (e) => {
- if (e && e instanceof AssertionError) {
- queueMicrotask(() => {
- throw e;
- });
- }
+ uponRejection(reader[_closedPromise].promise, (r) => {
+ readableStreamDefaultControllerError(
+ /** @type {ReadableStreamDefaultController<any>} */ (branch1[
+ _controller
+ ]),
+ r,
+ );
+ readableStreamDefaultControllerError(
+ /** @type {ReadableStreamDefaultController<any>} */ (branch2[
+ _controller
+ ]),
+ r,
+ );
+ cancelPromise.resolve(undefined);
});
+
+ return [branch1, branch2];
}
+ /**
+ * @param {ReadableStream<ArrayBuffer>} stream
+ * @param {ReadableByteStreamController} controller
+ * @param {() => void} startAlgorithm
+ * @param {() => Promise<void>} pullAlgorithm
+ * @param {(reason: any) => Promise<void>} cancelAlgorithm
+ * @param {number} highWaterMark
+ * @param {number | undefined} autoAllocateChunkSize
+ */
function setUpReadableByteStreamController(
stream,
controller,
@@ -2308,33 +1707,30 @@
highWaterMark,
autoAllocateChunkSize,
) {
- assert(stream[sym.readableStreamController] === undefined);
+ assert(stream[_controller] === undefined);
if (autoAllocateChunkSize !== undefined) {
assert(Number.isInteger(autoAllocateChunkSize));
assert(autoAllocateChunkSize >= 0);
}
- controller[sym.controlledReadableByteStream] = stream;
- controller[sym.pulling] = controller[sym.pullAgain] = false;
- controller[sym.byobRequest] = undefined;
- controller[sym.queue] = [];
- controller[sym.queueTotalSize] = 0;
- controller[sym.closeRequested] = controller[sym.started] = false;
- controller[sym.strategyHWM] = validateAndNormalizeHighWaterMark(
- highWaterMark,
- );
- controller[sym.pullAlgorithm] = pullAlgorithm;
- controller[sym.cancelAlgorithm] = cancelAlgorithm;
- controller[sym.autoAllocateChunkSize] = autoAllocateChunkSize;
- // 3.13.26.12 Set controller.[[pendingPullIntos]] to a new empty List.
- stream[sym.readableStreamController] = controller;
+ controller[_stream] = stream;
+ controller[_pullAgain] = controller[_pulling] = false;
+ controller[_byobRequest] = undefined;
+ resetQueue(controller);
+ controller[_closeRequested] = controller[_started] = false;
+ controller[_strategyHWM] = highWaterMark;
+ controller[_pullAlgorithm] = pullAlgorithm;
+ controller[_cancelAlgorithm] = cancelAlgorithm;
+ controller[_autoAllocateChunkSize] = autoAllocateChunkSize;
+ // 12. Set controller.[[pendingPullIntos]] to a new empty list.
+ stream[_controller] = controller;
const startResult = startAlgorithm();
- const startPromise = Promise.resolve(startResult);
+ const startPromise = resolvePromiseWith(startResult);
setPromiseIsHandledToTrue(
startPromise.then(
() => {
- controller[sym.started] = true;
- assert(!controller[sym.pulling]);
- assert(!controller[sym.pullAgain]);
+ controller[_started] = true;
+ assert(controller[_pulling] === false);
+ assert(controller[_pullAgain] === false);
readableByteStreamControllerCallPullIfNeeded(controller);
},
(r) => {
@@ -2344,32 +1740,39 @@
);
}
+ /**
+ * @param {ReadableStream<ArrayBuffer>} stream
+ * @param {UnderlyingSource<ArrayBuffer>} underlyingSource
+ * @param {UnderlyingSource<ArrayBuffer>} underlyingSourceDict
+ * @param {number} highWaterMark
+ */
function setUpReadableByteStreamControllerFromUnderlyingSource(
stream,
- underlyingByteSource,
+ underlyingSource,
+ underlyingSourceDict,
highWaterMark,
) {
- assert(underlyingByteSource);
- const controller = Object.create(
- ReadableByteStreamController.prototype,
- );
- const startAlgorithm = () => {
- return invokeOrNoop(underlyingByteSource, "start", controller);
- };
- const pullAlgorithm = createAlgorithmFromUnderlyingMethod(
- underlyingByteSource,
- "pull",
- 0,
- controller,
- );
- setFunctionName(pullAlgorithm, "[[pullAlgorithm]]");
- const cancelAlgorithm = createAlgorithmFromUnderlyingMethod(
- underlyingByteSource,
- "cancel",
- 1,
- );
- setFunctionName(cancelAlgorithm, "[[cancelAlgorithm]]");
+ const controller = new ReadableByteStreamController();
+ /** @type {() => void} */
+ let startAlgorithm = () => undefined;
+ /** @type {() => Promise<void>} */
+ let pullAlgorithm = () => resolvePromiseWith(undefined);
+ /** @type {(reason: any) => Promise<void>} */
+ let cancelAlgorithm = (_reason) => resolvePromiseWith(undefined);
+ if ("start" in underlyingSourceDict) {
+ startAlgorithm = () =>
+ underlyingSourceDict.start.call(underlyingSource, controller);
+ }
+ if ("pull" in underlyingSourceDict) {
+ pullAlgorithm = () =>
+ underlyingSourceDict.pull.call(underlyingSource, controller);
+ }
+ if ("cancel" in underlyingSourceDict) {
+ cancelAlgorithm = (reason) =>
+ underlyingSourceDict.cancel.call(underlyingSource, reason);
+ }
// 3.13.27.6 Let autoAllocateChunkSize be ? GetV(underlyingByteSource, "autoAllocateChunkSize").
+ /** @type {undefined} */
const autoAllocateChunkSize = undefined;
setUpReadableByteStreamController(
stream,
@@ -2382,6 +1785,16 @@
);
}
+ /**
+ * @template R
+ * @param {ReadableStream<R>} stream
+ * @param {ReadableStreamDefaultController<R>} controller
+ * @param {(controller: ReadableStreamDefaultController<R>) => void | Promise<void>} startAlgorithm
+ * @param {(controller: ReadableStreamDefaultController<R>) => Promise<void>} pullAlgorithm
+ * @param {(reason: any) => Promise<void>} cancelAlgorithm
+ * @param {number} highWaterMark
+ * @param {(chunk: R) => number} sizeAlgorithm
+ */
function setUpReadableStreamDefaultController(
stream,
controller,
@@ -2391,60 +1804,62 @@
highWaterMark,
sizeAlgorithm,
) {
- assert(stream[sym.readableStreamController] === undefined);
- controller[sym.controlledReadableStream] = stream;
- controller[sym.queue] = [];
- controller[sym.queueTotalSize] = 0;
- controller[sym.started] = controller[sym.closeRequested] = controller[
- sym.pullAgain
- ] = controller[sym.pulling] = false;
- controller[sym.strategySizeAlgorithm] = sizeAlgorithm;
- controller[sym.strategyHWM] = highWaterMark;
- controller[sym.pullAlgorithm] = pullAlgorithm;
- controller[sym.cancelAlgorithm] = cancelAlgorithm;
- stream[sym.readableStreamController] = controller;
- const startResult = startAlgorithm();
- const startPromise = Promise.resolve(startResult);
- setPromiseIsHandledToTrue(
- startPromise.then(
- () => {
- controller[sym.started] = true;
- assert(controller[sym.pulling] === false);
- assert(controller[sym.pullAgain] === false);
- readableStreamDefaultControllerCallPullIfNeeded(controller);
- },
- (r) => {
- readableStreamDefaultControllerError(controller, r);
- },
- ),
- );
+ assert(stream[_controller] === undefined);
+ controller[_stream] = stream;
+ resetQueue(controller);
+ controller[_started] = controller[_closeRequested] =
+ controller[_pullAgain] = controller[_pulling] = false;
+ controller[_strategySizeAlgorithm] = sizeAlgorithm;
+ controller[_strategyHWM] = highWaterMark;
+ controller[_pullAlgorithm] = pullAlgorithm;
+ controller[_cancelAlgorithm] = cancelAlgorithm;
+ stream[_controller] = controller;
+ const startResult = startAlgorithm(controller);
+ const startPromise = resolvePromiseWith(startResult);
+ uponPromise(startPromise, () => {
+ controller[_started] = true;
+ assert(controller[_pulling] === false);
+ assert(controller[_pullAgain] === false);
+ readableStreamDefaultControllerCallPullIfNeeded(controller);
+ }, (r) => {
+ readableStreamDefaultControllerError(controller, r);
+ });
}
+ /**
+ * @template R
+ * @param {ReadableStream<R>} stream
+ * @param {UnderlyingSource<R>} underlyingSource
+ * @param {UnderlyingSource<R>} underlyingSourceDict
+ * @param {number} highWaterMark
+ * @param {(chunk: R) => number} sizeAlgorithm
+ */
function setUpReadableStreamDefaultControllerFromUnderlyingSource(
stream,
underlyingSource,
+ underlyingSourceDict,
highWaterMark,
sizeAlgorithm,
) {
- assert(underlyingSource);
- const controller = Object.create(
- ReadableStreamDefaultController.prototype,
- );
- const startAlgorithm = () =>
- invokeOrNoop(underlyingSource, "start", controller);
- const pullAlgorithm = createAlgorithmFromUnderlyingMethod(
- underlyingSource,
- "pull",
- 0,
- controller,
- );
- setFunctionName(pullAlgorithm, "[[pullAlgorithm]]");
- const cancelAlgorithm = createAlgorithmFromUnderlyingMethod(
- underlyingSource,
- "cancel",
- 1,
- );
- setFunctionName(cancelAlgorithm, "[[cancelAlgorithm]]");
+ const controller = new ReadableStreamDefaultController();
+ /** @type {(controller: ReadableStreamDefaultController<R>) => Promise<void>} */
+ let startAlgorithm = () => undefined;
+ /** @type {(controller: ReadableStreamDefaultController<R>) => Promise<void>} */
+ let pullAlgorithm = () => resolvePromiseWith(undefined);
+ /** @type {(reason?: any) => Promise<void>} */
+ let cancelAlgorithm = () => resolvePromiseWith(undefined);
+ if ("start" in underlyingSourceDict) {
+ startAlgorithm = () =>
+ underlyingSourceDict.start.call(underlyingSource, controller);
+ }
+ if ("pull" in underlyingSourceDict) {
+ pullAlgorithm = () =>
+ underlyingSourceDict.pull.call(underlyingSource, controller);
+ }
+ if ("cancel" in underlyingSourceDict) {
+ cancelAlgorithm = (reason) =>
+ underlyingSourceDict.cancel.call(underlyingSource, reason);
+ }
setUpReadableStreamDefaultController(
stream,
controller,
@@ -2456,55 +1871,73 @@
);
}
+ /**
+ * @template R
+ * @param {ReadableStreamDefaultReader<R>} reader
+ * @param {ReadableStream<R>} stream
+ */
+ function setUpReadableStreamDefaultReader(reader, stream) {
+ if (isReadableStreamLocked(stream)) {
+ throw new TypeError("ReadableStream is locked.");
+ }
+ readableStreamReaderGenericInitialize(reader, stream);
+ reader[_readRequests] = [];
+ }
+
+ /**
+ * @template O
+ * @param {TransformStream<any, O>} stream
+ * @param {TransformStreamDefaultController<O>} controller
+ * @param {(chunk: O, controller: TransformStreamDefaultController<O>) => Promise<void>} transformAlgorithm
+ * @param {(controller: TransformStreamDefaultController<O>) => Promise<void>} flushAlgorithm
+ */
function setUpTransformStreamDefaultController(
stream,
controller,
transformAlgorithm,
flushAlgorithm,
) {
- assert(isTransformStream(stream));
- assert(stream[sym.transformStreamController] === undefined);
- controller[sym.controlledTransformStream] = stream;
- stream[sym.transformStreamController] = controller;
- controller[sym.transformAlgorithm] = transformAlgorithm;
- controller[sym.flushAlgorithm] = flushAlgorithm;
- }
-
+ assert(stream instanceof TransformStream);
+ assert(stream[_controller] === undefined);
+ controller[_stream] = stream;
+ stream[_controller] = controller;
+ controller[_transformAlgorithm] = transformAlgorithm;
+ controller[_flushAlgorithm] = flushAlgorithm;
+ }
+
+ /**
+ * @template I
+ * @template O
+ * @param {TransformStream<I, O>} stream
+ * @param {Transformer<I, O>} transformer
+ * @param {Transformer<I, O>} transformerDict
+ */
function setUpTransformStreamDefaultControllerFromTransformer(
stream,
transformer,
+ transformerDict,
) {
- assert(transformer);
- const controller = Object.create(
- TransformStreamDefaultController.prototype,
- );
+ /** @type {TransformStreamDefaultController<O>} */
+ const controller = new TransformStreamDefaultController();
+ /** @type {(chunk: O, controller: TransformStreamDefaultController<O>) => Promise<void>} */
let transformAlgorithm = (chunk) => {
try {
- transformStreamDefaultControllerEnqueue(
- controller,
- // it defaults to no transformation, so I is assumed to be O
- chunk,
- );
+ transformStreamDefaultControllerEnqueue(controller, chunk);
} catch (e) {
return Promise.reject(e);
}
- return Promise.resolve();
+ return resolvePromiseWith(undefined);
};
- const transformMethod = transformer.transform;
- if (transformMethod) {
- if (typeof transformMethod !== "function") {
- throw new TypeError("tranformer.transform must be callable.");
- }
- // deno-lint-ignore require-await
- transformAlgorithm = async (chunk) =>
- call(transformMethod, transformer, [chunk, controller]);
- }
- const flushAlgorithm = createAlgorithmFromUnderlyingMethod(
- transformer,
- "flush",
- 0,
- controller,
- );
+ /** @type {(controller: TransformStreamDefaultController<O>) => Promise<void>} */
+ let flushAlgorithm = () => resolvePromiseWith(undefined);
+ if ("transform" in transformerDict) {
+ transformAlgorithm = (chunk, controller) =>
+ transformerDict.transform.call(transformer, chunk, controller);
+ }
+ if ("flush" in transformerDict) {
+ flushAlgorithm = (controller) =>
+ transformerDict.flush.call(transformer, controller);
+ }
setUpTransformStreamDefaultController(
stream,
controller,
@@ -2513,6 +1946,17 @@
);
}
+ /**
+ * @template W
+ * @param {WritableStream<W>} stream
+ * @param {WritableStreamDefaultController<W>} controller
+ * @param {(controller: WritableStreamDefaultController<W>) => Promise<void>} startAlgorithm
+ * @param {(chunk: W, controller: WritableStreamDefaultController<W>) => Promise<void>} writeAlgorithm
+ * @param {() => Promise<void>} closeAlgorithm
+ * @param {(reason?: any) => Promise<void>} abortAlgorithm
+ * @param {number} highWaterMark
+ * @param {(chunk: W) => number} sizeAlgorithm
+ */
function setUpWritableStreamDefaultController(
stream,
controller,
@@ -2524,77 +1968,70 @@
sizeAlgorithm,
) {
assert(isWritableStream(stream));
- assert(stream[sym.writableStreamController] === undefined);
- controller[sym.controlledWritableStream] = stream;
- stream[sym.writableStreamController] = controller;
- controller[sym.queue] = [];
- controller[sym.queueTotalSize] = 0;
- controller[sym.started] = false;
- controller[sym.strategySizeAlgorithm] = sizeAlgorithm;
- controller[sym.strategyHWM] = highWaterMark;
- controller[sym.writeAlgorithm] = writeAlgorithm;
- controller[sym.closeAlgorithm] = closeAlgorithm;
- controller[sym.abortAlgorithm] = abortAlgorithm;
+ assert(stream[_controller] === undefined);
+ controller[_stream] = stream;
+ stream[_controller] = controller;
+ resetQueue(controller);
+ controller[_started] = false;
+ controller[_strategySizeAlgorithm] = sizeAlgorithm;
+ controller[_strategyHWM] = highWaterMark;
+ controller[_writeAlgorithm] = writeAlgorithm;
+ controller[_closeAlgorithm] = closeAlgorithm;
+ controller[_abortAlgorithm] = abortAlgorithm;
const backpressure = writableStreamDefaultControllerGetBackpressure(
controller,
);
writableStreamUpdateBackpressure(stream, backpressure);
- const startResult = startAlgorithm();
- const startPromise = Promise.resolve(startResult);
- setPromiseIsHandledToTrue(
- startPromise.then(
- () => {
- assert(
- stream[sym.state] === "writable" ||
- stream[sym.state] === "erroring",
- );
- controller[sym.started] = true;
- writableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
- },
- (r) => {
- assert(
- stream[sym.state] === "writable" ||
- stream[sym.state] === "erroring",
- );
- controller[sym.started] = true;
- writableStreamDealWithRejection(stream, r);
- },
- ),
- );
+ const startResult = startAlgorithm(controller);
+ const startPromise = resolvePromiseWith(startResult);
+ uponPromise(startPromise, () => {
+ assert(stream[_state] === "writable" || stream[_state] === "erroring");
+ controller[_started] = true;
+ writableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
+ }, (r) => {
+ assert(stream[_state] === "writable" || stream[_state] === "erroring");
+ controller[_started] = true;
+ writableStreamDealWithRejection(stream, r);
+ });
}
+ /**
+ * @template W
+ * @param {WritableStream<W>} stream
+ * @param {UnderlyingSink<W>} underlyingSink
+ * @param {UnderlyingSink<W>} underlyingSinkDict
+ * @param {number} highWaterMark
+ * @param {(chunk: W) => number} sizeAlgorithm
+ */
function setUpWritableStreamDefaultControllerFromUnderlyingSink(
stream,
underlyingSink,
+ underlyingSinkDict,
highWaterMark,
sizeAlgorithm,
) {
- assert(underlyingSink);
- const controller = Object.create(
- WritableStreamDefaultController.prototype,
- );
- const startAlgorithm = () => {
- return invokeOrNoop(underlyingSink, "start", controller);
- };
- const writeAlgorithm = createAlgorithmFromUnderlyingMethod(
- underlyingSink,
- "write",
- 1,
- controller,
- );
- setFunctionName(writeAlgorithm, "[[writeAlgorithm]]");
- const closeAlgorithm = createAlgorithmFromUnderlyingMethod(
- underlyingSink,
- "close",
- 0,
- );
- setFunctionName(closeAlgorithm, "[[closeAlgorithm]]");
- const abortAlgorithm = createAlgorithmFromUnderlyingMethod(
- underlyingSink,
- "abort",
- 1,
- );
- setFunctionName(abortAlgorithm, "[[abortAlgorithm]]");
+ const controller = new WritableStreamDefaultController();
+ let startAlgorithm = () => undefined;
+ /** @type {(chunk: W) => Promise<void>} */
+ let writeAlgorithm = () => resolvePromiseWith(undefined);
+ let closeAlgorithm = () => resolvePromiseWith(undefined);
+ /** @type {(reason?: any) => Promise<void>} */
+ let abortAlgorithm = () => resolvePromiseWith(undefined);
+ if ("start" in underlyingSinkDict) {
+ startAlgorithm = () =>
+ underlyingSinkDict.start.call(underlyingSink, controller);
+ }
+ if ("write" in underlyingSinkDict) {
+ writeAlgorithm = (chunk) =>
+ underlyingSinkDict.write.call(underlyingSink, chunk, controller);
+ }
+ if ("close" in underlyingSinkDict) {
+ closeAlgorithm = () => underlyingSinkDict.close.call(underlyingSink);
+ }
+ if ("abort" in underlyingSinkDict) {
+ abortAlgorithm = (reason) =>
+ underlyingSinkDict.abort.call(underlyingSink, reason);
+ }
setUpWritableStreamDefaultController(
stream,
controller,
@@ -2607,108 +2044,175 @@
);
}
- function transformStreamDefaultControllerClearAlgorithms(
- controller,
- ) {
- controller[sym.transformAlgorithm] = undefined;
- controller[sym.flushAlgorithm] = undefined;
- }
-
- function transformStreamDefaultControllerEnqueue(
- controller,
- chunk,
- ) {
- const stream = controller[sym.controlledTransformStream];
- const readableController = stream[sym.readable][
- sym.readableStreamController
- ];
- if (!readableStreamDefaultControllerCanCloseOrEnqueue(readableController)) {
- throw new TypeError(
- "TransformStream's readable controller cannot be closed or enqueued.",
- );
+ /**
+ * @template W
+ * @param {WritableStreamDefaultWriter<W>} writer
+ * @param {WritableStream<W>} stream
+ */
+ function setUpWritableStreamDefaultWriter(writer, stream) {
+ if (isWritableStreamLocked(stream) === true) {
+ throw new TypeError("The stream is already locked.");
+ }
+ writer[_stream] = stream;
+ stream[_writer] = writer;
+ const state = stream[_state];
+ if (state === "writable") {
+ if (
+ writableStreamCloseQueuedOrInFlight(stream) === false &&
+ stream[_backpressure] === true
+ ) {
+ writer[_readyPromise] = new Deferred();
+ } else {
+ writer[_readyPromise] = new Deferred();
+ writer[_readyPromise].resolve(undefined);
+ }
+ writer[_closedPromise] = new Deferred();
+ } else if (state === "erroring") {
+ writer[_readyPromise] = new Deferred();
+ writer[_readyPromise].reject(stream[_storedError]);
+ setPromiseIsHandledToTrue(writer[_readyPromise].promise);
+ writer[_closedPromise] = new Deferred();
+ } else if (state === "closed") {
+ writer[_readyPromise] = new Deferred();
+ writer[_readyPromise].resolve(undefined);
+ writer[_closedPromise] = new Deferred();
+ writer[_closedPromise].resolve(undefined);
+ } else {
+ assert(state === "errored");
+ const storedError = stream[_storedError];
+ writer[_readyPromise] = new Deferred();
+ writer[_readyPromise].reject(storedError);
+ setPromiseIsHandledToTrue(writer[_readyPromise].promise);
+ writer[_closedPromise] = new Deferred();
+ writer[_closedPromise].reject(storedError);
+ setPromiseIsHandledToTrue(writer[_closedPromise].promise);
+ }
+ }
+
+ /** @param {TransformStreamDefaultController} controller */
+ function transformStreamDefaultControllerClearAlgorithms(controller) {
+ controller[_transformAlgorithm] = undefined;
+ controller[_flushAlgorithm] = undefined;
+ }
+
+ /**
+ * @template O
+ * @param {TransformStreamDefaultController<O>} controller
+ * @param {O} chunk
+ */
+ function transformStreamDefaultControllerEnqueue(controller, chunk) {
+ const stream = controller[_stream];
+ const readableController = stream[_readable][_controller];
+ if (
+ readableStreamDefaultControllerCanCloseOrEnqueue(
+ /** @type {ReadableStreamDefaultController<O>} */ (readableController),
+ ) === false
+ ) {
+ throw new TypeError("Readable stream is unavailable.");
}
try {
- readableStreamDefaultControllerEnqueue(readableController, chunk);
+ readableStreamDefaultControllerEnqueue(
+ /** @type {ReadableStreamDefaultController<O>} */ (readableController),
+ chunk,
+ );
} catch (e) {
transformStreamErrorWritableAndUnblockWrite(stream, e);
- throw stream[sym.readable][sym.storedError];
+ throw stream[_readable][_storedError];
}
- const backpressure = readableStreamDefaultControllerHasBackpressure(
- readableController,
+ const backpressure = readableStreamDefaultcontrollerHasBackpressure(
+ /** @type {ReadableStreamDefaultController<O>} */ (readableController),
);
- if (backpressure) {
+ if (backpressure !== stream[_backpressure]) {
+ assert(backpressure === true);
transformStreamSetBackpressure(stream, true);
}
}
- function transformStreamDefaultControllerError(
- controller,
- e,
- ) {
- transformStreamError(controller[sym.controlledTransformStream], e);
+ /**
+ * @param {TransformStreamDefaultController} controller
+ * @param {any=} e
+ */
+ function transformStreamDefaultControllerError(controller, e) {
+ transformStreamError(controller[_stream], e);
}
- function transformStreamDefaultControllerPerformTransform(
- controller,
- chunk,
- ) {
- const transformPromise = controller[sym.transformAlgorithm](chunk);
- return transformPromise.then(undefined, (r) => {
- transformStreamError(controller[sym.controlledTransformStream], r);
+ /**
+ * @template O
+ * @param {TransformStreamDefaultController<O>} controller
+ * @param {any} chunk
+ * @returns {Promise<void>}
+ */
+ function transformStreamDefaultControllerPerformTransform(controller, chunk) {
+ const transformPromise = controller[_transformAlgorithm](chunk, controller);
+ return transformPromiseWith(transformPromise, undefined, (r) => {
+ transformStreamError(controller[_stream], r);
throw r;
});
}
- function transformStreamDefaultSinkAbortAlgorithm(
- stream,
- reason,
- ) {
- transformStreamError(stream, reason);
- return Promise.resolve(undefined);
+ /** @param {TransformStreamDefaultController} controller */
+ function transformStreamDefaultControllerTerminate(controller) {
+ const stream = controller[_stream];
+ const readableController = stream[_readable][_controller];
+ readableStreamDefaultControllerClose(
+ /** @type {ReadableStreamDefaultController} */ (readableController),
+ );
+ const error = new TypeError("The stream has been terminated.");
+ transformStreamErrorWritableAndUnblockWrite(stream, error);
}
- function transformStreamDefaultSinkCloseAlgorithm(
- stream,
- ) {
- const readable = stream[sym.readable];
- const controller = stream[sym.transformStreamController];
- const flushPromise = controller[sym.flushAlgorithm]();
+ /**
+ * @param {TransformStream} stream
+ * @param {any=} reason
+ * @returns {Promise<void>}
+ */
+ function transformStreamDefaultSinkAbortAlgorithm(stream, reason) {
+ transformStreamError(stream, reason);
+ return resolvePromiseWith(undefined);
+ }
+
+ /**
+ * @template I
+ * @template O
+ * @param {TransformStream<I, O>} stream
+ * @returns {Promise<void>}
+ */
+ function transformStreamDefaultSinkCloseAlgorithm(stream) {
+ const readable = stream[_readable];
+ const controller = stream[_controller];
+ const flushPromise = controller[_flushAlgorithm](controller);
transformStreamDefaultControllerClearAlgorithms(controller);
- return flushPromise.then(
- () => {
- if (readable[sym.state] === "errored") {
- throw readable[sym.storedError];
- }
- const readableController = readable[
- sym.readableStreamController
- ];
- if (
- readableStreamDefaultControllerCanCloseOrEnqueue(readableController)
- ) {
- readableStreamDefaultControllerClose(readableController);
- }
- },
- (r) => {
- transformStreamError(stream, r);
- throw readable[sym.storedError];
- },
- );
+ return transformPromiseWith(flushPromise, () => {
+ if (readable[_state] === "errored") {
+ throw readable[_storedError];
+ }
+ readableStreamDefaultControllerClose(
+ /** @type {ReadableStreamDefaultController} */ (readable[_controller]),
+ );
+ }, (r) => {
+ transformStreamError(stream, r);
+ throw readable[_storedError];
+ });
}
- function transformStreamDefaultSinkWriteAlgorithm(
- stream,
- chunk,
- ) {
- assert(stream[sym.writable][sym.state] === "writable");
- const controller = stream[sym.transformStreamController];
- if (stream[sym.backpressure]) {
- const backpressureChangePromise = stream[sym.backpressureChangePromise];
- assert(backpressureChangePromise);
- return backpressureChangePromise.promise.then(() => {
- const writable = stream[sym.writable];
- const state = writable[sym.state];
+ /**
+ * @template I
+ * @template O
+ * @param {TransformStream<I, O>} stream
+ * @param {I} chunk
+ * @returns {Promise<void>}
+ */
+ function transformStreamDefaultSinkWriteAlgorithm(stream, chunk) {
+ assert(stream[_writable][_state] === "writable");
+ const controller = stream[_controller];
+ if (stream[_backpressure] === true) {
+ const backpressureChangePromise = stream[_backpressureChangePromise];
+ assert(backpressureChangePromise !== undefined);
+ return transformPromiseWith(backpressureChangePromise.promise, () => {
+ const writable = stream[_writable];
+ const state = writable[_state];
if (state === "erroring") {
- throw writable[sym.storedError];
+ throw writable[_storedError];
}
assert(state === "writable");
return transformStreamDefaultControllerPerformTransform(
@@ -2720,104 +2224,71 @@
return transformStreamDefaultControllerPerformTransform(controller, chunk);
}
- function transformStreamDefaultSourcePullAlgorithm(
- stream,
- ) {
- assert(stream[sym.backpressure] === true);
- assert(stream[sym.backpressureChangePromise] !== undefined);
+ /**
+ * @param {TransformStream} stream
+ * @returns {Promise<void>}
+ */
+ function transformStreamDefaultSourcePullAlgorithm(stream) {
+ assert(stream[_backpressure] === true);
+ assert(stream[_backpressureChangePromise] !== undefined);
transformStreamSetBackpressure(stream, false);
- return stream[sym.backpressureChangePromise].promise;
+ return stream[_backpressureChangePromise].promise;
}
- function transformStreamError(
- stream,
- e,
- ) {
+ /**
+ * @param {TransformStream} stream
+ * @param {any=} e
+ */
+ function transformStreamError(stream, e) {
readableStreamDefaultControllerError(
- stream[sym.readable][
- sym.readableStreamController
- ],
+ /** @type {ReadableStreamDefaultController} */ (stream[_readable][
+ _controller
+ ]),
e,
);
transformStreamErrorWritableAndUnblockWrite(stream, e);
}
- function transformStreamDefaultControllerTerminate(
- controller,
- ) {
- const stream = controller[sym.controlledTransformStream];
- const readableController = stream[sym.readable][
- sym.readableStreamController
- ];
- readableStreamDefaultControllerClose(readableController);
- const error = new TypeError("TransformStream is closed.");
- transformStreamErrorWritableAndUnblockWrite(stream, error);
- }
-
- function transformStreamErrorWritableAndUnblockWrite(
- stream,
- e,
- ) {
- transformStreamDefaultControllerClearAlgorithms(
- stream[sym.transformStreamController],
- );
+ /**
+ * @param {TransformStream} stream
+ * @param {any=} e
+ */
+ function transformStreamErrorWritableAndUnblockWrite(stream, e) {
+ transformStreamDefaultControllerClearAlgorithms(stream[_controller]);
writableStreamDefaultControllerErrorIfNeeded(
- stream[sym.writable][sym.writableStreamController],
+ stream[_writable][_controller],
e,
);
- if (stream[sym.backpressure]) {
+ if (stream[_backpressure] === true) {
transformStreamSetBackpressure(stream, false);
}
}
- function transformStreamSetBackpressure(
- stream,
- backpressure,
- ) {
- assert(stream[sym.backpressure] !== backpressure);
- if (stream[sym.backpressureChangePromise] !== undefined) {
- stream[sym.backpressureChangePromise].resolve(undefined);
- }
- stream[sym.backpressureChangePromise] = getDeferred();
- stream[sym.backpressure] = backpressure;
- }
-
- function transferArrayBuffer(buffer) {
- assert(!isDetachedBuffer(buffer));
- const transferredIshVersion = buffer.slice(0);
-
- Object.defineProperty(buffer, "byteLength", {
- get() {
- return 0;
- },
- });
- buffer[sym.isFakeDetached] = true;
-
- return transferredIshVersion;
- }
-
- function validateAndNormalizeHighWaterMark(
- highWaterMark,
- ) {
- highWaterMark = Number(highWaterMark);
- if (Number.isNaN(highWaterMark) || highWaterMark < 0) {
- throw new RangeError(
- `highWaterMark must be a positive number or Infinity. Received: ${highWaterMark}.`,
- );
+ /**
+ * @param {TransformStream} stream
+ * @param {boolean} backpressure
+ */
+ function transformStreamSetBackpressure(stream, backpressure) {
+ assert(stream[_backpressure] !== backpressure);
+ if (stream[_backpressureChangePromise] !== undefined) {
+ stream[_backpressureChangePromise].resolve(undefined);
}
- return highWaterMark;
+ stream[_backpressureChangePromise] = new Deferred();
+ stream[_backpressure] = backpressure;
}
- function writableStreamAbort(
- stream,
- reason,
- ) {
- const state = stream[sym.state];
+ /**
+ * @param {WritableStream} stream
+ * @param {any=} reason
+ * @returns {Promise<void>}
+ */
+ function writableStreamAbort(stream, reason) {
+ const state = stream[_state];
if (state === "closed" || state === "errored") {
- return Promise.resolve(undefined);
+ return resolvePromiseWith(undefined);
}
- if (stream[sym.pendingAbortRequest]) {
- return stream[sym.pendingAbortRequest].promise.promise;
+ if (stream[_pendingAbortRequest] !== undefined) {
+ return stream[_pendingAbortRequest].deferred.promise;
}
assert(state === "writable" || state === "erroring");
let wasAlreadyErroring = false;
@@ -2825,63 +2296,79 @@
wasAlreadyErroring = true;
reason = undefined;
}
- const promise = getDeferred();
- stream[sym.pendingAbortRequest] = { promise, reason, wasAlreadyErroring };
-
+ /** Deferred<void> */
+ const deferred = new Deferred();
+ stream[_pendingAbortRequest] = {
+ deferred,
+ reason,
+ wasAlreadyErroring,
+ };
if (wasAlreadyErroring === false) {
writableStreamStartErroring(stream, reason);
}
- return promise.promise;
+ return deferred.promise;
}
- function writableStreamAddWriteRequest(
- stream,
- ) {
- assert(isWritableStream(stream));
- assert(stream[sym.state] === "writable");
- const promise = getDeferred();
- stream[sym.writeRequests].push(promise);
- return promise.promise;
+ /**
+ * @param {WritableStream} stream
+ * @returns {Promise<void>}
+ */
+ function writableStreamAddWriteRequest(stream) {
+ assert(isWritableStreamLocked(stream) === true);
+ assert(stream[_state] === "writable");
+ /** @type {Deferred<void>} */
+ const deferred = new Deferred();
+ stream[_writeRequests].push(deferred);
+ return deferred.promise;
}
- function writableStreamClose(
- stream,
- ) {
- const state = stream[sym.state];
+ /**
+ * @param {WritableStream} stream
+ * @returns {Promise<void>}
+ */
+ function writableStreamClose(stream) {
+ const state = stream[_state];
if (state === "closed" || state === "errored") {
return Promise.reject(
- new TypeError(
- "Cannot close an already closed or errored WritableStream.",
- ),
+ new TypeError("Writable stream is closed or errored."),
);
}
- assert(!writableStreamCloseQueuedOrInFlight(stream));
- const promise = getDeferred();
- stream[sym.closeRequest] = promise;
- const writer = stream[sym.writer];
- if (writer && stream[sym.backpressure] && state === "writable") {
- writer[sym.readyPromise].resolve();
- writer[sym.readyPromise].resolve = undefined;
- writer[sym.readyPromise].reject = undefined;
+ assert(state === "writable" || state === "erroring");
+ assert(writableStreamCloseQueuedOrInFlight(stream) === false);
+ /** @type {Deferred<void>} */
+ const deferred = new Deferred();
+ stream[_closeRequest] = deferred;
+ const writer = stream[_writer];
+ if (
+ writer !== undefined && stream[_backpressure] === true &&
+ state === "writable"
+ ) {
+ writer[_readyPromise].resolve(undefined);
}
- writableStreamDefaultControllerClose(stream[sym.writableStreamController]);
- return promise.promise;
+ writableStreamDefaultControllerClose(stream[_controller]);
+ return deferred.promise;
}
- function writableStreamCloseQueuedOrInFlight(
- stream,
- ) {
- return !(
- stream[sym.closeRequest] === undefined &&
- stream[sym.inFlightCloseRequest] === undefined
- );
+ /**
+ * @param {WritableStream} stream
+ * @returns {boolean}
+ */
+ function writableStreamCloseQueuedOrInFlight(stream) {
+ if (
+ stream[_closeRequest] === undefined &&
+ stream[_inFlightCloseRequest] === undefined
+ ) {
+ return false;
+ }
+ return true;
}
- function writableStreamDealWithRejection(
- stream,
- error,
- ) {
- const state = stream[sym.state];
+ /**
+ * @param {WritableStream} stream
+ * @param {any=} error
+ */
+ function writableStreamDealWithRejection(stream, error) {
+ const state = stream[_state];
if (state === "writable") {
writableStreamStartErroring(stream, error);
return;
@@ -2890,172 +2377,169 @@
writableStreamFinishErroring(stream);
}
- function writableStreamDefaultControllerAdvanceQueueIfNeeded(
- controller,
- ) {
- const stream = controller[sym.controlledWritableStream];
- if (!controller[sym.started]) {
+ /**
+ * @template W
+ * @param {WritableStreamDefaultController<W>} controller
+ */
+ function writableStreamDefaultControllerAdvanceQueueIfNeeded(controller) {
+ const stream = controller[_stream];
+ if (controller[_started] === false) {
return;
}
- if (stream[sym.inFlightWriteRequest]) {
+ if (stream[_inFlightWriteRequest] !== undefined) {
return;
}
- const state = stream[sym.state];
+ const state = stream[_state];
assert(state !== "closed" && state !== "errored");
if (state === "erroring") {
writableStreamFinishErroring(stream);
return;
}
- if (!controller[sym.queue].length) {
+ if (controller[_queue].length === 0) {
return;
}
- const writeRecord = peekQueueValue(controller);
- if (writeRecord === "close") {
+ const value = peekQueueValue(controller);
+ if (value === _close) {
writableStreamDefaultControllerProcessClose(controller);
} else {
- writableStreamDefaultControllerProcessWrite(
- controller,
- writeRecord.chunk,
- );
+ writableStreamDefaultControllerProcessWrite(controller, value);
}
}
- function writableStreamDefaultControllerClearAlgorithms(
- controller,
- ) {
- controller[sym.writeAlgorithm] = undefined;
- controller[sym.closeAlgorithm] = undefined;
- controller[sym.abortAlgorithm] = undefined;
- controller[sym.strategySizeAlgorithm] = undefined;
+ function writableStreamDefaultControllerClearAlgorithms(controller) {
+ controller[_writeAlgorithm] = undefined;
+ controller[_closeAlgorithm] = undefined;
+ controller[_abortAlgorithm] = undefined;
+ controller[_strategySizeAlgorithm] = undefined;
}
- function writableStreamDefaultControllerClose(
- controller,
- ) {
- enqueueValueWithSize(controller, "close", 0);
+ /** @param {WritableStreamDefaultController} controller */
+ function writableStreamDefaultControllerClose(controller) {
+ enqueueValueWithSize(controller, _close, 0);
writableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
}
- function writableStreamDefaultControllerError(
- controller,
- error,
- ) {
- const stream = controller[sym.controlledWritableStream];
- assert(stream[sym.state] === "writable");
+ /**
+ * @param {WritableStreamDefaultController} controller
+ * @param {any} error
+ */
+ function writableStreamDefaultControllerError(controller, error) {
+ const stream = controller[_stream];
+ assert(stream[_state] === "writable");
writableStreamDefaultControllerClearAlgorithms(controller);
writableStreamStartErroring(stream, error);
}
- function writableStreamDefaultControllerErrorIfNeeded(
- controller,
- error,
- ) {
- if (controller[sym.controlledWritableStream][sym.state] === "writable") {
+ /**
+ * @param {WritableStreamDefaultController} controller
+ * @param {any} error
+ */
+ function writableStreamDefaultControllerErrorIfNeeded(controller, error) {
+ if (controller[_stream][_state] === "writable") {
writableStreamDefaultControllerError(controller, error);
}
}
- function writableStreamDefaultControllerGetBackpressure(
- controller,
- ) {
+ /**
+ * @param {WritableStreamDefaultController} controller
+ * @returns {boolean}
+ */
+ function writableStreamDefaultControllerGetBackpressure(controller) {
const desiredSize = writableStreamDefaultControllerGetDesiredSize(
controller,
);
return desiredSize <= 0;
}
- function writableStreamDefaultControllerGetChunkSize(
- controller,
- chunk,
- ) {
- let returnValue;
+ /**
+ * @template W
+ * @param {WritableStreamDefaultController<W>} controller
+ * @param {W} chunk
+ * @returns {number}
+ */
+ function writableStreamDefaultControllerGetChunkSize(controller, chunk) {
+ let value;
try {
- returnValue = controller[sym.strategySizeAlgorithm](chunk);
+ value = controller[_strategySizeAlgorithm](chunk);
} catch (e) {
writableStreamDefaultControllerErrorIfNeeded(controller, e);
return 1;
}
- return returnValue;
+ return value;
}
- function writableStreamDefaultControllerGetDesiredSize(
- controller,
- ) {
- return controller[sym.strategyHWM] - controller[sym.queueTotalSize];
+ /**
+ * @param {WritableStreamDefaultController} controller
+ * @returns {number}
+ */
+ function writableStreamDefaultControllerGetDesiredSize(controller) {
+ return controller[_strategyHWM] - controller[_queueTotalSize];
}
- function writableStreamDefaultControllerProcessClose(
- controller,
- ) {
- const stream = controller[sym.controlledWritableStream];
+ /** @param {WritableStreamDefaultController} controller */
+ function writableStreamDefaultControllerProcessClose(controller) {
+ const stream = controller[_stream];
writableStreamMarkCloseRequestInFlight(stream);
dequeueValue(controller);
- assert(controller[sym.queue].length === 0);
- const sinkClosePromise = controller[sym.closeAlgorithm]();
+ assert(controller[_queue].length === 0);
+ const sinkClosePromise = controller[_closeAlgorithm]();
writableStreamDefaultControllerClearAlgorithms(controller);
- setPromiseIsHandledToTrue(
- sinkClosePromise.then(
- () => {
- writableStreamFinishInFlightClose(stream);
- },
- (reason) => {
- writableStreamFinishInFlightCloseWithError(stream, reason);
- },
- ),
- );
+ uponPromise(sinkClosePromise, () => {
+ writableStreamFinishInFlightClose(stream);
+ }, (reason) => {
+ writableStreamFinishInFlightCloseWithError(stream, reason);
+ });
}
- function writableStreamDefaultControllerProcessWrite(
- controller,
- chunk,
- ) {
- const stream = controller[sym.controlledWritableStream];
+ /**
+ * @template W
+ * @param {WritableStreamDefaultController<W>} controller
+ * @param {W} chunk
+ */
+ function writableStreamDefaultControllerProcessWrite(controller, chunk) {
+ const stream = controller[_stream];
writableStreamMarkFirstWriteRequestInFlight(stream);
- const sinkWritePromise = controller[sym.writeAlgorithm](chunk);
- setPromiseIsHandledToTrue(
- sinkWritePromise.then(
- () => {
- writableStreamFinishInFlightWrite(stream);
- const state = stream[sym.state];
- assert(state === "writable" || state === "erroring");
- dequeueValue(controller);
- if (
- !writableStreamCloseQueuedOrInFlight(stream) &&
- state === "writable"
- ) {
- const backpressure = writableStreamDefaultControllerGetBackpressure(
- controller,
- );
- writableStreamUpdateBackpressure(stream, backpressure);
- }
- writableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
- },
- (reason) => {
- if (stream[sym.state] === "writable") {
- writableStreamDefaultControllerClearAlgorithms(controller);
- }
- writableStreamFinishInFlightWriteWithError(stream, reason);
- },
- ),
- );
+ const sinkWritePromise = controller[_writeAlgorithm](chunk, controller);
+ uponPromise(sinkWritePromise, () => {
+ writableStreamFinishInFlightWrite(stream);
+ const state = stream[_state];
+ assert(state === "writable" || state === "erroring");
+ dequeueValue(controller);
+ if (
+ writableStreamCloseQueuedOrInFlight(stream) === false &&
+ state === "writable"
+ ) {
+ const backpressure = writableStreamDefaultControllerGetBackpressure(
+ controller,
+ );
+ writableStreamUpdateBackpressure(stream, backpressure);
+ }
+ writableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
+ }, (reason) => {
+ if (stream[_state] === "writable") {
+ writableStreamDefaultControllerClearAlgorithms(controller);
+ }
+ writableStreamFinishInFlightWriteWithError(stream, reason);
+ });
}
- function writableStreamDefaultControllerWrite(
- controller,
- chunk,
- chunkSize,
- ) {
- const writeRecord = { chunk };
+ /**
+ * @template W
+ * @param {WritableStreamDefaultController<W>} controller
+ * @param {W} chunk
+ * @param {number} chunkSize
+ */
+ function writableStreamDefaultControllerWrite(controller, chunk, chunkSize) {
try {
- enqueueValueWithSize(controller, writeRecord, chunkSize);
+ enqueueValueWithSize(controller, chunk, chunkSize);
} catch (e) {
writableStreamDefaultControllerErrorIfNeeded(controller, e);
return;
}
- const stream = controller[sym.controlledWritableStream];
+ const stream = controller[_stream];
if (
- !writableStreamCloseQueuedOrInFlight(stream) &&
- stream[sym.state] === "writable"
+ writableStreamCloseQueuedOrInFlight(stream) === false &&
+ stream[_state] === "writable"
) {
const backpressure = writableStreamDefaultControllerGetBackpressure(
controller,
@@ -3065,373 +2549,1323 @@
writableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
}
- function writableStreamDefaultWriterAbort(
- writer,
- reason,
- ) {
- const stream = writer[sym.ownerWritableStream];
- assert(stream);
+ /**
+ * @param {WritableStreamDefaultWriter} writer
+ * @param {any=} reason
+ * @returns {Promise<void>}
+ */
+ function writableStreamDefaultWriterAbort(writer, reason) {
+ const stream = writer[_stream];
+ assert(stream !== undefined);
return writableStreamAbort(stream, reason);
}
- function writableStreamDefaultWriterClose(
- writer,
- ) {
- const stream = writer[sym.ownerWritableStream];
- assert(stream);
+ /**
+ * @param {WritableStreamDefaultWriter} writer
+ * @returns {Promise<void>}
+ */
+ function writableStreamDefaultWriterClose(writer) {
+ const stream = writer[_stream];
+ assert(stream !== undefined);
return writableStreamClose(stream);
}
- function writableStreamDefaultWriterCloseWithErrorPropagation(
- writer,
- ) {
- const stream = writer[sym.ownerWritableStream];
- assert(stream);
- const state = stream[sym.state];
- if (writableStreamCloseQueuedOrInFlight(stream) || state === "closed") {
- return Promise.resolve();
+ /**
+ * @param {WritableStreamDefaultWriter} writer
+ * @returns {Promise<void>}
+ */
+ function writableStreamDefaultWriterCloseWithErrorPropagation(writer) {
+ const stream = writer[_stream];
+ assert(stream !== undefined);
+ const state = stream[_state];
+ if (
+ writableStreamCloseQueuedOrInFlight(stream) === true || state === "closed"
+ ) {
+ return resolvePromiseWith(undefined);
}
if (state === "errored") {
- return Promise.reject(stream[sym.storedError]);
+ return Promise.reject(stream[_storedError]);
}
assert(state === "writable" || state === "erroring");
return writableStreamDefaultWriterClose(writer);
}
- function writableStreamDefaultWriterEnsureClosePromiseRejected(
+ /**
+ * @param {WritableStreamDefaultWriter} writer
+ * @param {any=} error
+ */
+ function writableStreamDefaultWriterEnsureClosedPromiseRejected(
writer,
error,
) {
- if (writer[sym.closedPromise].reject) {
- writer[sym.closedPromise].reject(error);
+ if (writer[_closedPromise].state === "pending") {
+ writer[_closedPromise].reject(error);
} else {
- writer[sym.closedPromise] = {
- promise: Promise.reject(error),
- };
+ writer[_closedPromise] = new Deferred();
+ writer[_closedPromise].reject(error);
}
- setPromiseIsHandledToTrue(writer[sym.closedPromise].promise);
+ setPromiseIsHandledToTrue(writer[_closedPromise].promise);
}
+ /**
+ * @param {WritableStreamDefaultWriter} writer
+ * @param {any=} error
+ */
function writableStreamDefaultWriterEnsureReadyPromiseRejected(
writer,
error,
) {
- if (writer[sym.readyPromise].reject) {
- writer[sym.readyPromise].reject(error);
- writer[sym.readyPromise].reject = undefined;
- writer[sym.readyPromise].resolve = undefined;
+ if (writer[_readyPromise].state === "pending") {
+ writer[_readyPromise].reject(error);
} else {
- writer[sym.readyPromise] = {
- promise: Promise.reject(error),
- };
- }
- setPromiseIsHandledToTrue(writer[sym.readyPromise].promise);
- }
-
- function writableStreamDefaultWriterWrite(
- writer,
- chunk,
- ) {
- const stream = writer[sym.ownerWritableStream];
- assert(stream);
- const controller = stream[sym.writableStreamController];
- assert(controller);
- const chunkSize = writableStreamDefaultControllerGetChunkSize(
- controller,
- chunk,
- );
- if (stream !== writer[sym.ownerWritableStream]) {
- return Promise.reject("Writer has incorrect WritableStream.");
- }
- const state = stream[sym.state];
- if (state === "errored") {
- return Promise.reject(stream[sym.storedError]);
- }
- if (writableStreamCloseQueuedOrInFlight(stream) || state === "closed") {
- return Promise.reject(new TypeError("The stream is closed or closing."));
+ writer[_readyPromise] = new Deferred();
+ writer[_readyPromise].reject(error);
}
- if (state === "erroring") {
- return Promise.reject(stream[sym.storedError]);
- }
- assert(state === "writable");
- const promise = writableStreamAddWriteRequest(stream);
- writableStreamDefaultControllerWrite(controller, chunk, chunkSize);
- return promise;
+ setPromiseIsHandledToTrue(writer[_readyPromise].promise);
}
- function writableStreamDefaultWriterGetDesiredSize(
- writer,
- ) {
- const stream = writer[sym.ownerWritableStream];
- const state = stream[sym.state];
+ /**
+ * @param {WritableStreamDefaultWriter} writer
+ * @returns {number | null}
+ */
+ function writableStreamDefaultWriterGetDesiredSize(writer) {
+ const stream = writer[_stream];
+ const state = stream[_state];
if (state === "errored" || state === "erroring") {
return null;
}
if (state === "closed") {
return 0;
}
- return writableStreamDefaultControllerGetDesiredSize(
- stream[sym.writableStreamController],
- );
+ return writableStreamDefaultControllerGetDesiredSize(stream[_controller]);
}
- function writableStreamDefaultWriterRelease(
- writer,
- ) {
- const stream = writer[sym.ownerWritableStream];
- assert(stream);
- assert(stream[sym.writer] === writer);
+ /** @param {WritableStreamDefaultWriter} writer */
+ function writableStreamDefaultWriterRelease(writer) {
+ const stream = writer[_stream];
+ assert(stream !== undefined);
+ assert(stream[_writer] === writer);
const releasedError = new TypeError(
- "Writer was released and can no longer be used to monitor the stream's closedness.",
+ "The writer has already been released.",
);
writableStreamDefaultWriterEnsureReadyPromiseRejected(
writer,
releasedError,
);
- writableStreamDefaultWriterEnsureClosePromiseRejected(
+ writableStreamDefaultWriterEnsureClosedPromiseRejected(
writer,
releasedError,
);
- stream[sym.writer] = undefined;
- writer[sym.ownerWritableStream] = undefined;
+ stream[_writer] = undefined;
+ writer[_stream] = undefined;
}
+ /**
+ * @template W
+ * @param {WritableStreamDefaultWriter<W>} writer
+ * @param {W} chunk
+ * @returns {Promise<void>}
+ */
+ function writableStreamDefaultWriterWrite(writer, chunk) {
+ const stream = writer[_stream];
+ assert(stream !== undefined);
+ const controller = stream[_controller];
+ const chunkSize = writableStreamDefaultControllerGetChunkSize(
+ controller,
+ chunk,
+ );
+ if (stream !== writer[_stream]) {
+ return Promise.reject(new TypeError("Writer's stream is unexpected."));
+ }
+ const state = stream[_state];
+ if (state === "errored") {
+ return Promise.reject(stream[_storedError]);
+ }
+ if (
+ writableStreamCloseQueuedOrInFlight(stream) === true || state === "closed"
+ ) {
+ return Promise.reject(
+ new TypeError("The stream is closing or is closed."),
+ );
+ }
+ if (state === "erroring") {
+ return Promise.reject(stream[_storedError]);
+ }
+ assert(state === "writable");
+ const promise = writableStreamAddWriteRequest(stream);
+ writableStreamDefaultControllerWrite(controller, chunk, chunkSize);
+ return promise;
+ }
+
+ /** @param {WritableStream} stream */
function writableStreamFinishErroring(stream) {
- assert(stream[sym.state] === "erroring");
- assert(!writableStreamHasOperationMarkedInFlight(stream));
- stream[sym.state] = "errored";
- stream[sym.writableStreamController][sym.errorSteps]();
- const storedError = stream[sym.storedError];
- for (const writeRequest of stream[sym.writeRequests]) {
- assert(writeRequest.reject);
+ assert(stream[_state] === "erroring");
+ assert(writableStreamHasOperationMarkedInFlight(stream) === false);
+ stream[_state] = "errored";
+ stream[_controller][_errorSteps]();
+ const storedError = stream[_storedError];
+ for (const writeRequest of stream[_writeRequests]) {
writeRequest.reject(storedError);
}
- stream[sym.writeRequests] = [];
- if (!stream[sym.pendingAbortRequest]) {
+ stream[_writeRequests] = [];
+ if (stream[_pendingAbortRequest] === undefined) {
writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
return;
}
- const abortRequest = stream[sym.pendingAbortRequest];
- assert(abortRequest);
- stream[sym.pendingAbortRequest] = undefined;
- if (abortRequest.wasAlreadyErroring) {
- assert(abortRequest.promise.reject);
- abortRequest.promise.reject(storedError);
+ const abortRequest = stream[_pendingAbortRequest];
+ stream[_pendingAbortRequest] = undefined;
+ if (abortRequest.wasAlreadyErroring === true) {
+ abortRequest.deferred.reject(storedError);
writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
return;
}
- const promise = stream[sym.writableStreamController][sym.abortSteps](
- abortRequest.reason,
- );
- setPromiseIsHandledToTrue(
- promise.then(
- () => {
- assert(abortRequest.promise.resolve);
- abortRequest.promise.resolve();
- writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
- },
- (reason) => {
- assert(abortRequest.promise.reject);
- abortRequest.promise.reject(reason);
- writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
- },
- ),
- );
+ const promise = stream[_controller][_abortSteps](abortRequest.reason);
+ uponPromise(promise, () => {
+ abortRequest.deferred.resolve(undefined);
+ writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
+ }, (reason) => {
+ abortRequest.deferred.reject(reason);
+ writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
+ });
}
- function writableStreamFinishInFlightClose(
- stream,
- ) {
- assert(stream[sym.inFlightCloseRequest]);
- stream[sym.inFlightCloseRequest]?.resolve();
- stream[sym.inFlightCloseRequest] = undefined;
- const state = stream[sym.state];
+ /** @param {WritableStream} stream */
+ function writableStreamFinishInFlightClose(stream) {
+ assert(stream[_inFlightCloseRequest] !== undefined);
+ stream[_inFlightCloseRequest].resolve(undefined);
+ stream[_inFlightCloseRequest] = undefined;
+ const state = stream[_state];
assert(state === "writable" || state === "erroring");
if (state === "erroring") {
- stream[sym.storedError] = undefined;
- if (stream[sym.pendingAbortRequest]) {
- stream[sym.pendingAbortRequest].promise.resolve();
- stream[sym.pendingAbortRequest] = undefined;
- }
- }
- stream[sym.state] = "closed";
- const writer = stream[sym.writer];
- if (writer) {
- writer[sym.closedPromise].resolve();
- }
- assert(stream[sym.pendingAbortRequest] === undefined);
- assert(stream[sym.storedError] === undefined);
- }
-
- function writableStreamFinishInFlightCloseWithError(
- stream,
- error,
- ) {
- assert(stream[sym.inFlightCloseRequest]);
- stream[sym.inFlightCloseRequest]?.reject(error);
- stream[sym.inFlightCloseRequest] = undefined;
- assert(
- stream[sym.state] === "writable" || stream[sym.state] === "erroring",
- );
- if (stream[sym.pendingAbortRequest]) {
- stream[sym.pendingAbortRequest]?.promise.reject(error);
- stream[sym.pendingAbortRequest] = undefined;
+ stream[_storedError] = undefined;
+ if (stream[_pendingAbortRequest] !== undefined) {
+ stream[_pendingAbortRequest].deferred.resolve(undefined);
+ stream[_pendingAbortRequest] = undefined;
+ }
+ }
+ stream[_state] = "closed";
+ const writer = stream[_writer];
+ if (writer !== undefined) {
+ writer[_closedPromise].resolve(undefined);
+ }
+ assert(stream[_pendingAbortRequest] === undefined);
+ assert(stream[_storedError] === undefined);
+ }
+
+ /**
+ * @param {WritableStream} stream
+ * @param {any=} error
+ */
+ function writableStreamFinishInFlightCloseWithError(stream, error) {
+ assert(stream[_inFlightCloseRequest] !== undefined);
+ stream[_inFlightCloseRequest].reject(error);
+ stream[_inFlightCloseRequest] = undefined;
+ assert(stream[_state] === "writable" || stream[_state] === "erroring");
+ if (stream[_pendingAbortRequest] !== undefined) {
+ stream[_pendingAbortRequest].deferred.reject(error);
+ stream[_pendingAbortRequest] = undefined;
}
writableStreamDealWithRejection(stream, error);
}
- function writableStreamFinishInFlightWrite(
- stream,
- ) {
- assert(stream[sym.inFlightWriteRequest]);
- stream[sym.inFlightWriteRequest].resolve();
- stream[sym.inFlightWriteRequest] = undefined;
+ /** @param {WritableStream} stream */
+ function writableStreamFinishInFlightWrite(stream) {
+ assert(stream[_inFlightWriteRequest] !== undefined);
+ stream[_inFlightWriteRequest].resolve(undefined);
+ stream[_inFlightWriteRequest] = undefined;
}
- function writableStreamFinishInFlightWriteWithError(
- stream,
- error,
- ) {
- assert(stream[sym.inFlightWriteRequest]);
- stream[sym.inFlightWriteRequest].reject(error);
- stream[sym.inFlightWriteRequest] = undefined;
- assert(
- stream[sym.state] === "writable" || stream[sym.state] === "erroring",
- );
+ /**
+ * @param {WritableStream} stream
+ * @param {any=} error
+ */
+ function writableStreamFinishInFlightWriteWithError(stream, error) {
+ assert(stream[_inFlightWriteRequest] !== undefined);
+ stream[_inFlightWriteRequest].reject(error);
+ stream[_inFlightWriteRequest] = undefined;
+ assert(stream[_state] === "writable" || stream[_state] === "erroring");
writableStreamDealWithRejection(stream, error);
}
- function writableStreamHasOperationMarkedInFlight(
- stream,
- ) {
- return !(
- stream[sym.inFlightWriteRequest] === undefined &&
- stream[sym.inFlightCloseRequest] === undefined
- );
+ /**
+ * @param {WritableStream} stream
+ * @returns {boolean}
+ */
+ function writableStreamHasOperationMarkedInFlight(stream) {
+ if (
+ stream[_inFlightWriteRequest] === undefined &&
+ stream[_controller][_inFlightCloseRequest] === undefined
+ ) {
+ return false;
+ }
+ return true;
}
- function writableStreamMarkCloseRequestInFlight(
- stream,
- ) {
- assert(stream[sym.inFlightCloseRequest] === undefined);
- assert(stream[sym.closeRequest] !== undefined);
- stream[sym.inFlightCloseRequest] = stream[sym.closeRequest];
- stream[sym.closeRequest] = undefined;
+ /** @param {WritableStream} stream */
+ function writableStreamMarkCloseRequestInFlight(stream) {
+ assert(stream[_inFlightCloseRequest] === undefined);
+ assert(stream[_closeRequest] !== undefined);
+ stream[_inFlightCloseRequest] = stream[_closeRequest];
+ stream[_closeRequest] = undefined;
}
- function writableStreamMarkFirstWriteRequestInFlight(
- stream,
- ) {
- assert(stream[sym.inFlightWriteRequest] === undefined);
- assert(stream[sym.writeRequests].length);
- const writeRequest = stream[sym.writeRequests].shift();
- stream[sym.inFlightWriteRequest] = writeRequest;
+ /**
+ * @template W
+ * @param {WritableStream<W>} stream
+ * */
+ function writableStreamMarkFirstWriteRequestInFlight(stream) {
+ assert(stream[_inFlightWriteRequest] === undefined);
+ assert(stream[_writeRequests].length);
+ const writeRequest = stream[_writeRequests].shift();
+ stream[_inFlightWriteRequest] = writeRequest;
}
- function writableStreamRejectCloseAndClosedPromiseIfNeeded(
- stream,
- ) {
- assert(stream[sym.state] === "errored");
- if (stream[sym.closeRequest]) {
- assert(stream[sym.inFlightCloseRequest] === undefined);
- stream[sym.closeRequest].reject(stream[sym.storedError]);
- stream[sym.closeRequest] = undefined;
+ /** @param {WritableStream} stream */
+ function writableStreamRejectCloseAndClosedPromiseIfNeeded(stream) {
+ assert(stream[_state] === "errored");
+ if (stream[_closeRequest] !== undefined) {
+ assert(stream[_inFlightCloseRequest] === undefined);
+ stream[_closeRequest].reject(stream[_storedError]);
+ stream[_closeRequest] = undefined;
}
- const writer = stream[sym.writer];
- if (writer) {
- writer[sym.closedPromise].reject(stream[sym.storedError]);
- setPromiseIsHandledToTrue(writer[sym.closedPromise].promise);
+ const writer = stream[_writer];
+ if (writer !== undefined) {
+ writer[_closedPromise].reject(stream[_storedError]);
+ setPromiseIsHandledToTrue(writer[_closedPromise].promise);
}
}
- function writableStreamStartErroring(
- stream,
- reason,
- ) {
- assert(stream[sym.storedError] === undefined);
- assert(stream[sym.state] === "writable");
- const controller = stream[sym.writableStreamController];
+ /**
+ * @param {WritableStream} stream
+ * @param {any=} reason
+ */
+ function writableStreamStartErroring(stream, reason) {
+ assert(stream[_storedError] === undefined);
+ assert(stream[_state] === "writable");
+ const controller = stream[_controller];
assert(controller);
- stream[sym.state] = "erroring";
- stream[sym.storedError] = reason;
- const writer = stream[sym.writer];
+ stream[_state] = "erroring";
+ stream[_storedError] = reason;
+ const writer = stream[_writer];
if (writer) {
writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, reason);
}
if (
- !writableStreamHasOperationMarkedInFlight(stream) &&
- controller[sym.started]
+ writableStreamHasOperationMarkedInFlight(stream) === false &&
+ controller[_started] === true
) {
writableStreamFinishErroring(stream);
}
}
- function writableStreamUpdateBackpressure(
- stream,
- backpressure,
- ) {
- assert(stream[sym.state] === "writable");
- assert(!writableStreamCloseQueuedOrInFlight(stream));
- const writer = stream[sym.writer];
- if (writer && backpressure !== stream[sym.backpressure]) {
- if (backpressure) {
- writer[sym.readyPromise] = getDeferred();
+ /**
+ * @param {WritableStream} stream
+ * @param {boolean} backpressure
+ */
+ function writableStreamUpdateBackpressure(stream, backpressure) {
+ assert(stream[_state] === "writable");
+ assert(writableStreamCloseQueuedOrInFlight(stream) === false);
+ const writer = stream[_writer];
+ if (writer !== undefined && backpressure !== stream[_backpressure]) {
+ if (backpressure === true) {
+ writer[_readyPromise] = new Deferred();
} else {
assert(backpressure === false);
- writer[sym.readyPromise].resolve();
- writer[sym.readyPromise].resolve = undefined;
- writer[sym.readyPromise].reject = undefined;
+ writer[_readyPromise].resolve(undefined);
}
}
- stream[sym.backpressure] = backpressure;
+ stream[_backpressure] = backpressure;
+ }
+
+ /**
+ * @template T
+ * @param {T} value
+ * @param {boolean} done
+ * @returns {IteratorResult<T>}
+ */
+ function createIteratorResult(value, done) {
+ const result = Object.create(null);
+ Object.defineProperties(result, {
+ value: { value, writable: true, enumerable: true, configurable: true },
+ done: {
+ value: done,
+ writable: true,
+ enumerable: true,
+ configurable: true,
+ },
+ });
+ return result;
+ }
+
+ /** @type {AsyncIterator<unknown, unknown>} */
+ const asyncIteratorPrototype = Object.getPrototypeOf(
+ Object.getPrototypeOf(async function* () {}).prototype,
+ );
+
+ /** @type {AsyncIterator<unknown>} */
+ const readableStreamAsyncIteratorPrototype = Object.setPrototypeOf({
+ /** @returns {Promise<IteratorResult<unknown>>} */
+ next() {
+ /** @type {ReadableStreamDefaultReader} */
+ const reader = this[_reader];
+ if (reader[_stream] === undefined) {
+ return Promise.reject(
+ new TypeError(
+ "Cannot get the next iteration result once the reader has been released.",
+ ),
+ );
+ }
+ /** @type {Deferred<IteratorResult<any>>} */
+ const promise = new Deferred();
+ /** @type {ReadRequest} */
+ const readRequest = {
+ chunkSteps(chunk) {
+ promise.resolve(createIteratorResult(chunk, false));
+ },
+ closeSteps() {
+ readableStreamReaderGenericRelease(reader);
+ promise.resolve(createIteratorResult(undefined, true));
+ },
+ errorSteps(e) {
+ readableStreamReaderGenericRelease(reader);
+ promise.reject(e);
+ },
+ };
+ readableStreamDefaultReaderRead(reader, readRequest);
+ return promise.promise;
+ },
+ /**
+ * @param {unknown} arg
+ * @returns {Promise<IteratorResult<unknown>>}
+ */
+ async return(arg) {
+ /** @type {ReadableStreamDefaultReader} */
+ const reader = this[_reader];
+ if (reader[_stream] === undefined) {
+ return createIteratorResult(undefined, true);
+ }
+ assert(reader[_readRequests].length === 0);
+ if (this[_preventCancel] === false) {
+ const result = readableStreamReaderGenericCancel(reader, arg);
+ readableStreamReaderGenericRelease(reader);
+ await result;
+ return createIteratorResult(arg, true);
+ }
+ readableStreamReaderGenericRelease(reader);
+ return createIteratorResult(undefined, true);
+ },
+ }, asyncIteratorPrototype);
+
+ class ByteLengthQueuingStrategy {
+ /** @type {number} */
+ highWaterMark;
+
+ /** @param {{ highWaterMark: number }} init */
+ constructor(init) {
+ if (
+ typeof init !== "object" || init === null || !("highWaterMark" in init)
+ ) {
+ throw new TypeError(
+ "init must be an object that contains a property named highWaterMark",
+ );
+ }
+ const { highWaterMark } = init;
+ this[_globalObject] = window;
+ this.highWaterMark = Number(highWaterMark);
+ }
+
+ /** @returns {(chunk: ArrayBufferView) => number} */
+ get size() {
+ initializeByteLengthSizeFunction(this[_globalObject]);
+ return byteSizeFunctionWeakMap.get(this[_globalObject]);
+ }
+ }
+
+ /** @type {WeakMap<typeof globalThis, (chunk: ArrayBufferView) => number>} */
+ const byteSizeFunctionWeakMap = new WeakMap();
+
+ function initializeByteLengthSizeFunction(globalObject) {
+ if (byteSizeFunctionWeakMap.has(globalObject)) {
+ return;
+ }
+ byteSizeFunctionWeakMap.set(globalObject, function size(chunk) {
+ return chunk.byteLength;
+ });
}
class CountQueuingStrategy {
- constructor({ highWaterMark }) {
- this.highWaterMark = highWaterMark;
+ /** @type {number} */
+ highWaterMark;
+
+ /** @param {{ highWaterMark: number }} init */
+ constructor(init) {
+ if (
+ typeof init !== "object" || init === null || !("highWaterMark" in init)
+ ) {
+ throw new TypeError(
+ "init must be an object that contains a property named highWaterMark",
+ );
+ }
+ const { highWaterMark } = init;
+ this[_globalObject] = window;
+ this.highWaterMark = Number(highWaterMark);
}
- size() {
+ /** @returns {(chunk: any) => 1} */
+ get size() {
+ initializeCountSizeFunction(this[_globalObject]);
+ return countSizeFunctionWeakMap.get(this[_globalObject]);
+ }
+ }
+
+ /** @type {WeakMap<typeof globalThis, () => 1>} */
+ const countSizeFunctionWeakMap = new WeakMap();
+
+ /** @param {typeof globalThis} globalObject */
+ function initializeCountSizeFunction(globalObject) {
+ if (countSizeFunctionWeakMap.has(globalObject)) {
+ return;
+ }
+ countSizeFunctionWeakMap.set(globalObject, function size() {
return 1;
+ });
+ }
+
+ /** @template R */
+ class ReadableStream {
+ /** @type {ReadableStreamDefaultController | ReadableByteStreamController} */
+ [_controller];
+ /** @type {boolean} */
+ [_detached];
+ /** @type {boolean} */
+ [_disturbed];
+ /** @type {ReadableStreamDefaultReader | undefined} */
+ [_reader];
+ /** @type {"readable" | "closed" | "errored"} */
+ [_state];
+ /** @type {any} */
+ [_storedError];
+
+ /**
+ * @param {UnderlyingSource<R>=} underlyingSource
+ * @param {QueuingStrategy<R>=} strategy
+ */
+ constructor(underlyingSource, strategy = {}) {
+ const underlyingSourceDict = convertUnderlyingSource(underlyingSource);
+ initializeReadableStream(this);
+ if (underlyingSourceDict.type === "bytes") {
+ if (strategy.size !== undefined) {
+ throw new RangeError(
+ `When underlying source is "bytes", strategy.size must be undefined.`,
+ );
+ }
+ const highWaterMark = extractHighWaterMark(strategy, 0);
+ setUpReadableByteStreamControllerFromUnderlyingSource(
+ // @ts-ignore cannot easily assert this is ReadableStream<ArrayBuffer>
+ this,
+ underlyingSource,
+ underlyingSourceDict,
+ highWaterMark,
+ );
+ } else {
+ assert(!("type" in underlyingSourceDict));
+ const sizeAlgorithm = extractSizeAlgorithm(strategy);
+ const highWaterMark = extractHighWaterMark(strategy, 1);
+ setUpReadableStreamDefaultControllerFromUnderlyingSource(
+ this,
+ underlyingSource,
+ underlyingSourceDict,
+ highWaterMark,
+ sizeAlgorithm,
+ );
+ }
+ }
+
+ /** @returns {boolean} */
+ get locked() {
+ return isReadableStreamLocked(this);
+ }
+
+ /**
+ * @param {any=} reason
+ * @returns {Promise<void>}
+ */
+ cancel(reason) {
+ if (isReadableStreamLocked(this)) {
+ Promise.reject(new TypeError("Cannot cancel a locked ReadableStream."));
+ }
+ return readableStreamCancel(this, reason);
+ }
+
+ /**
+ * @param {ReadableStreamGetReaderOptions=} options
+ * @returns {ReadableStreamDefaultReader<R>}
+ */
+ getReader(options = {}) {
+ if (typeof options !== "object") {
+ throw new TypeError("options must be an object");
+ }
+ if (options === null) {
+ options = {};
+ }
+ /** @type {any} */
+ let { mode } = options;
+ if (mode === undefined) {
+ return acquireReadableStreamDefaultReader(this);
+ }
+ mode = String(mode);
+ if (mode !== "byob") {
+ throw new TypeError("Invalid mode.");
+ }
+ // 3. Return ? AcquireReadableStreamBYOBReader(this).
+ throw new RangeError(`Unsupported mode "${String(mode)}"`);
+ }
+
+ /**
+ * @template T
+ * @param {{ readable: ReadableStream<T>, writable: WritableStream<R> }} transform
+ * @param {PipeOptions=} options
+ * @returns {ReadableStream<T>}
+ */
+ pipeThrough(
+ { readable, writable },
+ { preventClose, preventAbort, preventCancel, signal } = {},
+ ) {
+ if (isReadableStreamLocked(this)) {
+ throw new TypeError("ReadableStream is already locked.");
+ }
+ if (isWritableStreamLocked(writable)) {
+ throw new TypeError("Target WritableStream is already locked.");
+ }
+ const promise = readableStreamPipeTo(
+ this,
+ writable,
+ preventClose,
+ preventAbort,
+ preventCancel,
+ signal,
+ );
+ setPromiseIsHandledToTrue(promise);
+ return readable;
+ }
+
+ /**
+ * @param {WritableStream<R>} destination
+ * @param {PipeOptions=} options
+ * @returns {Promise<void>}
+ */
+ pipeTo(
+ destination,
+ {
+ preventClose = false,
+ preventAbort = false,
+ preventCancel = false,
+ signal,
+ } = {},
+ ) {
+ if (isReadableStreamLocked(this)) {
+ return Promise.reject(
+ new TypeError("ReadableStream is already locked."),
+ );
+ }
+ if (isWritableStreamLocked(destination)) {
+ return Promise.reject(
+ new TypeError("destination WritableStream is already locked."),
+ );
+ }
+ return readableStreamPipeTo(
+ this,
+ destination,
+ preventClose,
+ preventAbort,
+ preventCancel,
+ signal,
+ );
+ }
+
+ /** @returns {[ReadableStream<R>, ReadableStream<R>]} */
+ tee() {
+ return readableStreamTee(this, false);
+ }
+
+ /**
+ * @param {ReadableStreamIteratorOptions=} options
+ * @returns {AsyncIterableIterator<R>}
+ */
+ [Symbol.asyncIterator]({ preventCancel } = {}) {
+ /** @type {AsyncIterableIterator<R>} */
+ const iterator = Object.create(readableStreamAsyncIteratorPrototype);
+ const reader = acquireReadableStreamDefaultReader(this);
+ iterator[_reader] = reader;
+ iterator[_preventCancel] = preventCancel;
+ return iterator;
}
[customInspect]() {
- return `${this.constructor.name} { highWaterMark: ${
- String(this.highWaterMark)
- }, size: f }`;
+ return `${this.constructor.name} ${
+ Deno.inspect({ locked: this.locked })
+ }`;
}
}
- Object.defineProperty(CountQueuingStrategy.prototype, "size", {
- enumerable: true,
- });
+ /** @template R */
+ class ReadableStreamGenericReader {
+ /** @type {Deferred<void>} */
+ [_closedPromise];
+ /** @type {ReadableStream<R> | undefined} */
+ [_stream];
- class ByteLengthQueuingStrategy {
- constructor({ highWaterMark }) {
- this.highWaterMark = highWaterMark;
+ get closed() {
+ return this[_closedPromise].promise;
}
- size(chunk) {
- return chunk.byteLength;
+ /**
+ * @param {any} reason
+ * @returns {Promise<void>}
+ */
+ cancel(reason) {
+ if (this[_stream] === undefined) {
+ return Promise.reject(
+ new TypeError("Reader has no associated stream."),
+ );
+ }
+ return readableStreamReaderGenericCancel(this, reason);
+ }
+ }
+
+ /** @template R */
+ class ReadableStreamDefaultReader extends ReadableStreamGenericReader {
+ /** @type {ReadRequest[]} */
+ [_readRequests];
+
+ /** @param {ReadableStream<R>} stream */
+ constructor(stream) {
+ if (!(stream instanceof ReadableStream)) {
+ throw new TypeError("stream is not a ReadableStream");
+ }
+ super();
+ setUpReadableStreamDefaultReader(this, stream);
+ }
+
+ /** @returns {Promise<ReadableStreamReadResult<R>>} */
+ read() {
+ if (this[_stream] === undefined) {
+ return Promise.reject(
+ new TypeError("Reader has no associated stream."),
+ );
+ }
+ /** @type {Deferred<ReadableStreamReadResult<R>>} */
+ const promise = new Deferred();
+ /** @type {ReadRequest<R>} */
+ const readRequest = {
+ chunkSteps(chunk) {
+ promise.resolve({ value: chunk, done: false });
+ },
+ closeSteps() {
+ promise.resolve({ value: undefined, done: true });
+ },
+ errorSteps(e) {
+ promise.reject(e);
+ },
+ };
+ readableStreamDefaultReaderRead(this, readRequest);
+ return promise.promise;
+ }
+
+ /** @returns {void} */
+ releaseLock() {
+ if (this[_stream] === undefined) {
+ return;
+ }
+ if (this[_readRequests].length) {
+ throw new TypeError(
+ "There are pending read requests, so the reader cannot be release.",
+ );
+ }
+ readableStreamReaderGenericRelease(this);
+ }
+
+ [customInspect]() {
+ return `${this.constructor.name} { closed: ${String(this.closed)} }`;
+ }
+ }
+
+ class ReadableByteStreamController {
+ /** @type {number | undefined} */
+ [_autoAllocateChunkSize];
+ /** @type {null} */
+ [_byobRequest];
+ /** @type {(reason: any) => Promise<void>} */
+ [_cancelAlgorithm];
+ /** @type {boolean} */
+ [_closeRequested];
+ /** @type {boolean} */
+ [_pullAgain];
+ /** @type {(controller: this) => Promise<void>} */
+ [_pullAlgorithm];
+ /** @type {boolean} */
+ [_pulling];
+ /** @type {ReadableByteStreamQueueEntry[]} */
+ [_queue];
+ /** @type {number} */
+ [_queueTotalSize];
+ /** @type {boolean} */
+ [_started];
+ /** @type {number} */
+ [_strategyHWM];
+ /** @type {ReadableStream<ArrayBuffer>} */
+ [_stream];
+
+ get byobRequest() {
+ return undefined;
+ }
+
+ /** @returns {number | null} */
+ get desiredSize() {
+ return readableByteStreamControllerGetDesiredSize(this);
+ }
+
+ /** @returns {void} */
+ close() {
+ if (this[_closeRequested] === true) {
+ throw new TypeError("Closed already requested.");
+ }
+ if (this[_stream][_state] !== "readable") {
+ throw new TypeError(
+ "ReadableByteStreamController's stream is not in a readable state.",
+ );
+ }
+ readableByteStreamControllerClose(this);
+ }
+
+ /**
+ * @param {ArrayBufferView} chunk
+ * @returns {void}
+ */
+ enqueue(chunk) {
+ if (chunk.byteLength === 0) {
+ throw new TypeError("chunk must have a non-zero byteLength.");
+ }
+ if (chunk.buffer.byteLength === 0) {
+ throw new TypeError("chunk's buffer must have a non-zero byteLength.");
+ }
+ if (this[_closeRequested] === true) {
+ throw new TypeError(
+ "Cannot enqueue chunk after a close has been requested.",
+ );
+ }
+ if (this[_stream][_state] !== "readable") {
+ throw new TypeError(
+ "Cannot enqueue chunk when underlying stream is not readable.",
+ );
+ }
+ return readableByteStreamControllerEnqueue(this, chunk);
+ }
+
+ /**
+ * @param {any=} e
+ * @returns {void}
+ */
+ error(e) {
+ readableByteStreamControllerError(this, e);
+ }
+
+ /**
+ * @param {any} reason
+ * @returns {Promise<void>}
+ */
+ [_cancelSteps](reason) {
+ // 4.7.4. CancelStep 1. If this.[[pendingPullIntos]] is not empty,
+ resetQueue(this);
+ const result = this[_cancelAlgorithm](reason);
+ readableByteStreamControllerClearAlgorithms(this);
+ return result;
+ }
+
+ /**
+ * @param {ReadRequest<ArrayBuffer>} readRequest
+ * @returns {void}
+ */
+ [_pullSteps](readRequest) {
+ /** @type {ReadableStream<ArrayBuffer>} */
+ const stream = this[_stream];
+ assert(readableStreamHasDefaultReader(stream));
+ if (this[_queueTotalSize] > 0) {
+ assert(readableStreamGetNumReadRequests(stream) === 0);
+ const entry = this[_queue].shift();
+ this[_queueTotalSize] -= entry.byteLength;
+ readableByteStreamControllerHandleQueueDrain(this);
+ const view = new Uint8Array(
+ entry.buffer,
+ entry.byteOffset,
+ entry.byteLength,
+ );
+ readRequest.chunkSteps(view);
+ return;
+ }
+ // 4. Let autoAllocateChunkSize be this.[[autoAllocateChunkSize]].
+ // 5. If autoAllocateChunkSize is not undefined,
+ readableStreamAddReadRequest(stream, readRequest);
+ readableByteStreamControllerCallPullIfNeeded(this);
+ }
+ }
+
+ /** @template R */
+ class ReadableStreamDefaultController {
+ /** @type {(reason: any) => Promise<void>} */
+ [_cancelAlgorithm];
+ /** @type {boolean} */
+ [_closeRequested];
+ /** @type {boolean} */
+ [_pullAgain];
+ /** @type {(controller: this) => Promise<void>} */
+ [_pullAlgorithm];
+ /** @type {boolean} */
+ [_pulling];
+ /** @type {Array<ValueWithSize<R>>} */
+ [_queue];
+ /** @type {number} */
+ [_queueTotalSize];
+ /** @type {boolean} */
+ [_started];
+ /** @type {number} */
+ [_strategyHWM];
+ /** @type {(chunk: R) => number} */
+ [_strategySizeAlgorithm];
+ /** @type {ReadableStream<R>} */
+ [_stream];
+
+ /** @returns {number | null} */
+ get desiredSize() {
+ return readableStreamDefaultControllerGetDesiredSize(this);
+ }
+
+ /** @returns {void} */
+ close() {
+ if (readableStreamDefaultControllerCanCloseOrEnqueue(this) === false) {
+ throw new TypeError("The stream controller cannot close or enqueue.");
+ }
+ readableStreamDefaultControllerClose(this);
+ }
+
+ /**
+ * @param {R} chunk
+ * @returns {void}
+ */
+ enqueue(chunk) {
+ if (readableStreamDefaultControllerCanCloseOrEnqueue(this) === false) {
+ throw new TypeError("The stream controller cannot close or enqueue.");
+ }
+ readableStreamDefaultControllerEnqueue(this, chunk);
+ }
+
+ /**
+ * @param {any=} e
+ * @returns {void}
+ */
+ error(e) {
+ readableStreamDefaultControllerError(this, e);
+ }
+
+ /**
+ * @param {any} reason
+ * @returns {Promise<void>}
+ */
+ [_cancelSteps](reason) {
+ resetQueue(this);
+ const result = this[_cancelAlgorithm](reason);
+ readableStreamDefaultControllerClearAlgorithms(this);
+ return result;
+ }
+
+ /**
+ * @param {ReadRequest<R>} readRequest
+ * @returns {void}
+ */
+ [_pullSteps](readRequest) {
+ const stream = this[_stream];
+ if (this[_queue].length) {
+ const chunk = dequeueValue(this);
+ if (this[_closeRequested] && this[_queue].length === 0) {
+ readableStreamDefaultControllerClearAlgorithms(this);
+ readableStreamClose(stream);
+ } else {
+ readableStreamDefaultControllerCallPullIfNeeded(this);
+ }
+ readRequest.chunkSteps(chunk);
+ } else {
+ readableStreamAddReadRequest(stream, readRequest);
+ readableStreamDefaultControllerCallPullIfNeeded(this);
+ }
+ }
+ }
+
+ /**
+ * @template I
+ * @template O
+ */
+ class TransformStream {
+ /** @type {boolean} */
+ [_backpressure];
+ /** @type {Deferred<void>} */
+ [_backpressureChangePromise];
+ /** @type {TransformStreamDefaultController<O>} */
+ [_controller];
+ /** @type {boolean} */
+ [_detached];
+ /** @type {ReadableStream<O>} */
+ [_readable];
+ /** @type {WritableStream<I>} */
+ [_writable];
+
+ /**
+ *
+ * @param {Transformer<I, O>} transformer
+ * @param {QueuingStrategy<I>} writableStrategy
+ * @param {QueuingStrategy<O>} readableStrategy
+ */
+ constructor(
+ transformer = null,
+ writableStrategy = {},
+ readableStrategy = {},
+ ) {
+ const transformerDict = convertTransformer(transformer);
+ if (transformerDict.readableType) {
+ throw new RangeError("readableType transformers not supported.");
+ }
+ if (transformerDict.writableType) {
+ throw new RangeError("writableType transformers not supported.");
+ }
+ const readableHighWaterMark = extractHighWaterMark(readableStrategy, 0);
+ const readableSizeAlgorithm = extractSizeAlgorithm(readableStrategy);
+ const writableHighWaterMark = extractHighWaterMark(writableStrategy, 1);
+ const writableSizeAlgorithm = extractSizeAlgorithm(writableStrategy);
+ /** @type {Deferred<void>} */
+ const startPromise = new Deferred();
+ initializeTransformStream(
+ this,
+ startPromise,
+ writableHighWaterMark,
+ writableSizeAlgorithm,
+ readableHighWaterMark,
+ readableSizeAlgorithm,
+ );
+ setUpTransformStreamDefaultControllerFromTransformer(
+ this,
+ transformer,
+ transformerDict,
+ );
+ if ("start" in transformerDict) {
+ startPromise.resolve(
+ transformerDict.start.call(transformer, this[_controller]),
+ );
+ } else {
+ startPromise.resolve(undefined);
+ }
+ }
+
+ /** @returns {ReadableStream<O>} */
+ get readable() {
+ return this[_readable];
+ }
+
+ /** @returns {WritableStream<I>} */
+ get writable() {
+ return this[_writable];
+ }
+
+ [customInspect]() {
+ return `${this.constructor.name} ${
+ Deno.inspect(
+ { readable: this.readable, writable: this.writable },
+ { depth: 1 },
+ )
+ }`;
+ }
+ }
+
+ /** @template O */
+ class TransformStreamDefaultController {
+ /** @type {(controller: this) => Promise<void>} */
+ [_flushAlgorithm];
+ /** @type {TransformStream<O>} */
+ [_stream];
+ /** @type {(chunk: O, controller: this) => Promise<void>} */
+ [_transformAlgorithm];
+
+ /** @returns {number | null} */
+ get desiredSize() {
+ const readableController = this[_stream][_readable][_controller];
+ return readableStreamDefaultControllerGetDesiredSize(
+ /** @type {ReadableStreamDefaultController<O>} */ (readableController),
+ );
+ }
+
+ /**
+ * @param {O} chunk
+ * @returns {void}
+ */
+ enqueue(chunk) {
+ transformStreamDefaultControllerEnqueue(this, chunk);
+ }
+
+ /**
+ * @param {any=} reason
+ * @returns {void}
+ */
+ error(reason) {
+ transformStreamDefaultControllerError(this, reason);
+ }
+
+ /** @returns {void} */
+ terminate() {
+ transformStreamDefaultControllerTerminate(this);
+ }
+ }
+
+ /** @template W */
+ class WritableStream {
+ /** @type {boolean} */
+ [_backpressure];
+ /** @type {Deferred<void> | undefined} */
+ [_closeRequest];
+ /** @type {WritableStreamDefaultController<W>} */
+ [_controller];
+ /** @type {boolean} */
+ [_detached];
+ /** @type {Deferred<void> | undefined} */
+ [_inFlightWriteRequest];
+ /** @type {Deferred<void> | undefined} */
+ [_inFlightCloseRequest];
+ /** @type {PendingAbortRequest | undefined} */
+ [_pendingAbortRequest];
+ /** @type {"writable" | "closed" | "erroring" | "errored"} */
+ [_state];
+ /** @type {any} */
+ [_storedError];
+ /** @type {WritableStreamDefaultWriter<W>} */
+ [_writer];
+ /** @type {Deferred<void>[]} */
+ [_writeRequests];
+
+ /**
+ * @param {UnderlyingSink<W>=} underlyingSink
+ * @param {QueuingStrategy<W>=} strategy
+ */
+ constructor(underlyingSink = null, strategy = {}) {
+ const underlyingSinkDict = convertUnderlyingSink(underlyingSink);
+ if (underlyingSinkDict.type != null) {
+ throw new RangeError(
+ 'WritableStream does not support "type" in the underlying sink.',
+ );
+ }
+ initializeWritableStream(this);
+ const sizeAlgorithm = extractSizeAlgorithm(strategy);
+ const highWaterMark = extractHighWaterMark(strategy, 1);
+ setUpWritableStreamDefaultControllerFromUnderlyingSink(
+ this,
+ underlyingSink,
+ underlyingSinkDict,
+ highWaterMark,
+ sizeAlgorithm,
+ );
+ }
+
+ /** @returns {boolean} */
+ get locked() {
+ return isWritableStreamLocked(this);
+ }
+
+ /**
+ * @param {any=} reason
+ * @returns {Promise<void>}
+ */
+ abort(reason) {
+ if (isWritableStreamLocked(this)) {
+ return Promise.reject(
+ new TypeError(
+ "The writable stream is locked, therefore cannot be aborted.",
+ ),
+ );
+ }
+ return writableStreamAbort(this, reason);
+ }
+
+ /** @returns {Promise<void>} */
+ close() {
+ if (isWritableStreamLocked(this)) {
+ return Promise.reject(
+ new TypeError(
+ "The writable stream is locked, therefore cannot be closed.",
+ ),
+ );
+ }
+ if (writableStreamCloseQueuedOrInFlight(this) === true) {
+ return Promise.reject(
+ new TypeError("The writable stream is already closing."),
+ );
+ }
+ return writableStreamClose(this);
+ }
+
+ /** @returns {WritableStreamDefaultWriter<W>} */
+ getWriter() {
+ return acquireWritableStreamDefaultWriter(this);
}
[customInspect]() {
- return `${this.constructor.name} { highWaterMark: ${
- String(this.highWaterMark)
- }, size: f }`;
+ return `${this.constructor.name} ${
+ Deno.inspect({ locked: this.locked })
+ }`;
}
}
- Object.defineProperty(ByteLengthQueuingStrategy.prototype, "size", {
- enumerable: true,
- });
+ /** @template W */
+ class WritableStreamDefaultWriter {
+ /** @type {Deferred<void>} */
+ [_closedPromise];
+
+ /** @type {Deferred<void>} */
+ [_readyPromise];
+
+ /** @type {WritableStream<W>} */
+ [_stream];
+
+ constructor(stream) {
+ setUpWritableStreamDefaultWriter(this, stream);
+ }
+
+ /** @returns {Promise<void>} */
+ get closed() {
+ return this[_closedPromise].promise;
+ }
+
+ /** @returns {number} */
+ get desiredSize() {
+ if (this[_stream] === undefined) {
+ throw new TypeError(
+ "A writable stream is not associated with the writer.",
+ );
+ }
+ return writableStreamDefaultWriterGetDesiredSize(this);
+ }
+
+ /** @returns {Promise<void>} */
+ get ready() {
+ return this[_readyPromise].promise;
+ }
+
+ /**
+ * @param {any} reason
+ * @returns {Promise<void>}
+ */
+ abort(reason) {
+ if (this[_stream] === undefined) {
+ return Promise.reject(
+ new TypeError("A writable stream is not associated with the writer."),
+ );
+ }
+ return writableStreamDefaultWriterAbort(this, reason);
+ }
+
+ /** @returns {Promise<void>} */
+ close() {
+ const stream = this[_stream];
+ if (stream === undefined) {
+ return Promise.reject(
+ new TypeError("A writable stream is not associated with the writer."),
+ );
+ }
+ if (writableStreamCloseQueuedOrInFlight(stream) === true) {
+ return Promise.reject(
+ new TypeError("The associated stream is already closing."),
+ );
+ }
+ return writableStreamDefaultWriterClose(this);
+ }
+
+ /** @returns {void} */
+ releaseLock() {
+ const stream = this[_stream];
+ if (stream === undefined) {
+ return;
+ }
+ assert(stream[_writer] !== undefined);
+ writableStreamDefaultWriterRelease(this);
+ }
+
+ /**
+ * @param {W} chunk
+ * @returns {Promise<void>}
+ */
+ write(chunk) {
+ if (this[_stream] === undefined) {
+ return Promise.reject(
+ new TypeError("A writable stream is not associate with the writer."),
+ );
+ }
+ return writableStreamDefaultWriterWrite(this, chunk);
+ }
+ }
+
+ /** @template W */
+ class WritableStreamDefaultController {
+ /** @type {(reason?: any) => Promise<void>} */
+ [_abortAlgorithm];
+ /** @type {() => Promise<void>} */
+ [_closeAlgorithm];
+ /** @type {ValueWithSize<W | _close>[]} */
+ [_queue];
+ /** @type {number} */
+ [_queueTotalSize];
+ /** @type {boolean} */
+ [_started];
+ /** @type {number} */
+ [_strategyHWM];
+ /** @type {(chunk: W) => number} */
+ [_strategySizeAlgorithm];
+ /** @type {WritableStream<W>} */
+ [_stream];
+ /** @type {(chunk: W, controller: this) => Promise<void>} */
+ [_writeAlgorithm];
+
+ /**
+ * @param {any=} e
+ * @returns {void}
+ */
+ error(e) {
+ const state = this[_stream][_state];
+ if (state !== "writable") {
+ return;
+ }
+ writableStreamDefaultControllerError(this, e);
+ }
+
+ /**
+ * @param {any=} reason
+ * @returns {Promise<void>}
+ */
+ [_abortSteps](reason) {
+ const result = this[_abortAlgorithm](reason);
+ writableStreamDefaultControllerClearAlgorithms(this);
+ return result;
+ }
+
+ [_errorSteps]() {
+ resetQueue(this);
+ }
+ }
window.__bootstrap.streams = {
+ // Non-Public
+ isReadableStreamDisturbed,
+ // Exposed in global runtime scope
+ ByteLengthQueuingStrategy,
+ CountQueuingStrategy,
ReadableStream,
+ ReadableStreamDefaultReader,
TransformStream,
WritableStream,
- isReadableStreamDisturbed,
- CountQueuingStrategy,
- ByteLengthQueuingStrategy,
+ WritableStreamDefaultWriter,
};
})(this);