diff options
author | Bartek IwaĆczuk <biwanczuk@gmail.com> | 2023-09-11 20:38:57 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-09-11 20:38:57 +0200 |
commit | aaff69db3fd8cf70d1c031720e84874cb4a4d02c (patch) | |
tree | 66ecc2651c2afc598b91878317045e0d5c231296 | |
parent | 9d1385896f3c170cf4e0cb744cc2e88e12af50ab (diff) |
perf(node/net): optimize socket reads for 'npm:ws' package (#20449)
Fixes performance regression introduced by
https://github.com/denoland/deno/pull/20223 and
https://github.com/denoland/deno/pull/20314. It's enough to have one
"shared" buffer per socket
and no locking mechanism is required.
-rw-r--r-- | ext/node/polyfills/internal_binding/stream_wrap.ts | 115 |
1 files changed, 38 insertions, 77 deletions
diff --git a/ext/node/polyfills/internal_binding/stream_wrap.ts b/ext/node/polyfills/internal_binding/stream_wrap.ts index f59acb9b9..3e4d0be6a 100644 --- a/ext/node/polyfills/internal_binding/stream_wrap.ts +++ b/ext/node/polyfills/internal_binding/stream_wrap.ts @@ -114,6 +114,7 @@ export class LibuvStreamWrap extends HandleWrap { writeQueueSize = 0; bytesRead = 0; bytesWritten = 0; + #buf = new Uint8Array(SUGGESTED_SIZE); onread!: (_arrayBuffer: Uint8Array, _nread: number) => Uint8Array | undefined; @@ -311,88 +312,53 @@ export class LibuvStreamWrap extends HandleWrap { /** Internal method for reading from the attached stream. */ async #read() { - // Lock safety: We must hold this lock until we are certain that buf is no longer used - // This setup code is a little verbose, but we need to be careful about buffer management - let buf, locked = false; - if (bufLocked) { - // Already locked, allocate - buf = new Uint8Array(SUGGESTED_SIZE); - } else { - // Not locked, take the buffer + lock - buf = BUF; - locked = bufLocked = true; - } + let buf = this.#buf; + let nread: number | null; + const ridBefore = this[kStreamBaseField]!.rid; try { - let nread: number | null; - const ridBefore = this[kStreamBaseField]!.rid; - try { - nread = await this[kStreamBaseField]!.read(buf); - } catch (e) { - // Lock safety: we know that the buffer will not be used in this function again - // All exits from this block either return or re-assign buf to a different value - if (locked) { - bufLocked = locked = false; - } - - // Try to read again if the underlying stream resource - // changed. This can happen during TLS upgrades (eg. STARTTLS) - if (ridBefore != this[kStreamBaseField]!.rid) { - return this.#read(); - } + nread = await this[kStreamBaseField]!.read(buf); + } catch (e) { + // Try to read again if the underlying stream resource + // changed. This can happen during TLS upgrades (eg. STARTTLS) + if (ridBefore != this[kStreamBaseField]!.rid) { + return this.#read(); + } - buf = new Uint8Array(0); - - if ( - e instanceof Deno.errors.Interrupted || - e instanceof Deno.errors.BadResource - ) { - nread = codeMap.get("EOF")!; - } else if ( - e instanceof Deno.errors.ConnectionReset || - e instanceof Deno.errors.ConnectionAborted - ) { - nread = codeMap.get("ECONNRESET")!; - } else { - nread = codeMap.get("UNKNOWN")!; - } + if ( + e instanceof Deno.errors.Interrupted || + e instanceof Deno.errors.BadResource + ) { + nread = codeMap.get("EOF")!; + } else if ( + e instanceof Deno.errors.ConnectionReset || + e instanceof Deno.errors.ConnectionAborted + ) { + nread = codeMap.get("ECONNRESET")!; + } else { + nread = codeMap.get("UNKNOWN")!; } + } - nread ??= codeMap.get("EOF")!; + nread ??= codeMap.get("EOF")!; - streamBaseState[kReadBytesOrError] = nread; + streamBaseState[kReadBytesOrError] = nread; - if (nread > 0) { - this.bytesRead += nread; - } + if (nread > 0) { + this.bytesRead += nread; + } - // We release the lock early so a re-entrant read can make use of the shared buffer, but - // we need to make a copy of the data in the shared buffer. - if (locked) { - // Lock safety: we know that the buffer will not be used in this function again - // We're making a copy of data that lives in the shared buffer - buf = buf.slice(0, nread); - bufLocked = locked = false; - } else { - // The buffer isn't owned, so let's create a subarray view - buf = buf.subarray(0, nread); - } + buf = buf.slice(0, nread); - streamBaseState[kArrayBufferOffset] = 0; + streamBaseState[kArrayBufferOffset] = 0; - try { - this.onread!(buf, nread); - } catch { - // swallow callback errors. - } + try { + this.onread!(buf, nread); + } catch { + // swallow callback errors. + } - if (nread >= 0 && this.#reading) { - this.#read(); - } - } finally { - // Lock safety: we know that the buffer will not be used in this function again - if (locked) { - bufLocked = locked = false; - } + if (nread >= 0 && this.#reading) { + this.#read(); } } @@ -454,8 +420,3 @@ export class LibuvStreamWrap extends HandleWrap { return; } } - -// Used in #read above -const BUF = new Uint8Array(SUGGESTED_SIZE); -// We need to ensure that only one inflight read request uses the cached buffer above -let bufLocked = false; |