summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDivy Srivastava <dj.srivastava23@gmail.com>2022-10-19 16:41:47 +0530
committerGitHub <noreply@github.com>2022-10-19 16:41:47 +0530
commit743fcc0668f553c7902c44b1f6a484db42c1cfd0 (patch)
treeebd71545dbb0f3402ac50fc663ed04c73db3a50b
parent36307c45e95b599eb01bf53df161973a7ef8b58e (diff)
perf(ext/flash): optimize path response streams (#16284)
Regression caused by https://github.com/denoland/deno/pull/15591
-rw-r--r--ext/flash/01_http.js40
-rw-r--r--ext/flash/lib.rs52
2 files changed, 75 insertions, 17 deletions
diff --git a/ext/flash/01_http.js b/ext/flash/01_http.js
index 17f99ca98..c7dce421d 100644
--- a/ext/flash/01_http.js
+++ b/ext/flash/01_http.js
@@ -365,6 +365,8 @@
}
} else {
const reader = respBody.getReader();
+ const { value, done } = await reader.read();
+ // Best case: sends headers + first chunk in a single go.
writeFixedResponse(
serverId,
i,
@@ -379,14 +381,21 @@
false,
respondFast,
);
- while (true) {
- const { value, done } = await reader.read();
- await respondChunked(
- i,
- value,
- done,
- );
- if (done) break;
+ await respondChunked(
+ i,
+ value,
+ done,
+ );
+ if (!done) {
+ while (true) {
+ const chunk = await reader.read();
+ await respondChunked(
+ i,
+ chunk.value,
+ chunk.done,
+ );
+ if (chunk.done) break;
+ }
}
}
}
@@ -591,13 +600,22 @@
});
function respondChunked(token, chunk, shutdown) {
- return core.opAsync(
- "op_flash_respond_chuncked",
+ const nwritten = core.ops.op_try_flash_respond_chuncked(
serverId,
token,
- chunk,
+ chunk ?? new Uint8Array(),
shutdown,
);
+ if (nwritten > 0) {
+ return core.opAsync(
+ "op_flash_respond_chuncked",
+ serverId,
+ token,
+ chunk,
+ shutdown,
+ nwritten,
+ );
+ }
}
const fastOp = prepareFastCalls();
diff --git a/ext/flash/lib.rs b/ext/flash/lib.rs
index f9ce1c744..17e3e8317 100644
--- a/ext/flash/lib.rs
+++ b/ext/flash/lib.rs
@@ -150,6 +150,34 @@ async fn op_flash_respond_async(
Ok(())
}
+#[op(fast)]
+fn op_try_flash_respond_chuncked(
+ op_state: &mut OpState,
+ server_id: u32,
+ token: u32,
+ response: &[u8],
+ shutdown: bool,
+) -> u32 {
+ let flash_ctx = op_state.borrow_mut::<FlashContext>();
+ let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
+ let tx = ctx.requests.get(&token).unwrap();
+ let sock = tx.socket();
+
+ // TODO(@littledivy): Use writev when `UnixIoSlice` lands.
+ // https://github.com/denoland/deno/pull/15629
+ let h = format!("{:x}\r\n", response.len());
+ let concat = [h.as_bytes(), response, b"\r\n"].concat();
+ let expected = sock.try_write(&concat);
+ if expected != concat.len() {
+ return expected as u32;
+ }
+ if shutdown {
+ // Best case: We've written everything and the stream is done too.
+ let _ = ctx.requests.remove(&token).unwrap();
+ }
+ 0
+}
+
#[op]
async fn op_flash_respond_chuncked(
op_state: Rc<RefCell<OpState>>,
@@ -157,6 +185,7 @@ async fn op_flash_respond_chuncked(
token: u32,
response: Option<ZeroCopyBuf>,
shutdown: bool,
+ nwritten: u32,
) -> Result<(), AnyError> {
let mut op_state = op_state.borrow_mut();
let flash_ctx = op_state.borrow_mut::<FlashContext>();
@@ -178,17 +207,27 @@ async fn op_flash_respond_chuncked(
.with_async_stream(|stream| {
Box::pin(async move {
use tokio::io::AsyncWriteExt;
+ // TODO(@littledivy): Use writev when `UnixIoSlice` lands.
+ // https://github.com/denoland/deno/pull/15629
+ macro_rules! write_whats_not_written {
+ ($e:expr) => {
+ let e = $e;
+ let n = nwritten as usize;
+ if n < e.len() {
+ stream.write_all(&e[n..]).await?;
+ }
+ };
+ }
if let Some(response) = response {
- stream
- .write_all(format!("{:x}\r\n", response.len()).as_bytes())
- .await?;
- stream.write_all(&response).await?;
- stream.write_all(b"\r\n").await?;
+ let h = format!("{:x}\r\n", response.len());
+ write_whats_not_written!(h.as_bytes());
+ write_whats_not_written!(&response);
+ write_whats_not_written!(b"\r\n");
}
// The last chunk
if shutdown {
- stream.write_all(b"0\r\n\r\n").await?;
+ write_whats_not_written!(b"0\r\n\r\n");
}
Ok(())
@@ -1451,6 +1490,7 @@ pub fn init<P: FlashPermissions + 'static>(unstable: bool) -> Extension {
op_flash_respond::decl(),
op_flash_respond_async::decl(),
op_flash_respond_chuncked::decl(),
+ op_try_flash_respond_chuncked::decl(),
op_flash_method::decl(),
op_flash_path::decl(),
op_flash_headers::decl(),