diff options
author | Bartek IwaĆczuk <biwanczuk@gmail.com> | 2022-12-28 10:29:48 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-12-28 10:29:48 +0100 |
commit | bece1ce05704d9a81efd2e04a22ee721795903e7 (patch) | |
tree | 1da9f477eef90e3837a2366bec0bdd9f8a79b3c0 /ext/net/01_net.js | |
parent | 7ce2b58bcf412924464578f3469c210b34894c8b (diff) |
feat(unstable): Add Deno.Conn.ref()/unref() (#17170)
This commit adds "Deno.Conn.ref()" and "Deno.Conn.unref()" methods.
These methods can be used to make connection block or not block the
event loop from finishing. Refing/unrefing only influences "read"
operations - ie. scheduling writes to a connection _do_ keep event
loop alive.
Required for https://github.com/denoland/deno/issues/16710
Diffstat (limited to 'ext/net/01_net.js')
-rw-r--r-- | ext/net/01_net.js | 63 |
1 files changed, 47 insertions, 16 deletions
diff --git a/ext/net/01_net.js b/ext/net/01_net.js index f25904e80..244b50a51 100644 --- a/ext/net/01_net.js +++ b/ext/net/01_net.js @@ -4,8 +4,12 @@ ((window) => { const core = window.Deno.core; const { BadResourcePrototype, InterruptedPrototype, ops } = core; - const { readableStreamForRid, writableStreamForRid } = - window.__bootstrap.streams; + const { + readableStreamForRidUnrefable, + readableStreamForRidUnrefableRef, + readableStreamForRidUnrefableUnref, + writableStreamForRid, + } = window.__bootstrap.streams; const { Error, ObjectPrototypeIsPrototypeOf, @@ -19,17 +23,6 @@ const promiseIdSymbol = SymbolFor("Deno.core.internalPromiseId"); - async function read( - rid, - buffer, - ) { - if (buffer.length === 0) { - return 0; - } - const nread = await core.read(rid, buffer); - return nread === 0 ? null : nread; - } - async function write(rid, data) { return await core.write(rid, data); } @@ -46,6 +39,8 @@ #rid = 0; #remoteAddr = null; #localAddr = null; + #unref = false; + #pendingReadPromiseIds = []; #readable; #writable; @@ -72,8 +67,25 @@ return write(this.rid, p); } - read(p) { - return read(this.rid, p); + async read(buffer) { + if (buffer.length === 0) { + return 0; + } + const promise = core.read(this.rid, buffer); + const promiseId = promise[promiseIdSymbol]; + if (this.#unref) core.unrefOp(promiseId); + this.#pendingReadPromiseIds.push(promiseId); + let nread; + try { + nread = await promise; + } catch (e) { + throw e; + } finally { + this.#pendingReadPromiseIds = this.#pendingReadPromiseIds.filter((id) => + id !== promiseId + ); + } + return nread === 0 ? null : nread; } close() { @@ -86,7 +98,10 @@ get readable() { if (this.#readable === undefined) { - this.#readable = readableStreamForRid(this.rid); + this.#readable = readableStreamForRidUnrefable(this.rid); + if (this.#unref) { + readableStreamForRidUnrefableUnref(this.#readable); + } } return this.#readable; } @@ -97,6 +112,22 @@ } return this.#writable; } + + ref() { + this.#unref = false; + if (this.#readable) { + readableStreamForRidUnrefableRef(this.#readable); + } + this.#pendingReadPromiseIds.forEach((id) => core.refOp(id)); + } + + unref() { + this.#unref = true; + if (this.#readable) { + readableStreamForRidUnrefableUnref(this.#readable); + } + this.#pendingReadPromiseIds.forEach((id) => core.unrefOp(id)); + } } class TcpConn extends Conn { |