summaryrefslogtreecommitdiff
path: root/ext/web/06_streams.js
diff options
context:
space:
mode:
Diffstat (limited to 'ext/web/06_streams.js')
-rw-r--r--ext/web/06_streams.js89
1 files changed, 77 insertions, 12 deletions
diff --git a/ext/web/06_streams.js b/ext/web/06_streams.js
index bd3b79149..cbf781b53 100644
--- a/ext/web/06_streams.js
+++ b/ext/web/06_streams.js
@@ -648,29 +648,76 @@
const DEFAULT_CHUNK_SIZE = 64 * 1024; // 64 KiB
/**
- * @callback unrefCallback
- * @param {Promise} promise
- * @returns {undefined}
+ * Create a new ReadableStream object that is backed by a Resource that
+ * implements `Resource::read_return`. This object contains enough metadata to
+ * allow callers to bypass the JavaScript ReadableStream implementation and
+ * read directly from the underlying resource if they so choose (FastStream).
+ *
+ * @param {number} rid The resource ID to read from.
+ * @returns {ReadableStream<Uint8Array>}
*/
+ function readableStreamForRid(rid) {
+ const stream = webidl.createBranded(ReadableStream);
+ stream[_maybeRid] = rid;
+ const underlyingSource = {
+ type: "bytes",
+ async pull(controller) {
+ const v = controller.byobRequest.view;
+ try {
+ const bytesRead = await core.read(rid, v);
+ if (bytesRead === 0) {
+ core.tryClose(rid);
+ controller.close();
+ controller.byobRequest.respond(0);
+ } else {
+ controller.byobRequest.respond(bytesRead);
+ }
+ } catch (e) {
+ controller.error(e);
+ core.tryClose(rid);
+ }
+ },
+ cancel() {
+ core.tryClose(rid);
+ },
+ autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
+ };
+ initializeReadableStream(stream);
+ setUpReadableByteStreamControllerFromUnderlyingSource(
+ stream,
+ underlyingSource,
+ underlyingSource,
+ 0,
+ );
+ return stream;
+ }
+
+ const promiseIdSymbol = SymbolFor("Deno.core.internalPromiseId");
+ const _isUnref = Symbol("isUnref");
/**
- * @param {number} rid
- * @param {unrefCallback=} unrefCallback
+ * Create a new ReadableStream object that is backed by a Resource that
+ * implements `Resource::read_return`. This readable stream supports being
+ * refed and unrefed by calling `readableStreamForRidUnrefableRef` and
+ * `readableStreamForRidUnrefableUnref` on it. Unrefable streams are not
+ * FastStream compatible.
+ *
+ * @param {number} rid The resource ID to read from.
* @returns {ReadableStream<Uint8Array>}
*/
- function readableStreamForRid(rid, unrefCallback) {
+ function readableStreamForRidUnrefable(rid) {
const stream = webidl.createBranded(ReadableStream);
- stream[_maybeRid] = rid;
+ stream[promiseIdSymbol] = undefined;
+ stream[_isUnref] = false;
const underlyingSource = {
type: "bytes",
async pull(controller) {
const v = controller.byobRequest.view;
try {
const promise = core.read(rid, v);
-
- unrefCallback?.(promise);
-
+ const promiseId = stream[promiseIdSymbol] = promise[promiseIdSymbol];
+ if (stream[_isUnref]) core.unrefOp(promiseId);
const bytesRead = await promise;
-
+ stream[promiseIdSymbol] = undefined;
if (bytesRead === 0) {
core.tryClose(rid);
controller.close();
@@ -695,10 +742,25 @@
underlyingSource,
0,
);
-
return stream;
}
+ function readableStreamForRidUnrefableRef(stream) {
+ if (!(_isUnref in stream)) throw new TypeError("Not an unrefable stream");
+ stream[_isUnref] = false;
+ if (stream[promiseIdSymbol] !== undefined) {
+ core.refOp(stream[promiseIdSymbol]);
+ }
+ }
+
+ function readableStreamForRidUnrefableUnref(stream) {
+ if (!(_isUnref in stream)) throw new TypeError("Not an unrefable stream");
+ stream[_isUnref] = true;
+ if (stream[promiseIdSymbol] !== undefined) {
+ core.unrefOp(stream[promiseIdSymbol]);
+ }
+ }
+
function getReadableStreamRid(stream) {
return stream[_maybeRid];
}
@@ -5921,6 +5983,9 @@
readableStreamClose,
readableStreamDisturb,
readableStreamForRid,
+ readableStreamForRidUnrefable,
+ readableStreamForRidUnrefableRef,
+ readableStreamForRidUnrefableUnref,
getReadableStreamRid,
Deferred,
// Exposed in global runtime scope