diff options
author | Divy Srivastava <dj.srivastava23@gmail.com> | 2023-07-07 22:17:08 +0530 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-07-07 22:17:08 +0530 |
commit | e4870d84be19f4ea768933522eff437f64963730 (patch) | |
tree | 789e83bc1d67754b917874b8f92d4a1227644930 | |
parent | 7d022ad11a74710ce46e4ab9f4e57635cae3ed2e (diff) |
perf(ext/node): native vectored write for server streams (#19752)
```
# main
$ ./load_test 10 0.0.0.0 8080 0 0
Using message size of 20 bytes
Running benchmark now...
Msg/sec: 106182.250000
Msg/sec: 110279.750000
^C
# this PR
$ ./load_test 10 0.0.0.0 8080 0 0
Using message size of 20 bytes
Running benchmark now...
Msg/sec: 131632.250000
Msg/sec: 134754.250000
^C
```
-rw-r--r-- | ext/http/http_next.rs | 42 | ||||
-rw-r--r-- | ext/http/lib.rs | 1 | ||||
-rw-r--r-- | ext/node/polyfills/internal/stream_base_commons.ts | 4 | ||||
-rw-r--r-- | ext/node/polyfills/internal_binding/stream_wrap.ts | 30 |
4 files changed, 75 insertions, 2 deletions
diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs index d1dfb498c..019054894 100644 --- a/ext/http/http_next.rs +++ b/ext/http/http_next.rs @@ -32,6 +32,7 @@ use deno_core::ByteString; use deno_core::CancelFuture; use deno_core::CancelHandle; use deno_core::CancelTryFuture; +use deno_core::JsBuffer; use deno_core::OpState; use deno_core::RcRef; use deno_core::Resource; @@ -1034,6 +1035,34 @@ impl UpgradeStream { .try_or_cancel(cancel_handle) .await } + + async fn write_vectored( + self: Rc<Self>, + buf1: &[u8], + buf2: &[u8], + ) -> Result<usize, AnyError> { + let mut wr = RcRef::map(self, |r| &r.write).borrow_mut().await; + + let total = buf1.len() + buf2.len(); + let mut bufs = [std::io::IoSlice::new(buf1), std::io::IoSlice::new(buf2)]; + let mut nwritten = wr.write_vectored(&bufs).await?; + if nwritten == total { + return Ok(nwritten); + } + + // Slightly more optimized than (unstable) write_all_vectored for 2 iovecs. + while nwritten <= buf1.len() { + bufs[0] = std::io::IoSlice::new(&buf1[nwritten..]); + nwritten += wr.write_vectored(&bufs).await?; + } + + // First buffer out of the way. + if nwritten < total && nwritten > buf1.len() { + wr.write_all(&buf2[nwritten - buf1.len()..]).await?; + } + + Ok(total) + } } impl Resource for UpgradeStream { @@ -1048,3 +1077,16 @@ impl Resource for UpgradeStream { self.cancel_handle.cancel(); } } + +#[op] +pub async fn op_raw_write_vectored( + state: Rc<RefCell<OpState>>, + rid: ResourceId, + buf1: JsBuffer, + buf2: JsBuffer, +) -> Result<usize, AnyError> { + let resource: Rc<UpgradeStream> = + state.borrow().resource_table.get::<UpgradeStream>(rid)?; + let nwritten = resource.write_vectored(&buf1, &buf2).await?; + Ok(nwritten) +} diff --git a/ext/http/lib.rs b/ext/http/lib.rs index 1ae156b86..26cbffd1b 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -120,6 +120,7 @@ deno_core::extension!( http_next::op_http_track, http_next::op_http_upgrade_websocket_next, http_next::op_http_upgrade_raw, + http_next::op_raw_write_vectored, http_next::op_http_try_wait, http_next::op_http_wait, ], diff --git a/ext/node/polyfills/internal/stream_base_commons.ts b/ext/node/polyfills/internal/stream_base_commons.ts index d7acf729d..01da0c5e3 100644 --- a/ext/node/polyfills/internal/stream_base_commons.ts +++ b/ext/node/polyfills/internal/stream_base_commons.ts @@ -253,7 +253,9 @@ export function onStreamRead( } } else { const offset = streamBaseState[kArrayBufferOffset]; - const buf = Buffer.from(arrayBuffer, offset, nread); + // Performance note: Pass ArrayBuffer to Buffer#from to avoid + // copy. + const buf = Buffer.from(arrayBuffer.buffer, offset, nread); result = stream.push(buf); } diff --git a/ext/node/polyfills/internal_binding/stream_wrap.ts b/ext/node/polyfills/internal_binding/stream_wrap.ts index 27870b20b..95f60fe95 100644 --- a/ext/node/polyfills/internal_binding/stream_wrap.ts +++ b/ext/node/polyfills/internal_binding/stream_wrap.ts @@ -40,6 +40,9 @@ import { } from "ext:deno_node/internal_binding/async_wrap.ts"; import { codeMap } from "ext:deno_node/internal_binding/uv.ts"; +const core = globalThis.Deno.core; +const { ops } = core; + interface Reader { read(p: Uint8Array): Promise<number | null>; } @@ -54,7 +57,7 @@ export interface Closer { type Ref = { ref(): void; unref(): void }; -enum StreamBaseStateFields { +const enum StreamBaseStateFields { kReadBytesOrError, kArrayBufferOffset, kBytesWritten, @@ -195,6 +198,31 @@ export class LibuvStreamWrap extends HandleWrap { chunks: Buffer[] | (string | Buffer)[], allBuffers: boolean, ): number { + const supportsWritev = this.provider === providerType.TCPSERVERWRAP; + // Fast case optimization: two chunks, and all buffers. + if (chunks.length === 2 && allBuffers && supportsWritev) { + // String chunks. + if (typeof chunks[0] === "string") chunks[0] = Buffer.from(chunks[0]); + if (typeof chunks[1] === "string") chunks[1] = Buffer.from(chunks[1]); + + ops.op_raw_write_vectored( + this[kStreamBaseField]!.rid, + chunks[0], + chunks[1], + ).then((nwritten) => { + try { + req.oncomplete(0); + } catch { + // swallow callback errors. + } + + streamBaseState[kBytesWritten] = nwritten; + this.bytesWritten += nwritten; + }); + + return 0; + } + const count = allBuffers ? chunks.length : chunks.length >> 1; const buffers: Buffer[] = new Array(count); |