diff options
author | Ryan Dahl <ry@tinyclouds.org> | 2020-10-10 05:41:11 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-10-10 11:41:11 +0200 |
commit | 08bb8b3d53eb2445de9b5e2845ab8acf9d353800 (patch) | |
tree | ebf00cb815ee1a10be00c74cbb332af33dd52dc2 /cli/worker.rs | |
parent | 782e6a2ed5d76bb5a154c56d7daf4607e5bdb93f (diff) |
Fix 100% CPU idling problem by reverting #7672 (#7911)
* Revert "refactor: Worker is not a Future (#7895)"
This reverts commit f4357f0ff9d39411f22504fcc20db6bd5dec6ddb.
* Revert "refactor(core): JsRuntime is not a Future (#7855)"
This reverts commit d8879feb8c832dbb38649551b1cb0730874f7be6.
* Revert "fix(core): module execution with top level await (#7672)"
This reverts commit c7c767782538243ded64742dca9b34d6af74d62d.
Diffstat (limited to 'cli/worker.rs')
-rw-r--r-- | cli/worker.rs | 239 |
1 files changed, 121 insertions, 118 deletions
diff --git a/cli/worker.rs b/cli/worker.rs index 4af363825..20832016a 100644 --- a/cli/worker.rs +++ b/cli/worker.rs @@ -3,7 +3,6 @@ use crate::fmt_errors::JsError; use crate::global_state::GlobalState; use crate::inspector::DenoInspector; -use crate::inspector::InspectorSession; use crate::js; use crate::metrics::Metrics; use crate::ops; @@ -12,7 +11,6 @@ use crate::permissions::Permissions; use crate::state::CliModuleLoader; use deno_core::error::AnyError; use deno_core::futures::channel::mpsc; -use deno_core::futures::future::poll_fn; use deno_core::futures::future::FutureExt; use deno_core::futures::stream::StreamExt; use deno_core::futures::task::AtomicWaker; @@ -24,8 +22,10 @@ use deno_core::ModuleSpecifier; use deno_core::RuntimeOptions; use deno_core::Snapshot; use std::env; +use std::future::Future; use std::ops::Deref; use std::ops::DerefMut; +use std::pin::Pin; use std::rc::Rc; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; @@ -95,15 +95,13 @@ fn create_channels() -> (WorkerChannelsInternal, WorkerHandle) { /// - `MainWorker` /// - `WebWorker` pub struct Worker { - external_channels: WorkerHandle, - inspector: Option<Box<DenoInspector>>, - // Following fields are pub because they are accessed - // when creating a new WebWorker instance. + pub name: String, + pub js_runtime: JsRuntime, + pub inspector: Option<Box<DenoInspector>>, + pub waker: AtomicWaker, pub(crate) internal_channels: WorkerChannelsInternal, - pub(crate) js_runtime: JsRuntime, - pub(crate) name: String, + external_channels: WorkerHandle, should_break_on_first_statement: bool, - waker: AtomicWaker, } impl Worker { @@ -149,13 +147,13 @@ impl Worker { let (internal_channels, external_channels) = create_channels(); Self { - external_channels, + name, + js_runtime, inspector, + waker: AtomicWaker::new(), internal_channels, - js_runtime, - name, + external_channels, should_break_on_first_statement, - waker: AtomicWaker::new(), } } @@ -191,7 +189,7 @@ impl Worker { ) -> Result<(), AnyError> { let id = self.preload_module(module_specifier).await?; self.wait_for_inspector_session(); - self.js_runtime.mod_evaluate(id).await + self.js_runtime.mod_evaluate(id) } /// Loads, instantiates and executes provided source code @@ -206,7 +204,7 @@ impl Worker { .load_module(module_specifier, Some(code)) .await?; self.wait_for_inspector_session(); - self.js_runtime.mod_evaluate(id).await + self.js_runtime.mod_evaluate(id) } /// Returns a way to communicate with the Worker from other threads. @@ -223,35 +221,39 @@ impl Worker { .wait_for_session_and_break_on_next_statement() } } +} - /// Create new inspector session. This function panics if Worker - /// was not configured to create inspector. - pub fn create_inspector_session(&mut self) -> Box<InspectorSession> { - let inspector = self.inspector.as_mut().unwrap(); - - InspectorSession::new(&mut **inspector) +impl Drop for Worker { + fn drop(&mut self) { + // The Isolate object must outlive the Inspector object, but this is + // currently not enforced by the type system. + self.inspector.take(); } +} + +impl Future for Worker { + type Output = Result<(), AnyError>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { + let inner = self.get_mut(); - pub fn poll_event_loop( - &mut self, - cx: &mut Context, - ) -> Poll<Result<(), AnyError>> { // We always poll the inspector if it exists. - let _ = self.inspector.as_mut().map(|i| i.poll_unpin(cx)); - self.waker.register(cx.waker()); - self.js_runtime.poll_event_loop(cx) + let _ = inner.inspector.as_mut().map(|i| i.poll_unpin(cx)); + inner.waker.register(cx.waker()); + inner.js_runtime.poll_unpin(cx) } +} - pub async fn run_event_loop(&mut self) -> Result<(), AnyError> { - poll_fn(|cx| self.poll_event_loop(cx)).await +impl Deref for Worker { + type Target = JsRuntime; + fn deref(&self) -> &Self::Target { + &self.js_runtime } } -impl Drop for Worker { - fn drop(&mut self) { - // The Isolate object must outlive the Inspector object, but this is - // currently not enforced by the type system. - self.inspector.take(); +impl DerefMut for Worker { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.js_runtime } } @@ -276,46 +278,45 @@ impl MainWorker { loader, true, ); - let js_runtime = &mut worker.js_runtime; { // All ops registered in this function depend on these { - let op_state = js_runtime.op_state(); + let op_state = worker.op_state(); let mut op_state = op_state.borrow_mut(); op_state.put::<Metrics>(Default::default()); op_state.put::<Arc<GlobalState>>(global_state.clone()); op_state.put::<Permissions>(global_state.permissions.clone()); } - ops::runtime::init(js_runtime, main_module); - ops::fetch::init(js_runtime, global_state.flags.ca_file.as_deref()); - ops::timers::init(js_runtime); - ops::worker_host::init(js_runtime); - ops::random::init(js_runtime, global_state.flags.seed); - ops::reg_json_sync(js_runtime, "op_close", deno_core::op_close); - ops::reg_json_sync(js_runtime, "op_resources", deno_core::op_resources); + ops::runtime::init(&mut worker, main_module); + ops::fetch::init(&mut worker, global_state.flags.ca_file.as_deref()); + ops::timers::init(&mut worker); + ops::worker_host::init(&mut worker); + ops::random::init(&mut worker, global_state.flags.seed); + ops::reg_json_sync(&mut worker, "op_close", deno_core::op_close); + ops::reg_json_sync(&mut worker, "op_resources", deno_core::op_resources); ops::reg_json_sync( - js_runtime, + &mut worker, "op_domain_to_ascii", deno_web::op_domain_to_ascii, ); - ops::errors::init(js_runtime); - ops::fs_events::init(js_runtime); - ops::fs::init(js_runtime); - ops::io::init(js_runtime); - ops::net::init(js_runtime); - ops::os::init(js_runtime); - ops::permissions::init(js_runtime); - ops::plugin::init(js_runtime); - ops::process::init(js_runtime); - ops::runtime_compiler::init(js_runtime); - ops::signal::init(js_runtime); - ops::tls::init(js_runtime); - ops::tty::init(js_runtime); - ops::websocket::init(js_runtime); + ops::errors::init(&mut worker); + ops::fs_events::init(&mut worker); + ops::fs::init(&mut worker); + ops::io::init(&mut worker); + ops::net::init(&mut worker); + ops::os::init(&mut worker); + ops::permissions::init(&mut worker); + ops::plugin::init(&mut worker); + ops::process::init(&mut worker); + ops::runtime_compiler::init(&mut worker); + ops::signal::init(&mut worker); + ops::tls::init(&mut worker); + ops::tty::init(&mut worker); + ops::websocket::init(&mut worker); } { - let op_state = js_runtime.op_state(); + let op_state = worker.op_state(); let mut op_state = op_state.borrow_mut(); let t = &mut op_state.resource_table; let (stdin, stdout, stderr) = get_stdio(); @@ -448,45 +449,49 @@ impl WebWorker { { let handle = web_worker.thread_safe_handle(); let sender = web_worker.worker.internal_channels.sender.clone(); - let js_runtime = &mut web_worker.js_runtime; + // All ops registered in this function depend on these { - let op_state = js_runtime.op_state(); + let op_state = web_worker.op_state(); let mut op_state = op_state.borrow_mut(); op_state.put::<Metrics>(Default::default()); op_state.put::<Arc<GlobalState>>(global_state.clone()); op_state.put::<Permissions>(permissions); } - ops::web_worker::init(js_runtime, sender, handle); - ops::runtime::init(js_runtime, main_module); - ops::fetch::init(js_runtime, global_state.flags.ca_file.as_deref()); - ops::timers::init(js_runtime); - ops::worker_host::init(js_runtime); - ops::reg_json_sync(js_runtime, "op_close", deno_core::op_close); - ops::reg_json_sync(js_runtime, "op_resources", deno_core::op_resources); + ops::web_worker::init(&mut web_worker, sender, handle); + ops::runtime::init(&mut web_worker, main_module); + ops::fetch::init(&mut web_worker, global_state.flags.ca_file.as_deref()); + ops::timers::init(&mut web_worker); + ops::worker_host::init(&mut web_worker); + ops::reg_json_sync(&mut web_worker, "op_close", deno_core::op_close); + ops::reg_json_sync( + &mut web_worker, + "op_resources", + deno_core::op_resources, + ); ops::reg_json_sync( - js_runtime, + &mut web_worker, "op_domain_to_ascii", deno_web::op_domain_to_ascii, ); - ops::errors::init(js_runtime); - ops::io::init(js_runtime); - ops::websocket::init(js_runtime); + ops::errors::init(&mut web_worker); + ops::io::init(&mut web_worker); + ops::websocket::init(&mut web_worker); if has_deno_namespace { - ops::fs_events::init(js_runtime); - ops::fs::init(js_runtime); - ops::net::init(js_runtime); - ops::os::init(js_runtime); - ops::permissions::init(js_runtime); - ops::plugin::init(js_runtime); - ops::process::init(js_runtime); - ops::random::init(js_runtime, global_state.flags.seed); - ops::runtime_compiler::init(js_runtime); - ops::signal::init(js_runtime); - ops::tls::init(js_runtime); - ops::tty::init(js_runtime); + ops::fs_events::init(&mut web_worker); + ops::fs::init(&mut web_worker); + ops::net::init(&mut web_worker); + ops::os::init(&mut web_worker); + ops::permissions::init(&mut web_worker); + ops::plugin::init(&mut web_worker); + ops::process::init(&mut web_worker); + ops::random::init(&mut web_worker, global_state.flags.seed); + ops::runtime_compiler::init(&mut web_worker); + ops::signal::init(&mut web_worker); + ops::tls::init(&mut web_worker); + ops::tty::init(&mut web_worker); } } @@ -499,27 +504,38 @@ impl WebWorker { pub fn thread_safe_handle(&self) -> WebWorkerHandle { self.handle.clone() } +} - pub async fn run_event_loop(&mut self) -> Result<(), AnyError> { - poll_fn(|cx| self.poll_event_loop(cx)).await +impl Deref for WebWorker { + type Target = Worker; + fn deref(&self) -> &Self::Target { + &self.worker } +} - pub fn poll_event_loop( - &mut self, - cx: &mut Context, - ) -> Poll<Result<(), AnyError>> { - let worker = &mut self.worker; +impl DerefMut for WebWorker { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.worker + } +} - let terminated = self.handle.terminated.load(Ordering::Relaxed); +impl Future for WebWorker { + type Output = Result<(), AnyError>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { + let inner = self.get_mut(); + let worker = &mut inner.worker; + + let terminated = inner.handle.terminated.load(Ordering::Relaxed); if terminated { return Poll::Ready(Ok(())); } - if !self.event_loop_idle { - match worker.poll_event_loop(cx) { + if !inner.event_loop_idle { + match worker.poll_unpin(cx) { Poll::Ready(r) => { - let terminated = self.handle.terminated.load(Ordering::Relaxed); + let terminated = inner.handle.terminated.load(Ordering::Relaxed); if terminated { return Poll::Ready(Ok(())); } @@ -530,13 +546,13 @@ impl WebWorker { .try_send(WorkerEvent::Error(e)) .expect("Failed to post message to host"); } - self.event_loop_idle = true; + inner.event_loop_idle = true; } Poll::Pending => {} } } - if let Poll::Ready(r) = self.terminate_rx.poll_next_unpin(cx) { + if let Poll::Ready(r) = inner.terminate_rx.poll_next_unpin(cx) { // terminate_rx should never be closed assert!(r.is_some()); return Poll::Ready(Ok(())); @@ -553,7 +569,7 @@ impl WebWorker { if let Err(e) = worker.execute(&script) { // If execution was terminated during message callback then // just ignore it - if self.handle.terminated.load(Ordering::Relaxed) { + if inner.handle.terminated.load(Ordering::Relaxed) { return Poll::Ready(Ok(())); } @@ -565,7 +581,7 @@ impl WebWorker { } // Let event loop be polled again - self.event_loop_idle = false; + inner.event_loop_idle = false; worker.waker.wake(); } None => unreachable!(), @@ -576,19 +592,6 @@ impl WebWorker { } } -impl Deref for WebWorker { - type Target = Worker; - fn deref(&self) -> &Self::Target { - &self.worker - } -} - -impl DerefMut for WebWorker { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.worker - } -} - #[cfg(test)] mod tests { use super::*; @@ -625,7 +628,7 @@ mod tests { if let Err(err) = result { eprintln!("execute_mod err {:?}", err); } - if let Err(e) = worker.run_event_loop().await { + if let Err(e) = (&mut *worker).await { panic!("Future got unexpected error: {:?}", e); } } @@ -643,7 +646,7 @@ mod tests { if let Err(err) = result { eprintln!("execute_mod err {:?}", err); } - if let Err(e) = worker.run_event_loop().await { + if let Err(e) = (&mut *worker).await { panic!("Future got unexpected error: {:?}", e); } } @@ -662,7 +665,7 @@ mod tests { if let Err(err) = result { eprintln!("execute_mod err {:?}", err); } - if let Err(e) = worker.run_event_loop().await { + if let Err(e) = (&mut *worker).await { panic!("Future got unexpected error: {:?}", e); } } @@ -730,7 +733,7 @@ mod tests { worker.execute(source).unwrap(); let handle = worker.thread_safe_handle(); handle_sender.send(handle).unwrap(); - let r = tokio_util::run_basic(worker.run_event_loop()); + let r = tokio_util::run_basic(worker); assert!(r.is_ok()) }); @@ -777,7 +780,7 @@ mod tests { worker.execute("onmessage = () => { close(); }").unwrap(); let handle = worker.thread_safe_handle(); handle_sender.send(handle).unwrap(); - let r = tokio_util::run_basic(worker.run_event_loop()); + let r = tokio_util::run_basic(worker); assert!(r.is_ok()) }); |