diff options
author | Bartek IwaĆczuk <biwanczuk@gmail.com> | 2022-10-25 20:32:51 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-10-25 20:32:51 +0200 |
commit | 8e3f825c921b38141afa7a69a0664881c5c94461 (patch) | |
tree | 09ffac0d9118d7d5214e247dbf55db06398f1bf9 /ext/net/ops.rs | |
parent | 1f6aeb430b71c16c2d9525edba63032d9ac7b372 (diff) |
Revert "refactor(ext/net): clean up variadic network ops (#16392)" (#16417)
Should fix https://github.com/denoland/deno_std/issues/2807
Diffstat (limited to 'ext/net/ops.rs')
-rw-r--r-- | ext/net/ops.rs | 519 |
1 files changed, 402 insertions, 117 deletions
diff --git a/ext/net/ops.rs b/ext/net/ops.rs index 9a6d95586..399baa4fd 100644 --- a/ext/net/ops.rs +++ b/ext/net/ops.rs @@ -7,6 +7,7 @@ use crate::NetPermissions; use deno_core::error::bad_resource; use deno_core::error::custom_error; use deno_core::error::generic_error; +use deno_core::error::type_error; use deno_core::error::AnyError; use deno_core::op; @@ -20,6 +21,7 @@ use deno_core::RcRef; use deno_core::Resource; use deno_core::ResourceId; use deno_core::ZeroCopyBuf; +use log::debug; use serde::Deserialize; use serde::Serialize; use socket2::Domain; @@ -43,51 +45,69 @@ use trust_dns_resolver::error::ResolveErrorKind; use trust_dns_resolver::system_conf; use trust_dns_resolver::AsyncResolver; +#[cfg(unix)] +use super::ops_unix as net_unix; +#[cfg(unix)] +use crate::io::UnixStreamResource; +#[cfg(unix)] +use std::path::Path; + pub fn init<P: NetPermissions + 'static>() -> Vec<OpDecl> { vec![ - op_net_accept_tcp::decl(), - #[cfg(unix)] - crate::ops_unix::op_net_accept_unix::decl(), - op_net_connect_tcp::decl::<P>(), - #[cfg(unix)] - crate::ops_unix::op_net_connect_unix::decl::<P>(), - op_net_listen_tcp::decl::<P>(), - op_net_listen_udp::decl::<P>(), - #[cfg(unix)] - crate::ops_unix::op_net_listen_unix::decl::<P>(), - #[cfg(unix)] - crate::ops_unix::op_net_listen_unixpacket::decl::<P>(), - op_net_recv_udp::decl(), - #[cfg(unix)] - crate::ops_unix::op_net_recv_unixpacket::decl(), - op_net_send_udp::decl::<P>(), - #[cfg(unix)] - crate::ops_unix::op_net_send_unixpacket::decl::<P>(), + op_net_accept::decl(), + op_net_connect::decl::<P>(), + op_net_listen::decl::<P>(), + op_dgram_recv::decl(), + op_dgram_send::decl::<P>(), op_dns_resolve::decl::<P>(), op_set_nodelay::decl::<P>(), op_set_keepalive::decl::<P>(), ] } +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +pub struct OpConn { + pub rid: ResourceId, + pub remote_addr: Option<OpAddr>, + pub local_addr: Option<OpAddr>, +} + +#[derive(Serialize)] +#[serde(tag = "transport", rename_all = "lowercase")] +pub enum OpAddr { + Tcp(IpAddr), + Udp(IpAddr), + #[cfg(unix)] + Unix(net_unix::UnixAddr), + #[cfg(unix)] + UnixPacket(net_unix::UnixAddr), +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +/// A received datagram packet (from udp or unixpacket) +pub struct OpPacket { + pub size: usize, + pub remote_addr: OpAddr, +} + #[derive(Serialize, Clone, Debug)] #[serde(rename_all = "camelCase")] pub struct TlsHandshakeInfo { pub alpn_protocol: Option<ByteString>, } -#[derive(Deserialize, Serialize)] +#[derive(Serialize)] pub struct IpAddr { pub hostname: String, pub port: u16, } -impl From<SocketAddr> for IpAddr { - fn from(addr: SocketAddr) -> Self { - Self { - hostname: addr.ip().to_string(), - port: addr.port(), - } - } +#[derive(Deserialize)] +pub(crate) struct AcceptArgs { + pub rid: ResourceId, + pub transport: String, } pub(crate) fn accept_err(e: std::io::Error) -> AnyError { @@ -99,11 +119,13 @@ pub(crate) fn accept_err(e: std::io::Error) -> AnyError { } } -#[op] -async fn op_net_accept_tcp( +async fn accept_tcp( state: Rc<RefCell<OpState>>, - rid: ResourceId, -) -> Result<(ResourceId, IpAddr, IpAddr), AnyError> { + args: AcceptArgs, + _: (), +) -> Result<OpConn, AnyError> { + let rid = args.rid; + let resource = state .borrow() .resource_table @@ -125,15 +147,51 @@ async fn op_net_accept_tcp( let rid = state .resource_table .add(TcpStreamResource::new(tcp_stream.into_split())); - Ok((rid, IpAddr::from(local_addr), IpAddr::from(remote_addr))) + Ok(OpConn { + rid, + local_addr: Some(OpAddr::Tcp(IpAddr { + hostname: local_addr.ip().to_string(), + port: local_addr.port(), + })), + remote_addr: Some(OpAddr::Tcp(IpAddr { + hostname: remote_addr.ip().to_string(), + port: remote_addr.port(), + })), + }) } #[op] -async fn op_net_recv_udp( +async fn op_net_accept( state: Rc<RefCell<OpState>>, - rid: ResourceId, - mut buf: ZeroCopyBuf, -) -> Result<(usize, IpAddr), AnyError> { + args: AcceptArgs, +) -> Result<OpConn, AnyError> { + match args.transport.as_str() { + "tcp" => accept_tcp(state, args, ()).await, + #[cfg(unix)] + "unix" => net_unix::accept_unix(state, args, ()).await, + other => Err(bad_transport(other)), + } +} + +fn bad_transport(transport: &str) -> AnyError { + generic_error(format!("Unsupported transport protocol {}", transport)) +} + +#[derive(Deserialize)] +pub(crate) struct ReceiveArgs { + pub rid: ResourceId, + pub transport: String, +} + +async fn receive_udp( + state: Rc<RefCell<OpState>>, + args: ReceiveArgs, + zero_copy: ZeroCopyBuf, +) -> Result<OpPacket, AnyError> { + let mut zero_copy = zero_copy.clone(); + + let rid = args.rid; + let resource = state .borrow_mut() .resource_table @@ -141,75 +199,192 @@ async fn op_net_recv_udp( .map_err(|_| bad_resource("Socket has been closed"))?; let socket = RcRef::map(&resource, |r| &r.socket).borrow().await; let cancel_handle = RcRef::map(&resource, |r| &r.cancel); - let (nread, remote_addr) = socket - .recv_from(&mut buf) + let (size, remote_addr) = socket + .recv_from(&mut zero_copy) .try_or_cancel(cancel_handle) .await?; - Ok((nread, IpAddr::from(remote_addr))) + Ok(OpPacket { + size, + remote_addr: OpAddr::Udp(IpAddr { + hostname: remote_addr.ip().to_string(), + port: remote_addr.port(), + }), + }) } #[op] -async fn op_net_send_udp<NP>( +async fn op_dgram_recv( state: Rc<RefCell<OpState>>, + args: ReceiveArgs, + zero_copy: ZeroCopyBuf, +) -> Result<OpPacket, AnyError> { + match args.transport.as_str() { + "udp" => receive_udp(state, args, zero_copy).await, + #[cfg(unix)] + "unixpacket" => net_unix::receive_unix_packet(state, args, zero_copy).await, + other => Err(bad_transport(other)), + } +} + +#[derive(Deserialize)] +struct SendArgs { rid: ResourceId, - addr: IpAddr, + transport: String, + #[serde(flatten)] + transport_args: ArgsEnum, +} + +#[op] +async fn op_dgram_send<NP>( + state: Rc<RefCell<OpState>>, + args: SendArgs, zero_copy: ZeroCopyBuf, ) -> Result<usize, AnyError> where NP: NetPermissions + 'static, { - { - let mut s = state.borrow_mut(); - s.borrow_mut::<NP>().check_net( - &(&addr.hostname, Some(addr.port)), - "Deno.DatagramConn.send()", - )?; + let zero_copy = zero_copy.clone(); + + match args { + SendArgs { + rid, + transport, + transport_args: ArgsEnum::Ip(args), + } if transport == "udp" => { + { + let mut s = state.borrow_mut(); + s.borrow_mut::<NP>().check_net( + &(&args.hostname, Some(args.port)), + "Deno.DatagramConn.send()", + )?; + } + let addr = resolve_addr(&args.hostname, args.port) + .await? + .next() + .ok_or_else(|| generic_error("No resolved address found"))?; + + let resource = state + .borrow_mut() + .resource_table + .get::<UdpSocketResource>(rid) + .map_err(|_| bad_resource("Socket has been closed"))?; + let socket = RcRef::map(&resource, |r| &r.socket).borrow().await; + let byte_length = socket.send_to(&zero_copy, &addr).await?; + Ok(byte_length) + } + #[cfg(unix)] + SendArgs { + rid, + transport, + transport_args: ArgsEnum::Unix(args), + } if transport == "unixpacket" => { + let address_path = Path::new(&args.path); + { + let mut s = state.borrow_mut(); + s.borrow_mut::<NP>() + .check_write(address_path, "Deno.DatagramConn.send()")?; + } + let resource = state + .borrow() + .resource_table + .get::<net_unix::UnixDatagramResource>(rid) + .map_err(|_| custom_error("NotConnected", "Socket has been closed"))?; + let socket = RcRef::map(&resource, |r| &r.socket) + .try_borrow_mut() + .ok_or_else(|| custom_error("Busy", "Socket already in use"))?; + let byte_length = socket.send_to(&zero_copy, address_path).await?; + Ok(byte_length) + } + _ => Err(type_error("Wrong argument format!")), } - let addr = resolve_addr(&addr.hostname, addr.port) - .await? - .next() - .ok_or_else(|| generic_error("No resolved address found"))?; - - let resource = state - .borrow_mut() - .resource_table - .get::<UdpSocketResource>(rid) - .map_err(|_| bad_resource("Socket has been closed"))?; - let socket = RcRef::map(&resource, |r| &r.socket).borrow().await; - let nwritten = socket.send_to(&zero_copy, &addr).await?; +} - Ok(nwritten) +#[derive(Deserialize)] +pub struct ConnectArgs { + transport: String, + #[serde(flatten)] + transport_args: ArgsEnum, } #[op] -pub async fn op_net_connect_tcp<NP>( +pub async fn op_net_connect<NP>( state: Rc<RefCell<OpState>>, - addr: IpAddr, -) -> Result<(ResourceId, IpAddr, IpAddr), AnyError> + args: ConnectArgs, +) -> Result<OpConn, AnyError> where NP: NetPermissions + 'static, { - { - let mut state_ = state.borrow_mut(); - state_ - .borrow_mut::<NP>() - .check_net(&(&addr.hostname, Some(addr.port)), "Deno.connect()")?; + match args { + ConnectArgs { + transport, + transport_args: ArgsEnum::Ip(args), + } if transport == "tcp" => { + { + let mut state_ = state.borrow_mut(); + state_ + .borrow_mut::<NP>() + .check_net(&(&args.hostname, Some(args.port)), "Deno.connect()")?; + } + let addr = resolve_addr(&args.hostname, args.port) + .await? + .next() + .ok_or_else(|| generic_error("No resolved address found"))?; + let tcp_stream = TcpStream::connect(&addr).await?; + let local_addr = tcp_stream.local_addr()?; + let remote_addr = tcp_stream.peer_addr()?; + + let mut state_ = state.borrow_mut(); + let rid = state_ + .resource_table + .add(TcpStreamResource::new(tcp_stream.into_split())); + Ok(OpConn { + rid, + local_addr: Some(OpAddr::Tcp(IpAddr { + hostname: local_addr.ip().to_string(), + port: local_addr.port(), + })), + remote_addr: Some(OpAddr::Tcp(IpAddr { + hostname: remote_addr.ip().to_string(), + port: remote_addr.port(), + })), + }) + } + #[cfg(unix)] + ConnectArgs { + transport, + transport_args: ArgsEnum::Unix(args), + } if transport == "unix" => { + let address_path = Path::new(&args.path); + super::check_unstable2(&state, "Deno.connect"); + { + let mut state_ = state.borrow_mut(); + state_ + .borrow_mut::<NP>() + .check_read(address_path, "Deno.connect()")?; + state_ + .borrow_mut::<NP>() + .check_write(address_path, "Deno.connect()")?; + } + let path = args.path; + let unix_stream = net_unix::UnixStream::connect(Path::new(&path)).await?; + let local_addr = unix_stream.local_addr()?; + let remote_addr = unix_stream.peer_addr()?; + + let mut state_ = state.borrow_mut(); + let resource = UnixStreamResource::new(unix_stream.into_split()); + let rid = state_.resource_table.add(resource); + Ok(OpConn { + rid, + local_addr: Some(OpAddr::Unix(net_unix::UnixAddr { + path: local_addr.as_pathname().and_then(net_unix::pathstring), + })), + remote_addr: Some(OpAddr::Unix(net_unix::UnixAddr { + path: remote_addr.as_pathname().and_then(net_unix::pathstring), + })), + }) + } + _ => Err(type_error("Wrong argument format!")), } - - let addr = resolve_addr(&addr.hostname, addr.port) - .await? - .next() - .ok_or_else(|| generic_error("No resolved address found"))?; - let tcp_stream = TcpStream::connect(&addr).await?; - let local_addr = tcp_stream.local_addr()?; - let remote_addr = tcp_stream.peer_addr()?; - - let mut state_ = state.borrow_mut(); - let rid = state_ - .resource_table - .add(TcpStreamResource::new(tcp_stream.into_split())); - - Ok((rid, IpAddr::from(local_addr), IpAddr::from(remote_addr))) } pub struct TcpListenerResource { @@ -242,20 +417,33 @@ impl Resource for UdpSocketResource { } } -#[op] -fn op_net_listen_tcp<NP>( +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct IpListenArgs { + hostname: String, + port: u16, + reuse_address: Option<bool>, +} + +#[derive(Deserialize)] +#[serde(untagged)] +enum ArgsEnum { + Ip(IpListenArgs), + #[cfg(unix)] + Unix(net_unix::UnixListenArgs), +} + +#[derive(Deserialize)] +struct ListenArgs { + transport: String, + #[serde(flatten)] + transport_args: ArgsEnum, +} + +fn listen_tcp( state: &mut OpState, - addr: IpAddr, -) -> Result<(ResourceId, IpAddr), AnyError> -where - NP: NetPermissions + 'static, -{ - state - .borrow_mut::<NP>() - .check_net(&(&addr.hostname, Some(addr.port)), "Deno.listen()")?; - let addr = resolve_addr_sync(&addr.hostname, addr.port)? - .next() - .ok_or_else(|| generic_error("No resolved address found"))?; + addr: SocketAddr, +) -> Result<(u32, SocketAddr), AnyError> { let domain = if addr.is_ipv4() { Domain::IPV4 } else { @@ -277,33 +465,21 @@ where }; let rid = state.resource_table.add(listener_resource); - Ok((rid, IpAddr::from(local_addr))) + Ok((rid, local_addr)) } -#[op] -fn op_net_listen_udp<NP>( +fn listen_udp( state: &mut OpState, - addr: IpAddr, - reuse_address: bool, -) -> Result<(ResourceId, IpAddr), AnyError> -where - NP: NetPermissions + 'static, -{ - super::check_unstable(state, "Deno.listenDatagram"); - state - .borrow_mut::<NP>() - .check_net(&(&addr.hostname, Some(addr.port)), "Deno.listenDatagram()")?; - let addr = resolve_addr_sync(&addr.hostname, addr.port)? - .next() - .ok_or_else(|| generic_error("No resolved address found"))?; - + addr: SocketAddr, + reuse_address: Option<bool>, +) -> Result<(u32, SocketAddr), AnyError> { let domain = if addr.is_ipv4() { Domain::IPV4 } else { Domain::IPV6 }; let socket_tmp = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?; - if reuse_address { + if reuse_address.unwrap_or(false) { // This logic is taken from libuv: // // On the BSDs, SO_REUSEPORT implies SO_REUSEADDR but with some additional @@ -332,7 +508,110 @@ where }; let rid = state.resource_table.add(socket_resource); - Ok((rid, IpAddr::from(local_addr))) + Ok((rid, local_addr)) +} + +#[op] +fn op_net_listen<NP>( + state: &mut OpState, + args: ListenArgs, +) -> Result<OpConn, AnyError> +where + NP: NetPermissions + 'static, +{ + match args { + ListenArgs { + transport, + transport_args: ArgsEnum::Ip(args), + } => { + { + if transport == "udp" { + super::check_unstable(state, "Deno.listenDatagram"); + } + state.borrow_mut::<NP>().check_net( + &(&args.hostname, Some(args.port)), + "Deno.listenDatagram()", + )?; + } + let addr = resolve_addr_sync(&args.hostname, args.port)? + .next() + .ok_or_else(|| generic_error("No resolved address found"))?; + let (rid, local_addr) = if transport == "tcp" { + if args.reuse_address.is_some() { + return Err(generic_error( + "The reuseAddress option is not supported for TCP", + )); + } + listen_tcp(state, addr)? + } else { + listen_udp(state, addr, args.reuse_address)? + }; + debug!( + "New listener {} {}:{}", + rid, + local_addr.ip().to_string(), + local_addr.port() + ); + let ip_addr = IpAddr { + hostname: local_addr.ip().to_string(), + port: local_addr.port(), + }; + Ok(OpConn { + rid, + local_addr: Some(match transport.as_str() { + "udp" => OpAddr::Udp(ip_addr), + "tcp" => OpAddr::Tcp(ip_addr), + // NOTE: This could be unreachable!() + other => return Err(bad_transport(other)), + }), + remote_addr: None, + }) + } + #[cfg(unix)] + ListenArgs { + transport, + transport_args: ArgsEnum::Unix(args), + } if transport == "unix" || transport == "unixpacket" => { + let address_path = Path::new(&args.path); + { + if transport == "unix" { + super::check_unstable(state, "Deno.listen"); + } + if transport == "unixpacket" { + super::check_unstable(state, "Deno.listenDatagram"); + } + let api_name = if transport == "unix" { + "Deno.listen()" + } else { + "Deno.listenDatagram()" + }; + let permissions = state.borrow_mut::<NP>(); + permissions.check_read(address_path, api_name)?; + permissions.check_write(address_path, api_name)?; + } + let (rid, local_addr) = if transport == "unix" { + net_unix::listen_unix(state, address_path)? + } else { + net_unix::listen_unix_packet(state, address_path)? + }; + debug!("New listener {} {:?}", rid, local_addr); + let unix_addr = net_unix::UnixAddr { + path: local_addr.as_pathname().and_then(net_unix::pathstring), + }; + + Ok(OpConn { + rid, + local_addr: Some(match transport.as_str() { + "unix" => OpAddr::Unix(unix_addr), + "unixpacket" => OpAddr::UnixPacket(unix_addr), + other => return Err(bad_transport(other)), + }), + remote_addr: None, + }) + } + #[cfg(unix)] + _ => Err(type_error("Wrong argument format!")), + } } #[derive(Serialize, Eq, PartialEq, Debug)] @@ -849,15 +1128,21 @@ mod tests { let conn_state = runtime.op_state(); let server_addr: Vec<&str> = clone_addr.split(':').collect(); - let ip_addr = IpAddr { + let ip_args = IpListenArgs { hostname: String::from(server_addr[0]), port: server_addr[1].parse().unwrap(), + reuse_address: None, + }; + let connect_args = ConnectArgs { + transport: String::from("tcp"), + transport_args: ArgsEnum::Ip(ip_args), }; let connect_fut = - op_net_connect_tcp::call::<TestPermission>(conn_state, ip_addr); - let (rid, _, _) = connect_fut.await.unwrap(); + op_net_connect::call::<TestPermission>(conn_state, connect_args); + let conn = connect_fut.await.unwrap(); + let rid = conn.rid; let state = runtime.op_state(); set_sockopt_fn(&mut state.borrow_mut(), rid); |