summaryrefslogtreecommitdiff
path: root/ext/net
diff options
context:
space:
mode:
authorRyan Dahl <ry@tinyclouds.org>2021-08-11 12:27:05 +0200
committerGitHub <noreply@github.com>2021-08-11 12:27:05 +0200
commita0285e2eb88f6254f6494b0ecd1878db3a3b2a58 (patch)
tree90671b004537e20f9493fd3277ffd21d30b39a0e /ext/net
parent3a6994115176781b3a93d70794b1b81bc95e42b4 (diff)
Rename extensions/ directory to ext/ (#11643)
Diffstat (limited to 'ext/net')
-rw-r--r--ext/net/01_net.js240
-rw-r--r--ext/net/02_tls.js89
-rw-r--r--ext/net/04_net_unstable.js49
-rw-r--r--ext/net/Cargo.toml25
-rw-r--r--ext/net/README.md30
-rw-r--r--ext/net/io.rs232
-rw-r--r--ext/net/lib.deno_net.d.ts150
-rw-r--r--ext/net/lib.deno_net.unstable.d.ts258
-rw-r--r--ext/net/lib.rs131
-rw-r--r--ext/net/ops.rs795
-rw-r--r--ext/net/ops_tls.rs1061
-rw-r--r--ext/net/ops_unix.rs180
-rw-r--r--ext/net/resolve_addr.rs156
13 files changed, 3396 insertions, 0 deletions
diff --git a/ext/net/01_net.js b/ext/net/01_net.js
new file mode 100644
index 000000000..cc10a1c0a
--- /dev/null
+++ b/ext/net/01_net.js
@@ -0,0 +1,240 @@
+// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
+"use strict";
+
+((window) => {
+ const core = window.Deno.core;
+ const { BadResource } = core;
+ const {
+ PromiseResolve,
+ SymbolAsyncIterator,
+ Uint8Array,
+ TypedArrayPrototypeSubarray,
+ } = window.__bootstrap.primordials;
+
+ async function read(
+ rid,
+ buffer,
+ ) {
+ if (buffer.length === 0) {
+ return 0;
+ }
+ const nread = await core.opAsync("op_net_read_async", rid, buffer);
+ return nread === 0 ? null : nread;
+ }
+
+ async function write(rid, data) {
+ return await core.opAsync("op_net_write_async", rid, data);
+ }
+
+ function shutdown(rid) {
+ return core.opAsync("op_net_shutdown", rid);
+ }
+
+ function opAccept(rid, transport) {
+ return core.opAsync("op_accept", { rid, transport });
+ }
+
+ function opListen(args) {
+ return core.opSync("op_listen", args);
+ }
+
+ function opConnect(args) {
+ return core.opAsync("op_connect", args);
+ }
+
+ function opReceive(rid, transport, zeroCopy) {
+ return core.opAsync(
+ "op_datagram_receive",
+ { rid, transport },
+ zeroCopy,
+ );
+ }
+
+ function opSend(args, zeroCopy) {
+ return core.opAsync("op_datagram_send", args, zeroCopy);
+ }
+
+ function resolveDns(query, recordType, options) {
+ return core.opAsync("op_dns_resolve", { query, recordType, options });
+ }
+
+ class Conn {
+ #rid = 0;
+ #remoteAddr = null;
+ #localAddr = null;
+ constructor(rid, remoteAddr, localAddr) {
+ this.#rid = rid;
+ this.#remoteAddr = remoteAddr;
+ this.#localAddr = localAddr;
+ }
+
+ get rid() {
+ return this.#rid;
+ }
+
+ get remoteAddr() {
+ return this.#remoteAddr;
+ }
+
+ get localAddr() {
+ return this.#localAddr;
+ }
+
+ write(p) {
+ return write(this.rid, p);
+ }
+
+ read(p) {
+ return read(this.rid, p);
+ }
+
+ close() {
+ core.close(this.rid);
+ }
+
+ closeWrite() {
+ return shutdown(this.rid);
+ }
+ }
+
+ class Listener {
+ #rid = 0;
+ #addr = null;
+
+ constructor(rid, addr) {
+ this.#rid = rid;
+ this.#addr = addr;
+ }
+
+ get rid() {
+ return this.#rid;
+ }
+
+ get addr() {
+ return this.#addr;
+ }
+
+ async accept() {
+ const res = await opAccept(this.rid, this.addr.transport);
+ return new Conn(res.rid, res.remoteAddr, res.localAddr);
+ }
+
+ async next() {
+ let conn;
+ try {
+ conn = await this.accept();
+ } catch (error) {
+ if (error instanceof BadResource) {
+ return { value: undefined, done: true };
+ }
+ throw error;
+ }
+ return { value: conn, done: false };
+ }
+
+ return(value) {
+ this.close();
+ return PromiseResolve({ value, done: true });
+ }
+
+ close() {
+ core.close(this.rid);
+ }
+
+ [SymbolAsyncIterator]() {
+ return this;
+ }
+ }
+
+ class Datagram {
+ #rid = 0;
+ #addr = null;
+
+ constructor(rid, addr, bufSize = 1024) {
+ this.#rid = rid;
+ this.#addr = addr;
+ this.bufSize = bufSize;
+ }
+
+ get rid() {
+ return this.#rid;
+ }
+
+ get addr() {
+ return this.#addr;
+ }
+
+ async receive(p) {
+ const buf = p || new Uint8Array(this.bufSize);
+ const { size, remoteAddr } = await opReceive(
+ this.rid,
+ this.addr.transport,
+ buf,
+ );
+ const sub = TypedArrayPrototypeSubarray(buf, 0, size);
+ return [sub, remoteAddr];
+ }
+
+ send(p, addr) {
+ const remote = { hostname: "127.0.0.1", ...addr };
+
+ const args = { ...remote, rid: this.rid };
+ return opSend(args, p);
+ }
+
+ close() {
+ core.close(this.rid);
+ }
+
+ async *[SymbolAsyncIterator]() {
+ while (true) {
+ try {
+ yield await this.receive();
+ } catch (err) {
+ if (err instanceof BadResource) {
+ break;
+ }
+ throw err;
+ }
+ }
+ }
+ }
+
+ function listen({ hostname, ...options }) {
+ const res = opListen({
+ transport: "tcp",
+ hostname: typeof hostname === "undefined" ? "0.0.0.0" : hostname,
+ ...options,
+ });
+
+ return new Listener(res.rid, res.localAddr);
+ }
+
+ async function connect(options) {
+ let res;
+
+ if (options.transport === "unix") {
+ res = await opConnect(options);
+ } else {
+ res = await opConnect({
+ transport: "tcp",
+ hostname: "127.0.0.1",
+ ...options,
+ });
+ }
+
+ return new Conn(res.rid, res.remoteAddr, res.localAddr);
+ }
+
+ window.__bootstrap.net = {
+ connect,
+ Conn,
+ opConnect,
+ listen,
+ opListen,
+ Listener,
+ shutdown,
+ Datagram,
+ resolveDns,
+ };
+})(this);
diff --git a/ext/net/02_tls.js b/ext/net/02_tls.js
new file mode 100644
index 000000000..343ec2e4f
--- /dev/null
+++ b/ext/net/02_tls.js
@@ -0,0 +1,89 @@
+// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
+"use strict";
+
+((window) => {
+ const core = window.Deno.core;
+ const { Listener, Conn } = window.__bootstrap.net;
+
+ function opConnectTls(
+ args,
+ ) {
+ return core.opAsync("op_connect_tls", args);
+ }
+
+ function opAcceptTLS(rid) {
+ return core.opAsync("op_accept_tls", rid);
+ }
+
+ function opListenTls(args) {
+ return core.opSync("op_listen_tls", args);
+ }
+
+ function opStartTls(args) {
+ return core.opAsync("op_start_tls", args);
+ }
+
+ async function connectTls({
+ port,
+ hostname = "127.0.0.1",
+ transport = "tcp",
+ certFile = undefined,
+ certChain = undefined,
+ privateKey = undefined,
+ }) {
+ const res = await opConnectTls({
+ port,
+ hostname,
+ transport,
+ certFile,
+ certChain,
+ privateKey,
+ });
+ return new Conn(res.rid, res.remoteAddr, res.localAddr);
+ }
+
+ class TLSListener extends Listener {
+ async accept() {
+ const res = await opAcceptTLS(this.rid);
+ return new Conn(res.rid, res.remoteAddr, res.localAddr);
+ }
+ }
+
+ function listenTls({
+ port,
+ certFile,
+ keyFile,
+ hostname = "0.0.0.0",
+ transport = "tcp",
+ alpnProtocols,
+ }) {
+ const res = opListenTls({
+ port,
+ certFile,
+ keyFile,
+ hostname,
+ transport,
+ alpnProtocols,
+ });
+ return new TLSListener(res.rid, res.localAddr);
+ }
+
+ async function startTls(
+ conn,
+ { hostname = "127.0.0.1", certFile } = {},
+ ) {
+ const res = await opStartTls({
+ rid: conn.rid,
+ hostname,
+ certFile,
+ });
+ return new Conn(res.rid, res.remoteAddr, res.localAddr);
+ }
+
+ window.__bootstrap.tls = {
+ startTls,
+ listenTls,
+ connectTls,
+ TLSListener,
+ };
+})(this);
diff --git a/ext/net/04_net_unstable.js b/ext/net/04_net_unstable.js
new file mode 100644
index 000000000..ca265bfaa
--- /dev/null
+++ b/ext/net/04_net_unstable.js
@@ -0,0 +1,49 @@
+// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
+"use strict";
+
+((window) => {
+ const net = window.__bootstrap.net;
+
+ function listen(options) {
+ if (options.transport === "unix") {
+ const res = net.opListen(options);
+ return new net.Listener(res.rid, res.localAddr);
+ } else {
+ return net.listen(options);
+ }
+ }
+
+ function listenDatagram(
+ options,
+ ) {
+ let res;
+ if (options.transport === "unixpacket") {
+ res = net.opListen(options);
+ } else {
+ res = net.opListen({
+ transport: "udp",
+ hostname: "127.0.0.1",
+ ...options,
+ });
+ }
+
+ return new net.Datagram(res.rid, res.localAddr);
+ }
+
+ async function connect(
+ options,
+ ) {
+ if (options.transport === "unix") {
+ const res = await net.opConnect(options);
+ return new net.Conn(res.rid, res.remoteAddr, res.localAddr);
+ } else {
+ return net.connect(options);
+ }
+ }
+
+ window.__bootstrap.netUnstable = {
+ connect,
+ listenDatagram,
+ listen,
+ };
+})(this);
diff --git a/ext/net/Cargo.toml b/ext/net/Cargo.toml
new file mode 100644
index 000000000..09daf0e48
--- /dev/null
+++ b/ext/net/Cargo.toml
@@ -0,0 +1,25 @@
+# Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
+
+[package]
+name = "deno_net"
+version = "0.5.0"
+authors = ["the Deno authors"]
+edition = "2018"
+license = "MIT"
+readme = "README.md"
+repository = "https://github.com/denoland/deno"
+description = "Networking for Deno"
+
+[lib]
+path = "lib.rs"
+
+[dependencies]
+deno_core = { version = "0.96.0", path = "../../core" }
+deno_tls = { version = "0.1.0", path = "../tls" }
+
+lazy_static = "1.4.0"
+log = "0.4.14"
+serde = { version = "1.0.126", features = ["derive"] }
+tokio = { version = "1.8.1", features = ["full"] }
+trust-dns-proto = "0.20.3"
+trust-dns-resolver = { version = "0.20.3", features = ["tokio-runtime", "serde-config"] }
diff --git a/ext/net/README.md b/ext/net/README.md
new file mode 100644
index 000000000..cdd8923e1
--- /dev/null
+++ b/ext/net/README.md
@@ -0,0 +1,30 @@
+# deno_net
+
+This crate implements networking APIs.
+
+This crate depends on following extensions:
+
+- "deno_web"
+- "deno_fetch"
+
+Following ops are provided:
+
+- "op_net_read_async"
+- "op_net_write_async"
+- "op_net_shutdown"
+- "op_accept"
+- "op_connect"
+- "op_listen"
+- "op_datagram_receive"
+- "op_datagram_send"
+- "op_dns_resolve"
+- "op_start_tls"
+- "op_connect_tls"
+- "op_listen_tls"
+- "op_accept_tls"
+- "op_http_start"
+- "op_http_request_next"
+- "op_http_request_read"
+- "op_http_response"
+- "op_http_response_write"
+- "op_http_response_close"
diff --git a/ext/net/io.rs b/ext/net/io.rs
new file mode 100644
index 000000000..fc10d7e99
--- /dev/null
+++ b/ext/net/io.rs
@@ -0,0 +1,232 @@
+// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
+
+use crate::ops_tls as tls;
+use deno_core::error::null_opbuf;
+use deno_core::error::AnyError;
+use deno_core::error::{bad_resource_id, not_supported};
+use deno_core::op_async;
+use deno_core::AsyncMutFuture;
+use deno_core::AsyncRefCell;
+use deno_core::CancelHandle;
+use deno_core::CancelTryFuture;
+use deno_core::OpPair;
+use deno_core::OpState;
+use deno_core::RcRef;
+use deno_core::Resource;
+use deno_core::ResourceId;
+use deno_core::ZeroCopyBuf;
+use std::borrow::Cow;
+use std::cell::RefCell;
+use std::rc::Rc;
+use tokio::io::AsyncRead;
+use tokio::io::AsyncReadExt;
+use tokio::io::AsyncWrite;
+use tokio::io::AsyncWriteExt;
+use tokio::net::tcp;
+
+#[cfg(unix)]
+use tokio::net::unix;
+
+pub fn init() -> Vec<OpPair> {
+ vec![
+ ("op_net_read_async", op_async(op_read_async)),
+ ("op_net_write_async", op_async(op_write_async)),
+ ("op_net_shutdown", op_async(op_shutdown)),
+ ]
+}
+
+/// A full duplex resource has a read and write ends that are completely
+/// independent, like TCP/Unix sockets and TLS streams.
+#[derive(Debug)]
+pub struct FullDuplexResource<R, W> {
+ rd: AsyncRefCell<R>,
+ wr: AsyncRefCell<W>,
+ // When a full-duplex resource is closed, all pending 'read' ops are
+ // canceled, while 'write' ops are allowed to complete. Therefore only
+ // 'read' futures should be attached to this cancel handle.
+ cancel_handle: CancelHandle,
+}
+
+impl<R, W> FullDuplexResource<R, W>
+where
+ R: AsyncRead + Unpin + 'static,
+ W: AsyncWrite + Unpin + 'static,
+{
+ pub fn new((rd, wr): (R, W)) -> Self {
+ Self {
+ rd: rd.into(),
+ wr: wr.into(),
+ cancel_handle: Default::default(),
+ }
+ }
+
+ pub fn into_inner(self) -> (R, W) {
+ (self.rd.into_inner(), self.wr.into_inner())
+ }
+
+ pub fn rd_borrow_mut(self: &Rc<Self>) -> AsyncMutFuture<R> {
+ RcRef::map(self, |r| &r.rd).borrow_mut()
+ }
+
+ pub fn wr_borrow_mut(self: &Rc<Self>) -> AsyncMutFuture<W> {
+ RcRef::map(self, |r| &r.wr).borrow_mut()
+ }
+
+ pub fn cancel_handle(self: &Rc<Self>) -> RcRef<CancelHandle> {
+ RcRef::map(self, |r| &r.cancel_handle)
+ }
+
+ pub fn cancel_read_ops(&self) {
+ self.cancel_handle.cancel()
+ }
+
+ pub async fn read(
+ self: &Rc<Self>,
+ buf: &mut [u8],
+ ) -> Result<usize, AnyError> {
+ let mut rd = self.rd_borrow_mut().await;
+ let nread = rd.read(buf).try_or_cancel(self.cancel_handle()).await?;
+ Ok(nread)
+ }
+
+ pub async fn write(self: &Rc<Self>, buf: &[u8]) -> Result<usize, AnyError> {
+ let mut wr = self.wr_borrow_mut().await;
+ let nwritten = wr.write(buf).await?;
+ Ok(nwritten)
+ }
+
+ pub async fn shutdown(self: &Rc<Self>) -> Result<(), AnyError> {
+ let mut wr = self.wr_borrow_mut().await;
+ wr.shutdown().await?;
+ Ok(())
+ }
+}
+
+pub type TcpStreamResource =
+ FullDuplexResource<tcp::OwnedReadHalf, tcp::OwnedWriteHalf>;
+
+impl Resource for TcpStreamResource {
+ fn name(&self) -> Cow<str> {
+ "tcpStream".into()
+ }
+
+ fn close(self: Rc<Self>) {
+ self.cancel_read_ops();
+ }
+}
+
+pub type TlsStreamResource = FullDuplexResource<tls::ReadHalf, tls::WriteHalf>;
+
+impl Resource for TlsStreamResource {
+ fn name(&self) -> Cow<str> {
+ "tlsStream".into()
+ }
+
+ fn close(self: Rc<Self>) {
+ self.cancel_read_ops();
+ }
+}
+
+#[cfg(unix)]
+pub type UnixStreamResource =
+ FullDuplexResource<unix::OwnedReadHalf, unix::OwnedWriteHalf>;
+
+#[cfg(not(unix))]
+pub struct UnixStreamResource;
+
+#[cfg(not(unix))]
+impl UnixStreamResource {
+ pub async fn read(
+ self: &Rc<Self>,
+ _buf: &mut [u8],
+ ) -> Result<usize, AnyError> {
+ unreachable!()
+ }
+ pub async fn write(self: &Rc<Self>, _buf: &[u8]) -> Result<usize, AnyError> {
+ unreachable!()
+ }
+ pub async fn shutdown(self: &Rc<Self>) -> Result<(), AnyError> {
+ unreachable!()
+ }
+ pub fn cancel_read_ops(&self) {
+ unreachable!()
+ }
+}
+
+impl Resource for UnixStreamResource {
+ fn name(&self) -> Cow<str> {
+ "unixStream".into()
+ }
+
+ fn close(self: Rc<Self>) {
+ self.cancel_read_ops();
+ }
+}
+
+async fn op_read_async(
+ state: Rc<RefCell<OpState>>,
+ rid: ResourceId,
+ buf: Option<ZeroCopyBuf>,
+) -> Result<u32, AnyError> {
+ let buf = &mut buf.ok_or_else(null_opbuf)?;
+ let resource = state
+ .borrow()
+ .resource_table
+ .get_any(rid)
+ .ok_or_else(bad_resource_id)?;
+ let nread = if let Some(s) = resource.downcast_rc::<TcpStreamResource>() {
+ s.read(buf).await?
+ } else if let Some(s) = resource.downcast_rc::<TlsStreamResource>() {
+ s.read(buf).await?
+ } else if let Some(s) = resource.downcast_rc::<UnixStreamResource>() {
+ s.read(buf).await?
+ } else {
+ return Err(not_supported());
+ };
+ Ok(nread as u32)
+}
+
+async fn op_write_async(
+ state: Rc<RefCell<OpState>>,
+ rid: ResourceId,
+ buf: Option<ZeroCopyBuf>,
+) -> Result<u32, AnyError> {
+ let buf = &buf.ok_or_else(null_opbuf)?;
+ let resource = state
+ .borrow()
+ .resource_table
+ .get_any(rid)
+ .ok_or_else(bad_resource_id)?;
+ let nwritten = if let Some(s) = resource.downcast_rc::<TcpStreamResource>() {
+ s.write(buf).await?
+ } else if let Some(s) = resource.downcast_rc::<TlsStreamResource>() {
+ s.write(buf).await?
+ } else if let Some(s) = resource.downcast_rc::<UnixStreamResource>() {
+ s.write(buf).await?
+ } else {
+ return Err(not_supported());
+ };
+ Ok(nwritten as u32)
+}
+
+async fn op_shutdown(
+ state: Rc<RefCell<OpState>>,
+ rid: ResourceId,
+ _: (),
+) -> Result<(), AnyError> {
+ let resource = state
+ .borrow()
+ .resource_table
+ .get_any(rid)
+ .ok_or_else(bad_resource_id)?;
+ if let Some(s) = resource.downcast_rc::<TcpStreamResource>() {
+ s.shutdown().await?;
+ } else if let Some(s) = resource.downcast_rc::<TlsStreamResource>() {
+ s.shutdown().await?;
+ } else if let Some(s) = resource.downcast_rc::<UnixStreamResource>() {
+ s.shutdown().await?;
+ } else {
+ return Err(not_supported());
+ }
+ Ok(())
+}
diff --git a/ext/net/lib.deno_net.d.ts b/ext/net/lib.deno_net.d.ts
new file mode 100644
index 000000000..d35e01e31
--- /dev/null
+++ b/ext/net/lib.deno_net.d.ts
@@ -0,0 +1,150 @@
+// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
+
+/// <reference no-default-lib="true" />
+/// <reference lib="esnext" />
+
+declare namespace Deno {
+ export interface NetAddr {
+ transport: "tcp" | "udp";
+ hostname: string;
+ port: number;
+ }
+
+ export interface UnixAddr {
+ transport: "unix" | "unixpacket";
+ path: string;
+ }
+
+ export type Addr = NetAddr | UnixAddr;
+
+ /** A generic network listener for stream-oriented protocols. */
+ export interface Listener extends AsyncIterable<Conn> {
+ /** Waits for and resolves to the next connection to the `Listener`. */
+ accept(): Promise<Conn>;
+ /** Close closes the listener. Any pending accept promises will be rejected
+ * with errors. */
+ close(): void;
+ /** Return the address of the `Listener`. */
+ readonly addr: Addr;
+
+ /** Return the rid of the `Listener`. */
+ readonly rid: number;
+
+ [Symbol.asyncIterator](): AsyncIterableIterator<Conn>;
+ }
+
+ export interface Conn extends Reader, Writer, Closer {
+ /** The local address of the connection. */
+ readonly localAddr: Addr;
+ /** The remote address of the connection. */
+ readonly remoteAddr: Addr;
+ /** The resource ID of the connection. */
+ readonly rid: number;
+ /** Shuts down (`shutdown(2)`) the write side of the connection. Most
+ * callers should just use `close()`. */
+ closeWrite(): Promise<void>;
+ }
+
+ export interface ListenOptions {
+ /** The port to listen on. */
+ port: number;
+ /** A literal IP address or host name that can be resolved to an IP address.
+ * If not specified, defaults to `0.0.0.0`. */
+ hostname?: string;
+ }
+
+ /** Listen announces on the local transport address.
+ *
+ * ```ts
+ * const listener1 = Deno.listen({ port: 80 })
+ * const listener2 = Deno.listen({ hostname: "192.0.2.1", port: 80 })
+ * const listener3 = Deno.listen({ hostname: "[2001:db8::1]", port: 80 });
+ * const listener4 = Deno.listen({ hostname: "golang.org", port: 80, transport: "tcp" });
+ * ```
+ *
+ * Requires `allow-net` permission. */
+ export function listen(
+ options: ListenOptions & { transport?: "tcp" },
+ ): Listener;
+
+ export interface ListenTlsOptions extends ListenOptions {
+ /** Path to a file containing a PEM formatted CA certificate. Requires
+ * `--allow-read`. */
+ certFile: string;
+ /** Server public key file. Requires `--allow-read`.*/
+ keyFile: string;
+
+ transport?: "tcp";
+ }
+
+ /** Listen announces on the local transport address over TLS (transport layer
+ * security).
+ *
+ * ```ts
+ * const lstnr = Deno.listenTls({ port: 443, certFile: "./server.crt", keyFile: "./server.key" });
+ * ```
+ *
+ * Requires `allow-net` permission. */
+ export function listenTls(options: ListenTlsOptions): Listener;
+
+ export interface ConnectOptions {
+ /** The port to connect to. */
+ port: number;
+ /** A literal IP address or host name that can be resolved to an IP address.
+ * If not specified, defaults to `127.0.0.1`. */
+ hostname?: string;
+ transport?: "tcp";
+ }
+
+ /**
+ * Connects to the hostname (default is "127.0.0.1") and port on the named
+ * transport (default is "tcp"), and resolves to the connection (`Conn`).
+ *
+ * ```ts
+ * const conn1 = await Deno.connect({ port: 80 });
+ * const conn2 = await Deno.connect({ hostname: "192.0.2.1", port: 80 });
+ * const conn3 = await Deno.connect({ hostname: "[2001:db8::1]", port: 80 });
+ * const conn4 = await Deno.connect({ hostname: "golang.org", port: 80, transport: "tcp" });
+ * ```
+ *
+ * Requires `allow-net` permission for "tcp". */
+ export function connect(options: ConnectOptions): Promise<Conn>;
+
+ export interface ConnectTlsOptions {
+ /** The port to connect to. */
+ port: number;
+ /** A literal IP address or host name that can be resolved to an IP address.
+ * If not specified, defaults to `127.0.0.1`. */
+ hostname?: string;
+ /** Server certificate file. */
+ certFile?: string;
+ }
+
+ /** Establishes a secure connection over TLS (transport layer security) using
+ * an optional cert file, hostname (default is "127.0.0.1") and port. The
+ * cert file is optional and if not included Mozilla's root certificates will
+ * be used (see also https://github.com/ctz/webpki-roots for specifics)
+ *
+ * ```ts
+ * const conn1 = await Deno.connectTls({ port: 80 });
+ * const conn2 = await Deno.connectTls({ certFile: "./certs/my_custom_root_CA.pem", hostname: "192.0.2.1", port: 80 });
+ * const conn3 = await Deno.connectTls({ hostname: "[2001:db8::1]", port: 80 });
+ * const conn4 = await Deno.connectTls({ certFile: "./certs/my_custom_root_CA.pem", hostname: "golang.org", port: 80});
+ * ```
+ *
+ * Requires `allow-net` permission.
+ */
+ export function connectTls(options: ConnectTlsOptions): Promise<Conn>;
+
+ /** Shutdown socket send operations.
+ *
+ * Matches behavior of POSIX shutdown(3).
+ *
+ * ```ts
+ * const listener = Deno.listen({ port: 80 });
+ * const conn = await listener.accept();
+ * Deno.shutdown(conn.rid);
+ * ```
+ */
+ export function shutdown(rid: number): Promise<void>;
+}
diff --git a/ext/net/lib.deno_net.unstable.d.ts b/ext/net/lib.deno_net.unstable.d.ts
new file mode 100644
index 000000000..145f232c0
--- /dev/null
+++ b/ext/net/lib.deno_net.unstable.d.ts
@@ -0,0 +1,258 @@
+// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
+
+/// <reference no-default-lib="true" />
+/// <reference lib="esnext" />
+
+declare namespace Deno {
+ /** The type of the resource record.
+ * Only the listed types are supported currently. */
+ export type RecordType =
+ | "A"
+ | "AAAA"
+ | "ANAME"
+ | "CNAME"
+ | "MX"
+ | "PTR"
+ | "SRV"
+ | "TXT";
+
+ export interface ResolveDnsOptions {
+ /** The name server to be used for lookups.
+ * If not specified, defaults to the system configuration e.g. `/etc/resolv.conf` on Unix. */
+ nameServer?: {
+ /** The IP address of the name server */
+ ipAddr: string;
+ /** The port number the query will be sent to.
+ * If not specified, defaults to 53. */
+ port?: number;
+ };
+ }
+
+ /** If `resolveDns` is called with "MX" record type specified, it will return an array of this interface. */
+ export interface MXRecord {
+ preference: number;
+ exchange: string;
+ }
+
+ /** If `resolveDns` is called with "SRV" record type specified, it will return an array of this interface. */
+ export interface SRVRecord {
+ priority: number;
+ weight: number;
+ port: number;
+ target: string;
+ }
+
+ export function resolveDns(
+ query: string,
+ recordType: "A" | "AAAA" | "ANAME" | "CNAME" | "PTR",
+ options?: ResolveDnsOptions,
+ ): Promise<string[]>;
+
+ export function resolveDns(
+ query: string,
+ recordType: "MX",
+ options?: ResolveDnsOptions,
+ ): Promise<MXRecord[]>;
+
+ export function resolveDns(
+ query: string,
+ recordType: "SRV",
+ options?: ResolveDnsOptions,
+ ): Promise<SRVRecord[]>;
+
+ export function resolveDns(
+ query: string,
+ recordType: "TXT",
+ options?: ResolveDnsOptions,
+ ): Promise<string[][]>;
+
+ /** ** UNSTABLE**: new API, yet to be vetted.
+*
+* Performs DNS resolution against the given query, returning resolved records.
+* Fails in the cases such as:
+* - the query is in invalid format
+* - the options have an invalid parameter, e.g. `nameServer.port` is beyond the range of 16-bit unsigned integer
+* - timed out
+*
+* ```ts
+* const a = await Deno.resolveDns("example.com", "A");
+*
+* const aaaa = await Deno.resolveDns("example.com", "AAAA", {
+* nameServer: { ipAddr: "8.8.8.8", port: 1234 },
+* });
+* ```
+*
+* Requires `allow-net` permission.
+ */
+ export function resolveDns(
+ query: string,
+ recordType: RecordType,
+ options?: ResolveDnsOptions,
+ ): Promise<string[] | MXRecord[] | SRVRecord[] | string[][]>;
+
+ /** **UNSTABLE**: new API, yet to be vetted.
+*
+* A generic transport listener for message-oriented protocols. */
+ export interface DatagramConn extends AsyncIterable<[Uint8Array, Addr]> {
+ /** **UNSTABLE**: new API, yet to be vetted.
+ *
+ * Waits for and resolves to the next message to the `UDPConn`. */
+ receive(p?: Uint8Array): Promise<[Uint8Array, Addr]>;
+ /** UNSTABLE: new API, yet to be vetted.
+ *
+ * Sends a message to the target. */
+ send(p: Uint8Array, addr: Addr): Promise<number>;
+ /** UNSTABLE: new API, yet to be vetted.
+ *
+ * Close closes the socket. Any pending message promises will be rejected
+ * with errors. */
+ close(): void;
+ /** Return the address of the `UDPConn`. */
+ readonly addr: Addr;
+ [Symbol.asyncIterator](): AsyncIterableIterator<[Uint8Array, Addr]>;
+ }
+
+ export interface UnixListenOptions {
+ /** A Path to the Unix Socket. */
+ path: string;
+ }
+
+ /** **UNSTABLE**: new API, yet to be vetted.
+*
+* Listen announces on the local transport address.
+*
+* ```ts
+* const listener = Deno.listen({ path: "/foo/bar.sock", transport: "unix" })
+* ```
+*
+* Requires `allow-read` and `allow-write` permission. */
+ export function listen(
+ options: UnixListenOptions & { transport: "unix" },
+ ): Listener;
+
+ /** **UNSTABLE**: new API, yet to be vetted
+*
+* Listen announces on the local transport address.
+*
+* ```ts
+* const listener1 = Deno.listenDatagram({
+* port: 80,
+* transport: "udp"
+* });
+* const listener2 = Deno.listenDatagram({
+* hostname: "golang.org",
+* port: 80,
+* transport: "udp"
+* });
+* ```
+*
+* Requires `allow-net` permission. */
+ export function listenDatagram(
+ options: ListenOptions & { transport: "udp" },
+ ): DatagramConn;
+
+ /** **UNSTABLE**: new API, yet to be vetted
+*
+* Listen announces on the local transport address.
+*
+* ```ts
+* const listener = Deno.listenDatagram({
+* path: "/foo/bar.sock",
+* transport: "unixpacket"
+* });
+* ```
+*
+* Requires `allow-read` and `allow-write` permission. */
+ export function listenDatagram(
+ options: UnixListenOptions & { transport: "unixpacket" },
+ ): DatagramConn;
+
+ export interface UnixConnectOptions {
+ transport: "unix";
+ path: string;
+ }
+
+ /** **UNSTABLE**: The unix socket transport is unstable as a new API yet to
+* be vetted. The TCP transport is considered stable.
+*
+* Connects to the hostname (default is "127.0.0.1") and port on the named
+* transport (default is "tcp"), and resolves to the connection (`Conn`).
+*
+* ```ts
+* const conn1 = await Deno.connect({ port: 80 });
+* const conn2 = await Deno.connect({ hostname: "192.0.2.1", port: 80 });
+* const conn3 = await Deno.connect({ hostname: "[2001:db8::1]", port: 80 });
+* const conn4 = await Deno.connect({ hostname: "golang.org", port: 80, transport: "tcp" });
+* const conn5 = await Deno.connect({ path: "/foo/bar.sock", transport: "unix" });
+* ```
+*
+* Requires `allow-net` permission for "tcp" and `allow-read` for "unix". */
+ export function connect(
+ options: ConnectOptions | UnixConnectOptions,
+ ): Promise<Conn>;
+
+ export interface ConnectTlsClientCertOptions {
+ /** PEM formatted client certificate chain. */
+ certChain: string;
+ /** PEM formatted (RSA or PKCS8) private key of client certificate. */
+ privateKey: string;
+ }
+
+ /** **UNSTABLE** New API, yet to be vetted.
+ *
+ * Create a TLS connection with an attached client certificate.
+ *
+ * ```ts
+ * const conn = await Deno.connectTls({
+ * hostname: "deno.land",
+ * port: 443,
+ * certChain: "---- BEGIN CERTIFICATE ----\n ...",
+ * privateKey: "---- BEGIN PRIVATE KEY ----\n ...",
+ * });
+ * ```
+ *
+ * Requires `allow-net` permission.
+ */
+ export function connectTls(
+ options: ConnectTlsOptions & ConnectTlsClientCertOptions,
+ ): Promise<Conn>;
+
+ export interface StartTlsOptions {
+ /** A literal IP address or host name that can be resolved to an IP address.
+ * If not specified, defaults to `127.0.0.1`. */
+ hostname?: string;
+ /** Server certificate file. */
+ certFile?: string;
+ }
+
+ /** **UNSTABLE**: new API, yet to be vetted.
+*
+* Start TLS handshake from an existing connection using
+* an optional cert file, hostname (default is "127.0.0.1"). The
+* cert file is optional and if not included Mozilla's root certificates will
+* be used (see also https://github.com/ctz/webpki-roots for specifics)
+* Using this function requires that the other end of the connection is
+* prepared for TLS handshake.
+*
+* ```ts
+* const conn = await Deno.connect({ port: 80, hostname: "127.0.0.1" });
+* const tlsConn = await Deno.startTls(conn, { certFile: "./certs/my_custom_root_CA.pem", hostname: "localhost" });
+* ```
+*
+* Requires `allow-net` permission.
+ */
+ export function startTls(
+ conn: Conn,
+ options?: StartTlsOptions,
+ ): Promise<Conn>;
+
+ export interface ListenTlsOptions {
+ /** **UNSTABLE**: new API, yet to be vetted.
+ *
+ * Application-Layer Protocol Negotiation (ALPN) protocols to announce to
+ * the client. If not specified, no ALPN extension will be included in the
+ * TLS handshake.
+ */
+ alpnProtocols?: string[];
+ }
+}
diff --git a/ext/net/lib.rs b/ext/net/lib.rs
new file mode 100644
index 000000000..3764433e3
--- /dev/null
+++ b/ext/net/lib.rs
@@ -0,0 +1,131 @@
+// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
+
+pub mod io;
+pub mod ops;
+pub mod ops_tls;
+#[cfg(unix)]
+pub mod ops_unix;
+pub mod resolve_addr;
+
+use deno_core::error::AnyError;
+use deno_core::include_js_files;
+use deno_core::Extension;
+use deno_core::OpState;
+use deno_tls::rustls::RootCertStore;
+use std::cell::RefCell;
+use std::path::Path;
+use std::path::PathBuf;
+use std::rc::Rc;
+
+pub trait NetPermissions {
+ fn check_net<T: AsRef<str>>(
+ &mut self,
+ _host: &(T, Option<u16>),
+ ) -> Result<(), AnyError>;
+ fn check_read(&mut self, _p: &Path) -> Result<(), AnyError>;
+ fn check_write(&mut self, _p: &Path) -> Result<(), AnyError>;
+}
+
+/// For use with this crate when the user does not want permission checks.
+pub struct NoNetPermissions;
+
+impl NetPermissions for NoNetPermissions {
+ fn check_net<T: AsRef<str>>(
+ &mut self,
+ _host: &(T, Option<u16>),
+ ) -> Result<(), AnyError> {
+ Ok(())
+ }
+
+ fn check_read(&mut self, _p: &Path) -> Result<(), AnyError> {
+ Ok(())
+ }
+
+ fn check_write(&mut self, _p: &Path) -> Result<(), AnyError> {
+ Ok(())
+ }
+}
+
+/// `UnstableChecker` is a struct so it can be placed inside `GothamState`;
+/// using type alias for a bool could work, but there's a high chance
+/// that there might be another type alias pointing to a bool, which
+/// would override previously used alias.
+pub struct UnstableChecker {
+ pub unstable: bool,
+}
+
+impl UnstableChecker {
+ /// Quits the process if the --unstable flag was not provided.
+ ///
+ /// This is intentionally a non-recoverable check so that people cannot probe
+ /// for unstable APIs from stable programs.
+ // NOTE(bartlomieju): keep in sync with `cli/program_state.rs`
+ pub fn check_unstable(&self, api_name: &str) {
+ if !self.unstable {
+ eprintln!(
+ "Unstable API '{}'. The --unstable flag must be provided.",
+ api_name
+ );
+ std::process::exit(70);
+ }
+ }
+}
+/// Helper for checking unstable features. Used for sync ops.
+pub fn check_unstable(state: &OpState, api_name: &str) {
+ state.borrow::<UnstableChecker>().check_unstable(api_name)
+}
+
+/// Helper for checking unstable features. Used for async ops.
+pub fn check_unstable2(state: &Rc<RefCell<OpState>>, api_name: &str) {
+ let state = state.borrow();
+ state.borrow::<UnstableChecker>().check_unstable(api_name)
+}
+
+pub fn get_declaration() -> PathBuf {
+ PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("lib.deno_net.d.ts")
+}
+
+pub fn get_unstable_declaration() -> PathBuf {
+ PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("lib.deno_net.unstable.d.ts")
+}
+
+#[derive(Clone)]
+pub struct DefaultTlsOptions {
+ pub root_cert_store: Option<RootCertStore>,
+}
+
+/// `UnsafelyIgnoreCertificateErrors` is a wrapper struct so it can be placed inside `GothamState`;
+/// using type alias for a `Option<Vec<String>>` could work, but there's a high chance
+/// that there might be another type alias pointing to a `Option<Vec<String>>`, which
+/// would override previously used alias.
+pub struct UnsafelyIgnoreCertificateErrors(Option<Vec<String>>);
+
+pub fn init<P: NetPermissions + 'static>(
+ root_cert_store: Option<RootCertStore>,
+ unstable: bool,
+ unsafely_ignore_certificate_errors: Option<Vec<String>>,
+) -> Extension {
+ let mut ops_to_register = vec![];
+ ops_to_register.extend(io::init());
+ ops_to_register.extend(ops::init::<P>());
+ ops_to_register.extend(ops_tls::init::<P>());
+ Extension::builder()
+ .js(include_js_files!(
+ prefix "deno:ext/net",
+ "01_net.js",
+ "02_tls.js",
+ "04_net_unstable.js",
+ ))
+ .ops(ops_to_register)
+ .state(move |state| {
+ state.put(DefaultTlsOptions {
+ root_cert_store: root_cert_store.clone(),
+ });
+ state.put(UnstableChecker { unstable });
+ state.put(UnsafelyIgnoreCertificateErrors(
+ unsafely_ignore_certificate_errors.clone(),
+ ));
+ Ok(())
+ })
+ .build()
+}
diff --git a/ext/net/ops.rs b/ext/net/ops.rs
new file mode 100644
index 000000000..a0fc2179e
--- /dev/null
+++ b/ext/net/ops.rs
@@ -0,0 +1,795 @@
+// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
+
+use crate::io::TcpStreamResource;
+use crate::resolve_addr::resolve_addr;
+use crate::resolve_addr::resolve_addr_sync;
+use crate::NetPermissions;
+use deno_core::error::bad_resource;
+use deno_core::error::custom_error;
+use deno_core::error::generic_error;
+use deno_core::error::null_opbuf;
+use deno_core::error::type_error;
+use deno_core::error::AnyError;
+use deno_core::op_async;
+use deno_core::op_sync;
+use deno_core::AsyncRefCell;
+use deno_core::CancelHandle;
+use deno_core::CancelTryFuture;
+use deno_core::OpPair;
+use deno_core::OpState;
+use deno_core::RcRef;
+use deno_core::Resource;
+use deno_core::ResourceId;
+use deno_core::ZeroCopyBuf;
+use log::debug;
+use serde::Deserialize;
+use serde::Serialize;
+use std::borrow::Cow;
+use std::cell::RefCell;
+use std::net::SocketAddr;
+use std::rc::Rc;
+use tokio::net::TcpListener;
+use tokio::net::TcpStream;
+use tokio::net::UdpSocket;
+use trust_dns_proto::rr::record_data::RData;
+use trust_dns_proto::rr::record_type::RecordType;
+use trust_dns_resolver::config::NameServerConfigGroup;
+use trust_dns_resolver::config::ResolverConfig;
+use trust_dns_resolver::config::ResolverOpts;
+use trust_dns_resolver::system_conf;
+use trust_dns_resolver::AsyncResolver;
+
+#[cfg(unix)]
+use super::ops_unix as net_unix;
+#[cfg(unix)]
+use crate::io::UnixStreamResource;
+#[cfg(unix)]
+use std::path::Path;
+
+pub fn init<P: NetPermissions + 'static>() -> Vec<OpPair> {
+ vec![
+ ("op_accept", op_async(op_accept)),
+ ("op_connect", op_async(op_connect::<P>)),
+ ("op_listen", op_sync(op_listen::<P>)),
+ ("op_datagram_receive", op_async(op_datagram_receive)),
+ ("op_datagram_send", op_async(op_datagram_send::<P>)),
+ ("op_dns_resolve", op_async(op_dns_resolve::<P>)),
+ ]
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct OpConn {
+ pub rid: ResourceId,
+ pub remote_addr: Option<OpAddr>,
+ pub local_addr: Option<OpAddr>,
+}
+
+#[derive(Serialize)]
+#[serde(tag = "transport", rename_all = "lowercase")]
+pub enum OpAddr {
+ Tcp(IpAddr),
+ Udp(IpAddr),
+ #[cfg(unix)]
+ Unix(net_unix::UnixAddr),
+ #[cfg(unix)]
+ UnixPacket(net_unix::UnixAddr),
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+/// A received datagram packet (from udp or unixpacket)
+pub struct OpPacket {
+ pub size: usize,
+ pub remote_addr: OpAddr,
+}
+
+#[derive(Serialize)]
+pub struct IpAddr {
+ pub hostname: String,
+ pub port: u16,
+}
+
+#[derive(Deserialize)]
+pub(crate) struct AcceptArgs {
+ pub rid: ResourceId,
+ pub transport: String,
+}
+
+async fn accept_tcp(
+ state: Rc<RefCell<OpState>>,
+ args: AcceptArgs,
+ _: (),
+) -> Result<OpConn, AnyError> {
+ let rid = args.rid;
+
+ let resource = state
+ .borrow()
+ .resource_table
+ .get::<TcpListenerResource>(rid)
+ .ok_or_else(|| bad_resource("Listener has been closed"))?;
+ let listener = RcRef::map(&resource, |r| &r.listener)
+ .try_borrow_mut()
+ .ok_or_else(|| custom_error("Busy", "Another accept task is ongoing"))?;
+ let cancel = RcRef::map(resource, |r| &r.cancel);
+ let (tcp_stream, _socket_addr) =
+ listener.accept().try_or_cancel(cancel).await.map_err(|e| {
+ // FIXME(bartlomieju): compatibility with current JS implementation
+ if let std::io::ErrorKind::Interrupted = e.kind() {
+ bad_resource("Listener has been closed")
+ } else {
+ e.into()
+ }
+ })?;
+ let local_addr = tcp_stream.local_addr()?;
+ let remote_addr = tcp_stream.peer_addr()?;
+
+ let mut state = state.borrow_mut();
+ let rid = state
+ .resource_table
+ .add(TcpStreamResource::new(tcp_stream.into_split()));
+ Ok(OpConn {
+ rid,
+ local_addr: Some(OpAddr::Tcp(IpAddr {
+ hostname: local_addr.ip().to_string(),
+ port: local_addr.port(),
+ })),
+ remote_addr: Some(OpAddr::Tcp(IpAddr {
+ hostname: remote_addr.ip().to_string(),
+ port: remote_addr.port(),
+ })),
+ })
+}
+
+async fn op_accept(
+ state: Rc<RefCell<OpState>>,
+ args: AcceptArgs,
+ _: (),
+) -> Result<OpConn, AnyError> {
+ match args.transport.as_str() {
+ "tcp" => accept_tcp(state, args, ()).await,
+ #[cfg(unix)]
+ "unix" => net_unix::accept_unix(state, args, ()).await,
+ other => Err(bad_transport(other)),
+ }
+}
+
+fn bad_transport(transport: &str) -> AnyError {
+ generic_error(format!("Unsupported transport protocol {}", transport))
+}
+
+#[derive(Deserialize)]
+pub(crate) struct ReceiveArgs {
+ pub rid: ResourceId,
+ pub transport: String,
+}
+
+async fn receive_udp(
+ state: Rc<RefCell<OpState>>,
+ args: ReceiveArgs,
+ zero_copy: Option<ZeroCopyBuf>,
+) -> Result<OpPacket, AnyError> {
+ let zero_copy = zero_copy.ok_or_else(null_opbuf)?;
+ let mut zero_copy = zero_copy.clone();
+
+ let rid = args.rid;
+
+ let resource = state
+ .borrow_mut()
+ .resource_table
+ .get::<UdpSocketResource>(rid)
+ .ok_or_else(|| bad_resource("Socket has been closed"))?;
+ let socket = RcRef::map(&resource, |r| &r.socket).borrow().await;
+ let cancel_handle = RcRef::map(&resource, |r| &r.cancel);
+ let (size, remote_addr) = socket
+ .recv_from(&mut zero_copy)
+ .try_or_cancel(cancel_handle)
+ .await?;
+ Ok(OpPacket {
+ size,
+ remote_addr: OpAddr::Udp(IpAddr {
+ hostname: remote_addr.ip().to_string(),
+ port: remote_addr.port(),
+ }),
+ })
+}
+
+async fn op_datagram_receive(
+ state: Rc<RefCell<OpState>>,
+ args: ReceiveArgs,
+ zero_copy: Option<ZeroCopyBuf>,
+) -> Result<OpPacket, AnyError> {
+ match args.transport.as_str() {
+ "udp" => receive_udp(state, args, zero_copy).await,
+ #[cfg(unix)]
+ "unixpacket" => net_unix::receive_unix_packet(state, args, zero_copy).await,
+ other => Err(bad_transport(other)),
+ }
+}
+
+#[derive(Deserialize)]
+struct SendArgs {
+ rid: ResourceId,
+ transport: String,
+ #[serde(flatten)]
+ transport_args: ArgsEnum,
+}
+
+async fn op_datagram_send<NP>(
+ state: Rc<RefCell<OpState>>,
+ args: SendArgs,
+ zero_copy: Option<ZeroCopyBuf>,
+) -> Result<usize, AnyError>
+where
+ NP: NetPermissions + 'static,
+{
+ let zero_copy = zero_copy.ok_or_else(null_opbuf)?;
+ let zero_copy = zero_copy.clone();
+
+ match args {
+ SendArgs {
+ rid,
+ transport,
+ transport_args: ArgsEnum::Ip(args),
+ } if transport == "udp" => {
+ {
+ let mut s = state.borrow_mut();
+ s.borrow_mut::<NP>()
+ .check_net(&(&args.hostname, Some(args.port)))?;
+ }
+ let addr = resolve_addr(&args.hostname, args.port)
+ .await?
+ .next()
+ .ok_or_else(|| generic_error("No resolved address found"))?;
+
+ let resource = state
+ .borrow_mut()
+ .resource_table
+ .get::<UdpSocketResource>(rid)
+ .ok_or_else(|| bad_resource("Socket has been closed"))?;
+ let socket = RcRef::map(&resource, |r| &r.socket).borrow().await;
+ let byte_length = socket.send_to(&zero_copy, &addr).await?;
+ Ok(byte_length)
+ }
+ #[cfg(unix)]
+ SendArgs {
+ rid,
+ transport,
+ transport_args: ArgsEnum::Unix(args),
+ } if transport == "unixpacket" => {
+ let address_path = Path::new(&args.path);
+ {
+ let mut s = state.borrow_mut();
+ s.borrow_mut::<NP>().check_write(address_path)?;
+ }
+ let resource = state
+ .borrow()
+ .resource_table
+ .get::<net_unix::UnixDatagramResource>(rid)
+ .ok_or_else(|| {
+ custom_error("NotConnected", "Socket has been closed")
+ })?;
+ let socket = RcRef::map(&resource, |r| &r.socket)
+ .try_borrow_mut()
+ .ok_or_else(|| custom_error("Busy", "Socket already in use"))?;
+ let byte_length = socket.send_to(&zero_copy, address_path).await?;
+ Ok(byte_length)
+ }
+ _ => Err(type_error("Wrong argument format!")),
+ }
+}
+
+#[derive(Deserialize)]
+struct ConnectArgs {
+ transport: String,
+ #[serde(flatten)]
+ transport_args: ArgsEnum,
+}
+
+async fn op_connect<NP>(
+ state: Rc<RefCell<OpState>>,
+ args: ConnectArgs,
+ _: (),
+) -> Result<OpConn, AnyError>
+where
+ NP: NetPermissions + 'static,
+{
+ match args {
+ ConnectArgs {
+ transport,
+ transport_args: ArgsEnum::Ip(args),
+ } if transport == "tcp" => {
+ {
+ let mut state_ = state.borrow_mut();
+ state_
+ .borrow_mut::<NP>()
+ .check_net(&(&args.hostname, Some(args.port)))?;
+ }
+ let addr = resolve_addr(&args.hostname, args.port)
+ .await?
+ .next()
+ .ok_or_else(|| generic_error("No resolved address found"))?;
+ let tcp_stream = TcpStream::connect(&addr).await?;
+ let local_addr = tcp_stream.local_addr()?;
+ let remote_addr = tcp_stream.peer_addr()?;
+
+ let mut state_ = state.borrow_mut();
+ let rid = state_
+ .resource_table
+ .add(TcpStreamResource::new(tcp_stream.into_split()));
+ Ok(OpConn {
+ rid,
+ local_addr: Some(OpAddr::Tcp(IpAddr {
+ hostname: local_addr.ip().to_string(),
+ port: local_addr.port(),
+ })),
+ remote_addr: Some(OpAddr::Tcp(IpAddr {
+ hostname: remote_addr.ip().to_string(),
+ port: remote_addr.port(),
+ })),
+ })
+ }
+ #[cfg(unix)]
+ ConnectArgs {
+ transport,
+ transport_args: ArgsEnum::Unix(args),
+ } if transport == "unix" => {
+ let address_path = Path::new(&args.path);
+ super::check_unstable2(&state, "Deno.connect");
+ {
+ let mut state_ = state.borrow_mut();
+ state_.borrow_mut::<NP>().check_read(address_path)?;
+ state_.borrow_mut::<NP>().check_write(address_path)?;
+ }
+ let path = args.path;
+ let unix_stream = net_unix::UnixStream::connect(Path::new(&path)).await?;
+ let local_addr = unix_stream.local_addr()?;
+ let remote_addr = unix_stream.peer_addr()?;
+
+ let mut state_ = state.borrow_mut();
+ let resource = UnixStreamResource::new(unix_stream.into_split());
+ let rid = state_.resource_table.add(resource);
+ Ok(OpConn {
+ rid,
+ local_addr: Some(OpAddr::Unix(net_unix::UnixAddr {
+ path: local_addr.as_pathname().and_then(net_unix::pathstring),
+ })),
+ remote_addr: Some(OpAddr::Unix(net_unix::UnixAddr {
+ path: remote_addr.as_pathname().and_then(net_unix::pathstring),
+ })),
+ })
+ }
+ _ => Err(type_error("Wrong argument format!")),
+ }
+}
+
+pub struct TcpListenerResource {
+ pub listener: AsyncRefCell<TcpListener>,
+ pub cancel: CancelHandle,
+}
+
+impl Resource for TcpListenerResource {
+ fn name(&self) -> Cow<str> {
+ "tcpListener".into()
+ }
+
+ fn close(self: Rc<Self>) {
+ self.cancel.cancel();
+ }
+}
+
+struct UdpSocketResource {
+ socket: AsyncRefCell<UdpSocket>,
+ cancel: CancelHandle,
+}
+
+impl Resource for UdpSocketResource {
+ fn name(&self) -> Cow<str> {
+ "udpSocket".into()
+ }
+
+ fn close(self: Rc<Self>) {
+ self.cancel.cancel()
+ }
+}
+
+#[derive(Deserialize)]
+struct IpListenArgs {
+ hostname: String,
+ port: u16,
+}
+
+#[derive(Deserialize)]
+#[serde(untagged)]
+enum ArgsEnum {
+ Ip(IpListenArgs),
+ #[cfg(unix)]
+ Unix(net_unix::UnixListenArgs),
+}
+
+#[derive(Deserialize)]
+struct ListenArgs {
+ transport: String,
+ #[serde(flatten)]
+ transport_args: ArgsEnum,
+}
+
+fn listen_tcp(
+ state: &mut OpState,
+ addr: SocketAddr,
+) -> Result<(u32, SocketAddr), AnyError> {
+ let std_listener = std::net::TcpListener::bind(&addr)?;
+ std_listener.set_nonblocking(true)?;
+ let listener = TcpListener::from_std(std_listener)?;
+ let local_addr = listener.local_addr()?;
+ let listener_resource = TcpListenerResource {
+ listener: AsyncRefCell::new(listener),
+ cancel: Default::default(),
+ };
+ let rid = state.resource_table.add(listener_resource);
+
+ Ok((rid, local_addr))
+}
+
+fn listen_udp(
+ state: &mut OpState,
+ addr: SocketAddr,
+) -> Result<(u32, SocketAddr), AnyError> {
+ let std_socket = std::net::UdpSocket::bind(&addr)?;
+ std_socket.set_nonblocking(true)?;
+ let socket = UdpSocket::from_std(std_socket)?;
+ let local_addr = socket.local_addr()?;
+ let socket_resource = UdpSocketResource {
+ socket: AsyncRefCell::new(socket),
+ cancel: Default::default(),
+ };
+ let rid = state.resource_table.add(socket_resource);
+
+ Ok((rid, local_addr))
+}
+
+fn op_listen<NP>(
+ state: &mut OpState,
+ args: ListenArgs,
+ _: (),
+) -> Result<OpConn, AnyError>
+where
+ NP: NetPermissions + 'static,
+{
+ match args {
+ ListenArgs {
+ transport,
+ transport_args: ArgsEnum::Ip(args),
+ } => {
+ {
+ if transport == "udp" {
+ super::check_unstable(state, "Deno.listenDatagram");
+ }
+ state
+ .borrow_mut::<NP>()
+ .check_net(&(&args.hostname, Some(args.port)))?;
+ }
+ let addr = resolve_addr_sync(&args.hostname, args.port)?
+ .next()
+ .ok_or_else(|| generic_error("No resolved address found"))?;
+ let (rid, local_addr) = if transport == "tcp" {
+ listen_tcp(state, addr)?
+ } else {
+ listen_udp(state, addr)?
+ };
+ debug!(
+ "New listener {} {}:{}",
+ rid,
+ local_addr.ip().to_string(),
+ local_addr.port()
+ );
+ let ip_addr = IpAddr {
+ hostname: local_addr.ip().to_string(),
+ port: local_addr.port(),
+ };
+ Ok(OpConn {
+ rid,
+ local_addr: Some(match transport.as_str() {
+ "udp" => OpAddr::Udp(ip_addr),
+ "tcp" => OpAddr::Tcp(ip_addr),
+ // NOTE: This could be unreachable!()
+ other => return Err(bad_transport(other)),
+ }),
+ remote_addr: None,
+ })
+ }
+ #[cfg(unix)]
+ ListenArgs {
+ transport,
+ transport_args: ArgsEnum::Unix(args),
+ } if transport == "unix" || transport == "unixpacket" => {
+ let address_path = Path::new(&args.path);
+ {
+ if transport == "unix" {
+ super::check_unstable(state, "Deno.listen");
+ }
+ if transport == "unixpacket" {
+ super::check_unstable(state, "Deno.listenDatagram");
+ }
+ let permissions = state.borrow_mut::<NP>();
+ permissions.check_read(address_path)?;
+ permissions.check_write(address_path)?;
+ }
+ let (rid, local_addr) = if transport == "unix" {
+ net_unix::listen_unix(state, address_path)?
+ } else {
+ net_unix::listen_unix_packet(state, address_path)?
+ };
+ debug!(
+ "New listener {} {}",
+ rid,
+ local_addr.as_pathname().unwrap().display(),
+ );
+ let unix_addr = net_unix::UnixAddr {
+ path: local_addr.as_pathname().and_then(net_unix::pathstring),
+ };
+
+ Ok(OpConn {
+ rid,
+ local_addr: Some(match transport.as_str() {
+ "unix" => OpAddr::Unix(unix_addr),
+ "unixpacket" => OpAddr::UnixPacket(unix_addr),
+ other => return Err(bad_transport(other)),
+ }),
+ remote_addr: None,
+ })
+ }
+ #[cfg(unix)]
+ _ => Err(type_error("Wrong argument format!")),
+ }
+}
+
+#[derive(Serialize, PartialEq, Debug)]
+#[serde(untagged)]
+enum DnsReturnRecord {
+ A(String),
+ Aaaa(String),
+ Aname(String),
+ Cname(String),
+ Mx {
+ preference: u16,
+ exchange: String,
+ },
+ Ptr(String),
+ Srv {
+ priority: u16,
+ weight: u16,
+ port: u16,
+ target: String,
+ },
+ Txt(Vec<String>),
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct ResolveAddrArgs {
+ query: String,
+ record_type: RecordType,
+ options: Option<ResolveDnsOption>,
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct ResolveDnsOption {
+ name_server: Option<NameServer>,
+}
+
+fn default_port() -> u16 {
+ 53
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct NameServer {
+ ip_addr: String,
+ #[serde(default = "default_port")]
+ port: u16,
+}
+
+async fn op_dns_resolve<NP>(
+ state: Rc<RefCell<OpState>>,
+ args: ResolveAddrArgs,
+ _: (),
+) -> Result<Vec<DnsReturnRecord>, AnyError>
+where
+ NP: NetPermissions + 'static,
+{
+ let ResolveAddrArgs {
+ query,
+ record_type,
+ options,
+ } = args;
+
+ let (config, opts) = if let Some(name_server) =
+ options.as_ref().and_then(|o| o.name_server.as_ref())
+ {
+ let group = NameServerConfigGroup::from_ips_clear(
+ &[name_server.ip_addr.parse()?],
+ name_server.port,
+ true,
+ );
+ (
+ ResolverConfig::from_parts(None, vec![], group),
+ ResolverOpts::default(),
+ )
+ } else {
+ system_conf::read_system_conf()?
+ };
+
+ {
+ let mut s = state.borrow_mut();
+ let perm = s.borrow_mut::<NP>();
+
+ // Checks permission against the name servers which will be actually queried.
+ for ns in config.name_servers() {
+ let socker_addr = &ns.socket_addr;
+ let ip = socker_addr.ip().to_string();
+ let port = socker_addr.port();
+ perm.check_net(&(ip, Some(port)))?;
+ }
+ }
+
+ let resolver = AsyncResolver::tokio(config, opts)?;
+
+ let results = resolver
+ .lookup(query, record_type, Default::default())
+ .await
+ .map_err(|e| generic_error(format!("{}", e)))?
+ .iter()
+ .filter_map(rdata_to_return_record(record_type))
+ .collect();
+
+ Ok(results)
+}
+
+fn rdata_to_return_record(
+ ty: RecordType,
+) -> impl Fn(&RData) -> Option<DnsReturnRecord> {
+ use RecordType::*;
+ move |r: &RData| -> Option<DnsReturnRecord> {
+ match ty {
+ A => r.as_a().map(ToString::to_string).map(DnsReturnRecord::A),
+ AAAA => r
+ .as_aaaa()
+ .map(ToString::to_string)
+ .map(DnsReturnRecord::Aaaa),
+ ANAME => r
+ .as_aname()
+ .map(ToString::to_string)
+ .map(DnsReturnRecord::Aname),
+ CNAME => r
+ .as_cname()
+ .map(ToString::to_string)
+ .map(DnsReturnRecord::Cname),
+ MX => r.as_mx().map(|mx| DnsReturnRecord::Mx {
+ preference: mx.preference(),
+ exchange: mx.exchange().to_string(),
+ }),
+ PTR => r
+ .as_ptr()
+ .map(ToString::to_string)
+ .map(DnsReturnRecord::Ptr),
+ SRV => r.as_srv().map(|srv| DnsReturnRecord::Srv {
+ priority: srv.priority(),
+ weight: srv.weight(),
+ port: srv.port(),
+ target: srv.target().to_string(),
+ }),
+ TXT => r.as_txt().map(|txt| {
+ let texts: Vec<String> = txt
+ .iter()
+ .map(|bytes| {
+ // Tries to parse these bytes as Latin-1
+ bytes.iter().map(|&b| b as char).collect::<String>()
+ })
+ .collect();
+ DnsReturnRecord::Txt(texts)
+ }),
+ // TODO(magurotuna): Other record types are not supported
+ _ => todo!(),
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use std::net::Ipv4Addr;
+ use std::net::Ipv6Addr;
+ use trust_dns_proto::rr::rdata::mx::MX;
+ use trust_dns_proto::rr::rdata::srv::SRV;
+ use trust_dns_proto::rr::rdata::txt::TXT;
+ use trust_dns_proto::rr::record_data::RData;
+ use trust_dns_proto::rr::Name;
+
+ #[test]
+ fn rdata_to_return_record_a() {
+ let func = rdata_to_return_record(RecordType::A);
+ let rdata = RData::A(Ipv4Addr::new(127, 0, 0, 1));
+ assert_eq!(
+ func(&rdata),
+ Some(DnsReturnRecord::A("127.0.0.1".to_string()))
+ );
+ }
+
+ #[test]
+ fn rdata_to_return_record_aaaa() {
+ let func = rdata_to_return_record(RecordType::AAAA);
+ let rdata = RData::AAAA(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1));
+ assert_eq!(func(&rdata), Some(DnsReturnRecord::Aaaa("::1".to_string())));
+ }
+
+ #[test]
+ fn rdata_to_return_record_aname() {
+ let func = rdata_to_return_record(RecordType::ANAME);
+ let rdata = RData::ANAME(Name::new());
+ assert_eq!(func(&rdata), Some(DnsReturnRecord::Aname("".to_string())));
+ }
+
+ #[test]
+ fn rdata_to_return_record_cname() {
+ let func = rdata_to_return_record(RecordType::CNAME);
+ let rdata = RData::CNAME(Name::new());
+ assert_eq!(func(&rdata), Some(DnsReturnRecord::Cname("".to_string())));
+ }
+
+ #[test]
+ fn rdata_to_return_record_mx() {
+ let func = rdata_to_return_record(RecordType::MX);
+ let rdata = RData::MX(MX::new(10, Name::new()));
+ assert_eq!(
+ func(&rdata),
+ Some(DnsReturnRecord::Mx {
+ preference: 10,
+ exchange: "".to_string()
+ })
+ );
+ }
+
+ #[test]
+ fn rdata_to_return_record_ptr() {
+ let func = rdata_to_return_record(RecordType::PTR);
+ let rdata = RData::PTR(Name::new());
+ assert_eq!(func(&rdata), Some(DnsReturnRecord::Ptr("".to_string())));
+ }
+
+ #[test]
+ fn rdata_to_return_record_srv() {
+ let func = rdata_to_return_record(RecordType::SRV);
+ let rdata = RData::SRV(SRV::new(1, 2, 3, Name::new()));
+ assert_eq!(
+ func(&rdata),
+ Some(DnsReturnRecord::Srv {
+ priority: 1,
+ weight: 2,
+ port: 3,
+ target: "".to_string()
+ })
+ );
+ }
+
+ #[test]
+ fn rdata_to_return_record_txt() {
+ let func = rdata_to_return_record(RecordType::TXT);
+ let rdata = RData::TXT(TXT::from_bytes(vec![
+ "foo".as_bytes(),
+ "bar".as_bytes(),
+ &[0xa3], // "£" in Latin-1
+ &[0xe3, 0x81, 0x82], // "あ" in UTF-8
+ ]));
+ assert_eq!(
+ func(&rdata),
+ Some(DnsReturnRecord::Txt(vec![
+ "foo".to_string(),
+ "bar".to_string(),
+ "£".to_string(),
+ "ã\u{81}\u{82}".to_string(),
+ ]))
+ );
+ }
+}
diff --git a/ext/net/ops_tls.rs b/ext/net/ops_tls.rs
new file mode 100644
index 000000000..14a135d7d
--- /dev/null
+++ b/ext/net/ops_tls.rs
@@ -0,0 +1,1061 @@
+// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
+
+use crate::io::TcpStreamResource;
+use crate::io::TlsStreamResource;
+use crate::ops::IpAddr;
+use crate::ops::OpAddr;
+use crate::ops::OpConn;
+use crate::resolve_addr::resolve_addr;
+use crate::resolve_addr::resolve_addr_sync;
+use crate::DefaultTlsOptions;
+use crate::NetPermissions;
+use crate::UnsafelyIgnoreCertificateErrors;
+use deno_core::error::bad_resource;
+use deno_core::error::bad_resource_id;
+use deno_core::error::custom_error;
+use deno_core::error::generic_error;
+use deno_core::error::invalid_hostname;
+use deno_core::error::type_error;
+use deno_core::error::AnyError;
+use deno_core::futures::future::poll_fn;
+use deno_core::futures::ready;
+use deno_core::futures::task::noop_waker_ref;
+use deno_core::futures::task::AtomicWaker;
+use deno_core::futures::task::Context;
+use deno_core::futures::task::Poll;
+use deno_core::futures::task::RawWaker;
+use deno_core::futures::task::RawWakerVTable;
+use deno_core::futures::task::Waker;
+use deno_core::op_async;
+use deno_core::op_sync;
+use deno_core::parking_lot::Mutex;
+use deno_core::AsyncRefCell;
+use deno_core::CancelHandle;
+use deno_core::CancelTryFuture;
+use deno_core::OpPair;
+use deno_core::OpState;
+use deno_core::RcRef;
+use deno_core::Resource;
+use deno_core::ResourceId;
+use deno_tls::create_client_config;
+use deno_tls::rustls::internal::pemfile::certs;
+use deno_tls::rustls::internal::pemfile::pkcs8_private_keys;
+use deno_tls::rustls::internal::pemfile::rsa_private_keys;
+use deno_tls::rustls::Certificate;
+use deno_tls::rustls::ClientConfig;
+use deno_tls::rustls::ClientSession;
+use deno_tls::rustls::NoClientAuth;
+use deno_tls::rustls::PrivateKey;
+use deno_tls::rustls::ServerConfig;
+use deno_tls::rustls::ServerSession;
+use deno_tls::rustls::Session;
+use deno_tls::webpki::DNSNameRef;
+use io::Error;
+use io::Read;
+use io::Write;
+use serde::Deserialize;
+use std::borrow::Cow;
+use std::cell::RefCell;
+use std::convert::From;
+use std::fs::File;
+use std::io;
+use std::io::BufRead;
+use std::io::BufReader;
+use std::io::ErrorKind;
+use std::ops::Deref;
+use std::ops::DerefMut;
+use std::path::Path;
+use std::pin::Pin;
+use std::rc::Rc;
+use std::sync::Arc;
+use std::sync::Weak;
+use tokio::io::AsyncRead;
+use tokio::io::AsyncWrite;
+use tokio::io::ReadBuf;
+use tokio::net::TcpListener;
+use tokio::net::TcpStream;
+use tokio::task::spawn_local;
+
+#[derive(Debug)]
+enum TlsSession {
+ Client(ClientSession),
+ Server(ServerSession),
+}
+
+impl Deref for TlsSession {
+ type Target = dyn Session;
+
+ fn deref(&self) -> &Self::Target {
+ match self {
+ TlsSession::Client(client_session) => client_session,
+ TlsSession::Server(server_session) => server_session,
+ }
+ }
+}
+
+impl DerefMut for TlsSession {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ match self {
+ TlsSession::Client(client_session) => client_session,
+ TlsSession::Server(server_session) => server_session,
+ }
+ }
+}
+
+impl From<ClientSession> for TlsSession {
+ fn from(client_session: ClientSession) -> Self {
+ TlsSession::Client(client_session)
+ }
+}
+
+impl From<ServerSession> for TlsSession {
+ fn from(server_session: ServerSession) -> Self {
+ TlsSession::Server(server_session)
+ }
+}
+
+#[derive(Copy, Clone, Debug, Eq, PartialEq)]
+enum Flow {
+ Read,
+ Write,
+}
+
+#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
+enum State {
+ StreamOpen,
+ StreamClosed,
+ TlsClosing,
+ TlsClosed,
+ TcpClosed,
+}
+
+#[derive(Debug)]
+pub struct TlsStream(Option<TlsStreamInner>);
+
+impl TlsStream {
+ fn new(tcp: TcpStream, tls: TlsSession) -> Self {
+ let inner = TlsStreamInner {
+ tcp,
+ tls,
+ rd_state: State::StreamOpen,
+ wr_state: State::StreamOpen,
+ };
+ Self(Some(inner))
+ }
+
+ pub fn new_client_side(
+ tcp: TcpStream,
+ tls_config: &Arc<ClientConfig>,
+ hostname: DNSNameRef,
+ ) -> Self {
+ let tls = TlsSession::Client(ClientSession::new(tls_config, hostname));
+ Self::new(tcp, tls)
+ }
+
+ pub fn new_server_side(
+ tcp: TcpStream,
+ tls_config: &Arc<ServerConfig>,
+ ) -> Self {
+ let tls = TlsSession::Server(ServerSession::new(tls_config));
+ Self::new(tcp, tls)
+ }
+
+ pub async fn handshake(&mut self) -> io::Result<()> {
+ poll_fn(|cx| self.inner_mut().poll_io(cx, Flow::Write)).await
+ }
+
+ fn into_split(self) -> (ReadHalf, WriteHalf) {
+ let shared = Shared::new(self);
+ let rd = ReadHalf {
+ shared: shared.clone(),
+ };
+ let wr = WriteHalf { shared };
+ (rd, wr)
+ }
+
+ /// Tokio-rustls compatibility: returns a reference to the underlying TCP
+ /// stream, and a reference to the Rustls `Session` object.
+ pub fn get_ref(&self) -> (&TcpStream, &dyn Session) {
+ let inner = self.0.as_ref().unwrap();
+ (&inner.tcp, &*inner.tls)
+ }
+
+ fn inner_mut(&mut self) -> &mut TlsStreamInner {
+ self.0.as_mut().unwrap()
+ }
+}
+
+impl AsyncRead for TlsStream {
+ fn poll_read(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<io::Result<()>> {
+ self.inner_mut().poll_read(cx, buf)
+ }
+}
+
+impl AsyncWrite for TlsStream {
+ fn poll_write(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ self.inner_mut().poll_write(cx, buf)
+ }
+
+ fn poll_flush(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<io::Result<()>> {
+ self.inner_mut().poll_io(cx, Flow::Write)
+ // The underlying TCP stream does not need to be flushed.
+ }
+
+ fn poll_shutdown(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<io::Result<()>> {
+ self.inner_mut().poll_shutdown(cx)
+ }
+}
+
+impl Drop for TlsStream {
+ fn drop(&mut self) {
+ let mut inner = self.0.take().unwrap();
+
+ let mut cx = Context::from_waker(noop_waker_ref());
+ let use_linger_task = inner.poll_close(&mut cx).is_pending();
+
+ if use_linger_task {
+ spawn_local(poll_fn(move |cx| inner.poll_close(cx)));
+ } else if cfg!(debug_assertions) {
+ spawn_local(async {}); // Spawn dummy task to detect missing LocalSet.
+ }
+ }
+}
+
+#[derive(Debug)]
+pub struct TlsStreamInner {
+ tls: TlsSession,
+ tcp: TcpStream,
+ rd_state: State,
+ wr_state: State,
+}
+
+impl TlsStreamInner {
+ fn poll_io(
+ &mut self,
+ cx: &mut Context<'_>,
+ flow: Flow,
+ ) -> Poll<io::Result<()>> {
+ loop {
+ let wr_ready = loop {
+ match self.wr_state {
+ _ if self.tls.is_handshaking() && !self.tls.wants_write() => {
+ break true;
+ }
+ _ if self.tls.is_handshaking() => {}
+ State::StreamOpen if !self.tls.wants_write() => break true,
+ State::StreamClosed => {
+ // Rustls will enqueue the 'CloseNotify' alert and send it after
+ // flusing the data that is already in the queue.
+ self.tls.send_close_notify();
+ self.wr_state = State::TlsClosing;
+ continue;
+ }
+ State::TlsClosing if !self.tls.wants_write() => {
+ self.wr_state = State::TlsClosed;
+ continue;
+ }
+ // If a 'CloseNotify' alert sent by the remote end has been received,
+ // shut down the underlying TCP socket. Otherwise, consider polling
+ // done for the moment.
+ State::TlsClosed if self.rd_state < State::TlsClosed => break true,
+ State::TlsClosed
+ if Pin::new(&mut self.tcp).poll_shutdown(cx)?.is_pending() =>
+ {
+ break false;
+ }
+ State::TlsClosed => {
+ self.wr_state = State::TcpClosed;
+ continue;
+ }
+ State::TcpClosed => break true,
+ _ => {}
+ }
+
+ // Poll whether there is space in the socket send buffer so we can flush
+ // the remaining outgoing ciphertext.
+ if self.tcp.poll_write_ready(cx)?.is_pending() {
+ break false;
+ }
+
+ // Write ciphertext to the TCP socket.
+ let mut wrapped_tcp = ImplementWriteTrait(&mut self.tcp);
+ match self.tls.write_tls(&mut wrapped_tcp) {
+ Ok(0) => unreachable!(),
+ Ok(_) => {}
+ Err(err) if err.kind() == ErrorKind::WouldBlock => {}
+ Err(err) => return Poll::Ready(Err(err)),
+ }
+ };
+
+ let rd_ready = loop {
+ match self.rd_state {
+ State::TcpClosed if self.tls.is_handshaking() => {
+ let err = Error::new(ErrorKind::UnexpectedEof, "tls handshake eof");
+ return Poll::Ready(Err(err));
+ }
+ _ if self.tls.is_handshaking() && !self.tls.wants_read() => {
+ break true;
+ }
+ _ if self.tls.is_handshaking() => {}
+ State::StreamOpen if !self.tls.wants_read() => break true,
+ State::StreamOpen => {}
+ State::StreamClosed if !self.tls.wants_read() => {
+ // Rustls has more incoming cleartext buffered up, but the TLS
+ // session is closing so this data will never be processed by the
+ // application layer. Just like what would happen if this were a raw
+ // TCP stream, don't gracefully end the TLS session, but abort it.
+ return Poll::Ready(Err(Error::from(ErrorKind::ConnectionReset)));
+ }
+ State::StreamClosed => {}
+ State::TlsClosed if self.wr_state == State::TcpClosed => {
+ // Wait for the remote end to gracefully close the TCP connection.
+ // TODO(piscisaureus): this is unnecessary; remove when stable.
+ }
+ _ => break true,
+ }
+
+ if self.rd_state < State::TlsClosed {
+ // Do a zero-length plaintext read so we can detect the arrival of
+ // 'CloseNotify' messages, even if only the write half is open.
+ // Actually reading data from the socket is done in `poll_read()`.
+ match self.tls.read(&mut []) {
+ Ok(0) => {}
+ Err(err) if err.kind() == ErrorKind::ConnectionAborted => {
+ // `Session::read()` returns `ConnectionAborted` when a
+ // 'CloseNotify' alert has been received, which indicates that
+ // the remote peer wants to gracefully end the TLS session.
+ self.rd_state = State::TlsClosed;
+ continue;
+ }
+ Err(err) => return Poll::Ready(Err(err)),
+ _ => unreachable!(),
+ }
+ }
+
+ // Poll whether more ciphertext is available in the socket receive
+ // buffer.
+ if self.tcp.poll_read_ready(cx)?.is_pending() {
+ break false;
+ }
+
+ // Receive ciphertext from the socket.
+ let mut wrapped_tcp = ImplementReadTrait(&mut self.tcp);
+ match self.tls.read_tls(&mut wrapped_tcp) {
+ Ok(0) => self.rd_state = State::TcpClosed,
+ Ok(_) => self
+ .tls
+ .process_new_packets()
+ .map_err(|err| Error::new(ErrorKind::InvalidData, err))?,
+ Err(err) if err.kind() == ErrorKind::WouldBlock => {}
+ Err(err) => return Poll::Ready(Err(err)),
+ }
+ };
+
+ if wr_ready {
+ if self.rd_state >= State::TlsClosed
+ && self.wr_state >= State::TlsClosed
+ && self.wr_state < State::TcpClosed
+ {
+ continue;
+ }
+ if self.tls.wants_write() {
+ continue;
+ }
+ }
+
+ let io_ready = match flow {
+ _ if self.tls.is_handshaking() => false,
+ Flow::Read => rd_ready,
+ Flow::Write => wr_ready,
+ };
+ return match io_ready {
+ false => Poll::Pending,
+ true => Poll::Ready(Ok(())),
+ };
+ }
+ }
+
+ fn poll_read(
+ &mut self,
+ cx: &mut Context<'_>,
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<io::Result<()>> {
+ ready!(self.poll_io(cx, Flow::Read))?;
+
+ if self.rd_state == State::StreamOpen {
+ let buf_slice =
+ unsafe { &mut *(buf.unfilled_mut() as *mut [_] as *mut [u8]) };
+ let bytes_read = self.tls.read(buf_slice)?;
+ assert_ne!(bytes_read, 0);
+ unsafe { buf.assume_init(bytes_read) };
+ buf.advance(bytes_read);
+ }
+
+ Poll::Ready(Ok(()))
+ }
+
+ fn poll_write(
+ &mut self,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ if buf.is_empty() {
+ // Tokio-rustls compatibility: a zero byte write always succeeds.
+ Poll::Ready(Ok(0))
+ } else if self.wr_state == State::StreamOpen {
+ // Flush Rustls' ciphertext send queue.
+ ready!(self.poll_io(cx, Flow::Write))?;
+
+ // Copy data from `buf` to the Rustls cleartext send queue.
+ let bytes_written = self.tls.write(buf)?;
+ assert_ne!(bytes_written, 0);
+
+ // Try to flush as much ciphertext as possible. However, since we just
+ // handed off at least some bytes to rustls, so we can't return
+ // `Poll::Pending()` any more: this would tell the caller that it should
+ // try to send those bytes again.
+ let _ = self.poll_io(cx, Flow::Write)?;
+
+ Poll::Ready(Ok(bytes_written))
+ } else {
+ // Return error if stream has been shut down for writing.
+ Poll::Ready(Err(ErrorKind::BrokenPipe.into()))
+ }
+ }
+
+ fn poll_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ if self.wr_state == State::StreamOpen {
+ self.wr_state = State::StreamClosed;
+ }
+
+ ready!(self.poll_io(cx, Flow::Write))?;
+
+ // At minimum, a TLS 'CloseNotify' alert should have been sent.
+ assert!(self.wr_state >= State::TlsClosed);
+ // If we received a TLS 'CloseNotify' alert from the remote end
+ // already, the TCP socket should be shut down at this point.
+ assert!(
+ self.rd_state < State::TlsClosed || self.wr_state == State::TcpClosed
+ );
+
+ Poll::Ready(Ok(()))
+ }
+
+ fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ if self.rd_state == State::StreamOpen {
+ self.rd_state = State::StreamClosed;
+ }
+
+ // Send TLS 'CloseNotify' alert.
+ ready!(self.poll_shutdown(cx))?;
+ // Wait for 'CloseNotify', shut down TCP stream, wait for TCP FIN packet.
+ ready!(self.poll_io(cx, Flow::Read))?;
+
+ assert_eq!(self.rd_state, State::TcpClosed);
+ assert_eq!(self.wr_state, State::TcpClosed);
+
+ Poll::Ready(Ok(()))
+ }
+}
+
+#[derive(Debug)]
+pub struct ReadHalf {
+ shared: Arc<Shared>,
+}
+
+impl ReadHalf {
+ pub fn reunite(self, wr: WriteHalf) -> TlsStream {
+ assert!(Arc::ptr_eq(&self.shared, &wr.shared));
+ drop(wr); // Drop `wr`, so only one strong reference to `shared` remains.
+
+ Arc::try_unwrap(self.shared)
+ .unwrap_or_else(|_| panic!("Arc::<Shared>::try_unwrap() failed"))
+ .tls_stream
+ .into_inner()
+ }
+}
+
+impl AsyncRead for ReadHalf {
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<io::Result<()>> {
+ self
+ .shared
+ .poll_with_shared_waker(cx, Flow::Read, move |tls, cx| {
+ tls.poll_read(cx, buf)
+ })
+ }
+}
+
+#[derive(Debug)]
+pub struct WriteHalf {
+ shared: Arc<Shared>,
+}
+
+impl AsyncWrite for WriteHalf {
+ fn poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ self
+ .shared
+ .poll_with_shared_waker(cx, Flow::Write, move |tls, cx| {
+ tls.poll_write(cx, buf)
+ })
+ }
+
+ fn poll_flush(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<io::Result<()>> {
+ self
+ .shared
+ .poll_with_shared_waker(cx, Flow::Write, |tls, cx| tls.poll_flush(cx))
+ }
+
+ fn poll_shutdown(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<io::Result<()>> {
+ self
+ .shared
+ .poll_with_shared_waker(cx, Flow::Write, |tls, cx| tls.poll_shutdown(cx))
+ }
+}
+
+#[derive(Debug)]
+struct Shared {
+ tls_stream: Mutex<TlsStream>,
+ rd_waker: AtomicWaker,
+ wr_waker: AtomicWaker,
+}
+
+impl Shared {
+ fn new(tls_stream: TlsStream) -> Arc<Self> {
+ let self_ = Self {
+ tls_stream: Mutex::new(tls_stream),
+ rd_waker: AtomicWaker::new(),
+ wr_waker: AtomicWaker::new(),
+ };
+ Arc::new(self_)
+ }
+
+ fn poll_with_shared_waker<R>(
+ self: &Arc<Self>,
+ cx: &mut Context<'_>,
+ flow: Flow,
+ mut f: impl FnMut(Pin<&mut TlsStream>, &mut Context<'_>) -> R,
+ ) -> R {
+ match flow {
+ Flow::Read => self.rd_waker.register(cx.waker()),
+ Flow::Write => self.wr_waker.register(cx.waker()),
+ }
+
+ let shared_waker = self.new_shared_waker();
+ let mut cx = Context::from_waker(&shared_waker);
+
+ let mut tls_stream = self.tls_stream.lock();
+ f(Pin::new(&mut tls_stream), &mut cx)
+ }
+
+ const SHARED_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
+ Self::clone_shared_waker,
+ Self::wake_shared_waker,
+ Self::wake_shared_waker_by_ref,
+ Self::drop_shared_waker,
+ );
+
+ fn new_shared_waker(self: &Arc<Self>) -> Waker {
+ let self_weak = Arc::downgrade(self);
+ let self_ptr = self_weak.into_raw() as *const ();
+ let raw_waker = RawWaker::new(self_ptr, &Self::SHARED_WAKER_VTABLE);
+ unsafe { Waker::from_raw(raw_waker) }
+ }
+
+ fn clone_shared_waker(self_ptr: *const ()) -> RawWaker {
+ let self_weak = unsafe { Weak::from_raw(self_ptr as *const Self) };
+ let ptr1 = self_weak.clone().into_raw();
+ let ptr2 = self_weak.into_raw();
+ assert!(ptr1 == ptr2);
+ RawWaker::new(self_ptr, &Self::SHARED_WAKER_VTABLE)
+ }
+
+ fn wake_shared_waker(self_ptr: *const ()) {
+ Self::wake_shared_waker_by_ref(self_ptr);
+ Self::drop_shared_waker(self_ptr);
+ }
+
+ fn wake_shared_waker_by_ref(self_ptr: *const ()) {
+ let self_weak = unsafe { Weak::from_raw(self_ptr as *const Self) };
+ if let Some(self_arc) = Weak::upgrade(&self_weak) {
+ self_arc.rd_waker.wake();
+ self_arc.wr_waker.wake();
+ }
+ self_weak.into_raw();
+ }
+
+ fn drop_shared_waker(self_ptr: *const ()) {
+ let _ = unsafe { Weak::from_raw(self_ptr as *const Self) };
+ }
+}
+
+struct ImplementReadTrait<'a, T>(&'a mut T);
+
+impl Read for ImplementReadTrait<'_, TcpStream> {
+ fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+ self.0.try_read(buf)
+ }
+}
+
+struct ImplementWriteTrait<'a, T>(&'a mut T);
+
+impl Write for ImplementWriteTrait<'_, TcpStream> {
+ fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+ self.0.try_write(buf)
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ Ok(())
+ }
+}
+
+pub fn init<P: NetPermissions + 'static>() -> Vec<OpPair> {
+ vec![
+ ("op_start_tls", op_async(op_start_tls::<P>)),
+ ("op_connect_tls", op_async(op_connect_tls::<P>)),
+ ("op_listen_tls", op_sync(op_listen_tls::<P>)),
+ ("op_accept_tls", op_async(op_accept_tls)),
+ ]
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct ConnectTlsArgs {
+ transport: String,
+ hostname: String,
+ port: u16,
+ cert_file: Option<String>,
+ cert_chain: Option<String>,
+ private_key: Option<String>,
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct StartTlsArgs {
+ rid: ResourceId,
+ cert_file: Option<String>,
+ hostname: String,
+}
+
+async fn op_start_tls<NP>(
+ state: Rc<RefCell<OpState>>,
+ args: StartTlsArgs,
+ _: (),
+) -> Result<OpConn, AnyError>
+where
+ NP: NetPermissions + 'static,
+{
+ let rid = args.rid;
+ let hostname = match &*args.hostname {
+ "" => "localhost",
+ n => n,
+ };
+ let cert_file = args.cert_file.as_deref();
+ {
+ super::check_unstable2(&state, "Deno.startTls");
+ let mut s = state.borrow_mut();
+ let permissions = s.borrow_mut::<NP>();
+ permissions.check_net(&(hostname, Some(0)))?;
+ if let Some(path) = cert_file {
+ permissions.check_read(Path::new(path))?;
+ }
+ }
+
+ let ca_data = match cert_file {
+ Some(path) => {
+ let mut buf = Vec::new();
+ File::open(path)?.read_to_end(&mut buf)?;
+ Some(buf)
+ }
+ _ => None,
+ };
+
+ let hostname_dns = DNSNameRef::try_from_ascii_str(hostname)
+ .map_err(|_| invalid_hostname(hostname))?;
+
+ let unsafely_ignore_certificate_errors = state
+ .borrow()
+ .borrow::<UnsafelyIgnoreCertificateErrors>()
+ .0
+ .clone();
+
+ // TODO(@justinmchase): Ideally the certificate store is created once
+ // and not cloned. The store should be wrapped in Arc<T> to reduce
+ // copying memory unnecessarily.
+ let root_cert_store = state
+ .borrow()
+ .borrow::<DefaultTlsOptions>()
+ .root_cert_store
+ .clone();
+ let resource_rc = state
+ .borrow_mut()
+ .resource_table
+ .take::<TcpStreamResource>(rid)
+ .ok_or_else(bad_resource_id)?;
+ let resource = Rc::try_unwrap(resource_rc)
+ .expect("Only a single use of this resource should happen");
+ let (read_half, write_half) = resource.into_inner();
+ let tcp_stream = read_half.reunite(write_half)?;
+
+ let local_addr = tcp_stream.local_addr()?;
+ let remote_addr = tcp_stream.peer_addr()?;
+
+ let tls_config = Arc::new(create_client_config(
+ root_cert_store,
+ ca_data,
+ unsafely_ignore_certificate_errors,
+ )?);
+ let tls_stream =
+ TlsStream::new_client_side(tcp_stream, &tls_config, hostname_dns);
+
+ let rid = {
+ let mut state_ = state.borrow_mut();
+ state_
+ .resource_table
+ .add(TlsStreamResource::new(tls_stream.into_split()))
+ };
+
+ Ok(OpConn {
+ rid,
+ local_addr: Some(OpAddr::Tcp(IpAddr {
+ hostname: local_addr.ip().to_string(),
+ port: local_addr.port(),
+ })),
+ remote_addr: Some(OpAddr::Tcp(IpAddr {
+ hostname: remote_addr.ip().to_string(),
+ port: remote_addr.port(),
+ })),
+ })
+}
+
+async fn op_connect_tls<NP>(
+ state: Rc<RefCell<OpState>>,
+ args: ConnectTlsArgs,
+ _: (),
+) -> Result<OpConn, AnyError>
+where
+ NP: NetPermissions + 'static,
+{
+ assert_eq!(args.transport, "tcp");
+ let hostname = match &*args.hostname {
+ "" => "localhost",
+ n => n,
+ };
+ let port = args.port;
+ let cert_file = args.cert_file.as_deref();
+ let unsafely_ignore_certificate_errors = state
+ .borrow()
+ .borrow::<UnsafelyIgnoreCertificateErrors>()
+ .0
+ .clone();
+
+ if args.cert_chain.is_some() {
+ super::check_unstable2(&state, "ConnectTlsOptions.certChain");
+ }
+ if args.private_key.is_some() {
+ super::check_unstable2(&state, "ConnectTlsOptions.privateKey");
+ }
+
+ {
+ let mut s = state.borrow_mut();
+ let permissions = s.borrow_mut::<NP>();
+ permissions.check_net(&(hostname, Some(port)))?;
+ if let Some(path) = cert_file {
+ permissions.check_read(Path::new(path))?;
+ }
+ }
+
+ let ca_data = match cert_file {
+ Some(path) => {
+ let mut buf = Vec::new();
+ File::open(path)?.read_to_end(&mut buf)?;
+ Some(buf)
+ }
+ _ => None,
+ };
+
+ let root_cert_store = state
+ .borrow()
+ .borrow::<DefaultTlsOptions>()
+ .root_cert_store
+ .clone();
+ let hostname_dns = DNSNameRef::try_from_ascii_str(hostname)
+ .map_err(|_| invalid_hostname(hostname))?;
+
+ let connect_addr = resolve_addr(hostname, port)
+ .await?
+ .next()
+ .ok_or_else(|| generic_error("No resolved address found"))?;
+ let tcp_stream = TcpStream::connect(connect_addr).await?;
+ let local_addr = tcp_stream.local_addr()?;
+ let remote_addr = tcp_stream.peer_addr()?;
+ let mut tls_config = create_client_config(
+ root_cert_store,
+ ca_data,
+ unsafely_ignore_certificate_errors,
+ )?;
+
+ if args.cert_chain.is_some() || args.private_key.is_some() {
+ let cert_chain = args
+ .cert_chain
+ .ok_or_else(|| type_error("No certificate chain provided"))?;
+ let private_key = args
+ .private_key
+ .ok_or_else(|| type_error("No private key provided"))?;
+
+ // The `remove` is safe because load_private_keys checks that there is at least one key.
+ let private_key = load_private_keys(private_key.as_bytes())?.remove(0);
+
+ tls_config.set_single_client_cert(
+ load_certs(&mut cert_chain.as_bytes())?,
+ private_key,
+ )?;
+ }
+
+ let tls_config = Arc::new(tls_config);
+
+ let tls_stream =
+ TlsStream::new_client_side(tcp_stream, &tls_config, hostname_dns);
+
+ let rid = {
+ let mut state_ = state.borrow_mut();
+ state_
+ .resource_table
+ .add(TlsStreamResource::new(tls_stream.into_split()))
+ };
+
+ Ok(OpConn {
+ rid,
+ local_addr: Some(OpAddr::Tcp(IpAddr {
+ hostname: local_addr.ip().to_string(),
+ port: local_addr.port(),
+ })),
+ remote_addr: Some(OpAddr::Tcp(IpAddr {
+ hostname: remote_addr.ip().to_string(),
+ port: remote_addr.port(),
+ })),
+ })
+}
+
+fn load_certs(reader: &mut dyn BufRead) -> Result<Vec<Certificate>, AnyError> {
+ let certs = certs(reader)
+ .map_err(|_| custom_error("InvalidData", "Unable to decode certificate"))?;
+
+ if certs.is_empty() {
+ let e = custom_error("InvalidData", "No certificates found in cert file");
+ return Err(e);
+ }
+
+ Ok(certs)
+}
+
+fn load_certs_from_file(path: &str) -> Result<Vec<Certificate>, AnyError> {
+ let cert_file = File::open(path)?;
+ let reader = &mut BufReader::new(cert_file);
+ load_certs(reader)
+}
+
+fn key_decode_err() -> AnyError {
+ custom_error("InvalidData", "Unable to decode key")
+}
+
+fn key_not_found_err() -> AnyError {
+ custom_error("InvalidData", "No keys found in key file")
+}
+
+/// Starts with -----BEGIN RSA PRIVATE KEY-----
+fn load_rsa_keys(mut bytes: &[u8]) -> Result<Vec<PrivateKey>, AnyError> {
+ let keys = rsa_private_keys(&mut bytes).map_err(|_| key_decode_err())?;
+ Ok(keys)
+}
+
+/// Starts with -----BEGIN PRIVATE KEY-----
+fn load_pkcs8_keys(mut bytes: &[u8]) -> Result<Vec<PrivateKey>, AnyError> {
+ let keys = pkcs8_private_keys(&mut bytes).map_err(|_| key_decode_err())?;
+ Ok(keys)
+}
+
+fn load_private_keys(bytes: &[u8]) -> Result<Vec<PrivateKey>, AnyError> {
+ let mut keys = load_rsa_keys(bytes)?;
+
+ if keys.is_empty() {
+ keys = load_pkcs8_keys(bytes)?;
+ }
+
+ if keys.is_empty() {
+ return Err(key_not_found_err());
+ }
+
+ Ok(keys)
+}
+
+fn load_private_keys_from_file(
+ path: &str,
+) -> Result<Vec<PrivateKey>, AnyError> {
+ let key_bytes = std::fs::read(path)?;
+ load_private_keys(&key_bytes)
+}
+
+pub struct TlsListenerResource {
+ tcp_listener: AsyncRefCell<TcpListener>,
+ tls_config: Arc<ServerConfig>,
+ cancel_handle: CancelHandle,
+}
+
+impl Resource for TlsListenerResource {
+ fn name(&self) -> Cow<str> {
+ "tlsListener".into()
+ }
+
+ fn close(self: Rc<Self>) {
+ self.cancel_handle.cancel();
+ }
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct ListenTlsArgs {
+ transport: String,
+ hostname: String,
+ port: u16,
+ cert_file: String,
+ key_file: String,
+ alpn_protocols: Option<Vec<String>>,
+}
+
+fn op_listen_tls<NP>(
+ state: &mut OpState,
+ args: ListenTlsArgs,
+ _: (),
+) -> Result<OpConn, AnyError>
+where
+ NP: NetPermissions + 'static,
+{
+ assert_eq!(args.transport, "tcp");
+ let hostname = &*args.hostname;
+ let port = args.port;
+ let cert_file = &*args.cert_file;
+ let key_file = &*args.key_file;
+
+ {
+ let permissions = state.borrow_mut::<NP>();
+ permissions.check_net(&(hostname, Some(port)))?;
+ permissions.check_read(Path::new(cert_file))?;
+ permissions.check_read(Path::new(key_file))?;
+ }
+
+ let mut tls_config = ServerConfig::new(NoClientAuth::new());
+ if let Some(alpn_protocols) = args.alpn_protocols {
+ super::check_unstable(state, "Deno.listenTls#alpn_protocols");
+ tls_config.alpn_protocols =
+ alpn_protocols.into_iter().map(|s| s.into_bytes()).collect();
+ }
+ tls_config
+ .set_single_cert(
+ load_certs_from_file(cert_file)?,
+ load_private_keys_from_file(key_file)?.remove(0),
+ )
+ .expect("invalid key or certificate");
+
+ let bind_addr = resolve_addr_sync(hostname, port)?
+ .next()
+ .ok_or_else(|| generic_error("No resolved address found"))?;
+ let std_listener = std::net::TcpListener::bind(bind_addr)?;
+ std_listener.set_nonblocking(true)?;
+ let tcp_listener = TcpListener::from_std(std_listener)?;
+ let local_addr = tcp_listener.local_addr()?;
+
+ let tls_listener_resource = TlsListenerResource {
+ tcp_listener: AsyncRefCell::new(tcp_listener),
+ tls_config: Arc::new(tls_config),
+ cancel_handle: Default::default(),
+ };
+
+ let rid = state.resource_table.add(tls_listener_resource);
+
+ Ok(OpConn {
+ rid,
+ local_addr: Some(OpAddr::Tcp(IpAddr {
+ hostname: local_addr.ip().to_string(),
+ port: local_addr.port(),
+ })),
+ remote_addr: None,
+ })
+}
+
+async fn op_accept_tls(
+ state: Rc<RefCell<OpState>>,
+ rid: ResourceId,
+ _: (),
+) -> Result<OpConn, AnyError> {
+ let resource = state
+ .borrow()
+ .resource_table
+ .get::<TlsListenerResource>(rid)
+ .ok_or_else(|| bad_resource("Listener has been closed"))?;
+
+ let cancel_handle = RcRef::map(&resource, |r| &r.cancel_handle);
+ let tcp_listener = RcRef::map(&resource, |r| &r.tcp_listener)
+ .try_borrow_mut()
+ .ok_or_else(|| custom_error("Busy", "Another accept task is ongoing"))?;
+
+ let (tcp_stream, remote_addr) =
+ match tcp_listener.accept().try_or_cancel(&cancel_handle).await {
+ Ok(tuple) => tuple,
+ Err(err) if err.kind() == ErrorKind::Interrupted => {
+ // FIXME(bartlomieju): compatibility with current JS implementation.
+ return Err(bad_resource("Listener has been closed"));
+ }
+ Err(err) => return Err(err.into()),
+ };
+
+ let local_addr = tcp_stream.local_addr()?;
+
+ let tls_stream = TlsStream::new_server_side(tcp_stream, &resource.tls_config);
+
+ let rid = {
+ let mut state_ = state.borrow_mut();
+ state_
+ .resource_table
+ .add(TlsStreamResource::new(tls_stream.into_split()))
+ };
+
+ Ok(OpConn {
+ rid,
+ local_addr: Some(OpAddr::Tcp(IpAddr {
+ hostname: local_addr.ip().to_string(),
+ port: local_addr.port(),
+ })),
+ remote_addr: Some(OpAddr::Tcp(IpAddr {
+ hostname: remote_addr.ip().to_string(),
+ port: remote_addr.port(),
+ })),
+ })
+}
diff --git a/ext/net/ops_unix.rs b/ext/net/ops_unix.rs
new file mode 100644
index 000000000..9dfcc231e
--- /dev/null
+++ b/ext/net/ops_unix.rs
@@ -0,0 +1,180 @@
+// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
+
+use crate::io::UnixStreamResource;
+use crate::ops::AcceptArgs;
+use crate::ops::OpAddr;
+use crate::ops::OpConn;
+use crate::ops::OpPacket;
+use crate::ops::ReceiveArgs;
+use deno_core::error::bad_resource;
+use deno_core::error::custom_error;
+use deno_core::error::null_opbuf;
+use deno_core::error::AnyError;
+use deno_core::AsyncRefCell;
+use deno_core::CancelHandle;
+use deno_core::CancelTryFuture;
+use deno_core::OpState;
+use deno_core::RcRef;
+use deno_core::Resource;
+use deno_core::ZeroCopyBuf;
+use serde::Deserialize;
+use serde::Serialize;
+use std::borrow::Cow;
+use std::cell::RefCell;
+use std::fs::remove_file;
+use std::path::Path;
+use std::rc::Rc;
+use tokio::net::UnixDatagram;
+use tokio::net::UnixListener;
+pub use tokio::net::UnixStream;
+
+/// A utility function to map OsStrings to Strings
+pub fn into_string(s: std::ffi::OsString) -> Result<String, AnyError> {
+ s.into_string().map_err(|s| {
+ let message = format!("File name or path {:?} is not valid UTF-8", s);
+ custom_error("InvalidData", message)
+ })
+}
+
+struct UnixListenerResource {
+ listener: AsyncRefCell<UnixListener>,
+ cancel: CancelHandle,
+}
+
+impl Resource for UnixListenerResource {
+ fn name(&self) -> Cow<str> {
+ "unixListener".into()
+ }
+
+ fn close(self: Rc<Self>) {
+ self.cancel.cancel();
+ }
+}
+
+pub struct UnixDatagramResource {
+ pub socket: AsyncRefCell<UnixDatagram>,
+ pub cancel: CancelHandle,
+}
+
+impl Resource for UnixDatagramResource {
+ fn name(&self) -> Cow<str> {
+ "unixDatagram".into()
+ }
+
+ fn close(self: Rc<Self>) {
+ self.cancel.cancel();
+ }
+}
+
+#[derive(Serialize)]
+pub struct UnixAddr {
+ pub path: Option<String>,
+}
+
+#[derive(Deserialize)]
+pub struct UnixListenArgs {
+ pub path: String,
+}
+
+pub(crate) async fn accept_unix(
+ state: Rc<RefCell<OpState>>,
+ args: AcceptArgs,
+ _: (),
+) -> Result<OpConn, AnyError> {
+ let rid = args.rid;
+
+ let resource = state
+ .borrow()
+ .resource_table
+ .get::<UnixListenerResource>(rid)
+ .ok_or_else(|| bad_resource("Listener has been closed"))?;
+ let listener = RcRef::map(&resource, |r| &r.listener)
+ .try_borrow_mut()
+ .ok_or_else(|| custom_error("Busy", "Listener already in use"))?;
+ let cancel = RcRef::map(resource, |r| &r.cancel);
+ let (unix_stream, _socket_addr) =
+ listener.accept().try_or_cancel(cancel).await?;
+
+ let local_addr = unix_stream.local_addr()?;
+ let remote_addr = unix_stream.peer_addr()?;
+ let resource = UnixStreamResource::new(unix_stream.into_split());
+ let mut state = state.borrow_mut();
+ let rid = state.resource_table.add(resource);
+ Ok(OpConn {
+ rid,
+ local_addr: Some(OpAddr::Unix(UnixAddr {
+ path: local_addr.as_pathname().and_then(pathstring),
+ })),
+ remote_addr: Some(OpAddr::Unix(UnixAddr {
+ path: remote_addr.as_pathname().and_then(pathstring),
+ })),
+ })
+}
+
+pub(crate) async fn receive_unix_packet(
+ state: Rc<RefCell<OpState>>,
+ args: ReceiveArgs,
+ buf: Option<ZeroCopyBuf>,
+) -> Result<OpPacket, AnyError> {
+ let mut buf = buf.ok_or_else(null_opbuf)?;
+
+ let rid = args.rid;
+
+ let resource = state
+ .borrow()
+ .resource_table
+ .get::<UnixDatagramResource>(rid)
+ .ok_or_else(|| bad_resource("Socket has been closed"))?;
+ let socket = RcRef::map(&resource, |r| &r.socket)
+ .try_borrow_mut()
+ .ok_or_else(|| custom_error("Busy", "Socket already in use"))?;
+ let cancel = RcRef::map(resource, |r| &r.cancel);
+ let (size, remote_addr) =
+ socket.recv_from(&mut buf).try_or_cancel(cancel).await?;
+ Ok(OpPacket {
+ size,
+ remote_addr: OpAddr::UnixPacket(UnixAddr {
+ path: remote_addr.as_pathname().and_then(pathstring),
+ }),
+ })
+}
+
+pub fn listen_unix(
+ state: &mut OpState,
+ addr: &Path,
+) -> Result<(u32, tokio::net::unix::SocketAddr), AnyError> {
+ if addr.exists() {
+ remove_file(&addr).unwrap();
+ }
+ let listener = UnixListener::bind(&addr)?;
+ let local_addr = listener.local_addr()?;
+ let listener_resource = UnixListenerResource {
+ listener: AsyncRefCell::new(listener),
+ cancel: Default::default(),
+ };
+ let rid = state.resource_table.add(listener_resource);
+
+ Ok((rid, local_addr))
+}
+
+pub fn listen_unix_packet(
+ state: &mut OpState,
+ addr: &Path,
+) -> Result<(u32, tokio::net::unix::SocketAddr), AnyError> {
+ if addr.exists() {
+ remove_file(&addr).unwrap();
+ }
+ let socket = UnixDatagram::bind(&addr)?;
+ let local_addr = socket.local_addr()?;
+ let datagram_resource = UnixDatagramResource {
+ socket: AsyncRefCell::new(socket),
+ cancel: Default::default(),
+ };
+ let rid = state.resource_table.add(datagram_resource);
+
+ Ok((rid, local_addr))
+}
+
+pub fn pathstring(pathname: &Path) -> Option<String> {
+ into_string(pathname.into()).ok()
+}
diff --git a/ext/net/resolve_addr.rs b/ext/net/resolve_addr.rs
new file mode 100644
index 000000000..ebf1374d1
--- /dev/null
+++ b/ext/net/resolve_addr.rs
@@ -0,0 +1,156 @@
+// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
+
+use deno_core::error::AnyError;
+use std::net::SocketAddr;
+use std::net::ToSocketAddrs;
+use tokio::net::lookup_host;
+
+/// Resolve network address *asynchronously*.
+pub async fn resolve_addr(
+ hostname: &str,
+ port: u16,
+) -> Result<impl Iterator<Item = SocketAddr> + '_, AnyError> {
+ let addr_port_pair = make_addr_port_pair(hostname, port);
+ let result = lookup_host(addr_port_pair).await?;
+ Ok(result)
+}
+
+/// Resolve network address *synchronously*.
+pub fn resolve_addr_sync(
+ hostname: &str,
+ port: u16,
+) -> Result<impl Iterator<Item = SocketAddr>, AnyError> {
+ let addr_port_pair = make_addr_port_pair(hostname, port);
+ let result = addr_port_pair.to_socket_addrs()?;
+ Ok(result)
+}
+
+fn make_addr_port_pair(hostname: &str, port: u16) -> (&str, u16) {
+ // Default to localhost if given just the port. Example: ":80"
+ if hostname.is_empty() {
+ return ("0.0.0.0", port);
+ }
+
+ // If this looks like an ipv6 IP address. Example: "[2001:db8::1]"
+ // Then we remove the brackets.
+ let addr = hostname.trim_start_matches('[').trim_end_matches(']');
+ (addr, port)
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use std::net::Ipv4Addr;
+ use std::net::Ipv6Addr;
+ use std::net::SocketAddrV4;
+ use std::net::SocketAddrV6;
+
+ #[tokio::test]
+ async fn resolve_addr1() {
+ let expected = vec![SocketAddr::V4(SocketAddrV4::new(
+ Ipv4Addr::new(127, 0, 0, 1),
+ 80,
+ ))];
+ let actual = resolve_addr("127.0.0.1", 80)
+ .await
+ .unwrap()
+ .collect::<Vec<_>>();
+ assert_eq!(actual, expected);
+ }
+
+ #[tokio::test]
+ async fn resolve_addr2() {
+ let expected = vec![SocketAddr::V4(SocketAddrV4::new(
+ Ipv4Addr::new(0, 0, 0, 0),
+ 80,
+ ))];
+ let actual = resolve_addr("", 80).await.unwrap().collect::<Vec<_>>();
+ assert_eq!(actual, expected);
+ }
+
+ #[tokio::test]
+ async fn resolve_addr3() {
+ let expected = vec![SocketAddr::V4(SocketAddrV4::new(
+ Ipv4Addr::new(192, 0, 2, 1),
+ 25,
+ ))];
+ let actual = resolve_addr("192.0.2.1", 25)
+ .await
+ .unwrap()
+ .collect::<Vec<_>>();
+ assert_eq!(actual, expected);
+ }
+
+ #[tokio::test]
+ async fn resolve_addr_ipv6() {
+ let expected = vec![SocketAddr::V6(SocketAddrV6::new(
+ Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 1),
+ 8080,
+ 0,
+ 0,
+ ))];
+ let actual = resolve_addr("[2001:db8::1]", 8080)
+ .await
+ .unwrap()
+ .collect::<Vec<_>>();
+ assert_eq!(actual, expected);
+ }
+
+ #[tokio::test]
+ async fn resolve_addr_err() {
+ assert!(resolve_addr("INVALID ADDR", 1234).await.is_err());
+ }
+
+ #[test]
+ fn resolve_addr_sync1() {
+ let expected = vec![SocketAddr::V4(SocketAddrV4::new(
+ Ipv4Addr::new(127, 0, 0, 1),
+ 80,
+ ))];
+ let actual = resolve_addr_sync("127.0.0.1", 80)
+ .unwrap()
+ .collect::<Vec<_>>();
+ assert_eq!(actual, expected);
+ }
+
+ #[test]
+ fn resolve_addr_sync2() {
+ let expected = vec![SocketAddr::V4(SocketAddrV4::new(
+ Ipv4Addr::new(0, 0, 0, 0),
+ 80,
+ ))];
+ let actual = resolve_addr_sync("", 80).unwrap().collect::<Vec<_>>();
+ assert_eq!(actual, expected);
+ }
+
+ #[test]
+ fn resolve_addr_sync3() {
+ let expected = vec![SocketAddr::V4(SocketAddrV4::new(
+ Ipv4Addr::new(192, 0, 2, 1),
+ 25,
+ ))];
+ let actual = resolve_addr_sync("192.0.2.1", 25)
+ .unwrap()
+ .collect::<Vec<_>>();
+ assert_eq!(actual, expected);
+ }
+
+ #[test]
+ fn resolve_addr_sync_ipv6() {
+ let expected = vec![SocketAddr::V6(SocketAddrV6::new(
+ Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 1),
+ 8080,
+ 0,
+ 0,
+ ))];
+ let actual = resolve_addr_sync("[2001:db8::1]", 8080)
+ .unwrap()
+ .collect::<Vec<_>>();
+ assert_eq!(actual, expected);
+ }
+
+ #[test]
+ fn resolve_addr_sync_err() {
+ assert!(resolve_addr_sync("INVALID ADDR", 1234).is_err());
+ }
+}