summaryrefslogtreecommitdiff
path: root/ext
diff options
context:
space:
mode:
authorDivy Srivastava <dj.srivastava23@gmail.com>2022-04-22 16:19:08 +0530
committerGitHub <noreply@github.com>2022-04-22 16:19:08 +0530
commit57f7e07c13a1a692602022af4fc32c6ac352bb72 (patch)
treee2b6bf271ebda2aecf158b25c9d6c466461549dd /ext
parent2724235ec798f1fbf8fb5bd291615987ac4b919e (diff)
Reland "perf(http): optimize ReadableStreams backed by a resource" (#14346)
Diffstat (limited to 'ext')
-rw-r--r--ext/fetch/lib.rs7
-rw-r--r--ext/http/01_http.js60
-rw-r--r--ext/http/lib.rs58
-rw-r--r--ext/net/01_net.js30
-rw-r--r--ext/net/io.rs16
-rw-r--r--ext/net/ops_tls.rs9
-rw-r--r--ext/web/06_streams.js42
7 files changed, 167 insertions, 55 deletions
diff --git a/ext/fetch/lib.rs b/ext/fetch/lib.rs
index c216d53fa..def823d8f 100644
--- a/ext/fetch/lib.rs
+++ b/ext/fetch/lib.rs
@@ -485,12 +485,15 @@ impl Resource for FetchResponseBodyResource {
"fetchResponseBody".into()
}
- fn read(self: Rc<Self>, mut buf: ZeroCopyBuf) -> AsyncResult<usize> {
+ fn read_return(
+ self: Rc<Self>,
+ mut buf: ZeroCopyBuf,
+ ) -> AsyncResult<(usize, ZeroCopyBuf)> {
Box::pin(async move {
let mut reader = RcRef::map(&self, |r| &r.reader).borrow_mut().await;
let cancel = RcRef::map(self, |r| &r.cancel);
let read = reader.read(&mut buf).try_or_cancel(cancel).await?;
- Ok(read)
+ Ok((read, buf))
})
}
diff --git a/ext/http/01_http.js b/ext/http/01_http.js
index 217bfc061..152241522 100644
--- a/ext/http/01_http.js
+++ b/ext/http/01_http.js
@@ -32,7 +32,8 @@
} = window.__bootstrap.webSocket;
const { TcpConn, UnixConn } = window.__bootstrap.net;
const { TlsConn } = window.__bootstrap.tls;
- const { Deferred } = window.__bootstrap.streams;
+ const { Deferred, getReadableStreamRid, readableStreamClose } =
+ window.__bootstrap.streams;
const {
ArrayPrototypeIncludes,
ArrayPrototypePush,
@@ -235,7 +236,6 @@
typeof respBody === "string" ||
ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, respBody)
);
-
try {
await core.opAsync(
"op_http_write_headers",
@@ -269,16 +269,19 @@
) {
throw new TypeError("Unreachable");
}
- const reader = respBody.getReader();
- while (true) {
- const { value, done } = await reader.read();
- if (done) break;
- if (!ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, value)) {
- await reader.cancel(new TypeError("Value not a Uint8Array"));
- break;
+ const resourceRid = getReadableStreamRid(respBody);
+ if (resourceRid) {
+ if (respBody.locked) {
+ throw new TypeError("ReadableStream is locked.");
}
+ const reader = respBody.getReader(); // Aquire JS lock.
try {
- await core.opAsync("op_http_write", streamRid, value);
+ await core.opAsync(
+ "op_http_write_resource",
+ streamRid,
+ resourceRid,
+ );
+ readableStreamClose(respBody); // Release JS lock.
} catch (error) {
const connError = httpConn[connErrorSymbol];
if (
@@ -291,12 +294,37 @@
await reader.cancel(error);
throw error;
}
- }
- try {
- await core.opAsync("op_http_shutdown", streamRid);
- } catch (error) {
- await reader.cancel(error);
- throw error;
+ } else {
+ const reader = respBody.getReader();
+ while (true) {
+ const { value, done } = await reader.read();
+ if (done) break;
+ if (!ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, value)) {
+ await reader.cancel(new TypeError("Value not a Uint8Array"));
+ break;
+ }
+ try {
+ await core.opAsync("op_http_write", streamRid, value);
+ } catch (error) {
+ const connError = httpConn[connErrorSymbol];
+ if (
+ ObjectPrototypeIsPrototypeOf(BadResourcePrototype, error) &&
+ connError != null
+ ) {
+ // deno-lint-ignore no-ex-assign
+ error = new connError.constructor(connError.message);
+ }
+ await reader.cancel(error);
+ throw error;
+ }
+ }
+
+ try {
+ await core.opAsync("op_http_shutdown", streamRid);
+ } catch (error) {
+ await reader.cancel(error);
+ throw error;
+ }
}
}
diff --git a/ext/http/lib.rs b/ext/http/lib.rs
index 9c1b48fff..7fc90843f 100644
--- a/ext/http/lib.rs
+++ b/ext/http/lib.rs
@@ -77,6 +77,7 @@ pub fn init() -> Extension {
op_http_read::decl(),
op_http_write_headers::decl(),
op_http_write::decl(),
+ op_http_write_resource::decl(),
op_http_shutdown::decl(),
op_http_websocket_accept_header::decl(),
op_http_upgrade_websocket::decl(),
@@ -684,6 +685,63 @@ async fn op_http_write_headers(
}
#[op]
+async fn op_http_write_resource(
+ state: Rc<RefCell<OpState>>,
+ rid: ResourceId,
+ stream: ResourceId,
+) -> Result<(), AnyError> {
+ let http_stream = state
+ .borrow()
+ .resource_table
+ .get::<HttpStreamResource>(rid)?;
+ let mut wr = RcRef::map(&http_stream, |r| &r.wr).borrow_mut().await;
+ let resource = state.borrow().resource_table.get_any(stream)?;
+ loop {
+ let body_writer = match &mut *wr {
+ HttpResponseWriter::Body(body_writer) => body_writer,
+ HttpResponseWriter::Headers(_) => {
+ return Err(http_error("no response headers"))
+ }
+ HttpResponseWriter::Closed => {
+ return Err(http_error("response already completed"))
+ }
+ };
+
+ let vec = vec![0u8; 64 * 1024]; // 64KB
+ let buf = ZeroCopyBuf::new_temp(vec);
+ let (nread, buf) = resource.clone().read_return(buf).await?;
+ if nread == 0 {
+ break;
+ }
+ match body_writer.write_all(&buf[..nread]).await {
+ Ok(_) => {}
+ Err(err) => {
+ assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
+ // Don't return "broken pipe", that's an implementation detail.
+ // Pull up the failure associated with the transport connection instead.
+ http_stream.conn.closed().await?;
+ // If there was no connection error, drop body_tx.
+ *wr = HttpResponseWriter::Closed;
+ }
+ }
+ }
+
+ let wr = take(&mut *wr);
+ if let HttpResponseWriter::Body(mut body_writer) = wr {
+ match body_writer.shutdown().await {
+ Ok(_) => {}
+ Err(err) => {
+ assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
+ // Don't return "broken pipe", that's an implementation detail.
+ // Pull up the failure associated with the transport connection instead.
+ http_stream.conn.closed().await?;
+ }
+ }
+ }
+ Ok(())
+}
+
+#[op]
async fn op_http_write(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
diff --git a/ext/net/01_net.js b/ext/net/01_net.js
index 48cbfaaab..fde75fe56 100644
--- a/ext/net/01_net.js
+++ b/ext/net/01_net.js
@@ -4,7 +4,7 @@
((window) => {
const core = window.Deno.core;
const { BadResourcePrototype, InterruptedPrototype } = core;
- const { ReadableStream, WritableStream } = window.__bootstrap.streams;
+ const { WritableStream, readableStreamForRid } = window.__bootstrap.streams;
const {
Error,
ObjectPrototypeIsPrototypeOf,
@@ -65,8 +65,6 @@
return core.opAsync("op_dns_resolve", { query, recordType, options });
}
- const DEFAULT_CHUNK_SIZE = 64 * 1024;
-
function tryClose(rid) {
try {
core.close(rid);
@@ -75,32 +73,6 @@
}
}
- function readableStreamForRid(rid) {
- return new ReadableStream({
- type: "bytes",
- async pull(controller) {
- const v = controller.byobRequest.view;
- try {
- const bytesRead = await read(rid, v);
- if (bytesRead === null) {
- tryClose(rid);
- controller.close();
- controller.byobRequest.respond(0);
- } else {
- controller.byobRequest.respond(bytesRead);
- }
- } catch (e) {
- controller.error(e);
- tryClose(rid);
- }
- },
- cancel() {
- tryClose(rid);
- },
- autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
- });
- }
-
function writableStreamForRid(rid) {
return new WritableStream({
async write(chunk, controller) {
diff --git a/ext/net/io.rs b/ext/net/io.rs
index 17b86af17..02caf7473 100644
--- a/ext/net/io.rs
+++ b/ext/net/io.rs
@@ -70,13 +70,13 @@ where
pub async fn read(
self: Rc<Self>,
mut buf: ZeroCopyBuf,
- ) -> Result<usize, AnyError> {
+ ) -> Result<(usize, ZeroCopyBuf), AnyError> {
let mut rd = self.rd_borrow_mut().await;
let nread = rd
.read(&mut buf)
.try_or_cancel(self.cancel_handle())
.await?;
- Ok(nread)
+ Ok((nread, buf))
}
pub async fn write(
@@ -103,7 +103,10 @@ impl Resource for TcpStreamResource {
"tcpStream".into()
}
- fn read(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> {
+ fn read_return(
+ self: Rc<Self>,
+ buf: ZeroCopyBuf,
+ ) -> AsyncResult<(usize, ZeroCopyBuf)> {
Box::pin(self.read(buf))
}
@@ -160,7 +163,7 @@ impl UnixStreamResource {
pub async fn read(
self: Rc<Self>,
_buf: ZeroCopyBuf,
- ) -> Result<usize, AnyError> {
+ ) -> Result<(usize, ZeroCopyBuf), AnyError> {
unreachable!()
}
pub async fn write(
@@ -182,7 +185,10 @@ impl Resource for UnixStreamResource {
"unixStream".into()
}
- fn read(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> {
+ fn read_return(
+ self: Rc<Self>,
+ buf: ZeroCopyBuf,
+ ) -> AsyncResult<(usize, ZeroCopyBuf)> {
Box::pin(self.read(buf))
}
diff --git a/ext/net/ops_tls.rs b/ext/net/ops_tls.rs
index d6b83e6e8..ca922203c 100644
--- a/ext/net/ops_tls.rs
+++ b/ext/net/ops_tls.rs
@@ -674,11 +674,11 @@ impl TlsStreamResource {
pub async fn read(
self: Rc<Self>,
mut buf: ZeroCopyBuf,
- ) -> Result<usize, AnyError> {
+ ) -> Result<(usize, ZeroCopyBuf), AnyError> {
let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await;
let cancel_handle = RcRef::map(&self, |r| &r.cancel_handle);
let nread = rd.read(&mut buf).try_or_cancel(cancel_handle).await?;
- Ok(nread)
+ Ok((nread, buf))
}
pub async fn write(
@@ -722,7 +722,10 @@ impl Resource for TlsStreamResource {
"tlsStream".into()
}
- fn read(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> {
+ fn read_return(
+ self: Rc<Self>,
+ buf: ZeroCopyBuf,
+ ) -> AsyncResult<(usize, ZeroCopyBuf)> {
Box::pin(self.read(buf))
}
diff --git a/ext/web/06_streams.js b/ext/web/06_streams.js
index 6daea0898..492694563 100644
--- a/ext/web/06_streams.js
+++ b/ext/web/06_streams.js
@@ -8,6 +8,7 @@
"use strict";
((window) => {
+ const core = window.Deno.core;
const webidl = window.__bootstrap.webidl;
const { add, remove, signalAbort, newSignal, AbortSignalPrototype } =
window.__bootstrap.abortSignal;
@@ -640,6 +641,41 @@
return stream[_disturbed];
}
+ const DEFAULT_CHUNK_SIZE = 64 * 1024; // 64 KiB
+
+ function readableStreamForRid(rid) {
+ const stream = new ReadableStream({
+ type: "bytes",
+ async pull(controller) {
+ const v = controller.byobRequest.view;
+ try {
+ const bytesRead = await core.read(rid, v);
+ if (bytesRead === 0) {
+ core.tryClose(rid);
+ controller.close();
+ controller.byobRequest.respond(0);
+ } else {
+ controller.byobRequest.respond(bytesRead);
+ }
+ } catch (e) {
+ controller.error(e);
+ core.tryClose(rid);
+ }
+ },
+ cancel() {
+ core.tryClose(rid);
+ },
+ autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
+ });
+
+ stream[_maybeRid] = rid;
+ return stream;
+ }
+
+ function getReadableStreamRid(stream) {
+ return stream[_maybeRid];
+ }
+
/**
* @param {unknown} value
* @returns {value is WritableStream}
@@ -4288,6 +4324,7 @@
WeakMapPrototypeSet(countSizeFunctionWeakMap, globalObject, size);
}
+ const _maybeRid = Symbol("[[maybeRid]]");
/** @template R */
class ReadableStream {
/** @type {ReadableStreamDefaultController | ReadableByteStreamController} */
@@ -4302,6 +4339,8 @@
[_state];
/** @type {any} */
[_storedError];
+ /** @type {number | null} */
+ [_maybeRid] = null;
/**
* @param {UnderlyingSource<R>=} underlyingSource
@@ -5840,6 +5879,9 @@
errorReadableStream,
createProxy,
writableStreamClose,
+ readableStreamClose,
+ readableStreamForRid,
+ getReadableStreamRid,
Deferred,
// Exposed in global runtime scope
ByteLengthQueuingStrategy,