summaryrefslogtreecommitdiff
path: root/cli/worker.rs
diff options
context:
space:
mode:
authorRyan Dahl <ry@tinyclouds.org>2020-10-10 05:41:11 -0400
committerGitHub <noreply@github.com>2020-10-10 11:41:11 +0200
commit08bb8b3d53eb2445de9b5e2845ab8acf9d353800 (patch)
treeebf00cb815ee1a10be00c74cbb332af33dd52dc2 /cli/worker.rs
parent782e6a2ed5d76bb5a154c56d7daf4607e5bdb93f (diff)
Fix 100% CPU idling problem by reverting #7672 (#7911)
* Revert "refactor: Worker is not a Future (#7895)" This reverts commit f4357f0ff9d39411f22504fcc20db6bd5dec6ddb. * Revert "refactor(core): JsRuntime is not a Future (#7855)" This reverts commit d8879feb8c832dbb38649551b1cb0730874f7be6. * Revert "fix(core): module execution with top level await (#7672)" This reverts commit c7c767782538243ded64742dca9b34d6af74d62d.
Diffstat (limited to 'cli/worker.rs')
-rw-r--r--cli/worker.rs239
1 files changed, 121 insertions, 118 deletions
diff --git a/cli/worker.rs b/cli/worker.rs
index 4af363825..20832016a 100644
--- a/cli/worker.rs
+++ b/cli/worker.rs
@@ -3,7 +3,6 @@
use crate::fmt_errors::JsError;
use crate::global_state::GlobalState;
use crate::inspector::DenoInspector;
-use crate::inspector::InspectorSession;
use crate::js;
use crate::metrics::Metrics;
use crate::ops;
@@ -12,7 +11,6 @@ use crate::permissions::Permissions;
use crate::state::CliModuleLoader;
use deno_core::error::AnyError;
use deno_core::futures::channel::mpsc;
-use deno_core::futures::future::poll_fn;
use deno_core::futures::future::FutureExt;
use deno_core::futures::stream::StreamExt;
use deno_core::futures::task::AtomicWaker;
@@ -24,8 +22,10 @@ use deno_core::ModuleSpecifier;
use deno_core::RuntimeOptions;
use deno_core::Snapshot;
use std::env;
+use std::future::Future;
use std::ops::Deref;
use std::ops::DerefMut;
+use std::pin::Pin;
use std::rc::Rc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
@@ -95,15 +95,13 @@ fn create_channels() -> (WorkerChannelsInternal, WorkerHandle) {
/// - `MainWorker`
/// - `WebWorker`
pub struct Worker {
- external_channels: WorkerHandle,
- inspector: Option<Box<DenoInspector>>,
- // Following fields are pub because they are accessed
- // when creating a new WebWorker instance.
+ pub name: String,
+ pub js_runtime: JsRuntime,
+ pub inspector: Option<Box<DenoInspector>>,
+ pub waker: AtomicWaker,
pub(crate) internal_channels: WorkerChannelsInternal,
- pub(crate) js_runtime: JsRuntime,
- pub(crate) name: String,
+ external_channels: WorkerHandle,
should_break_on_first_statement: bool,
- waker: AtomicWaker,
}
impl Worker {
@@ -149,13 +147,13 @@ impl Worker {
let (internal_channels, external_channels) = create_channels();
Self {
- external_channels,
+ name,
+ js_runtime,
inspector,
+ waker: AtomicWaker::new(),
internal_channels,
- js_runtime,
- name,
+ external_channels,
should_break_on_first_statement,
- waker: AtomicWaker::new(),
}
}
@@ -191,7 +189,7 @@ impl Worker {
) -> Result<(), AnyError> {
let id = self.preload_module(module_specifier).await?;
self.wait_for_inspector_session();
- self.js_runtime.mod_evaluate(id).await
+ self.js_runtime.mod_evaluate(id)
}
/// Loads, instantiates and executes provided source code
@@ -206,7 +204,7 @@ impl Worker {
.load_module(module_specifier, Some(code))
.await?;
self.wait_for_inspector_session();
- self.js_runtime.mod_evaluate(id).await
+ self.js_runtime.mod_evaluate(id)
}
/// Returns a way to communicate with the Worker from other threads.
@@ -223,35 +221,39 @@ impl Worker {
.wait_for_session_and_break_on_next_statement()
}
}
+}
- /// Create new inspector session. This function panics if Worker
- /// was not configured to create inspector.
- pub fn create_inspector_session(&mut self) -> Box<InspectorSession> {
- let inspector = self.inspector.as_mut().unwrap();
-
- InspectorSession::new(&mut **inspector)
+impl Drop for Worker {
+ fn drop(&mut self) {
+ // The Isolate object must outlive the Inspector object, but this is
+ // currently not enforced by the type system.
+ self.inspector.take();
}
+}
+
+impl Future for Worker {
+ type Output = Result<(), AnyError>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
+ let inner = self.get_mut();
- pub fn poll_event_loop(
- &mut self,
- cx: &mut Context,
- ) -> Poll<Result<(), AnyError>> {
// We always poll the inspector if it exists.
- let _ = self.inspector.as_mut().map(|i| i.poll_unpin(cx));
- self.waker.register(cx.waker());
- self.js_runtime.poll_event_loop(cx)
+ let _ = inner.inspector.as_mut().map(|i| i.poll_unpin(cx));
+ inner.waker.register(cx.waker());
+ inner.js_runtime.poll_unpin(cx)
}
+}
- pub async fn run_event_loop(&mut self) -> Result<(), AnyError> {
- poll_fn(|cx| self.poll_event_loop(cx)).await
+impl Deref for Worker {
+ type Target = JsRuntime;
+ fn deref(&self) -> &Self::Target {
+ &self.js_runtime
}
}
-impl Drop for Worker {
- fn drop(&mut self) {
- // The Isolate object must outlive the Inspector object, but this is
- // currently not enforced by the type system.
- self.inspector.take();
+impl DerefMut for Worker {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ &mut self.js_runtime
}
}
@@ -276,46 +278,45 @@ impl MainWorker {
loader,
true,
);
- let js_runtime = &mut worker.js_runtime;
{
// All ops registered in this function depend on these
{
- let op_state = js_runtime.op_state();
+ let op_state = worker.op_state();
let mut op_state = op_state.borrow_mut();
op_state.put::<Metrics>(Default::default());
op_state.put::<Arc<GlobalState>>(global_state.clone());
op_state.put::<Permissions>(global_state.permissions.clone());
}
- ops::runtime::init(js_runtime, main_module);
- ops::fetch::init(js_runtime, global_state.flags.ca_file.as_deref());
- ops::timers::init(js_runtime);
- ops::worker_host::init(js_runtime);
- ops::random::init(js_runtime, global_state.flags.seed);
- ops::reg_json_sync(js_runtime, "op_close", deno_core::op_close);
- ops::reg_json_sync(js_runtime, "op_resources", deno_core::op_resources);
+ ops::runtime::init(&mut worker, main_module);
+ ops::fetch::init(&mut worker, global_state.flags.ca_file.as_deref());
+ ops::timers::init(&mut worker);
+ ops::worker_host::init(&mut worker);
+ ops::random::init(&mut worker, global_state.flags.seed);
+ ops::reg_json_sync(&mut worker, "op_close", deno_core::op_close);
+ ops::reg_json_sync(&mut worker, "op_resources", deno_core::op_resources);
ops::reg_json_sync(
- js_runtime,
+ &mut worker,
"op_domain_to_ascii",
deno_web::op_domain_to_ascii,
);
- ops::errors::init(js_runtime);
- ops::fs_events::init(js_runtime);
- ops::fs::init(js_runtime);
- ops::io::init(js_runtime);
- ops::net::init(js_runtime);
- ops::os::init(js_runtime);
- ops::permissions::init(js_runtime);
- ops::plugin::init(js_runtime);
- ops::process::init(js_runtime);
- ops::runtime_compiler::init(js_runtime);
- ops::signal::init(js_runtime);
- ops::tls::init(js_runtime);
- ops::tty::init(js_runtime);
- ops::websocket::init(js_runtime);
+ ops::errors::init(&mut worker);
+ ops::fs_events::init(&mut worker);
+ ops::fs::init(&mut worker);
+ ops::io::init(&mut worker);
+ ops::net::init(&mut worker);
+ ops::os::init(&mut worker);
+ ops::permissions::init(&mut worker);
+ ops::plugin::init(&mut worker);
+ ops::process::init(&mut worker);
+ ops::runtime_compiler::init(&mut worker);
+ ops::signal::init(&mut worker);
+ ops::tls::init(&mut worker);
+ ops::tty::init(&mut worker);
+ ops::websocket::init(&mut worker);
}
{
- let op_state = js_runtime.op_state();
+ let op_state = worker.op_state();
let mut op_state = op_state.borrow_mut();
let t = &mut op_state.resource_table;
let (stdin, stdout, stderr) = get_stdio();
@@ -448,45 +449,49 @@ impl WebWorker {
{
let handle = web_worker.thread_safe_handle();
let sender = web_worker.worker.internal_channels.sender.clone();
- let js_runtime = &mut web_worker.js_runtime;
+
// All ops registered in this function depend on these
{
- let op_state = js_runtime.op_state();
+ let op_state = web_worker.op_state();
let mut op_state = op_state.borrow_mut();
op_state.put::<Metrics>(Default::default());
op_state.put::<Arc<GlobalState>>(global_state.clone());
op_state.put::<Permissions>(permissions);
}
- ops::web_worker::init(js_runtime, sender, handle);
- ops::runtime::init(js_runtime, main_module);
- ops::fetch::init(js_runtime, global_state.flags.ca_file.as_deref());
- ops::timers::init(js_runtime);
- ops::worker_host::init(js_runtime);
- ops::reg_json_sync(js_runtime, "op_close", deno_core::op_close);
- ops::reg_json_sync(js_runtime, "op_resources", deno_core::op_resources);
+ ops::web_worker::init(&mut web_worker, sender, handle);
+ ops::runtime::init(&mut web_worker, main_module);
+ ops::fetch::init(&mut web_worker, global_state.flags.ca_file.as_deref());
+ ops::timers::init(&mut web_worker);
+ ops::worker_host::init(&mut web_worker);
+ ops::reg_json_sync(&mut web_worker, "op_close", deno_core::op_close);
+ ops::reg_json_sync(
+ &mut web_worker,
+ "op_resources",
+ deno_core::op_resources,
+ );
ops::reg_json_sync(
- js_runtime,
+ &mut web_worker,
"op_domain_to_ascii",
deno_web::op_domain_to_ascii,
);
- ops::errors::init(js_runtime);
- ops::io::init(js_runtime);
- ops::websocket::init(js_runtime);
+ ops::errors::init(&mut web_worker);
+ ops::io::init(&mut web_worker);
+ ops::websocket::init(&mut web_worker);
if has_deno_namespace {
- ops::fs_events::init(js_runtime);
- ops::fs::init(js_runtime);
- ops::net::init(js_runtime);
- ops::os::init(js_runtime);
- ops::permissions::init(js_runtime);
- ops::plugin::init(js_runtime);
- ops::process::init(js_runtime);
- ops::random::init(js_runtime, global_state.flags.seed);
- ops::runtime_compiler::init(js_runtime);
- ops::signal::init(js_runtime);
- ops::tls::init(js_runtime);
- ops::tty::init(js_runtime);
+ ops::fs_events::init(&mut web_worker);
+ ops::fs::init(&mut web_worker);
+ ops::net::init(&mut web_worker);
+ ops::os::init(&mut web_worker);
+ ops::permissions::init(&mut web_worker);
+ ops::plugin::init(&mut web_worker);
+ ops::process::init(&mut web_worker);
+ ops::random::init(&mut web_worker, global_state.flags.seed);
+ ops::runtime_compiler::init(&mut web_worker);
+ ops::signal::init(&mut web_worker);
+ ops::tls::init(&mut web_worker);
+ ops::tty::init(&mut web_worker);
}
}
@@ -499,27 +504,38 @@ impl WebWorker {
pub fn thread_safe_handle(&self) -> WebWorkerHandle {
self.handle.clone()
}
+}
- pub async fn run_event_loop(&mut self) -> Result<(), AnyError> {
- poll_fn(|cx| self.poll_event_loop(cx)).await
+impl Deref for WebWorker {
+ type Target = Worker;
+ fn deref(&self) -> &Self::Target {
+ &self.worker
}
+}
- pub fn poll_event_loop(
- &mut self,
- cx: &mut Context,
- ) -> Poll<Result<(), AnyError>> {
- let worker = &mut self.worker;
+impl DerefMut for WebWorker {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ &mut self.worker
+ }
+}
- let terminated = self.handle.terminated.load(Ordering::Relaxed);
+impl Future for WebWorker {
+ type Output = Result<(), AnyError>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
+ let inner = self.get_mut();
+ let worker = &mut inner.worker;
+
+ let terminated = inner.handle.terminated.load(Ordering::Relaxed);
if terminated {
return Poll::Ready(Ok(()));
}
- if !self.event_loop_idle {
- match worker.poll_event_loop(cx) {
+ if !inner.event_loop_idle {
+ match worker.poll_unpin(cx) {
Poll::Ready(r) => {
- let terminated = self.handle.terminated.load(Ordering::Relaxed);
+ let terminated = inner.handle.terminated.load(Ordering::Relaxed);
if terminated {
return Poll::Ready(Ok(()));
}
@@ -530,13 +546,13 @@ impl WebWorker {
.try_send(WorkerEvent::Error(e))
.expect("Failed to post message to host");
}
- self.event_loop_idle = true;
+ inner.event_loop_idle = true;
}
Poll::Pending => {}
}
}
- if let Poll::Ready(r) = self.terminate_rx.poll_next_unpin(cx) {
+ if let Poll::Ready(r) = inner.terminate_rx.poll_next_unpin(cx) {
// terminate_rx should never be closed
assert!(r.is_some());
return Poll::Ready(Ok(()));
@@ -553,7 +569,7 @@ impl WebWorker {
if let Err(e) = worker.execute(&script) {
// If execution was terminated during message callback then
// just ignore it
- if self.handle.terminated.load(Ordering::Relaxed) {
+ if inner.handle.terminated.load(Ordering::Relaxed) {
return Poll::Ready(Ok(()));
}
@@ -565,7 +581,7 @@ impl WebWorker {
}
// Let event loop be polled again
- self.event_loop_idle = false;
+ inner.event_loop_idle = false;
worker.waker.wake();
}
None => unreachable!(),
@@ -576,19 +592,6 @@ impl WebWorker {
}
}
-impl Deref for WebWorker {
- type Target = Worker;
- fn deref(&self) -> &Self::Target {
- &self.worker
- }
-}
-
-impl DerefMut for WebWorker {
- fn deref_mut(&mut self) -> &mut Self::Target {
- &mut self.worker
- }
-}
-
#[cfg(test)]
mod tests {
use super::*;
@@ -625,7 +628,7 @@ mod tests {
if let Err(err) = result {
eprintln!("execute_mod err {:?}", err);
}
- if let Err(e) = worker.run_event_loop().await {
+ if let Err(e) = (&mut *worker).await {
panic!("Future got unexpected error: {:?}", e);
}
}
@@ -643,7 +646,7 @@ mod tests {
if let Err(err) = result {
eprintln!("execute_mod err {:?}", err);
}
- if let Err(e) = worker.run_event_loop().await {
+ if let Err(e) = (&mut *worker).await {
panic!("Future got unexpected error: {:?}", e);
}
}
@@ -662,7 +665,7 @@ mod tests {
if let Err(err) = result {
eprintln!("execute_mod err {:?}", err);
}
- if let Err(e) = worker.run_event_loop().await {
+ if let Err(e) = (&mut *worker).await {
panic!("Future got unexpected error: {:?}", e);
}
}
@@ -730,7 +733,7 @@ mod tests {
worker.execute(source).unwrap();
let handle = worker.thread_safe_handle();
handle_sender.send(handle).unwrap();
- let r = tokio_util::run_basic(worker.run_event_loop());
+ let r = tokio_util::run_basic(worker);
assert!(r.is_ok())
});
@@ -777,7 +780,7 @@ mod tests {
worker.execute("onmessage = () => { close(); }").unwrap();
let handle = worker.thread_safe_handle();
handle_sender.send(handle).unwrap();
- let r = tokio_util::run_basic(worker.run_event_loop());
+ let r = tokio_util::run_basic(worker);
assert!(r.is_ok())
});