summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cli/tests/unit/net_test.ts135
-rw-r--r--cli/tsc/dts/lib.deno.unstable.d.ts49
-rw-r--r--ext/net/01_net.js59
-rw-r--r--ext/net/lib.rs6
-rw-r--r--ext/net/ops.rs164
5 files changed, 409 insertions, 4 deletions
diff --git a/cli/tests/unit/net_test.ts b/cli/tests/unit/net_test.ts
index 652d07d7d..935a6f846 100644
--- a/cli/tests/unit/net_test.ts
+++ b/cli/tests/unit/net_test.ts
@@ -426,6 +426,141 @@ Deno.test(
);
Deno.test(
+ { permissions: { net: true }, ignore: true },
+ async function netUdpMulticastV4() {
+ const listener = Deno.listenDatagram({
+ hostname: "0.0.0.0",
+ port: 5353,
+ transport: "udp",
+ reuseAddress: true,
+ });
+
+ const membership = await listener.joinMulticastV4(
+ "224.0.0.251",
+ "127.0.0.1",
+ );
+
+ membership.setLoopback(true);
+ membership.setLoopback(false);
+ membership.setTTL(50);
+ membership.leave();
+ listener.close();
+ },
+);
+
+Deno.test(
+ { permissions: { net: true }, ignore: true },
+ async function netUdpMulticastV6() {
+ const listener = Deno.listenDatagram({
+ hostname: "::",
+ port: 5353,
+ transport: "udp",
+ reuseAddress: true,
+ });
+
+ const membership = await listener.joinMulticastV6(
+ "ff02::fb",
+ 1,
+ );
+
+ membership.setLoopback(true);
+ membership.setLoopback(false);
+ membership.leave();
+ listener.close();
+ },
+);
+
+Deno.test(
+ { permissions: { net: true }, ignore: true },
+ async function netUdpSendReceiveMulticastv4() {
+ const alice = Deno.listenDatagram({
+ hostname: "0.0.0.0",
+ port: 5353,
+ transport: "udp",
+ reuseAddress: true,
+ loopback: true,
+ });
+
+ const bob = Deno.listenDatagram({
+ hostname: "0.0.0.0",
+ port: 5353,
+ transport: "udp",
+ reuseAddress: true,
+ });
+
+ const aliceMembership = await alice.joinMulticastV4(
+ "224.0.0.1",
+ "0.0.0.0",
+ );
+
+ const bobMembership = await bob.joinMulticastV4("224.0.0.1", "0.0.0.0");
+
+ const sent = new Uint8Array([1, 2, 3]);
+
+ await alice.send(sent, {
+ hostname: "224.0.0.1",
+ port: 5353,
+ transport: "udp",
+ });
+
+ const [recvd, remote] = await bob.receive();
+
+ assert(remote.transport === "udp");
+ assertEquals(remote.port, 5353);
+ assertEquals(recvd.length, 3);
+ assertEquals(1, recvd[0]);
+ assertEquals(2, recvd[1]);
+ assertEquals(3, recvd[2]);
+
+ aliceMembership.leave();
+ bobMembership.leave();
+
+ alice.close();
+ bob.close();
+ },
+);
+
+Deno.test(
+ { permissions: { net: true }, ignore: true },
+ async function netUdpMulticastLoopbackOption() {
+ // Must bind sender to an address that can send to the broadcast address on MacOS.
+ // Macos will give us error 49 when sending the broadcast packet if we omit hostname here.
+ const listener = Deno.listenDatagram({
+ port: 5353,
+ transport: "udp",
+ hostname: "0.0.0.0",
+ loopback: true,
+ reuseAddress: true,
+ });
+
+ const membership = await listener.joinMulticastV4(
+ "224.0.0.1",
+ "0.0.0.0",
+ );
+
+ // await membership.setLoopback(true);
+
+ const sent = new Uint8Array([1, 2, 3]);
+ const byteLength = await listener.send(sent, {
+ hostname: "224.0.0.1",
+ port: 5353,
+ transport: "udp",
+ });
+
+ assertEquals(byteLength, 3);
+ const [recvd, remote] = await listener.receive();
+ assert(remote.transport === "udp");
+ assertEquals(remote.port, 5353);
+ assertEquals(recvd.length, 3);
+ assertEquals(1, recvd[0]);
+ assertEquals(2, recvd[1]);
+ assertEquals(3, recvd[2]);
+ membership.leave();
+ listener.close();
+ },
+);
+
+Deno.test(
{ permissions: { net: true } },
async function netUdpConcurrentSendReceive() {
const socket = Deno.listenDatagram({ port: 3500, transport: "udp" });
diff --git a/cli/tsc/dts/lib.deno.unstable.d.ts b/cli/tsc/dts/lib.deno.unstable.d.ts
index ed7e682f1..62426ca35 100644
--- a/cli/tsc/dts/lib.deno.unstable.d.ts
+++ b/cli/tsc/dts/lib.deno.unstable.d.ts
@@ -163,7 +163,7 @@ declare namespace Deno {
*/
type ToNativeResultType<T extends NativeResultType = NativeResultType> =
T extends NativeStructType ? BufferSource
- : ToNativeResultTypeMap[Exclude<T, NativeStructType>];
+ : ToNativeResultTypeMap[Exclude<T, NativeStructType>];
/** **UNSTABLE**: New API, yet to be vetted.
*
@@ -225,7 +225,7 @@ declare namespace Deno {
*/
type FromNativeResultType<T extends NativeResultType = NativeResultType> =
T extends NativeStructType ? Uint8Array
- : FromNativeResultTypeMap[Exclude<T, NativeStructType>];
+ : FromNativeResultTypeMap[Exclude<T, NativeStructType>];
/** **UNSTABLE**: New API, yet to be vetted.
*
@@ -852,11 +852,51 @@ declare namespace Deno {
/** **UNSTABLE**: New API, yet to be vetted.
*
+ * Represents membership of a IPv4 multicast group.
+ *
+ * @category Network
+ */
+ interface MulticastV4Membership {
+ /** Leaves the multicast group. */
+ leave: () => Promise<void>;
+ /** Sets the multicast loopback option. If enabled, multicast packets will be looped back to the local socket. */
+ setLoopback: (loopback: boolean) => Promise<void>;
+ /** Sets the time-to-live of outgoing multicast packets for this socket. */
+ setTTL: (ttl: number) => Promise<void>;
+ }
+
+ /** **UNSTABLE**: New API, yet to be vetted.
+ *
+ * Represents membership of a IPv6 multicast group.
+ *
+ * @category Network
+ */
+ interface MulticastV6Membership {
+ /** Leaves the multicast group. */
+ leave: () => Promise<void>;
+ /** Sets the multicast loopback option. If enabled, multicast packets will be looped back to the local socket. */
+ setLoopback: (loopback: boolean) => Promise<void>;
+ }
+
+ /** **UNSTABLE**: New API, yet to be vetted.
+ *
* A generic transport listener for message-oriented protocols.
*
* @category Network
*/
export interface DatagramConn extends AsyncIterable<[Uint8Array, Addr]> {
+ /** Joins an IPv4 multicast group. */
+ joinMulticastV4(
+ address: string,
+ networkInterface: string,
+ ): Promise<MulticastV4Membership>;
+
+ /** Joins an IPv6 multicast group. */
+ joinMulticastV6(
+ address: string,
+ networkInterface: number,
+ ): Promise<MulticastV6Membership>;
+
/** Waits for and resolves to the next message to the instance.
*
* Messages are received in the format of a tuple containing the data array
@@ -918,6 +958,11 @@ declare namespace Deno {
*
* @default {false} */
reuseAddress?: boolean;
+
+ /** When `true`, sent multicast packets will be looped back to the local socket.
+ *
+ * @default {false} */
+ loopback?: boolean;
}
/** **UNSTABLE**: New API, yet to be vetted.
diff --git a/ext/net/01_net.js b/ext/net/01_net.js
index 8d8e34e56..81e13f094 100644
--- a/ext/net/01_net.js
+++ b/ext/net/01_net.js
@@ -277,6 +277,64 @@ class Datagram {
return this.#addr;
}
+ async joinMulticastV4(addr, multiInterface) {
+ await core.opAsync(
+ "op_net_join_multi_v4_udp",
+ this.rid,
+ addr,
+ multiInterface,
+ );
+
+ return {
+ leave: () =>
+ core.opAsync(
+ "op_net_leave_multi_v4_udp",
+ this.rid,
+ addr,
+ multiInterface,
+ ),
+ setLoopback: (loopback) =>
+ core.opAsync(
+ "op_net_set_multi_loopback_udp",
+ this.rid,
+ true,
+ loopback,
+ ),
+ setTTL: (ttl) =>
+ core.opAsync(
+ "op_net_set_multi_ttl_udp",
+ this.rid,
+ ttl,
+ ),
+ };
+ }
+
+ async joinMulticastV6(addr, multiInterface) {
+ await core.opAsync(
+ "op_net_join_multi_v6_udp",
+ this.rid,
+ addr,
+ multiInterface,
+ );
+
+ return {
+ leave: () =>
+ core.opAsync(
+ "op_net_leave_multi_v6_udp",
+ this.rid,
+ addr,
+ multiInterface,
+ ),
+ setLoopback: (loopback) =>
+ core.opAsync(
+ "op_net_set_multi_loopback_udp",
+ this.rid,
+ false,
+ loopback,
+ ),
+ };
+ }
+
async receive(p) {
const buf = p || new Uint8Array(this.bufSize);
let nread;
@@ -383,6 +441,7 @@ function createListenDatagram(udpOpFn, unixOpFn) {
port: args.port,
},
args.reuseAddress ?? false,
+ args.loopback ?? false,
);
addr.transport = "udp";
return new Datagram(rid, addr);
diff --git a/ext/net/lib.rs b/ext/net/lib.rs
index 00833b53c..f812bf60b 100644
--- a/ext/net/lib.rs
+++ b/ext/net/lib.rs
@@ -86,6 +86,12 @@ deno_core::extension!(deno_net,
ops::op_node_unstable_net_listen_udp<P>,
ops::op_net_recv_udp,
ops::op_net_send_udp<P>,
+ ops::op_net_join_multi_v4_udp<P>,
+ ops::op_net_join_multi_v6_udp<P>,
+ ops::op_net_leave_multi_v4_udp<P>,
+ ops::op_net_leave_multi_v6_udp<P>,
+ ops::op_net_set_multi_loopback_udp<P>,
+ ops::op_net_set_multi_ttl_udp<P>,
ops::op_dns_resolve<P>,
ops::op_set_nodelay,
ops::op_set_keepalive,
diff --git a/ext/net/ops.rs b/ext/net/ops.rs
index c094ddac2..8e7263753 100644
--- a/ext/net/ops.rs
+++ b/ext/net/ops.rs
@@ -28,8 +28,11 @@ use socket2::Socket;
use socket2::Type;
use std::borrow::Cow;
use std::cell::RefCell;
+use std::net::Ipv4Addr;
+use std::net::Ipv6Addr;
use std::net::SocketAddr;
use std::rc::Rc;
+use std::str::FromStr;
use tokio::net::TcpListener;
use tokio::net::TcpStream;
use tokio::net::UdpSocket;
@@ -156,6 +159,151 @@ where
}
#[op]
+async fn op_net_join_multi_v4_udp<NP>(
+ state: Rc<RefCell<OpState>>,
+ rid: ResourceId,
+ address: String,
+ multi_interface: String,
+) -> Result<(), AnyError>
+where
+ NP: NetPermissions + 'static,
+{
+ let resource = state
+ .borrow_mut()
+ .resource_table
+ .get::<UdpSocketResource>(rid)
+ .map_err(|_| bad_resource("Socket has been closed"))?;
+ let socket = RcRef::map(&resource, |r| &r.socket).borrow().await;
+
+ let addr = Ipv4Addr::from_str(address.as_str())?;
+ let interface_addr = Ipv4Addr::from_str(multi_interface.as_str())?;
+
+ socket.join_multicast_v4(addr, interface_addr)?;
+
+ Ok(())
+}
+
+#[op]
+async fn op_net_join_multi_v6_udp<NP>(
+ state: Rc<RefCell<OpState>>,
+ rid: ResourceId,
+ address: String,
+ multi_interface: u32,
+) -> Result<(), AnyError>
+where
+ NP: NetPermissions + 'static,
+{
+ let resource = state
+ .borrow_mut()
+ .resource_table
+ .get::<UdpSocketResource>(rid)
+ .map_err(|_| bad_resource("Socket has been closed"))?;
+ let socket = RcRef::map(&resource, |r| &r.socket).borrow().await;
+
+ let addr = Ipv6Addr::from_str(address.as_str())?;
+
+ socket.join_multicast_v6(&addr, multi_interface)?;
+
+ Ok(())
+}
+
+#[op]
+async fn op_net_leave_multi_v4_udp<NP>(
+ state: Rc<RefCell<OpState>>,
+ rid: ResourceId,
+ address: String,
+ multi_interface: String,
+) -> Result<(), AnyError>
+where
+ NP: NetPermissions + 'static,
+{
+ let resource = state
+ .borrow_mut()
+ .resource_table
+ .get::<UdpSocketResource>(rid)
+ .map_err(|_| bad_resource("Socket has been closed"))?;
+ let socket = RcRef::map(&resource, |r| &r.socket).borrow().await;
+
+ let addr = Ipv4Addr::from_str(address.as_str())?;
+ let interface_addr = Ipv4Addr::from_str(multi_interface.as_str())?;
+
+ socket.leave_multicast_v4(addr, interface_addr)?;
+
+ Ok(())
+}
+
+#[op]
+async fn op_net_leave_multi_v6_udp<NP>(
+ state: Rc<RefCell<OpState>>,
+ rid: ResourceId,
+ address: String,
+ multi_interface: u32,
+) -> Result<(), AnyError>
+where
+ NP: NetPermissions + 'static,
+{
+ let resource = state
+ .borrow_mut()
+ .resource_table
+ .get::<UdpSocketResource>(rid)
+ .map_err(|_| bad_resource("Socket has been closed"))?;
+ let socket = RcRef::map(&resource, |r| &r.socket).borrow().await;
+
+ let addr = Ipv6Addr::from_str(address.as_str())?;
+
+ socket.leave_multicast_v6(&addr, multi_interface)?;
+
+ Ok(())
+}
+
+#[op]
+async fn op_net_set_multi_loopback_udp<NP>(
+ state: Rc<RefCell<OpState>>,
+ rid: ResourceId,
+ is_v4_membership: bool,
+ loopback: bool,
+) -> Result<(), AnyError>
+where
+ NP: NetPermissions + 'static,
+{
+ let resource = state
+ .borrow_mut()
+ .resource_table
+ .get::<UdpSocketResource>(rid)
+ .map_err(|_| bad_resource("Socket has been closed"))?;
+ let socket = RcRef::map(&resource, |r| &r.socket).borrow().await;
+
+ if is_v4_membership {
+ socket.set_multicast_loop_v4(loopback)?
+ } else {
+ socket.set_multicast_loop_v6(loopback)?;
+ }
+
+ Ok(())
+}
+
+#[op]
+async fn op_net_set_multi_ttl_udp<NP>(
+ state: Rc<RefCell<OpState>>,
+ rid: ResourceId,
+ ttl: u32,
+) -> Result<(), AnyError>
+where
+ NP: NetPermissions + 'static,
+{
+ let resource = state
+ .borrow_mut()
+ .resource_table
+ .get::<UdpSocketResource>(rid)
+ .map_err(|_| bad_resource("Socket has been closed"))?;
+ let socket = RcRef::map(&resource, |r| &r.socket).borrow().await;
+
+ socket.set_multicast_ttl_v4(ttl)?;
+
+ Ok(())
+}
+
+#[op]
pub async fn op_net_connect_tcp<NP>(
state: Rc<RefCell<OpState>>,
addr: IpAddr,
@@ -266,6 +414,7 @@ fn net_listen_udp<NP>(
state: &mut OpState,
addr: IpAddr,
reuse_address: bool,
+ loopback: bool,
) -> Result<(ResourceId, IpAddr), AnyError>
where
NP: NetPermissions + 'static,
@@ -301,9 +450,18 @@ where
let socket_addr = socket2::SockAddr::from(addr);
socket_tmp.bind(&socket_addr)?;
socket_tmp.set_nonblocking(true)?;
+
// Enable messages to be sent to the broadcast address (255.255.255.255) by default
socket_tmp.set_broadcast(true)?;
+
+ if domain == Domain::IPV4 {
+ socket_tmp.set_multicast_loop_v4(loopback)?;
+ } else {
+ socket_tmp.set_multicast_loop_v6(loopback)?;
+ }
+
let std_socket: std::net::UdpSocket = socket_tmp.into();
+
let socket = UdpSocket::from_std(std_socket)?;
let local_addr = socket.local_addr()?;
let socket_resource = UdpSocketResource {
@@ -320,12 +478,13 @@ fn op_net_listen_udp<NP>(
state: &mut OpState,
addr: IpAddr,
reuse_address: bool,
+ loopback: bool,
) -> Result<(ResourceId, IpAddr), AnyError>
where
NP: NetPermissions + 'static,
{
super::check_unstable(state, "Deno.listenDatagram");
- net_listen_udp::<NP>(state, addr, reuse_address)
+ net_listen_udp::<NP>(state, addr, reuse_address, loopback)
}
#[op]
@@ -333,11 +492,12 @@ fn op_node_unstable_net_listen_udp<NP>(
state: &mut OpState,
addr: IpAddr,
reuse_address: bool,
+ loopback: bool,
) -> Result<(ResourceId, IpAddr), AnyError>
where
NP: NetPermissions + 'static,
{
- net_listen_udp::<NP>(state, addr, reuse_address)
+ net_listen_udp::<NP>(state, addr, reuse_address, loopback)
}
#[derive(Serialize, Eq, PartialEq, Debug)]