diff options
| author | Bartek IwaĆczuk <biwanczuk@gmail.com> | 2020-12-07 04:30:40 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-12-07 04:30:40 +0100 |
| commit | c0ccbcdaeee04407b2198557cdc55ee4adf1ee7e (patch) | |
| tree | 0545b939616b1fa6f910441d18c400ea231b3563 /cli/web_worker.rs | |
| parent | 7135d34ccab7629da57c75ee239fcb0dda733eae (diff) | |
refactor(cli): Reorganize worker code, use stronger memory ordering (#8638)
Diffstat (limited to 'cli/web_worker.rs')
| -rw-r--r-- | cli/web_worker.rs | 180 |
1 files changed, 126 insertions, 54 deletions
diff --git a/cli/web_worker.rs b/cli/web_worker.rs index 12b79cb2d..ddce8666e 100644 --- a/cli/web_worker.rs +++ b/cli/web_worker.rs @@ -10,6 +10,7 @@ use crate::ops; use crate::permissions::Permissions; use crate::program_state::ProgramState; use crate::source_maps::apply_source_map; +use crate::tokio_util::create_basic_runtime; use deno_core::error::AnyError; use deno_core::futures::channel::mpsc; use deno_core::futures::future::poll_fn; @@ -77,7 +78,7 @@ impl WebWorkerHandle { // This function can be called multiple times by whomever holds // the handle. However only a single "termination" should occur so // we need a guard here. - let already_terminated = self.terminated.swap(true, Ordering::Relaxed); + let already_terminated = self.terminated.swap(true, Ordering::SeqCst); if !already_terminated { self.isolate_handle.terminate_execution(); @@ -134,6 +135,7 @@ impl WebWorker { main_module: ModuleSpecifier, program_state: Arc<ProgramState>, has_deno_namespace: bool, + worker_id: u32, ) -> Self { let module_loader = CliModuleLoader::new_for_worker(); let global_state_ = program_state.clone(); @@ -173,7 +175,7 @@ impl WebWorker { inspector, internal_channels, js_runtime, - name, + name: name.clone(), waker: AtomicWaker::new(), event_loop_idle: false, terminate_rx, @@ -223,9 +225,32 @@ impl WebWorker { ops::signal::init(js_runtime); ops::tls::init(js_runtime); ops::tty::init(js_runtime); + + let op_state = js_runtime.op_state(); + let mut op_state = op_state.borrow_mut(); + let (stdin, stdout, stderr) = ops::io::get_stdio(); + if let Some(stream) = stdin { + op_state.resource_table.add("stdin", Box::new(stream)); + } + if let Some(stream) = stdout { + op_state.resource_table.add("stdout", Box::new(stream)); + } + if let Some(stream) = stderr { + op_state.resource_table.add("stderr", Box::new(stream)); + } } } + // Instead of using name for log we use `worker-${id}` because + // WebWorkers can have empty string as name. + let script = format!( + "bootstrap.workerRuntime(\"{}\", {}, \"worker-{}\")", + name, worker.has_deno_namespace, worker_id + ); + worker + .execute(&script) + .expect("Failed to execute worker bootstrap script"); + worker } @@ -250,13 +275,15 @@ impl WebWorker { self.handle.clone() } + pub fn has_been_terminated(&self) -> bool { + self.handle.terminated.load(Ordering::SeqCst) + } + pub fn poll_event_loop( &mut self, cx: &mut Context, ) -> Poll<Result<(), AnyError>> { - let terminated = self.handle.terminated.load(Ordering::Relaxed); - - if terminated { + if self.has_been_terminated() { return Poll::Ready(Ok(())); } @@ -267,28 +294,20 @@ impl WebWorker { self.waker.register(cx.waker()); self.js_runtime.poll_event_loop(cx) }; - match poll_result { - Poll::Ready(r) => { - let terminated = self.handle.terminated.load(Ordering::Relaxed); - if terminated { - return Poll::Ready(Ok(())); - } - if let Err(e) = r { - eprintln!( - "{}: Uncaught (in worker \"{}\") {}", - colors::red_bold("error"), - self.name.to_string(), - e.to_string().trim_start_matches("Uncaught "), - ); - let mut sender = self.internal_channels.sender.clone(); - sender - .try_send(WorkerEvent::Error(e)) - .expect("Failed to post message to host"); - } - self.event_loop_idle = true; + if let Poll::Ready(r) = poll_result { + if self.has_been_terminated() { + return Poll::Ready(Ok(())); } - Poll::Pending => {} + + if let Err(e) = r { + print_worker_error(e.to_string(), &self.name); + let mut sender = self.internal_channels.sender.clone(); + sender + .try_send(WorkerEvent::Error(e)) + .expect("Failed to post message to host"); + } + self.event_loop_idle = true; } } @@ -298,33 +317,32 @@ impl WebWorker { return Poll::Ready(Ok(())); } - if let Poll::Ready(r) = self.internal_channels.receiver.poll_next_unpin(cx) - { - match r { - Some(msg) => { - let msg = String::from_utf8(msg.to_vec()).unwrap(); - let script = format!("workerMessageRecvCallback({})", msg); - - if let Err(e) = self.execute(&script) { - // If execution was terminated during message callback then - // just ignore it - if self.handle.terminated.load(Ordering::Relaxed) { - return Poll::Ready(Ok(())); - } + let maybe_msg_poll_result = + self.internal_channels.receiver.poll_next_unpin(cx); - // Otherwise forward error to host - let mut sender = self.internal_channels.sender.clone(); - sender - .try_send(WorkerEvent::Error(e)) - .expect("Failed to post message to host"); - } + if let Poll::Ready(maybe_msg) = maybe_msg_poll_result { + let msg = + maybe_msg.expect("Received `None` instead of message in worker"); + let msg = String::from_utf8(msg.to_vec()).unwrap(); + let script = format!("workerMessageRecvCallback({})", msg); - // Let event loop be polled again - self.event_loop_idle = false; - self.waker.wake(); + if let Err(e) = self.execute(&script) { + // If execution was terminated during message callback then + // just ignore it + if self.has_been_terminated() { + return Poll::Ready(Ok(())); } - None => unreachable!(), + + // Otherwise forward error to host + let mut sender = self.internal_channels.sender.clone(); + sender + .try_send(WorkerEvent::Error(e)) + .expect("Failed to post message to host"); } + + // Let event loop be polled again + self.event_loop_idle = false; + self.waker.wake(); } Poll::Pending @@ -343,6 +361,63 @@ impl Drop for WebWorker { } } +fn print_worker_error(error_str: String, name: &str) { + eprintln!( + "{}: Uncaught (in worker \"{}\") {}", + colors::red_bold("error"), + name, + error_str.trim_start_matches("Uncaught "), + ); +} + +/// This function should be called from a thread dedicated to this worker. +// TODO(bartlomieju): check if order of actions is aligned to Worker spec +pub fn run_web_worker( + mut worker: WebWorker, + specifier: ModuleSpecifier, + maybe_source_code: Option<String>, +) -> Result<(), AnyError> { + let name = worker.name.to_string(); + + let mut rt = create_basic_runtime(); + + // TODO(bartlomieju): run following block using "select!" + // with terminate + + // Execute provided source code immediately + let result = if let Some(source_code) = maybe_source_code { + worker.execute(&source_code) + } else { + // TODO(bartlomieju): add "type": "classic", ie. ability to load + // script instead of module + let load_future = worker.execute_module(&specifier).boxed_local(); + + rt.block_on(load_future) + }; + + let mut sender = worker.internal_channels.sender.clone(); + + // If sender is closed it means that worker has already been closed from + // within using "globalThis.close()" + if sender.is_closed() { + return Ok(()); + } + + if let Err(e) = result { + print_worker_error(e.to_string(), &name); + sender + .try_send(WorkerEvent::TerminalError(e)) + .expect("Failed to post message to host"); + + // Failure to execute script is a terminal error, bye, bye. + return Ok(()); + } + + let result = rt.block_on(worker.run_event_loop()); + debug!("Worker thread shuts down {}", &name); + result +} + #[cfg(test)] mod tests { use super::*; @@ -354,17 +429,14 @@ mod tests { let main_module = ModuleSpecifier::resolve_url_or_path("./hello.js").unwrap(); let program_state = ProgramState::mock(vec!["deno".to_string()], None); - let mut worker = WebWorker::new( + WebWorker::new( "TEST".to_string(), Permissions::allow_all(), main_module, program_state, false, - ); - worker - .execute("bootstrap.workerRuntime(\"TEST\", false)") - .unwrap(); - worker + 1, + ) } #[tokio::test] |
