summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cli/tests/workers/test.ts21
-rw-r--r--cli/tests/workers/worker_with_top_level_await.ts15
-rw-r--r--core/modules.rs12
-rw-r--r--core/runtime.rs38
-rw-r--r--runtime/web_worker.rs25
-rw-r--r--runtime/worker.rs17
6 files changed, 98 insertions, 30 deletions
diff --git a/cli/tests/workers/test.ts b/cli/tests/workers/test.ts
index 0888e01db..f411e434f 100644
--- a/cli/tests/workers/test.ts
+++ b/cli/tests/workers/test.ts
@@ -675,3 +675,24 @@ Deno.test({
w.terminate();
},
});
+
+Deno.test({
+ name: "Worker with top-level-await",
+ fn: async function (): Promise<void> {
+ const result = deferred();
+ const worker = new Worker(
+ new URL("worker_with_top_level_await.ts", import.meta.url).href,
+ { type: "module" },
+ );
+ worker.onmessage = (e): void => {
+ if (e.data == "ready") {
+ worker.postMessage("trigger worker handler");
+ } else if (e.data == "triggered worker handler") {
+ result.resolve();
+ } else {
+ result.reject(new Error("Handler didn't run during top-level delay."));
+ }
+ };
+ await result;
+ },
+});
diff --git a/cli/tests/workers/worker_with_top_level_await.ts b/cli/tests/workers/worker_with_top_level_await.ts
new file mode 100644
index 000000000..6c5528900
--- /dev/null
+++ b/cli/tests/workers/worker_with_top_level_await.ts
@@ -0,0 +1,15 @@
+function delay(ms: number): Promise<void> {
+ return new Promise<void>((resolve) => {
+ setTimeout(() => {
+ resolve();
+ }, ms);
+ });
+}
+
+onmessage = (e: MessageEvent) => {
+ postMessage("triggered worker handler");
+ close();
+};
+postMessage("ready");
+await delay(1000);
+postMessage("never");
diff --git a/core/modules.rs b/core/modules.rs
index ea772a8b2..b9b99d3b5 100644
--- a/core/modules.rs
+++ b/core/modules.rs
@@ -687,7 +687,8 @@ mod tests {
let a_id_fut = runtime.load_module(&spec, None);
let a_id = futures::executor::block_on(a_id_fut).expect("Failed to load");
- futures::executor::block_on(runtime.mod_evaluate(a_id)).unwrap();
+ runtime.mod_evaluate(a_id);
+ futures::executor::block_on(runtime.run_event_loop()).unwrap();
let l = loads.lock().unwrap();
assert_eq!(
l.to_vec(),
@@ -754,7 +755,8 @@ mod tests {
let result = runtime.load_module(&spec, None).await;
assert!(result.is_ok());
let circular1_id = result.unwrap();
- runtime.mod_evaluate(circular1_id).await.unwrap();
+ runtime.mod_evaluate(circular1_id);
+ runtime.run_event_loop().await.unwrap();
let l = loads.lock().unwrap();
assert_eq!(
@@ -827,7 +829,8 @@ mod tests {
println!(">> result {:?}", result);
assert!(result.is_ok());
let redirect1_id = result.unwrap();
- runtime.mod_evaluate(redirect1_id).await.unwrap();
+ runtime.mod_evaluate(redirect1_id);
+ runtime.run_event_loop().await.unwrap();
let l = loads.lock().unwrap();
assert_eq!(
l.to_vec(),
@@ -976,7 +979,8 @@ mod tests {
let main_id =
futures::executor::block_on(main_id_fut).expect("Failed to load");
- futures::executor::block_on(runtime.mod_evaluate(main_id)).unwrap();
+ runtime.mod_evaluate(main_id);
+ futures::executor::block_on(runtime.run_event_loop()).unwrap();
let l = loads.lock().unwrap();
assert_eq!(
diff --git a/core/runtime.rs b/core/runtime.rs
index 595a7733b..66d51eb73 100644
--- a/core/runtime.rs
+++ b/core/runtime.rs
@@ -853,14 +853,19 @@ impl JsRuntime {
Ok(())
}
+ // TODO(bartlomieju): make it return `ModuleEvaluationFuture`?
/// Evaluates an already instantiated ES module.
///
+ /// Returns a receiver handle that resolves when module promise resolves.
+ /// Implementors must manually call `run_event_loop()` to drive module
+ /// evaluation future.
+ ///
/// `AnyError` can be downcast to a type that exposes additional information
/// about the V8 exception. By default this type is `JsError`, however it may
/// be a different type if `RuntimeOptions::js_error_create_fn` has been set.
///
/// This function panics if module has not been instantiated.
- fn mod_evaluate_inner(
+ pub fn mod_evaluate(
&mut self,
id: ModuleId,
) -> mpsc::Receiver<Result<(), AnyError>> {
@@ -929,24 +934,6 @@ impl JsRuntime {
receiver
}
- pub async fn mod_evaluate(&mut self, id: ModuleId) -> Result<(), AnyError> {
- let mut receiver = self.mod_evaluate_inner(id);
-
- poll_fn(|cx| {
- if let Poll::Ready(maybe_result) = receiver.poll_next_unpin(cx) {
- debug!("received module evaluate {:#?}", maybe_result);
- // If `None` is returned it means that runtime was destroyed before
- // evaluation was complete. This can happen in Web Worker when `self.close()`
- // is called at top level.
- let result = maybe_result.unwrap_or(Ok(()));
- return Poll::Ready(result);
- }
- let _r = self.poll_event_loop(cx)?;
- Poll::Pending
- })
- .await
- }
-
fn dyn_import_error(&mut self, id: ModuleLoadId, err: AnyError) {
let state_rc = Self::state(self.v8_isolate());
let context = self.global_context();
@@ -1140,7 +1127,8 @@ impl JsRuntime {
v8::PromiseState::Fulfilled => {
state.pending_mod_evaluate.take();
scope.perform_microtask_checkpoint();
- sender.try_send(Ok(())).unwrap();
+ // Receiver end might have been already dropped, ignore the result
+ let _ = sender.try_send(Ok(()));
}
v8::PromiseState::Rejected => {
let exception = promise.result(scope);
@@ -1150,7 +1138,8 @@ impl JsRuntime {
let err1 = exception_to_err_result::<()>(scope, exception, false)
.map_err(|err| attach_handle_to_error(scope, err, exception))
.unwrap_err();
- sender.try_send(Err(err1)).unwrap();
+ // Receiver end might have been already dropped, ignore the result
+ let _ = sender.try_send(Err(err1));
}
}
}
@@ -1937,7 +1926,7 @@ pub mod tests {
throw Error("assert");
}
}
-
+
let asyncRecv = 0;
Deno.core.setAsyncHandler(1, (buf) => {
assert(buf.byteLength === 100 * 1024 * 1024);
@@ -2351,7 +2340,7 @@ pub mod tests {
runtime.mod_instantiate(mod_a).unwrap();
assert_eq!(dispatch_count.load(Ordering::Relaxed), 0);
- runtime.mod_evaluate_inner(mod_a);
+ runtime.mod_evaluate(mod_a);
assert_eq!(dispatch_count.load(Ordering::Relaxed), 1);
}
@@ -2594,7 +2583,8 @@ pub mod tests {
)
.unwrap();
- futures::executor::block_on(runtime.mod_evaluate(module_id)).unwrap();
+ runtime.mod_evaluate(module_id);
+ futures::executor::block_on(runtime.run_event_loop()).unwrap();
let _snapshot = runtime.snapshot();
}
diff --git a/runtime/web_worker.rs b/runtime/web_worker.rs
index 30869ff41..73d351c9c 100644
--- a/runtime/web_worker.rs
+++ b/runtime/web_worker.rs
@@ -325,7 +325,28 @@ impl WebWorker {
module_specifier: &ModuleSpecifier,
) -> Result<(), AnyError> {
let id = self.js_runtime.load_module(module_specifier, None).await?;
- self.js_runtime.mod_evaluate(id).await
+
+ let mut receiver = self.js_runtime.mod_evaluate(id);
+ tokio::select! {
+ maybe_result = receiver.next() => {
+ debug!("received worker module evaluate {:#?}", maybe_result);
+ // If `None` is returned it means that runtime was destroyed before
+ // evaluation was complete. This can happen in Web Worker when `self.close()`
+ // is called at top level.
+ let result = maybe_result.unwrap_or(Ok(()));
+ return result;
+ }
+
+ event_loop_result = self.run_event_loop() => {
+ if self.has_been_terminated() {
+ return Ok(());
+ }
+ event_loop_result?;
+ let maybe_result = receiver.next().await;
+ let result = maybe_result.unwrap_or(Ok(()));
+ return result;
+ }
+ }
}
/// Returns a way to communicate with the Worker from other threads.
@@ -384,6 +405,8 @@ impl WebWorker {
let msg = String::from_utf8(msg.to_vec()).unwrap();
let script = format!("workerMessageRecvCallback({})", msg);
+ // TODO(bartlomieju): set proper script name like "deno:runtime/web_worker.js"
+ // so it's dimmed in stack trace instead of using "__anonymous__"
if let Err(e) = self.execute(&script) {
// If execution was terminated during message callback then
// just ignore it
diff --git a/runtime/worker.rs b/runtime/worker.rs
index 97466fadb..e63fdbe18 100644
--- a/runtime/worker.rs
+++ b/runtime/worker.rs
@@ -10,6 +10,7 @@ use crate::permissions::Permissions;
use deno_core::error::AnyError;
use deno_core::futures::future::poll_fn;
use deno_core::futures::future::FutureExt;
+use deno_core::futures::stream::StreamExt;
use deno_core::serde_json;
use deno_core::serde_json::json;
use deno_core::url::Url;
@@ -221,7 +222,21 @@ impl MainWorker {
) -> Result<(), AnyError> {
let id = self.preload_module(module_specifier).await?;
self.wait_for_inspector_session();
- self.js_runtime.mod_evaluate(id).await
+ let mut receiver = self.js_runtime.mod_evaluate(id);
+ tokio::select! {
+ maybe_result = receiver.next() => {
+ debug!("received module evaluate {:#?}", maybe_result);
+ let result = maybe_result.expect("Module evaluation result not provided.");
+ return result;
+ }
+
+ event_loop_result = self.run_event_loop() => {
+ event_loop_result?;
+ let maybe_result = receiver.next().await;
+ let result = maybe_result.expect("Module evaluation result not provided.");
+ return result;
+ }
+ }
}
fn wait_for_inspector_session(&mut self) {