summaryrefslogtreecommitdiff
path: root/runtime
diff options
context:
space:
mode:
authorBartek IwaƄczuk <biwanczuk@gmail.com>2021-01-12 08:50:02 +0100
committerGitHub <noreply@github.com>2021-01-11 23:50:02 -0800
commit275a5c65a20529cd4a3d775b8d8c6e9b261c76b1 (patch)
tree9f861e36e70be809d5586128a24b9f7b4332e09e /runtime
parent36ff7bdf575e0547fabd8957ee778cc4224d5956 (diff)
upgrade: tokio 1.0 (#8779)
Co-authored-by: Bert Belder <bertbelder@gmail.com>
Diffstat (limited to 'runtime')
-rw-r--r--runtime/Cargo.toml36
-rw-r--r--runtime/inspector.rs11
-rw-r--r--runtime/ops/fs.rs1
-rw-r--r--runtime/ops/fs_events.rs2
-rw-r--r--runtime/ops/net.rs47
-rw-r--r--runtime/ops/net_unix.rs9
-rw-r--r--runtime/ops/process.rs2
-rw-r--r--runtime/ops/timers.rs2
-rw-r--r--runtime/ops/tls.rs3
-rw-r--r--runtime/tokio_util.rs7
-rw-r--r--runtime/web_worker.rs2
11 files changed, 66 insertions, 56 deletions
diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml
index 1c75aaff1..127fd6241 100644
--- a/runtime/Cargo.toml
+++ b/runtime/Cargo.toml
@@ -37,37 +37,37 @@ deno_websocket = { path = "../op_crates/websocket", version = "0.1.0" }
atty = "0.2.14"
dlopen = "0.1.8"
-encoding_rs = "0.8.24"
-env_logger = "0.7.1"
-filetime = "0.2.12"
-http = "0.2.1"
-indexmap = "1.6.0"
+encoding_rs = "0.8.26"
+env_logger = "0.8.2"
+filetime = "0.2.13"
+http = "0.2.3"
+hyper = { version = "0.14.2", features = ["server"] }
+indexmap = "1.6.1"
lazy_static = "1.4.0"
-libc = "0.2.77"
-log = "0.4.11"
-notify = "5.0.0-pre.3"
+libc = "0.2.82"
+log = "0.4.13"
+notify = "5.0.0-pre.4"
percent-encoding = "2.1.0"
-regex = "1.3.9"
+regex = "1.4.3"
ring = "0.16.19"
rustyline = { version = "7.1.0", default-features = false }
rustyline-derive = "0.4.0"
serde = { version = "1.0.116", features = ["derive"] }
shell-escape = "0.1.5"
sys-info = "0.7.0"
-termcolor = "1.1.0"
-tokio = { version = "0.2.22", features = ["full"] }
-tokio-rustls = "0.14.1"
-uuid = { version = "0.8.1", features = ["v4"] }
-hyper = "0.13.9"
-webpki = "0.21.3"
-webpki-roots = "=0.19.0" # Pinned to v0.19.0 to match 'reqwest'.
+termcolor = "1.1.2"
+tokio = { version = "1.0.1", features = ["full"] }
+tokio-rustls = "0.22.0"
+uuid = { version = "0.8.2", features = ["v4"] }
+webpki = "0.21.4"
+webpki-roots = "0.21.0"
[target.'cfg(windows)'.dependencies]
-winapi = { version = "0.3.9", features = ["knownfolders", "mswsock", "objbase", "shlobj", "tlhelp32", "winbase", "winerror", "winsock2"] }
fwdansi = "1.1.0"
+winapi = { version = "0.3.9", features = ["knownfolders", "mswsock", "objbase", "shlobj", "tlhelp32", "winbase", "winerror", "winsock2"] }
[target.'cfg(unix)'.dependencies]
-nix = "0.19.0"
+nix = "0.19.1"
[dev-dependencies]
# Used in benchmark
diff --git a/runtime/inspector.rs b/runtime/inspector.rs
index 80beec0ec..0a2a236f2 100644
--- a/runtime/inspector.rs
+++ b/runtime/inspector.rs
@@ -16,6 +16,7 @@ use deno_core::futures::pin_mut;
use deno_core::futures::prelude::*;
use deno_core::futures::select;
use deno_core::futures::stream::FuturesUnordered;
+use deno_core::futures::stream::StreamExt;
use deno_core::futures::task;
use deno_core::futures::task::Context;
use deno_core::futures::task::Poll;
@@ -58,10 +59,10 @@ impl InspectorServer {
let (shutdown_server_tx, shutdown_server_rx) = oneshot::channel();
let thread_handle = thread::spawn(move || {
- let mut rt = crate::tokio_util::create_basic_runtime();
+ let rt = crate::tokio_util::create_basic_runtime();
let local = tokio::task::LocalSet::new();
local.block_on(
- &mut rt,
+ &rt,
server(host, register_inspector_rx, shutdown_server_rx, name),
)
});
@@ -182,9 +183,13 @@ fn handle_ws_request(
.status(http::StatusCode::BAD_REQUEST)
.body("Not a valid Websocket Request".into()),
});
+
+ let (parts, _) = req.into_parts();
+ let req = http::Request::from_parts(parts, body);
+
if resp.is_ok() {
tokio::task::spawn_local(async move {
- let upgraded = body.on_upgrade().await.unwrap();
+ let upgraded = hyper::upgrade::on(req).await.unwrap();
let websocket =
deno_websocket::tokio_tungstenite::WebSocketStream::from_raw_socket(
upgraded,
diff --git a/runtime/ops/fs.rs b/runtime/ops/fs.rs
index 3b0c02083..d1a2489ba 100644
--- a/runtime/ops/fs.rs
+++ b/runtime/ops/fs.rs
@@ -27,6 +27,7 @@ use std::path::{Path, PathBuf};
use std::rc::Rc;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
+use tokio::io::AsyncSeekExt;
#[cfg(not(unix))]
use deno_core::error::generic_error;
diff --git a/runtime/ops/fs_events.rs b/runtime/ops/fs_events.rs
index c97ba0af4..6f6ba04bd 100644
--- a/runtime/ops/fs_events.rs
+++ b/runtime/ops/fs_events.rs
@@ -99,7 +99,7 @@ fn op_fs_events_open(
let mut watcher: RecommendedWatcher =
Watcher::new_immediate(move |res: Result<NotifyEvent, NotifyError>| {
let res2 = res.map(FsEvent::from).map_err(AnyError::from);
- let mut sender = sender.lock().unwrap();
+ let sender = sender.lock().unwrap();
// Ignore result, if send failed it means that watcher was already closed,
// but not all messages have been flushed.
let _ = sender.try_send(res2);
diff --git a/runtime/ops/net.rs b/runtime/ops/net.rs
index 6b7e05771..dea7ffe51 100644
--- a/runtime/ops/net.rs
+++ b/runtime/ops/net.rs
@@ -1,6 +1,5 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
-use crate::ops::io::FullDuplexResource;
use crate::ops::io::TcpStreamResource;
use crate::permissions::Permissions;
use crate::resolve_addr::resolve_addr;
@@ -28,7 +27,7 @@ use std::cell::RefCell;
use std::net::Shutdown;
use std::net::SocketAddr;
use std::rc::Rc;
-use tokio::net::udp;
+use tokio::io::AsyncWriteExt;
use tokio::net::TcpListener;
use tokio::net::TcpStream;
use tokio::net::UdpSocket;
@@ -67,7 +66,7 @@ async fn accept_tcp(
.resource_table
.get::<TcpListenerResource>(rid)
.ok_or_else(|| bad_resource("Listener has been closed"))?;
- let mut listener = RcRef::map(&resource, |r| &r.listener)
+ let listener = RcRef::map(&resource, |r| &r.listener)
.try_borrow_mut()
.ok_or_else(|| custom_error("Busy", "Another accept task is ongoing"))?;
let cancel = RcRef::map(resource, |r| &r.cancel);
@@ -140,11 +139,11 @@ async fn receive_udp(
.resource_table
.get::<UdpSocketResource>(rid)
.ok_or_else(|| bad_resource("Socket has been closed"))?;
- let (size, remote_addr) = resource
- .rd_borrow_mut()
- .await
+ let socket = RcRef::map(&resource, |r| &r.socket).borrow().await;
+ let cancel_handle = RcRef::map(&resource, |r| &r.cancel);
+ let (size, remote_addr) = socket
.recv_from(&mut zero_copy)
- .try_or_cancel(resource.cancel_handle())
+ .try_or_cancel(cancel_handle)
.await?;
Ok(json!({
"size": size,
@@ -212,11 +211,8 @@ async fn op_datagram_send(
.resource_table
.get::<UdpSocketResource>(rid as u32)
.ok_or_else(|| bad_resource("Socket has been closed"))?;
- let byte_length = resource
- .wr_borrow_mut()
- .await
- .send_to(&zero_copy, &addr)
- .await?;
+ let socket = RcRef::map(&resource, |r| &r.socket).borrow().await;
+ let byte_length = socket.send_to(&zero_copy, &addr).await?;
Ok(json!(byte_length))
}
#[cfg(unix)]
@@ -237,7 +233,7 @@ async fn op_datagram_send(
.ok_or_else(|| {
custom_error("NotConnected", "Socket has been closed")
})?;
- let mut socket = RcRef::map(&resource, |r| &r.socket)
+ let socket = RcRef::map(&resource, |r| &r.socket)
.try_borrow_mut()
.ok_or_else(|| custom_error("Busy", "Socket already in use"))?;
let byte_length = socket.send_to(&zero_copy, address_path).await?;
@@ -350,7 +346,8 @@ async fn op_shutdown(
let rid = args.rid as u32;
let how = args.how;
- let shutdown_mode = match how {
+ // TODO(bartlomieju): no longer needed after Tokio 1.0 upgrade
+ let _shutdown_mode = match how {
0 => Shutdown::Read, // TODO: nonsense, remove me.
1 => Shutdown::Write,
_ => unimplemented!(),
@@ -362,18 +359,18 @@ async fn op_shutdown(
.get_any(rid)
.ok_or_else(bad_resource_id)?;
if let Some(stream) = resource.downcast_rc::<TcpStreamResource>() {
- let wr = stream.wr_borrow_mut().await;
- TcpStream::shutdown((*wr).as_ref(), shutdown_mode)?;
+ let mut wr = stream.wr_borrow_mut().await;
+ wr.shutdown().await?;
return Ok(json!({}));
}
#[cfg(unix)]
if let Some(stream) = resource.downcast_rc::<StreamResource>() {
if stream.unix_stream.is_some() {
- let wr = RcRef::map(stream, |r| r.unix_stream.as_ref().unwrap())
+ let mut wr = RcRef::map(stream, |r| r.unix_stream.as_ref().unwrap())
.borrow_mut()
.await;
- net_unix::UnixStream::shutdown(&*wr, shutdown_mode)?;
+ wr.shutdown().await?;
return Ok(json!({}));
}
}
@@ -396,7 +393,10 @@ impl Resource for TcpListenerResource {
}
}
-type UdpSocketResource = FullDuplexResource<udp::RecvHalf, udp::SendHalf>;
+struct UdpSocketResource {
+ socket: AsyncRefCell<UdpSocket>,
+ cancel: CancelHandle,
+}
impl Resource for UdpSocketResource {
fn name(&self) -> Cow<str> {
@@ -404,7 +404,7 @@ impl Resource for UdpSocketResource {
}
fn close(self: Rc<Self>) {
- self.cancel_read_ops()
+ self.cancel.cancel()
}
}
@@ -434,6 +434,7 @@ fn listen_tcp(
addr: SocketAddr,
) -> Result<(u32, SocketAddr), AnyError> {
let std_listener = std::net::TcpListener::bind(&addr)?;
+ std_listener.set_nonblocking(true)?;
let listener = TcpListener::from_std(std_listener)?;
let local_addr = listener.local_addr()?;
let listener_resource = TcpListenerResource {
@@ -450,9 +451,13 @@ fn listen_udp(
addr: SocketAddr,
) -> Result<(u32, SocketAddr), AnyError> {
let std_socket = std::net::UdpSocket::bind(&addr)?;
+ std_socket.set_nonblocking(true)?;
let socket = UdpSocket::from_std(std_socket)?;
let local_addr = socket.local_addr()?;
- let socket_resource = UdpSocketResource::new(socket.split());
+ let socket_resource = UdpSocketResource {
+ socket: AsyncRefCell::new(socket),
+ cancel: Default::default(),
+ };
let rid = state.resource_table.add(socket_resource);
Ok((rid, local_addr))
diff --git a/runtime/ops/net_unix.rs b/runtime/ops/net_unix.rs
index ace66425c..1177d071c 100644
--- a/runtime/ops/net_unix.rs
+++ b/runtime/ops/net_unix.rs
@@ -19,7 +19,6 @@ use serde::Deserialize;
use std::borrow::Cow;
use std::cell::RefCell;
use std::fs::remove_file;
-use std::os::unix;
use std::path::Path;
use std::rc::Rc;
use tokio::net::UnixDatagram;
@@ -73,7 +72,7 @@ pub(crate) async fn accept_unix(
.resource_table
.get::<UnixListenerResource>(rid)
.ok_or_else(|| bad_resource("Listener has been closed"))?;
- let mut listener = RcRef::map(&resource, |r| &r.listener)
+ let listener = RcRef::map(&resource, |r| &r.listener)
.try_borrow_mut()
.ok_or_else(|| custom_error("Busy", "Listener already in use"))?;
let cancel = RcRef::map(resource, |r| &r.cancel);
@@ -113,7 +112,7 @@ pub(crate) async fn receive_unix_packet(
.resource_table
.get::<UnixDatagramResource>(rid)
.ok_or_else(|| bad_resource("Socket has been closed"))?;
- let mut socket = RcRef::map(&resource, |r| &r.socket)
+ let socket = RcRef::map(&resource, |r| &r.socket)
.try_borrow_mut()
.ok_or_else(|| custom_error("Busy", "Socket already in use"))?;
let cancel = RcRef::map(resource, |r| &r.cancel);
@@ -131,7 +130,7 @@ pub(crate) async fn receive_unix_packet(
pub fn listen_unix(
state: &mut OpState,
addr: &Path,
-) -> Result<(u32, unix::net::SocketAddr), AnyError> {
+) -> Result<(u32, tokio::net::unix::SocketAddr), AnyError> {
if addr.exists() {
remove_file(&addr).unwrap();
}
@@ -149,7 +148,7 @@ pub fn listen_unix(
pub fn listen_unix_packet(
state: &mut OpState,
addr: &Path,
-) -> Result<(u32, unix::net::SocketAddr), AnyError> {
+) -> Result<(u32, tokio::net::unix::SocketAddr), AnyError> {
if addr.exists() {
remove_file(&addr).unwrap();
}
diff --git a/runtime/ops/process.rs b/runtime/ops/process.rs
index e0a9a0795..63e22b601 100644
--- a/runtime/ops/process.rs
+++ b/runtime/ops/process.rs
@@ -199,7 +199,7 @@ async fn op_run_status(
.get::<ChildResource>(rid)
.ok_or_else(bad_resource_id)?;
let mut child = resource.borrow_mut().await;
- let run_status = (&mut *child).await?;
+ let run_status = child.wait().await?;
let code = run_status.code();
#[cfg(unix)]
diff --git a/runtime/ops/timers.rs b/runtime/ops/timers.rs
index 940a96a8e..d53a0c971 100644
--- a/runtime/ops/timers.rs
+++ b/runtime/ops/timers.rs
@@ -60,7 +60,7 @@ impl GlobalTimer {
let (tx, rx) = oneshot::channel();
self.tx = Some(tx);
- let delay = tokio::time::delay_until(deadline.into());
+ let delay = tokio::time::sleep_until(deadline.into()).boxed_local();
let rx = rx
.map_err(|err| panic!("Unexpected error in receiving channel {:?}", err));
diff --git a/runtime/ops/tls.rs b/runtime/ops/tls.rs
index 9c4c9f422..05d432e1c 100644
--- a/runtime/ops/tls.rs
+++ b/runtime/ops/tls.rs
@@ -303,6 +303,7 @@ fn op_listen_tls(
.next()
.ok_or_else(|| generic_error("No resolved address found"))?;
let std_listener = std::net::TcpListener::bind(&addr)?;
+ std_listener.set_nonblocking(true)?;
let listener = TcpListener::from_std(std_listener)?;
let local_addr = listener.local_addr()?;
let tls_listener_resource = TlsListenerResource {
@@ -341,7 +342,7 @@ async fn op_accept_tls(
.resource_table
.get::<TlsListenerResource>(rid)
.ok_or_else(|| bad_resource("Listener has been closed"))?;
- let mut listener = RcRef::map(&resource, |r| &r.listener)
+ let listener = RcRef::map(&resource, |r| &r.listener)
.try_borrow_mut()
.ok_or_else(|| custom_error("Busy", "Another accept task is ongoing"))?;
let cancel = RcRef::map(resource, |r| &r.cancel);
diff --git a/runtime/tokio_util.rs b/runtime/tokio_util.rs
index ef0ba5be3..5ee45325d 100644
--- a/runtime/tokio_util.rs
+++ b/runtime/tokio_util.rs
@@ -1,8 +1,7 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
pub fn create_basic_runtime() -> tokio::runtime::Runtime {
- tokio::runtime::Builder::new()
- .basic_scheduler()
+ tokio::runtime::Builder::new_current_thread()
.enable_io()
.enable_time()
// This limits the number of threads for blocking operations (like for
@@ -10,7 +9,7 @@ pub fn create_basic_runtime() -> tokio::runtime::Runtime {
// parallel for deno fmt.
// The default value is 512, which is an unhelpfully large thread pool. We
// don't ever want to have more than a couple dozen threads.
- .max_threads(32)
+ .max_blocking_threads(32)
.build()
.unwrap()
}
@@ -20,6 +19,6 @@ pub fn run_basic<F, R>(future: F) -> R
where
F: std::future::Future<Output = R>,
{
- let mut rt = create_basic_runtime();
+ let rt = create_basic_runtime();
rt.block_on(future)
}
diff --git a/runtime/web_worker.rs b/runtime/web_worker.rs
index 87487e499..0efb547f4 100644
--- a/runtime/web_worker.rs
+++ b/runtime/web_worker.rs
@@ -430,7 +430,7 @@ pub fn run_web_worker(
) -> Result<(), AnyError> {
let name = worker.name.to_string();
- let mut rt = create_basic_runtime();
+ let rt = create_basic_runtime();
// TODO(bartlomieju): run following block using "select!"
// with terminate