summaryrefslogtreecommitdiff
path: root/ext/http/http_next.rs
diff options
context:
space:
mode:
authorDivy Srivastava <dj.srivastava23@gmail.com>2023-07-07 22:17:08 +0530
committerGitHub <noreply@github.com>2023-07-07 22:17:08 +0530
commite4870d84be19f4ea768933522eff437f64963730 (patch)
tree789e83bc1d67754b917874b8f92d4a1227644930 /ext/http/http_next.rs
parent7d022ad11a74710ce46e4ab9f4e57635cae3ed2e (diff)
perf(ext/node): native vectored write for server streams (#19752)
``` # main $ ./load_test 10 0.0.0.0 8080 0 0 Using message size of 20 bytes Running benchmark now... Msg/sec: 106182.250000 Msg/sec: 110279.750000 ^C # this PR $ ./load_test 10 0.0.0.0 8080 0 0 Using message size of 20 bytes Running benchmark now... Msg/sec: 131632.250000 Msg/sec: 134754.250000 ^C ```
Diffstat (limited to 'ext/http/http_next.rs')
-rw-r--r--ext/http/http_next.rs42
1 files changed, 42 insertions, 0 deletions
diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs
index d1dfb498c..019054894 100644
--- a/ext/http/http_next.rs
+++ b/ext/http/http_next.rs
@@ -32,6 +32,7 @@ use deno_core::ByteString;
use deno_core::CancelFuture;
use deno_core::CancelHandle;
use deno_core::CancelTryFuture;
+use deno_core::JsBuffer;
use deno_core::OpState;
use deno_core::RcRef;
use deno_core::Resource;
@@ -1034,6 +1035,34 @@ impl UpgradeStream {
.try_or_cancel(cancel_handle)
.await
}
+
+ async fn write_vectored(
+ self: Rc<Self>,
+ buf1: &[u8],
+ buf2: &[u8],
+ ) -> Result<usize, AnyError> {
+ let mut wr = RcRef::map(self, |r| &r.write).borrow_mut().await;
+
+ let total = buf1.len() + buf2.len();
+ let mut bufs = [std::io::IoSlice::new(buf1), std::io::IoSlice::new(buf2)];
+ let mut nwritten = wr.write_vectored(&bufs).await?;
+ if nwritten == total {
+ return Ok(nwritten);
+ }
+
+ // Slightly more optimized than (unstable) write_all_vectored for 2 iovecs.
+ while nwritten <= buf1.len() {
+ bufs[0] = std::io::IoSlice::new(&buf1[nwritten..]);
+ nwritten += wr.write_vectored(&bufs).await?;
+ }
+
+ // First buffer out of the way.
+ if nwritten < total && nwritten > buf1.len() {
+ wr.write_all(&buf2[nwritten - buf1.len()..]).await?;
+ }
+
+ Ok(total)
+ }
}
impl Resource for UpgradeStream {
@@ -1048,3 +1077,16 @@ impl Resource for UpgradeStream {
self.cancel_handle.cancel();
}
}
+
+#[op]
+pub async fn op_raw_write_vectored(
+ state: Rc<RefCell<OpState>>,
+ rid: ResourceId,
+ buf1: JsBuffer,
+ buf2: JsBuffer,
+) -> Result<usize, AnyError> {
+ let resource: Rc<UpgradeStream> =
+ state.borrow().resource_table.get::<UpgradeStream>(rid)?;
+ let nwritten = resource.write_vectored(&buf1, &buf2).await?;
+ Ok(nwritten)
+}