From c0ccbcdaeee04407b2198557cdc55ee4adf1ee7e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Mon, 7 Dec 2020 04:30:40 +0100 Subject: refactor(cli): Reorganize worker code, use stronger memory ordering (#8638) --- cli/web_worker.rs | 180 ++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 126 insertions(+), 54 deletions(-) (limited to 'cli/web_worker.rs') diff --git a/cli/web_worker.rs b/cli/web_worker.rs index 12b79cb2d..ddce8666e 100644 --- a/cli/web_worker.rs +++ b/cli/web_worker.rs @@ -10,6 +10,7 @@ use crate::ops; use crate::permissions::Permissions; use crate::program_state::ProgramState; use crate::source_maps::apply_source_map; +use crate::tokio_util::create_basic_runtime; use deno_core::error::AnyError; use deno_core::futures::channel::mpsc; use deno_core::futures::future::poll_fn; @@ -77,7 +78,7 @@ impl WebWorkerHandle { // This function can be called multiple times by whomever holds // the handle. However only a single "termination" should occur so // we need a guard here. - let already_terminated = self.terminated.swap(true, Ordering::Relaxed); + let already_terminated = self.terminated.swap(true, Ordering::SeqCst); if !already_terminated { self.isolate_handle.terminate_execution(); @@ -134,6 +135,7 @@ impl WebWorker { main_module: ModuleSpecifier, program_state: Arc, has_deno_namespace: bool, + worker_id: u32, ) -> Self { let module_loader = CliModuleLoader::new_for_worker(); let global_state_ = program_state.clone(); @@ -173,7 +175,7 @@ impl WebWorker { inspector, internal_channels, js_runtime, - name, + name: name.clone(), waker: AtomicWaker::new(), event_loop_idle: false, terminate_rx, @@ -223,9 +225,32 @@ impl WebWorker { ops::signal::init(js_runtime); ops::tls::init(js_runtime); ops::tty::init(js_runtime); + + let op_state = js_runtime.op_state(); + let mut op_state = op_state.borrow_mut(); + let (stdin, stdout, stderr) = ops::io::get_stdio(); + if let Some(stream) = stdin { + op_state.resource_table.add("stdin", Box::new(stream)); + } + if let Some(stream) = stdout { + op_state.resource_table.add("stdout", Box::new(stream)); + } + if let Some(stream) = stderr { + op_state.resource_table.add("stderr", Box::new(stream)); + } } } + // Instead of using name for log we use `worker-${id}` because + // WebWorkers can have empty string as name. + let script = format!( + "bootstrap.workerRuntime(\"{}\", {}, \"worker-{}\")", + name, worker.has_deno_namespace, worker_id + ); + worker + .execute(&script) + .expect("Failed to execute worker bootstrap script"); + worker } @@ -250,13 +275,15 @@ impl WebWorker { self.handle.clone() } + pub fn has_been_terminated(&self) -> bool { + self.handle.terminated.load(Ordering::SeqCst) + } + pub fn poll_event_loop( &mut self, cx: &mut Context, ) -> Poll> { - let terminated = self.handle.terminated.load(Ordering::Relaxed); - - if terminated { + if self.has_been_terminated() { return Poll::Ready(Ok(())); } @@ -267,28 +294,20 @@ impl WebWorker { self.waker.register(cx.waker()); self.js_runtime.poll_event_loop(cx) }; - match poll_result { - Poll::Ready(r) => { - let terminated = self.handle.terminated.load(Ordering::Relaxed); - if terminated { - return Poll::Ready(Ok(())); - } - if let Err(e) = r { - eprintln!( - "{}: Uncaught (in worker \"{}\") {}", - colors::red_bold("error"), - self.name.to_string(), - e.to_string().trim_start_matches("Uncaught "), - ); - let mut sender = self.internal_channels.sender.clone(); - sender - .try_send(WorkerEvent::Error(e)) - .expect("Failed to post message to host"); - } - self.event_loop_idle = true; + if let Poll::Ready(r) = poll_result { + if self.has_been_terminated() { + return Poll::Ready(Ok(())); } - Poll::Pending => {} + + if let Err(e) = r { + print_worker_error(e.to_string(), &self.name); + let mut sender = self.internal_channels.sender.clone(); + sender + .try_send(WorkerEvent::Error(e)) + .expect("Failed to post message to host"); + } + self.event_loop_idle = true; } } @@ -298,33 +317,32 @@ impl WebWorker { return Poll::Ready(Ok(())); } - if let Poll::Ready(r) = self.internal_channels.receiver.poll_next_unpin(cx) - { - match r { - Some(msg) => { - let msg = String::from_utf8(msg.to_vec()).unwrap(); - let script = format!("workerMessageRecvCallback({})", msg); - - if let Err(e) = self.execute(&script) { - // If execution was terminated during message callback then - // just ignore it - if self.handle.terminated.load(Ordering::Relaxed) { - return Poll::Ready(Ok(())); - } + let maybe_msg_poll_result = + self.internal_channels.receiver.poll_next_unpin(cx); - // Otherwise forward error to host - let mut sender = self.internal_channels.sender.clone(); - sender - .try_send(WorkerEvent::Error(e)) - .expect("Failed to post message to host"); - } + if let Poll::Ready(maybe_msg) = maybe_msg_poll_result { + let msg = + maybe_msg.expect("Received `None` instead of message in worker"); + let msg = String::from_utf8(msg.to_vec()).unwrap(); + let script = format!("workerMessageRecvCallback({})", msg); - // Let event loop be polled again - self.event_loop_idle = false; - self.waker.wake(); + if let Err(e) = self.execute(&script) { + // If execution was terminated during message callback then + // just ignore it + if self.has_been_terminated() { + return Poll::Ready(Ok(())); } - None => unreachable!(), + + // Otherwise forward error to host + let mut sender = self.internal_channels.sender.clone(); + sender + .try_send(WorkerEvent::Error(e)) + .expect("Failed to post message to host"); } + + // Let event loop be polled again + self.event_loop_idle = false; + self.waker.wake(); } Poll::Pending @@ -343,6 +361,63 @@ impl Drop for WebWorker { } } +fn print_worker_error(error_str: String, name: &str) { + eprintln!( + "{}: Uncaught (in worker \"{}\") {}", + colors::red_bold("error"), + name, + error_str.trim_start_matches("Uncaught "), + ); +} + +/// This function should be called from a thread dedicated to this worker. +// TODO(bartlomieju): check if order of actions is aligned to Worker spec +pub fn run_web_worker( + mut worker: WebWorker, + specifier: ModuleSpecifier, + maybe_source_code: Option, +) -> Result<(), AnyError> { + let name = worker.name.to_string(); + + let mut rt = create_basic_runtime(); + + // TODO(bartlomieju): run following block using "select!" + // with terminate + + // Execute provided source code immediately + let result = if let Some(source_code) = maybe_source_code { + worker.execute(&source_code) + } else { + // TODO(bartlomieju): add "type": "classic", ie. ability to load + // script instead of module + let load_future = worker.execute_module(&specifier).boxed_local(); + + rt.block_on(load_future) + }; + + let mut sender = worker.internal_channels.sender.clone(); + + // If sender is closed it means that worker has already been closed from + // within using "globalThis.close()" + if sender.is_closed() { + return Ok(()); + } + + if let Err(e) = result { + print_worker_error(e.to_string(), &name); + sender + .try_send(WorkerEvent::TerminalError(e)) + .expect("Failed to post message to host"); + + // Failure to execute script is a terminal error, bye, bye. + return Ok(()); + } + + let result = rt.block_on(worker.run_event_loop()); + debug!("Worker thread shuts down {}", &name); + result +} + #[cfg(test)] mod tests { use super::*; @@ -354,17 +429,14 @@ mod tests { let main_module = ModuleSpecifier::resolve_url_or_path("./hello.js").unwrap(); let program_state = ProgramState::mock(vec!["deno".to_string()], None); - let mut worker = WebWorker::new( + WebWorker::new( "TEST".to_string(), Permissions::allow_all(), main_module, program_state, false, - ); - worker - .execute("bootstrap.workerRuntime(\"TEST\", false)") - .unwrap(); - worker + 1, + ) } #[tokio::test] -- cgit v1.2.3