diff options
Diffstat (limited to 'ext/flash/lib.rs')
-rw-r--r-- | ext/flash/lib.rs | 52 |
1 files changed, 46 insertions, 6 deletions
diff --git a/ext/flash/lib.rs b/ext/flash/lib.rs index f9ce1c744..17e3e8317 100644 --- a/ext/flash/lib.rs +++ b/ext/flash/lib.rs @@ -150,6 +150,34 @@ async fn op_flash_respond_async( Ok(()) } +#[op(fast)] +fn op_try_flash_respond_chuncked( + op_state: &mut OpState, + server_id: u32, + token: u32, + response: &[u8], + shutdown: bool, +) -> u32 { + let flash_ctx = op_state.borrow_mut::<FlashContext>(); + let ctx = flash_ctx.servers.get_mut(&server_id).unwrap(); + let tx = ctx.requests.get(&token).unwrap(); + let sock = tx.socket(); + + // TODO(@littledivy): Use writev when `UnixIoSlice` lands. + // https://github.com/denoland/deno/pull/15629 + let h = format!("{:x}\r\n", response.len()); + let concat = [h.as_bytes(), response, b"\r\n"].concat(); + let expected = sock.try_write(&concat); + if expected != concat.len() { + return expected as u32; + } + if shutdown { + // Best case: We've written everything and the stream is done too. + let _ = ctx.requests.remove(&token).unwrap(); + } + 0 +} + #[op] async fn op_flash_respond_chuncked( op_state: Rc<RefCell<OpState>>, @@ -157,6 +185,7 @@ async fn op_flash_respond_chuncked( token: u32, response: Option<ZeroCopyBuf>, shutdown: bool, + nwritten: u32, ) -> Result<(), AnyError> { let mut op_state = op_state.borrow_mut(); let flash_ctx = op_state.borrow_mut::<FlashContext>(); @@ -178,17 +207,27 @@ async fn op_flash_respond_chuncked( .with_async_stream(|stream| { Box::pin(async move { use tokio::io::AsyncWriteExt; + // TODO(@littledivy): Use writev when `UnixIoSlice` lands. + // https://github.com/denoland/deno/pull/15629 + macro_rules! write_whats_not_written { + ($e:expr) => { + let e = $e; + let n = nwritten as usize; + if n < e.len() { + stream.write_all(&e[n..]).await?; + } + }; + } if let Some(response) = response { - stream - .write_all(format!("{:x}\r\n", response.len()).as_bytes()) - .await?; - stream.write_all(&response).await?; - stream.write_all(b"\r\n").await?; + let h = format!("{:x}\r\n", response.len()); + write_whats_not_written!(h.as_bytes()); + write_whats_not_written!(&response); + write_whats_not_written!(b"\r\n"); } // The last chunk if shutdown { - stream.write_all(b"0\r\n\r\n").await?; + write_whats_not_written!(b"0\r\n\r\n"); } Ok(()) @@ -1451,6 +1490,7 @@ pub fn init<P: FlashPermissions + 'static>(unstable: bool) -> Extension { op_flash_respond::decl(), op_flash_respond_async::decl(), op_flash_respond_chuncked::decl(), + op_try_flash_respond_chuncked::decl(), op_flash_method::decl(), op_flash_path::decl(), op_flash_headers::decl(), |