diff options
author | Divy Srivastava <dj.srivastava23@gmail.com> | 2022-10-19 16:41:47 +0530 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-10-19 16:41:47 +0530 |
commit | 743fcc0668f553c7902c44b1f6a484db42c1cfd0 (patch) | |
tree | ebd71545dbb0f3402ac50fc663ed04c73db3a50b | |
parent | 36307c45e95b599eb01bf53df161973a7ef8b58e (diff) |
perf(ext/flash): optimize path response streams (#16284)
Regression caused by https://github.com/denoland/deno/pull/15591
-rw-r--r-- | ext/flash/01_http.js | 40 | ||||
-rw-r--r-- | ext/flash/lib.rs | 52 |
2 files changed, 75 insertions, 17 deletions
diff --git a/ext/flash/01_http.js b/ext/flash/01_http.js index 17f99ca98..c7dce421d 100644 --- a/ext/flash/01_http.js +++ b/ext/flash/01_http.js @@ -365,6 +365,8 @@ } } else { const reader = respBody.getReader(); + const { value, done } = await reader.read(); + // Best case: sends headers + first chunk in a single go. writeFixedResponse( serverId, i, @@ -379,14 +381,21 @@ false, respondFast, ); - while (true) { - const { value, done } = await reader.read(); - await respondChunked( - i, - value, - done, - ); - if (done) break; + await respondChunked( + i, + value, + done, + ); + if (!done) { + while (true) { + const chunk = await reader.read(); + await respondChunked( + i, + chunk.value, + chunk.done, + ); + if (chunk.done) break; + } } } } @@ -591,13 +600,22 @@ }); function respondChunked(token, chunk, shutdown) { - return core.opAsync( - "op_flash_respond_chuncked", + const nwritten = core.ops.op_try_flash_respond_chuncked( serverId, token, - chunk, + chunk ?? new Uint8Array(), shutdown, ); + if (nwritten > 0) { + return core.opAsync( + "op_flash_respond_chuncked", + serverId, + token, + chunk, + shutdown, + nwritten, + ); + } } const fastOp = prepareFastCalls(); diff --git a/ext/flash/lib.rs b/ext/flash/lib.rs index f9ce1c744..17e3e8317 100644 --- a/ext/flash/lib.rs +++ b/ext/flash/lib.rs @@ -150,6 +150,34 @@ async fn op_flash_respond_async( Ok(()) } +#[op(fast)] +fn op_try_flash_respond_chuncked( + op_state: &mut OpState, + server_id: u32, + token: u32, + response: &[u8], + shutdown: bool, +) -> u32 { + let flash_ctx = op_state.borrow_mut::<FlashContext>(); + let ctx = flash_ctx.servers.get_mut(&server_id).unwrap(); + let tx = ctx.requests.get(&token).unwrap(); + let sock = tx.socket(); + + // TODO(@littledivy): Use writev when `UnixIoSlice` lands. + // https://github.com/denoland/deno/pull/15629 + let h = format!("{:x}\r\n", response.len()); + let concat = [h.as_bytes(), response, b"\r\n"].concat(); + let expected = sock.try_write(&concat); + if expected != concat.len() { + return expected as u32; + } + if shutdown { + // Best case: We've written everything and the stream is done too. + let _ = ctx.requests.remove(&token).unwrap(); + } + 0 +} + #[op] async fn op_flash_respond_chuncked( op_state: Rc<RefCell<OpState>>, @@ -157,6 +185,7 @@ async fn op_flash_respond_chuncked( token: u32, response: Option<ZeroCopyBuf>, shutdown: bool, + nwritten: u32, ) -> Result<(), AnyError> { let mut op_state = op_state.borrow_mut(); let flash_ctx = op_state.borrow_mut::<FlashContext>(); @@ -178,17 +207,27 @@ async fn op_flash_respond_chuncked( .with_async_stream(|stream| { Box::pin(async move { use tokio::io::AsyncWriteExt; + // TODO(@littledivy): Use writev when `UnixIoSlice` lands. + // https://github.com/denoland/deno/pull/15629 + macro_rules! write_whats_not_written { + ($e:expr) => { + let e = $e; + let n = nwritten as usize; + if n < e.len() { + stream.write_all(&e[n..]).await?; + } + }; + } if let Some(response) = response { - stream - .write_all(format!("{:x}\r\n", response.len()).as_bytes()) - .await?; - stream.write_all(&response).await?; - stream.write_all(b"\r\n").await?; + let h = format!("{:x}\r\n", response.len()); + write_whats_not_written!(h.as_bytes()); + write_whats_not_written!(&response); + write_whats_not_written!(b"\r\n"); } // The last chunk if shutdown { - stream.write_all(b"0\r\n\r\n").await?; + write_whats_not_written!(b"0\r\n\r\n"); } Ok(()) @@ -1451,6 +1490,7 @@ pub fn init<P: FlashPermissions + 'static>(unstable: bool) -> Extension { op_flash_respond::decl(), op_flash_respond_async::decl(), op_flash_respond_chuncked::decl(), + op_try_flash_respond_chuncked::decl(), op_flash_method::decl(), op_flash_path::decl(), op_flash_headers::decl(), |