summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/01_core.js1
-rw-r--r--core/examples/http_bench_json_ops.js2
-rw-r--r--core/lib.deno_core.d.ts5
-rw-r--r--core/ops_builtin.rs13
-rw-r--r--ext/cache/01_cache.js2
-rw-r--r--ext/fetch/26_fetch.js2
-rw-r--r--ext/net/01_net.js39
-rw-r--r--ext/web/06_streams.js58
-rw-r--r--runtime/js/40_files.js4
-rw-r--r--runtime/js/40_spawn.js2
10 files changed, 85 insertions, 43 deletions
diff --git a/core/01_core.js b/core/01_core.js
index 655b4219e..b98e54160 100644
--- a/core/01_core.js
+++ b/core/01_core.js
@@ -329,6 +329,7 @@
tryClose: (rid) => ops.op_try_close(rid),
read: opAsync.bind(null, "op_read"),
write: opAsync.bind(null, "op_write"),
+ writeAll: opAsync.bind(null, "op_write_all"),
shutdown: opAsync.bind(null, "op_shutdown"),
print: (msg, isErr) => ops.op_print(msg, isErr),
setMacrotaskCallback: (fn) => ops.op_set_macrotask_callback(fn),
diff --git a/core/examples/http_bench_json_ops.js b/core/examples/http_bench_json_ops.js
index cea344987..98b2f4ef8 100644
--- a/core/examples/http_bench_json_ops.js
+++ b/core/examples/http_bench_json_ops.js
@@ -23,7 +23,7 @@ async function serve(rid) {
try {
while (true) {
await Deno.core.read(rid, requestBuf);
- await Deno.core.write(rid, responseBuf);
+ await Deno.core.writeAll(rid, responseBuf);
}
} catch (e) {
if (
diff --git a/core/lib.deno_core.d.ts b/core/lib.deno_core.d.ts
index 7e46d0f14..2a3764730 100644
--- a/core/lib.deno_core.d.ts
+++ b/core/lib.deno_core.d.ts
@@ -62,6 +62,11 @@ declare namespace Deno {
function write(rid: number, buf: Uint8Array): Promise<number>;
/**
+ * Write to a (stream) resource that implements write()
+ */
+ function writeAll(rid: number, buf: Uint8Array): Promise<void>;
+
+ /**
* Print a message to stdout or stderr
*/
function print(message: string, is_err?: boolean): void;
diff --git a/core/ops_builtin.rs b/core/ops_builtin.rs
index 41741bf28..3fc9d62d6 100644
--- a/core/ops_builtin.rs
+++ b/core/ops_builtin.rs
@@ -38,6 +38,7 @@ pub(crate) fn init_builtins() -> Extension {
op_read::decl(),
op_read_all::decl(),
op_write::decl(),
+ op_write_all::decl(),
op_shutdown::decl(),
op_metrics::decl(),
op_format_file_name::decl(),
@@ -254,6 +255,18 @@ async fn op_write(
}
#[op]
+async fn op_write_all(
+ state: Rc<RefCell<OpState>>,
+ rid: ResourceId,
+ buf: ZeroCopyBuf,
+) -> Result<(), Error> {
+ let resource = state.borrow().resource_table.get_any(rid)?;
+ let view = BufView::from(buf);
+ resource.write_all(view).await?;
+ Ok(())
+}
+
+#[op]
async fn op_shutdown(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
diff --git a/ext/cache/01_cache.js b/ext/cache/01_cache.js
index fa0b68037..afde7f789 100644
--- a/ext/cache/01_cache.js
+++ b/ext/cache/01_cache.js
@@ -148,7 +148,7 @@
await core.shutdown(rid);
break;
}
- await core.write(rid, value);
+ await core.writeAll(rid, value);
}
} finally {
core.close(rid);
diff --git a/ext/fetch/26_fetch.js b/ext/fetch/26_fetch.js
index 169db2bbf..5c824898d 100644
--- a/ext/fetch/26_fetch.js
+++ b/ext/fetch/26_fetch.js
@@ -225,7 +225,7 @@
}
try {
await PromisePrototypeCatch(
- core.write(requestBodyRid, value),
+ core.writeAll(requestBodyRid, value),
(err) => {
if (terminator.aborted) return;
throw err;
diff --git a/ext/net/01_net.js b/ext/net/01_net.js
index 04360116b..d7a093ba6 100644
--- a/ext/net/01_net.js
+++ b/ext/net/01_net.js
@@ -4,7 +4,8 @@
((window) => {
const core = window.Deno.core;
const { BadResourcePrototype, InterruptedPrototype, ops } = core;
- const { WritableStream, readableStreamForRid } = window.__bootstrap.streams;
+ const { readableStreamForRid, writableStreamForRid } =
+ window.__bootstrap.streams;
const {
Error,
ObjectPrototypeIsPrototypeOf,
@@ -65,39 +66,6 @@
return core.opAsync("op_dns_resolve", { query, recordType, options });
}
- function tryClose(rid) {
- try {
- core.close(rid);
- } catch {
- // Ignore errors
- }
- }
-
- function writableStreamForRid(rid) {
- return new WritableStream({
- async write(chunk, controller) {
- try {
- let nwritten = 0;
- while (nwritten < chunk.length) {
- nwritten += await write(
- rid,
- TypedArrayPrototypeSubarray(chunk, nwritten),
- );
- }
- } catch (e) {
- controller.error(e);
- tryClose(rid);
- }
- },
- close() {
- tryClose(rid);
- },
- abort() {
- tryClose(rid);
- },
- });
- }
-
class Conn {
#rid = 0;
#remoteAddr = null;
@@ -353,7 +321,4 @@
Datagram,
resolveDns,
};
- window.__bootstrap.streamUtils = {
- writableStreamForRid,
- };
})(this);
diff --git a/ext/web/06_streams.js b/ext/web/06_streams.js
index 76e31503f..09e5b7414 100644
--- a/ext/web/06_streams.js
+++ b/ext/web/06_streams.js
@@ -826,6 +826,62 @@
return finalBuffer;
}
+ /**
+ * Create a new Writable object that is backed by a Resource that implements
+ * `Resource::write` / `Resource::write_all`. This object contains enough
+ * metadata to allow callers to bypass the JavaScript WritableStream
+ * implementation and write directly to the underlying resource if they so
+ * choose (FastStream).
+ *
+ * @param {number} rid The resource ID to write to.
+ * @param {boolean=} autoClose If the resource should be auto-closed when the stream closes. Defaults to true.
+ * @returns {ReadableStream<Uint8Array>}
+ */
+ function writableStreamForRid(rid, autoClose = true) {
+ const stream = webidl.createBranded(WritableStream);
+ stream[_resourceBacking] = { rid, autoClose };
+
+ const tryClose = () => {
+ if (!autoClose) return;
+ RESOURCE_REGISTRY.unregister(stream);
+ core.tryClose(rid);
+ };
+
+ if (autoClose) {
+ RESOURCE_REGISTRY.register(stream, rid, stream);
+ }
+
+ const underlyingSink = {
+ async write(chunk, controller) {
+ try {
+ await core.writeAll(rid, chunk);
+ } catch (e) {
+ controller.error(e);
+ tryClose();
+ }
+ },
+ close() {
+ tryClose();
+ },
+ abort() {
+ tryClose();
+ },
+ };
+ initializeWritableStream(stream);
+ setUpWritableStreamDefaultControllerFromUnderlyingSink(
+ stream,
+ underlyingSink,
+ underlyingSink,
+ 1,
+ () => 1,
+ );
+ return stream;
+ }
+
+ function getWritableStreamResourceBacking(stream) {
+ return stream[_resourceBacking];
+ }
+
/*
* @param {ReadableStream} stream
*/
@@ -6059,6 +6115,8 @@
readableStreamForRidUnrefableUnref,
readableStreamThrowIfErrored,
getReadableStreamResourceBacking,
+ writableStreamForRid,
+ getWritableStreamResourceBacking,
Deferred,
// Exposed in global runtime scope
ByteLengthQueuingStrategy,
diff --git a/runtime/js/40_files.js b/runtime/js/40_files.js
index c864d3970..226abb33e 100644
--- a/runtime/js/40_files.js
+++ b/runtime/js/40_files.js
@@ -7,8 +7,8 @@
const { read, readSync, write, writeSync } = window.__bootstrap.io;
const { ftruncate, ftruncateSync, fstat, fstatSync } = window.__bootstrap.fs;
const { pathFromURL } = window.__bootstrap.util;
- const { writableStreamForRid } = window.__bootstrap.streamUtils;
- const { readableStreamForRid } = window.__bootstrap.streams;
+ const { readableStreamForRid, writableStreamForRid } =
+ window.__bootstrap.streams;
const {
ArrayPrototypeFilter,
Error,
diff --git a/runtime/js/40_spawn.js b/runtime/js/40_spawn.js
index 99661bf1a..a0283f0ff 100644
--- a/runtime/js/40_spawn.js
+++ b/runtime/js/40_spawn.js
@@ -20,8 +20,8 @@
readableStreamForRidUnrefable,
readableStreamForRidUnrefableRef,
readableStreamForRidUnrefableUnref,
+ writableStreamForRid,
} = window.__bootstrap.streams;
- const { writableStreamForRid } = window.__bootstrap.streamUtils;
const promiseIdSymbol = SymbolFor("Deno.core.internalPromiseId");