summaryrefslogtreecommitdiff
path: root/ext/web/06_streams.js
diff options
context:
space:
mode:
authorMarcos Casagrande <marcos@denode.com>2023-10-13 14:30:09 +0200
committerGitHub <noreply@github.com>2023-10-13 14:30:09 +0200
commit7599990a4fe66bacdc00c12cbe3b8e800160bf68 (patch)
tree2bcf82e0fa0ebbadcd5cb51b609b7e41aea054ac /ext/web/06_streams.js
parent5da1bd802ca33cded6e5efad8dcd2155e448993c (diff)
perf(ext/streams): optimize streams (#20649)
This PR introduces several optimizations to streams ### Highlights: - `ReadableStream` constructor: +20% iter/s. - `WritableStream` constructor: +50% iter/s. - `TransformStream` constructor: +30% iter/s. - `ReadableStream` iterator (both 2 and 20 chunks): +42% and +25% iter/s. - `ReadableByteStream` iterator (both 2 and 20 chunks): +39% and +20% iter/s. ### Benchmarks **main** ``` cpu: 13th Gen Intel(R) Core(TM) i9-13900H runtime: deno 1.37.0 (x86_64-unknown-linux-gnu) benchmark time (avg) iter/s (min … max) p75 p99 p995 ----------------------------------------------------------------------------------------------- ----------------------------- ReadableStream constructor 294.52 ns/iter 3,395,392.9 (277.92 ns … 618.26 ns) 292.66 ns 353.87 ns 618.26 ns WritableStream constructor 235.51 ns/iter 4,246,065.3 (213.04 ns … 306.35 ns) 236.77 ns 279.08 ns 281.32 ns TransformStream constructor 672.52 ns/iter 1,486,938.7 (652.15 ns … 880.74 ns) 670.11 ns 880.74 ns 880.74 ns ReadableStream - iterator (2 chunks) 10.44 µs/iter 95,757.9 (8.97 µs … 830.91 µs) 10.22 µs 14.74 µs 18.93 µs ReadableStream - iterator (20 chunks) 21.93 µs/iter 45,593.4 (18.8 µs … 864.97 µs) 20.57 µs 57.15 µs 137.16 µs ReadableStream - reader (2 chunks) 7.09 µs/iter 140,987.2 (7.03 µs … 7.18 µs) 7.13 µs 7.18 µs 7.18 µs ReadableStream - reader (20 chunks) 18.41 µs/iter 54,324.2 (15.7 µs … 252.7 µs) 17.14 µs 68.88 µs 94.08 µs ReadableByteStream - iterator (2 chunks) 11.06 µs/iter 90,375.1 (9.75 µs … 404.69 µs) 10.88 µs 16.6 µs 29.69 µs ReadableByteStream - iterator (20 chunks) 26.71 µs/iter 37,435.0 (22.98 µs … 508.34 µs) 25.25 µs 85.28 µs 155.65 µs ReadableByteStream - reader (2 chunks) 7.99 µs/iter 125,131.1 (7.92 µs … 8.13 µs) 8.01 µs 8.13 µs 8.13 µs ReadableByteStream - reader (20 chunks) 23.46 µs/iter 42,618.5 (20.28 µs … 414.66 µs) 21.94 µs 90.52 µs 147.38 µs ``` **this PR** ``` cpu: 13th Gen Intel(R) Core(TM) i9-13900H runtime: deno 1.37.0 (x86_64-unknown-linux-gnu) benchmark time (avg) iter/s (min … max) p75 p99 p995 ----------------------------------------------------------------------------------------------- ----------------------------- ReadableStream constructor 235.48 ns/iter 4,246,584.3 (223.12 ns … 504.65 ns) 234.3 ns 290.84 ns 311.12 ns WritableStream constructor 156.31 ns/iter 6,397,537.3 (148.54 ns … 211.13 ns) 157.49 ns 199.82 ns 208.23 ns TransformStream constructor 471.29 ns/iter 2,121,815.3 (452.53 ns … 791.41 ns) 468.62 ns 540.36 ns 791.41 ns ReadableStream - iterator (2 chunks) 7.32 µs/iter 136,705.4 (6.35 µs … 639.97 µs) 7.1 µs 12.12 µs 20.98 µs ReadableStream - iterator (20 chunks) 17.48 µs/iter 57,195.1 (14.48 µs … 289.06 µs) 16.06 µs 76.98 µs 114.61 µs ReadableStream - reader (2 chunks) 6.86 µs/iter 145,847.9 (6.8 µs … 6.97 µs) 6.88 µs 6.97 µs 6.97 µs ReadableStream - reader (20 chunks) 16.88 µs/iter 59,227.7 (14.04 µs … 311.29 µs) 15.39 µs 74.95 µs 97.45 µs ReadableByteStream - iterator (2 chunks) 7.94 µs/iter 125,881.2 (6.86 µs … 811.16 µs) 7.69 µs 11.43 µs 16.6 µs ReadableByteStream - iterator (20 chunks) 22.23 µs/iter 44,978.2 (18.98 µs … 590.11 µs) 20.73 µs 45.13 µs 159.8 µs ReadableByteStream - reader (2 chunks) 7.4 µs/iter 135,206.9 (7.36 µs … 7.42 µs) 7.4 µs 7.42 µs 7.42 µs ReadableByteStream - reader (20 chunks) 21.03 µs/iter 47,555.6 (17.75 µs … 357.66 µs) 19.52 µs 98.69 µs 146.5 µs ``` --------- Co-authored-by: Luca Casonato <hello@lcas.dev>
Diffstat (limited to 'ext/web/06_streams.js')
-rw-r--r--ext/web/06_streams.js465
1 files changed, 271 insertions, 194 deletions
diff --git a/ext/web/06_streams.js b/ext/web/06_streams.js
index 38581ac79..66be90a61 100644
--- a/ext/web/06_streams.js
+++ b/ext/web/06_streams.js
@@ -218,6 +218,50 @@ function uponPromise(promise, onFulfilled, onRejected) {
);
}
+class Queue {
+ #head = null;
+ #tail = null;
+ #size = 0;
+
+ enqueue(value) {
+ const node = { value, next: null };
+ if (this.#head === null) {
+ this.#head = node;
+ this.#tail = node;
+ } else {
+ this.#tail.next = node;
+ this.#tail = node;
+ }
+ return ++this.#size;
+ }
+
+ dequeue() {
+ const node = this.#head;
+ if (node === null) {
+ return null;
+ }
+
+ if (this.#head === this.#tail) {
+ this.#tail = null;
+ }
+ this.#head = this.#head.next;
+ this.#size--;
+ return node.value;
+ }
+
+ peek() {
+ if (this.#head === null) {
+ return null;
+ }
+
+ return this.#head.value;
+ }
+
+ get size() {
+ return this.#size;
+ }
+}
+
/**
* @param {ArrayBufferLike} O
* @returns {boolean}
@@ -351,6 +395,17 @@ const _writable = Symbol("[[writable]]");
const _writeAlgorithm = Symbol("[[writeAlgorithm]]");
const _writer = Symbol("[[writer]]");
const _writeRequests = Symbol("[[writeRequests]]");
+const _brand = webidl.brand;
+
+function noop() {}
+async function noopAsync() {}
+const _defaultStartAlgorithm = noop;
+const _defaultWriteAlgorithm = noopAsync;
+const _defaultCloseAlgorithm = noopAsync;
+const _defaultAbortAlgorithm = noopAsync;
+const _defaultPullAlgorithm = noopAsync;
+const _defaultFlushAlgorithm = noopAsync;
+const _defaultCancelAlgorithm = noopAsync;
/**
* @template R
@@ -358,7 +413,9 @@ const _writeRequests = Symbol("[[writeRequests]]");
* @returns {ReadableStreamDefaultReader<R>}
*/
function acquireReadableStreamDefaultReader(stream) {
- return new ReadableStreamDefaultReader(stream);
+ const reader = new ReadableStreamDefaultReader(_brand);
+ setUpReadableStreamDefaultReader(reader, stream);
+ return reader;
}
/**
@@ -367,7 +424,7 @@ function acquireReadableStreamDefaultReader(stream) {
* @returns {ReadableStreamBYOBReader<R>}
*/
function acquireReadableStreamBYOBReader(stream) {
- const reader = webidl.createBranded(ReadableStreamBYOBReader);
+ const reader = new ReadableStreamBYOBReader(_brand);
setUpReadableStreamBYOBReader(reader, stream);
return reader;
}
@@ -399,9 +456,9 @@ function createReadableStream(
) {
assert(isNonNegativeNumber(highWaterMark));
/** @type {ReadableStream} */
- const stream = webidl.createBranded(ReadableStream);
+ const stream = new ReadableStream(_brand);
initializeReadableStream(stream);
- const controller = webidl.createBranded(ReadableStreamDefaultController);
+ const controller = new ReadableStreamDefaultController(_brand);
setUpReadableStreamDefaultController(
stream,
controller,
@@ -433,9 +490,9 @@ function createWritableStream(
sizeAlgorithm,
) {
assert(isNonNegativeNumber(highWaterMark));
- const stream = webidl.createBranded(WritableStream);
+ const stream = new WritableStream(_brand);
initializeWritableStream(stream);
- const controller = webidl.createBranded(WritableStreamDefaultController);
+ const controller = new WritableStreamDefaultController(_brand);
setUpWritableStreamDefaultController(
stream,
controller,
@@ -455,19 +512,15 @@ function createWritableStream(
* @returns {T}
*/
function dequeueValue(container) {
- assert(
- ReflectHas(container, _queue) &&
- ReflectHas(container, _queueTotalSize),
- );
- assert(container[_queue].length);
- const valueWithSize = ArrayPrototypeShift(container[_queue]);
+ assert(container[_queue] && typeof container[_queueTotalSize] === "number");
+ assert(container[_queue].size);
+ const valueWithSize = container[_queue].dequeue();
container[_queueTotalSize] -= valueWithSize.size;
if (container[_queueTotalSize] < 0) {
container[_queueTotalSize] = 0;
}
return valueWithSize.value;
}
-
/**
* @template T
* @param {{ [_queue]: Array<ValueWithSize<T | _close>>, [_queueTotalSize]: number }} container
@@ -476,17 +529,14 @@ function dequeueValue(container) {
* @returns {void}
*/
function enqueueValueWithSize(container, value, size) {
- assert(
- ReflectHas(container, _queue) &&
- ReflectHas(container, _queueTotalSize),
- );
+ assert(container[_queue] && typeof container[_queueTotalSize] === "number");
if (isNonNegativeNumber(size) === false) {
throw RangeError("chunk size isn't a positive number");
}
if (size === Infinity) {
throw RangeError("chunk size is invalid");
}
- ArrayPrototypePush(container[_queue], { value, size });
+ container[_queue].enqueue({ value, size });
container[_queueTotalSize] += size;
}
@@ -537,9 +587,9 @@ function createReadableByteStream(
pullAlgorithm,
cancelAlgorithm,
) {
- const stream = webidl.createBranded(ReadableStream);
+ const stream = new ReadableStream(_brand);
initializeReadableStream(stream);
- const controller = webidl.createBranded(ReadableByteStreamController);
+ const controller = new ReadableByteStreamController(_brand);
setUpReadableByteStreamController(
stream,
controller,
@@ -646,16 +696,7 @@ function initializeWritableStream(stream) {
* @returns {v is number}
*/
function isNonNegativeNumber(v) {
- if (typeof v !== "number") {
- return false;
- }
- if (NumberIsNaN(v)) {
- return false;
- }
- if (v < 0) {
- return false;
- }
- return true;
+ return typeof v === "number" && v >= 0;
}
/**
@@ -663,8 +704,7 @@ function isNonNegativeNumber(v) {
* @returns {value is ReadableStream}
*/
function isReadableStream(value) {
- return !(typeof value !== "object" || value === null ||
- !ReflectHas(value, _controller));
+ return !(typeof value !== "object" || value === null || !value[_controller]);
}
/**
@@ -672,10 +712,7 @@ function isReadableStream(value) {
* @returns {boolean}
*/
function isReadableStreamLocked(stream) {
- if (stream[_reader] === undefined) {
- return false;
- }
- return true;
+ return stream[_reader] !== undefined;
}
/**
@@ -684,7 +721,7 @@ function isReadableStreamLocked(stream) {
*/
function isReadableStreamDefaultReader(value) {
return !(typeof value !== "object" || value === null ||
- !ReflectHas(value, _readRequests));
+ !value[_readRequests]);
}
/**
@@ -873,7 +910,7 @@ const _original = Symbol("[[original]]");
* @returns {ReadableStream<Uint8Array>}
*/
function readableStreamForRid(rid, autoClose = true) {
- const stream = webidl.createBranded(ReadableStream);
+ const stream = new ReadableStream(_brand);
stream[_resourceBacking] = { rid, autoClose };
const tryClose = () => {
@@ -943,7 +980,7 @@ const _isUnref = Symbol("isUnref");
* @returns {ReadableStream<Uint8Array>}
*/
function readableStreamForRidUnrefable(rid) {
- const stream = webidl.createBranded(ReadableStream);
+ const stream = new ReadableStream(_brand);
stream[promiseIdSymbol] = undefined;
stream[_isUnref] = false;
stream[_resourceBackingUnrefable] = { rid, autoClose: true };
@@ -1094,7 +1131,7 @@ async function readableStreamCollectIntoUint8Array(stream) {
* @returns {ReadableStream<Uint8Array>}
*/
function writableStreamForRid(rid, autoClose = true) {
- const stream = webidl.createBranded(WritableStream);
+ const stream = new WritableStream(_brand);
stream[_resourceBacking] = { rid, autoClose };
const tryClose = () => {
@@ -1173,11 +1210,11 @@ function isWritableStreamLocked(stream) {
*/
function peekQueueValue(container) {
assert(
- ReflectHas(container, _queue) &&
- ReflectHas(container, _queueTotalSize),
+ container[_queue] &&
+ typeof container[_queueTotalSize] === "number",
);
- assert(container[_queue].length);
- const valueWithSize = container[_queue][0];
+ assert(container[_queue].size);
+ const valueWithSize = container[_queue].peek();
return valueWithSize.value;
}
@@ -1347,7 +1384,7 @@ function readableByteStreamControllerEnqueue(controller, chunk) {
byteLength,
);
} else {
- assert(controller[_queue].length === 0);
+ assert(controller[_queue].size === 0);
if (controller[_pendingPullIntos].length !== 0) {
assert(controller[_pendingPullIntos][0].readerType === "default");
readableByteStreamControllerShiftPendingPullInto(controller);
@@ -1394,7 +1431,7 @@ function readableByteStreamControllerEnqueueChunkToQueue(
byteOffset,
byteLength,
) {
- ArrayPrototypePush(controller[_queue], { buffer, byteOffset, byteLength });
+ controller[_queue].enqueue({ buffer, byteOffset, byteLength });
controller[_queueTotalSize] += byteLength;
}
@@ -1476,7 +1513,7 @@ function readableByteStreamControllerGetBYOBRequest(controller) {
// deno-lint-ignore prefer-primordials
firstDescriptor.byteLength - firstDescriptor.bytesFilled,
);
- const byobRequest = webidl.createBranded(ReadableStreamBYOBRequest);
+ const byobRequest = new ReadableStreamBYOBRequest(_brand);
byobRequest[_controller] = controller;
byobRequest[_view] = view;
controller[_byobRequest] = byobRequest;
@@ -1504,7 +1541,7 @@ function readableByteStreamControllerGetDesiredSize(controller) {
* @returns {void}
*/
function resetQueue(container) {
- container[_queue] = [];
+ container[_queue] = new Queue();
container[_queueTotalSize] = 0;
}
@@ -1564,7 +1601,7 @@ function readableByteStreamControllerShouldCallPull(controller) {
function readableStreamAddReadRequest(stream, readRequest) {
assert(isReadableStreamDefaultReader(stream[_reader]));
assert(stream[_state] === "readable");
- ArrayPrototypePush(stream[_reader][_readRequests], readRequest);
+ stream[_reader][_readRequests].enqueue(readRequest);
}
/**
@@ -1587,7 +1624,7 @@ function readableStreamAddReadIntoRequest(stream, readRequest) {
function readableStreamCancel(stream, reason) {
stream[_disturbed] = true;
if (stream[_state] === "closed") {
- return resolvePromiseWith(undefined);
+ return PromiseResolve(undefined);
}
if (stream[_state] === "errored") {
return PromiseReject(stream[_storedError]);
@@ -1604,7 +1641,7 @@ function readableStreamCancel(stream, reason) {
}
/** @type {Promise<void>} */
const sourceCancelPromise = stream[_controller][_cancelSteps](reason);
- return PromisePrototypeThen(sourceCancelPromise, () => undefined);
+ return PromisePrototypeThen(sourceCancelPromise, noop);
}
/**
@@ -1623,9 +1660,8 @@ function readableStreamClose(stream) {
if (isReadableStreamDefaultReader(reader)) {
/** @type {Array<ReadRequest<R>>} */
const readRequests = reader[_readRequests];
- reader[_readRequests] = [];
- for (let i = 0; i < readRequests.length; ++i) {
- const readRequest = readRequests[i];
+ while (readRequests.size !== 0) {
+ const readRequest = readRequests.dequeue();
readRequest.closeSteps();
}
}
@@ -1676,11 +1712,7 @@ function readableStreamDefaultControllerCallPullIfNeeded(controller) {
*/
function readableStreamDefaultControllerCanCloseOrEnqueue(controller) {
const state = controller[_stream][_state];
- if (controller[_closeRequested] === false && state === "readable") {
- return true;
- } else {
- return false;
- }
+ return controller[_closeRequested] === false && state === "readable";
}
/** @param {ReadableStreamDefaultController<any>} controller */
@@ -1699,7 +1731,7 @@ function readableStreamDefaultControllerClose(controller) {
}
const stream = controller[_stream];
controller[_closeRequested] = true;
- if (controller[_queue].length === 0) {
+ if (controller[_queue].size === 0) {
readableStreamDefaultControllerClearAlgorithms(controller);
readableStreamClose(stream);
}
@@ -1785,7 +1817,6 @@ function readableStreamDefaultcontrollerHasBackpressure(controller) {
* @returns {boolean}
*/
function readableStreamDefaultcontrollerShouldCallPull(controller) {
- const stream = controller[_stream];
if (
readableStreamDefaultControllerCanCloseOrEnqueue(controller) === false
) {
@@ -1794,6 +1825,7 @@ function readableStreamDefaultcontrollerShouldCallPull(controller) {
if (controller[_started] === false) {
return false;
}
+ const stream = controller[_stream];
if (
isReadableStreamLocked(stream) &&
readableStreamGetNumReadRequests(stream) > 0
@@ -1803,10 +1835,11 @@ function readableStreamDefaultcontrollerShouldCallPull(controller) {
const desiredSize = readableStreamDefaultControllerGetDesiredSize(
controller,
);
- assert(desiredSize !== null);
+
if (desiredSize > 0) {
return true;
}
+ assert(desiredSize !== null);
return false;
}
@@ -1846,9 +1879,8 @@ function readableStreamBYOBReaderRelease(reader) {
*/
function readableStreamDefaultReaderErrorReadRequests(reader, e) {
const readRequests = reader[_readRequests];
- reader[_readRequests] = [];
- for (let i = 0; i < readRequests.length; ++i) {
- const readRequest = readRequests[i];
+ while (readRequests.size !== 0) {
+ const readRequest = readRequests.dequeue();
readRequest.errorSteps(e);
}
}
@@ -1887,11 +1919,11 @@ function readableByteStreamControllerProcessReadRequestsUsingQueue(
) {
const reader = controller[_stream][_reader];
assert(isReadableStreamDefaultReader(reader));
- while (reader[_readRequests].length !== 0) {
+ while (reader[_readRequests].size !== 0) {
if (controller[_queueTotalSize] === 0) {
return;
}
- const readRequest = ArrayPrototypeShift(reader[_readRequests]);
+ const readRequest = reader[_readRequests].dequeue();
readableByteStreamControllerFillReadRequestFromQueue(
controller,
readRequest,
@@ -2326,7 +2358,7 @@ function readableByteStreamControllerFillPullIntoDescriptorFromQueue(
}
const queue = controller[_queue];
while (totalBytesToCopyRemaining > 0) {
- const headOfQueue = queue[0];
+ const headOfQueue = queue.peek();
const bytesToCopy = MathMin(
totalBytesToCopyRemaining,
// deno-lint-ignore prefer-primordials
@@ -2353,7 +2385,7 @@ function readableByteStreamControllerFillPullIntoDescriptorFromQueue(
// deno-lint-ignore prefer-primordials
if (headOfQueue.byteLength === bytesToCopy) {
- ArrayPrototypeShift(queue);
+ queue.dequeue();
} else {
headOfQueue.byteOffset += bytesToCopy;
headOfQueue.byteLength -= bytesToCopy;
@@ -2384,7 +2416,7 @@ function readableByteStreamControllerFillReadRequestFromQueue(
readRequest,
) {
assert(controller[_queueTotalSize] > 0);
- const entry = ArrayPrototypeShift(controller[_queue]);
+ const entry = controller[_queue].dequeue();
// deno-lint-ignore prefer-primordials
controller[_queueTotalSize] -= entry.byteLength;
readableByteStreamControllerHandleQueueDrain(controller);
@@ -2526,9 +2558,9 @@ function readableStreamFulfillReadRequest(stream, chunk, done) {
assert(readableStreamHasDefaultReader(stream) === true);
/** @type {ReadableStreamDefaultReader<R>} */
const reader = stream[_reader];
- assert(reader[_readRequests].length);
+ assert(reader[_readRequests].size);
/** @type {ReadRequest<R>} */
- const readRequest = ArrayPrototypeShift(reader[_readRequests]);
+ const readRequest = reader[_readRequests].dequeue();
if (done) {
readRequest.closeSteps();
} else {
@@ -2551,7 +2583,7 @@ function readableStreamGetNumReadIntoRequests(stream) {
*/
function readableStreamGetNumReadRequests(stream) {
assert(readableStreamHasDefaultReader(stream) === true);
- return stream[_reader][_readRequests].length;
+ return stream[_reader][_readRequests].size;
}
/**
@@ -2621,7 +2653,7 @@ function readableStreamPipeTo(
const writer = acquireWritableStreamDefaultWriter(dest);
source[_disturbed] = true;
let shuttingDown = false;
- let currentWrite = resolvePromiseWith(undefined);
+ let currentWrite = PromiseResolve(undefined);
/** @type {Deferred<void>} */
const promise = new Deferred();
/** @type {() => void} */
@@ -2636,7 +2668,7 @@ function readableStreamPipeTo(
if (dest[_state] === "writable") {
return writableStreamAbort(dest, error);
} else {
- return resolvePromiseWith(undefined);
+ return PromiseResolve(undefined);
}
});
}
@@ -2645,7 +2677,7 @@ function readableStreamPipeTo(
if (source[_state] === "readable") {
return readableStreamCancel(source, error);
} else {
- return resolvePromiseWith(undefined);
+ return PromiseResolve(undefined);
}
});
}
@@ -2680,7 +2712,7 @@ function readableStreamPipeTo(
/** @returns {Promise<boolean>} */
function pipeStep() {
if (shuttingDown === true) {
- return resolvePromiseWith(true);
+ return PromiseResolve(true);
}
return transformPromiseWith(writer[_readyPromise].promise, () => {
@@ -2997,7 +3029,7 @@ function readableStreamDefaultTee(stream, cloneForBranch2) {
function pullAlgorithm() {
if (reading === true) {
readAgain = true;
- return resolvePromiseWith(undefined);
+ return PromiseResolve(undefined);
}
reading = true;
/** @type {ReadRequest<R>} */
@@ -3073,7 +3105,7 @@ function readableStreamDefaultTee(stream, cloneForBranch2) {
},
};
readableStreamDefaultReaderRead(reader, readRequest);
- return resolvePromiseWith(undefined);
+ return PromiseResolve(undefined);
}
/**
@@ -3249,7 +3281,7 @@ function readableByteStreamTee(stream) {
function pullWithBYOBReader(view, forBranch2) {
if (isReadableStreamDefaultReader(reader)) {
- assert(reader[_readRequests].length === 0);
+ assert(reader[_readRequests].size === 0);
readableStreamDefaultReaderRelease(reader);
reader = acquireReadableStreamBYOBReader(stream);
forwardReaderError(reader);
@@ -3458,7 +3490,7 @@ function setUpReadableByteStreamController(
controller[_pendingPullIntos] = [];
stream[_controller] = controller;
const startResult = startAlgorithm();
- const startPromise = resolvePromiseWith(startResult);
+ const startPromise = PromiseResolve(startResult);
setPromiseIsHandledToTrue(
PromisePrototypeThen(
startPromise,
@@ -3487,13 +3519,13 @@ function setUpReadableByteStreamControllerFromUnderlyingSource(
underlyingSourceDict,
highWaterMark,
) {
- const controller = webidl.createBranded(ReadableByteStreamController);
+ const controller = new ReadableByteStreamController(_brand);
/** @type {() => void} */
- let startAlgorithm = () => undefined;
+ let startAlgorithm = _defaultStartAlgorithm;
/** @type {() => Promise<void>} */
- let pullAlgorithm = () => resolvePromiseWith(undefined);
+ let pullAlgorithm = _defaultPullAlgorithm;
/** @type {(reason: any) => Promise<void>} */
- let cancelAlgorithm = (_reason) => resolvePromiseWith(undefined);
+ let cancelAlgorithm = _defaultCancelAlgorithm;
if (underlyingSourceDict.start !== undefined) {
startAlgorithm = () =>
webidl.invokeCallbackFunction(
@@ -3574,7 +3606,7 @@ function setUpReadableStreamDefaultController(
controller[_cancelAlgorithm] = cancelAlgorithm;
stream[_controller] = controller;
const startResult = startAlgorithm(controller);
- const startPromise = resolvePromiseWith(startResult);
+ const startPromise = PromiseResolve(startResult);
uponPromise(startPromise, () => {
controller[_started] = true;
assert(controller[_pulling] === false);
@@ -3600,13 +3632,13 @@ function setUpReadableStreamDefaultControllerFromUnderlyingSource(
highWaterMark,
sizeAlgorithm,
) {
- const controller = webidl.createBranded(ReadableStreamDefaultController);
+ const controller = new ReadableStreamDefaultController(_brand);
/** @type {() => Promise<void>} */
- let startAlgorithm = () => undefined;
+ let startAlgorithm = _defaultStartAlgorithm;
/** @type {() => Promise<void>} */
- let pullAlgorithm = () => resolvePromiseWith(undefined);
+ let pullAlgorithm = _defaultPullAlgorithm;
/** @type {(reason?: any) => Promise<void>} */
- let cancelAlgorithm = () => resolvePromiseWith(undefined);
+ let cancelAlgorithm = _defaultCancelAlgorithm;
if (underlyingSourceDict.start !== undefined) {
startAlgorithm = () =>
webidl.invokeCallbackFunction(
@@ -3681,7 +3713,7 @@ function setUpReadableStreamDefaultReader(reader, stream) {
throw new TypeError("ReadableStream is locked.");
}
readableStreamReaderGenericInitialize(reader, stream);
- reader[_readRequests] = [];
+ reader[_readRequests] = new Queue();
}
/**
@@ -3721,7 +3753,7 @@ function setUpTransformStreamDefaultControllerFromTransformer(
transformerDict,
) {
/** @type {TransformStreamDefaultController<O>} */
- const controller = webidl.createBranded(TransformStreamDefaultController);
+ const controller = new TransformStreamDefaultController(_brand);
/** @type {(chunk: O, controller: TransformStreamDefaultController<O>) => Promise<void>} */
let transformAlgorithm = (chunk) => {
try {
@@ -3729,12 +3761,11 @@ function setUpTransformStreamDefaultControllerFromTransformer(
} catch (e) {
return PromiseReject(e);
}
- return resolvePromiseWith(undefined);
+ return PromiseResolve(undefined);
};
/** @type {(controller: TransformStreamDefaultController<O>) => Promise<void>} */
- let flushAlgorithm = () => resolvePromiseWith(undefined);
- /** @type {(reason: any) => Promise<void>} */
- let cancelAlgorithm = () => resolvePromiseWith(undefined);
+ let flushAlgorithm = _defaultFlushAlgorithm;
+ let cancelAlgorithm = _defaultCancelAlgorithm;
if (transformerDict.transform !== undefined) {
transformAlgorithm = (chunk, controller) =>
webidl.invokeCallbackFunction(
@@ -3842,14 +3873,14 @@ function setUpWritableStreamDefaultControllerFromUnderlyingSink(
highWaterMark,
sizeAlgorithm,
) {
- const controller = webidl.createBranded(WritableStreamDefaultController);
+ const controller = new WritableStreamDefaultController(_brand);
/** @type {(controller: WritableStreamDefaultController<W>) => any} */
- let startAlgorithm = () => undefined;
+ let startAlgorithm = _defaultStartAlgorithm;
/** @type {(chunk: W, controller: WritableStreamDefaultController<W>) => Promise<void>} */
- let writeAlgorithm = () => resolvePromiseWith(undefined);
- let closeAlgorithm = () => resolvePromiseWith(undefined);
+ let writeAlgorithm = _defaultWriteAlgorithm;
+ let closeAlgorithm = _defaultCloseAlgorithm;
/** @type {(reason?: any) => Promise<void>} */
- let abortAlgorithm = () => resolvePromiseWith(undefined);
+ let abortAlgorithm = _defaultAbortAlgorithm;
if (underlyingSinkDict.start !== undefined) {
startAlgorithm = () =>
@@ -4215,11 +4246,11 @@ function transformStreamUnblockWrite(stream) {
function writableStreamAbort(stream, reason) {
const state = stream[_state];
if (state === "closed" || state === "errored") {
- return resolvePromiseWith(undefined);
+ return PromiseResolve(undefined);
}
stream[_controller][_signal][signalAbort](reason);
if (state === "closed" || state === "errored") {
- return resolvePromiseWith(undefined);
+ return PromiseResolve(undefined);
}
if (stream[_pendingAbortRequest] !== undefined) {
return stream[_pendingAbortRequest].deferred.promise;
@@ -4329,7 +4360,7 @@ function writableStreamDefaultControllerAdvanceQueueIfNeeded(controller) {
writableStreamFinishErroring(stream);
return;
}
- if (controller[_queue].length === 0) {
+ if (controller[_queue].size === 0) {
return;
}
const value = peekQueueValue(controller);
@@ -4415,7 +4446,7 @@ function writableStreamDefaultControllerProcessClose(controller) {
const stream = controller[_stream];
writableStreamMarkCloseRequestInFlight(stream);
dequeueValue(controller);
- assert(controller[_queue].length === 0);
+ assert(controller[_queue].size === 0);
const sinkClosePromise = controller[_closeAlgorithm]();
writableStreamDefaultControllerClearAlgorithms(controller);
uponPromise(sinkClosePromise, () => {
@@ -4515,7 +4546,7 @@ function writableStreamDefaultWriterCloseWithErrorPropagation(writer) {
if (
writableStreamCloseQueuedOrInFlight(stream) === true || state === "closed"
) {
- return resolvePromiseWith(undefined);
+ return PromiseResolve(undefined);
}
if (state === "errored") {
return PromiseReject(stream[_storedError]);
@@ -4819,6 +4850,35 @@ const asyncIteratorPrototype = ObjectGetPrototypeOf(AsyncGeneratorPrototype);
const _iteratorNext = Symbol("[[iteratorNext]]");
const _iteratorFinished = Symbol("[[iteratorFinished]]");
+class ReadableStreamAsyncIteratorReadRequest {
+ #reader;
+ #promise;
+
+ constructor(reader, promise) {
+ this.#reader = reader;
+ this.#promise = promise;
+ }
+
+ chunkSteps(chunk) {
+ this.#reader[_iteratorNext] = null;
+ this.#promise.resolve({ value: chunk, done: false });
+ }
+
+ closeSteps() {
+ this.#reader[_iteratorNext] = null;
+ this.#reader[_iteratorFinished] = true;
+ readableStreamDefaultReaderRelease(this.#reader);
+ this.#promise.resolve({ value: undefined, done: true });
+ }
+
+ errorSteps(e) {
+ this.#reader[_iteratorNext] = null;
+ this.#reader[_iteratorFinished] = true;
+ readableStreamDefaultReaderRelease(this.#reader);
+ this.#promise.reject(e);
+ }
+}
+
/** @type {AsyncIterator<unknown>} */
const readableStreamAsyncIteratorPrototype = ObjectSetPrototypeOf({
/** @returns {Promise<IteratorResult<unknown>>} */
@@ -4840,41 +4900,21 @@ const readableStreamAsyncIteratorPrototype = ObjectSetPrototypeOf({
/** @type {Deferred<IteratorResult<any>>} */
const promise = new Deferred();
- /** @type {ReadRequest} */
- const readRequest = {
- chunkSteps(chunk) {
- promise.resolve({ value: chunk, done: false });
- },
- closeSteps() {
- readableStreamDefaultReaderRelease(reader);
- promise.resolve({ value: undefined, done: true });
- },
- errorSteps(e) {
- readableStreamDefaultReaderRelease(reader);
- promise.reject(e);
- },
- };
+ // internal values (_iteratorNext & _iteratorFinished) are modified inside
+ // ReadableStreamAsyncIteratorReadRequest methods
+ // see: https://webidl.spec.whatwg.org/#es-default-asynchronous-iterator-object
+ const readRequest = new ReadableStreamAsyncIteratorReadRequest(
+ reader,
+ promise,
+ );
readableStreamDefaultReaderRead(reader, readRequest);
- return PromisePrototypeThen(promise.promise, (result) => {
- reader[_iteratorNext] = null;
- if (result.done === true) {
- reader[_iteratorFinished] = true;
- return { value: undefined, done: true };
- }
- return result;
- }, (reason) => {
- reader[_iteratorNext] = null;
- reader[_iteratorFinished] = true;
- throw reason;
- });
+ return PromisePrototypeThen(promise.promise);
}
- reader[_iteratorNext] = reader[_iteratorNext]
+ return reader[_iteratorNext] = reader[_iteratorNext]
? PromisePrototypeThen(reader[_iteratorNext], nextSteps, nextSteps)
: nextSteps();
-
- return reader[_iteratorNext];
},
/**
* @param {unknown} arg
@@ -4892,7 +4932,7 @@ const readableStreamAsyncIteratorPrototype = ObjectSetPrototypeOf({
if (reader[_stream] === undefined) {
return PromiseResolve({ value: undefined, done: true });
}
- assert(reader[_readRequests].length === 0);
+ assert(reader[_readRequests].size === 0);
if (this[_preventCancel] === false) {
const result = readableStreamReaderGenericCancel(reader, arg);
readableStreamDefaultReaderRelease(reader);
@@ -4918,7 +4958,7 @@ class ByteLengthQueuingStrategy {
const prefix = "Failed to construct 'ByteLengthQueuingStrategy'";
webidl.requiredArguments(arguments.length, 1, prefix);
init = webidl.converters.QueuingStrategyInit(init, prefix, "Argument 1");
- this[webidl.brand] = webidl.brand;
+ this[_brand] = _brand;
this[_globalObject] = globalThis;
this[_highWaterMark] = init.highWaterMark;
}
@@ -4972,7 +5012,7 @@ class CountQueuingStrategy {
const prefix = "Failed to construct 'CountQueuingStrategy'";
webidl.requiredArguments(arguments.length, 1, prefix);
init = webidl.converters.QueuingStrategyInit(init, prefix, "Argument 1");
- this[webidl.brand] = webidl.brand;
+ this[_brand] = _brand;
this[_globalObject] = globalThis;
this[_highWaterMark] = init.highWaterMark;
}
@@ -5070,34 +5110,36 @@ class ReadableStream {
* @param {QueuingStrategy<R>=} strategy
*/
constructor(underlyingSource = undefined, strategy = undefined) {
+ if (underlyingSource === _brand) {
+ this[_brand] = _brand;
+ return;
+ }
+
const prefix = "Failed to construct 'ReadableStream'";
- if (underlyingSource !== undefined) {
- underlyingSource = webidl.converters.object(
+ underlyingSource = underlyingSource !== undefined
+ ? webidl.converters.object(
underlyingSource,
prefix,
"Argument 1",
- );
- } else {
- underlyingSource = null;
- }
- if (strategy !== undefined) {
- strategy = webidl.converters.QueuingStrategy(
+ )
+ : null;
+ strategy = strategy !== undefined
+ ? webidl.converters.QueuingStrategy(
strategy,
prefix,
"Argument 2",
- );
- } else {
- strategy = {};
- }
- this[webidl.brand] = webidl.brand;
- let underlyingSourceDict = {};
- if (underlyingSource !== undefined) {
- underlyingSourceDict = webidl.converters.UnderlyingSource(
+ )
+ : {};
+
+ const underlyingSourceDict = underlyingSource !== undefined
+ ? webidl.converters.UnderlyingSource(
underlyingSource,
prefix,
"underlyingSource",
- );
- }
+ )
+ : {};
+ this[_brand] = _brand;
+
initializeReadableStream(this);
if (underlyingSourceDict.type === "bytes") {
if (strategy.size !== undefined) {
@@ -5114,7 +5156,6 @@ class ReadableStream {
highWaterMark,
);
} else {
- assert(!(ReflectHas(underlyingSourceDict, "type")));
const sizeAlgorithm = extractSizeAlgorithm(strategy);
const highWaterMark = extractHighWaterMark(strategy, 1);
setUpReadableStreamDefaultControllerFromUnderlyingSource(
@@ -5137,7 +5178,7 @@ class ReadableStream {
const iterator = getIterator(asyncIterable, true);
- const stream = createReadableStream(() => undefined, async () => {
+ const stream = createReadableStream(noop, async () => {
// deno-lint-ignore prefer-primordials
const res = await iterator.next();
if (typeof res !== "object") {
@@ -5310,19 +5351,23 @@ class ReadableStream {
* @param {ReadableStreamIteratorOptions=} options
* @returns {AsyncIterableIterator<R>}
*/
- values(options = {}) {
+ values(options = undefined) {
webidl.assertBranded(this, ReadableStreamPrototype);
- const prefix = "Failed to execute 'values' on 'ReadableStream'";
- options = webidl.converters.ReadableStreamIteratorOptions(
- options,
- prefix,
- "Argument 1",
- );
+ let preventCancel = false;
+ if (options !== undefined) {
+ const prefix = "Failed to execute 'values' on 'ReadableStream'";
+ options = webidl.converters.ReadableStreamIteratorOptions(
+ options,
+ prefix,
+ "Argument 1",
+ );
+ preventCancel = options.preventCancel;
+ }
/** @type {AsyncIterableIterator<R>} */
const iterator = ObjectCreate(readableStreamAsyncIteratorPrototype);
const reader = acquireReadableStreamDefaultReader(this);
iterator[_reader] = reader;
- iterator[_preventCancel] = options.preventCancel;
+ iterator[_preventCancel] = preventCancel;
return iterator;
}
@@ -5357,10 +5402,14 @@ class ReadableStreamDefaultReader {
/** @param {ReadableStream<R>} stream */
constructor(stream) {
+ if (stream === _brand) {
+ this[_brand] = _brand;
+ return;
+ }
const prefix = "Failed to construct 'ReadableStreamDefaultReader'";
webidl.requiredArguments(arguments.length, 1, prefix);
stream = webidl.converters.ReadableStream(stream, prefix, "Argument 1");
- this[webidl.brand] = webidl.brand;
+ this[_brand] = _brand;
setUpReadableStreamDefaultReader(this, stream);
}
@@ -5454,10 +5503,14 @@ class ReadableStreamBYOBReader {
/** @param {ReadableStream<R>} stream */
constructor(stream) {
+ if (stream === _brand) {
+ this[_brand] = _brand;
+ return;
+ }
const prefix = "Failed to construct 'ReadableStreamBYOBReader'";
webidl.requiredArguments(arguments.length, 1, prefix);
stream = webidl.converters.ReadableStream(stream, prefix, "Argument 1");
- this[webidl.brand] = webidl.brand;
+ this[_brand] = _brand;
setUpReadableStreamBYOBReader(this, stream);
}
@@ -5491,16 +5544,19 @@ class ReadableStreamBYOBReader {
new TypeError("view must have non-zero byteLength"),
);
}
+
if (getArrayBufferByteLength(buffer) === 0) {
+ if (isDetachedBuffer(buffer)) {
+ return PromiseReject(
+ new TypeError("view's buffer has been detached"),
+ );
+ }
+
return PromiseReject(
new TypeError("view's buffer must have non-zero byteLength"),
);
}
- if (isDetachedBuffer(buffer)) {
- return PromiseReject(
- new TypeError("view's buffer has been detached"),
- );
- }
+
if (this[_stream] === undefined) {
return PromiseReject(
new TypeError("Reader has no associated stream."),
@@ -5584,8 +5640,11 @@ class ReadableStreamBYOBRequest {
return this[_view];
}
- constructor() {
- webidl.illegalConstructor();
+ constructor(brand = undefined) {
+ if (brand !== _brand) {
+ webidl.illegalConstructor();
+ }
+ this[_brand] = _brand;
}
respond(bytesWritten) {
@@ -5680,8 +5739,11 @@ class ReadableByteStreamController {
/** @type {ReadableStream<ArrayBuffer>} */
[_stream];
- constructor() {
- webidl.illegalConstructor();
+ constructor(brand = undefined) {
+ if (brand !== _brand) {
+ webidl.illegalConstructor();
+ }
+ this[_brand] = _brand;
}
/** @returns {ReadableStreamBYOBRequest | null} */
@@ -5875,8 +5937,11 @@ class ReadableStreamDefaultController {
/** @type {ReadableStream<R>} */
[_stream];
- constructor() {
- webidl.illegalConstructor();
+ constructor(brand = undefined) {
+ if (brand !== _brand) {
+ webidl.illegalConstructor();
+ }
+ this[_brand] = _brand;
}
/** @returns {number | null} */
@@ -5949,9 +6014,9 @@ class ReadableStreamDefaultController {
*/
[_pullSteps](readRequest) {
const stream = this[_stream];
- if (this[_queue].length) {
+ if (this[_queue].size) {
const chunk = dequeueValue(this);
- if (this[_closeRequested] && this[_queue].length === 0) {
+ if (this[_closeRequested] && this[_queue].size === 0) {
readableStreamDefaultControllerClearAlgorithms(this);
readableStreamClose(stream);
} else {
@@ -6015,7 +6080,7 @@ class TransformStream {
prefix,
"Argument 3",
);
- this[webidl.brand] = webidl.brand;
+ this[_brand] = _brand;
if (transformer === undefined) {
transformer = null;
}
@@ -6103,8 +6168,11 @@ class TransformStreamDefaultController {
/** @type {(chunk: O, controller: this) => Promise<void>} */
[_transformAlgorithm];
- constructor() {
- webidl.illegalConstructor();
+ constructor(brand = undefined) {
+ if (brand !== _brand) {
+ webidl.illegalConstructor();
+ }
+ this[_brand] = _brand;
}
/** @returns {number | null} */
@@ -6191,7 +6259,11 @@ class WritableStream {
* @param {UnderlyingSink<W>=} underlyingSink
* @param {QueuingStrategy<W>=} strategy
*/
- constructor(underlyingSink = undefined, strategy = {}) {
+ constructor(underlyingSink = undefined, strategy = undefined) {
+ if (underlyingSink === _brand) {
+ this[_brand] = _brand;
+ return;
+ }
const prefix = "Failed to construct 'WritableStream'";
if (underlyingSink !== undefined) {
underlyingSink = webidl.converters.object(
@@ -6200,12 +6272,14 @@ class WritableStream {
"Argument 1",
);
}
- strategy = webidl.converters.QueuingStrategy(
- strategy,
- prefix,
- "Argument 2",
- );
- this[webidl.brand] = webidl.brand;
+ strategy = strategy !== undefined
+ ? webidl.converters.QueuingStrategy(
+ strategy,
+ prefix,
+ "Argument 2",
+ )
+ : {};
+ this[_brand] = _brand;
if (underlyingSink === undefined) {
underlyingSink = null;
}
@@ -6314,7 +6388,7 @@ class WritableStreamDefaultWriter {
const prefix = "Failed to construct 'WritableStreamDefaultWriter'";
webidl.requiredArguments(arguments.length, 1, prefix);
stream = webidl.converters.WritableStream(stream, prefix, "Argument 1");
- this[webidl.brand] = webidl.brand;
+ this[_brand] = _brand;
setUpWritableStreamDefaultWriter(this, stream);
}
@@ -6471,8 +6545,11 @@ class WritableStreamDefaultController {
return this[_signal];
}
- constructor() {
- webidl.illegalConstructor();
+ constructor(brand = undefined) {
+ if (brand !== _brand) {
+ webidl.illegalConstructor();
+ }
+ this[_brand] = _brand;
}
/**