summaryrefslogtreecommitdiff
path: root/ext/net/01_net.js
diff options
context:
space:
mode:
authorLuca Casonato <hello@lcas.dev>2022-02-15 13:35:22 +0100
committerGitHub <noreply@github.com>2022-02-15 13:35:22 +0100
commitbdc8006a362b4f95107a25ca816dcdedb7f44e4a (patch)
tree928d7c08e1d16302af03404f37ce84b8d39e4a40 /ext/net/01_net.js
parent7b893bd57f2f013c4a11e1e9f0ba435a3cfc96c0 (diff)
feat(runtime): web streams in fs & net APIs (#13615)
This commit adds `readable` and `writable` properties to `Deno.File` and `Deno.Conn`. This makes it very simple to use files and network sockets with fetch or the native HTTP server.
Diffstat (limited to 'ext/net/01_net.js')
-rw-r--r--ext/net/01_net.js84
1 files changed, 84 insertions, 0 deletions
diff --git a/ext/net/01_net.js b/ext/net/01_net.js
index f135a1655..2970dd817 100644
--- a/ext/net/01_net.js
+++ b/ext/net/01_net.js
@@ -4,6 +4,7 @@
((window) => {
const core = window.Deno.core;
const { BadResourcePrototype, InterruptedPrototype } = core;
+ const { ReadableStream, WritableStream } = window.__bootstrap.streams;
const {
ObjectPrototypeIsPrototypeOf,
PromiseResolve,
@@ -59,10 +60,75 @@
return core.opAsync("op_dns_resolve", { query, recordType, options });
}
+ const DEFAULT_CHUNK_SIZE = 16_640;
+
+ function tryClose(rid) {
+ try {
+ core.close(rid);
+ } catch {
+ // Ignore errors
+ }
+ }
+
+ function readableStreamForRid(rid) {
+ return new ReadableStream({
+ type: "bytes",
+ async pull(controller) {
+ const v = controller.byobRequest.view;
+ try {
+ const bytesRead = await read(rid, v);
+ if (bytesRead === null) {
+ tryClose(rid);
+ controller.close();
+ controller.byobRequest.respond(0);
+ } else {
+ controller.byobRequest.respond(bytesRead);
+ }
+ } catch (e) {
+ controller.error(e);
+ tryClose(rid);
+ }
+ },
+ cancel() {
+ tryClose(rid);
+ },
+ autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
+ });
+ }
+
+ function writableStreamForRid(rid) {
+ return new WritableStream({
+ async write(chunk, controller) {
+ try {
+ let nwritten = 0;
+ while (nwritten < chunk.length) {
+ nwritten += await write(
+ rid,
+ TypedArrayPrototypeSubarray(chunk, nwritten),
+ );
+ }
+ } catch (e) {
+ controller.error(e);
+ tryClose(rid);
+ }
+ },
+ close() {
+ tryClose(rid);
+ },
+ abort() {
+ tryClose(rid);
+ },
+ });
+ }
+
class Conn {
#rid = 0;
#remoteAddr = null;
#localAddr = null;
+
+ #readable;
+ #writable;
+
constructor(rid, remoteAddr, localAddr) {
this.#rid = rid;
this.#remoteAddr = remoteAddr;
@@ -104,6 +170,20 @@
setKeepAlive(keepalive = true) {
return core.opSync("op_set_keepalive", this.rid, keepalive);
}
+
+ get readable() {
+ if (this.#readable === undefined) {
+ this.#readable = readableStreamForRid(this.rid);
+ }
+ return this.#readable;
+ }
+
+ get writable() {
+ if (this.#writable === undefined) {
+ this.#writable = writableStreamForRid(this.rid);
+ }
+ return this.#writable;
+ }
}
class Listener {
@@ -252,4 +332,8 @@
Datagram,
resolveDns,
};
+ window.__bootstrap.streamUtils = {
+ readableStreamForRid,
+ writableStreamForRid,
+ };
})(this);