From 4c34a2f2df595fa43b3eb06722c3ad742450d8bd Mon Sep 17 00:00:00 2001 From: Sam Gwilym Date: Mon, 20 Mar 2023 21:27:00 +0000 Subject: feat(ext/net): Add multicasting APIs to DatagramConn (#10706) (#17811) --- ext/net/ops.rs | 164 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 162 insertions(+), 2 deletions(-) (limited to 'ext/net/ops.rs') diff --git a/ext/net/ops.rs b/ext/net/ops.rs index c094ddac2..8e7263753 100644 --- a/ext/net/ops.rs +++ b/ext/net/ops.rs @@ -28,8 +28,11 @@ use socket2::Socket; use socket2::Type; use std::borrow::Cow; use std::cell::RefCell; +use std::net::Ipv4Addr; +use std::net::Ipv6Addr; use std::net::SocketAddr; use std::rc::Rc; +use std::str::FromStr; use tokio::net::TcpListener; use tokio::net::TcpStream; use tokio::net::UdpSocket; @@ -155,6 +158,151 @@ where Ok(nwritten) } +#[op] +async fn op_net_join_multi_v4_udp( + state: Rc>, + rid: ResourceId, + address: String, + multi_interface: String, +) -> Result<(), AnyError> +where + NP: NetPermissions + 'static, +{ + let resource = state + .borrow_mut() + .resource_table + .get::(rid) + .map_err(|_| bad_resource("Socket has been closed"))?; + let socket = RcRef::map(&resource, |r| &r.socket).borrow().await; + + let addr = Ipv4Addr::from_str(address.as_str())?; + let interface_addr = Ipv4Addr::from_str(multi_interface.as_str())?; + + socket.join_multicast_v4(addr, interface_addr)?; + + Ok(()) +} + +#[op] +async fn op_net_join_multi_v6_udp( + state: Rc>, + rid: ResourceId, + address: String, + multi_interface: u32, +) -> Result<(), AnyError> +where + NP: NetPermissions + 'static, +{ + let resource = state + .borrow_mut() + .resource_table + .get::(rid) + .map_err(|_| bad_resource("Socket has been closed"))?; + let socket = RcRef::map(&resource, |r| &r.socket).borrow().await; + + let addr = Ipv6Addr::from_str(address.as_str())?; + + socket.join_multicast_v6(&addr, multi_interface)?; + + Ok(()) +} + +#[op] +async fn op_net_leave_multi_v4_udp( + state: Rc>, + rid: ResourceId, + address: String, + multi_interface: String, +) -> Result<(), AnyError> +where + NP: NetPermissions + 'static, +{ + let resource = state + .borrow_mut() + .resource_table + .get::(rid) + .map_err(|_| bad_resource("Socket has been closed"))?; + let socket = RcRef::map(&resource, |r| &r.socket).borrow().await; + + let addr = Ipv4Addr::from_str(address.as_str())?; + let interface_addr = Ipv4Addr::from_str(multi_interface.as_str())?; + + socket.leave_multicast_v4(addr, interface_addr)?; + + Ok(()) +} + +#[op] +async fn op_net_leave_multi_v6_udp( + state: Rc>, + rid: ResourceId, + address: String, + multi_interface: u32, +) -> Result<(), AnyError> +where + NP: NetPermissions + 'static, +{ + let resource = state + .borrow_mut() + .resource_table + .get::(rid) + .map_err(|_| bad_resource("Socket has been closed"))?; + let socket = RcRef::map(&resource, |r| &r.socket).borrow().await; + + let addr = Ipv6Addr::from_str(address.as_str())?; + + socket.leave_multicast_v6(&addr, multi_interface)?; + + Ok(()) +} + +#[op] +async fn op_net_set_multi_loopback_udp( + state: Rc>, + rid: ResourceId, + is_v4_membership: bool, + loopback: bool, +) -> Result<(), AnyError> +where + NP: NetPermissions + 'static, +{ + let resource = state + .borrow_mut() + .resource_table + .get::(rid) + .map_err(|_| bad_resource("Socket has been closed"))?; + let socket = RcRef::map(&resource, |r| &r.socket).borrow().await; + + if is_v4_membership { + socket.set_multicast_loop_v4(loopback)? + } else { + socket.set_multicast_loop_v6(loopback)?; + } + + Ok(()) +} + +#[op] +async fn op_net_set_multi_ttl_udp( + state: Rc>, + rid: ResourceId, + ttl: u32, +) -> Result<(), AnyError> +where + NP: NetPermissions + 'static, +{ + let resource = state + .borrow_mut() + .resource_table + .get::(rid) + .map_err(|_| bad_resource("Socket has been closed"))?; + let socket = RcRef::map(&resource, |r| &r.socket).borrow().await; + + socket.set_multicast_ttl_v4(ttl)?; + + Ok(()) +} + #[op] pub async fn op_net_connect_tcp( state: Rc>, @@ -266,6 +414,7 @@ fn net_listen_udp( state: &mut OpState, addr: IpAddr, reuse_address: bool, + loopback: bool, ) -> Result<(ResourceId, IpAddr), AnyError> where NP: NetPermissions + 'static, @@ -301,9 +450,18 @@ where let socket_addr = socket2::SockAddr::from(addr); socket_tmp.bind(&socket_addr)?; socket_tmp.set_nonblocking(true)?; + // Enable messages to be sent to the broadcast address (255.255.255.255) by default socket_tmp.set_broadcast(true)?; + + if domain == Domain::IPV4 { + socket_tmp.set_multicast_loop_v4(loopback)?; + } else { + socket_tmp.set_multicast_loop_v6(loopback)?; + } + let std_socket: std::net::UdpSocket = socket_tmp.into(); + let socket = UdpSocket::from_std(std_socket)?; let local_addr = socket.local_addr()?; let socket_resource = UdpSocketResource { @@ -320,12 +478,13 @@ fn op_net_listen_udp( state: &mut OpState, addr: IpAddr, reuse_address: bool, + loopback: bool, ) -> Result<(ResourceId, IpAddr), AnyError> where NP: NetPermissions + 'static, { super::check_unstable(state, "Deno.listenDatagram"); - net_listen_udp::(state, addr, reuse_address) + net_listen_udp::(state, addr, reuse_address, loopback) } #[op] @@ -333,11 +492,12 @@ fn op_node_unstable_net_listen_udp( state: &mut OpState, addr: IpAddr, reuse_address: bool, + loopback: bool, ) -> Result<(ResourceId, IpAddr), AnyError> where NP: NetPermissions + 'static, { - net_listen_udp::(state, addr, reuse_address) + net_listen_udp::(state, addr, reuse_address, loopback) } #[derive(Serialize, Eq, PartialEq, Debug)] -- cgit v1.2.3