diff options
author | Ryan Dahl <ry@tinyclouds.org> | 2020-10-10 05:41:11 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-10-10 11:41:11 +0200 |
commit | 08bb8b3d53eb2445de9b5e2845ab8acf9d353800 (patch) | |
tree | ebf00cb815ee1a10be00c74cbb332af33dd52dc2 /core | |
parent | 782e6a2ed5d76bb5a154c56d7daf4607e5bdb93f (diff) |
Fix 100% CPU idling problem by reverting #7672 (#7911)
* Revert "refactor: Worker is not a Future (#7895)"
This reverts commit f4357f0ff9d39411f22504fcc20db6bd5dec6ddb.
* Revert "refactor(core): JsRuntime is not a Future (#7855)"
This reverts commit d8879feb8c832dbb38649551b1cb0730874f7be6.
* Revert "fix(core): module execution with top level await (#7672)"
This reverts commit c7c767782538243ded64742dca9b34d6af74d62d.
Diffstat (limited to 'core')
-rw-r--r-- | core/README.md | 9 | ||||
-rw-r--r-- | core/examples/http_bench_bin_ops.rs | 2 | ||||
-rw-r--r-- | core/examples/http_bench_json_ops.rs | 2 | ||||
-rw-r--r-- | core/modules.rs | 15 | ||||
-rw-r--r-- | core/runtime.rs | 352 |
5 files changed, 70 insertions, 310 deletions
diff --git a/core/README.md b/core/README.md index 2438ecede..f6b429bb8 100644 --- a/core/README.md +++ b/core/README.md @@ -9,12 +9,9 @@ bindings. This Rust crate contains the essential V8 bindings for Deno's command-line interface (Deno CLI). The main abstraction here is the JsRuntime which provides -a way to execute JavaScript. - -The JsRuntime implements an event loop abstraction for the executed code that -keeps track of all pending tasks (async ops, dynamic module loads). It is user's -responsibility to drive that loop by using `JsRuntime::run_event_loop` method - -it must be executed in the context of Rust's future executor (eg. tokio, smol). +a way to execute JavaScript. The JsRuntime is modeled as a +`Future<Item=(), Error=JsError>` which completes once all of its ops have +completed. In order to bind Rust functions into JavaScript, use the `Deno.core.dispatch()` function to trigger the "dispatch" callback in Rust. The user is responsible for diff --git a/core/examples/http_bench_bin_ops.rs b/core/examples/http_bench_bin_ops.rs index 7335b8670..8d612f146 100644 --- a/core/examples/http_bench_bin_ops.rs +++ b/core/examples/http_bench_bin_ops.rs @@ -260,7 +260,7 @@ fn main() { include_str!("http_bench_bin_ops.js"), ) .unwrap(); - js_runtime.run_event_loop().await + js_runtime.await }; runtime.block_on(future).unwrap(); } diff --git a/core/examples/http_bench_json_ops.rs b/core/examples/http_bench_json_ops.rs index 2cf3d09e3..106b96f36 100644 --- a/core/examples/http_bench_json_ops.rs +++ b/core/examples/http_bench_json_ops.rs @@ -193,7 +193,7 @@ fn main() { include_str!("http_bench_json_ops.js"), ) .unwrap(); - js_runtime.run_event_loop().await + js_runtime.await }; runtime.block_on(future).unwrap(); } diff --git a/core/modules.rs b/core/modules.rs index 1038dd84f..130becab8 100644 --- a/core/modules.rs +++ b/core/modules.rs @@ -341,13 +341,6 @@ pub struct ModuleInfo { pub name: String, pub handle: v8::Global<v8::Module>, pub import_specifiers: Vec<ModuleSpecifier>, - // TODO(bartlomieju): there should be "state" - // field that describes if module is already being loaded, - // so concurent dynamic imports don't introduce dead lock - // pub state: LoadState { - // Loading(shared_future), - // Loaded, - // }, } /// A symbolic module entity. @@ -674,7 +667,7 @@ mod tests { let a_id_fut = runtime.load_module(&spec, None); let a_id = futures::executor::block_on(a_id_fut).expect("Failed to load"); - futures::executor::block_on(runtime.mod_evaluate(a_id)).unwrap(); + runtime.mod_evaluate(a_id).unwrap(); let l = loads.lock().unwrap(); assert_eq!( l.to_vec(), @@ -741,7 +734,7 @@ mod tests { let result = runtime.load_module(&spec, None).await; assert!(result.is_ok()); let circular1_id = result.unwrap(); - runtime.mod_evaluate(circular1_id).await.unwrap(); + runtime.mod_evaluate(circular1_id).unwrap(); let l = loads.lock().unwrap(); assert_eq!( @@ -818,7 +811,7 @@ mod tests { println!(">> result {:?}", result); assert!(result.is_ok()); let redirect1_id = result.unwrap(); - runtime.mod_evaluate(redirect1_id).await.unwrap(); + runtime.mod_evaluate(redirect1_id).unwrap(); let l = loads.lock().unwrap(); assert_eq!( l.to_vec(), @@ -968,7 +961,7 @@ mod tests { let main_id = futures::executor::block_on(main_id_fut).expect("Failed to load"); - futures::executor::block_on(runtime.mod_evaluate(main_id)).unwrap(); + runtime.mod_evaluate(main_id).unwrap(); let l = loads.lock().unwrap(); assert_eq!( diff --git a/core/runtime.rs b/core/runtime.rs index f04788d4e..193e33420 100644 --- a/core/runtime.rs +++ b/core/runtime.rs @@ -23,8 +23,6 @@ use crate::shared_queue::SharedQueue; use crate::shared_queue::RECOMMENDED_SIZE; use crate::BufVec; use crate::OpState; -use futures::channel::mpsc; -use futures::future::poll_fn; use futures::stream::FuturesUnordered; use futures::stream::StreamExt; use futures::stream::StreamFuture; @@ -84,11 +82,6 @@ pub struct JsRuntime { allocations: IsolateAllocations, } -type DynImportModEvaluate = - (ModuleId, v8::Global<v8::Promise>, v8::Global<v8::Module>); -type ModEvaluate = - (v8::Global<v8::Promise>, mpsc::Sender<Result<(), AnyError>>); - /// Internal state for JsRuntime which is stored in one of v8::Isolate's /// embedder slots. pub(crate) struct JsRuntimeState { @@ -97,8 +90,6 @@ pub(crate) struct JsRuntimeState { pub(crate) js_recv_cb: Option<v8::Global<v8::Function>>, pub(crate) js_macrotask_cb: Option<v8::Global<v8::Function>>, pub(crate) pending_promise_exceptions: HashMap<i32, v8::Global<v8::Value>>, - pub(crate) pending_dyn_mod_evaluate: HashMap<i32, DynImportModEvaluate>, - pub(crate) pending_mod_evaluate: HashMap<ModuleId, ModEvaluate>, pub(crate) js_error_create_fn: Box<JsErrorCreateFn>, pub(crate) shared: SharedQueue, pub(crate) pending_ops: FuturesUnordered<PendingOpFuture>, @@ -272,8 +263,6 @@ impl JsRuntime { isolate.set_slot(Rc::new(RefCell::new(JsRuntimeState { global_context: Some(global_context), pending_promise_exceptions: HashMap::new(), - pending_dyn_mod_evaluate: HashMap::new(), - pending_mod_evaluate: HashMap::new(), shared_ab: None, js_recv_cb: None, js_macrotask_cb: None, @@ -453,51 +442,50 @@ impl JsRuntime { .remove_near_heap_limit_callback(cb, heap_limit); } } +} - /// Runs event loop to completion - /// - /// This future resolves when: - /// - there are no more pending dynamic imports - /// - there are no more pending ops - pub async fn run_event_loop(&mut self) -> Result<(), AnyError> { - poll_fn(|cx| self.poll_event_loop(cx)).await - } +extern "C" fn near_heap_limit_callback<F>( + data: *mut c_void, + current_heap_limit: usize, + initial_heap_limit: usize, +) -> usize +where + F: FnMut(usize, usize) -> usize, +{ + let callback = unsafe { &mut *(data as *mut F) }; + callback(current_heap_limit, initial_heap_limit) +} - /// Runs a single tick of event loop - pub fn poll_event_loop( - &mut self, - cx: &mut Context, - ) -> Poll<Result<(), AnyError>> { - self.shared_init(); +impl Future for JsRuntime { + type Output = Result<(), AnyError>; - let state_rc = Self::state(self.v8_isolate()); + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { + let runtime = self.get_mut(); + runtime.shared_init(); + + let state_rc = Self::state(runtime.v8_isolate()); { let state = state_rc.borrow(); state.waker.register(cx.waker()); } - // Top level modules - self.evaluate_pending_modules()?; - // Dynamic module loading - ie. modules loaded using "import()" { - let poll_imports = self.prepare_dyn_imports(cx)?; + let poll_imports = runtime.prepare_dyn_imports(cx)?; assert!(poll_imports.is_ready()); - let poll_imports = self.poll_dyn_imports(cx)?; + let poll_imports = runtime.poll_dyn_imports(cx)?; assert!(poll_imports.is_ready()); - self.evaluate_dyn_imports()?; - - self.check_promise_exceptions()?; + runtime.check_promise_exceptions()?; } // Ops { - let overflow_response = self.poll_pending_ops(cx); - self.async_op_response(overflow_response)?; - self.drain_macrotasks()?; - self.check_promise_exceptions()?; + let overflow_response = runtime.poll_pending_ops(cx); + runtime.async_op_response(overflow_response)?; + runtime.drain_macrotasks()?; + runtime.check_promise_exceptions()?; } let state = state_rc.borrow(); @@ -505,8 +493,6 @@ impl JsRuntime { state.pending_ops.is_empty() && state.pending_dyn_imports.is_empty() && state.preparing_dyn_imports.is_empty() - && state.pending_dyn_mod_evaluate.is_empty() - && state.pending_mod_evaluate.is_empty() }; if is_idle { @@ -523,18 +509,6 @@ impl JsRuntime { } } -extern "C" fn near_heap_limit_callback<F>( - data: *mut c_void, - current_heap_limit: usize, - initial_heap_limit: usize, -) -> usize -where - F: FnMut(usize, usize) -> usize, -{ - let callback = unsafe { &mut *(data as *mut F) }; - callback(current_heap_limit, initial_heap_limit) -} - impl JsRuntimeState { // Called by V8 during `Isolate::mod_instantiate`. pub fn module_resolve_cb( @@ -711,93 +685,7 @@ impl JsRuntime { /// `AnyError` can be downcast to a type that exposes additional information /// about the V8 exception. By default this type is `JsError`, however it may /// be a different type if `RuntimeOptions::js_error_create_fn` has been set. - pub fn dyn_mod_evaluate( - &mut self, - load_id: ModuleLoadId, - id: ModuleId, - ) -> Result<(), AnyError> { - self.shared_init(); - - let state_rc = Self::state(self.v8_isolate()); - let context = self.global_context(); - let context1 = self.global_context(); - - let module_handle = state_rc - .borrow() - .modules - .get_info(id) - .expect("ModuleInfo not found") - .handle - .clone(); - - let status = { - let scope = - &mut v8::HandleScope::with_context(self.v8_isolate(), context); - let module = module_handle.get(scope); - module.get_status() - }; - - if status == v8::ModuleStatus::Instantiated { - // IMPORTANT: Top-level-await is enabled, which means that return value - // of module evaluation is a promise. - // - // Because that promise is created internally by V8, when error occurs during - // module evaluation the promise is rejected, and since the promise has no rejection - // handler it will result in call to `bindings::promise_reject_callback` adding - // the promise to pending promise rejection table - meaning JsRuntime will return - // error on next poll(). - // - // This situation is not desirable as we want to manually return error at the - // end of this function to handle it further. It means we need to manually - // remove this promise from pending promise rejection table. - // - // For more details see: - // https://github.com/denoland/deno/issues/4908 - // https://v8.dev/features/top-level-await#module-execution-order - let scope = - &mut v8::HandleScope::with_context(self.v8_isolate(), context1); - let module = v8::Local::new(scope, &module_handle); - let maybe_value = module.evaluate(scope); - - // Update status after evaluating. - let status = module.get_status(); - - if let Some(value) = maybe_value { - assert!( - status == v8::ModuleStatus::Evaluated - || status == v8::ModuleStatus::Errored - ); - let promise = v8::Local::<v8::Promise>::try_from(value) - .expect("Expected to get promise as module evaluation result"); - let promise_id = promise.get_identity_hash(); - let mut state = state_rc.borrow_mut(); - state.pending_promise_exceptions.remove(&promise_id); - let promise_global = v8::Global::new(scope, promise); - let module_global = v8::Global::new(scope, module); - state - .pending_dyn_mod_evaluate - .insert(load_id, (id, promise_global, module_global)); - } else { - assert!(status == v8::ModuleStatus::Errored); - } - } - - if status == v8::ModuleStatus::Evaluated { - self.dyn_import_done(load_id, id)?; - } - - Ok(()) - } - - /// Evaluates an already instantiated ES module. - /// - /// `AnyError` can be downcast to a type that exposes additional information - /// about the V8 exception. By default this type is `JsError`, however it may - /// be a different type if `RuntimeOptions::js_error_create_fn` has been set. - fn mod_evaluate_inner( - &mut self, - id: ModuleId, - ) -> Result<mpsc::Receiver<Result<(), AnyError>>, AnyError> { + pub fn mod_evaluate(&mut self, id: ModuleId) -> Result<(), AnyError> { self.shared_init(); let state_rc = Self::state(self.v8_isolate()); @@ -813,8 +701,6 @@ impl JsRuntime { .expect("ModuleInfo not found"); let mut status = module.get_status(); - let (sender, receiver) = mpsc::channel(1); - if status == v8::ModuleStatus::Instantiated { // IMPORTANT: Top-level-await is enabled, which means that return value // of module evaluation is a promise. @@ -847,30 +733,20 @@ impl JsRuntime { let promise_id = promise.get_identity_hash(); let mut state = state_rc.borrow_mut(); state.pending_promise_exceptions.remove(&promise_id); - let promise_global = v8::Global::new(scope, promise); - state - .pending_mod_evaluate - .insert(id, (promise_global, sender)); } else { assert!(status == v8::ModuleStatus::Errored); } } - Ok(receiver) - } - - pub async fn mod_evaluate(&mut self, id: ModuleId) -> Result<(), AnyError> { - let mut receiver = self.mod_evaluate_inner(id)?; - - poll_fn(|cx| { - if let Poll::Ready(result) = receiver.poll_next_unpin(cx) { - debug!("received module evaluate"); - return Poll::Ready(result.unwrap()); + match status { + v8::ModuleStatus::Evaluated => Ok(()), + v8::ModuleStatus::Errored => { + let exception = module.get_exception(); + exception_to_err_result(scope, exception) + .map_err(|err| attach_handle_to_error(scope, err, exception)) } - let _r = self.poll_event_loop(cx)?; - Poll::Pending - }) - .await + other => panic!("Unexpected module status {:?}", other), + } } fn dyn_import_error( @@ -1031,122 +907,16 @@ impl JsRuntime { // Load is done. let module_id = load.root_module_id.unwrap(); self.mod_instantiate(module_id)?; - self.dyn_mod_evaluate(dyn_import_id, module_id)?; + match self.mod_evaluate(module_id) { + Ok(()) => self.dyn_import_done(dyn_import_id, module_id)?, + Err(err) => self.dyn_import_error(dyn_import_id, err)?, + }; } } } } } - fn evaluate_pending_modules(&mut self) -> Result<(), AnyError> { - let state_rc = Self::state(self.v8_isolate()); - - let context = self.global_context(); - { - let scope = - &mut v8::HandleScope::with_context(self.v8_isolate(), context); - - let mut state = state_rc.borrow_mut(); - - if let Some(&module_id) = state.pending_mod_evaluate.keys().next() { - let handle = state.pending_mod_evaluate.remove(&module_id).unwrap(); - drop(state); - - let promise = handle.0.get(scope); - let mut sender = handle.1.clone(); - - let promise_state = promise.state(); - - match promise_state { - v8::PromiseState::Pending => { - state_rc - .borrow_mut() - .pending_mod_evaluate - .insert(module_id, handle); - state_rc.borrow().waker.wake(); - } - v8::PromiseState::Fulfilled => { - sender.try_send(Ok(())).unwrap(); - } - v8::PromiseState::Rejected => { - let exception = promise.result(scope); - let err1 = exception_to_err_result::<()>(scope, exception) - .map_err(|err| attach_handle_to_error(scope, err, exception)) - .unwrap_err(); - sender.try_send(Err(err1)).unwrap(); - } - } - } - }; - - Ok(()) - } - - fn evaluate_dyn_imports(&mut self) -> Result<(), AnyError> { - let state_rc = Self::state(self.v8_isolate()); - - loop { - let context = self.global_context(); - let maybe_result = { - let scope = - &mut v8::HandleScope::with_context(self.v8_isolate(), context); - - let mut state = state_rc.borrow_mut(); - if let Some(&dyn_import_id) = - state.pending_dyn_mod_evaluate.keys().next() - { - let handle = state - .pending_dyn_mod_evaluate - .remove(&dyn_import_id) - .unwrap(); - drop(state); - - let module_id = handle.0; - let promise = handle.1.get(scope); - let _module = handle.2.get(scope); - - let promise_state = promise.state(); - - match promise_state { - v8::PromiseState::Pending => { - state_rc - .borrow_mut() - .pending_dyn_mod_evaluate - .insert(dyn_import_id, handle); - state_rc.borrow().waker.wake(); - None - } - v8::PromiseState::Fulfilled => Some(Ok((dyn_import_id, module_id))), - v8::PromiseState::Rejected => { - let exception = promise.result(scope); - let err1 = exception_to_err_result::<()>(scope, exception) - .map_err(|err| attach_handle_to_error(scope, err, exception)) - .unwrap_err(); - Some(Err((dyn_import_id, err1))) - } - } - } else { - None - } - }; - - if let Some(result) = maybe_result { - match result { - Ok((dyn_import_id, module_id)) => { - self.dyn_import_done(dyn_import_id, module_id)?; - } - Err((dyn_import_id, err1)) => { - self.dyn_import_error(dyn_import_id, err1)?; - } - } - } else { - break; - } - } - - Ok(()) - } - fn register_during_load( &mut self, info: ModuleSource, @@ -1445,13 +1215,13 @@ pub mod tests { futures::executor::block_on(lazy(move |cx| f(cx))); } - fn poll_until_ready( - runtime: &mut JsRuntime, - max_poll_count: usize, - ) -> Result<(), AnyError> { + fn poll_until_ready<F>(future: &mut F, max_poll_count: usize) -> F::Output + where + F: Future + Unpin, + { let mut cx = Context::from_waker(futures::task::noop_waker_ref()); for _ in 0..max_poll_count { - match runtime.poll_event_loop(&mut cx) { + match future.poll_unpin(&mut cx) { Poll::Pending => continue, Poll::Ready(val) => return val, } @@ -1667,7 +1437,7 @@ pub mod tests { ) .unwrap(); assert_eq!(dispatch_count.load(Ordering::Relaxed), 1); - assert!(matches!(runtime.poll_event_loop(cx), Poll::Ready(Ok(_)))); + assert!(matches!(runtime.poll_unpin(cx), Poll::Ready(Ok(_)))); assert_eq!(dispatch_count.load(Ordering::Relaxed), 1); runtime .execute( @@ -1680,11 +1450,11 @@ pub mod tests { ) .unwrap(); assert_eq!(dispatch_count.load(Ordering::Relaxed), 2); - assert!(matches!(runtime.poll_event_loop(cx), Poll::Ready(Ok(_)))); + assert!(matches!(runtime.poll_unpin(cx), Poll::Ready(Ok(_)))); runtime.execute("check3.js", "assert(nrecv == 2)").unwrap(); assert_eq!(dispatch_count.load(Ordering::Relaxed), 2); // We are idle, so the next poll should be the last. - assert!(matches!(runtime.poll_event_loop(cx), Poll::Ready(Ok(_)))); + assert!(matches!(runtime.poll_unpin(cx), Poll::Ready(Ok(_)))); }); } @@ -1708,7 +1478,7 @@ pub mod tests { assert_eq!(dispatch_count.load(Ordering::Relaxed), 1); // The above op never finish, but runtime can finish // because the op is an unreffed async op. - assert!(matches!(runtime.poll_event_loop(cx), Poll::Ready(Ok(_)))); + assert!(matches!(runtime.poll_unpin(cx), Poll::Ready(Ok(_)))); }) } @@ -1838,7 +1608,7 @@ pub mod tests { ) .unwrap(); assert_eq!(dispatch_count.load(Ordering::Relaxed), 1); - assert!(matches!(runtime.poll_event_loop(cx), Poll::Ready(Ok(_)))); + assert!(matches!(runtime.poll_unpin(cx), Poll::Ready(Ok(_)))); runtime .execute("check.js", "assert(asyncRecv == 1);") .unwrap(); @@ -1930,7 +1700,7 @@ pub mod tests { "#, ) .unwrap(); - if let Poll::Ready(Err(_)) = runtime.poll_event_loop(&mut cx) { + if let Poll::Ready(Err(_)) = runtime.poll_unpin(&mut cx) { unreachable!(); } }); @@ -1943,7 +1713,7 @@ pub mod tests { runtime .execute("core_test.js", include_str!("core_test.js")) .unwrap(); - if let Poll::Ready(Err(_)) = runtime.poll_event_loop(&mut cx) { + if let Poll::Ready(Err(_)) = runtime.poll_unpin(&mut cx) { unreachable!(); } }); @@ -1969,7 +1739,7 @@ pub mod tests { include_str!("encode_decode_test.js"), ) .unwrap(); - if let Poll::Ready(Err(_)) = runtime.poll_event_loop(&mut cx) { + if let Poll::Ready(Err(_)) = runtime.poll_unpin(&mut cx) { unreachable!(); } }); @@ -2218,7 +1988,7 @@ pub mod tests { runtime.mod_instantiate(mod_a).unwrap(); assert_eq!(dispatch_count.load(Ordering::Relaxed), 0); - runtime.mod_evaluate_inner(mod_a).unwrap(); + runtime.mod_evaluate(mod_a).unwrap(); assert_eq!(dispatch_count.load(Ordering::Relaxed), 1); } @@ -2277,7 +2047,7 @@ pub mod tests { assert_eq!(count.load(Ordering::Relaxed), 0); // We should get an error here. - let result = runtime.poll_event_loop(cx); + let result = runtime.poll_unpin(cx); if let Poll::Ready(Ok(_)) = result { unreachable!(); } @@ -2370,14 +2140,14 @@ pub mod tests { .unwrap(); // First poll runs `prepare_load` hook. - assert!(matches!(runtime.poll_event_loop(cx), Poll::Pending)); + assert!(matches!(runtime.poll_unpin(cx), Poll::Pending)); assert_eq!(prepare_load_count.load(Ordering::Relaxed), 1); // Second poll actually loads modules into the isolate. - assert!(matches!(runtime.poll_event_loop(cx), Poll::Ready(Ok(_)))); + assert!(matches!(runtime.poll_unpin(cx), Poll::Ready(Ok(_)))); assert_eq!(resolve_count.load(Ordering::Relaxed), 4); assert_eq!(load_count.load(Ordering::Relaxed), 2); - assert!(matches!(runtime.poll_event_loop(cx), Poll::Ready(Ok(_)))); + assert!(matches!(runtime.poll_unpin(cx), Poll::Ready(Ok(_)))); assert_eq!(resolve_count.load(Ordering::Relaxed), 4); assert_eq!(load_count.load(Ordering::Relaxed), 2); }) @@ -2409,10 +2179,10 @@ pub mod tests { ) .unwrap(); // First poll runs `prepare_load` hook. - let _ = runtime.poll_event_loop(cx); + let _ = runtime.poll_unpin(cx); assert_eq!(prepare_load_count.load(Ordering::Relaxed), 1); // Second poll triggers error - let _ = runtime.poll_event_loop(cx); + let _ = runtime.poll_unpin(cx); }) } @@ -2461,7 +2231,7 @@ pub mod tests { ) .unwrap(); - futures::executor::block_on(runtime.mod_evaluate(module_id)).unwrap(); + runtime.mod_evaluate(module_id).unwrap(); let _snapshot = runtime.snapshot(); } @@ -2543,7 +2313,7 @@ main(); at async error_async_stack.js:10:5 "#; - match runtime.poll_event_loop(cx) { + match runtime.poll_unpin(cx) { Poll::Ready(Err(e)) => { assert_eq!(e.to_string(), expected_error); } |