From bdc8006a362b4f95107a25ca816dcdedb7f44e4a Mon Sep 17 00:00:00 2001 From: Luca Casonato Date: Tue, 15 Feb 2022 13:35:22 +0100 Subject: feat(runtime): web streams in fs & net APIs (#13615) This commit adds `readable` and `writable` properties to `Deno.File` and `Deno.Conn`. This makes it very simple to use files and network sockets with fetch or the native HTTP server. --- ext/net/01_net.js | 84 +++++++++++++++++++++++++++++++++++++++++++++++ ext/net/lib.deno_net.d.ts | 3 ++ 2 files changed, 87 insertions(+) (limited to 'ext/net') diff --git a/ext/net/01_net.js b/ext/net/01_net.js index f135a1655..2970dd817 100644 --- a/ext/net/01_net.js +++ b/ext/net/01_net.js @@ -4,6 +4,7 @@ ((window) => { const core = window.Deno.core; const { BadResourcePrototype, InterruptedPrototype } = core; + const { ReadableStream, WritableStream } = window.__bootstrap.streams; const { ObjectPrototypeIsPrototypeOf, PromiseResolve, @@ -59,10 +60,75 @@ return core.opAsync("op_dns_resolve", { query, recordType, options }); } + const DEFAULT_CHUNK_SIZE = 16_640; + + function tryClose(rid) { + try { + core.close(rid); + } catch { + // Ignore errors + } + } + + function readableStreamForRid(rid) { + return new ReadableStream({ + type: "bytes", + async pull(controller) { + const v = controller.byobRequest.view; + try { + const bytesRead = await read(rid, v); + if (bytesRead === null) { + tryClose(rid); + controller.close(); + controller.byobRequest.respond(0); + } else { + controller.byobRequest.respond(bytesRead); + } + } catch (e) { + controller.error(e); + tryClose(rid); + } + }, + cancel() { + tryClose(rid); + }, + autoAllocateChunkSize: DEFAULT_CHUNK_SIZE, + }); + } + + function writableStreamForRid(rid) { + return new WritableStream({ + async write(chunk, controller) { + try { + let nwritten = 0; + while (nwritten < chunk.length) { + nwritten += await write( + rid, + TypedArrayPrototypeSubarray(chunk, nwritten), + ); + } + } catch (e) { + controller.error(e); + tryClose(rid); + } + }, + close() { + tryClose(rid); + }, + abort() { + tryClose(rid); + }, + }); + } + class Conn { #rid = 0; #remoteAddr = null; #localAddr = null; + + #readable; + #writable; + constructor(rid, remoteAddr, localAddr) { this.#rid = rid; this.#remoteAddr = remoteAddr; @@ -104,6 +170,20 @@ setKeepAlive(keepalive = true) { return core.opSync("op_set_keepalive", this.rid, keepalive); } + + get readable() { + if (this.#readable === undefined) { + this.#readable = readableStreamForRid(this.rid); + } + return this.#readable; + } + + get writable() { + if (this.#writable === undefined) { + this.#writable = writableStreamForRid(this.rid); + } + return this.#writable; + } } class Listener { @@ -252,4 +332,8 @@ Datagram, resolveDns, }; + window.__bootstrap.streamUtils = { + readableStreamForRid, + writableStreamForRid, + }; })(this); diff --git a/ext/net/lib.deno_net.d.ts b/ext/net/lib.deno_net.d.ts index ebed8ac87..9ac274e94 100644 --- a/ext/net/lib.deno_net.d.ts +++ b/ext/net/lib.deno_net.d.ts @@ -54,6 +54,9 @@ declare namespace Deno { setNoDelay(nodelay?: boolean): void; /** Enable/disable keep-alive functionality */ setKeepAlive(keepalive?: boolean): void; + + readonly readable: ReadableStream; + readonly writable: WritableStream; } // deno-lint-ignore no-empty-interface -- cgit v1.2.3