summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDivy Srivastava <dj.srivastava23@gmail.com>2023-07-07 22:17:08 +0530
committerGitHub <noreply@github.com>2023-07-07 22:17:08 +0530
commite4870d84be19f4ea768933522eff437f64963730 (patch)
tree789e83bc1d67754b917874b8f92d4a1227644930
parent7d022ad11a74710ce46e4ab9f4e57635cae3ed2e (diff)
perf(ext/node): native vectored write for server streams (#19752)
``` # main $ ./load_test 10 0.0.0.0 8080 0 0 Using message size of 20 bytes Running benchmark now... Msg/sec: 106182.250000 Msg/sec: 110279.750000 ^C # this PR $ ./load_test 10 0.0.0.0 8080 0 0 Using message size of 20 bytes Running benchmark now... Msg/sec: 131632.250000 Msg/sec: 134754.250000 ^C ```
-rw-r--r--ext/http/http_next.rs42
-rw-r--r--ext/http/lib.rs1
-rw-r--r--ext/node/polyfills/internal/stream_base_commons.ts4
-rw-r--r--ext/node/polyfills/internal_binding/stream_wrap.ts30
4 files changed, 75 insertions, 2 deletions
diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs
index d1dfb498c..019054894 100644
--- a/ext/http/http_next.rs
+++ b/ext/http/http_next.rs
@@ -32,6 +32,7 @@ use deno_core::ByteString;
use deno_core::CancelFuture;
use deno_core::CancelHandle;
use deno_core::CancelTryFuture;
+use deno_core::JsBuffer;
use deno_core::OpState;
use deno_core::RcRef;
use deno_core::Resource;
@@ -1034,6 +1035,34 @@ impl UpgradeStream {
.try_or_cancel(cancel_handle)
.await
}
+
+ async fn write_vectored(
+ self: Rc<Self>,
+ buf1: &[u8],
+ buf2: &[u8],
+ ) -> Result<usize, AnyError> {
+ let mut wr = RcRef::map(self, |r| &r.write).borrow_mut().await;
+
+ let total = buf1.len() + buf2.len();
+ let mut bufs = [std::io::IoSlice::new(buf1), std::io::IoSlice::new(buf2)];
+ let mut nwritten = wr.write_vectored(&bufs).await?;
+ if nwritten == total {
+ return Ok(nwritten);
+ }
+
+ // Slightly more optimized than (unstable) write_all_vectored for 2 iovecs.
+ while nwritten <= buf1.len() {
+ bufs[0] = std::io::IoSlice::new(&buf1[nwritten..]);
+ nwritten += wr.write_vectored(&bufs).await?;
+ }
+
+ // First buffer out of the way.
+ if nwritten < total && nwritten > buf1.len() {
+ wr.write_all(&buf2[nwritten - buf1.len()..]).await?;
+ }
+
+ Ok(total)
+ }
}
impl Resource for UpgradeStream {
@@ -1048,3 +1077,16 @@ impl Resource for UpgradeStream {
self.cancel_handle.cancel();
}
}
+
+#[op]
+pub async fn op_raw_write_vectored(
+ state: Rc<RefCell<OpState>>,
+ rid: ResourceId,
+ buf1: JsBuffer,
+ buf2: JsBuffer,
+) -> Result<usize, AnyError> {
+ let resource: Rc<UpgradeStream> =
+ state.borrow().resource_table.get::<UpgradeStream>(rid)?;
+ let nwritten = resource.write_vectored(&buf1, &buf2).await?;
+ Ok(nwritten)
+}
diff --git a/ext/http/lib.rs b/ext/http/lib.rs
index 1ae156b86..26cbffd1b 100644
--- a/ext/http/lib.rs
+++ b/ext/http/lib.rs
@@ -120,6 +120,7 @@ deno_core::extension!(
http_next::op_http_track,
http_next::op_http_upgrade_websocket_next,
http_next::op_http_upgrade_raw,
+ http_next::op_raw_write_vectored,
http_next::op_http_try_wait,
http_next::op_http_wait,
],
diff --git a/ext/node/polyfills/internal/stream_base_commons.ts b/ext/node/polyfills/internal/stream_base_commons.ts
index d7acf729d..01da0c5e3 100644
--- a/ext/node/polyfills/internal/stream_base_commons.ts
+++ b/ext/node/polyfills/internal/stream_base_commons.ts
@@ -253,7 +253,9 @@ export function onStreamRead(
}
} else {
const offset = streamBaseState[kArrayBufferOffset];
- const buf = Buffer.from(arrayBuffer, offset, nread);
+ // Performance note: Pass ArrayBuffer to Buffer#from to avoid
+ // copy.
+ const buf = Buffer.from(arrayBuffer.buffer, offset, nread);
result = stream.push(buf);
}
diff --git a/ext/node/polyfills/internal_binding/stream_wrap.ts b/ext/node/polyfills/internal_binding/stream_wrap.ts
index 27870b20b..95f60fe95 100644
--- a/ext/node/polyfills/internal_binding/stream_wrap.ts
+++ b/ext/node/polyfills/internal_binding/stream_wrap.ts
@@ -40,6 +40,9 @@ import {
} from "ext:deno_node/internal_binding/async_wrap.ts";
import { codeMap } from "ext:deno_node/internal_binding/uv.ts";
+const core = globalThis.Deno.core;
+const { ops } = core;
+
interface Reader {
read(p: Uint8Array): Promise<number | null>;
}
@@ -54,7 +57,7 @@ export interface Closer {
type Ref = { ref(): void; unref(): void };
-enum StreamBaseStateFields {
+const enum StreamBaseStateFields {
kReadBytesOrError,
kArrayBufferOffset,
kBytesWritten,
@@ -195,6 +198,31 @@ export class LibuvStreamWrap extends HandleWrap {
chunks: Buffer[] | (string | Buffer)[],
allBuffers: boolean,
): number {
+ const supportsWritev = this.provider === providerType.TCPSERVERWRAP;
+ // Fast case optimization: two chunks, and all buffers.
+ if (chunks.length === 2 && allBuffers && supportsWritev) {
+ // String chunks.
+ if (typeof chunks[0] === "string") chunks[0] = Buffer.from(chunks[0]);
+ if (typeof chunks[1] === "string") chunks[1] = Buffer.from(chunks[1]);
+
+ ops.op_raw_write_vectored(
+ this[kStreamBaseField]!.rid,
+ chunks[0],
+ chunks[1],
+ ).then((nwritten) => {
+ try {
+ req.oncomplete(0);
+ } catch {
+ // swallow callback errors.
+ }
+
+ streamBaseState[kBytesWritten] = nwritten;
+ this.bytesWritten += nwritten;
+ });
+
+ return 0;
+ }
+
const count = allBuffers ? chunks.length : chunks.length >> 1;
const buffers: Buffer[] = new Array(count);