diff options
author | Marcos Casagrande <marcos@denode.com> | 2023-10-13 14:30:09 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-10-13 14:30:09 +0200 |
commit | 7599990a4fe66bacdc00c12cbe3b8e800160bf68 (patch) | |
tree | 2bcf82e0fa0ebbadcd5cb51b609b7e41aea054ac /ext/web/06_streams.js | |
parent | 5da1bd802ca33cded6e5efad8dcd2155e448993c (diff) |
perf(ext/streams): optimize streams (#20649)
This PR introduces several optimizations to streams
### Highlights:
- `ReadableStream` constructor: +20% iter/s.
- `WritableStream` constructor: +50% iter/s.
- `TransformStream` constructor: +30% iter/s.
- `ReadableStream` iterator (both 2 and 20 chunks): +42% and +25%
iter/s.
- `ReadableByteStream` iterator (both 2 and 20 chunks): +39% and +20%
iter/s.
### Benchmarks
**main**
```
cpu: 13th Gen Intel(R) Core(TM) i9-13900H
runtime: deno 1.37.0 (x86_64-unknown-linux-gnu)
benchmark time (avg) iter/s (min … max) p75 p99 p995
----------------------------------------------------------------------------------------------- -----------------------------
ReadableStream constructor 294.52 ns/iter 3,395,392.9 (277.92 ns … 618.26 ns) 292.66 ns 353.87 ns 618.26 ns
WritableStream constructor 235.51 ns/iter 4,246,065.3 (213.04 ns … 306.35 ns) 236.77 ns 279.08 ns 281.32 ns
TransformStream constructor 672.52 ns/iter 1,486,938.7 (652.15 ns … 880.74 ns) 670.11 ns 880.74 ns 880.74 ns
ReadableStream - iterator (2 chunks) 10.44 µs/iter 95,757.9 (8.97 µs … 830.91 µs) 10.22 µs 14.74 µs 18.93 µs
ReadableStream - iterator (20 chunks) 21.93 µs/iter 45,593.4 (18.8 µs … 864.97 µs) 20.57 µs 57.15 µs 137.16 µs
ReadableStream - reader (2 chunks) 7.09 µs/iter 140,987.2 (7.03 µs … 7.18 µs) 7.13 µs 7.18 µs 7.18 µs
ReadableStream - reader (20 chunks) 18.41 µs/iter 54,324.2 (15.7 µs … 252.7 µs) 17.14 µs 68.88 µs 94.08 µs
ReadableByteStream - iterator (2 chunks) 11.06 µs/iter 90,375.1 (9.75 µs … 404.69 µs) 10.88 µs 16.6 µs 29.69 µs
ReadableByteStream - iterator (20 chunks) 26.71 µs/iter 37,435.0 (22.98 µs … 508.34 µs) 25.25 µs 85.28 µs 155.65 µs
ReadableByteStream - reader (2 chunks) 7.99 µs/iter 125,131.1 (7.92 µs … 8.13 µs) 8.01 µs 8.13 µs 8.13 µs
ReadableByteStream - reader (20 chunks) 23.46 µs/iter 42,618.5 (20.28 µs … 414.66 µs) 21.94 µs 90.52 µs 147.38 µs
```
**this PR**
```
cpu: 13th Gen Intel(R) Core(TM) i9-13900H
runtime: deno 1.37.0 (x86_64-unknown-linux-gnu)
benchmark time (avg) iter/s (min … max) p75 p99 p995
----------------------------------------------------------------------------------------------- -----------------------------
ReadableStream constructor 235.48 ns/iter 4,246,584.3 (223.12 ns … 504.65 ns) 234.3 ns 290.84 ns 311.12 ns
WritableStream constructor 156.31 ns/iter 6,397,537.3 (148.54 ns … 211.13 ns) 157.49 ns 199.82 ns 208.23 ns
TransformStream constructor 471.29 ns/iter 2,121,815.3 (452.53 ns … 791.41 ns) 468.62 ns 540.36 ns 791.41 ns
ReadableStream - iterator (2 chunks) 7.32 µs/iter 136,705.4 (6.35 µs … 639.97 µs) 7.1 µs 12.12 µs 20.98 µs
ReadableStream - iterator (20 chunks) 17.48 µs/iter 57,195.1 (14.48 µs … 289.06 µs) 16.06 µs 76.98 µs 114.61 µs
ReadableStream - reader (2 chunks) 6.86 µs/iter 145,847.9 (6.8 µs … 6.97 µs) 6.88 µs 6.97 µs 6.97 µs
ReadableStream - reader (20 chunks) 16.88 µs/iter 59,227.7 (14.04 µs … 311.29 µs) 15.39 µs 74.95 µs 97.45 µs
ReadableByteStream - iterator (2 chunks) 7.94 µs/iter 125,881.2 (6.86 µs … 811.16 µs) 7.69 µs 11.43 µs 16.6 µs
ReadableByteStream - iterator (20 chunks) 22.23 µs/iter 44,978.2 (18.98 µs … 590.11 µs) 20.73 µs 45.13 µs 159.8 µs
ReadableByteStream - reader (2 chunks) 7.4 µs/iter 135,206.9 (7.36 µs … 7.42 µs) 7.4 µs 7.42 µs 7.42 µs
ReadableByteStream - reader (20 chunks) 21.03 µs/iter 47,555.6 (17.75 µs … 357.66 µs) 19.52 µs 98.69 µs 146.5 µs
```
---------
Co-authored-by: Luca Casonato <hello@lcas.dev>
Diffstat (limited to 'ext/web/06_streams.js')
-rw-r--r-- | ext/web/06_streams.js | 465 |
1 files changed, 271 insertions, 194 deletions
diff --git a/ext/web/06_streams.js b/ext/web/06_streams.js index 38581ac79..66be90a61 100644 --- a/ext/web/06_streams.js +++ b/ext/web/06_streams.js @@ -218,6 +218,50 @@ function uponPromise(promise, onFulfilled, onRejected) { ); } +class Queue { + #head = null; + #tail = null; + #size = 0; + + enqueue(value) { + const node = { value, next: null }; + if (this.#head === null) { + this.#head = node; + this.#tail = node; + } else { + this.#tail.next = node; + this.#tail = node; + } + return ++this.#size; + } + + dequeue() { + const node = this.#head; + if (node === null) { + return null; + } + + if (this.#head === this.#tail) { + this.#tail = null; + } + this.#head = this.#head.next; + this.#size--; + return node.value; + } + + peek() { + if (this.#head === null) { + return null; + } + + return this.#head.value; + } + + get size() { + return this.#size; + } +} + /** * @param {ArrayBufferLike} O * @returns {boolean} @@ -351,6 +395,17 @@ const _writable = Symbol("[[writable]]"); const _writeAlgorithm = Symbol("[[writeAlgorithm]]"); const _writer = Symbol("[[writer]]"); const _writeRequests = Symbol("[[writeRequests]]"); +const _brand = webidl.brand; + +function noop() {} +async function noopAsync() {} +const _defaultStartAlgorithm = noop; +const _defaultWriteAlgorithm = noopAsync; +const _defaultCloseAlgorithm = noopAsync; +const _defaultAbortAlgorithm = noopAsync; +const _defaultPullAlgorithm = noopAsync; +const _defaultFlushAlgorithm = noopAsync; +const _defaultCancelAlgorithm = noopAsync; /** * @template R @@ -358,7 +413,9 @@ const _writeRequests = Symbol("[[writeRequests]]"); * @returns {ReadableStreamDefaultReader<R>} */ function acquireReadableStreamDefaultReader(stream) { - return new ReadableStreamDefaultReader(stream); + const reader = new ReadableStreamDefaultReader(_brand); + setUpReadableStreamDefaultReader(reader, stream); + return reader; } /** @@ -367,7 +424,7 @@ function acquireReadableStreamDefaultReader(stream) { * @returns {ReadableStreamBYOBReader<R>} */ function acquireReadableStreamBYOBReader(stream) { - const reader = webidl.createBranded(ReadableStreamBYOBReader); + const reader = new ReadableStreamBYOBReader(_brand); setUpReadableStreamBYOBReader(reader, stream); return reader; } @@ -399,9 +456,9 @@ function createReadableStream( ) { assert(isNonNegativeNumber(highWaterMark)); /** @type {ReadableStream} */ - const stream = webidl.createBranded(ReadableStream); + const stream = new ReadableStream(_brand); initializeReadableStream(stream); - const controller = webidl.createBranded(ReadableStreamDefaultController); + const controller = new ReadableStreamDefaultController(_brand); setUpReadableStreamDefaultController( stream, controller, @@ -433,9 +490,9 @@ function createWritableStream( sizeAlgorithm, ) { assert(isNonNegativeNumber(highWaterMark)); - const stream = webidl.createBranded(WritableStream); + const stream = new WritableStream(_brand); initializeWritableStream(stream); - const controller = webidl.createBranded(WritableStreamDefaultController); + const controller = new WritableStreamDefaultController(_brand); setUpWritableStreamDefaultController( stream, controller, @@ -455,19 +512,15 @@ function createWritableStream( * @returns {T} */ function dequeueValue(container) { - assert( - ReflectHas(container, _queue) && - ReflectHas(container, _queueTotalSize), - ); - assert(container[_queue].length); - const valueWithSize = ArrayPrototypeShift(container[_queue]); + assert(container[_queue] && typeof container[_queueTotalSize] === "number"); + assert(container[_queue].size); + const valueWithSize = container[_queue].dequeue(); container[_queueTotalSize] -= valueWithSize.size; if (container[_queueTotalSize] < 0) { container[_queueTotalSize] = 0; } return valueWithSize.value; } - /** * @template T * @param {{ [_queue]: Array<ValueWithSize<T | _close>>, [_queueTotalSize]: number }} container @@ -476,17 +529,14 @@ function dequeueValue(container) { * @returns {void} */ function enqueueValueWithSize(container, value, size) { - assert( - ReflectHas(container, _queue) && - ReflectHas(container, _queueTotalSize), - ); + assert(container[_queue] && typeof container[_queueTotalSize] === "number"); if (isNonNegativeNumber(size) === false) { throw RangeError("chunk size isn't a positive number"); } if (size === Infinity) { throw RangeError("chunk size is invalid"); } - ArrayPrototypePush(container[_queue], { value, size }); + container[_queue].enqueue({ value, size }); container[_queueTotalSize] += size; } @@ -537,9 +587,9 @@ function createReadableByteStream( pullAlgorithm, cancelAlgorithm, ) { - const stream = webidl.createBranded(ReadableStream); + const stream = new ReadableStream(_brand); initializeReadableStream(stream); - const controller = webidl.createBranded(ReadableByteStreamController); + const controller = new ReadableByteStreamController(_brand); setUpReadableByteStreamController( stream, controller, @@ -646,16 +696,7 @@ function initializeWritableStream(stream) { * @returns {v is number} */ function isNonNegativeNumber(v) { - if (typeof v !== "number") { - return false; - } - if (NumberIsNaN(v)) { - return false; - } - if (v < 0) { - return false; - } - return true; + return typeof v === "number" && v >= 0; } /** @@ -663,8 +704,7 @@ function isNonNegativeNumber(v) { * @returns {value is ReadableStream} */ function isReadableStream(value) { - return !(typeof value !== "object" || value === null || - !ReflectHas(value, _controller)); + return !(typeof value !== "object" || value === null || !value[_controller]); } /** @@ -672,10 +712,7 @@ function isReadableStream(value) { * @returns {boolean} */ function isReadableStreamLocked(stream) { - if (stream[_reader] === undefined) { - return false; - } - return true; + return stream[_reader] !== undefined; } /** @@ -684,7 +721,7 @@ function isReadableStreamLocked(stream) { */ function isReadableStreamDefaultReader(value) { return !(typeof value !== "object" || value === null || - !ReflectHas(value, _readRequests)); + !value[_readRequests]); } /** @@ -873,7 +910,7 @@ const _original = Symbol("[[original]]"); * @returns {ReadableStream<Uint8Array>} */ function readableStreamForRid(rid, autoClose = true) { - const stream = webidl.createBranded(ReadableStream); + const stream = new ReadableStream(_brand); stream[_resourceBacking] = { rid, autoClose }; const tryClose = () => { @@ -943,7 +980,7 @@ const _isUnref = Symbol("isUnref"); * @returns {ReadableStream<Uint8Array>} */ function readableStreamForRidUnrefable(rid) { - const stream = webidl.createBranded(ReadableStream); + const stream = new ReadableStream(_brand); stream[promiseIdSymbol] = undefined; stream[_isUnref] = false; stream[_resourceBackingUnrefable] = { rid, autoClose: true }; @@ -1094,7 +1131,7 @@ async function readableStreamCollectIntoUint8Array(stream) { * @returns {ReadableStream<Uint8Array>} */ function writableStreamForRid(rid, autoClose = true) { - const stream = webidl.createBranded(WritableStream); + const stream = new WritableStream(_brand); stream[_resourceBacking] = { rid, autoClose }; const tryClose = () => { @@ -1173,11 +1210,11 @@ function isWritableStreamLocked(stream) { */ function peekQueueValue(container) { assert( - ReflectHas(container, _queue) && - ReflectHas(container, _queueTotalSize), + container[_queue] && + typeof container[_queueTotalSize] === "number", ); - assert(container[_queue].length); - const valueWithSize = container[_queue][0]; + assert(container[_queue].size); + const valueWithSize = container[_queue].peek(); return valueWithSize.value; } @@ -1347,7 +1384,7 @@ function readableByteStreamControllerEnqueue(controller, chunk) { byteLength, ); } else { - assert(controller[_queue].length === 0); + assert(controller[_queue].size === 0); if (controller[_pendingPullIntos].length !== 0) { assert(controller[_pendingPullIntos][0].readerType === "default"); readableByteStreamControllerShiftPendingPullInto(controller); @@ -1394,7 +1431,7 @@ function readableByteStreamControllerEnqueueChunkToQueue( byteOffset, byteLength, ) { - ArrayPrototypePush(controller[_queue], { buffer, byteOffset, byteLength }); + controller[_queue].enqueue({ buffer, byteOffset, byteLength }); controller[_queueTotalSize] += byteLength; } @@ -1476,7 +1513,7 @@ function readableByteStreamControllerGetBYOBRequest(controller) { // deno-lint-ignore prefer-primordials firstDescriptor.byteLength - firstDescriptor.bytesFilled, ); - const byobRequest = webidl.createBranded(ReadableStreamBYOBRequest); + const byobRequest = new ReadableStreamBYOBRequest(_brand); byobRequest[_controller] = controller; byobRequest[_view] = view; controller[_byobRequest] = byobRequest; @@ -1504,7 +1541,7 @@ function readableByteStreamControllerGetDesiredSize(controller) { * @returns {void} */ function resetQueue(container) { - container[_queue] = []; + container[_queue] = new Queue(); container[_queueTotalSize] = 0; } @@ -1564,7 +1601,7 @@ function readableByteStreamControllerShouldCallPull(controller) { function readableStreamAddReadRequest(stream, readRequest) { assert(isReadableStreamDefaultReader(stream[_reader])); assert(stream[_state] === "readable"); - ArrayPrototypePush(stream[_reader][_readRequests], readRequest); + stream[_reader][_readRequests].enqueue(readRequest); } /** @@ -1587,7 +1624,7 @@ function readableStreamAddReadIntoRequest(stream, readRequest) { function readableStreamCancel(stream, reason) { stream[_disturbed] = true; if (stream[_state] === "closed") { - return resolvePromiseWith(undefined); + return PromiseResolve(undefined); } if (stream[_state] === "errored") { return PromiseReject(stream[_storedError]); @@ -1604,7 +1641,7 @@ function readableStreamCancel(stream, reason) { } /** @type {Promise<void>} */ const sourceCancelPromise = stream[_controller][_cancelSteps](reason); - return PromisePrototypeThen(sourceCancelPromise, () => undefined); + return PromisePrototypeThen(sourceCancelPromise, noop); } /** @@ -1623,9 +1660,8 @@ function readableStreamClose(stream) { if (isReadableStreamDefaultReader(reader)) { /** @type {Array<ReadRequest<R>>} */ const readRequests = reader[_readRequests]; - reader[_readRequests] = []; - for (let i = 0; i < readRequests.length; ++i) { - const readRequest = readRequests[i]; + while (readRequests.size !== 0) { + const readRequest = readRequests.dequeue(); readRequest.closeSteps(); } } @@ -1676,11 +1712,7 @@ function readableStreamDefaultControllerCallPullIfNeeded(controller) { */ function readableStreamDefaultControllerCanCloseOrEnqueue(controller) { const state = controller[_stream][_state]; - if (controller[_closeRequested] === false && state === "readable") { - return true; - } else { - return false; - } + return controller[_closeRequested] === false && state === "readable"; } /** @param {ReadableStreamDefaultController<any>} controller */ @@ -1699,7 +1731,7 @@ function readableStreamDefaultControllerClose(controller) { } const stream = controller[_stream]; controller[_closeRequested] = true; - if (controller[_queue].length === 0) { + if (controller[_queue].size === 0) { readableStreamDefaultControllerClearAlgorithms(controller); readableStreamClose(stream); } @@ -1785,7 +1817,6 @@ function readableStreamDefaultcontrollerHasBackpressure(controller) { * @returns {boolean} */ function readableStreamDefaultcontrollerShouldCallPull(controller) { - const stream = controller[_stream]; if ( readableStreamDefaultControllerCanCloseOrEnqueue(controller) === false ) { @@ -1794,6 +1825,7 @@ function readableStreamDefaultcontrollerShouldCallPull(controller) { if (controller[_started] === false) { return false; } + const stream = controller[_stream]; if ( isReadableStreamLocked(stream) && readableStreamGetNumReadRequests(stream) > 0 @@ -1803,10 +1835,11 @@ function readableStreamDefaultcontrollerShouldCallPull(controller) { const desiredSize = readableStreamDefaultControllerGetDesiredSize( controller, ); - assert(desiredSize !== null); + if (desiredSize > 0) { return true; } + assert(desiredSize !== null); return false; } @@ -1846,9 +1879,8 @@ function readableStreamBYOBReaderRelease(reader) { */ function readableStreamDefaultReaderErrorReadRequests(reader, e) { const readRequests = reader[_readRequests]; - reader[_readRequests] = []; - for (let i = 0; i < readRequests.length; ++i) { - const readRequest = readRequests[i]; + while (readRequests.size !== 0) { + const readRequest = readRequests.dequeue(); readRequest.errorSteps(e); } } @@ -1887,11 +1919,11 @@ function readableByteStreamControllerProcessReadRequestsUsingQueue( ) { const reader = controller[_stream][_reader]; assert(isReadableStreamDefaultReader(reader)); - while (reader[_readRequests].length !== 0) { + while (reader[_readRequests].size !== 0) { if (controller[_queueTotalSize] === 0) { return; } - const readRequest = ArrayPrototypeShift(reader[_readRequests]); + const readRequest = reader[_readRequests].dequeue(); readableByteStreamControllerFillReadRequestFromQueue( controller, readRequest, @@ -2326,7 +2358,7 @@ function readableByteStreamControllerFillPullIntoDescriptorFromQueue( } const queue = controller[_queue]; while (totalBytesToCopyRemaining > 0) { - const headOfQueue = queue[0]; + const headOfQueue = queue.peek(); const bytesToCopy = MathMin( totalBytesToCopyRemaining, // deno-lint-ignore prefer-primordials @@ -2353,7 +2385,7 @@ function readableByteStreamControllerFillPullIntoDescriptorFromQueue( // deno-lint-ignore prefer-primordials if (headOfQueue.byteLength === bytesToCopy) { - ArrayPrototypeShift(queue); + queue.dequeue(); } else { headOfQueue.byteOffset += bytesToCopy; headOfQueue.byteLength -= bytesToCopy; @@ -2384,7 +2416,7 @@ function readableByteStreamControllerFillReadRequestFromQueue( readRequest, ) { assert(controller[_queueTotalSize] > 0); - const entry = ArrayPrototypeShift(controller[_queue]); + const entry = controller[_queue].dequeue(); // deno-lint-ignore prefer-primordials controller[_queueTotalSize] -= entry.byteLength; readableByteStreamControllerHandleQueueDrain(controller); @@ -2526,9 +2558,9 @@ function readableStreamFulfillReadRequest(stream, chunk, done) { assert(readableStreamHasDefaultReader(stream) === true); /** @type {ReadableStreamDefaultReader<R>} */ const reader = stream[_reader]; - assert(reader[_readRequests].length); + assert(reader[_readRequests].size); /** @type {ReadRequest<R>} */ - const readRequest = ArrayPrototypeShift(reader[_readRequests]); + const readRequest = reader[_readRequests].dequeue(); if (done) { readRequest.closeSteps(); } else { @@ -2551,7 +2583,7 @@ function readableStreamGetNumReadIntoRequests(stream) { */ function readableStreamGetNumReadRequests(stream) { assert(readableStreamHasDefaultReader(stream) === true); - return stream[_reader][_readRequests].length; + return stream[_reader][_readRequests].size; } /** @@ -2621,7 +2653,7 @@ function readableStreamPipeTo( const writer = acquireWritableStreamDefaultWriter(dest); source[_disturbed] = true; let shuttingDown = false; - let currentWrite = resolvePromiseWith(undefined); + let currentWrite = PromiseResolve(undefined); /** @type {Deferred<void>} */ const promise = new Deferred(); /** @type {() => void} */ @@ -2636,7 +2668,7 @@ function readableStreamPipeTo( if (dest[_state] === "writable") { return writableStreamAbort(dest, error); } else { - return resolvePromiseWith(undefined); + return PromiseResolve(undefined); } }); } @@ -2645,7 +2677,7 @@ function readableStreamPipeTo( if (source[_state] === "readable") { return readableStreamCancel(source, error); } else { - return resolvePromiseWith(undefined); + return PromiseResolve(undefined); } }); } @@ -2680,7 +2712,7 @@ function readableStreamPipeTo( /** @returns {Promise<boolean>} */ function pipeStep() { if (shuttingDown === true) { - return resolvePromiseWith(true); + return PromiseResolve(true); } return transformPromiseWith(writer[_readyPromise].promise, () => { @@ -2997,7 +3029,7 @@ function readableStreamDefaultTee(stream, cloneForBranch2) { function pullAlgorithm() { if (reading === true) { readAgain = true; - return resolvePromiseWith(undefined); + return PromiseResolve(undefined); } reading = true; /** @type {ReadRequest<R>} */ @@ -3073,7 +3105,7 @@ function readableStreamDefaultTee(stream, cloneForBranch2) { }, }; readableStreamDefaultReaderRead(reader, readRequest); - return resolvePromiseWith(undefined); + return PromiseResolve(undefined); } /** @@ -3249,7 +3281,7 @@ function readableByteStreamTee(stream) { function pullWithBYOBReader(view, forBranch2) { if (isReadableStreamDefaultReader(reader)) { - assert(reader[_readRequests].length === 0); + assert(reader[_readRequests].size === 0); readableStreamDefaultReaderRelease(reader); reader = acquireReadableStreamBYOBReader(stream); forwardReaderError(reader); @@ -3458,7 +3490,7 @@ function setUpReadableByteStreamController( controller[_pendingPullIntos] = []; stream[_controller] = controller; const startResult = startAlgorithm(); - const startPromise = resolvePromiseWith(startResult); + const startPromise = PromiseResolve(startResult); setPromiseIsHandledToTrue( PromisePrototypeThen( startPromise, @@ -3487,13 +3519,13 @@ function setUpReadableByteStreamControllerFromUnderlyingSource( underlyingSourceDict, highWaterMark, ) { - const controller = webidl.createBranded(ReadableByteStreamController); + const controller = new ReadableByteStreamController(_brand); /** @type {() => void} */ - let startAlgorithm = () => undefined; + let startAlgorithm = _defaultStartAlgorithm; /** @type {() => Promise<void>} */ - let pullAlgorithm = () => resolvePromiseWith(undefined); + let pullAlgorithm = _defaultPullAlgorithm; /** @type {(reason: any) => Promise<void>} */ - let cancelAlgorithm = (_reason) => resolvePromiseWith(undefined); + let cancelAlgorithm = _defaultCancelAlgorithm; if (underlyingSourceDict.start !== undefined) { startAlgorithm = () => webidl.invokeCallbackFunction( @@ -3574,7 +3606,7 @@ function setUpReadableStreamDefaultController( controller[_cancelAlgorithm] = cancelAlgorithm; stream[_controller] = controller; const startResult = startAlgorithm(controller); - const startPromise = resolvePromiseWith(startResult); + const startPromise = PromiseResolve(startResult); uponPromise(startPromise, () => { controller[_started] = true; assert(controller[_pulling] === false); @@ -3600,13 +3632,13 @@ function setUpReadableStreamDefaultControllerFromUnderlyingSource( highWaterMark, sizeAlgorithm, ) { - const controller = webidl.createBranded(ReadableStreamDefaultController); + const controller = new ReadableStreamDefaultController(_brand); /** @type {() => Promise<void>} */ - let startAlgorithm = () => undefined; + let startAlgorithm = _defaultStartAlgorithm; /** @type {() => Promise<void>} */ - let pullAlgorithm = () => resolvePromiseWith(undefined); + let pullAlgorithm = _defaultPullAlgorithm; /** @type {(reason?: any) => Promise<void>} */ - let cancelAlgorithm = () => resolvePromiseWith(undefined); + let cancelAlgorithm = _defaultCancelAlgorithm; if (underlyingSourceDict.start !== undefined) { startAlgorithm = () => webidl.invokeCallbackFunction( @@ -3681,7 +3713,7 @@ function setUpReadableStreamDefaultReader(reader, stream) { throw new TypeError("ReadableStream is locked."); } readableStreamReaderGenericInitialize(reader, stream); - reader[_readRequests] = []; + reader[_readRequests] = new Queue(); } /** @@ -3721,7 +3753,7 @@ function setUpTransformStreamDefaultControllerFromTransformer( transformerDict, ) { /** @type {TransformStreamDefaultController<O>} */ - const controller = webidl.createBranded(TransformStreamDefaultController); + const controller = new TransformStreamDefaultController(_brand); /** @type {(chunk: O, controller: TransformStreamDefaultController<O>) => Promise<void>} */ let transformAlgorithm = (chunk) => { try { @@ -3729,12 +3761,11 @@ function setUpTransformStreamDefaultControllerFromTransformer( } catch (e) { return PromiseReject(e); } - return resolvePromiseWith(undefined); + return PromiseResolve(undefined); }; /** @type {(controller: TransformStreamDefaultController<O>) => Promise<void>} */ - let flushAlgorithm = () => resolvePromiseWith(undefined); - /** @type {(reason: any) => Promise<void>} */ - let cancelAlgorithm = () => resolvePromiseWith(undefined); + let flushAlgorithm = _defaultFlushAlgorithm; + let cancelAlgorithm = _defaultCancelAlgorithm; if (transformerDict.transform !== undefined) { transformAlgorithm = (chunk, controller) => webidl.invokeCallbackFunction( @@ -3842,14 +3873,14 @@ function setUpWritableStreamDefaultControllerFromUnderlyingSink( highWaterMark, sizeAlgorithm, ) { - const controller = webidl.createBranded(WritableStreamDefaultController); + const controller = new WritableStreamDefaultController(_brand); /** @type {(controller: WritableStreamDefaultController<W>) => any} */ - let startAlgorithm = () => undefined; + let startAlgorithm = _defaultStartAlgorithm; /** @type {(chunk: W, controller: WritableStreamDefaultController<W>) => Promise<void>} */ - let writeAlgorithm = () => resolvePromiseWith(undefined); - let closeAlgorithm = () => resolvePromiseWith(undefined); + let writeAlgorithm = _defaultWriteAlgorithm; + let closeAlgorithm = _defaultCloseAlgorithm; /** @type {(reason?: any) => Promise<void>} */ - let abortAlgorithm = () => resolvePromiseWith(undefined); + let abortAlgorithm = _defaultAbortAlgorithm; if (underlyingSinkDict.start !== undefined) { startAlgorithm = () => @@ -4215,11 +4246,11 @@ function transformStreamUnblockWrite(stream) { function writableStreamAbort(stream, reason) { const state = stream[_state]; if (state === "closed" || state === "errored") { - return resolvePromiseWith(undefined); + return PromiseResolve(undefined); } stream[_controller][_signal][signalAbort](reason); if (state === "closed" || state === "errored") { - return resolvePromiseWith(undefined); + return PromiseResolve(undefined); } if (stream[_pendingAbortRequest] !== undefined) { return stream[_pendingAbortRequest].deferred.promise; @@ -4329,7 +4360,7 @@ function writableStreamDefaultControllerAdvanceQueueIfNeeded(controller) { writableStreamFinishErroring(stream); return; } - if (controller[_queue].length === 0) { + if (controller[_queue].size === 0) { return; } const value = peekQueueValue(controller); @@ -4415,7 +4446,7 @@ function writableStreamDefaultControllerProcessClose(controller) { const stream = controller[_stream]; writableStreamMarkCloseRequestInFlight(stream); dequeueValue(controller); - assert(controller[_queue].length === 0); + assert(controller[_queue].size === 0); const sinkClosePromise = controller[_closeAlgorithm](); writableStreamDefaultControllerClearAlgorithms(controller); uponPromise(sinkClosePromise, () => { @@ -4515,7 +4546,7 @@ function writableStreamDefaultWriterCloseWithErrorPropagation(writer) { if ( writableStreamCloseQueuedOrInFlight(stream) === true || state === "closed" ) { - return resolvePromiseWith(undefined); + return PromiseResolve(undefined); } if (state === "errored") { return PromiseReject(stream[_storedError]); @@ -4819,6 +4850,35 @@ const asyncIteratorPrototype = ObjectGetPrototypeOf(AsyncGeneratorPrototype); const _iteratorNext = Symbol("[[iteratorNext]]"); const _iteratorFinished = Symbol("[[iteratorFinished]]"); +class ReadableStreamAsyncIteratorReadRequest { + #reader; + #promise; + + constructor(reader, promise) { + this.#reader = reader; + this.#promise = promise; + } + + chunkSteps(chunk) { + this.#reader[_iteratorNext] = null; + this.#promise.resolve({ value: chunk, done: false }); + } + + closeSteps() { + this.#reader[_iteratorNext] = null; + this.#reader[_iteratorFinished] = true; + readableStreamDefaultReaderRelease(this.#reader); + this.#promise.resolve({ value: undefined, done: true }); + } + + errorSteps(e) { + this.#reader[_iteratorNext] = null; + this.#reader[_iteratorFinished] = true; + readableStreamDefaultReaderRelease(this.#reader); + this.#promise.reject(e); + } +} + /** @type {AsyncIterator<unknown>} */ const readableStreamAsyncIteratorPrototype = ObjectSetPrototypeOf({ /** @returns {Promise<IteratorResult<unknown>>} */ @@ -4840,41 +4900,21 @@ const readableStreamAsyncIteratorPrototype = ObjectSetPrototypeOf({ /** @type {Deferred<IteratorResult<any>>} */ const promise = new Deferred(); - /** @type {ReadRequest} */ - const readRequest = { - chunkSteps(chunk) { - promise.resolve({ value: chunk, done: false }); - }, - closeSteps() { - readableStreamDefaultReaderRelease(reader); - promise.resolve({ value: undefined, done: true }); - }, - errorSteps(e) { - readableStreamDefaultReaderRelease(reader); - promise.reject(e); - }, - }; + // internal values (_iteratorNext & _iteratorFinished) are modified inside + // ReadableStreamAsyncIteratorReadRequest methods + // see: https://webidl.spec.whatwg.org/#es-default-asynchronous-iterator-object + const readRequest = new ReadableStreamAsyncIteratorReadRequest( + reader, + promise, + ); readableStreamDefaultReaderRead(reader, readRequest); - return PromisePrototypeThen(promise.promise, (result) => { - reader[_iteratorNext] = null; - if (result.done === true) { - reader[_iteratorFinished] = true; - return { value: undefined, done: true }; - } - return result; - }, (reason) => { - reader[_iteratorNext] = null; - reader[_iteratorFinished] = true; - throw reason; - }); + return PromisePrototypeThen(promise.promise); } - reader[_iteratorNext] = reader[_iteratorNext] + return reader[_iteratorNext] = reader[_iteratorNext] ? PromisePrototypeThen(reader[_iteratorNext], nextSteps, nextSteps) : nextSteps(); - - return reader[_iteratorNext]; }, /** * @param {unknown} arg @@ -4892,7 +4932,7 @@ const readableStreamAsyncIteratorPrototype = ObjectSetPrototypeOf({ if (reader[_stream] === undefined) { return PromiseResolve({ value: undefined, done: true }); } - assert(reader[_readRequests].length === 0); + assert(reader[_readRequests].size === 0); if (this[_preventCancel] === false) { const result = readableStreamReaderGenericCancel(reader, arg); readableStreamDefaultReaderRelease(reader); @@ -4918,7 +4958,7 @@ class ByteLengthQueuingStrategy { const prefix = "Failed to construct 'ByteLengthQueuingStrategy'"; webidl.requiredArguments(arguments.length, 1, prefix); init = webidl.converters.QueuingStrategyInit(init, prefix, "Argument 1"); - this[webidl.brand] = webidl.brand; + this[_brand] = _brand; this[_globalObject] = globalThis; this[_highWaterMark] = init.highWaterMark; } @@ -4972,7 +5012,7 @@ class CountQueuingStrategy { const prefix = "Failed to construct 'CountQueuingStrategy'"; webidl.requiredArguments(arguments.length, 1, prefix); init = webidl.converters.QueuingStrategyInit(init, prefix, "Argument 1"); - this[webidl.brand] = webidl.brand; + this[_brand] = _brand; this[_globalObject] = globalThis; this[_highWaterMark] = init.highWaterMark; } @@ -5070,34 +5110,36 @@ class ReadableStream { * @param {QueuingStrategy<R>=} strategy */ constructor(underlyingSource = undefined, strategy = undefined) { + if (underlyingSource === _brand) { + this[_brand] = _brand; + return; + } + const prefix = "Failed to construct 'ReadableStream'"; - if (underlyingSource !== undefined) { - underlyingSource = webidl.converters.object( + underlyingSource = underlyingSource !== undefined + ? webidl.converters.object( underlyingSource, prefix, "Argument 1", - ); - } else { - underlyingSource = null; - } - if (strategy !== undefined) { - strategy = webidl.converters.QueuingStrategy( + ) + : null; + strategy = strategy !== undefined + ? webidl.converters.QueuingStrategy( strategy, prefix, "Argument 2", - ); - } else { - strategy = {}; - } - this[webidl.brand] = webidl.brand; - let underlyingSourceDict = {}; - if (underlyingSource !== undefined) { - underlyingSourceDict = webidl.converters.UnderlyingSource( + ) + : {}; + + const underlyingSourceDict = underlyingSource !== undefined + ? webidl.converters.UnderlyingSource( underlyingSource, prefix, "underlyingSource", - ); - } + ) + : {}; + this[_brand] = _brand; + initializeReadableStream(this); if (underlyingSourceDict.type === "bytes") { if (strategy.size !== undefined) { @@ -5114,7 +5156,6 @@ class ReadableStream { highWaterMark, ); } else { - assert(!(ReflectHas(underlyingSourceDict, "type"))); const sizeAlgorithm = extractSizeAlgorithm(strategy); const highWaterMark = extractHighWaterMark(strategy, 1); setUpReadableStreamDefaultControllerFromUnderlyingSource( @@ -5137,7 +5178,7 @@ class ReadableStream { const iterator = getIterator(asyncIterable, true); - const stream = createReadableStream(() => undefined, async () => { + const stream = createReadableStream(noop, async () => { // deno-lint-ignore prefer-primordials const res = await iterator.next(); if (typeof res !== "object") { @@ -5310,19 +5351,23 @@ class ReadableStream { * @param {ReadableStreamIteratorOptions=} options * @returns {AsyncIterableIterator<R>} */ - values(options = {}) { + values(options = undefined) { webidl.assertBranded(this, ReadableStreamPrototype); - const prefix = "Failed to execute 'values' on 'ReadableStream'"; - options = webidl.converters.ReadableStreamIteratorOptions( - options, - prefix, - "Argument 1", - ); + let preventCancel = false; + if (options !== undefined) { + const prefix = "Failed to execute 'values' on 'ReadableStream'"; + options = webidl.converters.ReadableStreamIteratorOptions( + options, + prefix, + "Argument 1", + ); + preventCancel = options.preventCancel; + } /** @type {AsyncIterableIterator<R>} */ const iterator = ObjectCreate(readableStreamAsyncIteratorPrototype); const reader = acquireReadableStreamDefaultReader(this); iterator[_reader] = reader; - iterator[_preventCancel] = options.preventCancel; + iterator[_preventCancel] = preventCancel; return iterator; } @@ -5357,10 +5402,14 @@ class ReadableStreamDefaultReader { /** @param {ReadableStream<R>} stream */ constructor(stream) { + if (stream === _brand) { + this[_brand] = _brand; + return; + } const prefix = "Failed to construct 'ReadableStreamDefaultReader'"; webidl.requiredArguments(arguments.length, 1, prefix); stream = webidl.converters.ReadableStream(stream, prefix, "Argument 1"); - this[webidl.brand] = webidl.brand; + this[_brand] = _brand; setUpReadableStreamDefaultReader(this, stream); } @@ -5454,10 +5503,14 @@ class ReadableStreamBYOBReader { /** @param {ReadableStream<R>} stream */ constructor(stream) { + if (stream === _brand) { + this[_brand] = _brand; + return; + } const prefix = "Failed to construct 'ReadableStreamBYOBReader'"; webidl.requiredArguments(arguments.length, 1, prefix); stream = webidl.converters.ReadableStream(stream, prefix, "Argument 1"); - this[webidl.brand] = webidl.brand; + this[_brand] = _brand; setUpReadableStreamBYOBReader(this, stream); } @@ -5491,16 +5544,19 @@ class ReadableStreamBYOBReader { new TypeError("view must have non-zero byteLength"), ); } + if (getArrayBufferByteLength(buffer) === 0) { + if (isDetachedBuffer(buffer)) { + return PromiseReject( + new TypeError("view's buffer has been detached"), + ); + } + return PromiseReject( new TypeError("view's buffer must have non-zero byteLength"), ); } - if (isDetachedBuffer(buffer)) { - return PromiseReject( - new TypeError("view's buffer has been detached"), - ); - } + if (this[_stream] === undefined) { return PromiseReject( new TypeError("Reader has no associated stream."), @@ -5584,8 +5640,11 @@ class ReadableStreamBYOBRequest { return this[_view]; } - constructor() { - webidl.illegalConstructor(); + constructor(brand = undefined) { + if (brand !== _brand) { + webidl.illegalConstructor(); + } + this[_brand] = _brand; } respond(bytesWritten) { @@ -5680,8 +5739,11 @@ class ReadableByteStreamController { /** @type {ReadableStream<ArrayBuffer>} */ [_stream]; - constructor() { - webidl.illegalConstructor(); + constructor(brand = undefined) { + if (brand !== _brand) { + webidl.illegalConstructor(); + } + this[_brand] = _brand; } /** @returns {ReadableStreamBYOBRequest | null} */ @@ -5875,8 +5937,11 @@ class ReadableStreamDefaultController { /** @type {ReadableStream<R>} */ [_stream]; - constructor() { - webidl.illegalConstructor(); + constructor(brand = undefined) { + if (brand !== _brand) { + webidl.illegalConstructor(); + } + this[_brand] = _brand; } /** @returns {number | null} */ @@ -5949,9 +6014,9 @@ class ReadableStreamDefaultController { */ [_pullSteps](readRequest) { const stream = this[_stream]; - if (this[_queue].length) { + if (this[_queue].size) { const chunk = dequeueValue(this); - if (this[_closeRequested] && this[_queue].length === 0) { + if (this[_closeRequested] && this[_queue].size === 0) { readableStreamDefaultControllerClearAlgorithms(this); readableStreamClose(stream); } else { @@ -6015,7 +6080,7 @@ class TransformStream { prefix, "Argument 3", ); - this[webidl.brand] = webidl.brand; + this[_brand] = _brand; if (transformer === undefined) { transformer = null; } @@ -6103,8 +6168,11 @@ class TransformStreamDefaultController { /** @type {(chunk: O, controller: this) => Promise<void>} */ [_transformAlgorithm]; - constructor() { - webidl.illegalConstructor(); + constructor(brand = undefined) { + if (brand !== _brand) { + webidl.illegalConstructor(); + } + this[_brand] = _brand; } /** @returns {number | null} */ @@ -6191,7 +6259,11 @@ class WritableStream { * @param {UnderlyingSink<W>=} underlyingSink * @param {QueuingStrategy<W>=} strategy */ - constructor(underlyingSink = undefined, strategy = {}) { + constructor(underlyingSink = undefined, strategy = undefined) { + if (underlyingSink === _brand) { + this[_brand] = _brand; + return; + } const prefix = "Failed to construct 'WritableStream'"; if (underlyingSink !== undefined) { underlyingSink = webidl.converters.object( @@ -6200,12 +6272,14 @@ class WritableStream { "Argument 1", ); } - strategy = webidl.converters.QueuingStrategy( - strategy, - prefix, - "Argument 2", - ); - this[webidl.brand] = webidl.brand; + strategy = strategy !== undefined + ? webidl.converters.QueuingStrategy( + strategy, + prefix, + "Argument 2", + ) + : {}; + this[_brand] = _brand; if (underlyingSink === undefined) { underlyingSink = null; } @@ -6314,7 +6388,7 @@ class WritableStreamDefaultWriter { const prefix = "Failed to construct 'WritableStreamDefaultWriter'"; webidl.requiredArguments(arguments.length, 1, prefix); stream = webidl.converters.WritableStream(stream, prefix, "Argument 1"); - this[webidl.brand] = webidl.brand; + this[_brand] = _brand; setUpWritableStreamDefaultWriter(this, stream); } @@ -6471,8 +6545,11 @@ class WritableStreamDefaultController { return this[_signal]; } - constructor() { - webidl.illegalConstructor(); + constructor(brand = undefined) { + if (brand !== _brand) { + webidl.illegalConstructor(); + } + this[_brand] = _brand; } /** |