diff options
author | Luca Casonato <hello@lcas.dev> | 2022-10-24 14:55:39 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-10-24 14:55:39 +0200 |
commit | c061538417cdb2b0a8f2d6eae7f5f11511ba34fe (patch) | |
tree | d8d8efc9e2a883d4ad21ebacf44e8e922fae794d /ext/net/ops.rs | |
parent | f38666f5a348c5f40c66dd6e5896618b7f1b09bc (diff) |
refactor(ext/net): clean up variadic network ops (#16392)
Previously `op_net_listen`, `op_net_accept`, and various other ops in
ext/net where variadic on the transport. This created a lot of code
bloat. This commit updates the code to instead have separate ops for
each transport.
Diffstat (limited to 'ext/net/ops.rs')
-rw-r--r-- | ext/net/ops.rs | 519 |
1 files changed, 117 insertions, 402 deletions
diff --git a/ext/net/ops.rs b/ext/net/ops.rs index 399baa4fd..9a6d95586 100644 --- a/ext/net/ops.rs +++ b/ext/net/ops.rs @@ -7,7 +7,6 @@ use crate::NetPermissions; use deno_core::error::bad_resource; use deno_core::error::custom_error; use deno_core::error::generic_error; -use deno_core::error::type_error; use deno_core::error::AnyError; use deno_core::op; @@ -21,7 +20,6 @@ use deno_core::RcRef; use deno_core::Resource; use deno_core::ResourceId; use deno_core::ZeroCopyBuf; -use log::debug; use serde::Deserialize; use serde::Serialize; use socket2::Domain; @@ -45,69 +43,51 @@ use trust_dns_resolver::error::ResolveErrorKind; use trust_dns_resolver::system_conf; use trust_dns_resolver::AsyncResolver; -#[cfg(unix)] -use super::ops_unix as net_unix; -#[cfg(unix)] -use crate::io::UnixStreamResource; -#[cfg(unix)] -use std::path::Path; - pub fn init<P: NetPermissions + 'static>() -> Vec<OpDecl> { vec![ - op_net_accept::decl(), - op_net_connect::decl::<P>(), - op_net_listen::decl::<P>(), - op_dgram_recv::decl(), - op_dgram_send::decl::<P>(), + op_net_accept_tcp::decl(), + #[cfg(unix)] + crate::ops_unix::op_net_accept_unix::decl(), + op_net_connect_tcp::decl::<P>(), + #[cfg(unix)] + crate::ops_unix::op_net_connect_unix::decl::<P>(), + op_net_listen_tcp::decl::<P>(), + op_net_listen_udp::decl::<P>(), + #[cfg(unix)] + crate::ops_unix::op_net_listen_unix::decl::<P>(), + #[cfg(unix)] + crate::ops_unix::op_net_listen_unixpacket::decl::<P>(), + op_net_recv_udp::decl(), + #[cfg(unix)] + crate::ops_unix::op_net_recv_unixpacket::decl(), + op_net_send_udp::decl::<P>(), + #[cfg(unix)] + crate::ops_unix::op_net_send_unixpacket::decl::<P>(), op_dns_resolve::decl::<P>(), op_set_nodelay::decl::<P>(), op_set_keepalive::decl::<P>(), ] } -#[derive(Serialize)] -#[serde(rename_all = "camelCase")] -pub struct OpConn { - pub rid: ResourceId, - pub remote_addr: Option<OpAddr>, - pub local_addr: Option<OpAddr>, -} - -#[derive(Serialize)] -#[serde(tag = "transport", rename_all = "lowercase")] -pub enum OpAddr { - Tcp(IpAddr), - Udp(IpAddr), - #[cfg(unix)] - Unix(net_unix::UnixAddr), - #[cfg(unix)] - UnixPacket(net_unix::UnixAddr), -} - -#[derive(Serialize)] -#[serde(rename_all = "camelCase")] -/// A received datagram packet (from udp or unixpacket) -pub struct OpPacket { - pub size: usize, - pub remote_addr: OpAddr, -} - #[derive(Serialize, Clone, Debug)] #[serde(rename_all = "camelCase")] pub struct TlsHandshakeInfo { pub alpn_protocol: Option<ByteString>, } -#[derive(Serialize)] +#[derive(Deserialize, Serialize)] pub struct IpAddr { pub hostname: String, pub port: u16, } -#[derive(Deserialize)] -pub(crate) struct AcceptArgs { - pub rid: ResourceId, - pub transport: String, +impl From<SocketAddr> for IpAddr { + fn from(addr: SocketAddr) -> Self { + Self { + hostname: addr.ip().to_string(), + port: addr.port(), + } + } } pub(crate) fn accept_err(e: std::io::Error) -> AnyError { @@ -119,13 +99,11 @@ pub(crate) fn accept_err(e: std::io::Error) -> AnyError { } } -async fn accept_tcp( +#[op] +async fn op_net_accept_tcp( state: Rc<RefCell<OpState>>, - args: AcceptArgs, - _: (), -) -> Result<OpConn, AnyError> { - let rid = args.rid; - + rid: ResourceId, +) -> Result<(ResourceId, IpAddr, IpAddr), AnyError> { let resource = state .borrow() .resource_table @@ -147,51 +125,15 @@ async fn accept_tcp( let rid = state .resource_table .add(TcpStreamResource::new(tcp_stream.into_split())); - Ok(OpConn { - rid, - local_addr: Some(OpAddr::Tcp(IpAddr { - hostname: local_addr.ip().to_string(), - port: local_addr.port(), - })), - remote_addr: Some(OpAddr::Tcp(IpAddr { - hostname: remote_addr.ip().to_string(), - port: remote_addr.port(), - })), - }) + Ok((rid, IpAddr::from(local_addr), IpAddr::from(remote_addr))) } #[op] -async fn op_net_accept( +async fn op_net_recv_udp( state: Rc<RefCell<OpState>>, - args: AcceptArgs, -) -> Result<OpConn, AnyError> { - match args.transport.as_str() { - "tcp" => accept_tcp(state, args, ()).await, - #[cfg(unix)] - "unix" => net_unix::accept_unix(state, args, ()).await, - other => Err(bad_transport(other)), - } -} - -fn bad_transport(transport: &str) -> AnyError { - generic_error(format!("Unsupported transport protocol {}", transport)) -} - -#[derive(Deserialize)] -pub(crate) struct ReceiveArgs { - pub rid: ResourceId, - pub transport: String, -} - -async fn receive_udp( - state: Rc<RefCell<OpState>>, - args: ReceiveArgs, - zero_copy: ZeroCopyBuf, -) -> Result<OpPacket, AnyError> { - let mut zero_copy = zero_copy.clone(); - - let rid = args.rid; - + rid: ResourceId, + mut buf: ZeroCopyBuf, +) -> Result<(usize, IpAddr), AnyError> { let resource = state .borrow_mut() .resource_table @@ -199,192 +141,75 @@ async fn receive_udp( .map_err(|_| bad_resource("Socket has been closed"))?; let socket = RcRef::map(&resource, |r| &r.socket).borrow().await; let cancel_handle = RcRef::map(&resource, |r| &r.cancel); - let (size, remote_addr) = socket - .recv_from(&mut zero_copy) + let (nread, remote_addr) = socket + .recv_from(&mut buf) .try_or_cancel(cancel_handle) .await?; - Ok(OpPacket { - size, - remote_addr: OpAddr::Udp(IpAddr { - hostname: remote_addr.ip().to_string(), - port: remote_addr.port(), - }), - }) + Ok((nread, IpAddr::from(remote_addr))) } #[op] -async fn op_dgram_recv( +async fn op_net_send_udp<NP>( state: Rc<RefCell<OpState>>, - args: ReceiveArgs, - zero_copy: ZeroCopyBuf, -) -> Result<OpPacket, AnyError> { - match args.transport.as_str() { - "udp" => receive_udp(state, args, zero_copy).await, - #[cfg(unix)] - "unixpacket" => net_unix::receive_unix_packet(state, args, zero_copy).await, - other => Err(bad_transport(other)), - } -} - -#[derive(Deserialize)] -struct SendArgs { rid: ResourceId, - transport: String, - #[serde(flatten)] - transport_args: ArgsEnum, -} - -#[op] -async fn op_dgram_send<NP>( - state: Rc<RefCell<OpState>>, - args: SendArgs, + addr: IpAddr, zero_copy: ZeroCopyBuf, ) -> Result<usize, AnyError> where NP: NetPermissions + 'static, { - let zero_copy = zero_copy.clone(); - - match args { - SendArgs { - rid, - transport, - transport_args: ArgsEnum::Ip(args), - } if transport == "udp" => { - { - let mut s = state.borrow_mut(); - s.borrow_mut::<NP>().check_net( - &(&args.hostname, Some(args.port)), - "Deno.DatagramConn.send()", - )?; - } - let addr = resolve_addr(&args.hostname, args.port) - .await? - .next() - .ok_or_else(|| generic_error("No resolved address found"))?; - - let resource = state - .borrow_mut() - .resource_table - .get::<UdpSocketResource>(rid) - .map_err(|_| bad_resource("Socket has been closed"))?; - let socket = RcRef::map(&resource, |r| &r.socket).borrow().await; - let byte_length = socket.send_to(&zero_copy, &addr).await?; - Ok(byte_length) - } - #[cfg(unix)] - SendArgs { - rid, - transport, - transport_args: ArgsEnum::Unix(args), - } if transport == "unixpacket" => { - let address_path = Path::new(&args.path); - { - let mut s = state.borrow_mut(); - s.borrow_mut::<NP>() - .check_write(address_path, "Deno.DatagramConn.send()")?; - } - let resource = state - .borrow() - .resource_table - .get::<net_unix::UnixDatagramResource>(rid) - .map_err(|_| custom_error("NotConnected", "Socket has been closed"))?; - let socket = RcRef::map(&resource, |r| &r.socket) - .try_borrow_mut() - .ok_or_else(|| custom_error("Busy", "Socket already in use"))?; - let byte_length = socket.send_to(&zero_copy, address_path).await?; - Ok(byte_length) - } - _ => Err(type_error("Wrong argument format!")), + { + let mut s = state.borrow_mut(); + s.borrow_mut::<NP>().check_net( + &(&addr.hostname, Some(addr.port)), + "Deno.DatagramConn.send()", + )?; } -} + let addr = resolve_addr(&addr.hostname, addr.port) + .await? + .next() + .ok_or_else(|| generic_error("No resolved address found"))?; -#[derive(Deserialize)] -pub struct ConnectArgs { - transport: String, - #[serde(flatten)] - transport_args: ArgsEnum, + let resource = state + .borrow_mut() + .resource_table + .get::<UdpSocketResource>(rid) + .map_err(|_| bad_resource("Socket has been closed"))?; + let socket = RcRef::map(&resource, |r| &r.socket).borrow().await; + let nwritten = socket.send_to(&zero_copy, &addr).await?; + + Ok(nwritten) } #[op] -pub async fn op_net_connect<NP>( +pub async fn op_net_connect_tcp<NP>( state: Rc<RefCell<OpState>>, - args: ConnectArgs, -) -> Result<OpConn, AnyError> + addr: IpAddr, +) -> Result<(ResourceId, IpAddr, IpAddr), AnyError> where NP: NetPermissions + 'static, { - match args { - ConnectArgs { - transport, - transport_args: ArgsEnum::Ip(args), - } if transport == "tcp" => { - { - let mut state_ = state.borrow_mut(); - state_ - .borrow_mut::<NP>() - .check_net(&(&args.hostname, Some(args.port)), "Deno.connect()")?; - } - let addr = resolve_addr(&args.hostname, args.port) - .await? - .next() - .ok_or_else(|| generic_error("No resolved address found"))?; - let tcp_stream = TcpStream::connect(&addr).await?; - let local_addr = tcp_stream.local_addr()?; - let remote_addr = tcp_stream.peer_addr()?; - - let mut state_ = state.borrow_mut(); - let rid = state_ - .resource_table - .add(TcpStreamResource::new(tcp_stream.into_split())); - Ok(OpConn { - rid, - local_addr: Some(OpAddr::Tcp(IpAddr { - hostname: local_addr.ip().to_string(), - port: local_addr.port(), - })), - remote_addr: Some(OpAddr::Tcp(IpAddr { - hostname: remote_addr.ip().to_string(), - port: remote_addr.port(), - })), - }) - } - #[cfg(unix)] - ConnectArgs { - transport, - transport_args: ArgsEnum::Unix(args), - } if transport == "unix" => { - let address_path = Path::new(&args.path); - super::check_unstable2(&state, "Deno.connect"); - { - let mut state_ = state.borrow_mut(); - state_ - .borrow_mut::<NP>() - .check_read(address_path, "Deno.connect()")?; - state_ - .borrow_mut::<NP>() - .check_write(address_path, "Deno.connect()")?; - } - let path = args.path; - let unix_stream = net_unix::UnixStream::connect(Path::new(&path)).await?; - let local_addr = unix_stream.local_addr()?; - let remote_addr = unix_stream.peer_addr()?; - - let mut state_ = state.borrow_mut(); - let resource = UnixStreamResource::new(unix_stream.into_split()); - let rid = state_.resource_table.add(resource); - Ok(OpConn { - rid, - local_addr: Some(OpAddr::Unix(net_unix::UnixAddr { - path: local_addr.as_pathname().and_then(net_unix::pathstring), - })), - remote_addr: Some(OpAddr::Unix(net_unix::UnixAddr { - path: remote_addr.as_pathname().and_then(net_unix::pathstring), - })), - }) - } - _ => Err(type_error("Wrong argument format!")), + { + let mut state_ = state.borrow_mut(); + state_ + .borrow_mut::<NP>() + .check_net(&(&addr.hostname, Some(addr.port)), "Deno.connect()")?; } + + let addr = resolve_addr(&addr.hostname, addr.port) + .await? + .next() + .ok_or_else(|| generic_error("No resolved address found"))?; + let tcp_stream = TcpStream::connect(&addr).await?; + let local_addr = tcp_stream.local_addr()?; + let remote_addr = tcp_stream.peer_addr()?; + + let mut state_ = state.borrow_mut(); + let rid = state_ + .resource_table + .add(TcpStreamResource::new(tcp_stream.into_split())); + + Ok((rid, IpAddr::from(local_addr), IpAddr::from(remote_addr))) } pub struct TcpListenerResource { @@ -417,33 +242,20 @@ impl Resource for UdpSocketResource { } } -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -struct IpListenArgs { - hostname: String, - port: u16, - reuse_address: Option<bool>, -} - -#[derive(Deserialize)] -#[serde(untagged)] -enum ArgsEnum { - Ip(IpListenArgs), - #[cfg(unix)] - Unix(net_unix::UnixListenArgs), -} - -#[derive(Deserialize)] -struct ListenArgs { - transport: String, - #[serde(flatten)] - transport_args: ArgsEnum, -} - -fn listen_tcp( +#[op] +fn op_net_listen_tcp<NP>( state: &mut OpState, - addr: SocketAddr, -) -> Result<(u32, SocketAddr), AnyError> { + addr: IpAddr, +) -> Result<(ResourceId, IpAddr), AnyError> +where + NP: NetPermissions + 'static, +{ + state + .borrow_mut::<NP>() + .check_net(&(&addr.hostname, Some(addr.port)), "Deno.listen()")?; + let addr = resolve_addr_sync(&addr.hostname, addr.port)? + .next() + .ok_or_else(|| generic_error("No resolved address found"))?; let domain = if addr.is_ipv4() { Domain::IPV4 } else { @@ -465,21 +277,33 @@ fn listen_tcp( }; let rid = state.resource_table.add(listener_resource); - Ok((rid, local_addr)) + Ok((rid, IpAddr::from(local_addr))) } -fn listen_udp( +#[op] +fn op_net_listen_udp<NP>( state: &mut OpState, - addr: SocketAddr, - reuse_address: Option<bool>, -) -> Result<(u32, SocketAddr), AnyError> { + addr: IpAddr, + reuse_address: bool, +) -> Result<(ResourceId, IpAddr), AnyError> +where + NP: NetPermissions + 'static, +{ + super::check_unstable(state, "Deno.listenDatagram"); + state + .borrow_mut::<NP>() + .check_net(&(&addr.hostname, Some(addr.port)), "Deno.listenDatagram()")?; + let addr = resolve_addr_sync(&addr.hostname, addr.port)? + .next() + .ok_or_else(|| generic_error("No resolved address found"))?; + let domain = if addr.is_ipv4() { Domain::IPV4 } else { Domain::IPV6 }; let socket_tmp = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?; - if reuse_address.unwrap_or(false) { + if reuse_address { // This logic is taken from libuv: // // On the BSDs, SO_REUSEPORT implies SO_REUSEADDR but with some additional @@ -508,110 +332,7 @@ fn listen_udp( }; let rid = state.resource_table.add(socket_resource); - Ok((rid, local_addr)) -} - -#[op] -fn op_net_listen<NP>( - state: &mut OpState, - args: ListenArgs, -) -> Result<OpConn, AnyError> -where - NP: NetPermissions + 'static, -{ - match args { - ListenArgs { - transport, - transport_args: ArgsEnum::Ip(args), - } => { - { - if transport == "udp" { - super::check_unstable(state, "Deno.listenDatagram"); - } - state.borrow_mut::<NP>().check_net( - &(&args.hostname, Some(args.port)), - "Deno.listenDatagram()", - )?; - } - let addr = resolve_addr_sync(&args.hostname, args.port)? - .next() - .ok_or_else(|| generic_error("No resolved address found"))?; - let (rid, local_addr) = if transport == "tcp" { - if args.reuse_address.is_some() { - return Err(generic_error( - "The reuseAddress option is not supported for TCP", - )); - } - listen_tcp(state, addr)? - } else { - listen_udp(state, addr, args.reuse_address)? - }; - debug!( - "New listener {} {}:{}", - rid, - local_addr.ip().to_string(), - local_addr.port() - ); - let ip_addr = IpAddr { - hostname: local_addr.ip().to_string(), - port: local_addr.port(), - }; - Ok(OpConn { - rid, - local_addr: Some(match transport.as_str() { - "udp" => OpAddr::Udp(ip_addr), - "tcp" => OpAddr::Tcp(ip_addr), - // NOTE: This could be unreachable!() - other => return Err(bad_transport(other)), - }), - remote_addr: None, - }) - } - #[cfg(unix)] - ListenArgs { - transport, - transport_args: ArgsEnum::Unix(args), - } if transport == "unix" || transport == "unixpacket" => { - let address_path = Path::new(&args.path); - { - if transport == "unix" { - super::check_unstable(state, "Deno.listen"); - } - if transport == "unixpacket" { - super::check_unstable(state, "Deno.listenDatagram"); - } - let api_name = if transport == "unix" { - "Deno.listen()" - } else { - "Deno.listenDatagram()" - }; - let permissions = state.borrow_mut::<NP>(); - permissions.check_read(address_path, api_name)?; - permissions.check_write(address_path, api_name)?; - } - let (rid, local_addr) = if transport == "unix" { - net_unix::listen_unix(state, address_path)? - } else { - net_unix::listen_unix_packet(state, address_path)? - }; - debug!("New listener {} {:?}", rid, local_addr); - let unix_addr = net_unix::UnixAddr { - path: local_addr.as_pathname().and_then(net_unix::pathstring), - }; - - Ok(OpConn { - rid, - local_addr: Some(match transport.as_str() { - "unix" => OpAddr::Unix(unix_addr), - "unixpacket" => OpAddr::UnixPacket(unix_addr), - other => return Err(bad_transport(other)), - }), - remote_addr: None, - }) - } - #[cfg(unix)] - _ => Err(type_error("Wrong argument format!")), - } + Ok((rid, IpAddr::from(local_addr))) } #[derive(Serialize, Eq, PartialEq, Debug)] @@ -1128,21 +849,15 @@ mod tests { let conn_state = runtime.op_state(); let server_addr: Vec<&str> = clone_addr.split(':').collect(); - let ip_args = IpListenArgs { + let ip_addr = IpAddr { hostname: String::from(server_addr[0]), port: server_addr[1].parse().unwrap(), - reuse_address: None, - }; - let connect_args = ConnectArgs { - transport: String::from("tcp"), - transport_args: ArgsEnum::Ip(ip_args), }; let connect_fut = - op_net_connect::call::<TestPermission>(conn_state, connect_args); - let conn = connect_fut.await.unwrap(); + op_net_connect_tcp::call::<TestPermission>(conn_state, ip_addr); + let (rid, _, _) = connect_fut.await.unwrap(); - let rid = conn.rid; let state = runtime.op_state(); set_sockopt_fn(&mut state.borrow_mut(), rid); |