diff options
Diffstat (limited to 'cli/worker.rs')
-rw-r--r-- | cli/worker.rs | 601 |
1 files changed, 60 insertions, 541 deletions
diff --git a/cli/worker.rs b/cli/worker.rs index 74bee1d03..f4a919df6 100644 --- a/cli/worker.rs +++ b/cli/worker.rs @@ -1,6 +1,5 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. -use crate::colors; use crate::fmt_errors::PrettyJsError; use crate::inspector::DenoInspector; use crate::inspector::InspectorSession; @@ -13,109 +12,39 @@ use crate::permissions::Permissions; use crate::program_state::ProgramState; use crate::source_maps::apply_source_map; use deno_core::error::AnyError; -use deno_core::futures::channel::mpsc; use deno_core::futures::future::poll_fn; use deno_core::futures::future::FutureExt; -use deno_core::futures::stream::StreamExt; -use deno_core::futures::task::AtomicWaker; use deno_core::url::Url; -use deno_core::v8; use deno_core::JsRuntime; use deno_core::ModuleId; use deno_core::ModuleSpecifier; use deno_core::RuntimeOptions; -use deno_core::Snapshot; use std::env; -use std::ops::Deref; -use std::ops::DerefMut; -use std::rc::Rc; -use std::sync::atomic::AtomicBool; -use std::sync::atomic::Ordering; use std::sync::Arc; use std::task::Context; use std::task::Poll; -use tokio::sync::Mutex as AsyncMutex; -/// Events that are sent to host from child -/// worker. -pub enum WorkerEvent { - Message(Box<[u8]>), - Error(AnyError), - TerminalError(AnyError), -} - -pub struct WorkerChannelsInternal { - pub sender: mpsc::Sender<WorkerEvent>, - pub receiver: mpsc::Receiver<Box<[u8]>>, -} - -#[derive(Clone)] -pub struct WorkerHandle { - pub sender: mpsc::Sender<Box<[u8]>>, - pub receiver: Arc<AsyncMutex<mpsc::Receiver<WorkerEvent>>>, -} - -impl WorkerHandle { - /// Post message to worker as a host. - pub fn post_message(&self, buf: Box<[u8]>) -> Result<(), AnyError> { - let mut sender = self.sender.clone(); - sender.try_send(buf)?; - Ok(()) - } - - /// Get the event with lock. - /// Return error if more than one listener tries to get event - pub async fn get_event(&self) -> Result<Option<WorkerEvent>, AnyError> { - let mut receiver = self.receiver.try_lock()?; - Ok(receiver.next().await) - } -} - -fn create_channels() -> (WorkerChannelsInternal, WorkerHandle) { - let (in_tx, in_rx) = mpsc::channel::<Box<[u8]>>(1); - let (out_tx, out_rx) = mpsc::channel::<WorkerEvent>(1); - let internal_channels = WorkerChannelsInternal { - sender: out_tx, - receiver: in_rx, - }; - let external_channels = WorkerHandle { - sender: in_tx, - receiver: Arc::new(AsyncMutex::new(out_rx)), - }; - (internal_channels, external_channels) -} - -/// Worker is a CLI wrapper for `deno_core::Isolate`. -/// -/// It provides infrastructure to communicate with a worker and -/// consequently between workers. +/// This worker is created and used by almost all +/// subcommands in Deno executable. /// -/// This struct is meant to be used as a base struct for concrete -/// type of worker that registers set of ops. +/// It provides ops available in the `Deno` namespace. /// -/// Currently there are two types of workers: -/// - `MainWorker` -/// - `WebWorker` -pub struct Worker { - external_channels: WorkerHandle, +/// All `WebWorker`s created during program execution +/// are descendants of this worker. +pub struct MainWorker { inspector: Option<Box<DenoInspector>>, - // Following fields are pub because they are accessed - // when creating a new WebWorker instance. - pub(crate) internal_channels: WorkerChannelsInternal, - pub(crate) js_runtime: JsRuntime, - pub(crate) name: String, + js_runtime: JsRuntime, should_break_on_first_statement: bool, - waker: AtomicWaker, } -impl Worker { +impl MainWorker { pub fn new( - name: String, - startup_snapshot: Snapshot, - program_state: Arc<ProgramState>, - module_loader: Rc<CliModuleLoader>, - is_main: bool, + program_state: &Arc<ProgramState>, + main_module: ModuleSpecifier, + permissions: Permissions, ) -> Self { + let module_loader = + CliModuleLoader::new(program_state.maybe_import_map.clone()); let global_state_ = program_state.clone(); let js_error_create_fn = Box::new(move |core_js_error| { @@ -126,7 +55,7 @@ impl Worker { let mut js_runtime = JsRuntime::new(RuntimeOptions { module_loader: Some(module_loader), - startup_snapshot: Some(startup_snapshot), + startup_snapshot: Some(js::deno_isolate_init()), js_error_create_fn: Some(js_error_create_fn), get_error_class_fn: Some(&crate::errors::get_error_class_name), ..Default::default() @@ -144,126 +73,15 @@ impl Worker { None }; - let should_break_on_first_statement = inspector.is_some() - && is_main - && program_state.flags.inspect_brk.is_some(); - - let (internal_channels, external_channels) = create_channels(); + let should_break_on_first_statement = + inspector.is_some() && program_state.flags.inspect_brk.is_some(); - Self { - external_channels, + let mut worker = Self { inspector, - internal_channels, js_runtime, - name, should_break_on_first_statement, - waker: AtomicWaker::new(), - } - } - - /// Same as execute2() but the filename defaults to "$CWD/__anonymous__". - pub fn execute(&mut self, js_source: &str) -> Result<(), AnyError> { - let path = env::current_dir().unwrap().join("__anonymous__"); - let url = Url::from_file_path(path).unwrap(); - self.execute2(url.as_str(), js_source) - } - - /// Executes the provided JavaScript source code. The js_filename argument is - /// provided only for debugging purposes. - pub fn execute2( - &mut self, - js_filename: &str, - js_source: &str, - ) -> Result<(), AnyError> { - self.js_runtime.execute(js_filename, js_source) - } - - /// Loads and instantiates specified JavaScript module. - pub async fn preload_module( - &mut self, - module_specifier: &ModuleSpecifier, - ) -> Result<ModuleId, AnyError> { - self.js_runtime.load_module(module_specifier, None).await - } - - /// Loads, instantiates and executes specified JavaScript module. - pub async fn execute_module( - &mut self, - module_specifier: &ModuleSpecifier, - ) -> Result<(), AnyError> { - let id = self.preload_module(module_specifier).await?; - self.wait_for_inspector_session(); - self.js_runtime.mod_evaluate(id).await - } - - /// Returns a way to communicate with the Worker from other threads. - pub fn thread_safe_handle(&self) -> WorkerHandle { - self.external_channels.clone() - } - - fn wait_for_inspector_session(&mut self) { - if self.should_break_on_first_statement { - self - .inspector - .as_mut() - .unwrap() - .wait_for_session_and_break_on_next_statement() - } - } - - /// Create new inspector session. This function panics if Worker - /// was not configured to create inspector. - pub fn create_inspector_session(&mut self) -> Box<InspectorSession> { - let inspector = self.inspector.as_mut().unwrap(); - - InspectorSession::new(&mut **inspector) - } - - pub fn poll_event_loop( - &mut self, - cx: &mut Context, - ) -> Poll<Result<(), AnyError>> { - // We always poll the inspector if it exists. - let _ = self.inspector.as_mut().map(|i| i.poll_unpin(cx)); - self.waker.register(cx.waker()); - self.js_runtime.poll_event_loop(cx) - } - - pub async fn run_event_loop(&mut self) -> Result<(), AnyError> { - poll_fn(|cx| self.poll_event_loop(cx)).await - } -} - -impl Drop for Worker { - fn drop(&mut self) { - // The Isolate object must outlive the Inspector object, but this is - // currently not enforced by the type system. - self.inspector.take(); - } -} - -/// This worker is created and used by Deno executable. -/// -/// It provides ops available in the `Deno` namespace. -/// -/// All WebWorkers created during program execution are descendants of -/// this worker. -pub struct MainWorker(Worker); + }; -impl MainWorker { - pub fn new( - program_state: &Arc<ProgramState>, - main_module: ModuleSpecifier, - permissions: Permissions, - ) -> Self { - let loader = CliModuleLoader::new(program_state.maybe_import_map.clone()); - let mut worker = Worker::new( - "main".to_string(), - js::deno_isolate_init(), - program_state.clone(), - loader, - true, - ); let js_runtime = &mut worker.js_runtime; { // All ops registered in this function depend on these @@ -320,266 +138,71 @@ impl MainWorker { worker .execute("bootstrap.mainRuntime()") .expect("Failed to execute bootstrap script"); - Self(worker) - } -} - -impl Deref for MainWorker { - type Target = Worker; - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl DerefMut for MainWorker { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 + worker } -} - -/// Wrapper for `WorkerHandle` that adds functionality -/// for terminating workers. -/// -/// This struct is used by host as well as worker itself. -/// -/// Host uses it to communicate with worker and terminate it, -/// while worker uses it only to finish execution on `self.close()`. -#[derive(Clone)] -pub struct WebWorkerHandle { - worker_handle: WorkerHandle, - terminate_tx: mpsc::Sender<()>, - terminated: Arc<AtomicBool>, - isolate_handle: v8::IsolateHandle, -} -impl Deref for WebWorkerHandle { - type Target = WorkerHandle; - fn deref(&self) -> &Self::Target { - &self.worker_handle + /// Same as execute2() but the filename defaults to "$CWD/__anonymous__". + pub fn execute(&mut self, js_source: &str) -> Result<(), AnyError> { + let path = env::current_dir().unwrap().join("__anonymous__"); + let url = Url::from_file_path(path).unwrap(); + self.js_runtime.execute(url.as_str(), js_source) } -} -impl DerefMut for WebWorkerHandle { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.worker_handle + /// Loads and instantiates specified JavaScript module. + pub async fn preload_module( + &mut self, + module_specifier: &ModuleSpecifier, + ) -> Result<ModuleId, AnyError> { + self.js_runtime.load_module(module_specifier, None).await } -} - -impl WebWorkerHandle { - pub fn terminate(&self) { - // This function can be called multiple times by whomever holds - // the handle. However only a single "termination" should occur so - // we need a guard here. - let already_terminated = self.terminated.swap(true, Ordering::Relaxed); - if !already_terminated { - self.isolate_handle.terminate_execution(); - let mut sender = self.terminate_tx.clone(); - // This call should be infallible hence the `expect`. - // This might change in the future. - sender.try_send(()).expect("Failed to terminate"); - } + /// Loads, instantiates and executes specified JavaScript module. + pub async fn execute_module( + &mut self, + module_specifier: &ModuleSpecifier, + ) -> Result<(), AnyError> { + let id = self.preload_module(module_specifier).await?; + self.wait_for_inspector_session(); + self.js_runtime.mod_evaluate(id).await } -} - -/// This worker is implementation of `Worker` Web API -/// -/// At the moment this type of worker supports only -/// communication with parent and creating new workers. -/// -/// Each `WebWorker` is either a child of `MainWorker` or other -/// `WebWorker`. -pub struct WebWorker { - worker: Worker, - event_loop_idle: bool, - terminate_rx: mpsc::Receiver<()>, - handle: WebWorkerHandle, - pub has_deno_namespace: bool, -} - -impl WebWorker { - pub fn new( - name: String, - permissions: Permissions, - main_module: ModuleSpecifier, - program_state: Arc<ProgramState>, - has_deno_namespace: bool, - ) -> Self { - let loader = CliModuleLoader::new_for_worker(); - let mut worker = Worker::new( - name, - js::deno_isolate_init(), - program_state.clone(), - loader, - false, - ); - - let terminated = Arc::new(AtomicBool::new(false)); - let isolate_handle = worker.js_runtime.v8_isolate().thread_safe_handle(); - let (terminate_tx, terminate_rx) = mpsc::channel::<()>(1); - - let handle = WebWorkerHandle { - worker_handle: worker.thread_safe_handle(), - terminated, - isolate_handle, - terminate_tx, - }; - - let mut web_worker = Self { - worker, - event_loop_idle: false, - terminate_rx, - handle, - has_deno_namespace, - }; - - { - let handle = web_worker.thread_safe_handle(); - let sender = web_worker.worker.internal_channels.sender.clone(); - let js_runtime = &mut web_worker.js_runtime; - // All ops registered in this function depend on these - { - let op_state = js_runtime.op_state(); - let mut op_state = op_state.borrow_mut(); - op_state.put::<Metrics>(Default::default()); - op_state.put::<Arc<ProgramState>>(program_state.clone()); - op_state.put::<Permissions>(permissions); - } - ops::web_worker::init(js_runtime, sender.clone(), handle); - ops::runtime::init(js_runtime, main_module); - ops::fetch::init(js_runtime, program_state.flags.ca_file.as_deref()); - ops::timers::init(js_runtime); - ops::worker_host::init(js_runtime, Some(sender)); - ops::reg_json_sync(js_runtime, "op_close", deno_core::op_close); - ops::reg_json_sync(js_runtime, "op_resources", deno_core::op_resources); - ops::reg_json_sync( - js_runtime, - "op_domain_to_ascii", - deno_web::op_domain_to_ascii, - ); - ops::errors::init(js_runtime); - ops::io::init(js_runtime); - ops::websocket::init(js_runtime); - - if has_deno_namespace { - ops::fs_events::init(js_runtime); - ops::fs::init(js_runtime); - ops::net::init(js_runtime); - ops::os::init(js_runtime); - ops::permissions::init(js_runtime); - ops::plugin::init(js_runtime); - ops::process::init(js_runtime); - ops::crypto::init(js_runtime, program_state.flags.seed); - ops::runtime_compiler::init(js_runtime); - ops::signal::init(js_runtime); - ops::tls::init(js_runtime); - ops::tty::init(js_runtime); - } + fn wait_for_inspector_session(&mut self) { + if self.should_break_on_first_statement { + self + .inspector + .as_mut() + .unwrap() + .wait_for_session_and_break_on_next_statement() } - - web_worker } -} -impl WebWorker { - /// Returns a way to communicate with the Worker from other threads. - pub fn thread_safe_handle(&self) -> WebWorkerHandle { - self.handle.clone() - } + /// Create new inspector session. This function panics if Worker + /// was not configured to create inspector. + pub fn create_inspector_session(&mut self) -> Box<InspectorSession> { + let inspector = self.inspector.as_mut().unwrap(); - pub async fn run_event_loop(&mut self) -> Result<(), AnyError> { - poll_fn(|cx| self.poll_event_loop(cx)).await + InspectorSession::new(&mut **inspector) } pub fn poll_event_loop( &mut self, cx: &mut Context, ) -> Poll<Result<(), AnyError>> { - let worker = &mut self.worker; - - let terminated = self.handle.terminated.load(Ordering::Relaxed); - - if terminated { - return Poll::Ready(Ok(())); - } - - if !self.event_loop_idle { - match worker.poll_event_loop(cx) { - Poll::Ready(r) => { - let terminated = self.handle.terminated.load(Ordering::Relaxed); - if terminated { - return Poll::Ready(Ok(())); - } - - if let Err(e) = r { - eprintln!( - "{}: Uncaught (in worker \"{}\") {}", - colors::red_bold("error"), - worker.name.to_string(), - e.to_string().trim_start_matches("Uncaught "), - ); - let mut sender = worker.internal_channels.sender.clone(); - sender - .try_send(WorkerEvent::Error(e)) - .expect("Failed to post message to host"); - } - self.event_loop_idle = true; - } - Poll::Pending => {} - } - } - - if let Poll::Ready(r) = self.terminate_rx.poll_next_unpin(cx) { - // terminate_rx should never be closed - assert!(r.is_some()); - return Poll::Ready(Ok(())); - } - - if let Poll::Ready(r) = - worker.internal_channels.receiver.poll_next_unpin(cx) - { - match r { - Some(msg) => { - let msg = String::from_utf8(msg.to_vec()).unwrap(); - let script = format!("workerMessageRecvCallback({})", msg); - - if let Err(e) = worker.execute(&script) { - // If execution was terminated during message callback then - // just ignore it - if self.handle.terminated.load(Ordering::Relaxed) { - return Poll::Ready(Ok(())); - } - - // Otherwise forward error to host - let mut sender = worker.internal_channels.sender.clone(); - sender - .try_send(WorkerEvent::Error(e)) - .expect("Failed to post message to host"); - } - - // Let event loop be polled again - self.event_loop_idle = false; - worker.waker.wake(); - } - None => unreachable!(), - } - } - - Poll::Pending + // We always poll the inspector if it exists. + let _ = self.inspector.as_mut().map(|i| i.poll_unpin(cx)); + self.js_runtime.poll_event_loop(cx) } -} -impl Deref for WebWorker { - type Target = Worker; - fn deref(&self) -> &Self::Target { - &self.worker + pub async fn run_event_loop(&mut self) -> Result<(), AnyError> { + poll_fn(|cx| self.poll_event_loop(cx)).await } } -impl DerefMut for WebWorker { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.worker +impl Drop for MainWorker { + fn drop(&mut self) { + // The Isolate object must outlive the Inspector object, but this is + // currently not enforced by the type system. + self.inspector.take(); } } @@ -589,9 +212,6 @@ mod tests { use crate::flags::DenoSubcommand; use crate::flags::Flags; use crate::program_state::ProgramState; - use crate::tokio_util; - use crate::worker::WorkerEvent; - use deno_core::serde_json::json; fn create_test_worker() -> MainWorker { let main_module = @@ -687,105 +307,4 @@ mod tests { let result = worker.execute_module(&module_specifier).await; assert!(result.is_ok()); } - - fn create_test_web_worker() -> WebWorker { - let main_module = - ModuleSpecifier::resolve_url_or_path("./hello.js").unwrap(); - let program_state = ProgramState::mock(vec!["deno".to_string()], None); - let mut worker = WebWorker::new( - "TEST".to_string(), - Permissions::allow_all(), - main_module, - program_state, - false, - ); - worker - .execute("bootstrap.workerRuntime(\"TEST\", false)") - .unwrap(); - worker - } - #[tokio::test] - async fn test_worker_messages() { - let (handle_sender, handle_receiver) = - std::sync::mpsc::sync_channel::<WebWorkerHandle>(1); - - let join_handle = std::thread::spawn(move || { - let mut worker = create_test_web_worker(); - let source = r#" - onmessage = function(e) { - console.log("msg from main script", e.data); - if (e.data == "exit") { - return close(); - } else { - console.assert(e.data === "hi"); - } - postMessage([1, 2, 3]); - console.log("after postMessage"); - } - "#; - worker.execute(source).unwrap(); - let handle = worker.thread_safe_handle(); - handle_sender.send(handle).unwrap(); - let r = tokio_util::run_basic(worker.run_event_loop()); - assert!(r.is_ok()) - }); - - let mut handle = handle_receiver.recv().unwrap(); - - let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); - let r = handle.post_message(msg.clone()); - assert!(r.is_ok()); - - let maybe_msg = handle.get_event().await.unwrap(); - assert!(maybe_msg.is_some()); - - let r = handle.post_message(msg.clone()); - assert!(r.is_ok()); - - let maybe_msg = handle.get_event().await.unwrap(); - assert!(maybe_msg.is_some()); - match maybe_msg { - Some(WorkerEvent::Message(buf)) => { - assert_eq!(*buf, *b"[1,2,3]"); - } - _ => unreachable!(), - } - - let msg = json!("exit") - .to_string() - .into_boxed_str() - .into_boxed_bytes(); - let r = handle.post_message(msg); - assert!(r.is_ok()); - let event = handle.get_event().await.unwrap(); - assert!(event.is_none()); - handle.sender.close_channel(); - join_handle.join().expect("Failed to join worker thread"); - } - - #[tokio::test] - async fn removed_from_resource_table_on_close() { - let (handle_sender, handle_receiver) = - std::sync::mpsc::sync_channel::<WebWorkerHandle>(1); - - let join_handle = std::thread::spawn(move || { - let mut worker = create_test_web_worker(); - worker.execute("onmessage = () => { close(); }").unwrap(); - let handle = worker.thread_safe_handle(); - handle_sender.send(handle).unwrap(); - let r = tokio_util::run_basic(worker.run_event_loop()); - assert!(r.is_ok()) - }); - - let mut handle = handle_receiver.recv().unwrap(); - - let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); - let r = handle.post_message(msg.clone()); - assert!(r.is_ok()); - let event = handle.get_event().await.unwrap(); - assert!(event.is_none()); - handle.sender.close_channel(); - - join_handle.join().expect("Failed to join worker thread"); - } } |