summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cli/tests/unit_node/net_test.ts57
-rw-r--r--ext/node/polyfills/internal_binding/stream_wrap.ts88
2 files changed, 105 insertions, 40 deletions
diff --git a/cli/tests/unit_node/net_test.ts b/cli/tests/unit_node/net_test.ts
index 3b78cbe32..312271f88 100644
--- a/cli/tests/unit_node/net_test.ts
+++ b/cli/tests/unit_node/net_test.ts
@@ -130,3 +130,60 @@ Deno.test("[node/net] connection event has socket value", async () => {
await Promise.all([p, p2]);
});
+
+// https://github.com/denoland/deno/issues/20188
+Deno.test("[node/net] multiple Sockets should get correct server data", async () => {
+ const p = deferred();
+ const p2 = deferred();
+
+ const dataReceived1 = deferred();
+ const dataReceived2 = deferred();
+
+ const events1: string[] = [];
+ const events2: string[] = [];
+
+ const server = net.createServer();
+ server.on("connection", (socket) => {
+ assert(socket !== undefined);
+ socket.on("data", (data) => {
+ socket.write(new TextDecoder().decode(data));
+ });
+ });
+
+ server.listen(async () => {
+ // deno-lint-ignore no-explicit-any
+ const { port } = server.address() as any;
+
+ const socket1 = net.createConnection(port);
+ const socket2 = net.createConnection(port);
+
+ socket1.on("data", (data) => {
+ events1.push(new TextDecoder().decode(data));
+ dataReceived1.resolve();
+ });
+
+ socket2.on("data", (data) => {
+ events2.push(new TextDecoder().decode(data));
+ dataReceived2.resolve();
+ });
+
+ socket1.write("111");
+ socket2.write("222");
+
+ await Promise.all([dataReceived1, dataReceived2]);
+
+ socket1.end();
+ socket2.end();
+
+ server.close(() => {
+ p.resolve();
+ });
+
+ p2.resolve();
+ });
+
+ await Promise.all([p, p2]);
+
+ assertEquals(events1, ["111"]);
+ assertEquals(events2, ["222"]);
+});
diff --git a/ext/node/polyfills/internal_binding/stream_wrap.ts b/ext/node/polyfills/internal_binding/stream_wrap.ts
index 66ebbe682..8e976da2c 100644
--- a/ext/node/polyfills/internal_binding/stream_wrap.ts
+++ b/ext/node/polyfills/internal_binding/stream_wrap.ts
@@ -311,56 +311,61 @@ export class LibuvStreamWrap extends HandleWrap {
/** Internal method for reading from the attached stream. */
async #read() {
- let buf = BUF;
-
- let nread: number | null;
- const ridBefore = this[kStreamBaseField]!.rid;
+ const isOwnedBuf = bufLocked;
+ let buf = bufLocked ? new Uint8Array(SUGGESTED_SIZE) : BUF;
+ bufLocked = true;
try {
- nread = await this[kStreamBaseField]!.read(buf);
- } catch (e) {
- // Try to read again if the underlying stream resource
- // changed. This can happen during TLS upgrades (eg. STARTTLS)
- if (ridBefore != this[kStreamBaseField]!.rid) {
- return this.#read();
- }
+ let nread: number | null;
+ const ridBefore = this[kStreamBaseField]!.rid;
+ try {
+ nread = await this[kStreamBaseField]!.read(buf);
+ } catch (e) {
+ // Try to read again if the underlying stream resource
+ // changed. This can happen during TLS upgrades (eg. STARTTLS)
+ if (ridBefore != this[kStreamBaseField]!.rid) {
+ return this.#read();
+ }
- if (
- e instanceof Deno.errors.Interrupted ||
- e instanceof Deno.errors.BadResource
- ) {
- nread = codeMap.get("EOF")!;
- } else if (
- e instanceof Deno.errors.ConnectionReset ||
- e instanceof Deno.errors.ConnectionAborted
- ) {
- nread = codeMap.get("ECONNRESET")!;
- } else {
- nread = codeMap.get("UNKNOWN")!;
- }
+ if (
+ e instanceof Deno.errors.Interrupted ||
+ e instanceof Deno.errors.BadResource
+ ) {
+ nread = codeMap.get("EOF")!;
+ } else if (
+ e instanceof Deno.errors.ConnectionReset ||
+ e instanceof Deno.errors.ConnectionAborted
+ ) {
+ nread = codeMap.get("ECONNRESET")!;
+ } else {
+ nread = codeMap.get("UNKNOWN")!;
+ }
- buf = new Uint8Array(0);
- }
+ buf = new Uint8Array(0);
+ }
- nread ??= codeMap.get("EOF")!;
+ nread ??= codeMap.get("EOF")!;
- streamBaseState[kReadBytesOrError] = nread;
+ streamBaseState[kReadBytesOrError] = nread;
- if (nread > 0) {
- this.bytesRead += nread;
- }
+ if (nread > 0) {
+ this.bytesRead += nread;
+ }
- buf = buf.slice(0, nread);
+ buf = isOwnedBuf ? buf.subarray(0, nread) : buf.slice(0, nread);
- streamBaseState[kArrayBufferOffset] = 0;
+ streamBaseState[kArrayBufferOffset] = 0;
- try {
- this.onread!(buf, nread);
- } catch {
- // swallow callback errors.
- }
+ try {
+ this.onread!(buf, nread);
+ } catch {
+ // swallow callback errors.
+ }
- if (nread >= 0 && this.#reading) {
- this.#read();
+ if (nread >= 0 && this.#reading) {
+ this.#read();
+ }
+ } finally {
+ bufLocked = false;
}
}
@@ -423,4 +428,7 @@ export class LibuvStreamWrap extends HandleWrap {
}
}
+// Used in #read above
const BUF = new Uint8Array(SUGGESTED_SIZE);
+// We need to ensure that only one inflight read request uses the cached buffer above
+let bufLocked = false;