diff options
Diffstat (limited to 'runtime/ops/net.rs')
-rw-r--r-- | runtime/ops/net.rs | 47 |
1 files changed, 26 insertions, 21 deletions
diff --git a/runtime/ops/net.rs b/runtime/ops/net.rs index 6b7e05771..dea7ffe51 100644 --- a/runtime/ops/net.rs +++ b/runtime/ops/net.rs @@ -1,6 +1,5 @@ // Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. -use crate::ops::io::FullDuplexResource; use crate::ops::io::TcpStreamResource; use crate::permissions::Permissions; use crate::resolve_addr::resolve_addr; @@ -28,7 +27,7 @@ use std::cell::RefCell; use std::net::Shutdown; use std::net::SocketAddr; use std::rc::Rc; -use tokio::net::udp; +use tokio::io::AsyncWriteExt; use tokio::net::TcpListener; use tokio::net::TcpStream; use tokio::net::UdpSocket; @@ -67,7 +66,7 @@ async fn accept_tcp( .resource_table .get::<TcpListenerResource>(rid) .ok_or_else(|| bad_resource("Listener has been closed"))?; - let mut listener = RcRef::map(&resource, |r| &r.listener) + let listener = RcRef::map(&resource, |r| &r.listener) .try_borrow_mut() .ok_or_else(|| custom_error("Busy", "Another accept task is ongoing"))?; let cancel = RcRef::map(resource, |r| &r.cancel); @@ -140,11 +139,11 @@ async fn receive_udp( .resource_table .get::<UdpSocketResource>(rid) .ok_or_else(|| bad_resource("Socket has been closed"))?; - let (size, remote_addr) = resource - .rd_borrow_mut() - .await + let socket = RcRef::map(&resource, |r| &r.socket).borrow().await; + let cancel_handle = RcRef::map(&resource, |r| &r.cancel); + let (size, remote_addr) = socket .recv_from(&mut zero_copy) - .try_or_cancel(resource.cancel_handle()) + .try_or_cancel(cancel_handle) .await?; Ok(json!({ "size": size, @@ -212,11 +211,8 @@ async fn op_datagram_send( .resource_table .get::<UdpSocketResource>(rid as u32) .ok_or_else(|| bad_resource("Socket has been closed"))?; - let byte_length = resource - .wr_borrow_mut() - .await - .send_to(&zero_copy, &addr) - .await?; + let socket = RcRef::map(&resource, |r| &r.socket).borrow().await; + let byte_length = socket.send_to(&zero_copy, &addr).await?; Ok(json!(byte_length)) } #[cfg(unix)] @@ -237,7 +233,7 @@ async fn op_datagram_send( .ok_or_else(|| { custom_error("NotConnected", "Socket has been closed") })?; - let mut socket = RcRef::map(&resource, |r| &r.socket) + let socket = RcRef::map(&resource, |r| &r.socket) .try_borrow_mut() .ok_or_else(|| custom_error("Busy", "Socket already in use"))?; let byte_length = socket.send_to(&zero_copy, address_path).await?; @@ -350,7 +346,8 @@ async fn op_shutdown( let rid = args.rid as u32; let how = args.how; - let shutdown_mode = match how { + // TODO(bartlomieju): no longer needed after Tokio 1.0 upgrade + let _shutdown_mode = match how { 0 => Shutdown::Read, // TODO: nonsense, remove me. 1 => Shutdown::Write, _ => unimplemented!(), @@ -362,18 +359,18 @@ async fn op_shutdown( .get_any(rid) .ok_or_else(bad_resource_id)?; if let Some(stream) = resource.downcast_rc::<TcpStreamResource>() { - let wr = stream.wr_borrow_mut().await; - TcpStream::shutdown((*wr).as_ref(), shutdown_mode)?; + let mut wr = stream.wr_borrow_mut().await; + wr.shutdown().await?; return Ok(json!({})); } #[cfg(unix)] if let Some(stream) = resource.downcast_rc::<StreamResource>() { if stream.unix_stream.is_some() { - let wr = RcRef::map(stream, |r| r.unix_stream.as_ref().unwrap()) + let mut wr = RcRef::map(stream, |r| r.unix_stream.as_ref().unwrap()) .borrow_mut() .await; - net_unix::UnixStream::shutdown(&*wr, shutdown_mode)?; + wr.shutdown().await?; return Ok(json!({})); } } @@ -396,7 +393,10 @@ impl Resource for TcpListenerResource { } } -type UdpSocketResource = FullDuplexResource<udp::RecvHalf, udp::SendHalf>; +struct UdpSocketResource { + socket: AsyncRefCell<UdpSocket>, + cancel: CancelHandle, +} impl Resource for UdpSocketResource { fn name(&self) -> Cow<str> { @@ -404,7 +404,7 @@ impl Resource for UdpSocketResource { } fn close(self: Rc<Self>) { - self.cancel_read_ops() + self.cancel.cancel() } } @@ -434,6 +434,7 @@ fn listen_tcp( addr: SocketAddr, ) -> Result<(u32, SocketAddr), AnyError> { let std_listener = std::net::TcpListener::bind(&addr)?; + std_listener.set_nonblocking(true)?; let listener = TcpListener::from_std(std_listener)?; let local_addr = listener.local_addr()?; let listener_resource = TcpListenerResource { @@ -450,9 +451,13 @@ fn listen_udp( addr: SocketAddr, ) -> Result<(u32, SocketAddr), AnyError> { let std_socket = std::net::UdpSocket::bind(&addr)?; + std_socket.set_nonblocking(true)?; let socket = UdpSocket::from_std(std_socket)?; let local_addr = socket.local_addr()?; - let socket_resource = UdpSocketResource::new(socket.split()); + let socket_resource = UdpSocketResource { + socket: AsyncRefCell::new(socket), + cancel: Default::default(), + }; let rid = state.resource_table.add(socket_resource); Ok((rid, local_addr)) |