summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cli/Cargo.toml2
-rw-r--r--cli/js/deno.ts4
-rw-r--r--cli/js/dispatch.ts2
-rw-r--r--cli/js/lib.deno.ns.d.ts52
-rw-r--r--cli/js/net.ts119
-rw-r--r--cli/js/net_test.ts80
-rw-r--r--cli/ops/net.rs174
-rwxr-xr-xtools/http_benchmark.py2
8 files changed, 389 insertions, 46 deletions
diff --git a/cli/Cargo.toml b/cli/Cargo.toml
index 7f825fd41..6055523c0 100644
--- a/cli/Cargo.toml
+++ b/cli/Cargo.toml
@@ -56,7 +56,7 @@ source-map-mappings = "0.5.0"
sys-info = "0.5.8"
tempfile = "3.1.0"
termcolor = "1.0.5"
-tokio = { version = "0.2", features = ["rt-core", "tcp", "process", "fs", "blocking", "sync", "io-std", "macros", "time"] }
+tokio = { version = "0.2", features = ["rt-core", "tcp", "udp", "process", "fs", "blocking", "sync", "io-std", "macros", "time"] }
tokio-rustls = "0.12.1"
url = "2.1.0"
utime = "0.2.1"
diff --git a/cli/js/deno.ts b/cli/js/deno.ts
index 8ccca9096..b86b28911 100644
--- a/cli/js/deno.ts
+++ b/cli/js/deno.ts
@@ -73,8 +73,12 @@ export {
export { metrics, Metrics } from "./metrics.ts";
export { mkdirSync, mkdir } from "./mkdir.ts";
export {
+ Addr,
connect,
listen,
+ recvfrom,
+ UDPConn,
+ UDPAddr,
Listener,
Conn,
ShutdownMode,
diff --git a/cli/js/dispatch.ts b/cli/js/dispatch.ts
index 4322daa99..64a392ab9 100644
--- a/cli/js/dispatch.ts
+++ b/cli/js/dispatch.ts
@@ -30,6 +30,8 @@ export let OP_REPL_START: number;
export let OP_REPL_READLINE: number;
export let OP_ACCEPT: number;
export let OP_ACCEPT_TLS: number;
+export let OP_RECEIVE: number;
+export let OP_SEND: number;
export let OP_CONNECT: number;
export let OP_SHUTDOWN: number;
export let OP_LISTEN: number;
diff --git a/cli/js/lib.deno.ns.d.ts b/cli/js/lib.deno.ns.d.ts
index fda2270a8..1839c813a 100644
--- a/cli/js/lib.deno.ns.d.ts
+++ b/cli/js/lib.deno.ns.d.ts
@@ -1387,14 +1387,20 @@ declare namespace Deno {
*/
export function openPlugin(filename: string): Plugin;
- type Transport = "tcp";
+ export type Transport = "tcp" | "udp";
- interface Addr {
+ export interface Addr {
transport: Transport;
hostname: string;
port: number;
}
+ export interface UDPAddr {
+ transport?: Transport;
+ hostname?: string;
+ port: number;
+ }
+
/** UNSTABLE: Maybe remove ShutdownMode entirely. */
export enum ShutdownMode {
// See http://man7.org/linux/man-pages/man2/shutdown.2.html
@@ -1417,6 +1423,36 @@ declare namespace Deno {
*/
export function shutdown(rid: number, how: ShutdownMode): void;
+ /** UNSTABLE: new API
+ * Waits for the next message to the passed rid and writes it on the passed buffer.
+ * Returns the number of bytes written and the remote address.
+ */
+ export function recvfrom(rid: number, p: Uint8Array): Promise<[number, Addr]>;
+
+ /** UNSTABLE: new API
+ * A socket is a generic transport listener for message-oriented protocols
+ */
+ export interface UDPConn extends AsyncIterator<[Uint8Array, Addr]> {
+ /** UNSTABLE: new API
+ * Waits for and resolves to the next message to the `Socket`. */
+ receive(p?: Uint8Array): Promise<[Uint8Array, Addr]>;
+
+ /** UNSTABLE: new API
+ * Sends a message to the target. */
+ send(p: Uint8Array, addr: UDPAddr): Promise<void>;
+
+ /** UNSTABLE: new API
+ * Close closes the socket. Any pending message promises will be rejected
+ * with errors.
+ */
+ close(): void;
+
+ /** Return the address of the `Socket`. */
+ addr: Addr;
+
+ [Symbol.asyncIterator](): AsyncIterator<[Uint8Array, Addr]>;
+ }
+
/** A Listener is a generic network listener for stream-oriented protocols. */
export interface Listener extends AsyncIterator<Conn> {
/** Waits for and resolves to the next connection to the `Listener`. */
@@ -1457,7 +1493,9 @@ declare namespace Deno {
transport?: Transport;
}
- /** Listen announces on the local transport address.
+ /** UNSTABLE: new API
+ *
+ * Listen announces on the local transport address.
*
* Requires the allow-net permission.
*
@@ -1476,7 +1514,13 @@ declare namespace Deno {
* listen({ hostname: "[2001:db8::1]", port: 80 });
* listen({ hostname: "golang.org", port: 80, transport: "tcp" })
*/
- export function listen(options: ListenOptions): Listener;
+ export function listen(
+ options: ListenOptions & { transport?: "tcp" }
+ ): Listener;
+ export function listen(
+ options: ListenOptions & { transport: "udp" }
+ ): UDPConn;
+ export function listen(options: ListenOptions): Listener | UDPConn;
export interface ListenTLSOptions {
port: number;
diff --git a/cli/js/net.ts b/cli/js/net.ts
index a89468f02..9d82a3a3f 100644
--- a/cli/js/net.ts
+++ b/cli/js/net.ts
@@ -4,7 +4,7 @@ import { read, write, close } from "./files.ts";
import * as dispatch from "./dispatch.ts";
import { sendSync, sendAsync } from "./dispatch_json.ts";
-export type Transport = "tcp";
+export type Transport = "tcp" | "udp";
// TODO support other types:
// export type Transport = "tcp" | "tcp4" | "tcp6" | "unix" | "unixpacket";
@@ -14,6 +14,31 @@ export interface Addr {
port: number;
}
+export interface UDPAddr {
+ transport?: Transport;
+ hostname?: string;
+ port: number;
+}
+
+/** A socket is a generic transport listener for message-oriented protocols */
+export interface UDPConn extends AsyncIterator<[Uint8Array, Addr]> {
+ /** Waits for and resolves to the next message to the `Socket`. */
+ receive(p?: Uint8Array): Promise<[Uint8Array, Addr]>;
+
+ /** Sends a message to the target. */
+ send(p: Uint8Array, addr: UDPAddr): Promise<void>;
+
+ /** Close closes the socket. Any pending message promises will be rejected
+ * with errors.
+ */
+ close(): void;
+
+ /** Return the address of the `Socket`. */
+ addr: Addr;
+
+ [Symbol.asyncIterator](): AsyncIterator<[Uint8Array, Addr]>;
+}
+
/** A Listener is a generic transport listener for stream-oriented protocols. */
export interface Listener extends AsyncIterator<Conn> {
/** Waits for and resolves to the next connection to the `Listener`. */
@@ -87,7 +112,7 @@ export class ConnImpl implements Conn {
export class ListenerImpl implements Listener {
constructor(
readonly rid: number,
- public addr: Addr,
+ readonly addr: Addr,
private closing: boolean = false
) {}
@@ -123,6 +148,63 @@ export class ListenerImpl implements Listener {
}
}
+export async function recvfrom(
+ rid: number,
+ p: Uint8Array
+): Promise<[number, Addr]> {
+ const { size, remoteAddr } = await sendAsync(dispatch.OP_RECEIVE, { rid }, p);
+ return [size, remoteAddr];
+}
+
+export class UDPConnImpl implements UDPConn {
+ constructor(
+ readonly rid: number,
+ readonly addr: Addr,
+ public bufSize: number = 1024,
+ private closing: boolean = false
+ ) {}
+
+ async receive(p?: Uint8Array): Promise<[Uint8Array, Addr]> {
+ const buf = p || new Uint8Array(this.bufSize);
+ const [size, remoteAddr] = await recvfrom(this.rid, buf);
+ const sub = buf.subarray(0, size);
+ return [sub, remoteAddr];
+ }
+
+ async send(p: Uint8Array, addr: UDPAddr): Promise<void> {
+ const remote = { hostname: "127.0.0.1", transport: "udp", ...addr };
+ if (remote.transport !== "udp") throw Error("Remote transport must be UDP");
+ const args = { ...remote, rid: this.rid };
+ await sendAsync(dispatch.OP_SEND, args, p);
+ }
+
+ close(): void {
+ this.closing = true;
+ close(this.rid);
+ }
+
+ async next(): Promise<IteratorResult<[Uint8Array, Addr]>> {
+ if (this.closing) {
+ return { value: undefined, done: true };
+ }
+ return await this.receive()
+ .then(value => ({ value, done: false }))
+ .catch(e => {
+ // It wouldn't be correct to simply check this.closing here.
+ // TODO: Get a proper error kind for this case, don't check the message.
+ // The current error kind is Other.
+ if (e.message == "Socket has been closed") {
+ return { value: undefined, done: true };
+ }
+ throw e;
+ });
+ }
+
+ [Symbol.asyncIterator](): AsyncIterator<[Uint8Array, Addr]> {
+ return this;
+ }
+}
+
export interface Conn extends Reader, Writer, Closer {
/** The local address of the connection. */
localAddr: Addr;
@@ -146,14 +228,16 @@ export interface ListenOptions {
transport?: Transport;
}
+const listenDefaults = { hostname: "0.0.0.0", transport: "tcp" };
+
/** Listen announces on the local transport address.
*
* @param options
* @param options.port The port to connect to. (Required.)
* @param options.hostname A literal IP address or host name that can be
* resolved to an IP address. If not specified, defaults to 0.0.0.0
- * @param options.transport Defaults to "tcp". Later we plan to add "tcp4",
- * "tcp6", "udp", "udp4", "udp6", "ip", "ip4", "ip6", "unix", "unixgram" and
+ * @param options.transport Must be "tcp" or "udp". Defaults to "tcp". Later we plan to add "tcp4",
+ * "tcp6", "udp4", "udp6", "ip", "ip4", "ip6", "unix", "unixgram" and
* "unixpacket".
*
* Examples:
@@ -163,16 +247,19 @@ export interface ListenOptions {
* listen({ hostname: "[2001:db8::1]", port: 80 });
* listen({ hostname: "golang.org", port: 80, transport: "tcp" })
*/
-export function listen(options: ListenOptions): Listener {
- const hostname = options.hostname || "0.0.0.0";
- const transport = options.transport || "tcp";
-
- const res = sendSync(dispatch.OP_LISTEN, {
- hostname,
- port: options.port,
- transport
- });
- return new ListenerImpl(res.rid, res.localAddr);
+export function listen(
+ options: ListenOptions & { transport?: "tcp" }
+): Listener;
+export function listen(options: ListenOptions & { transport: "udp" }): UDPConn;
+export function listen(options: ListenOptions): Listener | UDPConn {
+ const args = { ...listenDefaults, ...options };
+ const res = sendSync(dispatch.OP_LISTEN, args);
+
+ if (args.transport === "tcp") {
+ return new ListenerImpl(res.rid, res.localAddr);
+ } else {
+ return new UDPConnImpl(res.rid, res.localAddr);
+ }
}
export interface ConnectOptions {
@@ -189,8 +276,8 @@ const connectDefaults = { hostname: "127.0.0.1", transport: "tcp" };
* @param options.port The port to connect to. (Required.)
* @param options.hostname A literal IP address or host name that can be
* resolved to an IP address. If not specified, defaults to 127.0.0.1
- * @param options.transport Defaults to "tcp". Later we plan to add "tcp4",
- * "tcp6", "udp", "udp4", "udp6", "ip", "ip4", "ip6", "unix", "unixgram" and
+ * @param options.transport Must be "tcp" or "udp". Defaults to "tcp". Later we plan to add "tcp4",
+ * "tcp6", "udp4", "udp6", "ip", "ip4", "ip6", "unix", "unixgram" and
* "unixpacket".
*
* Examples:
diff --git a/cli/js/net_test.ts b/cli/js/net_test.ts
index a2f086f0a..75bce2e52 100644
--- a/cli/js/net_test.ts
+++ b/cli/js/net_test.ts
@@ -1,7 +1,7 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
import { testPerm, assert, assertEquals } from "./test_util.ts";
-testPerm({ net: true }, function netListenClose(): void {
+testPerm({ net: true }, function netTcpListenClose(): void {
const listener = Deno.listen({ hostname: "127.0.0.1", port: 4500 });
assertEquals(listener.addr.transport, "tcp");
assertEquals(listener.addr.hostname, "127.0.0.1");
@@ -9,7 +9,21 @@ testPerm({ net: true }, function netListenClose(): void {
listener.close();
});
-testPerm({ net: true }, async function netCloseWhileAccept(): Promise<void> {
+testPerm({ net: true }, function netUdpListenClose(): void {
+ if (Deno.build.os === "win") return; // TODO
+
+ const socket = Deno.listen({
+ hostname: "127.0.0.1",
+ port: 4500,
+ transport: "udp"
+ });
+ assertEquals(socket.addr.transport, "udp");
+ assertEquals(socket.addr.hostname, "127.0.0.1");
+ assertEquals(socket.addr.port, 4500);
+ socket.close();
+});
+
+testPerm({ net: true }, async function netTcpCloseWhileAccept(): Promise<void> {
const listener = Deno.listen({ port: 4501 });
const p = listener.accept();
listener.close();
@@ -24,7 +38,7 @@ testPerm({ net: true }, async function netCloseWhileAccept(): Promise<void> {
assertEquals(err.message, "Listener has been closed");
});
-testPerm({ net: true }, async function netConcurrentAccept(): Promise<void> {
+testPerm({ net: true }, async function netTcpConcurrentAccept(): Promise<void> {
const listener = Deno.listen({ port: 4502 });
let acceptErrCount = 0;
const checkErr = (e: Error): void => {
@@ -44,7 +58,7 @@ testPerm({ net: true }, async function netConcurrentAccept(): Promise<void> {
assertEquals(acceptErrCount, 1);
});
-testPerm({ net: true }, async function netDialListen(): Promise<void> {
+testPerm({ net: true }, async function netTcpDialListen(): Promise<void> {
const listener = Deno.listen({ port: 4500 });
listener.accept().then(
async (conn): Promise<void> => {
@@ -76,18 +90,58 @@ testPerm({ net: true }, async function netDialListen(): Promise<void> {
conn.close();
});
-testPerm({ net: true }, async function netListenCloseWhileIterating(): Promise<
- void
-> {
- const listener = Deno.listen({ port: 8000 });
- const nextWhileClosing = listener[Symbol.asyncIterator]().next();
- listener.close();
- assertEquals(await nextWhileClosing, { value: undefined, done: true });
+testPerm({ net: true }, async function netUdpSendReceive(): Promise<void> {
+ if (Deno.build.os === "win") return; // TODO
+
+ const alice = Deno.listen({ port: 4500, transport: "udp" });
+ assertEquals(alice.addr.port, 4500);
+ assertEquals(alice.addr.hostname, "0.0.0.0");
+ assertEquals(alice.addr.transport, "udp");
+
+ const bob = Deno.listen({ port: 4501, transport: "udp" });
+ assertEquals(bob.addr.port, 4501);
+ assertEquals(bob.addr.hostname, "0.0.0.0");
+ assertEquals(bob.addr.transport, "udp");
+
+ const sent = new Uint8Array([1, 2, 3]);
+ await alice.send(sent, bob.addr);
- const nextAfterClosing = listener[Symbol.asyncIterator]().next();
- assertEquals(await nextAfterClosing, { value: undefined, done: true });
+ const [recvd, remote] = await bob.receive();
+ assertEquals(remote.port, 4500);
+ assertEquals(recvd.length, 3);
+ assertEquals(1, recvd[0]);
+ assertEquals(2, recvd[1]);
+ assertEquals(3, recvd[2]);
});
+testPerm(
+ { net: true },
+ async function netTcpListenCloseWhileIterating(): Promise<void> {
+ const listener = Deno.listen({ port: 8000 });
+ const nextWhileClosing = listener[Symbol.asyncIterator]().next();
+ listener.close();
+ assertEquals(await nextWhileClosing, { value: undefined, done: true });
+
+ const nextAfterClosing = listener[Symbol.asyncIterator]().next();
+ assertEquals(await nextAfterClosing, { value: undefined, done: true });
+ }
+);
+
+testPerm(
+ { net: true },
+ async function netUdpListenCloseWhileIterating(): Promise<void> {
+ if (Deno.build.os === "win") return; // TODO
+
+ const socket = Deno.listen({ port: 8000, transport: "udp" });
+ const nextWhileClosing = socket[Symbol.asyncIterator]().next();
+ socket.close();
+ assertEquals(await nextWhileClosing, { value: undefined, done: true });
+
+ const nextAfterClosing = socket[Symbol.asyncIterator]().next();
+ assertEquals(await nextAfterClosing, { value: undefined, done: true });
+ }
+);
+
/* TODO(ry) Re-enable this test.
testPerm({ net: true }, async function netListenAsyncIterator(): Promise<void> {
const listener = Deno.listen(":4500");
diff --git a/cli/ops/net.rs b/cli/ops/net.rs
index 569aebca0..c8fd5d398 100644
--- a/cli/ops/net.rs
+++ b/cli/ops/net.rs
@@ -18,12 +18,15 @@ use std::task::Poll;
use tokio;
use tokio::net::TcpListener;
use tokio::net::TcpStream;
+use tokio::net::UdpSocket;
pub fn init(i: &mut Isolate, s: &State) {
i.register_op("accept", s.core_op(json_op(s.stateful_op(op_accept))));
i.register_op("connect", s.core_op(json_op(s.stateful_op(op_connect))));
i.register_op("shutdown", s.core_op(json_op(s.stateful_op(op_shutdown))));
i.register_op("listen", s.core_op(json_op(s.stateful_op(op_listen))));
+ i.register_op("receive", s.core_op(json_op(s.stateful_op(op_receive))));
+ i.register_op("send", s.core_op(json_op(s.stateful_op(op_send))));
}
#[derive(Debug, PartialEq)]
@@ -137,6 +140,121 @@ fn op_accept(
Ok(JsonOp::Async(op.boxed_local()))
}
+pub struct Receive<'a> {
+ state: &'a State,
+ rid: ResourceId,
+ buf: ZeroCopyBuf,
+}
+
+impl Future for Receive<'_> {
+ type Output = Result<(usize, SocketAddr), ErrBox>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
+ let inner = self.get_mut();
+ let mut state = inner.state.borrow_mut();
+ let resource = state
+ .resource_table
+ .get_mut::<UdpSocketResource>(inner.rid)
+ .ok_or_else(|| {
+ let e = std::io::Error::new(
+ std::io::ErrorKind::Other,
+ "Socket has been closed",
+ );
+ ErrBox::from(e)
+ })?;
+
+ let socket = &mut resource.socket;
+
+ socket
+ .poll_recv_from(cx, &mut inner.buf)
+ .map_err(ErrBox::from)
+ }
+}
+
+#[derive(Deserialize)]
+struct ReceiveArgs {
+ rid: i32,
+}
+
+fn receive(state: &State, rid: ResourceId, buf: ZeroCopyBuf) -> Receive {
+ Receive { state, rid, buf }
+}
+
+fn op_receive(
+ state: &State,
+ args: Value,
+ zero_copy: Option<ZeroCopyBuf>,
+) -> Result<JsonOp, ErrBox> {
+ assert!(zero_copy.is_some());
+ let buf = zero_copy.unwrap();
+
+ let args: ReceiveArgs = serde_json::from_value(args)?;
+ let rid = args.rid as u32;
+
+ let state_ = state.clone();
+
+ let op = async move {
+ let (size, remote_addr) = receive(&state_, rid, buf).await?;
+
+ Ok(json!({
+ "size": size,
+ "remoteAddr": {
+ "hostname": remote_addr.ip().to_string(),
+ "port": remote_addr.port(),
+ "transport": "udp",
+ }
+ }))
+ };
+
+ Ok(JsonOp::Async(op.boxed_local()))
+}
+
+#[derive(Deserialize)]
+struct SendArgs {
+ rid: i32,
+ hostname: String,
+ port: u16,
+ transport: String,
+}
+
+fn op_send(
+ state: &State,
+ args: Value,
+ zero_copy: Option<ZeroCopyBuf>,
+) -> Result<JsonOp, ErrBox> {
+ assert!(zero_copy.is_some());
+ let buf = zero_copy.unwrap();
+
+ let args: SendArgs = serde_json::from_value(args)?;
+ assert_eq!(args.transport, "udp");
+ let rid = args.rid as u32;
+
+ let state_ = state.clone();
+ state.check_net(&args.hostname, args.port)?;
+
+ let op = async move {
+ let mut state = state_.borrow_mut();
+ let resource = state
+ .resource_table
+ .get_mut::<UdpSocketResource>(rid)
+ .ok_or_else(|| {
+ let e = std::io::Error::new(
+ std::io::ErrorKind::Other,
+ "Socket has been closed",
+ );
+ ErrBox::from(e)
+ })?;
+
+ let socket = &mut resource.socket;
+ let addr = resolve_addr(&args.hostname, args.port).await?;
+ socket.send_to(&buf, addr).await?;
+
+ Ok(json!({}))
+ };
+
+ Ok(JsonOp::Async(op.boxed_local()))
+}
+
#[derive(Deserialize)]
struct ConnectArgs {
transport: String,
@@ -278,29 +396,63 @@ impl TcpListenerResource {
}
}
+struct UdpSocketResource {
+ socket: UdpSocket,
+}
+
+fn listen_tcp(
+ state: &State,
+ addr: SocketAddr,
+) -> Result<(u32, SocketAddr), ErrBox> {
+ let mut state = state.borrow_mut();
+ let listener = futures::executor::block_on(TcpListener::bind(&addr))?;
+ let local_addr = listener.local_addr()?;
+ let listener_resource = TcpListenerResource {
+ listener,
+ waker: None,
+ local_addr,
+ };
+ let rid = state
+ .resource_table
+ .add("tcpListener", Box::new(listener_resource));
+
+ Ok((rid, local_addr))
+}
+
+fn listen_udp(
+ state: &State,
+ addr: SocketAddr,
+) -> Result<(u32, SocketAddr), ErrBox> {
+ let mut state = state.borrow_mut();
+ let socket = futures::executor::block_on(UdpSocket::bind(&addr))?;
+ let local_addr = socket.local_addr()?;
+ let socket_resource = UdpSocketResource { socket };
+ let rid = state
+ .resource_table
+ .add("udpSocket", Box::new(socket_resource));
+
+ Ok((rid, local_addr))
+}
+
fn op_listen(
state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> {
let args: ListenArgs = serde_json::from_value(args)?;
- assert_eq!(args.transport, "tcp");
+ assert!(args.transport == "tcp" || args.transport == "udp");
state.check_net(&args.hostname, args.port)?;
let addr =
futures::executor::block_on(resolve_addr(&args.hostname, args.port))?;
- let listener = futures::executor::block_on(TcpListener::bind(&addr))?;
- let local_addr = listener.local_addr()?;
- let listener_resource = TcpListenerResource {
- listener,
- waker: None,
- local_addr,
+
+ let (rid, local_addr) = if args.transport == "tcp" {
+ listen_tcp(state, addr)?
+ } else {
+ listen_udp(state, addr)?
};
- let mut state = state.borrow_mut();
- let rid = state
- .resource_table
- .add("tcpListener", Box::new(listener_resource));
+
debug!(
"New listener {} {}:{}",
rid,
diff --git a/tools/http_benchmark.py b/tools/http_benchmark.py
index 05cd542e3..64abbf8ba 100755
--- a/tools/http_benchmark.py
+++ b/tools/http_benchmark.py
@@ -138,7 +138,7 @@ def http_benchmark(build_dir):
return {
# "deno_tcp" was once called "deno"
"deno_tcp": deno_tcp(deno_exe),
- # "deno_http" was once called "deno_net_http"
+ # "deno_udp": deno_udp(deno_exe),
"deno_http": deno_http(deno_exe),
"deno_proxy": deno_http_proxy(deno_exe, hyper_hello_exe),
"deno_proxy_tcp": deno_tcp_proxy(deno_exe, hyper_hello_exe),