summaryrefslogtreecommitdiff
path: root/ext/web/06_streams.js
diff options
context:
space:
mode:
Diffstat (limited to 'ext/web/06_streams.js')
-rw-r--r--ext/web/06_streams.js1284
1 files changed, 1259 insertions, 25 deletions
diff --git a/ext/web/06_streams.js b/ext/web/06_streams.js
index bf6b9722c..92ee08404 100644
--- a/ext/web/06_streams.js
+++ b/ext/web/06_streams.js
@@ -11,12 +11,21 @@
const webidl = window.__bootstrap.webidl;
// TODO(lucacasonato): get AbortSignal from __bootstrap.
const {
+ ArrayBuffer,
+ ArrayBufferIsView,
ArrayPrototypeMap,
ArrayPrototypePush,
ArrayPrototypeShift,
+ BigInt64Array,
+ BigUint64Array,
+ DataView,
Error,
+ Int8Array,
+ Int16Array,
+ Int32Array,
NumberIsInteger,
NumberIsNaN,
+ MathMin,
ObjectCreate,
ObjectDefineProperties,
ObjectDefineProperty,
@@ -24,15 +33,21 @@
ObjectSetPrototypeOf,
Promise,
PromiseAll,
+ PromisePrototypeCatch,
PromisePrototypeThen,
PromiseReject,
+ PromiseResolve,
queueMicrotask,
RangeError,
+ SharedArrayBuffer,
Symbol,
SymbolAsyncIterator,
SymbolFor,
TypeError,
Uint8Array,
+ Uint16Array,
+ Uint32Array,
+ Uint8ClampedArray,
WeakMap,
WeakMapPrototypeGet,
WeakMapPrototypeHas,
@@ -195,6 +210,20 @@
/**
* @param {ArrayBufferLike} O
+ * @returns {boolean}
+ */
+ function canTransferArrayBuffer(O) {
+ assert(typeof O === "object");
+ assert(O instanceof ArrayBuffer || O instanceof SharedArrayBuffer);
+ if (isDetachedBuffer(O)) {
+ return false;
+ }
+ // TODO(@crowlKats): 4. If SameValue(O.[[ArrayBufferDetachKey]], undefined) is false, return false.
+ return true;
+ }
+
+ /**
+ * @param {ArrayBufferLike} O
* @returns {ArrayBufferLike}
*/
function transferArrayBuffer(O) {
@@ -209,6 +238,18 @@
return transferredIshVersion;
}
+ /**
+ * @param {ArrayBufferView} O
+ * @returns {Uint8Array}
+ */
+ function cloneAsUint8Array(O) {
+ assert(typeof O === "object");
+ assert(ArrayBufferIsView(O));
+ assert(!isDetachedBuffer(O.buffer));
+ const buffer = O.buffer.slice(O.byteOffset, O.byteOffset + O.byteLength);
+ return new Uint8Array(buffer);
+ }
+
const _abortAlgorithm = Symbol("[[abortAlgorithm]]");
const _abortSteps = Symbol("[[AbortSteps]]");
const _autoAllocateChunkSize = Symbol("[[autoAllocateChunkSize]]");
@@ -232,6 +273,7 @@
const _inFlightCloseRequest = Symbol("[[inFlightCloseRequest]]");
const _inFlightWriteRequest = Symbol("[[inFlightWriteRequest]]");
const _pendingAbortRequest = Symbol("[pendingAbortRequest]");
+ const _pendingPullIntos = Symbol("[[pendingPullIntos]]");
const _preventCancel = Symbol("[[preventCancel]]");
const _pullAgain = Symbol("[[pullAgain]]");
const _pullAlgorithm = Symbol("[[pullAlgorithm]]");
@@ -242,6 +284,7 @@
const _readable = Symbol("[[readable]]");
const _reader = Symbol("[[reader]]");
const _readRequests = Symbol("[[readRequests]]");
+ const _readIntoRequests = Symbol("[[readIntoRequests]]");
const _readyPromise = Symbol("[[readyPromise]]");
const _started = Symbol("[[started]]");
const _state = Symbol("[[state]]");
@@ -250,6 +293,7 @@
const _strategySizeAlgorithm = Symbol("[[strategySizeAlgorithm]]");
const _stream = Symbol("[[stream]]");
const _transformAlgorithm = Symbol("[[transformAlgorithm]]");
+ const _view = Symbol("[[view]]");
const _writable = Symbol("[[writable]]");
const _writeAlgorithm = Symbol("[[writeAlgorithm]]");
const _writer = Symbol("[[writer]]");
@@ -265,6 +309,17 @@
}
/**
+ * @template R
+ * @param {ReadableStream<R>} stream
+ * @returns {ReadableStreamBYOBReader<R>}
+ */
+ function acquireReadableStreamBYOBReader(stream) {
+ const reader = webidl.createBranded(ReadableStreamBYOBReader);
+ setUpReadableStreamBYOBReader(reader, stream);
+ return reader;
+ }
+
+ /**
* @template W
* @param {WritableStream<W>} stream
* @returns {WritableStreamDefaultWriter<W>}
@@ -413,6 +468,32 @@
}
/**
+ * @param {() => void} startAlgorithm
+ * @param {() => Promise<void>} pullAlgorithm
+ * @param {(reason: any) => Promise<void>} cancelAlgorithm
+ * @returns {ReadableStream}
+ */
+ function createReadableByteStream(
+ startAlgorithm,
+ pullAlgorithm,
+ cancelAlgorithm,
+ ) {
+ const stream = webidl.createBranded(ReadableStream);
+ initializeReadableStream(stream);
+ const controller = webidl.createBranded(ReadableByteStreamController);
+ setUpReadableByteStreamController(
+ stream,
+ controller,
+ startAlgorithm,
+ pullAlgorithm,
+ cancelAlgorithm,
+ 0,
+ undefined,
+ );
+ return stream;
+ }
+
+ /**
* @param {ReadableStream} stream
* @returns {void}
*/
@@ -545,6 +626,15 @@
}
/**
+ * @param {unknown} value
+ * @returns {value is ReadableStreamBYOBReader}
+ */
+ function isReadableStreamBYOBReader(value) {
+ return !(typeof value !== "object" || value === null ||
+ !(_readIntoRequests in value));
+ }
+
+ /**
* @param {ReadableStream} stream
* @returns {boolean}
*/
@@ -638,7 +728,7 @@
if (stream[_state] !== "readable") {
return;
}
- // 3. Perform ! ReadableByteStreamControllerClearPendingPullIntos(controller).
+ readableByteStreamControllerClearPendingPullIntos(controller);
resetQueue(controller);
readableByteStreamControllerClearAlgorithms(controller);
readableStreamError(stream, e);
@@ -648,6 +738,15 @@
* @param {ReadableByteStreamController} controller
* @returns {void}
*/
+ function readableByteStreamControllerClearPendingPullIntos(controller) {
+ readableByteStreamControllerInvalidateBYOBRequest(controller);
+ controller[_pendingPullIntos] = [];
+ }
+
+ /**
+ * @param {ReadableByteStreamController} controller
+ * @returns {void}
+ */
function readableByteStreamControllerClose(controller) {
/** @type {ReadableStream<ArrayBuffer>} */
const stream = controller[_stream];
@@ -658,7 +757,16 @@
controller[_closeRequested] = true;
return;
}
- // 3.13.6.4 If controller.[[pendingPullIntos]] is not empty, (BYOB Support)
+ if (controller[_pendingPullIntos].length !== 0) {
+ const firstPendingPullInto = controller[_pendingPullIntos][0];
+ if (firstPendingPullInto.bytesFilled > 0) {
+ const e = new TypeError(
+ "Insufficient bytes to fill elements in the given buffer",
+ );
+ readableByteStreamControllerError(controller, e);
+ throw e;
+ }
+ }
readableByteStreamControllerClearAlgorithms(controller);
readableStreamClose(stream);
}
@@ -678,9 +786,27 @@
}
const { buffer, byteOffset, byteLength } = chunk;
+ if (isDetachedBuffer(buffer)) {
+ throw new TypeError(
+ "chunk's buffer is detached and so cannot be enqueued",
+ );
+ }
const transferredBuffer = transferArrayBuffer(buffer);
+ if (controller[_pendingPullIntos].length !== 0) {
+ const firstPendingPullInto = controller[_pendingPullIntos][0];
+ if (isDetachedBuffer(firstPendingPullInto.buffer)) {
+ throw new TypeError(
+ "The BYOB request's buffer has been detached and so cannot be filled with an enqueued chunk",
+ );
+ }
+ firstPendingPullInto.buffer = transferArrayBuffer(
+ firstPendingPullInto.buffer,
+ );
+ }
+ readableByteStreamControllerInvalidateBYOBRequest(controller);
if (readableStreamHasDefaultReader(stream)) {
if (readableStreamGetNumReadRequests(stream) === 0) {
+ assert(controller[_pendingPullIntos].length === 0);
readableByteStreamControllerEnqueueChunkToQueue(
controller,
transferredBuffer,
@@ -689,6 +815,10 @@
);
} else {
assert(controller[_queue].length === 0);
+ if (controller[_pendingPullIntos].length !== 0) {
+ assert(controller[_pendingPullIntos][0].readerType === "default");
+ readableByteStreamControllerShiftPendingPullInto(controller);
+ }
const transferredView = new Uint8Array(
transferredBuffer,
byteOffset,
@@ -696,7 +826,16 @@
);
readableStreamFulfillReadRequest(stream, transferredView, false);
}
- // 8 Otherwise, if ! ReadableStreamHasBYOBReader(stream) is true,
+ } else if (readableStreamHasBYOBReader(stream)) {
+ readableByteStreamControllerEnqueueChunkToQueue(
+ controller,
+ transferredBuffer,
+ byteOffset,
+ byteLength,
+ );
+ readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(
+ controller,
+ );
} else {
assert(isReadableStreamLocked(stream) === false);
readableByteStreamControllerEnqueueChunkToQueue(
@@ -728,6 +867,29 @@
/**
* @param {ReadableByteStreamController} controller
+ * @returns {ReadableStreamBYOBRequest | null}
+ */
+ function readableByteStreamControllerGetBYOBRequest(controller) {
+ if (
+ controller[_byobRequest] === null &&
+ controller[_pendingPullIntos].length !== 0
+ ) {
+ const firstDescriptor = controller[_pendingPullIntos][0];
+ const view = new Uint8Array(
+ firstDescriptor.buffer,
+ firstDescriptor.byteOffset + firstDescriptor.bytesFilled,
+ firstDescriptor.byteLength - firstDescriptor.bytesFilled,
+ );
+ const byobRequest = webidl.createBranded(ReadableStreamBYOBRequest);
+ byobRequest[_controller] = controller;
+ byobRequest[_view] = view;
+ controller[_byobRequest] = byobRequest;
+ }
+ return controller[_byobRequest];
+ }
+
+ /**
+ * @param {ReadableByteStreamController} controller
* @returns {number | null}
*/
function readableByteStreamControllerGetDesiredSize(controller) {
@@ -786,8 +948,12 @@
) {
return true;
}
- // 3.13.25.6 If ! ReadableStreamHasBYOBReader(stream) is true and !
- // ReadableStreamGetNumReadIntoRequests(stream) > 0, return true.
+ if (
+ readableStreamHasBYOBReader(stream) &&
+ readableStreamGetNumReadIntoRequests(stream) > 0
+ ) {
+ return true;
+ }
const desiredSize = readableByteStreamControllerGetDesiredSize(controller);
assert(desiredSize !== null);
return desiredSize > 0;
@@ -806,6 +972,17 @@
}
/**
+ * @param {ReadableStream} stream
+ * @param {ReadIntoRequest} readRequest
+ * @returns {void}
+ */
+ function readableStreamAddReadIntoRequest(stream, readRequest) {
+ assert(isReadableStreamBYOBReader(stream[_reader]));
+ assert(stream[_state] === "readable" || stream[_state] === "closed");
+ ArrayPrototypePush(stream[_reader][_readIntoRequests], readRequest);
+ }
+
+ /**
* @template R
* @param {ReadableStream<R>} stream
* @param {any=} reason
@@ -820,6 +997,13 @@
return PromiseReject(stream[_storedError]);
}
readableStreamClose(stream);
+ const reader = stream[_reader];
+ if (reader !== undefined && isReadableStreamBYOBReader(reader)) {
+ for (const readIntoRequest of reader[_readIntoRequests]) {
+ readIntoRequest.closeSteps(undefined);
+ }
+ reader[_readIntoRequests] = [];
+ }
/** @type {Promise<void>} */
const sourceCancelPromise = stream[_controller][_cancelSteps](reason);
return PromisePrototypeThen(sourceCancelPromise, () => undefined);
@@ -1019,6 +1203,491 @@
}
/**
+ * @param {ReadableStreamBYOBReader} reader
+ * @param {ArrayBufferView} view
+ * @param {ReadIntoRequest} readIntoRequest
+ * @returns {void}
+ */
+ function readableStreamBYOBReaderRead(reader, view, readIntoRequest) {
+ const stream = reader[_stream];
+ assert(stream);
+ stream[_disturbed] = true;
+ if (stream[_state] === "errored") {
+ readIntoRequest.errorSteps(stream[_storedError]);
+ } else {
+ readableByteStreamControllerPullInto(
+ stream[_controller],
+ view,
+ readIntoRequest,
+ );
+ }
+ }
+
+ /**
+ * @param {ReadableByteStreamController} controller
+ */
+ function readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(
+ controller,
+ ) {
+ assert(!controller[_closeRequested]);
+ while (controller[_pendingPullIntos].length !== 0) {
+ if (controller[_queueTotalSize] === 0) {
+ return;
+ }
+ const pullIntoDescriptor = controller[_pendingPullIntos][0];
+ if (
+ readableByteStreamControllerFillPullIntoDescriptorFromQueue(
+ controller,
+ pullIntoDescriptor,
+ )
+ ) {
+ readableByteStreamControllerShiftPendingPullInto(controller);
+ readableByteStreamControllerCommitPullIntoDescriptor(
+ controller[_stream],
+ pullIntoDescriptor,
+ );
+ }
+ }
+ }
+
+ /**
+ * @param {ReadableByteStreamController} controller
+ * @param {ArrayBufferView} view
+ * @param {ReadIntoRequest} readIntoRequest
+ * @returns {void}
+ */
+ function readableByteStreamControllerPullInto(
+ controller,
+ view,
+ readIntoRequest,
+ ) {
+ const stream = controller[_stream];
+ let elementSize = 1;
+ let ctor = DataView;
+
+ if (
+ view instanceof Int8Array ||
+ view instanceof Uint8Array ||
+ view instanceof Uint8ClampedArray ||
+ view instanceof Int16Array ||
+ view instanceof Uint16Array ||
+ view instanceof Int32Array ||
+ view instanceof Uint32Array ||
+ view instanceof BigInt64Array ||
+ view instanceof BigUint64Array
+ ) {
+ elementSize = view.constructor.BYTES_PER_ELEMENT;
+ ctor = view.constructor;
+ }
+ const byteOffset = view.byteOffset;
+ const byteLength = view.byteLength;
+
+ /** @type {ArrayBufferLike} */
+ let buffer;
+
+ try {
+ buffer = transferArrayBuffer(view.buffer);
+ } catch (e) {
+ readIntoRequest.errorSteps(e);
+ return;
+ }
+
+ /** @type {PullIntoDescriptor} */
+ const pullIntoDescriptor = {
+ buffer,
+ bufferByteLength: buffer.byteLength,
+ byteOffset,
+ byteLength,
+ bytesFilled: 0,
+ elementSize,
+ viewConstructor: ctor,
+ readerType: "byob",
+ };
+
+ if (controller[_pendingPullIntos].length !== 0) {
+ ArrayPrototypePush(controller[_pendingPullIntos], pullIntoDescriptor);
+ readableStreamAddReadIntoRequest(stream, readIntoRequest);
+ return;
+ }
+ if (stream[_state] === "closed") {
+ const emptyView = new ctor(
+ pullIntoDescriptor.buffer,
+ pullIntoDescriptor.byteOffset,
+ 0,
+ );
+ readIntoRequest.closeSteps(emptyView);
+ return;
+ }
+ if (controller[_queueTotalSize] > 0) {
+ if (
+ readableByteStreamControllerFillPullIntoDescriptorFromQueue(
+ controller,
+ pullIntoDescriptor,
+ )
+ ) {
+ const filledView =
+ readableByteStreamControllerConvertPullIntoDescriptor(
+ pullIntoDescriptor,
+ );
+ readableByteStreamControllerHandleQueueDrain(controller);
+ readIntoRequest.chunkSteps(filledView);
+ return;
+ }
+ if (controller[_closeRequested]) {
+ const e = new TypeError(
+ "Insufficient bytes to fill elements in the given buffer",
+ );
+ readableByteStreamControllerError(controller, e);
+ readIntoRequest.errorSteps(e);
+ return;
+ }
+ }
+ controller[_pendingPullIntos].push(pullIntoDescriptor);
+ readableStreamAddReadIntoRequest(stream, readIntoRequest);
+ readableByteStreamControllerCallPullIfNeeded(controller);
+ }
+
+ /**
+ * @param {ReadableByteStreamController} controller
+ * @param {number} bytesWritten
+ * @returns {void}
+ */
+ function readableByteStreamControllerRespond(controller, bytesWritten) {
+ assert(controller[_pendingPullIntos].length !== 0);
+ const firstDescriptor = controller[_pendingPullIntos][0];
+ const state = controller[_stream][_state];
+ if (state === "closed") {
+ if (bytesWritten !== 0) {
+ throw new TypeError(
+ "bytesWritten must be 0 when calling respond() on a closed stream",
+ );
+ }
+ } else {
+ assert(state === "readable");
+ if (bytesWritten === 0) {
+ throw new TypeError(
+ "bytesWritten must be greater than 0 when calling respond() on a readable stream",
+ );
+ }
+ if (
+ (firstDescriptor.bytesFilled + bytesWritten) >
+ firstDescriptor.byteLength
+ ) {
+ throw new RangeError("bytesWritten out of range");
+ }
+ }
+ firstDescriptor.buffer = transferArrayBuffer(firstDescriptor.buffer);
+ readableByteStreamControllerRespondInternal(controller, bytesWritten);
+ }
+
+ /**
+ * @param {ReadableByteStreamController} controller
+ * @param {number} bytesWritten
+ * @param {PullIntoDescriptor} pullIntoDescriptor
+ * @returns {void}
+ */
+ function readableByteStreamControllerRespondInReadableState(
+ controller,
+ bytesWritten,
+ pullIntoDescriptor,
+ ) {
+ assert(
+ (pullIntoDescriptor.bytesFilled + bytesWritten) <=
+ pullIntoDescriptor.byteLength,
+ );
+ readableByteStreamControllerFillHeadPullIntoDescriptor(
+ controller,
+ bytesWritten,
+ pullIntoDescriptor,
+ );
+ if (pullIntoDescriptor.bytesFilled < pullIntoDescriptor.elementSize) {
+ return;
+ }
+ readableByteStreamControllerShiftPendingPullInto(controller);
+ const remainderSize = pullIntoDescriptor.bytesFilled %
+ pullIntoDescriptor.elementSize;
+ if (remainderSize > 0) {
+ const end = pullIntoDescriptor.byteOffset +
+ pullIntoDescriptor.bytesFilled;
+ // We dont have access to CloneArrayBuffer, so we use .slice(). End is non-inclusive, as the spec says.
+ const remainder = pullIntoDescriptor.buffer.slice(
+ end - remainderSize,
+ end,
+ );
+ readableByteStreamControllerEnqueueChunkToQueue(
+ controller,
+ remainder,
+ 0,
+ remainder.byteLength,
+ );
+ }
+ pullIntoDescriptor.bytesFilled -= remainderSize;
+ readableByteStreamControllerCommitPullIntoDescriptor(
+ controller[_stream],
+ pullIntoDescriptor,
+ );
+ readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(
+ controller,
+ );
+ }
+
+ /**
+ * @param {ReadableByteStreamController} controller
+ * @param {number} bytesWritten
+ * @returns {void}
+ */
+ function readableByteStreamControllerRespondInternal(
+ controller,
+ bytesWritten,
+ ) {
+ const firstDescriptor = controller[_pendingPullIntos][0];
+ assert(canTransferArrayBuffer(firstDescriptor.buffer));
+ readableByteStreamControllerInvalidateBYOBRequest(controller);
+ const state = controller[_stream][_state];
+ if (state === "closed") {
+ assert(bytesWritten === 0);
+ readableByteStreamControllerRespondInClosedState(
+ controller,
+ firstDescriptor,
+ );
+ } else {
+ assert(state === "readable");
+ assert(bytesWritten > 0);
+ readableByteStreamControllerRespondInReadableState(
+ controller,
+ bytesWritten,
+ firstDescriptor,
+ );
+ }
+ readableByteStreamControllerCallPullIfNeeded(controller);
+ }
+
+ /**
+ * @param {ReadableByteStreamController} controller
+ */
+ function readableByteStreamControllerInvalidateBYOBRequest(controller) {
+ if (controller[_byobRequest] === null) {
+ return;
+ }
+ controller[_byobRequest][_controller] = undefined;
+ controller[_byobRequest][_view] = null;
+ controller[_byobRequest] = null;
+ }
+
+ /**
+ * @param {ReadableByteStreamController} controller
+ * @param {PullIntoDescriptor} firstDescriptor
+ */
+ function readableByteStreamControllerRespondInClosedState(
+ controller,
+ firstDescriptor,
+ ) {
+ assert(firstDescriptor.bytesFilled === 0);
+ const stream = controller[_stream];
+ if (readableStreamHasBYOBReader(stream)) {
+ while (readableStreamGetNumReadIntoRequests(stream) > 0) {
+ const pullIntoDescriptor =
+ readableByteStreamControllerShiftPendingPullInto(controller);
+ readableByteStreamControllerCommitPullIntoDescriptor(
+ stream,
+ pullIntoDescriptor,
+ );
+ }
+ }
+ }
+
+ /**
+ * @template R
+ * @param {ReadableStream<R>} stream
+ * @param {PullIntoDescriptor} pullIntoDescriptor
+ */
+ function readableByteStreamControllerCommitPullIntoDescriptor(
+ stream,
+ pullIntoDescriptor,
+ ) {
+ assert(stream[_state] !== "errored");
+ let done = false;
+ if (stream[_state] === "closed") {
+ assert(pullIntoDescriptor.bytesFilled === 0);
+ done = true;
+ }
+ const filledView = readableByteStreamControllerConvertPullIntoDescriptor(
+ pullIntoDescriptor,
+ );
+ if (pullIntoDescriptor.readerType === "default") {
+ readableStreamFulfillReadRequest(stream, filledView, done);
+ } else {
+ assert(pullIntoDescriptor.readerType === "byob");
+ readableStreamFulfillReadIntoRequest(stream, filledView, done);
+ }
+ }
+
+ /**
+ * @param {ReadableByteStreamController} controller
+ * @param {ArrayBufferView} view
+ */
+ function readableByteStreamControllerRespondWithNewView(controller, view) {
+ assert(controller[_pendingPullIntos].length !== 0);
+ assert(!isDetachedBuffer(view.buffer));
+ const firstDescriptor = controller[_pendingPullIntos][0];
+ const state = controller[_stream][_state];
+ if (state === "closed") {
+ if (view.byteLength !== 0) {
+ throw new TypeError(
+ "The view's length must be 0 when calling respondWithNewView() on a closed stream",
+ );
+ }
+ } else {
+ assert(state === "readable");
+ if (view.byteLength === 0) {
+ throw new TypeError(
+ "The view's length must be greater than 0 when calling respondWithNewView() on a readable stream",
+ );
+ }
+ }
+ if (
+ (firstDescriptor.byteOffset + firstDescriptor.bytesFilled) !==
+ view.byteOffset
+ ) {
+ throw new RangeError(
+ "The region specified by view does not match byobRequest",
+ );
+ }
+ if (firstDescriptor.bufferByteLength !== view.buffer.byteLength) {
+ throw new RangeError(
+ "The buffer of view has different capacity than byobRequest",
+ );
+ }
+ if (
+ (firstDescriptor.bytesFilled + view.byteLength) >
+ firstDescriptor.byteLength
+ ) {
+ throw new RangeError(
+ "The region specified by view is larger than byobRequest",
+ );
+ }
+ const viewByteLength = view.byteLength;
+ firstDescriptor.buffer = transferArrayBuffer(view.buffer);
+ readableByteStreamControllerRespondInternal(controller, viewByteLength);
+ }
+
+ /**
+ * @param {ReadableByteStreamController} controller
+ * @returns {PullIntoDescriptor}
+ */
+ function readableByteStreamControllerShiftPendingPullInto(controller) {
+ assert(controller[_byobRequest] === null);
+ return ArrayPrototypeShift(controller[_pendingPullIntos]);
+ }
+
+ /**
+ * @param {ReadableByteStreamController} controller
+ * @param {PullIntoDescriptor} pullIntoDescriptor
+ * @returns {boolean}
+ */
+ function readableByteStreamControllerFillPullIntoDescriptorFromQueue(
+ controller,
+ pullIntoDescriptor,
+ ) {
+ const elementSize = pullIntoDescriptor.elementSize;
+ const currentAlignedBytes = pullIntoDescriptor.bytesFilled -
+ (pullIntoDescriptor.bytesFilled % elementSize);
+ const maxBytesToCopy = MathMin(
+ controller[_queueTotalSize],
+ pullIntoDescriptor.byteLength - pullIntoDescriptor.bytesFilled,
+ );
+ const maxBytesFilled = pullIntoDescriptor.bytesFilled + maxBytesToCopy;
+ const maxAlignedBytes = maxBytesFilled - (maxBytesFilled % elementSize);
+ let totalBytesToCopyRemaining = maxBytesToCopy;
+ let ready = false;
+ if (maxAlignedBytes > currentAlignedBytes) {
+ totalBytesToCopyRemaining = maxAlignedBytes -
+ pullIntoDescriptor.bytesFilled;
+ ready = true;
+ }
+ const queue = controller[_queue];
+ while (totalBytesToCopyRemaining > 0) {
+ const headOfQueue = queue[0];
+ const bytesToCopy = MathMin(
+ totalBytesToCopyRemaining,
+ headOfQueue.byteLength,
+ );
+ const destStart = pullIntoDescriptor.byteOffset +
+ pullIntoDescriptor.bytesFilled;
+
+ const destBuffer = new Uint8Array(
+ pullIntoDescriptor.buffer,
+ destStart,
+ bytesToCopy,
+ );
+ const srcBuffer = new Uint8Array(
+ headOfQueue.buffer,
+ headOfQueue.byteOffset,
+ bytesToCopy,
+ );
+ destBuffer.set(srcBuffer);
+
+ if (headOfQueue.byteLength === bytesToCopy) {
+ ArrayPrototypeShift(queue);
+ } else {
+ headOfQueue.byteOffset += bytesToCopy;
+ headOfQueue.byteLength -= bytesToCopy;
+ }
+ controller[_queueTotalSize] -= bytesToCopy;
+ readableByteStreamControllerFillHeadPullIntoDescriptor(
+ controller,
+ bytesToCopy,
+ pullIntoDescriptor,
+ );
+ totalBytesToCopyRemaining -= bytesToCopy;
+ }
+ if (!ready) {
+ assert(controller[_queueTotalSize] === 0);
+ assert(pullIntoDescriptor.bytesFilled > 0);
+ assert(pullIntoDescriptor.bytesFilled < pullIntoDescriptor.elementSize);
+ }
+ return ready;
+ }
+
+ /**
+ * @param {ReadableByteStreamController} controller
+ * @param {number} size
+ * @param {PullIntoDescriptor} pullIntoDescriptor
+ * @returns {void}
+ */
+ function readableByteStreamControllerFillHeadPullIntoDescriptor(
+ controller,
+ size,
+ pullIntoDescriptor,
+ ) {
+ assert(
+ controller[_pendingPullIntos].length === 0 ||
+ controller[_pendingPullIntos][0] === pullIntoDescriptor,
+ );
+ assert(controller[_byobRequest] === null);
+ pullIntoDescriptor.bytesFilled += size;
+ }
+
+ /**
+ * @param {PullIntoDescriptor} pullIntoDescriptor
+ * @returns {ArrayBufferView}
+ */
+ function readableByteStreamControllerConvertPullIntoDescriptor(
+ pullIntoDescriptor,
+ ) {
+ const bytesFilled = pullIntoDescriptor.bytesFilled;
+ const elementSize = pullIntoDescriptor.elementSize;
+ assert(bytesFilled <= pullIntoDescriptor.byteLength);
+ assert((bytesFilled % elementSize) === 0);
+ const buffer = transferArrayBuffer(pullIntoDescriptor.buffer);
+ return new pullIntoDescriptor.viewConstructor(
+ buffer,
+ pullIntoDescriptor.byteOffset,
+ bytesFilled / elementSize,
+ );
+ }
+
+ /**
* @template R
* @param {ReadableStreamDefaultReader<R>} reader
* @param {ReadRequest<R>} readRequest
@@ -1063,8 +1732,33 @@
readRequest.errorSteps(e);
}
reader[_readRequests] = [];
+ } else {
+ assert(isReadableStreamBYOBReader(reader));
+ for (const readIntoRequest of reader[_readIntoRequests]) {
+ readIntoRequest.errorSteps(e);
+ }
+ reader[_readIntoRequests] = [];
+ }
+ }
+
+ /**
+ * @template R
+ * @param {ReadableStream<R>} stream
+ * @param {R} chunk
+ * @param {boolean} done
+ */
+ function readableStreamFulfillReadIntoRequest(stream, chunk, done) {
+ assert(readableStreamHasBYOBReader(stream));
+ /** @type {ReadableStreamDefaultReader<R>} */
+ const reader = stream[_reader];
+ assert(reader[_readIntoRequests].length !== 0);
+ /** @type {ReadIntoRequest} */
+ const readIntoRequest = ArrayPrototypeShift(reader[_readIntoRequests]);
+ if (done) {
+ readIntoRequest.closeSteps(chunk);
+ } else {
+ readIntoRequest.chunkSteps(chunk);
}
- // 3.5.6.8 Otherwise, support BYOB Reader
}
/**
@@ -1091,6 +1785,15 @@
* @param {ReadableStream} stream
* @return {number}
*/
+ function readableStreamGetNumReadIntoRequests(stream) {
+ assert(readableStreamHasBYOBReader(stream) === true);
+ return stream[_reader][_readIntoRequests].length;
+ }
+
+ /**
+ * @param {ReadableStream} stream
+ * @return {number}
+ */
function readableStreamGetNumReadRequests(stream) {
assert(readableStreamHasDefaultReader(stream) === true);
return stream[_reader][_readRequests].length;
@@ -1100,6 +1803,21 @@
* @param {ReadableStream} stream
* @returns {boolean}
*/
+ function readableStreamHasBYOBReader(stream) {
+ const reader = stream[_reader];
+ if (reader === undefined) {
+ return false;
+ }
+ if (isReadableStreamBYOBReader(reader)) {
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * @param {ReadableStream} stream
+ * @returns {boolean}
+ */
function readableStreamHasDefaultReader(stream) {
const reader = stream[_reader];
if (reader === undefined) {
@@ -1138,6 +1856,9 @@
assert(signal === undefined || signal instanceof AbortSignal);
assert(!isReadableStreamLocked(source));
assert(!isWritableStreamLocked(dest));
+ // We use acquireReadableStreamDefaultReader even in case of ReadableByteStreamController
+ // as the spec allows us, and the only reason to use BYOBReader is to do some smart things
+ // with it, but the spec does not specify what things, so to simplify we stick to DefaultReader.
const reader = acquireReadableStreamDefaultReader(source);
const writer = acquireWritableStreamDefaultWriter(dest);
source[_disturbed] = true;
@@ -1398,7 +2119,7 @@
}
/**
- * @param {ReadableStreamGenericReader<any>} reader
+ * @param {ReadableStreamGenericReader<any> | ReadableStreamBYOBReader} reader
* @param {any} reason
* @returns {Promise<void>}
*/
@@ -1410,7 +2131,7 @@
/**
* @template R
- * @param {ReadableStreamDefaultReader<R>} reader
+ * @param {ReadableStreamDefaultReader<R> | ReadableStreamBYOBReader} reader
* @param {ReadableStream<R>} stream
*/
function readableStreamReaderGenericInitialize(reader, stream) {
@@ -1431,7 +2152,7 @@
/**
* @template R
- * @param {ReadableStreamGenericReader<R>} reader
+ * @param {ReadableStreamGenericReader<R> | ReadableStreamBYOBReader} reader
*/
function readableStreamReaderGenericRelease(reader) {
assert(reader[_stream] !== undefined);
@@ -1464,6 +2185,22 @@
function readableStreamTee(stream, cloneForBranch2) {
assert(isReadableStream(stream));
assert(typeof cloneForBranch2 === "boolean");
+ if (stream[_controller] instanceof ReadableByteStreamController) {
+ return readableByteStreamTee(stream);
+ } else {
+ return readableStreamDefaultTee(stream, cloneForBranch2);
+ }
+ }
+
+ /**
+ * @template R
+ * @param {ReadableStream<R>} stream
+ * @param {boolean} cloneForBranch2
+ * @returns {[ReadableStream<R>, ReadableStream<R>]}
+ */
+ function readableStreamDefaultTee(stream, cloneForBranch2) {
+ assert(isReadableStream(stream));
+ assert(typeof cloneForBranch2 === "boolean");
const reader = acquireReadableStreamDefaultReader(stream);
let reading = false;
let canceled1 = false;
@@ -1606,6 +2343,270 @@
}
/**
+ * @template R
+ * @param {ReadableStream<R>} stream
+ * @returns {[ReadableStream<R>, ReadableStream<R>]}
+ */
+ function readableByteStreamTee(stream) {
+ assert(isReadableStream(stream));
+ assert(stream[_controller] instanceof ReadableByteStreamController);
+ let reader = acquireReadableStreamDefaultReader(stream);
+ let reading = false;
+ let readAgainForBranch1 = false;
+ let readAgainForBranch2 = false;
+ let canceled1 = false;
+ let canceled2 = false;
+ let reason1 = undefined;
+ let reason2 = undefined;
+ let branch1 = undefined;
+ let branch2 = undefined;
+ /** @type {Deferred<void>} */
+ const cancelPromise = new Deferred();
+
+ /**
+ * @param {ReadableStreamBYOBReader} thisReader
+ */
+ function forwardReaderError(thisReader) {
+ PromisePrototypeCatch(thisReader[_closedPromise].promise, (e) => {
+ if (thisReader !== reader) {
+ return;
+ }
+ readableByteStreamControllerError(branch1[_controller], e);
+ readableByteStreamControllerError(branch2[_controller], e);
+ if (!canceled1 || !canceled2) {
+ cancelPromise.resolve(undefined);
+ }
+ });
+ }
+
+ function pullWithDefaultReader() {
+ if (isReadableStreamBYOBReader(reader)) {
+ assert(reader[_readIntoRequests].length === 0);
+ readableStreamReaderGenericRelease(reader);
+ reader = acquireReadableStreamDefaultReader(stream);
+ forwardReaderError(reader);
+ }
+
+ /** @type {ReadRequest} */
+ const readRequest = {
+ chunkSteps(chunk) {
+ queueMicrotask(() => {
+ readAgainForBranch1 = false;
+ readAgainForBranch2 = false;
+ const chunk1 = chunk;
+ let chunk2 = chunk;
+ if (!canceled1 && !canceled2) {
+ try {
+ chunk2 = cloneAsUint8Array(chunk);
+ } catch (e) {
+ readableByteStreamControllerError(branch1[_controller], e);
+ readableByteStreamControllerError(branch2[_controller], e);
+ cancelPromise.resolve(readableStreamCancel(stream, e));
+ return;
+ }
+ }
+ if (!canceled1) {
+ readableByteStreamControllerEnqueue(branch1[_controller], chunk1);
+ }
+ if (!canceled2) {
+ readableByteStreamControllerEnqueue(branch2[_controller], chunk2);
+ }
+ reading = false;
+ if (readAgainForBranch1) {
+ pull1Algorithm();
+ } else if (readAgainForBranch2) {
+ pull2Algorithm();
+ }
+ });
+ },
+ closeSteps() {
+ reading = false;
+ if (!canceled1) {
+ readableByteStreamControllerClose(branch1[_controller]);
+ }
+ if (!canceled2) {
+ readableByteStreamControllerClose(branch2[_controller]);
+ }
+ if (branch1[_controller][_pendingPullIntos].length !== 0) {
+ readableByteStreamControllerRespond(branch1[_controller], 0);
+ }
+ if (branch2[_controller][_pendingPullIntos].length !== 0) {
+ readableByteStreamControllerRespond(branch2[_controller], 0);
+ }
+ if (!canceled1 || !canceled2) {
+ cancelPromise.resolve(undefined);
+ }
+ },
+ errorSteps() {
+ reading = false;
+ },
+ };
+ readableStreamDefaultReaderRead(reader, readRequest);
+ }
+
+ function pullWithBYOBReader(view, forBranch2) {
+ if (isReadableStreamDefaultReader(reader)) {
+ assert(reader[_readRequests].length === 0);
+ readableStreamReaderGenericRelease(reader);
+ reader = acquireReadableStreamBYOBReader(stream);
+ forwardReaderError(reader);
+ }
+ const byobBranch = forBranch2 ? branch2 : branch1;
+ const otherBranch = forBranch2 ? branch1 : branch2;
+
+ /** @type {ReadIntoRequest} */
+ const readIntoRequest = {
+ chunkSteps(chunk) {
+ queueMicrotask(() => {
+ readAgainForBranch1 = false;
+ readAgainForBranch2 = false;
+ const byobCanceled = forBranch2 ? canceled2 : canceled1;
+ const otherCanceled = forBranch2 ? canceled1 : canceled2;
+ if (!otherCanceled) {
+ let clonedChunk;
+ try {
+ clonedChunk = cloneAsUint8Array(chunk);
+ } catch (e) {
+ readableByteStreamControllerError(byobBranch[_controller], e);
+ readableByteStreamControllerError(otherBranch[_controller], e);
+ cancelPromise.resolve(readableStreamCancel(stream, e));
+ return;
+ }
+ if (!byobCanceled) {
+ readableByteStreamControllerRespondWithNewView(
+ byobBranch[_controller],
+ chunk,
+ );
+ }
+ readableByteStreamControllerEnqueue(
+ otherBranch[_controller],
+ clonedChunk,
+ );
+ } else if (!byobCanceled) {
+ readableByteStreamControllerRespondWithNewView(
+ byobBranch[_controller],
+ chunk,
+ );
+ }
+ reading = false;
+ if (readAgainForBranch1) {
+ pull1Algorithm();
+ } else if (readAgainForBranch2) {
+ pull2Algorithm();
+ }
+ });
+ },
+ closeSteps(chunk) {
+ reading = false;
+ const byobCanceled = forBranch2 ? canceled2 : canceled1;
+ const otherCanceled = forBranch2 ? canceled1 : canceled2;
+ if (!byobCanceled) {
+ readableByteStreamControllerClose(byobBranch[_controller]);
+ }
+ if (!otherCanceled) {
+ readableByteStreamControllerClose(otherBranch[_controller]);
+ }
+ if (chunk !== undefined) {
+ assert(chunk.byteLength === 0);
+ if (!byobCanceled) {
+ readableByteStreamControllerRespondWithNewView(
+ byobBranch[_controller],
+ chunk,
+ );
+ }
+ if (
+ !otherCanceled &&
+ otherBranch[_controller][_pendingPullIntos].length !== 0
+ ) {
+ readableByteStreamControllerRespond(otherBranch[_controller], 0);
+ }
+ }
+ if (!byobCanceled || !otherCanceled) {
+ cancelPromise.resolve(undefined);
+ }
+ },
+ errorSteps() {
+ reading = false;
+ },
+ };
+ readableStreamBYOBReaderRead(reader, view, readIntoRequest);
+ }
+
+ function pull1Algorithm() {
+ if (reading) {
+ readAgainForBranch1 = true;
+ return PromiseResolve(undefined);
+ }
+ reading = true;
+ const byobRequest = readableByteStreamControllerGetBYOBRequest(
+ branch1[_controller],
+ );
+ if (byobRequest === null) {
+ pullWithDefaultReader();
+ } else {
+ pullWithBYOBReader(byobRequest[_view], false);
+ }
+ return PromiseResolve(undefined);
+ }
+
+ function pull2Algorithm() {
+ if (reading) {
+ readAgainForBranch2 = true;
+ return PromiseResolve(undefined);
+ }
+ reading = true;
+ const byobRequest = readableByteStreamControllerGetBYOBRequest(
+ branch2[_controller],
+ );
+ if (byobRequest === null) {
+ pullWithDefaultReader();
+ } else {
+ pullWithBYOBReader(byobRequest[_view], true);
+ }
+ return PromiseResolve(undefined);
+ }
+
+ function cancel1Algorithm(reason) {
+ canceled1 = true;
+ reason1 = reason;
+ if (canceled2) {
+ const compositeReason = [reason1, reason2];
+ const cancelResult = readableStreamCancel(stream, compositeReason);
+ cancelPromise.resolve(cancelResult);
+ }
+ return cancelPromise.promise;
+ }
+
+ function cancel2Algorithm(reason) {
+ canceled2 = true;
+ reason2 = reason;
+ if (canceled1) {
+ const compositeReason = [reason1, reason2];
+ const cancelResult = readableStreamCancel(stream, compositeReason);
+ cancelPromise.resolve(cancelResult);
+ }
+ return cancelPromise.promise;
+ }
+
+ function startAlgorithm() {
+ return undefined;
+ }
+
+ branch1 = createReadableByteStream(
+ startAlgorithm,
+ pull1Algorithm,
+ cancel1Algorithm,
+ );
+ branch2 = createReadableByteStream(
+ startAlgorithm,
+ pull2Algorithm,
+ cancel2Algorithm,
+ );
+ forwardReaderError(reader);
+ return [branch1, branch2];
+ }
+
+ /**
* @param {ReadableStream<ArrayBuffer>} stream
* @param {ReadableByteStreamController} controller
* @param {() => void} startAlgorithm
@@ -1630,14 +2631,14 @@
}
controller[_stream] = stream;
controller[_pullAgain] = controller[_pulling] = false;
- controller[_byobRequest] = undefined;
+ controller[_byobRequest] = null;
resetQueue(controller);
controller[_closeRequested] = controller[_started] = false;
controller[_strategyHWM] = highWaterMark;
controller[_pullAlgorithm] = pullAlgorithm;
controller[_cancelAlgorithm] = cancelAlgorithm;
controller[_autoAllocateChunkSize] = autoAllocateChunkSize;
- // 12. Set controller.[[pendingPullIntos]] to a new empty list.
+ controller[_pendingPullIntos] = [];
stream[_controller] = controller;
const startResult = startAlgorithm();
const startPromise = resolvePromiseWith(startResult);
@@ -1717,9 +2718,10 @@
},
);
}
- // 3.13.27.6 Let autoAllocateChunkSize be ? GetV(underlyingByteSource, "autoAllocateChunkSize").
- /** @type {undefined} */
- const autoAllocateChunkSize = undefined;
+ const autoAllocateChunkSize = underlyingSourceDict["autoAllocateChunkSize"];
+ if (autoAllocateChunkSize === 0) {
+ throw new TypeError("autoAllocateChunkSize must be greater than 0");
+ }
setUpReadableByteStreamController(
stream,
controller,
@@ -1848,6 +2850,22 @@
/**
* @template R
+ * @param {ReadableStreamBYOBReader} reader
+ * @param {ReadableStream<R>} stream
+ */
+ function setUpReadableStreamBYOBReader(reader, stream) {
+ if (isReadableStreamLocked(stream)) {
+ throw new TypeError("ReadableStream is locked.");
+ }
+ if (!(stream[_controller] instanceof ReadableByteStreamController)) {
+ throw new TypeError("Cannot use a BYOB reader with a non-byte stream");
+ }
+ readableStreamReaderGenericInitialize(reader, stream);
+ reader[_readIntoRequests] = [];
+ }
+
+ /**
+ * @template R
* @param {ReadableStreamDefaultReader<R>} reader
* @param {ReadableStream<R>} stream
*/
@@ -3106,7 +4124,7 @@
[_detached];
/** @type {boolean} */
[_disturbed];
- /** @type {ReadableStreamDefaultReader | undefined} */
+ /** @type {ReadableStreamDefaultReader | ReadableStreamBYOBReader} */
[_reader];
/** @type {"readable" | "closed" | "errored"} */
[_state];
@@ -3204,7 +4222,7 @@
/**
* @param {ReadableStreamGetReaderOptions=} options
- * @returns {ReadableStreamDefaultReader<R>}
+ * @returns {ReadableStreamDefaultReader<R> | ReadableStreamBYOBReader}
*/
getReader(options = {}) {
webidl.assertBranded(this, ReadableStream);
@@ -3213,12 +4231,12 @@
prefix,
context: "Argument 1",
});
- const { mode } = options;
- if (mode === undefined) {
+ if (options.mode === undefined) {
return acquireReadableStreamDefaultReader(this);
+ } else {
+ assert(options.mode === "byob");
+ return acquireReadableStreamBYOBReader(this);
}
- // 3. Return ? AcquireReadableStreamBYOBReader(this).
- throw new RangeError(`${prefix}: Unsupported mode '${mode}'`);
}
/**
@@ -3450,10 +4468,201 @@
webidl.configurePrototype(ReadableStreamDefaultReader);
+ /** @template R */
+ class ReadableStreamBYOBReader {
+ /** @type {Deferred<void>} */
+ [_closedPromise];
+ /** @type {ReadableStream<R> | undefined} */
+ [_stream];
+ /** @type {ReadIntoRequest[]} */
+ [_readIntoRequests];
+
+ /** @param {ReadableStream<R>} stream */
+ constructor(stream) {
+ const prefix = "Failed to construct 'ReadableStreamBYOBReader'";
+ webidl.requiredArguments(arguments.length, 1, { prefix });
+ stream = webidl.converters.ReadableStream(stream, {
+ prefix,
+ context: "Argument 1",
+ });
+ this[webidl.brand] = webidl.brand;
+ setUpReadableStreamBYOBReader(this, stream);
+ }
+
+ /**
+ * @param {ArrayBufferView} view
+ * @returns {Promise<ReadableStreamBYOBReadResult>}
+ */
+ read(view) {
+ try {
+ webidl.assertBranded(this, ReadableStreamBYOBReader);
+ const prefix = "Failed to execute 'read' on 'ReadableStreamBYOBReader'";
+ view = webidl.converters.ArrayBufferView(view, {
+ prefix,
+ context: "Argument 1",
+ });
+ } catch (err) {
+ return PromiseReject(err);
+ }
+
+ if (view.byteLength === 0) {
+ return PromiseReject(
+ new TypeError("view must have non-zero byteLength"),
+ );
+ }
+ if (view.buffer.byteLength === 0) {
+ return PromiseReject(
+ new TypeError("view's buffer must have non-zero byteLength"),
+ );
+ }
+ if (isDetachedBuffer(view.buffer)) {
+ return PromiseReject(
+ new TypeError("view's buffer has been detached"),
+ );
+ }
+ if (this[_stream] === undefined) {
+ return PromiseReject(
+ new TypeError("Reader has no associated stream."),
+ );
+ }
+ /** @type {Deferred<ReadableStreamBYOBReadResult>} */
+ const promise = new Deferred();
+ /** @type {ReadIntoRequest} */
+ const readIntoRequest = {
+ chunkSteps(chunk) {
+ promise.resolve({ value: chunk, done: false });
+ },
+ closeSteps(chunk) {
+ promise.resolve({ value: chunk, done: true });
+ },
+ errorSteps(e) {
+ promise.reject(e);
+ },
+ };
+ readableStreamBYOBReaderRead(this, view, readIntoRequest);
+ return promise.promise;
+ }
+
+ /** @returns {void} */
+ releaseLock() {
+ webidl.assertBranded(this, ReadableStreamBYOBReader);
+ if (this[_stream] === undefined) {
+ return;
+ }
+ if (this[_readIntoRequests].length !== 0) {
+ throw new TypeError(
+ "There are pending read requests, so the reader cannot be released.",
+ );
+ }
+ readableStreamReaderGenericRelease(this);
+ }
+
+ get closed() {
+ try {
+ webidl.assertBranded(this, ReadableStreamBYOBReader);
+ } catch (err) {
+ return PromiseReject(err);
+ }
+ return this[_closedPromise].promise;
+ }
+
+ /**
+ * @param {any} reason
+ * @returns {Promise<void>}
+ */
+ cancel(reason = undefined) {
+ try {
+ webidl.assertBranded(this, ReadableStreamBYOBReader);
+ if (reason !== undefined) {
+ reason = webidl.converters.any(reason);
+ }
+ } catch (err) {
+ return PromiseReject(err);
+ }
+
+ if (this[_stream] === undefined) {
+ return PromiseReject(
+ new TypeError("Reader has no associated stream."),
+ );
+ }
+ return readableStreamReaderGenericCancel(this, reason);
+ }
+
+ [SymbolFor("Deno.privateCustomInspect")](inspect) {
+ return `${this.constructor.name} ${inspect({ closed: this.closed })}`;
+ }
+ }
+
+ webidl.configurePrototype(ReadableStreamBYOBReader);
+
+ class ReadableStreamBYOBRequest {
+ /** @type {ReadableByteStreamController} */
+ [_controller];
+ /** @type {ArrayBufferView | null} */
+ [_view];
+
+ /** @returns {ArrayBufferView | null} */
+ get view() {
+ webidl.assertBranded(this, ReadableStreamBYOBRequest);
+ return this[_view];
+ }
+
+ constructor() {
+ webidl.illegalConstructor();
+ }
+
+ respond(bytesWritten) {
+ webidl.assertBranded(this, ReadableStreamBYOBRequest);
+ const prefix =
+ "Failed to execute 'respond' on 'ReadableStreamBYOBRequest'";
+ webidl.requiredArguments(arguments.length, 1, { prefix });
+ bytesWritten = webidl.converters["unsigned long long"](bytesWritten, {
+ enforceRange: true,
+ prefix,
+ context: "Argument 1",
+ });
+
+ if (this[_controller] === undefined) {
+ throw new TypeError("This BYOB request has been invalidated");
+ }
+ if (isDetachedBuffer(this[_view].buffer)) {
+ throw new TypeError(
+ "The BYOB request's buffer has been detached and so cannot be used as a response",
+ );
+ }
+ assert(this[_view].byteLength > 0);
+ assert(this[_view].buffer.byteLength > 0);
+ readableByteStreamControllerRespond(this[_controller], bytesWritten);
+ }
+
+ respondWithNewView(view) {
+ webidl.assertBranded(this, ReadableStreamBYOBRequest);
+ const prefix =
+ "Failed to execute 'respondWithNewView' on 'ReadableStreamBYOBRequest'";
+ webidl.requiredArguments(arguments.length, 1, { prefix });
+ view = webidl.converters.ArrayBufferView(view, {
+ prefix,
+ context: "Argument 1",
+ });
+
+ if (this[_controller] === undefined) {
+ throw new TypeError("This BYOB request has been invalidated");
+ }
+ if (isDetachedBuffer(view.buffer)) {
+ throw new TypeError(
+ "The given view's buffer has been detached and so cannot be used as a response",
+ );
+ }
+ readableByteStreamControllerRespondWithNewView(this[_controller], view);
+ }
+ }
+
+ webidl.configurePrototype(ReadableStreamBYOBRequest);
+
class ReadableByteStreamController {
/** @type {number | undefined} */
[_autoAllocateChunkSize];
- /** @type {null} */
+ /** @type {ReadableStreamBYOBRequest | null} */
[_byobRequest];
/** @type {(reason: any) => Promise<void>} */
[_cancelAlgorithm];
@@ -3465,6 +4674,8 @@
[_pullAlgorithm];
/** @type {boolean} */
[_pulling];
+ /** @type {PullIntoDescriptor[]} */
+ [_pendingPullIntos];
/** @type {ReadableByteStreamQueueEntry[]} */
[_queue];
/** @type {number} */
@@ -3480,9 +4691,10 @@
webidl.illegalConstructor();
}
+ /** @returns {ReadableStreamBYOBRequest | null} */
get byobRequest() {
webidl.assertBranded(this, ReadableByteStreamController);
- return undefined;
+ return readableByteStreamControllerGetBYOBRequest(this);
}
/** @returns {number | null} */
@@ -3570,7 +4782,7 @@
* @returns {Promise<void>}
*/
[_cancelSteps](reason) {
- // 4.7.4. CancelStep 1. If this.[[pendingPullIntos]] is not empty,
+ readableByteStreamControllerClearPendingPullIntos(this);
resetQueue(this);
const result = this[_cancelAlgorithm](reason);
readableByteStreamControllerClearAlgorithms(this);
@@ -3598,8 +4810,28 @@
readRequest.chunkSteps(view);
return;
}
- // 4. Let autoAllocateChunkSize be this.[[autoAllocateChunkSize]].
- // 5. If autoAllocateChunkSize is not undefined,
+ const autoAllocateChunkSize = this[_autoAllocateChunkSize];
+ if (autoAllocateChunkSize !== undefined) {
+ let buffer;
+ try {
+ buffer = new ArrayBuffer(autoAllocateChunkSize);
+ } catch (e) {
+ readRequest.errorSteps(e);
+ return;
+ }
+ /** @type {PullIntoDescriptor} */
+ const pullIntoDescriptor = {
+ buffer,
+ bufferByteLength: autoAllocateChunkSize,
+ byteOffset: 0,
+ byteLength: autoAllocateChunkSize,
+ bytesFilled: 0,
+ elementSize: 1,
+ viewConstructor: Uint8Array,
+ readerType: "default",
+ };
+ ArrayPrototypePush(this[_pendingPullIntos], pullIntoDescriptor);
+ }
readableStreamAddReadRequest(stream, readRequest);
readableByteStreamControllerCallPullIfNeeded(this);
}
@@ -4421,6 +5653,8 @@
WritableStreamDefaultWriter,
WritableStreamDefaultController,
ReadableByteStreamController,
+ ReadableStreamBYOBReader,
+ ReadableStreamBYOBRequest,
ReadableStreamDefaultController,
TransformStreamDefaultController,
};