diff options
Diffstat (limited to 'ext/web/06_streams.js')
-rw-r--r-- | ext/web/06_streams.js | 252 |
1 files changed, 197 insertions, 55 deletions
diff --git a/ext/web/06_streams.js b/ext/web/06_streams.js index b66d4dca3..5e60f1e71 100644 --- a/ext/web/06_streams.js +++ b/ext/web/06_streams.js @@ -279,6 +279,7 @@ const _pullAlgorithm = Symbol("[[pullAlgorithm]]"); const _pulling = Symbol("[[pulling]]"); const _pullSteps = Symbol("[[PullSteps]]"); + const _releaseSteps = Symbol("[[ReleaseSteps]]"); const _queue = Symbol("[[queue]]"); const _queueTotalSize = Symbol("[[queueTotalSize]]"); const _readable = Symbol("[[readable]]"); @@ -800,12 +801,19 @@ "The BYOB request's buffer has been detached and so cannot be filled with an enqueued chunk", ); } + readableByteStreamControllerInvalidateBYOBRequest(controller); firstPendingPullInto.buffer = transferArrayBuffer( firstPendingPullInto.buffer, ); + if (firstPendingPullInto.readerType === "none") { + readableByteStreamControllerEnqueueDetachedPullIntoToQueue( + controller, + firstPendingPullInto, + ); + } } - readableByteStreamControllerInvalidateBYOBRequest(controller); if (readableStreamHasDefaultReader(stream)) { + readableByteStreamControllerProcessReadRequestsUsingQueue(controller); if (readableStreamGetNumReadRequests(stream) === 0) { assert(controller[_pendingPullIntos].length === 0); readableByteStreamControllerEnqueueChunkToQueue( @@ -868,6 +876,54 @@ /** * @param {ReadableByteStreamController} controller + * @param {ArrayBufferLike} buffer + * @param {number} byteOffset + * @param {number} byteLength + * @returns {void} + */ + function readableByteStreamControllerEnqueueClonedChunkToQueue( + controller, + buffer, + byteOffset, + byteLength, + ) { + let cloneResult; + try { + cloneResult = buffer.slice(byteOffset, byteOffset + byteLength); + } catch (e) { + readableByteStreamControllerError(controller, e); + } + readableByteStreamControllerEnqueueChunkToQueue( + controller, + cloneResult, + 0, + byteLength, + ); + } + + /** + * @param {ReadableByteStreamController} controller + * @param {PullIntoDescriptor} pullIntoDescriptor + * @returns {void} + */ + function readableByteStreamControllerEnqueueDetachedPullIntoToQueue( + controller, + pullIntoDescriptor, + ) { + assert(pullIntoDescriptor.readerType === "none"); + if (pullIntoDescriptor.bytesFilled > 0) { + readableByteStreamControllerEnqueueClonedChunkToQueue( + controller, + pullIntoDescriptor.buffer, + pullIntoDescriptor.byteOffset, + pullIntoDescriptor.bytesFilled, + ); + } + readableByteStreamControllerShiftPendingPullInto(controller); + } + + /** + * @param {ReadableByteStreamController} controller * @returns {ReadableStreamBYOBRequest | null} */ function readableByteStreamControllerGetBYOBRequest(controller) { @@ -1000,10 +1056,11 @@ readableStreamClose(stream); const reader = stream[_reader]; if (reader !== undefined && isReadableStreamBYOBReader(reader)) { - for (const readIntoRequest of reader[_readIntoRequests]) { + const readIntoRequests = reader[_readIntoRequests]; + reader[_readIntoRequests] = []; + for (const readIntoRequest of readIntoRequests) { readIntoRequest.closeSteps(undefined); } - reader[_readIntoRequests] = []; } /** @type {Promise<void>} */ const sourceCancelPromise = stream[_controller][_cancelSteps](reason); @@ -1026,10 +1083,10 @@ if (isReadableStreamDefaultReader(reader)) { /** @type {Array<ReadRequest<R>>} */ const readRequests = reader[_readRequests]; + reader[_readRequests] = []; for (const readRequest of readRequests) { readRequest.closeSteps(); } - reader[_readRequests] = []; } // This promise can be double resolved. // See: https://github.com/whatwg/streams/issues/1100 @@ -1225,6 +1282,29 @@ } /** + * @param {ReadableStreamBYOBReader} reader + */ + function readableStreamBYOBReaderRelease(reader) { + readableStreamReaderGenericRelease(reader); + const e = new TypeError( + "There are pending read requests, so the reader cannot be released.", + ); + readableStreamBYOBReaderErrorReadIntoRequests(reader, e); + } + + /** + * @param {ReadableStreamBYOBReader} reader + * @param {any} e + */ + function readableStreamDefaultReaderErrorReadRequests(reader, e) { + const readRequests = reader[_readRequests]; + reader[_readRequests] = []; + for (const readRequest of readRequests) { + readRequest.errorSteps(e); + } + } + + /** * @param {ReadableByteStreamController} controller */ function readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue( @@ -1250,6 +1330,25 @@ } } } + /** + * @param {ReadableByteStreamController} controller + */ + function readableByteStreamControllerProcessReadRequestsUsingQueue( + controller, + ) { + const reader = controller[_stream][_reader]; + assert(isReadableStreamDefaultReader(reader)); + while (reader[_readRequests].length !== 0) { + if (controller[_queueTotalSize] === 0) { + return; + } + const readRequest = ArrayPrototypeShift(reader[_readRequests]); + readableByteStreamControllerFillReadRequestFromQueue( + controller, + readRequest, + ); + } + } /** * @param {ReadableByteStreamController} controller @@ -1401,6 +1500,16 @@ bytesWritten, pullIntoDescriptor, ); + if (pullIntoDescriptor.readerType === "none") { + readableByteStreamControllerEnqueueDetachedPullIntoToQueue( + controller, + pullIntoDescriptor, + ); + readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue( + controller, + ); + return; + } if (pullIntoDescriptor.bytesFilled < pullIntoDescriptor.elementSize) { return; } @@ -1410,16 +1519,11 @@ if (remainderSize > 0) { const end = pullIntoDescriptor.byteOffset + pullIntoDescriptor.bytesFilled; - // We dont have access to CloneArrayBuffer, so we use .slice(). End is non-inclusive, as the spec says. - const remainder = pullIntoDescriptor.buffer.slice( - end - remainderSize, - end, - ); - readableByteStreamControllerEnqueueChunkToQueue( + readableByteStreamControllerEnqueueClonedChunkToQueue( controller, - remainder, - 0, - remainder.byteLength, + pullIntoDescriptor.buffer, + end - remainderSize, + remainderSize, ); } pullIntoDescriptor.bytesFilled -= remainderSize; @@ -1484,6 +1588,9 @@ firstDescriptor, ) { assert(firstDescriptor.bytesFilled === 0); + if (firstDescriptor.readerType === "none") { + readableByteStreamControllerShiftPendingPullInto(controller); + } const stream = controller[_stream]; if (readableStreamHasBYOBReader(stream)) { while (readableStreamGetNumReadIntoRequests(stream) > 0) { @@ -1507,6 +1614,7 @@ pullIntoDescriptor, ) { assert(stream[_state] !== "errored"); + assert(pullIntoDescriptor.readerType !== "none"); let done = false; if (stream[_state] === "closed") { assert(pullIntoDescriptor.bytesFilled === 0); @@ -1652,6 +1760,27 @@ /** * @param {ReadableByteStreamController} controller + * @param {ReadRequest} readRequest + * @returns {void} + */ + function readableByteStreamControllerFillReadRequestFromQueue( + controller, + readRequest, + ) { + assert(controller[_queueTotalSize] > 0); + const entry = ArrayPrototypeShift(controller[_queue]); + controller[_queueTotalSize] -= entry.byteLength; + readableByteStreamControllerHandleQueueDrain(controller); + const view = new Uint8Array( + entry.buffer, + entry.byteOffset, + entry.byteLength, + ); + readRequest.chunkSteps(view); + } + + /** + * @param {ReadableByteStreamController} controller * @param {number} size * @param {PullIntoDescriptor} pullIntoDescriptor * @returns {void} @@ -1710,6 +1839,18 @@ /** * @template R + * @param {ReadableStreamDefaultReader<R>} reader + */ + function readableStreamDefaultReaderRelease(reader) { + readableStreamReaderGenericRelease(reader); + const e = new TypeError( + "There are pending read requests, so the reader cannot be released.", + ); + readableStreamDefaultReaderErrorReadRequests(reader, e); + } + + /** + * @template R * @param {ReadableStream<R>} stream * @param {any} e */ @@ -1727,18 +1868,10 @@ closedPromise.reject(e); setPromiseIsHandledToTrue(closedPromise.promise); if (isReadableStreamDefaultReader(reader)) { - /** @type {Array<ReadRequest<R>>} */ - const readRequests = reader[_readRequests]; - for (const readRequest of readRequests) { - readRequest.errorSteps(e); - } - reader[_readRequests] = []; + readableStreamDefaultReaderErrorReadRequests(reader, e); } else { assert(isReadableStreamBYOBReader(reader)); - for (const readIntoRequest of reader[_readIntoRequests]) { - readIntoRequest.errorSteps(e); - } - reader[_readIntoRequests] = []; + readableStreamBYOBReaderErrorReadIntoRequests(reader, e); } } @@ -2104,7 +2237,7 @@ */ function finalize(isError, error) { writableStreamDefaultWriterRelease(writer); - readableStreamReaderGenericRelease(reader); + readableStreamDefaultReaderRelease(reader); if (signal !== undefined) { signal[remove](abortAlgorithm); @@ -2154,9 +2287,10 @@ * @param {ReadableStreamGenericReader<R> | ReadableStreamBYOBReader} reader */ function readableStreamReaderGenericRelease(reader) { - assert(reader[_stream] !== undefined); - assert(reader[_stream][_reader] === reader); - if (reader[_stream][_state] === "readable") { + const stream = reader[_stream]; + assert(stream !== undefined); + assert(stream[_reader] === reader); + if (stream[_state] === "readable") { reader[_closedPromise].reject( new TypeError( "Reader was released and can no longer be used to monitor the stream's closedness.", @@ -2171,11 +2305,24 @@ ); } setPromiseIsHandledToTrue(reader[_closedPromise].promise); - reader[_stream][_reader] = undefined; + stream[_controller][_releaseSteps](); + stream[_reader] = undefined; reader[_stream] = undefined; } /** + * @param {ReadableStreamBYOBReader} reader + * @param {any} e + */ + function readableStreamBYOBReaderErrorReadIntoRequests(reader, e) { + const readIntoRequests = reader[_readIntoRequests]; + reader[_readIntoRequests] = []; + for (const readIntoRequest of readIntoRequests) { + readIntoRequest.errorSteps(e); + } + } + + /** * @template R * @param {ReadableStream<R>} stream * @param {boolean} cloneForBranch2 @@ -2381,7 +2528,7 @@ function pullWithDefaultReader() { if (isReadableStreamBYOBReader(reader)) { assert(reader[_readIntoRequests].length === 0); - readableStreamReaderGenericRelease(reader); + readableStreamBYOBReaderRelease(reader); reader = acquireReadableStreamDefaultReader(stream); forwardReaderError(reader); } @@ -2446,7 +2593,7 @@ function pullWithBYOBReader(view, forBranch2) { if (isReadableStreamDefaultReader(reader)) { assert(reader[_readRequests].length === 0); - readableStreamReaderGenericRelease(reader); + readableStreamDefaultReaderRelease(reader); reader = acquireReadableStreamBYOBReader(stream); forwardReaderError(reader); } @@ -3982,11 +4129,11 @@ promise.resolve(createIteratorResult(chunk, false)); }, closeSteps() { - readableStreamReaderGenericRelease(reader); + readableStreamDefaultReaderRelease(reader); promise.resolve(createIteratorResult(undefined, true)); }, errorSteps(e) { - readableStreamReaderGenericRelease(reader); + readableStreamDefaultReaderRelease(reader); promise.reject(e); }, }; @@ -4006,11 +4153,11 @@ assert(reader[_readRequests].length === 0); if (this[_preventCancel] === false) { const result = readableStreamReaderGenericCancel(reader, arg); - readableStreamReaderGenericRelease(reader); + readableStreamDefaultReaderRelease(reader); await result; return createIteratorResult(arg, true); } - readableStreamReaderGenericRelease(reader); + readableStreamDefaultReaderRelease(reader); return createIteratorResult(undefined, true); }, }, asyncIteratorPrototype); @@ -4417,12 +4564,7 @@ if (this[_stream] === undefined) { return; } - if (this[_readRequests].length) { - throw new TypeError( - "There are pending read requests, so the reader cannot be release.", - ); - } - readableStreamReaderGenericRelease(this); + readableStreamDefaultReaderRelease(this); } get closed() { @@ -4544,12 +4686,7 @@ if (this[_stream] === undefined) { return; } - if (this[_readIntoRequests].length !== 0) { - throw new TypeError( - "There are pending read requests, so the reader cannot be released.", - ); - } - readableStreamReaderGenericRelease(this); + readableStreamBYOBReaderRelease(this); } get closed() { @@ -4794,15 +4931,7 @@ assert(readableStreamHasDefaultReader(stream)); if (this[_queueTotalSize] > 0) { assert(readableStreamGetNumReadRequests(stream) === 0); - const entry = ArrayPrototypeShift(this[_queue]); - this[_queueTotalSize] -= entry.byteLength; - readableByteStreamControllerHandleQueueDrain(this); - const view = new Uint8Array( - entry.buffer, - entry.byteOffset, - entry.byteLength, - ); - readRequest.chunkSteps(view); + readableByteStreamControllerFillReadRequestFromQueue(this, readRequest); return; } const autoAllocateChunkSize = this[_autoAllocateChunkSize]; @@ -4830,6 +4959,15 @@ readableStreamAddReadRequest(stream, readRequest); readableByteStreamControllerCallPullIfNeeded(this); } + + [_releaseSteps]() { + if (this[_pendingPullIntos].length !== 0) { + /** @type {PullIntoDescriptor} */ + const firstPendingPullInto = this[_pendingPullIntos][0]; + firstPendingPullInto.readerType = "none"; + this[_pendingPullIntos] = [firstPendingPullInto]; + } + } } webidl.configurePrototype(ReadableByteStreamController); @@ -4944,6 +5082,10 @@ readableStreamDefaultControllerCallPullIfNeeded(this); } } + + [_releaseSteps]() { + return; + } } webidl.configurePrototype(ReadableStreamDefaultController); |