summaryrefslogtreecommitdiff
path: root/extensions
diff options
context:
space:
mode:
authorBartek IwaƄczuk <biwanczuk@gmail.com>2021-06-29 01:43:03 +0200
committerGitHub <noreply@github.com>2021-06-29 01:43:03 +0200
commit38a7128cdd6f3308ba3c13cfb0b0d4ef925a44c3 (patch)
tree8f0c86028d9ba0266f1846e7f3611f7049cb43a8 /extensions
parent30cba2484815f712502ae8937a25afa13aba0818 (diff)
feat: Add "deno_net" extension (#11150)
This commits moves implementation of net related APIs available on "Deno" namespace to "deno_net" extension. Following APIs were moved: - Deno.listen() - Deno.connect() - Deno.listenTls() - Deno.serveHttp() - Deno.shutdown() - Deno.resolveDns() - Deno.listenDatagram() - Deno.startTls() - Deno.Conn - Deno.Listener - Deno.DatagramConn
Diffstat (limited to 'extensions')
-rw-r--r--extensions/net/01_net.js234
-rw-r--r--extensions/net/02_tls.js85
-rw-r--r--extensions/net/03_http.js251
-rw-r--r--extensions/net/04_net_unstable.js49
-rw-r--r--extensions/net/Cargo.toml31
-rw-r--r--extensions/net/README.md30
-rw-r--r--extensions/net/io.rs232
-rw-r--r--extensions/net/lib.deno_net.d.ts149
-rw-r--r--extensions/net/lib.deno_net.unstable.d.ts262
-rw-r--r--extensions/net/lib.rs113
-rw-r--r--extensions/net/ops.rs795
-rw-r--r--extensions/net/ops_http.rs577
-rw-r--r--extensions/net/ops_tls.rs1024
-rw-r--r--extensions/net/ops_unix.rs180
-rw-r--r--extensions/net/resolve_addr.rs156
15 files changed, 4168 insertions, 0 deletions
diff --git a/extensions/net/01_net.js b/extensions/net/01_net.js
new file mode 100644
index 000000000..9a531bd94
--- /dev/null
+++ b/extensions/net/01_net.js
@@ -0,0 +1,234 @@
+// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
+"use strict";
+
+((window) => {
+ const core = window.Deno.core;
+ const { BadResource } = core;
+
+ async function read(
+ rid,
+ buffer,
+ ) {
+ if (buffer.length === 0) {
+ return 0;
+ }
+ const nread = await core.opAsync("op_net_read_async", rid, buffer);
+ return nread === 0 ? null : nread;
+ }
+
+ async function write(rid, data) {
+ return await core.opAsync("op_net_write_async", rid, data);
+ }
+
+ function shutdown(rid) {
+ return core.opAsync("op_net_shutdown", rid);
+ }
+
+ function opAccept(rid, transport) {
+ return core.opAsync("op_accept", { rid, transport });
+ }
+
+ function opListen(args) {
+ return core.opSync("op_listen", args);
+ }
+
+ function opConnect(args) {
+ return core.opAsync("op_connect", args);
+ }
+
+ function opReceive(rid, transport, zeroCopy) {
+ return core.opAsync(
+ "op_datagram_receive",
+ { rid, transport },
+ zeroCopy,
+ );
+ }
+
+ function opSend(args, zeroCopy) {
+ return core.opAsync("op_datagram_send", args, zeroCopy);
+ }
+
+ function resolveDns(query, recordType, options) {
+ return core.opAsync("op_dns_resolve", { query, recordType, options });
+ }
+
+ class Conn {
+ #rid = 0;
+ #remoteAddr = null;
+ #localAddr = null;
+ constructor(rid, remoteAddr, localAddr) {
+ this.#rid = rid;
+ this.#remoteAddr = remoteAddr;
+ this.#localAddr = localAddr;
+ }
+
+ get rid() {
+ return this.#rid;
+ }
+
+ get remoteAddr() {
+ return this.#remoteAddr;
+ }
+
+ get localAddr() {
+ return this.#localAddr;
+ }
+
+ write(p) {
+ return write(this.rid, p);
+ }
+
+ read(p) {
+ return read(this.rid, p);
+ }
+
+ close() {
+ core.close(this.rid);
+ }
+
+ closeWrite() {
+ return shutdown(this.rid);
+ }
+ }
+
+ class Listener {
+ #rid = 0;
+ #addr = null;
+
+ constructor(rid, addr) {
+ this.#rid = rid;
+ this.#addr = addr;
+ }
+
+ get rid() {
+ return this.#rid;
+ }
+
+ get addr() {
+ return this.#addr;
+ }
+
+ async accept() {
+ const res = await opAccept(this.rid, this.addr.transport);
+ return new Conn(res.rid, res.remoteAddr, res.localAddr);
+ }
+
+ async next() {
+ let conn;
+ try {
+ conn = await this.accept();
+ } catch (error) {
+ if (error instanceof BadResource) {
+ return { value: undefined, done: true };
+ }
+ throw error;
+ }
+ return { value: conn, done: false };
+ }
+
+ return(value) {
+ this.close();
+ return Promise.resolve({ value, done: true });
+ }
+
+ close() {
+ core.close(this.rid);
+ }
+
+ [Symbol.asyncIterator]() {
+ return this;
+ }
+ }
+
+ class Datagram {
+ #rid = 0;
+ #addr = null;
+
+ constructor(rid, addr, bufSize = 1024) {
+ this.#rid = rid;
+ this.#addr = addr;
+ this.bufSize = bufSize;
+ }
+
+ get rid() {
+ return this.#rid;
+ }
+
+ get addr() {
+ return this.#addr;
+ }
+
+ async receive(p) {
+ const buf = p || new Uint8Array(this.bufSize);
+ const { size, remoteAddr } = await opReceive(
+ this.rid,
+ this.addr.transport,
+ buf,
+ );
+ const sub = buf.subarray(0, size);
+ return [sub, remoteAddr];
+ }
+
+ send(p, addr) {
+ const remote = { hostname: "127.0.0.1", ...addr };
+
+ const args = { ...remote, rid: this.rid };
+ return opSend(args, p);
+ }
+
+ close() {
+ core.close(this.rid);
+ }
+
+ async *[Symbol.asyncIterator]() {
+ while (true) {
+ try {
+ yield await this.receive();
+ } catch (err) {
+ if (err instanceof BadResource) {
+ break;
+ }
+ throw err;
+ }
+ }
+ }
+ }
+
+ function listen({ hostname, ...options }) {
+ const res = opListen({
+ transport: "tcp",
+ hostname: typeof hostname === "undefined" ? "0.0.0.0" : hostname,
+ ...options,
+ });
+
+ return new Listener(res.rid, res.localAddr);
+ }
+
+ async function connect(options) {
+ let res;
+
+ if (options.transport === "unix") {
+ res = await opConnect(options);
+ } else {
+ res = await opConnect({
+ transport: "tcp",
+ hostname: "127.0.0.1",
+ ...options,
+ });
+ }
+
+ return new Conn(res.rid, res.remoteAddr, res.localAddr);
+ }
+
+ window.__bootstrap.net = {
+ connect,
+ Conn,
+ opConnect,
+ listen,
+ opListen,
+ Listener,
+ shutdown,
+ Datagram,
+ resolveDns,
+ };
+})(this);
diff --git a/extensions/net/02_tls.js b/extensions/net/02_tls.js
new file mode 100644
index 000000000..4fafe9079
--- /dev/null
+++ b/extensions/net/02_tls.js
@@ -0,0 +1,85 @@
+// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
+"use strict";
+
+((window) => {
+ const core = window.Deno.core;
+ const { Listener, Conn } = window.__bootstrap.net;
+
+ function opConnectTls(
+ args,
+ ) {
+ return core.opAsync("op_connect_tls", args);
+ }
+
+ function opAcceptTLS(rid) {
+ return core.opAsync("op_accept_tls", rid);
+ }
+
+ function opListenTls(args) {
+ return core.opSync("op_listen_tls", args);
+ }
+
+ function opStartTls(args) {
+ return core.opAsync("op_start_tls", args);
+ }
+
+ async function connectTls({
+ port,
+ hostname = "127.0.0.1",
+ transport = "tcp",
+ certFile = undefined,
+ }) {
+ const res = await opConnectTls({
+ port,
+ hostname,
+ transport,
+ certFile,
+ });
+ return new Conn(res.rid, res.remoteAddr, res.localAddr);
+ }
+
+ class TLSListener extends Listener {
+ async accept() {
+ const res = await opAcceptTLS(this.rid);
+ return new Conn(res.rid, res.remoteAddr, res.localAddr);
+ }
+ }
+
+ function listenTls({
+ port,
+ certFile,
+ keyFile,
+ hostname = "0.0.0.0",
+ transport = "tcp",
+ alpnProtocols,
+ }) {
+ const res = opListenTls({
+ port,
+ certFile,
+ keyFile,
+ hostname,
+ transport,
+ alpnProtocols,
+ });
+ return new TLSListener(res.rid, res.localAddr);
+ }
+
+ async function startTls(
+ conn,
+ { hostname = "127.0.0.1", certFile } = {},
+ ) {
+ const res = await opStartTls({
+ rid: conn.rid,
+ hostname,
+ certFile,
+ });
+ return new Conn(res.rid, res.remoteAddr, res.localAddr);
+ }
+
+ window.__bootstrap.tls = {
+ startTls,
+ listenTls,
+ connectTls,
+ TLSListener,
+ };
+})(this);
diff --git a/extensions/net/03_http.js b/extensions/net/03_http.js
new file mode 100644
index 000000000..d5054bd1a
--- /dev/null
+++ b/extensions/net/03_http.js
@@ -0,0 +1,251 @@
+// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
+"use strict";
+
+((window) => {
+ const { InnerBody } = window.__bootstrap.fetchBody;
+ const { Response, fromInnerRequest, toInnerResponse, newInnerRequest } =
+ window.__bootstrap.fetch;
+ const core = window.Deno.core;
+ const { BadResource, Interrupted } = core;
+ const { ReadableStream } = window.__bootstrap.streams;
+ const abortSignal = window.__bootstrap.abortSignal;
+
+ function serveHttp(conn) {
+ const rid = Deno.core.opSync("op_http_start", conn.rid);
+ return new HttpConn(rid);
+ }
+
+ const connErrorSymbol = Symbol("connError");
+
+ class HttpConn {
+ #rid = 0;
+
+ constructor(rid) {
+ this.#rid = rid;
+ }
+
+ /** @returns {number} */
+ get rid() {
+ return this.#rid;
+ }
+
+ /** @returns {Promise<ResponseEvent | null>} */
+ async nextRequest() {
+ let nextRequest;
+ try {
+ nextRequest = await Deno.core.opAsync(
+ "op_http_request_next",
+ this.#rid,
+ );
+ } catch (error) {
+ // A connection error seen here would cause disrupted responses to throw
+ // a generic `BadResource` error. Instead store this error and replace
+ // those with it.
+ this[connErrorSymbol] = error;
+ if (error instanceof BadResource) {
+ return null;
+ } else if (error instanceof Interrupted) {
+ return null;
+ } else if (error.message.includes("connection closed")) {
+ return null;
+ }
+ throw error;
+ }
+ if (nextRequest === null) return null;
+
+ const [
+ requestBodyRid,
+ responseSenderRid,
+ method,
+ headersList,
+ url,
+ ] = nextRequest;
+
+ /** @type {ReadableStream<Uint8Array> | undefined} */
+ let body = null;
+ if (typeof requestBodyRid === "number") {
+ body = createRequestBodyStream(requestBodyRid);
+ }
+
+ const innerRequest = newInnerRequest(
+ method,
+ url,
+ headersList,
+ body !== null ? new InnerBody(body) : null,
+ );
+ const signal = abortSignal.newSignal();
+ const request = fromInnerRequest(innerRequest, signal, "immutable");
+
+ const respondWith = createRespondWith(this, responseSenderRid);
+
+ return { request, respondWith };
+ }
+
+ /** @returns {void} */
+ close() {
+ core.close(this.#rid);
+ }
+
+ [Symbol.asyncIterator]() {
+ // deno-lint-ignore no-this-alias
+ const httpConn = this;
+ return {
+ async next() {
+ const reqEvt = await httpConn.nextRequest();
+ // Change with caution, current form avoids a v8 deopt
+ return { value: reqEvt, done: reqEvt === null };
+ },
+ };
+ }
+ }
+
+ function readRequest(requestRid, zeroCopyBuf) {
+ return Deno.core.opAsync(
+ "op_http_request_read",
+ requestRid,
+ zeroCopyBuf,
+ );
+ }
+
+ function createRespondWith(httpConn, responseSenderRid) {
+ return async function respondWith(resp) {
+ if (resp instanceof Promise) {
+ resp = await resp;
+ }
+
+ if (!(resp instanceof Response)) {
+ throw new TypeError(
+ "First argument to respondWith must be a Response or a promise resolving to a Response.",
+ );
+ }
+
+ const innerResp = toInnerResponse(resp);
+
+ // If response body length is known, it will be sent synchronously in a
+ // single op, in other case a "response body" resource will be created and
+ // we'll be streaming it.
+ /** @type {ReadableStream<Uint8Array> | Uint8Array | null} */
+ let respBody = null;
+ if (innerResp.body !== null) {
+ if (innerResp.body.unusable()) throw new TypeError("Body is unusable.");
+ if (innerResp.body.streamOrStatic instanceof ReadableStream) {
+ if (innerResp.body.length === null) {
+ respBody = innerResp.body.stream;
+ } else {
+ const reader = innerResp.body.stream.getReader();
+ const r1 = await reader.read();
+ if (r1.done) {
+ respBody = new Uint8Array(0);
+ } else {
+ respBody = r1.value;
+ const r2 = await reader.read();
+ if (!r2.done) throw new TypeError("Unreachable");
+ }
+ }
+ } else {
+ innerResp.body.streamOrStatic.consumed = true;
+ respBody = innerResp.body.streamOrStatic.body;
+ }
+ } else {
+ respBody = new Uint8Array(0);
+ }
+
+ let responseBodyRid;
+ try {
+ responseBodyRid = await Deno.core.opAsync("op_http_response", [
+ responseSenderRid,
+ innerResp.status ?? 200,
+ innerResp.headerList,
+ ], respBody instanceof Uint8Array ? respBody : null);
+ } catch (error) {
+ const connError = httpConn[connErrorSymbol];
+ if (error instanceof BadResource && connError != null) {
+ // deno-lint-ignore no-ex-assign
+ error = new connError.constructor(connError.message);
+ }
+ if (respBody !== null && respBody instanceof ReadableStream) {
+ await respBody.cancel(error);
+ }
+ throw error;
+ }
+
+ // If `respond` returns a responseBodyRid, we should stream the body
+ // to that resource.
+ if (responseBodyRid !== null) {
+ try {
+ if (respBody === null || !(respBody instanceof ReadableStream)) {
+ throw new TypeError("Unreachable");
+ }
+ const reader = respBody.getReader();
+ while (true) {
+ const { value, done } = await reader.read();
+ if (done) break;
+ if (!(value instanceof Uint8Array)) {
+ await reader.cancel(new TypeError("Value not a Uint8Array"));
+ break;
+ }
+ try {
+ await Deno.core.opAsync(
+ "op_http_response_write",
+ responseBodyRid,
+ value,
+ );
+ } catch (error) {
+ const connError = httpConn[connErrorSymbol];
+ if (error instanceof BadResource && connError != null) {
+ // deno-lint-ignore no-ex-assign
+ error = new connError.constructor(connError.message);
+ }
+ await reader.cancel(error);
+ throw error;
+ }
+ }
+ } finally {
+ // Once all chunks are sent, and the request body is closed, we can
+ // close the response body.
+ try {
+ await Deno.core.opAsync("op_http_response_close", responseBodyRid);
+ } catch { /* pass */ }
+ }
+ }
+ };
+ }
+
+ function createRequestBodyStream(requestBodyRid) {
+ return new ReadableStream({
+ type: "bytes",
+ async pull(controller) {
+ try {
+ // This is the largest possible size for a single packet on a TLS
+ // stream.
+ const chunk = new Uint8Array(16 * 1024 + 256);
+ const read = await readRequest(
+ requestBodyRid,
+ chunk,
+ );
+ if (read > 0) {
+ // We read some data. Enqueue it onto the stream.
+ controller.enqueue(chunk.subarray(0, read));
+ } else {
+ // We have reached the end of the body, so we close the stream.
+ controller.close();
+ core.close(requestBodyRid);
+ }
+ } catch (err) {
+ // There was an error while reading a chunk of the body, so we
+ // error.
+ controller.error(err);
+ controller.close();
+ core.close(requestBodyRid);
+ }
+ },
+ cancel() {
+ core.close(requestBodyRid);
+ },
+ });
+ }
+
+ window.__bootstrap.http = {
+ serveHttp,
+ };
+})(this);
diff --git a/extensions/net/04_net_unstable.js b/extensions/net/04_net_unstable.js
new file mode 100644
index 000000000..ca265bfaa
--- /dev/null
+++ b/extensions/net/04_net_unstable.js
@@ -0,0 +1,49 @@
+// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
+"use strict";
+
+((window) => {
+ const net = window.__bootstrap.net;
+
+ function listen(options) {
+ if (options.transport === "unix") {
+ const res = net.opListen(options);
+ return new net.Listener(res.rid, res.localAddr);
+ } else {
+ return net.listen(options);
+ }
+ }
+
+ function listenDatagram(
+ options,
+ ) {
+ let res;
+ if (options.transport === "unixpacket") {
+ res = net.opListen(options);
+ } else {
+ res = net.opListen({
+ transport: "udp",
+ hostname: "127.0.0.1",
+ ...options,
+ });
+ }
+
+ return new net.Datagram(res.rid, res.localAddr);
+ }
+
+ async function connect(
+ options,
+ ) {
+ if (options.transport === "unix") {
+ const res = await net.opConnect(options);
+ return new net.Conn(res.rid, res.remoteAddr, res.localAddr);
+ } else {
+ return net.connect(options);
+ }
+ }
+
+ window.__bootstrap.netUnstable = {
+ connect,
+ listenDatagram,
+ listen,
+ };
+})(this);
diff --git a/extensions/net/Cargo.toml b/extensions/net/Cargo.toml
new file mode 100644
index 000000000..9d5e97bbb
--- /dev/null
+++ b/extensions/net/Cargo.toml
@@ -0,0 +1,31 @@
+# Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
+
+[package]
+name = "deno_net"
+version = "0.1.0"
+edition = "2018"
+description = "Networking for Deno"
+authors = ["the Deno authors"]
+license = "MIT"
+readme = "README.md"
+repository = "https://github.com/denoland/deno"
+
+[lib]
+path = "lib.rs"
+
+[dependencies]
+deno_core = { version = "0.92.0", path = "../../core" }
+
+bytes = "1"
+log = "0.4.14"
+lazy_static = "1.4.0"
+http = "0.2.3"
+hyper = { version = "0.14.9", features = ["server", "stream", "http1", "http2", "runtime"] }
+rustls = "0.19.0"
+serde = { version = "1.0.125", features = ["derive"] }
+tokio = { version = "1.7.1", features = ["full"] }
+tokio-util = { version = "0.6", features = ["io"] }
+webpki = "0.21.4"
+webpki-roots = "0.21.1"
+trust-dns-proto = "0.20.3"
+trust-dns-resolver = { version = "0.20.3", features = ["tokio-runtime", "serde-config"] }
diff --git a/extensions/net/README.md b/extensions/net/README.md
new file mode 100644
index 000000000..cdd8923e1
--- /dev/null
+++ b/extensions/net/README.md
@@ -0,0 +1,30 @@
+# deno_net
+
+This crate implements networking APIs.
+
+This crate depends on following extensions:
+
+- "deno_web"
+- "deno_fetch"
+
+Following ops are provided:
+
+- "op_net_read_async"
+- "op_net_write_async"
+- "op_net_shutdown"
+- "op_accept"
+- "op_connect"
+- "op_listen"
+- "op_datagram_receive"
+- "op_datagram_send"
+- "op_dns_resolve"
+- "op_start_tls"
+- "op_connect_tls"
+- "op_listen_tls"
+- "op_accept_tls"
+- "op_http_start"
+- "op_http_request_next"
+- "op_http_request_read"
+- "op_http_response"
+- "op_http_response_write"
+- "op_http_response_close"
diff --git a/extensions/net/io.rs b/extensions/net/io.rs
new file mode 100644
index 000000000..fc10d7e99
--- /dev/null
+++ b/extensions/net/io.rs
@@ -0,0 +1,232 @@
+// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
+
+use crate::ops_tls as tls;
+use deno_core::error::null_opbuf;
+use deno_core::error::AnyError;
+use deno_core::error::{bad_resource_id, not_supported};
+use deno_core::op_async;
+use deno_core::AsyncMutFuture;
+use deno_core::AsyncRefCell;
+use deno_core::CancelHandle;
+use deno_core::CancelTryFuture;
+use deno_core::OpPair;
+use deno_core::OpState;
+use deno_core::RcRef;
+use deno_core::Resource;
+use deno_core::ResourceId;
+use deno_core::ZeroCopyBuf;
+use std::borrow::Cow;
+use std::cell::RefCell;
+use std::rc::Rc;
+use tokio::io::AsyncRead;
+use tokio::io::AsyncReadExt;
+use tokio::io::AsyncWrite;
+use tokio::io::AsyncWriteExt;
+use tokio::net::tcp;
+
+#[cfg(unix)]
+use tokio::net::unix;
+
+pub fn init() -> Vec<OpPair> {
+ vec![
+ ("op_net_read_async", op_async(op_read_async)),
+ ("op_net_write_async", op_async(op_write_async)),
+ ("op_net_shutdown", op_async(op_shutdown)),
+ ]
+}
+
+/// A full duplex resource has a read and write ends that are completely
+/// independent, like TCP/Unix sockets and TLS streams.
+#[derive(Debug)]
+pub struct FullDuplexResource<R, W> {
+ rd: AsyncRefCell<R>,
+ wr: AsyncRefCell<W>,
+ // When a full-duplex resource is closed, all pending 'read' ops are
+ // canceled, while 'write' ops are allowed to complete. Therefore only
+ // 'read' futures should be attached to this cancel handle.
+ cancel_handle: CancelHandle,
+}
+
+impl<R, W> FullDuplexResource<R, W>
+where
+ R: AsyncRead + Unpin + 'static,
+ W: AsyncWrite + Unpin + 'static,
+{
+ pub fn new((rd, wr): (R, W)) -> Self {
+ Self {
+ rd: rd.into(),
+ wr: wr.into(),
+ cancel_handle: Default::default(),
+ }
+ }
+
+ pub fn into_inner(self) -> (R, W) {
+ (self.rd.into_inner(), self.wr.into_inner())
+ }
+
+ pub fn rd_borrow_mut(self: &Rc<Self>) -> AsyncMutFuture<R> {
+ RcRef::map(self, |r| &r.rd).borrow_mut()
+ }
+
+ pub fn wr_borrow_mut(self: &Rc<Self>) -> AsyncMutFuture<W> {
+ RcRef::map(self, |r| &r.wr).borrow_mut()
+ }
+
+ pub fn cancel_handle(self: &Rc<Self>) -> RcRef<CancelHandle> {
+ RcRef::map(self, |r| &r.cancel_handle)
+ }
+
+ pub fn cancel_read_ops(&self) {
+ self.cancel_handle.cancel()
+ }
+
+ pub async fn read(
+ self: &Rc<Self>,
+ buf: &mut [u8],
+ ) -> Result<usize, AnyError> {
+ let mut rd = self.rd_borrow_mut().await;
+ let nread = rd.read(buf).try_or_cancel(self.cancel_handle()).await?;
+ Ok(nread)
+ }
+
+ pub async fn write(self: &Rc<Self>, buf: &[u8]) -> Result<usize, AnyError> {
+ let mut wr = self.wr_borrow_mut().await;
+ let nwritten = wr.write(buf).await?;
+ Ok(nwritten)
+ }
+
+ pub async fn shutdown(self: &Rc<Self>) -> Result<(), AnyError> {
+ let mut wr = self.wr_borrow_mut().await;
+ wr.shutdown().await?;
+ Ok(())
+ }
+}
+
+pub type TcpStreamResource =
+ FullDuplexResource<tcp::OwnedReadHalf, tcp::OwnedWriteHalf>;
+
+impl Resource for TcpStreamResource {
+ fn name(&self) -> Cow<str> {
+ "tcpStream".into()
+ }
+
+ fn close(self: Rc<Self>) {
+ self.cancel_read_ops();
+ }
+}
+
+pub type TlsStreamResource = FullDuplexResource<tls::ReadHalf, tls::WriteHalf>;
+
+impl Resource for TlsStreamResource {
+ fn name(&self) -> Cow<str> {
+ "tlsStream".into()
+ }
+
+ fn close(self: Rc<Self>) {
+ self.cancel_read_ops();
+ }
+}
+
+#[cfg(unix)]
+pub type UnixStreamResource =
+ FullDuplexResource<unix::OwnedReadHalf, unix::OwnedWriteHalf>;
+
+#[cfg(not(unix))]
+pub struct UnixStreamResource;
+
+#[cfg(not(unix))]
+impl UnixStreamResource {
+ pub async fn read(
+ self: &Rc<Self>,
+ _buf: &mut [u8],
+ ) -> Result<usize, AnyError> {
+ unreachable!()
+ }
+ pub async fn write(self: &Rc<Self>, _buf: &[u8]) -> Result<usize, AnyError> {
+ unreachable!()
+ }
+ pub async fn shutdown(self: &Rc<Self>) -> Result<(), AnyError> {
+ unreachable!()
+ }
+ pub fn cancel_read_ops(&self) {
+ unreachable!()
+ }
+}
+
+impl Resource for UnixStreamResource {
+ fn name(&self) -> Cow<str> {
+ "unixStream".into()
+ }
+
+ fn close(self: Rc<Self>) {
+ self.cancel_read_ops();
+ }
+}
+
+async fn op_read_async(
+ state: Rc<RefCell<OpState>>,
+ rid: ResourceId,
+ buf: Option<ZeroCopyBuf>,
+) -> Result<u32, AnyError> {
+ let buf = &mut buf.ok_or_else(null_opbuf)?;
+ let resource = state
+ .borrow()
+ .resource_table
+ .get_any(rid)
+ .ok_or_else(bad_resource_id)?;
+ let nread = if let Some(s) = resource.downcast_rc::<TcpStreamResource>() {
+ s.read(buf).await?
+ } else if let Some(s) = resource.downcast_rc::<TlsStreamResource>() {
+ s.read(buf).await?
+ } else if let Some(s) = resource.downcast_rc::<UnixStreamResource>() {
+ s.read(buf).await?
+ } else {
+ return Err(not_supported());
+ };
+ Ok(nread as u32)
+}
+
+async fn op_write_async(
+ state: Rc<RefCell<OpState>>,
+ rid: ResourceId,
+ buf: Option<ZeroCopyBuf>,
+) -> Result<u32, AnyError> {
+ let buf = &buf.ok_or_else(null_opbuf)?;
+ let resource = state
+ .borrow()
+ .resource_table
+ .get_any(rid)
+ .ok_or_else(bad_resource_id)?;
+ let nwritten = if let Some(s) = resource.downcast_rc::<TcpStreamResource>() {
+ s.write(buf).await?
+ } else if let Some(s) = resource.downcast_rc::<TlsStreamResource>() {
+ s.write(buf).await?
+ } else if let Some(s) = resource.downcast_rc::<UnixStreamResource>() {
+ s.write(buf).await?
+ } else {
+ return Err(not_supported());
+ };
+ Ok(nwritten as u32)
+}
+
+async fn op_shutdown(
+ state: Rc<RefCell<OpState>>,
+ rid: ResourceId,
+ _: (),
+) -> Result<(), AnyError> {
+ let resource = state
+ .borrow()
+ .resource_table
+ .get_any(rid)
+ .ok_or_else(bad_resource_id)?;
+ if let Some(s) = resource.downcast_rc::<TcpStreamResource>() {
+ s.shutdown().await?;
+ } else if let Some(s) = resource.downcast_rc::<TlsStreamResource>() {
+ s.shutdown().await?;
+ } else if let Some(s) = resource.downcast_rc::<UnixStreamResource>() {
+ s.shutdown().await?;
+ } else {
+ return Err(not_supported());
+ }
+ Ok(())
+}
diff --git a/extensions/net/lib.deno_net.d.ts b/extensions/net/lib.deno_net.d.ts
new file mode 100644
index 000000000..25397f960
--- /dev/null
+++ b/extensions/net/lib.deno_net.d.ts
@@ -0,0 +1,149 @@
+// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
+
+/// <reference no-default-lib="true" />
+/// <reference lib="esnext" />
+
+declare namespace Deno {
+ export interface NetAddr {
+ transport: "tcp" | "udp";
+ hostname: string;
+ port: number;
+ }
+
+ export interface UnixAddr {
+ transport: "unix" | "unixpacket";
+ path: string;
+ }
+
+ export type Addr = NetAddr | UnixAddr;
+
+ /** A generic network listener for stream-oriented protocols. */
+ export interface Listener extends AsyncIterable<Conn> {
+ /** Waits for and resolves to the next connection to the `Listener`. */
+ accept(): Promise<Conn>;
+ /** Close closes the listener. Any pending accept promises will be rejected
+ * with errors. */
+ close(): void;
+ /** Return the address of the `Listener`. */
+ readonly addr: Addr;
+
+ /** Return the rid of the `Listener`. */
+ readonly rid: number;
+
+ [Symbol.asyncIterator](): AsyncIterableIterator<Conn>;
+ }
+
+ export interface Conn extends Reader, Writer, Closer {
+ /** The local address of the connection. */
+ readonly localAddr: Addr;
+ /** The remote address of the connection. */
+ readonly remoteAddr: Addr;
+ /** The resource ID of the connection. */
+ readonly rid: number;
+ /** Shuts down (`shutdown(2)`) the write side of the connection. Most
+ * callers should just use `close()`. */
+ closeWrite(): Promise<void>;
+ }
+
+ export interface ListenOptions {
+ /** The port to listen on. */
+ port: number;
+ /** A literal IP address or host name that can be resolved to an IP address.
+ * If not specified, defaults to `0.0.0.0`. */
+ hostname?: string;
+ }
+
+ /** Listen announces on the local transport address.
+ *
+ * ```ts
+ * const listener1 = Deno.listen({ port: 80 })
+ * const listener2 = Deno.listen({ hostname: "192.0.2.1", port: 80 })
+ * const listener3 = Deno.listen({ hostname: "[2001:db8::1]", port: 80 });
+ * const listener4 = Deno.listen({ hostname: "golang.org", port: 80, transport: "tcp" });
+ * ```
+ *
+ * Requires `allow-net` permission. */
+ export function listen(
+ options: ListenOptions & { transport?: "tcp" },
+ ): Listener;
+
+ export interface ListenTlsOptions extends ListenOptions {
+ /** Server certificate file. */
+ certFile: string;
+ /** Server public key file. */
+ keyFile: string;
+
+ transport?: "tcp";
+ }
+
+ /** Listen announces on the local transport address over TLS (transport layer
+ * security).
+ *
+ * ```ts
+ * const lstnr = Deno.listenTls({ port: 443, certFile: "./server.crt", keyFile: "./server.key" });
+ * ```
+ *
+ * Requires `allow-net` permission. */
+ export function listenTls(options: ListenTlsOptions): Listener;
+
+ export interface ConnectOptions {
+ /** The port to connect to. */
+ port: number;
+ /** A literal IP address or host name that can be resolved to an IP address.
+ * If not specified, defaults to `127.0.0.1`. */
+ hostname?: string;
+ transport?: "tcp";
+ }
+
+ /**
+ * Connects to the hostname (default is "127.0.0.1") and port on the named
+ * transport (default is "tcp"), and resolves to the connection (`Conn`).
+ *
+ * ```ts
+ * const conn1 = await Deno.connect({ port: 80 });
+ * const conn2 = await Deno.connect({ hostname: "192.0.2.1", port: 80 });
+ * const conn3 = await Deno.connect({ hostname: "[2001:db8::1]", port: 80 });
+ * const conn4 = await Deno.connect({ hostname: "golang.org", port: 80, transport: "tcp" });
+ * ```
+ *
+ * Requires `allow-net` permission for "tcp". */
+ export function connect(options: ConnectOptions): Promise<Conn>;
+
+ export interface ConnectTlsOptions {
+ /** The port to connect to. */
+ port: number;
+ /** A literal IP address or host name that can be resolved to an IP address.
+ * If not specified, defaults to `127.0.0.1`. */
+ hostname?: string;
+ /** Server certificate file. */
+ certFile?: string;
+ }
+
+ /** Establishes a secure connection over TLS (transport layer security) using
+ * an optional cert file, hostname (default is "127.0.0.1") and port. The
+ * cert file is optional and if not included Mozilla's root certificates will
+ * be used (see also https://github.com/ctz/webpki-roots for specifics)
+ *
+ * ```ts
+ * const conn1 = await Deno.connectTls({ port: 80 });
+ * const conn2 = await Deno.connectTls({ certFile: "./certs/my_custom_root_CA.pem", hostname: "192.0.2.1", port: 80 });
+ * const conn3 = await Deno.connectTls({ hostname: "[2001:db8::1]", port: 80 });
+ * const conn4 = await Deno.connectTls({ certFile: "./certs/my_custom_root_CA.pem", hostname: "golang.org", port: 80});
+ * ```
+ *
+ * Requires `allow-net` permission.
+ */
+ export function connectTls(options: ConnectTlsOptions): Promise<Conn>;
+
+ /** Shutdown socket send operations.
+ *
+ * Matches behavior of POSIX shutdown(3).
+ *
+ * ```ts
+ * const listener = Deno.listen({ port: 80 });
+ * const conn = await listener.accept();
+ * Deno.shutdown(conn.rid);
+ * ```
+ */
+ export function shutdown(rid: number): Promise<void>;
+}
diff --git a/extensions/net/lib.deno_net.unstable.d.ts b/extensions/net/lib.deno_net.unstable.d.ts
new file mode 100644
index 000000000..905a7acc1
--- /dev/null
+++ b/extensions/net/lib.deno_net.unstable.d.ts
@@ -0,0 +1,262 @@
+// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
+
+/// <reference no-default-lib="true" />
+/// <reference lib="esnext" />
+
+declare namespace Deno {
+ /** The type of the resource record.
+ * Only the listed types are supported currently. */
+ export type RecordType =
+ | "A"
+ | "AAAA"
+ | "ANAME"
+ | "CNAME"
+ | "MX"
+ | "PTR"
+ | "SRV"
+ | "TXT";
+
+ export interface ResolveDnsOptions {
+ /** The name server to be used for lookups.
+ * If not specified, defaults to the system configuration e.g. `/etc/resolv.conf` on Unix. */
+ nameServer?: {
+ /** The IP address of the name server */
+ ipAddr: string;
+ /** The port number the query will be sent to.
+ * If not specified, defaults to 53. */
+ port?: number;
+ };
+ }
+
+ /** If `resolveDns` is called with "MX" record type specified, it will return an array of this interface. */
+ export interface MXRecord {
+ preference: number;
+ exchange: string;
+ }
+
+ /** If `resolveDns` is called with "SRV" record type specified, it will return an array of this interface. */
+ export interface SRVRecord {
+ priority: number;
+ weight: number;
+ port: number;
+ target: string;
+ }
+
+ export function resolveDns(
+ query: string,
+ recordType: "A" | "AAAA" | "ANAME" | "CNAME" | "PTR",
+ options?: ResolveDnsOptions,
+ ): Promise<string[]>;
+
+ export function resolveDns(
+ query: string,
+ recordType: "MX",
+ options?: ResolveDnsOptions,
+ ): Promise<MXRecord[]>;
+
+ export function resolveDns(
+ query: string,
+ recordType: "SRV",
+ options?: ResolveDnsOptions,
+ ): Promise<SRVRecord[]>;
+
+ export function resolveDns(
+ query: string,
+ recordType: "TXT",
+ options?: ResolveDnsOptions,
+ ): Promise<string[][]>;
+
+ /** ** UNSTABLE**: new API, yet to be vetted.
+*
+* Performs DNS resolution against the given query, returning resolved records.
+* Fails in the cases such as:
+* - the query is in invalid format
+* - the options have an invalid parameter, e.g. `nameServer.port` is beyond the range of 16-bit unsigned integer
+* - timed out
+*
+* ```ts
+* const a = await Deno.resolveDns("example.com", "A");
+*
+* const aaaa = await Deno.resolveDns("example.com", "AAAA", {
+* nameServer: { ipAddr: "8.8.8.8", port: 1234 },
+* });
+* ```
+*
+* Requires `allow-net` permission.
+ */
+ export function resolveDns(
+ query: string,
+ recordType: RecordType,
+ options?: ResolveDnsOptions,
+ ): Promise<string[] | MXRecord[] | SRVRecord[] | string[][]>;
+
+ /** **UNSTABLE**: new API, yet to be vetted.
+*
+* A generic transport listener for message-oriented protocols. */
+ export interface DatagramConn extends AsyncIterable<[Uint8Array, Addr]> {
+ /** **UNSTABLE**: new API, yet to be vetted.
+ *
+ * Waits for and resolves to the next message to the `UDPConn`. */
+ receive(p?: Uint8Array): Promise<[Uint8Array, Addr]>;
+ /** UNSTABLE: new API, yet to be vetted.
+ *
+ * Sends a message to the target. */
+ send(p: Uint8Array, addr: Addr): Promise<number>;
+ /** UNSTABLE: new API, yet to be vetted.
+ *
+ * Close closes the socket. Any pending message promises will be rejected
+ * with errors. */
+ close(): void;
+ /** Return the address of the `UDPConn`. */
+ readonly addr: Addr;
+ [Symbol.asyncIterator](): AsyncIterableIterator<[Uint8Array, Addr]>;
+ }
+
+ export interface UnixListenOptions {
+ /** A Path to the Unix Socket. */
+ path: string;
+ }
+
+ /** **UNSTABLE**: new API, yet to be vetted.
+*
+* Listen announces on the local transport address.
+*
+* ```ts
+* const listener = Deno.listen({ path: "/foo/bar.sock", transport: "unix" })
+* ```
+*
+* Requires `allow-read` and `allow-write` permission. */
+ export function listen(
+ options: UnixListenOptions & { transport: "unix" },
+ ): Listener;
+
+ /** **UNSTABLE**: new API, yet to be vetted
+*
+* Listen announces on the local transport address.
+*
+* ```ts
+* const listener1 = Deno.listenDatagram({
+* port: 80,
+* transport: "udp"
+* });
+* const listener2 = Deno.listenDatagram({
+* hostname: "golang.org",
+* port: 80,
+* transport: "udp"
+* });
+* ```
+*
+* Requires `allow-net` permission. */
+ export function listenDatagram(
+ options: ListenOptions & { transport: "udp" },
+ ): DatagramConn;
+
+ /** **UNSTABLE**: new API, yet to be vetted
+*
+* Listen announces on the local transport address.
+*
+* ```ts
+* const listener = Deno.listenDatagram({
+* path: "/foo/bar.sock",
+* transport: "unixpacket"
+* });
+* ```
+*
+* Requires `allow-read` and `allow-write` permission. */
+ export function listenDatagram(
+ options: UnixListenOptions & { transport: "unixpacket" },
+ ): DatagramConn;
+
+ export interface UnixConnectOptions {
+ transport: "unix";
+ path: string;
+ }
+
+ /** **UNSTABLE**: The unix socket transport is unstable as a new API yet to
+* be vetted. The TCP transport is considered stable.
+*
+* Connects to the hostname (default is "127.0.0.1") and port on the named
+* transport (default is "tcp"), and resolves to the connection (`Conn`).
+*
+* ```ts
+* const conn1 = await Deno.connect({ port: 80 });
+* const conn2 = await Deno.connect({ hostname: "192.0.2.1", port: 80 });
+* const conn3 = await Deno.connect({ hostname: "[2001:db8::1]", port: 80 });
+* const conn4 = await Deno.connect({ hostname: "golang.org", port: 80, transport: "tcp" });
+* const conn5 = await Deno.connect({ path: "/foo/bar.sock", transport: "unix" });
+* ```
+*
+* Requires `allow-net` permission for "tcp" and `allow-read` for "unix". */
+ export function connect(
+ options: ConnectOptions | UnixConnectOptions,
+ ): Promise<Conn>;
+
+ export interface StartTlsOptions {
+ /** A literal IP address or host name that can be resolved to an IP address.
+ * If not specified, defaults to `127.0.0.1`. */
+ hostname?: string;
+ /** Server certificate file. */
+ certFile?: string;
+ }
+
+ /** **UNSTABLE**: new API, yet to be vetted.
+*
+* Start TLS handshake from an existing connection using
+* an optional cert file, hostname (default is "127.0.0.1"). The
+* cert file is optional and if not included Mozilla's root certificates will
+* be used (see also https://github.com/ctz/webpki-roots for specifics)
+* Using this function requires that the other end of the connection is
+* prepared for TLS handshake.
+*
+* ```ts
+* const conn = await Deno.connect({ port: 80, hostname: "127.0.0.1" });
+* const tlsConn = await Deno.startTls(conn, { certFile: "./certs/my_custom_root_CA.pem", hostname: "localhost" });
+* ```
+*
+* Requires `allow-net` permission.
+ */
+ export function startTls(
+ conn: Conn,
+ options?: StartTlsOptions,
+ ): Promise<Conn>;
+
+ export interface ListenTlsOptions {
+ /** **UNSTABLE**: new API, yet to be vetted.
+ *
+ * Application-Layer Protocol Negotiation (ALPN) protocols to announce to
+ * the client. If not specified, no ALPN extension will be included in the
+ * TLS handshake.
+ */
+ alpnProtocols?: string[];
+ }
+
+ export interface RequestEvent {
+ readonly request: Request;
+ respondWith(r: Response | Promise<Response>): Promise<void>;
+ }
+
+ export interface HttpConn extends AsyncIterable<RequestEvent> {
+ readonly rid: number;
+
+ nextRequest(): Promise<RequestEvent | null>;
+ close(): void;
+ }
+
+ /** **UNSTABLE**: new API, yet to be vetted.
+ *
+ * Services HTTP requests given a TCP or TLS socket.
+ *
+ * ```ts
+ * const conn = await Deno.connect({ port: 80, hostname: "127.0.0.1" });
+ * const httpConn = Deno.serveHttp(conn);
+ * const e = await httpConn.nextRequest();
+ * if (e) {
+ * e.respondWith(new Response("Hello World"));
+ * }
+ * ```
+ *
+ * If `httpConn.nextRequest()` encounters an error or returns `null`
+ * then the underlying HttpConn resource is closed automatically.
+ */
+ export function serveHttp(conn: Conn): HttpConn;
+}
diff --git a/extensions/net/lib.rs b/extensions/net/lib.rs
new file mode 100644
index 000000000..d1e836fce
--- /dev/null
+++ b/extensions/net/lib.rs
@@ -0,0 +1,113 @@
+// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
+
+pub mod io;
+pub mod ops;
+pub mod ops_http;
+pub mod ops_tls;
+#[cfg(unix)]
+pub mod ops_unix;
+pub mod resolve_addr;
+
+use deno_core::error::AnyError;
+use deno_core::include_js_files;
+use deno_core::Extension;
+use deno_core::OpState;
+use std::cell::RefCell;
+use std::path::Path;
+use std::path::PathBuf;
+use std::rc::Rc;
+
+pub trait NetPermissions {
+ fn check_net<T: AsRef<str>>(
+ &mut self,
+ _host: &(T, Option<u16>),
+ ) -> Result<(), AnyError>;
+ fn check_read(&mut self, _p: &Path) -> Result<(), AnyError>;
+ fn check_write(&mut self, _p: &Path) -> Result<(), AnyError>;
+}
+
+/// For use with this crate when the user does not want permission checks.
+pub struct NoNetPermissions;
+
+impl NetPermissions for NoNetPermissions {
+ fn check_net<T: AsRef<str>>(
+ &mut self,
+ _host: &(T, Option<u16>),
+ ) -> Result<(), AnyError> {
+ Ok(())
+ }
+
+ fn check_read(&mut self, _p: &Path) -> Result<(), AnyError> {
+ Ok(())
+ }
+
+ fn check_write(&mut self, _p: &Path) -> Result<(), AnyError> {
+ Ok(())
+ }
+}
+
+/// `UnstableChecker` is a struct so it can be placed inside `GothamState`;
+/// using type alias for a bool could work, but there's a high chance
+/// that there might be another type alias pointing to a bool, which
+/// would override previously used alias.
+pub struct UnstableChecker {
+ pub unstable: bool,
+}
+
+impl UnstableChecker {
+ /// Quits the process if the --unstable flag was not provided.
+ ///
+ /// This is intentionally a non-recoverable check so that people cannot probe
+ /// for unstable APIs from stable programs.
+ // NOTE(bartlomieju): keep in sync with `cli/program_state.rs`
+ pub fn check_unstable(&self, api_name: &str) {
+ if !self.unstable {
+ eprintln!(
+ "Unstable API '{}'. The --unstable flag must be provided.",
+ api_name
+ );
+ std::process::exit(70);
+ }
+ }
+}
+/// Helper for checking unstable features. Used for sync ops.
+pub fn check_unstable(state: &OpState, api_name: &str) {
+ state.borrow::<UnstableChecker>().check_unstable(api_name)
+}
+
+/// Helper for checking unstable features. Used for async ops.
+pub fn check_unstable2(state: &Rc<RefCell<OpState>>, api_name: &str) {
+ let state = state.borrow();
+ state.borrow::<UnstableChecker>().check_unstable(api_name)
+}
+
+pub fn get_declaration() -> PathBuf {
+ PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("lib.deno_net.d.ts")
+}
+
+pub fn get_unstable_declaration() -> PathBuf {
+ PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("lib.deno_net.unstable.d.ts")
+}
+
+pub fn init<P: NetPermissions + 'static>(unstable: bool) -> Extension {
+ let mut ops_to_register = vec![];
+ ops_to_register.extend(io::init());
+ ops_to_register.extend(ops::init::<P>());
+ ops_to_register.extend(ops_tls::init::<P>());
+ ops_to_register.extend(ops_http::init());
+
+ Extension::builder()
+ .js(include_js_files!(
+ prefix "deno:extensions/net",
+ "01_net.js",
+ "02_tls.js",
+ "03_http.js",
+ "04_net_unstable.js",
+ ))
+ .ops(ops_to_register)
+ .state(move |state| {
+ state.put(UnstableChecker { unstable });
+ Ok(())
+ })
+ .build()
+}
diff --git a/extensions/net/ops.rs b/extensions/net/ops.rs
new file mode 100644
index 000000000..a02bbf91a
--- /dev/null
+++ b/extensions/net/ops.rs
@@ -0,0 +1,795 @@
+// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
+
+use crate::io::TcpStreamResource;
+use crate::resolve_addr::resolve_addr;
+use crate::resolve_addr::resolve_addr_sync;
+use crate::NetPermissions;
+use deno_core::error::bad_resource;
+use deno_core::error::custom_error;
+use deno_core::error::generic_error;
+use deno_core::error::null_opbuf;
+use deno_core::error::type_error;
+use deno_core::error::AnyError;
+use deno_core::op_async;
+use deno_core::op_sync;
+use deno_core::AsyncRefCell;
+use deno_core::CancelHandle;
+use deno_core::CancelTryFuture;
+use deno_core::OpPair;
+use deno_core::OpState;
+use deno_core::RcRef;
+use deno_core::Resource;
+use deno_core::ResourceId;
+use deno_core::ZeroCopyBuf;
+use log::debug;
+use serde::Deserialize;
+use serde::Serialize;
+use std::borrow::Cow;
+use std::cell::RefCell;
+use std::net::SocketAddr;
+use std::rc::Rc;
+use tokio::net::TcpListener;
+use tokio::net::TcpStream;
+use tokio::net::UdpSocket;
+use trust_dns_proto::rr::record_data::RData;
+use trust_dns_proto::rr::record_type::RecordType;
+use trust_dns_resolver::config::NameServerConfigGroup;
+use trust_dns_resolver::config::ResolverConfig;
+use trust_dns_resolver::config::ResolverOpts;
+use trust_dns_resolver::system_conf;
+use trust_dns_resolver::AsyncResolver;
+
+#[cfg(unix)]
+use super::ops_unix as net_unix;
+#[cfg(unix)]
+use crate::io::UnixStreamResource;
+#[cfg(unix)]
+use std::path::Path;
+
+pub fn init<P: NetPermissions + 'static>() -> Vec<OpPair> {
+ vec![
+ ("op_accept", op_async(op_accept)),
+ ("op_connect", op_async(op_connect::<P>)),
+ ("op_listen", op_sync(op_listen::<P>)),
+ ("op_datagram_receive", op_async(op_datagram_receive)),
+ ("op_datagram_send", op_async(op_datagram_send::<P>)),
+ ("op_dns_resolve", op_async(op_dns_resolve::<P>)),
+ ]
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct OpConn {
+ pub rid: ResourceId,
+ pub remote_addr: Option<OpAddr>,
+ pub local_addr: Option<OpAddr>,
+}
+
+#[derive(Serialize)]
+#[serde(tag = "transport", rename_all = "lowercase")]
+pub enum OpAddr {
+ Tcp(IpAddr),
+ Udp(IpAddr),
+ #[cfg(unix)]
+ Unix(net_unix::UnixAddr),
+ #[cfg(unix)]
+ UnixPacket(net_unix::UnixAddr),
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+/// A received datagram packet (from udp or unixpacket)
+pub struct OpPacket {
+ pub size: usize,
+ pub remote_addr: OpAddr,
+}
+
+#[derive(Serialize)]
+pub struct IpAddr {
+ pub hostname: String,
+ pub port: u16,
+}
+
+#[derive(Deserialize)]
+pub(crate) struct AcceptArgs {
+ pub rid: ResourceId,
+ pub transport: String,
+}
+
+async fn accept_tcp(
+ state: Rc<RefCell<OpState>>,
+ args: AcceptArgs,
+ _: (),
+) -> Result<OpConn, AnyError> {
+ let rid = args.rid;
+
+ let resource = state
+ .borrow()
+ .resource_table
+ .get::<TcpListenerResource>(rid)
+ .ok_or_else(|| bad_resource("Listener has been closed"))?;
+ let listener = RcRef::map(&resource, |r| &r.listener)
+ .try_borrow_mut()
+ .ok_or_else(|| custom_error("Busy", "Another accept task is ongoing"))?;
+ let cancel = RcRef::map(resource, |r| &r.cancel);
+ let (tcp_stream, _socket_addr) =
+ listener.accept().try_or_cancel(cancel).await.map_err(|e| {
+ // FIXME(bartlomieju): compatibility with current JS implementation
+ if let std::io::ErrorKind::Interrupted = e.kind() {
+ bad_resource("Listener has been closed")
+ } else {
+ e.into()
+ }
+ })?;
+ let local_addr = tcp_stream.local_addr()?;
+ let remote_addr = tcp_stream.peer_addr()?;
+
+ let mut state = state.borrow_mut();
+ let rid = state
+ .resource_table
+ .add(TcpStreamResource::new(tcp_stream.into_split()));
+ Ok(OpConn {
+ rid,
+ local_addr: Some(OpAddr::Tcp(IpAddr {
+ hostname: local_addr.ip().to_string(),
+ port: local_addr.port(),
+ })),
+ remote_addr: Some(OpAddr::Tcp(IpAddr {
+ hostname: remote_addr.ip().to_string(),
+ port: remote_addr.port(),
+ })),
+ })
+}
+
+async fn op_accept(
+ state: Rc<RefCell<OpState>>,
+ args: AcceptArgs,
+ _: (),
+) -> Result<OpConn, AnyError> {
+ match args.transport.as_str() {
+ "tcp" => accept_tcp(state, args, ()).await,
+ #[cfg(unix)]
+ "unix" => net_unix::accept_unix(state, args, ()).await,
+ other => Err(bad_transport(other)),
+ }
+}
+
+fn bad_transport(transport: &str) -> AnyError {
+ generic_error(format!("Unsupported transport protocol {}", transport))
+}
+
+#[derive(Deserialize)]
+pub(crate) struct ReceiveArgs {
+ pub rid: ResourceId,
+ pub transport: String,
+}
+
+async fn receive_udp(
+ state: Rc<RefCell<OpState>>,
+ args: ReceiveArgs,
+ zero_copy: Option<ZeroCopyBuf>,
+) -> Result<OpPacket, AnyError> {
+ let zero_copy = zero_copy.ok_or_else(null_opbuf)?;
+ let mut zero_copy = zero_copy.clone();
+
+ let rid = args.rid;
+
+ let resource = state
+ .borrow_mut()
+ .resource_table
+ .get::<UdpSocketResource>(rid)
+ .ok_or_else(|| bad_resource("Socket has been closed"))?;
+ let socket = RcRef::map(&resource, |r| &r.socket).borrow().await;
+ let cancel_handle = RcRef::map(&resource, |r| &r.cancel);
+ let (size, remote_addr) = socket
+ .recv_from(&mut zero_copy)
+ .try_or_cancel(cancel_handle)
+ .await?;
+ Ok(OpPacket {
+ size,
+ remote_addr: OpAddr::Udp(IpAddr {
+ hostname: remote_addr.ip().to_string(),
+ port: remote_addr.port(),
+ }),
+ })
+}
+
+async fn op_datagram_receive(
+ state: Rc<RefCell<OpState>>,
+ args: ReceiveArgs,
+ zero_copy: Option<ZeroCopyBuf>,
+) -> Result<OpPacket, AnyError> {
+ match args.transport.as_str() {
+ "udp" => receive_udp(state, args, zero_copy).await,
+ #[cfg(unix)]
+ "unixpacket" => net_unix::receive_unix_packet(state, args, zero_copy).await,
+ other => Err(bad_transport(other)),
+ }
+}
+
+#[derive(Deserialize)]
+struct SendArgs {
+ rid: ResourceId,
+ transport: String,
+ #[serde(flatten)]
+ transport_args: ArgsEnum,
+}
+
+async fn op_datagram_send<NP>(
+ state: Rc<RefCell<OpState>>,
+ args: SendArgs,
+ zero_copy: Option<ZeroCopyBuf>,
+) -> Result<usize, AnyError>
+where
+ NP: NetPermissions + 'static,
+{
+ let zero_copy = zero_copy.ok_or_else(null_opbuf)?;
+ let zero_copy = zero_copy.clone();
+
+ match args {
+ SendArgs {
+ rid,
+ transport,
+ transport_args: ArgsEnum::Ip(args),
+ } if transport == "udp" => {
+ {
+ let mut s = state.borrow_mut();
+ s.borrow_mut::<NP>()
+ .check_net(&(&args.hostname, Some(args.port)))?;
+ }
+ let addr = resolve_addr(&args.hostname, args.port)
+ .await?
+ .next()
+ .ok_or_else(|| generic_error("No resolved address found"))?;
+
+ let resource = state
+ .borrow_mut()
+ .resource_table
+ .get::<UdpSocketResource>(rid)
+ .ok_or_else(|| bad_resource("Socket has been closed"))?;
+ let socket = RcRef::map(&resource, |r| &r.socket).borrow().await;
+ let byte_length = socket.send_to(&zero_copy, &addr).await?;
+ Ok(byte_length)
+ }
+ #[cfg(unix)]
+ SendArgs {
+ rid,
+ transport,
+ transport_args: ArgsEnum::Unix(args),
+ } if transport == "unixpacket" => {
+ let address_path = Path::new(&args.path);
+ {
+ let mut s = state.borrow_mut();
+ s.borrow_mut::<NP>().check_write(&address_path)?;
+ }
+ let resource = state
+ .borrow()
+ .resource_table
+ .get::<net_unix::UnixDatagramResource>(rid)
+ .ok_or_else(|| {
+ custom_error("NotConnected", "Socket has been closed")
+ })?;
+ let socket = RcRef::map(&resource, |r| &r.socket)
+ .try_borrow_mut()
+ .ok_or_else(|| custom_error("Busy", "Socket already in use"))?;
+ let byte_length = socket.send_to(&zero_copy, address_path).await?;
+ Ok(byte_length)
+ }
+ _ => Err(type_error("Wrong argument format!")),
+ }
+}
+
+#[derive(Deserialize)]
+struct ConnectArgs {
+ transport: String,
+ #[serde(flatten)]
+ transport_args: ArgsEnum,
+}
+
+async fn op_connect<NP>(
+ state: Rc<RefCell<OpState>>,
+ args: ConnectArgs,
+ _: (),
+) -> Result<OpConn, AnyError>
+where
+ NP: NetPermissions + 'static,
+{
+ match args {
+ ConnectArgs {
+ transport,
+ transport_args: ArgsEnum::Ip(args),
+ } if transport == "tcp" => {
+ {
+ let mut state_ = state.borrow_mut();
+ state_
+ .borrow_mut::<NP>()
+ .check_net(&(&args.hostname, Some(args.port)))?;
+ }
+ let addr = resolve_addr(&args.hostname, args.port)
+ .await?
+ .next()
+ .ok_or_else(|| generic_error("No resolved address found"))?;
+ let tcp_stream = TcpStream::connect(&addr).await?;
+ let local_addr = tcp_stream.local_addr()?;
+ let remote_addr = tcp_stream.peer_addr()?;
+
+ let mut state_ = state.borrow_mut();
+ let rid = state_
+ .resource_table
+ .add(TcpStreamResource::new(tcp_stream.into_split()));
+ Ok(OpConn {
+ rid,
+ local_addr: Some(OpAddr::Tcp(IpAddr {
+ hostname: local_addr.ip().to_string(),
+ port: local_addr.port(),
+ })),
+ remote_addr: Some(OpAddr::Tcp(IpAddr {
+ hostname: remote_addr.ip().to_string(),
+ port: remote_addr.port(),
+ })),
+ })
+ }
+ #[cfg(unix)]
+ ConnectArgs {
+ transport,
+ transport_args: ArgsEnum::Unix(args),
+ } if transport == "unix" => {
+ let address_path = Path::new(&args.path);
+ super::check_unstable2(&state, "Deno.connect");
+ {
+ let mut state_ = state.borrow_mut();
+ state_.borrow_mut::<NP>().check_read(&address_path)?;
+ state_.borrow_mut::<NP>().check_write(&address_path)?;
+ }
+ let path = args.path;
+ let unix_stream = net_unix::UnixStream::connect(Path::new(&path)).await?;
+ let local_addr = unix_stream.local_addr()?;
+ let remote_addr = unix_stream.peer_addr()?;
+
+ let mut state_ = state.borrow_mut();
+ let resource = UnixStreamResource::new(unix_stream.into_split());
+ let rid = state_.resource_table.add(resource);
+ Ok(OpConn {
+ rid,
+ local_addr: Some(OpAddr::Unix(net_unix::UnixAddr {
+ path: local_addr.as_pathname().and_then(net_unix::pathstring),
+ })),
+ remote_addr: Some(OpAddr::Unix(net_unix::UnixAddr {
+ path: remote_addr.as_pathname().and_then(net_unix::pathstring),
+ })),
+ })
+ }
+ _ => Err(type_error("Wrong argument format!")),
+ }
+}
+
+pub struct TcpListenerResource {
+ pub listener: AsyncRefCell<TcpListener>,
+ pub cancel: CancelHandle,
+}
+
+impl Resource for TcpListenerResource {
+ fn name(&self) -> Cow<str> {
+ "tcpListener".into()
+ }
+
+ fn close(self: Rc<Self>) {
+ self.cancel.cancel();
+ }
+}
+
+struct UdpSocketResource {
+ socket: AsyncRefCell<UdpSocket>,
+ cancel: CancelHandle,
+}
+
+impl Resource for UdpSocketResource {
+ fn name(&self) -> Cow<str> {
+ "udpSocket".into()
+ }
+
+ fn close(self: Rc<Self>) {
+ self.cancel.cancel()
+ }
+}
+
+#[derive(Deserialize)]
+struct IpListenArgs {
+ hostname: String,
+ port: u16,
+}
+
+#[derive(Deserialize)]
+#[serde(untagged)]
+enum ArgsEnum {
+ Ip(IpListenArgs),
+ #[cfg(unix)]
+ Unix(net_unix::UnixListenArgs),
+}
+
+#[derive(Deserialize)]
+struct ListenArgs {
+ transport: String,
+ #[serde(flatten)]
+ transport_args: ArgsEnum,
+}
+
+fn listen_tcp(
+ state: &mut OpState,
+ addr: SocketAddr,
+) -> Result<(u32, SocketAddr), AnyError> {
+ let std_listener = std::net::TcpListener::bind(&addr)?;
+ std_listener.set_nonblocking(true)?;
+ let listener = TcpListener::from_std(std_listener)?;
+ let local_addr = listener.local_addr()?;
+ let listener_resource = TcpListenerResource {
+ listener: AsyncRefCell::new(listener),
+ cancel: Default::default(),
+ };
+ let rid = state.resource_table.add(listener_resource);
+
+ Ok((rid, local_addr))
+}
+
+fn listen_udp(
+ state: &mut OpState,
+ addr: SocketAddr,
+) -> Result<(u32, SocketAddr), AnyError> {
+ let std_socket = std::net::UdpSocket::bind(&addr)?;
+ std_socket.set_nonblocking(true)?;
+ let socket = UdpSocket::from_std(std_socket)?;
+ let local_addr = socket.local_addr()?;
+ let socket_resource = UdpSocketResource {
+ socket: AsyncRefCell::new(socket),
+ cancel: Default::default(),
+ };
+ let rid = state.resource_table.add(socket_resource);
+
+ Ok((rid, local_addr))
+}
+
+fn op_listen<NP>(
+ state: &mut OpState,
+ args: ListenArgs,
+ _: (),
+) -> Result<OpConn, AnyError>
+where
+ NP: NetPermissions + 'static,
+{
+ match args {
+ ListenArgs {
+ transport,
+ transport_args: ArgsEnum::Ip(args),
+ } => {
+ {
+ if transport == "udp" {
+ super::check_unstable(state, "Deno.listenDatagram");
+ }
+ state
+ .borrow_mut::<NP>()
+ .check_net(&(&args.hostname, Some(args.port)))?;
+ }
+ let addr = resolve_addr_sync(&args.hostname, args.port)?
+ .next()
+ .ok_or_else(|| generic_error("No resolved address found"))?;
+ let (rid, local_addr) = if transport == "tcp" {
+ listen_tcp(state, addr)?
+ } else {
+ listen_udp(state, addr)?
+ };
+ debug!(
+ "New listener {} {}:{}",
+ rid,
+ local_addr.ip().to_string(),
+ local_addr.port()
+ );
+ let ip_addr = IpAddr {
+ hostname: local_addr.ip().to_string(),
+ port: local_addr.port(),
+ };
+ Ok(OpConn {
+ rid,
+ local_addr: Some(match transport.as_str() {
+ "udp" => OpAddr::Udp(ip_addr),
+ "tcp" => OpAddr::Tcp(ip_addr),
+ // NOTE: This could be unreachable!()
+ other => return Err(bad_transport(other)),
+ }),
+ remote_addr: None,
+ })
+ }
+ #[cfg(unix)]
+ ListenArgs {
+ transport,
+ transport_args: ArgsEnum::Unix(args),
+ } if transport == "unix" || transport == "unixpacket" => {
+ let address_path = Path::new(&args.path);
+ {
+ if transport == "unix" {
+ super::check_unstable(state, "Deno.listen");
+ }
+ if transport == "unixpacket" {
+ super::check_unstable(state, "Deno.listenDatagram");
+ }
+ let permissions = state.borrow_mut::<NP>();
+ permissions.check_read(&address_path)?;
+ permissions.check_write(&address_path)?;
+ }
+ let (rid, local_addr) = if transport == "unix" {
+ net_unix::listen_unix(state, &address_path)?
+ } else {
+ net_unix::listen_unix_packet(state, &address_path)?
+ };
+ debug!(
+ "New listener {} {}",
+ rid,
+ local_addr.as_pathname().unwrap().display(),
+ );
+ let unix_addr = net_unix::UnixAddr {
+ path: local_addr.as_pathname().and_then(net_unix::pathstring),
+ };
+
+ Ok(OpConn {
+ rid,
+ local_addr: Some(match transport.as_str() {
+ "unix" => OpAddr::Unix(unix_addr),
+ "unixpacket" => OpAddr::UnixPacket(unix_addr),
+ other => return Err(bad_transport(other)),
+ }),
+ remote_addr: None,
+ })
+ }
+ #[cfg(unix)]
+ _ => Err(type_error("Wrong argument format!")),
+ }
+}
+
+#[derive(Serialize, PartialEq, Debug)]
+#[serde(untagged)]
+enum DnsReturnRecord {
+ A(String),
+ Aaaa(String),
+ Aname(String),
+ Cname(String),
+ Mx {
+ preference: u16,
+ exchange: String,
+ },
+ Ptr(String),
+ Srv {
+ priority: u16,
+ weight: u16,
+ port: u16,
+ target: String,
+ },
+ Txt(Vec<String>),
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct ResolveAddrArgs {
+ query: String,
+ record_type: RecordType,
+ options: Option<ResolveDnsOption>,
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct ResolveDnsOption {
+ name_server: Option<NameServer>,
+}
+
+fn default_port() -> u16 {
+ 53
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct NameServer {
+ ip_addr: String,
+ #[serde(default = "default_port")]
+ port: u16,
+}
+
+async fn op_dns_resolve<NP>(
+ state: Rc<RefCell<OpState>>,
+ args: ResolveAddrArgs,
+ _: (),
+) -> Result<Vec<DnsReturnRecord>, AnyError>
+where
+ NP: NetPermissions + 'static,
+{
+ let ResolveAddrArgs {
+ query,
+ record_type,
+ options,
+ } = args;
+
+ let (config, opts) = if let Some(name_server) =
+ options.as_ref().and_then(|o| o.name_server.as_ref())
+ {
+ let group = NameServerConfigGroup::from_ips_clear(
+ &[name_server.ip_addr.parse()?],
+ name_server.port,
+ true,
+ );
+ (
+ ResolverConfig::from_parts(None, vec![], group),
+ ResolverOpts::default(),
+ )
+ } else {
+ system_conf::read_system_conf()?
+ };
+
+ {
+ let mut s = state.borrow_mut();
+ let perm = s.borrow_mut::<NP>();
+
+ // Checks permission against the name servers which will be actually queried.
+ for ns in config.name_servers() {
+ let socker_addr = &ns.socket_addr;
+ let ip = socker_addr.ip().to_string();
+ let port = socker_addr.port();
+ perm.check_net(&(ip, Some(port)))?;
+ }
+ }
+
+ let resolver = AsyncResolver::tokio(config, opts)?;
+
+ let results = resolver
+ .lookup(query, record_type, Default::default())
+ .await
+ .map_err(|e| generic_error(format!("{}", e)))?
+ .iter()
+ .filter_map(rdata_to_return_record(record_type))
+ .collect();
+
+ Ok(results)
+}
+
+fn rdata_to_return_record(
+ ty: RecordType,
+) -> impl Fn(&RData) -> Option<DnsReturnRecord> {
+ use RecordType::*;
+ move |r: &RData| -> Option<DnsReturnRecord> {
+ match ty {
+ A => r.as_a().map(ToString::to_string).map(DnsReturnRecord::A),
+ AAAA => r
+ .as_aaaa()
+ .map(ToString::to_string)
+ .map(DnsReturnRecord::Aaaa),
+ ANAME => r
+ .as_aname()
+ .map(ToString::to_string)
+ .map(DnsReturnRecord::Aname),
+ CNAME => r
+ .as_cname()
+ .map(ToString::to_string)
+ .map(DnsReturnRecord::Cname),
+ MX => r.as_mx().map(|mx| DnsReturnRecord::Mx {
+ preference: mx.preference(),
+ exchange: mx.exchange().to_string(),
+ }),
+ PTR => r
+ .as_ptr()
+ .map(ToString::to_string)
+ .map(DnsReturnRecord::Ptr),
+ SRV => r.as_srv().map(|srv| DnsReturnRecord::Srv {
+ priority: srv.priority(),
+ weight: srv.weight(),
+ port: srv.port(),
+ target: srv.target().to_string(),
+ }),
+ TXT => r.as_txt().map(|txt| {
+ let texts: Vec<String> = txt
+ .iter()
+ .map(|bytes| {
+ // Tries to parse these bytes as Latin-1
+ bytes.iter().map(|&b| b as char).collect::<String>()
+ })
+ .collect();
+ DnsReturnRecord::Txt(texts)
+ }),
+ // TODO(magurotuna): Other record types are not supported
+ _ => todo!(),
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use std::net::Ipv4Addr;
+ use std::net::Ipv6Addr;
+ use trust_dns_proto::rr::rdata::mx::MX;
+ use trust_dns_proto::rr::rdata::srv::SRV;
+ use trust_dns_proto::rr::rdata::txt::TXT;
+ use trust_dns_proto::rr::record_data::RData;
+ use trust_dns_proto::rr::Name;
+
+ #[test]
+ fn rdata_to_return_record_a() {
+ let func = rdata_to_return_record(RecordType::A);
+ let rdata = RData::A(Ipv4Addr::new(127, 0, 0, 1));
+ assert_eq!(
+ func(&rdata),
+ Some(DnsReturnRecord::A("127.0.0.1".to_string()))
+ );
+ }
+
+ #[test]
+ fn rdata_to_return_record_aaaa() {
+ let func = rdata_to_return_record(RecordType::AAAA);
+ let rdata = RData::AAAA(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1));
+ assert_eq!(func(&rdata), Some(DnsReturnRecord::Aaaa("::1".to_string())));
+ }
+
+ #[test]
+ fn rdata_to_return_record_aname() {
+ let func = rdata_to_return_record(RecordType::ANAME);
+ let rdata = RData::ANAME(Name::new());
+ assert_eq!(func(&rdata), Some(DnsReturnRecord::Aname("".to_string())));
+ }
+
+ #[test]
+ fn rdata_to_return_record_cname() {
+ let func = rdata_to_return_record(RecordType::CNAME);
+ let rdata = RData::CNAME(Name::new());
+ assert_eq!(func(&rdata), Some(DnsReturnRecord::Cname("".to_string())));
+ }
+
+ #[test]
+ fn rdata_to_return_record_mx() {
+ let func = rdata_to_return_record(RecordType::MX);
+ let rdata = RData::MX(MX::new(10, Name::new()));
+ assert_eq!(
+ func(&rdata),
+ Some(DnsReturnRecord::Mx {
+ preference: 10,
+ exchange: "".to_string()
+ })
+ );
+ }
+
+ #[test]
+ fn rdata_to_return_record_ptr() {
+ let func = rdata_to_return_record(RecordType::PTR);
+ let rdata = RData::PTR(Name::new());
+ assert_eq!(func(&rdata), Some(DnsReturnRecord::Ptr("".to_string())));
+ }
+
+ #[test]
+ fn rdata_to_return_record_srv() {
+ let func = rdata_to_return_record(RecordType::SRV);
+ let rdata = RData::SRV(SRV::new(1, 2, 3, Name::new()));
+ assert_eq!(
+ func(&rdata),
+ Some(DnsReturnRecord::Srv {
+ priority: 1,
+ weight: 2,
+ port: 3,
+ target: "".to_string()
+ })
+ );
+ }
+
+ #[test]
+ fn rdata_to_return_record_txt() {
+ let func = rdata_to_return_record(RecordType::TXT);
+ let rdata = RData::TXT(TXT::from_bytes(vec![
+ "foo".as_bytes(),
+ "bar".as_bytes(),
+ &[0xa3], // "ÂŁ" in Latin-1
+ &[0xe3, 0x81, 0x82], // "あ" in UTF-8
+ ]));
+ assert_eq!(
+ func(&rdata),
+ Some(DnsReturnRecord::Txt(vec![
+ "foo".to_string(),
+ "bar".to_string(),
+ "ÂŁ".to_string(),
+ "ĂŁ\u{81}\u{82}".to_string(),
+ ]))
+ );
+ }
+}
diff --git a/extensions/net/ops_http.rs b/extensions/net/ops_http.rs
new file mode 100644
index 000000000..54e06c3a7
--- /dev/null
+++ b/extensions/net/ops_http.rs
@@ -0,0 +1,577 @@
+// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
+
+use crate::io::TcpStreamResource;
+use crate::io::TlsStreamResource;
+use crate::ops_tls::TlsStream;
+use deno_core::error::bad_resource_id;
+use deno_core::error::null_opbuf;
+use deno_core::error::type_error;
+use deno_core::error::AnyError;
+use deno_core::futures::future::poll_fn;
+use deno_core::futures::FutureExt;
+use deno_core::futures::Stream;
+use deno_core::futures::StreamExt;
+use deno_core::op_async;
+use deno_core::op_sync;
+use deno_core::AsyncRefCell;
+use deno_core::ByteString;
+use deno_core::CancelHandle;
+use deno_core::CancelTryFuture;
+use deno_core::OpPair;
+use deno_core::OpState;
+use deno_core::RcRef;
+use deno_core::Resource;
+use deno_core::ResourceId;
+use deno_core::ZeroCopyBuf;
+use hyper::body::HttpBody;
+use hyper::http;
+use hyper::server::conn::Connection;
+use hyper::server::conn::Http;
+use hyper::service::Service as HyperService;
+use hyper::Body;
+use hyper::Request;
+use hyper::Response;
+use serde::Deserialize;
+use serde::Serialize;
+use std::borrow::Cow;
+use std::cell::RefCell;
+use std::future::Future;
+use std::net::SocketAddr;
+use std::pin::Pin;
+use std::rc::Rc;
+use std::task::Context;
+use std::task::Poll;
+use tokio::io::AsyncReadExt;
+use tokio::net::TcpStream;
+use tokio::sync::oneshot;
+use tokio_util::io::StreamReader;
+
+pub fn init() -> Vec<OpPair> {
+ vec![
+ ("op_http_start", op_sync(op_http_start)),
+ ("op_http_request_next", op_async(op_http_request_next)),
+ ("op_http_request_read", op_async(op_http_request_read)),
+ ("op_http_response", op_async(op_http_response)),
+ ("op_http_response_write", op_async(op_http_response_write)),
+ ("op_http_response_close", op_async(op_http_response_close)),
+ ]
+}
+
+struct ServiceInner {
+ request: Request<Body>,
+ response_tx: oneshot::Sender<Response<Body>>,
+}
+
+#[derive(Clone, Default)]
+struct Service {
+ inner: Rc<RefCell<Option<ServiceInner>>>,
+ waker: Rc<deno_core::futures::task::AtomicWaker>,
+}
+
+impl HyperService<Request<Body>> for Service {
+ type Response = Response<Body>;
+ type Error = http::Error;
+ #[allow(clippy::type_complexity)]
+ type Future =
+ Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;
+
+ fn poll_ready(
+ &mut self,
+ _cx: &mut Context<'_>,
+ ) -> Poll<Result<(), Self::Error>> {
+ if self.inner.borrow().is_some() {
+ Poll::Pending
+ } else {
+ Poll::Ready(Ok(()))
+ }
+ }
+
+ fn call(&mut self, req: Request<Body>) -> Self::Future {
+ let (resp_tx, resp_rx) = oneshot::channel();
+ self.inner.borrow_mut().replace(ServiceInner {
+ request: req,
+ response_tx: resp_tx,
+ });
+
+ async move { Ok(resp_rx.await.unwrap()) }.boxed_local()
+ }
+}
+
+enum ConnType {
+ Tcp(Rc<RefCell<Connection<TcpStream, Service, LocalExecutor>>>),
+ Tls(Rc<RefCell<Connection<TlsStream, Service, LocalExecutor>>>),
+}
+
+struct ConnResource {
+ hyper_connection: ConnType,
+ deno_service: Service,
+ addr: SocketAddr,
+ cancel: CancelHandle,
+}
+
+impl ConnResource {
+ // TODO(ry) impl Future for ConnResource?
+ fn poll(&self, cx: &mut Context<'_>) -> Poll<Result<(), AnyError>> {
+ match &self.hyper_connection {
+ ConnType::Tcp(c) => c.borrow_mut().poll_unpin(cx),
+ ConnType::Tls(c) => c.borrow_mut().poll_unpin(cx),
+ }
+ .map_err(AnyError::from)
+ }
+}
+
+impl Resource for ConnResource {
+ fn name(&self) -> Cow<str> {
+ "httpConnection".into()
+ }
+
+ fn close(self: Rc<Self>) {
+ self.cancel.cancel()
+ }
+}
+
+// We use a tuple instead of struct to avoid serialization overhead of the keys.
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+struct NextRequestResponse(
+ // request_body_rid:
+ Option<ResourceId>,
+ // response_sender_rid:
+ ResourceId,
+ // method:
+ // This is a String rather than a ByteString because reqwest will only return
+ // the method as a str which is guaranteed to be ASCII-only.
+ String,
+ // headers:
+ Vec<(ByteString, ByteString)>,
+ // url:
+ String,
+);
+
+async fn op_http_request_next(
+ state: Rc<RefCell<OpState>>,
+ conn_rid: ResourceId,
+ _: (),
+) -> Result<Option<NextRequestResponse>, AnyError> {
+ let conn_resource = state
+ .borrow()
+ .resource_table
+ .get::<ConnResource>(conn_rid)
+ .ok_or_else(bad_resource_id)?;
+
+ let cancel = RcRef::map(conn_resource.clone(), |r| &r.cancel);
+
+ poll_fn(|cx| {
+ conn_resource.deno_service.waker.register(cx.waker());
+ let connection_closed = match conn_resource.poll(cx) {
+ Poll::Pending => false,
+ Poll::Ready(Ok(())) => {
+ // try to close ConnResource, but don't unwrap as it might
+ // already be closed
+ let _ = state
+ .borrow_mut()
+ .resource_table
+ .take::<ConnResource>(conn_rid);
+ true
+ }
+ Poll::Ready(Err(e)) => {
+ // TODO(ry) close RequestResource associated with connection
+ // TODO(ry) close ResponseBodyResource associated with connection
+ // close ConnResource
+ state
+ .borrow_mut()
+ .resource_table
+ .take::<ConnResource>(conn_rid)
+ .unwrap();
+
+ if should_ignore_error(&e) {
+ true
+ } else {
+ return Poll::Ready(Err(e));
+ }
+ }
+ };
+ if let Some(request_resource) =
+ conn_resource.deno_service.inner.borrow_mut().take()
+ {
+ let tx = request_resource.response_tx;
+ let req = request_resource.request;
+ let method = req.method().to_string();
+
+ let mut headers = Vec::with_capacity(req.headers().len());
+ for (name, value) in req.headers().iter() {
+ let name: &[u8] = name.as_ref();
+ let value = value.as_bytes();
+ headers
+ .push((ByteString(name.to_owned()), ByteString(value.to_owned())));
+ }
+
+ let url = {
+ let scheme = {
+ match conn_resource.hyper_connection {
+ ConnType::Tcp(_) => "http",
+ ConnType::Tls(_) => "https",
+ }
+ };
+ let host: Cow<str> = if let Some(host) = req.uri().host() {
+ Cow::Borrowed(host)
+ } else if let Some(host) = req.headers().get("HOST") {
+ Cow::Borrowed(host.to_str()?)
+ } else {
+ Cow::Owned(conn_resource.addr.to_string())
+ };
+ let path = req.uri().path_and_query().map_or("/", |p| p.as_str());
+ format!("{}://{}{}", scheme, host, path)
+ };
+
+ let has_body = if let Some(exact_size) = req.size_hint().exact() {
+ exact_size > 0
+ } else {
+ true
+ };
+
+ let maybe_request_body_rid = if has_body {
+ let stream: BytesStream = Box::pin(req.into_body().map(|r| {
+ r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))
+ }));
+ let stream_reader = StreamReader::new(stream);
+ let mut state = state.borrow_mut();
+ let request_body_rid = state.resource_table.add(RequestBodyResource {
+ conn_rid,
+ reader: AsyncRefCell::new(stream_reader),
+ cancel: CancelHandle::default(),
+ });
+ Some(request_body_rid)
+ } else {
+ None
+ };
+
+ let mut state = state.borrow_mut();
+ let response_sender_rid =
+ state.resource_table.add(ResponseSenderResource {
+ sender: tx,
+ conn_rid,
+ });
+
+ Poll::Ready(Ok(Some(NextRequestResponse(
+ maybe_request_body_rid,
+ response_sender_rid,
+ method,
+ headers,
+ url,
+ ))))
+ } else if connection_closed {
+ Poll::Ready(Ok(None))
+ } else {
+ Poll::Pending
+ }
+ })
+ .try_or_cancel(cancel)
+ .await
+ .map_err(AnyError::from)
+}
+
+fn should_ignore_error(e: &AnyError) -> bool {
+ if let Some(e) = e.downcast_ref::<hyper::Error>() {
+ use std::error::Error;
+ if let Some(std_err) = e.source() {
+ if let Some(io_err) = std_err.downcast_ref::<std::io::Error>() {
+ if io_err.kind() == std::io::ErrorKind::NotConnected {
+ return true;
+ }
+ }
+ }
+ }
+ false
+}
+
+fn op_http_start(
+ state: &mut OpState,
+ tcp_stream_rid: ResourceId,
+ _: (),
+) -> Result<ResourceId, AnyError> {
+ let deno_service = Service::default();
+
+ if let Some(resource_rc) = state
+ .resource_table
+ .take::<TcpStreamResource>(tcp_stream_rid)
+ {
+ let resource = Rc::try_unwrap(resource_rc)
+ .expect("Only a single use of this resource should happen");
+ let (read_half, write_half) = resource.into_inner();
+ let tcp_stream = read_half.reunite(write_half)?;
+ let addr = tcp_stream.local_addr()?;
+ let hyper_connection = Http::new()
+ .with_executor(LocalExecutor)
+ .serve_connection(tcp_stream, deno_service.clone());
+ let conn_resource = ConnResource {
+ hyper_connection: ConnType::Tcp(Rc::new(RefCell::new(hyper_connection))),
+ deno_service,
+ addr,
+ cancel: CancelHandle::default(),
+ };
+ let rid = state.resource_table.add(conn_resource);
+ return Ok(rid);
+ }
+
+ if let Some(resource_rc) = state
+ .resource_table
+ .take::<TlsStreamResource>(tcp_stream_rid)
+ {
+ let resource = Rc::try_unwrap(resource_rc)
+ .expect("Only a single use of this resource should happen");
+ let (read_half, write_half) = resource.into_inner();
+ let tls_stream = read_half.reunite(write_half);
+ let addr = tls_stream.get_ref().0.local_addr()?;
+
+ let hyper_connection = Http::new()
+ .with_executor(LocalExecutor)
+ .serve_connection(tls_stream, deno_service.clone());
+ let conn_resource = ConnResource {
+ hyper_connection: ConnType::Tls(Rc::new(RefCell::new(hyper_connection))),
+ deno_service,
+ addr,
+ cancel: CancelHandle::default(),
+ };
+ let rid = state.resource_table.add(conn_resource);
+ return Ok(rid);
+ }
+
+ Err(bad_resource_id())
+}
+
+// We use a tuple instead of struct to avoid serialization overhead of the keys.
+#[derive(Deserialize)]
+struct RespondArgs(
+ // rid:
+ u32,
+ // status:
+ u16,
+ // headers:
+ Vec<(ByteString, ByteString)>,
+);
+
+async fn op_http_response(
+ state: Rc<RefCell<OpState>>,
+ args: RespondArgs,
+ data: Option<ZeroCopyBuf>,
+) -> Result<Option<ResourceId>, AnyError> {
+ let RespondArgs(rid, status, headers) = args;
+
+ let response_sender = state
+ .borrow_mut()
+ .resource_table
+ .take::<ResponseSenderResource>(rid)
+ .ok_or_else(bad_resource_id)?;
+ let response_sender = Rc::try_unwrap(response_sender)
+ .ok()
+ .expect("multiple op_http_respond ongoing");
+
+ let conn_resource = state
+ .borrow()
+ .resource_table
+ .get::<ConnResource>(response_sender.conn_rid)
+ .ok_or_else(bad_resource_id)?;
+
+ let mut builder = Response::builder().status(status);
+
+ builder.headers_mut().unwrap().reserve(headers.len());
+ for (key, value) in &headers {
+ builder = builder.header(key.as_ref(), value.as_ref());
+ }
+
+ let res;
+ let maybe_response_body_rid = if let Some(d) = data {
+ // If a body is passed, we use it, and don't return a body for streaming.
+ res = builder.body(Vec::from(&*d).into())?;
+ None
+ } else {
+ // If no body is passed, we return a writer for streaming the body.
+ let (sender, body) = Body::channel();
+ res = builder.body(body)?;
+
+ let response_body_rid =
+ state.borrow_mut().resource_table.add(ResponseBodyResource {
+ body: AsyncRefCell::new(sender),
+ conn_rid: response_sender.conn_rid,
+ });
+
+ Some(response_body_rid)
+ };
+
+ // oneshot::Sender::send(v) returns |v| on error, not an error object.
+ // The only failure mode is the receiver already having dropped its end
+ // of the channel.
+ if response_sender.sender.send(res).is_err() {
+ return Err(type_error("internal communication error"));
+ }
+
+ poll_fn(|cx| match conn_resource.poll(cx) {
+ Poll::Ready(x) => Poll::Ready(x),
+ Poll::Pending => Poll::Ready(Ok(())),
+ })
+ .await?;
+
+ if maybe_response_body_rid.is_none() {
+ conn_resource.deno_service.waker.wake();
+ }
+ Ok(maybe_response_body_rid)
+}
+
+async fn op_http_response_close(
+ state: Rc<RefCell<OpState>>,
+ rid: ResourceId,
+ _: (),
+) -> Result<(), AnyError> {
+ let resource = state
+ .borrow_mut()
+ .resource_table
+ .take::<ResponseBodyResource>(rid)
+ .ok_or_else(bad_resource_id)?;
+
+ let conn_resource = state
+ .borrow()
+ .resource_table
+ .get::<ConnResource>(resource.conn_rid)
+ .ok_or_else(bad_resource_id)?;
+ drop(resource);
+
+ let r = poll_fn(|cx| match conn_resource.poll(cx) {
+ Poll::Ready(x) => Poll::Ready(x),
+ Poll::Pending => Poll::Ready(Ok(())),
+ })
+ .await;
+ conn_resource.deno_service.waker.wake();
+ r
+}
+
+async fn op_http_request_read(
+ state: Rc<RefCell<OpState>>,
+ rid: ResourceId,
+ data: Option<ZeroCopyBuf>,
+) -> Result<usize, AnyError> {
+ let mut data = data.ok_or_else(null_opbuf)?;
+
+ let resource = state
+ .borrow()
+ .resource_table
+ .get::<RequestBodyResource>(rid as u32)
+ .ok_or_else(bad_resource_id)?;
+
+ let conn_resource = state
+ .borrow()
+ .resource_table
+ .get::<ConnResource>(resource.conn_rid)
+ .ok_or_else(bad_resource_id)?;
+
+ let mut reader = RcRef::map(&resource, |r| &r.reader).borrow_mut().await;
+ let cancel = RcRef::map(resource, |r| &r.cancel);
+ let mut read_fut = reader.read(&mut data).try_or_cancel(cancel).boxed_local();
+
+ poll_fn(|cx| {
+ if let Poll::Ready(Err(e)) = conn_resource.poll(cx) {
+ // close ConnResource
+ // close RequestResource associated with connection
+ // close ResponseBodyResource associated with connection
+ return Poll::Ready(Err(e));
+ }
+
+ read_fut.poll_unpin(cx).map_err(AnyError::from)
+ })
+ .await
+}
+
+async fn op_http_response_write(
+ state: Rc<RefCell<OpState>>,
+ rid: ResourceId,
+ data: Option<ZeroCopyBuf>,
+) -> Result<(), AnyError> {
+ let buf = data.ok_or_else(null_opbuf)?;
+ let resource = state
+ .borrow()
+ .resource_table
+ .get::<ResponseBodyResource>(rid as u32)
+ .ok_or_else(bad_resource_id)?;
+
+ let conn_resource = state
+ .borrow()
+ .resource_table
+ .get::<ConnResource>(resource.conn_rid)
+ .ok_or_else(bad_resource_id)?;
+
+ let mut body = RcRef::map(&resource, |r| &r.body).borrow_mut().await;
+
+ let mut send_data_fut = body.send_data(Vec::from(&*buf).into()).boxed_local();
+
+ poll_fn(|cx| {
+ let r = send_data_fut.poll_unpin(cx).map_err(AnyError::from);
+
+ // Poll connection so the data is flushed
+ if let Poll::Ready(Err(e)) = conn_resource.poll(cx) {
+ // close ConnResource
+ // close RequestResource associated with connection
+ // close ResponseBodyResource associated with connection
+ return Poll::Ready(Err(e));
+ }
+
+ r
+ })
+ .await?;
+
+ Ok(())
+}
+
+type BytesStream =
+ Pin<Box<dyn Stream<Item = std::io::Result<bytes::Bytes>> + Unpin>>;
+
+struct RequestBodyResource {
+ conn_rid: ResourceId,
+ reader: AsyncRefCell<StreamReader<BytesStream, bytes::Bytes>>,
+ cancel: CancelHandle,
+}
+
+impl Resource for RequestBodyResource {
+ fn name(&self) -> Cow<str> {
+ "requestBody".into()
+ }
+
+ fn close(self: Rc<Self>) {
+ self.cancel.cancel()
+ }
+}
+
+struct ResponseSenderResource {
+ sender: oneshot::Sender<Response<Body>>,
+ conn_rid: ResourceId,
+}
+
+impl Resource for ResponseSenderResource {
+ fn name(&self) -> Cow<str> {
+ "responseSender".into()
+ }
+}
+
+struct ResponseBodyResource {
+ body: AsyncRefCell<hyper::body::Sender>,
+ conn_rid: ResourceId,
+}
+
+impl Resource for ResponseBodyResource {
+ fn name(&self) -> Cow<str> {
+ "responseBody".into()
+ }
+}
+
+// Needed so hyper can use non Send futures
+#[derive(Clone)]
+struct LocalExecutor;
+
+impl<Fut> hyper::rt::Executor<Fut> for LocalExecutor
+where
+ Fut: Future + 'static,
+ Fut::Output: 'static,
+{
+ fn execute(&self, fut: Fut) {
+ tokio::task::spawn_local(fut);
+ }
+}
diff --git a/extensions/net/ops_tls.rs b/extensions/net/ops_tls.rs
new file mode 100644
index 000000000..701c5d1a1
--- /dev/null
+++ b/extensions/net/ops_tls.rs
@@ -0,0 +1,1024 @@
+// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
+
+pub use rustls;
+pub use webpki;
+
+use crate::io::TcpStreamResource;
+use crate::io::TlsStreamResource;
+use crate::ops::IpAddr;
+use crate::ops::OpAddr;
+use crate::ops::OpConn;
+use crate::resolve_addr::resolve_addr;
+use crate::resolve_addr::resolve_addr_sync;
+use crate::NetPermissions;
+use deno_core::error::bad_resource;
+use deno_core::error::bad_resource_id;
+use deno_core::error::custom_error;
+use deno_core::error::generic_error;
+use deno_core::error::invalid_hostname;
+use deno_core::error::AnyError;
+use deno_core::futures::future::poll_fn;
+use deno_core::futures::ready;
+use deno_core::futures::task::noop_waker_ref;
+use deno_core::futures::task::AtomicWaker;
+use deno_core::futures::task::Context;
+use deno_core::futures::task::Poll;
+use deno_core::futures::task::RawWaker;
+use deno_core::futures::task::RawWakerVTable;
+use deno_core::futures::task::Waker;
+use deno_core::op_async;
+use deno_core::op_sync;
+use deno_core::AsyncRefCell;
+use deno_core::CancelHandle;
+use deno_core::CancelTryFuture;
+use deno_core::OpPair;
+use deno_core::OpState;
+use deno_core::RcRef;
+use deno_core::Resource;
+use deno_core::ResourceId;
+use io::Error;
+use io::Read;
+use io::Write;
+use rustls::internal::pemfile::certs;
+use rustls::internal::pemfile::pkcs8_private_keys;
+use rustls::internal::pemfile::rsa_private_keys;
+use rustls::Certificate;
+use rustls::ClientConfig;
+use rustls::ClientSession;
+use rustls::NoClientAuth;
+use rustls::PrivateKey;
+use rustls::ServerConfig;
+use rustls::ServerSession;
+use rustls::Session;
+use rustls::StoresClientSessions;
+use serde::Deserialize;
+use std::borrow::Cow;
+use std::cell::RefCell;
+use std::collections::HashMap;
+use std::convert::From;
+use std::fs::File;
+use std::io;
+use std::io::BufReader;
+use std::io::ErrorKind;
+use std::ops::Deref;
+use std::ops::DerefMut;
+use std::path::Path;
+use std::pin::Pin;
+use std::rc::Rc;
+use std::sync::Arc;
+use std::sync::Mutex;
+use std::sync::Weak;
+use tokio::io::AsyncRead;
+use tokio::io::AsyncWrite;
+use tokio::io::ReadBuf;
+use tokio::net::TcpListener;
+use tokio::net::TcpStream;
+use tokio::task::spawn_local;
+use webpki::DNSNameRef;
+
+lazy_static::lazy_static! {
+ static ref CLIENT_SESSION_MEMORY_CACHE: Arc<ClientSessionMemoryCache> =
+ Arc::new(ClientSessionMemoryCache::default());
+}
+
+#[derive(Default)]
+struct ClientSessionMemoryCache(Mutex<HashMap<Vec<u8>, Vec<u8>>>);
+
+impl StoresClientSessions for ClientSessionMemoryCache {
+ fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
+ self.0.lock().unwrap().get(key).cloned()
+ }
+
+ fn put(&self, key: Vec<u8>, value: Vec<u8>) -> bool {
+ let mut sessions = self.0.lock().unwrap();
+ // TODO(bnoordhuis) Evict sessions LRU-style instead of arbitrarily.
+ while sessions.len() >= 1024 {
+ let key = sessions.keys().next().unwrap().clone();
+ sessions.remove(&key);
+ }
+ sessions.insert(key, value);
+ true
+ }
+}
+
+#[derive(Debug)]
+enum TlsSession {
+ Client(ClientSession),
+ Server(ServerSession),
+}
+
+impl Deref for TlsSession {
+ type Target = dyn Session;
+
+ fn deref(&self) -> &Self::Target {
+ match self {
+ TlsSession::Client(client_session) => client_session,
+ TlsSession::Server(server_session) => server_session,
+ }
+ }
+}
+
+impl DerefMut for TlsSession {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ match self {
+ TlsSession::Client(client_session) => client_session,
+ TlsSession::Server(server_session) => server_session,
+ }
+ }
+}
+
+impl From<ClientSession> for TlsSession {
+ fn from(client_session: ClientSession) -> Self {
+ TlsSession::Client(client_session)
+ }
+}
+
+impl From<ServerSession> for TlsSession {
+ fn from(server_session: ServerSession) -> Self {
+ TlsSession::Server(server_session)
+ }
+}
+
+#[derive(Copy, Clone, Debug, Eq, PartialEq)]
+enum Flow {
+ Read,
+ Write,
+}
+
+#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
+enum State {
+ StreamOpen,
+ StreamClosed,
+ TlsClosing,
+ TlsClosed,
+ TcpClosed,
+}
+
+#[derive(Debug)]
+pub struct TlsStream(Option<TlsStreamInner>);
+
+impl TlsStream {
+ fn new(tcp: TcpStream, tls: TlsSession) -> Self {
+ let inner = TlsStreamInner {
+ tcp,
+ tls,
+ rd_state: State::StreamOpen,
+ wr_state: State::StreamOpen,
+ };
+ Self(Some(inner))
+ }
+
+ pub fn new_client_side(
+ tcp: TcpStream,
+ tls_config: &Arc<ClientConfig>,
+ hostname: DNSNameRef,
+ ) -> Self {
+ let tls = TlsSession::Client(ClientSession::new(tls_config, hostname));
+ Self::new(tcp, tls)
+ }
+
+ pub fn new_server_side(
+ tcp: TcpStream,
+ tls_config: &Arc<ServerConfig>,
+ ) -> Self {
+ let tls = TlsSession::Server(ServerSession::new(tls_config));
+ Self::new(tcp, tls)
+ }
+
+ pub async fn handshake(&mut self) -> io::Result<()> {
+ poll_fn(|cx| self.inner_mut().poll_io(cx, Flow::Write)).await
+ }
+
+ fn into_split(self) -> (ReadHalf, WriteHalf) {
+ let shared = Shared::new(self);
+ let rd = ReadHalf {
+ shared: shared.clone(),
+ };
+ let wr = WriteHalf { shared };
+ (rd, wr)
+ }
+
+ /// Tokio-rustls compatibility: returns a reference to the underlying TCP
+ /// stream, and a reference to the Rustls `Session` object.
+ pub fn get_ref(&self) -> (&TcpStream, &dyn Session) {
+ let inner = self.0.as_ref().unwrap();
+ (&inner.tcp, &*inner.tls)
+ }
+
+ fn inner_mut(&mut self) -> &mut TlsStreamInner {
+ self.0.as_mut().unwrap()
+ }
+}
+
+impl AsyncRead for TlsStream {
+ fn poll_read(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<io::Result<()>> {
+ self.inner_mut().poll_read(cx, buf)
+ }
+}
+
+impl AsyncWrite for TlsStream {
+ fn poll_write(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ self.inner_mut().poll_write(cx, buf)
+ }
+
+ fn poll_flush(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<io::Result<()>> {
+ self.inner_mut().poll_io(cx, Flow::Write)
+ // The underlying TCP stream does not need to be flushed.
+ }
+
+ fn poll_shutdown(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<io::Result<()>> {
+ self.inner_mut().poll_shutdown(cx)
+ }
+}
+
+impl Drop for TlsStream {
+ fn drop(&mut self) {
+ let mut inner = self.0.take().unwrap();
+
+ let mut cx = Context::from_waker(noop_waker_ref());
+ let use_linger_task = inner.poll_close(&mut cx).is_pending();
+
+ if use_linger_task {
+ spawn_local(poll_fn(move |cx| inner.poll_close(cx)));
+ } else if cfg!(debug_assertions) {
+ spawn_local(async {}); // Spawn dummy task to detect missing LocalSet.
+ }
+ }
+}
+
+#[derive(Debug)]
+pub struct TlsStreamInner {
+ tls: TlsSession,
+ tcp: TcpStream,
+ rd_state: State,
+ wr_state: State,
+}
+
+impl TlsStreamInner {
+ fn poll_io(
+ &mut self,
+ cx: &mut Context<'_>,
+ flow: Flow,
+ ) -> Poll<io::Result<()>> {
+ loop {
+ let wr_ready = loop {
+ match self.wr_state {
+ _ if self.tls.is_handshaking() && !self.tls.wants_write() => {
+ break true;
+ }
+ _ if self.tls.is_handshaking() => {}
+ State::StreamOpen if !self.tls.wants_write() => break true,
+ State::StreamClosed => {
+ // Rustls will enqueue the 'CloseNotify' alert and send it after
+ // flusing the data that is already in the queue.
+ self.tls.send_close_notify();
+ self.wr_state = State::TlsClosing;
+ continue;
+ }
+ State::TlsClosing if !self.tls.wants_write() => {
+ self.wr_state = State::TlsClosed;
+ continue;
+ }
+ // If a 'CloseNotify' alert sent by the remote end has been received,
+ // shut down the underlying TCP socket. Otherwise, consider polling
+ // done for the moment.
+ State::TlsClosed if self.rd_state < State::TlsClosed => break true,
+ State::TlsClosed
+ if Pin::new(&mut self.tcp).poll_shutdown(cx)?.is_pending() =>
+ {
+ break false;
+ }
+ State::TlsClosed => {
+ self.wr_state = State::TcpClosed;
+ continue;
+ }
+ State::TcpClosed => break true,
+ _ => {}
+ }
+
+ // Poll whether there is space in the socket send buffer so we can flush
+ // the remaining outgoing ciphertext.
+ if self.tcp.poll_write_ready(cx)?.is_pending() {
+ break false;
+ }
+
+ // Write ciphertext to the TCP socket.
+ let mut wrapped_tcp = ImplementWriteTrait(&mut self.tcp);
+ match self.tls.write_tls(&mut wrapped_tcp) {
+ Ok(0) => unreachable!(),
+ Ok(_) => {}
+ Err(err) if err.kind() == ErrorKind::WouldBlock => {}
+ Err(err) => return Poll::Ready(Err(err)),
+ }
+ };
+
+ let rd_ready = loop {
+ match self.rd_state {
+ State::TcpClosed if self.tls.is_handshaking() => {
+ let err = Error::new(ErrorKind::UnexpectedEof, "tls handshake eof");
+ return Poll::Ready(Err(err));
+ }
+ _ if self.tls.is_handshaking() && !self.tls.wants_read() => {
+ break true;
+ }
+ _ if self.tls.is_handshaking() => {}
+ State::StreamOpen if !self.tls.wants_read() => break true,
+ State::StreamOpen => {}
+ State::StreamClosed if !self.tls.wants_read() => {
+ // Rustls has more incoming cleartext buffered up, but the TLS
+ // session is closing so this data will never be processed by the
+ // application layer. Just like what would happen if this were a raw
+ // TCP stream, don't gracefully end the TLS session, but abort it.
+ return Poll::Ready(Err(Error::from(ErrorKind::ConnectionReset)));
+ }
+ State::StreamClosed => {}
+ State::TlsClosed if self.wr_state == State::TcpClosed => {
+ // Wait for the remote end to gracefully close the TCP connection.
+ // TODO(piscisaureus): this is unnecessary; remove when stable.
+ }
+ _ => break true,
+ }
+
+ if self.rd_state < State::TlsClosed {
+ // Do a zero-length plaintext read so we can detect the arrival of
+ // 'CloseNotify' messages, even if only the write half is open.
+ // Actually reading data from the socket is done in `poll_read()`.
+ match self.tls.read(&mut []) {
+ Ok(0) => {}
+ Err(err) if err.kind() == ErrorKind::ConnectionAborted => {
+ // `Session::read()` returns `ConnectionAborted` when a
+ // 'CloseNotify' alert has been received, which indicates that
+ // the remote peer wants to gracefully end the TLS session.
+ self.rd_state = State::TlsClosed;
+ continue;
+ }
+ Err(err) => return Poll::Ready(Err(err)),
+ _ => unreachable!(),
+ }
+ }
+
+ // Poll whether more ciphertext is available in the socket receive
+ // buffer.
+ if self.tcp.poll_read_ready(cx)?.is_pending() {
+ break false;
+ }
+
+ // Receive ciphertext from the socket.
+ let mut wrapped_tcp = ImplementReadTrait(&mut self.tcp);
+ match self.tls.read_tls(&mut wrapped_tcp) {
+ Ok(0) => self.rd_state = State::TcpClosed,
+ Ok(_) => self
+ .tls
+ .process_new_packets()
+ .map_err(|err| Error::new(ErrorKind::InvalidData, err))?,
+ Err(err) if err.kind() == ErrorKind::WouldBlock => {}
+ Err(err) => return Poll::Ready(Err(err)),
+ }
+ };
+
+ if wr_ready {
+ if self.rd_state >= State::TlsClosed
+ && self.wr_state >= State::TlsClosed
+ && self.wr_state < State::TcpClosed
+ {
+ continue;
+ }
+ if self.tls.wants_write() {
+ continue;
+ }
+ }
+
+ let io_ready = match flow {
+ _ if self.tls.is_handshaking() => false,
+ Flow::Read => rd_ready,
+ Flow::Write => wr_ready,
+ };
+ return match io_ready {
+ false => Poll::Pending,
+ true => Poll::Ready(Ok(())),
+ };
+ }
+ }
+
+ fn poll_read(
+ &mut self,
+ cx: &mut Context<'_>,
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<io::Result<()>> {
+ ready!(self.poll_io(cx, Flow::Read))?;
+
+ if self.rd_state == State::StreamOpen {
+ let buf_slice =
+ unsafe { &mut *(buf.unfilled_mut() as *mut [_] as *mut [u8]) };
+ let bytes_read = self.tls.read(buf_slice)?;
+ assert_ne!(bytes_read, 0);
+ unsafe { buf.assume_init(bytes_read) };
+ buf.advance(bytes_read);
+ }
+
+ Poll::Ready(Ok(()))
+ }
+
+ fn poll_write(
+ &mut self,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ if buf.is_empty() {
+ // Tokio-rustls compatibility: a zero byte write always succeeds.
+ Poll::Ready(Ok(0))
+ } else if self.wr_state == State::StreamOpen {
+ // Flush Rustls' ciphertext send queue.
+ ready!(self.poll_io(cx, Flow::Write))?;
+
+ // Copy data from `buf` to the Rustls cleartext send queue.
+ let bytes_written = self.tls.write(buf)?;
+ assert_ne!(bytes_written, 0);
+
+ // Try to flush as much ciphertext as possible. However, since we just
+ // handed off at least some bytes to rustls, so we can't return
+ // `Poll::Pending()` any more: this would tell the caller that it should
+ // try to send those bytes again.
+ let _ = self.poll_io(cx, Flow::Write)?;
+
+ Poll::Ready(Ok(bytes_written))
+ } else {
+ // Return error if stream has been shut down for writing.
+ Poll::Ready(Err(ErrorKind::BrokenPipe.into()))
+ }
+ }
+
+ fn poll_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ if self.wr_state == State::StreamOpen {
+ self.wr_state = State::StreamClosed;
+ }
+
+ ready!(self.poll_io(cx, Flow::Write))?;
+
+ // At minimum, a TLS 'CloseNotify' alert should have been sent.
+ assert!(self.wr_state >= State::TlsClosed);
+ // If we received a TLS 'CloseNotify' alert from the remote end
+ // already, the TCP socket should be shut down at this point.
+ assert!(
+ self.rd_state < State::TlsClosed || self.wr_state == State::TcpClosed
+ );
+
+ Poll::Ready(Ok(()))
+ }
+
+ fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ if self.rd_state == State::StreamOpen {
+ self.rd_state = State::StreamClosed;
+ }
+
+ // Send TLS 'CloseNotify' alert.
+ ready!(self.poll_shutdown(cx))?;
+ // Wait for 'CloseNotify', shut down TCP stream, wait for TCP FIN packet.
+ ready!(self.poll_io(cx, Flow::Read))?;
+
+ assert_eq!(self.rd_state, State::TcpClosed);
+ assert_eq!(self.wr_state, State::TcpClosed);
+
+ Poll::Ready(Ok(()))
+ }
+}
+
+#[derive(Debug)]
+pub struct ReadHalf {
+ shared: Arc<Shared>,
+}
+
+impl ReadHalf {
+ pub fn reunite(self, wr: WriteHalf) -> TlsStream {
+ assert!(Arc::ptr_eq(&self.shared, &wr.shared));
+ drop(wr); // Drop `wr`, so only one strong reference to `shared` remains.
+
+ Arc::try_unwrap(self.shared)
+ .unwrap_or_else(|_| panic!("Arc::<Shared>::try_unwrap() failed"))
+ .tls_stream
+ .into_inner()
+ .unwrap()
+ }
+}
+
+impl AsyncRead for ReadHalf {
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<io::Result<()>> {
+ self
+ .shared
+ .poll_with_shared_waker(cx, Flow::Read, move |tls, cx| {
+ tls.poll_read(cx, buf)
+ })
+ }
+}
+
+#[derive(Debug)]
+pub struct WriteHalf {
+ shared: Arc<Shared>,
+}
+
+impl AsyncWrite for WriteHalf {
+ fn poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ self
+ .shared
+ .poll_with_shared_waker(cx, Flow::Write, move |tls, cx| {
+ tls.poll_write(cx, buf)
+ })
+ }
+
+ fn poll_flush(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<io::Result<()>> {
+ self
+ .shared
+ .poll_with_shared_waker(cx, Flow::Write, |tls, cx| tls.poll_flush(cx))
+ }
+
+ fn poll_shutdown(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<io::Result<()>> {
+ self
+ .shared
+ .poll_with_shared_waker(cx, Flow::Write, |tls, cx| tls.poll_shutdown(cx))
+ }
+}
+
+#[derive(Debug)]
+struct Shared {
+ tls_stream: Mutex<TlsStream>,
+ rd_waker: AtomicWaker,
+ wr_waker: AtomicWaker,
+}
+
+impl Shared {
+ fn new(tls_stream: TlsStream) -> Arc<Self> {
+ let self_ = Self {
+ tls_stream: Mutex::new(tls_stream),
+ rd_waker: AtomicWaker::new(),
+ wr_waker: AtomicWaker::new(),
+ };
+ Arc::new(self_)
+ }
+
+ fn poll_with_shared_waker<R>(
+ self: &Arc<Self>,
+ cx: &mut Context<'_>,
+ flow: Flow,
+ mut f: impl FnMut(Pin<&mut TlsStream>, &mut Context<'_>) -> R,
+ ) -> R {
+ match flow {
+ Flow::Read => self.rd_waker.register(cx.waker()),
+ Flow::Write => self.wr_waker.register(cx.waker()),
+ }
+
+ let shared_waker = self.new_shared_waker();
+ let mut cx = Context::from_waker(&shared_waker);
+
+ let mut tls_stream = self.tls_stream.lock().unwrap();
+ f(Pin::new(&mut tls_stream), &mut cx)
+ }
+
+ const SHARED_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
+ Self::clone_shared_waker,
+ Self::wake_shared_waker,
+ Self::wake_shared_waker_by_ref,
+ Self::drop_shared_waker,
+ );
+
+ fn new_shared_waker(self: &Arc<Self>) -> Waker {
+ let self_weak = Arc::downgrade(self);
+ let self_ptr = self_weak.into_raw() as *const ();
+ let raw_waker = RawWaker::new(self_ptr, &Self::SHARED_WAKER_VTABLE);
+ unsafe { Waker::from_raw(raw_waker) }
+ }
+
+ fn clone_shared_waker(self_ptr: *const ()) -> RawWaker {
+ let self_weak = unsafe { Weak::from_raw(self_ptr as *const Self) };
+ let ptr1 = self_weak.clone().into_raw();
+ let ptr2 = self_weak.into_raw();
+ assert!(ptr1 == ptr2);
+ RawWaker::new(self_ptr, &Self::SHARED_WAKER_VTABLE)
+ }
+
+ fn wake_shared_waker(self_ptr: *const ()) {
+ Self::wake_shared_waker_by_ref(self_ptr);
+ Self::drop_shared_waker(self_ptr);
+ }
+
+ fn wake_shared_waker_by_ref(self_ptr: *const ()) {
+ let self_weak = unsafe { Weak::from_raw(self_ptr as *const Self) };
+ if let Some(self_arc) = Weak::upgrade(&self_weak) {
+ self_arc.rd_waker.wake();
+ self_arc.wr_waker.wake();
+ }
+ self_weak.into_raw();
+ }
+
+ fn drop_shared_waker(self_ptr: *const ()) {
+ let _ = unsafe { Weak::from_raw(self_ptr as *const Self) };
+ }
+}
+
+struct ImplementReadTrait<'a, T>(&'a mut T);
+
+impl Read for ImplementReadTrait<'_, TcpStream> {
+ fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+ self.0.try_read(buf)
+ }
+}
+
+struct ImplementWriteTrait<'a, T>(&'a mut T);
+
+impl Write for ImplementWriteTrait<'_, TcpStream> {
+ fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+ self.0.try_write(buf)
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ Ok(())
+ }
+}
+
+pub fn init<P: NetPermissions + 'static>() -> Vec<OpPair> {
+ vec![
+ ("op_start_tls", op_async(op_start_tls::<P>)),
+ ("op_connect_tls", op_async(op_connect_tls::<P>)),
+ ("op_listen_tls", op_sync(op_listen_tls::<P>)),
+ ("op_accept_tls", op_async(op_accept_tls)),
+ ]
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct ConnectTlsArgs {
+ transport: String,
+ hostname: String,
+ port: u16,
+ cert_file: Option<String>,
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct StartTlsArgs {
+ rid: ResourceId,
+ cert_file: Option<String>,
+ hostname: String,
+}
+
+async fn op_start_tls<NP>(
+ state: Rc<RefCell<OpState>>,
+ args: StartTlsArgs,
+ _: (),
+) -> Result<OpConn, AnyError>
+where
+ NP: NetPermissions + 'static,
+{
+ let rid = args.rid;
+ let hostname = match &*args.hostname {
+ "" => "localhost",
+ n => n,
+ };
+ let cert_file = args.cert_file.as_deref();
+
+ {
+ super::check_unstable2(&state, "Deno.startTls");
+ let mut s = state.borrow_mut();
+ let permissions = s.borrow_mut::<NP>();
+ permissions.check_net(&(hostname, Some(0)))?;
+ if let Some(path) = cert_file {
+ permissions.check_read(Path::new(path))?;
+ }
+ }
+
+ let hostname_dns = DNSNameRef::try_from_ascii_str(hostname)
+ .map_err(|_| invalid_hostname(hostname))?;
+
+ let resource_rc = state
+ .borrow_mut()
+ .resource_table
+ .take::<TcpStreamResource>(rid)
+ .ok_or_else(bad_resource_id)?;
+ let resource = Rc::try_unwrap(resource_rc)
+ .expect("Only a single use of this resource should happen");
+ let (read_half, write_half) = resource.into_inner();
+ let tcp_stream = read_half.reunite(write_half)?;
+
+ let local_addr = tcp_stream.local_addr()?;
+ let remote_addr = tcp_stream.peer_addr()?;
+
+ let mut tls_config = ClientConfig::new();
+ tls_config.set_persistence(CLIENT_SESSION_MEMORY_CACHE.clone());
+ tls_config
+ .root_store
+ .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS);
+ if let Some(path) = cert_file {
+ let key_file = File::open(path)?;
+ let reader = &mut BufReader::new(key_file);
+ tls_config.root_store.add_pem_file(reader).unwrap();
+ }
+ let tls_config = Arc::new(tls_config);
+
+ let tls_stream =
+ TlsStream::new_client_side(tcp_stream, &tls_config, hostname_dns);
+
+ let rid = {
+ let mut state_ = state.borrow_mut();
+ state_
+ .resource_table
+ .add(TlsStreamResource::new(tls_stream.into_split()))
+ };
+
+ Ok(OpConn {
+ rid,
+ local_addr: Some(OpAddr::Tcp(IpAddr {
+ hostname: local_addr.ip().to_string(),
+ port: local_addr.port(),
+ })),
+ remote_addr: Some(OpAddr::Tcp(IpAddr {
+ hostname: remote_addr.ip().to_string(),
+ port: remote_addr.port(),
+ })),
+ })
+}
+
+async fn op_connect_tls<NP>(
+ state: Rc<RefCell<OpState>>,
+ args: ConnectTlsArgs,
+ _: (),
+) -> Result<OpConn, AnyError>
+where
+ NP: NetPermissions + 'static,
+{
+ assert_eq!(args.transport, "tcp");
+ let hostname = match &*args.hostname {
+ "" => "localhost",
+ n => n,
+ };
+ let port = args.port;
+ let cert_file = args.cert_file.as_deref();
+
+ {
+ let mut s = state.borrow_mut();
+ let permissions = s.borrow_mut::<NP>();
+ permissions.check_net(&(hostname, Some(port)))?;
+ if let Some(path) = cert_file {
+ permissions.check_read(Path::new(path))?;
+ }
+ }
+
+ let hostname_dns = DNSNameRef::try_from_ascii_str(hostname)
+ .map_err(|_| invalid_hostname(hostname))?;
+
+ let connect_addr = resolve_addr(hostname, port)
+ .await?
+ .next()
+ .ok_or_else(|| generic_error("No resolved address found"))?;
+ let tcp_stream = TcpStream::connect(connect_addr).await?;
+ let local_addr = tcp_stream.local_addr()?;
+ let remote_addr = tcp_stream.peer_addr()?;
+
+ let mut tls_config = ClientConfig::new();
+ tls_config.set_persistence(CLIENT_SESSION_MEMORY_CACHE.clone());
+ tls_config
+ .root_store
+ .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS);
+ if let Some(path) = cert_file {
+ let key_file = File::open(path)?;
+ let reader = &mut BufReader::new(key_file);
+ tls_config.root_store.add_pem_file(reader).unwrap();
+ }
+ let tls_config = Arc::new(tls_config);
+
+ let tls_stream =
+ TlsStream::new_client_side(tcp_stream, &tls_config, hostname_dns);
+
+ let rid = {
+ let mut state_ = state.borrow_mut();
+ state_
+ .resource_table
+ .add(TlsStreamResource::new(tls_stream.into_split()))
+ };
+
+ Ok(OpConn {
+ rid,
+ local_addr: Some(OpAddr::Tcp(IpAddr {
+ hostname: local_addr.ip().to_string(),
+ port: local_addr.port(),
+ })),
+ remote_addr: Some(OpAddr::Tcp(IpAddr {
+ hostname: remote_addr.ip().to_string(),
+ port: remote_addr.port(),
+ })),
+ })
+}
+
+fn load_certs(path: &str) -> Result<Vec<Certificate>, AnyError> {
+ let cert_file = File::open(path)?;
+ let reader = &mut BufReader::new(cert_file);
+
+ let certs = certs(reader)
+ .map_err(|_| custom_error("InvalidData", "Unable to decode certificate"))?;
+
+ if certs.is_empty() {
+ let e = custom_error("InvalidData", "No certificates found in cert file");
+ return Err(e);
+ }
+
+ Ok(certs)
+}
+
+fn key_decode_err() -> AnyError {
+ custom_error("InvalidData", "Unable to decode key")
+}
+
+fn key_not_found_err() -> AnyError {
+ custom_error("InvalidData", "No keys found in key file")
+}
+
+/// Starts with -----BEGIN RSA PRIVATE KEY-----
+fn load_rsa_keys(path: &str) -> Result<Vec<PrivateKey>, AnyError> {
+ let key_file = File::open(path)?;
+ let reader = &mut BufReader::new(key_file);
+ let keys = rsa_private_keys(reader).map_err(|_| key_decode_err())?;
+ Ok(keys)
+}
+
+/// Starts with -----BEGIN PRIVATE KEY-----
+fn load_pkcs8_keys(path: &str) -> Result<Vec<PrivateKey>, AnyError> {
+ let key_file = File::open(path)?;
+ let reader = &mut BufReader::new(key_file);
+ let keys = pkcs8_private_keys(reader).map_err(|_| key_decode_err())?;
+ Ok(keys)
+}
+
+fn load_keys(path: &str) -> Result<Vec<PrivateKey>, AnyError> {
+ let path = path.to_string();
+ let mut keys = load_rsa_keys(&path)?;
+
+ if keys.is_empty() {
+ keys = load_pkcs8_keys(&path)?;
+ }
+
+ if keys.is_empty() {
+ return Err(key_not_found_err());
+ }
+
+ Ok(keys)
+}
+
+pub struct TlsListenerResource {
+ tcp_listener: AsyncRefCell<TcpListener>,
+ tls_config: Arc<ServerConfig>,
+ cancel_handle: CancelHandle,
+}
+
+impl Resource for TlsListenerResource {
+ fn name(&self) -> Cow<str> {
+ "tlsListener".into()
+ }
+
+ fn close(self: Rc<Self>) {
+ self.cancel_handle.cancel();
+ }
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct ListenTlsArgs {
+ transport: String,
+ hostname: String,
+ port: u16,
+ cert_file: String,
+ key_file: String,
+ alpn_protocols: Option<Vec<String>>,
+}
+
+fn op_listen_tls<NP>(
+ state: &mut OpState,
+ args: ListenTlsArgs,
+ _: (),
+) -> Result<OpConn, AnyError>
+where
+ NP: NetPermissions + 'static,
+{
+ assert_eq!(args.transport, "tcp");
+ let hostname = &*args.hostname;
+ let port = args.port;
+ let cert_file = &*args.cert_file;
+ let key_file = &*args.key_file;
+
+ {
+ let permissions = state.borrow_mut::<NP>();
+ permissions.check_net(&(hostname, Some(port)))?;
+ permissions.check_read(Path::new(cert_file))?;
+ permissions.check_read(Path::new(key_file))?;
+ }
+
+ let mut tls_config = ServerConfig::new(NoClientAuth::new());
+ if let Some(alpn_protocols) = args.alpn_protocols {
+ super::check_unstable(state, "Deno.listenTls#alpn_protocols");
+ tls_config.alpn_protocols =
+ alpn_protocols.into_iter().map(|s| s.into_bytes()).collect();
+ }
+ tls_config
+ .set_single_cert(load_certs(cert_file)?, load_keys(key_file)?.remove(0))
+ .expect("invalid key or certificate");
+
+ let bind_addr = resolve_addr_sync(hostname, port)?
+ .next()
+ .ok_or_else(|| generic_error("No resolved address found"))?;
+ let std_listener = std::net::TcpListener::bind(bind_addr)?;
+ std_listener.set_nonblocking(true)?;
+ let tcp_listener = TcpListener::from_std(std_listener)?;
+ let local_addr = tcp_listener.local_addr()?;
+
+ let tls_listener_resource = TlsListenerResource {
+ tcp_listener: AsyncRefCell::new(tcp_listener),
+ tls_config: Arc::new(tls_config),
+ cancel_handle: Default::default(),
+ };
+
+ let rid = state.resource_table.add(tls_listener_resource);
+
+ Ok(OpConn {
+ rid,
+ local_addr: Some(OpAddr::Tcp(IpAddr {
+ hostname: local_addr.ip().to_string(),
+ port: local_addr.port(),
+ })),
+ remote_addr: None,
+ })
+}
+
+async fn op_accept_tls(
+ state: Rc<RefCell<OpState>>,
+ rid: ResourceId,
+ _: (),
+) -> Result<OpConn, AnyError> {
+ let resource = state
+ .borrow()
+ .resource_table
+ .get::<TlsListenerResource>(rid)
+ .ok_or_else(|| bad_resource("Listener has been closed"))?;
+
+ let cancel_handle = RcRef::map(&resource, |r| &r.cancel_handle);
+ let tcp_listener = RcRef::map(&resource, |r| &r.tcp_listener)
+ .try_borrow_mut()
+ .ok_or_else(|| custom_error("Busy", "Another accept task is ongoing"))?;
+
+ let (tcp_stream, remote_addr) =
+ match tcp_listener.accept().try_or_cancel(&cancel_handle).await {
+ Ok(tuple) => tuple,
+ Err(err) if err.kind() == ErrorKind::Interrupted => {
+ // FIXME(bartlomieju): compatibility with current JS implementation.
+ return Err(bad_resource("Listener has been closed"));
+ }
+ Err(err) => return Err(err.into()),
+ };
+
+ let local_addr = tcp_stream.local_addr()?;
+
+ let tls_stream = TlsStream::new_server_side(tcp_stream, &resource.tls_config);
+
+ let rid = {
+ let mut state_ = state.borrow_mut();
+ state_
+ .resource_table
+ .add(TlsStreamResource::new(tls_stream.into_split()))
+ };
+
+ Ok(OpConn {
+ rid,
+ local_addr: Some(OpAddr::Tcp(IpAddr {
+ hostname: local_addr.ip().to_string(),
+ port: local_addr.port(),
+ })),
+ remote_addr: Some(OpAddr::Tcp(IpAddr {
+ hostname: remote_addr.ip().to_string(),
+ port: remote_addr.port(),
+ })),
+ })
+}
diff --git a/extensions/net/ops_unix.rs b/extensions/net/ops_unix.rs
new file mode 100644
index 000000000..9dfcc231e
--- /dev/null
+++ b/extensions/net/ops_unix.rs
@@ -0,0 +1,180 @@
+// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
+
+use crate::io::UnixStreamResource;
+use crate::ops::AcceptArgs;
+use crate::ops::OpAddr;
+use crate::ops::OpConn;
+use crate::ops::OpPacket;
+use crate::ops::ReceiveArgs;
+use deno_core::error::bad_resource;
+use deno_core::error::custom_error;
+use deno_core::error::null_opbuf;
+use deno_core::error::AnyError;
+use deno_core::AsyncRefCell;
+use deno_core::CancelHandle;
+use deno_core::CancelTryFuture;
+use deno_core::OpState;
+use deno_core::RcRef;
+use deno_core::Resource;
+use deno_core::ZeroCopyBuf;
+use serde::Deserialize;
+use serde::Serialize;
+use std::borrow::Cow;
+use std::cell::RefCell;
+use std::fs::remove_file;
+use std::path::Path;
+use std::rc::Rc;
+use tokio::net::UnixDatagram;
+use tokio::net::UnixListener;
+pub use tokio::net::UnixStream;
+
+/// A utility function to map OsStrings to Strings
+pub fn into_string(s: std::ffi::OsString) -> Result<String, AnyError> {
+ s.into_string().map_err(|s| {
+ let message = format!("File name or path {:?} is not valid UTF-8", s);
+ custom_error("InvalidData", message)
+ })
+}
+
+struct UnixListenerResource {
+ listener: AsyncRefCell<UnixListener>,
+ cancel: CancelHandle,
+}
+
+impl Resource for UnixListenerResource {
+ fn name(&self) -> Cow<str> {
+ "unixListener".into()
+ }
+
+ fn close(self: Rc<Self>) {
+ self.cancel.cancel();
+ }
+}
+
+pub struct UnixDatagramResource {
+ pub socket: AsyncRefCell<UnixDatagram>,
+ pub cancel: CancelHandle,
+}
+
+impl Resource for UnixDatagramResource {
+ fn name(&self) -> Cow<str> {
+ "unixDatagram".into()
+ }
+
+ fn close(self: Rc<Self>) {
+ self.cancel.cancel();
+ }
+}
+
+#[derive(Serialize)]
+pub struct UnixAddr {
+ pub path: Option<String>,
+}
+
+#[derive(Deserialize)]
+pub struct UnixListenArgs {
+ pub path: String,
+}
+
+pub(crate) async fn accept_unix(
+ state: Rc<RefCell<OpState>>,
+ args: AcceptArgs,
+ _: (),
+) -> Result<OpConn, AnyError> {
+ let rid = args.rid;
+
+ let resource = state
+ .borrow()
+ .resource_table
+ .get::<UnixListenerResource>(rid)
+ .ok_or_else(|| bad_resource("Listener has been closed"))?;
+ let listener = RcRef::map(&resource, |r| &r.listener)
+ .try_borrow_mut()
+ .ok_or_else(|| custom_error("Busy", "Listener already in use"))?;
+ let cancel = RcRef::map(resource, |r| &r.cancel);
+ let (unix_stream, _socket_addr) =
+ listener.accept().try_or_cancel(cancel).await?;
+
+ let local_addr = unix_stream.local_addr()?;
+ let remote_addr = unix_stream.peer_addr()?;
+ let resource = UnixStreamResource::new(unix_stream.into_split());
+ let mut state = state.borrow_mut();
+ let rid = state.resource_table.add(resource);
+ Ok(OpConn {
+ rid,
+ local_addr: Some(OpAddr::Unix(UnixAddr {
+ path: local_addr.as_pathname().and_then(pathstring),
+ })),
+ remote_addr: Some(OpAddr::Unix(UnixAddr {
+ path: remote_addr.as_pathname().and_then(pathstring),
+ })),
+ })
+}
+
+pub(crate) async fn receive_unix_packet(
+ state: Rc<RefCell<OpState>>,
+ args: ReceiveArgs,
+ buf: Option<ZeroCopyBuf>,
+) -> Result<OpPacket, AnyError> {
+ let mut buf = buf.ok_or_else(null_opbuf)?;
+
+ let rid = args.rid;
+
+ let resource = state
+ .borrow()
+ .resource_table
+ .get::<UnixDatagramResource>(rid)
+ .ok_or_else(|| bad_resource("Socket has been closed"))?;
+ let socket = RcRef::map(&resource, |r| &r.socket)
+ .try_borrow_mut()
+ .ok_or_else(|| custom_error("Busy", "Socket already in use"))?;
+ let cancel = RcRef::map(resource, |r| &r.cancel);
+ let (size, remote_addr) =
+ socket.recv_from(&mut buf).try_or_cancel(cancel).await?;
+ Ok(OpPacket {
+ size,
+ remote_addr: OpAddr::UnixPacket(UnixAddr {
+ path: remote_addr.as_pathname().and_then(pathstring),
+ }),
+ })
+}
+
+pub fn listen_unix(
+ state: &mut OpState,
+ addr: &Path,
+) -> Result<(u32, tokio::net::unix::SocketAddr), AnyError> {
+ if addr.exists() {
+ remove_file(&addr).unwrap();
+ }
+ let listener = UnixListener::bind(&addr)?;
+ let local_addr = listener.local_addr()?;
+ let listener_resource = UnixListenerResource {
+ listener: AsyncRefCell::new(listener),
+ cancel: Default::default(),
+ };
+ let rid = state.resource_table.add(listener_resource);
+
+ Ok((rid, local_addr))
+}
+
+pub fn listen_unix_packet(
+ state: &mut OpState,
+ addr: &Path,
+) -> Result<(u32, tokio::net::unix::SocketAddr), AnyError> {
+ if addr.exists() {
+ remove_file(&addr).unwrap();
+ }
+ let socket = UnixDatagram::bind(&addr)?;
+ let local_addr = socket.local_addr()?;
+ let datagram_resource = UnixDatagramResource {
+ socket: AsyncRefCell::new(socket),
+ cancel: Default::default(),
+ };
+ let rid = state.resource_table.add(datagram_resource);
+
+ Ok((rid, local_addr))
+}
+
+pub fn pathstring(pathname: &Path) -> Option<String> {
+ into_string(pathname.into()).ok()
+}
diff --git a/extensions/net/resolve_addr.rs b/extensions/net/resolve_addr.rs
new file mode 100644
index 000000000..ebf1374d1
--- /dev/null
+++ b/extensions/net/resolve_addr.rs
@@ -0,0 +1,156 @@
+// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
+
+use deno_core::error::AnyError;
+use std::net::SocketAddr;
+use std::net::ToSocketAddrs;
+use tokio::net::lookup_host;
+
+/// Resolve network address *asynchronously*.
+pub async fn resolve_addr(
+ hostname: &str,
+ port: u16,
+) -> Result<impl Iterator<Item = SocketAddr> + '_, AnyError> {
+ let addr_port_pair = make_addr_port_pair(hostname, port);
+ let result = lookup_host(addr_port_pair).await?;
+ Ok(result)
+}
+
+/// Resolve network address *synchronously*.
+pub fn resolve_addr_sync(
+ hostname: &str,
+ port: u16,
+) -> Result<impl Iterator<Item = SocketAddr>, AnyError> {
+ let addr_port_pair = make_addr_port_pair(hostname, port);
+ let result = addr_port_pair.to_socket_addrs()?;
+ Ok(result)
+}
+
+fn make_addr_port_pair(hostname: &str, port: u16) -> (&str, u16) {
+ // Default to localhost if given just the port. Example: ":80"
+ if hostname.is_empty() {
+ return ("0.0.0.0", port);
+ }
+
+ // If this looks like an ipv6 IP address. Example: "[2001:db8::1]"
+ // Then we remove the brackets.
+ let addr = hostname.trim_start_matches('[').trim_end_matches(']');
+ (addr, port)
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use std::net::Ipv4Addr;
+ use std::net::Ipv6Addr;
+ use std::net::SocketAddrV4;
+ use std::net::SocketAddrV6;
+
+ #[tokio::test]
+ async fn resolve_addr1() {
+ let expected = vec![SocketAddr::V4(SocketAddrV4::new(
+ Ipv4Addr::new(127, 0, 0, 1),
+ 80,
+ ))];
+ let actual = resolve_addr("127.0.0.1", 80)
+ .await
+ .unwrap()
+ .collect::<Vec<_>>();
+ assert_eq!(actual, expected);
+ }
+
+ #[tokio::test]
+ async fn resolve_addr2() {
+ let expected = vec![SocketAddr::V4(SocketAddrV4::new(
+ Ipv4Addr::new(0, 0, 0, 0),
+ 80,
+ ))];
+ let actual = resolve_addr("", 80).await.unwrap().collect::<Vec<_>>();
+ assert_eq!(actual, expected);
+ }
+
+ #[tokio::test]
+ async fn resolve_addr3() {
+ let expected = vec![SocketAddr::V4(SocketAddrV4::new(
+ Ipv4Addr::new(192, 0, 2, 1),
+ 25,
+ ))];
+ let actual = resolve_addr("192.0.2.1", 25)
+ .await
+ .unwrap()
+ .collect::<Vec<_>>();
+ assert_eq!(actual, expected);
+ }
+
+ #[tokio::test]
+ async fn resolve_addr_ipv6() {
+ let expected = vec![SocketAddr::V6(SocketAddrV6::new(
+ Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 1),
+ 8080,
+ 0,
+ 0,
+ ))];
+ let actual = resolve_addr("[2001:db8::1]", 8080)
+ .await
+ .unwrap()
+ .collect::<Vec<_>>();
+ assert_eq!(actual, expected);
+ }
+
+ #[tokio::test]
+ async fn resolve_addr_err() {
+ assert!(resolve_addr("INVALID ADDR", 1234).await.is_err());
+ }
+
+ #[test]
+ fn resolve_addr_sync1() {
+ let expected = vec![SocketAddr::V4(SocketAddrV4::new(
+ Ipv4Addr::new(127, 0, 0, 1),
+ 80,
+ ))];
+ let actual = resolve_addr_sync("127.0.0.1", 80)
+ .unwrap()
+ .collect::<Vec<_>>();
+ assert_eq!(actual, expected);
+ }
+
+ #[test]
+ fn resolve_addr_sync2() {
+ let expected = vec![SocketAddr::V4(SocketAddrV4::new(
+ Ipv4Addr::new(0, 0, 0, 0),
+ 80,
+ ))];
+ let actual = resolve_addr_sync("", 80).unwrap().collect::<Vec<_>>();
+ assert_eq!(actual, expected);
+ }
+
+ #[test]
+ fn resolve_addr_sync3() {
+ let expected = vec![SocketAddr::V4(SocketAddrV4::new(
+ Ipv4Addr::new(192, 0, 2, 1),
+ 25,
+ ))];
+ let actual = resolve_addr_sync("192.0.2.1", 25)
+ .unwrap()
+ .collect::<Vec<_>>();
+ assert_eq!(actual, expected);
+ }
+
+ #[test]
+ fn resolve_addr_sync_ipv6() {
+ let expected = vec![SocketAddr::V6(SocketAddrV6::new(
+ Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 1),
+ 8080,
+ 0,
+ 0,
+ ))];
+ let actual = resolve_addr_sync("[2001:db8::1]", 8080)
+ .unwrap()
+ .collect::<Vec<_>>();
+ assert_eq!(actual, expected);
+ }
+
+ #[test]
+ fn resolve_addr_sync_err() {
+ assert!(resolve_addr_sync("INVALID ADDR", 1234).is_err());
+ }
+}