diff options
Diffstat (limited to 'ext/net/01_net.js')
-rw-r--r-- | ext/net/01_net.js | 84 |
1 files changed, 84 insertions, 0 deletions
diff --git a/ext/net/01_net.js b/ext/net/01_net.js index f135a1655..2970dd817 100644 --- a/ext/net/01_net.js +++ b/ext/net/01_net.js @@ -4,6 +4,7 @@ ((window) => { const core = window.Deno.core; const { BadResourcePrototype, InterruptedPrototype } = core; + const { ReadableStream, WritableStream } = window.__bootstrap.streams; const { ObjectPrototypeIsPrototypeOf, PromiseResolve, @@ -59,10 +60,75 @@ return core.opAsync("op_dns_resolve", { query, recordType, options }); } + const DEFAULT_CHUNK_SIZE = 16_640; + + function tryClose(rid) { + try { + core.close(rid); + } catch { + // Ignore errors + } + } + + function readableStreamForRid(rid) { + return new ReadableStream({ + type: "bytes", + async pull(controller) { + const v = controller.byobRequest.view; + try { + const bytesRead = await read(rid, v); + if (bytesRead === null) { + tryClose(rid); + controller.close(); + controller.byobRequest.respond(0); + } else { + controller.byobRequest.respond(bytesRead); + } + } catch (e) { + controller.error(e); + tryClose(rid); + } + }, + cancel() { + tryClose(rid); + }, + autoAllocateChunkSize: DEFAULT_CHUNK_SIZE, + }); + } + + function writableStreamForRid(rid) { + return new WritableStream({ + async write(chunk, controller) { + try { + let nwritten = 0; + while (nwritten < chunk.length) { + nwritten += await write( + rid, + TypedArrayPrototypeSubarray(chunk, nwritten), + ); + } + } catch (e) { + controller.error(e); + tryClose(rid); + } + }, + close() { + tryClose(rid); + }, + abort() { + tryClose(rid); + }, + }); + } + class Conn { #rid = 0; #remoteAddr = null; #localAddr = null; + + #readable; + #writable; + constructor(rid, remoteAddr, localAddr) { this.#rid = rid; this.#remoteAddr = remoteAddr; @@ -104,6 +170,20 @@ setKeepAlive(keepalive = true) { return core.opSync("op_set_keepalive", this.rid, keepalive); } + + get readable() { + if (this.#readable === undefined) { + this.#readable = readableStreamForRid(this.rid); + } + return this.#readable; + } + + get writable() { + if (this.#writable === undefined) { + this.#writable = writableStreamForRid(this.rid); + } + return this.#writable; + } } class Listener { @@ -252,4 +332,8 @@ Datagram, resolveDns, }; + window.__bootstrap.streamUtils = { + readableStreamForRid, + writableStreamForRid, + }; })(this); |