summaryrefslogtreecommitdiff
path: root/ext/flash/lib.rs
diff options
context:
space:
mode:
authorDivy Srivastava <dj.srivastava23@gmail.com>2022-10-19 16:41:47 +0530
committerGitHub <noreply@github.com>2022-10-19 16:41:47 +0530
commit743fcc0668f553c7902c44b1f6a484db42c1cfd0 (patch)
treeebd71545dbb0f3402ac50fc663ed04c73db3a50b /ext/flash/lib.rs
parent36307c45e95b599eb01bf53df161973a7ef8b58e (diff)
perf(ext/flash): optimize path response streams (#16284)
Regression caused by https://github.com/denoland/deno/pull/15591
Diffstat (limited to 'ext/flash/lib.rs')
-rw-r--r--ext/flash/lib.rs52
1 files changed, 46 insertions, 6 deletions
diff --git a/ext/flash/lib.rs b/ext/flash/lib.rs
index f9ce1c744..17e3e8317 100644
--- a/ext/flash/lib.rs
+++ b/ext/flash/lib.rs
@@ -150,6 +150,34 @@ async fn op_flash_respond_async(
Ok(())
}
+#[op(fast)]
+fn op_try_flash_respond_chuncked(
+ op_state: &mut OpState,
+ server_id: u32,
+ token: u32,
+ response: &[u8],
+ shutdown: bool,
+) -> u32 {
+ let flash_ctx = op_state.borrow_mut::<FlashContext>();
+ let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
+ let tx = ctx.requests.get(&token).unwrap();
+ let sock = tx.socket();
+
+ // TODO(@littledivy): Use writev when `UnixIoSlice` lands.
+ // https://github.com/denoland/deno/pull/15629
+ let h = format!("{:x}\r\n", response.len());
+ let concat = [h.as_bytes(), response, b"\r\n"].concat();
+ let expected = sock.try_write(&concat);
+ if expected != concat.len() {
+ return expected as u32;
+ }
+ if shutdown {
+ // Best case: We've written everything and the stream is done too.
+ let _ = ctx.requests.remove(&token).unwrap();
+ }
+ 0
+}
+
#[op]
async fn op_flash_respond_chuncked(
op_state: Rc<RefCell<OpState>>,
@@ -157,6 +185,7 @@ async fn op_flash_respond_chuncked(
token: u32,
response: Option<ZeroCopyBuf>,
shutdown: bool,
+ nwritten: u32,
) -> Result<(), AnyError> {
let mut op_state = op_state.borrow_mut();
let flash_ctx = op_state.borrow_mut::<FlashContext>();
@@ -178,17 +207,27 @@ async fn op_flash_respond_chuncked(
.with_async_stream(|stream| {
Box::pin(async move {
use tokio::io::AsyncWriteExt;
+ // TODO(@littledivy): Use writev when `UnixIoSlice` lands.
+ // https://github.com/denoland/deno/pull/15629
+ macro_rules! write_whats_not_written {
+ ($e:expr) => {
+ let e = $e;
+ let n = nwritten as usize;
+ if n < e.len() {
+ stream.write_all(&e[n..]).await?;
+ }
+ };
+ }
if let Some(response) = response {
- stream
- .write_all(format!("{:x}\r\n", response.len()).as_bytes())
- .await?;
- stream.write_all(&response).await?;
- stream.write_all(b"\r\n").await?;
+ let h = format!("{:x}\r\n", response.len());
+ write_whats_not_written!(h.as_bytes());
+ write_whats_not_written!(&response);
+ write_whats_not_written!(b"\r\n");
}
// The last chunk
if shutdown {
- stream.write_all(b"0\r\n\r\n").await?;
+ write_whats_not_written!(b"0\r\n\r\n");
}
Ok(())
@@ -1451,6 +1490,7 @@ pub fn init<P: FlashPermissions + 'static>(unstable: bool) -> Extension {
op_flash_respond::decl(),
op_flash_respond_async::decl(),
op_flash_respond_chuncked::decl(),
+ op_try_flash_respond_chuncked::decl(),
op_flash_method::decl(),
op_flash_path::decl(),
op_flash_headers::decl(),