diff options
author | Bartek IwaĆczuk <biwanczuk@gmail.com> | 2021-06-29 01:43:03 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-06-29 01:43:03 +0200 |
commit | 38a7128cdd6f3308ba3c13cfb0b0d4ef925a44c3 (patch) | |
tree | 8f0c86028d9ba0266f1846e7f3611f7049cb43a8 /extensions | |
parent | 30cba2484815f712502ae8937a25afa13aba0818 (diff) |
feat: Add "deno_net" extension (#11150)
This commits moves implementation of net related APIs available on "Deno"
namespace to "deno_net" extension.
Following APIs were moved:
- Deno.listen()
- Deno.connect()
- Deno.listenTls()
- Deno.serveHttp()
- Deno.shutdown()
- Deno.resolveDns()
- Deno.listenDatagram()
- Deno.startTls()
- Deno.Conn
- Deno.Listener
- Deno.DatagramConn
Diffstat (limited to 'extensions')
-rw-r--r-- | extensions/net/01_net.js | 234 | ||||
-rw-r--r-- | extensions/net/02_tls.js | 85 | ||||
-rw-r--r-- | extensions/net/03_http.js | 251 | ||||
-rw-r--r-- | extensions/net/04_net_unstable.js | 49 | ||||
-rw-r--r-- | extensions/net/Cargo.toml | 31 | ||||
-rw-r--r-- | extensions/net/README.md | 30 | ||||
-rw-r--r-- | extensions/net/io.rs | 232 | ||||
-rw-r--r-- | extensions/net/lib.deno_net.d.ts | 149 | ||||
-rw-r--r-- | extensions/net/lib.deno_net.unstable.d.ts | 262 | ||||
-rw-r--r-- | extensions/net/lib.rs | 113 | ||||
-rw-r--r-- | extensions/net/ops.rs | 795 | ||||
-rw-r--r-- | extensions/net/ops_http.rs | 577 | ||||
-rw-r--r-- | extensions/net/ops_tls.rs | 1024 | ||||
-rw-r--r-- | extensions/net/ops_unix.rs | 180 | ||||
-rw-r--r-- | extensions/net/resolve_addr.rs | 156 |
15 files changed, 4168 insertions, 0 deletions
diff --git a/extensions/net/01_net.js b/extensions/net/01_net.js new file mode 100644 index 000000000..9a531bd94 --- /dev/null +++ b/extensions/net/01_net.js @@ -0,0 +1,234 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. +"use strict"; + +((window) => { + const core = window.Deno.core; + const { BadResource } = core; + + async function read( + rid, + buffer, + ) { + if (buffer.length === 0) { + return 0; + } + const nread = await core.opAsync("op_net_read_async", rid, buffer); + return nread === 0 ? null : nread; + } + + async function write(rid, data) { + return await core.opAsync("op_net_write_async", rid, data); + } + + function shutdown(rid) { + return core.opAsync("op_net_shutdown", rid); + } + + function opAccept(rid, transport) { + return core.opAsync("op_accept", { rid, transport }); + } + + function opListen(args) { + return core.opSync("op_listen", args); + } + + function opConnect(args) { + return core.opAsync("op_connect", args); + } + + function opReceive(rid, transport, zeroCopy) { + return core.opAsync( + "op_datagram_receive", + { rid, transport }, + zeroCopy, + ); + } + + function opSend(args, zeroCopy) { + return core.opAsync("op_datagram_send", args, zeroCopy); + } + + function resolveDns(query, recordType, options) { + return core.opAsync("op_dns_resolve", { query, recordType, options }); + } + + class Conn { + #rid = 0; + #remoteAddr = null; + #localAddr = null; + constructor(rid, remoteAddr, localAddr) { + this.#rid = rid; + this.#remoteAddr = remoteAddr; + this.#localAddr = localAddr; + } + + get rid() { + return this.#rid; + } + + get remoteAddr() { + return this.#remoteAddr; + } + + get localAddr() { + return this.#localAddr; + } + + write(p) { + return write(this.rid, p); + } + + read(p) { + return read(this.rid, p); + } + + close() { + core.close(this.rid); + } + + closeWrite() { + return shutdown(this.rid); + } + } + + class Listener { + #rid = 0; + #addr = null; + + constructor(rid, addr) { + this.#rid = rid; + this.#addr = addr; + } + + get rid() { + return this.#rid; + } + + get addr() { + return this.#addr; + } + + async accept() { + const res = await opAccept(this.rid, this.addr.transport); + return new Conn(res.rid, res.remoteAddr, res.localAddr); + } + + async next() { + let conn; + try { + conn = await this.accept(); + } catch (error) { + if (error instanceof BadResource) { + return { value: undefined, done: true }; + } + throw error; + } + return { value: conn, done: false }; + } + + return(value) { + this.close(); + return Promise.resolve({ value, done: true }); + } + + close() { + core.close(this.rid); + } + + [Symbol.asyncIterator]() { + return this; + } + } + + class Datagram { + #rid = 0; + #addr = null; + + constructor(rid, addr, bufSize = 1024) { + this.#rid = rid; + this.#addr = addr; + this.bufSize = bufSize; + } + + get rid() { + return this.#rid; + } + + get addr() { + return this.#addr; + } + + async receive(p) { + const buf = p || new Uint8Array(this.bufSize); + const { size, remoteAddr } = await opReceive( + this.rid, + this.addr.transport, + buf, + ); + const sub = buf.subarray(0, size); + return [sub, remoteAddr]; + } + + send(p, addr) { + const remote = { hostname: "127.0.0.1", ...addr }; + + const args = { ...remote, rid: this.rid }; + return opSend(args, p); + } + + close() { + core.close(this.rid); + } + + async *[Symbol.asyncIterator]() { + while (true) { + try { + yield await this.receive(); + } catch (err) { + if (err instanceof BadResource) { + break; + } + throw err; + } + } + } + } + + function listen({ hostname, ...options }) { + const res = opListen({ + transport: "tcp", + hostname: typeof hostname === "undefined" ? "0.0.0.0" : hostname, + ...options, + }); + + return new Listener(res.rid, res.localAddr); + } + + async function connect(options) { + let res; + + if (options.transport === "unix") { + res = await opConnect(options); + } else { + res = await opConnect({ + transport: "tcp", + hostname: "127.0.0.1", + ...options, + }); + } + + return new Conn(res.rid, res.remoteAddr, res.localAddr); + } + + window.__bootstrap.net = { + connect, + Conn, + opConnect, + listen, + opListen, + Listener, + shutdown, + Datagram, + resolveDns, + }; +})(this); diff --git a/extensions/net/02_tls.js b/extensions/net/02_tls.js new file mode 100644 index 000000000..4fafe9079 --- /dev/null +++ b/extensions/net/02_tls.js @@ -0,0 +1,85 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. +"use strict"; + +((window) => { + const core = window.Deno.core; + const { Listener, Conn } = window.__bootstrap.net; + + function opConnectTls( + args, + ) { + return core.opAsync("op_connect_tls", args); + } + + function opAcceptTLS(rid) { + return core.opAsync("op_accept_tls", rid); + } + + function opListenTls(args) { + return core.opSync("op_listen_tls", args); + } + + function opStartTls(args) { + return core.opAsync("op_start_tls", args); + } + + async function connectTls({ + port, + hostname = "127.0.0.1", + transport = "tcp", + certFile = undefined, + }) { + const res = await opConnectTls({ + port, + hostname, + transport, + certFile, + }); + return new Conn(res.rid, res.remoteAddr, res.localAddr); + } + + class TLSListener extends Listener { + async accept() { + const res = await opAcceptTLS(this.rid); + return new Conn(res.rid, res.remoteAddr, res.localAddr); + } + } + + function listenTls({ + port, + certFile, + keyFile, + hostname = "0.0.0.0", + transport = "tcp", + alpnProtocols, + }) { + const res = opListenTls({ + port, + certFile, + keyFile, + hostname, + transport, + alpnProtocols, + }); + return new TLSListener(res.rid, res.localAddr); + } + + async function startTls( + conn, + { hostname = "127.0.0.1", certFile } = {}, + ) { + const res = await opStartTls({ + rid: conn.rid, + hostname, + certFile, + }); + return new Conn(res.rid, res.remoteAddr, res.localAddr); + } + + window.__bootstrap.tls = { + startTls, + listenTls, + connectTls, + TLSListener, + }; +})(this); diff --git a/extensions/net/03_http.js b/extensions/net/03_http.js new file mode 100644 index 000000000..d5054bd1a --- /dev/null +++ b/extensions/net/03_http.js @@ -0,0 +1,251 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. +"use strict"; + +((window) => { + const { InnerBody } = window.__bootstrap.fetchBody; + const { Response, fromInnerRequest, toInnerResponse, newInnerRequest } = + window.__bootstrap.fetch; + const core = window.Deno.core; + const { BadResource, Interrupted } = core; + const { ReadableStream } = window.__bootstrap.streams; + const abortSignal = window.__bootstrap.abortSignal; + + function serveHttp(conn) { + const rid = Deno.core.opSync("op_http_start", conn.rid); + return new HttpConn(rid); + } + + const connErrorSymbol = Symbol("connError"); + + class HttpConn { + #rid = 0; + + constructor(rid) { + this.#rid = rid; + } + + /** @returns {number} */ + get rid() { + return this.#rid; + } + + /** @returns {Promise<ResponseEvent | null>} */ + async nextRequest() { + let nextRequest; + try { + nextRequest = await Deno.core.opAsync( + "op_http_request_next", + this.#rid, + ); + } catch (error) { + // A connection error seen here would cause disrupted responses to throw + // a generic `BadResource` error. Instead store this error and replace + // those with it. + this[connErrorSymbol] = error; + if (error instanceof BadResource) { + return null; + } else if (error instanceof Interrupted) { + return null; + } else if (error.message.includes("connection closed")) { + return null; + } + throw error; + } + if (nextRequest === null) return null; + + const [ + requestBodyRid, + responseSenderRid, + method, + headersList, + url, + ] = nextRequest; + + /** @type {ReadableStream<Uint8Array> | undefined} */ + let body = null; + if (typeof requestBodyRid === "number") { + body = createRequestBodyStream(requestBodyRid); + } + + const innerRequest = newInnerRequest( + method, + url, + headersList, + body !== null ? new InnerBody(body) : null, + ); + const signal = abortSignal.newSignal(); + const request = fromInnerRequest(innerRequest, signal, "immutable"); + + const respondWith = createRespondWith(this, responseSenderRid); + + return { request, respondWith }; + } + + /** @returns {void} */ + close() { + core.close(this.#rid); + } + + [Symbol.asyncIterator]() { + // deno-lint-ignore no-this-alias + const httpConn = this; + return { + async next() { + const reqEvt = await httpConn.nextRequest(); + // Change with caution, current form avoids a v8 deopt + return { value: reqEvt, done: reqEvt === null }; + }, + }; + } + } + + function readRequest(requestRid, zeroCopyBuf) { + return Deno.core.opAsync( + "op_http_request_read", + requestRid, + zeroCopyBuf, + ); + } + + function createRespondWith(httpConn, responseSenderRid) { + return async function respondWith(resp) { + if (resp instanceof Promise) { + resp = await resp; + } + + if (!(resp instanceof Response)) { + throw new TypeError( + "First argument to respondWith must be a Response or a promise resolving to a Response.", + ); + } + + const innerResp = toInnerResponse(resp); + + // If response body length is known, it will be sent synchronously in a + // single op, in other case a "response body" resource will be created and + // we'll be streaming it. + /** @type {ReadableStream<Uint8Array> | Uint8Array | null} */ + let respBody = null; + if (innerResp.body !== null) { + if (innerResp.body.unusable()) throw new TypeError("Body is unusable."); + if (innerResp.body.streamOrStatic instanceof ReadableStream) { + if (innerResp.body.length === null) { + respBody = innerResp.body.stream; + } else { + const reader = innerResp.body.stream.getReader(); + const r1 = await reader.read(); + if (r1.done) { + respBody = new Uint8Array(0); + } else { + respBody = r1.value; + const r2 = await reader.read(); + if (!r2.done) throw new TypeError("Unreachable"); + } + } + } else { + innerResp.body.streamOrStatic.consumed = true; + respBody = innerResp.body.streamOrStatic.body; + } + } else { + respBody = new Uint8Array(0); + } + + let responseBodyRid; + try { + responseBodyRid = await Deno.core.opAsync("op_http_response", [ + responseSenderRid, + innerResp.status ?? 200, + innerResp.headerList, + ], respBody instanceof Uint8Array ? respBody : null); + } catch (error) { + const connError = httpConn[connErrorSymbol]; + if (error instanceof BadResource && connError != null) { + // deno-lint-ignore no-ex-assign + error = new connError.constructor(connError.message); + } + if (respBody !== null && respBody instanceof ReadableStream) { + await respBody.cancel(error); + } + throw error; + } + + // If `respond` returns a responseBodyRid, we should stream the body + // to that resource. + if (responseBodyRid !== null) { + try { + if (respBody === null || !(respBody instanceof ReadableStream)) { + throw new TypeError("Unreachable"); + } + const reader = respBody.getReader(); + while (true) { + const { value, done } = await reader.read(); + if (done) break; + if (!(value instanceof Uint8Array)) { + await reader.cancel(new TypeError("Value not a Uint8Array")); + break; + } + try { + await Deno.core.opAsync( + "op_http_response_write", + responseBodyRid, + value, + ); + } catch (error) { + const connError = httpConn[connErrorSymbol]; + if (error instanceof BadResource && connError != null) { + // deno-lint-ignore no-ex-assign + error = new connError.constructor(connError.message); + } + await reader.cancel(error); + throw error; + } + } + } finally { + // Once all chunks are sent, and the request body is closed, we can + // close the response body. + try { + await Deno.core.opAsync("op_http_response_close", responseBodyRid); + } catch { /* pass */ } + } + } + }; + } + + function createRequestBodyStream(requestBodyRid) { + return new ReadableStream({ + type: "bytes", + async pull(controller) { + try { + // This is the largest possible size for a single packet on a TLS + // stream. + const chunk = new Uint8Array(16 * 1024 + 256); + const read = await readRequest( + requestBodyRid, + chunk, + ); + if (read > 0) { + // We read some data. Enqueue it onto the stream. + controller.enqueue(chunk.subarray(0, read)); + } else { + // We have reached the end of the body, so we close the stream. + controller.close(); + core.close(requestBodyRid); + } + } catch (err) { + // There was an error while reading a chunk of the body, so we + // error. + controller.error(err); + controller.close(); + core.close(requestBodyRid); + } + }, + cancel() { + core.close(requestBodyRid); + }, + }); + } + + window.__bootstrap.http = { + serveHttp, + }; +})(this); diff --git a/extensions/net/04_net_unstable.js b/extensions/net/04_net_unstable.js new file mode 100644 index 000000000..ca265bfaa --- /dev/null +++ b/extensions/net/04_net_unstable.js @@ -0,0 +1,49 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. +"use strict"; + +((window) => { + const net = window.__bootstrap.net; + + function listen(options) { + if (options.transport === "unix") { + const res = net.opListen(options); + return new net.Listener(res.rid, res.localAddr); + } else { + return net.listen(options); + } + } + + function listenDatagram( + options, + ) { + let res; + if (options.transport === "unixpacket") { + res = net.opListen(options); + } else { + res = net.opListen({ + transport: "udp", + hostname: "127.0.0.1", + ...options, + }); + } + + return new net.Datagram(res.rid, res.localAddr); + } + + async function connect( + options, + ) { + if (options.transport === "unix") { + const res = await net.opConnect(options); + return new net.Conn(res.rid, res.remoteAddr, res.localAddr); + } else { + return net.connect(options); + } + } + + window.__bootstrap.netUnstable = { + connect, + listenDatagram, + listen, + }; +})(this); diff --git a/extensions/net/Cargo.toml b/extensions/net/Cargo.toml new file mode 100644 index 000000000..9d5e97bbb --- /dev/null +++ b/extensions/net/Cargo.toml @@ -0,0 +1,31 @@ +# Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. + +[package] +name = "deno_net" +version = "0.1.0" +edition = "2018" +description = "Networking for Deno" +authors = ["the Deno authors"] +license = "MIT" +readme = "README.md" +repository = "https://github.com/denoland/deno" + +[lib] +path = "lib.rs" + +[dependencies] +deno_core = { version = "0.92.0", path = "../../core" } + +bytes = "1" +log = "0.4.14" +lazy_static = "1.4.0" +http = "0.2.3" +hyper = { version = "0.14.9", features = ["server", "stream", "http1", "http2", "runtime"] } +rustls = "0.19.0" +serde = { version = "1.0.125", features = ["derive"] } +tokio = { version = "1.7.1", features = ["full"] } +tokio-util = { version = "0.6", features = ["io"] } +webpki = "0.21.4" +webpki-roots = "0.21.1" +trust-dns-proto = "0.20.3" +trust-dns-resolver = { version = "0.20.3", features = ["tokio-runtime", "serde-config"] } diff --git a/extensions/net/README.md b/extensions/net/README.md new file mode 100644 index 000000000..cdd8923e1 --- /dev/null +++ b/extensions/net/README.md @@ -0,0 +1,30 @@ +# deno_net + +This crate implements networking APIs. + +This crate depends on following extensions: + +- "deno_web" +- "deno_fetch" + +Following ops are provided: + +- "op_net_read_async" +- "op_net_write_async" +- "op_net_shutdown" +- "op_accept" +- "op_connect" +- "op_listen" +- "op_datagram_receive" +- "op_datagram_send" +- "op_dns_resolve" +- "op_start_tls" +- "op_connect_tls" +- "op_listen_tls" +- "op_accept_tls" +- "op_http_start" +- "op_http_request_next" +- "op_http_request_read" +- "op_http_response" +- "op_http_response_write" +- "op_http_response_close" diff --git a/extensions/net/io.rs b/extensions/net/io.rs new file mode 100644 index 000000000..fc10d7e99 --- /dev/null +++ b/extensions/net/io.rs @@ -0,0 +1,232 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. + +use crate::ops_tls as tls; +use deno_core::error::null_opbuf; +use deno_core::error::AnyError; +use deno_core::error::{bad_resource_id, not_supported}; +use deno_core::op_async; +use deno_core::AsyncMutFuture; +use deno_core::AsyncRefCell; +use deno_core::CancelHandle; +use deno_core::CancelTryFuture; +use deno_core::OpPair; +use deno_core::OpState; +use deno_core::RcRef; +use deno_core::Resource; +use deno_core::ResourceId; +use deno_core::ZeroCopyBuf; +use std::borrow::Cow; +use std::cell::RefCell; +use std::rc::Rc; +use tokio::io::AsyncRead; +use tokio::io::AsyncReadExt; +use tokio::io::AsyncWrite; +use tokio::io::AsyncWriteExt; +use tokio::net::tcp; + +#[cfg(unix)] +use tokio::net::unix; + +pub fn init() -> Vec<OpPair> { + vec![ + ("op_net_read_async", op_async(op_read_async)), + ("op_net_write_async", op_async(op_write_async)), + ("op_net_shutdown", op_async(op_shutdown)), + ] +} + +/// A full duplex resource has a read and write ends that are completely +/// independent, like TCP/Unix sockets and TLS streams. +#[derive(Debug)] +pub struct FullDuplexResource<R, W> { + rd: AsyncRefCell<R>, + wr: AsyncRefCell<W>, + // When a full-duplex resource is closed, all pending 'read' ops are + // canceled, while 'write' ops are allowed to complete. Therefore only + // 'read' futures should be attached to this cancel handle. + cancel_handle: CancelHandle, +} + +impl<R, W> FullDuplexResource<R, W> +where + R: AsyncRead + Unpin + 'static, + W: AsyncWrite + Unpin + 'static, +{ + pub fn new((rd, wr): (R, W)) -> Self { + Self { + rd: rd.into(), + wr: wr.into(), + cancel_handle: Default::default(), + } + } + + pub fn into_inner(self) -> (R, W) { + (self.rd.into_inner(), self.wr.into_inner()) + } + + pub fn rd_borrow_mut(self: &Rc<Self>) -> AsyncMutFuture<R> { + RcRef::map(self, |r| &r.rd).borrow_mut() + } + + pub fn wr_borrow_mut(self: &Rc<Self>) -> AsyncMutFuture<W> { + RcRef::map(self, |r| &r.wr).borrow_mut() + } + + pub fn cancel_handle(self: &Rc<Self>) -> RcRef<CancelHandle> { + RcRef::map(self, |r| &r.cancel_handle) + } + + pub fn cancel_read_ops(&self) { + self.cancel_handle.cancel() + } + + pub async fn read( + self: &Rc<Self>, + buf: &mut [u8], + ) -> Result<usize, AnyError> { + let mut rd = self.rd_borrow_mut().await; + let nread = rd.read(buf).try_or_cancel(self.cancel_handle()).await?; + Ok(nread) + } + + pub async fn write(self: &Rc<Self>, buf: &[u8]) -> Result<usize, AnyError> { + let mut wr = self.wr_borrow_mut().await; + let nwritten = wr.write(buf).await?; + Ok(nwritten) + } + + pub async fn shutdown(self: &Rc<Self>) -> Result<(), AnyError> { + let mut wr = self.wr_borrow_mut().await; + wr.shutdown().await?; + Ok(()) + } +} + +pub type TcpStreamResource = + FullDuplexResource<tcp::OwnedReadHalf, tcp::OwnedWriteHalf>; + +impl Resource for TcpStreamResource { + fn name(&self) -> Cow<str> { + "tcpStream".into() + } + + fn close(self: Rc<Self>) { + self.cancel_read_ops(); + } +} + +pub type TlsStreamResource = FullDuplexResource<tls::ReadHalf, tls::WriteHalf>; + +impl Resource for TlsStreamResource { + fn name(&self) -> Cow<str> { + "tlsStream".into() + } + + fn close(self: Rc<Self>) { + self.cancel_read_ops(); + } +} + +#[cfg(unix)] +pub type UnixStreamResource = + FullDuplexResource<unix::OwnedReadHalf, unix::OwnedWriteHalf>; + +#[cfg(not(unix))] +pub struct UnixStreamResource; + +#[cfg(not(unix))] +impl UnixStreamResource { + pub async fn read( + self: &Rc<Self>, + _buf: &mut [u8], + ) -> Result<usize, AnyError> { + unreachable!() + } + pub async fn write(self: &Rc<Self>, _buf: &[u8]) -> Result<usize, AnyError> { + unreachable!() + } + pub async fn shutdown(self: &Rc<Self>) -> Result<(), AnyError> { + unreachable!() + } + pub fn cancel_read_ops(&self) { + unreachable!() + } +} + +impl Resource for UnixStreamResource { + fn name(&self) -> Cow<str> { + "unixStream".into() + } + + fn close(self: Rc<Self>) { + self.cancel_read_ops(); + } +} + +async fn op_read_async( + state: Rc<RefCell<OpState>>, + rid: ResourceId, + buf: Option<ZeroCopyBuf>, +) -> Result<u32, AnyError> { + let buf = &mut buf.ok_or_else(null_opbuf)?; + let resource = state + .borrow() + .resource_table + .get_any(rid) + .ok_or_else(bad_resource_id)?; + let nread = if let Some(s) = resource.downcast_rc::<TcpStreamResource>() { + s.read(buf).await? + } else if let Some(s) = resource.downcast_rc::<TlsStreamResource>() { + s.read(buf).await? + } else if let Some(s) = resource.downcast_rc::<UnixStreamResource>() { + s.read(buf).await? + } else { + return Err(not_supported()); + }; + Ok(nread as u32) +} + +async fn op_write_async( + state: Rc<RefCell<OpState>>, + rid: ResourceId, + buf: Option<ZeroCopyBuf>, +) -> Result<u32, AnyError> { + let buf = &buf.ok_or_else(null_opbuf)?; + let resource = state + .borrow() + .resource_table + .get_any(rid) + .ok_or_else(bad_resource_id)?; + let nwritten = if let Some(s) = resource.downcast_rc::<TcpStreamResource>() { + s.write(buf).await? + } else if let Some(s) = resource.downcast_rc::<TlsStreamResource>() { + s.write(buf).await? + } else if let Some(s) = resource.downcast_rc::<UnixStreamResource>() { + s.write(buf).await? + } else { + return Err(not_supported()); + }; + Ok(nwritten as u32) +} + +async fn op_shutdown( + state: Rc<RefCell<OpState>>, + rid: ResourceId, + _: (), +) -> Result<(), AnyError> { + let resource = state + .borrow() + .resource_table + .get_any(rid) + .ok_or_else(bad_resource_id)?; + if let Some(s) = resource.downcast_rc::<TcpStreamResource>() { + s.shutdown().await?; + } else if let Some(s) = resource.downcast_rc::<TlsStreamResource>() { + s.shutdown().await?; + } else if let Some(s) = resource.downcast_rc::<UnixStreamResource>() { + s.shutdown().await?; + } else { + return Err(not_supported()); + } + Ok(()) +} diff --git a/extensions/net/lib.deno_net.d.ts b/extensions/net/lib.deno_net.d.ts new file mode 100644 index 000000000..25397f960 --- /dev/null +++ b/extensions/net/lib.deno_net.d.ts @@ -0,0 +1,149 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. + +/// <reference no-default-lib="true" /> +/// <reference lib="esnext" /> + +declare namespace Deno { + export interface NetAddr { + transport: "tcp" | "udp"; + hostname: string; + port: number; + } + + export interface UnixAddr { + transport: "unix" | "unixpacket"; + path: string; + } + + export type Addr = NetAddr | UnixAddr; + + /** A generic network listener for stream-oriented protocols. */ + export interface Listener extends AsyncIterable<Conn> { + /** Waits for and resolves to the next connection to the `Listener`. */ + accept(): Promise<Conn>; + /** Close closes the listener. Any pending accept promises will be rejected + * with errors. */ + close(): void; + /** Return the address of the `Listener`. */ + readonly addr: Addr; + + /** Return the rid of the `Listener`. */ + readonly rid: number; + + [Symbol.asyncIterator](): AsyncIterableIterator<Conn>; + } + + export interface Conn extends Reader, Writer, Closer { + /** The local address of the connection. */ + readonly localAddr: Addr; + /** The remote address of the connection. */ + readonly remoteAddr: Addr; + /** The resource ID of the connection. */ + readonly rid: number; + /** Shuts down (`shutdown(2)`) the write side of the connection. Most + * callers should just use `close()`. */ + closeWrite(): Promise<void>; + } + + export interface ListenOptions { + /** The port to listen on. */ + port: number; + /** A literal IP address or host name that can be resolved to an IP address. + * If not specified, defaults to `0.0.0.0`. */ + hostname?: string; + } + + /** Listen announces on the local transport address. + * + * ```ts + * const listener1 = Deno.listen({ port: 80 }) + * const listener2 = Deno.listen({ hostname: "192.0.2.1", port: 80 }) + * const listener3 = Deno.listen({ hostname: "[2001:db8::1]", port: 80 }); + * const listener4 = Deno.listen({ hostname: "golang.org", port: 80, transport: "tcp" }); + * ``` + * + * Requires `allow-net` permission. */ + export function listen( + options: ListenOptions & { transport?: "tcp" }, + ): Listener; + + export interface ListenTlsOptions extends ListenOptions { + /** Server certificate file. */ + certFile: string; + /** Server public key file. */ + keyFile: string; + + transport?: "tcp"; + } + + /** Listen announces on the local transport address over TLS (transport layer + * security). + * + * ```ts + * const lstnr = Deno.listenTls({ port: 443, certFile: "./server.crt", keyFile: "./server.key" }); + * ``` + * + * Requires `allow-net` permission. */ + export function listenTls(options: ListenTlsOptions): Listener; + + export interface ConnectOptions { + /** The port to connect to. */ + port: number; + /** A literal IP address or host name that can be resolved to an IP address. + * If not specified, defaults to `127.0.0.1`. */ + hostname?: string; + transport?: "tcp"; + } + + /** + * Connects to the hostname (default is "127.0.0.1") and port on the named + * transport (default is "tcp"), and resolves to the connection (`Conn`). + * + * ```ts + * const conn1 = await Deno.connect({ port: 80 }); + * const conn2 = await Deno.connect({ hostname: "192.0.2.1", port: 80 }); + * const conn3 = await Deno.connect({ hostname: "[2001:db8::1]", port: 80 }); + * const conn4 = await Deno.connect({ hostname: "golang.org", port: 80, transport: "tcp" }); + * ``` + * + * Requires `allow-net` permission for "tcp". */ + export function connect(options: ConnectOptions): Promise<Conn>; + + export interface ConnectTlsOptions { + /** The port to connect to. */ + port: number; + /** A literal IP address or host name that can be resolved to an IP address. + * If not specified, defaults to `127.0.0.1`. */ + hostname?: string; + /** Server certificate file. */ + certFile?: string; + } + + /** Establishes a secure connection over TLS (transport layer security) using + * an optional cert file, hostname (default is "127.0.0.1") and port. The + * cert file is optional and if not included Mozilla's root certificates will + * be used (see also https://github.com/ctz/webpki-roots for specifics) + * + * ```ts + * const conn1 = await Deno.connectTls({ port: 80 }); + * const conn2 = await Deno.connectTls({ certFile: "./certs/my_custom_root_CA.pem", hostname: "192.0.2.1", port: 80 }); + * const conn3 = await Deno.connectTls({ hostname: "[2001:db8::1]", port: 80 }); + * const conn4 = await Deno.connectTls({ certFile: "./certs/my_custom_root_CA.pem", hostname: "golang.org", port: 80}); + * ``` + * + * Requires `allow-net` permission. + */ + export function connectTls(options: ConnectTlsOptions): Promise<Conn>; + + /** Shutdown socket send operations. + * + * Matches behavior of POSIX shutdown(3). + * + * ```ts + * const listener = Deno.listen({ port: 80 }); + * const conn = await listener.accept(); + * Deno.shutdown(conn.rid); + * ``` + */ + export function shutdown(rid: number): Promise<void>; +} diff --git a/extensions/net/lib.deno_net.unstable.d.ts b/extensions/net/lib.deno_net.unstable.d.ts new file mode 100644 index 000000000..905a7acc1 --- /dev/null +++ b/extensions/net/lib.deno_net.unstable.d.ts @@ -0,0 +1,262 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. + +/// <reference no-default-lib="true" /> +/// <reference lib="esnext" /> + +declare namespace Deno { + /** The type of the resource record. + * Only the listed types are supported currently. */ + export type RecordType = + | "A" + | "AAAA" + | "ANAME" + | "CNAME" + | "MX" + | "PTR" + | "SRV" + | "TXT"; + + export interface ResolveDnsOptions { + /** The name server to be used for lookups. + * If not specified, defaults to the system configuration e.g. `/etc/resolv.conf` on Unix. */ + nameServer?: { + /** The IP address of the name server */ + ipAddr: string; + /** The port number the query will be sent to. + * If not specified, defaults to 53. */ + port?: number; + }; + } + + /** If `resolveDns` is called with "MX" record type specified, it will return an array of this interface. */ + export interface MXRecord { + preference: number; + exchange: string; + } + + /** If `resolveDns` is called with "SRV" record type specified, it will return an array of this interface. */ + export interface SRVRecord { + priority: number; + weight: number; + port: number; + target: string; + } + + export function resolveDns( + query: string, + recordType: "A" | "AAAA" | "ANAME" | "CNAME" | "PTR", + options?: ResolveDnsOptions, + ): Promise<string[]>; + + export function resolveDns( + query: string, + recordType: "MX", + options?: ResolveDnsOptions, + ): Promise<MXRecord[]>; + + export function resolveDns( + query: string, + recordType: "SRV", + options?: ResolveDnsOptions, + ): Promise<SRVRecord[]>; + + export function resolveDns( + query: string, + recordType: "TXT", + options?: ResolveDnsOptions, + ): Promise<string[][]>; + + /** ** UNSTABLE**: new API, yet to be vetted. +* +* Performs DNS resolution against the given query, returning resolved records. +* Fails in the cases such as: +* - the query is in invalid format +* - the options have an invalid parameter, e.g. `nameServer.port` is beyond the range of 16-bit unsigned integer +* - timed out +* +* ```ts +* const a = await Deno.resolveDns("example.com", "A"); +* +* const aaaa = await Deno.resolveDns("example.com", "AAAA", { +* nameServer: { ipAddr: "8.8.8.8", port: 1234 }, +* }); +* ``` +* +* Requires `allow-net` permission. + */ + export function resolveDns( + query: string, + recordType: RecordType, + options?: ResolveDnsOptions, + ): Promise<string[] | MXRecord[] | SRVRecord[] | string[][]>; + + /** **UNSTABLE**: new API, yet to be vetted. +* +* A generic transport listener for message-oriented protocols. */ + export interface DatagramConn extends AsyncIterable<[Uint8Array, Addr]> { + /** **UNSTABLE**: new API, yet to be vetted. + * + * Waits for and resolves to the next message to the `UDPConn`. */ + receive(p?: Uint8Array): Promise<[Uint8Array, Addr]>; + /** UNSTABLE: new API, yet to be vetted. + * + * Sends a message to the target. */ + send(p: Uint8Array, addr: Addr): Promise<number>; + /** UNSTABLE: new API, yet to be vetted. + * + * Close closes the socket. Any pending message promises will be rejected + * with errors. */ + close(): void; + /** Return the address of the `UDPConn`. */ + readonly addr: Addr; + [Symbol.asyncIterator](): AsyncIterableIterator<[Uint8Array, Addr]>; + } + + export interface UnixListenOptions { + /** A Path to the Unix Socket. */ + path: string; + } + + /** **UNSTABLE**: new API, yet to be vetted. +* +* Listen announces on the local transport address. +* +* ```ts +* const listener = Deno.listen({ path: "/foo/bar.sock", transport: "unix" }) +* ``` +* +* Requires `allow-read` and `allow-write` permission. */ + export function listen( + options: UnixListenOptions & { transport: "unix" }, + ): Listener; + + /** **UNSTABLE**: new API, yet to be vetted +* +* Listen announces on the local transport address. +* +* ```ts +* const listener1 = Deno.listenDatagram({ +* port: 80, +* transport: "udp" +* }); +* const listener2 = Deno.listenDatagram({ +* hostname: "golang.org", +* port: 80, +* transport: "udp" +* }); +* ``` +* +* Requires `allow-net` permission. */ + export function listenDatagram( + options: ListenOptions & { transport: "udp" }, + ): DatagramConn; + + /** **UNSTABLE**: new API, yet to be vetted +* +* Listen announces on the local transport address. +* +* ```ts +* const listener = Deno.listenDatagram({ +* path: "/foo/bar.sock", +* transport: "unixpacket" +* }); +* ``` +* +* Requires `allow-read` and `allow-write` permission. */ + export function listenDatagram( + options: UnixListenOptions & { transport: "unixpacket" }, + ): DatagramConn; + + export interface UnixConnectOptions { + transport: "unix"; + path: string; + } + + /** **UNSTABLE**: The unix socket transport is unstable as a new API yet to +* be vetted. The TCP transport is considered stable. +* +* Connects to the hostname (default is "127.0.0.1") and port on the named +* transport (default is "tcp"), and resolves to the connection (`Conn`). +* +* ```ts +* const conn1 = await Deno.connect({ port: 80 }); +* const conn2 = await Deno.connect({ hostname: "192.0.2.1", port: 80 }); +* const conn3 = await Deno.connect({ hostname: "[2001:db8::1]", port: 80 }); +* const conn4 = await Deno.connect({ hostname: "golang.org", port: 80, transport: "tcp" }); +* const conn5 = await Deno.connect({ path: "/foo/bar.sock", transport: "unix" }); +* ``` +* +* Requires `allow-net` permission for "tcp" and `allow-read` for "unix". */ + export function connect( + options: ConnectOptions | UnixConnectOptions, + ): Promise<Conn>; + + export interface StartTlsOptions { + /** A literal IP address or host name that can be resolved to an IP address. + * If not specified, defaults to `127.0.0.1`. */ + hostname?: string; + /** Server certificate file. */ + certFile?: string; + } + + /** **UNSTABLE**: new API, yet to be vetted. +* +* Start TLS handshake from an existing connection using +* an optional cert file, hostname (default is "127.0.0.1"). The +* cert file is optional and if not included Mozilla's root certificates will +* be used (see also https://github.com/ctz/webpki-roots for specifics) +* Using this function requires that the other end of the connection is +* prepared for TLS handshake. +* +* ```ts +* const conn = await Deno.connect({ port: 80, hostname: "127.0.0.1" }); +* const tlsConn = await Deno.startTls(conn, { certFile: "./certs/my_custom_root_CA.pem", hostname: "localhost" }); +* ``` +* +* Requires `allow-net` permission. + */ + export function startTls( + conn: Conn, + options?: StartTlsOptions, + ): Promise<Conn>; + + export interface ListenTlsOptions { + /** **UNSTABLE**: new API, yet to be vetted. + * + * Application-Layer Protocol Negotiation (ALPN) protocols to announce to + * the client. If not specified, no ALPN extension will be included in the + * TLS handshake. + */ + alpnProtocols?: string[]; + } + + export interface RequestEvent { + readonly request: Request; + respondWith(r: Response | Promise<Response>): Promise<void>; + } + + export interface HttpConn extends AsyncIterable<RequestEvent> { + readonly rid: number; + + nextRequest(): Promise<RequestEvent | null>; + close(): void; + } + + /** **UNSTABLE**: new API, yet to be vetted. + * + * Services HTTP requests given a TCP or TLS socket. + * + * ```ts + * const conn = await Deno.connect({ port: 80, hostname: "127.0.0.1" }); + * const httpConn = Deno.serveHttp(conn); + * const e = await httpConn.nextRequest(); + * if (e) { + * e.respondWith(new Response("Hello World")); + * } + * ``` + * + * If `httpConn.nextRequest()` encounters an error or returns `null` + * then the underlying HttpConn resource is closed automatically. + */ + export function serveHttp(conn: Conn): HttpConn; +} diff --git a/extensions/net/lib.rs b/extensions/net/lib.rs new file mode 100644 index 000000000..d1e836fce --- /dev/null +++ b/extensions/net/lib.rs @@ -0,0 +1,113 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. + +pub mod io; +pub mod ops; +pub mod ops_http; +pub mod ops_tls; +#[cfg(unix)] +pub mod ops_unix; +pub mod resolve_addr; + +use deno_core::error::AnyError; +use deno_core::include_js_files; +use deno_core::Extension; +use deno_core::OpState; +use std::cell::RefCell; +use std::path::Path; +use std::path::PathBuf; +use std::rc::Rc; + +pub trait NetPermissions { + fn check_net<T: AsRef<str>>( + &mut self, + _host: &(T, Option<u16>), + ) -> Result<(), AnyError>; + fn check_read(&mut self, _p: &Path) -> Result<(), AnyError>; + fn check_write(&mut self, _p: &Path) -> Result<(), AnyError>; +} + +/// For use with this crate when the user does not want permission checks. +pub struct NoNetPermissions; + +impl NetPermissions for NoNetPermissions { + fn check_net<T: AsRef<str>>( + &mut self, + _host: &(T, Option<u16>), + ) -> Result<(), AnyError> { + Ok(()) + } + + fn check_read(&mut self, _p: &Path) -> Result<(), AnyError> { + Ok(()) + } + + fn check_write(&mut self, _p: &Path) -> Result<(), AnyError> { + Ok(()) + } +} + +/// `UnstableChecker` is a struct so it can be placed inside `GothamState`; +/// using type alias for a bool could work, but there's a high chance +/// that there might be another type alias pointing to a bool, which +/// would override previously used alias. +pub struct UnstableChecker { + pub unstable: bool, +} + +impl UnstableChecker { + /// Quits the process if the --unstable flag was not provided. + /// + /// This is intentionally a non-recoverable check so that people cannot probe + /// for unstable APIs from stable programs. + // NOTE(bartlomieju): keep in sync with `cli/program_state.rs` + pub fn check_unstable(&self, api_name: &str) { + if !self.unstable { + eprintln!( + "Unstable API '{}'. The --unstable flag must be provided.", + api_name + ); + std::process::exit(70); + } + } +} +/// Helper for checking unstable features. Used for sync ops. +pub fn check_unstable(state: &OpState, api_name: &str) { + state.borrow::<UnstableChecker>().check_unstable(api_name) +} + +/// Helper for checking unstable features. Used for async ops. +pub fn check_unstable2(state: &Rc<RefCell<OpState>>, api_name: &str) { + let state = state.borrow(); + state.borrow::<UnstableChecker>().check_unstable(api_name) +} + +pub fn get_declaration() -> PathBuf { + PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("lib.deno_net.d.ts") +} + +pub fn get_unstable_declaration() -> PathBuf { + PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("lib.deno_net.unstable.d.ts") +} + +pub fn init<P: NetPermissions + 'static>(unstable: bool) -> Extension { + let mut ops_to_register = vec![]; + ops_to_register.extend(io::init()); + ops_to_register.extend(ops::init::<P>()); + ops_to_register.extend(ops_tls::init::<P>()); + ops_to_register.extend(ops_http::init()); + + Extension::builder() + .js(include_js_files!( + prefix "deno:extensions/net", + "01_net.js", + "02_tls.js", + "03_http.js", + "04_net_unstable.js", + )) + .ops(ops_to_register) + .state(move |state| { + state.put(UnstableChecker { unstable }); + Ok(()) + }) + .build() +} diff --git a/extensions/net/ops.rs b/extensions/net/ops.rs new file mode 100644 index 000000000..a02bbf91a --- /dev/null +++ b/extensions/net/ops.rs @@ -0,0 +1,795 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. + +use crate::io::TcpStreamResource; +use crate::resolve_addr::resolve_addr; +use crate::resolve_addr::resolve_addr_sync; +use crate::NetPermissions; +use deno_core::error::bad_resource; +use deno_core::error::custom_error; +use deno_core::error::generic_error; +use deno_core::error::null_opbuf; +use deno_core::error::type_error; +use deno_core::error::AnyError; +use deno_core::op_async; +use deno_core::op_sync; +use deno_core::AsyncRefCell; +use deno_core::CancelHandle; +use deno_core::CancelTryFuture; +use deno_core::OpPair; +use deno_core::OpState; +use deno_core::RcRef; +use deno_core::Resource; +use deno_core::ResourceId; +use deno_core::ZeroCopyBuf; +use log::debug; +use serde::Deserialize; +use serde::Serialize; +use std::borrow::Cow; +use std::cell::RefCell; +use std::net::SocketAddr; +use std::rc::Rc; +use tokio::net::TcpListener; +use tokio::net::TcpStream; +use tokio::net::UdpSocket; +use trust_dns_proto::rr::record_data::RData; +use trust_dns_proto::rr::record_type::RecordType; +use trust_dns_resolver::config::NameServerConfigGroup; +use trust_dns_resolver::config::ResolverConfig; +use trust_dns_resolver::config::ResolverOpts; +use trust_dns_resolver::system_conf; +use trust_dns_resolver::AsyncResolver; + +#[cfg(unix)] +use super::ops_unix as net_unix; +#[cfg(unix)] +use crate::io::UnixStreamResource; +#[cfg(unix)] +use std::path::Path; + +pub fn init<P: NetPermissions + 'static>() -> Vec<OpPair> { + vec![ + ("op_accept", op_async(op_accept)), + ("op_connect", op_async(op_connect::<P>)), + ("op_listen", op_sync(op_listen::<P>)), + ("op_datagram_receive", op_async(op_datagram_receive)), + ("op_datagram_send", op_async(op_datagram_send::<P>)), + ("op_dns_resolve", op_async(op_dns_resolve::<P>)), + ] +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +pub struct OpConn { + pub rid: ResourceId, + pub remote_addr: Option<OpAddr>, + pub local_addr: Option<OpAddr>, +} + +#[derive(Serialize)] +#[serde(tag = "transport", rename_all = "lowercase")] +pub enum OpAddr { + Tcp(IpAddr), + Udp(IpAddr), + #[cfg(unix)] + Unix(net_unix::UnixAddr), + #[cfg(unix)] + UnixPacket(net_unix::UnixAddr), +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +/// A received datagram packet (from udp or unixpacket) +pub struct OpPacket { + pub size: usize, + pub remote_addr: OpAddr, +} + +#[derive(Serialize)] +pub struct IpAddr { + pub hostname: String, + pub port: u16, +} + +#[derive(Deserialize)] +pub(crate) struct AcceptArgs { + pub rid: ResourceId, + pub transport: String, +} + +async fn accept_tcp( + state: Rc<RefCell<OpState>>, + args: AcceptArgs, + _: (), +) -> Result<OpConn, AnyError> { + let rid = args.rid; + + let resource = state + .borrow() + .resource_table + .get::<TcpListenerResource>(rid) + .ok_or_else(|| bad_resource("Listener has been closed"))?; + let listener = RcRef::map(&resource, |r| &r.listener) + .try_borrow_mut() + .ok_or_else(|| custom_error("Busy", "Another accept task is ongoing"))?; + let cancel = RcRef::map(resource, |r| &r.cancel); + let (tcp_stream, _socket_addr) = + listener.accept().try_or_cancel(cancel).await.map_err(|e| { + // FIXME(bartlomieju): compatibility with current JS implementation + if let std::io::ErrorKind::Interrupted = e.kind() { + bad_resource("Listener has been closed") + } else { + e.into() + } + })?; + let local_addr = tcp_stream.local_addr()?; + let remote_addr = tcp_stream.peer_addr()?; + + let mut state = state.borrow_mut(); + let rid = state + .resource_table + .add(TcpStreamResource::new(tcp_stream.into_split())); + Ok(OpConn { + rid, + local_addr: Some(OpAddr::Tcp(IpAddr { + hostname: local_addr.ip().to_string(), + port: local_addr.port(), + })), + remote_addr: Some(OpAddr::Tcp(IpAddr { + hostname: remote_addr.ip().to_string(), + port: remote_addr.port(), + })), + }) +} + +async fn op_accept( + state: Rc<RefCell<OpState>>, + args: AcceptArgs, + _: (), +) -> Result<OpConn, AnyError> { + match args.transport.as_str() { + "tcp" => accept_tcp(state, args, ()).await, + #[cfg(unix)] + "unix" => net_unix::accept_unix(state, args, ()).await, + other => Err(bad_transport(other)), + } +} + +fn bad_transport(transport: &str) -> AnyError { + generic_error(format!("Unsupported transport protocol {}", transport)) +} + +#[derive(Deserialize)] +pub(crate) struct ReceiveArgs { + pub rid: ResourceId, + pub transport: String, +} + +async fn receive_udp( + state: Rc<RefCell<OpState>>, + args: ReceiveArgs, + zero_copy: Option<ZeroCopyBuf>, +) -> Result<OpPacket, AnyError> { + let zero_copy = zero_copy.ok_or_else(null_opbuf)?; + let mut zero_copy = zero_copy.clone(); + + let rid = args.rid; + + let resource = state + .borrow_mut() + .resource_table + .get::<UdpSocketResource>(rid) + .ok_or_else(|| bad_resource("Socket has been closed"))?; + let socket = RcRef::map(&resource, |r| &r.socket).borrow().await; + let cancel_handle = RcRef::map(&resource, |r| &r.cancel); + let (size, remote_addr) = socket + .recv_from(&mut zero_copy) + .try_or_cancel(cancel_handle) + .await?; + Ok(OpPacket { + size, + remote_addr: OpAddr::Udp(IpAddr { + hostname: remote_addr.ip().to_string(), + port: remote_addr.port(), + }), + }) +} + +async fn op_datagram_receive( + state: Rc<RefCell<OpState>>, + args: ReceiveArgs, + zero_copy: Option<ZeroCopyBuf>, +) -> Result<OpPacket, AnyError> { + match args.transport.as_str() { + "udp" => receive_udp(state, args, zero_copy).await, + #[cfg(unix)] + "unixpacket" => net_unix::receive_unix_packet(state, args, zero_copy).await, + other => Err(bad_transport(other)), + } +} + +#[derive(Deserialize)] +struct SendArgs { + rid: ResourceId, + transport: String, + #[serde(flatten)] + transport_args: ArgsEnum, +} + +async fn op_datagram_send<NP>( + state: Rc<RefCell<OpState>>, + args: SendArgs, + zero_copy: Option<ZeroCopyBuf>, +) -> Result<usize, AnyError> +where + NP: NetPermissions + 'static, +{ + let zero_copy = zero_copy.ok_or_else(null_opbuf)?; + let zero_copy = zero_copy.clone(); + + match args { + SendArgs { + rid, + transport, + transport_args: ArgsEnum::Ip(args), + } if transport == "udp" => { + { + let mut s = state.borrow_mut(); + s.borrow_mut::<NP>() + .check_net(&(&args.hostname, Some(args.port)))?; + } + let addr = resolve_addr(&args.hostname, args.port) + .await? + .next() + .ok_or_else(|| generic_error("No resolved address found"))?; + + let resource = state + .borrow_mut() + .resource_table + .get::<UdpSocketResource>(rid) + .ok_or_else(|| bad_resource("Socket has been closed"))?; + let socket = RcRef::map(&resource, |r| &r.socket).borrow().await; + let byte_length = socket.send_to(&zero_copy, &addr).await?; + Ok(byte_length) + } + #[cfg(unix)] + SendArgs { + rid, + transport, + transport_args: ArgsEnum::Unix(args), + } if transport == "unixpacket" => { + let address_path = Path::new(&args.path); + { + let mut s = state.borrow_mut(); + s.borrow_mut::<NP>().check_write(&address_path)?; + } + let resource = state + .borrow() + .resource_table + .get::<net_unix::UnixDatagramResource>(rid) + .ok_or_else(|| { + custom_error("NotConnected", "Socket has been closed") + })?; + let socket = RcRef::map(&resource, |r| &r.socket) + .try_borrow_mut() + .ok_or_else(|| custom_error("Busy", "Socket already in use"))?; + let byte_length = socket.send_to(&zero_copy, address_path).await?; + Ok(byte_length) + } + _ => Err(type_error("Wrong argument format!")), + } +} + +#[derive(Deserialize)] +struct ConnectArgs { + transport: String, + #[serde(flatten)] + transport_args: ArgsEnum, +} + +async fn op_connect<NP>( + state: Rc<RefCell<OpState>>, + args: ConnectArgs, + _: (), +) -> Result<OpConn, AnyError> +where + NP: NetPermissions + 'static, +{ + match args { + ConnectArgs { + transport, + transport_args: ArgsEnum::Ip(args), + } if transport == "tcp" => { + { + let mut state_ = state.borrow_mut(); + state_ + .borrow_mut::<NP>() + .check_net(&(&args.hostname, Some(args.port)))?; + } + let addr = resolve_addr(&args.hostname, args.port) + .await? + .next() + .ok_or_else(|| generic_error("No resolved address found"))?; + let tcp_stream = TcpStream::connect(&addr).await?; + let local_addr = tcp_stream.local_addr()?; + let remote_addr = tcp_stream.peer_addr()?; + + let mut state_ = state.borrow_mut(); + let rid = state_ + .resource_table + .add(TcpStreamResource::new(tcp_stream.into_split())); + Ok(OpConn { + rid, + local_addr: Some(OpAddr::Tcp(IpAddr { + hostname: local_addr.ip().to_string(), + port: local_addr.port(), + })), + remote_addr: Some(OpAddr::Tcp(IpAddr { + hostname: remote_addr.ip().to_string(), + port: remote_addr.port(), + })), + }) + } + #[cfg(unix)] + ConnectArgs { + transport, + transport_args: ArgsEnum::Unix(args), + } if transport == "unix" => { + let address_path = Path::new(&args.path); + super::check_unstable2(&state, "Deno.connect"); + { + let mut state_ = state.borrow_mut(); + state_.borrow_mut::<NP>().check_read(&address_path)?; + state_.borrow_mut::<NP>().check_write(&address_path)?; + } + let path = args.path; + let unix_stream = net_unix::UnixStream::connect(Path::new(&path)).await?; + let local_addr = unix_stream.local_addr()?; + let remote_addr = unix_stream.peer_addr()?; + + let mut state_ = state.borrow_mut(); + let resource = UnixStreamResource::new(unix_stream.into_split()); + let rid = state_.resource_table.add(resource); + Ok(OpConn { + rid, + local_addr: Some(OpAddr::Unix(net_unix::UnixAddr { + path: local_addr.as_pathname().and_then(net_unix::pathstring), + })), + remote_addr: Some(OpAddr::Unix(net_unix::UnixAddr { + path: remote_addr.as_pathname().and_then(net_unix::pathstring), + })), + }) + } + _ => Err(type_error("Wrong argument format!")), + } +} + +pub struct TcpListenerResource { + pub listener: AsyncRefCell<TcpListener>, + pub cancel: CancelHandle, +} + +impl Resource for TcpListenerResource { + fn name(&self) -> Cow<str> { + "tcpListener".into() + } + + fn close(self: Rc<Self>) { + self.cancel.cancel(); + } +} + +struct UdpSocketResource { + socket: AsyncRefCell<UdpSocket>, + cancel: CancelHandle, +} + +impl Resource for UdpSocketResource { + fn name(&self) -> Cow<str> { + "udpSocket".into() + } + + fn close(self: Rc<Self>) { + self.cancel.cancel() + } +} + +#[derive(Deserialize)] +struct IpListenArgs { + hostname: String, + port: u16, +} + +#[derive(Deserialize)] +#[serde(untagged)] +enum ArgsEnum { + Ip(IpListenArgs), + #[cfg(unix)] + Unix(net_unix::UnixListenArgs), +} + +#[derive(Deserialize)] +struct ListenArgs { + transport: String, + #[serde(flatten)] + transport_args: ArgsEnum, +} + +fn listen_tcp( + state: &mut OpState, + addr: SocketAddr, +) -> Result<(u32, SocketAddr), AnyError> { + let std_listener = std::net::TcpListener::bind(&addr)?; + std_listener.set_nonblocking(true)?; + let listener = TcpListener::from_std(std_listener)?; + let local_addr = listener.local_addr()?; + let listener_resource = TcpListenerResource { + listener: AsyncRefCell::new(listener), + cancel: Default::default(), + }; + let rid = state.resource_table.add(listener_resource); + + Ok((rid, local_addr)) +} + +fn listen_udp( + state: &mut OpState, + addr: SocketAddr, +) -> Result<(u32, SocketAddr), AnyError> { + let std_socket = std::net::UdpSocket::bind(&addr)?; + std_socket.set_nonblocking(true)?; + let socket = UdpSocket::from_std(std_socket)?; + let local_addr = socket.local_addr()?; + let socket_resource = UdpSocketResource { + socket: AsyncRefCell::new(socket), + cancel: Default::default(), + }; + let rid = state.resource_table.add(socket_resource); + + Ok((rid, local_addr)) +} + +fn op_listen<NP>( + state: &mut OpState, + args: ListenArgs, + _: (), +) -> Result<OpConn, AnyError> +where + NP: NetPermissions + 'static, +{ + match args { + ListenArgs { + transport, + transport_args: ArgsEnum::Ip(args), + } => { + { + if transport == "udp" { + super::check_unstable(state, "Deno.listenDatagram"); + } + state + .borrow_mut::<NP>() + .check_net(&(&args.hostname, Some(args.port)))?; + } + let addr = resolve_addr_sync(&args.hostname, args.port)? + .next() + .ok_or_else(|| generic_error("No resolved address found"))?; + let (rid, local_addr) = if transport == "tcp" { + listen_tcp(state, addr)? + } else { + listen_udp(state, addr)? + }; + debug!( + "New listener {} {}:{}", + rid, + local_addr.ip().to_string(), + local_addr.port() + ); + let ip_addr = IpAddr { + hostname: local_addr.ip().to_string(), + port: local_addr.port(), + }; + Ok(OpConn { + rid, + local_addr: Some(match transport.as_str() { + "udp" => OpAddr::Udp(ip_addr), + "tcp" => OpAddr::Tcp(ip_addr), + // NOTE: This could be unreachable!() + other => return Err(bad_transport(other)), + }), + remote_addr: None, + }) + } + #[cfg(unix)] + ListenArgs { + transport, + transport_args: ArgsEnum::Unix(args), + } if transport == "unix" || transport == "unixpacket" => { + let address_path = Path::new(&args.path); + { + if transport == "unix" { + super::check_unstable(state, "Deno.listen"); + } + if transport == "unixpacket" { + super::check_unstable(state, "Deno.listenDatagram"); + } + let permissions = state.borrow_mut::<NP>(); + permissions.check_read(&address_path)?; + permissions.check_write(&address_path)?; + } + let (rid, local_addr) = if transport == "unix" { + net_unix::listen_unix(state, &address_path)? + } else { + net_unix::listen_unix_packet(state, &address_path)? + }; + debug!( + "New listener {} {}", + rid, + local_addr.as_pathname().unwrap().display(), + ); + let unix_addr = net_unix::UnixAddr { + path: local_addr.as_pathname().and_then(net_unix::pathstring), + }; + + Ok(OpConn { + rid, + local_addr: Some(match transport.as_str() { + "unix" => OpAddr::Unix(unix_addr), + "unixpacket" => OpAddr::UnixPacket(unix_addr), + other => return Err(bad_transport(other)), + }), + remote_addr: None, + }) + } + #[cfg(unix)] + _ => Err(type_error("Wrong argument format!")), + } +} + +#[derive(Serialize, PartialEq, Debug)] +#[serde(untagged)] +enum DnsReturnRecord { + A(String), + Aaaa(String), + Aname(String), + Cname(String), + Mx { + preference: u16, + exchange: String, + }, + Ptr(String), + Srv { + priority: u16, + weight: u16, + port: u16, + target: String, + }, + Txt(Vec<String>), +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ResolveAddrArgs { + query: String, + record_type: RecordType, + options: Option<ResolveDnsOption>, +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ResolveDnsOption { + name_server: Option<NameServer>, +} + +fn default_port() -> u16 { + 53 +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct NameServer { + ip_addr: String, + #[serde(default = "default_port")] + port: u16, +} + +async fn op_dns_resolve<NP>( + state: Rc<RefCell<OpState>>, + args: ResolveAddrArgs, + _: (), +) -> Result<Vec<DnsReturnRecord>, AnyError> +where + NP: NetPermissions + 'static, +{ + let ResolveAddrArgs { + query, + record_type, + options, + } = args; + + let (config, opts) = if let Some(name_server) = + options.as_ref().and_then(|o| o.name_server.as_ref()) + { + let group = NameServerConfigGroup::from_ips_clear( + &[name_server.ip_addr.parse()?], + name_server.port, + true, + ); + ( + ResolverConfig::from_parts(None, vec![], group), + ResolverOpts::default(), + ) + } else { + system_conf::read_system_conf()? + }; + + { + let mut s = state.borrow_mut(); + let perm = s.borrow_mut::<NP>(); + + // Checks permission against the name servers which will be actually queried. + for ns in config.name_servers() { + let socker_addr = &ns.socket_addr; + let ip = socker_addr.ip().to_string(); + let port = socker_addr.port(); + perm.check_net(&(ip, Some(port)))?; + } + } + + let resolver = AsyncResolver::tokio(config, opts)?; + + let results = resolver + .lookup(query, record_type, Default::default()) + .await + .map_err(|e| generic_error(format!("{}", e)))? + .iter() + .filter_map(rdata_to_return_record(record_type)) + .collect(); + + Ok(results) +} + +fn rdata_to_return_record( + ty: RecordType, +) -> impl Fn(&RData) -> Option<DnsReturnRecord> { + use RecordType::*; + move |r: &RData| -> Option<DnsReturnRecord> { + match ty { + A => r.as_a().map(ToString::to_string).map(DnsReturnRecord::A), + AAAA => r + .as_aaaa() + .map(ToString::to_string) + .map(DnsReturnRecord::Aaaa), + ANAME => r + .as_aname() + .map(ToString::to_string) + .map(DnsReturnRecord::Aname), + CNAME => r + .as_cname() + .map(ToString::to_string) + .map(DnsReturnRecord::Cname), + MX => r.as_mx().map(|mx| DnsReturnRecord::Mx { + preference: mx.preference(), + exchange: mx.exchange().to_string(), + }), + PTR => r + .as_ptr() + .map(ToString::to_string) + .map(DnsReturnRecord::Ptr), + SRV => r.as_srv().map(|srv| DnsReturnRecord::Srv { + priority: srv.priority(), + weight: srv.weight(), + port: srv.port(), + target: srv.target().to_string(), + }), + TXT => r.as_txt().map(|txt| { + let texts: Vec<String> = txt + .iter() + .map(|bytes| { + // Tries to parse these bytes as Latin-1 + bytes.iter().map(|&b| b as char).collect::<String>() + }) + .collect(); + DnsReturnRecord::Txt(texts) + }), + // TODO(magurotuna): Other record types are not supported + _ => todo!(), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::net::Ipv4Addr; + use std::net::Ipv6Addr; + use trust_dns_proto::rr::rdata::mx::MX; + use trust_dns_proto::rr::rdata::srv::SRV; + use trust_dns_proto::rr::rdata::txt::TXT; + use trust_dns_proto::rr::record_data::RData; + use trust_dns_proto::rr::Name; + + #[test] + fn rdata_to_return_record_a() { + let func = rdata_to_return_record(RecordType::A); + let rdata = RData::A(Ipv4Addr::new(127, 0, 0, 1)); + assert_eq!( + func(&rdata), + Some(DnsReturnRecord::A("127.0.0.1".to_string())) + ); + } + + #[test] + fn rdata_to_return_record_aaaa() { + let func = rdata_to_return_record(RecordType::AAAA); + let rdata = RData::AAAA(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)); + assert_eq!(func(&rdata), Some(DnsReturnRecord::Aaaa("::1".to_string()))); + } + + #[test] + fn rdata_to_return_record_aname() { + let func = rdata_to_return_record(RecordType::ANAME); + let rdata = RData::ANAME(Name::new()); + assert_eq!(func(&rdata), Some(DnsReturnRecord::Aname("".to_string()))); + } + + #[test] + fn rdata_to_return_record_cname() { + let func = rdata_to_return_record(RecordType::CNAME); + let rdata = RData::CNAME(Name::new()); + assert_eq!(func(&rdata), Some(DnsReturnRecord::Cname("".to_string()))); + } + + #[test] + fn rdata_to_return_record_mx() { + let func = rdata_to_return_record(RecordType::MX); + let rdata = RData::MX(MX::new(10, Name::new())); + assert_eq!( + func(&rdata), + Some(DnsReturnRecord::Mx { + preference: 10, + exchange: "".to_string() + }) + ); + } + + #[test] + fn rdata_to_return_record_ptr() { + let func = rdata_to_return_record(RecordType::PTR); + let rdata = RData::PTR(Name::new()); + assert_eq!(func(&rdata), Some(DnsReturnRecord::Ptr("".to_string()))); + } + + #[test] + fn rdata_to_return_record_srv() { + let func = rdata_to_return_record(RecordType::SRV); + let rdata = RData::SRV(SRV::new(1, 2, 3, Name::new())); + assert_eq!( + func(&rdata), + Some(DnsReturnRecord::Srv { + priority: 1, + weight: 2, + port: 3, + target: "".to_string() + }) + ); + } + + #[test] + fn rdata_to_return_record_txt() { + let func = rdata_to_return_record(RecordType::TXT); + let rdata = RData::TXT(TXT::from_bytes(vec![ + "foo".as_bytes(), + "bar".as_bytes(), + &[0xa3], // "ÂŁ" in Latin-1 + &[0xe3, 0x81, 0x82], // "ă" in UTF-8 + ])); + assert_eq!( + func(&rdata), + Some(DnsReturnRecord::Txt(vec![ + "foo".to_string(), + "bar".to_string(), + "ÂŁ".to_string(), + "ĂŁ\u{81}\u{82}".to_string(), + ])) + ); + } +} diff --git a/extensions/net/ops_http.rs b/extensions/net/ops_http.rs new file mode 100644 index 000000000..54e06c3a7 --- /dev/null +++ b/extensions/net/ops_http.rs @@ -0,0 +1,577 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. + +use crate::io::TcpStreamResource; +use crate::io::TlsStreamResource; +use crate::ops_tls::TlsStream; +use deno_core::error::bad_resource_id; +use deno_core::error::null_opbuf; +use deno_core::error::type_error; +use deno_core::error::AnyError; +use deno_core::futures::future::poll_fn; +use deno_core::futures::FutureExt; +use deno_core::futures::Stream; +use deno_core::futures::StreamExt; +use deno_core::op_async; +use deno_core::op_sync; +use deno_core::AsyncRefCell; +use deno_core::ByteString; +use deno_core::CancelHandle; +use deno_core::CancelTryFuture; +use deno_core::OpPair; +use deno_core::OpState; +use deno_core::RcRef; +use deno_core::Resource; +use deno_core::ResourceId; +use deno_core::ZeroCopyBuf; +use hyper::body::HttpBody; +use hyper::http; +use hyper::server::conn::Connection; +use hyper::server::conn::Http; +use hyper::service::Service as HyperService; +use hyper::Body; +use hyper::Request; +use hyper::Response; +use serde::Deserialize; +use serde::Serialize; +use std::borrow::Cow; +use std::cell::RefCell; +use std::future::Future; +use std::net::SocketAddr; +use std::pin::Pin; +use std::rc::Rc; +use std::task::Context; +use std::task::Poll; +use tokio::io::AsyncReadExt; +use tokio::net::TcpStream; +use tokio::sync::oneshot; +use tokio_util::io::StreamReader; + +pub fn init() -> Vec<OpPair> { + vec![ + ("op_http_start", op_sync(op_http_start)), + ("op_http_request_next", op_async(op_http_request_next)), + ("op_http_request_read", op_async(op_http_request_read)), + ("op_http_response", op_async(op_http_response)), + ("op_http_response_write", op_async(op_http_response_write)), + ("op_http_response_close", op_async(op_http_response_close)), + ] +} + +struct ServiceInner { + request: Request<Body>, + response_tx: oneshot::Sender<Response<Body>>, +} + +#[derive(Clone, Default)] +struct Service { + inner: Rc<RefCell<Option<ServiceInner>>>, + waker: Rc<deno_core::futures::task::AtomicWaker>, +} + +impl HyperService<Request<Body>> for Service { + type Response = Response<Body>; + type Error = http::Error; + #[allow(clippy::type_complexity)] + type Future = + Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>; + + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll<Result<(), Self::Error>> { + if self.inner.borrow().is_some() { + Poll::Pending + } else { + Poll::Ready(Ok(())) + } + } + + fn call(&mut self, req: Request<Body>) -> Self::Future { + let (resp_tx, resp_rx) = oneshot::channel(); + self.inner.borrow_mut().replace(ServiceInner { + request: req, + response_tx: resp_tx, + }); + + async move { Ok(resp_rx.await.unwrap()) }.boxed_local() + } +} + +enum ConnType { + Tcp(Rc<RefCell<Connection<TcpStream, Service, LocalExecutor>>>), + Tls(Rc<RefCell<Connection<TlsStream, Service, LocalExecutor>>>), +} + +struct ConnResource { + hyper_connection: ConnType, + deno_service: Service, + addr: SocketAddr, + cancel: CancelHandle, +} + +impl ConnResource { + // TODO(ry) impl Future for ConnResource? + fn poll(&self, cx: &mut Context<'_>) -> Poll<Result<(), AnyError>> { + match &self.hyper_connection { + ConnType::Tcp(c) => c.borrow_mut().poll_unpin(cx), + ConnType::Tls(c) => c.borrow_mut().poll_unpin(cx), + } + .map_err(AnyError::from) + } +} + +impl Resource for ConnResource { + fn name(&self) -> Cow<str> { + "httpConnection".into() + } + + fn close(self: Rc<Self>) { + self.cancel.cancel() + } +} + +// We use a tuple instead of struct to avoid serialization overhead of the keys. +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +struct NextRequestResponse( + // request_body_rid: + Option<ResourceId>, + // response_sender_rid: + ResourceId, + // method: + // This is a String rather than a ByteString because reqwest will only return + // the method as a str which is guaranteed to be ASCII-only. + String, + // headers: + Vec<(ByteString, ByteString)>, + // url: + String, +); + +async fn op_http_request_next( + state: Rc<RefCell<OpState>>, + conn_rid: ResourceId, + _: (), +) -> Result<Option<NextRequestResponse>, AnyError> { + let conn_resource = state + .borrow() + .resource_table + .get::<ConnResource>(conn_rid) + .ok_or_else(bad_resource_id)?; + + let cancel = RcRef::map(conn_resource.clone(), |r| &r.cancel); + + poll_fn(|cx| { + conn_resource.deno_service.waker.register(cx.waker()); + let connection_closed = match conn_resource.poll(cx) { + Poll::Pending => false, + Poll::Ready(Ok(())) => { + // try to close ConnResource, but don't unwrap as it might + // already be closed + let _ = state + .borrow_mut() + .resource_table + .take::<ConnResource>(conn_rid); + true + } + Poll::Ready(Err(e)) => { + // TODO(ry) close RequestResource associated with connection + // TODO(ry) close ResponseBodyResource associated with connection + // close ConnResource + state + .borrow_mut() + .resource_table + .take::<ConnResource>(conn_rid) + .unwrap(); + + if should_ignore_error(&e) { + true + } else { + return Poll::Ready(Err(e)); + } + } + }; + if let Some(request_resource) = + conn_resource.deno_service.inner.borrow_mut().take() + { + let tx = request_resource.response_tx; + let req = request_resource.request; + let method = req.method().to_string(); + + let mut headers = Vec::with_capacity(req.headers().len()); + for (name, value) in req.headers().iter() { + let name: &[u8] = name.as_ref(); + let value = value.as_bytes(); + headers + .push((ByteString(name.to_owned()), ByteString(value.to_owned()))); + } + + let url = { + let scheme = { + match conn_resource.hyper_connection { + ConnType::Tcp(_) => "http", + ConnType::Tls(_) => "https", + } + }; + let host: Cow<str> = if let Some(host) = req.uri().host() { + Cow::Borrowed(host) + } else if let Some(host) = req.headers().get("HOST") { + Cow::Borrowed(host.to_str()?) + } else { + Cow::Owned(conn_resource.addr.to_string()) + }; + let path = req.uri().path_and_query().map_or("/", |p| p.as_str()); + format!("{}://{}{}", scheme, host, path) + }; + + let has_body = if let Some(exact_size) = req.size_hint().exact() { + exact_size > 0 + } else { + true + }; + + let maybe_request_body_rid = if has_body { + let stream: BytesStream = Box::pin(req.into_body().map(|r| { + r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)) + })); + let stream_reader = StreamReader::new(stream); + let mut state = state.borrow_mut(); + let request_body_rid = state.resource_table.add(RequestBodyResource { + conn_rid, + reader: AsyncRefCell::new(stream_reader), + cancel: CancelHandle::default(), + }); + Some(request_body_rid) + } else { + None + }; + + let mut state = state.borrow_mut(); + let response_sender_rid = + state.resource_table.add(ResponseSenderResource { + sender: tx, + conn_rid, + }); + + Poll::Ready(Ok(Some(NextRequestResponse( + maybe_request_body_rid, + response_sender_rid, + method, + headers, + url, + )))) + } else if connection_closed { + Poll::Ready(Ok(None)) + } else { + Poll::Pending + } + }) + .try_or_cancel(cancel) + .await + .map_err(AnyError::from) +} + +fn should_ignore_error(e: &AnyError) -> bool { + if let Some(e) = e.downcast_ref::<hyper::Error>() { + use std::error::Error; + if let Some(std_err) = e.source() { + if let Some(io_err) = std_err.downcast_ref::<std::io::Error>() { + if io_err.kind() == std::io::ErrorKind::NotConnected { + return true; + } + } + } + } + false +} + +fn op_http_start( + state: &mut OpState, + tcp_stream_rid: ResourceId, + _: (), +) -> Result<ResourceId, AnyError> { + let deno_service = Service::default(); + + if let Some(resource_rc) = state + .resource_table + .take::<TcpStreamResource>(tcp_stream_rid) + { + let resource = Rc::try_unwrap(resource_rc) + .expect("Only a single use of this resource should happen"); + let (read_half, write_half) = resource.into_inner(); + let tcp_stream = read_half.reunite(write_half)?; + let addr = tcp_stream.local_addr()?; + let hyper_connection = Http::new() + .with_executor(LocalExecutor) + .serve_connection(tcp_stream, deno_service.clone()); + let conn_resource = ConnResource { + hyper_connection: ConnType::Tcp(Rc::new(RefCell::new(hyper_connection))), + deno_service, + addr, + cancel: CancelHandle::default(), + }; + let rid = state.resource_table.add(conn_resource); + return Ok(rid); + } + + if let Some(resource_rc) = state + .resource_table + .take::<TlsStreamResource>(tcp_stream_rid) + { + let resource = Rc::try_unwrap(resource_rc) + .expect("Only a single use of this resource should happen"); + let (read_half, write_half) = resource.into_inner(); + let tls_stream = read_half.reunite(write_half); + let addr = tls_stream.get_ref().0.local_addr()?; + + let hyper_connection = Http::new() + .with_executor(LocalExecutor) + .serve_connection(tls_stream, deno_service.clone()); + let conn_resource = ConnResource { + hyper_connection: ConnType::Tls(Rc::new(RefCell::new(hyper_connection))), + deno_service, + addr, + cancel: CancelHandle::default(), + }; + let rid = state.resource_table.add(conn_resource); + return Ok(rid); + } + + Err(bad_resource_id()) +} + +// We use a tuple instead of struct to avoid serialization overhead of the keys. +#[derive(Deserialize)] +struct RespondArgs( + // rid: + u32, + // status: + u16, + // headers: + Vec<(ByteString, ByteString)>, +); + +async fn op_http_response( + state: Rc<RefCell<OpState>>, + args: RespondArgs, + data: Option<ZeroCopyBuf>, +) -> Result<Option<ResourceId>, AnyError> { + let RespondArgs(rid, status, headers) = args; + + let response_sender = state + .borrow_mut() + .resource_table + .take::<ResponseSenderResource>(rid) + .ok_or_else(bad_resource_id)?; + let response_sender = Rc::try_unwrap(response_sender) + .ok() + .expect("multiple op_http_respond ongoing"); + + let conn_resource = state + .borrow() + .resource_table + .get::<ConnResource>(response_sender.conn_rid) + .ok_or_else(bad_resource_id)?; + + let mut builder = Response::builder().status(status); + + builder.headers_mut().unwrap().reserve(headers.len()); + for (key, value) in &headers { + builder = builder.header(key.as_ref(), value.as_ref()); + } + + let res; + let maybe_response_body_rid = if let Some(d) = data { + // If a body is passed, we use it, and don't return a body for streaming. + res = builder.body(Vec::from(&*d).into())?; + None + } else { + // If no body is passed, we return a writer for streaming the body. + let (sender, body) = Body::channel(); + res = builder.body(body)?; + + let response_body_rid = + state.borrow_mut().resource_table.add(ResponseBodyResource { + body: AsyncRefCell::new(sender), + conn_rid: response_sender.conn_rid, + }); + + Some(response_body_rid) + }; + + // oneshot::Sender::send(v) returns |v| on error, not an error object. + // The only failure mode is the receiver already having dropped its end + // of the channel. + if response_sender.sender.send(res).is_err() { + return Err(type_error("internal communication error")); + } + + poll_fn(|cx| match conn_resource.poll(cx) { + Poll::Ready(x) => Poll::Ready(x), + Poll::Pending => Poll::Ready(Ok(())), + }) + .await?; + + if maybe_response_body_rid.is_none() { + conn_resource.deno_service.waker.wake(); + } + Ok(maybe_response_body_rid) +} + +async fn op_http_response_close( + state: Rc<RefCell<OpState>>, + rid: ResourceId, + _: (), +) -> Result<(), AnyError> { + let resource = state + .borrow_mut() + .resource_table + .take::<ResponseBodyResource>(rid) + .ok_or_else(bad_resource_id)?; + + let conn_resource = state + .borrow() + .resource_table + .get::<ConnResource>(resource.conn_rid) + .ok_or_else(bad_resource_id)?; + drop(resource); + + let r = poll_fn(|cx| match conn_resource.poll(cx) { + Poll::Ready(x) => Poll::Ready(x), + Poll::Pending => Poll::Ready(Ok(())), + }) + .await; + conn_resource.deno_service.waker.wake(); + r +} + +async fn op_http_request_read( + state: Rc<RefCell<OpState>>, + rid: ResourceId, + data: Option<ZeroCopyBuf>, +) -> Result<usize, AnyError> { + let mut data = data.ok_or_else(null_opbuf)?; + + let resource = state + .borrow() + .resource_table + .get::<RequestBodyResource>(rid as u32) + .ok_or_else(bad_resource_id)?; + + let conn_resource = state + .borrow() + .resource_table + .get::<ConnResource>(resource.conn_rid) + .ok_or_else(bad_resource_id)?; + + let mut reader = RcRef::map(&resource, |r| &r.reader).borrow_mut().await; + let cancel = RcRef::map(resource, |r| &r.cancel); + let mut read_fut = reader.read(&mut data).try_or_cancel(cancel).boxed_local(); + + poll_fn(|cx| { + if let Poll::Ready(Err(e)) = conn_resource.poll(cx) { + // close ConnResource + // close RequestResource associated with connection + // close ResponseBodyResource associated with connection + return Poll::Ready(Err(e)); + } + + read_fut.poll_unpin(cx).map_err(AnyError::from) + }) + .await +} + +async fn op_http_response_write( + state: Rc<RefCell<OpState>>, + rid: ResourceId, + data: Option<ZeroCopyBuf>, +) -> Result<(), AnyError> { + let buf = data.ok_or_else(null_opbuf)?; + let resource = state + .borrow() + .resource_table + .get::<ResponseBodyResource>(rid as u32) + .ok_or_else(bad_resource_id)?; + + let conn_resource = state + .borrow() + .resource_table + .get::<ConnResource>(resource.conn_rid) + .ok_or_else(bad_resource_id)?; + + let mut body = RcRef::map(&resource, |r| &r.body).borrow_mut().await; + + let mut send_data_fut = body.send_data(Vec::from(&*buf).into()).boxed_local(); + + poll_fn(|cx| { + let r = send_data_fut.poll_unpin(cx).map_err(AnyError::from); + + // Poll connection so the data is flushed + if let Poll::Ready(Err(e)) = conn_resource.poll(cx) { + // close ConnResource + // close RequestResource associated with connection + // close ResponseBodyResource associated with connection + return Poll::Ready(Err(e)); + } + + r + }) + .await?; + + Ok(()) +} + +type BytesStream = + Pin<Box<dyn Stream<Item = std::io::Result<bytes::Bytes>> + Unpin>>; + +struct RequestBodyResource { + conn_rid: ResourceId, + reader: AsyncRefCell<StreamReader<BytesStream, bytes::Bytes>>, + cancel: CancelHandle, +} + +impl Resource for RequestBodyResource { + fn name(&self) -> Cow<str> { + "requestBody".into() + } + + fn close(self: Rc<Self>) { + self.cancel.cancel() + } +} + +struct ResponseSenderResource { + sender: oneshot::Sender<Response<Body>>, + conn_rid: ResourceId, +} + +impl Resource for ResponseSenderResource { + fn name(&self) -> Cow<str> { + "responseSender".into() + } +} + +struct ResponseBodyResource { + body: AsyncRefCell<hyper::body::Sender>, + conn_rid: ResourceId, +} + +impl Resource for ResponseBodyResource { + fn name(&self) -> Cow<str> { + "responseBody".into() + } +} + +// Needed so hyper can use non Send futures +#[derive(Clone)] +struct LocalExecutor; + +impl<Fut> hyper::rt::Executor<Fut> for LocalExecutor +where + Fut: Future + 'static, + Fut::Output: 'static, +{ + fn execute(&self, fut: Fut) { + tokio::task::spawn_local(fut); + } +} diff --git a/extensions/net/ops_tls.rs b/extensions/net/ops_tls.rs new file mode 100644 index 000000000..701c5d1a1 --- /dev/null +++ b/extensions/net/ops_tls.rs @@ -0,0 +1,1024 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. + +pub use rustls; +pub use webpki; + +use crate::io::TcpStreamResource; +use crate::io::TlsStreamResource; +use crate::ops::IpAddr; +use crate::ops::OpAddr; +use crate::ops::OpConn; +use crate::resolve_addr::resolve_addr; +use crate::resolve_addr::resolve_addr_sync; +use crate::NetPermissions; +use deno_core::error::bad_resource; +use deno_core::error::bad_resource_id; +use deno_core::error::custom_error; +use deno_core::error::generic_error; +use deno_core::error::invalid_hostname; +use deno_core::error::AnyError; +use deno_core::futures::future::poll_fn; +use deno_core::futures::ready; +use deno_core::futures::task::noop_waker_ref; +use deno_core::futures::task::AtomicWaker; +use deno_core::futures::task::Context; +use deno_core::futures::task::Poll; +use deno_core::futures::task::RawWaker; +use deno_core::futures::task::RawWakerVTable; +use deno_core::futures::task::Waker; +use deno_core::op_async; +use deno_core::op_sync; +use deno_core::AsyncRefCell; +use deno_core::CancelHandle; +use deno_core::CancelTryFuture; +use deno_core::OpPair; +use deno_core::OpState; +use deno_core::RcRef; +use deno_core::Resource; +use deno_core::ResourceId; +use io::Error; +use io::Read; +use io::Write; +use rustls::internal::pemfile::certs; +use rustls::internal::pemfile::pkcs8_private_keys; +use rustls::internal::pemfile::rsa_private_keys; +use rustls::Certificate; +use rustls::ClientConfig; +use rustls::ClientSession; +use rustls::NoClientAuth; +use rustls::PrivateKey; +use rustls::ServerConfig; +use rustls::ServerSession; +use rustls::Session; +use rustls::StoresClientSessions; +use serde::Deserialize; +use std::borrow::Cow; +use std::cell::RefCell; +use std::collections::HashMap; +use std::convert::From; +use std::fs::File; +use std::io; +use std::io::BufReader; +use std::io::ErrorKind; +use std::ops::Deref; +use std::ops::DerefMut; +use std::path::Path; +use std::pin::Pin; +use std::rc::Rc; +use std::sync::Arc; +use std::sync::Mutex; +use std::sync::Weak; +use tokio::io::AsyncRead; +use tokio::io::AsyncWrite; +use tokio::io::ReadBuf; +use tokio::net::TcpListener; +use tokio::net::TcpStream; +use tokio::task::spawn_local; +use webpki::DNSNameRef; + +lazy_static::lazy_static! { + static ref CLIENT_SESSION_MEMORY_CACHE: Arc<ClientSessionMemoryCache> = + Arc::new(ClientSessionMemoryCache::default()); +} + +#[derive(Default)] +struct ClientSessionMemoryCache(Mutex<HashMap<Vec<u8>, Vec<u8>>>); + +impl StoresClientSessions for ClientSessionMemoryCache { + fn get(&self, key: &[u8]) -> Option<Vec<u8>> { + self.0.lock().unwrap().get(key).cloned() + } + + fn put(&self, key: Vec<u8>, value: Vec<u8>) -> bool { + let mut sessions = self.0.lock().unwrap(); + // TODO(bnoordhuis) Evict sessions LRU-style instead of arbitrarily. + while sessions.len() >= 1024 { + let key = sessions.keys().next().unwrap().clone(); + sessions.remove(&key); + } + sessions.insert(key, value); + true + } +} + +#[derive(Debug)] +enum TlsSession { + Client(ClientSession), + Server(ServerSession), +} + +impl Deref for TlsSession { + type Target = dyn Session; + + fn deref(&self) -> &Self::Target { + match self { + TlsSession::Client(client_session) => client_session, + TlsSession::Server(server_session) => server_session, + } + } +} + +impl DerefMut for TlsSession { + fn deref_mut(&mut self) -> &mut Self::Target { + match self { + TlsSession::Client(client_session) => client_session, + TlsSession::Server(server_session) => server_session, + } + } +} + +impl From<ClientSession> for TlsSession { + fn from(client_session: ClientSession) -> Self { + TlsSession::Client(client_session) + } +} + +impl From<ServerSession> for TlsSession { + fn from(server_session: ServerSession) -> Self { + TlsSession::Server(server_session) + } +} + +#[derive(Copy, Clone, Debug, Eq, PartialEq)] +enum Flow { + Read, + Write, +} + +#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] +enum State { + StreamOpen, + StreamClosed, + TlsClosing, + TlsClosed, + TcpClosed, +} + +#[derive(Debug)] +pub struct TlsStream(Option<TlsStreamInner>); + +impl TlsStream { + fn new(tcp: TcpStream, tls: TlsSession) -> Self { + let inner = TlsStreamInner { + tcp, + tls, + rd_state: State::StreamOpen, + wr_state: State::StreamOpen, + }; + Self(Some(inner)) + } + + pub fn new_client_side( + tcp: TcpStream, + tls_config: &Arc<ClientConfig>, + hostname: DNSNameRef, + ) -> Self { + let tls = TlsSession::Client(ClientSession::new(tls_config, hostname)); + Self::new(tcp, tls) + } + + pub fn new_server_side( + tcp: TcpStream, + tls_config: &Arc<ServerConfig>, + ) -> Self { + let tls = TlsSession::Server(ServerSession::new(tls_config)); + Self::new(tcp, tls) + } + + pub async fn handshake(&mut self) -> io::Result<()> { + poll_fn(|cx| self.inner_mut().poll_io(cx, Flow::Write)).await + } + + fn into_split(self) -> (ReadHalf, WriteHalf) { + let shared = Shared::new(self); + let rd = ReadHalf { + shared: shared.clone(), + }; + let wr = WriteHalf { shared }; + (rd, wr) + } + + /// Tokio-rustls compatibility: returns a reference to the underlying TCP + /// stream, and a reference to the Rustls `Session` object. + pub fn get_ref(&self) -> (&TcpStream, &dyn Session) { + let inner = self.0.as_ref().unwrap(); + (&inner.tcp, &*inner.tls) + } + + fn inner_mut(&mut self) -> &mut TlsStreamInner { + self.0.as_mut().unwrap() + } +} + +impl AsyncRead for TlsStream { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll<io::Result<()>> { + self.inner_mut().poll_read(cx, buf) + } +} + +impl AsyncWrite for TlsStream { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<io::Result<usize>> { + self.inner_mut().poll_write(cx, buf) + } + + fn poll_flush( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<io::Result<()>> { + self.inner_mut().poll_io(cx, Flow::Write) + // The underlying TCP stream does not need to be flushed. + } + + fn poll_shutdown( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<io::Result<()>> { + self.inner_mut().poll_shutdown(cx) + } +} + +impl Drop for TlsStream { + fn drop(&mut self) { + let mut inner = self.0.take().unwrap(); + + let mut cx = Context::from_waker(noop_waker_ref()); + let use_linger_task = inner.poll_close(&mut cx).is_pending(); + + if use_linger_task { + spawn_local(poll_fn(move |cx| inner.poll_close(cx))); + } else if cfg!(debug_assertions) { + spawn_local(async {}); // Spawn dummy task to detect missing LocalSet. + } + } +} + +#[derive(Debug)] +pub struct TlsStreamInner { + tls: TlsSession, + tcp: TcpStream, + rd_state: State, + wr_state: State, +} + +impl TlsStreamInner { + fn poll_io( + &mut self, + cx: &mut Context<'_>, + flow: Flow, + ) -> Poll<io::Result<()>> { + loop { + let wr_ready = loop { + match self.wr_state { + _ if self.tls.is_handshaking() && !self.tls.wants_write() => { + break true; + } + _ if self.tls.is_handshaking() => {} + State::StreamOpen if !self.tls.wants_write() => break true, + State::StreamClosed => { + // Rustls will enqueue the 'CloseNotify' alert and send it after + // flusing the data that is already in the queue. + self.tls.send_close_notify(); + self.wr_state = State::TlsClosing; + continue; + } + State::TlsClosing if !self.tls.wants_write() => { + self.wr_state = State::TlsClosed; + continue; + } + // If a 'CloseNotify' alert sent by the remote end has been received, + // shut down the underlying TCP socket. Otherwise, consider polling + // done for the moment. + State::TlsClosed if self.rd_state < State::TlsClosed => break true, + State::TlsClosed + if Pin::new(&mut self.tcp).poll_shutdown(cx)?.is_pending() => + { + break false; + } + State::TlsClosed => { + self.wr_state = State::TcpClosed; + continue; + } + State::TcpClosed => break true, + _ => {} + } + + // Poll whether there is space in the socket send buffer so we can flush + // the remaining outgoing ciphertext. + if self.tcp.poll_write_ready(cx)?.is_pending() { + break false; + } + + // Write ciphertext to the TCP socket. + let mut wrapped_tcp = ImplementWriteTrait(&mut self.tcp); + match self.tls.write_tls(&mut wrapped_tcp) { + Ok(0) => unreachable!(), + Ok(_) => {} + Err(err) if err.kind() == ErrorKind::WouldBlock => {} + Err(err) => return Poll::Ready(Err(err)), + } + }; + + let rd_ready = loop { + match self.rd_state { + State::TcpClosed if self.tls.is_handshaking() => { + let err = Error::new(ErrorKind::UnexpectedEof, "tls handshake eof"); + return Poll::Ready(Err(err)); + } + _ if self.tls.is_handshaking() && !self.tls.wants_read() => { + break true; + } + _ if self.tls.is_handshaking() => {} + State::StreamOpen if !self.tls.wants_read() => break true, + State::StreamOpen => {} + State::StreamClosed if !self.tls.wants_read() => { + // Rustls has more incoming cleartext buffered up, but the TLS + // session is closing so this data will never be processed by the + // application layer. Just like what would happen if this were a raw + // TCP stream, don't gracefully end the TLS session, but abort it. + return Poll::Ready(Err(Error::from(ErrorKind::ConnectionReset))); + } + State::StreamClosed => {} + State::TlsClosed if self.wr_state == State::TcpClosed => { + // Wait for the remote end to gracefully close the TCP connection. + // TODO(piscisaureus): this is unnecessary; remove when stable. + } + _ => break true, + } + + if self.rd_state < State::TlsClosed { + // Do a zero-length plaintext read so we can detect the arrival of + // 'CloseNotify' messages, even if only the write half is open. + // Actually reading data from the socket is done in `poll_read()`. + match self.tls.read(&mut []) { + Ok(0) => {} + Err(err) if err.kind() == ErrorKind::ConnectionAborted => { + // `Session::read()` returns `ConnectionAborted` when a + // 'CloseNotify' alert has been received, which indicates that + // the remote peer wants to gracefully end the TLS session. + self.rd_state = State::TlsClosed; + continue; + } + Err(err) => return Poll::Ready(Err(err)), + _ => unreachable!(), + } + } + + // Poll whether more ciphertext is available in the socket receive + // buffer. + if self.tcp.poll_read_ready(cx)?.is_pending() { + break false; + } + + // Receive ciphertext from the socket. + let mut wrapped_tcp = ImplementReadTrait(&mut self.tcp); + match self.tls.read_tls(&mut wrapped_tcp) { + Ok(0) => self.rd_state = State::TcpClosed, + Ok(_) => self + .tls + .process_new_packets() + .map_err(|err| Error::new(ErrorKind::InvalidData, err))?, + Err(err) if err.kind() == ErrorKind::WouldBlock => {} + Err(err) => return Poll::Ready(Err(err)), + } + }; + + if wr_ready { + if self.rd_state >= State::TlsClosed + && self.wr_state >= State::TlsClosed + && self.wr_state < State::TcpClosed + { + continue; + } + if self.tls.wants_write() { + continue; + } + } + + let io_ready = match flow { + _ if self.tls.is_handshaking() => false, + Flow::Read => rd_ready, + Flow::Write => wr_ready, + }; + return match io_ready { + false => Poll::Pending, + true => Poll::Ready(Ok(())), + }; + } + } + + fn poll_read( + &mut self, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll<io::Result<()>> { + ready!(self.poll_io(cx, Flow::Read))?; + + if self.rd_state == State::StreamOpen { + let buf_slice = + unsafe { &mut *(buf.unfilled_mut() as *mut [_] as *mut [u8]) }; + let bytes_read = self.tls.read(buf_slice)?; + assert_ne!(bytes_read, 0); + unsafe { buf.assume_init(bytes_read) }; + buf.advance(bytes_read); + } + + Poll::Ready(Ok(())) + } + + fn poll_write( + &mut self, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<io::Result<usize>> { + if buf.is_empty() { + // Tokio-rustls compatibility: a zero byte write always succeeds. + Poll::Ready(Ok(0)) + } else if self.wr_state == State::StreamOpen { + // Flush Rustls' ciphertext send queue. + ready!(self.poll_io(cx, Flow::Write))?; + + // Copy data from `buf` to the Rustls cleartext send queue. + let bytes_written = self.tls.write(buf)?; + assert_ne!(bytes_written, 0); + + // Try to flush as much ciphertext as possible. However, since we just + // handed off at least some bytes to rustls, so we can't return + // `Poll::Pending()` any more: this would tell the caller that it should + // try to send those bytes again. + let _ = self.poll_io(cx, Flow::Write)?; + + Poll::Ready(Ok(bytes_written)) + } else { + // Return error if stream has been shut down for writing. + Poll::Ready(Err(ErrorKind::BrokenPipe.into())) + } + } + + fn poll_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { + if self.wr_state == State::StreamOpen { + self.wr_state = State::StreamClosed; + } + + ready!(self.poll_io(cx, Flow::Write))?; + + // At minimum, a TLS 'CloseNotify' alert should have been sent. + assert!(self.wr_state >= State::TlsClosed); + // If we received a TLS 'CloseNotify' alert from the remote end + // already, the TCP socket should be shut down at this point. + assert!( + self.rd_state < State::TlsClosed || self.wr_state == State::TcpClosed + ); + + Poll::Ready(Ok(())) + } + + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { + if self.rd_state == State::StreamOpen { + self.rd_state = State::StreamClosed; + } + + // Send TLS 'CloseNotify' alert. + ready!(self.poll_shutdown(cx))?; + // Wait for 'CloseNotify', shut down TCP stream, wait for TCP FIN packet. + ready!(self.poll_io(cx, Flow::Read))?; + + assert_eq!(self.rd_state, State::TcpClosed); + assert_eq!(self.wr_state, State::TcpClosed); + + Poll::Ready(Ok(())) + } +} + +#[derive(Debug)] +pub struct ReadHalf { + shared: Arc<Shared>, +} + +impl ReadHalf { + pub fn reunite(self, wr: WriteHalf) -> TlsStream { + assert!(Arc::ptr_eq(&self.shared, &wr.shared)); + drop(wr); // Drop `wr`, so only one strong reference to `shared` remains. + + Arc::try_unwrap(self.shared) + .unwrap_or_else(|_| panic!("Arc::<Shared>::try_unwrap() failed")) + .tls_stream + .into_inner() + .unwrap() + } +} + +impl AsyncRead for ReadHalf { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll<io::Result<()>> { + self + .shared + .poll_with_shared_waker(cx, Flow::Read, move |tls, cx| { + tls.poll_read(cx, buf) + }) + } +} + +#[derive(Debug)] +pub struct WriteHalf { + shared: Arc<Shared>, +} + +impl AsyncWrite for WriteHalf { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<io::Result<usize>> { + self + .shared + .poll_with_shared_waker(cx, Flow::Write, move |tls, cx| { + tls.poll_write(cx, buf) + }) + } + + fn poll_flush( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<io::Result<()>> { + self + .shared + .poll_with_shared_waker(cx, Flow::Write, |tls, cx| tls.poll_flush(cx)) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<io::Result<()>> { + self + .shared + .poll_with_shared_waker(cx, Flow::Write, |tls, cx| tls.poll_shutdown(cx)) + } +} + +#[derive(Debug)] +struct Shared { + tls_stream: Mutex<TlsStream>, + rd_waker: AtomicWaker, + wr_waker: AtomicWaker, +} + +impl Shared { + fn new(tls_stream: TlsStream) -> Arc<Self> { + let self_ = Self { + tls_stream: Mutex::new(tls_stream), + rd_waker: AtomicWaker::new(), + wr_waker: AtomicWaker::new(), + }; + Arc::new(self_) + } + + fn poll_with_shared_waker<R>( + self: &Arc<Self>, + cx: &mut Context<'_>, + flow: Flow, + mut f: impl FnMut(Pin<&mut TlsStream>, &mut Context<'_>) -> R, + ) -> R { + match flow { + Flow::Read => self.rd_waker.register(cx.waker()), + Flow::Write => self.wr_waker.register(cx.waker()), + } + + let shared_waker = self.new_shared_waker(); + let mut cx = Context::from_waker(&shared_waker); + + let mut tls_stream = self.tls_stream.lock().unwrap(); + f(Pin::new(&mut tls_stream), &mut cx) + } + + const SHARED_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new( + Self::clone_shared_waker, + Self::wake_shared_waker, + Self::wake_shared_waker_by_ref, + Self::drop_shared_waker, + ); + + fn new_shared_waker(self: &Arc<Self>) -> Waker { + let self_weak = Arc::downgrade(self); + let self_ptr = self_weak.into_raw() as *const (); + let raw_waker = RawWaker::new(self_ptr, &Self::SHARED_WAKER_VTABLE); + unsafe { Waker::from_raw(raw_waker) } + } + + fn clone_shared_waker(self_ptr: *const ()) -> RawWaker { + let self_weak = unsafe { Weak::from_raw(self_ptr as *const Self) }; + let ptr1 = self_weak.clone().into_raw(); + let ptr2 = self_weak.into_raw(); + assert!(ptr1 == ptr2); + RawWaker::new(self_ptr, &Self::SHARED_WAKER_VTABLE) + } + + fn wake_shared_waker(self_ptr: *const ()) { + Self::wake_shared_waker_by_ref(self_ptr); + Self::drop_shared_waker(self_ptr); + } + + fn wake_shared_waker_by_ref(self_ptr: *const ()) { + let self_weak = unsafe { Weak::from_raw(self_ptr as *const Self) }; + if let Some(self_arc) = Weak::upgrade(&self_weak) { + self_arc.rd_waker.wake(); + self_arc.wr_waker.wake(); + } + self_weak.into_raw(); + } + + fn drop_shared_waker(self_ptr: *const ()) { + let _ = unsafe { Weak::from_raw(self_ptr as *const Self) }; + } +} + +struct ImplementReadTrait<'a, T>(&'a mut T); + +impl Read for ImplementReadTrait<'_, TcpStream> { + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + self.0.try_read(buf) + } +} + +struct ImplementWriteTrait<'a, T>(&'a mut T); + +impl Write for ImplementWriteTrait<'_, TcpStream> { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + self.0.try_write(buf) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + +pub fn init<P: NetPermissions + 'static>() -> Vec<OpPair> { + vec![ + ("op_start_tls", op_async(op_start_tls::<P>)), + ("op_connect_tls", op_async(op_connect_tls::<P>)), + ("op_listen_tls", op_sync(op_listen_tls::<P>)), + ("op_accept_tls", op_async(op_accept_tls)), + ] +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ConnectTlsArgs { + transport: String, + hostname: String, + port: u16, + cert_file: Option<String>, +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct StartTlsArgs { + rid: ResourceId, + cert_file: Option<String>, + hostname: String, +} + +async fn op_start_tls<NP>( + state: Rc<RefCell<OpState>>, + args: StartTlsArgs, + _: (), +) -> Result<OpConn, AnyError> +where + NP: NetPermissions + 'static, +{ + let rid = args.rid; + let hostname = match &*args.hostname { + "" => "localhost", + n => n, + }; + let cert_file = args.cert_file.as_deref(); + + { + super::check_unstable2(&state, "Deno.startTls"); + let mut s = state.borrow_mut(); + let permissions = s.borrow_mut::<NP>(); + permissions.check_net(&(hostname, Some(0)))?; + if let Some(path) = cert_file { + permissions.check_read(Path::new(path))?; + } + } + + let hostname_dns = DNSNameRef::try_from_ascii_str(hostname) + .map_err(|_| invalid_hostname(hostname))?; + + let resource_rc = state + .borrow_mut() + .resource_table + .take::<TcpStreamResource>(rid) + .ok_or_else(bad_resource_id)?; + let resource = Rc::try_unwrap(resource_rc) + .expect("Only a single use of this resource should happen"); + let (read_half, write_half) = resource.into_inner(); + let tcp_stream = read_half.reunite(write_half)?; + + let local_addr = tcp_stream.local_addr()?; + let remote_addr = tcp_stream.peer_addr()?; + + let mut tls_config = ClientConfig::new(); + tls_config.set_persistence(CLIENT_SESSION_MEMORY_CACHE.clone()); + tls_config + .root_store + .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS); + if let Some(path) = cert_file { + let key_file = File::open(path)?; + let reader = &mut BufReader::new(key_file); + tls_config.root_store.add_pem_file(reader).unwrap(); + } + let tls_config = Arc::new(tls_config); + + let tls_stream = + TlsStream::new_client_side(tcp_stream, &tls_config, hostname_dns); + + let rid = { + let mut state_ = state.borrow_mut(); + state_ + .resource_table + .add(TlsStreamResource::new(tls_stream.into_split())) + }; + + Ok(OpConn { + rid, + local_addr: Some(OpAddr::Tcp(IpAddr { + hostname: local_addr.ip().to_string(), + port: local_addr.port(), + })), + remote_addr: Some(OpAddr::Tcp(IpAddr { + hostname: remote_addr.ip().to_string(), + port: remote_addr.port(), + })), + }) +} + +async fn op_connect_tls<NP>( + state: Rc<RefCell<OpState>>, + args: ConnectTlsArgs, + _: (), +) -> Result<OpConn, AnyError> +where + NP: NetPermissions + 'static, +{ + assert_eq!(args.transport, "tcp"); + let hostname = match &*args.hostname { + "" => "localhost", + n => n, + }; + let port = args.port; + let cert_file = args.cert_file.as_deref(); + + { + let mut s = state.borrow_mut(); + let permissions = s.borrow_mut::<NP>(); + permissions.check_net(&(hostname, Some(port)))?; + if let Some(path) = cert_file { + permissions.check_read(Path::new(path))?; + } + } + + let hostname_dns = DNSNameRef::try_from_ascii_str(hostname) + .map_err(|_| invalid_hostname(hostname))?; + + let connect_addr = resolve_addr(hostname, port) + .await? + .next() + .ok_or_else(|| generic_error("No resolved address found"))?; + let tcp_stream = TcpStream::connect(connect_addr).await?; + let local_addr = tcp_stream.local_addr()?; + let remote_addr = tcp_stream.peer_addr()?; + + let mut tls_config = ClientConfig::new(); + tls_config.set_persistence(CLIENT_SESSION_MEMORY_CACHE.clone()); + tls_config + .root_store + .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS); + if let Some(path) = cert_file { + let key_file = File::open(path)?; + let reader = &mut BufReader::new(key_file); + tls_config.root_store.add_pem_file(reader).unwrap(); + } + let tls_config = Arc::new(tls_config); + + let tls_stream = + TlsStream::new_client_side(tcp_stream, &tls_config, hostname_dns); + + let rid = { + let mut state_ = state.borrow_mut(); + state_ + .resource_table + .add(TlsStreamResource::new(tls_stream.into_split())) + }; + + Ok(OpConn { + rid, + local_addr: Some(OpAddr::Tcp(IpAddr { + hostname: local_addr.ip().to_string(), + port: local_addr.port(), + })), + remote_addr: Some(OpAddr::Tcp(IpAddr { + hostname: remote_addr.ip().to_string(), + port: remote_addr.port(), + })), + }) +} + +fn load_certs(path: &str) -> Result<Vec<Certificate>, AnyError> { + let cert_file = File::open(path)?; + let reader = &mut BufReader::new(cert_file); + + let certs = certs(reader) + .map_err(|_| custom_error("InvalidData", "Unable to decode certificate"))?; + + if certs.is_empty() { + let e = custom_error("InvalidData", "No certificates found in cert file"); + return Err(e); + } + + Ok(certs) +} + +fn key_decode_err() -> AnyError { + custom_error("InvalidData", "Unable to decode key") +} + +fn key_not_found_err() -> AnyError { + custom_error("InvalidData", "No keys found in key file") +} + +/// Starts with -----BEGIN RSA PRIVATE KEY----- +fn load_rsa_keys(path: &str) -> Result<Vec<PrivateKey>, AnyError> { + let key_file = File::open(path)?; + let reader = &mut BufReader::new(key_file); + let keys = rsa_private_keys(reader).map_err(|_| key_decode_err())?; + Ok(keys) +} + +/// Starts with -----BEGIN PRIVATE KEY----- +fn load_pkcs8_keys(path: &str) -> Result<Vec<PrivateKey>, AnyError> { + let key_file = File::open(path)?; + let reader = &mut BufReader::new(key_file); + let keys = pkcs8_private_keys(reader).map_err(|_| key_decode_err())?; + Ok(keys) +} + +fn load_keys(path: &str) -> Result<Vec<PrivateKey>, AnyError> { + let path = path.to_string(); + let mut keys = load_rsa_keys(&path)?; + + if keys.is_empty() { + keys = load_pkcs8_keys(&path)?; + } + + if keys.is_empty() { + return Err(key_not_found_err()); + } + + Ok(keys) +} + +pub struct TlsListenerResource { + tcp_listener: AsyncRefCell<TcpListener>, + tls_config: Arc<ServerConfig>, + cancel_handle: CancelHandle, +} + +impl Resource for TlsListenerResource { + fn name(&self) -> Cow<str> { + "tlsListener".into() + } + + fn close(self: Rc<Self>) { + self.cancel_handle.cancel(); + } +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ListenTlsArgs { + transport: String, + hostname: String, + port: u16, + cert_file: String, + key_file: String, + alpn_protocols: Option<Vec<String>>, +} + +fn op_listen_tls<NP>( + state: &mut OpState, + args: ListenTlsArgs, + _: (), +) -> Result<OpConn, AnyError> +where + NP: NetPermissions + 'static, +{ + assert_eq!(args.transport, "tcp"); + let hostname = &*args.hostname; + let port = args.port; + let cert_file = &*args.cert_file; + let key_file = &*args.key_file; + + { + let permissions = state.borrow_mut::<NP>(); + permissions.check_net(&(hostname, Some(port)))?; + permissions.check_read(Path::new(cert_file))?; + permissions.check_read(Path::new(key_file))?; + } + + let mut tls_config = ServerConfig::new(NoClientAuth::new()); + if let Some(alpn_protocols) = args.alpn_protocols { + super::check_unstable(state, "Deno.listenTls#alpn_protocols"); + tls_config.alpn_protocols = + alpn_protocols.into_iter().map(|s| s.into_bytes()).collect(); + } + tls_config + .set_single_cert(load_certs(cert_file)?, load_keys(key_file)?.remove(0)) + .expect("invalid key or certificate"); + + let bind_addr = resolve_addr_sync(hostname, port)? + .next() + .ok_or_else(|| generic_error("No resolved address found"))?; + let std_listener = std::net::TcpListener::bind(bind_addr)?; + std_listener.set_nonblocking(true)?; + let tcp_listener = TcpListener::from_std(std_listener)?; + let local_addr = tcp_listener.local_addr()?; + + let tls_listener_resource = TlsListenerResource { + tcp_listener: AsyncRefCell::new(tcp_listener), + tls_config: Arc::new(tls_config), + cancel_handle: Default::default(), + }; + + let rid = state.resource_table.add(tls_listener_resource); + + Ok(OpConn { + rid, + local_addr: Some(OpAddr::Tcp(IpAddr { + hostname: local_addr.ip().to_string(), + port: local_addr.port(), + })), + remote_addr: None, + }) +} + +async fn op_accept_tls( + state: Rc<RefCell<OpState>>, + rid: ResourceId, + _: (), +) -> Result<OpConn, AnyError> { + let resource = state + .borrow() + .resource_table + .get::<TlsListenerResource>(rid) + .ok_or_else(|| bad_resource("Listener has been closed"))?; + + let cancel_handle = RcRef::map(&resource, |r| &r.cancel_handle); + let tcp_listener = RcRef::map(&resource, |r| &r.tcp_listener) + .try_borrow_mut() + .ok_or_else(|| custom_error("Busy", "Another accept task is ongoing"))?; + + let (tcp_stream, remote_addr) = + match tcp_listener.accept().try_or_cancel(&cancel_handle).await { + Ok(tuple) => tuple, + Err(err) if err.kind() == ErrorKind::Interrupted => { + // FIXME(bartlomieju): compatibility with current JS implementation. + return Err(bad_resource("Listener has been closed")); + } + Err(err) => return Err(err.into()), + }; + + let local_addr = tcp_stream.local_addr()?; + + let tls_stream = TlsStream::new_server_side(tcp_stream, &resource.tls_config); + + let rid = { + let mut state_ = state.borrow_mut(); + state_ + .resource_table + .add(TlsStreamResource::new(tls_stream.into_split())) + }; + + Ok(OpConn { + rid, + local_addr: Some(OpAddr::Tcp(IpAddr { + hostname: local_addr.ip().to_string(), + port: local_addr.port(), + })), + remote_addr: Some(OpAddr::Tcp(IpAddr { + hostname: remote_addr.ip().to_string(), + port: remote_addr.port(), + })), + }) +} diff --git a/extensions/net/ops_unix.rs b/extensions/net/ops_unix.rs new file mode 100644 index 000000000..9dfcc231e --- /dev/null +++ b/extensions/net/ops_unix.rs @@ -0,0 +1,180 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. + +use crate::io::UnixStreamResource; +use crate::ops::AcceptArgs; +use crate::ops::OpAddr; +use crate::ops::OpConn; +use crate::ops::OpPacket; +use crate::ops::ReceiveArgs; +use deno_core::error::bad_resource; +use deno_core::error::custom_error; +use deno_core::error::null_opbuf; +use deno_core::error::AnyError; +use deno_core::AsyncRefCell; +use deno_core::CancelHandle; +use deno_core::CancelTryFuture; +use deno_core::OpState; +use deno_core::RcRef; +use deno_core::Resource; +use deno_core::ZeroCopyBuf; +use serde::Deserialize; +use serde::Serialize; +use std::borrow::Cow; +use std::cell::RefCell; +use std::fs::remove_file; +use std::path::Path; +use std::rc::Rc; +use tokio::net::UnixDatagram; +use tokio::net::UnixListener; +pub use tokio::net::UnixStream; + +/// A utility function to map OsStrings to Strings +pub fn into_string(s: std::ffi::OsString) -> Result<String, AnyError> { + s.into_string().map_err(|s| { + let message = format!("File name or path {:?} is not valid UTF-8", s); + custom_error("InvalidData", message) + }) +} + +struct UnixListenerResource { + listener: AsyncRefCell<UnixListener>, + cancel: CancelHandle, +} + +impl Resource for UnixListenerResource { + fn name(&self) -> Cow<str> { + "unixListener".into() + } + + fn close(self: Rc<Self>) { + self.cancel.cancel(); + } +} + +pub struct UnixDatagramResource { + pub socket: AsyncRefCell<UnixDatagram>, + pub cancel: CancelHandle, +} + +impl Resource for UnixDatagramResource { + fn name(&self) -> Cow<str> { + "unixDatagram".into() + } + + fn close(self: Rc<Self>) { + self.cancel.cancel(); + } +} + +#[derive(Serialize)] +pub struct UnixAddr { + pub path: Option<String>, +} + +#[derive(Deserialize)] +pub struct UnixListenArgs { + pub path: String, +} + +pub(crate) async fn accept_unix( + state: Rc<RefCell<OpState>>, + args: AcceptArgs, + _: (), +) -> Result<OpConn, AnyError> { + let rid = args.rid; + + let resource = state + .borrow() + .resource_table + .get::<UnixListenerResource>(rid) + .ok_or_else(|| bad_resource("Listener has been closed"))?; + let listener = RcRef::map(&resource, |r| &r.listener) + .try_borrow_mut() + .ok_or_else(|| custom_error("Busy", "Listener already in use"))?; + let cancel = RcRef::map(resource, |r| &r.cancel); + let (unix_stream, _socket_addr) = + listener.accept().try_or_cancel(cancel).await?; + + let local_addr = unix_stream.local_addr()?; + let remote_addr = unix_stream.peer_addr()?; + let resource = UnixStreamResource::new(unix_stream.into_split()); + let mut state = state.borrow_mut(); + let rid = state.resource_table.add(resource); + Ok(OpConn { + rid, + local_addr: Some(OpAddr::Unix(UnixAddr { + path: local_addr.as_pathname().and_then(pathstring), + })), + remote_addr: Some(OpAddr::Unix(UnixAddr { + path: remote_addr.as_pathname().and_then(pathstring), + })), + }) +} + +pub(crate) async fn receive_unix_packet( + state: Rc<RefCell<OpState>>, + args: ReceiveArgs, + buf: Option<ZeroCopyBuf>, +) -> Result<OpPacket, AnyError> { + let mut buf = buf.ok_or_else(null_opbuf)?; + + let rid = args.rid; + + let resource = state + .borrow() + .resource_table + .get::<UnixDatagramResource>(rid) + .ok_or_else(|| bad_resource("Socket has been closed"))?; + let socket = RcRef::map(&resource, |r| &r.socket) + .try_borrow_mut() + .ok_or_else(|| custom_error("Busy", "Socket already in use"))?; + let cancel = RcRef::map(resource, |r| &r.cancel); + let (size, remote_addr) = + socket.recv_from(&mut buf).try_or_cancel(cancel).await?; + Ok(OpPacket { + size, + remote_addr: OpAddr::UnixPacket(UnixAddr { + path: remote_addr.as_pathname().and_then(pathstring), + }), + }) +} + +pub fn listen_unix( + state: &mut OpState, + addr: &Path, +) -> Result<(u32, tokio::net::unix::SocketAddr), AnyError> { + if addr.exists() { + remove_file(&addr).unwrap(); + } + let listener = UnixListener::bind(&addr)?; + let local_addr = listener.local_addr()?; + let listener_resource = UnixListenerResource { + listener: AsyncRefCell::new(listener), + cancel: Default::default(), + }; + let rid = state.resource_table.add(listener_resource); + + Ok((rid, local_addr)) +} + +pub fn listen_unix_packet( + state: &mut OpState, + addr: &Path, +) -> Result<(u32, tokio::net::unix::SocketAddr), AnyError> { + if addr.exists() { + remove_file(&addr).unwrap(); + } + let socket = UnixDatagram::bind(&addr)?; + let local_addr = socket.local_addr()?; + let datagram_resource = UnixDatagramResource { + socket: AsyncRefCell::new(socket), + cancel: Default::default(), + }; + let rid = state.resource_table.add(datagram_resource); + + Ok((rid, local_addr)) +} + +pub fn pathstring(pathname: &Path) -> Option<String> { + into_string(pathname.into()).ok() +} diff --git a/extensions/net/resolve_addr.rs b/extensions/net/resolve_addr.rs new file mode 100644 index 000000000..ebf1374d1 --- /dev/null +++ b/extensions/net/resolve_addr.rs @@ -0,0 +1,156 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. + +use deno_core::error::AnyError; +use std::net::SocketAddr; +use std::net::ToSocketAddrs; +use tokio::net::lookup_host; + +/// Resolve network address *asynchronously*. +pub async fn resolve_addr( + hostname: &str, + port: u16, +) -> Result<impl Iterator<Item = SocketAddr> + '_, AnyError> { + let addr_port_pair = make_addr_port_pair(hostname, port); + let result = lookup_host(addr_port_pair).await?; + Ok(result) +} + +/// Resolve network address *synchronously*. +pub fn resolve_addr_sync( + hostname: &str, + port: u16, +) -> Result<impl Iterator<Item = SocketAddr>, AnyError> { + let addr_port_pair = make_addr_port_pair(hostname, port); + let result = addr_port_pair.to_socket_addrs()?; + Ok(result) +} + +fn make_addr_port_pair(hostname: &str, port: u16) -> (&str, u16) { + // Default to localhost if given just the port. Example: ":80" + if hostname.is_empty() { + return ("0.0.0.0", port); + } + + // If this looks like an ipv6 IP address. Example: "[2001:db8::1]" + // Then we remove the brackets. + let addr = hostname.trim_start_matches('[').trim_end_matches(']'); + (addr, port) +} + +#[cfg(test)] +mod tests { + use super::*; + use std::net::Ipv4Addr; + use std::net::Ipv6Addr; + use std::net::SocketAddrV4; + use std::net::SocketAddrV6; + + #[tokio::test] + async fn resolve_addr1() { + let expected = vec![SocketAddr::V4(SocketAddrV4::new( + Ipv4Addr::new(127, 0, 0, 1), + 80, + ))]; + let actual = resolve_addr("127.0.0.1", 80) + .await + .unwrap() + .collect::<Vec<_>>(); + assert_eq!(actual, expected); + } + + #[tokio::test] + async fn resolve_addr2() { + let expected = vec![SocketAddr::V4(SocketAddrV4::new( + Ipv4Addr::new(0, 0, 0, 0), + 80, + ))]; + let actual = resolve_addr("", 80).await.unwrap().collect::<Vec<_>>(); + assert_eq!(actual, expected); + } + + #[tokio::test] + async fn resolve_addr3() { + let expected = vec![SocketAddr::V4(SocketAddrV4::new( + Ipv4Addr::new(192, 0, 2, 1), + 25, + ))]; + let actual = resolve_addr("192.0.2.1", 25) + .await + .unwrap() + .collect::<Vec<_>>(); + assert_eq!(actual, expected); + } + + #[tokio::test] + async fn resolve_addr_ipv6() { + let expected = vec![SocketAddr::V6(SocketAddrV6::new( + Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 1), + 8080, + 0, + 0, + ))]; + let actual = resolve_addr("[2001:db8::1]", 8080) + .await + .unwrap() + .collect::<Vec<_>>(); + assert_eq!(actual, expected); + } + + #[tokio::test] + async fn resolve_addr_err() { + assert!(resolve_addr("INVALID ADDR", 1234).await.is_err()); + } + + #[test] + fn resolve_addr_sync1() { + let expected = vec![SocketAddr::V4(SocketAddrV4::new( + Ipv4Addr::new(127, 0, 0, 1), + 80, + ))]; + let actual = resolve_addr_sync("127.0.0.1", 80) + .unwrap() + .collect::<Vec<_>>(); + assert_eq!(actual, expected); + } + + #[test] + fn resolve_addr_sync2() { + let expected = vec![SocketAddr::V4(SocketAddrV4::new( + Ipv4Addr::new(0, 0, 0, 0), + 80, + ))]; + let actual = resolve_addr_sync("", 80).unwrap().collect::<Vec<_>>(); + assert_eq!(actual, expected); + } + + #[test] + fn resolve_addr_sync3() { + let expected = vec![SocketAddr::V4(SocketAddrV4::new( + Ipv4Addr::new(192, 0, 2, 1), + 25, + ))]; + let actual = resolve_addr_sync("192.0.2.1", 25) + .unwrap() + .collect::<Vec<_>>(); + assert_eq!(actual, expected); + } + + #[test] + fn resolve_addr_sync_ipv6() { + let expected = vec![SocketAddr::V6(SocketAddrV6::new( + Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 1), + 8080, + 0, + 0, + ))]; + let actual = resolve_addr_sync("[2001:db8::1]", 8080) + .unwrap() + .collect::<Vec<_>>(); + assert_eq!(actual, expected); + } + + #[test] + fn resolve_addr_sync_err() { + assert!(resolve_addr_sync("INVALID ADDR", 1234).is_err()); + } +} |