diff options
author | Ryan Dahl <ry@tinyclouds.org> | 2020-02-03 18:08:44 -0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-02-03 18:08:44 -0500 |
commit | 161cf7cdfd44ace8937fb7940727984990742d18 (patch) | |
tree | 1ef88b3cd6427353366d930ea9be5ae494504255 /core | |
parent | 0471243334ac1aeb76dcaadbc3f0b8114d188fb8 (diff) |
refactor: Use Tokio's single-threaded runtime (#3844)
This change simplifies how we execute V8. Previously V8 Isolates jumped
around threads every time they were woken up. This was overly complex and
potentially hurting performance in a myriad ways. Now isolates run on
their own dedicated thread and never move.
- blocking_json spawns a thread and does not use a thread pool
- op_host_poll_worker and op_host_resume_worker are non-operational
- removes Worker::get_message and Worker::post_message
- ThreadSafeState::workers table contains WorkerChannel entries instead
of actual Worker instances.
- MainWorker and CompilerWorker are no longer Futures.
- The multi-threaded version of deno_core_http_bench was removed.
- AyncOps no longer need to be Send + Sync
This PR is very large and several tests were disabled to speed
integration:
- installer_test_local_module_run
- installer_test_remote_module_run
- _015_duplicate_parallel_import
- _026_workers
Diffstat (limited to 'core')
-rw-r--r-- | core/Cargo.toml | 2 | ||||
-rw-r--r-- | core/es_isolate.rs | 4 | ||||
-rw-r--r-- | core/examples/http_bench.rs | 19 | ||||
-rw-r--r-- | core/isolate.rs | 9 | ||||
-rw-r--r-- | core/lib.rs | 1 | ||||
-rw-r--r-- | core/modules.rs | 20 | ||||
-rw-r--r-- | core/ops.rs | 10 | ||||
-rw-r--r-- | core/plugins.rs | 4 |
8 files changed, 29 insertions, 40 deletions
diff --git a/core/Cargo.toml b/core/Cargo.toml index fb1a2dd8f..59a0bd2bc 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -29,5 +29,5 @@ path = "examples/http_bench.rs" # tokio is only used for deno_core_http_bench [dev_dependencies] -tokio = { version = "0.2", features = ["full"] } +tokio = { version = "0.2", features = ["rt-core", "tcp"] } num_cpus = "1.11.1" diff --git a/core/es_isolate.rs b/core/es_isolate.rs index 1ad46ad25..295fe00ed 100644 --- a/core/es_isolate.rs +++ b/core/es_isolate.rs @@ -8,14 +8,14 @@ use rusty_v8 as v8; use crate::any_error::ErrBox; use crate::bindings; +use crate::futures::FutureExt; use crate::ErrWithV8Handle; -use futures::future::Future; -use futures::future::FutureExt; use futures::ready; use futures::stream::FuturesUnordered; use futures::stream::StreamExt; use futures::stream::StreamFuture; use futures::task::AtomicWaker; +use futures::Future; use libc::c_void; use std::collections::HashMap; use std::ops::{Deref, DerefMut}; diff --git a/core/examples/http_bench.rs b/core/examples/http_bench.rs index 8151c4575..fa570acfb 100644 --- a/core/examples/http_bench.rs +++ b/core/examples/http_bench.rs @@ -164,27 +164,18 @@ fn main() { isolate.register_op("write", http_op(op_write)); isolate.register_op("close", http_op(op_close)); - let multi_thread = args.iter().any(|a| a == "--multi-thread"); - println!( "num cpus; logical: {}; physical: {}", num_cpus::get(), num_cpus::get_physical() ); - let mut builder = tokio::runtime::Builder::new(); - let builder = if multi_thread { - println!("multi-thread"); - builder.threaded_scheduler() - } else { - println!("single-thread"); - builder.basic_scheduler() - }; - let mut runtime = builder - .enable_io() + let mut runtime = tokio::runtime::Builder::new() + .basic_scheduler() + .enable_all() .build() - .expect("Unable to create tokio runtime"); - let result = runtime.block_on(isolate.boxed()); + .unwrap(); + let result = runtime.block_on(isolate.boxed_local()); js_check(result); } diff --git a/core/isolate.rs b/core/isolate.rs index 3be90193c..55ba52d5c 100644 --- a/core/isolate.rs +++ b/core/isolate.rs @@ -20,12 +20,12 @@ use futures::stream::select; use futures::stream::FuturesUnordered; use futures::stream::StreamExt; use futures::task::AtomicWaker; +use futures::Future; use libc::c_void; use std::collections::HashMap; use std::convert::From; use std::error::Error; use std::fmt; -use std::future::Future; use std::ops::{Deref, DerefMut}; use std::option::Option; use std::pin::Pin; @@ -180,6 +180,7 @@ pub struct Isolate { error_handler: Option<Box<IsolateErrorHandleFn>>, } +// TODO(ry) this shouldn't be necessary, v8::OwnedIsolate should impl Send. unsafe impl Send for Isolate {} impl Drop for Isolate { @@ -423,7 +424,7 @@ impl Isolate { /// Requires runtime to explicitly ask for op ids before using any of the ops. pub fn register_op<F>(&self, name: &str, op: F) -> OpId where - F: Fn(&[u8], Option<ZeroCopyBuf>) -> CoreOp + Send + Sync + 'static, + F: Fn(&[u8], Option<ZeroCopyBuf>) -> CoreOp + 'static, { self.op_registry.register(name, op) } @@ -489,13 +490,13 @@ impl Isolate { } Op::Async(fut) => { let fut2 = fut.map_ok(move |buf| (op_id, buf)); - self.pending_ops.push(fut2.boxed()); + self.pending_ops.push(fut2.boxed_local()); self.have_unpolled_ops = true; None } Op::AsyncUnref(fut) => { let fut2 = fut.map_ok(move |buf| (op_id, buf)); - self.pending_unref_ops.push(fut2.boxed()); + self.pending_unref_ops.push(fut2.boxed_local()); self.have_unpolled_ops = true; None } diff --git a/core/lib.rs b/core/lib.rs index 2fcfa178b..9387d0cab 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -1,5 +1,4 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. -#![deny(warnings)] #[macro_use] extern crate log; diff --git a/core/modules.rs b/core/modules.rs index c02bd4cab..b2d057219 100644 --- a/core/modules.rs +++ b/core/modules.rs @@ -21,9 +21,9 @@ use std::task::Context; use std::task::Poll; pub type SourceCodeInfoFuture = - dyn Future<Output = Result<SourceCodeInfo, ErrBox>> + Send; + dyn Future<Output = Result<SourceCodeInfo, ErrBox>>; -pub trait Loader: Send + Sync { +pub trait Loader: Send { /// Returns an absolute URL. /// When implementing an spec-complaint VM, this should be exactly the /// algorithm described here: @@ -148,7 +148,7 @@ impl RecursiveModuleLoad { _ => self .loader .load(&module_specifier, None, self.is_dynamic_import()) - .boxed(), + .boxed_local(), }; self.pending.push(load_fut); @@ -167,7 +167,7 @@ impl RecursiveModuleLoad { self .loader .load(&specifier, Some(referrer), self.is_dynamic_import()); - self.pending.push(fut.boxed()); + self.pending.push(fut.boxed_local()); self.is_pending.insert(specifier); } } @@ -759,7 +759,7 @@ mod tests { ]) ); } - .boxed(); + .boxed_local(); futures::executor::block_on(fut); } @@ -818,7 +818,7 @@ mod tests { Some(redirect3_id) ); } - .boxed(); + .boxed_local(); futures::executor::block_on(fut); } @@ -846,7 +846,8 @@ mod tests { let loads = loader.loads.clone(); let mut isolate = EsIsolate::new(Box::new(loader), StartupData::None, false); - let mut recursive_load = isolate.load_module("/main.js", None).boxed(); + let mut recursive_load = + isolate.load_module("/main.js", None).boxed_local(); let result = recursive_load.poll_unpin(&mut cx); assert!(result.is_pending()); @@ -891,7 +892,8 @@ mod tests { let loader = MockLoader::new(); let mut isolate = EsIsolate::new(Box::new(loader), StartupData::None, false); - let mut load_fut = isolate.load_module("/bad_import.js", None).boxed(); + let mut load_fut = + isolate.load_module("/bad_import.js", None).boxed_local(); let result = load_fut.poll_unpin(&mut cx); if let Poll::Ready(Err(err)) = result { assert_eq!( @@ -924,7 +926,7 @@ mod tests { // The behavior should be very similar to /a.js. let main_id_fut = isolate .load_module("/main_with_code.js", Some(MAIN_WITH_CODE_SRC.to_owned())) - .boxed(); + .boxed_local(); let main_id = futures::executor::block_on(main_id_fut).expect("Failed to load"); diff --git a/core/ops.rs b/core/ops.rs index f1798a398..266a36648 100644 --- a/core/ops.rs +++ b/core/ops.rs @@ -10,11 +10,10 @@ pub type OpId = u32; pub type Buf = Box<[u8]>; -pub type OpAsyncFuture<E> = - Pin<Box<dyn Future<Output = Result<Buf, E>> + Send>>; +pub type OpAsyncFuture<E> = Pin<Box<dyn Future<Output = Result<Buf, E>>>>; pub(crate) type PendingOpFuture = - Pin<Box<dyn Future<Output = Result<(OpId, Buf), CoreError>> + Send>>; + Pin<Box<dyn Future<Output = Result<(OpId, Buf), CoreError>>>>; pub type OpResult<E> = Result<Op<E>, E>; @@ -31,8 +30,7 @@ pub type CoreError = (); pub type CoreOp = Op<CoreError>; /// Main type describing op -pub type OpDispatcher = - dyn Fn(&[u8], Option<ZeroCopyBuf>) -> CoreOp + Send + Sync + 'static; +pub type OpDispatcher = dyn Fn(&[u8], Option<ZeroCopyBuf>) -> CoreOp + 'static; #[derive(Default)] pub struct OpRegistry { @@ -53,7 +51,7 @@ impl OpRegistry { pub fn register<F>(&self, name: &str, op: F) -> OpId where - F: Fn(&[u8], Option<ZeroCopyBuf>) -> CoreOp + Send + Sync + 'static, + F: Fn(&[u8], Option<ZeroCopyBuf>) -> CoreOp + 'static, { let mut lock = self.dispatchers.write().unwrap(); let op_id = lock.len() as u32; diff --git a/core/plugins.rs b/core/plugins.rs index 14cfce307..edb675120 100644 --- a/core/plugins.rs +++ b/core/plugins.rs @@ -7,9 +7,7 @@ pub trait PluginInitContext { fn register_op( &mut self, name: &str, - op: Box< - dyn Fn(&[u8], Option<ZeroCopyBuf>) -> CoreOp + Send + Sync + 'static, - >, + op: Box<dyn Fn(&[u8], Option<ZeroCopyBuf>) -> CoreOp + 'static>, ); } |