diff options
-rw-r--r-- | cli/main.rs | 26 | ||||
-rw-r--r-- | cli/standalone.rs | 4 | ||||
-rw-r--r-- | cli/tests/integration/compat_tests.rs | 5 | ||||
-rw-r--r-- | cli/tests/testdata/compat/worker/worker.mjs | 9 | ||||
-rw-r--r-- | cli/tests/testdata/compat/worker/worker_test.mjs | 18 | ||||
-rw-r--r-- | cli/tests/testdata/compat/worker/worker_test.out | 2 | ||||
-rw-r--r-- | runtime/examples/hello_runtime.rs | 4 | ||||
-rw-r--r-- | runtime/js/99_main.js | 2 | ||||
-rw-r--r-- | runtime/ops/worker_host.rs | 41 | ||||
-rw-r--r-- | runtime/web_worker.rs | 93 | ||||
-rw-r--r-- | runtime/worker.rs | 9 |
11 files changed, 194 insertions, 19 deletions
diff --git a/cli/main.rs b/cli/main.rs index ecadf65bd..acf49bb3f 100644 --- a/cli/main.rs +++ b/cli/main.rs @@ -71,6 +71,7 @@ use deno_ast::MediaType; use deno_core::error::generic_error; use deno_core::error::AnyError; use deno_core::futures::future::FutureExt; +use deno_core::futures::future::LocalFutureObj; use deno_core::futures::Future; use deno_core::located_script_name; use deno_core::parking_lot::RwLock; @@ -82,6 +83,7 @@ use deno_core::Extension; use deno_core::ModuleSpecifier; use deno_runtime::colors; use deno_runtime::ops::worker_host::CreateWebWorkerCb; +use deno_runtime::ops::worker_host::PreloadModuleCb; use deno_runtime::permissions::Permissions; use deno_runtime::tokio_util::run_basic; use deno_runtime::web_worker::WebWorker; @@ -100,6 +102,24 @@ use std::pin::Pin; use std::rc::Rc; use std::sync::Arc; +fn create_web_worker_preload_module_callback( + ps: ProcState, +) -> Arc<PreloadModuleCb> { + let compat = ps.flags.compat; + + Arc::new(move |mut worker| { + let fut = async move { + if compat { + worker.execute_side_module(&compat::GLOBAL_URL).await?; + worker.execute_side_module(&compat::MODULE_URL).await?; + } + + Ok(worker) + }; + LocalFutureObj::new(Box::new(fut)) + }) +} + fn create_web_worker_callback(ps: ProcState) -> Arc<CreateWebWorkerCb> { Arc::new(move |args| { let global_state_ = ps.clone(); @@ -116,6 +136,8 @@ fn create_web_worker_callback(ps: ProcState) -> Arc<CreateWebWorkerCb> { args.parent_permissions.clone(), ); let create_web_worker_cb = create_web_worker_callback(ps.clone()); + let preload_module_cb = + create_web_worker_preload_module_callback(ps.clone()); let extensions = ops::cli_exts(ps.clone(), args.use_deno_namespace); @@ -145,6 +167,7 @@ fn create_web_worker_callback(ps: ProcState) -> Arc<CreateWebWorkerCb> { seed: ps.flags.seed, module_loader, create_web_worker_cb, + preload_module_cb, js_error_create_fn: Some(js_error_create_fn), use_deno_namespace: args.use_deno_namespace, worker_type: args.worker_type, @@ -187,6 +210,8 @@ pub fn create_main_worker( let should_break_on_first_statement = ps.flags.inspect_brk.is_some(); let create_web_worker_cb = create_web_worker_callback(ps.clone()); + let web_worker_preload_module_cb = + create_web_worker_preload_module_callback(ps.clone()); let maybe_storage_key = if let Some(location) = &ps.flags.location { // if a location is set, then the ascii serialization of the location is @@ -240,6 +265,7 @@ pub fn create_main_worker( seed: ps.flags.seed, js_error_create_fn: Some(js_error_create_fn), create_web_worker_cb, + web_worker_preload_module_cb, maybe_inspector_server, should_break_on_first_statement, module_loader, diff --git a/cli/standalone.rs b/cli/standalone.rs index e31fa15f7..a50e7d80d 100644 --- a/cli/standalone.rs +++ b/cli/standalone.rs @@ -208,6 +208,9 @@ pub async fn run( let create_web_worker_cb = Arc::new(|_| { todo!("Worker are currently not supported in standalone binaries"); }); + let web_worker_preload_module_cb = Arc::new(|_| { + todo!("Worker are currently not supported in standalone binaries"); + }); // Keep in sync with `main.rs`. v8_set_flags( @@ -257,6 +260,7 @@ pub async fn run( seed: metadata.seed, js_error_create_fn: None, create_web_worker_cb, + web_worker_preload_module_cb, maybe_inspector_server: None, should_break_on_first_statement: false, module_loader, diff --git a/cli/tests/integration/compat_tests.rs b/cli/tests/integration/compat_tests.rs index bafe24af3..189e1eb41 100644 --- a/cli/tests/integration/compat_tests.rs +++ b/cli/tests/integration/compat_tests.rs @@ -90,6 +90,11 @@ itest!(top_level_fail_esm { output: "compat/test_runner/top_level_fail_esm.out", }); +itest!(compat_worker { + args: "run --compat --unstable -A --quiet --no-check compat/worker/worker_test.mjs", + output: "compat/worker/worker_test.out", +}); + #[test] fn globals_in_repl() { let (out, _err) = util::run_and_collect_output_with_args( diff --git a/cli/tests/testdata/compat/worker/worker.mjs b/cli/tests/testdata/compat/worker/worker.mjs new file mode 100644 index 000000000..eb7cfed19 --- /dev/null +++ b/cli/tests/testdata/compat/worker/worker.mjs @@ -0,0 +1,9 @@ +console.log("hello from worker"); + +self.onmessage = (e) => { + if (e.data != "hello") { + throw new Error("wrong message"); + } + + self.postMessage({ pid: process.pid }); +} diff --git a/cli/tests/testdata/compat/worker/worker_test.mjs b/cli/tests/testdata/compat/worker/worker_test.mjs new file mode 100644 index 000000000..215605487 --- /dev/null +++ b/cli/tests/testdata/compat/worker/worker_test.mjs @@ -0,0 +1,18 @@ +import { deferred } from "../../../../../test_util/std/async/deferred.ts"; + +const promise = deferred(); +const url = new URL("./worker.mjs", import.meta.url); +const worker = new Worker(url.href, { type: "module", deno: true }); + +worker.onmessage = (e) => { + const pid = e.data.pid; + if (typeof pid != "number") { + throw new Error("pid is not a number"); + } + console.log("process.pid from worker:", pid); + promise.resolve(); +}; + +worker.postMessage("hello"); +await promise; +worker.terminate(); diff --git a/cli/tests/testdata/compat/worker/worker_test.out b/cli/tests/testdata/compat/worker/worker_test.out new file mode 100644 index 000000000..373841945 --- /dev/null +++ b/cli/tests/testdata/compat/worker/worker_test.out @@ -0,0 +1,2 @@ +hello from worker +process.pid from worker: [WILDCARD] diff --git a/runtime/examples/hello_runtime.rs b/runtime/examples/hello_runtime.rs index 74a9ef398..e74920c34 100644 --- a/runtime/examples/hello_runtime.rs +++ b/runtime/examples/hello_runtime.rs @@ -22,6 +22,9 @@ async fn main() -> Result<(), AnyError> { let create_web_worker_cb = Arc::new(|_| { todo!("Web workers are not supported in the example"); }); + let web_worker_preload_module_cb = Arc::new(|_| { + todo!("Web workers are not supported in the example"); + }); let options = WorkerOptions { bootstrap: BootstrapOptions { @@ -42,6 +45,7 @@ async fn main() -> Result<(), AnyError> { user_agent: "hello_runtime".to_string(), seed: None, js_error_create_fn: None, + web_worker_preload_module_cb, create_web_worker_cb, maybe_inspector_server: None, should_break_on_first_statement: false, diff --git a/runtime/js/99_main.js b/runtime/js/99_main.js index 5a4d7e989..fb5de250c 100644 --- a/runtime/js/99_main.js +++ b/runtime/js/99_main.js @@ -676,7 +676,7 @@ delete Object.prototype.__proto__; numCpus = cpuCount; registerErrors(); - pollForMessages(); + globalThis.pollForMessages = pollForMessages; const internalSymbol = Symbol("Deno.internal"); diff --git a/runtime/ops/worker_host.rs b/runtime/ops/worker_host.rs index c241e9a54..1213da6d2 100644 --- a/runtime/ops/worker_host.rs +++ b/runtime/ops/worker_host.rs @@ -12,6 +12,7 @@ use crate::web_worker::WebWorkerType; use crate::web_worker::WorkerControlEvent; use crate::web_worker::WorkerId; use deno_core::error::AnyError; +use deno_core::futures::future::LocalFutureObj; use deno_core::op_async; use deno_core::op_sync; use deno_core::serde::Deserialize; @@ -42,13 +43,24 @@ pub type CreateWebWorkerCb = dyn Fn(CreateWebWorkerArgs) -> (WebWorker, Sendable + Sync + Send; +pub type PreloadModuleCb = dyn Fn(WebWorker) -> LocalFutureObj<'static, Result<WebWorker, AnyError>> + + Sync + + Send; + /// A holder for callback that is used to create a new /// WebWorker. It's a struct instead of a type alias /// because `GothamState` used in `OpState` overrides -/// value if type alises have the same underlying type +/// value if type aliases have the same underlying type #[derive(Clone)] pub struct CreateWebWorkerCbHolder(Arc<CreateWebWorkerCb>); +/// A holder for callback that can used to preload some modules into a WebWorker +/// before actual worker code is executed. It's a struct instead of a type +/// because `GothamState` used in `OpState` overrides +/// value if type aliases have the same underlying type +#[derive(Clone)] +pub struct PreloadModuleCbHolder(Arc<PreloadModuleCb>); + pub struct WorkerThread { // It's an Option so we can take the value before dropping the WorkerThread. join_handle: Option<JoinHandle<Result<(), AnyError>>>, @@ -91,15 +103,21 @@ impl Drop for WorkerThread { pub type WorkersTable = HashMap<WorkerId, WorkerThread>; -pub fn init(create_web_worker_cb: Arc<CreateWebWorkerCb>) -> Extension { +pub fn init( + create_web_worker_cb: Arc<CreateWebWorkerCb>, + preload_module_cb: Arc<PreloadModuleCb>, +) -> Extension { Extension::builder() .state(move |state| { state.put::<WorkersTable>(WorkersTable::default()); state.put::<WorkerId>(WorkerId::default()); - let create_module_loader = + let create_web_worker_cb_holder = CreateWebWorkerCbHolder(create_web_worker_cb.clone()); - state.put::<CreateWebWorkerCbHolder>(create_module_loader); + state.put::<CreateWebWorkerCbHolder>(create_web_worker_cb_holder); + let preload_module_cb_holder = + PreloadModuleCbHolder(preload_module_cb.clone()); + state.put::<PreloadModuleCbHolder>(preload_module_cb_holder); Ok(()) }) @@ -174,8 +192,10 @@ fn op_create_worker( // have access to `exit_code` but the child does? let maybe_exit_code = state.try_borrow::<Arc<AtomicI32>>().cloned(); let worker_id = state.take::<WorkerId>(); - let create_module_loader = state.take::<CreateWebWorkerCbHolder>(); - state.put::<CreateWebWorkerCbHolder>(create_module_loader.clone()); + let create_web_worker_cb = state.take::<CreateWebWorkerCbHolder>(); + state.put::<CreateWebWorkerCbHolder>(create_web_worker_cb.clone()); + let preload_module_cb = state.take::<PreloadModuleCbHolder>(); + state.put::<PreloadModuleCbHolder>(preload_module_cb.clone()); state.put::<WorkerId>(worker_id.next().unwrap()); let module_specifier = deno_core::resolve_url(&specifier)?; @@ -197,7 +217,7 @@ fn op_create_worker( // - newly spawned thread exits let (worker, external_handle) = - (create_module_loader.0)(CreateWebWorkerArgs { + (create_web_worker_cb.0)(CreateWebWorkerArgs { name: worker_name, worker_id, parent_permissions, @@ -216,7 +236,12 @@ fn op_create_worker( // is using `worker.internal_channels`. // // Host can already push messages and interact with worker. - run_web_worker(worker, module_specifier, maybe_source_code) + run_web_worker( + worker, + module_specifier, + maybe_source_code, + preload_module_cb.0, + ) })?; // Receive WebWorkerHandle from newly created worker diff --git a/runtime/web_worker.rs b/runtime/web_worker.rs index 130b13dc0..8cbbb5d4f 100644 --- a/runtime/web_worker.rs +++ b/runtime/web_worker.rs @@ -304,6 +304,7 @@ pub struct WebWorker { pub use_deno_namespace: bool, pub worker_type: WebWorkerType, pub main_module: ModuleSpecifier, + poll_for_messages_fn: Option<v8::Global<v8::Value>>, } pub struct WebWorkerOptions { @@ -315,6 +316,7 @@ pub struct WebWorkerOptions { pub seed: Option<u64>, pub module_loader: Rc<dyn ModuleLoader>, pub create_web_worker_cb: Arc<ops::worker_host::CreateWebWorkerCb>, + pub preload_module_cb: Arc<ops::worker_host::PreloadModuleCb>, pub js_error_create_fn: Option<Rc<JsErrorCreateFn>>, pub use_deno_namespace: bool, pub worker_type: WebWorkerType, @@ -395,7 +397,10 @@ impl WebWorker { let runtime_exts = vec![ ops::web_worker::init(), ops::runtime::init(main_module.clone()), - ops::worker_host::init(options.create_web_worker_cb.clone()), + ops::worker_host::init( + options.create_web_worker_cb.clone(), + options.preload_module_cb.clone(), + ), ops::io::init(), ]; @@ -468,6 +473,7 @@ impl WebWorker { use_deno_namespace: options.use_deno_namespace, worker_type: options.worker_type, main_module, + poll_for_messages_fn: None, }, external_handle, ) @@ -486,6 +492,18 @@ impl WebWorker { self .execute_script(&located_script_name!(), &script) .expect("Failed to execute worker bootstrap script"); + // Save a reference to function that will start polling for messages + // from a worker host; it will be called after the user code is loaded. + let script = r#" + const pollForMessages = globalThis.pollForMessages; + delete globalThis.pollForMessages; + pollForMessages + "#; + let poll_for_messages_fn = self + .js_runtime + .execute_script(&located_script_name!(), script) + .expect("Failed to execute worker bootstrap script"); + self.poll_for_messages_fn = Some(poll_for_messages_fn); } /// See [JsRuntime::execute_script](deno_core::JsRuntime::execute_script) @@ -519,11 +537,36 @@ impl WebWorker { } /// Loads, instantiates and executes specified JavaScript module. - pub async fn execute_main_module( + /// + /// This method assumes that worker can't be terminated when executing + /// side module code. + pub async fn execute_side_module( &mut self, module_specifier: &ModuleSpecifier, ) -> Result<(), AnyError> { - let id = self.preload_module(module_specifier, true).await?; + let id = self.preload_module(module_specifier, false).await?; + let mut receiver = self.js_runtime.mod_evaluate(id); + tokio::select! { + maybe_result = &mut receiver => { + debug!("received module evaluate {:#?}", maybe_result); + maybe_result.expect("Module evaluation result not provided.") + } + + event_loop_result = self.js_runtime.run_event_loop(false) => { + event_loop_result?; + let maybe_result = receiver.await; + maybe_result.expect("Module evaluation result not provided.") + } + } + } + + /// Loads, instantiates and executes specified JavaScript module. + /// + /// This module will have "import.meta.main" equal to true. + pub async fn execute_main_module( + &mut self, + id: ModuleId, + ) -> Result<(), AnyError> { let mut receiver = self.js_runtime.mod_evaluate(id); tokio::select! { maybe_result = &mut receiver => { @@ -582,6 +625,17 @@ impl WebWorker { ) -> Result<(), AnyError> { poll_fn(|cx| self.poll_event_loop(cx, wait_for_inspector)).await } + + // Starts polling for messages from worker host from JavaScript. + fn start_polling_for_messages(&mut self) { + let poll_for_messages_fn = self.poll_for_messages_fn.take().unwrap(); + let scope = &mut self.js_runtime.handle_scope(); + let poll_for_messages = + v8::Local::<v8::Value>::new(scope, poll_for_messages_fn); + let fn_ = v8::Local::<v8::Function>::try_from(poll_for_messages).unwrap(); + let undefined = v8::undefined(scope); + fn_.call(scope, undefined.into(), &[]).unwrap(); + } } fn print_worker_error(error_str: String, name: &str) { @@ -596,9 +650,10 @@ fn print_worker_error(error_str: String, name: &str) { /// This function should be called from a thread dedicated to this worker. // TODO(bartlomieju): check if order of actions is aligned to Worker spec pub fn run_web_worker( - mut worker: WebWorker, + worker: WebWorker, specifier: ModuleSpecifier, maybe_source_code: Option<String>, + preload_module_cb: Arc<ops::worker_host::PreloadModuleCb>, ) -> Result<(), AnyError> { let name = worker.name.to_string(); @@ -606,17 +661,39 @@ pub fn run_web_worker( // with terminate let fut = async move { + let internal_handle = worker.internal_handle.clone(); + let result = (preload_module_cb)(worker).await; + + let mut worker = match result { + Ok(worker) => worker, + Err(e) => { + print_worker_error(e.to_string(), &name); + internal_handle + .post_event(WorkerControlEvent::TerminalError(e)) + .expect("Failed to post message to host"); + + // Failure to execute script is a terminal error, bye, bye. + return Ok(()); + } + }; + // Execute provided source code immediately let result = if let Some(source_code) = maybe_source_code { - worker.execute_script(&located_script_name!(), &source_code) + let r = worker.execute_script(&located_script_name!(), &source_code); + worker.start_polling_for_messages(); + r } else { // TODO(bartlomieju): add "type": "classic", ie. ability to load // script instead of module - worker.execute_main_module(&specifier).await + match worker.preload_module(&specifier, true).await { + Ok(id) => { + worker.start_polling_for_messages(); + worker.execute_main_module(id).await + } + Err(e) => Err(e), + } }; - let internal_handle = worker.internal_handle.clone(); - // If sender is closed it means that worker has already been closed from // within using "globalThis.close()" if internal_handle.is_terminated() { diff --git a/runtime/worker.rs b/runtime/worker.rs index 7a3a6c1c3..1dc9504d6 100644 --- a/runtime/worker.rs +++ b/runtime/worker.rs @@ -51,8 +51,9 @@ pub struct WorkerOptions { pub user_agent: String, pub seed: Option<u64>, pub module_loader: Rc<dyn ModuleLoader>, - // Callback invoked when creating new instance of WebWorker + // Callbacks invoked when creating new instance of WebWorker pub create_web_worker_cb: Arc<ops::worker_host::CreateWebWorkerCb>, + pub web_worker_preload_module_cb: Arc<ops::worker_host::PreloadModuleCb>, pub js_error_create_fn: Option<Rc<JsErrorCreateFn>>, pub maybe_inspector_server: Option<Arc<InspectorServer>>, pub should_break_on_first_statement: bool, @@ -126,7 +127,10 @@ impl MainWorker { deno_ffi::init::<Permissions>(unstable), // Runtime ops ops::runtime::init(main_module.clone()), - ops::worker_host::init(options.create_web_worker_cb.clone()), + ops::worker_host::init( + options.create_web_worker_cb.clone(), + options.web_worker_preload_module_cb.clone(), + ), ops::fs_events::init(), ops::fs::init(), ops::io::init(), @@ -367,6 +371,7 @@ mod tests { root_cert_store: None, seed: None, js_error_create_fn: None, + web_worker_preload_module_cb: Arc::new(|_| unreachable!()), create_web_worker_cb: Arc::new(|_| unreachable!()), maybe_inspector_server: None, should_break_on_first_statement: false, |