summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cli/tests/unit/net_test.ts49
-rw-r--r--ext/net/01_net.js63
-rw-r--r--ext/net/lib.deno_net.d.ts14
3 files changed, 110 insertions, 16 deletions
diff --git a/cli/tests/unit/net_test.ts b/cli/tests/unit/net_test.ts
index b4a21578e..930e4f052 100644
--- a/cli/tests/unit/net_test.ts
+++ b/cli/tests/unit/net_test.ts
@@ -906,6 +906,55 @@ Deno.test({
listener.close();
});
+Deno.test(
+ { permissions: { net: true, read: true, run: true } },
+ async function netConnUnref() {
+ const listener = Deno.listen({ port: 3500 });
+ const intervalId = setInterval(() => {}); // This keeps event loop alive.
+
+ const program = execCode(`
+ async function main() {
+ const conn = await Deno.connect({ port: 3500 });
+ conn.unref();
+ await conn.read(new Uint8Array(10)); // The program exits here
+ throw new Error(); // The program doesn't reach here
+ }
+ main();
+ `);
+ const conn = await listener.accept();
+ const [statusCode, _output] = await program;
+ conn.close();
+ listener.close();
+ clearInterval(intervalId);
+ assertEquals(statusCode, 0);
+ },
+);
+
+Deno.test(
+ { permissions: { net: true, read: true, run: true } },
+ async function netConnUnrefReadable() {
+ const listener = Deno.listen({ port: 3500 });
+ const intervalId = setInterval(() => {}); // This keeps event loop alive.
+
+ const program = execCode(`
+ async function main() {
+ const conn = await Deno.connect({ port: 3500 });
+ conn.unref();
+ const reader = conn.readable.getReader();
+ await reader.read(); // The program exits here
+ throw new Error(); // The program doesn't reach here
+ }
+ main();
+ `);
+ const conn = await listener.accept();
+ const [statusCode, _output] = await program;
+ conn.close();
+ listener.close();
+ clearInterval(intervalId);
+ assertEquals(statusCode, 0);
+ },
+);
+
Deno.test({ permissions: { net: true } }, async function netTcpReuseAddr() {
const listener1 = Deno.listen({
hostname: "127.0.0.1",
diff --git a/ext/net/01_net.js b/ext/net/01_net.js
index f25904e80..244b50a51 100644
--- a/ext/net/01_net.js
+++ b/ext/net/01_net.js
@@ -4,8 +4,12 @@
((window) => {
const core = window.Deno.core;
const { BadResourcePrototype, InterruptedPrototype, ops } = core;
- const { readableStreamForRid, writableStreamForRid } =
- window.__bootstrap.streams;
+ const {
+ readableStreamForRidUnrefable,
+ readableStreamForRidUnrefableRef,
+ readableStreamForRidUnrefableUnref,
+ writableStreamForRid,
+ } = window.__bootstrap.streams;
const {
Error,
ObjectPrototypeIsPrototypeOf,
@@ -19,17 +23,6 @@
const promiseIdSymbol = SymbolFor("Deno.core.internalPromiseId");
- async function read(
- rid,
- buffer,
- ) {
- if (buffer.length === 0) {
- return 0;
- }
- const nread = await core.read(rid, buffer);
- return nread === 0 ? null : nread;
- }
-
async function write(rid, data) {
return await core.write(rid, data);
}
@@ -46,6 +39,8 @@
#rid = 0;
#remoteAddr = null;
#localAddr = null;
+ #unref = false;
+ #pendingReadPromiseIds = [];
#readable;
#writable;
@@ -72,8 +67,25 @@
return write(this.rid, p);
}
- read(p) {
- return read(this.rid, p);
+ async read(buffer) {
+ if (buffer.length === 0) {
+ return 0;
+ }
+ const promise = core.read(this.rid, buffer);
+ const promiseId = promise[promiseIdSymbol];
+ if (this.#unref) core.unrefOp(promiseId);
+ this.#pendingReadPromiseIds.push(promiseId);
+ let nread;
+ try {
+ nread = await promise;
+ } catch (e) {
+ throw e;
+ } finally {
+ this.#pendingReadPromiseIds = this.#pendingReadPromiseIds.filter((id) =>
+ id !== promiseId
+ );
+ }
+ return nread === 0 ? null : nread;
}
close() {
@@ -86,7 +98,10 @@
get readable() {
if (this.#readable === undefined) {
- this.#readable = readableStreamForRid(this.rid);
+ this.#readable = readableStreamForRidUnrefable(this.rid);
+ if (this.#unref) {
+ readableStreamForRidUnrefableUnref(this.#readable);
+ }
}
return this.#readable;
}
@@ -97,6 +112,22 @@
}
return this.#writable;
}
+
+ ref() {
+ this.#unref = false;
+ if (this.#readable) {
+ readableStreamForRidUnrefableRef(this.#readable);
+ }
+ this.#pendingReadPromiseIds.forEach((id) => core.refOp(id));
+ }
+
+ unref() {
+ this.#unref = true;
+ if (this.#readable) {
+ readableStreamForRidUnrefableUnref(this.#readable);
+ }
+ this.#pendingReadPromiseIds.forEach((id) => core.unrefOp(id));
+ }
}
class TcpConn extends Conn {
diff --git a/ext/net/lib.deno_net.d.ts b/ext/net/lib.deno_net.d.ts
index 639d0f8af..136723262 100644
--- a/ext/net/lib.deno_net.d.ts
+++ b/ext/net/lib.deno_net.d.ts
@@ -61,6 +61,20 @@ declare namespace Deno {
* callers should just use `close()`. */
closeWrite(): Promise<void>;
+ /** **UNSTABLE**: New API, yet to be vetted.
+ *
+ * Make the connection block the event loop from finishing.
+ *
+ * Note: the connection blocks the event loop from finishing by default.
+ * This method is only meaningful after `.unref()` is called.
+ */
+ ref(): void;
+ /** **UNSTABLE**: New API, yet to be vetted.
+ *
+ * Make the connection not block the event loop from finishing.
+ */
+ unref(): void;
+
readonly readable: ReadableStream<Uint8Array>;
readonly writable: WritableStream<Uint8Array>;
}