diff options
Diffstat (limited to 'cli/worker.rs')
-rw-r--r-- | cli/worker.rs | 426 |
1 files changed, 377 insertions, 49 deletions
diff --git a/cli/worker.rs b/cli/worker.rs index 2fc02c6ee..f6c518d0c 100644 --- a/cli/worker.rs +++ b/cli/worker.rs @@ -7,10 +7,6 @@ use crate::js; use crate::metrics::Metrics; use crate::ops; use crate::ops::io::get_stdio; -use crate::ops::timers::GlobalTimer; -use crate::ops::timers::StartTime; -use crate::ops::worker_host::WorkerId; -use crate::ops::worker_host::WorkersTable; use crate::permissions::Permissions; use crate::state::CliModuleLoader; use deno_core::error::AnyError; @@ -19,20 +15,20 @@ use deno_core::futures::future::FutureExt; use deno_core::futures::stream::StreamExt; use deno_core::futures::task::AtomicWaker; use deno_core::url::Url; +use deno_core::v8; use deno_core::JsRuntime; use deno_core::ModuleId; use deno_core::ModuleSpecifier; use deno_core::RuntimeOptions; use deno_core::Snapshot; -use deno_fetch::reqwest; -use rand::rngs::StdRng; -use rand::SeedableRng; use std::env; use std::future::Future; use std::ops::Deref; use std::ops::DerefMut; use std::pin::Pin; use std::rc::Rc; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; use std::sync::Arc; use std::task::Context; use std::task::Poll; @@ -111,18 +107,16 @@ pub struct Worker { impl Worker { pub fn new( name: String, - startup_snapshot: Option<Snapshot>, - permissions: Permissions, - main_module: ModuleSpecifier, + startup_snapshot: Snapshot, global_state: Arc<GlobalState>, - state: Rc<CliModuleLoader>, + module_loader: Rc<CliModuleLoader>, is_main: bool, ) -> Self { let global_state_ = global_state.clone(); let mut isolate = JsRuntime::new(RuntimeOptions { - module_loader: Some(state), - startup_snapshot, + module_loader: Some(module_loader), + startup_snapshot: Some(startup_snapshot), js_error_create_fn: Some(Box::new(move |core_js_error| { JsError::create(core_js_error, &global_state_.ts_compiler) })), @@ -132,25 +126,6 @@ impl Worker { let op_state = isolate.op_state(); let mut op_state = op_state.borrow_mut(); op_state.get_error_class_fn = &crate::errors::get_error_class_name; - - op_state.put::<GlobalTimer>(GlobalTimer::default()); - op_state.put::<StartTime>(StartTime::now()); - op_state.put::<Metrics>(Default::default()); - op_state.put::<WorkersTable>(WorkersTable::default()); - op_state.put::<WorkerId>(WorkerId::default()); - op_state.put::<Permissions>(permissions); - op_state.put::<ModuleSpecifier>(main_module); - op_state.put::<Arc<GlobalState>>(global_state.clone()); - - op_state.put::<reqwest::Client>({ - let ca_file = global_state.flags.ca_file.as_deref(); - crate::http_util::create_http_client(ca_file).unwrap() - }); - - if let Some(seed) = global_state.flags.seed { - let rng = StdRng::seed_from_u64(seed); - op_state.put::<StdRng>(rng); - } } let inspector = @@ -296,41 +271,48 @@ impl MainWorker { let loader = CliModuleLoader::new(global_state.maybe_import_map.clone()); let mut worker = Worker::new( "main".to_string(), - Some(js::deno_isolate_init()), - global_state.permissions.clone(), - main_module, + js::deno_isolate_init(), global_state.clone(), loader, true, ); { - ops::runtime::init(&mut worker); - ops::runtime_compiler::init(&mut worker); - ops::errors::init(&mut worker); - ops::fetch::init(&mut worker); - ops::websocket::init(&mut worker); - ops::fs::init(&mut worker); - ops::fs_events::init(&mut worker); + // All ops registered in this function depend on these + { + let op_state = worker.op_state(); + let mut op_state = op_state.borrow_mut(); + op_state.put::<Metrics>(Default::default()); + op_state.put::<Arc<GlobalState>>(global_state.clone()); + op_state.put::<Permissions>(global_state.permissions.clone()); + } + + ops::runtime::init(&mut worker, main_module); + ops::fetch::init(&mut worker, global_state.flags.ca_file.as_deref()); + ops::timers::init(&mut worker); + ops::worker_host::init(&mut worker); + ops::random::init(&mut worker, global_state.flags.seed); + ops::reg_json_sync(&mut worker, "op_close", deno_core::op_close); + ops::reg_json_sync(&mut worker, "op_resources", deno_core::op_resources); ops::reg_json_sync( &mut worker, "op_domain_to_ascii", deno_web::op_domain_to_ascii, ); + ops::errors::init(&mut worker); + ops::fs_events::init(&mut worker); + ops::fs::init(&mut worker); ops::io::init(&mut worker); - ops::plugin::init(&mut worker); ops::net::init(&mut worker); - ops::tls::init(&mut worker); ops::os::init(&mut worker); ops::permissions::init(&mut worker); + ops::plugin::init(&mut worker); ops::process::init(&mut worker); - ops::random::init(&mut worker); ops::repl::init(&mut worker); - ops::reg_json_sync(&mut worker, "op_close", deno_core::op_close); - ops::reg_json_sync(&mut worker, "op_resources", deno_core::op_resources); + ops::runtime_compiler::init(&mut worker); ops::signal::init(&mut worker); - ops::timers::init(&mut worker); + ops::tls::init(&mut worker); ops::tty::init(&mut worker); - ops::worker_host::init(&mut worker); + ops::websocket::init(&mut worker); } { let op_state = worker.op_state(); @@ -367,12 +349,257 @@ impl DerefMut for MainWorker { } } +/// Wrapper for `WorkerHandle` that adds functionality +/// for terminating workers. +/// +/// This struct is used by host as well as worker itself. +/// +/// Host uses it to communicate with worker and terminate it, +/// while worker uses it only to finish execution on `self.close()`. +#[derive(Clone)] +pub struct WebWorkerHandle { + worker_handle: WorkerHandle, + terminate_tx: mpsc::Sender<()>, + terminated: Arc<AtomicBool>, + isolate_handle: v8::IsolateHandle, +} + +impl Deref for WebWorkerHandle { + type Target = WorkerHandle; + fn deref(&self) -> &Self::Target { + &self.worker_handle + } +} + +impl DerefMut for WebWorkerHandle { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.worker_handle + } +} + +impl WebWorkerHandle { + pub fn terminate(&self) { + // This function can be called multiple times by whomever holds + // the handle. However only a single "termination" should occur so + // we need a guard here. + let already_terminated = self.terminated.swap(true, Ordering::Relaxed); + + if !already_terminated { + self.isolate_handle.terminate_execution(); + let mut sender = self.terminate_tx.clone(); + // This call should be infallible hence the `expect`. + // This might change in the future. + sender.try_send(()).expect("Failed to terminate"); + } + } +} + +/// This worker is implementation of `Worker` Web API +/// +/// At the moment this type of worker supports only +/// communication with parent and creating new workers. +/// +/// Each `WebWorker` is either a child of `MainWorker` or other +/// `WebWorker`. +pub struct WebWorker { + worker: Worker, + event_loop_idle: bool, + terminate_rx: mpsc::Receiver<()>, + handle: WebWorkerHandle, + pub has_deno_namespace: bool, +} + +impl WebWorker { + pub fn new( + name: String, + permissions: Permissions, + main_module: ModuleSpecifier, + global_state: Arc<GlobalState>, + has_deno_namespace: bool, + ) -> Self { + let loader = CliModuleLoader::new_for_worker(); + let mut worker = Worker::new( + name, + js::deno_isolate_init(), + global_state.clone(), + loader, + false, + ); + + let terminated = Arc::new(AtomicBool::new(false)); + let isolate_handle = worker.isolate.thread_safe_handle(); + let (terminate_tx, terminate_rx) = mpsc::channel::<()>(1); + + let handle = WebWorkerHandle { + worker_handle: worker.thread_safe_handle(), + terminated, + isolate_handle, + terminate_tx, + }; + + let mut web_worker = Self { + worker, + event_loop_idle: false, + terminate_rx, + handle, + has_deno_namespace, + }; + + { + let handle = web_worker.thread_safe_handle(); + let sender = web_worker.worker.internal_channels.sender.clone(); + + // All ops registered in this function depend on these + { + let op_state = web_worker.op_state(); + let mut op_state = op_state.borrow_mut(); + op_state.put::<Metrics>(Default::default()); + op_state.put::<Arc<GlobalState>>(global_state.clone()); + op_state.put::<Permissions>(permissions); + } + + ops::web_worker::init(&mut web_worker, sender, handle); + ops::runtime::init(&mut web_worker, main_module); + ops::fetch::init(&mut web_worker, global_state.flags.ca_file.as_deref()); + ops::timers::init(&mut web_worker); + ops::worker_host::init(&mut web_worker); + ops::reg_json_sync(&mut web_worker, "op_close", deno_core::op_close); + ops::reg_json_sync( + &mut web_worker, + "op_resources", + deno_core::op_resources, + ); + ops::reg_json_sync( + &mut web_worker, + "op_domain_to_ascii", + deno_web::op_domain_to_ascii, + ); + ops::errors::init(&mut web_worker); + ops::io::init(&mut web_worker); + ops::websocket::init(&mut web_worker); + + if has_deno_namespace { + ops::fs_events::init(&mut web_worker); + ops::fs::init(&mut web_worker); + ops::net::init(&mut web_worker); + ops::os::init(&mut web_worker); + ops::permissions::init(&mut web_worker); + ops::plugin::init(&mut web_worker); + ops::process::init(&mut web_worker); + ops::random::init(&mut web_worker, global_state.flags.seed); + ops::runtime_compiler::init(&mut web_worker); + ops::signal::init(&mut web_worker); + ops::tls::init(&mut web_worker); + ops::tty::init(&mut web_worker); + } + } + + web_worker + } +} + +impl WebWorker { + /// Returns a way to communicate with the Worker from other threads. + pub fn thread_safe_handle(&self) -> WebWorkerHandle { + self.handle.clone() + } +} + +impl Deref for WebWorker { + type Target = Worker; + fn deref(&self) -> &Self::Target { + &self.worker + } +} + +impl DerefMut for WebWorker { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.worker + } +} + +impl Future for WebWorker { + type Output = Result<(), AnyError>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { + let inner = self.get_mut(); + let worker = &mut inner.worker; + + let terminated = inner.handle.terminated.load(Ordering::Relaxed); + + if terminated { + return Poll::Ready(Ok(())); + } + + if !inner.event_loop_idle { + match worker.poll_unpin(cx) { + Poll::Ready(r) => { + let terminated = inner.handle.terminated.load(Ordering::Relaxed); + if terminated { + return Poll::Ready(Ok(())); + } + + if let Err(e) = r { + let mut sender = worker.internal_channels.sender.clone(); + sender + .try_send(WorkerEvent::Error(e)) + .expect("Failed to post message to host"); + } + inner.event_loop_idle = true; + } + Poll::Pending => {} + } + } + + if let Poll::Ready(r) = inner.terminate_rx.poll_next_unpin(cx) { + // terminate_rx should never be closed + assert!(r.is_some()); + return Poll::Ready(Ok(())); + } + + if let Poll::Ready(r) = + worker.internal_channels.receiver.poll_next_unpin(cx) + { + match r { + Some(msg) => { + let msg = String::from_utf8(msg.to_vec()).unwrap(); + let script = format!("workerMessageRecvCallback({})", msg); + + if let Err(e) = worker.execute(&script) { + // If execution was terminated during message callback then + // just ignore it + if inner.handle.terminated.load(Ordering::Relaxed) { + return Poll::Ready(Ok(())); + } + + // Otherwise forward error to host + let mut sender = worker.internal_channels.sender.clone(); + sender + .try_send(WorkerEvent::Error(e)) + .expect("Failed to post message to host"); + } + + // Let event loop be polled again + inner.event_loop_idle = false; + worker.waker.wake(); + } + None => unreachable!(), + } + } + + Poll::Pending + } +} + #[cfg(test)] mod tests { use super::*; use crate::flags::DenoSubcommand; use crate::flags::Flags; use crate::global_state::GlobalState; + use crate::tokio_util; + use crate::worker::WorkerEvent; + use deno_core::serde_json::json; fn create_test_worker() -> MainWorker { let main_module = @@ -466,4 +693,105 @@ mod tests { let result = worker.execute_module(&module_specifier).await; assert!(result.is_ok()); } + + fn create_test_web_worker() -> WebWorker { + let main_module = + ModuleSpecifier::resolve_url_or_path("./hello.js").unwrap(); + let global_state = GlobalState::mock(vec!["deno".to_string()], None); + let mut worker = WebWorker::new( + "TEST".to_string(), + Permissions::allow_all(), + main_module, + global_state, + false, + ); + worker + .execute("bootstrap.workerRuntime(\"TEST\", false)") + .unwrap(); + worker + } + #[tokio::test] + async fn test_worker_messages() { + let (handle_sender, handle_receiver) = + std::sync::mpsc::sync_channel::<WebWorkerHandle>(1); + + let join_handle = std::thread::spawn(move || { + let mut worker = create_test_web_worker(); + let source = r#" + onmessage = function(e) { + console.log("msg from main script", e.data); + if (e.data == "exit") { + return close(); + } else { + console.assert(e.data === "hi"); + } + postMessage([1, 2, 3]); + console.log("after postMessage"); + } + "#; + worker.execute(source).unwrap(); + let handle = worker.thread_safe_handle(); + handle_sender.send(handle).unwrap(); + let r = tokio_util::run_basic(worker); + assert!(r.is_ok()) + }); + + let mut handle = handle_receiver.recv().unwrap(); + + let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); + let r = handle.post_message(msg.clone()); + assert!(r.is_ok()); + + let maybe_msg = handle.get_event().await.unwrap(); + assert!(maybe_msg.is_some()); + + let r = handle.post_message(msg.clone()); + assert!(r.is_ok()); + + let maybe_msg = handle.get_event().await.unwrap(); + assert!(maybe_msg.is_some()); + match maybe_msg { + Some(WorkerEvent::Message(buf)) => { + assert_eq!(*buf, *b"[1,2,3]"); + } + _ => unreachable!(), + } + + let msg = json!("exit") + .to_string() + .into_boxed_str() + .into_boxed_bytes(); + let r = handle.post_message(msg); + assert!(r.is_ok()); + let event = handle.get_event().await.unwrap(); + assert!(event.is_none()); + handle.sender.close_channel(); + join_handle.join().expect("Failed to join worker thread"); + } + + #[tokio::test] + async fn removed_from_resource_table_on_close() { + let (handle_sender, handle_receiver) = + std::sync::mpsc::sync_channel::<WebWorkerHandle>(1); + + let join_handle = std::thread::spawn(move || { + let mut worker = create_test_web_worker(); + worker.execute("onmessage = () => { close(); }").unwrap(); + let handle = worker.thread_safe_handle(); + handle_sender.send(handle).unwrap(); + let r = tokio_util::run_basic(worker); + assert!(r.is_ok()) + }); + + let mut handle = handle_receiver.recv().unwrap(); + + let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); + let r = handle.post_message(msg.clone()); + assert!(r.is_ok()); + let event = handle.get_event().await.unwrap(); + assert!(event.is_none()); + handle.sender.close_channel(); + + join_handle.join().expect("Failed to join worker thread"); + } } |