summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cli/main.rs26
-rw-r--r--cli/standalone.rs4
-rw-r--r--cli/tests/integration/compat_tests.rs5
-rw-r--r--cli/tests/testdata/compat/worker/worker.mjs9
-rw-r--r--cli/tests/testdata/compat/worker/worker_test.mjs18
-rw-r--r--cli/tests/testdata/compat/worker/worker_test.out2
-rw-r--r--runtime/examples/hello_runtime.rs4
-rw-r--r--runtime/js/99_main.js2
-rw-r--r--runtime/ops/worker_host.rs41
-rw-r--r--runtime/web_worker.rs93
-rw-r--r--runtime/worker.rs9
11 files changed, 194 insertions, 19 deletions
diff --git a/cli/main.rs b/cli/main.rs
index ecadf65bd..acf49bb3f 100644
--- a/cli/main.rs
+++ b/cli/main.rs
@@ -71,6 +71,7 @@ use deno_ast::MediaType;
use deno_core::error::generic_error;
use deno_core::error::AnyError;
use deno_core::futures::future::FutureExt;
+use deno_core::futures::future::LocalFutureObj;
use deno_core::futures::Future;
use deno_core::located_script_name;
use deno_core::parking_lot::RwLock;
@@ -82,6 +83,7 @@ use deno_core::Extension;
use deno_core::ModuleSpecifier;
use deno_runtime::colors;
use deno_runtime::ops::worker_host::CreateWebWorkerCb;
+use deno_runtime::ops::worker_host::PreloadModuleCb;
use deno_runtime::permissions::Permissions;
use deno_runtime::tokio_util::run_basic;
use deno_runtime::web_worker::WebWorker;
@@ -100,6 +102,24 @@ use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
+fn create_web_worker_preload_module_callback(
+ ps: ProcState,
+) -> Arc<PreloadModuleCb> {
+ let compat = ps.flags.compat;
+
+ Arc::new(move |mut worker| {
+ let fut = async move {
+ if compat {
+ worker.execute_side_module(&compat::GLOBAL_URL).await?;
+ worker.execute_side_module(&compat::MODULE_URL).await?;
+ }
+
+ Ok(worker)
+ };
+ LocalFutureObj::new(Box::new(fut))
+ })
+}
+
fn create_web_worker_callback(ps: ProcState) -> Arc<CreateWebWorkerCb> {
Arc::new(move |args| {
let global_state_ = ps.clone();
@@ -116,6 +136,8 @@ fn create_web_worker_callback(ps: ProcState) -> Arc<CreateWebWorkerCb> {
args.parent_permissions.clone(),
);
let create_web_worker_cb = create_web_worker_callback(ps.clone());
+ let preload_module_cb =
+ create_web_worker_preload_module_callback(ps.clone());
let extensions = ops::cli_exts(ps.clone(), args.use_deno_namespace);
@@ -145,6 +167,7 @@ fn create_web_worker_callback(ps: ProcState) -> Arc<CreateWebWorkerCb> {
seed: ps.flags.seed,
module_loader,
create_web_worker_cb,
+ preload_module_cb,
js_error_create_fn: Some(js_error_create_fn),
use_deno_namespace: args.use_deno_namespace,
worker_type: args.worker_type,
@@ -187,6 +210,8 @@ pub fn create_main_worker(
let should_break_on_first_statement = ps.flags.inspect_brk.is_some();
let create_web_worker_cb = create_web_worker_callback(ps.clone());
+ let web_worker_preload_module_cb =
+ create_web_worker_preload_module_callback(ps.clone());
let maybe_storage_key = if let Some(location) = &ps.flags.location {
// if a location is set, then the ascii serialization of the location is
@@ -240,6 +265,7 @@ pub fn create_main_worker(
seed: ps.flags.seed,
js_error_create_fn: Some(js_error_create_fn),
create_web_worker_cb,
+ web_worker_preload_module_cb,
maybe_inspector_server,
should_break_on_first_statement,
module_loader,
diff --git a/cli/standalone.rs b/cli/standalone.rs
index e31fa15f7..a50e7d80d 100644
--- a/cli/standalone.rs
+++ b/cli/standalone.rs
@@ -208,6 +208,9 @@ pub async fn run(
let create_web_worker_cb = Arc::new(|_| {
todo!("Worker are currently not supported in standalone binaries");
});
+ let web_worker_preload_module_cb = Arc::new(|_| {
+ todo!("Worker are currently not supported in standalone binaries");
+ });
// Keep in sync with `main.rs`.
v8_set_flags(
@@ -257,6 +260,7 @@ pub async fn run(
seed: metadata.seed,
js_error_create_fn: None,
create_web_worker_cb,
+ web_worker_preload_module_cb,
maybe_inspector_server: None,
should_break_on_first_statement: false,
module_loader,
diff --git a/cli/tests/integration/compat_tests.rs b/cli/tests/integration/compat_tests.rs
index bafe24af3..189e1eb41 100644
--- a/cli/tests/integration/compat_tests.rs
+++ b/cli/tests/integration/compat_tests.rs
@@ -90,6 +90,11 @@ itest!(top_level_fail_esm {
output: "compat/test_runner/top_level_fail_esm.out",
});
+itest!(compat_worker {
+ args: "run --compat --unstable -A --quiet --no-check compat/worker/worker_test.mjs",
+ output: "compat/worker/worker_test.out",
+});
+
#[test]
fn globals_in_repl() {
let (out, _err) = util::run_and_collect_output_with_args(
diff --git a/cli/tests/testdata/compat/worker/worker.mjs b/cli/tests/testdata/compat/worker/worker.mjs
new file mode 100644
index 000000000..eb7cfed19
--- /dev/null
+++ b/cli/tests/testdata/compat/worker/worker.mjs
@@ -0,0 +1,9 @@
+console.log("hello from worker");
+
+self.onmessage = (e) => {
+ if (e.data != "hello") {
+ throw new Error("wrong message");
+ }
+
+ self.postMessage({ pid: process.pid });
+}
diff --git a/cli/tests/testdata/compat/worker/worker_test.mjs b/cli/tests/testdata/compat/worker/worker_test.mjs
new file mode 100644
index 000000000..215605487
--- /dev/null
+++ b/cli/tests/testdata/compat/worker/worker_test.mjs
@@ -0,0 +1,18 @@
+import { deferred } from "../../../../../test_util/std/async/deferred.ts";
+
+const promise = deferred();
+const url = new URL("./worker.mjs", import.meta.url);
+const worker = new Worker(url.href, { type: "module", deno: true });
+
+worker.onmessage = (e) => {
+ const pid = e.data.pid;
+ if (typeof pid != "number") {
+ throw new Error("pid is not a number");
+ }
+ console.log("process.pid from worker:", pid);
+ promise.resolve();
+};
+
+worker.postMessage("hello");
+await promise;
+worker.terminate();
diff --git a/cli/tests/testdata/compat/worker/worker_test.out b/cli/tests/testdata/compat/worker/worker_test.out
new file mode 100644
index 000000000..373841945
--- /dev/null
+++ b/cli/tests/testdata/compat/worker/worker_test.out
@@ -0,0 +1,2 @@
+hello from worker
+process.pid from worker: [WILDCARD]
diff --git a/runtime/examples/hello_runtime.rs b/runtime/examples/hello_runtime.rs
index 74a9ef398..e74920c34 100644
--- a/runtime/examples/hello_runtime.rs
+++ b/runtime/examples/hello_runtime.rs
@@ -22,6 +22,9 @@ async fn main() -> Result<(), AnyError> {
let create_web_worker_cb = Arc::new(|_| {
todo!("Web workers are not supported in the example");
});
+ let web_worker_preload_module_cb = Arc::new(|_| {
+ todo!("Web workers are not supported in the example");
+ });
let options = WorkerOptions {
bootstrap: BootstrapOptions {
@@ -42,6 +45,7 @@ async fn main() -> Result<(), AnyError> {
user_agent: "hello_runtime".to_string(),
seed: None,
js_error_create_fn: None,
+ web_worker_preload_module_cb,
create_web_worker_cb,
maybe_inspector_server: None,
should_break_on_first_statement: false,
diff --git a/runtime/js/99_main.js b/runtime/js/99_main.js
index 5a4d7e989..fb5de250c 100644
--- a/runtime/js/99_main.js
+++ b/runtime/js/99_main.js
@@ -676,7 +676,7 @@ delete Object.prototype.__proto__;
numCpus = cpuCount;
registerErrors();
- pollForMessages();
+ globalThis.pollForMessages = pollForMessages;
const internalSymbol = Symbol("Deno.internal");
diff --git a/runtime/ops/worker_host.rs b/runtime/ops/worker_host.rs
index c241e9a54..1213da6d2 100644
--- a/runtime/ops/worker_host.rs
+++ b/runtime/ops/worker_host.rs
@@ -12,6 +12,7 @@ use crate::web_worker::WebWorkerType;
use crate::web_worker::WorkerControlEvent;
use crate::web_worker::WorkerId;
use deno_core::error::AnyError;
+use deno_core::futures::future::LocalFutureObj;
use deno_core::op_async;
use deno_core::op_sync;
use deno_core::serde::Deserialize;
@@ -42,13 +43,24 @@ pub type CreateWebWorkerCb = dyn Fn(CreateWebWorkerArgs) -> (WebWorker, Sendable
+ Sync
+ Send;
+pub type PreloadModuleCb = dyn Fn(WebWorker) -> LocalFutureObj<'static, Result<WebWorker, AnyError>>
+ + Sync
+ + Send;
+
/// A holder for callback that is used to create a new
/// WebWorker. It's a struct instead of a type alias
/// because `GothamState` used in `OpState` overrides
-/// value if type alises have the same underlying type
+/// value if type aliases have the same underlying type
#[derive(Clone)]
pub struct CreateWebWorkerCbHolder(Arc<CreateWebWorkerCb>);
+/// A holder for callback that can used to preload some modules into a WebWorker
+/// before actual worker code is executed. It's a struct instead of a type
+/// because `GothamState` used in `OpState` overrides
+/// value if type aliases have the same underlying type
+#[derive(Clone)]
+pub struct PreloadModuleCbHolder(Arc<PreloadModuleCb>);
+
pub struct WorkerThread {
// It's an Option so we can take the value before dropping the WorkerThread.
join_handle: Option<JoinHandle<Result<(), AnyError>>>,
@@ -91,15 +103,21 @@ impl Drop for WorkerThread {
pub type WorkersTable = HashMap<WorkerId, WorkerThread>;
-pub fn init(create_web_worker_cb: Arc<CreateWebWorkerCb>) -> Extension {
+pub fn init(
+ create_web_worker_cb: Arc<CreateWebWorkerCb>,
+ preload_module_cb: Arc<PreloadModuleCb>,
+) -> Extension {
Extension::builder()
.state(move |state| {
state.put::<WorkersTable>(WorkersTable::default());
state.put::<WorkerId>(WorkerId::default());
- let create_module_loader =
+ let create_web_worker_cb_holder =
CreateWebWorkerCbHolder(create_web_worker_cb.clone());
- state.put::<CreateWebWorkerCbHolder>(create_module_loader);
+ state.put::<CreateWebWorkerCbHolder>(create_web_worker_cb_holder);
+ let preload_module_cb_holder =
+ PreloadModuleCbHolder(preload_module_cb.clone());
+ state.put::<PreloadModuleCbHolder>(preload_module_cb_holder);
Ok(())
})
@@ -174,8 +192,10 @@ fn op_create_worker(
// have access to `exit_code` but the child does?
let maybe_exit_code = state.try_borrow::<Arc<AtomicI32>>().cloned();
let worker_id = state.take::<WorkerId>();
- let create_module_loader = state.take::<CreateWebWorkerCbHolder>();
- state.put::<CreateWebWorkerCbHolder>(create_module_loader.clone());
+ let create_web_worker_cb = state.take::<CreateWebWorkerCbHolder>();
+ state.put::<CreateWebWorkerCbHolder>(create_web_worker_cb.clone());
+ let preload_module_cb = state.take::<PreloadModuleCbHolder>();
+ state.put::<PreloadModuleCbHolder>(preload_module_cb.clone());
state.put::<WorkerId>(worker_id.next().unwrap());
let module_specifier = deno_core::resolve_url(&specifier)?;
@@ -197,7 +217,7 @@ fn op_create_worker(
// - newly spawned thread exits
let (worker, external_handle) =
- (create_module_loader.0)(CreateWebWorkerArgs {
+ (create_web_worker_cb.0)(CreateWebWorkerArgs {
name: worker_name,
worker_id,
parent_permissions,
@@ -216,7 +236,12 @@ fn op_create_worker(
// is using `worker.internal_channels`.
//
// Host can already push messages and interact with worker.
- run_web_worker(worker, module_specifier, maybe_source_code)
+ run_web_worker(
+ worker,
+ module_specifier,
+ maybe_source_code,
+ preload_module_cb.0,
+ )
})?;
// Receive WebWorkerHandle from newly created worker
diff --git a/runtime/web_worker.rs b/runtime/web_worker.rs
index 130b13dc0..8cbbb5d4f 100644
--- a/runtime/web_worker.rs
+++ b/runtime/web_worker.rs
@@ -304,6 +304,7 @@ pub struct WebWorker {
pub use_deno_namespace: bool,
pub worker_type: WebWorkerType,
pub main_module: ModuleSpecifier,
+ poll_for_messages_fn: Option<v8::Global<v8::Value>>,
}
pub struct WebWorkerOptions {
@@ -315,6 +316,7 @@ pub struct WebWorkerOptions {
pub seed: Option<u64>,
pub module_loader: Rc<dyn ModuleLoader>,
pub create_web_worker_cb: Arc<ops::worker_host::CreateWebWorkerCb>,
+ pub preload_module_cb: Arc<ops::worker_host::PreloadModuleCb>,
pub js_error_create_fn: Option<Rc<JsErrorCreateFn>>,
pub use_deno_namespace: bool,
pub worker_type: WebWorkerType,
@@ -395,7 +397,10 @@ impl WebWorker {
let runtime_exts = vec![
ops::web_worker::init(),
ops::runtime::init(main_module.clone()),
- ops::worker_host::init(options.create_web_worker_cb.clone()),
+ ops::worker_host::init(
+ options.create_web_worker_cb.clone(),
+ options.preload_module_cb.clone(),
+ ),
ops::io::init(),
];
@@ -468,6 +473,7 @@ impl WebWorker {
use_deno_namespace: options.use_deno_namespace,
worker_type: options.worker_type,
main_module,
+ poll_for_messages_fn: None,
},
external_handle,
)
@@ -486,6 +492,18 @@ impl WebWorker {
self
.execute_script(&located_script_name!(), &script)
.expect("Failed to execute worker bootstrap script");
+ // Save a reference to function that will start polling for messages
+ // from a worker host; it will be called after the user code is loaded.
+ let script = r#"
+ const pollForMessages = globalThis.pollForMessages;
+ delete globalThis.pollForMessages;
+ pollForMessages
+ "#;
+ let poll_for_messages_fn = self
+ .js_runtime
+ .execute_script(&located_script_name!(), script)
+ .expect("Failed to execute worker bootstrap script");
+ self.poll_for_messages_fn = Some(poll_for_messages_fn);
}
/// See [JsRuntime::execute_script](deno_core::JsRuntime::execute_script)
@@ -519,11 +537,36 @@ impl WebWorker {
}
/// Loads, instantiates and executes specified JavaScript module.
- pub async fn execute_main_module(
+ ///
+ /// This method assumes that worker can't be terminated when executing
+ /// side module code.
+ pub async fn execute_side_module(
&mut self,
module_specifier: &ModuleSpecifier,
) -> Result<(), AnyError> {
- let id = self.preload_module(module_specifier, true).await?;
+ let id = self.preload_module(module_specifier, false).await?;
+ let mut receiver = self.js_runtime.mod_evaluate(id);
+ tokio::select! {
+ maybe_result = &mut receiver => {
+ debug!("received module evaluate {:#?}", maybe_result);
+ maybe_result.expect("Module evaluation result not provided.")
+ }
+
+ event_loop_result = self.js_runtime.run_event_loop(false) => {
+ event_loop_result?;
+ let maybe_result = receiver.await;
+ maybe_result.expect("Module evaluation result not provided.")
+ }
+ }
+ }
+
+ /// Loads, instantiates and executes specified JavaScript module.
+ ///
+ /// This module will have "import.meta.main" equal to true.
+ pub async fn execute_main_module(
+ &mut self,
+ id: ModuleId,
+ ) -> Result<(), AnyError> {
let mut receiver = self.js_runtime.mod_evaluate(id);
tokio::select! {
maybe_result = &mut receiver => {
@@ -582,6 +625,17 @@ impl WebWorker {
) -> Result<(), AnyError> {
poll_fn(|cx| self.poll_event_loop(cx, wait_for_inspector)).await
}
+
+ // Starts polling for messages from worker host from JavaScript.
+ fn start_polling_for_messages(&mut self) {
+ let poll_for_messages_fn = self.poll_for_messages_fn.take().unwrap();
+ let scope = &mut self.js_runtime.handle_scope();
+ let poll_for_messages =
+ v8::Local::<v8::Value>::new(scope, poll_for_messages_fn);
+ let fn_ = v8::Local::<v8::Function>::try_from(poll_for_messages).unwrap();
+ let undefined = v8::undefined(scope);
+ fn_.call(scope, undefined.into(), &[]).unwrap();
+ }
}
fn print_worker_error(error_str: String, name: &str) {
@@ -596,9 +650,10 @@ fn print_worker_error(error_str: String, name: &str) {
/// This function should be called from a thread dedicated to this worker.
// TODO(bartlomieju): check if order of actions is aligned to Worker spec
pub fn run_web_worker(
- mut worker: WebWorker,
+ worker: WebWorker,
specifier: ModuleSpecifier,
maybe_source_code: Option<String>,
+ preload_module_cb: Arc<ops::worker_host::PreloadModuleCb>,
) -> Result<(), AnyError> {
let name = worker.name.to_string();
@@ -606,17 +661,39 @@ pub fn run_web_worker(
// with terminate
let fut = async move {
+ let internal_handle = worker.internal_handle.clone();
+ let result = (preload_module_cb)(worker).await;
+
+ let mut worker = match result {
+ Ok(worker) => worker,
+ Err(e) => {
+ print_worker_error(e.to_string(), &name);
+ internal_handle
+ .post_event(WorkerControlEvent::TerminalError(e))
+ .expect("Failed to post message to host");
+
+ // Failure to execute script is a terminal error, bye, bye.
+ return Ok(());
+ }
+ };
+
// Execute provided source code immediately
let result = if let Some(source_code) = maybe_source_code {
- worker.execute_script(&located_script_name!(), &source_code)
+ let r = worker.execute_script(&located_script_name!(), &source_code);
+ worker.start_polling_for_messages();
+ r
} else {
// TODO(bartlomieju): add "type": "classic", ie. ability to load
// script instead of module
- worker.execute_main_module(&specifier).await
+ match worker.preload_module(&specifier, true).await {
+ Ok(id) => {
+ worker.start_polling_for_messages();
+ worker.execute_main_module(id).await
+ }
+ Err(e) => Err(e),
+ }
};
- let internal_handle = worker.internal_handle.clone();
-
// If sender is closed it means that worker has already been closed from
// within using "globalThis.close()"
if internal_handle.is_terminated() {
diff --git a/runtime/worker.rs b/runtime/worker.rs
index 7a3a6c1c3..1dc9504d6 100644
--- a/runtime/worker.rs
+++ b/runtime/worker.rs
@@ -51,8 +51,9 @@ pub struct WorkerOptions {
pub user_agent: String,
pub seed: Option<u64>,
pub module_loader: Rc<dyn ModuleLoader>,
- // Callback invoked when creating new instance of WebWorker
+ // Callbacks invoked when creating new instance of WebWorker
pub create_web_worker_cb: Arc<ops::worker_host::CreateWebWorkerCb>,
+ pub web_worker_preload_module_cb: Arc<ops::worker_host::PreloadModuleCb>,
pub js_error_create_fn: Option<Rc<JsErrorCreateFn>>,
pub maybe_inspector_server: Option<Arc<InspectorServer>>,
pub should_break_on_first_statement: bool,
@@ -126,7 +127,10 @@ impl MainWorker {
deno_ffi::init::<Permissions>(unstable),
// Runtime ops
ops::runtime::init(main_module.clone()),
- ops::worker_host::init(options.create_web_worker_cb.clone()),
+ ops::worker_host::init(
+ options.create_web_worker_cb.clone(),
+ options.web_worker_preload_module_cb.clone(),
+ ),
ops::fs_events::init(),
ops::fs::init(),
ops::io::init(),
@@ -367,6 +371,7 @@ mod tests {
root_cert_store: None,
seed: None,
js_error_create_fn: None,
+ web_worker_preload_module_cb: Arc::new(|_| unreachable!()),
create_web_worker_cb: Arc::new(|_| unreachable!()),
maybe_inspector_server: None,
should_break_on_first_statement: false,