diff options
Diffstat (limited to 'ext/web/06_streams.js')
-rw-r--r-- | ext/web/06_streams.js | 89 |
1 files changed, 77 insertions, 12 deletions
diff --git a/ext/web/06_streams.js b/ext/web/06_streams.js index bd3b79149..cbf781b53 100644 --- a/ext/web/06_streams.js +++ b/ext/web/06_streams.js @@ -648,29 +648,76 @@ const DEFAULT_CHUNK_SIZE = 64 * 1024; // 64 KiB /** - * @callback unrefCallback - * @param {Promise} promise - * @returns {undefined} + * Create a new ReadableStream object that is backed by a Resource that + * implements `Resource::read_return`. This object contains enough metadata to + * allow callers to bypass the JavaScript ReadableStream implementation and + * read directly from the underlying resource if they so choose (FastStream). + * + * @param {number} rid The resource ID to read from. + * @returns {ReadableStream<Uint8Array>} */ + function readableStreamForRid(rid) { + const stream = webidl.createBranded(ReadableStream); + stream[_maybeRid] = rid; + const underlyingSource = { + type: "bytes", + async pull(controller) { + const v = controller.byobRequest.view; + try { + const bytesRead = await core.read(rid, v); + if (bytesRead === 0) { + core.tryClose(rid); + controller.close(); + controller.byobRequest.respond(0); + } else { + controller.byobRequest.respond(bytesRead); + } + } catch (e) { + controller.error(e); + core.tryClose(rid); + } + }, + cancel() { + core.tryClose(rid); + }, + autoAllocateChunkSize: DEFAULT_CHUNK_SIZE, + }; + initializeReadableStream(stream); + setUpReadableByteStreamControllerFromUnderlyingSource( + stream, + underlyingSource, + underlyingSource, + 0, + ); + return stream; + } + + const promiseIdSymbol = SymbolFor("Deno.core.internalPromiseId"); + const _isUnref = Symbol("isUnref"); /** - * @param {number} rid - * @param {unrefCallback=} unrefCallback + * Create a new ReadableStream object that is backed by a Resource that + * implements `Resource::read_return`. This readable stream supports being + * refed and unrefed by calling `readableStreamForRidUnrefableRef` and + * `readableStreamForRidUnrefableUnref` on it. Unrefable streams are not + * FastStream compatible. + * + * @param {number} rid The resource ID to read from. * @returns {ReadableStream<Uint8Array>} */ - function readableStreamForRid(rid, unrefCallback) { + function readableStreamForRidUnrefable(rid) { const stream = webidl.createBranded(ReadableStream); - stream[_maybeRid] = rid; + stream[promiseIdSymbol] = undefined; + stream[_isUnref] = false; const underlyingSource = { type: "bytes", async pull(controller) { const v = controller.byobRequest.view; try { const promise = core.read(rid, v); - - unrefCallback?.(promise); - + const promiseId = stream[promiseIdSymbol] = promise[promiseIdSymbol]; + if (stream[_isUnref]) core.unrefOp(promiseId); const bytesRead = await promise; - + stream[promiseIdSymbol] = undefined; if (bytesRead === 0) { core.tryClose(rid); controller.close(); @@ -695,10 +742,25 @@ underlyingSource, 0, ); - return stream; } + function readableStreamForRidUnrefableRef(stream) { + if (!(_isUnref in stream)) throw new TypeError("Not an unrefable stream"); + stream[_isUnref] = false; + if (stream[promiseIdSymbol] !== undefined) { + core.refOp(stream[promiseIdSymbol]); + } + } + + function readableStreamForRidUnrefableUnref(stream) { + if (!(_isUnref in stream)) throw new TypeError("Not an unrefable stream"); + stream[_isUnref] = true; + if (stream[promiseIdSymbol] !== undefined) { + core.unrefOp(stream[promiseIdSymbol]); + } + } + function getReadableStreamRid(stream) { return stream[_maybeRid]; } @@ -5921,6 +5983,9 @@ readableStreamClose, readableStreamDisturb, readableStreamForRid, + readableStreamForRidUnrefable, + readableStreamForRidUnrefableRef, + readableStreamForRidUnrefableUnref, getReadableStreamRid, Deferred, // Exposed in global runtime scope |