diff options
author | Bartek IwaĆczuk <biwanczuk@gmail.com> | 2021-01-12 08:50:02 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-01-11 23:50:02 -0800 |
commit | 275a5c65a20529cd4a3d775b8d8c6e9b261c76b1 (patch) | |
tree | 9f861e36e70be809d5586128a24b9f7b4332e09e /runtime | |
parent | 36ff7bdf575e0547fabd8957ee778cc4224d5956 (diff) |
upgrade: tokio 1.0 (#8779)
Co-authored-by: Bert Belder <bertbelder@gmail.com>
Diffstat (limited to 'runtime')
-rw-r--r-- | runtime/Cargo.toml | 36 | ||||
-rw-r--r-- | runtime/inspector.rs | 11 | ||||
-rw-r--r-- | runtime/ops/fs.rs | 1 | ||||
-rw-r--r-- | runtime/ops/fs_events.rs | 2 | ||||
-rw-r--r-- | runtime/ops/net.rs | 47 | ||||
-rw-r--r-- | runtime/ops/net_unix.rs | 9 | ||||
-rw-r--r-- | runtime/ops/process.rs | 2 | ||||
-rw-r--r-- | runtime/ops/timers.rs | 2 | ||||
-rw-r--r-- | runtime/ops/tls.rs | 3 | ||||
-rw-r--r-- | runtime/tokio_util.rs | 7 | ||||
-rw-r--r-- | runtime/web_worker.rs | 2 |
11 files changed, 66 insertions, 56 deletions
diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index 1c75aaff1..127fd6241 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -37,37 +37,37 @@ deno_websocket = { path = "../op_crates/websocket", version = "0.1.0" } atty = "0.2.14" dlopen = "0.1.8" -encoding_rs = "0.8.24" -env_logger = "0.7.1" -filetime = "0.2.12" -http = "0.2.1" -indexmap = "1.6.0" +encoding_rs = "0.8.26" +env_logger = "0.8.2" +filetime = "0.2.13" +http = "0.2.3" +hyper = { version = "0.14.2", features = ["server"] } +indexmap = "1.6.1" lazy_static = "1.4.0" -libc = "0.2.77" -log = "0.4.11" -notify = "5.0.0-pre.3" +libc = "0.2.82" +log = "0.4.13" +notify = "5.0.0-pre.4" percent-encoding = "2.1.0" -regex = "1.3.9" +regex = "1.4.3" ring = "0.16.19" rustyline = { version = "7.1.0", default-features = false } rustyline-derive = "0.4.0" serde = { version = "1.0.116", features = ["derive"] } shell-escape = "0.1.5" sys-info = "0.7.0" -termcolor = "1.1.0" -tokio = { version = "0.2.22", features = ["full"] } -tokio-rustls = "0.14.1" -uuid = { version = "0.8.1", features = ["v4"] } -hyper = "0.13.9" -webpki = "0.21.3" -webpki-roots = "=0.19.0" # Pinned to v0.19.0 to match 'reqwest'. +termcolor = "1.1.2" +tokio = { version = "1.0.1", features = ["full"] } +tokio-rustls = "0.22.0" +uuid = { version = "0.8.2", features = ["v4"] } +webpki = "0.21.4" +webpki-roots = "0.21.0" [target.'cfg(windows)'.dependencies] -winapi = { version = "0.3.9", features = ["knownfolders", "mswsock", "objbase", "shlobj", "tlhelp32", "winbase", "winerror", "winsock2"] } fwdansi = "1.1.0" +winapi = { version = "0.3.9", features = ["knownfolders", "mswsock", "objbase", "shlobj", "tlhelp32", "winbase", "winerror", "winsock2"] } [target.'cfg(unix)'.dependencies] -nix = "0.19.0" +nix = "0.19.1" [dev-dependencies] # Used in benchmark diff --git a/runtime/inspector.rs b/runtime/inspector.rs index 80beec0ec..0a2a236f2 100644 --- a/runtime/inspector.rs +++ b/runtime/inspector.rs @@ -16,6 +16,7 @@ use deno_core::futures::pin_mut; use deno_core::futures::prelude::*; use deno_core::futures::select; use deno_core::futures::stream::FuturesUnordered; +use deno_core::futures::stream::StreamExt; use deno_core::futures::task; use deno_core::futures::task::Context; use deno_core::futures::task::Poll; @@ -58,10 +59,10 @@ impl InspectorServer { let (shutdown_server_tx, shutdown_server_rx) = oneshot::channel(); let thread_handle = thread::spawn(move || { - let mut rt = crate::tokio_util::create_basic_runtime(); + let rt = crate::tokio_util::create_basic_runtime(); let local = tokio::task::LocalSet::new(); local.block_on( - &mut rt, + &rt, server(host, register_inspector_rx, shutdown_server_rx, name), ) }); @@ -182,9 +183,13 @@ fn handle_ws_request( .status(http::StatusCode::BAD_REQUEST) .body("Not a valid Websocket Request".into()), }); + + let (parts, _) = req.into_parts(); + let req = http::Request::from_parts(parts, body); + if resp.is_ok() { tokio::task::spawn_local(async move { - let upgraded = body.on_upgrade().await.unwrap(); + let upgraded = hyper::upgrade::on(req).await.unwrap(); let websocket = deno_websocket::tokio_tungstenite::WebSocketStream::from_raw_socket( upgraded, diff --git a/runtime/ops/fs.rs b/runtime/ops/fs.rs index 3b0c02083..d1a2489ba 100644 --- a/runtime/ops/fs.rs +++ b/runtime/ops/fs.rs @@ -27,6 +27,7 @@ use std::path::{Path, PathBuf}; use std::rc::Rc; use std::time::SystemTime; use std::time::UNIX_EPOCH; +use tokio::io::AsyncSeekExt; #[cfg(not(unix))] use deno_core::error::generic_error; diff --git a/runtime/ops/fs_events.rs b/runtime/ops/fs_events.rs index c97ba0af4..6f6ba04bd 100644 --- a/runtime/ops/fs_events.rs +++ b/runtime/ops/fs_events.rs @@ -99,7 +99,7 @@ fn op_fs_events_open( let mut watcher: RecommendedWatcher = Watcher::new_immediate(move |res: Result<NotifyEvent, NotifyError>| { let res2 = res.map(FsEvent::from).map_err(AnyError::from); - let mut sender = sender.lock().unwrap(); + let sender = sender.lock().unwrap(); // Ignore result, if send failed it means that watcher was already closed, // but not all messages have been flushed. let _ = sender.try_send(res2); diff --git a/runtime/ops/net.rs b/runtime/ops/net.rs index 6b7e05771..dea7ffe51 100644 --- a/runtime/ops/net.rs +++ b/runtime/ops/net.rs @@ -1,6 +1,5 @@ // Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. -use crate::ops::io::FullDuplexResource; use crate::ops::io::TcpStreamResource; use crate::permissions::Permissions; use crate::resolve_addr::resolve_addr; @@ -28,7 +27,7 @@ use std::cell::RefCell; use std::net::Shutdown; use std::net::SocketAddr; use std::rc::Rc; -use tokio::net::udp; +use tokio::io::AsyncWriteExt; use tokio::net::TcpListener; use tokio::net::TcpStream; use tokio::net::UdpSocket; @@ -67,7 +66,7 @@ async fn accept_tcp( .resource_table .get::<TcpListenerResource>(rid) .ok_or_else(|| bad_resource("Listener has been closed"))?; - let mut listener = RcRef::map(&resource, |r| &r.listener) + let listener = RcRef::map(&resource, |r| &r.listener) .try_borrow_mut() .ok_or_else(|| custom_error("Busy", "Another accept task is ongoing"))?; let cancel = RcRef::map(resource, |r| &r.cancel); @@ -140,11 +139,11 @@ async fn receive_udp( .resource_table .get::<UdpSocketResource>(rid) .ok_or_else(|| bad_resource("Socket has been closed"))?; - let (size, remote_addr) = resource - .rd_borrow_mut() - .await + let socket = RcRef::map(&resource, |r| &r.socket).borrow().await; + let cancel_handle = RcRef::map(&resource, |r| &r.cancel); + let (size, remote_addr) = socket .recv_from(&mut zero_copy) - .try_or_cancel(resource.cancel_handle()) + .try_or_cancel(cancel_handle) .await?; Ok(json!({ "size": size, @@ -212,11 +211,8 @@ async fn op_datagram_send( .resource_table .get::<UdpSocketResource>(rid as u32) .ok_or_else(|| bad_resource("Socket has been closed"))?; - let byte_length = resource - .wr_borrow_mut() - .await - .send_to(&zero_copy, &addr) - .await?; + let socket = RcRef::map(&resource, |r| &r.socket).borrow().await; + let byte_length = socket.send_to(&zero_copy, &addr).await?; Ok(json!(byte_length)) } #[cfg(unix)] @@ -237,7 +233,7 @@ async fn op_datagram_send( .ok_or_else(|| { custom_error("NotConnected", "Socket has been closed") })?; - let mut socket = RcRef::map(&resource, |r| &r.socket) + let socket = RcRef::map(&resource, |r| &r.socket) .try_borrow_mut() .ok_or_else(|| custom_error("Busy", "Socket already in use"))?; let byte_length = socket.send_to(&zero_copy, address_path).await?; @@ -350,7 +346,8 @@ async fn op_shutdown( let rid = args.rid as u32; let how = args.how; - let shutdown_mode = match how { + // TODO(bartlomieju): no longer needed after Tokio 1.0 upgrade + let _shutdown_mode = match how { 0 => Shutdown::Read, // TODO: nonsense, remove me. 1 => Shutdown::Write, _ => unimplemented!(), @@ -362,18 +359,18 @@ async fn op_shutdown( .get_any(rid) .ok_or_else(bad_resource_id)?; if let Some(stream) = resource.downcast_rc::<TcpStreamResource>() { - let wr = stream.wr_borrow_mut().await; - TcpStream::shutdown((*wr).as_ref(), shutdown_mode)?; + let mut wr = stream.wr_borrow_mut().await; + wr.shutdown().await?; return Ok(json!({})); } #[cfg(unix)] if let Some(stream) = resource.downcast_rc::<StreamResource>() { if stream.unix_stream.is_some() { - let wr = RcRef::map(stream, |r| r.unix_stream.as_ref().unwrap()) + let mut wr = RcRef::map(stream, |r| r.unix_stream.as_ref().unwrap()) .borrow_mut() .await; - net_unix::UnixStream::shutdown(&*wr, shutdown_mode)?; + wr.shutdown().await?; return Ok(json!({})); } } @@ -396,7 +393,10 @@ impl Resource for TcpListenerResource { } } -type UdpSocketResource = FullDuplexResource<udp::RecvHalf, udp::SendHalf>; +struct UdpSocketResource { + socket: AsyncRefCell<UdpSocket>, + cancel: CancelHandle, +} impl Resource for UdpSocketResource { fn name(&self) -> Cow<str> { @@ -404,7 +404,7 @@ impl Resource for UdpSocketResource { } fn close(self: Rc<Self>) { - self.cancel_read_ops() + self.cancel.cancel() } } @@ -434,6 +434,7 @@ fn listen_tcp( addr: SocketAddr, ) -> Result<(u32, SocketAddr), AnyError> { let std_listener = std::net::TcpListener::bind(&addr)?; + std_listener.set_nonblocking(true)?; let listener = TcpListener::from_std(std_listener)?; let local_addr = listener.local_addr()?; let listener_resource = TcpListenerResource { @@ -450,9 +451,13 @@ fn listen_udp( addr: SocketAddr, ) -> Result<(u32, SocketAddr), AnyError> { let std_socket = std::net::UdpSocket::bind(&addr)?; + std_socket.set_nonblocking(true)?; let socket = UdpSocket::from_std(std_socket)?; let local_addr = socket.local_addr()?; - let socket_resource = UdpSocketResource::new(socket.split()); + let socket_resource = UdpSocketResource { + socket: AsyncRefCell::new(socket), + cancel: Default::default(), + }; let rid = state.resource_table.add(socket_resource); Ok((rid, local_addr)) diff --git a/runtime/ops/net_unix.rs b/runtime/ops/net_unix.rs index ace66425c..1177d071c 100644 --- a/runtime/ops/net_unix.rs +++ b/runtime/ops/net_unix.rs @@ -19,7 +19,6 @@ use serde::Deserialize; use std::borrow::Cow; use std::cell::RefCell; use std::fs::remove_file; -use std::os::unix; use std::path::Path; use std::rc::Rc; use tokio::net::UnixDatagram; @@ -73,7 +72,7 @@ pub(crate) async fn accept_unix( .resource_table .get::<UnixListenerResource>(rid) .ok_or_else(|| bad_resource("Listener has been closed"))?; - let mut listener = RcRef::map(&resource, |r| &r.listener) + let listener = RcRef::map(&resource, |r| &r.listener) .try_borrow_mut() .ok_or_else(|| custom_error("Busy", "Listener already in use"))?; let cancel = RcRef::map(resource, |r| &r.cancel); @@ -113,7 +112,7 @@ pub(crate) async fn receive_unix_packet( .resource_table .get::<UnixDatagramResource>(rid) .ok_or_else(|| bad_resource("Socket has been closed"))?; - let mut socket = RcRef::map(&resource, |r| &r.socket) + let socket = RcRef::map(&resource, |r| &r.socket) .try_borrow_mut() .ok_or_else(|| custom_error("Busy", "Socket already in use"))?; let cancel = RcRef::map(resource, |r| &r.cancel); @@ -131,7 +130,7 @@ pub(crate) async fn receive_unix_packet( pub fn listen_unix( state: &mut OpState, addr: &Path, -) -> Result<(u32, unix::net::SocketAddr), AnyError> { +) -> Result<(u32, tokio::net::unix::SocketAddr), AnyError> { if addr.exists() { remove_file(&addr).unwrap(); } @@ -149,7 +148,7 @@ pub fn listen_unix( pub fn listen_unix_packet( state: &mut OpState, addr: &Path, -) -> Result<(u32, unix::net::SocketAddr), AnyError> { +) -> Result<(u32, tokio::net::unix::SocketAddr), AnyError> { if addr.exists() { remove_file(&addr).unwrap(); } diff --git a/runtime/ops/process.rs b/runtime/ops/process.rs index e0a9a0795..63e22b601 100644 --- a/runtime/ops/process.rs +++ b/runtime/ops/process.rs @@ -199,7 +199,7 @@ async fn op_run_status( .get::<ChildResource>(rid) .ok_or_else(bad_resource_id)?; let mut child = resource.borrow_mut().await; - let run_status = (&mut *child).await?; + let run_status = child.wait().await?; let code = run_status.code(); #[cfg(unix)] diff --git a/runtime/ops/timers.rs b/runtime/ops/timers.rs index 940a96a8e..d53a0c971 100644 --- a/runtime/ops/timers.rs +++ b/runtime/ops/timers.rs @@ -60,7 +60,7 @@ impl GlobalTimer { let (tx, rx) = oneshot::channel(); self.tx = Some(tx); - let delay = tokio::time::delay_until(deadline.into()); + let delay = tokio::time::sleep_until(deadline.into()).boxed_local(); let rx = rx .map_err(|err| panic!("Unexpected error in receiving channel {:?}", err)); diff --git a/runtime/ops/tls.rs b/runtime/ops/tls.rs index 9c4c9f422..05d432e1c 100644 --- a/runtime/ops/tls.rs +++ b/runtime/ops/tls.rs @@ -303,6 +303,7 @@ fn op_listen_tls( .next() .ok_or_else(|| generic_error("No resolved address found"))?; let std_listener = std::net::TcpListener::bind(&addr)?; + std_listener.set_nonblocking(true)?; let listener = TcpListener::from_std(std_listener)?; let local_addr = listener.local_addr()?; let tls_listener_resource = TlsListenerResource { @@ -341,7 +342,7 @@ async fn op_accept_tls( .resource_table .get::<TlsListenerResource>(rid) .ok_or_else(|| bad_resource("Listener has been closed"))?; - let mut listener = RcRef::map(&resource, |r| &r.listener) + let listener = RcRef::map(&resource, |r| &r.listener) .try_borrow_mut() .ok_or_else(|| custom_error("Busy", "Another accept task is ongoing"))?; let cancel = RcRef::map(resource, |r| &r.cancel); diff --git a/runtime/tokio_util.rs b/runtime/tokio_util.rs index ef0ba5be3..5ee45325d 100644 --- a/runtime/tokio_util.rs +++ b/runtime/tokio_util.rs @@ -1,8 +1,7 @@ // Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. pub fn create_basic_runtime() -> tokio::runtime::Runtime { - tokio::runtime::Builder::new() - .basic_scheduler() + tokio::runtime::Builder::new_current_thread() .enable_io() .enable_time() // This limits the number of threads for blocking operations (like for @@ -10,7 +9,7 @@ pub fn create_basic_runtime() -> tokio::runtime::Runtime { // parallel for deno fmt. // The default value is 512, which is an unhelpfully large thread pool. We // don't ever want to have more than a couple dozen threads. - .max_threads(32) + .max_blocking_threads(32) .build() .unwrap() } @@ -20,6 +19,6 @@ pub fn run_basic<F, R>(future: F) -> R where F: std::future::Future<Output = R>, { - let mut rt = create_basic_runtime(); + let rt = create_basic_runtime(); rt.block_on(future) } diff --git a/runtime/web_worker.rs b/runtime/web_worker.rs index 87487e499..0efb547f4 100644 --- a/runtime/web_worker.rs +++ b/runtime/web_worker.rs @@ -430,7 +430,7 @@ pub fn run_web_worker( ) -> Result<(), AnyError> { let name = worker.name.to_string(); - let mut rt = create_basic_runtime(); + let rt = create_basic_runtime(); // TODO(bartlomieju): run following block using "select!" // with terminate |