summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGianluca Oldani <oldanigianluca@gmail.com>2022-10-24 11:05:07 +0200
committerGitHub <noreply@github.com>2022-10-24 09:05:07 +0000
commit873a5ce2eddb65cdfea7fc8bbcae3e8dfef5dfb9 (patch)
treec02d782463ae2d8c412118aee35170be6cca7238
parent38213f1142200e9184d1c5ae1e25ff781248362a (diff)
feat(ext/net): add reuseAddress option for UDP (#13849)
This commit adds a `reuseAddress` option for UDP sockets. When this option is enabled, one can listen on an address even though it is already being listened on from a different process or thread. The new socket will steal the address from the existing socket. On Windows and Linux this uses the `SO_REUSEADDR` option, while on other Unixes this is done with `SO_REUSEPORT`. This behavior aligns with what libuv does. TCP sockets still unconditionally set the `SO_REUSEADDR` flag - this behavior matches Node.js and Go. This PR does not change this behaviour. Co-authored-by: Luca Casonato <hello@lcas.dev>
-rw-r--r--cli/dts/lib.deno.unstable.d.ts13
-rw-r--r--cli/tests/unit/net_test.ts99
-rw-r--r--cli/tests/unit/tls_test.ts51
-rw-r--r--ext/net/ops.rs41
4 files changed, 199 insertions, 5 deletions
diff --git a/cli/dts/lib.deno.unstable.d.ts b/cli/dts/lib.deno.unstable.d.ts
index 72f5fc058..85b5911f8 100644
--- a/cli/dts/lib.deno.unstable.d.ts
+++ b/cli/dts/lib.deno.unstable.d.ts
@@ -1073,6 +1073,17 @@ declare namespace Deno {
/** **UNSTABLE**: New API, yet to be vetted.
*
+ * @category Network
+ */
+ export interface UdpListenOptions extends ListenOptions {
+ /** When `true` the specified address will be reused, even if another
+ * process has already bound a socket on it. This effectively steals the
+ * socket from the listener. Defaults to `false`. */
+ reuseAddress?: boolean;
+ }
+
+ /** **UNSTABLE**: New API, yet to be vetted.
+ *
* Listen announces on the local transport address.
*
* ```ts
@@ -1110,7 +1121,7 @@ declare namespace Deno {
* @category Network
*/
export function listenDatagram(
- options: ListenOptions & { transport: "udp" },
+ options: UdpListenOptions & { transport: "udp" },
): DatagramConn;
/** **UNSTABLE**: New API, yet to be vetted.
diff --git a/cli/tests/unit/net_test.ts b/cli/tests/unit/net_test.ts
index eeaada05e..d2beb5566 100644
--- a/cli/tests/unit/net_test.ts
+++ b/cli/tests/unit/net_test.ts
@@ -906,3 +906,102 @@ Deno.test({
);
listener.close();
});
+
+Deno.test({ permissions: { net: true } }, async function netTcpReuseAddr() {
+ const listener1 = Deno.listen({
+ hostname: "127.0.0.1",
+ port: 3500,
+ });
+ listener1.accept().then(
+ (conn) => {
+ conn.close();
+ },
+ );
+
+ const conn1 = await Deno.connect({ hostname: "127.0.0.1", port: 3500 });
+ const buf1 = new Uint8Array(1024);
+ await conn1.read(buf1);
+ listener1.close();
+ conn1.close();
+
+ const listener2 = Deno.listen({
+ hostname: "127.0.0.1",
+ port: 3500,
+ });
+
+ listener2.accept().then(
+ (conn) => {
+ conn.close();
+ },
+ );
+
+ const conn2 = await Deno.connect({ hostname: "127.0.0.1", port: 3500 });
+ const buf2 = new Uint8Array(1024);
+ await conn2.read(buf2);
+
+ listener2.close();
+ conn2.close();
+});
+
+Deno.test(
+ { permissions: { net: true } },
+ async function netUdpReuseAddr() {
+ const sender = Deno.listenDatagram({
+ port: 4002,
+ transport: "udp",
+ });
+ const listener1 = Deno.listenDatagram({
+ port: 4000,
+ transport: "udp",
+ reuseAddress: true,
+ });
+ const listener2 = Deno.listenDatagram({
+ port: 4000,
+ transport: "udp",
+ reuseAddress: true,
+ });
+
+ const sent = new Uint8Array([1, 2, 3]);
+ await sender.send(sent, listener1.addr);
+ await Promise.any([listener1.receive(), listener2.receive()]).then(
+ ([recvd, remote]) => {
+ assert(remote.transport === "udp");
+ assertEquals(recvd.length, 3);
+ assertEquals(1, recvd[0]);
+ assertEquals(2, recvd[1]);
+ assertEquals(3, recvd[2]);
+ },
+ );
+ sender.close();
+ listener1.close();
+ listener2.close();
+ },
+);
+
+Deno.test(
+ { permissions: { net: true } },
+ function netUdpNoReuseAddr() {
+ let listener1;
+ try {
+ listener1 = Deno.listenDatagram({
+ port: 4001,
+ transport: "udp",
+ reuseAddress: false,
+ });
+ } catch (err) {
+ assert(err);
+ assert(err instanceof Deno.errors.AddrInUse); // AddrInUse from previous test
+ }
+
+ assertThrows(() => {
+ Deno.listenDatagram({
+ port: 4001,
+ transport: "udp",
+ reuseAddress: false,
+ });
+ }, Deno.errors.AddrInUse);
+ if (typeof listener1 !== "undefined") {
+ listener1.close();
+ }
+ },
+);
diff --git a/cli/tests/unit/tls_test.ts b/cli/tests/unit/tls_test.ts
index 860965e49..df82f6ef6 100644
--- a/cli/tests/unit/tls_test.ts
+++ b/cli/tests/unit/tls_test.ts
@@ -1376,3 +1376,54 @@ Deno.test(
await Promise.all([server(), startTlsClient()]);
},
);
+
+Deno.test(
+ { permissions: { read: false, net: true } },
+ async function listenTlsWithReuseAddr() {
+ const resolvable1 = deferred();
+ const hostname = "localhost";
+ const port = 3500;
+
+ const listener1 = Deno.listenTls({ hostname, port, cert, key });
+
+ const response1 = encoder.encode(
+ "HTTP/1.1 200 OK\r\nContent-Length: 12\r\n\r\nHello World\n",
+ );
+
+ listener1.accept().then(
+ async (conn) => {
+ await conn.write(response1);
+ setTimeout(() => {
+ conn.close();
+ resolvable1.resolve();
+ }, 0);
+ },
+ );
+
+ const conn1 = await Deno.connectTls({ hostname, port, caCerts });
+ conn1.close();
+ listener1.close();
+ await resolvable1;
+
+ const resolvable2 = deferred();
+ const listener2 = Deno.listenTls({ hostname, port, cert, key });
+ const response2 = encoder.encode(
+ "HTTP/1.1 200 OK\r\nContent-Length: 12\r\n\r\nHello World\n",
+ );
+
+ listener2.accept().then(
+ async (conn) => {
+ await conn.write(response2);
+ setTimeout(() => {
+ conn.close();
+ resolvable2.resolve();
+ }, 0);
+ },
+ );
+
+ const conn2 = await Deno.connectTls({ hostname, port, caCerts });
+ conn2.close();
+ listener2.close();
+ await resolvable2;
+ },
+);
diff --git a/ext/net/ops.rs b/ext/net/ops.rs
index 41d04467e..399baa4fd 100644
--- a/ext/net/ops.rs
+++ b/ext/net/ops.rs
@@ -25,6 +25,7 @@ use log::debug;
use serde::Deserialize;
use serde::Serialize;
use socket2::Domain;
+use socket2::Protocol;
use socket2::Socket;
use socket2::Type;
use std::borrow::Cow;
@@ -417,9 +418,11 @@ impl Resource for UdpSocketResource {
}
#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
struct IpListenArgs {
hostname: String,
port: u16,
+ reuse_address: Option<bool>,
}
#[derive(Deserialize)]
@@ -468,11 +471,35 @@ fn listen_tcp(
fn listen_udp(
state: &mut OpState,
addr: SocketAddr,
+ reuse_address: Option<bool>,
) -> Result<(u32, SocketAddr), AnyError> {
- let std_socket = std::net::UdpSocket::bind(&addr)?;
- std_socket.set_nonblocking(true)?;
+ let domain = if addr.is_ipv4() {
+ Domain::IPV4
+ } else {
+ Domain::IPV6
+ };
+ let socket_tmp = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
+ if reuse_address.unwrap_or(false) {
+ // This logic is taken from libuv:
+ //
+ // On the BSDs, SO_REUSEPORT implies SO_REUSEADDR but with some additional
+ // refinements for programs that use multicast.
+ //
+ // Linux as of 3.9 has a SO_REUSEPORT socket option but with semantics that
+ // are different from the BSDs: it _shares_ the port rather than steal it
+ // from the current listener. While useful, it's not something we can
+ // emulate on other platforms so we don't enable it.
+ #[cfg(any(target_os = "windows", target_os = "linux"))]
+ socket_tmp.set_reuse_address(true)?;
+ #[cfg(all(unix, not(target_os = "linux")))]
+ socket_tmp.set_reuse_port(true)?;
+ }
+ let socket_addr = socket2::SockAddr::from(addr);
+ socket_tmp.bind(&socket_addr)?;
+ socket_tmp.set_nonblocking(true)?;
// Enable messages to be sent to the broadcast address (255.255.255.255) by default
- std_socket.set_broadcast(true)?;
+ socket_tmp.set_broadcast(true)?;
+ let std_socket: std::net::UdpSocket = socket_tmp.into();
let socket = UdpSocket::from_std(std_socket)?;
let local_addr = socket.local_addr()?;
let socket_resource = UdpSocketResource {
@@ -510,9 +537,14 @@ where
.next()
.ok_or_else(|| generic_error("No resolved address found"))?;
let (rid, local_addr) = if transport == "tcp" {
+ if args.reuse_address.is_some() {
+ return Err(generic_error(
+ "The reuseAddress option is not supported for TCP",
+ ));
+ }
listen_tcp(state, addr)?
} else {
- listen_udp(state, addr)?
+ listen_udp(state, addr, args.reuse_address)?
};
debug!(
"New listener {} {}:{}",
@@ -1099,6 +1131,7 @@ mod tests {
let ip_args = IpListenArgs {
hostname: String::from(server_addr[0]),
port: server_addr[1].parse().unwrap(),
+ reuse_address: None,
};
let connect_args = ConnectArgs {
transport: String::from("tcp"),