diff options
author | Ryan Dahl <ry@tinyclouds.org> | 2021-08-11 12:27:05 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-08-11 12:27:05 +0200 |
commit | a0285e2eb88f6254f6494b0ecd1878db3a3b2a58 (patch) | |
tree | 90671b004537e20f9493fd3277ffd21d30b39a0e /ext/net | |
parent | 3a6994115176781b3a93d70794b1b81bc95e42b4 (diff) |
Rename extensions/ directory to ext/ (#11643)
Diffstat (limited to 'ext/net')
-rw-r--r-- | ext/net/01_net.js | 240 | ||||
-rw-r--r-- | ext/net/02_tls.js | 89 | ||||
-rw-r--r-- | ext/net/04_net_unstable.js | 49 | ||||
-rw-r--r-- | ext/net/Cargo.toml | 25 | ||||
-rw-r--r-- | ext/net/README.md | 30 | ||||
-rw-r--r-- | ext/net/io.rs | 232 | ||||
-rw-r--r-- | ext/net/lib.deno_net.d.ts | 150 | ||||
-rw-r--r-- | ext/net/lib.deno_net.unstable.d.ts | 258 | ||||
-rw-r--r-- | ext/net/lib.rs | 131 | ||||
-rw-r--r-- | ext/net/ops.rs | 795 | ||||
-rw-r--r-- | ext/net/ops_tls.rs | 1061 | ||||
-rw-r--r-- | ext/net/ops_unix.rs | 180 | ||||
-rw-r--r-- | ext/net/resolve_addr.rs | 156 |
13 files changed, 3396 insertions, 0 deletions
diff --git a/ext/net/01_net.js b/ext/net/01_net.js new file mode 100644 index 000000000..cc10a1c0a --- /dev/null +++ b/ext/net/01_net.js @@ -0,0 +1,240 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. +"use strict"; + +((window) => { + const core = window.Deno.core; + const { BadResource } = core; + const { + PromiseResolve, + SymbolAsyncIterator, + Uint8Array, + TypedArrayPrototypeSubarray, + } = window.__bootstrap.primordials; + + async function read( + rid, + buffer, + ) { + if (buffer.length === 0) { + return 0; + } + const nread = await core.opAsync("op_net_read_async", rid, buffer); + return nread === 0 ? null : nread; + } + + async function write(rid, data) { + return await core.opAsync("op_net_write_async", rid, data); + } + + function shutdown(rid) { + return core.opAsync("op_net_shutdown", rid); + } + + function opAccept(rid, transport) { + return core.opAsync("op_accept", { rid, transport }); + } + + function opListen(args) { + return core.opSync("op_listen", args); + } + + function opConnect(args) { + return core.opAsync("op_connect", args); + } + + function opReceive(rid, transport, zeroCopy) { + return core.opAsync( + "op_datagram_receive", + { rid, transport }, + zeroCopy, + ); + } + + function opSend(args, zeroCopy) { + return core.opAsync("op_datagram_send", args, zeroCopy); + } + + function resolveDns(query, recordType, options) { + return core.opAsync("op_dns_resolve", { query, recordType, options }); + } + + class Conn { + #rid = 0; + #remoteAddr = null; + #localAddr = null; + constructor(rid, remoteAddr, localAddr) { + this.#rid = rid; + this.#remoteAddr = remoteAddr; + this.#localAddr = localAddr; + } + + get rid() { + return this.#rid; + } + + get remoteAddr() { + return this.#remoteAddr; + } + + get localAddr() { + return this.#localAddr; + } + + write(p) { + return write(this.rid, p); + } + + read(p) { + return read(this.rid, p); + } + + close() { + core.close(this.rid); + } + + closeWrite() { + return shutdown(this.rid); + } + } + + class Listener { + #rid = 0; + #addr = null; + + constructor(rid, addr) { + this.#rid = rid; + this.#addr = addr; + } + + get rid() { + return this.#rid; + } + + get addr() { + return this.#addr; + } + + async accept() { + const res = await opAccept(this.rid, this.addr.transport); + return new Conn(res.rid, res.remoteAddr, res.localAddr); + } + + async next() { + let conn; + try { + conn = await this.accept(); + } catch (error) { + if (error instanceof BadResource) { + return { value: undefined, done: true }; + } + throw error; + } + return { value: conn, done: false }; + } + + return(value) { + this.close(); + return PromiseResolve({ value, done: true }); + } + + close() { + core.close(this.rid); + } + + [SymbolAsyncIterator]() { + return this; + } + } + + class Datagram { + #rid = 0; + #addr = null; + + constructor(rid, addr, bufSize = 1024) { + this.#rid = rid; + this.#addr = addr; + this.bufSize = bufSize; + } + + get rid() { + return this.#rid; + } + + get addr() { + return this.#addr; + } + + async receive(p) { + const buf = p || new Uint8Array(this.bufSize); + const { size, remoteAddr } = await opReceive( + this.rid, + this.addr.transport, + buf, + ); + const sub = TypedArrayPrototypeSubarray(buf, 0, size); + return [sub, remoteAddr]; + } + + send(p, addr) { + const remote = { hostname: "127.0.0.1", ...addr }; + + const args = { ...remote, rid: this.rid }; + return opSend(args, p); + } + + close() { + core.close(this.rid); + } + + async *[SymbolAsyncIterator]() { + while (true) { + try { + yield await this.receive(); + } catch (err) { + if (err instanceof BadResource) { + break; + } + throw err; + } + } + } + } + + function listen({ hostname, ...options }) { + const res = opListen({ + transport: "tcp", + hostname: typeof hostname === "undefined" ? "0.0.0.0" : hostname, + ...options, + }); + + return new Listener(res.rid, res.localAddr); + } + + async function connect(options) { + let res; + + if (options.transport === "unix") { + res = await opConnect(options); + } else { + res = await opConnect({ + transport: "tcp", + hostname: "127.0.0.1", + ...options, + }); + } + + return new Conn(res.rid, res.remoteAddr, res.localAddr); + } + + window.__bootstrap.net = { + connect, + Conn, + opConnect, + listen, + opListen, + Listener, + shutdown, + Datagram, + resolveDns, + }; +})(this); diff --git a/ext/net/02_tls.js b/ext/net/02_tls.js new file mode 100644 index 000000000..343ec2e4f --- /dev/null +++ b/ext/net/02_tls.js @@ -0,0 +1,89 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. +"use strict"; + +((window) => { + const core = window.Deno.core; + const { Listener, Conn } = window.__bootstrap.net; + + function opConnectTls( + args, + ) { + return core.opAsync("op_connect_tls", args); + } + + function opAcceptTLS(rid) { + return core.opAsync("op_accept_tls", rid); + } + + function opListenTls(args) { + return core.opSync("op_listen_tls", args); + } + + function opStartTls(args) { + return core.opAsync("op_start_tls", args); + } + + async function connectTls({ + port, + hostname = "127.0.0.1", + transport = "tcp", + certFile = undefined, + certChain = undefined, + privateKey = undefined, + }) { + const res = await opConnectTls({ + port, + hostname, + transport, + certFile, + certChain, + privateKey, + }); + return new Conn(res.rid, res.remoteAddr, res.localAddr); + } + + class TLSListener extends Listener { + async accept() { + const res = await opAcceptTLS(this.rid); + return new Conn(res.rid, res.remoteAddr, res.localAddr); + } + } + + function listenTls({ + port, + certFile, + keyFile, + hostname = "0.0.0.0", + transport = "tcp", + alpnProtocols, + }) { + const res = opListenTls({ + port, + certFile, + keyFile, + hostname, + transport, + alpnProtocols, + }); + return new TLSListener(res.rid, res.localAddr); + } + + async function startTls( + conn, + { hostname = "127.0.0.1", certFile } = {}, + ) { + const res = await opStartTls({ + rid: conn.rid, + hostname, + certFile, + }); + return new Conn(res.rid, res.remoteAddr, res.localAddr); + } + + window.__bootstrap.tls = { + startTls, + listenTls, + connectTls, + TLSListener, + }; +})(this); diff --git a/ext/net/04_net_unstable.js b/ext/net/04_net_unstable.js new file mode 100644 index 000000000..ca265bfaa --- /dev/null +++ b/ext/net/04_net_unstable.js @@ -0,0 +1,49 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. +"use strict"; + +((window) => { + const net = window.__bootstrap.net; + + function listen(options) { + if (options.transport === "unix") { + const res = net.opListen(options); + return new net.Listener(res.rid, res.localAddr); + } else { + return net.listen(options); + } + } + + function listenDatagram( + options, + ) { + let res; + if (options.transport === "unixpacket") { + res = net.opListen(options); + } else { + res = net.opListen({ + transport: "udp", + hostname: "127.0.0.1", + ...options, + }); + } + + return new net.Datagram(res.rid, res.localAddr); + } + + async function connect( + options, + ) { + if (options.transport === "unix") { + const res = await net.opConnect(options); + return new net.Conn(res.rid, res.remoteAddr, res.localAddr); + } else { + return net.connect(options); + } + } + + window.__bootstrap.netUnstable = { + connect, + listenDatagram, + listen, + }; +})(this); diff --git a/ext/net/Cargo.toml b/ext/net/Cargo.toml new file mode 100644 index 000000000..09daf0e48 --- /dev/null +++ b/ext/net/Cargo.toml @@ -0,0 +1,25 @@ +# Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. + +[package] +name = "deno_net" +version = "0.5.0" +authors = ["the Deno authors"] +edition = "2018" +license = "MIT" +readme = "README.md" +repository = "https://github.com/denoland/deno" +description = "Networking for Deno" + +[lib] +path = "lib.rs" + +[dependencies] +deno_core = { version = "0.96.0", path = "../../core" } +deno_tls = { version = "0.1.0", path = "../tls" } + +lazy_static = "1.4.0" +log = "0.4.14" +serde = { version = "1.0.126", features = ["derive"] } +tokio = { version = "1.8.1", features = ["full"] } +trust-dns-proto = "0.20.3" +trust-dns-resolver = { version = "0.20.3", features = ["tokio-runtime", "serde-config"] } diff --git a/ext/net/README.md b/ext/net/README.md new file mode 100644 index 000000000..cdd8923e1 --- /dev/null +++ b/ext/net/README.md @@ -0,0 +1,30 @@ +# deno_net + +This crate implements networking APIs. + +This crate depends on following extensions: + +- "deno_web" +- "deno_fetch" + +Following ops are provided: + +- "op_net_read_async" +- "op_net_write_async" +- "op_net_shutdown" +- "op_accept" +- "op_connect" +- "op_listen" +- "op_datagram_receive" +- "op_datagram_send" +- "op_dns_resolve" +- "op_start_tls" +- "op_connect_tls" +- "op_listen_tls" +- "op_accept_tls" +- "op_http_start" +- "op_http_request_next" +- "op_http_request_read" +- "op_http_response" +- "op_http_response_write" +- "op_http_response_close" diff --git a/ext/net/io.rs b/ext/net/io.rs new file mode 100644 index 000000000..fc10d7e99 --- /dev/null +++ b/ext/net/io.rs @@ -0,0 +1,232 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. + +use crate::ops_tls as tls; +use deno_core::error::null_opbuf; +use deno_core::error::AnyError; +use deno_core::error::{bad_resource_id, not_supported}; +use deno_core::op_async; +use deno_core::AsyncMutFuture; +use deno_core::AsyncRefCell; +use deno_core::CancelHandle; +use deno_core::CancelTryFuture; +use deno_core::OpPair; +use deno_core::OpState; +use deno_core::RcRef; +use deno_core::Resource; +use deno_core::ResourceId; +use deno_core::ZeroCopyBuf; +use std::borrow::Cow; +use std::cell::RefCell; +use std::rc::Rc; +use tokio::io::AsyncRead; +use tokio::io::AsyncReadExt; +use tokio::io::AsyncWrite; +use tokio::io::AsyncWriteExt; +use tokio::net::tcp; + +#[cfg(unix)] +use tokio::net::unix; + +pub fn init() -> Vec<OpPair> { + vec![ + ("op_net_read_async", op_async(op_read_async)), + ("op_net_write_async", op_async(op_write_async)), + ("op_net_shutdown", op_async(op_shutdown)), + ] +} + +/// A full duplex resource has a read and write ends that are completely +/// independent, like TCP/Unix sockets and TLS streams. +#[derive(Debug)] +pub struct FullDuplexResource<R, W> { + rd: AsyncRefCell<R>, + wr: AsyncRefCell<W>, + // When a full-duplex resource is closed, all pending 'read' ops are + // canceled, while 'write' ops are allowed to complete. Therefore only + // 'read' futures should be attached to this cancel handle. + cancel_handle: CancelHandle, +} + +impl<R, W> FullDuplexResource<R, W> +where + R: AsyncRead + Unpin + 'static, + W: AsyncWrite + Unpin + 'static, +{ + pub fn new((rd, wr): (R, W)) -> Self { + Self { + rd: rd.into(), + wr: wr.into(), + cancel_handle: Default::default(), + } + } + + pub fn into_inner(self) -> (R, W) { + (self.rd.into_inner(), self.wr.into_inner()) + } + + pub fn rd_borrow_mut(self: &Rc<Self>) -> AsyncMutFuture<R> { + RcRef::map(self, |r| &r.rd).borrow_mut() + } + + pub fn wr_borrow_mut(self: &Rc<Self>) -> AsyncMutFuture<W> { + RcRef::map(self, |r| &r.wr).borrow_mut() + } + + pub fn cancel_handle(self: &Rc<Self>) -> RcRef<CancelHandle> { + RcRef::map(self, |r| &r.cancel_handle) + } + + pub fn cancel_read_ops(&self) { + self.cancel_handle.cancel() + } + + pub async fn read( + self: &Rc<Self>, + buf: &mut [u8], + ) -> Result<usize, AnyError> { + let mut rd = self.rd_borrow_mut().await; + let nread = rd.read(buf).try_or_cancel(self.cancel_handle()).await?; + Ok(nread) + } + + pub async fn write(self: &Rc<Self>, buf: &[u8]) -> Result<usize, AnyError> { + let mut wr = self.wr_borrow_mut().await; + let nwritten = wr.write(buf).await?; + Ok(nwritten) + } + + pub async fn shutdown(self: &Rc<Self>) -> Result<(), AnyError> { + let mut wr = self.wr_borrow_mut().await; + wr.shutdown().await?; + Ok(()) + } +} + +pub type TcpStreamResource = + FullDuplexResource<tcp::OwnedReadHalf, tcp::OwnedWriteHalf>; + +impl Resource for TcpStreamResource { + fn name(&self) -> Cow<str> { + "tcpStream".into() + } + + fn close(self: Rc<Self>) { + self.cancel_read_ops(); + } +} + +pub type TlsStreamResource = FullDuplexResource<tls::ReadHalf, tls::WriteHalf>; + +impl Resource for TlsStreamResource { + fn name(&self) -> Cow<str> { + "tlsStream".into() + } + + fn close(self: Rc<Self>) { + self.cancel_read_ops(); + } +} + +#[cfg(unix)] +pub type UnixStreamResource = + FullDuplexResource<unix::OwnedReadHalf, unix::OwnedWriteHalf>; + +#[cfg(not(unix))] +pub struct UnixStreamResource; + +#[cfg(not(unix))] +impl UnixStreamResource { + pub async fn read( + self: &Rc<Self>, + _buf: &mut [u8], + ) -> Result<usize, AnyError> { + unreachable!() + } + pub async fn write(self: &Rc<Self>, _buf: &[u8]) -> Result<usize, AnyError> { + unreachable!() + } + pub async fn shutdown(self: &Rc<Self>) -> Result<(), AnyError> { + unreachable!() + } + pub fn cancel_read_ops(&self) { + unreachable!() + } +} + +impl Resource for UnixStreamResource { + fn name(&self) -> Cow<str> { + "unixStream".into() + } + + fn close(self: Rc<Self>) { + self.cancel_read_ops(); + } +} + +async fn op_read_async( + state: Rc<RefCell<OpState>>, + rid: ResourceId, + buf: Option<ZeroCopyBuf>, +) -> Result<u32, AnyError> { + let buf = &mut buf.ok_or_else(null_opbuf)?; + let resource = state + .borrow() + .resource_table + .get_any(rid) + .ok_or_else(bad_resource_id)?; + let nread = if let Some(s) = resource.downcast_rc::<TcpStreamResource>() { + s.read(buf).await? + } else if let Some(s) = resource.downcast_rc::<TlsStreamResource>() { + s.read(buf).await? + } else if let Some(s) = resource.downcast_rc::<UnixStreamResource>() { + s.read(buf).await? + } else { + return Err(not_supported()); + }; + Ok(nread as u32) +} + +async fn op_write_async( + state: Rc<RefCell<OpState>>, + rid: ResourceId, + buf: Option<ZeroCopyBuf>, +) -> Result<u32, AnyError> { + let buf = &buf.ok_or_else(null_opbuf)?; + let resource = state + .borrow() + .resource_table + .get_any(rid) + .ok_or_else(bad_resource_id)?; + let nwritten = if let Some(s) = resource.downcast_rc::<TcpStreamResource>() { + s.write(buf).await? + } else if let Some(s) = resource.downcast_rc::<TlsStreamResource>() { + s.write(buf).await? + } else if let Some(s) = resource.downcast_rc::<UnixStreamResource>() { + s.write(buf).await? + } else { + return Err(not_supported()); + }; + Ok(nwritten as u32) +} + +async fn op_shutdown( + state: Rc<RefCell<OpState>>, + rid: ResourceId, + _: (), +) -> Result<(), AnyError> { + let resource = state + .borrow() + .resource_table + .get_any(rid) + .ok_or_else(bad_resource_id)?; + if let Some(s) = resource.downcast_rc::<TcpStreamResource>() { + s.shutdown().await?; + } else if let Some(s) = resource.downcast_rc::<TlsStreamResource>() { + s.shutdown().await?; + } else if let Some(s) = resource.downcast_rc::<UnixStreamResource>() { + s.shutdown().await?; + } else { + return Err(not_supported()); + } + Ok(()) +} diff --git a/ext/net/lib.deno_net.d.ts b/ext/net/lib.deno_net.d.ts new file mode 100644 index 000000000..d35e01e31 --- /dev/null +++ b/ext/net/lib.deno_net.d.ts @@ -0,0 +1,150 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. + +/// <reference no-default-lib="true" /> +/// <reference lib="esnext" /> + +declare namespace Deno { + export interface NetAddr { + transport: "tcp" | "udp"; + hostname: string; + port: number; + } + + export interface UnixAddr { + transport: "unix" | "unixpacket"; + path: string; + } + + export type Addr = NetAddr | UnixAddr; + + /** A generic network listener for stream-oriented protocols. */ + export interface Listener extends AsyncIterable<Conn> { + /** Waits for and resolves to the next connection to the `Listener`. */ + accept(): Promise<Conn>; + /** Close closes the listener. Any pending accept promises will be rejected + * with errors. */ + close(): void; + /** Return the address of the `Listener`. */ + readonly addr: Addr; + + /** Return the rid of the `Listener`. */ + readonly rid: number; + + [Symbol.asyncIterator](): AsyncIterableIterator<Conn>; + } + + export interface Conn extends Reader, Writer, Closer { + /** The local address of the connection. */ + readonly localAddr: Addr; + /** The remote address of the connection. */ + readonly remoteAddr: Addr; + /** The resource ID of the connection. */ + readonly rid: number; + /** Shuts down (`shutdown(2)`) the write side of the connection. Most + * callers should just use `close()`. */ + closeWrite(): Promise<void>; + } + + export interface ListenOptions { + /** The port to listen on. */ + port: number; + /** A literal IP address or host name that can be resolved to an IP address. + * If not specified, defaults to `0.0.0.0`. */ + hostname?: string; + } + + /** Listen announces on the local transport address. + * + * ```ts + * const listener1 = Deno.listen({ port: 80 }) + * const listener2 = Deno.listen({ hostname: "192.0.2.1", port: 80 }) + * const listener3 = Deno.listen({ hostname: "[2001:db8::1]", port: 80 }); + * const listener4 = Deno.listen({ hostname: "golang.org", port: 80, transport: "tcp" }); + * ``` + * + * Requires `allow-net` permission. */ + export function listen( + options: ListenOptions & { transport?: "tcp" }, + ): Listener; + + export interface ListenTlsOptions extends ListenOptions { + /** Path to a file containing a PEM formatted CA certificate. Requires + * `--allow-read`. */ + certFile: string; + /** Server public key file. Requires `--allow-read`.*/ + keyFile: string; + + transport?: "tcp"; + } + + /** Listen announces on the local transport address over TLS (transport layer + * security). + * + * ```ts + * const lstnr = Deno.listenTls({ port: 443, certFile: "./server.crt", keyFile: "./server.key" }); + * ``` + * + * Requires `allow-net` permission. */ + export function listenTls(options: ListenTlsOptions): Listener; + + export interface ConnectOptions { + /** The port to connect to. */ + port: number; + /** A literal IP address or host name that can be resolved to an IP address. + * If not specified, defaults to `127.0.0.1`. */ + hostname?: string; + transport?: "tcp"; + } + + /** + * Connects to the hostname (default is "127.0.0.1") and port on the named + * transport (default is "tcp"), and resolves to the connection (`Conn`). + * + * ```ts + * const conn1 = await Deno.connect({ port: 80 }); + * const conn2 = await Deno.connect({ hostname: "192.0.2.1", port: 80 }); + * const conn3 = await Deno.connect({ hostname: "[2001:db8::1]", port: 80 }); + * const conn4 = await Deno.connect({ hostname: "golang.org", port: 80, transport: "tcp" }); + * ``` + * + * Requires `allow-net` permission for "tcp". */ + export function connect(options: ConnectOptions): Promise<Conn>; + + export interface ConnectTlsOptions { + /** The port to connect to. */ + port: number; + /** A literal IP address or host name that can be resolved to an IP address. + * If not specified, defaults to `127.0.0.1`. */ + hostname?: string; + /** Server certificate file. */ + certFile?: string; + } + + /** Establishes a secure connection over TLS (transport layer security) using + * an optional cert file, hostname (default is "127.0.0.1") and port. The + * cert file is optional and if not included Mozilla's root certificates will + * be used (see also https://github.com/ctz/webpki-roots for specifics) + * + * ```ts + * const conn1 = await Deno.connectTls({ port: 80 }); + * const conn2 = await Deno.connectTls({ certFile: "./certs/my_custom_root_CA.pem", hostname: "192.0.2.1", port: 80 }); + * const conn3 = await Deno.connectTls({ hostname: "[2001:db8::1]", port: 80 }); + * const conn4 = await Deno.connectTls({ certFile: "./certs/my_custom_root_CA.pem", hostname: "golang.org", port: 80}); + * ``` + * + * Requires `allow-net` permission. + */ + export function connectTls(options: ConnectTlsOptions): Promise<Conn>; + + /** Shutdown socket send operations. + * + * Matches behavior of POSIX shutdown(3). + * + * ```ts + * const listener = Deno.listen({ port: 80 }); + * const conn = await listener.accept(); + * Deno.shutdown(conn.rid); + * ``` + */ + export function shutdown(rid: number): Promise<void>; +} diff --git a/ext/net/lib.deno_net.unstable.d.ts b/ext/net/lib.deno_net.unstable.d.ts new file mode 100644 index 000000000..145f232c0 --- /dev/null +++ b/ext/net/lib.deno_net.unstable.d.ts @@ -0,0 +1,258 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. + +/// <reference no-default-lib="true" /> +/// <reference lib="esnext" /> + +declare namespace Deno { + /** The type of the resource record. + * Only the listed types are supported currently. */ + export type RecordType = + | "A" + | "AAAA" + | "ANAME" + | "CNAME" + | "MX" + | "PTR" + | "SRV" + | "TXT"; + + export interface ResolveDnsOptions { + /** The name server to be used for lookups. + * If not specified, defaults to the system configuration e.g. `/etc/resolv.conf` on Unix. */ + nameServer?: { + /** The IP address of the name server */ + ipAddr: string; + /** The port number the query will be sent to. + * If not specified, defaults to 53. */ + port?: number; + }; + } + + /** If `resolveDns` is called with "MX" record type specified, it will return an array of this interface. */ + export interface MXRecord { + preference: number; + exchange: string; + } + + /** If `resolveDns` is called with "SRV" record type specified, it will return an array of this interface. */ + export interface SRVRecord { + priority: number; + weight: number; + port: number; + target: string; + } + + export function resolveDns( + query: string, + recordType: "A" | "AAAA" | "ANAME" | "CNAME" | "PTR", + options?: ResolveDnsOptions, + ): Promise<string[]>; + + export function resolveDns( + query: string, + recordType: "MX", + options?: ResolveDnsOptions, + ): Promise<MXRecord[]>; + + export function resolveDns( + query: string, + recordType: "SRV", + options?: ResolveDnsOptions, + ): Promise<SRVRecord[]>; + + export function resolveDns( + query: string, + recordType: "TXT", + options?: ResolveDnsOptions, + ): Promise<string[][]>; + + /** ** UNSTABLE**: new API, yet to be vetted. +* +* Performs DNS resolution against the given query, returning resolved records. +* Fails in the cases such as: +* - the query is in invalid format +* - the options have an invalid parameter, e.g. `nameServer.port` is beyond the range of 16-bit unsigned integer +* - timed out +* +* ```ts +* const a = await Deno.resolveDns("example.com", "A"); +* +* const aaaa = await Deno.resolveDns("example.com", "AAAA", { +* nameServer: { ipAddr: "8.8.8.8", port: 1234 }, +* }); +* ``` +* +* Requires `allow-net` permission. + */ + export function resolveDns( + query: string, + recordType: RecordType, + options?: ResolveDnsOptions, + ): Promise<string[] | MXRecord[] | SRVRecord[] | string[][]>; + + /** **UNSTABLE**: new API, yet to be vetted. +* +* A generic transport listener for message-oriented protocols. */ + export interface DatagramConn extends AsyncIterable<[Uint8Array, Addr]> { + /** **UNSTABLE**: new API, yet to be vetted. + * + * Waits for and resolves to the next message to the `UDPConn`. */ + receive(p?: Uint8Array): Promise<[Uint8Array, Addr]>; + /** UNSTABLE: new API, yet to be vetted. + * + * Sends a message to the target. */ + send(p: Uint8Array, addr: Addr): Promise<number>; + /** UNSTABLE: new API, yet to be vetted. + * + * Close closes the socket. Any pending message promises will be rejected + * with errors. */ + close(): void; + /** Return the address of the `UDPConn`. */ + readonly addr: Addr; + [Symbol.asyncIterator](): AsyncIterableIterator<[Uint8Array, Addr]>; + } + + export interface UnixListenOptions { + /** A Path to the Unix Socket. */ + path: string; + } + + /** **UNSTABLE**: new API, yet to be vetted. +* +* Listen announces on the local transport address. +* +* ```ts +* const listener = Deno.listen({ path: "/foo/bar.sock", transport: "unix" }) +* ``` +* +* Requires `allow-read` and `allow-write` permission. */ + export function listen( + options: UnixListenOptions & { transport: "unix" }, + ): Listener; + + /** **UNSTABLE**: new API, yet to be vetted +* +* Listen announces on the local transport address. +* +* ```ts +* const listener1 = Deno.listenDatagram({ +* port: 80, +* transport: "udp" +* }); +* const listener2 = Deno.listenDatagram({ +* hostname: "golang.org", +* port: 80, +* transport: "udp" +* }); +* ``` +* +* Requires `allow-net` permission. */ + export function listenDatagram( + options: ListenOptions & { transport: "udp" }, + ): DatagramConn; + + /** **UNSTABLE**: new API, yet to be vetted +* +* Listen announces on the local transport address. +* +* ```ts +* const listener = Deno.listenDatagram({ +* path: "/foo/bar.sock", +* transport: "unixpacket" +* }); +* ``` +* +* Requires `allow-read` and `allow-write` permission. */ + export function listenDatagram( + options: UnixListenOptions & { transport: "unixpacket" }, + ): DatagramConn; + + export interface UnixConnectOptions { + transport: "unix"; + path: string; + } + + /** **UNSTABLE**: The unix socket transport is unstable as a new API yet to +* be vetted. The TCP transport is considered stable. +* +* Connects to the hostname (default is "127.0.0.1") and port on the named +* transport (default is "tcp"), and resolves to the connection (`Conn`). +* +* ```ts +* const conn1 = await Deno.connect({ port: 80 }); +* const conn2 = await Deno.connect({ hostname: "192.0.2.1", port: 80 }); +* const conn3 = await Deno.connect({ hostname: "[2001:db8::1]", port: 80 }); +* const conn4 = await Deno.connect({ hostname: "golang.org", port: 80, transport: "tcp" }); +* const conn5 = await Deno.connect({ path: "/foo/bar.sock", transport: "unix" }); +* ``` +* +* Requires `allow-net` permission for "tcp" and `allow-read` for "unix". */ + export function connect( + options: ConnectOptions | UnixConnectOptions, + ): Promise<Conn>; + + export interface ConnectTlsClientCertOptions { + /** PEM formatted client certificate chain. */ + certChain: string; + /** PEM formatted (RSA or PKCS8) private key of client certificate. */ + privateKey: string; + } + + /** **UNSTABLE** New API, yet to be vetted. + * + * Create a TLS connection with an attached client certificate. + * + * ```ts + * const conn = await Deno.connectTls({ + * hostname: "deno.land", + * port: 443, + * certChain: "---- BEGIN CERTIFICATE ----\n ...", + * privateKey: "---- BEGIN PRIVATE KEY ----\n ...", + * }); + * ``` + * + * Requires `allow-net` permission. + */ + export function connectTls( + options: ConnectTlsOptions & ConnectTlsClientCertOptions, + ): Promise<Conn>; + + export interface StartTlsOptions { + /** A literal IP address or host name that can be resolved to an IP address. + * If not specified, defaults to `127.0.0.1`. */ + hostname?: string; + /** Server certificate file. */ + certFile?: string; + } + + /** **UNSTABLE**: new API, yet to be vetted. +* +* Start TLS handshake from an existing connection using +* an optional cert file, hostname (default is "127.0.0.1"). The +* cert file is optional and if not included Mozilla's root certificates will +* be used (see also https://github.com/ctz/webpki-roots for specifics) +* Using this function requires that the other end of the connection is +* prepared for TLS handshake. +* +* ```ts +* const conn = await Deno.connect({ port: 80, hostname: "127.0.0.1" }); +* const tlsConn = await Deno.startTls(conn, { certFile: "./certs/my_custom_root_CA.pem", hostname: "localhost" }); +* ``` +* +* Requires `allow-net` permission. + */ + export function startTls( + conn: Conn, + options?: StartTlsOptions, + ): Promise<Conn>; + + export interface ListenTlsOptions { + /** **UNSTABLE**: new API, yet to be vetted. + * + * Application-Layer Protocol Negotiation (ALPN) protocols to announce to + * the client. If not specified, no ALPN extension will be included in the + * TLS handshake. + */ + alpnProtocols?: string[]; + } +} diff --git a/ext/net/lib.rs b/ext/net/lib.rs new file mode 100644 index 000000000..3764433e3 --- /dev/null +++ b/ext/net/lib.rs @@ -0,0 +1,131 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. + +pub mod io; +pub mod ops; +pub mod ops_tls; +#[cfg(unix)] +pub mod ops_unix; +pub mod resolve_addr; + +use deno_core::error::AnyError; +use deno_core::include_js_files; +use deno_core::Extension; +use deno_core::OpState; +use deno_tls::rustls::RootCertStore; +use std::cell::RefCell; +use std::path::Path; +use std::path::PathBuf; +use std::rc::Rc; + +pub trait NetPermissions { + fn check_net<T: AsRef<str>>( + &mut self, + _host: &(T, Option<u16>), + ) -> Result<(), AnyError>; + fn check_read(&mut self, _p: &Path) -> Result<(), AnyError>; + fn check_write(&mut self, _p: &Path) -> Result<(), AnyError>; +} + +/// For use with this crate when the user does not want permission checks. +pub struct NoNetPermissions; + +impl NetPermissions for NoNetPermissions { + fn check_net<T: AsRef<str>>( + &mut self, + _host: &(T, Option<u16>), + ) -> Result<(), AnyError> { + Ok(()) + } + + fn check_read(&mut self, _p: &Path) -> Result<(), AnyError> { + Ok(()) + } + + fn check_write(&mut self, _p: &Path) -> Result<(), AnyError> { + Ok(()) + } +} + +/// `UnstableChecker` is a struct so it can be placed inside `GothamState`; +/// using type alias for a bool could work, but there's a high chance +/// that there might be another type alias pointing to a bool, which +/// would override previously used alias. +pub struct UnstableChecker { + pub unstable: bool, +} + +impl UnstableChecker { + /// Quits the process if the --unstable flag was not provided. + /// + /// This is intentionally a non-recoverable check so that people cannot probe + /// for unstable APIs from stable programs. + // NOTE(bartlomieju): keep in sync with `cli/program_state.rs` + pub fn check_unstable(&self, api_name: &str) { + if !self.unstable { + eprintln!( + "Unstable API '{}'. The --unstable flag must be provided.", + api_name + ); + std::process::exit(70); + } + } +} +/// Helper for checking unstable features. Used for sync ops. +pub fn check_unstable(state: &OpState, api_name: &str) { + state.borrow::<UnstableChecker>().check_unstable(api_name) +} + +/// Helper for checking unstable features. Used for async ops. +pub fn check_unstable2(state: &Rc<RefCell<OpState>>, api_name: &str) { + let state = state.borrow(); + state.borrow::<UnstableChecker>().check_unstable(api_name) +} + +pub fn get_declaration() -> PathBuf { + PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("lib.deno_net.d.ts") +} + +pub fn get_unstable_declaration() -> PathBuf { + PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("lib.deno_net.unstable.d.ts") +} + +#[derive(Clone)] +pub struct DefaultTlsOptions { + pub root_cert_store: Option<RootCertStore>, +} + +/// `UnsafelyIgnoreCertificateErrors` is a wrapper struct so it can be placed inside `GothamState`; +/// using type alias for a `Option<Vec<String>>` could work, but there's a high chance +/// that there might be another type alias pointing to a `Option<Vec<String>>`, which +/// would override previously used alias. +pub struct UnsafelyIgnoreCertificateErrors(Option<Vec<String>>); + +pub fn init<P: NetPermissions + 'static>( + root_cert_store: Option<RootCertStore>, + unstable: bool, + unsafely_ignore_certificate_errors: Option<Vec<String>>, +) -> Extension { + let mut ops_to_register = vec![]; + ops_to_register.extend(io::init()); + ops_to_register.extend(ops::init::<P>()); + ops_to_register.extend(ops_tls::init::<P>()); + Extension::builder() + .js(include_js_files!( + prefix "deno:ext/net", + "01_net.js", + "02_tls.js", + "04_net_unstable.js", + )) + .ops(ops_to_register) + .state(move |state| { + state.put(DefaultTlsOptions { + root_cert_store: root_cert_store.clone(), + }); + state.put(UnstableChecker { unstable }); + state.put(UnsafelyIgnoreCertificateErrors( + unsafely_ignore_certificate_errors.clone(), + )); + Ok(()) + }) + .build() +} diff --git a/ext/net/ops.rs b/ext/net/ops.rs new file mode 100644 index 000000000..a0fc2179e --- /dev/null +++ b/ext/net/ops.rs @@ -0,0 +1,795 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. + +use crate::io::TcpStreamResource; +use crate::resolve_addr::resolve_addr; +use crate::resolve_addr::resolve_addr_sync; +use crate::NetPermissions; +use deno_core::error::bad_resource; +use deno_core::error::custom_error; +use deno_core::error::generic_error; +use deno_core::error::null_opbuf; +use deno_core::error::type_error; +use deno_core::error::AnyError; +use deno_core::op_async; +use deno_core::op_sync; +use deno_core::AsyncRefCell; +use deno_core::CancelHandle; +use deno_core::CancelTryFuture; +use deno_core::OpPair; +use deno_core::OpState; +use deno_core::RcRef; +use deno_core::Resource; +use deno_core::ResourceId; +use deno_core::ZeroCopyBuf; +use log::debug; +use serde::Deserialize; +use serde::Serialize; +use std::borrow::Cow; +use std::cell::RefCell; +use std::net::SocketAddr; +use std::rc::Rc; +use tokio::net::TcpListener; +use tokio::net::TcpStream; +use tokio::net::UdpSocket; +use trust_dns_proto::rr::record_data::RData; +use trust_dns_proto::rr::record_type::RecordType; +use trust_dns_resolver::config::NameServerConfigGroup; +use trust_dns_resolver::config::ResolverConfig; +use trust_dns_resolver::config::ResolverOpts; +use trust_dns_resolver::system_conf; +use trust_dns_resolver::AsyncResolver; + +#[cfg(unix)] +use super::ops_unix as net_unix; +#[cfg(unix)] +use crate::io::UnixStreamResource; +#[cfg(unix)] +use std::path::Path; + +pub fn init<P: NetPermissions + 'static>() -> Vec<OpPair> { + vec![ + ("op_accept", op_async(op_accept)), + ("op_connect", op_async(op_connect::<P>)), + ("op_listen", op_sync(op_listen::<P>)), + ("op_datagram_receive", op_async(op_datagram_receive)), + ("op_datagram_send", op_async(op_datagram_send::<P>)), + ("op_dns_resolve", op_async(op_dns_resolve::<P>)), + ] +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +pub struct OpConn { + pub rid: ResourceId, + pub remote_addr: Option<OpAddr>, + pub local_addr: Option<OpAddr>, +} + +#[derive(Serialize)] +#[serde(tag = "transport", rename_all = "lowercase")] +pub enum OpAddr { + Tcp(IpAddr), + Udp(IpAddr), + #[cfg(unix)] + Unix(net_unix::UnixAddr), + #[cfg(unix)] + UnixPacket(net_unix::UnixAddr), +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +/// A received datagram packet (from udp or unixpacket) +pub struct OpPacket { + pub size: usize, + pub remote_addr: OpAddr, +} + +#[derive(Serialize)] +pub struct IpAddr { + pub hostname: String, + pub port: u16, +} + +#[derive(Deserialize)] +pub(crate) struct AcceptArgs { + pub rid: ResourceId, + pub transport: String, +} + +async fn accept_tcp( + state: Rc<RefCell<OpState>>, + args: AcceptArgs, + _: (), +) -> Result<OpConn, AnyError> { + let rid = args.rid; + + let resource = state + .borrow() + .resource_table + .get::<TcpListenerResource>(rid) + .ok_or_else(|| bad_resource("Listener has been closed"))?; + let listener = RcRef::map(&resource, |r| &r.listener) + .try_borrow_mut() + .ok_or_else(|| custom_error("Busy", "Another accept task is ongoing"))?; + let cancel = RcRef::map(resource, |r| &r.cancel); + let (tcp_stream, _socket_addr) = + listener.accept().try_or_cancel(cancel).await.map_err(|e| { + // FIXME(bartlomieju): compatibility with current JS implementation + if let std::io::ErrorKind::Interrupted = e.kind() { + bad_resource("Listener has been closed") + } else { + e.into() + } + })?; + let local_addr = tcp_stream.local_addr()?; + let remote_addr = tcp_stream.peer_addr()?; + + let mut state = state.borrow_mut(); + let rid = state + .resource_table + .add(TcpStreamResource::new(tcp_stream.into_split())); + Ok(OpConn { + rid, + local_addr: Some(OpAddr::Tcp(IpAddr { + hostname: local_addr.ip().to_string(), + port: local_addr.port(), + })), + remote_addr: Some(OpAddr::Tcp(IpAddr { + hostname: remote_addr.ip().to_string(), + port: remote_addr.port(), + })), + }) +} + +async fn op_accept( + state: Rc<RefCell<OpState>>, + args: AcceptArgs, + _: (), +) -> Result<OpConn, AnyError> { + match args.transport.as_str() { + "tcp" => accept_tcp(state, args, ()).await, + #[cfg(unix)] + "unix" => net_unix::accept_unix(state, args, ()).await, + other => Err(bad_transport(other)), + } +} + +fn bad_transport(transport: &str) -> AnyError { + generic_error(format!("Unsupported transport protocol {}", transport)) +} + +#[derive(Deserialize)] +pub(crate) struct ReceiveArgs { + pub rid: ResourceId, + pub transport: String, +} + +async fn receive_udp( + state: Rc<RefCell<OpState>>, + args: ReceiveArgs, + zero_copy: Option<ZeroCopyBuf>, +) -> Result<OpPacket, AnyError> { + let zero_copy = zero_copy.ok_or_else(null_opbuf)?; + let mut zero_copy = zero_copy.clone(); + + let rid = args.rid; + + let resource = state + .borrow_mut() + .resource_table + .get::<UdpSocketResource>(rid) + .ok_or_else(|| bad_resource("Socket has been closed"))?; + let socket = RcRef::map(&resource, |r| &r.socket).borrow().await; + let cancel_handle = RcRef::map(&resource, |r| &r.cancel); + let (size, remote_addr) = socket + .recv_from(&mut zero_copy) + .try_or_cancel(cancel_handle) + .await?; + Ok(OpPacket { + size, + remote_addr: OpAddr::Udp(IpAddr { + hostname: remote_addr.ip().to_string(), + port: remote_addr.port(), + }), + }) +} + +async fn op_datagram_receive( + state: Rc<RefCell<OpState>>, + args: ReceiveArgs, + zero_copy: Option<ZeroCopyBuf>, +) -> Result<OpPacket, AnyError> { + match args.transport.as_str() { + "udp" => receive_udp(state, args, zero_copy).await, + #[cfg(unix)] + "unixpacket" => net_unix::receive_unix_packet(state, args, zero_copy).await, + other => Err(bad_transport(other)), + } +} + +#[derive(Deserialize)] +struct SendArgs { + rid: ResourceId, + transport: String, + #[serde(flatten)] + transport_args: ArgsEnum, +} + +async fn op_datagram_send<NP>( + state: Rc<RefCell<OpState>>, + args: SendArgs, + zero_copy: Option<ZeroCopyBuf>, +) -> Result<usize, AnyError> +where + NP: NetPermissions + 'static, +{ + let zero_copy = zero_copy.ok_or_else(null_opbuf)?; + let zero_copy = zero_copy.clone(); + + match args { + SendArgs { + rid, + transport, + transport_args: ArgsEnum::Ip(args), + } if transport == "udp" => { + { + let mut s = state.borrow_mut(); + s.borrow_mut::<NP>() + .check_net(&(&args.hostname, Some(args.port)))?; + } + let addr = resolve_addr(&args.hostname, args.port) + .await? + .next() + .ok_or_else(|| generic_error("No resolved address found"))?; + + let resource = state + .borrow_mut() + .resource_table + .get::<UdpSocketResource>(rid) + .ok_or_else(|| bad_resource("Socket has been closed"))?; + let socket = RcRef::map(&resource, |r| &r.socket).borrow().await; + let byte_length = socket.send_to(&zero_copy, &addr).await?; + Ok(byte_length) + } + #[cfg(unix)] + SendArgs { + rid, + transport, + transport_args: ArgsEnum::Unix(args), + } if transport == "unixpacket" => { + let address_path = Path::new(&args.path); + { + let mut s = state.borrow_mut(); + s.borrow_mut::<NP>().check_write(address_path)?; + } + let resource = state + .borrow() + .resource_table + .get::<net_unix::UnixDatagramResource>(rid) + .ok_or_else(|| { + custom_error("NotConnected", "Socket has been closed") + })?; + let socket = RcRef::map(&resource, |r| &r.socket) + .try_borrow_mut() + .ok_or_else(|| custom_error("Busy", "Socket already in use"))?; + let byte_length = socket.send_to(&zero_copy, address_path).await?; + Ok(byte_length) + } + _ => Err(type_error("Wrong argument format!")), + } +} + +#[derive(Deserialize)] +struct ConnectArgs { + transport: String, + #[serde(flatten)] + transport_args: ArgsEnum, +} + +async fn op_connect<NP>( + state: Rc<RefCell<OpState>>, + args: ConnectArgs, + _: (), +) -> Result<OpConn, AnyError> +where + NP: NetPermissions + 'static, +{ + match args { + ConnectArgs { + transport, + transport_args: ArgsEnum::Ip(args), + } if transport == "tcp" => { + { + let mut state_ = state.borrow_mut(); + state_ + .borrow_mut::<NP>() + .check_net(&(&args.hostname, Some(args.port)))?; + } + let addr = resolve_addr(&args.hostname, args.port) + .await? + .next() + .ok_or_else(|| generic_error("No resolved address found"))?; + let tcp_stream = TcpStream::connect(&addr).await?; + let local_addr = tcp_stream.local_addr()?; + let remote_addr = tcp_stream.peer_addr()?; + + let mut state_ = state.borrow_mut(); + let rid = state_ + .resource_table + .add(TcpStreamResource::new(tcp_stream.into_split())); + Ok(OpConn { + rid, + local_addr: Some(OpAddr::Tcp(IpAddr { + hostname: local_addr.ip().to_string(), + port: local_addr.port(), + })), + remote_addr: Some(OpAddr::Tcp(IpAddr { + hostname: remote_addr.ip().to_string(), + port: remote_addr.port(), + })), + }) + } + #[cfg(unix)] + ConnectArgs { + transport, + transport_args: ArgsEnum::Unix(args), + } if transport == "unix" => { + let address_path = Path::new(&args.path); + super::check_unstable2(&state, "Deno.connect"); + { + let mut state_ = state.borrow_mut(); + state_.borrow_mut::<NP>().check_read(address_path)?; + state_.borrow_mut::<NP>().check_write(address_path)?; + } + let path = args.path; + let unix_stream = net_unix::UnixStream::connect(Path::new(&path)).await?; + let local_addr = unix_stream.local_addr()?; + let remote_addr = unix_stream.peer_addr()?; + + let mut state_ = state.borrow_mut(); + let resource = UnixStreamResource::new(unix_stream.into_split()); + let rid = state_.resource_table.add(resource); + Ok(OpConn { + rid, + local_addr: Some(OpAddr::Unix(net_unix::UnixAddr { + path: local_addr.as_pathname().and_then(net_unix::pathstring), + })), + remote_addr: Some(OpAddr::Unix(net_unix::UnixAddr { + path: remote_addr.as_pathname().and_then(net_unix::pathstring), + })), + }) + } + _ => Err(type_error("Wrong argument format!")), + } +} + +pub struct TcpListenerResource { + pub listener: AsyncRefCell<TcpListener>, + pub cancel: CancelHandle, +} + +impl Resource for TcpListenerResource { + fn name(&self) -> Cow<str> { + "tcpListener".into() + } + + fn close(self: Rc<Self>) { + self.cancel.cancel(); + } +} + +struct UdpSocketResource { + socket: AsyncRefCell<UdpSocket>, + cancel: CancelHandle, +} + +impl Resource for UdpSocketResource { + fn name(&self) -> Cow<str> { + "udpSocket".into() + } + + fn close(self: Rc<Self>) { + self.cancel.cancel() + } +} + +#[derive(Deserialize)] +struct IpListenArgs { + hostname: String, + port: u16, +} + +#[derive(Deserialize)] +#[serde(untagged)] +enum ArgsEnum { + Ip(IpListenArgs), + #[cfg(unix)] + Unix(net_unix::UnixListenArgs), +} + +#[derive(Deserialize)] +struct ListenArgs { + transport: String, + #[serde(flatten)] + transport_args: ArgsEnum, +} + +fn listen_tcp( + state: &mut OpState, + addr: SocketAddr, +) -> Result<(u32, SocketAddr), AnyError> { + let std_listener = std::net::TcpListener::bind(&addr)?; + std_listener.set_nonblocking(true)?; + let listener = TcpListener::from_std(std_listener)?; + let local_addr = listener.local_addr()?; + let listener_resource = TcpListenerResource { + listener: AsyncRefCell::new(listener), + cancel: Default::default(), + }; + let rid = state.resource_table.add(listener_resource); + + Ok((rid, local_addr)) +} + +fn listen_udp( + state: &mut OpState, + addr: SocketAddr, +) -> Result<(u32, SocketAddr), AnyError> { + let std_socket = std::net::UdpSocket::bind(&addr)?; + std_socket.set_nonblocking(true)?; + let socket = UdpSocket::from_std(std_socket)?; + let local_addr = socket.local_addr()?; + let socket_resource = UdpSocketResource { + socket: AsyncRefCell::new(socket), + cancel: Default::default(), + }; + let rid = state.resource_table.add(socket_resource); + + Ok((rid, local_addr)) +} + +fn op_listen<NP>( + state: &mut OpState, + args: ListenArgs, + _: (), +) -> Result<OpConn, AnyError> +where + NP: NetPermissions + 'static, +{ + match args { + ListenArgs { + transport, + transport_args: ArgsEnum::Ip(args), + } => { + { + if transport == "udp" { + super::check_unstable(state, "Deno.listenDatagram"); + } + state + .borrow_mut::<NP>() + .check_net(&(&args.hostname, Some(args.port)))?; + } + let addr = resolve_addr_sync(&args.hostname, args.port)? + .next() + .ok_or_else(|| generic_error("No resolved address found"))?; + let (rid, local_addr) = if transport == "tcp" { + listen_tcp(state, addr)? + } else { + listen_udp(state, addr)? + }; + debug!( + "New listener {} {}:{}", + rid, + local_addr.ip().to_string(), + local_addr.port() + ); + let ip_addr = IpAddr { + hostname: local_addr.ip().to_string(), + port: local_addr.port(), + }; + Ok(OpConn { + rid, + local_addr: Some(match transport.as_str() { + "udp" => OpAddr::Udp(ip_addr), + "tcp" => OpAddr::Tcp(ip_addr), + // NOTE: This could be unreachable!() + other => return Err(bad_transport(other)), + }), + remote_addr: None, + }) + } + #[cfg(unix)] + ListenArgs { + transport, + transport_args: ArgsEnum::Unix(args), + } if transport == "unix" || transport == "unixpacket" => { + let address_path = Path::new(&args.path); + { + if transport == "unix" { + super::check_unstable(state, "Deno.listen"); + } + if transport == "unixpacket" { + super::check_unstable(state, "Deno.listenDatagram"); + } + let permissions = state.borrow_mut::<NP>(); + permissions.check_read(address_path)?; + permissions.check_write(address_path)?; + } + let (rid, local_addr) = if transport == "unix" { + net_unix::listen_unix(state, address_path)? + } else { + net_unix::listen_unix_packet(state, address_path)? + }; + debug!( + "New listener {} {}", + rid, + local_addr.as_pathname().unwrap().display(), + ); + let unix_addr = net_unix::UnixAddr { + path: local_addr.as_pathname().and_then(net_unix::pathstring), + }; + + Ok(OpConn { + rid, + local_addr: Some(match transport.as_str() { + "unix" => OpAddr::Unix(unix_addr), + "unixpacket" => OpAddr::UnixPacket(unix_addr), + other => return Err(bad_transport(other)), + }), + remote_addr: None, + }) + } + #[cfg(unix)] + _ => Err(type_error("Wrong argument format!")), + } +} + +#[derive(Serialize, PartialEq, Debug)] +#[serde(untagged)] +enum DnsReturnRecord { + A(String), + Aaaa(String), + Aname(String), + Cname(String), + Mx { + preference: u16, + exchange: String, + }, + Ptr(String), + Srv { + priority: u16, + weight: u16, + port: u16, + target: String, + }, + Txt(Vec<String>), +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ResolveAddrArgs { + query: String, + record_type: RecordType, + options: Option<ResolveDnsOption>, +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ResolveDnsOption { + name_server: Option<NameServer>, +} + +fn default_port() -> u16 { + 53 +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct NameServer { + ip_addr: String, + #[serde(default = "default_port")] + port: u16, +} + +async fn op_dns_resolve<NP>( + state: Rc<RefCell<OpState>>, + args: ResolveAddrArgs, + _: (), +) -> Result<Vec<DnsReturnRecord>, AnyError> +where + NP: NetPermissions + 'static, +{ + let ResolveAddrArgs { + query, + record_type, + options, + } = args; + + let (config, opts) = if let Some(name_server) = + options.as_ref().and_then(|o| o.name_server.as_ref()) + { + let group = NameServerConfigGroup::from_ips_clear( + &[name_server.ip_addr.parse()?], + name_server.port, + true, + ); + ( + ResolverConfig::from_parts(None, vec![], group), + ResolverOpts::default(), + ) + } else { + system_conf::read_system_conf()? + }; + + { + let mut s = state.borrow_mut(); + let perm = s.borrow_mut::<NP>(); + + // Checks permission against the name servers which will be actually queried. + for ns in config.name_servers() { + let socker_addr = &ns.socket_addr; + let ip = socker_addr.ip().to_string(); + let port = socker_addr.port(); + perm.check_net(&(ip, Some(port)))?; + } + } + + let resolver = AsyncResolver::tokio(config, opts)?; + + let results = resolver + .lookup(query, record_type, Default::default()) + .await + .map_err(|e| generic_error(format!("{}", e)))? + .iter() + .filter_map(rdata_to_return_record(record_type)) + .collect(); + + Ok(results) +} + +fn rdata_to_return_record( + ty: RecordType, +) -> impl Fn(&RData) -> Option<DnsReturnRecord> { + use RecordType::*; + move |r: &RData| -> Option<DnsReturnRecord> { + match ty { + A => r.as_a().map(ToString::to_string).map(DnsReturnRecord::A), + AAAA => r + .as_aaaa() + .map(ToString::to_string) + .map(DnsReturnRecord::Aaaa), + ANAME => r + .as_aname() + .map(ToString::to_string) + .map(DnsReturnRecord::Aname), + CNAME => r + .as_cname() + .map(ToString::to_string) + .map(DnsReturnRecord::Cname), + MX => r.as_mx().map(|mx| DnsReturnRecord::Mx { + preference: mx.preference(), + exchange: mx.exchange().to_string(), + }), + PTR => r + .as_ptr() + .map(ToString::to_string) + .map(DnsReturnRecord::Ptr), + SRV => r.as_srv().map(|srv| DnsReturnRecord::Srv { + priority: srv.priority(), + weight: srv.weight(), + port: srv.port(), + target: srv.target().to_string(), + }), + TXT => r.as_txt().map(|txt| { + let texts: Vec<String> = txt + .iter() + .map(|bytes| { + // Tries to parse these bytes as Latin-1 + bytes.iter().map(|&b| b as char).collect::<String>() + }) + .collect(); + DnsReturnRecord::Txt(texts) + }), + // TODO(magurotuna): Other record types are not supported + _ => todo!(), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::net::Ipv4Addr; + use std::net::Ipv6Addr; + use trust_dns_proto::rr::rdata::mx::MX; + use trust_dns_proto::rr::rdata::srv::SRV; + use trust_dns_proto::rr::rdata::txt::TXT; + use trust_dns_proto::rr::record_data::RData; + use trust_dns_proto::rr::Name; + + #[test] + fn rdata_to_return_record_a() { + let func = rdata_to_return_record(RecordType::A); + let rdata = RData::A(Ipv4Addr::new(127, 0, 0, 1)); + assert_eq!( + func(&rdata), + Some(DnsReturnRecord::A("127.0.0.1".to_string())) + ); + } + + #[test] + fn rdata_to_return_record_aaaa() { + let func = rdata_to_return_record(RecordType::AAAA); + let rdata = RData::AAAA(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)); + assert_eq!(func(&rdata), Some(DnsReturnRecord::Aaaa("::1".to_string()))); + } + + #[test] + fn rdata_to_return_record_aname() { + let func = rdata_to_return_record(RecordType::ANAME); + let rdata = RData::ANAME(Name::new()); + assert_eq!(func(&rdata), Some(DnsReturnRecord::Aname("".to_string()))); + } + + #[test] + fn rdata_to_return_record_cname() { + let func = rdata_to_return_record(RecordType::CNAME); + let rdata = RData::CNAME(Name::new()); + assert_eq!(func(&rdata), Some(DnsReturnRecord::Cname("".to_string()))); + } + + #[test] + fn rdata_to_return_record_mx() { + let func = rdata_to_return_record(RecordType::MX); + let rdata = RData::MX(MX::new(10, Name::new())); + assert_eq!( + func(&rdata), + Some(DnsReturnRecord::Mx { + preference: 10, + exchange: "".to_string() + }) + ); + } + + #[test] + fn rdata_to_return_record_ptr() { + let func = rdata_to_return_record(RecordType::PTR); + let rdata = RData::PTR(Name::new()); + assert_eq!(func(&rdata), Some(DnsReturnRecord::Ptr("".to_string()))); + } + + #[test] + fn rdata_to_return_record_srv() { + let func = rdata_to_return_record(RecordType::SRV); + let rdata = RData::SRV(SRV::new(1, 2, 3, Name::new())); + assert_eq!( + func(&rdata), + Some(DnsReturnRecord::Srv { + priority: 1, + weight: 2, + port: 3, + target: "".to_string() + }) + ); + } + + #[test] + fn rdata_to_return_record_txt() { + let func = rdata_to_return_record(RecordType::TXT); + let rdata = RData::TXT(TXT::from_bytes(vec![ + "foo".as_bytes(), + "bar".as_bytes(), + &[0xa3], // "£" in Latin-1 + &[0xe3, 0x81, 0x82], // "あ" in UTF-8 + ])); + assert_eq!( + func(&rdata), + Some(DnsReturnRecord::Txt(vec![ + "foo".to_string(), + "bar".to_string(), + "£".to_string(), + "ã\u{81}\u{82}".to_string(), + ])) + ); + } +} diff --git a/ext/net/ops_tls.rs b/ext/net/ops_tls.rs new file mode 100644 index 000000000..14a135d7d --- /dev/null +++ b/ext/net/ops_tls.rs @@ -0,0 +1,1061 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. + +use crate::io::TcpStreamResource; +use crate::io::TlsStreamResource; +use crate::ops::IpAddr; +use crate::ops::OpAddr; +use crate::ops::OpConn; +use crate::resolve_addr::resolve_addr; +use crate::resolve_addr::resolve_addr_sync; +use crate::DefaultTlsOptions; +use crate::NetPermissions; +use crate::UnsafelyIgnoreCertificateErrors; +use deno_core::error::bad_resource; +use deno_core::error::bad_resource_id; +use deno_core::error::custom_error; +use deno_core::error::generic_error; +use deno_core::error::invalid_hostname; +use deno_core::error::type_error; +use deno_core::error::AnyError; +use deno_core::futures::future::poll_fn; +use deno_core::futures::ready; +use deno_core::futures::task::noop_waker_ref; +use deno_core::futures::task::AtomicWaker; +use deno_core::futures::task::Context; +use deno_core::futures::task::Poll; +use deno_core::futures::task::RawWaker; +use deno_core::futures::task::RawWakerVTable; +use deno_core::futures::task::Waker; +use deno_core::op_async; +use deno_core::op_sync; +use deno_core::parking_lot::Mutex; +use deno_core::AsyncRefCell; +use deno_core::CancelHandle; +use deno_core::CancelTryFuture; +use deno_core::OpPair; +use deno_core::OpState; +use deno_core::RcRef; +use deno_core::Resource; +use deno_core::ResourceId; +use deno_tls::create_client_config; +use deno_tls::rustls::internal::pemfile::certs; +use deno_tls::rustls::internal::pemfile::pkcs8_private_keys; +use deno_tls::rustls::internal::pemfile::rsa_private_keys; +use deno_tls::rustls::Certificate; +use deno_tls::rustls::ClientConfig; +use deno_tls::rustls::ClientSession; +use deno_tls::rustls::NoClientAuth; +use deno_tls::rustls::PrivateKey; +use deno_tls::rustls::ServerConfig; +use deno_tls::rustls::ServerSession; +use deno_tls::rustls::Session; +use deno_tls::webpki::DNSNameRef; +use io::Error; +use io::Read; +use io::Write; +use serde::Deserialize; +use std::borrow::Cow; +use std::cell::RefCell; +use std::convert::From; +use std::fs::File; +use std::io; +use std::io::BufRead; +use std::io::BufReader; +use std::io::ErrorKind; +use std::ops::Deref; +use std::ops::DerefMut; +use std::path::Path; +use std::pin::Pin; +use std::rc::Rc; +use std::sync::Arc; +use std::sync::Weak; +use tokio::io::AsyncRead; +use tokio::io::AsyncWrite; +use tokio::io::ReadBuf; +use tokio::net::TcpListener; +use tokio::net::TcpStream; +use tokio::task::spawn_local; + +#[derive(Debug)] +enum TlsSession { + Client(ClientSession), + Server(ServerSession), +} + +impl Deref for TlsSession { + type Target = dyn Session; + + fn deref(&self) -> &Self::Target { + match self { + TlsSession::Client(client_session) => client_session, + TlsSession::Server(server_session) => server_session, + } + } +} + +impl DerefMut for TlsSession { + fn deref_mut(&mut self) -> &mut Self::Target { + match self { + TlsSession::Client(client_session) => client_session, + TlsSession::Server(server_session) => server_session, + } + } +} + +impl From<ClientSession> for TlsSession { + fn from(client_session: ClientSession) -> Self { + TlsSession::Client(client_session) + } +} + +impl From<ServerSession> for TlsSession { + fn from(server_session: ServerSession) -> Self { + TlsSession::Server(server_session) + } +} + +#[derive(Copy, Clone, Debug, Eq, PartialEq)] +enum Flow { + Read, + Write, +} + +#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] +enum State { + StreamOpen, + StreamClosed, + TlsClosing, + TlsClosed, + TcpClosed, +} + +#[derive(Debug)] +pub struct TlsStream(Option<TlsStreamInner>); + +impl TlsStream { + fn new(tcp: TcpStream, tls: TlsSession) -> Self { + let inner = TlsStreamInner { + tcp, + tls, + rd_state: State::StreamOpen, + wr_state: State::StreamOpen, + }; + Self(Some(inner)) + } + + pub fn new_client_side( + tcp: TcpStream, + tls_config: &Arc<ClientConfig>, + hostname: DNSNameRef, + ) -> Self { + let tls = TlsSession::Client(ClientSession::new(tls_config, hostname)); + Self::new(tcp, tls) + } + + pub fn new_server_side( + tcp: TcpStream, + tls_config: &Arc<ServerConfig>, + ) -> Self { + let tls = TlsSession::Server(ServerSession::new(tls_config)); + Self::new(tcp, tls) + } + + pub async fn handshake(&mut self) -> io::Result<()> { + poll_fn(|cx| self.inner_mut().poll_io(cx, Flow::Write)).await + } + + fn into_split(self) -> (ReadHalf, WriteHalf) { + let shared = Shared::new(self); + let rd = ReadHalf { + shared: shared.clone(), + }; + let wr = WriteHalf { shared }; + (rd, wr) + } + + /// Tokio-rustls compatibility: returns a reference to the underlying TCP + /// stream, and a reference to the Rustls `Session` object. + pub fn get_ref(&self) -> (&TcpStream, &dyn Session) { + let inner = self.0.as_ref().unwrap(); + (&inner.tcp, &*inner.tls) + } + + fn inner_mut(&mut self) -> &mut TlsStreamInner { + self.0.as_mut().unwrap() + } +} + +impl AsyncRead for TlsStream { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll<io::Result<()>> { + self.inner_mut().poll_read(cx, buf) + } +} + +impl AsyncWrite for TlsStream { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<io::Result<usize>> { + self.inner_mut().poll_write(cx, buf) + } + + fn poll_flush( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<io::Result<()>> { + self.inner_mut().poll_io(cx, Flow::Write) + // The underlying TCP stream does not need to be flushed. + } + + fn poll_shutdown( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<io::Result<()>> { + self.inner_mut().poll_shutdown(cx) + } +} + +impl Drop for TlsStream { + fn drop(&mut self) { + let mut inner = self.0.take().unwrap(); + + let mut cx = Context::from_waker(noop_waker_ref()); + let use_linger_task = inner.poll_close(&mut cx).is_pending(); + + if use_linger_task { + spawn_local(poll_fn(move |cx| inner.poll_close(cx))); + } else if cfg!(debug_assertions) { + spawn_local(async {}); // Spawn dummy task to detect missing LocalSet. + } + } +} + +#[derive(Debug)] +pub struct TlsStreamInner { + tls: TlsSession, + tcp: TcpStream, + rd_state: State, + wr_state: State, +} + +impl TlsStreamInner { + fn poll_io( + &mut self, + cx: &mut Context<'_>, + flow: Flow, + ) -> Poll<io::Result<()>> { + loop { + let wr_ready = loop { + match self.wr_state { + _ if self.tls.is_handshaking() && !self.tls.wants_write() => { + break true; + } + _ if self.tls.is_handshaking() => {} + State::StreamOpen if !self.tls.wants_write() => break true, + State::StreamClosed => { + // Rustls will enqueue the 'CloseNotify' alert and send it after + // flusing the data that is already in the queue. + self.tls.send_close_notify(); + self.wr_state = State::TlsClosing; + continue; + } + State::TlsClosing if !self.tls.wants_write() => { + self.wr_state = State::TlsClosed; + continue; + } + // If a 'CloseNotify' alert sent by the remote end has been received, + // shut down the underlying TCP socket. Otherwise, consider polling + // done for the moment. + State::TlsClosed if self.rd_state < State::TlsClosed => break true, + State::TlsClosed + if Pin::new(&mut self.tcp).poll_shutdown(cx)?.is_pending() => + { + break false; + } + State::TlsClosed => { + self.wr_state = State::TcpClosed; + continue; + } + State::TcpClosed => break true, + _ => {} + } + + // Poll whether there is space in the socket send buffer so we can flush + // the remaining outgoing ciphertext. + if self.tcp.poll_write_ready(cx)?.is_pending() { + break false; + } + + // Write ciphertext to the TCP socket. + let mut wrapped_tcp = ImplementWriteTrait(&mut self.tcp); + match self.tls.write_tls(&mut wrapped_tcp) { + Ok(0) => unreachable!(), + Ok(_) => {} + Err(err) if err.kind() == ErrorKind::WouldBlock => {} + Err(err) => return Poll::Ready(Err(err)), + } + }; + + let rd_ready = loop { + match self.rd_state { + State::TcpClosed if self.tls.is_handshaking() => { + let err = Error::new(ErrorKind::UnexpectedEof, "tls handshake eof"); + return Poll::Ready(Err(err)); + } + _ if self.tls.is_handshaking() && !self.tls.wants_read() => { + break true; + } + _ if self.tls.is_handshaking() => {} + State::StreamOpen if !self.tls.wants_read() => break true, + State::StreamOpen => {} + State::StreamClosed if !self.tls.wants_read() => { + // Rustls has more incoming cleartext buffered up, but the TLS + // session is closing so this data will never be processed by the + // application layer. Just like what would happen if this were a raw + // TCP stream, don't gracefully end the TLS session, but abort it. + return Poll::Ready(Err(Error::from(ErrorKind::ConnectionReset))); + } + State::StreamClosed => {} + State::TlsClosed if self.wr_state == State::TcpClosed => { + // Wait for the remote end to gracefully close the TCP connection. + // TODO(piscisaureus): this is unnecessary; remove when stable. + } + _ => break true, + } + + if self.rd_state < State::TlsClosed { + // Do a zero-length plaintext read so we can detect the arrival of + // 'CloseNotify' messages, even if only the write half is open. + // Actually reading data from the socket is done in `poll_read()`. + match self.tls.read(&mut []) { + Ok(0) => {} + Err(err) if err.kind() == ErrorKind::ConnectionAborted => { + // `Session::read()` returns `ConnectionAborted` when a + // 'CloseNotify' alert has been received, which indicates that + // the remote peer wants to gracefully end the TLS session. + self.rd_state = State::TlsClosed; + continue; + } + Err(err) => return Poll::Ready(Err(err)), + _ => unreachable!(), + } + } + + // Poll whether more ciphertext is available in the socket receive + // buffer. + if self.tcp.poll_read_ready(cx)?.is_pending() { + break false; + } + + // Receive ciphertext from the socket. + let mut wrapped_tcp = ImplementReadTrait(&mut self.tcp); + match self.tls.read_tls(&mut wrapped_tcp) { + Ok(0) => self.rd_state = State::TcpClosed, + Ok(_) => self + .tls + .process_new_packets() + .map_err(|err| Error::new(ErrorKind::InvalidData, err))?, + Err(err) if err.kind() == ErrorKind::WouldBlock => {} + Err(err) => return Poll::Ready(Err(err)), + } + }; + + if wr_ready { + if self.rd_state >= State::TlsClosed + && self.wr_state >= State::TlsClosed + && self.wr_state < State::TcpClosed + { + continue; + } + if self.tls.wants_write() { + continue; + } + } + + let io_ready = match flow { + _ if self.tls.is_handshaking() => false, + Flow::Read => rd_ready, + Flow::Write => wr_ready, + }; + return match io_ready { + false => Poll::Pending, + true => Poll::Ready(Ok(())), + }; + } + } + + fn poll_read( + &mut self, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll<io::Result<()>> { + ready!(self.poll_io(cx, Flow::Read))?; + + if self.rd_state == State::StreamOpen { + let buf_slice = + unsafe { &mut *(buf.unfilled_mut() as *mut [_] as *mut [u8]) }; + let bytes_read = self.tls.read(buf_slice)?; + assert_ne!(bytes_read, 0); + unsafe { buf.assume_init(bytes_read) }; + buf.advance(bytes_read); + } + + Poll::Ready(Ok(())) + } + + fn poll_write( + &mut self, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<io::Result<usize>> { + if buf.is_empty() { + // Tokio-rustls compatibility: a zero byte write always succeeds. + Poll::Ready(Ok(0)) + } else if self.wr_state == State::StreamOpen { + // Flush Rustls' ciphertext send queue. + ready!(self.poll_io(cx, Flow::Write))?; + + // Copy data from `buf` to the Rustls cleartext send queue. + let bytes_written = self.tls.write(buf)?; + assert_ne!(bytes_written, 0); + + // Try to flush as much ciphertext as possible. However, since we just + // handed off at least some bytes to rustls, so we can't return + // `Poll::Pending()` any more: this would tell the caller that it should + // try to send those bytes again. + let _ = self.poll_io(cx, Flow::Write)?; + + Poll::Ready(Ok(bytes_written)) + } else { + // Return error if stream has been shut down for writing. + Poll::Ready(Err(ErrorKind::BrokenPipe.into())) + } + } + + fn poll_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { + if self.wr_state == State::StreamOpen { + self.wr_state = State::StreamClosed; + } + + ready!(self.poll_io(cx, Flow::Write))?; + + // At minimum, a TLS 'CloseNotify' alert should have been sent. + assert!(self.wr_state >= State::TlsClosed); + // If we received a TLS 'CloseNotify' alert from the remote end + // already, the TCP socket should be shut down at this point. + assert!( + self.rd_state < State::TlsClosed || self.wr_state == State::TcpClosed + ); + + Poll::Ready(Ok(())) + } + + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { + if self.rd_state == State::StreamOpen { + self.rd_state = State::StreamClosed; + } + + // Send TLS 'CloseNotify' alert. + ready!(self.poll_shutdown(cx))?; + // Wait for 'CloseNotify', shut down TCP stream, wait for TCP FIN packet. + ready!(self.poll_io(cx, Flow::Read))?; + + assert_eq!(self.rd_state, State::TcpClosed); + assert_eq!(self.wr_state, State::TcpClosed); + + Poll::Ready(Ok(())) + } +} + +#[derive(Debug)] +pub struct ReadHalf { + shared: Arc<Shared>, +} + +impl ReadHalf { + pub fn reunite(self, wr: WriteHalf) -> TlsStream { + assert!(Arc::ptr_eq(&self.shared, &wr.shared)); + drop(wr); // Drop `wr`, so only one strong reference to `shared` remains. + + Arc::try_unwrap(self.shared) + .unwrap_or_else(|_| panic!("Arc::<Shared>::try_unwrap() failed")) + .tls_stream + .into_inner() + } +} + +impl AsyncRead for ReadHalf { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll<io::Result<()>> { + self + .shared + .poll_with_shared_waker(cx, Flow::Read, move |tls, cx| { + tls.poll_read(cx, buf) + }) + } +} + +#[derive(Debug)] +pub struct WriteHalf { + shared: Arc<Shared>, +} + +impl AsyncWrite for WriteHalf { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<io::Result<usize>> { + self + .shared + .poll_with_shared_waker(cx, Flow::Write, move |tls, cx| { + tls.poll_write(cx, buf) + }) + } + + fn poll_flush( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<io::Result<()>> { + self + .shared + .poll_with_shared_waker(cx, Flow::Write, |tls, cx| tls.poll_flush(cx)) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<io::Result<()>> { + self + .shared + .poll_with_shared_waker(cx, Flow::Write, |tls, cx| tls.poll_shutdown(cx)) + } +} + +#[derive(Debug)] +struct Shared { + tls_stream: Mutex<TlsStream>, + rd_waker: AtomicWaker, + wr_waker: AtomicWaker, +} + +impl Shared { + fn new(tls_stream: TlsStream) -> Arc<Self> { + let self_ = Self { + tls_stream: Mutex::new(tls_stream), + rd_waker: AtomicWaker::new(), + wr_waker: AtomicWaker::new(), + }; + Arc::new(self_) + } + + fn poll_with_shared_waker<R>( + self: &Arc<Self>, + cx: &mut Context<'_>, + flow: Flow, + mut f: impl FnMut(Pin<&mut TlsStream>, &mut Context<'_>) -> R, + ) -> R { + match flow { + Flow::Read => self.rd_waker.register(cx.waker()), + Flow::Write => self.wr_waker.register(cx.waker()), + } + + let shared_waker = self.new_shared_waker(); + let mut cx = Context::from_waker(&shared_waker); + + let mut tls_stream = self.tls_stream.lock(); + f(Pin::new(&mut tls_stream), &mut cx) + } + + const SHARED_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new( + Self::clone_shared_waker, + Self::wake_shared_waker, + Self::wake_shared_waker_by_ref, + Self::drop_shared_waker, + ); + + fn new_shared_waker(self: &Arc<Self>) -> Waker { + let self_weak = Arc::downgrade(self); + let self_ptr = self_weak.into_raw() as *const (); + let raw_waker = RawWaker::new(self_ptr, &Self::SHARED_WAKER_VTABLE); + unsafe { Waker::from_raw(raw_waker) } + } + + fn clone_shared_waker(self_ptr: *const ()) -> RawWaker { + let self_weak = unsafe { Weak::from_raw(self_ptr as *const Self) }; + let ptr1 = self_weak.clone().into_raw(); + let ptr2 = self_weak.into_raw(); + assert!(ptr1 == ptr2); + RawWaker::new(self_ptr, &Self::SHARED_WAKER_VTABLE) + } + + fn wake_shared_waker(self_ptr: *const ()) { + Self::wake_shared_waker_by_ref(self_ptr); + Self::drop_shared_waker(self_ptr); + } + + fn wake_shared_waker_by_ref(self_ptr: *const ()) { + let self_weak = unsafe { Weak::from_raw(self_ptr as *const Self) }; + if let Some(self_arc) = Weak::upgrade(&self_weak) { + self_arc.rd_waker.wake(); + self_arc.wr_waker.wake(); + } + self_weak.into_raw(); + } + + fn drop_shared_waker(self_ptr: *const ()) { + let _ = unsafe { Weak::from_raw(self_ptr as *const Self) }; + } +} + +struct ImplementReadTrait<'a, T>(&'a mut T); + +impl Read for ImplementReadTrait<'_, TcpStream> { + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + self.0.try_read(buf) + } +} + +struct ImplementWriteTrait<'a, T>(&'a mut T); + +impl Write for ImplementWriteTrait<'_, TcpStream> { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + self.0.try_write(buf) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + +pub fn init<P: NetPermissions + 'static>() -> Vec<OpPair> { + vec![ + ("op_start_tls", op_async(op_start_tls::<P>)), + ("op_connect_tls", op_async(op_connect_tls::<P>)), + ("op_listen_tls", op_sync(op_listen_tls::<P>)), + ("op_accept_tls", op_async(op_accept_tls)), + ] +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ConnectTlsArgs { + transport: String, + hostname: String, + port: u16, + cert_file: Option<String>, + cert_chain: Option<String>, + private_key: Option<String>, +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct StartTlsArgs { + rid: ResourceId, + cert_file: Option<String>, + hostname: String, +} + +async fn op_start_tls<NP>( + state: Rc<RefCell<OpState>>, + args: StartTlsArgs, + _: (), +) -> Result<OpConn, AnyError> +where + NP: NetPermissions + 'static, +{ + let rid = args.rid; + let hostname = match &*args.hostname { + "" => "localhost", + n => n, + }; + let cert_file = args.cert_file.as_deref(); + { + super::check_unstable2(&state, "Deno.startTls"); + let mut s = state.borrow_mut(); + let permissions = s.borrow_mut::<NP>(); + permissions.check_net(&(hostname, Some(0)))?; + if let Some(path) = cert_file { + permissions.check_read(Path::new(path))?; + } + } + + let ca_data = match cert_file { + Some(path) => { + let mut buf = Vec::new(); + File::open(path)?.read_to_end(&mut buf)?; + Some(buf) + } + _ => None, + }; + + let hostname_dns = DNSNameRef::try_from_ascii_str(hostname) + .map_err(|_| invalid_hostname(hostname))?; + + let unsafely_ignore_certificate_errors = state + .borrow() + .borrow::<UnsafelyIgnoreCertificateErrors>() + .0 + .clone(); + + // TODO(@justinmchase): Ideally the certificate store is created once + // and not cloned. The store should be wrapped in Arc<T> to reduce + // copying memory unnecessarily. + let root_cert_store = state + .borrow() + .borrow::<DefaultTlsOptions>() + .root_cert_store + .clone(); + let resource_rc = state + .borrow_mut() + .resource_table + .take::<TcpStreamResource>(rid) + .ok_or_else(bad_resource_id)?; + let resource = Rc::try_unwrap(resource_rc) + .expect("Only a single use of this resource should happen"); + let (read_half, write_half) = resource.into_inner(); + let tcp_stream = read_half.reunite(write_half)?; + + let local_addr = tcp_stream.local_addr()?; + let remote_addr = tcp_stream.peer_addr()?; + + let tls_config = Arc::new(create_client_config( + root_cert_store, + ca_data, + unsafely_ignore_certificate_errors, + )?); + let tls_stream = + TlsStream::new_client_side(tcp_stream, &tls_config, hostname_dns); + + let rid = { + let mut state_ = state.borrow_mut(); + state_ + .resource_table + .add(TlsStreamResource::new(tls_stream.into_split())) + }; + + Ok(OpConn { + rid, + local_addr: Some(OpAddr::Tcp(IpAddr { + hostname: local_addr.ip().to_string(), + port: local_addr.port(), + })), + remote_addr: Some(OpAddr::Tcp(IpAddr { + hostname: remote_addr.ip().to_string(), + port: remote_addr.port(), + })), + }) +} + +async fn op_connect_tls<NP>( + state: Rc<RefCell<OpState>>, + args: ConnectTlsArgs, + _: (), +) -> Result<OpConn, AnyError> +where + NP: NetPermissions + 'static, +{ + assert_eq!(args.transport, "tcp"); + let hostname = match &*args.hostname { + "" => "localhost", + n => n, + }; + let port = args.port; + let cert_file = args.cert_file.as_deref(); + let unsafely_ignore_certificate_errors = state + .borrow() + .borrow::<UnsafelyIgnoreCertificateErrors>() + .0 + .clone(); + + if args.cert_chain.is_some() { + super::check_unstable2(&state, "ConnectTlsOptions.certChain"); + } + if args.private_key.is_some() { + super::check_unstable2(&state, "ConnectTlsOptions.privateKey"); + } + + { + let mut s = state.borrow_mut(); + let permissions = s.borrow_mut::<NP>(); + permissions.check_net(&(hostname, Some(port)))?; + if let Some(path) = cert_file { + permissions.check_read(Path::new(path))?; + } + } + + let ca_data = match cert_file { + Some(path) => { + let mut buf = Vec::new(); + File::open(path)?.read_to_end(&mut buf)?; + Some(buf) + } + _ => None, + }; + + let root_cert_store = state + .borrow() + .borrow::<DefaultTlsOptions>() + .root_cert_store + .clone(); + let hostname_dns = DNSNameRef::try_from_ascii_str(hostname) + .map_err(|_| invalid_hostname(hostname))?; + + let connect_addr = resolve_addr(hostname, port) + .await? + .next() + .ok_or_else(|| generic_error("No resolved address found"))?; + let tcp_stream = TcpStream::connect(connect_addr).await?; + let local_addr = tcp_stream.local_addr()?; + let remote_addr = tcp_stream.peer_addr()?; + let mut tls_config = create_client_config( + root_cert_store, + ca_data, + unsafely_ignore_certificate_errors, + )?; + + if args.cert_chain.is_some() || args.private_key.is_some() { + let cert_chain = args + .cert_chain + .ok_or_else(|| type_error("No certificate chain provided"))?; + let private_key = args + .private_key + .ok_or_else(|| type_error("No private key provided"))?; + + // The `remove` is safe because load_private_keys checks that there is at least one key. + let private_key = load_private_keys(private_key.as_bytes())?.remove(0); + + tls_config.set_single_client_cert( + load_certs(&mut cert_chain.as_bytes())?, + private_key, + )?; + } + + let tls_config = Arc::new(tls_config); + + let tls_stream = + TlsStream::new_client_side(tcp_stream, &tls_config, hostname_dns); + + let rid = { + let mut state_ = state.borrow_mut(); + state_ + .resource_table + .add(TlsStreamResource::new(tls_stream.into_split())) + }; + + Ok(OpConn { + rid, + local_addr: Some(OpAddr::Tcp(IpAddr { + hostname: local_addr.ip().to_string(), + port: local_addr.port(), + })), + remote_addr: Some(OpAddr::Tcp(IpAddr { + hostname: remote_addr.ip().to_string(), + port: remote_addr.port(), + })), + }) +} + +fn load_certs(reader: &mut dyn BufRead) -> Result<Vec<Certificate>, AnyError> { + let certs = certs(reader) + .map_err(|_| custom_error("InvalidData", "Unable to decode certificate"))?; + + if certs.is_empty() { + let e = custom_error("InvalidData", "No certificates found in cert file"); + return Err(e); + } + + Ok(certs) +} + +fn load_certs_from_file(path: &str) -> Result<Vec<Certificate>, AnyError> { + let cert_file = File::open(path)?; + let reader = &mut BufReader::new(cert_file); + load_certs(reader) +} + +fn key_decode_err() -> AnyError { + custom_error("InvalidData", "Unable to decode key") +} + +fn key_not_found_err() -> AnyError { + custom_error("InvalidData", "No keys found in key file") +} + +/// Starts with -----BEGIN RSA PRIVATE KEY----- +fn load_rsa_keys(mut bytes: &[u8]) -> Result<Vec<PrivateKey>, AnyError> { + let keys = rsa_private_keys(&mut bytes).map_err(|_| key_decode_err())?; + Ok(keys) +} + +/// Starts with -----BEGIN PRIVATE KEY----- +fn load_pkcs8_keys(mut bytes: &[u8]) -> Result<Vec<PrivateKey>, AnyError> { + let keys = pkcs8_private_keys(&mut bytes).map_err(|_| key_decode_err())?; + Ok(keys) +} + +fn load_private_keys(bytes: &[u8]) -> Result<Vec<PrivateKey>, AnyError> { + let mut keys = load_rsa_keys(bytes)?; + + if keys.is_empty() { + keys = load_pkcs8_keys(bytes)?; + } + + if keys.is_empty() { + return Err(key_not_found_err()); + } + + Ok(keys) +} + +fn load_private_keys_from_file( + path: &str, +) -> Result<Vec<PrivateKey>, AnyError> { + let key_bytes = std::fs::read(path)?; + load_private_keys(&key_bytes) +} + +pub struct TlsListenerResource { + tcp_listener: AsyncRefCell<TcpListener>, + tls_config: Arc<ServerConfig>, + cancel_handle: CancelHandle, +} + +impl Resource for TlsListenerResource { + fn name(&self) -> Cow<str> { + "tlsListener".into() + } + + fn close(self: Rc<Self>) { + self.cancel_handle.cancel(); + } +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ListenTlsArgs { + transport: String, + hostname: String, + port: u16, + cert_file: String, + key_file: String, + alpn_protocols: Option<Vec<String>>, +} + +fn op_listen_tls<NP>( + state: &mut OpState, + args: ListenTlsArgs, + _: (), +) -> Result<OpConn, AnyError> +where + NP: NetPermissions + 'static, +{ + assert_eq!(args.transport, "tcp"); + let hostname = &*args.hostname; + let port = args.port; + let cert_file = &*args.cert_file; + let key_file = &*args.key_file; + + { + let permissions = state.borrow_mut::<NP>(); + permissions.check_net(&(hostname, Some(port)))?; + permissions.check_read(Path::new(cert_file))?; + permissions.check_read(Path::new(key_file))?; + } + + let mut tls_config = ServerConfig::new(NoClientAuth::new()); + if let Some(alpn_protocols) = args.alpn_protocols { + super::check_unstable(state, "Deno.listenTls#alpn_protocols"); + tls_config.alpn_protocols = + alpn_protocols.into_iter().map(|s| s.into_bytes()).collect(); + } + tls_config + .set_single_cert( + load_certs_from_file(cert_file)?, + load_private_keys_from_file(key_file)?.remove(0), + ) + .expect("invalid key or certificate"); + + let bind_addr = resolve_addr_sync(hostname, port)? + .next() + .ok_or_else(|| generic_error("No resolved address found"))?; + let std_listener = std::net::TcpListener::bind(bind_addr)?; + std_listener.set_nonblocking(true)?; + let tcp_listener = TcpListener::from_std(std_listener)?; + let local_addr = tcp_listener.local_addr()?; + + let tls_listener_resource = TlsListenerResource { + tcp_listener: AsyncRefCell::new(tcp_listener), + tls_config: Arc::new(tls_config), + cancel_handle: Default::default(), + }; + + let rid = state.resource_table.add(tls_listener_resource); + + Ok(OpConn { + rid, + local_addr: Some(OpAddr::Tcp(IpAddr { + hostname: local_addr.ip().to_string(), + port: local_addr.port(), + })), + remote_addr: None, + }) +} + +async fn op_accept_tls( + state: Rc<RefCell<OpState>>, + rid: ResourceId, + _: (), +) -> Result<OpConn, AnyError> { + let resource = state + .borrow() + .resource_table + .get::<TlsListenerResource>(rid) + .ok_or_else(|| bad_resource("Listener has been closed"))?; + + let cancel_handle = RcRef::map(&resource, |r| &r.cancel_handle); + let tcp_listener = RcRef::map(&resource, |r| &r.tcp_listener) + .try_borrow_mut() + .ok_or_else(|| custom_error("Busy", "Another accept task is ongoing"))?; + + let (tcp_stream, remote_addr) = + match tcp_listener.accept().try_or_cancel(&cancel_handle).await { + Ok(tuple) => tuple, + Err(err) if err.kind() == ErrorKind::Interrupted => { + // FIXME(bartlomieju): compatibility with current JS implementation. + return Err(bad_resource("Listener has been closed")); + } + Err(err) => return Err(err.into()), + }; + + let local_addr = tcp_stream.local_addr()?; + + let tls_stream = TlsStream::new_server_side(tcp_stream, &resource.tls_config); + + let rid = { + let mut state_ = state.borrow_mut(); + state_ + .resource_table + .add(TlsStreamResource::new(tls_stream.into_split())) + }; + + Ok(OpConn { + rid, + local_addr: Some(OpAddr::Tcp(IpAddr { + hostname: local_addr.ip().to_string(), + port: local_addr.port(), + })), + remote_addr: Some(OpAddr::Tcp(IpAddr { + hostname: remote_addr.ip().to_string(), + port: remote_addr.port(), + })), + }) +} diff --git a/ext/net/ops_unix.rs b/ext/net/ops_unix.rs new file mode 100644 index 000000000..9dfcc231e --- /dev/null +++ b/ext/net/ops_unix.rs @@ -0,0 +1,180 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. + +use crate::io::UnixStreamResource; +use crate::ops::AcceptArgs; +use crate::ops::OpAddr; +use crate::ops::OpConn; +use crate::ops::OpPacket; +use crate::ops::ReceiveArgs; +use deno_core::error::bad_resource; +use deno_core::error::custom_error; +use deno_core::error::null_opbuf; +use deno_core::error::AnyError; +use deno_core::AsyncRefCell; +use deno_core::CancelHandle; +use deno_core::CancelTryFuture; +use deno_core::OpState; +use deno_core::RcRef; +use deno_core::Resource; +use deno_core::ZeroCopyBuf; +use serde::Deserialize; +use serde::Serialize; +use std::borrow::Cow; +use std::cell::RefCell; +use std::fs::remove_file; +use std::path::Path; +use std::rc::Rc; +use tokio::net::UnixDatagram; +use tokio::net::UnixListener; +pub use tokio::net::UnixStream; + +/// A utility function to map OsStrings to Strings +pub fn into_string(s: std::ffi::OsString) -> Result<String, AnyError> { + s.into_string().map_err(|s| { + let message = format!("File name or path {:?} is not valid UTF-8", s); + custom_error("InvalidData", message) + }) +} + +struct UnixListenerResource { + listener: AsyncRefCell<UnixListener>, + cancel: CancelHandle, +} + +impl Resource for UnixListenerResource { + fn name(&self) -> Cow<str> { + "unixListener".into() + } + + fn close(self: Rc<Self>) { + self.cancel.cancel(); + } +} + +pub struct UnixDatagramResource { + pub socket: AsyncRefCell<UnixDatagram>, + pub cancel: CancelHandle, +} + +impl Resource for UnixDatagramResource { + fn name(&self) -> Cow<str> { + "unixDatagram".into() + } + + fn close(self: Rc<Self>) { + self.cancel.cancel(); + } +} + +#[derive(Serialize)] +pub struct UnixAddr { + pub path: Option<String>, +} + +#[derive(Deserialize)] +pub struct UnixListenArgs { + pub path: String, +} + +pub(crate) async fn accept_unix( + state: Rc<RefCell<OpState>>, + args: AcceptArgs, + _: (), +) -> Result<OpConn, AnyError> { + let rid = args.rid; + + let resource = state + .borrow() + .resource_table + .get::<UnixListenerResource>(rid) + .ok_or_else(|| bad_resource("Listener has been closed"))?; + let listener = RcRef::map(&resource, |r| &r.listener) + .try_borrow_mut() + .ok_or_else(|| custom_error("Busy", "Listener already in use"))?; + let cancel = RcRef::map(resource, |r| &r.cancel); + let (unix_stream, _socket_addr) = + listener.accept().try_or_cancel(cancel).await?; + + let local_addr = unix_stream.local_addr()?; + let remote_addr = unix_stream.peer_addr()?; + let resource = UnixStreamResource::new(unix_stream.into_split()); + let mut state = state.borrow_mut(); + let rid = state.resource_table.add(resource); + Ok(OpConn { + rid, + local_addr: Some(OpAddr::Unix(UnixAddr { + path: local_addr.as_pathname().and_then(pathstring), + })), + remote_addr: Some(OpAddr::Unix(UnixAddr { + path: remote_addr.as_pathname().and_then(pathstring), + })), + }) +} + +pub(crate) async fn receive_unix_packet( + state: Rc<RefCell<OpState>>, + args: ReceiveArgs, + buf: Option<ZeroCopyBuf>, +) -> Result<OpPacket, AnyError> { + let mut buf = buf.ok_or_else(null_opbuf)?; + + let rid = args.rid; + + let resource = state + .borrow() + .resource_table + .get::<UnixDatagramResource>(rid) + .ok_or_else(|| bad_resource("Socket has been closed"))?; + let socket = RcRef::map(&resource, |r| &r.socket) + .try_borrow_mut() + .ok_or_else(|| custom_error("Busy", "Socket already in use"))?; + let cancel = RcRef::map(resource, |r| &r.cancel); + let (size, remote_addr) = + socket.recv_from(&mut buf).try_or_cancel(cancel).await?; + Ok(OpPacket { + size, + remote_addr: OpAddr::UnixPacket(UnixAddr { + path: remote_addr.as_pathname().and_then(pathstring), + }), + }) +} + +pub fn listen_unix( + state: &mut OpState, + addr: &Path, +) -> Result<(u32, tokio::net::unix::SocketAddr), AnyError> { + if addr.exists() { + remove_file(&addr).unwrap(); + } + let listener = UnixListener::bind(&addr)?; + let local_addr = listener.local_addr()?; + let listener_resource = UnixListenerResource { + listener: AsyncRefCell::new(listener), + cancel: Default::default(), + }; + let rid = state.resource_table.add(listener_resource); + + Ok((rid, local_addr)) +} + +pub fn listen_unix_packet( + state: &mut OpState, + addr: &Path, +) -> Result<(u32, tokio::net::unix::SocketAddr), AnyError> { + if addr.exists() { + remove_file(&addr).unwrap(); + } + let socket = UnixDatagram::bind(&addr)?; + let local_addr = socket.local_addr()?; + let datagram_resource = UnixDatagramResource { + socket: AsyncRefCell::new(socket), + cancel: Default::default(), + }; + let rid = state.resource_table.add(datagram_resource); + + Ok((rid, local_addr)) +} + +pub fn pathstring(pathname: &Path) -> Option<String> { + into_string(pathname.into()).ok() +} diff --git a/ext/net/resolve_addr.rs b/ext/net/resolve_addr.rs new file mode 100644 index 000000000..ebf1374d1 --- /dev/null +++ b/ext/net/resolve_addr.rs @@ -0,0 +1,156 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. + +use deno_core::error::AnyError; +use std::net::SocketAddr; +use std::net::ToSocketAddrs; +use tokio::net::lookup_host; + +/// Resolve network address *asynchronously*. +pub async fn resolve_addr( + hostname: &str, + port: u16, +) -> Result<impl Iterator<Item = SocketAddr> + '_, AnyError> { + let addr_port_pair = make_addr_port_pair(hostname, port); + let result = lookup_host(addr_port_pair).await?; + Ok(result) +} + +/// Resolve network address *synchronously*. +pub fn resolve_addr_sync( + hostname: &str, + port: u16, +) -> Result<impl Iterator<Item = SocketAddr>, AnyError> { + let addr_port_pair = make_addr_port_pair(hostname, port); + let result = addr_port_pair.to_socket_addrs()?; + Ok(result) +} + +fn make_addr_port_pair(hostname: &str, port: u16) -> (&str, u16) { + // Default to localhost if given just the port. Example: ":80" + if hostname.is_empty() { + return ("0.0.0.0", port); + } + + // If this looks like an ipv6 IP address. Example: "[2001:db8::1]" + // Then we remove the brackets. + let addr = hostname.trim_start_matches('[').trim_end_matches(']'); + (addr, port) +} + +#[cfg(test)] +mod tests { + use super::*; + use std::net::Ipv4Addr; + use std::net::Ipv6Addr; + use std::net::SocketAddrV4; + use std::net::SocketAddrV6; + + #[tokio::test] + async fn resolve_addr1() { + let expected = vec![SocketAddr::V4(SocketAddrV4::new( + Ipv4Addr::new(127, 0, 0, 1), + 80, + ))]; + let actual = resolve_addr("127.0.0.1", 80) + .await + .unwrap() + .collect::<Vec<_>>(); + assert_eq!(actual, expected); + } + + #[tokio::test] + async fn resolve_addr2() { + let expected = vec![SocketAddr::V4(SocketAddrV4::new( + Ipv4Addr::new(0, 0, 0, 0), + 80, + ))]; + let actual = resolve_addr("", 80).await.unwrap().collect::<Vec<_>>(); + assert_eq!(actual, expected); + } + + #[tokio::test] + async fn resolve_addr3() { + let expected = vec![SocketAddr::V4(SocketAddrV4::new( + Ipv4Addr::new(192, 0, 2, 1), + 25, + ))]; + let actual = resolve_addr("192.0.2.1", 25) + .await + .unwrap() + .collect::<Vec<_>>(); + assert_eq!(actual, expected); + } + + #[tokio::test] + async fn resolve_addr_ipv6() { + let expected = vec![SocketAddr::V6(SocketAddrV6::new( + Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 1), + 8080, + 0, + 0, + ))]; + let actual = resolve_addr("[2001:db8::1]", 8080) + .await + .unwrap() + .collect::<Vec<_>>(); + assert_eq!(actual, expected); + } + + #[tokio::test] + async fn resolve_addr_err() { + assert!(resolve_addr("INVALID ADDR", 1234).await.is_err()); + } + + #[test] + fn resolve_addr_sync1() { + let expected = vec![SocketAddr::V4(SocketAddrV4::new( + Ipv4Addr::new(127, 0, 0, 1), + 80, + ))]; + let actual = resolve_addr_sync("127.0.0.1", 80) + .unwrap() + .collect::<Vec<_>>(); + assert_eq!(actual, expected); + } + + #[test] + fn resolve_addr_sync2() { + let expected = vec![SocketAddr::V4(SocketAddrV4::new( + Ipv4Addr::new(0, 0, 0, 0), + 80, + ))]; + let actual = resolve_addr_sync("", 80).unwrap().collect::<Vec<_>>(); + assert_eq!(actual, expected); + } + + #[test] + fn resolve_addr_sync3() { + let expected = vec![SocketAddr::V4(SocketAddrV4::new( + Ipv4Addr::new(192, 0, 2, 1), + 25, + ))]; + let actual = resolve_addr_sync("192.0.2.1", 25) + .unwrap() + .collect::<Vec<_>>(); + assert_eq!(actual, expected); + } + + #[test] + fn resolve_addr_sync_ipv6() { + let expected = vec![SocketAddr::V6(SocketAddrV6::new( + Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 1), + 8080, + 0, + 0, + ))]; + let actual = resolve_addr_sync("[2001:db8::1]", 8080) + .unwrap() + .collect::<Vec<_>>(); + assert_eq!(actual, expected); + } + + #[test] + fn resolve_addr_sync_err() { + assert!(resolve_addr_sync("INVALID ADDR", 1234).is_err()); + } +} |