summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBartek IwaƄczuk <biwanczuk@gmail.com>2020-10-06 10:18:22 +0200
committerGitHub <noreply@github.com>2020-10-06 10:18:22 +0200
commitc7c767782538243ded64742dca9b34d6af74d62d (patch)
treee0c4cdaac58f56b09c54476d73f3d5feb419e731
parent40324ff74816a99ea061929ece1c6a4ff3078bc3 (diff)
fix(core): module execution with top level await (#7672)
This commit fixes implementation of top level await in "deno_core". Previously promise returned from module execution was ignored causing to execute modules out-of-order. With this commit promise returned from module execution is stored on "JsRuntime" and event loop is polled until the promise resolves.
-rw-r--r--cli/ops/worker_host.rs7
-rw-r--r--cli/tests/integration_tests.rs10
-rw-r--r--cli/tests/top_level_await_bug.js2
-rw-r--r--cli/tests/top_level_await_bug.out1
-rw-r--r--cli/tests/top_level_await_bug2.js15
-rw-r--r--cli/tests/top_level_await_bug2.out4
-rw-r--r--cli/tests/top_level_await_bug_nested.js5
-rw-r--r--cli/worker.rs4
-rw-r--r--core/modules.rs15
-rw-r--r--core/runtime.rs251
10 files changed, 293 insertions, 21 deletions
diff --git a/cli/ops/worker_host.rs b/cli/ops/worker_host.rs
index 9175ca0f1..17e0e397f 100644
--- a/cli/ops/worker_host.rs
+++ b/cli/ops/worker_host.rs
@@ -155,6 +155,13 @@ fn run_worker_thread(
if let Err(e) = result {
let mut sender = worker.internal_channels.sender.clone();
+
+ // If sender is closed it means that worker has already been closed from
+ // within using "globalThis.close()"
+ if sender.is_closed() {
+ return;
+ }
+
sender
.try_send(WorkerEvent::TerminalError(e))
.expect("Failed to post message to host");
diff --git a/cli/tests/integration_tests.rs b/cli/tests/integration_tests.rs
index 8e2007b42..9ad7bac8c 100644
--- a/cli/tests/integration_tests.rs
+++ b/cli/tests/integration_tests.rs
@@ -2662,6 +2662,16 @@ itest!(ignore_require {
exit_code: 0,
});
+itest!(top_level_await_bug {
+ args: "run --allow-read top_level_await_bug.js",
+ output: "top_level_await_bug.out",
+});
+
+itest!(top_level_await_bug2 {
+ args: "run --allow-read top_level_await_bug2.js",
+ output: "top_level_await_bug2.out",
+});
+
#[test]
fn cafile_env_fetch() {
use deno_core::url::Url;
diff --git a/cli/tests/top_level_await_bug.js b/cli/tests/top_level_await_bug.js
new file mode 100644
index 000000000..3c6860a5b
--- /dev/null
+++ b/cli/tests/top_level_await_bug.js
@@ -0,0 +1,2 @@
+const mod = await import("./top_level_await_bug_nested.js");
+console.log(mod);
diff --git a/cli/tests/top_level_await_bug.out b/cli/tests/top_level_await_bug.out
new file mode 100644
index 000000000..f0369645c
--- /dev/null
+++ b/cli/tests/top_level_await_bug.out
@@ -0,0 +1 @@
+Module { default: 1, [Symbol(Symbol.toStringTag)]: "Module" }
diff --git a/cli/tests/top_level_await_bug2.js b/cli/tests/top_level_await_bug2.js
new file mode 100644
index 000000000..c847bbd34
--- /dev/null
+++ b/cli/tests/top_level_await_bug2.js
@@ -0,0 +1,15 @@
+const mod = await import("./top_level_await_bug_nested.js");
+console.log(mod);
+
+const sleep = (n) => new Promise((r) => setTimeout(r, n));
+
+await sleep(100);
+console.log("slept");
+
+window.addEventListener("load", () => {
+ console.log("load event");
+});
+
+setTimeout(() => {
+ console.log("timeout");
+}, 1000);
diff --git a/cli/tests/top_level_await_bug2.out b/cli/tests/top_level_await_bug2.out
new file mode 100644
index 000000000..509ee27c2
--- /dev/null
+++ b/cli/tests/top_level_await_bug2.out
@@ -0,0 +1,4 @@
+Module { default: 1, [Symbol(Symbol.toStringTag)]: "Module" }
+slept
+load event
+timeout
diff --git a/cli/tests/top_level_await_bug_nested.js b/cli/tests/top_level_await_bug_nested.js
new file mode 100644
index 000000000..894f0de2d
--- /dev/null
+++ b/cli/tests/top_level_await_bug_nested.js
@@ -0,0 +1,5 @@
+const sleep = (n) => new Promise((r) => setTimeout(r, n));
+
+await sleep(100);
+
+export default 1;
diff --git a/cli/worker.rs b/cli/worker.rs
index 08ccd418e..47e5c4761 100644
--- a/cli/worker.rs
+++ b/cli/worker.rs
@@ -189,7 +189,7 @@ impl Worker {
) -> Result<(), AnyError> {
let id = self.preload_module(module_specifier).await?;
self.wait_for_inspector_session();
- self.isolate.mod_evaluate(id)
+ self.isolate.mod_evaluate(id).await
}
/// Loads, instantiates and executes provided source code
@@ -204,7 +204,7 @@ impl Worker {
.load_module(module_specifier, Some(code))
.await?;
self.wait_for_inspector_session();
- self.isolate.mod_evaluate(id)
+ self.isolate.mod_evaluate(id).await
}
/// Returns a way to communicate with the Worker from other threads.
diff --git a/core/modules.rs b/core/modules.rs
index 235bfeb4e..a0e4fad95 100644
--- a/core/modules.rs
+++ b/core/modules.rs
@@ -341,6 +341,13 @@ pub struct ModuleInfo {
pub name: String,
pub handle: v8::Global<v8::Module>,
pub import_specifiers: Vec<ModuleSpecifier>,
+ // TODO(bartlomieju): there should be "state"
+ // field that describes if module is already being loaded,
+ // so concurent dynamic imports don't introduce dead lock
+ // pub state: LoadState {
+ // Loading(shared_future),
+ // Loaded,
+ // },
}
/// A symbolic module entity.
@@ -667,7 +674,7 @@ mod tests {
let a_id_fut = runtime.load_module(&spec, None);
let a_id = futures::executor::block_on(a_id_fut).expect("Failed to load");
- runtime.mod_evaluate(a_id).unwrap();
+ futures::executor::block_on(runtime.mod_evaluate(a_id)).unwrap();
let l = loads.lock().unwrap();
assert_eq!(
l.to_vec(),
@@ -734,7 +741,7 @@ mod tests {
let result = runtime.load_module(&spec, None).await;
assert!(result.is_ok());
let circular1_id = result.unwrap();
- runtime.mod_evaluate(circular1_id).unwrap();
+ runtime.mod_evaluate(circular1_id).await.unwrap();
let l = loads.lock().unwrap();
assert_eq!(
@@ -811,7 +818,7 @@ mod tests {
println!(">> result {:?}", result);
assert!(result.is_ok());
let redirect1_id = result.unwrap();
- runtime.mod_evaluate(redirect1_id).unwrap();
+ runtime.mod_evaluate(redirect1_id).await.unwrap();
let l = loads.lock().unwrap();
assert_eq!(
l.to_vec(),
@@ -961,7 +968,7 @@ mod tests {
let main_id =
futures::executor::block_on(main_id_fut).expect("Failed to load");
- runtime.mod_evaluate(main_id).unwrap();
+ futures::executor::block_on(runtime.mod_evaluate(main_id)).unwrap();
let l = loads.lock().unwrap();
assert_eq!(
diff --git a/core/runtime.rs b/core/runtime.rs
index 514703f34..2474d1887 100644
--- a/core/runtime.rs
+++ b/core/runtime.rs
@@ -23,6 +23,8 @@ use crate::shared_queue::SharedQueue;
use crate::shared_queue::RECOMMENDED_SIZE;
use crate::BufVec;
use crate::OpState;
+use futures::channel::mpsc;
+use futures::future::poll_fn;
use futures::stream::FuturesUnordered;
use futures::stream::StreamExt;
use futures::stream::StreamFuture;
@@ -84,6 +86,11 @@ pub struct JsRuntime {
allocations: IsolateAllocations,
}
+type DynImportModEvaluate =
+ (ModuleId, v8::Global<v8::Promise>, v8::Global<v8::Module>);
+type ModEvaluate =
+ (v8::Global<v8::Promise>, mpsc::Sender<Result<(), AnyError>>);
+
/// Internal state for JsRuntime which is stored in one of v8::Isolate's
/// embedder slots.
pub(crate) struct JsRuntimeState {
@@ -92,6 +99,8 @@ pub(crate) struct JsRuntimeState {
pub(crate) js_recv_cb: Option<v8::Global<v8::Function>>,
pub(crate) js_macrotask_cb: Option<v8::Global<v8::Function>>,
pub(crate) pending_promise_exceptions: HashMap<i32, v8::Global<v8::Value>>,
+ pub(crate) pending_dyn_mod_evaluate: HashMap<i32, DynImportModEvaluate>,
+ pub(crate) pending_mod_evaluate: HashMap<ModuleId, ModEvaluate>,
pub(crate) js_error_create_fn: Box<JsErrorCreateFn>,
pub(crate) shared: SharedQueue,
pub(crate) pending_ops: FuturesUnordered<PendingOpFuture>,
@@ -278,6 +287,8 @@ impl JsRuntime {
isolate.set_slot(Rc::new(RefCell::new(JsRuntimeState {
global_context: Some(global_context),
pending_promise_exceptions: HashMap::new(),
+ pending_dyn_mod_evaluate: HashMap::new(),
+ pending_mod_evaluate: HashMap::new(),
shared_ab: None,
js_recv_cb: None,
js_macrotask_cb: None,
@@ -484,6 +495,9 @@ impl Future for JsRuntime {
state.waker.register(cx.waker());
}
+ // Top level modules
+ runtime.poll_mod_evaluate(cx)?;
+
// Dynamic module loading - ie. modules loaded using "import()"
{
let poll_imports = runtime.prepare_dyn_imports(cx)?;
@@ -492,6 +506,8 @@ impl Future for JsRuntime {
let poll_imports = runtime.poll_dyn_imports(cx)?;
assert!(poll_imports.is_ready());
+ runtime.poll_dyn_imports_evaluate(cx)?;
+
runtime.check_promise_exceptions()?;
}
@@ -508,6 +524,8 @@ impl Future for JsRuntime {
state.pending_ops.is_empty()
&& state.pending_dyn_imports.is_empty()
&& state.preparing_dyn_imports.is_empty()
+ && state.pending_dyn_mod_evaluate.is_empty()
+ && state.pending_mod_evaluate.is_empty()
};
if is_idle {
@@ -700,7 +718,91 @@ impl JsRuntime {
/// `AnyError` can be downcast to a type that exposes additional information
/// about the V8 exception. By default this type is `JsError`, however it may
/// be a different type if `RuntimeOptions::js_error_create_fn` has been set.
- pub fn mod_evaluate(&mut self, id: ModuleId) -> Result<(), AnyError> {
+ pub fn dyn_mod_evaluate(
+ &mut self,
+ load_id: ModuleLoadId,
+ id: ModuleId,
+ ) -> Result<(), AnyError> {
+ self.shared_init();
+
+ let state_rc = Self::state(self);
+ let context = self.global_context();
+ let context1 = self.global_context();
+
+ let module_handle = state_rc
+ .borrow()
+ .modules
+ .get_info(id)
+ .expect("ModuleInfo not found")
+ .handle
+ .clone();
+
+ let status = {
+ let scope = &mut v8::HandleScope::with_context(&mut **self, context);
+ let module = module_handle.get(scope);
+ module.get_status()
+ };
+
+ if status == v8::ModuleStatus::Instantiated {
+ // IMPORTANT: Top-level-await is enabled, which means that return value
+ // of module evaluation is a promise.
+ //
+ // Because that promise is created internally by V8, when error occurs during
+ // module evaluation the promise is rejected, and since the promise has no rejection
+ // handler it will result in call to `bindings::promise_reject_callback` adding
+ // the promise to pending promise rejection table - meaning JsRuntime will return
+ // error on next poll().
+ //
+ // This situation is not desirable as we want to manually return error at the
+ // end of this function to handle it further. It means we need to manually
+ // remove this promise from pending promise rejection table.
+ //
+ // For more details see:
+ // https://github.com/denoland/deno/issues/4908
+ // https://v8.dev/features/top-level-await#module-execution-order
+ let scope = &mut v8::HandleScope::with_context(&mut **self, context1);
+ let module = v8::Local::new(scope, &module_handle);
+ let maybe_value = module.evaluate(scope);
+
+ // Update status after evaluating.
+ let status = module.get_status();
+
+ if let Some(value) = maybe_value {
+ assert!(
+ status == v8::ModuleStatus::Evaluated
+ || status == v8::ModuleStatus::Errored
+ );
+ let promise = v8::Local::<v8::Promise>::try_from(value)
+ .expect("Expected to get promise as module evaluation result");
+ let promise_id = promise.get_identity_hash();
+ let mut state = state_rc.borrow_mut();
+ state.pending_promise_exceptions.remove(&promise_id);
+ let promise_global = v8::Global::new(scope, promise);
+ let module_global = v8::Global::new(scope, module);
+ state
+ .pending_dyn_mod_evaluate
+ .insert(load_id, (id, promise_global, module_global));
+ } else {
+ assert!(status == v8::ModuleStatus::Errored);
+ }
+ }
+
+ if status == v8::ModuleStatus::Evaluated {
+ self.dyn_import_done(load_id, id)?;
+ }
+
+ Ok(())
+ }
+
+ /// Evaluates an already instantiated ES module.
+ ///
+ /// `AnyError` can be downcast to a type that exposes additional information
+ /// about the V8 exception. By default this type is `JsError`, however it may
+ /// be a different type if `RuntimeOptions::js_error_create_fn` has been set.
+ fn mod_evaluate_inner(
+ &mut self,
+ id: ModuleId,
+ ) -> Result<mpsc::Receiver<Result<(), AnyError>>, AnyError> {
self.shared_init();
let state_rc = Self::state(self);
@@ -716,6 +818,8 @@ impl JsRuntime {
.expect("ModuleInfo not found");
let mut status = module.get_status();
+ let (sender, receiver) = mpsc::channel(1);
+
if status == v8::ModuleStatus::Instantiated {
// IMPORTANT: Top-level-await is enabled, which means that return value
// of module evaluation is a promise.
@@ -748,20 +852,30 @@ impl JsRuntime {
let promise_id = promise.get_identity_hash();
let mut state = state_rc.borrow_mut();
state.pending_promise_exceptions.remove(&promise_id);
+ let promise_global = v8::Global::new(scope, promise);
+ state
+ .pending_mod_evaluate
+ .insert(id, (promise_global, sender));
} else {
assert!(status == v8::ModuleStatus::Errored);
}
}
- match status {
- v8::ModuleStatus::Evaluated => Ok(()),
- v8::ModuleStatus::Errored => {
- let exception = module.get_exception();
- exception_to_err_result(scope, exception)
- .map_err(|err| attach_handle_to_error(scope, err, exception))
+ Ok(receiver)
+ }
+
+ pub async fn mod_evaluate(&mut self, id: ModuleId) -> Result<(), AnyError> {
+ let mut receiver = self.mod_evaluate_inner(id)?;
+
+ poll_fn(|cx| {
+ if let Poll::Ready(result) = receiver.poll_next_unpin(cx) {
+ debug!("received module evaluate");
+ return Poll::Ready(result.unwrap());
}
- other => panic!("Unexpected module status {:?}", other),
- }
+ let _r = self.poll_unpin(cx)?;
+ Poll::Pending
+ })
+ .await
}
fn dyn_import_error(
@@ -922,14 +1036,121 @@ impl JsRuntime {
// Load is done.
let module_id = load.root_module_id.unwrap();
self.mod_instantiate(module_id)?;
- match self.mod_evaluate(module_id) {
- Ok(()) => self.dyn_import_done(dyn_import_id, module_id)?,
- Err(err) => self.dyn_import_error(dyn_import_id, err)?,
- };
+ self.dyn_mod_evaluate(dyn_import_id, module_id)?;
+ }
+ }
+ }
+ }
+ }
+
+ fn poll_mod_evaluate(&mut self, _cx: &mut Context) -> Result<(), AnyError> {
+ let state_rc = Self::state(self);
+
+ let context = self.global_context();
+ {
+ let scope = &mut v8::HandleScope::with_context(&mut **self, context);
+
+ let mut state = state_rc.borrow_mut();
+
+ if let Some(&module_id) = state.pending_mod_evaluate.keys().next() {
+ let handle = state.pending_mod_evaluate.remove(&module_id).unwrap();
+ drop(state);
+
+ let promise = handle.0.get(scope);
+ let mut sender = handle.1.clone();
+
+ let promise_state = promise.state();
+
+ match promise_state {
+ v8::PromiseState::Pending => {
+ state_rc
+ .borrow_mut()
+ .pending_mod_evaluate
+ .insert(module_id, handle);
+ state_rc.borrow().waker.wake();
+ }
+ v8::PromiseState::Fulfilled => {
+ sender.try_send(Ok(())).unwrap();
+ }
+ v8::PromiseState::Rejected => {
+ let exception = promise.result(scope);
+ let err1 = exception_to_err_result::<()>(scope, exception)
+ .map_err(|err| attach_handle_to_error(scope, err, exception))
+ .unwrap_err();
+ sender.try_send(Err(err1)).unwrap();
+ }
+ }
+ }
+ };
+
+ Ok(())
+ }
+
+ fn poll_dyn_imports_evaluate(
+ &mut self,
+ _cx: &mut Context,
+ ) -> Result<(), AnyError> {
+ let state_rc = Self::state(self);
+
+ loop {
+ let context = self.global_context();
+ let maybe_result = {
+ let scope = &mut v8::HandleScope::with_context(&mut **self, context);
+
+ let mut state = state_rc.borrow_mut();
+ if let Some(&dyn_import_id) =
+ state.pending_dyn_mod_evaluate.keys().next()
+ {
+ let handle = state
+ .pending_dyn_mod_evaluate
+ .remove(&dyn_import_id)
+ .unwrap();
+ drop(state);
+
+ let module_id = handle.0;
+ let promise = handle.1.get(scope);
+ let _module = handle.2.get(scope);
+
+ let promise_state = promise.state();
+
+ match promise_state {
+ v8::PromiseState::Pending => {
+ state_rc
+ .borrow_mut()
+ .pending_dyn_mod_evaluate
+ .insert(dyn_import_id, handle);
+ state_rc.borrow().waker.wake();
+ None
+ }
+ v8::PromiseState::Fulfilled => Some(Ok((dyn_import_id, module_id))),
+ v8::PromiseState::Rejected => {
+ let exception = promise.result(scope);
+ let err1 = exception_to_err_result::<()>(scope, exception)
+ .map_err(|err| attach_handle_to_error(scope, err, exception))
+ .unwrap_err();
+ Some(Err((dyn_import_id, err1)))
+ }
+ }
+ } else {
+ None
+ }
+ };
+
+ if let Some(result) = maybe_result {
+ match result {
+ Ok((dyn_import_id, module_id)) => {
+ self.dyn_import_done(dyn_import_id, module_id)?;
+ }
+ Err((dyn_import_id, err1)) => {
+ self.dyn_import_error(dyn_import_id, err1)?;
}
}
+ } else {
+ break;
}
}
+
+ Ok(())
}
fn register_during_load(
@@ -2003,7 +2224,7 @@ pub mod tests {
runtime.mod_instantiate(mod_a).unwrap();
assert_eq!(dispatch_count.load(Ordering::Relaxed), 0);
- runtime.mod_evaluate(mod_a).unwrap();
+ runtime.mod_evaluate_inner(mod_a).unwrap();
assert_eq!(dispatch_count.load(Ordering::Relaxed), 1);
}
@@ -2246,7 +2467,7 @@ pub mod tests {
)
.unwrap();
- runtime.mod_evaluate(module_id).unwrap();
+ futures::executor::block_on(runtime.mod_evaluate(module_id)).unwrap();
let _snapshot = runtime.snapshot();
}