summaryrefslogtreecommitdiff
path: root/runtime/ops
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/ops')
-rw-r--r--runtime/ops/fs.rs1
-rw-r--r--runtime/ops/fs_events.rs2
-rw-r--r--runtime/ops/net.rs47
-rw-r--r--runtime/ops/net_unix.rs9
-rw-r--r--runtime/ops/process.rs2
-rw-r--r--runtime/ops/timers.rs2
-rw-r--r--runtime/ops/tls.rs3
7 files changed, 36 insertions, 30 deletions
diff --git a/runtime/ops/fs.rs b/runtime/ops/fs.rs
index 3b0c02083..d1a2489ba 100644
--- a/runtime/ops/fs.rs
+++ b/runtime/ops/fs.rs
@@ -27,6 +27,7 @@ use std::path::{Path, PathBuf};
use std::rc::Rc;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
+use tokio::io::AsyncSeekExt;
#[cfg(not(unix))]
use deno_core::error::generic_error;
diff --git a/runtime/ops/fs_events.rs b/runtime/ops/fs_events.rs
index c97ba0af4..6f6ba04bd 100644
--- a/runtime/ops/fs_events.rs
+++ b/runtime/ops/fs_events.rs
@@ -99,7 +99,7 @@ fn op_fs_events_open(
let mut watcher: RecommendedWatcher =
Watcher::new_immediate(move |res: Result<NotifyEvent, NotifyError>| {
let res2 = res.map(FsEvent::from).map_err(AnyError::from);
- let mut sender = sender.lock().unwrap();
+ let sender = sender.lock().unwrap();
// Ignore result, if send failed it means that watcher was already closed,
// but not all messages have been flushed.
let _ = sender.try_send(res2);
diff --git a/runtime/ops/net.rs b/runtime/ops/net.rs
index 6b7e05771..dea7ffe51 100644
--- a/runtime/ops/net.rs
+++ b/runtime/ops/net.rs
@@ -1,6 +1,5 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
-use crate::ops::io::FullDuplexResource;
use crate::ops::io::TcpStreamResource;
use crate::permissions::Permissions;
use crate::resolve_addr::resolve_addr;
@@ -28,7 +27,7 @@ use std::cell::RefCell;
use std::net::Shutdown;
use std::net::SocketAddr;
use std::rc::Rc;
-use tokio::net::udp;
+use tokio::io::AsyncWriteExt;
use tokio::net::TcpListener;
use tokio::net::TcpStream;
use tokio::net::UdpSocket;
@@ -67,7 +66,7 @@ async fn accept_tcp(
.resource_table
.get::<TcpListenerResource>(rid)
.ok_or_else(|| bad_resource("Listener has been closed"))?;
- let mut listener = RcRef::map(&resource, |r| &r.listener)
+ let listener = RcRef::map(&resource, |r| &r.listener)
.try_borrow_mut()
.ok_or_else(|| custom_error("Busy", "Another accept task is ongoing"))?;
let cancel = RcRef::map(resource, |r| &r.cancel);
@@ -140,11 +139,11 @@ async fn receive_udp(
.resource_table
.get::<UdpSocketResource>(rid)
.ok_or_else(|| bad_resource("Socket has been closed"))?;
- let (size, remote_addr) = resource
- .rd_borrow_mut()
- .await
+ let socket = RcRef::map(&resource, |r| &r.socket).borrow().await;
+ let cancel_handle = RcRef::map(&resource, |r| &r.cancel);
+ let (size, remote_addr) = socket
.recv_from(&mut zero_copy)
- .try_or_cancel(resource.cancel_handle())
+ .try_or_cancel(cancel_handle)
.await?;
Ok(json!({
"size": size,
@@ -212,11 +211,8 @@ async fn op_datagram_send(
.resource_table
.get::<UdpSocketResource>(rid as u32)
.ok_or_else(|| bad_resource("Socket has been closed"))?;
- let byte_length = resource
- .wr_borrow_mut()
- .await
- .send_to(&zero_copy, &addr)
- .await?;
+ let socket = RcRef::map(&resource, |r| &r.socket).borrow().await;
+ let byte_length = socket.send_to(&zero_copy, &addr).await?;
Ok(json!(byte_length))
}
#[cfg(unix)]
@@ -237,7 +233,7 @@ async fn op_datagram_send(
.ok_or_else(|| {
custom_error("NotConnected", "Socket has been closed")
})?;
- let mut socket = RcRef::map(&resource, |r| &r.socket)
+ let socket = RcRef::map(&resource, |r| &r.socket)
.try_borrow_mut()
.ok_or_else(|| custom_error("Busy", "Socket already in use"))?;
let byte_length = socket.send_to(&zero_copy, address_path).await?;
@@ -350,7 +346,8 @@ async fn op_shutdown(
let rid = args.rid as u32;
let how = args.how;
- let shutdown_mode = match how {
+ // TODO(bartlomieju): no longer needed after Tokio 1.0 upgrade
+ let _shutdown_mode = match how {
0 => Shutdown::Read, // TODO: nonsense, remove me.
1 => Shutdown::Write,
_ => unimplemented!(),
@@ -362,18 +359,18 @@ async fn op_shutdown(
.get_any(rid)
.ok_or_else(bad_resource_id)?;
if let Some(stream) = resource.downcast_rc::<TcpStreamResource>() {
- let wr = stream.wr_borrow_mut().await;
- TcpStream::shutdown((*wr).as_ref(), shutdown_mode)?;
+ let mut wr = stream.wr_borrow_mut().await;
+ wr.shutdown().await?;
return Ok(json!({}));
}
#[cfg(unix)]
if let Some(stream) = resource.downcast_rc::<StreamResource>() {
if stream.unix_stream.is_some() {
- let wr = RcRef::map(stream, |r| r.unix_stream.as_ref().unwrap())
+ let mut wr = RcRef::map(stream, |r| r.unix_stream.as_ref().unwrap())
.borrow_mut()
.await;
- net_unix::UnixStream::shutdown(&*wr, shutdown_mode)?;
+ wr.shutdown().await?;
return Ok(json!({}));
}
}
@@ -396,7 +393,10 @@ impl Resource for TcpListenerResource {
}
}
-type UdpSocketResource = FullDuplexResource<udp::RecvHalf, udp::SendHalf>;
+struct UdpSocketResource {
+ socket: AsyncRefCell<UdpSocket>,
+ cancel: CancelHandle,
+}
impl Resource for UdpSocketResource {
fn name(&self) -> Cow<str> {
@@ -404,7 +404,7 @@ impl Resource for UdpSocketResource {
}
fn close(self: Rc<Self>) {
- self.cancel_read_ops()
+ self.cancel.cancel()
}
}
@@ -434,6 +434,7 @@ fn listen_tcp(
addr: SocketAddr,
) -> Result<(u32, SocketAddr), AnyError> {
let std_listener = std::net::TcpListener::bind(&addr)?;
+ std_listener.set_nonblocking(true)?;
let listener = TcpListener::from_std(std_listener)?;
let local_addr = listener.local_addr()?;
let listener_resource = TcpListenerResource {
@@ -450,9 +451,13 @@ fn listen_udp(
addr: SocketAddr,
) -> Result<(u32, SocketAddr), AnyError> {
let std_socket = std::net::UdpSocket::bind(&addr)?;
+ std_socket.set_nonblocking(true)?;
let socket = UdpSocket::from_std(std_socket)?;
let local_addr = socket.local_addr()?;
- let socket_resource = UdpSocketResource::new(socket.split());
+ let socket_resource = UdpSocketResource {
+ socket: AsyncRefCell::new(socket),
+ cancel: Default::default(),
+ };
let rid = state.resource_table.add(socket_resource);
Ok((rid, local_addr))
diff --git a/runtime/ops/net_unix.rs b/runtime/ops/net_unix.rs
index ace66425c..1177d071c 100644
--- a/runtime/ops/net_unix.rs
+++ b/runtime/ops/net_unix.rs
@@ -19,7 +19,6 @@ use serde::Deserialize;
use std::borrow::Cow;
use std::cell::RefCell;
use std::fs::remove_file;
-use std::os::unix;
use std::path::Path;
use std::rc::Rc;
use tokio::net::UnixDatagram;
@@ -73,7 +72,7 @@ pub(crate) async fn accept_unix(
.resource_table
.get::<UnixListenerResource>(rid)
.ok_or_else(|| bad_resource("Listener has been closed"))?;
- let mut listener = RcRef::map(&resource, |r| &r.listener)
+ let listener = RcRef::map(&resource, |r| &r.listener)
.try_borrow_mut()
.ok_or_else(|| custom_error("Busy", "Listener already in use"))?;
let cancel = RcRef::map(resource, |r| &r.cancel);
@@ -113,7 +112,7 @@ pub(crate) async fn receive_unix_packet(
.resource_table
.get::<UnixDatagramResource>(rid)
.ok_or_else(|| bad_resource("Socket has been closed"))?;
- let mut socket = RcRef::map(&resource, |r| &r.socket)
+ let socket = RcRef::map(&resource, |r| &r.socket)
.try_borrow_mut()
.ok_or_else(|| custom_error("Busy", "Socket already in use"))?;
let cancel = RcRef::map(resource, |r| &r.cancel);
@@ -131,7 +130,7 @@ pub(crate) async fn receive_unix_packet(
pub fn listen_unix(
state: &mut OpState,
addr: &Path,
-) -> Result<(u32, unix::net::SocketAddr), AnyError> {
+) -> Result<(u32, tokio::net::unix::SocketAddr), AnyError> {
if addr.exists() {
remove_file(&addr).unwrap();
}
@@ -149,7 +148,7 @@ pub fn listen_unix(
pub fn listen_unix_packet(
state: &mut OpState,
addr: &Path,
-) -> Result<(u32, unix::net::SocketAddr), AnyError> {
+) -> Result<(u32, tokio::net::unix::SocketAddr), AnyError> {
if addr.exists() {
remove_file(&addr).unwrap();
}
diff --git a/runtime/ops/process.rs b/runtime/ops/process.rs
index e0a9a0795..63e22b601 100644
--- a/runtime/ops/process.rs
+++ b/runtime/ops/process.rs
@@ -199,7 +199,7 @@ async fn op_run_status(
.get::<ChildResource>(rid)
.ok_or_else(bad_resource_id)?;
let mut child = resource.borrow_mut().await;
- let run_status = (&mut *child).await?;
+ let run_status = child.wait().await?;
let code = run_status.code();
#[cfg(unix)]
diff --git a/runtime/ops/timers.rs b/runtime/ops/timers.rs
index 940a96a8e..d53a0c971 100644
--- a/runtime/ops/timers.rs
+++ b/runtime/ops/timers.rs
@@ -60,7 +60,7 @@ impl GlobalTimer {
let (tx, rx) = oneshot::channel();
self.tx = Some(tx);
- let delay = tokio::time::delay_until(deadline.into());
+ let delay = tokio::time::sleep_until(deadline.into()).boxed_local();
let rx = rx
.map_err(|err| panic!("Unexpected error in receiving channel {:?}", err));
diff --git a/runtime/ops/tls.rs b/runtime/ops/tls.rs
index 9c4c9f422..05d432e1c 100644
--- a/runtime/ops/tls.rs
+++ b/runtime/ops/tls.rs
@@ -303,6 +303,7 @@ fn op_listen_tls(
.next()
.ok_or_else(|| generic_error("No resolved address found"))?;
let std_listener = std::net::TcpListener::bind(&addr)?;
+ std_listener.set_nonblocking(true)?;
let listener = TcpListener::from_std(std_listener)?;
let local_addr = listener.local_addr()?;
let tls_listener_resource = TlsListenerResource {
@@ -341,7 +342,7 @@ async fn op_accept_tls(
.resource_table
.get::<TlsListenerResource>(rid)
.ok_or_else(|| bad_resource("Listener has been closed"))?;
- let mut listener = RcRef::map(&resource, |r| &r.listener)
+ let listener = RcRef::map(&resource, |r| &r.listener)
.try_borrow_mut()
.ok_or_else(|| custom_error("Busy", "Another accept task is ongoing"))?;
let cancel = RcRef::map(resource, |r| &r.cancel);