diff options
author | Leo Kettmeir <crowlkats@toaxl.com> | 2021-11-03 10:47:40 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-11-03 10:47:40 +0100 |
commit | 95b2955712b0daae3c8e8f7bb0eccf341b5c8fa3 (patch) | |
tree | 42dda897e499393fe6b19799b00f840eaf723c43 /ext/web/06_streams.js | |
parent | 8e31bbbe551e95a40a78fd96671916f917218b93 (diff) |
feat(ext/web): BYOB support for ReadableStream (#12616)
This commit introduces support for BYOB readers in the WHATWG Streams API implementation.
Diffstat (limited to 'ext/web/06_streams.js')
-rw-r--r-- | ext/web/06_streams.js | 1284 |
1 files changed, 1259 insertions, 25 deletions
diff --git a/ext/web/06_streams.js b/ext/web/06_streams.js index bf6b9722c..92ee08404 100644 --- a/ext/web/06_streams.js +++ b/ext/web/06_streams.js @@ -11,12 +11,21 @@ const webidl = window.__bootstrap.webidl; // TODO(lucacasonato): get AbortSignal from __bootstrap. const { + ArrayBuffer, + ArrayBufferIsView, ArrayPrototypeMap, ArrayPrototypePush, ArrayPrototypeShift, + BigInt64Array, + BigUint64Array, + DataView, Error, + Int8Array, + Int16Array, + Int32Array, NumberIsInteger, NumberIsNaN, + MathMin, ObjectCreate, ObjectDefineProperties, ObjectDefineProperty, @@ -24,15 +33,21 @@ ObjectSetPrototypeOf, Promise, PromiseAll, + PromisePrototypeCatch, PromisePrototypeThen, PromiseReject, + PromiseResolve, queueMicrotask, RangeError, + SharedArrayBuffer, Symbol, SymbolAsyncIterator, SymbolFor, TypeError, Uint8Array, + Uint16Array, + Uint32Array, + Uint8ClampedArray, WeakMap, WeakMapPrototypeGet, WeakMapPrototypeHas, @@ -195,6 +210,20 @@ /** * @param {ArrayBufferLike} O + * @returns {boolean} + */ + function canTransferArrayBuffer(O) { + assert(typeof O === "object"); + assert(O instanceof ArrayBuffer || O instanceof SharedArrayBuffer); + if (isDetachedBuffer(O)) { + return false; + } + // TODO(@crowlKats): 4. If SameValue(O.[[ArrayBufferDetachKey]], undefined) is false, return false. + return true; + } + + /** + * @param {ArrayBufferLike} O * @returns {ArrayBufferLike} */ function transferArrayBuffer(O) { @@ -209,6 +238,18 @@ return transferredIshVersion; } + /** + * @param {ArrayBufferView} O + * @returns {Uint8Array} + */ + function cloneAsUint8Array(O) { + assert(typeof O === "object"); + assert(ArrayBufferIsView(O)); + assert(!isDetachedBuffer(O.buffer)); + const buffer = O.buffer.slice(O.byteOffset, O.byteOffset + O.byteLength); + return new Uint8Array(buffer); + } + const _abortAlgorithm = Symbol("[[abortAlgorithm]]"); const _abortSteps = Symbol("[[AbortSteps]]"); const _autoAllocateChunkSize = Symbol("[[autoAllocateChunkSize]]"); @@ -232,6 +273,7 @@ const _inFlightCloseRequest = Symbol("[[inFlightCloseRequest]]"); const _inFlightWriteRequest = Symbol("[[inFlightWriteRequest]]"); const _pendingAbortRequest = Symbol("[pendingAbortRequest]"); + const _pendingPullIntos = Symbol("[[pendingPullIntos]]"); const _preventCancel = Symbol("[[preventCancel]]"); const _pullAgain = Symbol("[[pullAgain]]"); const _pullAlgorithm = Symbol("[[pullAlgorithm]]"); @@ -242,6 +284,7 @@ const _readable = Symbol("[[readable]]"); const _reader = Symbol("[[reader]]"); const _readRequests = Symbol("[[readRequests]]"); + const _readIntoRequests = Symbol("[[readIntoRequests]]"); const _readyPromise = Symbol("[[readyPromise]]"); const _started = Symbol("[[started]]"); const _state = Symbol("[[state]]"); @@ -250,6 +293,7 @@ const _strategySizeAlgorithm = Symbol("[[strategySizeAlgorithm]]"); const _stream = Symbol("[[stream]]"); const _transformAlgorithm = Symbol("[[transformAlgorithm]]"); + const _view = Symbol("[[view]]"); const _writable = Symbol("[[writable]]"); const _writeAlgorithm = Symbol("[[writeAlgorithm]]"); const _writer = Symbol("[[writer]]"); @@ -265,6 +309,17 @@ } /** + * @template R + * @param {ReadableStream<R>} stream + * @returns {ReadableStreamBYOBReader<R>} + */ + function acquireReadableStreamBYOBReader(stream) { + const reader = webidl.createBranded(ReadableStreamBYOBReader); + setUpReadableStreamBYOBReader(reader, stream); + return reader; + } + + /** * @template W * @param {WritableStream<W>} stream * @returns {WritableStreamDefaultWriter<W>} @@ -413,6 +468,32 @@ } /** + * @param {() => void} startAlgorithm + * @param {() => Promise<void>} pullAlgorithm + * @param {(reason: any) => Promise<void>} cancelAlgorithm + * @returns {ReadableStream} + */ + function createReadableByteStream( + startAlgorithm, + pullAlgorithm, + cancelAlgorithm, + ) { + const stream = webidl.createBranded(ReadableStream); + initializeReadableStream(stream); + const controller = webidl.createBranded(ReadableByteStreamController); + setUpReadableByteStreamController( + stream, + controller, + startAlgorithm, + pullAlgorithm, + cancelAlgorithm, + 0, + undefined, + ); + return stream; + } + + /** * @param {ReadableStream} stream * @returns {void} */ @@ -545,6 +626,15 @@ } /** + * @param {unknown} value + * @returns {value is ReadableStreamBYOBReader} + */ + function isReadableStreamBYOBReader(value) { + return !(typeof value !== "object" || value === null || + !(_readIntoRequests in value)); + } + + /** * @param {ReadableStream} stream * @returns {boolean} */ @@ -638,7 +728,7 @@ if (stream[_state] !== "readable") { return; } - // 3. Perform ! ReadableByteStreamControllerClearPendingPullIntos(controller). + readableByteStreamControllerClearPendingPullIntos(controller); resetQueue(controller); readableByteStreamControllerClearAlgorithms(controller); readableStreamError(stream, e); @@ -648,6 +738,15 @@ * @param {ReadableByteStreamController} controller * @returns {void} */ + function readableByteStreamControllerClearPendingPullIntos(controller) { + readableByteStreamControllerInvalidateBYOBRequest(controller); + controller[_pendingPullIntos] = []; + } + + /** + * @param {ReadableByteStreamController} controller + * @returns {void} + */ function readableByteStreamControllerClose(controller) { /** @type {ReadableStream<ArrayBuffer>} */ const stream = controller[_stream]; @@ -658,7 +757,16 @@ controller[_closeRequested] = true; return; } - // 3.13.6.4 If controller.[[pendingPullIntos]] is not empty, (BYOB Support) + if (controller[_pendingPullIntos].length !== 0) { + const firstPendingPullInto = controller[_pendingPullIntos][0]; + if (firstPendingPullInto.bytesFilled > 0) { + const e = new TypeError( + "Insufficient bytes to fill elements in the given buffer", + ); + readableByteStreamControllerError(controller, e); + throw e; + } + } readableByteStreamControllerClearAlgorithms(controller); readableStreamClose(stream); } @@ -678,9 +786,27 @@ } const { buffer, byteOffset, byteLength } = chunk; + if (isDetachedBuffer(buffer)) { + throw new TypeError( + "chunk's buffer is detached and so cannot be enqueued", + ); + } const transferredBuffer = transferArrayBuffer(buffer); + if (controller[_pendingPullIntos].length !== 0) { + const firstPendingPullInto = controller[_pendingPullIntos][0]; + if (isDetachedBuffer(firstPendingPullInto.buffer)) { + throw new TypeError( + "The BYOB request's buffer has been detached and so cannot be filled with an enqueued chunk", + ); + } + firstPendingPullInto.buffer = transferArrayBuffer( + firstPendingPullInto.buffer, + ); + } + readableByteStreamControllerInvalidateBYOBRequest(controller); if (readableStreamHasDefaultReader(stream)) { if (readableStreamGetNumReadRequests(stream) === 0) { + assert(controller[_pendingPullIntos].length === 0); readableByteStreamControllerEnqueueChunkToQueue( controller, transferredBuffer, @@ -689,6 +815,10 @@ ); } else { assert(controller[_queue].length === 0); + if (controller[_pendingPullIntos].length !== 0) { + assert(controller[_pendingPullIntos][0].readerType === "default"); + readableByteStreamControllerShiftPendingPullInto(controller); + } const transferredView = new Uint8Array( transferredBuffer, byteOffset, @@ -696,7 +826,16 @@ ); readableStreamFulfillReadRequest(stream, transferredView, false); } - // 8 Otherwise, if ! ReadableStreamHasBYOBReader(stream) is true, + } else if (readableStreamHasBYOBReader(stream)) { + readableByteStreamControllerEnqueueChunkToQueue( + controller, + transferredBuffer, + byteOffset, + byteLength, + ); + readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue( + controller, + ); } else { assert(isReadableStreamLocked(stream) === false); readableByteStreamControllerEnqueueChunkToQueue( @@ -728,6 +867,29 @@ /** * @param {ReadableByteStreamController} controller + * @returns {ReadableStreamBYOBRequest | null} + */ + function readableByteStreamControllerGetBYOBRequest(controller) { + if ( + controller[_byobRequest] === null && + controller[_pendingPullIntos].length !== 0 + ) { + const firstDescriptor = controller[_pendingPullIntos][0]; + const view = new Uint8Array( + firstDescriptor.buffer, + firstDescriptor.byteOffset + firstDescriptor.bytesFilled, + firstDescriptor.byteLength - firstDescriptor.bytesFilled, + ); + const byobRequest = webidl.createBranded(ReadableStreamBYOBRequest); + byobRequest[_controller] = controller; + byobRequest[_view] = view; + controller[_byobRequest] = byobRequest; + } + return controller[_byobRequest]; + } + + /** + * @param {ReadableByteStreamController} controller * @returns {number | null} */ function readableByteStreamControllerGetDesiredSize(controller) { @@ -786,8 +948,12 @@ ) { return true; } - // 3.13.25.6 If ! ReadableStreamHasBYOBReader(stream) is true and ! - // ReadableStreamGetNumReadIntoRequests(stream) > 0, return true. + if ( + readableStreamHasBYOBReader(stream) && + readableStreamGetNumReadIntoRequests(stream) > 0 + ) { + return true; + } const desiredSize = readableByteStreamControllerGetDesiredSize(controller); assert(desiredSize !== null); return desiredSize > 0; @@ -806,6 +972,17 @@ } /** + * @param {ReadableStream} stream + * @param {ReadIntoRequest} readRequest + * @returns {void} + */ + function readableStreamAddReadIntoRequest(stream, readRequest) { + assert(isReadableStreamBYOBReader(stream[_reader])); + assert(stream[_state] === "readable" || stream[_state] === "closed"); + ArrayPrototypePush(stream[_reader][_readIntoRequests], readRequest); + } + + /** * @template R * @param {ReadableStream<R>} stream * @param {any=} reason @@ -820,6 +997,13 @@ return PromiseReject(stream[_storedError]); } readableStreamClose(stream); + const reader = stream[_reader]; + if (reader !== undefined && isReadableStreamBYOBReader(reader)) { + for (const readIntoRequest of reader[_readIntoRequests]) { + readIntoRequest.closeSteps(undefined); + } + reader[_readIntoRequests] = []; + } /** @type {Promise<void>} */ const sourceCancelPromise = stream[_controller][_cancelSteps](reason); return PromisePrototypeThen(sourceCancelPromise, () => undefined); @@ -1019,6 +1203,491 @@ } /** + * @param {ReadableStreamBYOBReader} reader + * @param {ArrayBufferView} view + * @param {ReadIntoRequest} readIntoRequest + * @returns {void} + */ + function readableStreamBYOBReaderRead(reader, view, readIntoRequest) { + const stream = reader[_stream]; + assert(stream); + stream[_disturbed] = true; + if (stream[_state] === "errored") { + readIntoRequest.errorSteps(stream[_storedError]); + } else { + readableByteStreamControllerPullInto( + stream[_controller], + view, + readIntoRequest, + ); + } + } + + /** + * @param {ReadableByteStreamController} controller + */ + function readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue( + controller, + ) { + assert(!controller[_closeRequested]); + while (controller[_pendingPullIntos].length !== 0) { + if (controller[_queueTotalSize] === 0) { + return; + } + const pullIntoDescriptor = controller[_pendingPullIntos][0]; + if ( + readableByteStreamControllerFillPullIntoDescriptorFromQueue( + controller, + pullIntoDescriptor, + ) + ) { + readableByteStreamControllerShiftPendingPullInto(controller); + readableByteStreamControllerCommitPullIntoDescriptor( + controller[_stream], + pullIntoDescriptor, + ); + } + } + } + + /** + * @param {ReadableByteStreamController} controller + * @param {ArrayBufferView} view + * @param {ReadIntoRequest} readIntoRequest + * @returns {void} + */ + function readableByteStreamControllerPullInto( + controller, + view, + readIntoRequest, + ) { + const stream = controller[_stream]; + let elementSize = 1; + let ctor = DataView; + + if ( + view instanceof Int8Array || + view instanceof Uint8Array || + view instanceof Uint8ClampedArray || + view instanceof Int16Array || + view instanceof Uint16Array || + view instanceof Int32Array || + view instanceof Uint32Array || + view instanceof BigInt64Array || + view instanceof BigUint64Array + ) { + elementSize = view.constructor.BYTES_PER_ELEMENT; + ctor = view.constructor; + } + const byteOffset = view.byteOffset; + const byteLength = view.byteLength; + + /** @type {ArrayBufferLike} */ + let buffer; + + try { + buffer = transferArrayBuffer(view.buffer); + } catch (e) { + readIntoRequest.errorSteps(e); + return; + } + + /** @type {PullIntoDescriptor} */ + const pullIntoDescriptor = { + buffer, + bufferByteLength: buffer.byteLength, + byteOffset, + byteLength, + bytesFilled: 0, + elementSize, + viewConstructor: ctor, + readerType: "byob", + }; + + if (controller[_pendingPullIntos].length !== 0) { + ArrayPrototypePush(controller[_pendingPullIntos], pullIntoDescriptor); + readableStreamAddReadIntoRequest(stream, readIntoRequest); + return; + } + if (stream[_state] === "closed") { + const emptyView = new ctor( + pullIntoDescriptor.buffer, + pullIntoDescriptor.byteOffset, + 0, + ); + readIntoRequest.closeSteps(emptyView); + return; + } + if (controller[_queueTotalSize] > 0) { + if ( + readableByteStreamControllerFillPullIntoDescriptorFromQueue( + controller, + pullIntoDescriptor, + ) + ) { + const filledView = + readableByteStreamControllerConvertPullIntoDescriptor( + pullIntoDescriptor, + ); + readableByteStreamControllerHandleQueueDrain(controller); + readIntoRequest.chunkSteps(filledView); + return; + } + if (controller[_closeRequested]) { + const e = new TypeError( + "Insufficient bytes to fill elements in the given buffer", + ); + readableByteStreamControllerError(controller, e); + readIntoRequest.errorSteps(e); + return; + } + } + controller[_pendingPullIntos].push(pullIntoDescriptor); + readableStreamAddReadIntoRequest(stream, readIntoRequest); + readableByteStreamControllerCallPullIfNeeded(controller); + } + + /** + * @param {ReadableByteStreamController} controller + * @param {number} bytesWritten + * @returns {void} + */ + function readableByteStreamControllerRespond(controller, bytesWritten) { + assert(controller[_pendingPullIntos].length !== 0); + const firstDescriptor = controller[_pendingPullIntos][0]; + const state = controller[_stream][_state]; + if (state === "closed") { + if (bytesWritten !== 0) { + throw new TypeError( + "bytesWritten must be 0 when calling respond() on a closed stream", + ); + } + } else { + assert(state === "readable"); + if (bytesWritten === 0) { + throw new TypeError( + "bytesWritten must be greater than 0 when calling respond() on a readable stream", + ); + } + if ( + (firstDescriptor.bytesFilled + bytesWritten) > + firstDescriptor.byteLength + ) { + throw new RangeError("bytesWritten out of range"); + } + } + firstDescriptor.buffer = transferArrayBuffer(firstDescriptor.buffer); + readableByteStreamControllerRespondInternal(controller, bytesWritten); + } + + /** + * @param {ReadableByteStreamController} controller + * @param {number} bytesWritten + * @param {PullIntoDescriptor} pullIntoDescriptor + * @returns {void} + */ + function readableByteStreamControllerRespondInReadableState( + controller, + bytesWritten, + pullIntoDescriptor, + ) { + assert( + (pullIntoDescriptor.bytesFilled + bytesWritten) <= + pullIntoDescriptor.byteLength, + ); + readableByteStreamControllerFillHeadPullIntoDescriptor( + controller, + bytesWritten, + pullIntoDescriptor, + ); + if (pullIntoDescriptor.bytesFilled < pullIntoDescriptor.elementSize) { + return; + } + readableByteStreamControllerShiftPendingPullInto(controller); + const remainderSize = pullIntoDescriptor.bytesFilled % + pullIntoDescriptor.elementSize; + if (remainderSize > 0) { + const end = pullIntoDescriptor.byteOffset + + pullIntoDescriptor.bytesFilled; + // We dont have access to CloneArrayBuffer, so we use .slice(). End is non-inclusive, as the spec says. + const remainder = pullIntoDescriptor.buffer.slice( + end - remainderSize, + end, + ); + readableByteStreamControllerEnqueueChunkToQueue( + controller, + remainder, + 0, + remainder.byteLength, + ); + } + pullIntoDescriptor.bytesFilled -= remainderSize; + readableByteStreamControllerCommitPullIntoDescriptor( + controller[_stream], + pullIntoDescriptor, + ); + readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue( + controller, + ); + } + + /** + * @param {ReadableByteStreamController} controller + * @param {number} bytesWritten + * @returns {void} + */ + function readableByteStreamControllerRespondInternal( + controller, + bytesWritten, + ) { + const firstDescriptor = controller[_pendingPullIntos][0]; + assert(canTransferArrayBuffer(firstDescriptor.buffer)); + readableByteStreamControllerInvalidateBYOBRequest(controller); + const state = controller[_stream][_state]; + if (state === "closed") { + assert(bytesWritten === 0); + readableByteStreamControllerRespondInClosedState( + controller, + firstDescriptor, + ); + } else { + assert(state === "readable"); + assert(bytesWritten > 0); + readableByteStreamControllerRespondInReadableState( + controller, + bytesWritten, + firstDescriptor, + ); + } + readableByteStreamControllerCallPullIfNeeded(controller); + } + + /** + * @param {ReadableByteStreamController} controller + */ + function readableByteStreamControllerInvalidateBYOBRequest(controller) { + if (controller[_byobRequest] === null) { + return; + } + controller[_byobRequest][_controller] = undefined; + controller[_byobRequest][_view] = null; + controller[_byobRequest] = null; + } + + /** + * @param {ReadableByteStreamController} controller + * @param {PullIntoDescriptor} firstDescriptor + */ + function readableByteStreamControllerRespondInClosedState( + controller, + firstDescriptor, + ) { + assert(firstDescriptor.bytesFilled === 0); + const stream = controller[_stream]; + if (readableStreamHasBYOBReader(stream)) { + while (readableStreamGetNumReadIntoRequests(stream) > 0) { + const pullIntoDescriptor = + readableByteStreamControllerShiftPendingPullInto(controller); + readableByteStreamControllerCommitPullIntoDescriptor( + stream, + pullIntoDescriptor, + ); + } + } + } + + /** + * @template R + * @param {ReadableStream<R>} stream + * @param {PullIntoDescriptor} pullIntoDescriptor + */ + function readableByteStreamControllerCommitPullIntoDescriptor( + stream, + pullIntoDescriptor, + ) { + assert(stream[_state] !== "errored"); + let done = false; + if (stream[_state] === "closed") { + assert(pullIntoDescriptor.bytesFilled === 0); + done = true; + } + const filledView = readableByteStreamControllerConvertPullIntoDescriptor( + pullIntoDescriptor, + ); + if (pullIntoDescriptor.readerType === "default") { + readableStreamFulfillReadRequest(stream, filledView, done); + } else { + assert(pullIntoDescriptor.readerType === "byob"); + readableStreamFulfillReadIntoRequest(stream, filledView, done); + } + } + + /** + * @param {ReadableByteStreamController} controller + * @param {ArrayBufferView} view + */ + function readableByteStreamControllerRespondWithNewView(controller, view) { + assert(controller[_pendingPullIntos].length !== 0); + assert(!isDetachedBuffer(view.buffer)); + const firstDescriptor = controller[_pendingPullIntos][0]; + const state = controller[_stream][_state]; + if (state === "closed") { + if (view.byteLength !== 0) { + throw new TypeError( + "The view's length must be 0 when calling respondWithNewView() on a closed stream", + ); + } + } else { + assert(state === "readable"); + if (view.byteLength === 0) { + throw new TypeError( + "The view's length must be greater than 0 when calling respondWithNewView() on a readable stream", + ); + } + } + if ( + (firstDescriptor.byteOffset + firstDescriptor.bytesFilled) !== + view.byteOffset + ) { + throw new RangeError( + "The region specified by view does not match byobRequest", + ); + } + if (firstDescriptor.bufferByteLength !== view.buffer.byteLength) { + throw new RangeError( + "The buffer of view has different capacity than byobRequest", + ); + } + if ( + (firstDescriptor.bytesFilled + view.byteLength) > + firstDescriptor.byteLength + ) { + throw new RangeError( + "The region specified by view is larger than byobRequest", + ); + } + const viewByteLength = view.byteLength; + firstDescriptor.buffer = transferArrayBuffer(view.buffer); + readableByteStreamControllerRespondInternal(controller, viewByteLength); + } + + /** + * @param {ReadableByteStreamController} controller + * @returns {PullIntoDescriptor} + */ + function readableByteStreamControllerShiftPendingPullInto(controller) { + assert(controller[_byobRequest] === null); + return ArrayPrototypeShift(controller[_pendingPullIntos]); + } + + /** + * @param {ReadableByteStreamController} controller + * @param {PullIntoDescriptor} pullIntoDescriptor + * @returns {boolean} + */ + function readableByteStreamControllerFillPullIntoDescriptorFromQueue( + controller, + pullIntoDescriptor, + ) { + const elementSize = pullIntoDescriptor.elementSize; + const currentAlignedBytes = pullIntoDescriptor.bytesFilled - + (pullIntoDescriptor.bytesFilled % elementSize); + const maxBytesToCopy = MathMin( + controller[_queueTotalSize], + pullIntoDescriptor.byteLength - pullIntoDescriptor.bytesFilled, + ); + const maxBytesFilled = pullIntoDescriptor.bytesFilled + maxBytesToCopy; + const maxAlignedBytes = maxBytesFilled - (maxBytesFilled % elementSize); + let totalBytesToCopyRemaining = maxBytesToCopy; + let ready = false; + if (maxAlignedBytes > currentAlignedBytes) { + totalBytesToCopyRemaining = maxAlignedBytes - + pullIntoDescriptor.bytesFilled; + ready = true; + } + const queue = controller[_queue]; + while (totalBytesToCopyRemaining > 0) { + const headOfQueue = queue[0]; + const bytesToCopy = MathMin( + totalBytesToCopyRemaining, + headOfQueue.byteLength, + ); + const destStart = pullIntoDescriptor.byteOffset + + pullIntoDescriptor.bytesFilled; + + const destBuffer = new Uint8Array( + pullIntoDescriptor.buffer, + destStart, + bytesToCopy, + ); + const srcBuffer = new Uint8Array( + headOfQueue.buffer, + headOfQueue.byteOffset, + bytesToCopy, + ); + destBuffer.set(srcBuffer); + + if (headOfQueue.byteLength === bytesToCopy) { + ArrayPrototypeShift(queue); + } else { + headOfQueue.byteOffset += bytesToCopy; + headOfQueue.byteLength -= bytesToCopy; + } + controller[_queueTotalSize] -= bytesToCopy; + readableByteStreamControllerFillHeadPullIntoDescriptor( + controller, + bytesToCopy, + pullIntoDescriptor, + ); + totalBytesToCopyRemaining -= bytesToCopy; + } + if (!ready) { + assert(controller[_queueTotalSize] === 0); + assert(pullIntoDescriptor.bytesFilled > 0); + assert(pullIntoDescriptor.bytesFilled < pullIntoDescriptor.elementSize); + } + return ready; + } + + /** + * @param {ReadableByteStreamController} controller + * @param {number} size + * @param {PullIntoDescriptor} pullIntoDescriptor + * @returns {void} + */ + function readableByteStreamControllerFillHeadPullIntoDescriptor( + controller, + size, + pullIntoDescriptor, + ) { + assert( + controller[_pendingPullIntos].length === 0 || + controller[_pendingPullIntos][0] === pullIntoDescriptor, + ); + assert(controller[_byobRequest] === null); + pullIntoDescriptor.bytesFilled += size; + } + + /** + * @param {PullIntoDescriptor} pullIntoDescriptor + * @returns {ArrayBufferView} + */ + function readableByteStreamControllerConvertPullIntoDescriptor( + pullIntoDescriptor, + ) { + const bytesFilled = pullIntoDescriptor.bytesFilled; + const elementSize = pullIntoDescriptor.elementSize; + assert(bytesFilled <= pullIntoDescriptor.byteLength); + assert((bytesFilled % elementSize) === 0); + const buffer = transferArrayBuffer(pullIntoDescriptor.buffer); + return new pullIntoDescriptor.viewConstructor( + buffer, + pullIntoDescriptor.byteOffset, + bytesFilled / elementSize, + ); + } + + /** * @template R * @param {ReadableStreamDefaultReader<R>} reader * @param {ReadRequest<R>} readRequest @@ -1063,8 +1732,33 @@ readRequest.errorSteps(e); } reader[_readRequests] = []; + } else { + assert(isReadableStreamBYOBReader(reader)); + for (const readIntoRequest of reader[_readIntoRequests]) { + readIntoRequest.errorSteps(e); + } + reader[_readIntoRequests] = []; + } + } + + /** + * @template R + * @param {ReadableStream<R>} stream + * @param {R} chunk + * @param {boolean} done + */ + function readableStreamFulfillReadIntoRequest(stream, chunk, done) { + assert(readableStreamHasBYOBReader(stream)); + /** @type {ReadableStreamDefaultReader<R>} */ + const reader = stream[_reader]; + assert(reader[_readIntoRequests].length !== 0); + /** @type {ReadIntoRequest} */ + const readIntoRequest = ArrayPrototypeShift(reader[_readIntoRequests]); + if (done) { + readIntoRequest.closeSteps(chunk); + } else { + readIntoRequest.chunkSteps(chunk); } - // 3.5.6.8 Otherwise, support BYOB Reader } /** @@ -1091,6 +1785,15 @@ * @param {ReadableStream} stream * @return {number} */ + function readableStreamGetNumReadIntoRequests(stream) { + assert(readableStreamHasBYOBReader(stream) === true); + return stream[_reader][_readIntoRequests].length; + } + + /** + * @param {ReadableStream} stream + * @return {number} + */ function readableStreamGetNumReadRequests(stream) { assert(readableStreamHasDefaultReader(stream) === true); return stream[_reader][_readRequests].length; @@ -1100,6 +1803,21 @@ * @param {ReadableStream} stream * @returns {boolean} */ + function readableStreamHasBYOBReader(stream) { + const reader = stream[_reader]; + if (reader === undefined) { + return false; + } + if (isReadableStreamBYOBReader(reader)) { + return true; + } + return false; + } + + /** + * @param {ReadableStream} stream + * @returns {boolean} + */ function readableStreamHasDefaultReader(stream) { const reader = stream[_reader]; if (reader === undefined) { @@ -1138,6 +1856,9 @@ assert(signal === undefined || signal instanceof AbortSignal); assert(!isReadableStreamLocked(source)); assert(!isWritableStreamLocked(dest)); + // We use acquireReadableStreamDefaultReader even in case of ReadableByteStreamController + // as the spec allows us, and the only reason to use BYOBReader is to do some smart things + // with it, but the spec does not specify what things, so to simplify we stick to DefaultReader. const reader = acquireReadableStreamDefaultReader(source); const writer = acquireWritableStreamDefaultWriter(dest); source[_disturbed] = true; @@ -1398,7 +2119,7 @@ } /** - * @param {ReadableStreamGenericReader<any>} reader + * @param {ReadableStreamGenericReader<any> | ReadableStreamBYOBReader} reader * @param {any} reason * @returns {Promise<void>} */ @@ -1410,7 +2131,7 @@ /** * @template R - * @param {ReadableStreamDefaultReader<R>} reader + * @param {ReadableStreamDefaultReader<R> | ReadableStreamBYOBReader} reader * @param {ReadableStream<R>} stream */ function readableStreamReaderGenericInitialize(reader, stream) { @@ -1431,7 +2152,7 @@ /** * @template R - * @param {ReadableStreamGenericReader<R>} reader + * @param {ReadableStreamGenericReader<R> | ReadableStreamBYOBReader} reader */ function readableStreamReaderGenericRelease(reader) { assert(reader[_stream] !== undefined); @@ -1464,6 +2185,22 @@ function readableStreamTee(stream, cloneForBranch2) { assert(isReadableStream(stream)); assert(typeof cloneForBranch2 === "boolean"); + if (stream[_controller] instanceof ReadableByteStreamController) { + return readableByteStreamTee(stream); + } else { + return readableStreamDefaultTee(stream, cloneForBranch2); + } + } + + /** + * @template R + * @param {ReadableStream<R>} stream + * @param {boolean} cloneForBranch2 + * @returns {[ReadableStream<R>, ReadableStream<R>]} + */ + function readableStreamDefaultTee(stream, cloneForBranch2) { + assert(isReadableStream(stream)); + assert(typeof cloneForBranch2 === "boolean"); const reader = acquireReadableStreamDefaultReader(stream); let reading = false; let canceled1 = false; @@ -1606,6 +2343,270 @@ } /** + * @template R + * @param {ReadableStream<R>} stream + * @returns {[ReadableStream<R>, ReadableStream<R>]} + */ + function readableByteStreamTee(stream) { + assert(isReadableStream(stream)); + assert(stream[_controller] instanceof ReadableByteStreamController); + let reader = acquireReadableStreamDefaultReader(stream); + let reading = false; + let readAgainForBranch1 = false; + let readAgainForBranch2 = false; + let canceled1 = false; + let canceled2 = false; + let reason1 = undefined; + let reason2 = undefined; + let branch1 = undefined; + let branch2 = undefined; + /** @type {Deferred<void>} */ + const cancelPromise = new Deferred(); + + /** + * @param {ReadableStreamBYOBReader} thisReader + */ + function forwardReaderError(thisReader) { + PromisePrototypeCatch(thisReader[_closedPromise].promise, (e) => { + if (thisReader !== reader) { + return; + } + readableByteStreamControllerError(branch1[_controller], e); + readableByteStreamControllerError(branch2[_controller], e); + if (!canceled1 || !canceled2) { + cancelPromise.resolve(undefined); + } + }); + } + + function pullWithDefaultReader() { + if (isReadableStreamBYOBReader(reader)) { + assert(reader[_readIntoRequests].length === 0); + readableStreamReaderGenericRelease(reader); + reader = acquireReadableStreamDefaultReader(stream); + forwardReaderError(reader); + } + + /** @type {ReadRequest} */ + const readRequest = { + chunkSteps(chunk) { + queueMicrotask(() => { + readAgainForBranch1 = false; + readAgainForBranch2 = false; + const chunk1 = chunk; + let chunk2 = chunk; + if (!canceled1 && !canceled2) { + try { + chunk2 = cloneAsUint8Array(chunk); + } catch (e) { + readableByteStreamControllerError(branch1[_controller], e); + readableByteStreamControllerError(branch2[_controller], e); + cancelPromise.resolve(readableStreamCancel(stream, e)); + return; + } + } + if (!canceled1) { + readableByteStreamControllerEnqueue(branch1[_controller], chunk1); + } + if (!canceled2) { + readableByteStreamControllerEnqueue(branch2[_controller], chunk2); + } + reading = false; + if (readAgainForBranch1) { + pull1Algorithm(); + } else if (readAgainForBranch2) { + pull2Algorithm(); + } + }); + }, + closeSteps() { + reading = false; + if (!canceled1) { + readableByteStreamControllerClose(branch1[_controller]); + } + if (!canceled2) { + readableByteStreamControllerClose(branch2[_controller]); + } + if (branch1[_controller][_pendingPullIntos].length !== 0) { + readableByteStreamControllerRespond(branch1[_controller], 0); + } + if (branch2[_controller][_pendingPullIntos].length !== 0) { + readableByteStreamControllerRespond(branch2[_controller], 0); + } + if (!canceled1 || !canceled2) { + cancelPromise.resolve(undefined); + } + }, + errorSteps() { + reading = false; + }, + }; + readableStreamDefaultReaderRead(reader, readRequest); + } + + function pullWithBYOBReader(view, forBranch2) { + if (isReadableStreamDefaultReader(reader)) { + assert(reader[_readRequests].length === 0); + readableStreamReaderGenericRelease(reader); + reader = acquireReadableStreamBYOBReader(stream); + forwardReaderError(reader); + } + const byobBranch = forBranch2 ? branch2 : branch1; + const otherBranch = forBranch2 ? branch1 : branch2; + + /** @type {ReadIntoRequest} */ + const readIntoRequest = { + chunkSteps(chunk) { + queueMicrotask(() => { + readAgainForBranch1 = false; + readAgainForBranch2 = false; + const byobCanceled = forBranch2 ? canceled2 : canceled1; + const otherCanceled = forBranch2 ? canceled1 : canceled2; + if (!otherCanceled) { + let clonedChunk; + try { + clonedChunk = cloneAsUint8Array(chunk); + } catch (e) { + readableByteStreamControllerError(byobBranch[_controller], e); + readableByteStreamControllerError(otherBranch[_controller], e); + cancelPromise.resolve(readableStreamCancel(stream, e)); + return; + } + if (!byobCanceled) { + readableByteStreamControllerRespondWithNewView( + byobBranch[_controller], + chunk, + ); + } + readableByteStreamControllerEnqueue( + otherBranch[_controller], + clonedChunk, + ); + } else if (!byobCanceled) { + readableByteStreamControllerRespondWithNewView( + byobBranch[_controller], + chunk, + ); + } + reading = false; + if (readAgainForBranch1) { + pull1Algorithm(); + } else if (readAgainForBranch2) { + pull2Algorithm(); + } + }); + }, + closeSteps(chunk) { + reading = false; + const byobCanceled = forBranch2 ? canceled2 : canceled1; + const otherCanceled = forBranch2 ? canceled1 : canceled2; + if (!byobCanceled) { + readableByteStreamControllerClose(byobBranch[_controller]); + } + if (!otherCanceled) { + readableByteStreamControllerClose(otherBranch[_controller]); + } + if (chunk !== undefined) { + assert(chunk.byteLength === 0); + if (!byobCanceled) { + readableByteStreamControllerRespondWithNewView( + byobBranch[_controller], + chunk, + ); + } + if ( + !otherCanceled && + otherBranch[_controller][_pendingPullIntos].length !== 0 + ) { + readableByteStreamControllerRespond(otherBranch[_controller], 0); + } + } + if (!byobCanceled || !otherCanceled) { + cancelPromise.resolve(undefined); + } + }, + errorSteps() { + reading = false; + }, + }; + readableStreamBYOBReaderRead(reader, view, readIntoRequest); + } + + function pull1Algorithm() { + if (reading) { + readAgainForBranch1 = true; + return PromiseResolve(undefined); + } + reading = true; + const byobRequest = readableByteStreamControllerGetBYOBRequest( + branch1[_controller], + ); + if (byobRequest === null) { + pullWithDefaultReader(); + } else { + pullWithBYOBReader(byobRequest[_view], false); + } + return PromiseResolve(undefined); + } + + function pull2Algorithm() { + if (reading) { + readAgainForBranch2 = true; + return PromiseResolve(undefined); + } + reading = true; + const byobRequest = readableByteStreamControllerGetBYOBRequest( + branch2[_controller], + ); + if (byobRequest === null) { + pullWithDefaultReader(); + } else { + pullWithBYOBReader(byobRequest[_view], true); + } + return PromiseResolve(undefined); + } + + function cancel1Algorithm(reason) { + canceled1 = true; + reason1 = reason; + if (canceled2) { + const compositeReason = [reason1, reason2]; + const cancelResult = readableStreamCancel(stream, compositeReason); + cancelPromise.resolve(cancelResult); + } + return cancelPromise.promise; + } + + function cancel2Algorithm(reason) { + canceled2 = true; + reason2 = reason; + if (canceled1) { + const compositeReason = [reason1, reason2]; + const cancelResult = readableStreamCancel(stream, compositeReason); + cancelPromise.resolve(cancelResult); + } + return cancelPromise.promise; + } + + function startAlgorithm() { + return undefined; + } + + branch1 = createReadableByteStream( + startAlgorithm, + pull1Algorithm, + cancel1Algorithm, + ); + branch2 = createReadableByteStream( + startAlgorithm, + pull2Algorithm, + cancel2Algorithm, + ); + forwardReaderError(reader); + return [branch1, branch2]; + } + + /** * @param {ReadableStream<ArrayBuffer>} stream * @param {ReadableByteStreamController} controller * @param {() => void} startAlgorithm @@ -1630,14 +2631,14 @@ } controller[_stream] = stream; controller[_pullAgain] = controller[_pulling] = false; - controller[_byobRequest] = undefined; + controller[_byobRequest] = null; resetQueue(controller); controller[_closeRequested] = controller[_started] = false; controller[_strategyHWM] = highWaterMark; controller[_pullAlgorithm] = pullAlgorithm; controller[_cancelAlgorithm] = cancelAlgorithm; controller[_autoAllocateChunkSize] = autoAllocateChunkSize; - // 12. Set controller.[[pendingPullIntos]] to a new empty list. + controller[_pendingPullIntos] = []; stream[_controller] = controller; const startResult = startAlgorithm(); const startPromise = resolvePromiseWith(startResult); @@ -1717,9 +2718,10 @@ }, ); } - // 3.13.27.6 Let autoAllocateChunkSize be ? GetV(underlyingByteSource, "autoAllocateChunkSize"). - /** @type {undefined} */ - const autoAllocateChunkSize = undefined; + const autoAllocateChunkSize = underlyingSourceDict["autoAllocateChunkSize"]; + if (autoAllocateChunkSize === 0) { + throw new TypeError("autoAllocateChunkSize must be greater than 0"); + } setUpReadableByteStreamController( stream, controller, @@ -1848,6 +2850,22 @@ /** * @template R + * @param {ReadableStreamBYOBReader} reader + * @param {ReadableStream<R>} stream + */ + function setUpReadableStreamBYOBReader(reader, stream) { + if (isReadableStreamLocked(stream)) { + throw new TypeError("ReadableStream is locked."); + } + if (!(stream[_controller] instanceof ReadableByteStreamController)) { + throw new TypeError("Cannot use a BYOB reader with a non-byte stream"); + } + readableStreamReaderGenericInitialize(reader, stream); + reader[_readIntoRequests] = []; + } + + /** + * @template R * @param {ReadableStreamDefaultReader<R>} reader * @param {ReadableStream<R>} stream */ @@ -3106,7 +4124,7 @@ [_detached]; /** @type {boolean} */ [_disturbed]; - /** @type {ReadableStreamDefaultReader | undefined} */ + /** @type {ReadableStreamDefaultReader | ReadableStreamBYOBReader} */ [_reader]; /** @type {"readable" | "closed" | "errored"} */ [_state]; @@ -3204,7 +4222,7 @@ /** * @param {ReadableStreamGetReaderOptions=} options - * @returns {ReadableStreamDefaultReader<R>} + * @returns {ReadableStreamDefaultReader<R> | ReadableStreamBYOBReader} */ getReader(options = {}) { webidl.assertBranded(this, ReadableStream); @@ -3213,12 +4231,12 @@ prefix, context: "Argument 1", }); - const { mode } = options; - if (mode === undefined) { + if (options.mode === undefined) { return acquireReadableStreamDefaultReader(this); + } else { + assert(options.mode === "byob"); + return acquireReadableStreamBYOBReader(this); } - // 3. Return ? AcquireReadableStreamBYOBReader(this). - throw new RangeError(`${prefix}: Unsupported mode '${mode}'`); } /** @@ -3450,10 +4468,201 @@ webidl.configurePrototype(ReadableStreamDefaultReader); + /** @template R */ + class ReadableStreamBYOBReader { + /** @type {Deferred<void>} */ + [_closedPromise]; + /** @type {ReadableStream<R> | undefined} */ + [_stream]; + /** @type {ReadIntoRequest[]} */ + [_readIntoRequests]; + + /** @param {ReadableStream<R>} stream */ + constructor(stream) { + const prefix = "Failed to construct 'ReadableStreamBYOBReader'"; + webidl.requiredArguments(arguments.length, 1, { prefix }); + stream = webidl.converters.ReadableStream(stream, { + prefix, + context: "Argument 1", + }); + this[webidl.brand] = webidl.brand; + setUpReadableStreamBYOBReader(this, stream); + } + + /** + * @param {ArrayBufferView} view + * @returns {Promise<ReadableStreamBYOBReadResult>} + */ + read(view) { + try { + webidl.assertBranded(this, ReadableStreamBYOBReader); + const prefix = "Failed to execute 'read' on 'ReadableStreamBYOBReader'"; + view = webidl.converters.ArrayBufferView(view, { + prefix, + context: "Argument 1", + }); + } catch (err) { + return PromiseReject(err); + } + + if (view.byteLength === 0) { + return PromiseReject( + new TypeError("view must have non-zero byteLength"), + ); + } + if (view.buffer.byteLength === 0) { + return PromiseReject( + new TypeError("view's buffer must have non-zero byteLength"), + ); + } + if (isDetachedBuffer(view.buffer)) { + return PromiseReject( + new TypeError("view's buffer has been detached"), + ); + } + if (this[_stream] === undefined) { + return PromiseReject( + new TypeError("Reader has no associated stream."), + ); + } + /** @type {Deferred<ReadableStreamBYOBReadResult>} */ + const promise = new Deferred(); + /** @type {ReadIntoRequest} */ + const readIntoRequest = { + chunkSteps(chunk) { + promise.resolve({ value: chunk, done: false }); + }, + closeSteps(chunk) { + promise.resolve({ value: chunk, done: true }); + }, + errorSteps(e) { + promise.reject(e); + }, + }; + readableStreamBYOBReaderRead(this, view, readIntoRequest); + return promise.promise; + } + + /** @returns {void} */ + releaseLock() { + webidl.assertBranded(this, ReadableStreamBYOBReader); + if (this[_stream] === undefined) { + return; + } + if (this[_readIntoRequests].length !== 0) { + throw new TypeError( + "There are pending read requests, so the reader cannot be released.", + ); + } + readableStreamReaderGenericRelease(this); + } + + get closed() { + try { + webidl.assertBranded(this, ReadableStreamBYOBReader); + } catch (err) { + return PromiseReject(err); + } + return this[_closedPromise].promise; + } + + /** + * @param {any} reason + * @returns {Promise<void>} + */ + cancel(reason = undefined) { + try { + webidl.assertBranded(this, ReadableStreamBYOBReader); + if (reason !== undefined) { + reason = webidl.converters.any(reason); + } + } catch (err) { + return PromiseReject(err); + } + + if (this[_stream] === undefined) { + return PromiseReject( + new TypeError("Reader has no associated stream."), + ); + } + return readableStreamReaderGenericCancel(this, reason); + } + + [SymbolFor("Deno.privateCustomInspect")](inspect) { + return `${this.constructor.name} ${inspect({ closed: this.closed })}`; + } + } + + webidl.configurePrototype(ReadableStreamBYOBReader); + + class ReadableStreamBYOBRequest { + /** @type {ReadableByteStreamController} */ + [_controller]; + /** @type {ArrayBufferView | null} */ + [_view]; + + /** @returns {ArrayBufferView | null} */ + get view() { + webidl.assertBranded(this, ReadableStreamBYOBRequest); + return this[_view]; + } + + constructor() { + webidl.illegalConstructor(); + } + + respond(bytesWritten) { + webidl.assertBranded(this, ReadableStreamBYOBRequest); + const prefix = + "Failed to execute 'respond' on 'ReadableStreamBYOBRequest'"; + webidl.requiredArguments(arguments.length, 1, { prefix }); + bytesWritten = webidl.converters["unsigned long long"](bytesWritten, { + enforceRange: true, + prefix, + context: "Argument 1", + }); + + if (this[_controller] === undefined) { + throw new TypeError("This BYOB request has been invalidated"); + } + if (isDetachedBuffer(this[_view].buffer)) { + throw new TypeError( + "The BYOB request's buffer has been detached and so cannot be used as a response", + ); + } + assert(this[_view].byteLength > 0); + assert(this[_view].buffer.byteLength > 0); + readableByteStreamControllerRespond(this[_controller], bytesWritten); + } + + respondWithNewView(view) { + webidl.assertBranded(this, ReadableStreamBYOBRequest); + const prefix = + "Failed to execute 'respondWithNewView' on 'ReadableStreamBYOBRequest'"; + webidl.requiredArguments(arguments.length, 1, { prefix }); + view = webidl.converters.ArrayBufferView(view, { + prefix, + context: "Argument 1", + }); + + if (this[_controller] === undefined) { + throw new TypeError("This BYOB request has been invalidated"); + } + if (isDetachedBuffer(view.buffer)) { + throw new TypeError( + "The given view's buffer has been detached and so cannot be used as a response", + ); + } + readableByteStreamControllerRespondWithNewView(this[_controller], view); + } + } + + webidl.configurePrototype(ReadableStreamBYOBRequest); + class ReadableByteStreamController { /** @type {number | undefined} */ [_autoAllocateChunkSize]; - /** @type {null} */ + /** @type {ReadableStreamBYOBRequest | null} */ [_byobRequest]; /** @type {(reason: any) => Promise<void>} */ [_cancelAlgorithm]; @@ -3465,6 +4674,8 @@ [_pullAlgorithm]; /** @type {boolean} */ [_pulling]; + /** @type {PullIntoDescriptor[]} */ + [_pendingPullIntos]; /** @type {ReadableByteStreamQueueEntry[]} */ [_queue]; /** @type {number} */ @@ -3480,9 +4691,10 @@ webidl.illegalConstructor(); } + /** @returns {ReadableStreamBYOBRequest | null} */ get byobRequest() { webidl.assertBranded(this, ReadableByteStreamController); - return undefined; + return readableByteStreamControllerGetBYOBRequest(this); } /** @returns {number | null} */ @@ -3570,7 +4782,7 @@ * @returns {Promise<void>} */ [_cancelSteps](reason) { - // 4.7.4. CancelStep 1. If this.[[pendingPullIntos]] is not empty, + readableByteStreamControllerClearPendingPullIntos(this); resetQueue(this); const result = this[_cancelAlgorithm](reason); readableByteStreamControllerClearAlgorithms(this); @@ -3598,8 +4810,28 @@ readRequest.chunkSteps(view); return; } - // 4. Let autoAllocateChunkSize be this.[[autoAllocateChunkSize]]. - // 5. If autoAllocateChunkSize is not undefined, + const autoAllocateChunkSize = this[_autoAllocateChunkSize]; + if (autoAllocateChunkSize !== undefined) { + let buffer; + try { + buffer = new ArrayBuffer(autoAllocateChunkSize); + } catch (e) { + readRequest.errorSteps(e); + return; + } + /** @type {PullIntoDescriptor} */ + const pullIntoDescriptor = { + buffer, + bufferByteLength: autoAllocateChunkSize, + byteOffset: 0, + byteLength: autoAllocateChunkSize, + bytesFilled: 0, + elementSize: 1, + viewConstructor: Uint8Array, + readerType: "default", + }; + ArrayPrototypePush(this[_pendingPullIntos], pullIntoDescriptor); + } readableStreamAddReadRequest(stream, readRequest); readableByteStreamControllerCallPullIfNeeded(this); } @@ -4421,6 +5653,8 @@ WritableStreamDefaultWriter, WritableStreamDefaultController, ReadableByteStreamController, + ReadableStreamBYOBReader, + ReadableStreamBYOBRequest, ReadableStreamDefaultController, TransformStreamDefaultController, }; |