diff options
author | João Souto <joao.jpgs@hotmail.com> | 2020-03-23 22:02:51 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-03-23 18:02:51 -0400 |
commit | 70a50344315a4c3361fc321e78e076fb09a502b3 (patch) | |
tree | 1079d325ec898afb7829ac1888ed395ed2ac35d2 | |
parent | b924e5ab7e69eab4d3b6d9a863a8fc2974f33b5d (diff) |
feat: Support Unix Domain Sockets (#4176)
-rw-r--r-- | cli/Cargo.toml | 2 | ||||
-rw-r--r-- | cli/js/deno.ts | 5 | ||||
-rw-r--r-- | cli/js/lib.deno.ns.d.ts | 88 | ||||
-rw-r--r-- | cli/js/net.ts | 117 | ||||
-rw-r--r-- | cli/js/ops/net.ts | 80 | ||||
-rw-r--r-- | cli/js/ops/tls.ts | 15 | ||||
-rw-r--r-- | cli/js/tests/net_test.ts | 181 | ||||
-rw-r--r-- | cli/js/tls.ts | 6 | ||||
-rw-r--r-- | cli/ops/io.rs | 8 | ||||
-rw-r--r-- | cli/ops/mod.rs | 2 | ||||
-rw-r--r-- | cli/ops/net.rs | 361 | ||||
-rw-r--r-- | cli/ops/net_unix.rs | 142 | ||||
-rw-r--r-- | tools/deno_tcp.ts | 1 |
13 files changed, 740 insertions, 268 deletions
diff --git a/cli/Cargo.toml b/cli/Cargo.toml index dac161731..c7a2fbf75 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -54,7 +54,7 @@ sys-info = "=0.5.8" # 0.5.9 and 0.5.10 are broken on windows. sourcemap = "5.0.0" tempfile = "3.1.0" termcolor = "1.1.0" -tokio = { version = "0.2.13", features = ["rt-core", "tcp", "udp", "process", "fs", "blocking", "sync", "io-std", "macros", "time"] } +tokio = { version = "0.2.13", features = ["rt-core", "tcp", "udp", "uds", "process", "fs", "blocking", "sync", "io-std", "macros", "time"] } tokio-rustls = "0.13.0" url = "2.1.1" utime = "0.2.1" diff --git a/cli/js/deno.ts b/cli/js/deno.ts index 6a493faf8..f42e39754 100644 --- a/cli/js/deno.ts +++ b/cli/js/deno.ts @@ -70,12 +70,9 @@ export { export { metrics, Metrics } from "./ops/runtime.ts"; export { mkdirSync, mkdir, MkdirOptions } from "./ops/fs/mkdir.ts"; export { - Addr, connect, listen, - recvfrom, - UDPConn, - UDPAddr, + DatagramConn, Listener, Conn, ShutdownMode, diff --git a/cli/js/lib.deno.ns.d.ts b/cli/js/lib.deno.ns.d.ts index 99e40014c..72794e3bf 100644 --- a/cli/js/lib.deno.ns.d.ts +++ b/cli/js/lib.deno.ns.d.ts @@ -1546,21 +1546,18 @@ declare namespace Deno { * * Requires `allow-plugin` permission. */ export function openPlugin(filename: string): Plugin; - - export type Transport = "tcp" | "udp"; - - export interface Addr { - transport: Transport; + export interface NetAddr { + transport: "tcp" | "udp"; hostname: string; port: number; } - export interface UDPAddr { - port: number; - transport?: Transport; - hostname?: string; + export interface UnixAddr { + transport: "unix" | "unixpacket"; + address: string; } + export type Addr = NetAddr | UnixAddr; /** **UNSTABLE**: Maybe remove `ShutdownMode` entirely. * * Corresponds to `SHUT_RD`, `SHUT_WR`, `SHUT_RDWR` on POSIX-like systems. @@ -1587,16 +1584,8 @@ declare namespace Deno { /** **UNSTABLE**: new API, yet to be vetted. * - * Waits for the next message to the passed `rid` and writes it on the passed - * `Uint8Array`. - * - * Resolves to the number of bytes written and the remote address. */ - export function recvfrom(rid: number, p: Uint8Array): Promise<[number, Addr]>; - - /** **UNSTABLE**: new API, yet to be vetted. - * * A generic transport listener for message-oriented protocols. */ - export interface UDPConn extends AsyncIterable<[Uint8Array, Addr]> { + export interface DatagramConn extends AsyncIterable<[Uint8Array, Addr]> { /** **UNSTABLE**: new API, yet to be vetted. * * Waits for and resolves to the next message to the `UDPConn`. */ @@ -1604,7 +1593,7 @@ declare namespace Deno { /** UNSTABLE: new API, yet to be vetted. * * Sends a message to the target. */ - send(p: Uint8Array, addr: UDPAddr): Promise<void>; + send(p: Uint8Array, addr: Addr): Promise<void>; /** UNSTABLE: new API, yet to be vetted. * * Close closes the socket. Any pending message promises will be rejected @@ -1624,6 +1613,7 @@ declare namespace Deno { close(): void; /** Return the address of the `Listener`. */ readonly addr: Addr; + [Symbol.asyncIterator](): AsyncIterator<Conn>; } @@ -1648,13 +1638,12 @@ declare namespace Deno { /** A literal IP address or host name that can be resolved to an IP address. * If not specified, defaults to `0.0.0.0`. */ hostname?: string; - /** Either `"tcp"` or `"udp"`. Defaults to `"tcp"`. - * - * In the future: `"tcp4"`, `"tcp6"`, `"udp4"`, `"udp6"`, `"ip"`, `"ip4"`, - * `"ip6"`, `"unix"`, `"unixgram"`, and `"unixpacket"`. */ - transport?: Transport; } + export interface UnixListenOptions { + /** A Path to the Unix Socket. */ + address: string; + } /** **UNSTABLE**: new API * * Listen announces on the local transport address. @@ -1672,32 +1661,41 @@ declare namespace Deno { * * Listen announces on the local transport address. * - * Deno.listen({ port: 80 }) - * Deno.listen({ hostname: "192.0.2.1", port: 80 }) - * Deno.listen({ hostname: "[2001:db8::1]", port: 80 }); - * Deno.listen({ hostname: "golang.org", port: 80, transport: "tcp" }); + * Deno.listen({ address: "/foo/bar.sock", transport: "unix" }) + * + * Requires `allow-read` permission. */ + export function listen( + options: UnixListenOptions & { transport: "unix" } + ): Listener; + /** **UNSTABLE**: new API + * + * Listen announces on the local transport address. + * + * Deno.listen({ port: 80, transport: "udp" }) + * Deno.listen({ hostname: "golang.org", port: 80, transport: "udp" }); * * Requires `allow-net` permission. */ export function listen( options: ListenOptions & { transport: "udp" } - ): UDPConn; + ): DatagramConn; /** **UNSTABLE**: new API * * Listen announces on the local transport address. * - * Deno.listen({ port: 80 }) - * Deno.listen({ hostname: "192.0.2.1", port: 80 }) - * Deno.listen({ hostname: "[2001:db8::1]", port: 80 }); - * Deno.listen({ hostname: "golang.org", port: 80, transport: "tcp" }); + * Deno.listen({ address: "/foo/bar.sock", transport: "unixpacket" }) * - * Requires `allow-net` permission. */ - export function listen(options: ListenOptions): Listener | UDPConn; + * Requires `allow-read` permission. */ + export function listen( + options: UnixListenOptions & { transport: "unixpacket" } + ): DatagramConn; export interface ListenTLSOptions extends ListenOptions { /** Server certificate file. */ certFile: string; /** Server public key file. */ keyFile: string; + + transport?: "tcp"; } /** Listen announces on the local transport address over TLS (transport layer @@ -1714,11 +1712,12 @@ declare namespace Deno { /** A literal IP address or host name that can be resolved to an IP address. * If not specified, defaults to `127.0.0.1`. */ hostname?: string; - /** Either `"tcp"` or `"udp"`. Defaults to `"tcp"`. - * - * In the future: `"tcp4"`, `"tcp6"`, `"udp4"`, `"udp6"`, `"ip"`, `"ip4"`, - * `"ip6"`, `"unix"`, `"unixgram"`, and `"unixpacket"`. */ - transport?: Transport; + transport?: "tcp"; + } + + export interface UnixConnectOptions { + transport: "unix"; + address: string; } /** @@ -1728,10 +1727,13 @@ declare namespace Deno { * const conn1 = await Deno.connect({ port: 80 }) * const conn2 = await Deno.connect({ hostname: "192.0.2.1", port: 80 }) * const conn3 = await Deno.connect({ hostname: "[2001:db8::1]", port: 80 }); - * const conn4 = await Deno.connect({ hostname: "golang.org", port: 80, transport: "tcp" }) + * const conn4 = await Deno.connect({ hostname: "golang.org", port: 80, transport: "tcp" }); + * const conn5 = await Deno.connect({ address: "/foo/bar.sock", transport: "unix" }); * - * Requires `allow-net` permission. */ - export function connect(options: ConnectOptions): Promise<Conn>; + * Requires `allow-net` permission for "tcp" and `allow-read` for unix. */ + export function connect( + options: ConnectOptions | UnixConnectOptions + ): Promise<Conn>; export interface ConnectTLSOptions { /** The port to connect to. */ diff --git a/cli/js/net.ts b/cli/js/net.ts index 570bada49..9be97dc2e 100644 --- a/cli/js/net.ts +++ b/cli/js/net.ts @@ -4,25 +4,13 @@ import { EOF, Reader, Writer, Closer } from "./io.ts"; import { read, write } from "./ops/io.ts"; import { close } from "./ops/resources.ts"; import * as netOps from "./ops/net.ts"; -import { Transport } from "./ops/net.ts"; -export { ShutdownMode, shutdown, Transport } from "./ops/net.ts"; +import { Addr } from "./ops/net.ts"; +export { ShutdownMode, shutdown, NetAddr, UnixAddr } from "./ops/net.ts"; -export interface Addr { - transport: Transport; - hostname: string; - port: number; -} - -export interface UDPAddr { - transport?: Transport; - hostname?: string; - port: number; -} - -export interface UDPConn extends AsyncIterable<[Uint8Array, Addr]> { +export interface DatagramConn extends AsyncIterable<[Uint8Array, Addr]> { receive(p?: Uint8Array): Promise<[Uint8Array, Addr]>; - send(p: Uint8Array, addr: UDPAddr): Promise<void>; + send(p: Uint8Array, addr: Addr): Promise<void>; close(): void; @@ -73,7 +61,7 @@ export class ListenerImpl implements Listener { constructor(readonly rid: number, readonly addr: Addr) {} async accept(): Promise<Conn> { - const res = await netOps.accept(this.rid); + const res = await netOps.accept(this.rid, this.addr.transport); return new ConnImpl(res.rid, res.remoteAddr, res.localAddr); } @@ -95,15 +83,7 @@ export class ListenerImpl implements Listener { } } -export async function recvfrom( - rid: number, - p: Uint8Array -): Promise<[number, Addr]> { - const { size, remoteAddr } = await netOps.receive(rid, p); - return [size, remoteAddr]; -} - -export class UDPConnImpl implements UDPConn { +export class DatagramImpl implements DatagramConn { constructor( readonly rid: number, readonly addr: Addr, @@ -112,14 +92,18 @@ export class UDPConnImpl implements UDPConn { async receive(p?: Uint8Array): Promise<[Uint8Array, Addr]> { const buf = p || new Uint8Array(this.bufSize); - const [size, remoteAddr] = await recvfrom(this.rid, buf); + const { size, remoteAddr } = await netOps.receive( + this.rid, + this.addr.transport, + buf + ); const sub = buf.subarray(0, size); return [sub, remoteAddr]; } - async send(p: Uint8Array, addr: UDPAddr): Promise<void> { + async send(p: Uint8Array, addr: Addr): Promise<void> { const remote = { hostname: "127.0.0.1", transport: "udp", ...addr }; - if (remote.transport !== "udp") throw Error("Remote transport must be UDP"); + const args = { ...remote, rid: this.rid }; await netOps.send(args as netOps.SendRequest, p); } @@ -153,38 +137,77 @@ export interface Conn extends Reader, Writer, Closer { export interface ListenOptions { port: number; hostname?: string; - transport?: Transport; + transport?: "tcp" | "udp"; +} + +export interface UnixListenOptions { + transport: "unix" | "unixpacket"; + address: string; } export function listen( options: ListenOptions & { transport?: "tcp" } ): Listener; -export function listen(options: ListenOptions & { transport: "udp" }): UDPConn; -export function listen({ - port, - hostname = "0.0.0.0", - transport = "tcp" -}: ListenOptions): Listener | UDPConn { - const res = netOps.listen({ port, hostname, transport }); - - if (transport === "tcp") { +export function listen( + options: UnixListenOptions & { transport: "unix" } +): Listener; +export function listen( + options: ListenOptions & { transport: "udp" } +): DatagramConn; +export function listen( + options: UnixListenOptions & { transport: "unixpacket" } +): DatagramConn; +export function listen( + options: ListenOptions | UnixListenOptions +): Listener | DatagramConn { + let res; + + if (options.transport === "unix" || options.transport === "unixpacket") { + res = netOps.listen(options); + } else { + res = netOps.listen({ + transport: "tcp", + hostname: "127.0.0.1", + ...(options as ListenOptions) + }); + } + + if ( + !options.transport || + options.transport === "tcp" || + options.transport === "unix" + ) { return new ListenerImpl(res.rid, res.localAddr); } else { - return new UDPConnImpl(res.rid, res.localAddr); + return new DatagramImpl(res.rid, res.localAddr); } } export interface ConnectOptions { port: number; hostname?: string; - transport?: Transport; + transport?: "tcp"; } +export interface UnixConnectOptions { + transport: "unix"; + address: string; +} +export async function connect(options: UnixConnectOptions): Promise<Conn>; +export async function connect(options: ConnectOptions): Promise<Conn>; +export async function connect( + options: ConnectOptions | UnixConnectOptions +): Promise<Conn> { + let res; + + if (options.transport === "unix") { + res = await netOps.connect(options); + } else { + res = await netOps.connect({ + transport: "tcp", + hostname: "127.0.0.1", + ...options + }); + } -export async function connect({ - port, - hostname = "127.0.0.1", - transport = "tcp" -}: ConnectOptions): Promise<Conn> { - const res = await netOps.connect({ port, hostname, transport }); return new ConnImpl(res.rid, res.remoteAddr!, res.localAddr!); } diff --git a/cli/js/ops/net.ts b/cli/js/ops/net.ts index 25f3a8322..7734e8811 100644 --- a/cli/js/ops/net.ts +++ b/cli/js/ops/net.ts @@ -1,9 +1,18 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. import { sendSync, sendAsync } from "./dispatch_json.ts"; -export type Transport = "tcp" | "udp"; -// TODO support other types: -// export type Transport = "tcp" | "tcp4" | "tcp6" | "unix" | "unixpacket"; +export interface NetAddr { + transport: "tcp" | "udp"; + hostname: string; + port: number; +} + +export interface UnixAddr { + transport: "unix" | "unixpacket"; + address: string; +} + +export type Addr = NetAddr | UnixAddr; export enum ShutdownMode { // See http://man7.org/linux/man-pages/man2/shutdown.2.html @@ -19,35 +28,22 @@ export function shutdown(rid: number, how: ShutdownMode): void { interface AcceptResponse { rid: number; - localAddr: { - hostname: string; - port: number; - transport: Transport; - }; - remoteAddr: { - hostname: string; - port: number; - transport: Transport; - }; + localAddr: Addr; + remoteAddr: Addr; } -export function accept(rid: number): Promise<AcceptResponse> { - return sendAsync("op_accept", { rid }); +export function accept( + rid: number, + transport: string +): Promise<AcceptResponse> { + return sendAsync("op_accept", { rid, transport }); } -export interface ListenRequest { - transport: Transport; - hostname: string; - port: number; -} +export type ListenRequest = Addr; interface ListenResponse { rid: number; - localAddr: { - hostname: string; - port: number; - transport: Transport; - }; + localAddr: Addr; } export function listen(args: ListenRequest): ListenResponse { @@ -56,23 +52,11 @@ export function listen(args: ListenRequest): ListenResponse { interface ConnectResponse { rid: number; - localAddr: { - hostname: string; - port: number; - transport: Transport; - }; - remoteAddr: { - hostname: string; - port: number; - transport: Transport; - }; + localAddr: Addr; + remoteAddr: Addr; } -export interface ConnectRequest { - transport: Transport; - hostname: string; - port: number; -} +export type ConnectRequest = Addr; export function connect(args: ConnectRequest): Promise<ConnectResponse> { return sendAsync("op_connect", args); @@ -80,26 +64,20 @@ export function connect(args: ConnectRequest): Promise<ConnectResponse> { interface ReceiveResponse { size: number; - remoteAddr: { - hostname: string; - port: number; - transport: Transport; - }; + remoteAddr: Addr; } export function receive( rid: number, + transport: string, zeroCopy: Uint8Array ): Promise<ReceiveResponse> { - return sendAsync("op_receive", { rid }, zeroCopy); + return sendAsync("op_receive", { rid, transport }, zeroCopy); } -export interface SendRequest { +export type SendRequest = { rid: number; - hostname: string; - port: number; - transport: Transport; -} +} & Addr; export async function send( args: SendRequest, diff --git a/cli/js/ops/tls.ts b/cli/js/ops/tls.ts index e52143cb0..234e569dd 100644 --- a/cli/js/ops/tls.ts +++ b/cli/js/ops/tls.ts @@ -1,9 +1,8 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. import { sendAsync, sendSync } from "./dispatch_json.ts"; -import { Transport } from "./net.ts"; export interface ConnectTLSRequest { - transport: Transport; + transport: "tcp"; hostname: string; port: number; certFile?: string; @@ -14,12 +13,12 @@ interface ConnectTLSResponse { localAddr: { hostname: string; port: number; - transport: Transport; + transport: "tcp"; }; remoteAddr: { hostname: string; port: number; - transport: Transport; + transport: "tcp"; }; } @@ -34,12 +33,12 @@ interface AcceptTLSResponse { localAddr: { hostname: string; port: number; - transport: Transport; + transport: "tcp"; }; remoteAddr: { hostname: string; port: number; - transport: Transport; + transport: "tcp"; }; } @@ -50,7 +49,7 @@ export function acceptTLS(rid: number): Promise<AcceptTLSResponse> { export interface ListenTLSRequest { port: number; hostname: string; - transport: Transport; + transport: "tcp"; certFile: string; keyFile: string; } @@ -60,7 +59,7 @@ interface ListenTLSResponse { localAddr: { hostname: string; port: number; - transport: Transport; + transport: "tcp"; }; } diff --git a/cli/js/tests/net_test.ts b/cli/js/tests/net_test.ts index f62c5c329..788203b5a 100644 --- a/cli/js/tests/net_test.ts +++ b/cli/js/tests/net_test.ts @@ -10,7 +10,7 @@ import { unitTest({ perms: { net: true } }, function netTcpListenClose(): void { const port = randomPort(); const listener = Deno.listen({ hostname: "127.0.0.1", port }); - assertEquals(listener.addr.transport, "tcp"); + assert(listener.addr.transport === "tcp"); assertEquals(listener.addr.hostname, "127.0.0.1"); assertEquals(listener.addr.port, port); listener.close(); @@ -29,7 +29,7 @@ unitTest( port, transport: "udp" }); - assertEquals(socket.addr.transport, "udp"); + assert(socket.addr.transport === "udp"); assertEquals(socket.addr.hostname, "127.0.0.1"); assertEquals(socket.addr.port, port); socket.close(); @@ -37,6 +37,34 @@ unitTest( ); unitTest( + { ignore: Deno.build.os === "win", perms: { read: true, write: true } }, + function netUnixListenClose(): void { + const filePath = Deno.makeTempFileSync(); + const socket = Deno.listen({ + address: filePath, + transport: "unix" + }); + assert(socket.addr.transport === "unix"); + assertEquals(socket.addr.address, filePath); + socket.close(); + } +); + +unitTest( + { ignore: Deno.build.os === "win", perms: { read: true, write: true } }, + function netUnixPacketListenClose(): void { + const filePath = Deno.makeTempFileSync(); + const socket = Deno.listen({ + address: filePath, + transport: "unixpacket" + }); + assert(socket.addr.transport === "unixpacket"); + assertEquals(socket.addr.address, filePath); + socket.close(); + } +); + +unitTest( { perms: { net: true } }, @@ -58,6 +86,28 @@ unitTest( ); unitTest( + { ignore: Deno.build.os === "win", perms: { read: true, write: true } }, + async function netUnixCloseWhileAccept(): Promise<void> { + const filePath = await Deno.makeTempFile(); + const listener = Deno.listen({ + address: filePath, + transport: "unix" + }); + const p = listener.accept(); + listener.close(); + let err; + try { + await p; + } catch (e) { + err = e; + } + assert(!!err); + assert(err instanceof Error); + assertEquals(err.message, "Listener has been closed"); + } +); + +unitTest( { perms: { net: true } }, async function netTcpConcurrentAccept(): Promise<void> { const port = randomPort(); @@ -81,6 +131,31 @@ unitTest( } ); +// TODO(jsouto): Enable when tokio updates mio to v0.7! +unitTest( + { ignore: true, perms: { read: true, write: true } }, + async function netUnixConcurrentAccept(): Promise<void> { + const filePath = await Deno.makeTempFile(); + const listener = Deno.listen({ transport: "unix", address: filePath }); + let acceptErrCount = 0; + const checkErr = (e: Error): void => { + if (e.message === "Listener has been closed") { + assertEquals(acceptErrCount, 1); + } else if (e.message === "Another accept task is ongoing") { + acceptErrCount++; + } else { + throw new Error("Unexpected error message"); + } + }; + const p = listener.accept().catch(checkErr); + const p1 = listener.accept().catch(checkErr); + await Promise.race([p, p1]); + listener.close(); + await [p, p1]; + assertEquals(acceptErrCount, 1); + } +); + unitTest({ perms: { net: true } }, async function netTcpDialListen(): Promise< void > { @@ -89,6 +164,7 @@ unitTest({ perms: { net: true } }, async function netTcpDialListen(): Promise< listener.accept().then( async (conn): Promise<void> => { assert(conn.remoteAddr != null); + assert(conn.localAddr.transport === "tcp"); assertEquals(conn.localAddr.hostname, "127.0.0.1"); assertEquals(conn.localAddr.port, port); await conn.write(new Uint8Array([1, 2, 3])); @@ -96,6 +172,7 @@ unitTest({ perms: { net: true } }, async function netTcpDialListen(): Promise< } ); const conn = await Deno.connect({ hostname: "127.0.0.1", port }); + assert(conn.remoteAddr.transport === "tcp"); assertEquals(conn.remoteAddr.hostname, "127.0.0.1"); assertEquals(conn.remoteAddr.port, port); assert(conn.localAddr != null); @@ -117,24 +194,61 @@ unitTest({ perms: { net: true } }, async function netTcpDialListen(): Promise< }); unitTest( + { ignore: Deno.build.os === "win", perms: { read: true, write: true } }, + async function netUnixDialListen(): Promise<void> { + const filePath = await Deno.makeTempFile(); + const listener = Deno.listen({ address: filePath, transport: "unix" }); + listener.accept().then( + async (conn): Promise<void> => { + assert(conn.remoteAddr != null); + assert(conn.localAddr.transport === "unix"); + assertEquals(conn.localAddr.address, filePath); + await conn.write(new Uint8Array([1, 2, 3])); + conn.close(); + } + ); + const conn = await Deno.connect({ address: filePath, transport: "unix" }); + assert(conn.remoteAddr.transport === "unix"); + assertEquals(conn.remoteAddr.address, filePath); + assert(conn.remoteAddr != null); + const buf = new Uint8Array(1024); + const readResult = await conn.read(buf); + assertEquals(3, readResult); + assertEquals(1, buf[0]); + assertEquals(2, buf[1]); + assertEquals(3, buf[2]); + assert(conn.rid > 0); + + assert(readResult !== Deno.EOF); + + const readResult2 = await conn.read(buf); + assertEquals(Deno.EOF, readResult2); + + listener.close(); + conn.close(); + } +); + +unitTest( { ignore: Deno.build.os === "win", perms: { net: true } }, async function netUdpSendReceive(): Promise<void> { const alicePort = randomPort(); const alice = Deno.listen({ port: alicePort, transport: "udp" }); + assert(alice.addr.transport === "udp"); assertEquals(alice.addr.port, alicePort); - assertEquals(alice.addr.hostname, "0.0.0.0"); - assertEquals(alice.addr.transport, "udp"); + assertEquals(alice.addr.hostname, "127.0.0.1"); const bobPort = randomPort(); const bob = Deno.listen({ port: bobPort, transport: "udp" }); + assert(bob.addr.transport === "udp"); assertEquals(bob.addr.port, bobPort); - assertEquals(bob.addr.hostname, "0.0.0.0"); - assertEquals(bob.addr.transport, "udp"); + assertEquals(bob.addr.hostname, "127.0.0.1"); const sent = new Uint8Array([1, 2, 3]); await alice.send(sent, bob.addr); const [recvd, remote] = await bob.receive(); + assert(remote.transport === "udp"); assertEquals(remote.port, alicePort); assertEquals(recvd.length, 3); assertEquals(1, recvd[0]); @@ -146,6 +260,33 @@ unitTest( ); unitTest( + { ignore: Deno.build.os === "win", perms: { read: true, write: true } }, + async function netUnixPacketSendReceive(): Promise<void> { + const filePath = await Deno.makeTempFile(); + const alice = Deno.listen({ address: filePath, transport: "unixpacket" }); + assert(alice.addr.transport === "unixpacket"); + assertEquals(alice.addr.address, filePath); + + const bob = Deno.listen({ address: filePath, transport: "unixpacket" }); + assert(bob.addr.transport === "unixpacket"); + assertEquals(bob.addr.address, filePath); + + const sent = new Uint8Array([1, 2, 3]); + await alice.send(sent, bob.addr); + + const [recvd, remote] = await bob.receive(); + assert(remote.transport === "unixpacket"); + assertEquals(remote.address, filePath); + assertEquals(recvd.length, 3); + assertEquals(1, recvd[0]); + assertEquals(2, recvd[1]); + assertEquals(3, recvd[2]); + alice.close(); + bob.close(); + } +); + +unitTest( { perms: { net: true } }, async function netTcpListenCloseWhileIterating(): Promise<void> { const port = randomPort(); @@ -174,6 +315,34 @@ unitTest( ); unitTest( + { ignore: Deno.build.os === "win", perms: { read: true, write: true } }, + async function netUnixListenCloseWhileIterating(): Promise<void> { + const filePath = Deno.makeTempFileSync(); + const socket = Deno.listen({ address: filePath, transport: "unix" }); + const nextWhileClosing = socket[Symbol.asyncIterator]().next(); + socket.close(); + assertEquals(await nextWhileClosing, { value: undefined, done: true }); + + const nextAfterClosing = socket[Symbol.asyncIterator]().next(); + assertEquals(await nextAfterClosing, { value: undefined, done: true }); + } +); + +unitTest( + { ignore: Deno.build.os === "win", perms: { read: true, write: true } }, + async function netUnixPacketListenCloseWhileIterating(): Promise<void> { + const filePath = Deno.makeTempFileSync(); + const socket = Deno.listen({ address: filePath, transport: "unixpacket" }); + const nextWhileClosing = socket[Symbol.asyncIterator]().next(); + socket.close(); + assertEquals(await nextWhileClosing, { value: undefined, done: true }); + + const nextAfterClosing = socket[Symbol.asyncIterator]().next(); + assertEquals(await nextAfterClosing, { value: undefined, done: true }); + } +); + +unitTest( { // FIXME(bartlomieju) ignore: true, diff --git a/cli/js/tls.ts b/cli/js/tls.ts index 3ed70727d..5a9b9ace0 100644 --- a/cli/js/tls.ts +++ b/cli/js/tls.ts @@ -1,11 +1,11 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. import * as tlsOps from "./ops/tls.ts"; -import { Listener, Transport, Conn, ConnImpl, ListenerImpl } from "./net.ts"; +import { Listener, Conn, ConnImpl, ListenerImpl } from "./net.ts"; // TODO(ry) There are many configuration options to add... // https://docs.rs/rustls/0.16.0/rustls/struct.ClientConfig.html interface ConnectTLSOptions { - transport?: Transport; + transport?: "tcp"; port: number; hostname?: string; certFile?: string; @@ -36,7 +36,7 @@ class TLSListenerImpl extends ListenerImpl { export interface ListenTLSOptions { port: number; hostname?: string; - transport?: Transport; + transport?: "tcp"; certFile: string; keyFile: string; } diff --git a/cli/ops/io.rs b/cli/ops/io.rs index 7969184ef..0c9a83883 100644 --- a/cli/ops/io.rs +++ b/cli/ops/io.rs @@ -148,6 +148,8 @@ pub enum StreamResource { Stderr(tokio::io::Stderr), FsFile(tokio::fs::File, FileMetadata), TcpStream(tokio::net::TcpStream), + #[cfg(not(windows))] + UnixStream(tokio::net::UnixStream), ServerTlsStream(Box<ServerTlsStream<TcpStream>>), ClientTlsStream(Box<ClientTlsStream<TcpStream>>), HttpBody(Box<HttpBody>), @@ -183,6 +185,8 @@ impl DenoAsyncRead for StreamResource { FsFile(f, _) => f, Stdin(f, _) => f, TcpStream(f) => f, + #[cfg(not(windows))] + UnixStream(f) => f, ClientTlsStream(f) => f, ServerTlsStream(f) => f, ChildStdout(f) => f, @@ -262,6 +266,8 @@ impl DenoAsyncWrite for StreamResource { Stdout(f) => f, Stderr(f) => f, TcpStream(f) => f, + #[cfg(not(windows))] + UnixStream(f) => f, ClientTlsStream(f) => f, ServerTlsStream(f) => f, ChildStdin(f) => f, @@ -279,6 +285,8 @@ impl DenoAsyncWrite for StreamResource { Stdout(f) => f, Stderr(f) => f, TcpStream(f) => f, + #[cfg(not(windows))] + UnixStream(f) => f, ClientTlsStream(f) => f, ServerTlsStream(f) => f, ChildStdin(f) => f, diff --git a/cli/ops/mod.rs b/cli/ops/mod.rs index c011facfc..b91a61c3a 100644 --- a/cli/ops/mod.rs +++ b/cli/ops/mod.rs @@ -15,6 +15,8 @@ pub mod fs; pub mod fs_events; pub mod io; pub mod net; +#[cfg(unix)] +mod net_unix; pub mod os; pub mod permissions; pub mod plugins; diff --git a/cli/ops/net.rs b/cli/ops/net.rs index 3987e94c1..f074ef9ee 100644 --- a/cli/ops/net.rs +++ b/cli/ops/net.rs @@ -18,6 +18,9 @@ use tokio::net::TcpListener; use tokio::net::TcpStream; use tokio::net::UdpSocket; +#[cfg(unix)] +use super::net_unix; + pub fn init(i: &mut Isolate, s: &State) { i.register_op("op_accept", s.stateful_json_op(op_accept)); i.register_op("op_connect", s.stateful_json_op(op_connect)); @@ -30,14 +33,14 @@ pub fn init(i: &mut Isolate, s: &State) { #[derive(Deserialize)] struct AcceptArgs { rid: i32, + transport: String, } -fn op_accept( +fn accept_tcp( state: &State, - args: Value, + args: AcceptArgs, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, OpError> { - let args: AcceptArgs = serde_json::from_value(args)?; let rid = args.rid as u32; let state_ = state.clone(); { @@ -102,20 +105,36 @@ fn op_accept( Ok(JsonOp::Async(op.boxed_local())) } +fn op_accept( + state: &State, + args: Value, + zero_copy: Option<ZeroCopyBuf>, +) -> Result<JsonOp, OpError> { + let args: AcceptArgs = serde_json::from_value(args)?; + match args.transport.as_str() { + "tcp" => accept_tcp(state, args, zero_copy), + #[cfg(unix)] + "unix" => net_unix::accept_unix(state, args.rid as u32, zero_copy), + _ => Err(OpError::other(format!( + "Unsupported transport protocol {}", + args.transport + ))), + } +} + #[derive(Deserialize)] struct ReceiveArgs { rid: i32, + transport: String, } -fn op_receive( +fn receive_udp( state: &State, - args: Value, + args: ReceiveArgs, zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, OpError> { - assert!(zero_copy.is_some()); let mut buf = zero_copy.unwrap(); - let args: ReceiveArgs = serde_json::from_value(args)?; let rid = args.rid as u32; let state_ = state.clone(); @@ -145,12 +164,32 @@ fn op_receive( Ok(JsonOp::Async(op.boxed_local())) } +fn op_receive( + state: &State, + args: Value, + zero_copy: Option<ZeroCopyBuf>, +) -> Result<JsonOp, OpError> { + assert!(zero_copy.is_some()); + let args: ReceiveArgs = serde_json::from_value(args)?; + match args.transport.as_str() { + "udp" => receive_udp(state, args, zero_copy), + #[cfg(unix)] + "unixpacket" => { + net_unix::receive_unix_packet(state, args.rid as u32, zero_copy) + } + _ => Err(OpError::other(format!( + "Unsupported transport protocol {}", + args.transport + ))), + } +} + #[derive(Deserialize)] struct SendArgs { rid: i32, - hostname: String, - port: u16, transport: String, + #[serde(flatten)] + transport_args: ArgsEnum, } fn op_send( @@ -160,38 +199,67 @@ fn op_send( ) -> Result<JsonOp, OpError> { assert!(zero_copy.is_some()); let buf = zero_copy.unwrap(); - - let args: SendArgs = serde_json::from_value(args)?; - assert_eq!(args.transport, "udp"); - let rid = args.rid as u32; - let state_ = state.clone(); - state.check_net(&args.hostname, args.port)?; - - let op = async move { - let mut state = state_.borrow_mut(); - let resource = state - .resource_table - .get_mut::<UdpSocketResource>(rid) - .ok_or_else(|| { - OpError::bad_resource("Socket has been closed".to_string()) - })?; - - let socket = &mut resource.socket; - let addr = resolve_addr(&args.hostname, args.port).await?; - socket.send_to(&buf, addr).await?; - - Ok(json!({})) - }; - - Ok(JsonOp::Async(op.boxed_local())) + match serde_json::from_value(args)? { + SendArgs { + rid, + transport, + transport_args: ArgsEnum::Ip(args), + } if transport == "udp" => { + state.check_net(&args.hostname, args.port)?; + + let op = async move { + let mut state = state_.borrow_mut(); + let resource = state + .resource_table + .get_mut::<UdpSocketResource>(rid as u32) + .ok_or_else(|| { + OpError::bad_resource("Socket has been closed".to_string()) + })?; + let socket = &mut resource.socket; + let addr = resolve_addr(&args.hostname, args.port).await?; + socket.send_to(&buf, addr).await?; + Ok(json!({})) + }; + + Ok(JsonOp::Async(op.boxed_local())) + } + #[cfg(unix)] + SendArgs { + rid, + transport, + transport_args: ArgsEnum::Unix(args), + } if transport == "unixpacket" => { + let address_path = net_unix::Path::new(&args.address); + state.check_read(&address_path)?; + let op = async move { + let mut state = state_.borrow_mut(); + let resource = state + .resource_table + .get_mut::<net_unix::UnixDatagramResource>(rid as u32) + .ok_or_else(|| { + OpError::other("Socket has been closed".to_string()) + })?; + + let socket = &mut resource.socket; + socket + .send_to(&buf, &resource.local_addr.as_pathname().unwrap()) + .await?; + + Ok(json!({})) + }; + + Ok(JsonOp::Async(op.boxed_local())) + } + _ => Err(OpError::other("Wrong argument format!".to_owned())), + } } #[derive(Deserialize)] struct ConnectArgs { transport: String, - hostname: String, - port: u16, + #[serde(flatten)] + transport_args: ArgsEnum, } fn op_connect( @@ -199,39 +267,78 @@ fn op_connect( args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, OpError> { - let args: ConnectArgs = serde_json::from_value(args)?; - assert_eq!(args.transport, "tcp"); // TODO Support others. - let state_ = state.clone(); - state.check_net(&args.hostname, args.port)?; - - let op = async move { - let addr = resolve_addr(&args.hostname, args.port).await?; - let tcp_stream = TcpStream::connect(&addr).await?; - let local_addr = tcp_stream.local_addr()?; - let remote_addr = tcp_stream.peer_addr()?; - let mut state = state_.borrow_mut(); - let rid = state.resource_table.add( - "tcpStream", - Box::new(StreamResourceHolder::new(StreamResource::TcpStream( - tcp_stream, - ))), - ); - Ok(json!({ - "rid": rid, - "localAddr": { - "hostname": local_addr.ip().to_string(), - "port": local_addr.port(), - "transport": args.transport, - }, - "remoteAddr": { - "hostname": remote_addr.ip().to_string(), - "port": remote_addr.port(), - "transport": args.transport, - } - })) - }; - - Ok(JsonOp::Async(op.boxed_local())) + match serde_json::from_value(args)? { + ConnectArgs { + transport, + transport_args: ArgsEnum::Ip(args), + } if transport == "tcp" => { + let state_ = state.clone(); + state.check_net(&args.hostname, args.port)?; + let op = async move { + let addr = resolve_addr(&args.hostname, args.port).await?; + let tcp_stream = TcpStream::connect(&addr).await?; + let local_addr = tcp_stream.local_addr()?; + let remote_addr = tcp_stream.peer_addr()?; + let mut state = state_.borrow_mut(); + let rid = state.resource_table.add( + "tcpStream", + Box::new(StreamResourceHolder::new(StreamResource::TcpStream( + tcp_stream, + ))), + ); + Ok(json!({ + "rid": rid, + "localAddr": { + "hostname": local_addr.ip().to_string(), + "port": local_addr.port(), + "transport": transport, + }, + "remoteAddr": { + "hostname": remote_addr.ip().to_string(), + "port": remote_addr.port(), + "transport": transport, + } + })) + }; + Ok(JsonOp::Async(op.boxed_local())) + } + #[cfg(unix)] + ConnectArgs { + transport, + transport_args: ArgsEnum::Unix(args), + } if transport == "unix" => { + let address_path = net_unix::Path::new(&args.address); + let state_ = state.clone(); + state.check_read(&address_path)?; + let op = async move { + let address = args.address; + let unix_stream = + net_unix::UnixStream::connect(net_unix::Path::new(&address)).await?; + let local_addr = unix_stream.local_addr()?; + let remote_addr = unix_stream.peer_addr()?; + let mut state = state_.borrow_mut(); + let rid = state.resource_table.add( + "unixStream", + Box::new(StreamResourceHolder::new(StreamResource::UnixStream( + unix_stream, + ))), + ); + Ok(json!({ + "rid": rid, + "localAddr": { + "address": local_addr.as_pathname(), + "transport": transport, + }, + "remoteAddr": { + "address": remote_addr.as_pathname(), + "transport": transport, + } + })) + }; + Ok(JsonOp::Async(op.boxed_local())) + } + _ => Err(OpError::other("Wrong argument format!".to_owned())), + } } #[derive(Deserialize)] @@ -265,19 +372,17 @@ fn op_shutdown( StreamResource::TcpStream(ref mut stream) => { TcpStream::shutdown(stream, shutdown_mode).map_err(OpError::from)?; } + #[cfg(unix)] + StreamResource::UnixStream(ref mut stream) => { + net_unix::UnixStream::shutdown(stream, shutdown_mode) + .map_err(OpError::from)?; + } _ => return Err(OpError::bad_resource_id()), } Ok(JsonOp::Sync(json!({}))) } -#[derive(Deserialize)] -struct ListenArgs { - transport: String, - hostname: String, - port: u16, -} - #[allow(dead_code)] struct TcpListenerResource { listener: TcpListener, @@ -331,6 +436,27 @@ struct UdpSocketResource { socket: UdpSocket, } +#[derive(Deserialize)] +struct IpListenArgs { + hostname: String, + port: u16, +} + +#[derive(Deserialize)] +#[serde(untagged)] +enum ArgsEnum { + Ip(IpListenArgs), + #[cfg(unix)] + Unix(net_unix::UnixListenArgs), +} + +#[derive(Deserialize)] +struct ListenArgs { + transport: String, + #[serde(flatten)] + transport_args: ArgsEnum, +} + fn listen_tcp( state: &State, addr: SocketAddr, @@ -370,33 +496,60 @@ fn op_listen( args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, OpError> { - let args: ListenArgs = serde_json::from_value(args)?; - assert!(args.transport == "tcp" || args.transport == "udp"); - - state.check_net(&args.hostname, args.port)?; - - let addr = - futures::executor::block_on(resolve_addr(&args.hostname, args.port))?; - - let (rid, local_addr) = if args.transport == "tcp" { - listen_tcp(state, addr)? - } else { - listen_udp(state, addr)? - }; - - debug!( - "New listener {} {}:{}", - rid, - local_addr.ip().to_string(), - local_addr.port() - ); - - Ok(JsonOp::Sync(json!({ - "rid": rid, - "localAddr": { - "hostname": local_addr.ip().to_string(), - "port": local_addr.port(), - "transport": args.transport, - }, - }))) + match serde_json::from_value(args)? { + ListenArgs { + transport, + transport_args: ArgsEnum::Ip(args), + } => { + state.check_net(&args.hostname, args.port)?; + let addr = + futures::executor::block_on(resolve_addr(&args.hostname, args.port))?; + let (rid, local_addr) = if transport == "tcp" { + listen_tcp(state, addr)? + } else { + listen_udp(state, addr)? + }; + debug!( + "New listener {} {}:{}", + rid, + local_addr.ip().to_string(), + local_addr.port() + ); + Ok(JsonOp::Sync(json!({ + "rid": rid, + "localAddr": { + "hostname": local_addr.ip().to_string(), + "port": local_addr.port(), + "transport": transport, + }, + }))) + } + #[cfg(unix)] + ListenArgs { + transport, + transport_args: ArgsEnum::Unix(args), + } if transport == "unix" || transport == "unixpacket" => { + let address_path = net_unix::Path::new(&args.address); + state.check_read(&address_path)?; + let (rid, local_addr) = if transport == "unix" { + net_unix::listen_unix(state, &address_path)? + } else { + net_unix::listen_unix_packet(state, &address_path)? + }; + debug!( + "New listener {} {}", + rid, + local_addr.as_pathname().unwrap().display(), + ); + Ok(JsonOp::Sync(json!({ + "rid": rid, + "localAddr": { + "address": local_addr.as_pathname(), + "transport": transport, + }, + }))) + } + #[cfg(unix)] + _ => Err(OpError::other("Wrong argument format!".to_owned())), + } } diff --git a/cli/ops/net_unix.rs b/cli/ops/net_unix.rs new file mode 100644 index 000000000..43778e7c6 --- /dev/null +++ b/cli/ops/net_unix.rs @@ -0,0 +1,142 @@ +use super::dispatch_json::{Deserialize, JsonOp}; +use super::io::{StreamResource, StreamResourceHolder}; +use crate::op_error::OpError; +use crate::state::State; +use futures::future::FutureExt; + +use deno_core::*; +use std::fs::remove_file; +use std::os::unix; +pub use std::path::Path; +use tokio::net::UnixDatagram; +use tokio::net::UnixListener; +pub use tokio::net::UnixStream; + +struct UnixListenerResource { + listener: UnixListener, +} + +pub struct UnixDatagramResource { + pub socket: UnixDatagram, + pub local_addr: unix::net::SocketAddr, +} + +#[derive(Deserialize)] +pub struct UnixListenArgs { + pub address: String, +} + +pub fn accept_unix( + state: &State, + rid: u32, + _zero_copy: Option<ZeroCopyBuf>, +) -> Result<JsonOp, OpError> { + let state_ = state.clone(); + { + let state = state.borrow(); + state + .resource_table + .get::<UnixListenerResource>(rid) + .ok_or_else(OpError::bad_resource_id)?; + } + let op = async move { + let mut state = state_.borrow_mut(); + let listener_resource = state + .resource_table + .get_mut::<UnixListenerResource>(rid) + .ok_or_else(|| { + OpError::bad_resource("Listener has been closed".to_string()) + })?; + let (unix_stream, _socket_addr) = + listener_resource.listener.accept().await?; + let local_addr = unix_stream.local_addr()?; + let remote_addr = unix_stream.peer_addr()?; + let rid = state.resource_table.add( + "unixStream", + Box::new(StreamResourceHolder::new(StreamResource::UnixStream( + unix_stream, + ))), + ); + Ok(json!({ + "rid": rid, + "localAddr": { + "address": local_addr.as_pathname(), + "transport": "unix", + }, + "remoteAddr": { + "address": remote_addr.as_pathname(), + "transport": "unix", + } + })) + }; + + Ok(JsonOp::Async(op.boxed_local())) +} + +pub fn receive_unix_packet( + state: &State, + rid: u32, + zero_copy: Option<ZeroCopyBuf>, +) -> Result<JsonOp, OpError> { + let mut buf = zero_copy.unwrap(); + let state_ = state.clone(); + + let op = async move { + let mut state = state_.borrow_mut(); + let resource = state + .resource_table + .get_mut::<UnixDatagramResource>(rid) + .ok_or_else(|| { + OpError::bad_resource("Socket has been closed".to_string()) + })?; + let (size, remote_addr) = resource.socket.recv_from(&mut buf).await?; + Ok(json!({ + "size": size, + "remoteAddr": { + "address": remote_addr.as_pathname(), + "transport": "unixpacket", + } + })) + }; + + Ok(JsonOp::Async(op.boxed_local())) +} + +pub fn listen_unix( + state: &State, + addr: &Path, +) -> Result<(u32, unix::net::SocketAddr), OpError> { + let mut state = state.borrow_mut(); + if addr.exists() { + remove_file(&addr).unwrap(); + } + let listener = UnixListener::bind(&addr)?; + let local_addr = listener.local_addr()?; + let listener_resource = UnixListenerResource { listener }; + let rid = state + .resource_table + .add("unixListener", Box::new(listener_resource)); + + Ok((rid, local_addr)) +} + +pub fn listen_unix_packet( + state: &State, + addr: &Path, +) -> Result<(u32, unix::net::SocketAddr), OpError> { + let mut state = state.borrow_mut(); + if addr.exists() { + remove_file(&addr).unwrap(); + } + let socket = UnixDatagram::bind(&addr)?; + let local_addr = socket.local_addr()?; + let datagram_resource = UnixDatagramResource { + socket, + local_addr: local_addr.clone(), + }; + let rid = state + .resource_table + .add("unixDatagram", Box::new(datagram_resource)); + + Ok((rid, local_addr)) +} diff --git a/tools/deno_tcp.ts b/tools/deno_tcp.ts index 9a884cb70..068c48a04 100644 --- a/tools/deno_tcp.ts +++ b/tools/deno_tcp.ts @@ -8,7 +8,6 @@ const listener = Deno.listen({ hostname, port: Number(port) }); const response = new TextEncoder().encode( "HTTP/1.1 200 OK\r\nContent-Length: 12\r\n\r\nHello World\n" ); - async function handle(conn: Deno.Conn): Promise<void> { const buffer = new Uint8Array(1024); try { |