diff options
author | Divy Srivastava <dj.srivastava23@gmail.com> | 2023-07-07 22:17:08 +0530 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-07-07 22:17:08 +0530 |
commit | e4870d84be19f4ea768933522eff437f64963730 (patch) | |
tree | 789e83bc1d67754b917874b8f92d4a1227644930 /ext/http/http_next.rs | |
parent | 7d022ad11a74710ce46e4ab9f4e57635cae3ed2e (diff) |
perf(ext/node): native vectored write for server streams (#19752)
```
# main
$ ./load_test 10 0.0.0.0 8080 0 0
Using message size of 20 bytes
Running benchmark now...
Msg/sec: 106182.250000
Msg/sec: 110279.750000
^C
# this PR
$ ./load_test 10 0.0.0.0 8080 0 0
Using message size of 20 bytes
Running benchmark now...
Msg/sec: 131632.250000
Msg/sec: 134754.250000
^C
```
Diffstat (limited to 'ext/http/http_next.rs')
-rw-r--r-- | ext/http/http_next.rs | 42 |
1 files changed, 42 insertions, 0 deletions
diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs index d1dfb498c..019054894 100644 --- a/ext/http/http_next.rs +++ b/ext/http/http_next.rs @@ -32,6 +32,7 @@ use deno_core::ByteString; use deno_core::CancelFuture; use deno_core::CancelHandle; use deno_core::CancelTryFuture; +use deno_core::JsBuffer; use deno_core::OpState; use deno_core::RcRef; use deno_core::Resource; @@ -1034,6 +1035,34 @@ impl UpgradeStream { .try_or_cancel(cancel_handle) .await } + + async fn write_vectored( + self: Rc<Self>, + buf1: &[u8], + buf2: &[u8], + ) -> Result<usize, AnyError> { + let mut wr = RcRef::map(self, |r| &r.write).borrow_mut().await; + + let total = buf1.len() + buf2.len(); + let mut bufs = [std::io::IoSlice::new(buf1), std::io::IoSlice::new(buf2)]; + let mut nwritten = wr.write_vectored(&bufs).await?; + if nwritten == total { + return Ok(nwritten); + } + + // Slightly more optimized than (unstable) write_all_vectored for 2 iovecs. + while nwritten <= buf1.len() { + bufs[0] = std::io::IoSlice::new(&buf1[nwritten..]); + nwritten += wr.write_vectored(&bufs).await?; + } + + // First buffer out of the way. + if nwritten < total && nwritten > buf1.len() { + wr.write_all(&buf2[nwritten - buf1.len()..]).await?; + } + + Ok(total) + } } impl Resource for UpgradeStream { @@ -1048,3 +1077,16 @@ impl Resource for UpgradeStream { self.cancel_handle.cancel(); } } + +#[op] +pub async fn op_raw_write_vectored( + state: Rc<RefCell<OpState>>, + rid: ResourceId, + buf1: JsBuffer, + buf2: JsBuffer, +) -> Result<usize, AnyError> { + let resource: Rc<UpgradeStream> = + state.borrow().resource_table.get::<UpgradeStream>(rid)?; + let nwritten = resource.write_vectored(&buf1, &buf2).await?; + Ok(nwritten) +} |