summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoão Souto <joao.jpgs@hotmail.com>2020-03-23 22:02:51 +0000
committerGitHub <noreply@github.com>2020-03-23 18:02:51 -0400
commit70a50344315a4c3361fc321e78e076fb09a502b3 (patch)
tree1079d325ec898afb7829ac1888ed395ed2ac35d2
parentb924e5ab7e69eab4d3b6d9a863a8fc2974f33b5d (diff)
feat: Support Unix Domain Sockets (#4176)
-rw-r--r--cli/Cargo.toml2
-rw-r--r--cli/js/deno.ts5
-rw-r--r--cli/js/lib.deno.ns.d.ts88
-rw-r--r--cli/js/net.ts117
-rw-r--r--cli/js/ops/net.ts80
-rw-r--r--cli/js/ops/tls.ts15
-rw-r--r--cli/js/tests/net_test.ts181
-rw-r--r--cli/js/tls.ts6
-rw-r--r--cli/ops/io.rs8
-rw-r--r--cli/ops/mod.rs2
-rw-r--r--cli/ops/net.rs361
-rw-r--r--cli/ops/net_unix.rs142
-rw-r--r--tools/deno_tcp.ts1
13 files changed, 740 insertions, 268 deletions
diff --git a/cli/Cargo.toml b/cli/Cargo.toml
index dac161731..c7a2fbf75 100644
--- a/cli/Cargo.toml
+++ b/cli/Cargo.toml
@@ -54,7 +54,7 @@ sys-info = "=0.5.8" # 0.5.9 and 0.5.10 are broken on windows.
sourcemap = "5.0.0"
tempfile = "3.1.0"
termcolor = "1.1.0"
-tokio = { version = "0.2.13", features = ["rt-core", "tcp", "udp", "process", "fs", "blocking", "sync", "io-std", "macros", "time"] }
+tokio = { version = "0.2.13", features = ["rt-core", "tcp", "udp", "uds", "process", "fs", "blocking", "sync", "io-std", "macros", "time"] }
tokio-rustls = "0.13.0"
url = "2.1.1"
utime = "0.2.1"
diff --git a/cli/js/deno.ts b/cli/js/deno.ts
index 6a493faf8..f42e39754 100644
--- a/cli/js/deno.ts
+++ b/cli/js/deno.ts
@@ -70,12 +70,9 @@ export {
export { metrics, Metrics } from "./ops/runtime.ts";
export { mkdirSync, mkdir, MkdirOptions } from "./ops/fs/mkdir.ts";
export {
- Addr,
connect,
listen,
- recvfrom,
- UDPConn,
- UDPAddr,
+ DatagramConn,
Listener,
Conn,
ShutdownMode,
diff --git a/cli/js/lib.deno.ns.d.ts b/cli/js/lib.deno.ns.d.ts
index 99e40014c..72794e3bf 100644
--- a/cli/js/lib.deno.ns.d.ts
+++ b/cli/js/lib.deno.ns.d.ts
@@ -1546,21 +1546,18 @@ declare namespace Deno {
*
* Requires `allow-plugin` permission. */
export function openPlugin(filename: string): Plugin;
-
- export type Transport = "tcp" | "udp";
-
- export interface Addr {
- transport: Transport;
+ export interface NetAddr {
+ transport: "tcp" | "udp";
hostname: string;
port: number;
}
- export interface UDPAddr {
- port: number;
- transport?: Transport;
- hostname?: string;
+ export interface UnixAddr {
+ transport: "unix" | "unixpacket";
+ address: string;
}
+ export type Addr = NetAddr | UnixAddr;
/** **UNSTABLE**: Maybe remove `ShutdownMode` entirely.
*
* Corresponds to `SHUT_RD`, `SHUT_WR`, `SHUT_RDWR` on POSIX-like systems.
@@ -1587,16 +1584,8 @@ declare namespace Deno {
/** **UNSTABLE**: new API, yet to be vetted.
*
- * Waits for the next message to the passed `rid` and writes it on the passed
- * `Uint8Array`.
- *
- * Resolves to the number of bytes written and the remote address. */
- export function recvfrom(rid: number, p: Uint8Array): Promise<[number, Addr]>;
-
- /** **UNSTABLE**: new API, yet to be vetted.
- *
* A generic transport listener for message-oriented protocols. */
- export interface UDPConn extends AsyncIterable<[Uint8Array, Addr]> {
+ export interface DatagramConn extends AsyncIterable<[Uint8Array, Addr]> {
/** **UNSTABLE**: new API, yet to be vetted.
*
* Waits for and resolves to the next message to the `UDPConn`. */
@@ -1604,7 +1593,7 @@ declare namespace Deno {
/** UNSTABLE: new API, yet to be vetted.
*
* Sends a message to the target. */
- send(p: Uint8Array, addr: UDPAddr): Promise<void>;
+ send(p: Uint8Array, addr: Addr): Promise<void>;
/** UNSTABLE: new API, yet to be vetted.
*
* Close closes the socket. Any pending message promises will be rejected
@@ -1624,6 +1613,7 @@ declare namespace Deno {
close(): void;
/** Return the address of the `Listener`. */
readonly addr: Addr;
+
[Symbol.asyncIterator](): AsyncIterator<Conn>;
}
@@ -1648,13 +1638,12 @@ declare namespace Deno {
/** A literal IP address or host name that can be resolved to an IP address.
* If not specified, defaults to `0.0.0.0`. */
hostname?: string;
- /** Either `"tcp"` or `"udp"`. Defaults to `"tcp"`.
- *
- * In the future: `"tcp4"`, `"tcp6"`, `"udp4"`, `"udp6"`, `"ip"`, `"ip4"`,
- * `"ip6"`, `"unix"`, `"unixgram"`, and `"unixpacket"`. */
- transport?: Transport;
}
+ export interface UnixListenOptions {
+ /** A Path to the Unix Socket. */
+ address: string;
+ }
/** **UNSTABLE**: new API
*
* Listen announces on the local transport address.
@@ -1672,32 +1661,41 @@ declare namespace Deno {
*
* Listen announces on the local transport address.
*
- * Deno.listen({ port: 80 })
- * Deno.listen({ hostname: "192.0.2.1", port: 80 })
- * Deno.listen({ hostname: "[2001:db8::1]", port: 80 });
- * Deno.listen({ hostname: "golang.org", port: 80, transport: "tcp" });
+ * Deno.listen({ address: "/foo/bar.sock", transport: "unix" })
+ *
+ * Requires `allow-read` permission. */
+ export function listen(
+ options: UnixListenOptions & { transport: "unix" }
+ ): Listener;
+ /** **UNSTABLE**: new API
+ *
+ * Listen announces on the local transport address.
+ *
+ * Deno.listen({ port: 80, transport: "udp" })
+ * Deno.listen({ hostname: "golang.org", port: 80, transport: "udp" });
*
* Requires `allow-net` permission. */
export function listen(
options: ListenOptions & { transport: "udp" }
- ): UDPConn;
+ ): DatagramConn;
/** **UNSTABLE**: new API
*
* Listen announces on the local transport address.
*
- * Deno.listen({ port: 80 })
- * Deno.listen({ hostname: "192.0.2.1", port: 80 })
- * Deno.listen({ hostname: "[2001:db8::1]", port: 80 });
- * Deno.listen({ hostname: "golang.org", port: 80, transport: "tcp" });
+ * Deno.listen({ address: "/foo/bar.sock", transport: "unixpacket" })
*
- * Requires `allow-net` permission. */
- export function listen(options: ListenOptions): Listener | UDPConn;
+ * Requires `allow-read` permission. */
+ export function listen(
+ options: UnixListenOptions & { transport: "unixpacket" }
+ ): DatagramConn;
export interface ListenTLSOptions extends ListenOptions {
/** Server certificate file. */
certFile: string;
/** Server public key file. */
keyFile: string;
+
+ transport?: "tcp";
}
/** Listen announces on the local transport address over TLS (transport layer
@@ -1714,11 +1712,12 @@ declare namespace Deno {
/** A literal IP address or host name that can be resolved to an IP address.
* If not specified, defaults to `127.0.0.1`. */
hostname?: string;
- /** Either `"tcp"` or `"udp"`. Defaults to `"tcp"`.
- *
- * In the future: `"tcp4"`, `"tcp6"`, `"udp4"`, `"udp6"`, `"ip"`, `"ip4"`,
- * `"ip6"`, `"unix"`, `"unixgram"`, and `"unixpacket"`. */
- transport?: Transport;
+ transport?: "tcp";
+ }
+
+ export interface UnixConnectOptions {
+ transport: "unix";
+ address: string;
}
/**
@@ -1728,10 +1727,13 @@ declare namespace Deno {
* const conn1 = await Deno.connect({ port: 80 })
* const conn2 = await Deno.connect({ hostname: "192.0.2.1", port: 80 })
* const conn3 = await Deno.connect({ hostname: "[2001:db8::1]", port: 80 });
- * const conn4 = await Deno.connect({ hostname: "golang.org", port: 80, transport: "tcp" })
+ * const conn4 = await Deno.connect({ hostname: "golang.org", port: 80, transport: "tcp" });
+ * const conn5 = await Deno.connect({ address: "/foo/bar.sock", transport: "unix" });
*
- * Requires `allow-net` permission. */
- export function connect(options: ConnectOptions): Promise<Conn>;
+ * Requires `allow-net` permission for "tcp" and `allow-read` for unix. */
+ export function connect(
+ options: ConnectOptions | UnixConnectOptions
+ ): Promise<Conn>;
export interface ConnectTLSOptions {
/** The port to connect to. */
diff --git a/cli/js/net.ts b/cli/js/net.ts
index 570bada49..9be97dc2e 100644
--- a/cli/js/net.ts
+++ b/cli/js/net.ts
@@ -4,25 +4,13 @@ import { EOF, Reader, Writer, Closer } from "./io.ts";
import { read, write } from "./ops/io.ts";
import { close } from "./ops/resources.ts";
import * as netOps from "./ops/net.ts";
-import { Transport } from "./ops/net.ts";
-export { ShutdownMode, shutdown, Transport } from "./ops/net.ts";
+import { Addr } from "./ops/net.ts";
+export { ShutdownMode, shutdown, NetAddr, UnixAddr } from "./ops/net.ts";
-export interface Addr {
- transport: Transport;
- hostname: string;
- port: number;
-}
-
-export interface UDPAddr {
- transport?: Transport;
- hostname?: string;
- port: number;
-}
-
-export interface UDPConn extends AsyncIterable<[Uint8Array, Addr]> {
+export interface DatagramConn extends AsyncIterable<[Uint8Array, Addr]> {
receive(p?: Uint8Array): Promise<[Uint8Array, Addr]>;
- send(p: Uint8Array, addr: UDPAddr): Promise<void>;
+ send(p: Uint8Array, addr: Addr): Promise<void>;
close(): void;
@@ -73,7 +61,7 @@ export class ListenerImpl implements Listener {
constructor(readonly rid: number, readonly addr: Addr) {}
async accept(): Promise<Conn> {
- const res = await netOps.accept(this.rid);
+ const res = await netOps.accept(this.rid, this.addr.transport);
return new ConnImpl(res.rid, res.remoteAddr, res.localAddr);
}
@@ -95,15 +83,7 @@ export class ListenerImpl implements Listener {
}
}
-export async function recvfrom(
- rid: number,
- p: Uint8Array
-): Promise<[number, Addr]> {
- const { size, remoteAddr } = await netOps.receive(rid, p);
- return [size, remoteAddr];
-}
-
-export class UDPConnImpl implements UDPConn {
+export class DatagramImpl implements DatagramConn {
constructor(
readonly rid: number,
readonly addr: Addr,
@@ -112,14 +92,18 @@ export class UDPConnImpl implements UDPConn {
async receive(p?: Uint8Array): Promise<[Uint8Array, Addr]> {
const buf = p || new Uint8Array(this.bufSize);
- const [size, remoteAddr] = await recvfrom(this.rid, buf);
+ const { size, remoteAddr } = await netOps.receive(
+ this.rid,
+ this.addr.transport,
+ buf
+ );
const sub = buf.subarray(0, size);
return [sub, remoteAddr];
}
- async send(p: Uint8Array, addr: UDPAddr): Promise<void> {
+ async send(p: Uint8Array, addr: Addr): Promise<void> {
const remote = { hostname: "127.0.0.1", transport: "udp", ...addr };
- if (remote.transport !== "udp") throw Error("Remote transport must be UDP");
+
const args = { ...remote, rid: this.rid };
await netOps.send(args as netOps.SendRequest, p);
}
@@ -153,38 +137,77 @@ export interface Conn extends Reader, Writer, Closer {
export interface ListenOptions {
port: number;
hostname?: string;
- transport?: Transport;
+ transport?: "tcp" | "udp";
+}
+
+export interface UnixListenOptions {
+ transport: "unix" | "unixpacket";
+ address: string;
}
export function listen(
options: ListenOptions & { transport?: "tcp" }
): Listener;
-export function listen(options: ListenOptions & { transport: "udp" }): UDPConn;
-export function listen({
- port,
- hostname = "0.0.0.0",
- transport = "tcp"
-}: ListenOptions): Listener | UDPConn {
- const res = netOps.listen({ port, hostname, transport });
-
- if (transport === "tcp") {
+export function listen(
+ options: UnixListenOptions & { transport: "unix" }
+): Listener;
+export function listen(
+ options: ListenOptions & { transport: "udp" }
+): DatagramConn;
+export function listen(
+ options: UnixListenOptions & { transport: "unixpacket" }
+): DatagramConn;
+export function listen(
+ options: ListenOptions | UnixListenOptions
+): Listener | DatagramConn {
+ let res;
+
+ if (options.transport === "unix" || options.transport === "unixpacket") {
+ res = netOps.listen(options);
+ } else {
+ res = netOps.listen({
+ transport: "tcp",
+ hostname: "127.0.0.1",
+ ...(options as ListenOptions)
+ });
+ }
+
+ if (
+ !options.transport ||
+ options.transport === "tcp" ||
+ options.transport === "unix"
+ ) {
return new ListenerImpl(res.rid, res.localAddr);
} else {
- return new UDPConnImpl(res.rid, res.localAddr);
+ return new DatagramImpl(res.rid, res.localAddr);
}
}
export interface ConnectOptions {
port: number;
hostname?: string;
- transport?: Transport;
+ transport?: "tcp";
}
+export interface UnixConnectOptions {
+ transport: "unix";
+ address: string;
+}
+export async function connect(options: UnixConnectOptions): Promise<Conn>;
+export async function connect(options: ConnectOptions): Promise<Conn>;
+export async function connect(
+ options: ConnectOptions | UnixConnectOptions
+): Promise<Conn> {
+ let res;
+
+ if (options.transport === "unix") {
+ res = await netOps.connect(options);
+ } else {
+ res = await netOps.connect({
+ transport: "tcp",
+ hostname: "127.0.0.1",
+ ...options
+ });
+ }
-export async function connect({
- port,
- hostname = "127.0.0.1",
- transport = "tcp"
-}: ConnectOptions): Promise<Conn> {
- const res = await netOps.connect({ port, hostname, transport });
return new ConnImpl(res.rid, res.remoteAddr!, res.localAddr!);
}
diff --git a/cli/js/ops/net.ts b/cli/js/ops/net.ts
index 25f3a8322..7734e8811 100644
--- a/cli/js/ops/net.ts
+++ b/cli/js/ops/net.ts
@@ -1,9 +1,18 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
import { sendSync, sendAsync } from "./dispatch_json.ts";
-export type Transport = "tcp" | "udp";
-// TODO support other types:
-// export type Transport = "tcp" | "tcp4" | "tcp6" | "unix" | "unixpacket";
+export interface NetAddr {
+ transport: "tcp" | "udp";
+ hostname: string;
+ port: number;
+}
+
+export interface UnixAddr {
+ transport: "unix" | "unixpacket";
+ address: string;
+}
+
+export type Addr = NetAddr | UnixAddr;
export enum ShutdownMode {
// See http://man7.org/linux/man-pages/man2/shutdown.2.html
@@ -19,35 +28,22 @@ export function shutdown(rid: number, how: ShutdownMode): void {
interface AcceptResponse {
rid: number;
- localAddr: {
- hostname: string;
- port: number;
- transport: Transport;
- };
- remoteAddr: {
- hostname: string;
- port: number;
- transport: Transport;
- };
+ localAddr: Addr;
+ remoteAddr: Addr;
}
-export function accept(rid: number): Promise<AcceptResponse> {
- return sendAsync("op_accept", { rid });
+export function accept(
+ rid: number,
+ transport: string
+): Promise<AcceptResponse> {
+ return sendAsync("op_accept", { rid, transport });
}
-export interface ListenRequest {
- transport: Transport;
- hostname: string;
- port: number;
-}
+export type ListenRequest = Addr;
interface ListenResponse {
rid: number;
- localAddr: {
- hostname: string;
- port: number;
- transport: Transport;
- };
+ localAddr: Addr;
}
export function listen(args: ListenRequest): ListenResponse {
@@ -56,23 +52,11 @@ export function listen(args: ListenRequest): ListenResponse {
interface ConnectResponse {
rid: number;
- localAddr: {
- hostname: string;
- port: number;
- transport: Transport;
- };
- remoteAddr: {
- hostname: string;
- port: number;
- transport: Transport;
- };
+ localAddr: Addr;
+ remoteAddr: Addr;
}
-export interface ConnectRequest {
- transport: Transport;
- hostname: string;
- port: number;
-}
+export type ConnectRequest = Addr;
export function connect(args: ConnectRequest): Promise<ConnectResponse> {
return sendAsync("op_connect", args);
@@ -80,26 +64,20 @@ export function connect(args: ConnectRequest): Promise<ConnectResponse> {
interface ReceiveResponse {
size: number;
- remoteAddr: {
- hostname: string;
- port: number;
- transport: Transport;
- };
+ remoteAddr: Addr;
}
export function receive(
rid: number,
+ transport: string,
zeroCopy: Uint8Array
): Promise<ReceiveResponse> {
- return sendAsync("op_receive", { rid }, zeroCopy);
+ return sendAsync("op_receive", { rid, transport }, zeroCopy);
}
-export interface SendRequest {
+export type SendRequest = {
rid: number;
- hostname: string;
- port: number;
- transport: Transport;
-}
+} & Addr;
export async function send(
args: SendRequest,
diff --git a/cli/js/ops/tls.ts b/cli/js/ops/tls.ts
index e52143cb0..234e569dd 100644
--- a/cli/js/ops/tls.ts
+++ b/cli/js/ops/tls.ts
@@ -1,9 +1,8 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
import { sendAsync, sendSync } from "./dispatch_json.ts";
-import { Transport } from "./net.ts";
export interface ConnectTLSRequest {
- transport: Transport;
+ transport: "tcp";
hostname: string;
port: number;
certFile?: string;
@@ -14,12 +13,12 @@ interface ConnectTLSResponse {
localAddr: {
hostname: string;
port: number;
- transport: Transport;
+ transport: "tcp";
};
remoteAddr: {
hostname: string;
port: number;
- transport: Transport;
+ transport: "tcp";
};
}
@@ -34,12 +33,12 @@ interface AcceptTLSResponse {
localAddr: {
hostname: string;
port: number;
- transport: Transport;
+ transport: "tcp";
};
remoteAddr: {
hostname: string;
port: number;
- transport: Transport;
+ transport: "tcp";
};
}
@@ -50,7 +49,7 @@ export function acceptTLS(rid: number): Promise<AcceptTLSResponse> {
export interface ListenTLSRequest {
port: number;
hostname: string;
- transport: Transport;
+ transport: "tcp";
certFile: string;
keyFile: string;
}
@@ -60,7 +59,7 @@ interface ListenTLSResponse {
localAddr: {
hostname: string;
port: number;
- transport: Transport;
+ transport: "tcp";
};
}
diff --git a/cli/js/tests/net_test.ts b/cli/js/tests/net_test.ts
index f62c5c329..788203b5a 100644
--- a/cli/js/tests/net_test.ts
+++ b/cli/js/tests/net_test.ts
@@ -10,7 +10,7 @@ import {
unitTest({ perms: { net: true } }, function netTcpListenClose(): void {
const port = randomPort();
const listener = Deno.listen({ hostname: "127.0.0.1", port });
- assertEquals(listener.addr.transport, "tcp");
+ assert(listener.addr.transport === "tcp");
assertEquals(listener.addr.hostname, "127.0.0.1");
assertEquals(listener.addr.port, port);
listener.close();
@@ -29,7 +29,7 @@ unitTest(
port,
transport: "udp"
});
- assertEquals(socket.addr.transport, "udp");
+ assert(socket.addr.transport === "udp");
assertEquals(socket.addr.hostname, "127.0.0.1");
assertEquals(socket.addr.port, port);
socket.close();
@@ -37,6 +37,34 @@ unitTest(
);
unitTest(
+ { ignore: Deno.build.os === "win", perms: { read: true, write: true } },
+ function netUnixListenClose(): void {
+ const filePath = Deno.makeTempFileSync();
+ const socket = Deno.listen({
+ address: filePath,
+ transport: "unix"
+ });
+ assert(socket.addr.transport === "unix");
+ assertEquals(socket.addr.address, filePath);
+ socket.close();
+ }
+);
+
+unitTest(
+ { ignore: Deno.build.os === "win", perms: { read: true, write: true } },
+ function netUnixPacketListenClose(): void {
+ const filePath = Deno.makeTempFileSync();
+ const socket = Deno.listen({
+ address: filePath,
+ transport: "unixpacket"
+ });
+ assert(socket.addr.transport === "unixpacket");
+ assertEquals(socket.addr.address, filePath);
+ socket.close();
+ }
+);
+
+unitTest(
{
perms: { net: true }
},
@@ -58,6 +86,28 @@ unitTest(
);
unitTest(
+ { ignore: Deno.build.os === "win", perms: { read: true, write: true } },
+ async function netUnixCloseWhileAccept(): Promise<void> {
+ const filePath = await Deno.makeTempFile();
+ const listener = Deno.listen({
+ address: filePath,
+ transport: "unix"
+ });
+ const p = listener.accept();
+ listener.close();
+ let err;
+ try {
+ await p;
+ } catch (e) {
+ err = e;
+ }
+ assert(!!err);
+ assert(err instanceof Error);
+ assertEquals(err.message, "Listener has been closed");
+ }
+);
+
+unitTest(
{ perms: { net: true } },
async function netTcpConcurrentAccept(): Promise<void> {
const port = randomPort();
@@ -81,6 +131,31 @@ unitTest(
}
);
+// TODO(jsouto): Enable when tokio updates mio to v0.7!
+unitTest(
+ { ignore: true, perms: { read: true, write: true } },
+ async function netUnixConcurrentAccept(): Promise<void> {
+ const filePath = await Deno.makeTempFile();
+ const listener = Deno.listen({ transport: "unix", address: filePath });
+ let acceptErrCount = 0;
+ const checkErr = (e: Error): void => {
+ if (e.message === "Listener has been closed") {
+ assertEquals(acceptErrCount, 1);
+ } else if (e.message === "Another accept task is ongoing") {
+ acceptErrCount++;
+ } else {
+ throw new Error("Unexpected error message");
+ }
+ };
+ const p = listener.accept().catch(checkErr);
+ const p1 = listener.accept().catch(checkErr);
+ await Promise.race([p, p1]);
+ listener.close();
+ await [p, p1];
+ assertEquals(acceptErrCount, 1);
+ }
+);
+
unitTest({ perms: { net: true } }, async function netTcpDialListen(): Promise<
void
> {
@@ -89,6 +164,7 @@ unitTest({ perms: { net: true } }, async function netTcpDialListen(): Promise<
listener.accept().then(
async (conn): Promise<void> => {
assert(conn.remoteAddr != null);
+ assert(conn.localAddr.transport === "tcp");
assertEquals(conn.localAddr.hostname, "127.0.0.1");
assertEquals(conn.localAddr.port, port);
await conn.write(new Uint8Array([1, 2, 3]));
@@ -96,6 +172,7 @@ unitTest({ perms: { net: true } }, async function netTcpDialListen(): Promise<
}
);
const conn = await Deno.connect({ hostname: "127.0.0.1", port });
+ assert(conn.remoteAddr.transport === "tcp");
assertEquals(conn.remoteAddr.hostname, "127.0.0.1");
assertEquals(conn.remoteAddr.port, port);
assert(conn.localAddr != null);
@@ -117,24 +194,61 @@ unitTest({ perms: { net: true } }, async function netTcpDialListen(): Promise<
});
unitTest(
+ { ignore: Deno.build.os === "win", perms: { read: true, write: true } },
+ async function netUnixDialListen(): Promise<void> {
+ const filePath = await Deno.makeTempFile();
+ const listener = Deno.listen({ address: filePath, transport: "unix" });
+ listener.accept().then(
+ async (conn): Promise<void> => {
+ assert(conn.remoteAddr != null);
+ assert(conn.localAddr.transport === "unix");
+ assertEquals(conn.localAddr.address, filePath);
+ await conn.write(new Uint8Array([1, 2, 3]));
+ conn.close();
+ }
+ );
+ const conn = await Deno.connect({ address: filePath, transport: "unix" });
+ assert(conn.remoteAddr.transport === "unix");
+ assertEquals(conn.remoteAddr.address, filePath);
+ assert(conn.remoteAddr != null);
+ const buf = new Uint8Array(1024);
+ const readResult = await conn.read(buf);
+ assertEquals(3, readResult);
+ assertEquals(1, buf[0]);
+ assertEquals(2, buf[1]);
+ assertEquals(3, buf[2]);
+ assert(conn.rid > 0);
+
+ assert(readResult !== Deno.EOF);
+
+ const readResult2 = await conn.read(buf);
+ assertEquals(Deno.EOF, readResult2);
+
+ listener.close();
+ conn.close();
+ }
+);
+
+unitTest(
{ ignore: Deno.build.os === "win", perms: { net: true } },
async function netUdpSendReceive(): Promise<void> {
const alicePort = randomPort();
const alice = Deno.listen({ port: alicePort, transport: "udp" });
+ assert(alice.addr.transport === "udp");
assertEquals(alice.addr.port, alicePort);
- assertEquals(alice.addr.hostname, "0.0.0.0");
- assertEquals(alice.addr.transport, "udp");
+ assertEquals(alice.addr.hostname, "127.0.0.1");
const bobPort = randomPort();
const bob = Deno.listen({ port: bobPort, transport: "udp" });
+ assert(bob.addr.transport === "udp");
assertEquals(bob.addr.port, bobPort);
- assertEquals(bob.addr.hostname, "0.0.0.0");
- assertEquals(bob.addr.transport, "udp");
+ assertEquals(bob.addr.hostname, "127.0.0.1");
const sent = new Uint8Array([1, 2, 3]);
await alice.send(sent, bob.addr);
const [recvd, remote] = await bob.receive();
+ assert(remote.transport === "udp");
assertEquals(remote.port, alicePort);
assertEquals(recvd.length, 3);
assertEquals(1, recvd[0]);
@@ -146,6 +260,33 @@ unitTest(
);
unitTest(
+ { ignore: Deno.build.os === "win", perms: { read: true, write: true } },
+ async function netUnixPacketSendReceive(): Promise<void> {
+ const filePath = await Deno.makeTempFile();
+ const alice = Deno.listen({ address: filePath, transport: "unixpacket" });
+ assert(alice.addr.transport === "unixpacket");
+ assertEquals(alice.addr.address, filePath);
+
+ const bob = Deno.listen({ address: filePath, transport: "unixpacket" });
+ assert(bob.addr.transport === "unixpacket");
+ assertEquals(bob.addr.address, filePath);
+
+ const sent = new Uint8Array([1, 2, 3]);
+ await alice.send(sent, bob.addr);
+
+ const [recvd, remote] = await bob.receive();
+ assert(remote.transport === "unixpacket");
+ assertEquals(remote.address, filePath);
+ assertEquals(recvd.length, 3);
+ assertEquals(1, recvd[0]);
+ assertEquals(2, recvd[1]);
+ assertEquals(3, recvd[2]);
+ alice.close();
+ bob.close();
+ }
+);
+
+unitTest(
{ perms: { net: true } },
async function netTcpListenCloseWhileIterating(): Promise<void> {
const port = randomPort();
@@ -174,6 +315,34 @@ unitTest(
);
unitTest(
+ { ignore: Deno.build.os === "win", perms: { read: true, write: true } },
+ async function netUnixListenCloseWhileIterating(): Promise<void> {
+ const filePath = Deno.makeTempFileSync();
+ const socket = Deno.listen({ address: filePath, transport: "unix" });
+ const nextWhileClosing = socket[Symbol.asyncIterator]().next();
+ socket.close();
+ assertEquals(await nextWhileClosing, { value: undefined, done: true });
+
+ const nextAfterClosing = socket[Symbol.asyncIterator]().next();
+ assertEquals(await nextAfterClosing, { value: undefined, done: true });
+ }
+);
+
+unitTest(
+ { ignore: Deno.build.os === "win", perms: { read: true, write: true } },
+ async function netUnixPacketListenCloseWhileIterating(): Promise<void> {
+ const filePath = Deno.makeTempFileSync();
+ const socket = Deno.listen({ address: filePath, transport: "unixpacket" });
+ const nextWhileClosing = socket[Symbol.asyncIterator]().next();
+ socket.close();
+ assertEquals(await nextWhileClosing, { value: undefined, done: true });
+
+ const nextAfterClosing = socket[Symbol.asyncIterator]().next();
+ assertEquals(await nextAfterClosing, { value: undefined, done: true });
+ }
+);
+
+unitTest(
{
// FIXME(bartlomieju)
ignore: true,
diff --git a/cli/js/tls.ts b/cli/js/tls.ts
index 3ed70727d..5a9b9ace0 100644
--- a/cli/js/tls.ts
+++ b/cli/js/tls.ts
@@ -1,11 +1,11 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
import * as tlsOps from "./ops/tls.ts";
-import { Listener, Transport, Conn, ConnImpl, ListenerImpl } from "./net.ts";
+import { Listener, Conn, ConnImpl, ListenerImpl } from "./net.ts";
// TODO(ry) There are many configuration options to add...
// https://docs.rs/rustls/0.16.0/rustls/struct.ClientConfig.html
interface ConnectTLSOptions {
- transport?: Transport;
+ transport?: "tcp";
port: number;
hostname?: string;
certFile?: string;
@@ -36,7 +36,7 @@ class TLSListenerImpl extends ListenerImpl {
export interface ListenTLSOptions {
port: number;
hostname?: string;
- transport?: Transport;
+ transport?: "tcp";
certFile: string;
keyFile: string;
}
diff --git a/cli/ops/io.rs b/cli/ops/io.rs
index 7969184ef..0c9a83883 100644
--- a/cli/ops/io.rs
+++ b/cli/ops/io.rs
@@ -148,6 +148,8 @@ pub enum StreamResource {
Stderr(tokio::io::Stderr),
FsFile(tokio::fs::File, FileMetadata),
TcpStream(tokio::net::TcpStream),
+ #[cfg(not(windows))]
+ UnixStream(tokio::net::UnixStream),
ServerTlsStream(Box<ServerTlsStream<TcpStream>>),
ClientTlsStream(Box<ClientTlsStream<TcpStream>>),
HttpBody(Box<HttpBody>),
@@ -183,6 +185,8 @@ impl DenoAsyncRead for StreamResource {
FsFile(f, _) => f,
Stdin(f, _) => f,
TcpStream(f) => f,
+ #[cfg(not(windows))]
+ UnixStream(f) => f,
ClientTlsStream(f) => f,
ServerTlsStream(f) => f,
ChildStdout(f) => f,
@@ -262,6 +266,8 @@ impl DenoAsyncWrite for StreamResource {
Stdout(f) => f,
Stderr(f) => f,
TcpStream(f) => f,
+ #[cfg(not(windows))]
+ UnixStream(f) => f,
ClientTlsStream(f) => f,
ServerTlsStream(f) => f,
ChildStdin(f) => f,
@@ -279,6 +285,8 @@ impl DenoAsyncWrite for StreamResource {
Stdout(f) => f,
Stderr(f) => f,
TcpStream(f) => f,
+ #[cfg(not(windows))]
+ UnixStream(f) => f,
ClientTlsStream(f) => f,
ServerTlsStream(f) => f,
ChildStdin(f) => f,
diff --git a/cli/ops/mod.rs b/cli/ops/mod.rs
index c011facfc..b91a61c3a 100644
--- a/cli/ops/mod.rs
+++ b/cli/ops/mod.rs
@@ -15,6 +15,8 @@ pub mod fs;
pub mod fs_events;
pub mod io;
pub mod net;
+#[cfg(unix)]
+mod net_unix;
pub mod os;
pub mod permissions;
pub mod plugins;
diff --git a/cli/ops/net.rs b/cli/ops/net.rs
index 3987e94c1..f074ef9ee 100644
--- a/cli/ops/net.rs
+++ b/cli/ops/net.rs
@@ -18,6 +18,9 @@ use tokio::net::TcpListener;
use tokio::net::TcpStream;
use tokio::net::UdpSocket;
+#[cfg(unix)]
+use super::net_unix;
+
pub fn init(i: &mut Isolate, s: &State) {
i.register_op("op_accept", s.stateful_json_op(op_accept));
i.register_op("op_connect", s.stateful_json_op(op_connect));
@@ -30,14 +33,14 @@ pub fn init(i: &mut Isolate, s: &State) {
#[derive(Deserialize)]
struct AcceptArgs {
rid: i32,
+ transport: String,
}
-fn op_accept(
+fn accept_tcp(
state: &State,
- args: Value,
+ args: AcceptArgs,
_zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, OpError> {
- let args: AcceptArgs = serde_json::from_value(args)?;
let rid = args.rid as u32;
let state_ = state.clone();
{
@@ -102,20 +105,36 @@ fn op_accept(
Ok(JsonOp::Async(op.boxed_local()))
}
+fn op_accept(
+ state: &State,
+ args: Value,
+ zero_copy: Option<ZeroCopyBuf>,
+) -> Result<JsonOp, OpError> {
+ let args: AcceptArgs = serde_json::from_value(args)?;
+ match args.transport.as_str() {
+ "tcp" => accept_tcp(state, args, zero_copy),
+ #[cfg(unix)]
+ "unix" => net_unix::accept_unix(state, args.rid as u32, zero_copy),
+ _ => Err(OpError::other(format!(
+ "Unsupported transport protocol {}",
+ args.transport
+ ))),
+ }
+}
+
#[derive(Deserialize)]
struct ReceiveArgs {
rid: i32,
+ transport: String,
}
-fn op_receive(
+fn receive_udp(
state: &State,
- args: Value,
+ args: ReceiveArgs,
zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, OpError> {
- assert!(zero_copy.is_some());
let mut buf = zero_copy.unwrap();
- let args: ReceiveArgs = serde_json::from_value(args)?;
let rid = args.rid as u32;
let state_ = state.clone();
@@ -145,12 +164,32 @@ fn op_receive(
Ok(JsonOp::Async(op.boxed_local()))
}
+fn op_receive(
+ state: &State,
+ args: Value,
+ zero_copy: Option<ZeroCopyBuf>,
+) -> Result<JsonOp, OpError> {
+ assert!(zero_copy.is_some());
+ let args: ReceiveArgs = serde_json::from_value(args)?;
+ match args.transport.as_str() {
+ "udp" => receive_udp(state, args, zero_copy),
+ #[cfg(unix)]
+ "unixpacket" => {
+ net_unix::receive_unix_packet(state, args.rid as u32, zero_copy)
+ }
+ _ => Err(OpError::other(format!(
+ "Unsupported transport protocol {}",
+ args.transport
+ ))),
+ }
+}
+
#[derive(Deserialize)]
struct SendArgs {
rid: i32,
- hostname: String,
- port: u16,
transport: String,
+ #[serde(flatten)]
+ transport_args: ArgsEnum,
}
fn op_send(
@@ -160,38 +199,67 @@ fn op_send(
) -> Result<JsonOp, OpError> {
assert!(zero_copy.is_some());
let buf = zero_copy.unwrap();
-
- let args: SendArgs = serde_json::from_value(args)?;
- assert_eq!(args.transport, "udp");
- let rid = args.rid as u32;
-
let state_ = state.clone();
- state.check_net(&args.hostname, args.port)?;
-
- let op = async move {
- let mut state = state_.borrow_mut();
- let resource = state
- .resource_table
- .get_mut::<UdpSocketResource>(rid)
- .ok_or_else(|| {
- OpError::bad_resource("Socket has been closed".to_string())
- })?;
-
- let socket = &mut resource.socket;
- let addr = resolve_addr(&args.hostname, args.port).await?;
- socket.send_to(&buf, addr).await?;
-
- Ok(json!({}))
- };
-
- Ok(JsonOp::Async(op.boxed_local()))
+ match serde_json::from_value(args)? {
+ SendArgs {
+ rid,
+ transport,
+ transport_args: ArgsEnum::Ip(args),
+ } if transport == "udp" => {
+ state.check_net(&args.hostname, args.port)?;
+
+ let op = async move {
+ let mut state = state_.borrow_mut();
+ let resource = state
+ .resource_table
+ .get_mut::<UdpSocketResource>(rid as u32)
+ .ok_or_else(|| {
+ OpError::bad_resource("Socket has been closed".to_string())
+ })?;
+ let socket = &mut resource.socket;
+ let addr = resolve_addr(&args.hostname, args.port).await?;
+ socket.send_to(&buf, addr).await?;
+ Ok(json!({}))
+ };
+
+ Ok(JsonOp::Async(op.boxed_local()))
+ }
+ #[cfg(unix)]
+ SendArgs {
+ rid,
+ transport,
+ transport_args: ArgsEnum::Unix(args),
+ } if transport == "unixpacket" => {
+ let address_path = net_unix::Path::new(&args.address);
+ state.check_read(&address_path)?;
+ let op = async move {
+ let mut state = state_.borrow_mut();
+ let resource = state
+ .resource_table
+ .get_mut::<net_unix::UnixDatagramResource>(rid as u32)
+ .ok_or_else(|| {
+ OpError::other("Socket has been closed".to_string())
+ })?;
+
+ let socket = &mut resource.socket;
+ socket
+ .send_to(&buf, &resource.local_addr.as_pathname().unwrap())
+ .await?;
+
+ Ok(json!({}))
+ };
+
+ Ok(JsonOp::Async(op.boxed_local()))
+ }
+ _ => Err(OpError::other("Wrong argument format!".to_owned())),
+ }
}
#[derive(Deserialize)]
struct ConnectArgs {
transport: String,
- hostname: String,
- port: u16,
+ #[serde(flatten)]
+ transport_args: ArgsEnum,
}
fn op_connect(
@@ -199,39 +267,78 @@ fn op_connect(
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, OpError> {
- let args: ConnectArgs = serde_json::from_value(args)?;
- assert_eq!(args.transport, "tcp"); // TODO Support others.
- let state_ = state.clone();
- state.check_net(&args.hostname, args.port)?;
-
- let op = async move {
- let addr = resolve_addr(&args.hostname, args.port).await?;
- let tcp_stream = TcpStream::connect(&addr).await?;
- let local_addr = tcp_stream.local_addr()?;
- let remote_addr = tcp_stream.peer_addr()?;
- let mut state = state_.borrow_mut();
- let rid = state.resource_table.add(
- "tcpStream",
- Box::new(StreamResourceHolder::new(StreamResource::TcpStream(
- tcp_stream,
- ))),
- );
- Ok(json!({
- "rid": rid,
- "localAddr": {
- "hostname": local_addr.ip().to_string(),
- "port": local_addr.port(),
- "transport": args.transport,
- },
- "remoteAddr": {
- "hostname": remote_addr.ip().to_string(),
- "port": remote_addr.port(),
- "transport": args.transport,
- }
- }))
- };
-
- Ok(JsonOp::Async(op.boxed_local()))
+ match serde_json::from_value(args)? {
+ ConnectArgs {
+ transport,
+ transport_args: ArgsEnum::Ip(args),
+ } if transport == "tcp" => {
+ let state_ = state.clone();
+ state.check_net(&args.hostname, args.port)?;
+ let op = async move {
+ let addr = resolve_addr(&args.hostname, args.port).await?;
+ let tcp_stream = TcpStream::connect(&addr).await?;
+ let local_addr = tcp_stream.local_addr()?;
+ let remote_addr = tcp_stream.peer_addr()?;
+ let mut state = state_.borrow_mut();
+ let rid = state.resource_table.add(
+ "tcpStream",
+ Box::new(StreamResourceHolder::new(StreamResource::TcpStream(
+ tcp_stream,
+ ))),
+ );
+ Ok(json!({
+ "rid": rid,
+ "localAddr": {
+ "hostname": local_addr.ip().to_string(),
+ "port": local_addr.port(),
+ "transport": transport,
+ },
+ "remoteAddr": {
+ "hostname": remote_addr.ip().to_string(),
+ "port": remote_addr.port(),
+ "transport": transport,
+ }
+ }))
+ };
+ Ok(JsonOp::Async(op.boxed_local()))
+ }
+ #[cfg(unix)]
+ ConnectArgs {
+ transport,
+ transport_args: ArgsEnum::Unix(args),
+ } if transport == "unix" => {
+ let address_path = net_unix::Path::new(&args.address);
+ let state_ = state.clone();
+ state.check_read(&address_path)?;
+ let op = async move {
+ let address = args.address;
+ let unix_stream =
+ net_unix::UnixStream::connect(net_unix::Path::new(&address)).await?;
+ let local_addr = unix_stream.local_addr()?;
+ let remote_addr = unix_stream.peer_addr()?;
+ let mut state = state_.borrow_mut();
+ let rid = state.resource_table.add(
+ "unixStream",
+ Box::new(StreamResourceHolder::new(StreamResource::UnixStream(
+ unix_stream,
+ ))),
+ );
+ Ok(json!({
+ "rid": rid,
+ "localAddr": {
+ "address": local_addr.as_pathname(),
+ "transport": transport,
+ },
+ "remoteAddr": {
+ "address": remote_addr.as_pathname(),
+ "transport": transport,
+ }
+ }))
+ };
+ Ok(JsonOp::Async(op.boxed_local()))
+ }
+ _ => Err(OpError::other("Wrong argument format!".to_owned())),
+ }
}
#[derive(Deserialize)]
@@ -265,19 +372,17 @@ fn op_shutdown(
StreamResource::TcpStream(ref mut stream) => {
TcpStream::shutdown(stream, shutdown_mode).map_err(OpError::from)?;
}
+ #[cfg(unix)]
+ StreamResource::UnixStream(ref mut stream) => {
+ net_unix::UnixStream::shutdown(stream, shutdown_mode)
+ .map_err(OpError::from)?;
+ }
_ => return Err(OpError::bad_resource_id()),
}
Ok(JsonOp::Sync(json!({})))
}
-#[derive(Deserialize)]
-struct ListenArgs {
- transport: String,
- hostname: String,
- port: u16,
-}
-
#[allow(dead_code)]
struct TcpListenerResource {
listener: TcpListener,
@@ -331,6 +436,27 @@ struct UdpSocketResource {
socket: UdpSocket,
}
+#[derive(Deserialize)]
+struct IpListenArgs {
+ hostname: String,
+ port: u16,
+}
+
+#[derive(Deserialize)]
+#[serde(untagged)]
+enum ArgsEnum {
+ Ip(IpListenArgs),
+ #[cfg(unix)]
+ Unix(net_unix::UnixListenArgs),
+}
+
+#[derive(Deserialize)]
+struct ListenArgs {
+ transport: String,
+ #[serde(flatten)]
+ transport_args: ArgsEnum,
+}
+
fn listen_tcp(
state: &State,
addr: SocketAddr,
@@ -370,33 +496,60 @@ fn op_listen(
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, OpError> {
- let args: ListenArgs = serde_json::from_value(args)?;
- assert!(args.transport == "tcp" || args.transport == "udp");
-
- state.check_net(&args.hostname, args.port)?;
-
- let addr =
- futures::executor::block_on(resolve_addr(&args.hostname, args.port))?;
-
- let (rid, local_addr) = if args.transport == "tcp" {
- listen_tcp(state, addr)?
- } else {
- listen_udp(state, addr)?
- };
-
- debug!(
- "New listener {} {}:{}",
- rid,
- local_addr.ip().to_string(),
- local_addr.port()
- );
-
- Ok(JsonOp::Sync(json!({
- "rid": rid,
- "localAddr": {
- "hostname": local_addr.ip().to_string(),
- "port": local_addr.port(),
- "transport": args.transport,
- },
- })))
+ match serde_json::from_value(args)? {
+ ListenArgs {
+ transport,
+ transport_args: ArgsEnum::Ip(args),
+ } => {
+ state.check_net(&args.hostname, args.port)?;
+ let addr =
+ futures::executor::block_on(resolve_addr(&args.hostname, args.port))?;
+ let (rid, local_addr) = if transport == "tcp" {
+ listen_tcp(state, addr)?
+ } else {
+ listen_udp(state, addr)?
+ };
+ debug!(
+ "New listener {} {}:{}",
+ rid,
+ local_addr.ip().to_string(),
+ local_addr.port()
+ );
+ Ok(JsonOp::Sync(json!({
+ "rid": rid,
+ "localAddr": {
+ "hostname": local_addr.ip().to_string(),
+ "port": local_addr.port(),
+ "transport": transport,
+ },
+ })))
+ }
+ #[cfg(unix)]
+ ListenArgs {
+ transport,
+ transport_args: ArgsEnum::Unix(args),
+ } if transport == "unix" || transport == "unixpacket" => {
+ let address_path = net_unix::Path::new(&args.address);
+ state.check_read(&address_path)?;
+ let (rid, local_addr) = if transport == "unix" {
+ net_unix::listen_unix(state, &address_path)?
+ } else {
+ net_unix::listen_unix_packet(state, &address_path)?
+ };
+ debug!(
+ "New listener {} {}",
+ rid,
+ local_addr.as_pathname().unwrap().display(),
+ );
+ Ok(JsonOp::Sync(json!({
+ "rid": rid,
+ "localAddr": {
+ "address": local_addr.as_pathname(),
+ "transport": transport,
+ },
+ })))
+ }
+ #[cfg(unix)]
+ _ => Err(OpError::other("Wrong argument format!".to_owned())),
+ }
}
diff --git a/cli/ops/net_unix.rs b/cli/ops/net_unix.rs
new file mode 100644
index 000000000..43778e7c6
--- /dev/null
+++ b/cli/ops/net_unix.rs
@@ -0,0 +1,142 @@
+use super::dispatch_json::{Deserialize, JsonOp};
+use super::io::{StreamResource, StreamResourceHolder};
+use crate::op_error::OpError;
+use crate::state::State;
+use futures::future::FutureExt;
+
+use deno_core::*;
+use std::fs::remove_file;
+use std::os::unix;
+pub use std::path::Path;
+use tokio::net::UnixDatagram;
+use tokio::net::UnixListener;
+pub use tokio::net::UnixStream;
+
+struct UnixListenerResource {
+ listener: UnixListener,
+}
+
+pub struct UnixDatagramResource {
+ pub socket: UnixDatagram,
+ pub local_addr: unix::net::SocketAddr,
+}
+
+#[derive(Deserialize)]
+pub struct UnixListenArgs {
+ pub address: String,
+}
+
+pub fn accept_unix(
+ state: &State,
+ rid: u32,
+ _zero_copy: Option<ZeroCopyBuf>,
+) -> Result<JsonOp, OpError> {
+ let state_ = state.clone();
+ {
+ let state = state.borrow();
+ state
+ .resource_table
+ .get::<UnixListenerResource>(rid)
+ .ok_or_else(OpError::bad_resource_id)?;
+ }
+ let op = async move {
+ let mut state = state_.borrow_mut();
+ let listener_resource = state
+ .resource_table
+ .get_mut::<UnixListenerResource>(rid)
+ .ok_or_else(|| {
+ OpError::bad_resource("Listener has been closed".to_string())
+ })?;
+ let (unix_stream, _socket_addr) =
+ listener_resource.listener.accept().await?;
+ let local_addr = unix_stream.local_addr()?;
+ let remote_addr = unix_stream.peer_addr()?;
+ let rid = state.resource_table.add(
+ "unixStream",
+ Box::new(StreamResourceHolder::new(StreamResource::UnixStream(
+ unix_stream,
+ ))),
+ );
+ Ok(json!({
+ "rid": rid,
+ "localAddr": {
+ "address": local_addr.as_pathname(),
+ "transport": "unix",
+ },
+ "remoteAddr": {
+ "address": remote_addr.as_pathname(),
+ "transport": "unix",
+ }
+ }))
+ };
+
+ Ok(JsonOp::Async(op.boxed_local()))
+}
+
+pub fn receive_unix_packet(
+ state: &State,
+ rid: u32,
+ zero_copy: Option<ZeroCopyBuf>,
+) -> Result<JsonOp, OpError> {
+ let mut buf = zero_copy.unwrap();
+ let state_ = state.clone();
+
+ let op = async move {
+ let mut state = state_.borrow_mut();
+ let resource = state
+ .resource_table
+ .get_mut::<UnixDatagramResource>(rid)
+ .ok_or_else(|| {
+ OpError::bad_resource("Socket has been closed".to_string())
+ })?;
+ let (size, remote_addr) = resource.socket.recv_from(&mut buf).await?;
+ Ok(json!({
+ "size": size,
+ "remoteAddr": {
+ "address": remote_addr.as_pathname(),
+ "transport": "unixpacket",
+ }
+ }))
+ };
+
+ Ok(JsonOp::Async(op.boxed_local()))
+}
+
+pub fn listen_unix(
+ state: &State,
+ addr: &Path,
+) -> Result<(u32, unix::net::SocketAddr), OpError> {
+ let mut state = state.borrow_mut();
+ if addr.exists() {
+ remove_file(&addr).unwrap();
+ }
+ let listener = UnixListener::bind(&addr)?;
+ let local_addr = listener.local_addr()?;
+ let listener_resource = UnixListenerResource { listener };
+ let rid = state
+ .resource_table
+ .add("unixListener", Box::new(listener_resource));
+
+ Ok((rid, local_addr))
+}
+
+pub fn listen_unix_packet(
+ state: &State,
+ addr: &Path,
+) -> Result<(u32, unix::net::SocketAddr), OpError> {
+ let mut state = state.borrow_mut();
+ if addr.exists() {
+ remove_file(&addr).unwrap();
+ }
+ let socket = UnixDatagram::bind(&addr)?;
+ let local_addr = socket.local_addr()?;
+ let datagram_resource = UnixDatagramResource {
+ socket,
+ local_addr: local_addr.clone(),
+ };
+ let rid = state
+ .resource_table
+ .add("unixDatagram", Box::new(datagram_resource));
+
+ Ok((rid, local_addr))
+}
diff --git a/tools/deno_tcp.ts b/tools/deno_tcp.ts
index 9a884cb70..068c48a04 100644
--- a/tools/deno_tcp.ts
+++ b/tools/deno_tcp.ts
@@ -8,7 +8,6 @@ const listener = Deno.listen({ hostname, port: Number(port) });
const response = new TextEncoder().encode(
"HTTP/1.1 200 OK\r\nContent-Length: 12\r\n\r\nHello World\n"
);
-
async function handle(conn: Deno.Conn): Promise<void> {
const buffer = new Uint8Array(1024);
try {