summaryrefslogtreecommitdiff
path: root/runtime/ops/net.rs
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/ops/net.rs')
-rw-r--r--runtime/ops/net.rs47
1 files changed, 26 insertions, 21 deletions
diff --git a/runtime/ops/net.rs b/runtime/ops/net.rs
index 6b7e05771..dea7ffe51 100644
--- a/runtime/ops/net.rs
+++ b/runtime/ops/net.rs
@@ -1,6 +1,5 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
-use crate::ops::io::FullDuplexResource;
use crate::ops::io::TcpStreamResource;
use crate::permissions::Permissions;
use crate::resolve_addr::resolve_addr;
@@ -28,7 +27,7 @@ use std::cell::RefCell;
use std::net::Shutdown;
use std::net::SocketAddr;
use std::rc::Rc;
-use tokio::net::udp;
+use tokio::io::AsyncWriteExt;
use tokio::net::TcpListener;
use tokio::net::TcpStream;
use tokio::net::UdpSocket;
@@ -67,7 +66,7 @@ async fn accept_tcp(
.resource_table
.get::<TcpListenerResource>(rid)
.ok_or_else(|| bad_resource("Listener has been closed"))?;
- let mut listener = RcRef::map(&resource, |r| &r.listener)
+ let listener = RcRef::map(&resource, |r| &r.listener)
.try_borrow_mut()
.ok_or_else(|| custom_error("Busy", "Another accept task is ongoing"))?;
let cancel = RcRef::map(resource, |r| &r.cancel);
@@ -140,11 +139,11 @@ async fn receive_udp(
.resource_table
.get::<UdpSocketResource>(rid)
.ok_or_else(|| bad_resource("Socket has been closed"))?;
- let (size, remote_addr) = resource
- .rd_borrow_mut()
- .await
+ let socket = RcRef::map(&resource, |r| &r.socket).borrow().await;
+ let cancel_handle = RcRef::map(&resource, |r| &r.cancel);
+ let (size, remote_addr) = socket
.recv_from(&mut zero_copy)
- .try_or_cancel(resource.cancel_handle())
+ .try_or_cancel(cancel_handle)
.await?;
Ok(json!({
"size": size,
@@ -212,11 +211,8 @@ async fn op_datagram_send(
.resource_table
.get::<UdpSocketResource>(rid as u32)
.ok_or_else(|| bad_resource("Socket has been closed"))?;
- let byte_length = resource
- .wr_borrow_mut()
- .await
- .send_to(&zero_copy, &addr)
- .await?;
+ let socket = RcRef::map(&resource, |r| &r.socket).borrow().await;
+ let byte_length = socket.send_to(&zero_copy, &addr).await?;
Ok(json!(byte_length))
}
#[cfg(unix)]
@@ -237,7 +233,7 @@ async fn op_datagram_send(
.ok_or_else(|| {
custom_error("NotConnected", "Socket has been closed")
})?;
- let mut socket = RcRef::map(&resource, |r| &r.socket)
+ let socket = RcRef::map(&resource, |r| &r.socket)
.try_borrow_mut()
.ok_or_else(|| custom_error("Busy", "Socket already in use"))?;
let byte_length = socket.send_to(&zero_copy, address_path).await?;
@@ -350,7 +346,8 @@ async fn op_shutdown(
let rid = args.rid as u32;
let how = args.how;
- let shutdown_mode = match how {
+ // TODO(bartlomieju): no longer needed after Tokio 1.0 upgrade
+ let _shutdown_mode = match how {
0 => Shutdown::Read, // TODO: nonsense, remove me.
1 => Shutdown::Write,
_ => unimplemented!(),
@@ -362,18 +359,18 @@ async fn op_shutdown(
.get_any(rid)
.ok_or_else(bad_resource_id)?;
if let Some(stream) = resource.downcast_rc::<TcpStreamResource>() {
- let wr = stream.wr_borrow_mut().await;
- TcpStream::shutdown((*wr).as_ref(), shutdown_mode)?;
+ let mut wr = stream.wr_borrow_mut().await;
+ wr.shutdown().await?;
return Ok(json!({}));
}
#[cfg(unix)]
if let Some(stream) = resource.downcast_rc::<StreamResource>() {
if stream.unix_stream.is_some() {
- let wr = RcRef::map(stream, |r| r.unix_stream.as_ref().unwrap())
+ let mut wr = RcRef::map(stream, |r| r.unix_stream.as_ref().unwrap())
.borrow_mut()
.await;
- net_unix::UnixStream::shutdown(&*wr, shutdown_mode)?;
+ wr.shutdown().await?;
return Ok(json!({}));
}
}
@@ -396,7 +393,10 @@ impl Resource for TcpListenerResource {
}
}
-type UdpSocketResource = FullDuplexResource<udp::RecvHalf, udp::SendHalf>;
+struct UdpSocketResource {
+ socket: AsyncRefCell<UdpSocket>,
+ cancel: CancelHandle,
+}
impl Resource for UdpSocketResource {
fn name(&self) -> Cow<str> {
@@ -404,7 +404,7 @@ impl Resource for UdpSocketResource {
}
fn close(self: Rc<Self>) {
- self.cancel_read_ops()
+ self.cancel.cancel()
}
}
@@ -434,6 +434,7 @@ fn listen_tcp(
addr: SocketAddr,
) -> Result<(u32, SocketAddr), AnyError> {
let std_listener = std::net::TcpListener::bind(&addr)?;
+ std_listener.set_nonblocking(true)?;
let listener = TcpListener::from_std(std_listener)?;
let local_addr = listener.local_addr()?;
let listener_resource = TcpListenerResource {
@@ -450,9 +451,13 @@ fn listen_udp(
addr: SocketAddr,
) -> Result<(u32, SocketAddr), AnyError> {
let std_socket = std::net::UdpSocket::bind(&addr)?;
+ std_socket.set_nonblocking(true)?;
let socket = UdpSocket::from_std(std_socket)?;
let local_addr = socket.local_addr()?;
- let socket_resource = UdpSocketResource::new(socket.split());
+ let socket_resource = UdpSocketResource {
+ socket: AsyncRefCell::new(socket),
+ cancel: Default::default(),
+ };
let rid = state.resource_table.add(socket_resource);
Ok((rid, local_addr))