summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBartek IwaƄczuk <biwanczuk@gmail.com>2023-09-11 20:38:57 +0200
committerGitHub <noreply@github.com>2023-09-11 20:38:57 +0200
commitaaff69db3fd8cf70d1c031720e84874cb4a4d02c (patch)
tree66ecc2651c2afc598b91878317045e0d5c231296
parent9d1385896f3c170cf4e0cb744cc2e88e12af50ab (diff)
perf(node/net): optimize socket reads for 'npm:ws' package (#20449)
Fixes performance regression introduced by https://github.com/denoland/deno/pull/20223 and https://github.com/denoland/deno/pull/20314. It's enough to have one "shared" buffer per socket and no locking mechanism is required.
-rw-r--r--ext/node/polyfills/internal_binding/stream_wrap.ts115
1 files changed, 38 insertions, 77 deletions
diff --git a/ext/node/polyfills/internal_binding/stream_wrap.ts b/ext/node/polyfills/internal_binding/stream_wrap.ts
index f59acb9b9..3e4d0be6a 100644
--- a/ext/node/polyfills/internal_binding/stream_wrap.ts
+++ b/ext/node/polyfills/internal_binding/stream_wrap.ts
@@ -114,6 +114,7 @@ export class LibuvStreamWrap extends HandleWrap {
writeQueueSize = 0;
bytesRead = 0;
bytesWritten = 0;
+ #buf = new Uint8Array(SUGGESTED_SIZE);
onread!: (_arrayBuffer: Uint8Array, _nread: number) => Uint8Array | undefined;
@@ -311,88 +312,53 @@ export class LibuvStreamWrap extends HandleWrap {
/** Internal method for reading from the attached stream. */
async #read() {
- // Lock safety: We must hold this lock until we are certain that buf is no longer used
- // This setup code is a little verbose, but we need to be careful about buffer management
- let buf, locked = false;
- if (bufLocked) {
- // Already locked, allocate
- buf = new Uint8Array(SUGGESTED_SIZE);
- } else {
- // Not locked, take the buffer + lock
- buf = BUF;
- locked = bufLocked = true;
- }
+ let buf = this.#buf;
+ let nread: number | null;
+ const ridBefore = this[kStreamBaseField]!.rid;
try {
- let nread: number | null;
- const ridBefore = this[kStreamBaseField]!.rid;
- try {
- nread = await this[kStreamBaseField]!.read(buf);
- } catch (e) {
- // Lock safety: we know that the buffer will not be used in this function again
- // All exits from this block either return or re-assign buf to a different value
- if (locked) {
- bufLocked = locked = false;
- }
-
- // Try to read again if the underlying stream resource
- // changed. This can happen during TLS upgrades (eg. STARTTLS)
- if (ridBefore != this[kStreamBaseField]!.rid) {
- return this.#read();
- }
+ nread = await this[kStreamBaseField]!.read(buf);
+ } catch (e) {
+ // Try to read again if the underlying stream resource
+ // changed. This can happen during TLS upgrades (eg. STARTTLS)
+ if (ridBefore != this[kStreamBaseField]!.rid) {
+ return this.#read();
+ }
- buf = new Uint8Array(0);
-
- if (
- e instanceof Deno.errors.Interrupted ||
- e instanceof Deno.errors.BadResource
- ) {
- nread = codeMap.get("EOF")!;
- } else if (
- e instanceof Deno.errors.ConnectionReset ||
- e instanceof Deno.errors.ConnectionAborted
- ) {
- nread = codeMap.get("ECONNRESET")!;
- } else {
- nread = codeMap.get("UNKNOWN")!;
- }
+ if (
+ e instanceof Deno.errors.Interrupted ||
+ e instanceof Deno.errors.BadResource
+ ) {
+ nread = codeMap.get("EOF")!;
+ } else if (
+ e instanceof Deno.errors.ConnectionReset ||
+ e instanceof Deno.errors.ConnectionAborted
+ ) {
+ nread = codeMap.get("ECONNRESET")!;
+ } else {
+ nread = codeMap.get("UNKNOWN")!;
}
+ }
- nread ??= codeMap.get("EOF")!;
+ nread ??= codeMap.get("EOF")!;
- streamBaseState[kReadBytesOrError] = nread;
+ streamBaseState[kReadBytesOrError] = nread;
- if (nread > 0) {
- this.bytesRead += nread;
- }
+ if (nread > 0) {
+ this.bytesRead += nread;
+ }
- // We release the lock early so a re-entrant read can make use of the shared buffer, but
- // we need to make a copy of the data in the shared buffer.
- if (locked) {
- // Lock safety: we know that the buffer will not be used in this function again
- // We're making a copy of data that lives in the shared buffer
- buf = buf.slice(0, nread);
- bufLocked = locked = false;
- } else {
- // The buffer isn't owned, so let's create a subarray view
- buf = buf.subarray(0, nread);
- }
+ buf = buf.slice(0, nread);
- streamBaseState[kArrayBufferOffset] = 0;
+ streamBaseState[kArrayBufferOffset] = 0;
- try {
- this.onread!(buf, nread);
- } catch {
- // swallow callback errors.
- }
+ try {
+ this.onread!(buf, nread);
+ } catch {
+ // swallow callback errors.
+ }
- if (nread >= 0 && this.#reading) {
- this.#read();
- }
- } finally {
- // Lock safety: we know that the buffer will not be used in this function again
- if (locked) {
- bufLocked = locked = false;
- }
+ if (nread >= 0 && this.#reading) {
+ this.#read();
}
}
@@ -454,8 +420,3 @@ export class LibuvStreamWrap extends HandleWrap {
return;
}
}
-
-// Used in #read above
-const BUF = new Uint8Array(SUGGESTED_SIZE);
-// We need to ensure that only one inflight read request uses the cached buffer above
-let bufLocked = false;