From 743fcc0668f553c7902c44b1f6a484db42c1cfd0 Mon Sep 17 00:00:00 2001 From: Divy Srivastava Date: Wed, 19 Oct 2022 16:41:47 +0530 Subject: perf(ext/flash): optimize path response streams (#16284) Regression caused by https://github.com/denoland/deno/pull/15591 --- ext/flash/lib.rs | 52 ++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 46 insertions(+), 6 deletions(-) (limited to 'ext/flash/lib.rs') diff --git a/ext/flash/lib.rs b/ext/flash/lib.rs index f9ce1c744..17e3e8317 100644 --- a/ext/flash/lib.rs +++ b/ext/flash/lib.rs @@ -150,6 +150,34 @@ async fn op_flash_respond_async( Ok(()) } +#[op(fast)] +fn op_try_flash_respond_chuncked( + op_state: &mut OpState, + server_id: u32, + token: u32, + response: &[u8], + shutdown: bool, +) -> u32 { + let flash_ctx = op_state.borrow_mut::(); + let ctx = flash_ctx.servers.get_mut(&server_id).unwrap(); + let tx = ctx.requests.get(&token).unwrap(); + let sock = tx.socket(); + + // TODO(@littledivy): Use writev when `UnixIoSlice` lands. + // https://github.com/denoland/deno/pull/15629 + let h = format!("{:x}\r\n", response.len()); + let concat = [h.as_bytes(), response, b"\r\n"].concat(); + let expected = sock.try_write(&concat); + if expected != concat.len() { + return expected as u32; + } + if shutdown { + // Best case: We've written everything and the stream is done too. + let _ = ctx.requests.remove(&token).unwrap(); + } + 0 +} + #[op] async fn op_flash_respond_chuncked( op_state: Rc>, @@ -157,6 +185,7 @@ async fn op_flash_respond_chuncked( token: u32, response: Option, shutdown: bool, + nwritten: u32, ) -> Result<(), AnyError> { let mut op_state = op_state.borrow_mut(); let flash_ctx = op_state.borrow_mut::(); @@ -178,17 +207,27 @@ async fn op_flash_respond_chuncked( .with_async_stream(|stream| { Box::pin(async move { use tokio::io::AsyncWriteExt; + // TODO(@littledivy): Use writev when `UnixIoSlice` lands. + // https://github.com/denoland/deno/pull/15629 + macro_rules! write_whats_not_written { + ($e:expr) => { + let e = $e; + let n = nwritten as usize; + if n < e.len() { + stream.write_all(&e[n..]).await?; + } + }; + } if let Some(response) = response { - stream - .write_all(format!("{:x}\r\n", response.len()).as_bytes()) - .await?; - stream.write_all(&response).await?; - stream.write_all(b"\r\n").await?; + let h = format!("{:x}\r\n", response.len()); + write_whats_not_written!(h.as_bytes()); + write_whats_not_written!(&response); + write_whats_not_written!(b"\r\n"); } // The last chunk if shutdown { - stream.write_all(b"0\r\n\r\n").await?; + write_whats_not_written!(b"0\r\n\r\n"); } Ok(()) @@ -1451,6 +1490,7 @@ pub fn init(unstable: bool) -> Extension { op_flash_respond::decl(), op_flash_respond_async::decl(), op_flash_respond_chuncked::decl(), + op_try_flash_respond_chuncked::decl(), op_flash_method::decl(), op_flash_path::decl(), op_flash_headers::decl(), -- cgit v1.2.3