diff options
Diffstat (limited to 'ext/net/01_net.js')
-rw-r--r-- | ext/net/01_net.js | 63 |
1 files changed, 47 insertions, 16 deletions
diff --git a/ext/net/01_net.js b/ext/net/01_net.js index f25904e80..244b50a51 100644 --- a/ext/net/01_net.js +++ b/ext/net/01_net.js @@ -4,8 +4,12 @@ ((window) => { const core = window.Deno.core; const { BadResourcePrototype, InterruptedPrototype, ops } = core; - const { readableStreamForRid, writableStreamForRid } = - window.__bootstrap.streams; + const { + readableStreamForRidUnrefable, + readableStreamForRidUnrefableRef, + readableStreamForRidUnrefableUnref, + writableStreamForRid, + } = window.__bootstrap.streams; const { Error, ObjectPrototypeIsPrototypeOf, @@ -19,17 +23,6 @@ const promiseIdSymbol = SymbolFor("Deno.core.internalPromiseId"); - async function read( - rid, - buffer, - ) { - if (buffer.length === 0) { - return 0; - } - const nread = await core.read(rid, buffer); - return nread === 0 ? null : nread; - } - async function write(rid, data) { return await core.write(rid, data); } @@ -46,6 +39,8 @@ #rid = 0; #remoteAddr = null; #localAddr = null; + #unref = false; + #pendingReadPromiseIds = []; #readable; #writable; @@ -72,8 +67,25 @@ return write(this.rid, p); } - read(p) { - return read(this.rid, p); + async read(buffer) { + if (buffer.length === 0) { + return 0; + } + const promise = core.read(this.rid, buffer); + const promiseId = promise[promiseIdSymbol]; + if (this.#unref) core.unrefOp(promiseId); + this.#pendingReadPromiseIds.push(promiseId); + let nread; + try { + nread = await promise; + } catch (e) { + throw e; + } finally { + this.#pendingReadPromiseIds = this.#pendingReadPromiseIds.filter((id) => + id !== promiseId + ); + } + return nread === 0 ? null : nread; } close() { @@ -86,7 +98,10 @@ get readable() { if (this.#readable === undefined) { - this.#readable = readableStreamForRid(this.rid); + this.#readable = readableStreamForRidUnrefable(this.rid); + if (this.#unref) { + readableStreamForRidUnrefableUnref(this.#readable); + } } return this.#readable; } @@ -97,6 +112,22 @@ } return this.#writable; } + + ref() { + this.#unref = false; + if (this.#readable) { + readableStreamForRidUnrefableRef(this.#readable); + } + this.#pendingReadPromiseIds.forEach((id) => core.refOp(id)); + } + + unref() { + this.#unref = true; + if (this.#readable) { + readableStreamForRidUnrefableUnref(this.#readable); + } + this.#pendingReadPromiseIds.forEach((id) => core.unrefOp(id)); + } } class TcpConn extends Conn { |