summaryrefslogtreecommitdiff
path: root/ext/net/ops.rs
diff options
context:
space:
mode:
authorBartek IwaƄczuk <biwanczuk@gmail.com>2022-10-25 20:32:51 +0200
committerGitHub <noreply@github.com>2022-10-25 20:32:51 +0200
commit8e3f825c921b38141afa7a69a0664881c5c94461 (patch)
tree09ffac0d9118d7d5214e247dbf55db06398f1bf9 /ext/net/ops.rs
parent1f6aeb430b71c16c2d9525edba63032d9ac7b372 (diff)
Revert "refactor(ext/net): clean up variadic network ops (#16392)" (#16417)
Should fix https://github.com/denoland/deno_std/issues/2807
Diffstat (limited to 'ext/net/ops.rs')
-rw-r--r--ext/net/ops.rs519
1 files changed, 402 insertions, 117 deletions
diff --git a/ext/net/ops.rs b/ext/net/ops.rs
index 9a6d95586..399baa4fd 100644
--- a/ext/net/ops.rs
+++ b/ext/net/ops.rs
@@ -7,6 +7,7 @@ use crate::NetPermissions;
use deno_core::error::bad_resource;
use deno_core::error::custom_error;
use deno_core::error::generic_error;
+use deno_core::error::type_error;
use deno_core::error::AnyError;
use deno_core::op;
@@ -20,6 +21,7 @@ use deno_core::RcRef;
use deno_core::Resource;
use deno_core::ResourceId;
use deno_core::ZeroCopyBuf;
+use log::debug;
use serde::Deserialize;
use serde::Serialize;
use socket2::Domain;
@@ -43,51 +45,69 @@ use trust_dns_resolver::error::ResolveErrorKind;
use trust_dns_resolver::system_conf;
use trust_dns_resolver::AsyncResolver;
+#[cfg(unix)]
+use super::ops_unix as net_unix;
+#[cfg(unix)]
+use crate::io::UnixStreamResource;
+#[cfg(unix)]
+use std::path::Path;
+
pub fn init<P: NetPermissions + 'static>() -> Vec<OpDecl> {
vec![
- op_net_accept_tcp::decl(),
- #[cfg(unix)]
- crate::ops_unix::op_net_accept_unix::decl(),
- op_net_connect_tcp::decl::<P>(),
- #[cfg(unix)]
- crate::ops_unix::op_net_connect_unix::decl::<P>(),
- op_net_listen_tcp::decl::<P>(),
- op_net_listen_udp::decl::<P>(),
- #[cfg(unix)]
- crate::ops_unix::op_net_listen_unix::decl::<P>(),
- #[cfg(unix)]
- crate::ops_unix::op_net_listen_unixpacket::decl::<P>(),
- op_net_recv_udp::decl(),
- #[cfg(unix)]
- crate::ops_unix::op_net_recv_unixpacket::decl(),
- op_net_send_udp::decl::<P>(),
- #[cfg(unix)]
- crate::ops_unix::op_net_send_unixpacket::decl::<P>(),
+ op_net_accept::decl(),
+ op_net_connect::decl::<P>(),
+ op_net_listen::decl::<P>(),
+ op_dgram_recv::decl(),
+ op_dgram_send::decl::<P>(),
op_dns_resolve::decl::<P>(),
op_set_nodelay::decl::<P>(),
op_set_keepalive::decl::<P>(),
]
}
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct OpConn {
+ pub rid: ResourceId,
+ pub remote_addr: Option<OpAddr>,
+ pub local_addr: Option<OpAddr>,
+}
+
+#[derive(Serialize)]
+#[serde(tag = "transport", rename_all = "lowercase")]
+pub enum OpAddr {
+ Tcp(IpAddr),
+ Udp(IpAddr),
+ #[cfg(unix)]
+ Unix(net_unix::UnixAddr),
+ #[cfg(unix)]
+ UnixPacket(net_unix::UnixAddr),
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+/// A received datagram packet (from udp or unixpacket)
+pub struct OpPacket {
+ pub size: usize,
+ pub remote_addr: OpAddr,
+}
+
#[derive(Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct TlsHandshakeInfo {
pub alpn_protocol: Option<ByteString>,
}
-#[derive(Deserialize, Serialize)]
+#[derive(Serialize)]
pub struct IpAddr {
pub hostname: String,
pub port: u16,
}
-impl From<SocketAddr> for IpAddr {
- fn from(addr: SocketAddr) -> Self {
- Self {
- hostname: addr.ip().to_string(),
- port: addr.port(),
- }
- }
+#[derive(Deserialize)]
+pub(crate) struct AcceptArgs {
+ pub rid: ResourceId,
+ pub transport: String,
}
pub(crate) fn accept_err(e: std::io::Error) -> AnyError {
@@ -99,11 +119,13 @@ pub(crate) fn accept_err(e: std::io::Error) -> AnyError {
}
}
-#[op]
-async fn op_net_accept_tcp(
+async fn accept_tcp(
state: Rc<RefCell<OpState>>,
- rid: ResourceId,
-) -> Result<(ResourceId, IpAddr, IpAddr), AnyError> {
+ args: AcceptArgs,
+ _: (),
+) -> Result<OpConn, AnyError> {
+ let rid = args.rid;
+
let resource = state
.borrow()
.resource_table
@@ -125,15 +147,51 @@ async fn op_net_accept_tcp(
let rid = state
.resource_table
.add(TcpStreamResource::new(tcp_stream.into_split()));
- Ok((rid, IpAddr::from(local_addr), IpAddr::from(remote_addr)))
+ Ok(OpConn {
+ rid,
+ local_addr: Some(OpAddr::Tcp(IpAddr {
+ hostname: local_addr.ip().to_string(),
+ port: local_addr.port(),
+ })),
+ remote_addr: Some(OpAddr::Tcp(IpAddr {
+ hostname: remote_addr.ip().to_string(),
+ port: remote_addr.port(),
+ })),
+ })
}
#[op]
-async fn op_net_recv_udp(
+async fn op_net_accept(
state: Rc<RefCell<OpState>>,
- rid: ResourceId,
- mut buf: ZeroCopyBuf,
-) -> Result<(usize, IpAddr), AnyError> {
+ args: AcceptArgs,
+) -> Result<OpConn, AnyError> {
+ match args.transport.as_str() {
+ "tcp" => accept_tcp(state, args, ()).await,
+ #[cfg(unix)]
+ "unix" => net_unix::accept_unix(state, args, ()).await,
+ other => Err(bad_transport(other)),
+ }
+}
+
+fn bad_transport(transport: &str) -> AnyError {
+ generic_error(format!("Unsupported transport protocol {}", transport))
+}
+
+#[derive(Deserialize)]
+pub(crate) struct ReceiveArgs {
+ pub rid: ResourceId,
+ pub transport: String,
+}
+
+async fn receive_udp(
+ state: Rc<RefCell<OpState>>,
+ args: ReceiveArgs,
+ zero_copy: ZeroCopyBuf,
+) -> Result<OpPacket, AnyError> {
+ let mut zero_copy = zero_copy.clone();
+
+ let rid = args.rid;
+
let resource = state
.borrow_mut()
.resource_table
@@ -141,75 +199,192 @@ async fn op_net_recv_udp(
.map_err(|_| bad_resource("Socket has been closed"))?;
let socket = RcRef::map(&resource, |r| &r.socket).borrow().await;
let cancel_handle = RcRef::map(&resource, |r| &r.cancel);
- let (nread, remote_addr) = socket
- .recv_from(&mut buf)
+ let (size, remote_addr) = socket
+ .recv_from(&mut zero_copy)
.try_or_cancel(cancel_handle)
.await?;
- Ok((nread, IpAddr::from(remote_addr)))
+ Ok(OpPacket {
+ size,
+ remote_addr: OpAddr::Udp(IpAddr {
+ hostname: remote_addr.ip().to_string(),
+ port: remote_addr.port(),
+ }),
+ })
}
#[op]
-async fn op_net_send_udp<NP>(
+async fn op_dgram_recv(
state: Rc<RefCell<OpState>>,
+ args: ReceiveArgs,
+ zero_copy: ZeroCopyBuf,
+) -> Result<OpPacket, AnyError> {
+ match args.transport.as_str() {
+ "udp" => receive_udp(state, args, zero_copy).await,
+ #[cfg(unix)]
+ "unixpacket" => net_unix::receive_unix_packet(state, args, zero_copy).await,
+ other => Err(bad_transport(other)),
+ }
+}
+
+#[derive(Deserialize)]
+struct SendArgs {
rid: ResourceId,
- addr: IpAddr,
+ transport: String,
+ #[serde(flatten)]
+ transport_args: ArgsEnum,
+}
+
+#[op]
+async fn op_dgram_send<NP>(
+ state: Rc<RefCell<OpState>>,
+ args: SendArgs,
zero_copy: ZeroCopyBuf,
) -> Result<usize, AnyError>
where
NP: NetPermissions + 'static,
{
- {
- let mut s = state.borrow_mut();
- s.borrow_mut::<NP>().check_net(
- &(&addr.hostname, Some(addr.port)),
- "Deno.DatagramConn.send()",
- )?;
+ let zero_copy = zero_copy.clone();
+
+ match args {
+ SendArgs {
+ rid,
+ transport,
+ transport_args: ArgsEnum::Ip(args),
+ } if transport == "udp" => {
+ {
+ let mut s = state.borrow_mut();
+ s.borrow_mut::<NP>().check_net(
+ &(&args.hostname, Some(args.port)),
+ "Deno.DatagramConn.send()",
+ )?;
+ }
+ let addr = resolve_addr(&args.hostname, args.port)
+ .await?
+ .next()
+ .ok_or_else(|| generic_error("No resolved address found"))?;
+
+ let resource = state
+ .borrow_mut()
+ .resource_table
+ .get::<UdpSocketResource>(rid)
+ .map_err(|_| bad_resource("Socket has been closed"))?;
+ let socket = RcRef::map(&resource, |r| &r.socket).borrow().await;
+ let byte_length = socket.send_to(&zero_copy, &addr).await?;
+ Ok(byte_length)
+ }
+ #[cfg(unix)]
+ SendArgs {
+ rid,
+ transport,
+ transport_args: ArgsEnum::Unix(args),
+ } if transport == "unixpacket" => {
+ let address_path = Path::new(&args.path);
+ {
+ let mut s = state.borrow_mut();
+ s.borrow_mut::<NP>()
+ .check_write(address_path, "Deno.DatagramConn.send()")?;
+ }
+ let resource = state
+ .borrow()
+ .resource_table
+ .get::<net_unix::UnixDatagramResource>(rid)
+ .map_err(|_| custom_error("NotConnected", "Socket has been closed"))?;
+ let socket = RcRef::map(&resource, |r| &r.socket)
+ .try_borrow_mut()
+ .ok_or_else(|| custom_error("Busy", "Socket already in use"))?;
+ let byte_length = socket.send_to(&zero_copy, address_path).await?;
+ Ok(byte_length)
+ }
+ _ => Err(type_error("Wrong argument format!")),
}
- let addr = resolve_addr(&addr.hostname, addr.port)
- .await?
- .next()
- .ok_or_else(|| generic_error("No resolved address found"))?;
-
- let resource = state
- .borrow_mut()
- .resource_table
- .get::<UdpSocketResource>(rid)
- .map_err(|_| bad_resource("Socket has been closed"))?;
- let socket = RcRef::map(&resource, |r| &r.socket).borrow().await;
- let nwritten = socket.send_to(&zero_copy, &addr).await?;
+}
- Ok(nwritten)
+#[derive(Deserialize)]
+pub struct ConnectArgs {
+ transport: String,
+ #[serde(flatten)]
+ transport_args: ArgsEnum,
}
#[op]
-pub async fn op_net_connect_tcp<NP>(
+pub async fn op_net_connect<NP>(
state: Rc<RefCell<OpState>>,
- addr: IpAddr,
-) -> Result<(ResourceId, IpAddr, IpAddr), AnyError>
+ args: ConnectArgs,
+) -> Result<OpConn, AnyError>
where
NP: NetPermissions + 'static,
{
- {
- let mut state_ = state.borrow_mut();
- state_
- .borrow_mut::<NP>()
- .check_net(&(&addr.hostname, Some(addr.port)), "Deno.connect()")?;
+ match args {
+ ConnectArgs {
+ transport,
+ transport_args: ArgsEnum::Ip(args),
+ } if transport == "tcp" => {
+ {
+ let mut state_ = state.borrow_mut();
+ state_
+ .borrow_mut::<NP>()
+ .check_net(&(&args.hostname, Some(args.port)), "Deno.connect()")?;
+ }
+ let addr = resolve_addr(&args.hostname, args.port)
+ .await?
+ .next()
+ .ok_or_else(|| generic_error("No resolved address found"))?;
+ let tcp_stream = TcpStream::connect(&addr).await?;
+ let local_addr = tcp_stream.local_addr()?;
+ let remote_addr = tcp_stream.peer_addr()?;
+
+ let mut state_ = state.borrow_mut();
+ let rid = state_
+ .resource_table
+ .add(TcpStreamResource::new(tcp_stream.into_split()));
+ Ok(OpConn {
+ rid,
+ local_addr: Some(OpAddr::Tcp(IpAddr {
+ hostname: local_addr.ip().to_string(),
+ port: local_addr.port(),
+ })),
+ remote_addr: Some(OpAddr::Tcp(IpAddr {
+ hostname: remote_addr.ip().to_string(),
+ port: remote_addr.port(),
+ })),
+ })
+ }
+ #[cfg(unix)]
+ ConnectArgs {
+ transport,
+ transport_args: ArgsEnum::Unix(args),
+ } if transport == "unix" => {
+ let address_path = Path::new(&args.path);
+ super::check_unstable2(&state, "Deno.connect");
+ {
+ let mut state_ = state.borrow_mut();
+ state_
+ .borrow_mut::<NP>()
+ .check_read(address_path, "Deno.connect()")?;
+ state_
+ .borrow_mut::<NP>()
+ .check_write(address_path, "Deno.connect()")?;
+ }
+ let path = args.path;
+ let unix_stream = net_unix::UnixStream::connect(Path::new(&path)).await?;
+ let local_addr = unix_stream.local_addr()?;
+ let remote_addr = unix_stream.peer_addr()?;
+
+ let mut state_ = state.borrow_mut();
+ let resource = UnixStreamResource::new(unix_stream.into_split());
+ let rid = state_.resource_table.add(resource);
+ Ok(OpConn {
+ rid,
+ local_addr: Some(OpAddr::Unix(net_unix::UnixAddr {
+ path: local_addr.as_pathname().and_then(net_unix::pathstring),
+ })),
+ remote_addr: Some(OpAddr::Unix(net_unix::UnixAddr {
+ path: remote_addr.as_pathname().and_then(net_unix::pathstring),
+ })),
+ })
+ }
+ _ => Err(type_error("Wrong argument format!")),
}
-
- let addr = resolve_addr(&addr.hostname, addr.port)
- .await?
- .next()
- .ok_or_else(|| generic_error("No resolved address found"))?;
- let tcp_stream = TcpStream::connect(&addr).await?;
- let local_addr = tcp_stream.local_addr()?;
- let remote_addr = tcp_stream.peer_addr()?;
-
- let mut state_ = state.borrow_mut();
- let rid = state_
- .resource_table
- .add(TcpStreamResource::new(tcp_stream.into_split()));
-
- Ok((rid, IpAddr::from(local_addr), IpAddr::from(remote_addr)))
}
pub struct TcpListenerResource {
@@ -242,20 +417,33 @@ impl Resource for UdpSocketResource {
}
}
-#[op]
-fn op_net_listen_tcp<NP>(
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct IpListenArgs {
+ hostname: String,
+ port: u16,
+ reuse_address: Option<bool>,
+}
+
+#[derive(Deserialize)]
+#[serde(untagged)]
+enum ArgsEnum {
+ Ip(IpListenArgs),
+ #[cfg(unix)]
+ Unix(net_unix::UnixListenArgs),
+}
+
+#[derive(Deserialize)]
+struct ListenArgs {
+ transport: String,
+ #[serde(flatten)]
+ transport_args: ArgsEnum,
+}
+
+fn listen_tcp(
state: &mut OpState,
- addr: IpAddr,
-) -> Result<(ResourceId, IpAddr), AnyError>
-where
- NP: NetPermissions + 'static,
-{
- state
- .borrow_mut::<NP>()
- .check_net(&(&addr.hostname, Some(addr.port)), "Deno.listen()")?;
- let addr = resolve_addr_sync(&addr.hostname, addr.port)?
- .next()
- .ok_or_else(|| generic_error("No resolved address found"))?;
+ addr: SocketAddr,
+) -> Result<(u32, SocketAddr), AnyError> {
let domain = if addr.is_ipv4() {
Domain::IPV4
} else {
@@ -277,33 +465,21 @@ where
};
let rid = state.resource_table.add(listener_resource);
- Ok((rid, IpAddr::from(local_addr)))
+ Ok((rid, local_addr))
}
-#[op]
-fn op_net_listen_udp<NP>(
+fn listen_udp(
state: &mut OpState,
- addr: IpAddr,
- reuse_address: bool,
-) -> Result<(ResourceId, IpAddr), AnyError>
-where
- NP: NetPermissions + 'static,
-{
- super::check_unstable(state, "Deno.listenDatagram");
- state
- .borrow_mut::<NP>()
- .check_net(&(&addr.hostname, Some(addr.port)), "Deno.listenDatagram()")?;
- let addr = resolve_addr_sync(&addr.hostname, addr.port)?
- .next()
- .ok_or_else(|| generic_error("No resolved address found"))?;
-
+ addr: SocketAddr,
+ reuse_address: Option<bool>,
+) -> Result<(u32, SocketAddr), AnyError> {
let domain = if addr.is_ipv4() {
Domain::IPV4
} else {
Domain::IPV6
};
let socket_tmp = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
- if reuse_address {
+ if reuse_address.unwrap_or(false) {
// This logic is taken from libuv:
//
// On the BSDs, SO_REUSEPORT implies SO_REUSEADDR but with some additional
@@ -332,7 +508,110 @@ where
};
let rid = state.resource_table.add(socket_resource);
- Ok((rid, IpAddr::from(local_addr)))
+ Ok((rid, local_addr))
+}
+
+#[op]
+fn op_net_listen<NP>(
+ state: &mut OpState,
+ args: ListenArgs,
+) -> Result<OpConn, AnyError>
+where
+ NP: NetPermissions + 'static,
+{
+ match args {
+ ListenArgs {
+ transport,
+ transport_args: ArgsEnum::Ip(args),
+ } => {
+ {
+ if transport == "udp" {
+ super::check_unstable(state, "Deno.listenDatagram");
+ }
+ state.borrow_mut::<NP>().check_net(
+ &(&args.hostname, Some(args.port)),
+ "Deno.listenDatagram()",
+ )?;
+ }
+ let addr = resolve_addr_sync(&args.hostname, args.port)?
+ .next()
+ .ok_or_else(|| generic_error("No resolved address found"))?;
+ let (rid, local_addr) = if transport == "tcp" {
+ if args.reuse_address.is_some() {
+ return Err(generic_error(
+ "The reuseAddress option is not supported for TCP",
+ ));
+ }
+ listen_tcp(state, addr)?
+ } else {
+ listen_udp(state, addr, args.reuse_address)?
+ };
+ debug!(
+ "New listener {} {}:{}",
+ rid,
+ local_addr.ip().to_string(),
+ local_addr.port()
+ );
+ let ip_addr = IpAddr {
+ hostname: local_addr.ip().to_string(),
+ port: local_addr.port(),
+ };
+ Ok(OpConn {
+ rid,
+ local_addr: Some(match transport.as_str() {
+ "udp" => OpAddr::Udp(ip_addr),
+ "tcp" => OpAddr::Tcp(ip_addr),
+ // NOTE: This could be unreachable!()
+ other => return Err(bad_transport(other)),
+ }),
+ remote_addr: None,
+ })
+ }
+ #[cfg(unix)]
+ ListenArgs {
+ transport,
+ transport_args: ArgsEnum::Unix(args),
+ } if transport == "unix" || transport == "unixpacket" => {
+ let address_path = Path::new(&args.path);
+ {
+ if transport == "unix" {
+ super::check_unstable(state, "Deno.listen");
+ }
+ if transport == "unixpacket" {
+ super::check_unstable(state, "Deno.listenDatagram");
+ }
+ let api_name = if transport == "unix" {
+ "Deno.listen()"
+ } else {
+ "Deno.listenDatagram()"
+ };
+ let permissions = state.borrow_mut::<NP>();
+ permissions.check_read(address_path, api_name)?;
+ permissions.check_write(address_path, api_name)?;
+ }
+ let (rid, local_addr) = if transport == "unix" {
+ net_unix::listen_unix(state, address_path)?
+ } else {
+ net_unix::listen_unix_packet(state, address_path)?
+ };
+ debug!("New listener {} {:?}", rid, local_addr);
+ let unix_addr = net_unix::UnixAddr {
+ path: local_addr.as_pathname().and_then(net_unix::pathstring),
+ };
+
+ Ok(OpConn {
+ rid,
+ local_addr: Some(match transport.as_str() {
+ "unix" => OpAddr::Unix(unix_addr),
+ "unixpacket" => OpAddr::UnixPacket(unix_addr),
+ other => return Err(bad_transport(other)),
+ }),
+ remote_addr: None,
+ })
+ }
+ #[cfg(unix)]
+ _ => Err(type_error("Wrong argument format!")),
+ }
}
#[derive(Serialize, Eq, PartialEq, Debug)]
@@ -849,15 +1128,21 @@ mod tests {
let conn_state = runtime.op_state();
let server_addr: Vec<&str> = clone_addr.split(':').collect();
- let ip_addr = IpAddr {
+ let ip_args = IpListenArgs {
hostname: String::from(server_addr[0]),
port: server_addr[1].parse().unwrap(),
+ reuse_address: None,
+ };
+ let connect_args = ConnectArgs {
+ transport: String::from("tcp"),
+ transport_args: ArgsEnum::Ip(ip_args),
};
let connect_fut =
- op_net_connect_tcp::call::<TestPermission>(conn_state, ip_addr);
- let (rid, _, _) = connect_fut.await.unwrap();
+ op_net_connect::call::<TestPermission>(conn_state, connect_args);
+ let conn = connect_fut.await.unwrap();
+ let rid = conn.rid;
let state = runtime.op_state();
set_sockopt_fn(&mut state.borrow_mut(), rid);