summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatt Mastracci <matthew@mastracci.com>2023-08-28 15:28:39 -0600
committerGitHub <noreply@github.com>2023-08-28 15:28:39 -0600
commit7adaf613bfbeab0f6aed92294e3eea912990d819 (patch)
tree0484633b318a0543f8c7d78ac07f31b0848c936f
parent9198bbd454c39f4d62f43ea729affe8cb789304a (diff)
fix(ext/node): shared global buffer unlock correctness fix (#20314)
The fix for #20188 was not entirely correct -- we were unlocking the global buffer incorrectly. This PR introduces a lock state that ensures we only unlock a lock we have taken out.
-rw-r--r--cli/tests/unit_node/net_test.ts70
-rw-r--r--ext/node/polyfills/internal_binding/stream_wrap.ts41
2 files changed, 73 insertions, 38 deletions
diff --git a/cli/tests/unit_node/net_test.ts b/cli/tests/unit_node/net_test.ts
index 312271f88..b9d9b796a 100644
--- a/cli/tests/unit_node/net_test.ts
+++ b/cli/tests/unit_node/net_test.ts
@@ -5,7 +5,7 @@ import {
assert,
assertEquals,
} from "../../../test_util/std/testing/asserts.ts";
-import { deferred } from "../../../test_util/std/async/deferred.ts";
+import { Deferred, deferred } from "../../../test_util/std/async/deferred.ts";
import * as path from "../../../test_util/std/path/mod.ts";
import * as http from "node:http";
@@ -131,17 +131,18 @@ Deno.test("[node/net] connection event has socket value", async () => {
await Promise.all([p, p2]);
});
+/// We need to make sure that any shared buffers are never used concurrently by two reads.
// https://github.com/denoland/deno/issues/20188
Deno.test("[node/net] multiple Sockets should get correct server data", async () => {
- const p = deferred();
- const p2 = deferred();
+ const socketCount = 9;
- const dataReceived1 = deferred();
- const dataReceived2 = deferred();
-
- const events1: string[] = [];
- const events2: string[] = [];
+ class TestSocket {
+ dataReceived: Deferred<undefined> = deferred();
+ events: string[] = [];
+ socket: net.Socket | undefined;
+ }
+ const finished = deferred();
const server = net.createServer();
server.on("connection", (socket) => {
assert(socket !== undefined);
@@ -150,40 +151,47 @@ Deno.test("[node/net] multiple Sockets should get correct server data", async ()
});
});
+ const sockets: TestSocket[] = [];
+ for (let i = 0; i < socketCount; i++) {
+ sockets[i] = new TestSocket();
+ }
+
server.listen(async () => {
// deno-lint-ignore no-explicit-any
const { port } = server.address() as any;
- const socket1 = net.createConnection(port);
- const socket2 = net.createConnection(port);
-
- socket1.on("data", (data) => {
- events1.push(new TextDecoder().decode(data));
- dataReceived1.resolve();
- });
-
- socket2.on("data", (data) => {
- events2.push(new TextDecoder().decode(data));
- dataReceived2.resolve();
- });
+ for (let i = 0; i < socketCount; i++) {
+ const socket = sockets[i].socket = net.createConnection(port);
+ socket.on("data", (data) => {
+ const count = sockets[i].events.length;
+ sockets[i].events.push(new TextDecoder().decode(data));
+ if (count === 0) {
+ // Trigger an immediate second write
+ sockets[i].socket?.write(`${i}`.repeat(3));
+ } else {
+ sockets[i].dataReceived.resolve();
+ }
+ });
+ }
- socket1.write("111");
- socket2.write("222");
+ for (let i = 0; i < socketCount; i++) {
+ sockets[i].socket?.write(`${i}`.repeat(3));
+ }
- await Promise.all([dataReceived1, dataReceived2]);
+ await Promise.all(sockets.map((socket) => socket.dataReceived));
- socket1.end();
- socket2.end();
+ for (let i = 0; i < socketCount; i++) {
+ sockets[i].socket?.end();
+ }
server.close(() => {
- p.resolve();
+ finished.resolve();
});
-
- p2.resolve();
});
- await Promise.all([p, p2]);
+ await finished;
- assertEquals(events1, ["111"]);
- assertEquals(events2, ["222"]);
+ for (let i = 0; i < socketCount; i++) {
+ assertEquals(sockets[i].events, [`${i}`.repeat(3), `${i}`.repeat(3)]);
+ }
});
diff --git a/ext/node/polyfills/internal_binding/stream_wrap.ts b/ext/node/polyfills/internal_binding/stream_wrap.ts
index 8e976da2c..f59acb9b9 100644
--- a/ext/node/polyfills/internal_binding/stream_wrap.ts
+++ b/ext/node/polyfills/internal_binding/stream_wrap.ts
@@ -311,21 +311,37 @@ export class LibuvStreamWrap extends HandleWrap {
/** Internal method for reading from the attached stream. */
async #read() {
- const isOwnedBuf = bufLocked;
- let buf = bufLocked ? new Uint8Array(SUGGESTED_SIZE) : BUF;
- bufLocked = true;
+ // Lock safety: We must hold this lock until we are certain that buf is no longer used
+ // This setup code is a little verbose, but we need to be careful about buffer management
+ let buf, locked = false;
+ if (bufLocked) {
+ // Already locked, allocate
+ buf = new Uint8Array(SUGGESTED_SIZE);
+ } else {
+ // Not locked, take the buffer + lock
+ buf = BUF;
+ locked = bufLocked = true;
+ }
try {
let nread: number | null;
const ridBefore = this[kStreamBaseField]!.rid;
try {
nread = await this[kStreamBaseField]!.read(buf);
} catch (e) {
+ // Lock safety: we know that the buffer will not be used in this function again
+ // All exits from this block either return or re-assign buf to a different value
+ if (locked) {
+ bufLocked = locked = false;
+ }
+
// Try to read again if the underlying stream resource
// changed. This can happen during TLS upgrades (eg. STARTTLS)
if (ridBefore != this[kStreamBaseField]!.rid) {
return this.#read();
}
+ buf = new Uint8Array(0);
+
if (
e instanceof Deno.errors.Interrupted ||
e instanceof Deno.errors.BadResource
@@ -339,8 +355,6 @@ export class LibuvStreamWrap extends HandleWrap {
} else {
nread = codeMap.get("UNKNOWN")!;
}
-
- buf = new Uint8Array(0);
}
nread ??= codeMap.get("EOF")!;
@@ -351,7 +365,17 @@ export class LibuvStreamWrap extends HandleWrap {
this.bytesRead += nread;
}
- buf = isOwnedBuf ? buf.subarray(0, nread) : buf.slice(0, nread);
+ // We release the lock early so a re-entrant read can make use of the shared buffer, but
+ // we need to make a copy of the data in the shared buffer.
+ if (locked) {
+ // Lock safety: we know that the buffer will not be used in this function again
+ // We're making a copy of data that lives in the shared buffer
+ buf = buf.slice(0, nread);
+ bufLocked = locked = false;
+ } else {
+ // The buffer isn't owned, so let's create a subarray view
+ buf = buf.subarray(0, nread);
+ }
streamBaseState[kArrayBufferOffset] = 0;
@@ -365,7 +389,10 @@ export class LibuvStreamWrap extends HandleWrap {
this.#read();
}
} finally {
- bufLocked = false;
+ // Lock safety: we know that the buffer will not be used in this function again
+ if (locked) {
+ bufLocked = locked = false;
+ }
}
}