summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ext/net/01_net.js1
-rw-r--r--ext/web/06_streams.js89
-rw-r--r--runtime/js/40_spawn.js26
3 files changed, 89 insertions, 27 deletions
diff --git a/ext/net/01_net.js b/ext/net/01_net.js
index 2c7ec0f47..04360116b 100644
--- a/ext/net/01_net.js
+++ b/ext/net/01_net.js
@@ -354,7 +354,6 @@
resolveDns,
};
window.__bootstrap.streamUtils = {
- readableStreamForRid,
writableStreamForRid,
};
})(this);
diff --git a/ext/web/06_streams.js b/ext/web/06_streams.js
index bd3b79149..cbf781b53 100644
--- a/ext/web/06_streams.js
+++ b/ext/web/06_streams.js
@@ -648,29 +648,76 @@
const DEFAULT_CHUNK_SIZE = 64 * 1024; // 64 KiB
/**
- * @callback unrefCallback
- * @param {Promise} promise
- * @returns {undefined}
+ * Create a new ReadableStream object that is backed by a Resource that
+ * implements `Resource::read_return`. This object contains enough metadata to
+ * allow callers to bypass the JavaScript ReadableStream implementation and
+ * read directly from the underlying resource if they so choose (FastStream).
+ *
+ * @param {number} rid The resource ID to read from.
+ * @returns {ReadableStream<Uint8Array>}
*/
+ function readableStreamForRid(rid) {
+ const stream = webidl.createBranded(ReadableStream);
+ stream[_maybeRid] = rid;
+ const underlyingSource = {
+ type: "bytes",
+ async pull(controller) {
+ const v = controller.byobRequest.view;
+ try {
+ const bytesRead = await core.read(rid, v);
+ if (bytesRead === 0) {
+ core.tryClose(rid);
+ controller.close();
+ controller.byobRequest.respond(0);
+ } else {
+ controller.byobRequest.respond(bytesRead);
+ }
+ } catch (e) {
+ controller.error(e);
+ core.tryClose(rid);
+ }
+ },
+ cancel() {
+ core.tryClose(rid);
+ },
+ autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
+ };
+ initializeReadableStream(stream);
+ setUpReadableByteStreamControllerFromUnderlyingSource(
+ stream,
+ underlyingSource,
+ underlyingSource,
+ 0,
+ );
+ return stream;
+ }
+
+ const promiseIdSymbol = SymbolFor("Deno.core.internalPromiseId");
+ const _isUnref = Symbol("isUnref");
/**
- * @param {number} rid
- * @param {unrefCallback=} unrefCallback
+ * Create a new ReadableStream object that is backed by a Resource that
+ * implements `Resource::read_return`. This readable stream supports being
+ * refed and unrefed by calling `readableStreamForRidUnrefableRef` and
+ * `readableStreamForRidUnrefableUnref` on it. Unrefable streams are not
+ * FastStream compatible.
+ *
+ * @param {number} rid The resource ID to read from.
* @returns {ReadableStream<Uint8Array>}
*/
- function readableStreamForRid(rid, unrefCallback) {
+ function readableStreamForRidUnrefable(rid) {
const stream = webidl.createBranded(ReadableStream);
- stream[_maybeRid] = rid;
+ stream[promiseIdSymbol] = undefined;
+ stream[_isUnref] = false;
const underlyingSource = {
type: "bytes",
async pull(controller) {
const v = controller.byobRequest.view;
try {
const promise = core.read(rid, v);
-
- unrefCallback?.(promise);
-
+ const promiseId = stream[promiseIdSymbol] = promise[promiseIdSymbol];
+ if (stream[_isUnref]) core.unrefOp(promiseId);
const bytesRead = await promise;
-
+ stream[promiseIdSymbol] = undefined;
if (bytesRead === 0) {
core.tryClose(rid);
controller.close();
@@ -695,10 +742,25 @@
underlyingSource,
0,
);
-
return stream;
}
+ function readableStreamForRidUnrefableRef(stream) {
+ if (!(_isUnref in stream)) throw new TypeError("Not an unrefable stream");
+ stream[_isUnref] = false;
+ if (stream[promiseIdSymbol] !== undefined) {
+ core.refOp(stream[promiseIdSymbol]);
+ }
+ }
+
+ function readableStreamForRidUnrefableUnref(stream) {
+ if (!(_isUnref in stream)) throw new TypeError("Not an unrefable stream");
+ stream[_isUnref] = true;
+ if (stream[promiseIdSymbol] !== undefined) {
+ core.unrefOp(stream[promiseIdSymbol]);
+ }
+ }
+
function getReadableStreamRid(stream) {
return stream[_maybeRid];
}
@@ -5921,6 +5983,9 @@
readableStreamClose,
readableStreamDisturb,
readableStreamForRid,
+ readableStreamForRidUnrefable,
+ readableStreamForRidUnrefableRef,
+ readableStreamForRidUnrefableUnref,
getReadableStreamRid,
Deferred,
// Exposed in global runtime scope
diff --git a/runtime/js/40_spawn.js b/runtime/js/40_spawn.js
index daa4f8ff8..99661bf1a 100644
--- a/runtime/js/40_spawn.js
+++ b/runtime/js/40_spawn.js
@@ -16,8 +16,12 @@
PromiseAll,
SymbolFor,
} = window.__bootstrap.primordials;
- const { readableStreamForRid, writableStreamForRid } =
- window.__bootstrap.streamUtils;
+ const {
+ readableStreamForRidUnrefable,
+ readableStreamForRidUnrefableRef,
+ readableStreamForRidUnrefableUnref,
+ } = window.__bootstrap.streams;
+ const { writableStreamForRid } = window.__bootstrap.streamUtils;
const promiseIdSymbol = SymbolFor("Deno.core.internalPromiseId");
@@ -136,18 +140,12 @@
if (stdoutRid !== null) {
this.#stdoutRid = stdoutRid;
- this.#stdout = readableStreamForRid(stdoutRid, (promise) => {
- this.#stdoutPromiseId = promise[promiseIdSymbol];
- if (this.#unrefed) core.unrefOp(this.#stdoutPromiseId);
- });
+ this.#stdout = readableStreamForRidUnrefable(stdoutRid);
}
if (stderrRid !== null) {
this.#stderrRid = stderrRid;
- this.#stderr = readableStreamForRid(stderrRid, (promise) => {
- this.#stderrPromiseId = promise[promiseIdSymbol];
- if (this.#unrefed) core.unrefOp(this.#stderrPromiseId);
- });
+ this.#stderr = readableStreamForRidUnrefable(stderrRid);
}
const onAbort = () => this.kill("SIGTERM");
@@ -214,15 +212,15 @@
ref() {
this.#unrefed = false;
core.refOp(this.#waitPromiseId);
- if (this.#stdoutPromiseId) core.refOp(this.#stdoutPromiseId);
- if (this.#stderrPromiseId) core.refOp(this.#stderrPromiseId);
+ if (this.#stdout) readableStreamForRidUnrefableRef(this.#stdout);
+ if (this.#stderr) readableStreamForRidUnrefableRef(this.#stderr);
}
unref() {
this.#unrefed = true;
core.unrefOp(this.#waitPromiseId);
- if (this.#stdoutPromiseId) core.unrefOp(this.#stdoutPromiseId);
- if (this.#stderrPromiseId) core.unrefOp(this.#stderrPromiseId);
+ if (this.#stdout) readableStreamForRidUnrefableUnref(this.#stdout);
+ if (this.#stderr) readableStreamForRidUnrefableUnref(this.#stderr);
}
}