summaryrefslogtreecommitdiff
path: root/ext/web/06_streams.js
diff options
context:
space:
mode:
Diffstat (limited to 'ext/web/06_streams.js')
-rw-r--r--ext/web/06_streams.js252
1 files changed, 197 insertions, 55 deletions
diff --git a/ext/web/06_streams.js b/ext/web/06_streams.js
index b66d4dca3..5e60f1e71 100644
--- a/ext/web/06_streams.js
+++ b/ext/web/06_streams.js
@@ -279,6 +279,7 @@
const _pullAlgorithm = Symbol("[[pullAlgorithm]]");
const _pulling = Symbol("[[pulling]]");
const _pullSteps = Symbol("[[PullSteps]]");
+ const _releaseSteps = Symbol("[[ReleaseSteps]]");
const _queue = Symbol("[[queue]]");
const _queueTotalSize = Symbol("[[queueTotalSize]]");
const _readable = Symbol("[[readable]]");
@@ -800,12 +801,19 @@
"The BYOB request's buffer has been detached and so cannot be filled with an enqueued chunk",
);
}
+ readableByteStreamControllerInvalidateBYOBRequest(controller);
firstPendingPullInto.buffer = transferArrayBuffer(
firstPendingPullInto.buffer,
);
+ if (firstPendingPullInto.readerType === "none") {
+ readableByteStreamControllerEnqueueDetachedPullIntoToQueue(
+ controller,
+ firstPendingPullInto,
+ );
+ }
}
- readableByteStreamControllerInvalidateBYOBRequest(controller);
if (readableStreamHasDefaultReader(stream)) {
+ readableByteStreamControllerProcessReadRequestsUsingQueue(controller);
if (readableStreamGetNumReadRequests(stream) === 0) {
assert(controller[_pendingPullIntos].length === 0);
readableByteStreamControllerEnqueueChunkToQueue(
@@ -868,6 +876,54 @@
/**
* @param {ReadableByteStreamController} controller
+ * @param {ArrayBufferLike} buffer
+ * @param {number} byteOffset
+ * @param {number} byteLength
+ * @returns {void}
+ */
+ function readableByteStreamControllerEnqueueClonedChunkToQueue(
+ controller,
+ buffer,
+ byteOffset,
+ byteLength,
+ ) {
+ let cloneResult;
+ try {
+ cloneResult = buffer.slice(byteOffset, byteOffset + byteLength);
+ } catch (e) {
+ readableByteStreamControllerError(controller, e);
+ }
+ readableByteStreamControllerEnqueueChunkToQueue(
+ controller,
+ cloneResult,
+ 0,
+ byteLength,
+ );
+ }
+
+ /**
+ * @param {ReadableByteStreamController} controller
+ * @param {PullIntoDescriptor} pullIntoDescriptor
+ * @returns {void}
+ */
+ function readableByteStreamControllerEnqueueDetachedPullIntoToQueue(
+ controller,
+ pullIntoDescriptor,
+ ) {
+ assert(pullIntoDescriptor.readerType === "none");
+ if (pullIntoDescriptor.bytesFilled > 0) {
+ readableByteStreamControllerEnqueueClonedChunkToQueue(
+ controller,
+ pullIntoDescriptor.buffer,
+ pullIntoDescriptor.byteOffset,
+ pullIntoDescriptor.bytesFilled,
+ );
+ }
+ readableByteStreamControllerShiftPendingPullInto(controller);
+ }
+
+ /**
+ * @param {ReadableByteStreamController} controller
* @returns {ReadableStreamBYOBRequest | null}
*/
function readableByteStreamControllerGetBYOBRequest(controller) {
@@ -1000,10 +1056,11 @@
readableStreamClose(stream);
const reader = stream[_reader];
if (reader !== undefined && isReadableStreamBYOBReader(reader)) {
- for (const readIntoRequest of reader[_readIntoRequests]) {
+ const readIntoRequests = reader[_readIntoRequests];
+ reader[_readIntoRequests] = [];
+ for (const readIntoRequest of readIntoRequests) {
readIntoRequest.closeSteps(undefined);
}
- reader[_readIntoRequests] = [];
}
/** @type {Promise<void>} */
const sourceCancelPromise = stream[_controller][_cancelSteps](reason);
@@ -1026,10 +1083,10 @@
if (isReadableStreamDefaultReader(reader)) {
/** @type {Array<ReadRequest<R>>} */
const readRequests = reader[_readRequests];
+ reader[_readRequests] = [];
for (const readRequest of readRequests) {
readRequest.closeSteps();
}
- reader[_readRequests] = [];
}
// This promise can be double resolved.
// See: https://github.com/whatwg/streams/issues/1100
@@ -1225,6 +1282,29 @@
}
/**
+ * @param {ReadableStreamBYOBReader} reader
+ */
+ function readableStreamBYOBReaderRelease(reader) {
+ readableStreamReaderGenericRelease(reader);
+ const e = new TypeError(
+ "There are pending read requests, so the reader cannot be released.",
+ );
+ readableStreamBYOBReaderErrorReadIntoRequests(reader, e);
+ }
+
+ /**
+ * @param {ReadableStreamBYOBReader} reader
+ * @param {any} e
+ */
+ function readableStreamDefaultReaderErrorReadRequests(reader, e) {
+ const readRequests = reader[_readRequests];
+ reader[_readRequests] = [];
+ for (const readRequest of readRequests) {
+ readRequest.errorSteps(e);
+ }
+ }
+
+ /**
* @param {ReadableByteStreamController} controller
*/
function readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(
@@ -1250,6 +1330,25 @@
}
}
}
+ /**
+ * @param {ReadableByteStreamController} controller
+ */
+ function readableByteStreamControllerProcessReadRequestsUsingQueue(
+ controller,
+ ) {
+ const reader = controller[_stream][_reader];
+ assert(isReadableStreamDefaultReader(reader));
+ while (reader[_readRequests].length !== 0) {
+ if (controller[_queueTotalSize] === 0) {
+ return;
+ }
+ const readRequest = ArrayPrototypeShift(reader[_readRequests]);
+ readableByteStreamControllerFillReadRequestFromQueue(
+ controller,
+ readRequest,
+ );
+ }
+ }
/**
* @param {ReadableByteStreamController} controller
@@ -1401,6 +1500,16 @@
bytesWritten,
pullIntoDescriptor,
);
+ if (pullIntoDescriptor.readerType === "none") {
+ readableByteStreamControllerEnqueueDetachedPullIntoToQueue(
+ controller,
+ pullIntoDescriptor,
+ );
+ readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(
+ controller,
+ );
+ return;
+ }
if (pullIntoDescriptor.bytesFilled < pullIntoDescriptor.elementSize) {
return;
}
@@ -1410,16 +1519,11 @@
if (remainderSize > 0) {
const end = pullIntoDescriptor.byteOffset +
pullIntoDescriptor.bytesFilled;
- // We dont have access to CloneArrayBuffer, so we use .slice(). End is non-inclusive, as the spec says.
- const remainder = pullIntoDescriptor.buffer.slice(
- end - remainderSize,
- end,
- );
- readableByteStreamControllerEnqueueChunkToQueue(
+ readableByteStreamControllerEnqueueClonedChunkToQueue(
controller,
- remainder,
- 0,
- remainder.byteLength,
+ pullIntoDescriptor.buffer,
+ end - remainderSize,
+ remainderSize,
);
}
pullIntoDescriptor.bytesFilled -= remainderSize;
@@ -1484,6 +1588,9 @@
firstDescriptor,
) {
assert(firstDescriptor.bytesFilled === 0);
+ if (firstDescriptor.readerType === "none") {
+ readableByteStreamControllerShiftPendingPullInto(controller);
+ }
const stream = controller[_stream];
if (readableStreamHasBYOBReader(stream)) {
while (readableStreamGetNumReadIntoRequests(stream) > 0) {
@@ -1507,6 +1614,7 @@
pullIntoDescriptor,
) {
assert(stream[_state] !== "errored");
+ assert(pullIntoDescriptor.readerType !== "none");
let done = false;
if (stream[_state] === "closed") {
assert(pullIntoDescriptor.bytesFilled === 0);
@@ -1652,6 +1760,27 @@
/**
* @param {ReadableByteStreamController} controller
+ * @param {ReadRequest} readRequest
+ * @returns {void}
+ */
+ function readableByteStreamControllerFillReadRequestFromQueue(
+ controller,
+ readRequest,
+ ) {
+ assert(controller[_queueTotalSize] > 0);
+ const entry = ArrayPrototypeShift(controller[_queue]);
+ controller[_queueTotalSize] -= entry.byteLength;
+ readableByteStreamControllerHandleQueueDrain(controller);
+ const view = new Uint8Array(
+ entry.buffer,
+ entry.byteOffset,
+ entry.byteLength,
+ );
+ readRequest.chunkSteps(view);
+ }
+
+ /**
+ * @param {ReadableByteStreamController} controller
* @param {number} size
* @param {PullIntoDescriptor} pullIntoDescriptor
* @returns {void}
@@ -1710,6 +1839,18 @@
/**
* @template R
+ * @param {ReadableStreamDefaultReader<R>} reader
+ */
+ function readableStreamDefaultReaderRelease(reader) {
+ readableStreamReaderGenericRelease(reader);
+ const e = new TypeError(
+ "There are pending read requests, so the reader cannot be released.",
+ );
+ readableStreamDefaultReaderErrorReadRequests(reader, e);
+ }
+
+ /**
+ * @template R
* @param {ReadableStream<R>} stream
* @param {any} e
*/
@@ -1727,18 +1868,10 @@
closedPromise.reject(e);
setPromiseIsHandledToTrue(closedPromise.promise);
if (isReadableStreamDefaultReader(reader)) {
- /** @type {Array<ReadRequest<R>>} */
- const readRequests = reader[_readRequests];
- for (const readRequest of readRequests) {
- readRequest.errorSteps(e);
- }
- reader[_readRequests] = [];
+ readableStreamDefaultReaderErrorReadRequests(reader, e);
} else {
assert(isReadableStreamBYOBReader(reader));
- for (const readIntoRequest of reader[_readIntoRequests]) {
- readIntoRequest.errorSteps(e);
- }
- reader[_readIntoRequests] = [];
+ readableStreamBYOBReaderErrorReadIntoRequests(reader, e);
}
}
@@ -2104,7 +2237,7 @@
*/
function finalize(isError, error) {
writableStreamDefaultWriterRelease(writer);
- readableStreamReaderGenericRelease(reader);
+ readableStreamDefaultReaderRelease(reader);
if (signal !== undefined) {
signal[remove](abortAlgorithm);
@@ -2154,9 +2287,10 @@
* @param {ReadableStreamGenericReader<R> | ReadableStreamBYOBReader} reader
*/
function readableStreamReaderGenericRelease(reader) {
- assert(reader[_stream] !== undefined);
- assert(reader[_stream][_reader] === reader);
- if (reader[_stream][_state] === "readable") {
+ const stream = reader[_stream];
+ assert(stream !== undefined);
+ assert(stream[_reader] === reader);
+ if (stream[_state] === "readable") {
reader[_closedPromise].reject(
new TypeError(
"Reader was released and can no longer be used to monitor the stream's closedness.",
@@ -2171,11 +2305,24 @@
);
}
setPromiseIsHandledToTrue(reader[_closedPromise].promise);
- reader[_stream][_reader] = undefined;
+ stream[_controller][_releaseSteps]();
+ stream[_reader] = undefined;
reader[_stream] = undefined;
}
/**
+ * @param {ReadableStreamBYOBReader} reader
+ * @param {any} e
+ */
+ function readableStreamBYOBReaderErrorReadIntoRequests(reader, e) {
+ const readIntoRequests = reader[_readIntoRequests];
+ reader[_readIntoRequests] = [];
+ for (const readIntoRequest of readIntoRequests) {
+ readIntoRequest.errorSteps(e);
+ }
+ }
+
+ /**
* @template R
* @param {ReadableStream<R>} stream
* @param {boolean} cloneForBranch2
@@ -2381,7 +2528,7 @@
function pullWithDefaultReader() {
if (isReadableStreamBYOBReader(reader)) {
assert(reader[_readIntoRequests].length === 0);
- readableStreamReaderGenericRelease(reader);
+ readableStreamBYOBReaderRelease(reader);
reader = acquireReadableStreamDefaultReader(stream);
forwardReaderError(reader);
}
@@ -2446,7 +2593,7 @@
function pullWithBYOBReader(view, forBranch2) {
if (isReadableStreamDefaultReader(reader)) {
assert(reader[_readRequests].length === 0);
- readableStreamReaderGenericRelease(reader);
+ readableStreamDefaultReaderRelease(reader);
reader = acquireReadableStreamBYOBReader(stream);
forwardReaderError(reader);
}
@@ -3982,11 +4129,11 @@
promise.resolve(createIteratorResult(chunk, false));
},
closeSteps() {
- readableStreamReaderGenericRelease(reader);
+ readableStreamDefaultReaderRelease(reader);
promise.resolve(createIteratorResult(undefined, true));
},
errorSteps(e) {
- readableStreamReaderGenericRelease(reader);
+ readableStreamDefaultReaderRelease(reader);
promise.reject(e);
},
};
@@ -4006,11 +4153,11 @@
assert(reader[_readRequests].length === 0);
if (this[_preventCancel] === false) {
const result = readableStreamReaderGenericCancel(reader, arg);
- readableStreamReaderGenericRelease(reader);
+ readableStreamDefaultReaderRelease(reader);
await result;
return createIteratorResult(arg, true);
}
- readableStreamReaderGenericRelease(reader);
+ readableStreamDefaultReaderRelease(reader);
return createIteratorResult(undefined, true);
},
}, asyncIteratorPrototype);
@@ -4417,12 +4564,7 @@
if (this[_stream] === undefined) {
return;
}
- if (this[_readRequests].length) {
- throw new TypeError(
- "There are pending read requests, so the reader cannot be release.",
- );
- }
- readableStreamReaderGenericRelease(this);
+ readableStreamDefaultReaderRelease(this);
}
get closed() {
@@ -4544,12 +4686,7 @@
if (this[_stream] === undefined) {
return;
}
- if (this[_readIntoRequests].length !== 0) {
- throw new TypeError(
- "There are pending read requests, so the reader cannot be released.",
- );
- }
- readableStreamReaderGenericRelease(this);
+ readableStreamBYOBReaderRelease(this);
}
get closed() {
@@ -4794,15 +4931,7 @@
assert(readableStreamHasDefaultReader(stream));
if (this[_queueTotalSize] > 0) {
assert(readableStreamGetNumReadRequests(stream) === 0);
- const entry = ArrayPrototypeShift(this[_queue]);
- this[_queueTotalSize] -= entry.byteLength;
- readableByteStreamControllerHandleQueueDrain(this);
- const view = new Uint8Array(
- entry.buffer,
- entry.byteOffset,
- entry.byteLength,
- );
- readRequest.chunkSteps(view);
+ readableByteStreamControllerFillReadRequestFromQueue(this, readRequest);
return;
}
const autoAllocateChunkSize = this[_autoAllocateChunkSize];
@@ -4830,6 +4959,15 @@
readableStreamAddReadRequest(stream, readRequest);
readableByteStreamControllerCallPullIfNeeded(this);
}
+
+ [_releaseSteps]() {
+ if (this[_pendingPullIntos].length !== 0) {
+ /** @type {PullIntoDescriptor} */
+ const firstPendingPullInto = this[_pendingPullIntos][0];
+ firstPendingPullInto.readerType = "none";
+ this[_pendingPullIntos] = [firstPendingPullInto];
+ }
+ }
}
webidl.configurePrototype(ReadableByteStreamController);
@@ -4944,6 +5082,10 @@
readableStreamDefaultControllerCallPullIfNeeded(this);
}
}
+
+ [_releaseSteps]() {
+ return;
+ }
}
webidl.configurePrototype(ReadableStreamDefaultController);