From 38a7128cdd6f3308ba3c13cfb0b0d4ef925a44c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Tue, 29 Jun 2021 01:43:03 +0200 Subject: feat: Add "deno_net" extension (#11150) This commits moves implementation of net related APIs available on "Deno" namespace to "deno_net" extension. Following APIs were moved: - Deno.listen() - Deno.connect() - Deno.listenTls() - Deno.serveHttp() - Deno.shutdown() - Deno.resolveDns() - Deno.listenDatagram() - Deno.startTls() - Deno.Conn - Deno.Listener - Deno.DatagramConn --- Cargo.lock | 29 +- Cargo.toml | 1 + cli/Cargo.toml | 1 + cli/build.rs | 11 + cli/dts/lib.deno.ns.d.ts | 144 +--- cli/dts/lib.deno.unstable.d.ts | 257 +------- cli/main.rs | 2 + cli/tests/integration/mod.rs | 6 +- cli/tsc.rs | 3 + core/core.js | 19 + extensions/net/01_net.js | 234 +++++++ extensions/net/02_tls.js | 85 +++ extensions/net/03_http.js | 251 +++++++ extensions/net/04_net_unstable.js | 49 ++ extensions/net/Cargo.toml | 31 + extensions/net/README.md | 30 + extensions/net/io.rs | 232 +++++++ extensions/net/lib.deno_net.d.ts | 149 +++++ extensions/net/lib.deno_net.unstable.d.ts | 262 ++++++++ extensions/net/lib.rs | 113 ++++ extensions/net/ops.rs | 795 ++++++++++++++++++++++ extensions/net/ops_http.rs | 577 ++++++++++++++++ extensions/net/ops_tls.rs | 1024 +++++++++++++++++++++++++++++ extensions/net/ops_unix.rs | 180 +++++ extensions/net/resolve_addr.rs | 156 +++++ runtime/Cargo.toml | 9 +- runtime/build.rs | 1 + runtime/js/01_errors.js | 17 +- runtime/js/30_net.js | 220 ------- runtime/js/40_http.js | 251 ------- runtime/js/40_net_unstable.js | 49 -- runtime/js/40_tls.js | 85 --- runtime/lib.rs | 2 +- runtime/ops/http.rs | 579 ---------------- runtime/ops/io.rs | 129 +--- runtime/ops/mod.rs | 5 - runtime/ops/net.rs | 793 ---------------------- runtime/ops/net_unix.rs | 173 ----- runtime/ops/tls.rs | 1017 ---------------------------- runtime/permissions.rs | 17 + runtime/resolve_addr.rs | 156 ----- runtime/web_worker.rs | 4 +- runtime/worker.rs | 4 +- 43 files changed, 4261 insertions(+), 3891 deletions(-) create mode 100644 extensions/net/01_net.js create mode 100644 extensions/net/02_tls.js create mode 100644 extensions/net/03_http.js create mode 100644 extensions/net/04_net_unstable.js create mode 100644 extensions/net/Cargo.toml create mode 100644 extensions/net/README.md create mode 100644 extensions/net/io.rs create mode 100644 extensions/net/lib.deno_net.d.ts create mode 100644 extensions/net/lib.deno_net.unstable.d.ts create mode 100644 extensions/net/lib.rs create mode 100644 extensions/net/ops.rs create mode 100644 extensions/net/ops_http.rs create mode 100644 extensions/net/ops_tls.rs create mode 100644 extensions/net/ops_unix.rs create mode 100644 extensions/net/resolve_addr.rs delete mode 100644 runtime/js/30_net.js delete mode 100644 runtime/js/40_http.js delete mode 100644 runtime/js/40_net_unstable.js delete mode 100644 runtime/js/40_tls.js delete mode 100644 runtime/ops/http.rs delete mode 100644 runtime/ops/net.rs delete mode 100644 runtime/ops/net_unix.rs delete mode 100644 runtime/ops/tls.rs delete mode 100644 runtime/resolve_addr.rs diff --git a/Cargo.lock b/Cargo.lock index 5e7fed272..8c946a9de 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -536,6 +536,7 @@ dependencies = [ "deno_doc", "deno_fetch", "deno_lint", + "deno_net", "deno_runtime", "deno_timers", "deno_url", @@ -702,17 +703,37 @@ dependencies = [ "swc_ecmascript", ] +[[package]] +name = "deno_net" +version = "0.1.0" +dependencies = [ + "bytes", + "deno_core", + "http", + "hyper", + "lazy_static", + "log", + "rustls", + "serde", + "tokio", + "tokio-util", + "trust-dns-proto", + "trust-dns-resolver", + "webpki", + "webpki-roots", +] + [[package]] name = "deno_runtime" version = "0.18.0" dependencies = [ "atty", - "bytes", "deno_broadcast_channel", "deno_console", "deno_core", "deno_crypto", "deno_fetch", + "deno_net", "deno_timers", "deno_url", "deno_web", @@ -735,18 +756,12 @@ dependencies = [ "percent-encoding", "regex", "ring", - "rustls", "serde", "sys-info", "termcolor", "test_util", "tokio", - "tokio-util", - "trust-dns-proto", - "trust-dns-resolver", "uuid", - "webpki", - "webpki-roots", "winapi 0.3.9", "winres", ] diff --git a/Cargo.toml b/Cargo.toml index 178d9de9c..d7025513c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,7 @@ members = [ "extensions/console", "extensions/crypto", "extensions/fetch", + "extensions/net", "extensions/timers", "extensions/url", "extensions/web", diff --git a/cli/Cargo.toml b/cli/Cargo.toml index 8052f06a5..c46bfb17a 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -25,6 +25,7 @@ deno_console = { version = "0.10.0", path = "../extensions/console" } deno_core = { version = "0.92.0", path = "../core" } deno_crypto = { version = "0.24.0", path = "../extensions/crypto" } deno_fetch = { version = "0.32.0", path = "../extensions/fetch" } +deno_net = { version = "0.1.0", path = "../extensions/net" } deno_timers = { version = "0.8.0", path = "../extensions/timers" } deno_url = { version = "0.10.0", path = "../extensions/url" } deno_web = { version = "0.41.0", path = "../extensions/web" } diff --git a/cli/build.rs b/cli/build.rs index 5e872ab2c..f932d5eff 100644 --- a/cli/build.rs +++ b/cli/build.rs @@ -68,6 +68,9 @@ fn create_compiler_snapshot( "deno.broadcast_channel", deno_broadcast_channel::get_declaration(), ); + op_crate_libs.insert("deno.net", deno_net::get_declaration()); + op_crate_libs + .insert("deno.net_unstable", deno_net::get_unstable_declaration()); // ensure we invalidate the build properly. for (_, path) in op_crate_libs.iter() { @@ -302,6 +305,14 @@ fn main() { "cargo:rustc-env=DENO_BROADCAST_CHANNEL_LIB_PATH={}", deno_broadcast_channel::get_declaration().display() ); + println!( + "cargo:rustc-env=DENO_NET_LIB_PATH={}", + deno_net::get_declaration().display() + ); + println!( + "cargo:rustc-env=DENO_NET_UNSTABLE_LIB_PATH={}", + deno_net::get_unstable_declaration().display() + ); println!("cargo:rustc-env=TARGET={}", env::var("TARGET").unwrap()); println!("cargo:rustc-env=PROFILE={}", env::var("PROFILE").unwrap()); diff --git a/cli/dts/lib.deno.ns.d.ts b/cli/dts/lib.deno.ns.d.ts index e83858586..e1c558f37 100644 --- a/cli/dts/lib.deno.ns.d.ts +++ b/cli/dts/lib.deno.ns.d.ts @@ -2,6 +2,7 @@ /// /// +/// /** Deno provides extra properties on `import.meta`. These are included here * to ensure that these are still available when using the Deno namespace in @@ -1784,149 +1785,6 @@ declare namespace Deno { * Requires `allow-write` permission. */ export function truncate(name: string, len?: number): Promise; - export interface NetAddr { - transport: "tcp" | "udp"; - hostname: string; - port: number; - } - - export interface UnixAddr { - transport: "unix" | "unixpacket"; - path: string; - } - - export type Addr = NetAddr | UnixAddr; - - /** A generic network listener for stream-oriented protocols. */ - export interface Listener extends AsyncIterable { - /** Waits for and resolves to the next connection to the `Listener`. */ - accept(): Promise; - /** Close closes the listener. Any pending accept promises will be rejected - * with errors. */ - close(): void; - /** Return the address of the `Listener`. */ - readonly addr: Addr; - - /** Return the rid of the `Listener`. */ - readonly rid: number; - - [Symbol.asyncIterator](): AsyncIterableIterator; - } - - export interface Conn extends Reader, Writer, Closer { - /** The local address of the connection. */ - readonly localAddr: Addr; - /** The remote address of the connection. */ - readonly remoteAddr: Addr; - /** The resource ID of the connection. */ - readonly rid: number; - /** Shuts down (`shutdown(2)`) the write side of the connection. Most - * callers should just use `close()`. */ - closeWrite(): Promise; - } - - export interface ListenOptions { - /** The port to listen on. */ - port: number; - /** A literal IP address or host name that can be resolved to an IP address. - * If not specified, defaults to `0.0.0.0`. */ - hostname?: string; - } - - /** Listen announces on the local transport address. - * - * ```ts - * const listener1 = Deno.listen({ port: 80 }) - * const listener2 = Deno.listen({ hostname: "192.0.2.1", port: 80 }) - * const listener3 = Deno.listen({ hostname: "[2001:db8::1]", port: 80 }); - * const listener4 = Deno.listen({ hostname: "golang.org", port: 80, transport: "tcp" }); - * ``` - * - * Requires `allow-net` permission. */ - export function listen( - options: ListenOptions & { transport?: "tcp" }, - ): Listener; - - export interface ListenTlsOptions extends ListenOptions { - /** Server certificate file. */ - certFile: string; - /** Server public key file. */ - keyFile: string; - - transport?: "tcp"; - } - - /** Listen announces on the local transport address over TLS (transport layer - * security). - * - * ```ts - * const lstnr = Deno.listenTls({ port: 443, certFile: "./server.crt", keyFile: "./server.key" }); - * ``` - * - * Requires `allow-net` permission. */ - export function listenTls(options: ListenTlsOptions): Listener; - - export interface ConnectOptions { - /** The port to connect to. */ - port: number; - /** A literal IP address or host name that can be resolved to an IP address. - * If not specified, defaults to `127.0.0.1`. */ - hostname?: string; - transport?: "tcp"; - } - - /** - * Connects to the hostname (default is "127.0.0.1") and port on the named - * transport (default is "tcp"), and resolves to the connection (`Conn`). - * - * ```ts - * const conn1 = await Deno.connect({ port: 80 }); - * const conn2 = await Deno.connect({ hostname: "192.0.2.1", port: 80 }); - * const conn3 = await Deno.connect({ hostname: "[2001:db8::1]", port: 80 }); - * const conn4 = await Deno.connect({ hostname: "golang.org", port: 80, transport: "tcp" }); - * ``` - * - * Requires `allow-net` permission for "tcp". */ - export function connect(options: ConnectOptions): Promise; - - export interface ConnectTlsOptions { - /** The port to connect to. */ - port: number; - /** A literal IP address or host name that can be resolved to an IP address. - * If not specified, defaults to `127.0.0.1`. */ - hostname?: string; - /** Server certificate file. */ - certFile?: string; - } - - /** Establishes a secure connection over TLS (transport layer security) using - * an optional cert file, hostname (default is "127.0.0.1") and port. The - * cert file is optional and if not included Mozilla's root certificates will - * be used (see also https://github.com/ctz/webpki-roots for specifics) - * - * ```ts - * const conn1 = await Deno.connectTls({ port: 80 }); - * const conn2 = await Deno.connectTls({ certFile: "./certs/my_custom_root_CA.pem", hostname: "192.0.2.1", port: 80 }); - * const conn3 = await Deno.connectTls({ hostname: "[2001:db8::1]", port: 80 }); - * const conn4 = await Deno.connectTls({ certFile: "./certs/my_custom_root_CA.pem", hostname: "golang.org", port: 80}); - * ``` - * - * Requires `allow-net` permission. - */ - export function connectTls(options: ConnectTlsOptions): Promise; - - /** Shutdown socket send operations. - * - * Matches behavior of POSIX shutdown(3). - * - * ```ts - * const listener = Deno.listen({ port: 80 }); - * const conn = await listener.accept(); - * Deno.shutdown(conn.rid); - * ``` - */ - export function shutdown(rid: number): Promise; - export interface Metrics { opsDispatched: number; opsDispatchedSync: number; diff --git a/cli/dts/lib.deno.unstable.d.ts b/cli/dts/lib.deno.unstable.d.ts index 1d01a748e..ac03e695c 100644 --- a/cli/dts/lib.deno.unstable.d.ts +++ b/cli/dts/lib.deno.unstable.d.ts @@ -2,6 +2,7 @@ /// /// +/// declare namespace Deno { /** @@ -812,232 +813,6 @@ declare namespace Deno { mtime: number | Date, ): Promise; - /** The type of the resource record. - * Only the listed types are supported currently. */ - export type RecordType = - | "A" - | "AAAA" - | "ANAME" - | "CNAME" - | "MX" - | "PTR" - | "SRV" - | "TXT"; - - export interface ResolveDnsOptions { - /** The name server to be used for lookups. - * If not specified, defaults to the system configuration e.g. `/etc/resolv.conf` on Unix. */ - nameServer?: { - /** The IP address of the name server */ - ipAddr: string; - /** The port number the query will be sent to. - * If not specified, defaults to 53. */ - port?: number; - }; - } - - /** If `resolveDns` is called with "MX" record type specified, it will return an array of this interface. */ - export interface MXRecord { - preference: number; - exchange: string; - } - - /** If `resolveDns` is called with "SRV" record type specified, it will return an array of this interface. */ - export interface SRVRecord { - priority: number; - weight: number; - port: number; - target: string; - } - - export function resolveDns( - query: string, - recordType: "A" | "AAAA" | "ANAME" | "CNAME" | "PTR", - options?: ResolveDnsOptions, - ): Promise; - - export function resolveDns( - query: string, - recordType: "MX", - options?: ResolveDnsOptions, - ): Promise; - - export function resolveDns( - query: string, - recordType: "SRV", - options?: ResolveDnsOptions, - ): Promise; - - export function resolveDns( - query: string, - recordType: "TXT", - options?: ResolveDnsOptions, - ): Promise; - - /** ** UNSTABLE**: new API, yet to be vetted. - * - * Performs DNS resolution against the given query, returning resolved records. - * Fails in the cases such as: - * - the query is in invalid format - * - the options have an invalid parameter, e.g. `nameServer.port` is beyond the range of 16-bit unsigned integer - * - timed out - * - * ```ts - * const a = await Deno.resolveDns("example.com", "A"); - * - * const aaaa = await Deno.resolveDns("example.com", "AAAA", { - * nameServer: { ipAddr: "8.8.8.8", port: 1234 }, - * }); - * ``` - * - * Requires `allow-net` permission. - */ - export function resolveDns( - query: string, - recordType: RecordType, - options?: ResolveDnsOptions, - ): Promise; - - /** **UNSTABLE**: new API, yet to be vetted. - * - * A generic transport listener for message-oriented protocols. */ - export interface DatagramConn extends AsyncIterable<[Uint8Array, Addr]> { - /** **UNSTABLE**: new API, yet to be vetted. - * - * Waits for and resolves to the next message to the `UDPConn`. */ - receive(p?: Uint8Array): Promise<[Uint8Array, Addr]>; - /** UNSTABLE: new API, yet to be vetted. - * - * Sends a message to the target. */ - send(p: Uint8Array, addr: Addr): Promise; - /** UNSTABLE: new API, yet to be vetted. - * - * Close closes the socket. Any pending message promises will be rejected - * with errors. */ - close(): void; - /** Return the address of the `UDPConn`. */ - readonly addr: Addr; - [Symbol.asyncIterator](): AsyncIterableIterator<[Uint8Array, Addr]>; - } - - export interface UnixListenOptions { - /** A Path to the Unix Socket. */ - path: string; - } - - /** **UNSTABLE**: new API, yet to be vetted. - * - * Listen announces on the local transport address. - * - * ```ts - * const listener = Deno.listen({ path: "/foo/bar.sock", transport: "unix" }) - * ``` - * - * Requires `allow-read` and `allow-write` permission. */ - export function listen( - options: UnixListenOptions & { transport: "unix" }, - ): Listener; - - /** **UNSTABLE**: new API, yet to be vetted - * - * Listen announces on the local transport address. - * - * ```ts - * const listener1 = Deno.listenDatagram({ - * port: 80, - * transport: "udp" - * }); - * const listener2 = Deno.listenDatagram({ - * hostname: "golang.org", - * port: 80, - * transport: "udp" - * }); - * ``` - * - * Requires `allow-net` permission. */ - export function listenDatagram( - options: ListenOptions & { transport: "udp" }, - ): DatagramConn; - - /** **UNSTABLE**: new API, yet to be vetted - * - * Listen announces on the local transport address. - * - * ```ts - * const listener = Deno.listenDatagram({ - * path: "/foo/bar.sock", - * transport: "unixpacket" - * }); - * ``` - * - * Requires `allow-read` and `allow-write` permission. */ - export function listenDatagram( - options: UnixListenOptions & { transport: "unixpacket" }, - ): DatagramConn; - - export interface UnixConnectOptions { - transport: "unix"; - path: string; - } - - /** **UNSTABLE**: The unix socket transport is unstable as a new API yet to - * be vetted. The TCP transport is considered stable. - * - * Connects to the hostname (default is "127.0.0.1") and port on the named - * transport (default is "tcp"), and resolves to the connection (`Conn`). - * - * ```ts - * const conn1 = await Deno.connect({ port: 80 }); - * const conn2 = await Deno.connect({ hostname: "192.0.2.1", port: 80 }); - * const conn3 = await Deno.connect({ hostname: "[2001:db8::1]", port: 80 }); - * const conn4 = await Deno.connect({ hostname: "golang.org", port: 80, transport: "tcp" }); - * const conn5 = await Deno.connect({ path: "/foo/bar.sock", transport: "unix" }); - * ``` - * - * Requires `allow-net` permission for "tcp" and `allow-read` for "unix". */ - export function connect( - options: ConnectOptions | UnixConnectOptions, - ): Promise; - - export interface StartTlsOptions { - /** A literal IP address or host name that can be resolved to an IP address. - * If not specified, defaults to `127.0.0.1`. */ - hostname?: string; - /** Server certificate file. */ - certFile?: string; - } - - /** **UNSTABLE**: new API, yet to be vetted. - * - * Start TLS handshake from an existing connection using - * an optional cert file, hostname (default is "127.0.0.1"). The - * cert file is optional and if not included Mozilla's root certificates will - * be used (see also https://github.com/ctz/webpki-roots for specifics) - * Using this function requires that the other end of the connection is - * prepared for TLS handshake. - * - * ```ts - * const conn = await Deno.connect({ port: 80, hostname: "127.0.0.1" }); - * const tlsConn = await Deno.startTls(conn, { certFile: "./certs/my_custom_root_CA.pem", hostname: "localhost" }); - * ``` - * - * Requires `allow-net` permission. - */ - export function startTls( - conn: Conn, - options?: StartTlsOptions, - ): Promise; - - export interface ListenTlsOptions { - /** **UNSTABLE**: new API, yet to be vetted. - * - * Application-Layer Protocol Negotiation (ALPN) protocols to announce to - * the client. If not specified, no ALPN extension will be included in the - * TLS handshake. - */ - alpnProtocols?: string[]; - } - /** **UNSTABLE**: The `signo` argument may change to require the Deno.Signal * enum. * @@ -1182,36 +957,6 @@ declare namespace Deno { bytesReceived: number; } - export interface RequestEvent { - readonly request: Request; - respondWith(r: Response | Promise): Promise; - } - - export interface HttpConn extends AsyncIterable { - readonly rid: number; - - nextRequest(): Promise; - close(): void; - } - - /** **UNSTABLE**: new API, yet to be vetted. - * - * Services HTTP requests given a TCP or TLS socket. - * - * ```ts - * const conn = await Deno.connect({ port: 80, hostname: "127.0.0.1" }); - * const httpConn = Deno.serveHttp(conn); - * const e = await httpConn.nextRequest(); - * if (e) { - * e.respondWith(new Response("Hello World")); - * } - * ``` - * - * If `httpConn.nextRequest()` encounters an error or returns `null` - * then the underlying HttpConn resource is closed automatically. - */ - export function serveHttp(conn: Conn): HttpConn; - /** **UNSTABLE**: New option, yet to be vetted. */ export interface TestDefinition { /** Specifies the permissions that should be used to run the test. diff --git a/cli/main.rs b/cli/main.rs index 38db7d13f..f1cf67ac4 100644 --- a/cli/main.rs +++ b/cli/main.rs @@ -334,12 +334,14 @@ pub fn get_types(unstable: bool) -> String { crate::tsc::DENO_WEBSTORAGE_LIB, crate::tsc::DENO_CRYPTO_LIB, crate::tsc::DENO_BROADCAST_CHANNEL_LIB, + crate::tsc::DENO_NET_LIB, crate::tsc::SHARED_GLOBALS_LIB, crate::tsc::WINDOW_LIB, ]; if unstable { types.push(crate::tsc::UNSTABLE_NS_LIB); + types.push(crate::tsc::DENO_NET_UNSTABLE_LIB); } types.join("\n") diff --git a/cli/tests/integration/mod.rs b/cli/tests/integration/mod.rs index fab2f20a5..c11d26dc9 100644 --- a/cli/tests/integration/mod.rs +++ b/cli/tests/integration/mod.rs @@ -2,9 +2,9 @@ use crate::itest; use deno_core::url; -use deno_runtime::ops::tls::rustls; -use deno_runtime::ops::tls::webpki; -use deno_runtime::ops::tls::TlsStream; +use deno_runtime::deno_net::ops_tls::rustls; +use deno_runtime::deno_net::ops_tls::webpki; +use deno_runtime::deno_net::ops_tls::TlsStream; use std::fs; use std::io::BufReader; use std::io::Cursor; diff --git a/cli/tsc.rs b/cli/tsc.rs index 3e7974a97..59b4ac81a 100644 --- a/cli/tsc.rs +++ b/cli/tsc.rs @@ -44,6 +44,9 @@ pub static DENO_WEBSTORAGE_LIB: &str = pub static DENO_CRYPTO_LIB: &str = include_str!(env!("DENO_CRYPTO_LIB_PATH")); pub static DENO_BROADCAST_CHANNEL_LIB: &str = include_str!(env!("DENO_BROADCAST_CHANNEL_LIB_PATH")); +pub static DENO_NET_LIB: &str = include_str!(env!("DENO_NET_LIB_PATH")); +pub static DENO_NET_UNSTABLE_LIB: &str = + include_str!(env!("DENO_NET_UNSTABLE_LIB_PATH")); pub static SHARED_GLOBALS_LIB: &str = include_str!("dts/lib.deno.shared_globals.d.ts"); pub static WINDOW_LIB: &str = include_str!("dts/lib.deno.window.d.ts"); diff --git a/core/core.js b/core/core.js index 8e06a3e45..9ce563869 100644 --- a/core/core.js +++ b/core/core.js @@ -132,6 +132,23 @@ opSync("op_print", str, isErr); } + // Some "extensions" rely on "BadResource" and "Interrupted" errors in the + // JS code (eg. "deno_net") so they are provided in "Deno.core" but later + // reexported on "Deno.errors" + class BadResource extends Error { + constructor(msg) { + super(msg); + this.name = "BadResource"; + } + } + + class Interrupted extends Error { + constructor(msg) { + super(msg); + this.name = "Interrupted"; + } + } + // Provide bootstrap namespace window.__bootstrap = {}; // Extra Deno.core.* exports @@ -146,5 +163,7 @@ registerErrorClass, handleAsyncMsgFromRust, syncOpsCache, + BadResource, + Interrupted, }); })(this); diff --git a/extensions/net/01_net.js b/extensions/net/01_net.js new file mode 100644 index 000000000..9a531bd94 --- /dev/null +++ b/extensions/net/01_net.js @@ -0,0 +1,234 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. +"use strict"; + +((window) => { + const core = window.Deno.core; + const { BadResource } = core; + + async function read( + rid, + buffer, + ) { + if (buffer.length === 0) { + return 0; + } + const nread = await core.opAsync("op_net_read_async", rid, buffer); + return nread === 0 ? null : nread; + } + + async function write(rid, data) { + return await core.opAsync("op_net_write_async", rid, data); + } + + function shutdown(rid) { + return core.opAsync("op_net_shutdown", rid); + } + + function opAccept(rid, transport) { + return core.opAsync("op_accept", { rid, transport }); + } + + function opListen(args) { + return core.opSync("op_listen", args); + } + + function opConnect(args) { + return core.opAsync("op_connect", args); + } + + function opReceive(rid, transport, zeroCopy) { + return core.opAsync( + "op_datagram_receive", + { rid, transport }, + zeroCopy, + ); + } + + function opSend(args, zeroCopy) { + return core.opAsync("op_datagram_send", args, zeroCopy); + } + + function resolveDns(query, recordType, options) { + return core.opAsync("op_dns_resolve", { query, recordType, options }); + } + + class Conn { + #rid = 0; + #remoteAddr = null; + #localAddr = null; + constructor(rid, remoteAddr, localAddr) { + this.#rid = rid; + this.#remoteAddr = remoteAddr; + this.#localAddr = localAddr; + } + + get rid() { + return this.#rid; + } + + get remoteAddr() { + return this.#remoteAddr; + } + + get localAddr() { + return this.#localAddr; + } + + write(p) { + return write(this.rid, p); + } + + read(p) { + return read(this.rid, p); + } + + close() { + core.close(this.rid); + } + + closeWrite() { + return shutdown(this.rid); + } + } + + class Listener { + #rid = 0; + #addr = null; + + constructor(rid, addr) { + this.#rid = rid; + this.#addr = addr; + } + + get rid() { + return this.#rid; + } + + get addr() { + return this.#addr; + } + + async accept() { + const res = await opAccept(this.rid, this.addr.transport); + return new Conn(res.rid, res.remoteAddr, res.localAddr); + } + + async next() { + let conn; + try { + conn = await this.accept(); + } catch (error) { + if (error instanceof BadResource) { + return { value: undefined, done: true }; + } + throw error; + } + return { value: conn, done: false }; + } + + return(value) { + this.close(); + return Promise.resolve({ value, done: true }); + } + + close() { + core.close(this.rid); + } + + [Symbol.asyncIterator]() { + return this; + } + } + + class Datagram { + #rid = 0; + #addr = null; + + constructor(rid, addr, bufSize = 1024) { + this.#rid = rid; + this.#addr = addr; + this.bufSize = bufSize; + } + + get rid() { + return this.#rid; + } + + get addr() { + return this.#addr; + } + + async receive(p) { + const buf = p || new Uint8Array(this.bufSize); + const { size, remoteAddr } = await opReceive( + this.rid, + this.addr.transport, + buf, + ); + const sub = buf.subarray(0, size); + return [sub, remoteAddr]; + } + + send(p, addr) { + const remote = { hostname: "127.0.0.1", ...addr }; + + const args = { ...remote, rid: this.rid }; + return opSend(args, p); + } + + close() { + core.close(this.rid); + } + + async *[Symbol.asyncIterator]() { + while (true) { + try { + yield await this.receive(); + } catch (err) { + if (err instanceof BadResource) { + break; + } + throw err; + } + } + } + } + + function listen({ hostname, ...options }) { + const res = opListen({ + transport: "tcp", + hostname: typeof hostname === "undefined" ? "0.0.0.0" : hostname, + ...options, + }); + + return new Listener(res.rid, res.localAddr); + } + + async function connect(options) { + let res; + + if (options.transport === "unix") { + res = await opConnect(options); + } else { + res = await opConnect({ + transport: "tcp", + hostname: "127.0.0.1", + ...options, + }); + } + + return new Conn(res.rid, res.remoteAddr, res.localAddr); + } + + window.__bootstrap.net = { + connect, + Conn, + opConnect, + listen, + opListen, + Listener, + shutdown, + Datagram, + resolveDns, + }; +})(this); diff --git a/extensions/net/02_tls.js b/extensions/net/02_tls.js new file mode 100644 index 000000000..4fafe9079 --- /dev/null +++ b/extensions/net/02_tls.js @@ -0,0 +1,85 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. +"use strict"; + +((window) => { + const core = window.Deno.core; + const { Listener, Conn } = window.__bootstrap.net; + + function opConnectTls( + args, + ) { + return core.opAsync("op_connect_tls", args); + } + + function opAcceptTLS(rid) { + return core.opAsync("op_accept_tls", rid); + } + + function opListenTls(args) { + return core.opSync("op_listen_tls", args); + } + + function opStartTls(args) { + return core.opAsync("op_start_tls", args); + } + + async function connectTls({ + port, + hostname = "127.0.0.1", + transport = "tcp", + certFile = undefined, + }) { + const res = await opConnectTls({ + port, + hostname, + transport, + certFile, + }); + return new Conn(res.rid, res.remoteAddr, res.localAddr); + } + + class TLSListener extends Listener { + async accept() { + const res = await opAcceptTLS(this.rid); + return new Conn(res.rid, res.remoteAddr, res.localAddr); + } + } + + function listenTls({ + port, + certFile, + keyFile, + hostname = "0.0.0.0", + transport = "tcp", + alpnProtocols, + }) { + const res = opListenTls({ + port, + certFile, + keyFile, + hostname, + transport, + alpnProtocols, + }); + return new TLSListener(res.rid, res.localAddr); + } + + async function startTls( + conn, + { hostname = "127.0.0.1", certFile } = {}, + ) { + const res = await opStartTls({ + rid: conn.rid, + hostname, + certFile, + }); + return new Conn(res.rid, res.remoteAddr, res.localAddr); + } + + window.__bootstrap.tls = { + startTls, + listenTls, + connectTls, + TLSListener, + }; +})(this); diff --git a/extensions/net/03_http.js b/extensions/net/03_http.js new file mode 100644 index 000000000..d5054bd1a --- /dev/null +++ b/extensions/net/03_http.js @@ -0,0 +1,251 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. +"use strict"; + +((window) => { + const { InnerBody } = window.__bootstrap.fetchBody; + const { Response, fromInnerRequest, toInnerResponse, newInnerRequest } = + window.__bootstrap.fetch; + const core = window.Deno.core; + const { BadResource, Interrupted } = core; + const { ReadableStream } = window.__bootstrap.streams; + const abortSignal = window.__bootstrap.abortSignal; + + function serveHttp(conn) { + const rid = Deno.core.opSync("op_http_start", conn.rid); + return new HttpConn(rid); + } + + const connErrorSymbol = Symbol("connError"); + + class HttpConn { + #rid = 0; + + constructor(rid) { + this.#rid = rid; + } + + /** @returns {number} */ + get rid() { + return this.#rid; + } + + /** @returns {Promise} */ + async nextRequest() { + let nextRequest; + try { + nextRequest = await Deno.core.opAsync( + "op_http_request_next", + this.#rid, + ); + } catch (error) { + // A connection error seen here would cause disrupted responses to throw + // a generic `BadResource` error. Instead store this error and replace + // those with it. + this[connErrorSymbol] = error; + if (error instanceof BadResource) { + return null; + } else if (error instanceof Interrupted) { + return null; + } else if (error.message.includes("connection closed")) { + return null; + } + throw error; + } + if (nextRequest === null) return null; + + const [ + requestBodyRid, + responseSenderRid, + method, + headersList, + url, + ] = nextRequest; + + /** @type {ReadableStream | undefined} */ + let body = null; + if (typeof requestBodyRid === "number") { + body = createRequestBodyStream(requestBodyRid); + } + + const innerRequest = newInnerRequest( + method, + url, + headersList, + body !== null ? new InnerBody(body) : null, + ); + const signal = abortSignal.newSignal(); + const request = fromInnerRequest(innerRequest, signal, "immutable"); + + const respondWith = createRespondWith(this, responseSenderRid); + + return { request, respondWith }; + } + + /** @returns {void} */ + close() { + core.close(this.#rid); + } + + [Symbol.asyncIterator]() { + // deno-lint-ignore no-this-alias + const httpConn = this; + return { + async next() { + const reqEvt = await httpConn.nextRequest(); + // Change with caution, current form avoids a v8 deopt + return { value: reqEvt, done: reqEvt === null }; + }, + }; + } + } + + function readRequest(requestRid, zeroCopyBuf) { + return Deno.core.opAsync( + "op_http_request_read", + requestRid, + zeroCopyBuf, + ); + } + + function createRespondWith(httpConn, responseSenderRid) { + return async function respondWith(resp) { + if (resp instanceof Promise) { + resp = await resp; + } + + if (!(resp instanceof Response)) { + throw new TypeError( + "First argument to respondWith must be a Response or a promise resolving to a Response.", + ); + } + + const innerResp = toInnerResponse(resp); + + // If response body length is known, it will be sent synchronously in a + // single op, in other case a "response body" resource will be created and + // we'll be streaming it. + /** @type {ReadableStream | Uint8Array | null} */ + let respBody = null; + if (innerResp.body !== null) { + if (innerResp.body.unusable()) throw new TypeError("Body is unusable."); + if (innerResp.body.streamOrStatic instanceof ReadableStream) { + if (innerResp.body.length === null) { + respBody = innerResp.body.stream; + } else { + const reader = innerResp.body.stream.getReader(); + const r1 = await reader.read(); + if (r1.done) { + respBody = new Uint8Array(0); + } else { + respBody = r1.value; + const r2 = await reader.read(); + if (!r2.done) throw new TypeError("Unreachable"); + } + } + } else { + innerResp.body.streamOrStatic.consumed = true; + respBody = innerResp.body.streamOrStatic.body; + } + } else { + respBody = new Uint8Array(0); + } + + let responseBodyRid; + try { + responseBodyRid = await Deno.core.opAsync("op_http_response", [ + responseSenderRid, + innerResp.status ?? 200, + innerResp.headerList, + ], respBody instanceof Uint8Array ? respBody : null); + } catch (error) { + const connError = httpConn[connErrorSymbol]; + if (error instanceof BadResource && connError != null) { + // deno-lint-ignore no-ex-assign + error = new connError.constructor(connError.message); + } + if (respBody !== null && respBody instanceof ReadableStream) { + await respBody.cancel(error); + } + throw error; + } + + // If `respond` returns a responseBodyRid, we should stream the body + // to that resource. + if (responseBodyRid !== null) { + try { + if (respBody === null || !(respBody instanceof ReadableStream)) { + throw new TypeError("Unreachable"); + } + const reader = respBody.getReader(); + while (true) { + const { value, done } = await reader.read(); + if (done) break; + if (!(value instanceof Uint8Array)) { + await reader.cancel(new TypeError("Value not a Uint8Array")); + break; + } + try { + await Deno.core.opAsync( + "op_http_response_write", + responseBodyRid, + value, + ); + } catch (error) { + const connError = httpConn[connErrorSymbol]; + if (error instanceof BadResource && connError != null) { + // deno-lint-ignore no-ex-assign + error = new connError.constructor(connError.message); + } + await reader.cancel(error); + throw error; + } + } + } finally { + // Once all chunks are sent, and the request body is closed, we can + // close the response body. + try { + await Deno.core.opAsync("op_http_response_close", responseBodyRid); + } catch { /* pass */ } + } + } + }; + } + + function createRequestBodyStream(requestBodyRid) { + return new ReadableStream({ + type: "bytes", + async pull(controller) { + try { + // This is the largest possible size for a single packet on a TLS + // stream. + const chunk = new Uint8Array(16 * 1024 + 256); + const read = await readRequest( + requestBodyRid, + chunk, + ); + if (read > 0) { + // We read some data. Enqueue it onto the stream. + controller.enqueue(chunk.subarray(0, read)); + } else { + // We have reached the end of the body, so we close the stream. + controller.close(); + core.close(requestBodyRid); + } + } catch (err) { + // There was an error while reading a chunk of the body, so we + // error. + controller.error(err); + controller.close(); + core.close(requestBodyRid); + } + }, + cancel() { + core.close(requestBodyRid); + }, + }); + } + + window.__bootstrap.http = { + serveHttp, + }; +})(this); diff --git a/extensions/net/04_net_unstable.js b/extensions/net/04_net_unstable.js new file mode 100644 index 000000000..ca265bfaa --- /dev/null +++ b/extensions/net/04_net_unstable.js @@ -0,0 +1,49 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. +"use strict"; + +((window) => { + const net = window.__bootstrap.net; + + function listen(options) { + if (options.transport === "unix") { + const res = net.opListen(options); + return new net.Listener(res.rid, res.localAddr); + } else { + return net.listen(options); + } + } + + function listenDatagram( + options, + ) { + let res; + if (options.transport === "unixpacket") { + res = net.opListen(options); + } else { + res = net.opListen({ + transport: "udp", + hostname: "127.0.0.1", + ...options, + }); + } + + return new net.Datagram(res.rid, res.localAddr); + } + + async function connect( + options, + ) { + if (options.transport === "unix") { + const res = await net.opConnect(options); + return new net.Conn(res.rid, res.remoteAddr, res.localAddr); + } else { + return net.connect(options); + } + } + + window.__bootstrap.netUnstable = { + connect, + listenDatagram, + listen, + }; +})(this); diff --git a/extensions/net/Cargo.toml b/extensions/net/Cargo.toml new file mode 100644 index 000000000..9d5e97bbb --- /dev/null +++ b/extensions/net/Cargo.toml @@ -0,0 +1,31 @@ +# Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. + +[package] +name = "deno_net" +version = "0.1.0" +edition = "2018" +description = "Networking for Deno" +authors = ["the Deno authors"] +license = "MIT" +readme = "README.md" +repository = "https://github.com/denoland/deno" + +[lib] +path = "lib.rs" + +[dependencies] +deno_core = { version = "0.92.0", path = "../../core" } + +bytes = "1" +log = "0.4.14" +lazy_static = "1.4.0" +http = "0.2.3" +hyper = { version = "0.14.9", features = ["server", "stream", "http1", "http2", "runtime"] } +rustls = "0.19.0" +serde = { version = "1.0.125", features = ["derive"] } +tokio = { version = "1.7.1", features = ["full"] } +tokio-util = { version = "0.6", features = ["io"] } +webpki = "0.21.4" +webpki-roots = "0.21.1" +trust-dns-proto = "0.20.3" +trust-dns-resolver = { version = "0.20.3", features = ["tokio-runtime", "serde-config"] } diff --git a/extensions/net/README.md b/extensions/net/README.md new file mode 100644 index 000000000..cdd8923e1 --- /dev/null +++ b/extensions/net/README.md @@ -0,0 +1,30 @@ +# deno_net + +This crate implements networking APIs. + +This crate depends on following extensions: + +- "deno_web" +- "deno_fetch" + +Following ops are provided: + +- "op_net_read_async" +- "op_net_write_async" +- "op_net_shutdown" +- "op_accept" +- "op_connect" +- "op_listen" +- "op_datagram_receive" +- "op_datagram_send" +- "op_dns_resolve" +- "op_start_tls" +- "op_connect_tls" +- "op_listen_tls" +- "op_accept_tls" +- "op_http_start" +- "op_http_request_next" +- "op_http_request_read" +- "op_http_response" +- "op_http_response_write" +- "op_http_response_close" diff --git a/extensions/net/io.rs b/extensions/net/io.rs new file mode 100644 index 000000000..fc10d7e99 --- /dev/null +++ b/extensions/net/io.rs @@ -0,0 +1,232 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. + +use crate::ops_tls as tls; +use deno_core::error::null_opbuf; +use deno_core::error::AnyError; +use deno_core::error::{bad_resource_id, not_supported}; +use deno_core::op_async; +use deno_core::AsyncMutFuture; +use deno_core::AsyncRefCell; +use deno_core::CancelHandle; +use deno_core::CancelTryFuture; +use deno_core::OpPair; +use deno_core::OpState; +use deno_core::RcRef; +use deno_core::Resource; +use deno_core::ResourceId; +use deno_core::ZeroCopyBuf; +use std::borrow::Cow; +use std::cell::RefCell; +use std::rc::Rc; +use tokio::io::AsyncRead; +use tokio::io::AsyncReadExt; +use tokio::io::AsyncWrite; +use tokio::io::AsyncWriteExt; +use tokio::net::tcp; + +#[cfg(unix)] +use tokio::net::unix; + +pub fn init() -> Vec { + vec![ + ("op_net_read_async", op_async(op_read_async)), + ("op_net_write_async", op_async(op_write_async)), + ("op_net_shutdown", op_async(op_shutdown)), + ] +} + +/// A full duplex resource has a read and write ends that are completely +/// independent, like TCP/Unix sockets and TLS streams. +#[derive(Debug)] +pub struct FullDuplexResource { + rd: AsyncRefCell, + wr: AsyncRefCell, + // When a full-duplex resource is closed, all pending 'read' ops are + // canceled, while 'write' ops are allowed to complete. Therefore only + // 'read' futures should be attached to this cancel handle. + cancel_handle: CancelHandle, +} + +impl FullDuplexResource +where + R: AsyncRead + Unpin + 'static, + W: AsyncWrite + Unpin + 'static, +{ + pub fn new((rd, wr): (R, W)) -> Self { + Self { + rd: rd.into(), + wr: wr.into(), + cancel_handle: Default::default(), + } + } + + pub fn into_inner(self) -> (R, W) { + (self.rd.into_inner(), self.wr.into_inner()) + } + + pub fn rd_borrow_mut(self: &Rc) -> AsyncMutFuture { + RcRef::map(self, |r| &r.rd).borrow_mut() + } + + pub fn wr_borrow_mut(self: &Rc) -> AsyncMutFuture { + RcRef::map(self, |r| &r.wr).borrow_mut() + } + + pub fn cancel_handle(self: &Rc) -> RcRef { + RcRef::map(self, |r| &r.cancel_handle) + } + + pub fn cancel_read_ops(&self) { + self.cancel_handle.cancel() + } + + pub async fn read( + self: &Rc, + buf: &mut [u8], + ) -> Result { + let mut rd = self.rd_borrow_mut().await; + let nread = rd.read(buf).try_or_cancel(self.cancel_handle()).await?; + Ok(nread) + } + + pub async fn write(self: &Rc, buf: &[u8]) -> Result { + let mut wr = self.wr_borrow_mut().await; + let nwritten = wr.write(buf).await?; + Ok(nwritten) + } + + pub async fn shutdown(self: &Rc) -> Result<(), AnyError> { + let mut wr = self.wr_borrow_mut().await; + wr.shutdown().await?; + Ok(()) + } +} + +pub type TcpStreamResource = + FullDuplexResource; + +impl Resource for TcpStreamResource { + fn name(&self) -> Cow { + "tcpStream".into() + } + + fn close(self: Rc) { + self.cancel_read_ops(); + } +} + +pub type TlsStreamResource = FullDuplexResource; + +impl Resource for TlsStreamResource { + fn name(&self) -> Cow { + "tlsStream".into() + } + + fn close(self: Rc) { + self.cancel_read_ops(); + } +} + +#[cfg(unix)] +pub type UnixStreamResource = + FullDuplexResource; + +#[cfg(not(unix))] +pub struct UnixStreamResource; + +#[cfg(not(unix))] +impl UnixStreamResource { + pub async fn read( + self: &Rc, + _buf: &mut [u8], + ) -> Result { + unreachable!() + } + pub async fn write(self: &Rc, _buf: &[u8]) -> Result { + unreachable!() + } + pub async fn shutdown(self: &Rc) -> Result<(), AnyError> { + unreachable!() + } + pub fn cancel_read_ops(&self) { + unreachable!() + } +} + +impl Resource for UnixStreamResource { + fn name(&self) -> Cow { + "unixStream".into() + } + + fn close(self: Rc) { + self.cancel_read_ops(); + } +} + +async fn op_read_async( + state: Rc>, + rid: ResourceId, + buf: Option, +) -> Result { + let buf = &mut buf.ok_or_else(null_opbuf)?; + let resource = state + .borrow() + .resource_table + .get_any(rid) + .ok_or_else(bad_resource_id)?; + let nread = if let Some(s) = resource.downcast_rc::() { + s.read(buf).await? + } else if let Some(s) = resource.downcast_rc::() { + s.read(buf).await? + } else if let Some(s) = resource.downcast_rc::() { + s.read(buf).await? + } else { + return Err(not_supported()); + }; + Ok(nread as u32) +} + +async fn op_write_async( + state: Rc>, + rid: ResourceId, + buf: Option, +) -> Result { + let buf = &buf.ok_or_else(null_opbuf)?; + let resource = state + .borrow() + .resource_table + .get_any(rid) + .ok_or_else(bad_resource_id)?; + let nwritten = if let Some(s) = resource.downcast_rc::() { + s.write(buf).await? + } else if let Some(s) = resource.downcast_rc::() { + s.write(buf).await? + } else if let Some(s) = resource.downcast_rc::() { + s.write(buf).await? + } else { + return Err(not_supported()); + }; + Ok(nwritten as u32) +} + +async fn op_shutdown( + state: Rc>, + rid: ResourceId, + _: (), +) -> Result<(), AnyError> { + let resource = state + .borrow() + .resource_table + .get_any(rid) + .ok_or_else(bad_resource_id)?; + if let Some(s) = resource.downcast_rc::() { + s.shutdown().await?; + } else if let Some(s) = resource.downcast_rc::() { + s.shutdown().await?; + } else if let Some(s) = resource.downcast_rc::() { + s.shutdown().await?; + } else { + return Err(not_supported()); + } + Ok(()) +} diff --git a/extensions/net/lib.deno_net.d.ts b/extensions/net/lib.deno_net.d.ts new file mode 100644 index 000000000..25397f960 --- /dev/null +++ b/extensions/net/lib.deno_net.d.ts @@ -0,0 +1,149 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. + +/// +/// + +declare namespace Deno { + export interface NetAddr { + transport: "tcp" | "udp"; + hostname: string; + port: number; + } + + export interface UnixAddr { + transport: "unix" | "unixpacket"; + path: string; + } + + export type Addr = NetAddr | UnixAddr; + + /** A generic network listener for stream-oriented protocols. */ + export interface Listener extends AsyncIterable { + /** Waits for and resolves to the next connection to the `Listener`. */ + accept(): Promise; + /** Close closes the listener. Any pending accept promises will be rejected + * with errors. */ + close(): void; + /** Return the address of the `Listener`. */ + readonly addr: Addr; + + /** Return the rid of the `Listener`. */ + readonly rid: number; + + [Symbol.asyncIterator](): AsyncIterableIterator; + } + + export interface Conn extends Reader, Writer, Closer { + /** The local address of the connection. */ + readonly localAddr: Addr; + /** The remote address of the connection. */ + readonly remoteAddr: Addr; + /** The resource ID of the connection. */ + readonly rid: number; + /** Shuts down (`shutdown(2)`) the write side of the connection. Most + * callers should just use `close()`. */ + closeWrite(): Promise; + } + + export interface ListenOptions { + /** The port to listen on. */ + port: number; + /** A literal IP address or host name that can be resolved to an IP address. + * If not specified, defaults to `0.0.0.0`. */ + hostname?: string; + } + + /** Listen announces on the local transport address. + * + * ```ts + * const listener1 = Deno.listen({ port: 80 }) + * const listener2 = Deno.listen({ hostname: "192.0.2.1", port: 80 }) + * const listener3 = Deno.listen({ hostname: "[2001:db8::1]", port: 80 }); + * const listener4 = Deno.listen({ hostname: "golang.org", port: 80, transport: "tcp" }); + * ``` + * + * Requires `allow-net` permission. */ + export function listen( + options: ListenOptions & { transport?: "tcp" }, + ): Listener; + + export interface ListenTlsOptions extends ListenOptions { + /** Server certificate file. */ + certFile: string; + /** Server public key file. */ + keyFile: string; + + transport?: "tcp"; + } + + /** Listen announces on the local transport address over TLS (transport layer + * security). + * + * ```ts + * const lstnr = Deno.listenTls({ port: 443, certFile: "./server.crt", keyFile: "./server.key" }); + * ``` + * + * Requires `allow-net` permission. */ + export function listenTls(options: ListenTlsOptions): Listener; + + export interface ConnectOptions { + /** The port to connect to. */ + port: number; + /** A literal IP address or host name that can be resolved to an IP address. + * If not specified, defaults to `127.0.0.1`. */ + hostname?: string; + transport?: "tcp"; + } + + /** + * Connects to the hostname (default is "127.0.0.1") and port on the named + * transport (default is "tcp"), and resolves to the connection (`Conn`). + * + * ```ts + * const conn1 = await Deno.connect({ port: 80 }); + * const conn2 = await Deno.connect({ hostname: "192.0.2.1", port: 80 }); + * const conn3 = await Deno.connect({ hostname: "[2001:db8::1]", port: 80 }); + * const conn4 = await Deno.connect({ hostname: "golang.org", port: 80, transport: "tcp" }); + * ``` + * + * Requires `allow-net` permission for "tcp". */ + export function connect(options: ConnectOptions): Promise; + + export interface ConnectTlsOptions { + /** The port to connect to. */ + port: number; + /** A literal IP address or host name that can be resolved to an IP address. + * If not specified, defaults to `127.0.0.1`. */ + hostname?: string; + /** Server certificate file. */ + certFile?: string; + } + + /** Establishes a secure connection over TLS (transport layer security) using + * an optional cert file, hostname (default is "127.0.0.1") and port. The + * cert file is optional and if not included Mozilla's root certificates will + * be used (see also https://github.com/ctz/webpki-roots for specifics) + * + * ```ts + * const conn1 = await Deno.connectTls({ port: 80 }); + * const conn2 = await Deno.connectTls({ certFile: "./certs/my_custom_root_CA.pem", hostname: "192.0.2.1", port: 80 }); + * const conn3 = await Deno.connectTls({ hostname: "[2001:db8::1]", port: 80 }); + * const conn4 = await Deno.connectTls({ certFile: "./certs/my_custom_root_CA.pem", hostname: "golang.org", port: 80}); + * ``` + * + * Requires `allow-net` permission. + */ + export function connectTls(options: ConnectTlsOptions): Promise; + + /** Shutdown socket send operations. + * + * Matches behavior of POSIX shutdown(3). + * + * ```ts + * const listener = Deno.listen({ port: 80 }); + * const conn = await listener.accept(); + * Deno.shutdown(conn.rid); + * ``` + */ + export function shutdown(rid: number): Promise; +} diff --git a/extensions/net/lib.deno_net.unstable.d.ts b/extensions/net/lib.deno_net.unstable.d.ts new file mode 100644 index 000000000..905a7acc1 --- /dev/null +++ b/extensions/net/lib.deno_net.unstable.d.ts @@ -0,0 +1,262 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. + +/// +/// + +declare namespace Deno { + /** The type of the resource record. + * Only the listed types are supported currently. */ + export type RecordType = + | "A" + | "AAAA" + | "ANAME" + | "CNAME" + | "MX" + | "PTR" + | "SRV" + | "TXT"; + + export interface ResolveDnsOptions { + /** The name server to be used for lookups. + * If not specified, defaults to the system configuration e.g. `/etc/resolv.conf` on Unix. */ + nameServer?: { + /** The IP address of the name server */ + ipAddr: string; + /** The port number the query will be sent to. + * If not specified, defaults to 53. */ + port?: number; + }; + } + + /** If `resolveDns` is called with "MX" record type specified, it will return an array of this interface. */ + export interface MXRecord { + preference: number; + exchange: string; + } + + /** If `resolveDns` is called with "SRV" record type specified, it will return an array of this interface. */ + export interface SRVRecord { + priority: number; + weight: number; + port: number; + target: string; + } + + export function resolveDns( + query: string, + recordType: "A" | "AAAA" | "ANAME" | "CNAME" | "PTR", + options?: ResolveDnsOptions, + ): Promise; + + export function resolveDns( + query: string, + recordType: "MX", + options?: ResolveDnsOptions, + ): Promise; + + export function resolveDns( + query: string, + recordType: "SRV", + options?: ResolveDnsOptions, + ): Promise; + + export function resolveDns( + query: string, + recordType: "TXT", + options?: ResolveDnsOptions, + ): Promise; + + /** ** UNSTABLE**: new API, yet to be vetted. +* +* Performs DNS resolution against the given query, returning resolved records. +* Fails in the cases such as: +* - the query is in invalid format +* - the options have an invalid parameter, e.g. `nameServer.port` is beyond the range of 16-bit unsigned integer +* - timed out +* +* ```ts +* const a = await Deno.resolveDns("example.com", "A"); +* +* const aaaa = await Deno.resolveDns("example.com", "AAAA", { +* nameServer: { ipAddr: "8.8.8.8", port: 1234 }, +* }); +* ``` +* +* Requires `allow-net` permission. + */ + export function resolveDns( + query: string, + recordType: RecordType, + options?: ResolveDnsOptions, + ): Promise; + + /** **UNSTABLE**: new API, yet to be vetted. +* +* A generic transport listener for message-oriented protocols. */ + export interface DatagramConn extends AsyncIterable<[Uint8Array, Addr]> { + /** **UNSTABLE**: new API, yet to be vetted. + * + * Waits for and resolves to the next message to the `UDPConn`. */ + receive(p?: Uint8Array): Promise<[Uint8Array, Addr]>; + /** UNSTABLE: new API, yet to be vetted. + * + * Sends a message to the target. */ + send(p: Uint8Array, addr: Addr): Promise; + /** UNSTABLE: new API, yet to be vetted. + * + * Close closes the socket. Any pending message promises will be rejected + * with errors. */ + close(): void; + /** Return the address of the `UDPConn`. */ + readonly addr: Addr; + [Symbol.asyncIterator](): AsyncIterableIterator<[Uint8Array, Addr]>; + } + + export interface UnixListenOptions { + /** A Path to the Unix Socket. */ + path: string; + } + + /** **UNSTABLE**: new API, yet to be vetted. +* +* Listen announces on the local transport address. +* +* ```ts +* const listener = Deno.listen({ path: "/foo/bar.sock", transport: "unix" }) +* ``` +* +* Requires `allow-read` and `allow-write` permission. */ + export function listen( + options: UnixListenOptions & { transport: "unix" }, + ): Listener; + + /** **UNSTABLE**: new API, yet to be vetted +* +* Listen announces on the local transport address. +* +* ```ts +* const listener1 = Deno.listenDatagram({ +* port: 80, +* transport: "udp" +* }); +* const listener2 = Deno.listenDatagram({ +* hostname: "golang.org", +* port: 80, +* transport: "udp" +* }); +* ``` +* +* Requires `allow-net` permission. */ + export function listenDatagram( + options: ListenOptions & { transport: "udp" }, + ): DatagramConn; + + /** **UNSTABLE**: new API, yet to be vetted +* +* Listen announces on the local transport address. +* +* ```ts +* const listener = Deno.listenDatagram({ +* path: "/foo/bar.sock", +* transport: "unixpacket" +* }); +* ``` +* +* Requires `allow-read` and `allow-write` permission. */ + export function listenDatagram( + options: UnixListenOptions & { transport: "unixpacket" }, + ): DatagramConn; + + export interface UnixConnectOptions { + transport: "unix"; + path: string; + } + + /** **UNSTABLE**: The unix socket transport is unstable as a new API yet to +* be vetted. The TCP transport is considered stable. +* +* Connects to the hostname (default is "127.0.0.1") and port on the named +* transport (default is "tcp"), and resolves to the connection (`Conn`). +* +* ```ts +* const conn1 = await Deno.connect({ port: 80 }); +* const conn2 = await Deno.connect({ hostname: "192.0.2.1", port: 80 }); +* const conn3 = await Deno.connect({ hostname: "[2001:db8::1]", port: 80 }); +* const conn4 = await Deno.connect({ hostname: "golang.org", port: 80, transport: "tcp" }); +* const conn5 = await Deno.connect({ path: "/foo/bar.sock", transport: "unix" }); +* ``` +* +* Requires `allow-net` permission for "tcp" and `allow-read` for "unix". */ + export function connect( + options: ConnectOptions | UnixConnectOptions, + ): Promise; + + export interface StartTlsOptions { + /** A literal IP address or host name that can be resolved to an IP address. + * If not specified, defaults to `127.0.0.1`. */ + hostname?: string; + /** Server certificate file. */ + certFile?: string; + } + + /** **UNSTABLE**: new API, yet to be vetted. +* +* Start TLS handshake from an existing connection using +* an optional cert file, hostname (default is "127.0.0.1"). The +* cert file is optional and if not included Mozilla's root certificates will +* be used (see also https://github.com/ctz/webpki-roots for specifics) +* Using this function requires that the other end of the connection is +* prepared for TLS handshake. +* +* ```ts +* const conn = await Deno.connect({ port: 80, hostname: "127.0.0.1" }); +* const tlsConn = await Deno.startTls(conn, { certFile: "./certs/my_custom_root_CA.pem", hostname: "localhost" }); +* ``` +* +* Requires `allow-net` permission. + */ + export function startTls( + conn: Conn, + options?: StartTlsOptions, + ): Promise; + + export interface ListenTlsOptions { + /** **UNSTABLE**: new API, yet to be vetted. + * + * Application-Layer Protocol Negotiation (ALPN) protocols to announce to + * the client. If not specified, no ALPN extension will be included in the + * TLS handshake. + */ + alpnProtocols?: string[]; + } + + export interface RequestEvent { + readonly request: Request; + respondWith(r: Response | Promise): Promise; + } + + export interface HttpConn extends AsyncIterable { + readonly rid: number; + + nextRequest(): Promise; + close(): void; + } + + /** **UNSTABLE**: new API, yet to be vetted. + * + * Services HTTP requests given a TCP or TLS socket. + * + * ```ts + * const conn = await Deno.connect({ port: 80, hostname: "127.0.0.1" }); + * const httpConn = Deno.serveHttp(conn); + * const e = await httpConn.nextRequest(); + * if (e) { + * e.respondWith(new Response("Hello World")); + * } + * ``` + * + * If `httpConn.nextRequest()` encounters an error or returns `null` + * then the underlying HttpConn resource is closed automatically. + */ + export function serveHttp(conn: Conn): HttpConn; +} diff --git a/extensions/net/lib.rs b/extensions/net/lib.rs new file mode 100644 index 000000000..d1e836fce --- /dev/null +++ b/extensions/net/lib.rs @@ -0,0 +1,113 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. + +pub mod io; +pub mod ops; +pub mod ops_http; +pub mod ops_tls; +#[cfg(unix)] +pub mod ops_unix; +pub mod resolve_addr; + +use deno_core::error::AnyError; +use deno_core::include_js_files; +use deno_core::Extension; +use deno_core::OpState; +use std::cell::RefCell; +use std::path::Path; +use std::path::PathBuf; +use std::rc::Rc; + +pub trait NetPermissions { + fn check_net>( + &mut self, + _host: &(T, Option), + ) -> Result<(), AnyError>; + fn check_read(&mut self, _p: &Path) -> Result<(), AnyError>; + fn check_write(&mut self, _p: &Path) -> Result<(), AnyError>; +} + +/// For use with this crate when the user does not want permission checks. +pub struct NoNetPermissions; + +impl NetPermissions for NoNetPermissions { + fn check_net>( + &mut self, + _host: &(T, Option), + ) -> Result<(), AnyError> { + Ok(()) + } + + fn check_read(&mut self, _p: &Path) -> Result<(), AnyError> { + Ok(()) + } + + fn check_write(&mut self, _p: &Path) -> Result<(), AnyError> { + Ok(()) + } +} + +/// `UnstableChecker` is a struct so it can be placed inside `GothamState`; +/// using type alias for a bool could work, but there's a high chance +/// that there might be another type alias pointing to a bool, which +/// would override previously used alias. +pub struct UnstableChecker { + pub unstable: bool, +} + +impl UnstableChecker { + /// Quits the process if the --unstable flag was not provided. + /// + /// This is intentionally a non-recoverable check so that people cannot probe + /// for unstable APIs from stable programs. + // NOTE(bartlomieju): keep in sync with `cli/program_state.rs` + pub fn check_unstable(&self, api_name: &str) { + if !self.unstable { + eprintln!( + "Unstable API '{}'. The --unstable flag must be provided.", + api_name + ); + std::process::exit(70); + } + } +} +/// Helper for checking unstable features. Used for sync ops. +pub fn check_unstable(state: &OpState, api_name: &str) { + state.borrow::().check_unstable(api_name) +} + +/// Helper for checking unstable features. Used for async ops. +pub fn check_unstable2(state: &Rc>, api_name: &str) { + let state = state.borrow(); + state.borrow::().check_unstable(api_name) +} + +pub fn get_declaration() -> PathBuf { + PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("lib.deno_net.d.ts") +} + +pub fn get_unstable_declaration() -> PathBuf { + PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("lib.deno_net.unstable.d.ts") +} + +pub fn init(unstable: bool) -> Extension { + let mut ops_to_register = vec![]; + ops_to_register.extend(io::init()); + ops_to_register.extend(ops::init::

()); + ops_to_register.extend(ops_tls::init::

()); + ops_to_register.extend(ops_http::init()); + + Extension::builder() + .js(include_js_files!( + prefix "deno:extensions/net", + "01_net.js", + "02_tls.js", + "03_http.js", + "04_net_unstable.js", + )) + .ops(ops_to_register) + .state(move |state| { + state.put(UnstableChecker { unstable }); + Ok(()) + }) + .build() +} diff --git a/extensions/net/ops.rs b/extensions/net/ops.rs new file mode 100644 index 000000000..a02bbf91a --- /dev/null +++ b/extensions/net/ops.rs @@ -0,0 +1,795 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. + +use crate::io::TcpStreamResource; +use crate::resolve_addr::resolve_addr; +use crate::resolve_addr::resolve_addr_sync; +use crate::NetPermissions; +use deno_core::error::bad_resource; +use deno_core::error::custom_error; +use deno_core::error::generic_error; +use deno_core::error::null_opbuf; +use deno_core::error::type_error; +use deno_core::error::AnyError; +use deno_core::op_async; +use deno_core::op_sync; +use deno_core::AsyncRefCell; +use deno_core::CancelHandle; +use deno_core::CancelTryFuture; +use deno_core::OpPair; +use deno_core::OpState; +use deno_core::RcRef; +use deno_core::Resource; +use deno_core::ResourceId; +use deno_core::ZeroCopyBuf; +use log::debug; +use serde::Deserialize; +use serde::Serialize; +use std::borrow::Cow; +use std::cell::RefCell; +use std::net::SocketAddr; +use std::rc::Rc; +use tokio::net::TcpListener; +use tokio::net::TcpStream; +use tokio::net::UdpSocket; +use trust_dns_proto::rr::record_data::RData; +use trust_dns_proto::rr::record_type::RecordType; +use trust_dns_resolver::config::NameServerConfigGroup; +use trust_dns_resolver::config::ResolverConfig; +use trust_dns_resolver::config::ResolverOpts; +use trust_dns_resolver::system_conf; +use trust_dns_resolver::AsyncResolver; + +#[cfg(unix)] +use super::ops_unix as net_unix; +#[cfg(unix)] +use crate::io::UnixStreamResource; +#[cfg(unix)] +use std::path::Path; + +pub fn init() -> Vec { + vec![ + ("op_accept", op_async(op_accept)), + ("op_connect", op_async(op_connect::

)), + ("op_listen", op_sync(op_listen::

)), + ("op_datagram_receive", op_async(op_datagram_receive)), + ("op_datagram_send", op_async(op_datagram_send::

)), + ("op_dns_resolve", op_async(op_dns_resolve::

)), + ] +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +pub struct OpConn { + pub rid: ResourceId, + pub remote_addr: Option, + pub local_addr: Option, +} + +#[derive(Serialize)] +#[serde(tag = "transport", rename_all = "lowercase")] +pub enum OpAddr { + Tcp(IpAddr), + Udp(IpAddr), + #[cfg(unix)] + Unix(net_unix::UnixAddr), + #[cfg(unix)] + UnixPacket(net_unix::UnixAddr), +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +/// A received datagram packet (from udp or unixpacket) +pub struct OpPacket { + pub size: usize, + pub remote_addr: OpAddr, +} + +#[derive(Serialize)] +pub struct IpAddr { + pub hostname: String, + pub port: u16, +} + +#[derive(Deserialize)] +pub(crate) struct AcceptArgs { + pub rid: ResourceId, + pub transport: String, +} + +async fn accept_tcp( + state: Rc>, + args: AcceptArgs, + _: (), +) -> Result { + let rid = args.rid; + + let resource = state + .borrow() + .resource_table + .get::(rid) + .ok_or_else(|| bad_resource("Listener has been closed"))?; + let listener = RcRef::map(&resource, |r| &r.listener) + .try_borrow_mut() + .ok_or_else(|| custom_error("Busy", "Another accept task is ongoing"))?; + let cancel = RcRef::map(resource, |r| &r.cancel); + let (tcp_stream, _socket_addr) = + listener.accept().try_or_cancel(cancel).await.map_err(|e| { + // FIXME(bartlomieju): compatibility with current JS implementation + if let std::io::ErrorKind::Interrupted = e.kind() { + bad_resource("Listener has been closed") + } else { + e.into() + } + })?; + let local_addr = tcp_stream.local_addr()?; + let remote_addr = tcp_stream.peer_addr()?; + + let mut state = state.borrow_mut(); + let rid = state + .resource_table + .add(TcpStreamResource::new(tcp_stream.into_split())); + Ok(OpConn { + rid, + local_addr: Some(OpAddr::Tcp(IpAddr { + hostname: local_addr.ip().to_string(), + port: local_addr.port(), + })), + remote_addr: Some(OpAddr::Tcp(IpAddr { + hostname: remote_addr.ip().to_string(), + port: remote_addr.port(), + })), + }) +} + +async fn op_accept( + state: Rc>, + args: AcceptArgs, + _: (), +) -> Result { + match args.transport.as_str() { + "tcp" => accept_tcp(state, args, ()).await, + #[cfg(unix)] + "unix" => net_unix::accept_unix(state, args, ()).await, + other => Err(bad_transport(other)), + } +} + +fn bad_transport(transport: &str) -> AnyError { + generic_error(format!("Unsupported transport protocol {}", transport)) +} + +#[derive(Deserialize)] +pub(crate) struct ReceiveArgs { + pub rid: ResourceId, + pub transport: String, +} + +async fn receive_udp( + state: Rc>, + args: ReceiveArgs, + zero_copy: Option, +) -> Result { + let zero_copy = zero_copy.ok_or_else(null_opbuf)?; + let mut zero_copy = zero_copy.clone(); + + let rid = args.rid; + + let resource = state + .borrow_mut() + .resource_table + .get::(rid) + .ok_or_else(|| bad_resource("Socket has been closed"))?; + let socket = RcRef::map(&resource, |r| &r.socket).borrow().await; + let cancel_handle = RcRef::map(&resource, |r| &r.cancel); + let (size, remote_addr) = socket + .recv_from(&mut zero_copy) + .try_or_cancel(cancel_handle) + .await?; + Ok(OpPacket { + size, + remote_addr: OpAddr::Udp(IpAddr { + hostname: remote_addr.ip().to_string(), + port: remote_addr.port(), + }), + }) +} + +async fn op_datagram_receive( + state: Rc>, + args: ReceiveArgs, + zero_copy: Option, +) -> Result { + match args.transport.as_str() { + "udp" => receive_udp(state, args, zero_copy).await, + #[cfg(unix)] + "unixpacket" => net_unix::receive_unix_packet(state, args, zero_copy).await, + other => Err(bad_transport(other)), + } +} + +#[derive(Deserialize)] +struct SendArgs { + rid: ResourceId, + transport: String, + #[serde(flatten)] + transport_args: ArgsEnum, +} + +async fn op_datagram_send( + state: Rc>, + args: SendArgs, + zero_copy: Option, +) -> Result +where + NP: NetPermissions + 'static, +{ + let zero_copy = zero_copy.ok_or_else(null_opbuf)?; + let zero_copy = zero_copy.clone(); + + match args { + SendArgs { + rid, + transport, + transport_args: ArgsEnum::Ip(args), + } if transport == "udp" => { + { + let mut s = state.borrow_mut(); + s.borrow_mut::() + .check_net(&(&args.hostname, Some(args.port)))?; + } + let addr = resolve_addr(&args.hostname, args.port) + .await? + .next() + .ok_or_else(|| generic_error("No resolved address found"))?; + + let resource = state + .borrow_mut() + .resource_table + .get::(rid) + .ok_or_else(|| bad_resource("Socket has been closed"))?; + let socket = RcRef::map(&resource, |r| &r.socket).borrow().await; + let byte_length = socket.send_to(&zero_copy, &addr).await?; + Ok(byte_length) + } + #[cfg(unix)] + SendArgs { + rid, + transport, + transport_args: ArgsEnum::Unix(args), + } if transport == "unixpacket" => { + let address_path = Path::new(&args.path); + { + let mut s = state.borrow_mut(); + s.borrow_mut::().check_write(&address_path)?; + } + let resource = state + .borrow() + .resource_table + .get::(rid) + .ok_or_else(|| { + custom_error("NotConnected", "Socket has been closed") + })?; + let socket = RcRef::map(&resource, |r| &r.socket) + .try_borrow_mut() + .ok_or_else(|| custom_error("Busy", "Socket already in use"))?; + let byte_length = socket.send_to(&zero_copy, address_path).await?; + Ok(byte_length) + } + _ => Err(type_error("Wrong argument format!")), + } +} + +#[derive(Deserialize)] +struct ConnectArgs { + transport: String, + #[serde(flatten)] + transport_args: ArgsEnum, +} + +async fn op_connect( + state: Rc>, + args: ConnectArgs, + _: (), +) -> Result +where + NP: NetPermissions + 'static, +{ + match args { + ConnectArgs { + transport, + transport_args: ArgsEnum::Ip(args), + } if transport == "tcp" => { + { + let mut state_ = state.borrow_mut(); + state_ + .borrow_mut::() + .check_net(&(&args.hostname, Some(args.port)))?; + } + let addr = resolve_addr(&args.hostname, args.port) + .await? + .next() + .ok_or_else(|| generic_error("No resolved address found"))?; + let tcp_stream = TcpStream::connect(&addr).await?; + let local_addr = tcp_stream.local_addr()?; + let remote_addr = tcp_stream.peer_addr()?; + + let mut state_ = state.borrow_mut(); + let rid = state_ + .resource_table + .add(TcpStreamResource::new(tcp_stream.into_split())); + Ok(OpConn { + rid, + local_addr: Some(OpAddr::Tcp(IpAddr { + hostname: local_addr.ip().to_string(), + port: local_addr.port(), + })), + remote_addr: Some(OpAddr::Tcp(IpAddr { + hostname: remote_addr.ip().to_string(), + port: remote_addr.port(), + })), + }) + } + #[cfg(unix)] + ConnectArgs { + transport, + transport_args: ArgsEnum::Unix(args), + } if transport == "unix" => { + let address_path = Path::new(&args.path); + super::check_unstable2(&state, "Deno.connect"); + { + let mut state_ = state.borrow_mut(); + state_.borrow_mut::().check_read(&address_path)?; + state_.borrow_mut::().check_write(&address_path)?; + } + let path = args.path; + let unix_stream = net_unix::UnixStream::connect(Path::new(&path)).await?; + let local_addr = unix_stream.local_addr()?; + let remote_addr = unix_stream.peer_addr()?; + + let mut state_ = state.borrow_mut(); + let resource = UnixStreamResource::new(unix_stream.into_split()); + let rid = state_.resource_table.add(resource); + Ok(OpConn { + rid, + local_addr: Some(OpAddr::Unix(net_unix::UnixAddr { + path: local_addr.as_pathname().and_then(net_unix::pathstring), + })), + remote_addr: Some(OpAddr::Unix(net_unix::UnixAddr { + path: remote_addr.as_pathname().and_then(net_unix::pathstring), + })), + }) + } + _ => Err(type_error("Wrong argument format!")), + } +} + +pub struct TcpListenerResource { + pub listener: AsyncRefCell, + pub cancel: CancelHandle, +} + +impl Resource for TcpListenerResource { + fn name(&self) -> Cow { + "tcpListener".into() + } + + fn close(self: Rc) { + self.cancel.cancel(); + } +} + +struct UdpSocketResource { + socket: AsyncRefCell, + cancel: CancelHandle, +} + +impl Resource for UdpSocketResource { + fn name(&self) -> Cow { + "udpSocket".into() + } + + fn close(self: Rc) { + self.cancel.cancel() + } +} + +#[derive(Deserialize)] +struct IpListenArgs { + hostname: String, + port: u16, +} + +#[derive(Deserialize)] +#[serde(untagged)] +enum ArgsEnum { + Ip(IpListenArgs), + #[cfg(unix)] + Unix(net_unix::UnixListenArgs), +} + +#[derive(Deserialize)] +struct ListenArgs { + transport: String, + #[serde(flatten)] + transport_args: ArgsEnum, +} + +fn listen_tcp( + state: &mut OpState, + addr: SocketAddr, +) -> Result<(u32, SocketAddr), AnyError> { + let std_listener = std::net::TcpListener::bind(&addr)?; + std_listener.set_nonblocking(true)?; + let listener = TcpListener::from_std(std_listener)?; + let local_addr = listener.local_addr()?; + let listener_resource = TcpListenerResource { + listener: AsyncRefCell::new(listener), + cancel: Default::default(), + }; + let rid = state.resource_table.add(listener_resource); + + Ok((rid, local_addr)) +} + +fn listen_udp( + state: &mut OpState, + addr: SocketAddr, +) -> Result<(u32, SocketAddr), AnyError> { + let std_socket = std::net::UdpSocket::bind(&addr)?; + std_socket.set_nonblocking(true)?; + let socket = UdpSocket::from_std(std_socket)?; + let local_addr = socket.local_addr()?; + let socket_resource = UdpSocketResource { + socket: AsyncRefCell::new(socket), + cancel: Default::default(), + }; + let rid = state.resource_table.add(socket_resource); + + Ok((rid, local_addr)) +} + +fn op_listen( + state: &mut OpState, + args: ListenArgs, + _: (), +) -> Result +where + NP: NetPermissions + 'static, +{ + match args { + ListenArgs { + transport, + transport_args: ArgsEnum::Ip(args), + } => { + { + if transport == "udp" { + super::check_unstable(state, "Deno.listenDatagram"); + } + state + .borrow_mut::() + .check_net(&(&args.hostname, Some(args.port)))?; + } + let addr = resolve_addr_sync(&args.hostname, args.port)? + .next() + .ok_or_else(|| generic_error("No resolved address found"))?; + let (rid, local_addr) = if transport == "tcp" { + listen_tcp(state, addr)? + } else { + listen_udp(state, addr)? + }; + debug!( + "New listener {} {}:{}", + rid, + local_addr.ip().to_string(), + local_addr.port() + ); + let ip_addr = IpAddr { + hostname: local_addr.ip().to_string(), + port: local_addr.port(), + }; + Ok(OpConn { + rid, + local_addr: Some(match transport.as_str() { + "udp" => OpAddr::Udp(ip_addr), + "tcp" => OpAddr::Tcp(ip_addr), + // NOTE: This could be unreachable!() + other => return Err(bad_transport(other)), + }), + remote_addr: None, + }) + } + #[cfg(unix)] + ListenArgs { + transport, + transport_args: ArgsEnum::Unix(args), + } if transport == "unix" || transport == "unixpacket" => { + let address_path = Path::new(&args.path); + { + if transport == "unix" { + super::check_unstable(state, "Deno.listen"); + } + if transport == "unixpacket" { + super::check_unstable(state, "Deno.listenDatagram"); + } + let permissions = state.borrow_mut::(); + permissions.check_read(&address_path)?; + permissions.check_write(&address_path)?; + } + let (rid, local_addr) = if transport == "unix" { + net_unix::listen_unix(state, &address_path)? + } else { + net_unix::listen_unix_packet(state, &address_path)? + }; + debug!( + "New listener {} {}", + rid, + local_addr.as_pathname().unwrap().display(), + ); + let unix_addr = net_unix::UnixAddr { + path: local_addr.as_pathname().and_then(net_unix::pathstring), + }; + + Ok(OpConn { + rid, + local_addr: Some(match transport.as_str() { + "unix" => OpAddr::Unix(unix_addr), + "unixpacket" => OpAddr::UnixPacket(unix_addr), + other => return Err(bad_transport(other)), + }), + remote_addr: None, + }) + } + #[cfg(unix)] + _ => Err(type_error("Wrong argument format!")), + } +} + +#[derive(Serialize, PartialEq, Debug)] +#[serde(untagged)] +enum DnsReturnRecord { + A(String), + Aaaa(String), + Aname(String), + Cname(String), + Mx { + preference: u16, + exchange: String, + }, + Ptr(String), + Srv { + priority: u16, + weight: u16, + port: u16, + target: String, + }, + Txt(Vec), +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ResolveAddrArgs { + query: String, + record_type: RecordType, + options: Option, +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ResolveDnsOption { + name_server: Option, +} + +fn default_port() -> u16 { + 53 +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct NameServer { + ip_addr: String, + #[serde(default = "default_port")] + port: u16, +} + +async fn op_dns_resolve( + state: Rc>, + args: ResolveAddrArgs, + _: (), +) -> Result, AnyError> +where + NP: NetPermissions + 'static, +{ + let ResolveAddrArgs { + query, + record_type, + options, + } = args; + + let (config, opts) = if let Some(name_server) = + options.as_ref().and_then(|o| o.name_server.as_ref()) + { + let group = NameServerConfigGroup::from_ips_clear( + &[name_server.ip_addr.parse()?], + name_server.port, + true, + ); + ( + ResolverConfig::from_parts(None, vec![], group), + ResolverOpts::default(), + ) + } else { + system_conf::read_system_conf()? + }; + + { + let mut s = state.borrow_mut(); + let perm = s.borrow_mut::(); + + // Checks permission against the name servers which will be actually queried. + for ns in config.name_servers() { + let socker_addr = &ns.socket_addr; + let ip = socker_addr.ip().to_string(); + let port = socker_addr.port(); + perm.check_net(&(ip, Some(port)))?; + } + } + + let resolver = AsyncResolver::tokio(config, opts)?; + + let results = resolver + .lookup(query, record_type, Default::default()) + .await + .map_err(|e| generic_error(format!("{}", e)))? + .iter() + .filter_map(rdata_to_return_record(record_type)) + .collect(); + + Ok(results) +} + +fn rdata_to_return_record( + ty: RecordType, +) -> impl Fn(&RData) -> Option { + use RecordType::*; + move |r: &RData| -> Option { + match ty { + A => r.as_a().map(ToString::to_string).map(DnsReturnRecord::A), + AAAA => r + .as_aaaa() + .map(ToString::to_string) + .map(DnsReturnRecord::Aaaa), + ANAME => r + .as_aname() + .map(ToString::to_string) + .map(DnsReturnRecord::Aname), + CNAME => r + .as_cname() + .map(ToString::to_string) + .map(DnsReturnRecord::Cname), + MX => r.as_mx().map(|mx| DnsReturnRecord::Mx { + preference: mx.preference(), + exchange: mx.exchange().to_string(), + }), + PTR => r + .as_ptr() + .map(ToString::to_string) + .map(DnsReturnRecord::Ptr), + SRV => r.as_srv().map(|srv| DnsReturnRecord::Srv { + priority: srv.priority(), + weight: srv.weight(), + port: srv.port(), + target: srv.target().to_string(), + }), + TXT => r.as_txt().map(|txt| { + let texts: Vec = txt + .iter() + .map(|bytes| { + // Tries to parse these bytes as Latin-1 + bytes.iter().map(|&b| b as char).collect::() + }) + .collect(); + DnsReturnRecord::Txt(texts) + }), + // TODO(magurotuna): Other record types are not supported + _ => todo!(), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::net::Ipv4Addr; + use std::net::Ipv6Addr; + use trust_dns_proto::rr::rdata::mx::MX; + use trust_dns_proto::rr::rdata::srv::SRV; + use trust_dns_proto::rr::rdata::txt::TXT; + use trust_dns_proto::rr::record_data::RData; + use trust_dns_proto::rr::Name; + + #[test] + fn rdata_to_return_record_a() { + let func = rdata_to_return_record(RecordType::A); + let rdata = RData::A(Ipv4Addr::new(127, 0, 0, 1)); + assert_eq!( + func(&rdata), + Some(DnsReturnRecord::A("127.0.0.1".to_string())) + ); + } + + #[test] + fn rdata_to_return_record_aaaa() { + let func = rdata_to_return_record(RecordType::AAAA); + let rdata = RData::AAAA(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)); + assert_eq!(func(&rdata), Some(DnsReturnRecord::Aaaa("::1".to_string()))); + } + + #[test] + fn rdata_to_return_record_aname() { + let func = rdata_to_return_record(RecordType::ANAME); + let rdata = RData::ANAME(Name::new()); + assert_eq!(func(&rdata), Some(DnsReturnRecord::Aname("".to_string()))); + } + + #[test] + fn rdata_to_return_record_cname() { + let func = rdata_to_return_record(RecordType::CNAME); + let rdata = RData::CNAME(Name::new()); + assert_eq!(func(&rdata), Some(DnsReturnRecord::Cname("".to_string()))); + } + + #[test] + fn rdata_to_return_record_mx() { + let func = rdata_to_return_record(RecordType::MX); + let rdata = RData::MX(MX::new(10, Name::new())); + assert_eq!( + func(&rdata), + Some(DnsReturnRecord::Mx { + preference: 10, + exchange: "".to_string() + }) + ); + } + + #[test] + fn rdata_to_return_record_ptr() { + let func = rdata_to_return_record(RecordType::PTR); + let rdata = RData::PTR(Name::new()); + assert_eq!(func(&rdata), Some(DnsReturnRecord::Ptr("".to_string()))); + } + + #[test] + fn rdata_to_return_record_srv() { + let func = rdata_to_return_record(RecordType::SRV); + let rdata = RData::SRV(SRV::new(1, 2, 3, Name::new())); + assert_eq!( + func(&rdata), + Some(DnsReturnRecord::Srv { + priority: 1, + weight: 2, + port: 3, + target: "".to_string() + }) + ); + } + + #[test] + fn rdata_to_return_record_txt() { + let func = rdata_to_return_record(RecordType::TXT); + let rdata = RData::TXT(TXT::from_bytes(vec![ + "foo".as_bytes(), + "bar".as_bytes(), + &[0xa3], // "£" in Latin-1 + &[0xe3, 0x81, 0x82], // "あ" in UTF-8 + ])); + assert_eq!( + func(&rdata), + Some(DnsReturnRecord::Txt(vec![ + "foo".to_string(), + "bar".to_string(), + "£".to_string(), + "ã\u{81}\u{82}".to_string(), + ])) + ); + } +} diff --git a/extensions/net/ops_http.rs b/extensions/net/ops_http.rs new file mode 100644 index 000000000..54e06c3a7 --- /dev/null +++ b/extensions/net/ops_http.rs @@ -0,0 +1,577 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. + +use crate::io::TcpStreamResource; +use crate::io::TlsStreamResource; +use crate::ops_tls::TlsStream; +use deno_core::error::bad_resource_id; +use deno_core::error::null_opbuf; +use deno_core::error::type_error; +use deno_core::error::AnyError; +use deno_core::futures::future::poll_fn; +use deno_core::futures::FutureExt; +use deno_core::futures::Stream; +use deno_core::futures::StreamExt; +use deno_core::op_async; +use deno_core::op_sync; +use deno_core::AsyncRefCell; +use deno_core::ByteString; +use deno_core::CancelHandle; +use deno_core::CancelTryFuture; +use deno_core::OpPair; +use deno_core::OpState; +use deno_core::RcRef; +use deno_core::Resource; +use deno_core::ResourceId; +use deno_core::ZeroCopyBuf; +use hyper::body::HttpBody; +use hyper::http; +use hyper::server::conn::Connection; +use hyper::server::conn::Http; +use hyper::service::Service as HyperService; +use hyper::Body; +use hyper::Request; +use hyper::Response; +use serde::Deserialize; +use serde::Serialize; +use std::borrow::Cow; +use std::cell::RefCell; +use std::future::Future; +use std::net::SocketAddr; +use std::pin::Pin; +use std::rc::Rc; +use std::task::Context; +use std::task::Poll; +use tokio::io::AsyncReadExt; +use tokio::net::TcpStream; +use tokio::sync::oneshot; +use tokio_util::io::StreamReader; + +pub fn init() -> Vec { + vec![ + ("op_http_start", op_sync(op_http_start)), + ("op_http_request_next", op_async(op_http_request_next)), + ("op_http_request_read", op_async(op_http_request_read)), + ("op_http_response", op_async(op_http_response)), + ("op_http_response_write", op_async(op_http_response_write)), + ("op_http_response_close", op_async(op_http_response_close)), + ] +} + +struct ServiceInner { + request: Request, + response_tx: oneshot::Sender>, +} + +#[derive(Clone, Default)] +struct Service { + inner: Rc>>, + waker: Rc, +} + +impl HyperService> for Service { + type Response = Response; + type Error = http::Error; + #[allow(clippy::type_complexity)] + type Future = + Pin>>>; + + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { + if self.inner.borrow().is_some() { + Poll::Pending + } else { + Poll::Ready(Ok(())) + } + } + + fn call(&mut self, req: Request) -> Self::Future { + let (resp_tx, resp_rx) = oneshot::channel(); + self.inner.borrow_mut().replace(ServiceInner { + request: req, + response_tx: resp_tx, + }); + + async move { Ok(resp_rx.await.unwrap()) }.boxed_local() + } +} + +enum ConnType { + Tcp(Rc>>), + Tls(Rc>>), +} + +struct ConnResource { + hyper_connection: ConnType, + deno_service: Service, + addr: SocketAddr, + cancel: CancelHandle, +} + +impl ConnResource { + // TODO(ry) impl Future for ConnResource? + fn poll(&self, cx: &mut Context<'_>) -> Poll> { + match &self.hyper_connection { + ConnType::Tcp(c) => c.borrow_mut().poll_unpin(cx), + ConnType::Tls(c) => c.borrow_mut().poll_unpin(cx), + } + .map_err(AnyError::from) + } +} + +impl Resource for ConnResource { + fn name(&self) -> Cow { + "httpConnection".into() + } + + fn close(self: Rc) { + self.cancel.cancel() + } +} + +// We use a tuple instead of struct to avoid serialization overhead of the keys. +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +struct NextRequestResponse( + // request_body_rid: + Option, + // response_sender_rid: + ResourceId, + // method: + // This is a String rather than a ByteString because reqwest will only return + // the method as a str which is guaranteed to be ASCII-only. + String, + // headers: + Vec<(ByteString, ByteString)>, + // url: + String, +); + +async fn op_http_request_next( + state: Rc>, + conn_rid: ResourceId, + _: (), +) -> Result, AnyError> { + let conn_resource = state + .borrow() + .resource_table + .get::(conn_rid) + .ok_or_else(bad_resource_id)?; + + let cancel = RcRef::map(conn_resource.clone(), |r| &r.cancel); + + poll_fn(|cx| { + conn_resource.deno_service.waker.register(cx.waker()); + let connection_closed = match conn_resource.poll(cx) { + Poll::Pending => false, + Poll::Ready(Ok(())) => { + // try to close ConnResource, but don't unwrap as it might + // already be closed + let _ = state + .borrow_mut() + .resource_table + .take::(conn_rid); + true + } + Poll::Ready(Err(e)) => { + // TODO(ry) close RequestResource associated with connection + // TODO(ry) close ResponseBodyResource associated with connection + // close ConnResource + state + .borrow_mut() + .resource_table + .take::(conn_rid) + .unwrap(); + + if should_ignore_error(&e) { + true + } else { + return Poll::Ready(Err(e)); + } + } + }; + if let Some(request_resource) = + conn_resource.deno_service.inner.borrow_mut().take() + { + let tx = request_resource.response_tx; + let req = request_resource.request; + let method = req.method().to_string(); + + let mut headers = Vec::with_capacity(req.headers().len()); + for (name, value) in req.headers().iter() { + let name: &[u8] = name.as_ref(); + let value = value.as_bytes(); + headers + .push((ByteString(name.to_owned()), ByteString(value.to_owned()))); + } + + let url = { + let scheme = { + match conn_resource.hyper_connection { + ConnType::Tcp(_) => "http", + ConnType::Tls(_) => "https", + } + }; + let host: Cow = if let Some(host) = req.uri().host() { + Cow::Borrowed(host) + } else if let Some(host) = req.headers().get("HOST") { + Cow::Borrowed(host.to_str()?) + } else { + Cow::Owned(conn_resource.addr.to_string()) + }; + let path = req.uri().path_and_query().map_or("/", |p| p.as_str()); + format!("{}://{}{}", scheme, host, path) + }; + + let has_body = if let Some(exact_size) = req.size_hint().exact() { + exact_size > 0 + } else { + true + }; + + let maybe_request_body_rid = if has_body { + let stream: BytesStream = Box::pin(req.into_body().map(|r| { + r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)) + })); + let stream_reader = StreamReader::new(stream); + let mut state = state.borrow_mut(); + let request_body_rid = state.resource_table.add(RequestBodyResource { + conn_rid, + reader: AsyncRefCell::new(stream_reader), + cancel: CancelHandle::default(), + }); + Some(request_body_rid) + } else { + None + }; + + let mut state = state.borrow_mut(); + let response_sender_rid = + state.resource_table.add(ResponseSenderResource { + sender: tx, + conn_rid, + }); + + Poll::Ready(Ok(Some(NextRequestResponse( + maybe_request_body_rid, + response_sender_rid, + method, + headers, + url, + )))) + } else if connection_closed { + Poll::Ready(Ok(None)) + } else { + Poll::Pending + } + }) + .try_or_cancel(cancel) + .await + .map_err(AnyError::from) +} + +fn should_ignore_error(e: &AnyError) -> bool { + if let Some(e) = e.downcast_ref::() { + use std::error::Error; + if let Some(std_err) = e.source() { + if let Some(io_err) = std_err.downcast_ref::() { + if io_err.kind() == std::io::ErrorKind::NotConnected { + return true; + } + } + } + } + false +} + +fn op_http_start( + state: &mut OpState, + tcp_stream_rid: ResourceId, + _: (), +) -> Result { + let deno_service = Service::default(); + + if let Some(resource_rc) = state + .resource_table + .take::(tcp_stream_rid) + { + let resource = Rc::try_unwrap(resource_rc) + .expect("Only a single use of this resource should happen"); + let (read_half, write_half) = resource.into_inner(); + let tcp_stream = read_half.reunite(write_half)?; + let addr = tcp_stream.local_addr()?; + let hyper_connection = Http::new() + .with_executor(LocalExecutor) + .serve_connection(tcp_stream, deno_service.clone()); + let conn_resource = ConnResource { + hyper_connection: ConnType::Tcp(Rc::new(RefCell::new(hyper_connection))), + deno_service, + addr, + cancel: CancelHandle::default(), + }; + let rid = state.resource_table.add(conn_resource); + return Ok(rid); + } + + if let Some(resource_rc) = state + .resource_table + .take::(tcp_stream_rid) + { + let resource = Rc::try_unwrap(resource_rc) + .expect("Only a single use of this resource should happen"); + let (read_half, write_half) = resource.into_inner(); + let tls_stream = read_half.reunite(write_half); + let addr = tls_stream.get_ref().0.local_addr()?; + + let hyper_connection = Http::new() + .with_executor(LocalExecutor) + .serve_connection(tls_stream, deno_service.clone()); + let conn_resource = ConnResource { + hyper_connection: ConnType::Tls(Rc::new(RefCell::new(hyper_connection))), + deno_service, + addr, + cancel: CancelHandle::default(), + }; + let rid = state.resource_table.add(conn_resource); + return Ok(rid); + } + + Err(bad_resource_id()) +} + +// We use a tuple instead of struct to avoid serialization overhead of the keys. +#[derive(Deserialize)] +struct RespondArgs( + // rid: + u32, + // status: + u16, + // headers: + Vec<(ByteString, ByteString)>, +); + +async fn op_http_response( + state: Rc>, + args: RespondArgs, + data: Option, +) -> Result, AnyError> { + let RespondArgs(rid, status, headers) = args; + + let response_sender = state + .borrow_mut() + .resource_table + .take::(rid) + .ok_or_else(bad_resource_id)?; + let response_sender = Rc::try_unwrap(response_sender) + .ok() + .expect("multiple op_http_respond ongoing"); + + let conn_resource = state + .borrow() + .resource_table + .get::(response_sender.conn_rid) + .ok_or_else(bad_resource_id)?; + + let mut builder = Response::builder().status(status); + + builder.headers_mut().unwrap().reserve(headers.len()); + for (key, value) in &headers { + builder = builder.header(key.as_ref(), value.as_ref()); + } + + let res; + let maybe_response_body_rid = if let Some(d) = data { + // If a body is passed, we use it, and don't return a body for streaming. + res = builder.body(Vec::from(&*d).into())?; + None + } else { + // If no body is passed, we return a writer for streaming the body. + let (sender, body) = Body::channel(); + res = builder.body(body)?; + + let response_body_rid = + state.borrow_mut().resource_table.add(ResponseBodyResource { + body: AsyncRefCell::new(sender), + conn_rid: response_sender.conn_rid, + }); + + Some(response_body_rid) + }; + + // oneshot::Sender::send(v) returns |v| on error, not an error object. + // The only failure mode is the receiver already having dropped its end + // of the channel. + if response_sender.sender.send(res).is_err() { + return Err(type_error("internal communication error")); + } + + poll_fn(|cx| match conn_resource.poll(cx) { + Poll::Ready(x) => Poll::Ready(x), + Poll::Pending => Poll::Ready(Ok(())), + }) + .await?; + + if maybe_response_body_rid.is_none() { + conn_resource.deno_service.waker.wake(); + } + Ok(maybe_response_body_rid) +} + +async fn op_http_response_close( + state: Rc>, + rid: ResourceId, + _: (), +) -> Result<(), AnyError> { + let resource = state + .borrow_mut() + .resource_table + .take::(rid) + .ok_or_else(bad_resource_id)?; + + let conn_resource = state + .borrow() + .resource_table + .get::(resource.conn_rid) + .ok_or_else(bad_resource_id)?; + drop(resource); + + let r = poll_fn(|cx| match conn_resource.poll(cx) { + Poll::Ready(x) => Poll::Ready(x), + Poll::Pending => Poll::Ready(Ok(())), + }) + .await; + conn_resource.deno_service.waker.wake(); + r +} + +async fn op_http_request_read( + state: Rc>, + rid: ResourceId, + data: Option, +) -> Result { + let mut data = data.ok_or_else(null_opbuf)?; + + let resource = state + .borrow() + .resource_table + .get::(rid as u32) + .ok_or_else(bad_resource_id)?; + + let conn_resource = state + .borrow() + .resource_table + .get::(resource.conn_rid) + .ok_or_else(bad_resource_id)?; + + let mut reader = RcRef::map(&resource, |r| &r.reader).borrow_mut().await; + let cancel = RcRef::map(resource, |r| &r.cancel); + let mut read_fut = reader.read(&mut data).try_or_cancel(cancel).boxed_local(); + + poll_fn(|cx| { + if let Poll::Ready(Err(e)) = conn_resource.poll(cx) { + // close ConnResource + // close RequestResource associated with connection + // close ResponseBodyResource associated with connection + return Poll::Ready(Err(e)); + } + + read_fut.poll_unpin(cx).map_err(AnyError::from) + }) + .await +} + +async fn op_http_response_write( + state: Rc>, + rid: ResourceId, + data: Option, +) -> Result<(), AnyError> { + let buf = data.ok_or_else(null_opbuf)?; + let resource = state + .borrow() + .resource_table + .get::(rid as u32) + .ok_or_else(bad_resource_id)?; + + let conn_resource = state + .borrow() + .resource_table + .get::(resource.conn_rid) + .ok_or_else(bad_resource_id)?; + + let mut body = RcRef::map(&resource, |r| &r.body).borrow_mut().await; + + let mut send_data_fut = body.send_data(Vec::from(&*buf).into()).boxed_local(); + + poll_fn(|cx| { + let r = send_data_fut.poll_unpin(cx).map_err(AnyError::from); + + // Poll connection so the data is flushed + if let Poll::Ready(Err(e)) = conn_resource.poll(cx) { + // close ConnResource + // close RequestResource associated with connection + // close ResponseBodyResource associated with connection + return Poll::Ready(Err(e)); + } + + r + }) + .await?; + + Ok(()) +} + +type BytesStream = + Pin> + Unpin>>; + +struct RequestBodyResource { + conn_rid: ResourceId, + reader: AsyncRefCell>, + cancel: CancelHandle, +} + +impl Resource for RequestBodyResource { + fn name(&self) -> Cow { + "requestBody".into() + } + + fn close(self: Rc) { + self.cancel.cancel() + } +} + +struct ResponseSenderResource { + sender: oneshot::Sender>, + conn_rid: ResourceId, +} + +impl Resource for ResponseSenderResource { + fn name(&self) -> Cow { + "responseSender".into() + } +} + +struct ResponseBodyResource { + body: AsyncRefCell, + conn_rid: ResourceId, +} + +impl Resource for ResponseBodyResource { + fn name(&self) -> Cow { + "responseBody".into() + } +} + +// Needed so hyper can use non Send futures +#[derive(Clone)] +struct LocalExecutor; + +impl hyper::rt::Executor for LocalExecutor +where + Fut: Future + 'static, + Fut::Output: 'static, +{ + fn execute(&self, fut: Fut) { + tokio::task::spawn_local(fut); + } +} diff --git a/extensions/net/ops_tls.rs b/extensions/net/ops_tls.rs new file mode 100644 index 000000000..701c5d1a1 --- /dev/null +++ b/extensions/net/ops_tls.rs @@ -0,0 +1,1024 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. + +pub use rustls; +pub use webpki; + +use crate::io::TcpStreamResource; +use crate::io::TlsStreamResource; +use crate::ops::IpAddr; +use crate::ops::OpAddr; +use crate::ops::OpConn; +use crate::resolve_addr::resolve_addr; +use crate::resolve_addr::resolve_addr_sync; +use crate::NetPermissions; +use deno_core::error::bad_resource; +use deno_core::error::bad_resource_id; +use deno_core::error::custom_error; +use deno_core::error::generic_error; +use deno_core::error::invalid_hostname; +use deno_core::error::AnyError; +use deno_core::futures::future::poll_fn; +use deno_core::futures::ready; +use deno_core::futures::task::noop_waker_ref; +use deno_core::futures::task::AtomicWaker; +use deno_core::futures::task::Context; +use deno_core::futures::task::Poll; +use deno_core::futures::task::RawWaker; +use deno_core::futures::task::RawWakerVTable; +use deno_core::futures::task::Waker; +use deno_core::op_async; +use deno_core::op_sync; +use deno_core::AsyncRefCell; +use deno_core::CancelHandle; +use deno_core::CancelTryFuture; +use deno_core::OpPair; +use deno_core::OpState; +use deno_core::RcRef; +use deno_core::Resource; +use deno_core::ResourceId; +use io::Error; +use io::Read; +use io::Write; +use rustls::internal::pemfile::certs; +use rustls::internal::pemfile::pkcs8_private_keys; +use rustls::internal::pemfile::rsa_private_keys; +use rustls::Certificate; +use rustls::ClientConfig; +use rustls::ClientSession; +use rustls::NoClientAuth; +use rustls::PrivateKey; +use rustls::ServerConfig; +use rustls::ServerSession; +use rustls::Session; +use rustls::StoresClientSessions; +use serde::Deserialize; +use std::borrow::Cow; +use std::cell::RefCell; +use std::collections::HashMap; +use std::convert::From; +use std::fs::File; +use std::io; +use std::io::BufReader; +use std::io::ErrorKind; +use std::ops::Deref; +use std::ops::DerefMut; +use std::path::Path; +use std::pin::Pin; +use std::rc::Rc; +use std::sync::Arc; +use std::sync::Mutex; +use std::sync::Weak; +use tokio::io::AsyncRead; +use tokio::io::AsyncWrite; +use tokio::io::ReadBuf; +use tokio::net::TcpListener; +use tokio::net::TcpStream; +use tokio::task::spawn_local; +use webpki::DNSNameRef; + +lazy_static::lazy_static! { + static ref CLIENT_SESSION_MEMORY_CACHE: Arc = + Arc::new(ClientSessionMemoryCache::default()); +} + +#[derive(Default)] +struct ClientSessionMemoryCache(Mutex, Vec>>); + +impl StoresClientSessions for ClientSessionMemoryCache { + fn get(&self, key: &[u8]) -> Option> { + self.0.lock().unwrap().get(key).cloned() + } + + fn put(&self, key: Vec, value: Vec) -> bool { + let mut sessions = self.0.lock().unwrap(); + // TODO(bnoordhuis) Evict sessions LRU-style instead of arbitrarily. + while sessions.len() >= 1024 { + let key = sessions.keys().next().unwrap().clone(); + sessions.remove(&key); + } + sessions.insert(key, value); + true + } +} + +#[derive(Debug)] +enum TlsSession { + Client(ClientSession), + Server(ServerSession), +} + +impl Deref for TlsSession { + type Target = dyn Session; + + fn deref(&self) -> &Self::Target { + match self { + TlsSession::Client(client_session) => client_session, + TlsSession::Server(server_session) => server_session, + } + } +} + +impl DerefMut for TlsSession { + fn deref_mut(&mut self) -> &mut Self::Target { + match self { + TlsSession::Client(client_session) => client_session, + TlsSession::Server(server_session) => server_session, + } + } +} + +impl From for TlsSession { + fn from(client_session: ClientSession) -> Self { + TlsSession::Client(client_session) + } +} + +impl From for TlsSession { + fn from(server_session: ServerSession) -> Self { + TlsSession::Server(server_session) + } +} + +#[derive(Copy, Clone, Debug, Eq, PartialEq)] +enum Flow { + Read, + Write, +} + +#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] +enum State { + StreamOpen, + StreamClosed, + TlsClosing, + TlsClosed, + TcpClosed, +} + +#[derive(Debug)] +pub struct TlsStream(Option); + +impl TlsStream { + fn new(tcp: TcpStream, tls: TlsSession) -> Self { + let inner = TlsStreamInner { + tcp, + tls, + rd_state: State::StreamOpen, + wr_state: State::StreamOpen, + }; + Self(Some(inner)) + } + + pub fn new_client_side( + tcp: TcpStream, + tls_config: &Arc, + hostname: DNSNameRef, + ) -> Self { + let tls = TlsSession::Client(ClientSession::new(tls_config, hostname)); + Self::new(tcp, tls) + } + + pub fn new_server_side( + tcp: TcpStream, + tls_config: &Arc, + ) -> Self { + let tls = TlsSession::Server(ServerSession::new(tls_config)); + Self::new(tcp, tls) + } + + pub async fn handshake(&mut self) -> io::Result<()> { + poll_fn(|cx| self.inner_mut().poll_io(cx, Flow::Write)).await + } + + fn into_split(self) -> (ReadHalf, WriteHalf) { + let shared = Shared::new(self); + let rd = ReadHalf { + shared: shared.clone(), + }; + let wr = WriteHalf { shared }; + (rd, wr) + } + + /// Tokio-rustls compatibility: returns a reference to the underlying TCP + /// stream, and a reference to the Rustls `Session` object. + pub fn get_ref(&self) -> (&TcpStream, &dyn Session) { + let inner = self.0.as_ref().unwrap(); + (&inner.tcp, &*inner.tls) + } + + fn inner_mut(&mut self) -> &mut TlsStreamInner { + self.0.as_mut().unwrap() + } +} + +impl AsyncRead for TlsStream { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + self.inner_mut().poll_read(cx, buf) + } +} + +impl AsyncWrite for TlsStream { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + self.inner_mut().poll_write(cx, buf) + } + + fn poll_flush( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + self.inner_mut().poll_io(cx, Flow::Write) + // The underlying TCP stream does not need to be flushed. + } + + fn poll_shutdown( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + self.inner_mut().poll_shutdown(cx) + } +} + +impl Drop for TlsStream { + fn drop(&mut self) { + let mut inner = self.0.take().unwrap(); + + let mut cx = Context::from_waker(noop_waker_ref()); + let use_linger_task = inner.poll_close(&mut cx).is_pending(); + + if use_linger_task { + spawn_local(poll_fn(move |cx| inner.poll_close(cx))); + } else if cfg!(debug_assertions) { + spawn_local(async {}); // Spawn dummy task to detect missing LocalSet. + } + } +} + +#[derive(Debug)] +pub struct TlsStreamInner { + tls: TlsSession, + tcp: TcpStream, + rd_state: State, + wr_state: State, +} + +impl TlsStreamInner { + fn poll_io( + &mut self, + cx: &mut Context<'_>, + flow: Flow, + ) -> Poll> { + loop { + let wr_ready = loop { + match self.wr_state { + _ if self.tls.is_handshaking() && !self.tls.wants_write() => { + break true; + } + _ if self.tls.is_handshaking() => {} + State::StreamOpen if !self.tls.wants_write() => break true, + State::StreamClosed => { + // Rustls will enqueue the 'CloseNotify' alert and send it after + // flusing the data that is already in the queue. + self.tls.send_close_notify(); + self.wr_state = State::TlsClosing; + continue; + } + State::TlsClosing if !self.tls.wants_write() => { + self.wr_state = State::TlsClosed; + continue; + } + // If a 'CloseNotify' alert sent by the remote end has been received, + // shut down the underlying TCP socket. Otherwise, consider polling + // done for the moment. + State::TlsClosed if self.rd_state < State::TlsClosed => break true, + State::TlsClosed + if Pin::new(&mut self.tcp).poll_shutdown(cx)?.is_pending() => + { + break false; + } + State::TlsClosed => { + self.wr_state = State::TcpClosed; + continue; + } + State::TcpClosed => break true, + _ => {} + } + + // Poll whether there is space in the socket send buffer so we can flush + // the remaining outgoing ciphertext. + if self.tcp.poll_write_ready(cx)?.is_pending() { + break false; + } + + // Write ciphertext to the TCP socket. + let mut wrapped_tcp = ImplementWriteTrait(&mut self.tcp); + match self.tls.write_tls(&mut wrapped_tcp) { + Ok(0) => unreachable!(), + Ok(_) => {} + Err(err) if err.kind() == ErrorKind::WouldBlock => {} + Err(err) => return Poll::Ready(Err(err)), + } + }; + + let rd_ready = loop { + match self.rd_state { + State::TcpClosed if self.tls.is_handshaking() => { + let err = Error::new(ErrorKind::UnexpectedEof, "tls handshake eof"); + return Poll::Ready(Err(err)); + } + _ if self.tls.is_handshaking() && !self.tls.wants_read() => { + break true; + } + _ if self.tls.is_handshaking() => {} + State::StreamOpen if !self.tls.wants_read() => break true, + State::StreamOpen => {} + State::StreamClosed if !self.tls.wants_read() => { + // Rustls has more incoming cleartext buffered up, but the TLS + // session is closing so this data will never be processed by the + // application layer. Just like what would happen if this were a raw + // TCP stream, don't gracefully end the TLS session, but abort it. + return Poll::Ready(Err(Error::from(ErrorKind::ConnectionReset))); + } + State::StreamClosed => {} + State::TlsClosed if self.wr_state == State::TcpClosed => { + // Wait for the remote end to gracefully close the TCP connection. + // TODO(piscisaureus): this is unnecessary; remove when stable. + } + _ => break true, + } + + if self.rd_state < State::TlsClosed { + // Do a zero-length plaintext read so we can detect the arrival of + // 'CloseNotify' messages, even if only the write half is open. + // Actually reading data from the socket is done in `poll_read()`. + match self.tls.read(&mut []) { + Ok(0) => {} + Err(err) if err.kind() == ErrorKind::ConnectionAborted => { + // `Session::read()` returns `ConnectionAborted` when a + // 'CloseNotify' alert has been received, which indicates that + // the remote peer wants to gracefully end the TLS session. + self.rd_state = State::TlsClosed; + continue; + } + Err(err) => return Poll::Ready(Err(err)), + _ => unreachable!(), + } + } + + // Poll whether more ciphertext is available in the socket receive + // buffer. + if self.tcp.poll_read_ready(cx)?.is_pending() { + break false; + } + + // Receive ciphertext from the socket. + let mut wrapped_tcp = ImplementReadTrait(&mut self.tcp); + match self.tls.read_tls(&mut wrapped_tcp) { + Ok(0) => self.rd_state = State::TcpClosed, + Ok(_) => self + .tls + .process_new_packets() + .map_err(|err| Error::new(ErrorKind::InvalidData, err))?, + Err(err) if err.kind() == ErrorKind::WouldBlock => {} + Err(err) => return Poll::Ready(Err(err)), + } + }; + + if wr_ready { + if self.rd_state >= State::TlsClosed + && self.wr_state >= State::TlsClosed + && self.wr_state < State::TcpClosed + { + continue; + } + if self.tls.wants_write() { + continue; + } + } + + let io_ready = match flow { + _ if self.tls.is_handshaking() => false, + Flow::Read => rd_ready, + Flow::Write => wr_ready, + }; + return match io_ready { + false => Poll::Pending, + true => Poll::Ready(Ok(())), + }; + } + } + + fn poll_read( + &mut self, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + ready!(self.poll_io(cx, Flow::Read))?; + + if self.rd_state == State::StreamOpen { + let buf_slice = + unsafe { &mut *(buf.unfilled_mut() as *mut [_] as *mut [u8]) }; + let bytes_read = self.tls.read(buf_slice)?; + assert_ne!(bytes_read, 0); + unsafe { buf.assume_init(bytes_read) }; + buf.advance(bytes_read); + } + + Poll::Ready(Ok(())) + } + + fn poll_write( + &mut self, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + if buf.is_empty() { + // Tokio-rustls compatibility: a zero byte write always succeeds. + Poll::Ready(Ok(0)) + } else if self.wr_state == State::StreamOpen { + // Flush Rustls' ciphertext send queue. + ready!(self.poll_io(cx, Flow::Write))?; + + // Copy data from `buf` to the Rustls cleartext send queue. + let bytes_written = self.tls.write(buf)?; + assert_ne!(bytes_written, 0); + + // Try to flush as much ciphertext as possible. However, since we just + // handed off at least some bytes to rustls, so we can't return + // `Poll::Pending()` any more: this would tell the caller that it should + // try to send those bytes again. + let _ = self.poll_io(cx, Flow::Write)?; + + Poll::Ready(Ok(bytes_written)) + } else { + // Return error if stream has been shut down for writing. + Poll::Ready(Err(ErrorKind::BrokenPipe.into())) + } + } + + fn poll_shutdown(&mut self, cx: &mut Context<'_>) -> Poll> { + if self.wr_state == State::StreamOpen { + self.wr_state = State::StreamClosed; + } + + ready!(self.poll_io(cx, Flow::Write))?; + + // At minimum, a TLS 'CloseNotify' alert should have been sent. + assert!(self.wr_state >= State::TlsClosed); + // If we received a TLS 'CloseNotify' alert from the remote end + // already, the TCP socket should be shut down at this point. + assert!( + self.rd_state < State::TlsClosed || self.wr_state == State::TcpClosed + ); + + Poll::Ready(Ok(())) + } + + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll> { + if self.rd_state == State::StreamOpen { + self.rd_state = State::StreamClosed; + } + + // Send TLS 'CloseNotify' alert. + ready!(self.poll_shutdown(cx))?; + // Wait for 'CloseNotify', shut down TCP stream, wait for TCP FIN packet. + ready!(self.poll_io(cx, Flow::Read))?; + + assert_eq!(self.rd_state, State::TcpClosed); + assert_eq!(self.wr_state, State::TcpClosed); + + Poll::Ready(Ok(())) + } +} + +#[derive(Debug)] +pub struct ReadHalf { + shared: Arc, +} + +impl ReadHalf { + pub fn reunite(self, wr: WriteHalf) -> TlsStream { + assert!(Arc::ptr_eq(&self.shared, &wr.shared)); + drop(wr); // Drop `wr`, so only one strong reference to `shared` remains. + + Arc::try_unwrap(self.shared) + .unwrap_or_else(|_| panic!("Arc::::try_unwrap() failed")) + .tls_stream + .into_inner() + .unwrap() + } +} + +impl AsyncRead for ReadHalf { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + self + .shared + .poll_with_shared_waker(cx, Flow::Read, move |tls, cx| { + tls.poll_read(cx, buf) + }) + } +} + +#[derive(Debug)] +pub struct WriteHalf { + shared: Arc, +} + +impl AsyncWrite for WriteHalf { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + self + .shared + .poll_with_shared_waker(cx, Flow::Write, move |tls, cx| { + tls.poll_write(cx, buf) + }) + } + + fn poll_flush( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + self + .shared + .poll_with_shared_waker(cx, Flow::Write, |tls, cx| tls.poll_flush(cx)) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + self + .shared + .poll_with_shared_waker(cx, Flow::Write, |tls, cx| tls.poll_shutdown(cx)) + } +} + +#[derive(Debug)] +struct Shared { + tls_stream: Mutex, + rd_waker: AtomicWaker, + wr_waker: AtomicWaker, +} + +impl Shared { + fn new(tls_stream: TlsStream) -> Arc { + let self_ = Self { + tls_stream: Mutex::new(tls_stream), + rd_waker: AtomicWaker::new(), + wr_waker: AtomicWaker::new(), + }; + Arc::new(self_) + } + + fn poll_with_shared_waker( + self: &Arc, + cx: &mut Context<'_>, + flow: Flow, + mut f: impl FnMut(Pin<&mut TlsStream>, &mut Context<'_>) -> R, + ) -> R { + match flow { + Flow::Read => self.rd_waker.register(cx.waker()), + Flow::Write => self.wr_waker.register(cx.waker()), + } + + let shared_waker = self.new_shared_waker(); + let mut cx = Context::from_waker(&shared_waker); + + let mut tls_stream = self.tls_stream.lock().unwrap(); + f(Pin::new(&mut tls_stream), &mut cx) + } + + const SHARED_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new( + Self::clone_shared_waker, + Self::wake_shared_waker, + Self::wake_shared_waker_by_ref, + Self::drop_shared_waker, + ); + + fn new_shared_waker(self: &Arc) -> Waker { + let self_weak = Arc::downgrade(self); + let self_ptr = self_weak.into_raw() as *const (); + let raw_waker = RawWaker::new(self_ptr, &Self::SHARED_WAKER_VTABLE); + unsafe { Waker::from_raw(raw_waker) } + } + + fn clone_shared_waker(self_ptr: *const ()) -> RawWaker { + let self_weak = unsafe { Weak::from_raw(self_ptr as *const Self) }; + let ptr1 = self_weak.clone().into_raw(); + let ptr2 = self_weak.into_raw(); + assert!(ptr1 == ptr2); + RawWaker::new(self_ptr, &Self::SHARED_WAKER_VTABLE) + } + + fn wake_shared_waker(self_ptr: *const ()) { + Self::wake_shared_waker_by_ref(self_ptr); + Self::drop_shared_waker(self_ptr); + } + + fn wake_shared_waker_by_ref(self_ptr: *const ()) { + let self_weak = unsafe { Weak::from_raw(self_ptr as *const Self) }; + if let Some(self_arc) = Weak::upgrade(&self_weak) { + self_arc.rd_waker.wake(); + self_arc.wr_waker.wake(); + } + self_weak.into_raw(); + } + + fn drop_shared_waker(self_ptr: *const ()) { + let _ = unsafe { Weak::from_raw(self_ptr as *const Self) }; + } +} + +struct ImplementReadTrait<'a, T>(&'a mut T); + +impl Read for ImplementReadTrait<'_, TcpStream> { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + self.0.try_read(buf) + } +} + +struct ImplementWriteTrait<'a, T>(&'a mut T); + +impl Write for ImplementWriteTrait<'_, TcpStream> { + fn write(&mut self, buf: &[u8]) -> io::Result { + self.0.try_write(buf) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + +pub fn init() -> Vec { + vec![ + ("op_start_tls", op_async(op_start_tls::

)), + ("op_connect_tls", op_async(op_connect_tls::

)), + ("op_listen_tls", op_sync(op_listen_tls::

)), + ("op_accept_tls", op_async(op_accept_tls)), + ] +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ConnectTlsArgs { + transport: String, + hostname: String, + port: u16, + cert_file: Option, +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct StartTlsArgs { + rid: ResourceId, + cert_file: Option, + hostname: String, +} + +async fn op_start_tls( + state: Rc>, + args: StartTlsArgs, + _: (), +) -> Result +where + NP: NetPermissions + 'static, +{ + let rid = args.rid; + let hostname = match &*args.hostname { + "" => "localhost", + n => n, + }; + let cert_file = args.cert_file.as_deref(); + + { + super::check_unstable2(&state, "Deno.startTls"); + let mut s = state.borrow_mut(); + let permissions = s.borrow_mut::(); + permissions.check_net(&(hostname, Some(0)))?; + if let Some(path) = cert_file { + permissions.check_read(Path::new(path))?; + } + } + + let hostname_dns = DNSNameRef::try_from_ascii_str(hostname) + .map_err(|_| invalid_hostname(hostname))?; + + let resource_rc = state + .borrow_mut() + .resource_table + .take::(rid) + .ok_or_else(bad_resource_id)?; + let resource = Rc::try_unwrap(resource_rc) + .expect("Only a single use of this resource should happen"); + let (read_half, write_half) = resource.into_inner(); + let tcp_stream = read_half.reunite(write_half)?; + + let local_addr = tcp_stream.local_addr()?; + let remote_addr = tcp_stream.peer_addr()?; + + let mut tls_config = ClientConfig::new(); + tls_config.set_persistence(CLIENT_SESSION_MEMORY_CACHE.clone()); + tls_config + .root_store + .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS); + if let Some(path) = cert_file { + let key_file = File::open(path)?; + let reader = &mut BufReader::new(key_file); + tls_config.root_store.add_pem_file(reader).unwrap(); + } + let tls_config = Arc::new(tls_config); + + let tls_stream = + TlsStream::new_client_side(tcp_stream, &tls_config, hostname_dns); + + let rid = { + let mut state_ = state.borrow_mut(); + state_ + .resource_table + .add(TlsStreamResource::new(tls_stream.into_split())) + }; + + Ok(OpConn { + rid, + local_addr: Some(OpAddr::Tcp(IpAddr { + hostname: local_addr.ip().to_string(), + port: local_addr.port(), + })), + remote_addr: Some(OpAddr::Tcp(IpAddr { + hostname: remote_addr.ip().to_string(), + port: remote_addr.port(), + })), + }) +} + +async fn op_connect_tls( + state: Rc>, + args: ConnectTlsArgs, + _: (), +) -> Result +where + NP: NetPermissions + 'static, +{ + assert_eq!(args.transport, "tcp"); + let hostname = match &*args.hostname { + "" => "localhost", + n => n, + }; + let port = args.port; + let cert_file = args.cert_file.as_deref(); + + { + let mut s = state.borrow_mut(); + let permissions = s.borrow_mut::(); + permissions.check_net(&(hostname, Some(port)))?; + if let Some(path) = cert_file { + permissions.check_read(Path::new(path))?; + } + } + + let hostname_dns = DNSNameRef::try_from_ascii_str(hostname) + .map_err(|_| invalid_hostname(hostname))?; + + let connect_addr = resolve_addr(hostname, port) + .await? + .next() + .ok_or_else(|| generic_error("No resolved address found"))?; + let tcp_stream = TcpStream::connect(connect_addr).await?; + let local_addr = tcp_stream.local_addr()?; + let remote_addr = tcp_stream.peer_addr()?; + + let mut tls_config = ClientConfig::new(); + tls_config.set_persistence(CLIENT_SESSION_MEMORY_CACHE.clone()); + tls_config + .root_store + .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS); + if let Some(path) = cert_file { + let key_file = File::open(path)?; + let reader = &mut BufReader::new(key_file); + tls_config.root_store.add_pem_file(reader).unwrap(); + } + let tls_config = Arc::new(tls_config); + + let tls_stream = + TlsStream::new_client_side(tcp_stream, &tls_config, hostname_dns); + + let rid = { + let mut state_ = state.borrow_mut(); + state_ + .resource_table + .add(TlsStreamResource::new(tls_stream.into_split())) + }; + + Ok(OpConn { + rid, + local_addr: Some(OpAddr::Tcp(IpAddr { + hostname: local_addr.ip().to_string(), + port: local_addr.port(), + })), + remote_addr: Some(OpAddr::Tcp(IpAddr { + hostname: remote_addr.ip().to_string(), + port: remote_addr.port(), + })), + }) +} + +fn load_certs(path: &str) -> Result, AnyError> { + let cert_file = File::open(path)?; + let reader = &mut BufReader::new(cert_file); + + let certs = certs(reader) + .map_err(|_| custom_error("InvalidData", "Unable to decode certificate"))?; + + if certs.is_empty() { + let e = custom_error("InvalidData", "No certificates found in cert file"); + return Err(e); + } + + Ok(certs) +} + +fn key_decode_err() -> AnyError { + custom_error("InvalidData", "Unable to decode key") +} + +fn key_not_found_err() -> AnyError { + custom_error("InvalidData", "No keys found in key file") +} + +/// Starts with -----BEGIN RSA PRIVATE KEY----- +fn load_rsa_keys(path: &str) -> Result, AnyError> { + let key_file = File::open(path)?; + let reader = &mut BufReader::new(key_file); + let keys = rsa_private_keys(reader).map_err(|_| key_decode_err())?; + Ok(keys) +} + +/// Starts with -----BEGIN PRIVATE KEY----- +fn load_pkcs8_keys(path: &str) -> Result, AnyError> { + let key_file = File::open(path)?; + let reader = &mut BufReader::new(key_file); + let keys = pkcs8_private_keys(reader).map_err(|_| key_decode_err())?; + Ok(keys) +} + +fn load_keys(path: &str) -> Result, AnyError> { + let path = path.to_string(); + let mut keys = load_rsa_keys(&path)?; + + if keys.is_empty() { + keys = load_pkcs8_keys(&path)?; + } + + if keys.is_empty() { + return Err(key_not_found_err()); + } + + Ok(keys) +} + +pub struct TlsListenerResource { + tcp_listener: AsyncRefCell, + tls_config: Arc, + cancel_handle: CancelHandle, +} + +impl Resource for TlsListenerResource { + fn name(&self) -> Cow { + "tlsListener".into() + } + + fn close(self: Rc) { + self.cancel_handle.cancel(); + } +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ListenTlsArgs { + transport: String, + hostname: String, + port: u16, + cert_file: String, + key_file: String, + alpn_protocols: Option>, +} + +fn op_listen_tls( + state: &mut OpState, + args: ListenTlsArgs, + _: (), +) -> Result +where + NP: NetPermissions + 'static, +{ + assert_eq!(args.transport, "tcp"); + let hostname = &*args.hostname; + let port = args.port; + let cert_file = &*args.cert_file; + let key_file = &*args.key_file; + + { + let permissions = state.borrow_mut::(); + permissions.check_net(&(hostname, Some(port)))?; + permissions.check_read(Path::new(cert_file))?; + permissions.check_read(Path::new(key_file))?; + } + + let mut tls_config = ServerConfig::new(NoClientAuth::new()); + if let Some(alpn_protocols) = args.alpn_protocols { + super::check_unstable(state, "Deno.listenTls#alpn_protocols"); + tls_config.alpn_protocols = + alpn_protocols.into_iter().map(|s| s.into_bytes()).collect(); + } + tls_config + .set_single_cert(load_certs(cert_file)?, load_keys(key_file)?.remove(0)) + .expect("invalid key or certificate"); + + let bind_addr = resolve_addr_sync(hostname, port)? + .next() + .ok_or_else(|| generic_error("No resolved address found"))?; + let std_listener = std::net::TcpListener::bind(bind_addr)?; + std_listener.set_nonblocking(true)?; + let tcp_listener = TcpListener::from_std(std_listener)?; + let local_addr = tcp_listener.local_addr()?; + + let tls_listener_resource = TlsListenerResource { + tcp_listener: AsyncRefCell::new(tcp_listener), + tls_config: Arc::new(tls_config), + cancel_handle: Default::default(), + }; + + let rid = state.resource_table.add(tls_listener_resource); + + Ok(OpConn { + rid, + local_addr: Some(OpAddr::Tcp(IpAddr { + hostname: local_addr.ip().to_string(), + port: local_addr.port(), + })), + remote_addr: None, + }) +} + +async fn op_accept_tls( + state: Rc>, + rid: ResourceId, + _: (), +) -> Result { + let resource = state + .borrow() + .resource_table + .get::(rid) + .ok_or_else(|| bad_resource("Listener has been closed"))?; + + let cancel_handle = RcRef::map(&resource, |r| &r.cancel_handle); + let tcp_listener = RcRef::map(&resource, |r| &r.tcp_listener) + .try_borrow_mut() + .ok_or_else(|| custom_error("Busy", "Another accept task is ongoing"))?; + + let (tcp_stream, remote_addr) = + match tcp_listener.accept().try_or_cancel(&cancel_handle).await { + Ok(tuple) => tuple, + Err(err) if err.kind() == ErrorKind::Interrupted => { + // FIXME(bartlomieju): compatibility with current JS implementation. + return Err(bad_resource("Listener has been closed")); + } + Err(err) => return Err(err.into()), + }; + + let local_addr = tcp_stream.local_addr()?; + + let tls_stream = TlsStream::new_server_side(tcp_stream, &resource.tls_config); + + let rid = { + let mut state_ = state.borrow_mut(); + state_ + .resource_table + .add(TlsStreamResource::new(tls_stream.into_split())) + }; + + Ok(OpConn { + rid, + local_addr: Some(OpAddr::Tcp(IpAddr { + hostname: local_addr.ip().to_string(), + port: local_addr.port(), + })), + remote_addr: Some(OpAddr::Tcp(IpAddr { + hostname: remote_addr.ip().to_string(), + port: remote_addr.port(), + })), + }) +} diff --git a/extensions/net/ops_unix.rs b/extensions/net/ops_unix.rs new file mode 100644 index 000000000..9dfcc231e --- /dev/null +++ b/extensions/net/ops_unix.rs @@ -0,0 +1,180 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. + +use crate::io::UnixStreamResource; +use crate::ops::AcceptArgs; +use crate::ops::OpAddr; +use crate::ops::OpConn; +use crate::ops::OpPacket; +use crate::ops::ReceiveArgs; +use deno_core::error::bad_resource; +use deno_core::error::custom_error; +use deno_core::error::null_opbuf; +use deno_core::error::AnyError; +use deno_core::AsyncRefCell; +use deno_core::CancelHandle; +use deno_core::CancelTryFuture; +use deno_core::OpState; +use deno_core::RcRef; +use deno_core::Resource; +use deno_core::ZeroCopyBuf; +use serde::Deserialize; +use serde::Serialize; +use std::borrow::Cow; +use std::cell::RefCell; +use std::fs::remove_file; +use std::path::Path; +use std::rc::Rc; +use tokio::net::UnixDatagram; +use tokio::net::UnixListener; +pub use tokio::net::UnixStream; + +/// A utility function to map OsStrings to Strings +pub fn into_string(s: std::ffi::OsString) -> Result { + s.into_string().map_err(|s| { + let message = format!("File name or path {:?} is not valid UTF-8", s); + custom_error("InvalidData", message) + }) +} + +struct UnixListenerResource { + listener: AsyncRefCell, + cancel: CancelHandle, +} + +impl Resource for UnixListenerResource { + fn name(&self) -> Cow { + "unixListener".into() + } + + fn close(self: Rc) { + self.cancel.cancel(); + } +} + +pub struct UnixDatagramResource { + pub socket: AsyncRefCell, + pub cancel: CancelHandle, +} + +impl Resource for UnixDatagramResource { + fn name(&self) -> Cow { + "unixDatagram".into() + } + + fn close(self: Rc) { + self.cancel.cancel(); + } +} + +#[derive(Serialize)] +pub struct UnixAddr { + pub path: Option, +} + +#[derive(Deserialize)] +pub struct UnixListenArgs { + pub path: String, +} + +pub(crate) async fn accept_unix( + state: Rc>, + args: AcceptArgs, + _: (), +) -> Result { + let rid = args.rid; + + let resource = state + .borrow() + .resource_table + .get::(rid) + .ok_or_else(|| bad_resource("Listener has been closed"))?; + let listener = RcRef::map(&resource, |r| &r.listener) + .try_borrow_mut() + .ok_or_else(|| custom_error("Busy", "Listener already in use"))?; + let cancel = RcRef::map(resource, |r| &r.cancel); + let (unix_stream, _socket_addr) = + listener.accept().try_or_cancel(cancel).await?; + + let local_addr = unix_stream.local_addr()?; + let remote_addr = unix_stream.peer_addr()?; + let resource = UnixStreamResource::new(unix_stream.into_split()); + let mut state = state.borrow_mut(); + let rid = state.resource_table.add(resource); + Ok(OpConn { + rid, + local_addr: Some(OpAddr::Unix(UnixAddr { + path: local_addr.as_pathname().and_then(pathstring), + })), + remote_addr: Some(OpAddr::Unix(UnixAddr { + path: remote_addr.as_pathname().and_then(pathstring), + })), + }) +} + +pub(crate) async fn receive_unix_packet( + state: Rc>, + args: ReceiveArgs, + buf: Option, +) -> Result { + let mut buf = buf.ok_or_else(null_opbuf)?; + + let rid = args.rid; + + let resource = state + .borrow() + .resource_table + .get::(rid) + .ok_or_else(|| bad_resource("Socket has been closed"))?; + let socket = RcRef::map(&resource, |r| &r.socket) + .try_borrow_mut() + .ok_or_else(|| custom_error("Busy", "Socket already in use"))?; + let cancel = RcRef::map(resource, |r| &r.cancel); + let (size, remote_addr) = + socket.recv_from(&mut buf).try_or_cancel(cancel).await?; + Ok(OpPacket { + size, + remote_addr: OpAddr::UnixPacket(UnixAddr { + path: remote_addr.as_pathname().and_then(pathstring), + }), + }) +} + +pub fn listen_unix( + state: &mut OpState, + addr: &Path, +) -> Result<(u32, tokio::net::unix::SocketAddr), AnyError> { + if addr.exists() { + remove_file(&addr).unwrap(); + } + let listener = UnixListener::bind(&addr)?; + let local_addr = listener.local_addr()?; + let listener_resource = UnixListenerResource { + listener: AsyncRefCell::new(listener), + cancel: Default::default(), + }; + let rid = state.resource_table.add(listener_resource); + + Ok((rid, local_addr)) +} + +pub fn listen_unix_packet( + state: &mut OpState, + addr: &Path, +) -> Result<(u32, tokio::net::unix::SocketAddr), AnyError> { + if addr.exists() { + remove_file(&addr).unwrap(); + } + let socket = UnixDatagram::bind(&addr)?; + let local_addr = socket.local_addr()?; + let datagram_resource = UnixDatagramResource { + socket: AsyncRefCell::new(socket), + cancel: Default::default(), + }; + let rid = state.resource_table.add(datagram_resource); + + Ok((rid, local_addr)) +} + +pub fn pathstring(pathname: &Path) -> Option { + into_string(pathname.into()).ok() +} diff --git a/extensions/net/resolve_addr.rs b/extensions/net/resolve_addr.rs new file mode 100644 index 000000000..ebf1374d1 --- /dev/null +++ b/extensions/net/resolve_addr.rs @@ -0,0 +1,156 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. + +use deno_core::error::AnyError; +use std::net::SocketAddr; +use std::net::ToSocketAddrs; +use tokio::net::lookup_host; + +/// Resolve network address *asynchronously*. +pub async fn resolve_addr( + hostname: &str, + port: u16, +) -> Result + '_, AnyError> { + let addr_port_pair = make_addr_port_pair(hostname, port); + let result = lookup_host(addr_port_pair).await?; + Ok(result) +} + +/// Resolve network address *synchronously*. +pub fn resolve_addr_sync( + hostname: &str, + port: u16, +) -> Result, AnyError> { + let addr_port_pair = make_addr_port_pair(hostname, port); + let result = addr_port_pair.to_socket_addrs()?; + Ok(result) +} + +fn make_addr_port_pair(hostname: &str, port: u16) -> (&str, u16) { + // Default to localhost if given just the port. Example: ":80" + if hostname.is_empty() { + return ("0.0.0.0", port); + } + + // If this looks like an ipv6 IP address. Example: "[2001:db8::1]" + // Then we remove the brackets. + let addr = hostname.trim_start_matches('[').trim_end_matches(']'); + (addr, port) +} + +#[cfg(test)] +mod tests { + use super::*; + use std::net::Ipv4Addr; + use std::net::Ipv6Addr; + use std::net::SocketAddrV4; + use std::net::SocketAddrV6; + + #[tokio::test] + async fn resolve_addr1() { + let expected = vec![SocketAddr::V4(SocketAddrV4::new( + Ipv4Addr::new(127, 0, 0, 1), + 80, + ))]; + let actual = resolve_addr("127.0.0.1", 80) + .await + .unwrap() + .collect::>(); + assert_eq!(actual, expected); + } + + #[tokio::test] + async fn resolve_addr2() { + let expected = vec![SocketAddr::V4(SocketAddrV4::new( + Ipv4Addr::new(0, 0, 0, 0), + 80, + ))]; + let actual = resolve_addr("", 80).await.unwrap().collect::>(); + assert_eq!(actual, expected); + } + + #[tokio::test] + async fn resolve_addr3() { + let expected = vec![SocketAddr::V4(SocketAddrV4::new( + Ipv4Addr::new(192, 0, 2, 1), + 25, + ))]; + let actual = resolve_addr("192.0.2.1", 25) + .await + .unwrap() + .collect::>(); + assert_eq!(actual, expected); + } + + #[tokio::test] + async fn resolve_addr_ipv6() { + let expected = vec![SocketAddr::V6(SocketAddrV6::new( + Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 1), + 8080, + 0, + 0, + ))]; + let actual = resolve_addr("[2001:db8::1]", 8080) + .await + .unwrap() + .collect::>(); + assert_eq!(actual, expected); + } + + #[tokio::test] + async fn resolve_addr_err() { + assert!(resolve_addr("INVALID ADDR", 1234).await.is_err()); + } + + #[test] + fn resolve_addr_sync1() { + let expected = vec![SocketAddr::V4(SocketAddrV4::new( + Ipv4Addr::new(127, 0, 0, 1), + 80, + ))]; + let actual = resolve_addr_sync("127.0.0.1", 80) + .unwrap() + .collect::>(); + assert_eq!(actual, expected); + } + + #[test] + fn resolve_addr_sync2() { + let expected = vec![SocketAddr::V4(SocketAddrV4::new( + Ipv4Addr::new(0, 0, 0, 0), + 80, + ))]; + let actual = resolve_addr_sync("", 80).unwrap().collect::>(); + assert_eq!(actual, expected); + } + + #[test] + fn resolve_addr_sync3() { + let expected = vec![SocketAddr::V4(SocketAddrV4::new( + Ipv4Addr::new(192, 0, 2, 1), + 25, + ))]; + let actual = resolve_addr_sync("192.0.2.1", 25) + .unwrap() + .collect::>(); + assert_eq!(actual, expected); + } + + #[test] + fn resolve_addr_sync_ipv6() { + let expected = vec![SocketAddr::V6(SocketAddrV6::new( + Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 1), + 8080, + 0, + 0, + ))]; + let actual = resolve_addr_sync("[2001:db8::1]", 8080) + .unwrap() + .collect::>(); + assert_eq!(actual, expected); + } + + #[test] + fn resolve_addr_sync_err() { + assert!(resolve_addr_sync("INVALID ADDR", 1234).is_err()); + } +} diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index bf1c24847..b3f152706 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -23,6 +23,7 @@ deno_console = { version = "0.10.0", path = "../extensions/console" } deno_core = { version = "0.92.0", path = "../core" } deno_crypto = { version = "0.24.0", path = "../extensions/crypto" } deno_fetch = { version = "0.32.0", path = "../extensions/fetch" } +deno_net = { version = "0.1.0", path = "../extensions/net" } deno_timers = { version = "0.8.0", path = "../extensions/timers" } deno_url = { version = "0.10.0", path = "../extensions/url" } deno_web = { version = "0.41.0", path = "../extensions/web" } @@ -41,6 +42,7 @@ deno_console = { version = "0.10.0", path = "../extensions/console" } deno_core = { version = "0.92.0", path = "../core" } deno_crypto = { version = "0.24.0", path = "../extensions/crypto" } deno_fetch = { version = "0.32.0", path = "../extensions/fetch" } +deno_net = { version = "0.1.0", path = "../extensions/net" } deno_timers = { version = "0.8.0", path = "../extensions/timers" } deno_url = { version = "0.10.0", path = "../extensions/url" } deno_web = { version = "0.41.0", path = "../extensions/web" } @@ -50,7 +52,6 @@ deno_websocket = { version = "0.15.0", path = "../extensions/websocket" } deno_webstorage = { version = "0.5.0", path = "../extensions/webstorage" } atty = "0.2.14" -bytes = "1" dlopen = "0.1.8" encoding_rs = "0.8.28" filetime = "0.2.14" @@ -64,17 +65,11 @@ notify = "5.0.0-pre.7" percent-encoding = "2.1.0" regex = "1.4.3" ring = "0.16.20" -rustls = "0.19.0" serde = { version = "1.0.125", features = ["derive"] } sys-info = "0.9.0" termcolor = "1.1.2" tokio = { version = "1.7.1", features = ["full"] } -tokio-util = { version = "0.6", features = ["io"] } uuid = { version = "0.8.2", features = ["v4"] } -webpki = "0.21.4" -webpki-roots = "0.21.1" -trust-dns-proto = "0.20.3" -trust-dns-resolver = { version = "0.20.3", features = ["tokio-runtime", "serde-config"] } [target.'cfg(windows)'.dependencies] fwdansi = "1.1.0" diff --git a/runtime/build.rs b/runtime/build.rs index 7d086b045..3e8f8e5b0 100644 --- a/runtime/build.rs +++ b/runtime/build.rs @@ -59,6 +59,7 @@ fn create_runtime_snapshot(snapshot_path: &Path, files: Vec) { deno_broadcast_channel::InMemoryBroadcastChannel::default(), false, // No --unstable. ), + deno_net::init::(false), // No --unstable. ]; let js_runtime = JsRuntime::new(RuntimeOptions { diff --git a/runtime/js/01_errors.js b/runtime/js/01_errors.js index a46a0a149..d59bd7adb 100644 --- a/runtime/js/01_errors.js +++ b/runtime/js/01_errors.js @@ -2,6 +2,9 @@ "use strict"; ((window) => { + const core = window.Deno.core; + const { BadResource, Interrupted } = core; + class NotFound extends Error { constructor(msg) { super(msg); @@ -86,13 +89,6 @@ } } - class Interrupted extends Error { - constructor(msg) { - super(msg); - this.name = "Interrupted"; - } - } - class WriteZero extends Error { constructor(msg) { super(msg); @@ -107,13 +103,6 @@ } } - class BadResource extends Error { - constructor(msg) { - super(msg); - this.name = "BadResource"; - } - } - class Http extends Error { constructor(msg) { super(msg); diff --git a/runtime/js/30_net.js b/runtime/js/30_net.js deleted file mode 100644 index 2d4b1e48e..000000000 --- a/runtime/js/30_net.js +++ /dev/null @@ -1,220 +0,0 @@ -// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. -"use strict"; - -((window) => { - const core = window.Deno.core; - const { errors } = window.__bootstrap.errors; - const { read, write } = window.__bootstrap.io; - - function shutdown(rid) { - return core.opAsync("op_shutdown", rid); - } - - function opAccept(rid, transport) { - return core.opAsync("op_accept", { rid, transport }); - } - - function opListen(args) { - return core.opSync("op_listen", args); - } - - function opConnect(args) { - return core.opAsync("op_connect", args); - } - - function opReceive(rid, transport, zeroCopy) { - return core.opAsync( - "op_datagram_receive", - { rid, transport }, - zeroCopy, - ); - } - - function opSend(args, zeroCopy) { - return core.opAsync("op_datagram_send", args, zeroCopy); - } - - function resolveDns(query, recordType, options) { - return core.opAsync("op_dns_resolve", { query, recordType, options }); - } - - class Conn { - #rid = 0; - #remoteAddr = null; - #localAddr = null; - constructor(rid, remoteAddr, localAddr) { - this.#rid = rid; - this.#remoteAddr = remoteAddr; - this.#localAddr = localAddr; - } - - get rid() { - return this.#rid; - } - - get remoteAddr() { - return this.#remoteAddr; - } - - get localAddr() { - return this.#localAddr; - } - - write(p) { - return write(this.rid, p); - } - - read(p) { - return read(this.rid, p); - } - - close() { - core.close(this.rid); - } - - closeWrite() { - return shutdown(this.rid); - } - } - - class Listener { - #rid = 0; - #addr = null; - - constructor(rid, addr) { - this.#rid = rid; - this.#addr = addr; - } - - get rid() { - return this.#rid; - } - - get addr() { - return this.#addr; - } - - async accept() { - const res = await opAccept(this.rid, this.addr.transport); - return new Conn(res.rid, res.remoteAddr, res.localAddr); - } - - async next() { - let conn; - try { - conn = await this.accept(); - } catch (error) { - if (error instanceof errors.BadResource) { - return { value: undefined, done: true }; - } - throw error; - } - return { value: conn, done: false }; - } - - return(value) { - this.close(); - return Promise.resolve({ value, done: true }); - } - - close() { - core.close(this.rid); - } - - [Symbol.asyncIterator]() { - return this; - } - } - - class Datagram { - #rid = 0; - #addr = null; - - constructor(rid, addr, bufSize = 1024) { - this.#rid = rid; - this.#addr = addr; - this.bufSize = bufSize; - } - - get rid() { - return this.#rid; - } - - get addr() { - return this.#addr; - } - - async receive(p) { - const buf = p || new Uint8Array(this.bufSize); - const { size, remoteAddr } = await opReceive( - this.rid, - this.addr.transport, - buf, - ); - const sub = buf.subarray(0, size); - return [sub, remoteAddr]; - } - - send(p, addr) { - const remote = { hostname: "127.0.0.1", ...addr }; - - const args = { ...remote, rid: this.rid }; - return opSend(args, p); - } - - close() { - core.close(this.rid); - } - - async *[Symbol.asyncIterator]() { - while (true) { - try { - yield await this.receive(); - } catch (err) { - if (err instanceof errors.BadResource) { - break; - } - throw err; - } - } - } - } - - function listen({ hostname, ...options }) { - const res = opListen({ - transport: "tcp", - hostname: typeof hostname === "undefined" ? "0.0.0.0" : hostname, - ...options, - }); - - return new Listener(res.rid, res.localAddr); - } - - async function connect(options) { - let res; - - if (options.transport === "unix") { - res = await opConnect(options); - } else { - res = await opConnect({ - transport: "tcp", - hostname: "127.0.0.1", - ...options, - }); - } - - return new Conn(res.rid, res.remoteAddr, res.localAddr); - } - - window.__bootstrap.net = { - connect, - Conn, - opConnect, - listen, - opListen, - Listener, - shutdown, - Datagram, - resolveDns, - }; -})(this); diff --git a/runtime/js/40_http.js b/runtime/js/40_http.js deleted file mode 100644 index 5aa57238b..000000000 --- a/runtime/js/40_http.js +++ /dev/null @@ -1,251 +0,0 @@ -// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. -"use strict"; - -((window) => { - const { InnerBody } = window.__bootstrap.fetchBody; - const { Response, fromInnerRequest, toInnerResponse, newInnerRequest } = - window.__bootstrap.fetch; - const errors = window.__bootstrap.errors.errors; - const core = window.Deno.core; - const { ReadableStream } = window.__bootstrap.streams; - const abortSignal = window.__bootstrap.abortSignal; - - function serveHttp(conn) { - const rid = Deno.core.opSync("op_http_start", conn.rid); - return new HttpConn(rid); - } - - const connErrorSymbol = Symbol("connError"); - - class HttpConn { - #rid = 0; - - constructor(rid) { - this.#rid = rid; - } - - /** @returns {number} */ - get rid() { - return this.#rid; - } - - /** @returns {Promise} */ - async nextRequest() { - let nextRequest; - try { - nextRequest = await Deno.core.opAsync( - "op_http_request_next", - this.#rid, - ); - } catch (error) { - // A connection error seen here would cause disrupted responses to throw - // a generic `BadResource` error. Instead store this error and replace - // those with it. - this[connErrorSymbol] = error; - if (error instanceof errors.BadResource) { - return null; - } else if (error instanceof errors.Interrupted) { - return null; - } else if (error.message.includes("connection closed")) { - return null; - } - throw error; - } - if (nextRequest === null) return null; - - const [ - requestBodyRid, - responseSenderRid, - method, - headersList, - url, - ] = nextRequest; - - /** @type {ReadableStream | undefined} */ - let body = null; - if (typeof requestBodyRid === "number") { - body = createRequestBodyStream(requestBodyRid); - } - - const innerRequest = newInnerRequest( - method, - url, - headersList, - body !== null ? new InnerBody(body) : null, - ); - const signal = abortSignal.newSignal(); - const request = fromInnerRequest(innerRequest, signal, "immutable"); - - const respondWith = createRespondWith(this, responseSenderRid); - - return { request, respondWith }; - } - - /** @returns {void} */ - close() { - core.close(this.#rid); - } - - [Symbol.asyncIterator]() { - // deno-lint-ignore no-this-alias - const httpConn = this; - return { - async next() { - const reqEvt = await httpConn.nextRequest(); - // Change with caution, current form avoids a v8 deopt - return { value: reqEvt, done: reqEvt === null }; - }, - }; - } - } - - function readRequest(requestRid, zeroCopyBuf) { - return Deno.core.opAsync( - "op_http_request_read", - requestRid, - zeroCopyBuf, - ); - } - - function createRespondWith(httpConn, responseSenderRid) { - return async function respondWith(resp) { - if (resp instanceof Promise) { - resp = await resp; - } - - if (!(resp instanceof Response)) { - throw new TypeError( - "First argument to respondWith must be a Response or a promise resolving to a Response.", - ); - } - - const innerResp = toInnerResponse(resp); - - // If response body length is known, it will be sent synchronously in a - // single op, in other case a "response body" resource will be created and - // we'll be streaming it. - /** @type {ReadableStream | Uint8Array | null} */ - let respBody = null; - if (innerResp.body !== null) { - if (innerResp.body.unusable()) throw new TypeError("Body is unusable."); - if (innerResp.body.streamOrStatic instanceof ReadableStream) { - if (innerResp.body.length === null) { - respBody = innerResp.body.stream; - } else { - const reader = innerResp.body.stream.getReader(); - const r1 = await reader.read(); - if (r1.done) { - respBody = new Uint8Array(0); - } else { - respBody = r1.value; - const r2 = await reader.read(); - if (!r2.done) throw new TypeError("Unreachable"); - } - } - } else { - innerResp.body.streamOrStatic.consumed = true; - respBody = innerResp.body.streamOrStatic.body; - } - } else { - respBody = new Uint8Array(0); - } - - let responseBodyRid; - try { - responseBodyRid = await Deno.core.opAsync("op_http_response", [ - responseSenderRid, - innerResp.status ?? 200, - innerResp.headerList, - ], respBody instanceof Uint8Array ? respBody : null); - } catch (error) { - const connError = httpConn[connErrorSymbol]; - if (error instanceof errors.BadResource && connError != null) { - // deno-lint-ignore no-ex-assign - error = new connError.constructor(connError.message); - } - if (respBody !== null && respBody instanceof ReadableStream) { - await respBody.cancel(error); - } - throw error; - } - - // If `respond` returns a responseBodyRid, we should stream the body - // to that resource. - if (responseBodyRid !== null) { - try { - if (respBody === null || !(respBody instanceof ReadableStream)) { - throw new TypeError("Unreachable"); - } - const reader = respBody.getReader(); - while (true) { - const { value, done } = await reader.read(); - if (done) break; - if (!(value instanceof Uint8Array)) { - await reader.cancel(new TypeError("Value not a Uint8Array")); - break; - } - try { - await Deno.core.opAsync( - "op_http_response_write", - responseBodyRid, - value, - ); - } catch (error) { - const connError = httpConn[connErrorSymbol]; - if (error instanceof errors.BadResource && connError != null) { - // deno-lint-ignore no-ex-assign - error = new connError.constructor(connError.message); - } - await reader.cancel(error); - throw error; - } - } - } finally { - // Once all chunks are sent, and the request body is closed, we can - // close the response body. - try { - await Deno.core.opAsync("op_http_response_close", responseBodyRid); - } catch { /* pass */ } - } - } - }; - } - - function createRequestBodyStream(requestBodyRid) { - return new ReadableStream({ - type: "bytes", - async pull(controller) { - try { - // This is the largest possible size for a single packet on a TLS - // stream. - const chunk = new Uint8Array(16 * 1024 + 256); - const read = await readRequest( - requestBodyRid, - chunk, - ); - if (read > 0) { - // We read some data. Enqueue it onto the stream. - controller.enqueue(chunk.subarray(0, read)); - } else { - // We have reached the end of the body, so we close the stream. - controller.close(); - core.close(requestBodyRid); - } - } catch (err) { - // There was an error while reading a chunk of the body, so we - // error. - controller.error(err); - controller.close(); - core.close(requestBodyRid); - } - }, - cancel() { - core.close(requestBodyRid); - }, - }); - } - - window.__bootstrap.http = { - serveHttp, - }; -})(this); diff --git a/runtime/js/40_net_unstable.js b/runtime/js/40_net_unstable.js deleted file mode 100644 index ca265bfaa..000000000 --- a/runtime/js/40_net_unstable.js +++ /dev/null @@ -1,49 +0,0 @@ -// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. -"use strict"; - -((window) => { - const net = window.__bootstrap.net; - - function listen(options) { - if (options.transport === "unix") { - const res = net.opListen(options); - return new net.Listener(res.rid, res.localAddr); - } else { - return net.listen(options); - } - } - - function listenDatagram( - options, - ) { - let res; - if (options.transport === "unixpacket") { - res = net.opListen(options); - } else { - res = net.opListen({ - transport: "udp", - hostname: "127.0.0.1", - ...options, - }); - } - - return new net.Datagram(res.rid, res.localAddr); - } - - async function connect( - options, - ) { - if (options.transport === "unix") { - const res = await net.opConnect(options); - return new net.Conn(res.rid, res.remoteAddr, res.localAddr); - } else { - return net.connect(options); - } - } - - window.__bootstrap.netUnstable = { - connect, - listenDatagram, - listen, - }; -})(this); diff --git a/runtime/js/40_tls.js b/runtime/js/40_tls.js deleted file mode 100644 index 4fafe9079..000000000 --- a/runtime/js/40_tls.js +++ /dev/null @@ -1,85 +0,0 @@ -// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. -"use strict"; - -((window) => { - const core = window.Deno.core; - const { Listener, Conn } = window.__bootstrap.net; - - function opConnectTls( - args, - ) { - return core.opAsync("op_connect_tls", args); - } - - function opAcceptTLS(rid) { - return core.opAsync("op_accept_tls", rid); - } - - function opListenTls(args) { - return core.opSync("op_listen_tls", args); - } - - function opStartTls(args) { - return core.opAsync("op_start_tls", args); - } - - async function connectTls({ - port, - hostname = "127.0.0.1", - transport = "tcp", - certFile = undefined, - }) { - const res = await opConnectTls({ - port, - hostname, - transport, - certFile, - }); - return new Conn(res.rid, res.remoteAddr, res.localAddr); - } - - class TLSListener extends Listener { - async accept() { - const res = await opAcceptTLS(this.rid); - return new Conn(res.rid, res.remoteAddr, res.localAddr); - } - } - - function listenTls({ - port, - certFile, - keyFile, - hostname = "0.0.0.0", - transport = "tcp", - alpnProtocols, - }) { - const res = opListenTls({ - port, - certFile, - keyFile, - hostname, - transport, - alpnProtocols, - }); - return new TLSListener(res.rid, res.localAddr); - } - - async function startTls( - conn, - { hostname = "127.0.0.1", certFile } = {}, - ) { - const res = await opStartTls({ - rid: conn.rid, - hostname, - certFile, - }); - return new Conn(res.rid, res.remoteAddr, res.localAddr); - } - - window.__bootstrap.tls = { - startTls, - listenTls, - connectTls, - TLSListener, - }; -})(this); diff --git a/runtime/lib.rs b/runtime/lib.rs index 63829c2d2..aa95aefbc 100644 --- a/runtime/lib.rs +++ b/runtime/lib.rs @@ -4,6 +4,7 @@ pub use deno_broadcast_channel; pub use deno_console; pub use deno_crypto; pub use deno_fetch; +pub use deno_net; pub use deno_timers; pub use deno_url; pub use deno_web; @@ -20,7 +21,6 @@ pub mod js; pub mod metrics; pub mod ops; pub mod permissions; -pub mod resolve_addr; pub mod tokio_util; pub mod web_worker; pub mod worker; diff --git a/runtime/ops/http.rs b/runtime/ops/http.rs deleted file mode 100644 index 01658c802..000000000 --- a/runtime/ops/http.rs +++ /dev/null @@ -1,579 +0,0 @@ -// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. - -use crate::ops::io::TcpStreamResource; -use crate::ops::io::TlsStreamResource; -use crate::ops::tls::TlsStream; -use deno_core::error::bad_resource_id; -use deno_core::error::null_opbuf; -use deno_core::error::type_error; -use deno_core::error::AnyError; -use deno_core::futures::future::poll_fn; -use deno_core::futures::FutureExt; -use deno_core::futures::Stream; -use deno_core::futures::StreamExt; -use deno_core::op_async; -use deno_core::op_sync; -use deno_core::AsyncRefCell; -use deno_core::ByteString; -use deno_core::CancelHandle; -use deno_core::CancelTryFuture; -use deno_core::Extension; -use deno_core::OpState; -use deno_core::RcRef; -use deno_core::Resource; -use deno_core::ResourceId; -use deno_core::ZeroCopyBuf; -use hyper::body::HttpBody; -use hyper::http; -use hyper::server::conn::Connection; -use hyper::server::conn::Http; -use hyper::service::Service as HyperService; -use hyper::Body; -use hyper::Request; -use hyper::Response; -use serde::Deserialize; -use serde::Serialize; -use std::borrow::Cow; -use std::cell::RefCell; -use std::future::Future; -use std::net::SocketAddr; -use std::pin::Pin; -use std::rc::Rc; -use std::task::Context; -use std::task::Poll; -use tokio::io::AsyncReadExt; -use tokio::net::TcpStream; -use tokio::sync::oneshot; -use tokio_util::io::StreamReader; - -pub fn init() -> Extension { - Extension::builder() - .ops(vec![ - ("op_http_start", op_sync(op_http_start)), - ("op_http_request_next", op_async(op_http_request_next)), - ("op_http_request_read", op_async(op_http_request_read)), - ("op_http_response", op_async(op_http_response)), - ("op_http_response_write", op_async(op_http_response_write)), - ("op_http_response_close", op_async(op_http_response_close)), - ]) - .build() -} - -struct ServiceInner { - request: Request, - response_tx: oneshot::Sender>, -} - -#[derive(Clone, Default)] -struct Service { - inner: Rc>>, - waker: Rc, -} - -impl HyperService> for Service { - type Response = Response; - type Error = http::Error; - #[allow(clippy::type_complexity)] - type Future = - Pin>>>; - - fn poll_ready( - &mut self, - _cx: &mut Context<'_>, - ) -> Poll> { - if self.inner.borrow().is_some() { - Poll::Pending - } else { - Poll::Ready(Ok(())) - } - } - - fn call(&mut self, req: Request) -> Self::Future { - let (resp_tx, resp_rx) = oneshot::channel(); - self.inner.borrow_mut().replace(ServiceInner { - request: req, - response_tx: resp_tx, - }); - - async move { Ok(resp_rx.await.unwrap()) }.boxed_local() - } -} - -enum ConnType { - Tcp(Rc>>), - Tls(Rc>>), -} - -struct ConnResource { - hyper_connection: ConnType, - deno_service: Service, - addr: SocketAddr, - cancel: CancelHandle, -} - -impl ConnResource { - // TODO(ry) impl Future for ConnResource? - fn poll(&self, cx: &mut Context<'_>) -> Poll> { - match &self.hyper_connection { - ConnType::Tcp(c) => c.borrow_mut().poll_unpin(cx), - ConnType::Tls(c) => c.borrow_mut().poll_unpin(cx), - } - .map_err(AnyError::from) - } -} - -impl Resource for ConnResource { - fn name(&self) -> Cow { - "httpConnection".into() - } - - fn close(self: Rc) { - self.cancel.cancel() - } -} - -// We use a tuple instead of struct to avoid serialization overhead of the keys. -#[derive(Serialize)] -#[serde(rename_all = "camelCase")] -struct NextRequestResponse( - // request_body_rid: - Option, - // response_sender_rid: - ResourceId, - // method: - // This is a String rather than a ByteString because reqwest will only return - // the method as a str which is guaranteed to be ASCII-only. - String, - // headers: - Vec<(ByteString, ByteString)>, - // url: - String, -); - -async fn op_http_request_next( - state: Rc>, - conn_rid: ResourceId, - _: (), -) -> Result, AnyError> { - let conn_resource = state - .borrow() - .resource_table - .get::(conn_rid) - .ok_or_else(bad_resource_id)?; - - let cancel = RcRef::map(conn_resource.clone(), |r| &r.cancel); - - poll_fn(|cx| { - conn_resource.deno_service.waker.register(cx.waker()); - let connection_closed = match conn_resource.poll(cx) { - Poll::Pending => false, - Poll::Ready(Ok(())) => { - // try to close ConnResource, but don't unwrap as it might - // already be closed - let _ = state - .borrow_mut() - .resource_table - .take::(conn_rid); - true - } - Poll::Ready(Err(e)) => { - // TODO(ry) close RequestResource associated with connection - // TODO(ry) close ResponseBodyResource associated with connection - // close ConnResource - state - .borrow_mut() - .resource_table - .take::(conn_rid) - .unwrap(); - - if should_ignore_error(&e) { - true - } else { - return Poll::Ready(Err(e)); - } - } - }; - if let Some(request_resource) = - conn_resource.deno_service.inner.borrow_mut().take() - { - let tx = request_resource.response_tx; - let req = request_resource.request; - let method = req.method().to_string(); - - let mut headers = Vec::with_capacity(req.headers().len()); - for (name, value) in req.headers().iter() { - let name: &[u8] = name.as_ref(); - let value = value.as_bytes(); - headers - .push((ByteString(name.to_owned()), ByteString(value.to_owned()))); - } - - let url = { - let scheme = { - match conn_resource.hyper_connection { - ConnType::Tcp(_) => "http", - ConnType::Tls(_) => "https", - } - }; - let host: Cow = if let Some(host) = req.uri().host() { - Cow::Borrowed(host) - } else if let Some(host) = req.headers().get("HOST") { - Cow::Borrowed(host.to_str()?) - } else { - Cow::Owned(conn_resource.addr.to_string()) - }; - let path = req.uri().path_and_query().map_or("/", |p| p.as_str()); - format!("{}://{}{}", scheme, host, path) - }; - - let has_body = if let Some(exact_size) = req.size_hint().exact() { - exact_size > 0 - } else { - true - }; - - let maybe_request_body_rid = if has_body { - let stream: BytesStream = Box::pin(req.into_body().map(|r| { - r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)) - })); - let stream_reader = StreamReader::new(stream); - let mut state = state.borrow_mut(); - let request_body_rid = state.resource_table.add(RequestBodyResource { - conn_rid, - reader: AsyncRefCell::new(stream_reader), - cancel: CancelHandle::default(), - }); - Some(request_body_rid) - } else { - None - }; - - let mut state = state.borrow_mut(); - let response_sender_rid = - state.resource_table.add(ResponseSenderResource { - sender: tx, - conn_rid, - }); - - Poll::Ready(Ok(Some(NextRequestResponse( - maybe_request_body_rid, - response_sender_rid, - method, - headers, - url, - )))) - } else if connection_closed { - Poll::Ready(Ok(None)) - } else { - Poll::Pending - } - }) - .try_or_cancel(cancel) - .await - .map_err(AnyError::from) -} - -fn should_ignore_error(e: &AnyError) -> bool { - if let Some(e) = e.downcast_ref::() { - use std::error::Error; - if let Some(std_err) = e.source() { - if let Some(io_err) = std_err.downcast_ref::() { - if io_err.kind() == std::io::ErrorKind::NotConnected { - return true; - } - } - } - } - false -} - -fn op_http_start( - state: &mut OpState, - tcp_stream_rid: ResourceId, - _: (), -) -> Result { - let deno_service = Service::default(); - - if let Some(resource_rc) = state - .resource_table - .take::(tcp_stream_rid) - { - let resource = Rc::try_unwrap(resource_rc) - .expect("Only a single use of this resource should happen"); - let (read_half, write_half) = resource.into_inner(); - let tcp_stream = read_half.reunite(write_half)?; - let addr = tcp_stream.local_addr()?; - let hyper_connection = Http::new() - .with_executor(LocalExecutor) - .serve_connection(tcp_stream, deno_service.clone()); - let conn_resource = ConnResource { - hyper_connection: ConnType::Tcp(Rc::new(RefCell::new(hyper_connection))), - deno_service, - addr, - cancel: CancelHandle::default(), - }; - let rid = state.resource_table.add(conn_resource); - return Ok(rid); - } - - if let Some(resource_rc) = state - .resource_table - .take::(tcp_stream_rid) - { - let resource = Rc::try_unwrap(resource_rc) - .expect("Only a single use of this resource should happen"); - let (read_half, write_half) = resource.into_inner(); - let tls_stream = read_half.reunite(write_half); - let addr = tls_stream.get_ref().0.local_addr()?; - - let hyper_connection = Http::new() - .with_executor(LocalExecutor) - .serve_connection(tls_stream, deno_service.clone()); - let conn_resource = ConnResource { - hyper_connection: ConnType::Tls(Rc::new(RefCell::new(hyper_connection))), - deno_service, - addr, - cancel: CancelHandle::default(), - }; - let rid = state.resource_table.add(conn_resource); - return Ok(rid); - } - - Err(bad_resource_id()) -} - -// We use a tuple instead of struct to avoid serialization overhead of the keys. -#[derive(Deserialize)] -struct RespondArgs( - // rid: - u32, - // status: - u16, - // headers: - Vec<(ByteString, ByteString)>, -); - -async fn op_http_response( - state: Rc>, - args: RespondArgs, - data: Option, -) -> Result, AnyError> { - let RespondArgs(rid, status, headers) = args; - - let response_sender = state - .borrow_mut() - .resource_table - .take::(rid) - .ok_or_else(bad_resource_id)?; - let response_sender = Rc::try_unwrap(response_sender) - .ok() - .expect("multiple op_http_respond ongoing"); - - let conn_resource = state - .borrow() - .resource_table - .get::(response_sender.conn_rid) - .ok_or_else(bad_resource_id)?; - - let mut builder = Response::builder().status(status); - - builder.headers_mut().unwrap().reserve(headers.len()); - for (key, value) in &headers { - builder = builder.header(key.as_ref(), value.as_ref()); - } - - let res; - let maybe_response_body_rid = if let Some(d) = data { - // If a body is passed, we use it, and don't return a body for streaming. - res = builder.body(Vec::from(&*d).into())?; - None - } else { - // If no body is passed, we return a writer for streaming the body. - let (sender, body) = Body::channel(); - res = builder.body(body)?; - - let response_body_rid = - state.borrow_mut().resource_table.add(ResponseBodyResource { - body: AsyncRefCell::new(sender), - conn_rid: response_sender.conn_rid, - }); - - Some(response_body_rid) - }; - - // oneshot::Sender::send(v) returns |v| on error, not an error object. - // The only failure mode is the receiver already having dropped its end - // of the channel. - if response_sender.sender.send(res).is_err() { - return Err(type_error("internal communication error")); - } - - poll_fn(|cx| match conn_resource.poll(cx) { - Poll::Ready(x) => Poll::Ready(x), - Poll::Pending => Poll::Ready(Ok(())), - }) - .await?; - - if maybe_response_body_rid.is_none() { - conn_resource.deno_service.waker.wake(); - } - Ok(maybe_response_body_rid) -} - -async fn op_http_response_close( - state: Rc>, - rid: ResourceId, - _: (), -) -> Result<(), AnyError> { - let resource = state - .borrow_mut() - .resource_table - .take::(rid) - .ok_or_else(bad_resource_id)?; - - let conn_resource = state - .borrow() - .resource_table - .get::(resource.conn_rid) - .ok_or_else(bad_resource_id)?; - drop(resource); - - let r = poll_fn(|cx| match conn_resource.poll(cx) { - Poll::Ready(x) => Poll::Ready(x), - Poll::Pending => Poll::Ready(Ok(())), - }) - .await; - conn_resource.deno_service.waker.wake(); - r -} - -async fn op_http_request_read( - state: Rc>, - rid: ResourceId, - data: Option, -) -> Result { - let mut data = data.ok_or_else(null_opbuf)?; - - let resource = state - .borrow() - .resource_table - .get::(rid as u32) - .ok_or_else(bad_resource_id)?; - - let conn_resource = state - .borrow() - .resource_table - .get::(resource.conn_rid) - .ok_or_else(bad_resource_id)?; - - let mut reader = RcRef::map(&resource, |r| &r.reader).borrow_mut().await; - let cancel = RcRef::map(resource, |r| &r.cancel); - let mut read_fut = reader.read(&mut data).try_or_cancel(cancel).boxed_local(); - - poll_fn(|cx| { - if let Poll::Ready(Err(e)) = conn_resource.poll(cx) { - // close ConnResource - // close RequestResource associated with connection - // close ResponseBodyResource associated with connection - return Poll::Ready(Err(e)); - } - - read_fut.poll_unpin(cx).map_err(AnyError::from) - }) - .await -} - -async fn op_http_response_write( - state: Rc>, - rid: ResourceId, - data: Option, -) -> Result<(), AnyError> { - let buf = data.ok_or_else(null_opbuf)?; - let resource = state - .borrow() - .resource_table - .get::(rid as u32) - .ok_or_else(bad_resource_id)?; - - let conn_resource = state - .borrow() - .resource_table - .get::(resource.conn_rid) - .ok_or_else(bad_resource_id)?; - - let mut body = RcRef::map(&resource, |r| &r.body).borrow_mut().await; - - let mut send_data_fut = body.send_data(Vec::from(&*buf).into()).boxed_local(); - - poll_fn(|cx| { - let r = send_data_fut.poll_unpin(cx).map_err(AnyError::from); - - // Poll connection so the data is flushed - if let Poll::Ready(Err(e)) = conn_resource.poll(cx) { - // close ConnResource - // close RequestResource associated with connection - // close ResponseBodyResource associated with connection - return Poll::Ready(Err(e)); - } - - r - }) - .await?; - - Ok(()) -} - -type BytesStream = - Pin> + Unpin>>; - -struct RequestBodyResource { - conn_rid: ResourceId, - reader: AsyncRefCell>, - cancel: CancelHandle, -} - -impl Resource for RequestBodyResource { - fn name(&self) -> Cow { - "requestBody".into() - } - - fn close(self: Rc) { - self.cancel.cancel() - } -} - -struct ResponseSenderResource { - sender: oneshot::Sender>, - conn_rid: ResourceId, -} - -impl Resource for ResponseSenderResource { - fn name(&self) -> Cow { - "responseSender".into() - } -} - -struct ResponseBodyResource { - body: AsyncRefCell, - conn_rid: ResourceId, -} - -impl Resource for ResponseBodyResource { - fn name(&self) -> Cow { - "responseBody".into() - } -} - -// Needed so hyper can use non Send futures -#[derive(Clone)] -struct LocalExecutor; - -impl hyper::rt::Executor for LocalExecutor -where - Fut: Future + 'static, - Fut::Output: 'static, -{ - fn execute(&self, fut: Fut) { - tokio::task::spawn_local(fut); - } -} diff --git a/runtime/ops/io.rs b/runtime/ops/io.rs index 18279c0eb..e18846466 100644 --- a/runtime/ops/io.rs +++ b/runtime/ops/io.rs @@ -1,6 +1,5 @@ // Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. -use crate::ops::tls; use deno_core::error::null_opbuf; use deno_core::error::resource_unavailable; use deno_core::error::AnyError; @@ -17,6 +16,9 @@ use deno_core::RcRef; use deno_core::Resource; use deno_core::ResourceId; use deno_core::ZeroCopyBuf; +use deno_net::io::TcpStreamResource; +use deno_net::io::TlsStreamResource; +use deno_net::io::UnixStreamResource; use std::borrow::Cow; use std::cell::RefCell; use std::io::Read; @@ -26,13 +28,10 @@ use tokio::io::AsyncRead; use tokio::io::AsyncReadExt; use tokio::io::AsyncWrite; use tokio::io::AsyncWriteExt; -use tokio::net::tcp; use tokio::process; #[cfg(unix)] use std::os::unix::io::FromRawFd; -#[cfg(unix)] -use tokio::net::unix; #[cfg(windows)] use std::os::windows::io::FromRawHandle; @@ -238,70 +237,6 @@ where } } -/// A full duplex resource has a read and write ends that are completely -/// independent, like TCP/Unix sockets and TLS streams. -#[derive(Debug)] -pub struct FullDuplexResource { - rd: AsyncRefCell, - wr: AsyncRefCell, - // When a full-duplex resource is closed, all pending 'read' ops are - // canceled, while 'write' ops are allowed to complete. Therefore only - // 'read' futures should be attached to this cancel handle. - cancel_handle: CancelHandle, -} - -impl FullDuplexResource -where - R: AsyncRead + Unpin + 'static, - W: AsyncWrite + Unpin + 'static, -{ - pub fn new((rd, wr): (R, W)) -> Self { - Self { - rd: rd.into(), - wr: wr.into(), - cancel_handle: Default::default(), - } - } - - pub fn into_inner(self) -> (R, W) { - (self.rd.into_inner(), self.wr.into_inner()) - } - - pub fn rd_borrow_mut(self: &Rc) -> AsyncMutFuture { - RcRef::map(self, |r| &r.rd).borrow_mut() - } - - pub fn wr_borrow_mut(self: &Rc) -> AsyncMutFuture { - RcRef::map(self, |r| &r.wr).borrow_mut() - } - - pub fn cancel_handle(self: &Rc) -> RcRef { - RcRef::map(self, |r| &r.cancel_handle) - } - - pub fn cancel_read_ops(&self) { - self.cancel_handle.cancel() - } - - async fn read(self: &Rc, buf: &mut [u8]) -> Result { - let mut rd = self.rd_borrow_mut().await; - let nread = rd.read(buf).try_or_cancel(self.cancel_handle()).await?; - Ok(nread) - } - - async fn write(self: &Rc, buf: &[u8]) -> Result { - let mut wr = self.wr_borrow_mut().await; - let nwritten = wr.write(buf).await?; - Ok(nwritten) - } - - async fn shutdown(self: &Rc) -> Result<(), AnyError> { - let mut wr = self.wr_borrow_mut().await; - wr.shutdown().await?; - Ok(()) - } -} - pub type ChildStdinResource = WriteOnlyResource; impl Resource for ChildStdinResource { @@ -334,64 +269,6 @@ impl Resource for ChildStderrResource { } } -pub type TcpStreamResource = - FullDuplexResource; - -impl Resource for TcpStreamResource { - fn name(&self) -> Cow { - "tcpStream".into() - } - - fn close(self: Rc) { - self.cancel_read_ops(); - } -} - -pub type TlsStreamResource = FullDuplexResource; - -impl Resource for TlsStreamResource { - fn name(&self) -> Cow { - "tlsStream".into() - } - - fn close(self: Rc) { - self.cancel_read_ops(); - } -} - -#[cfg(unix)] -pub type UnixStreamResource = - FullDuplexResource; - -#[cfg(not(unix))] -struct UnixStreamResource; - -#[cfg(not(unix))] -impl UnixStreamResource { - async fn read(self: &Rc, _buf: &mut [u8]) -> Result { - unreachable!() - } - async fn write(self: &Rc, _buf: &[u8]) -> Result { - unreachable!() - } - async fn shutdown(self: &Rc) -> Result<(), AnyError> { - unreachable!() - } - fn cancel_read_ops(&self) { - unreachable!() - } -} - -impl Resource for UnixStreamResource { - fn name(&self) -> Cow { - "unixStream".into() - } - - fn close(self: Rc) { - self.cancel_read_ops(); - } -} - #[derive(Debug, Default)] pub struct StdFileResource { pub fs_file: diff --git a/runtime/ops/mod.rs b/runtime/ops/mod.rs index b05a91180..c94020780 100644 --- a/runtime/ops/mod.rs +++ b/runtime/ops/mod.rs @@ -2,18 +2,13 @@ pub mod fs; pub mod fs_events; -pub mod http; pub mod io; -pub mod net; -#[cfg(unix)] -mod net_unix; pub mod os; pub mod permissions; pub mod plugin; pub mod process; pub mod runtime; pub mod signal; -pub mod tls; pub mod tty; mod utils; pub mod web_worker; diff --git a/runtime/ops/net.rs b/runtime/ops/net.rs deleted file mode 100644 index c9195aab7..000000000 --- a/runtime/ops/net.rs +++ /dev/null @@ -1,793 +0,0 @@ -// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. -use crate::ops::io::TcpStreamResource; -use crate::permissions::Permissions; -use crate::resolve_addr::resolve_addr; -use crate::resolve_addr::resolve_addr_sync; -use deno_core::error::bad_resource; -use deno_core::error::custom_error; -use deno_core::error::generic_error; -use deno_core::error::null_opbuf; -use deno_core::error::type_error; -use deno_core::error::AnyError; -use deno_core::op_async; -use deno_core::op_sync; -use deno_core::AsyncRefCell; -use deno_core::CancelHandle; -use deno_core::CancelTryFuture; -use deno_core::Extension; -use deno_core::OpState; -use deno_core::RcRef; -use deno_core::Resource; -use deno_core::ResourceId; -use deno_core::ZeroCopyBuf; -use log::debug; -use serde::Deserialize; -use serde::Serialize; -use std::borrow::Cow; -use std::cell::RefCell; -use std::net::SocketAddr; -use std::rc::Rc; -use tokio::net::TcpListener; -use tokio::net::TcpStream; -use tokio::net::UdpSocket; -use trust_dns_proto::rr::record_data::RData; -use trust_dns_proto::rr::record_type::RecordType; -use trust_dns_resolver::config::NameServerConfigGroup; -use trust_dns_resolver::config::ResolverConfig; -use trust_dns_resolver::config::ResolverOpts; -use trust_dns_resolver::system_conf; -use trust_dns_resolver::AsyncResolver; - -#[cfg(unix)] -use super::net_unix; -#[cfg(unix)] -use crate::ops::io::UnixStreamResource; -#[cfg(unix)] -use std::path::Path; - -pub fn init() -> Extension { - Extension::builder() - .ops(vec![ - ("op_accept", op_async(op_accept)), - ("op_connect", op_async(op_connect)), - ("op_listen", op_sync(op_listen)), - ("op_datagram_receive", op_async(op_datagram_receive)), - ("op_datagram_send", op_async(op_datagram_send)), - ("op_dns_resolve", op_async(op_dns_resolve)), - ]) - .build() -} - -#[derive(Serialize)] -#[serde(rename_all = "camelCase")] -pub struct OpConn { - pub rid: ResourceId, - pub remote_addr: Option, - pub local_addr: Option, -} - -#[derive(Serialize)] -#[serde(tag = "transport", rename_all = "lowercase")] -pub enum OpAddr { - Tcp(IpAddr), - Udp(IpAddr), - #[cfg(unix)] - Unix(net_unix::UnixAddr), - #[cfg(unix)] - UnixPacket(net_unix::UnixAddr), -} - -#[derive(Serialize)] -#[serde(rename_all = "camelCase")] -/// A received datagram packet (from udp or unixpacket) -pub struct OpPacket { - pub size: usize, - pub remote_addr: OpAddr, -} - -#[derive(Serialize)] -pub struct IpAddr { - pub hostname: String, - pub port: u16, -} - -#[derive(Deserialize)] -pub(crate) struct AcceptArgs { - pub rid: ResourceId, - pub transport: String, -} - -async fn accept_tcp( - state: Rc>, - args: AcceptArgs, - _: (), -) -> Result { - let rid = args.rid; - - let resource = state - .borrow() - .resource_table - .get::(rid) - .ok_or_else(|| bad_resource("Listener has been closed"))?; - let listener = RcRef::map(&resource, |r| &r.listener) - .try_borrow_mut() - .ok_or_else(|| custom_error("Busy", "Another accept task is ongoing"))?; - let cancel = RcRef::map(resource, |r| &r.cancel); - let (tcp_stream, _socket_addr) = - listener.accept().try_or_cancel(cancel).await.map_err(|e| { - // FIXME(bartlomieju): compatibility with current JS implementation - if let std::io::ErrorKind::Interrupted = e.kind() { - bad_resource("Listener has been closed") - } else { - e.into() - } - })?; - let local_addr = tcp_stream.local_addr()?; - let remote_addr = tcp_stream.peer_addr()?; - - let mut state = state.borrow_mut(); - let rid = state - .resource_table - .add(TcpStreamResource::new(tcp_stream.into_split())); - Ok(OpConn { - rid, - local_addr: Some(OpAddr::Tcp(IpAddr { - hostname: local_addr.ip().to_string(), - port: local_addr.port(), - })), - remote_addr: Some(OpAddr::Tcp(IpAddr { - hostname: remote_addr.ip().to_string(), - port: remote_addr.port(), - })), - }) -} - -async fn op_accept( - state: Rc>, - args: AcceptArgs, - _: (), -) -> Result { - match args.transport.as_str() { - "tcp" => accept_tcp(state, args, ()).await, - #[cfg(unix)] - "unix" => net_unix::accept_unix(state, args, ()).await, - other => Err(bad_transport(other)), - } -} - -fn bad_transport(transport: &str) -> AnyError { - generic_error(format!("Unsupported transport protocol {}", transport)) -} - -#[derive(Deserialize)] -pub(crate) struct ReceiveArgs { - pub rid: ResourceId, - pub transport: String, -} - -async fn receive_udp( - state: Rc>, - args: ReceiveArgs, - zero_copy: Option, -) -> Result { - let zero_copy = zero_copy.ok_or_else(null_opbuf)?; - let mut zero_copy = zero_copy.clone(); - - let rid = args.rid; - - let resource = state - .borrow_mut() - .resource_table - .get::(rid) - .ok_or_else(|| bad_resource("Socket has been closed"))?; - let socket = RcRef::map(&resource, |r| &r.socket).borrow().await; - let cancel_handle = RcRef::map(&resource, |r| &r.cancel); - let (size, remote_addr) = socket - .recv_from(&mut zero_copy) - .try_or_cancel(cancel_handle) - .await?; - Ok(OpPacket { - size, - remote_addr: OpAddr::Udp(IpAddr { - hostname: remote_addr.ip().to_string(), - port: remote_addr.port(), - }), - }) -} - -async fn op_datagram_receive( - state: Rc>, - args: ReceiveArgs, - zero_copy: Option, -) -> Result { - match args.transport.as_str() { - "udp" => receive_udp(state, args, zero_copy).await, - #[cfg(unix)] - "unixpacket" => net_unix::receive_unix_packet(state, args, zero_copy).await, - other => Err(bad_transport(other)), - } -} - -#[derive(Deserialize)] -struct SendArgs { - rid: ResourceId, - transport: String, - #[serde(flatten)] - transport_args: ArgsEnum, -} - -async fn op_datagram_send( - state: Rc>, - args: SendArgs, - zero_copy: Option, -) -> Result { - let zero_copy = zero_copy.ok_or_else(null_opbuf)?; - let zero_copy = zero_copy.clone(); - - match args { - SendArgs { - rid, - transport, - transport_args: ArgsEnum::Ip(args), - } if transport == "udp" => { - { - let mut s = state.borrow_mut(); - s.borrow_mut::() - .net - .check(&(&args.hostname, Some(args.port)))?; - } - let addr = resolve_addr(&args.hostname, args.port) - .await? - .next() - .ok_or_else(|| generic_error("No resolved address found"))?; - - let resource = state - .borrow_mut() - .resource_table - .get::(rid) - .ok_or_else(|| bad_resource("Socket has been closed"))?; - let socket = RcRef::map(&resource, |r| &r.socket).borrow().await; - let byte_length = socket.send_to(&zero_copy, &addr).await?; - Ok(byte_length) - } - #[cfg(unix)] - SendArgs { - rid, - transport, - transport_args: ArgsEnum::Unix(args), - } if transport == "unixpacket" => { - let address_path = Path::new(&args.path); - { - let mut s = state.borrow_mut(); - s.borrow_mut::().write.check(&address_path)?; - } - let resource = state - .borrow() - .resource_table - .get::(rid) - .ok_or_else(|| { - custom_error("NotConnected", "Socket has been closed") - })?; - let socket = RcRef::map(&resource, |r| &r.socket) - .try_borrow_mut() - .ok_or_else(|| custom_error("Busy", "Socket already in use"))?; - let byte_length = socket.send_to(&zero_copy, address_path).await?; - Ok(byte_length) - } - _ => Err(type_error("Wrong argument format!")), - } -} - -#[derive(Deserialize)] -struct ConnectArgs { - transport: String, - #[serde(flatten)] - transport_args: ArgsEnum, -} - -async fn op_connect( - state: Rc>, - args: ConnectArgs, - _: (), -) -> Result { - match args { - ConnectArgs { - transport, - transport_args: ArgsEnum::Ip(args), - } if transport == "tcp" => { - { - let mut state_ = state.borrow_mut(); - state_ - .borrow_mut::() - .net - .check(&(&args.hostname, Some(args.port)))?; - } - let addr = resolve_addr(&args.hostname, args.port) - .await? - .next() - .ok_or_else(|| generic_error("No resolved address found"))?; - let tcp_stream = TcpStream::connect(&addr).await?; - let local_addr = tcp_stream.local_addr()?; - let remote_addr = tcp_stream.peer_addr()?; - - let mut state_ = state.borrow_mut(); - let rid = state_ - .resource_table - .add(TcpStreamResource::new(tcp_stream.into_split())); - Ok(OpConn { - rid, - local_addr: Some(OpAddr::Tcp(IpAddr { - hostname: local_addr.ip().to_string(), - port: local_addr.port(), - })), - remote_addr: Some(OpAddr::Tcp(IpAddr { - hostname: remote_addr.ip().to_string(), - port: remote_addr.port(), - })), - }) - } - #[cfg(unix)] - ConnectArgs { - transport, - transport_args: ArgsEnum::Unix(args), - } if transport == "unix" => { - let address_path = Path::new(&args.path); - super::check_unstable2(&state, "Deno.connect"); - { - let mut state_ = state.borrow_mut(); - state_ - .borrow_mut::() - .read - .check(&address_path)?; - state_ - .borrow_mut::() - .write - .check(&address_path)?; - } - let path = args.path; - let unix_stream = net_unix::UnixStream::connect(Path::new(&path)).await?; - let local_addr = unix_stream.local_addr()?; - let remote_addr = unix_stream.peer_addr()?; - - let mut state_ = state.borrow_mut(); - let resource = UnixStreamResource::new(unix_stream.into_split()); - let rid = state_.resource_table.add(resource); - Ok(OpConn { - rid, - local_addr: Some(OpAddr::Unix(net_unix::UnixAddr { - path: local_addr.as_pathname().and_then(net_unix::pathstring), - })), - remote_addr: Some(OpAddr::Unix(net_unix::UnixAddr { - path: remote_addr.as_pathname().and_then(net_unix::pathstring), - })), - }) - } - _ => Err(type_error("Wrong argument format!")), - } -} - -pub struct TcpListenerResource { - pub listener: AsyncRefCell, - pub cancel: CancelHandle, -} - -impl Resource for TcpListenerResource { - fn name(&self) -> Cow { - "tcpListener".into() - } - - fn close(self: Rc) { - self.cancel.cancel(); - } -} - -struct UdpSocketResource { - socket: AsyncRefCell, - cancel: CancelHandle, -} - -impl Resource for UdpSocketResource { - fn name(&self) -> Cow { - "udpSocket".into() - } - - fn close(self: Rc) { - self.cancel.cancel() - } -} - -#[derive(Deserialize)] -struct IpListenArgs { - hostname: String, - port: u16, -} - -#[derive(Deserialize)] -#[serde(untagged)] -enum ArgsEnum { - Ip(IpListenArgs), - #[cfg(unix)] - Unix(net_unix::UnixListenArgs), -} - -#[derive(Deserialize)] -struct ListenArgs { - transport: String, - #[serde(flatten)] - transport_args: ArgsEnum, -} - -fn listen_tcp( - state: &mut OpState, - addr: SocketAddr, -) -> Result<(u32, SocketAddr), AnyError> { - let std_listener = std::net::TcpListener::bind(&addr)?; - std_listener.set_nonblocking(true)?; - let listener = TcpListener::from_std(std_listener)?; - let local_addr = listener.local_addr()?; - let listener_resource = TcpListenerResource { - listener: AsyncRefCell::new(listener), - cancel: Default::default(), - }; - let rid = state.resource_table.add(listener_resource); - - Ok((rid, local_addr)) -} - -fn listen_udp( - state: &mut OpState, - addr: SocketAddr, -) -> Result<(u32, SocketAddr), AnyError> { - let std_socket = std::net::UdpSocket::bind(&addr)?; - std_socket.set_nonblocking(true)?; - let socket = UdpSocket::from_std(std_socket)?; - let local_addr = socket.local_addr()?; - let socket_resource = UdpSocketResource { - socket: AsyncRefCell::new(socket), - cancel: Default::default(), - }; - let rid = state.resource_table.add(socket_resource); - - Ok((rid, local_addr)) -} - -fn op_listen( - state: &mut OpState, - args: ListenArgs, - _: (), -) -> Result { - match args { - ListenArgs { - transport, - transport_args: ArgsEnum::Ip(args), - } => { - { - if transport == "udp" { - super::check_unstable(state, "Deno.listenDatagram"); - } - state - .borrow_mut::() - .net - .check(&(&args.hostname, Some(args.port)))?; - } - let addr = resolve_addr_sync(&args.hostname, args.port)? - .next() - .ok_or_else(|| generic_error("No resolved address found"))?; - let (rid, local_addr) = if transport == "tcp" { - listen_tcp(state, addr)? - } else { - listen_udp(state, addr)? - }; - debug!( - "New listener {} {}:{}", - rid, - local_addr.ip().to_string(), - local_addr.port() - ); - let ip_addr = IpAddr { - hostname: local_addr.ip().to_string(), - port: local_addr.port(), - }; - Ok(OpConn { - rid, - local_addr: Some(match transport.as_str() { - "udp" => OpAddr::Udp(ip_addr), - "tcp" => OpAddr::Tcp(ip_addr), - // NOTE: This could be unreachable!() - other => return Err(bad_transport(other)), - }), - remote_addr: None, - }) - } - #[cfg(unix)] - ListenArgs { - transport, - transport_args: ArgsEnum::Unix(args), - } if transport == "unix" || transport == "unixpacket" => { - let address_path = Path::new(&args.path); - { - if transport == "unix" { - super::check_unstable(state, "Deno.listen"); - } - if transport == "unixpacket" { - super::check_unstable(state, "Deno.listenDatagram"); - } - let permissions = state.borrow_mut::(); - permissions.read.check(&address_path)?; - permissions.write.check(&address_path)?; - } - let (rid, local_addr) = if transport == "unix" { - net_unix::listen_unix(state, &address_path)? - } else { - net_unix::listen_unix_packet(state, &address_path)? - }; - debug!( - "New listener {} {}", - rid, - local_addr.as_pathname().unwrap().display(), - ); - let unix_addr = net_unix::UnixAddr { - path: local_addr.as_pathname().and_then(net_unix::pathstring), - }; - - Ok(OpConn { - rid, - local_addr: Some(match transport.as_str() { - "unix" => OpAddr::Unix(unix_addr), - "unixpacket" => OpAddr::UnixPacket(unix_addr), - other => return Err(bad_transport(other)), - }), - remote_addr: None, - }) - } - #[cfg(unix)] - _ => Err(type_error("Wrong argument format!")), - } -} - -#[derive(Serialize, PartialEq, Debug)] -#[serde(untagged)] -enum DnsReturnRecord { - A(String), - Aaaa(String), - Aname(String), - Cname(String), - Mx { - preference: u16, - exchange: String, - }, - Ptr(String), - Srv { - priority: u16, - weight: u16, - port: u16, - target: String, - }, - Txt(Vec), -} - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct ResolveAddrArgs { - query: String, - record_type: RecordType, - options: Option, -} - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct ResolveDnsOption { - name_server: Option, -} - -fn default_port() -> u16 { - 53 -} - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct NameServer { - ip_addr: String, - #[serde(default = "default_port")] - port: u16, -} - -async fn op_dns_resolve( - state: Rc>, - args: ResolveAddrArgs, - _: (), -) -> Result, AnyError> { - let ResolveAddrArgs { - query, - record_type, - options, - } = args; - - let (config, opts) = if let Some(name_server) = - options.as_ref().and_then(|o| o.name_server.as_ref()) - { - let group = NameServerConfigGroup::from_ips_clear( - &[name_server.ip_addr.parse()?], - name_server.port, - true, - ); - ( - ResolverConfig::from_parts(None, vec![], group), - ResolverOpts::default(), - ) - } else { - system_conf::read_system_conf()? - }; - - { - let mut s = state.borrow_mut(); - let perm = s.borrow_mut::(); - - // Checks permission against the name servers which will be actually queried. - for ns in config.name_servers() { - let socker_addr = &ns.socket_addr; - let ip = socker_addr.ip().to_string(); - let port = socker_addr.port(); - perm.net.check(&(ip, Some(port)))?; - } - } - - let resolver = AsyncResolver::tokio(config, opts)?; - - let results = resolver - .lookup(query, record_type, Default::default()) - .await - .map_err(|e| generic_error(format!("{}", e)))? - .iter() - .filter_map(rdata_to_return_record(record_type)) - .collect(); - - Ok(results) -} - -fn rdata_to_return_record( - ty: RecordType, -) -> impl Fn(&RData) -> Option { - use RecordType::*; - move |r: &RData| -> Option { - match ty { - A => r.as_a().map(ToString::to_string).map(DnsReturnRecord::A), - AAAA => r - .as_aaaa() - .map(ToString::to_string) - .map(DnsReturnRecord::Aaaa), - ANAME => r - .as_aname() - .map(ToString::to_string) - .map(DnsReturnRecord::Aname), - CNAME => r - .as_cname() - .map(ToString::to_string) - .map(DnsReturnRecord::Cname), - MX => r.as_mx().map(|mx| DnsReturnRecord::Mx { - preference: mx.preference(), - exchange: mx.exchange().to_string(), - }), - PTR => r - .as_ptr() - .map(ToString::to_string) - .map(DnsReturnRecord::Ptr), - SRV => r.as_srv().map(|srv| DnsReturnRecord::Srv { - priority: srv.priority(), - weight: srv.weight(), - port: srv.port(), - target: srv.target().to_string(), - }), - TXT => r.as_txt().map(|txt| { - let texts: Vec = txt - .iter() - .map(|bytes| { - // Tries to parse these bytes as Latin-1 - bytes.iter().map(|&b| b as char).collect::() - }) - .collect(); - DnsReturnRecord::Txt(texts) - }), - // TODO(magurotuna): Other record types are not supported - _ => todo!(), - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - use std::net::Ipv4Addr; - use std::net::Ipv6Addr; - use trust_dns_proto::rr::rdata::mx::MX; - use trust_dns_proto::rr::rdata::srv::SRV; - use trust_dns_proto::rr::rdata::txt::TXT; - use trust_dns_proto::rr::record_data::RData; - use trust_dns_proto::rr::Name; - - #[test] - fn rdata_to_return_record_a() { - let func = rdata_to_return_record(RecordType::A); - let rdata = RData::A(Ipv4Addr::new(127, 0, 0, 1)); - assert_eq!( - func(&rdata), - Some(DnsReturnRecord::A("127.0.0.1".to_string())) - ); - } - - #[test] - fn rdata_to_return_record_aaaa() { - let func = rdata_to_return_record(RecordType::AAAA); - let rdata = RData::AAAA(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)); - assert_eq!(func(&rdata), Some(DnsReturnRecord::Aaaa("::1".to_string()))); - } - - #[test] - fn rdata_to_return_record_aname() { - let func = rdata_to_return_record(RecordType::ANAME); - let rdata = RData::ANAME(Name::new()); - assert_eq!(func(&rdata), Some(DnsReturnRecord::Aname("".to_string()))); - } - - #[test] - fn rdata_to_return_record_cname() { - let func = rdata_to_return_record(RecordType::CNAME); - let rdata = RData::CNAME(Name::new()); - assert_eq!(func(&rdata), Some(DnsReturnRecord::Cname("".to_string()))); - } - - #[test] - fn rdata_to_return_record_mx() { - let func = rdata_to_return_record(RecordType::MX); - let rdata = RData::MX(MX::new(10, Name::new())); - assert_eq!( - func(&rdata), - Some(DnsReturnRecord::Mx { - preference: 10, - exchange: "".to_string() - }) - ); - } - - #[test] - fn rdata_to_return_record_ptr() { - let func = rdata_to_return_record(RecordType::PTR); - let rdata = RData::PTR(Name::new()); - assert_eq!(func(&rdata), Some(DnsReturnRecord::Ptr("".to_string()))); - } - - #[test] - fn rdata_to_return_record_srv() { - let func = rdata_to_return_record(RecordType::SRV); - let rdata = RData::SRV(SRV::new(1, 2, 3, Name::new())); - assert_eq!( - func(&rdata), - Some(DnsReturnRecord::Srv { - priority: 1, - weight: 2, - port: 3, - target: "".to_string() - }) - ); - } - - #[test] - fn rdata_to_return_record_txt() { - let func = rdata_to_return_record(RecordType::TXT); - let rdata = RData::TXT(TXT::from_bytes(vec![ - "foo".as_bytes(), - "bar".as_bytes(), - &[0xa3], // "£" in Latin-1 - &[0xe3, 0x81, 0x82], // "あ" in UTF-8 - ])); - assert_eq!( - func(&rdata), - Some(DnsReturnRecord::Txt(vec![ - "foo".to_string(), - "bar".to_string(), - "£".to_string(), - "ã\u{81}\u{82}".to_string(), - ])) - ); - } -} diff --git a/runtime/ops/net_unix.rs b/runtime/ops/net_unix.rs deleted file mode 100644 index d56dc76d9..000000000 --- a/runtime/ops/net_unix.rs +++ /dev/null @@ -1,173 +0,0 @@ -// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. - -use super::utils::into_string; -use crate::ops::io::UnixStreamResource; -use crate::ops::net::AcceptArgs; -use crate::ops::net::OpAddr; -use crate::ops::net::OpConn; -use crate::ops::net::OpPacket; -use crate::ops::net::ReceiveArgs; -use deno_core::error::bad_resource; -use deno_core::error::custom_error; -use deno_core::error::null_opbuf; -use deno_core::error::AnyError; -use deno_core::AsyncRefCell; -use deno_core::CancelHandle; -use deno_core::CancelTryFuture; -use deno_core::OpState; -use deno_core::RcRef; -use deno_core::Resource; -use deno_core::ZeroCopyBuf; -use serde::Deserialize; -use serde::Serialize; -use std::borrow::Cow; -use std::cell::RefCell; -use std::fs::remove_file; -use std::path::Path; -use std::rc::Rc; -use tokio::net::UnixDatagram; -use tokio::net::UnixListener; -pub use tokio::net::UnixStream; - -struct UnixListenerResource { - listener: AsyncRefCell, - cancel: CancelHandle, -} - -impl Resource for UnixListenerResource { - fn name(&self) -> Cow { - "unixListener".into() - } - - fn close(self: Rc) { - self.cancel.cancel(); - } -} - -pub struct UnixDatagramResource { - pub socket: AsyncRefCell, - pub cancel: CancelHandle, -} - -impl Resource for UnixDatagramResource { - fn name(&self) -> Cow { - "unixDatagram".into() - } - - fn close(self: Rc) { - self.cancel.cancel(); - } -} - -#[derive(Serialize)] -pub struct UnixAddr { - pub path: Option, -} - -#[derive(Deserialize)] -pub struct UnixListenArgs { - pub path: String, -} - -pub(crate) async fn accept_unix( - state: Rc>, - args: AcceptArgs, - _: (), -) -> Result { - let rid = args.rid; - - let resource = state - .borrow() - .resource_table - .get::(rid) - .ok_or_else(|| bad_resource("Listener has been closed"))?; - let listener = RcRef::map(&resource, |r| &r.listener) - .try_borrow_mut() - .ok_or_else(|| custom_error("Busy", "Listener already in use"))?; - let cancel = RcRef::map(resource, |r| &r.cancel); - let (unix_stream, _socket_addr) = - listener.accept().try_or_cancel(cancel).await?; - - let local_addr = unix_stream.local_addr()?; - let remote_addr = unix_stream.peer_addr()?; - let resource = UnixStreamResource::new(unix_stream.into_split()); - let mut state = state.borrow_mut(); - let rid = state.resource_table.add(resource); - Ok(OpConn { - rid, - local_addr: Some(OpAddr::Unix(UnixAddr { - path: local_addr.as_pathname().and_then(pathstring), - })), - remote_addr: Some(OpAddr::Unix(UnixAddr { - path: remote_addr.as_pathname().and_then(pathstring), - })), - }) -} - -pub(crate) async fn receive_unix_packet( - state: Rc>, - args: ReceiveArgs, - buf: Option, -) -> Result { - let mut buf = buf.ok_or_else(null_opbuf)?; - - let rid = args.rid; - - let resource = state - .borrow() - .resource_table - .get::(rid) - .ok_or_else(|| bad_resource("Socket has been closed"))?; - let socket = RcRef::map(&resource, |r| &r.socket) - .try_borrow_mut() - .ok_or_else(|| custom_error("Busy", "Socket already in use"))?; - let cancel = RcRef::map(resource, |r| &r.cancel); - let (size, remote_addr) = - socket.recv_from(&mut buf).try_or_cancel(cancel).await?; - Ok(OpPacket { - size, - remote_addr: OpAddr::UnixPacket(UnixAddr { - path: remote_addr.as_pathname().and_then(pathstring), - }), - }) -} - -pub fn listen_unix( - state: &mut OpState, - addr: &Path, -) -> Result<(u32, tokio::net::unix::SocketAddr), AnyError> { - if addr.exists() { - remove_file(&addr).unwrap(); - } - let listener = UnixListener::bind(&addr)?; - let local_addr = listener.local_addr()?; - let listener_resource = UnixListenerResource { - listener: AsyncRefCell::new(listener), - cancel: Default::default(), - }; - let rid = state.resource_table.add(listener_resource); - - Ok((rid, local_addr)) -} - -pub fn listen_unix_packet( - state: &mut OpState, - addr: &Path, -) -> Result<(u32, tokio::net::unix::SocketAddr), AnyError> { - if addr.exists() { - remove_file(&addr).unwrap(); - } - let socket = UnixDatagram::bind(&addr)?; - let local_addr = socket.local_addr()?; - let datagram_resource = UnixDatagramResource { - socket: AsyncRefCell::new(socket), - cancel: Default::default(), - }; - let rid = state.resource_table.add(datagram_resource); - - Ok((rid, local_addr)) -} - -pub fn pathstring(pathname: &Path) -> Option { - into_string(pathname.into()).ok() -} diff --git a/runtime/ops/tls.rs b/runtime/ops/tls.rs deleted file mode 100644 index c3f554856..000000000 --- a/runtime/ops/tls.rs +++ /dev/null @@ -1,1017 +0,0 @@ -// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. - -pub use rustls; -pub use webpki; - -use crate::ops::io::TcpStreamResource; -use crate::ops::io::TlsStreamResource; -use crate::ops::net::IpAddr; -use crate::ops::net::OpAddr; -use crate::ops::net::OpConn; -use crate::permissions::Permissions; -use crate::resolve_addr::resolve_addr; -use crate::resolve_addr::resolve_addr_sync; -use deno_core::error::bad_resource; -use deno_core::error::bad_resource_id; -use deno_core::error::custom_error; -use deno_core::error::generic_error; -use deno_core::error::invalid_hostname; -use deno_core::error::AnyError; -use deno_core::futures::future::poll_fn; -use deno_core::futures::ready; -use deno_core::futures::task::noop_waker_ref; -use deno_core::futures::task::AtomicWaker; -use deno_core::futures::task::Context; -use deno_core::futures::task::Poll; -use deno_core::futures::task::RawWaker; -use deno_core::futures::task::RawWakerVTable; -use deno_core::futures::task::Waker; -use deno_core::op_async; -use deno_core::op_sync; -use deno_core::AsyncRefCell; -use deno_core::CancelHandle; -use deno_core::CancelTryFuture; -use deno_core::Extension; -use deno_core::OpState; -use deno_core::RcRef; -use deno_core::Resource; -use deno_core::ResourceId; -use io::Error; -use io::Read; -use io::Write; -use rustls::internal::pemfile::certs; -use rustls::internal::pemfile::pkcs8_private_keys; -use rustls::internal::pemfile::rsa_private_keys; -use rustls::Certificate; -use rustls::ClientConfig; -use rustls::ClientSession; -use rustls::NoClientAuth; -use rustls::PrivateKey; -use rustls::ServerConfig; -use rustls::ServerSession; -use rustls::Session; -use rustls::StoresClientSessions; -use serde::Deserialize; -use std::borrow::Cow; -use std::cell::RefCell; -use std::collections::HashMap; -use std::convert::From; -use std::fs::File; -use std::io; -use std::io::BufReader; -use std::io::ErrorKind; -use std::ops::Deref; -use std::ops::DerefMut; -use std::path::Path; -use std::pin::Pin; -use std::rc::Rc; -use std::sync::Arc; -use std::sync::Mutex; -use std::sync::Weak; -use tokio::io::AsyncRead; -use tokio::io::AsyncWrite; -use tokio::io::ReadBuf; -use tokio::net::TcpListener; -use tokio::net::TcpStream; -use tokio::task::spawn_local; -use webpki::DNSNameRef; - -lazy_static::lazy_static! { - static ref CLIENT_SESSION_MEMORY_CACHE: Arc = - Arc::new(ClientSessionMemoryCache::default()); -} - -#[derive(Default)] -struct ClientSessionMemoryCache(Mutex, Vec>>); - -impl StoresClientSessions for ClientSessionMemoryCache { - fn get(&self, key: &[u8]) -> Option> { - self.0.lock().unwrap().get(key).cloned() - } - - fn put(&self, key: Vec, value: Vec) -> bool { - let mut sessions = self.0.lock().unwrap(); - // TODO(bnoordhuis) Evict sessions LRU-style instead of arbitrarily. - while sessions.len() >= 1024 { - let key = sessions.keys().next().unwrap().clone(); - sessions.remove(&key); - } - sessions.insert(key, value); - true - } -} - -#[derive(Debug)] -enum TlsSession { - Client(ClientSession), - Server(ServerSession), -} - -impl Deref for TlsSession { - type Target = dyn Session; - - fn deref(&self) -> &Self::Target { - match self { - TlsSession::Client(client_session) => client_session, - TlsSession::Server(server_session) => server_session, - } - } -} - -impl DerefMut for TlsSession { - fn deref_mut(&mut self) -> &mut Self::Target { - match self { - TlsSession::Client(client_session) => client_session, - TlsSession::Server(server_session) => server_session, - } - } -} - -impl From for TlsSession { - fn from(client_session: ClientSession) -> Self { - TlsSession::Client(client_session) - } -} - -impl From for TlsSession { - fn from(server_session: ServerSession) -> Self { - TlsSession::Server(server_session) - } -} - -#[derive(Copy, Clone, Debug, Eq, PartialEq)] -enum Flow { - Read, - Write, -} - -#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] -enum State { - StreamOpen, - StreamClosed, - TlsClosing, - TlsClosed, - TcpClosed, -} - -#[derive(Debug)] -pub struct TlsStream(Option); - -impl TlsStream { - fn new(tcp: TcpStream, tls: TlsSession) -> Self { - let inner = TlsStreamInner { - tcp, - tls, - rd_state: State::StreamOpen, - wr_state: State::StreamOpen, - }; - Self(Some(inner)) - } - - pub fn new_client_side( - tcp: TcpStream, - tls_config: &Arc, - hostname: DNSNameRef, - ) -> Self { - let tls = TlsSession::Client(ClientSession::new(tls_config, hostname)); - Self::new(tcp, tls) - } - - pub fn new_server_side( - tcp: TcpStream, - tls_config: &Arc, - ) -> Self { - let tls = TlsSession::Server(ServerSession::new(tls_config)); - Self::new(tcp, tls) - } - - pub async fn handshake(&mut self) -> io::Result<()> { - poll_fn(|cx| self.inner_mut().poll_io(cx, Flow::Write)).await - } - - fn into_split(self) -> (ReadHalf, WriteHalf) { - let shared = Shared::new(self); - let rd = ReadHalf { - shared: shared.clone(), - }; - let wr = WriteHalf { shared }; - (rd, wr) - } - - /// Tokio-rustls compatibility: returns a reference to the underlying TCP - /// stream, and a reference to the Rustls `Session` object. - pub fn get_ref(&self) -> (&TcpStream, &dyn Session) { - let inner = self.0.as_ref().unwrap(); - (&inner.tcp, &*inner.tls) - } - - fn inner_mut(&mut self) -> &mut TlsStreamInner { - self.0.as_mut().unwrap() - } -} - -impl AsyncRead for TlsStream { - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut ReadBuf<'_>, - ) -> Poll> { - self.inner_mut().poll_read(cx, buf) - } -} - -impl AsyncWrite for TlsStream { - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - self.inner_mut().poll_write(cx, buf) - } - - fn poll_flush( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - self.inner_mut().poll_io(cx, Flow::Write) - // The underlying TCP stream does not need to be flushed. - } - - fn poll_shutdown( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - self.inner_mut().poll_shutdown(cx) - } -} - -impl Drop for TlsStream { - fn drop(&mut self) { - let mut inner = self.0.take().unwrap(); - - let mut cx = Context::from_waker(noop_waker_ref()); - let use_linger_task = inner.poll_close(&mut cx).is_pending(); - - if use_linger_task { - spawn_local(poll_fn(move |cx| inner.poll_close(cx))); - } else if cfg!(debug_assertions) { - spawn_local(async {}); // Spawn dummy task to detect missing LocalSet. - } - } -} - -#[derive(Debug)] -pub struct TlsStreamInner { - tls: TlsSession, - tcp: TcpStream, - rd_state: State, - wr_state: State, -} - -impl TlsStreamInner { - fn poll_io( - &mut self, - cx: &mut Context<'_>, - flow: Flow, - ) -> Poll> { - loop { - let wr_ready = loop { - match self.wr_state { - _ if self.tls.is_handshaking() && !self.tls.wants_write() => { - break true; - } - _ if self.tls.is_handshaking() => {} - State::StreamOpen if !self.tls.wants_write() => break true, - State::StreamClosed => { - // Rustls will enqueue the 'CloseNotify' alert and send it after - // flusing the data that is already in the queue. - self.tls.send_close_notify(); - self.wr_state = State::TlsClosing; - continue; - } - State::TlsClosing if !self.tls.wants_write() => { - self.wr_state = State::TlsClosed; - continue; - } - // If a 'CloseNotify' alert sent by the remote end has been received, - // shut down the underlying TCP socket. Otherwise, consider polling - // done for the moment. - State::TlsClosed if self.rd_state < State::TlsClosed => break true, - State::TlsClosed - if Pin::new(&mut self.tcp).poll_shutdown(cx)?.is_pending() => - { - break false; - } - State::TlsClosed => { - self.wr_state = State::TcpClosed; - continue; - } - State::TcpClosed => break true, - _ => {} - } - - // Poll whether there is space in the socket send buffer so we can flush - // the remaining outgoing ciphertext. - if self.tcp.poll_write_ready(cx)?.is_pending() { - break false; - } - - // Write ciphertext to the TCP socket. - let mut wrapped_tcp = ImplementWriteTrait(&mut self.tcp); - match self.tls.write_tls(&mut wrapped_tcp) { - Ok(0) => unreachable!(), - Ok(_) => {} - Err(err) if err.kind() == ErrorKind::WouldBlock => {} - Err(err) => return Poll::Ready(Err(err)), - } - }; - - let rd_ready = loop { - match self.rd_state { - State::TcpClosed if self.tls.is_handshaking() => { - let err = Error::new(ErrorKind::UnexpectedEof, "tls handshake eof"); - return Poll::Ready(Err(err)); - } - _ if self.tls.is_handshaking() && !self.tls.wants_read() => { - break true; - } - _ if self.tls.is_handshaking() => {} - State::StreamOpen if !self.tls.wants_read() => break true, - State::StreamOpen => {} - State::StreamClosed if !self.tls.wants_read() => { - // Rustls has more incoming cleartext buffered up, but the TLS - // session is closing so this data will never be processed by the - // application layer. Just like what would happen if this were a raw - // TCP stream, don't gracefully end the TLS session, but abort it. - return Poll::Ready(Err(Error::from(ErrorKind::ConnectionReset))); - } - State::StreamClosed => {} - State::TlsClosed if self.wr_state == State::TcpClosed => { - // Wait for the remote end to gracefully close the TCP connection. - // TODO(piscisaureus): this is unnecessary; remove when stable. - } - _ => break true, - } - - if self.rd_state < State::TlsClosed { - // Do a zero-length plaintext read so we can detect the arrival of - // 'CloseNotify' messages, even if only the write half is open. - // Actually reading data from the socket is done in `poll_read()`. - match self.tls.read(&mut []) { - Ok(0) => {} - Err(err) if err.kind() == ErrorKind::ConnectionAborted => { - // `Session::read()` returns `ConnectionAborted` when a - // 'CloseNotify' alert has been received, which indicates that - // the remote peer wants to gracefully end the TLS session. - self.rd_state = State::TlsClosed; - continue; - } - Err(err) => return Poll::Ready(Err(err)), - _ => unreachable!(), - } - } - - // Poll whether more ciphertext is available in the socket receive - // buffer. - if self.tcp.poll_read_ready(cx)?.is_pending() { - break false; - } - - // Receive ciphertext from the socket. - let mut wrapped_tcp = ImplementReadTrait(&mut self.tcp); - match self.tls.read_tls(&mut wrapped_tcp) { - Ok(0) => self.rd_state = State::TcpClosed, - Ok(_) => self - .tls - .process_new_packets() - .map_err(|err| Error::new(ErrorKind::InvalidData, err))?, - Err(err) if err.kind() == ErrorKind::WouldBlock => {} - Err(err) => return Poll::Ready(Err(err)), - } - }; - - if wr_ready { - if self.rd_state >= State::TlsClosed - && self.wr_state >= State::TlsClosed - && self.wr_state < State::TcpClosed - { - continue; - } - if self.tls.wants_write() { - continue; - } - } - - let io_ready = match flow { - _ if self.tls.is_handshaking() => false, - Flow::Read => rd_ready, - Flow::Write => wr_ready, - }; - return match io_ready { - false => Poll::Pending, - true => Poll::Ready(Ok(())), - }; - } - } - - fn poll_read( - &mut self, - cx: &mut Context<'_>, - buf: &mut ReadBuf<'_>, - ) -> Poll> { - ready!(self.poll_io(cx, Flow::Read))?; - - if self.rd_state == State::StreamOpen { - let buf_slice = - unsafe { &mut *(buf.unfilled_mut() as *mut [_] as *mut [u8]) }; - let bytes_read = self.tls.read(buf_slice)?; - assert_ne!(bytes_read, 0); - unsafe { buf.assume_init(bytes_read) }; - buf.advance(bytes_read); - } - - Poll::Ready(Ok(())) - } - - fn poll_write( - &mut self, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - if buf.is_empty() { - // Tokio-rustls compatibility: a zero byte write always succeeds. - Poll::Ready(Ok(0)) - } else if self.wr_state == State::StreamOpen { - // Flush Rustls' ciphertext send queue. - ready!(self.poll_io(cx, Flow::Write))?; - - // Copy data from `buf` to the Rustls cleartext send queue. - let bytes_written = self.tls.write(buf)?; - assert_ne!(bytes_written, 0); - - // Try to flush as much ciphertext as possible. However, since we just - // handed off at least some bytes to rustls, so we can't return - // `Poll::Pending()` any more: this would tell the caller that it should - // try to send those bytes again. - let _ = self.poll_io(cx, Flow::Write)?; - - Poll::Ready(Ok(bytes_written)) - } else { - // Return error if stream has been shut down for writing. - Poll::Ready(Err(ErrorKind::BrokenPipe.into())) - } - } - - fn poll_shutdown(&mut self, cx: &mut Context<'_>) -> Poll> { - if self.wr_state == State::StreamOpen { - self.wr_state = State::StreamClosed; - } - - ready!(self.poll_io(cx, Flow::Write))?; - - // At minimum, a TLS 'CloseNotify' alert should have been sent. - assert!(self.wr_state >= State::TlsClosed); - // If we received a TLS 'CloseNotify' alert from the remote end - // already, the TCP socket should be shut down at this point. - assert!( - self.rd_state < State::TlsClosed || self.wr_state == State::TcpClosed - ); - - Poll::Ready(Ok(())) - } - - fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll> { - if self.rd_state == State::StreamOpen { - self.rd_state = State::StreamClosed; - } - - // Send TLS 'CloseNotify' alert. - ready!(self.poll_shutdown(cx))?; - // Wait for 'CloseNotify', shut down TCP stream, wait for TCP FIN packet. - ready!(self.poll_io(cx, Flow::Read))?; - - assert_eq!(self.rd_state, State::TcpClosed); - assert_eq!(self.wr_state, State::TcpClosed); - - Poll::Ready(Ok(())) - } -} - -#[derive(Debug)] -pub struct ReadHalf { - shared: Arc, -} - -impl ReadHalf { - pub fn reunite(self, wr: WriteHalf) -> TlsStream { - assert!(Arc::ptr_eq(&self.shared, &wr.shared)); - drop(wr); // Drop `wr`, so only one strong reference to `shared` remains. - - Arc::try_unwrap(self.shared) - .unwrap_or_else(|_| panic!("Arc::::try_unwrap() failed")) - .tls_stream - .into_inner() - .unwrap() - } -} - -impl AsyncRead for ReadHalf { - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut ReadBuf<'_>, - ) -> Poll> { - self - .shared - .poll_with_shared_waker(cx, Flow::Read, move |tls, cx| { - tls.poll_read(cx, buf) - }) - } -} - -#[derive(Debug)] -pub struct WriteHalf { - shared: Arc, -} - -impl AsyncWrite for WriteHalf { - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - self - .shared - .poll_with_shared_waker(cx, Flow::Write, move |tls, cx| { - tls.poll_write(cx, buf) - }) - } - - fn poll_flush( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - self - .shared - .poll_with_shared_waker(cx, Flow::Write, |tls, cx| tls.poll_flush(cx)) - } - - fn poll_shutdown( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - self - .shared - .poll_with_shared_waker(cx, Flow::Write, |tls, cx| tls.poll_shutdown(cx)) - } -} - -#[derive(Debug)] -struct Shared { - tls_stream: Mutex, - rd_waker: AtomicWaker, - wr_waker: AtomicWaker, -} - -impl Shared { - fn new(tls_stream: TlsStream) -> Arc { - let self_ = Self { - tls_stream: Mutex::new(tls_stream), - rd_waker: AtomicWaker::new(), - wr_waker: AtomicWaker::new(), - }; - Arc::new(self_) - } - - fn poll_with_shared_waker( - self: &Arc, - cx: &mut Context<'_>, - flow: Flow, - mut f: impl FnMut(Pin<&mut TlsStream>, &mut Context<'_>) -> R, - ) -> R { - match flow { - Flow::Read => self.rd_waker.register(cx.waker()), - Flow::Write => self.wr_waker.register(cx.waker()), - } - - let shared_waker = self.new_shared_waker(); - let mut cx = Context::from_waker(&shared_waker); - - let mut tls_stream = self.tls_stream.lock().unwrap(); - f(Pin::new(&mut tls_stream), &mut cx) - } - - const SHARED_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new( - Self::clone_shared_waker, - Self::wake_shared_waker, - Self::wake_shared_waker_by_ref, - Self::drop_shared_waker, - ); - - fn new_shared_waker(self: &Arc) -> Waker { - let self_weak = Arc::downgrade(self); - let self_ptr = self_weak.into_raw() as *const (); - let raw_waker = RawWaker::new(self_ptr, &Self::SHARED_WAKER_VTABLE); - unsafe { Waker::from_raw(raw_waker) } - } - - fn clone_shared_waker(self_ptr: *const ()) -> RawWaker { - let self_weak = unsafe { Weak::from_raw(self_ptr as *const Self) }; - let ptr1 = self_weak.clone().into_raw(); - let ptr2 = self_weak.into_raw(); - assert!(ptr1 == ptr2); - RawWaker::new(self_ptr, &Self::SHARED_WAKER_VTABLE) - } - - fn wake_shared_waker(self_ptr: *const ()) { - Self::wake_shared_waker_by_ref(self_ptr); - Self::drop_shared_waker(self_ptr); - } - - fn wake_shared_waker_by_ref(self_ptr: *const ()) { - let self_weak = unsafe { Weak::from_raw(self_ptr as *const Self) }; - if let Some(self_arc) = Weak::upgrade(&self_weak) { - self_arc.rd_waker.wake(); - self_arc.wr_waker.wake(); - } - self_weak.into_raw(); - } - - fn drop_shared_waker(self_ptr: *const ()) { - let _ = unsafe { Weak::from_raw(self_ptr as *const Self) }; - } -} - -struct ImplementReadTrait<'a, T>(&'a mut T); - -impl Read for ImplementReadTrait<'_, TcpStream> { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - self.0.try_read(buf) - } -} - -struct ImplementWriteTrait<'a, T>(&'a mut T); - -impl Write for ImplementWriteTrait<'_, TcpStream> { - fn write(&mut self, buf: &[u8]) -> io::Result { - self.0.try_write(buf) - } - - fn flush(&mut self) -> io::Result<()> { - Ok(()) - } -} - -pub fn init() -> Extension { - Extension::builder() - .ops(vec![ - ("op_start_tls", op_async(op_start_tls)), - ("op_connect_tls", op_async(op_connect_tls)), - ("op_listen_tls", op_sync(op_listen_tls)), - ("op_accept_tls", op_async(op_accept_tls)), - ]) - .build() -} - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct ConnectTlsArgs { - transport: String, - hostname: String, - port: u16, - cert_file: Option, -} - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -struct StartTlsArgs { - rid: ResourceId, - cert_file: Option, - hostname: String, -} - -async fn op_start_tls( - state: Rc>, - args: StartTlsArgs, - _: (), -) -> Result { - let rid = args.rid; - let hostname = match &*args.hostname { - "" => "localhost", - n => n, - }; - let cert_file = args.cert_file.as_deref(); - - { - super::check_unstable2(&state, "Deno.startTls"); - let mut s = state.borrow_mut(); - let permissions = s.borrow_mut::(); - permissions.net.check(&(hostname, Some(0)))?; - if let Some(path) = cert_file { - permissions.read.check(Path::new(path))?; - } - } - - let hostname_dns = DNSNameRef::try_from_ascii_str(hostname) - .map_err(|_| invalid_hostname(hostname))?; - - let resource_rc = state - .borrow_mut() - .resource_table - .take::(rid) - .ok_or_else(bad_resource_id)?; - let resource = Rc::try_unwrap(resource_rc) - .expect("Only a single use of this resource should happen"); - let (read_half, write_half) = resource.into_inner(); - let tcp_stream = read_half.reunite(write_half)?; - - let local_addr = tcp_stream.local_addr()?; - let remote_addr = tcp_stream.peer_addr()?; - - let mut tls_config = ClientConfig::new(); - tls_config.set_persistence(CLIENT_SESSION_MEMORY_CACHE.clone()); - tls_config - .root_store - .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS); - if let Some(path) = cert_file { - let key_file = File::open(path)?; - let reader = &mut BufReader::new(key_file); - tls_config.root_store.add_pem_file(reader).unwrap(); - } - let tls_config = Arc::new(tls_config); - - let tls_stream = - TlsStream::new_client_side(tcp_stream, &tls_config, hostname_dns); - - let rid = { - let mut state_ = state.borrow_mut(); - state_ - .resource_table - .add(TlsStreamResource::new(tls_stream.into_split())) - }; - - Ok(OpConn { - rid, - local_addr: Some(OpAddr::Tcp(IpAddr { - hostname: local_addr.ip().to_string(), - port: local_addr.port(), - })), - remote_addr: Some(OpAddr::Tcp(IpAddr { - hostname: remote_addr.ip().to_string(), - port: remote_addr.port(), - })), - }) -} - -async fn op_connect_tls( - state: Rc>, - args: ConnectTlsArgs, - _: (), -) -> Result { - assert_eq!(args.transport, "tcp"); - let hostname = match &*args.hostname { - "" => "localhost", - n => n, - }; - let port = args.port; - let cert_file = args.cert_file.as_deref(); - - { - let mut s = state.borrow_mut(); - let permissions = s.borrow_mut::(); - permissions.net.check(&(hostname, Some(port)))?; - if let Some(path) = cert_file { - permissions.read.check(Path::new(path))?; - } - } - - let hostname_dns = DNSNameRef::try_from_ascii_str(hostname) - .map_err(|_| invalid_hostname(hostname))?; - - let connect_addr = resolve_addr(hostname, port) - .await? - .next() - .ok_or_else(|| generic_error("No resolved address found"))?; - let tcp_stream = TcpStream::connect(connect_addr).await?; - let local_addr = tcp_stream.local_addr()?; - let remote_addr = tcp_stream.peer_addr()?; - - let mut tls_config = ClientConfig::new(); - tls_config.set_persistence(CLIENT_SESSION_MEMORY_CACHE.clone()); - tls_config - .root_store - .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS); - if let Some(path) = cert_file { - let key_file = File::open(path)?; - let reader = &mut BufReader::new(key_file); - tls_config.root_store.add_pem_file(reader).unwrap(); - } - let tls_config = Arc::new(tls_config); - - let tls_stream = - TlsStream::new_client_side(tcp_stream, &tls_config, hostname_dns); - - let rid = { - let mut state_ = state.borrow_mut(); - state_ - .resource_table - .add(TlsStreamResource::new(tls_stream.into_split())) - }; - - Ok(OpConn { - rid, - local_addr: Some(OpAddr::Tcp(IpAddr { - hostname: local_addr.ip().to_string(), - port: local_addr.port(), - })), - remote_addr: Some(OpAddr::Tcp(IpAddr { - hostname: remote_addr.ip().to_string(), - port: remote_addr.port(), - })), - }) -} - -fn load_certs(path: &str) -> Result, AnyError> { - let cert_file = File::open(path)?; - let reader = &mut BufReader::new(cert_file); - - let certs = certs(reader) - .map_err(|_| custom_error("InvalidData", "Unable to decode certificate"))?; - - if certs.is_empty() { - let e = custom_error("InvalidData", "No certificates found in cert file"); - return Err(e); - } - - Ok(certs) -} - -fn key_decode_err() -> AnyError { - custom_error("InvalidData", "Unable to decode key") -} - -fn key_not_found_err() -> AnyError { - custom_error("InvalidData", "No keys found in key file") -} - -/// Starts with -----BEGIN RSA PRIVATE KEY----- -fn load_rsa_keys(path: &str) -> Result, AnyError> { - let key_file = File::open(path)?; - let reader = &mut BufReader::new(key_file); - let keys = rsa_private_keys(reader).map_err(|_| key_decode_err())?; - Ok(keys) -} - -/// Starts with -----BEGIN PRIVATE KEY----- -fn load_pkcs8_keys(path: &str) -> Result, AnyError> { - let key_file = File::open(path)?; - let reader = &mut BufReader::new(key_file); - let keys = pkcs8_private_keys(reader).map_err(|_| key_decode_err())?; - Ok(keys) -} - -fn load_keys(path: &str) -> Result, AnyError> { - let path = path.to_string(); - let mut keys = load_rsa_keys(&path)?; - - if keys.is_empty() { - keys = load_pkcs8_keys(&path)?; - } - - if keys.is_empty() { - return Err(key_not_found_err()); - } - - Ok(keys) -} - -pub struct TlsListenerResource { - tcp_listener: AsyncRefCell, - tls_config: Arc, - cancel_handle: CancelHandle, -} - -impl Resource for TlsListenerResource { - fn name(&self) -> Cow { - "tlsListener".into() - } - - fn close(self: Rc) { - self.cancel_handle.cancel(); - } -} - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct ListenTlsArgs { - transport: String, - hostname: String, - port: u16, - cert_file: String, - key_file: String, - alpn_protocols: Option>, -} - -fn op_listen_tls( - state: &mut OpState, - args: ListenTlsArgs, - _: (), -) -> Result { - assert_eq!(args.transport, "tcp"); - let hostname = &*args.hostname; - let port = args.port; - let cert_file = &*args.cert_file; - let key_file = &*args.key_file; - - { - let permissions = state.borrow_mut::(); - permissions.net.check(&(hostname, Some(port)))?; - permissions.read.check(Path::new(cert_file))?; - permissions.read.check(Path::new(key_file))?; - } - - let mut tls_config = ServerConfig::new(NoClientAuth::new()); - if let Some(alpn_protocols) = args.alpn_protocols { - super::check_unstable(state, "Deno.listenTls#alpn_protocols"); - tls_config.alpn_protocols = - alpn_protocols.into_iter().map(|s| s.into_bytes()).collect(); - } - tls_config - .set_single_cert(load_certs(cert_file)?, load_keys(key_file)?.remove(0)) - .expect("invalid key or certificate"); - - let bind_addr = resolve_addr_sync(hostname, port)? - .next() - .ok_or_else(|| generic_error("No resolved address found"))?; - let std_listener = std::net::TcpListener::bind(bind_addr)?; - std_listener.set_nonblocking(true)?; - let tcp_listener = TcpListener::from_std(std_listener)?; - let local_addr = tcp_listener.local_addr()?; - - let tls_listener_resource = TlsListenerResource { - tcp_listener: AsyncRefCell::new(tcp_listener), - tls_config: Arc::new(tls_config), - cancel_handle: Default::default(), - }; - - let rid = state.resource_table.add(tls_listener_resource); - - Ok(OpConn { - rid, - local_addr: Some(OpAddr::Tcp(IpAddr { - hostname: local_addr.ip().to_string(), - port: local_addr.port(), - })), - remote_addr: None, - }) -} - -async fn op_accept_tls( - state: Rc>, - rid: ResourceId, - _: (), -) -> Result { - let resource = state - .borrow() - .resource_table - .get::(rid) - .ok_or_else(|| bad_resource("Listener has been closed"))?; - - let cancel_handle = RcRef::map(&resource, |r| &r.cancel_handle); - let tcp_listener = RcRef::map(&resource, |r| &r.tcp_listener) - .try_borrow_mut() - .ok_or_else(|| custom_error("Busy", "Another accept task is ongoing"))?; - - let (tcp_stream, remote_addr) = - match tcp_listener.accept().try_or_cancel(&cancel_handle).await { - Ok(tuple) => tuple, - Err(err) if err.kind() == ErrorKind::Interrupted => { - // FIXME(bartlomieju): compatibility with current JS implementation. - return Err(bad_resource("Listener has been closed")); - } - Err(err) => return Err(err.into()), - }; - - let local_addr = tcp_stream.local_addr()?; - - let tls_stream = TlsStream::new_server_side(tcp_stream, &resource.tls_config); - - let rid = { - let mut state_ = state.borrow_mut(); - state_ - .resource_table - .add(TlsStreamResource::new(tls_stream.into_split())) - }; - - Ok(OpConn { - rid, - local_addr: Some(OpAddr::Tcp(IpAddr { - hostname: local_addr.ip().to_string(), - port: local_addr.port(), - })), - remote_addr: Some(OpAddr::Tcp(IpAddr { - hostname: remote_addr.ip().to_string(), - port: remote_addr.port(), - })), - }) -} diff --git a/runtime/permissions.rs b/runtime/permissions.rs index f8385e201..d78e20076 100644 --- a/runtime/permissions.rs +++ b/runtime/permissions.rs @@ -962,6 +962,23 @@ impl Permissions { } } +impl deno_net::NetPermissions for Permissions { + fn check_net>( + &mut self, + host: &(T, Option), + ) -> Result<(), AnyError> { + self.net.check(host) + } + + fn check_read(&mut self, path: &Path) -> Result<(), AnyError> { + self.read.check(path) + } + + fn check_write(&mut self, path: &Path) -> Result<(), AnyError> { + self.write.check(path) + } +} + impl deno_fetch::FetchPermissions for Permissions { fn check_net_url(&mut self, url: &url::Url) -> Result<(), AnyError> { self.net.check_url(url) diff --git a/runtime/resolve_addr.rs b/runtime/resolve_addr.rs deleted file mode 100644 index ebf1374d1..000000000 --- a/runtime/resolve_addr.rs +++ /dev/null @@ -1,156 +0,0 @@ -// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. - -use deno_core::error::AnyError; -use std::net::SocketAddr; -use std::net::ToSocketAddrs; -use tokio::net::lookup_host; - -/// Resolve network address *asynchronously*. -pub async fn resolve_addr( - hostname: &str, - port: u16, -) -> Result + '_, AnyError> { - let addr_port_pair = make_addr_port_pair(hostname, port); - let result = lookup_host(addr_port_pair).await?; - Ok(result) -} - -/// Resolve network address *synchronously*. -pub fn resolve_addr_sync( - hostname: &str, - port: u16, -) -> Result, AnyError> { - let addr_port_pair = make_addr_port_pair(hostname, port); - let result = addr_port_pair.to_socket_addrs()?; - Ok(result) -} - -fn make_addr_port_pair(hostname: &str, port: u16) -> (&str, u16) { - // Default to localhost if given just the port. Example: ":80" - if hostname.is_empty() { - return ("0.0.0.0", port); - } - - // If this looks like an ipv6 IP address. Example: "[2001:db8::1]" - // Then we remove the brackets. - let addr = hostname.trim_start_matches('[').trim_end_matches(']'); - (addr, port) -} - -#[cfg(test)] -mod tests { - use super::*; - use std::net::Ipv4Addr; - use std::net::Ipv6Addr; - use std::net::SocketAddrV4; - use std::net::SocketAddrV6; - - #[tokio::test] - async fn resolve_addr1() { - let expected = vec![SocketAddr::V4(SocketAddrV4::new( - Ipv4Addr::new(127, 0, 0, 1), - 80, - ))]; - let actual = resolve_addr("127.0.0.1", 80) - .await - .unwrap() - .collect::>(); - assert_eq!(actual, expected); - } - - #[tokio::test] - async fn resolve_addr2() { - let expected = vec![SocketAddr::V4(SocketAddrV4::new( - Ipv4Addr::new(0, 0, 0, 0), - 80, - ))]; - let actual = resolve_addr("", 80).await.unwrap().collect::>(); - assert_eq!(actual, expected); - } - - #[tokio::test] - async fn resolve_addr3() { - let expected = vec![SocketAddr::V4(SocketAddrV4::new( - Ipv4Addr::new(192, 0, 2, 1), - 25, - ))]; - let actual = resolve_addr("192.0.2.1", 25) - .await - .unwrap() - .collect::>(); - assert_eq!(actual, expected); - } - - #[tokio::test] - async fn resolve_addr_ipv6() { - let expected = vec![SocketAddr::V6(SocketAddrV6::new( - Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 1), - 8080, - 0, - 0, - ))]; - let actual = resolve_addr("[2001:db8::1]", 8080) - .await - .unwrap() - .collect::>(); - assert_eq!(actual, expected); - } - - #[tokio::test] - async fn resolve_addr_err() { - assert!(resolve_addr("INVALID ADDR", 1234).await.is_err()); - } - - #[test] - fn resolve_addr_sync1() { - let expected = vec![SocketAddr::V4(SocketAddrV4::new( - Ipv4Addr::new(127, 0, 0, 1), - 80, - ))]; - let actual = resolve_addr_sync("127.0.0.1", 80) - .unwrap() - .collect::>(); - assert_eq!(actual, expected); - } - - #[test] - fn resolve_addr_sync2() { - let expected = vec![SocketAddr::V4(SocketAddrV4::new( - Ipv4Addr::new(0, 0, 0, 0), - 80, - ))]; - let actual = resolve_addr_sync("", 80).unwrap().collect::>(); - assert_eq!(actual, expected); - } - - #[test] - fn resolve_addr_sync3() { - let expected = vec![SocketAddr::V4(SocketAddrV4::new( - Ipv4Addr::new(192, 0, 2, 1), - 25, - ))]; - let actual = resolve_addr_sync("192.0.2.1", 25) - .unwrap() - .collect::>(); - assert_eq!(actual, expected); - } - - #[test] - fn resolve_addr_sync_ipv6() { - let expected = vec![SocketAddr::V6(SocketAddrV6::new( - Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 1), - 8080, - 0, - 0, - ))]; - let actual = resolve_addr_sync("[2001:db8::1]", 8080) - .unwrap() - .collect::>(); - assert_eq!(actual, expected); - } - - #[test] - fn resolve_addr_sync_err() { - assert!(resolve_addr_sync("INVALID ADDR", 1234).is_err()); - } -} diff --git a/runtime/web_worker.rs b/runtime/web_worker.rs index a3a062221..ac87d285b 100644 --- a/runtime/web_worker.rs +++ b/runtime/web_worker.rs @@ -330,14 +330,12 @@ impl WebWorker { vec![ ops::fs_events::init(), ops::fs::init(), - ops::net::init(), + deno_net::init::(options.unstable), ops::os::init(), - ops::http::init(), ops::permissions::init(), ops::plugin::init(), ops::process::init(), ops::signal::init(), - ops::tls::init(), ops::tty::init(), ops::io::init_stdio(), ] diff --git a/runtime/worker.rs b/runtime/worker.rs index 9dfdcc825..567e75253 100644 --- a/runtime/worker.rs +++ b/runtime/worker.rs @@ -120,16 +120,14 @@ impl MainWorker { ops::worker_host::init(options.create_web_worker_cb.clone()), ops::fs_events::init(), ops::fs::init(), - ops::http::init(), ops::io::init(), ops::io::init_stdio(), - ops::net::init(), + deno_net::init::(options.unstable), ops::os::init(), ops::permissions::init(), ops::plugin::init(), ops::process::init(), ops::signal::init(), - ops::tls::init(), ops::tty::init(), // Permissions ext (worker specific state) perm_ext, -- cgit v1.2.3