summaryrefslogtreecommitdiff
path: root/cli/web_worker.rs
diff options
context:
space:
mode:
authorBartek IwaƄczuk <biwanczuk@gmail.com>2020-12-07 04:30:40 +0100
committerGitHub <noreply@github.com>2020-12-07 04:30:40 +0100
commitc0ccbcdaeee04407b2198557cdc55ee4adf1ee7e (patch)
tree0545b939616b1fa6f910441d18c400ea231b3563 /cli/web_worker.rs
parent7135d34ccab7629da57c75ee239fcb0dda733eae (diff)
refactor(cli): Reorganize worker code, use stronger memory ordering (#8638)
Diffstat (limited to 'cli/web_worker.rs')
-rw-r--r--cli/web_worker.rs180
1 files changed, 126 insertions, 54 deletions
diff --git a/cli/web_worker.rs b/cli/web_worker.rs
index 12b79cb2d..ddce8666e 100644
--- a/cli/web_worker.rs
+++ b/cli/web_worker.rs
@@ -10,6 +10,7 @@ use crate::ops;
use crate::permissions::Permissions;
use crate::program_state::ProgramState;
use crate::source_maps::apply_source_map;
+use crate::tokio_util::create_basic_runtime;
use deno_core::error::AnyError;
use deno_core::futures::channel::mpsc;
use deno_core::futures::future::poll_fn;
@@ -77,7 +78,7 @@ impl WebWorkerHandle {
// This function can be called multiple times by whomever holds
// the handle. However only a single "termination" should occur so
// we need a guard here.
- let already_terminated = self.terminated.swap(true, Ordering::Relaxed);
+ let already_terminated = self.terminated.swap(true, Ordering::SeqCst);
if !already_terminated {
self.isolate_handle.terminate_execution();
@@ -134,6 +135,7 @@ impl WebWorker {
main_module: ModuleSpecifier,
program_state: Arc<ProgramState>,
has_deno_namespace: bool,
+ worker_id: u32,
) -> Self {
let module_loader = CliModuleLoader::new_for_worker();
let global_state_ = program_state.clone();
@@ -173,7 +175,7 @@ impl WebWorker {
inspector,
internal_channels,
js_runtime,
- name,
+ name: name.clone(),
waker: AtomicWaker::new(),
event_loop_idle: false,
terminate_rx,
@@ -223,9 +225,32 @@ impl WebWorker {
ops::signal::init(js_runtime);
ops::tls::init(js_runtime);
ops::tty::init(js_runtime);
+
+ let op_state = js_runtime.op_state();
+ let mut op_state = op_state.borrow_mut();
+ let (stdin, stdout, stderr) = ops::io::get_stdio();
+ if let Some(stream) = stdin {
+ op_state.resource_table.add("stdin", Box::new(stream));
+ }
+ if let Some(stream) = stdout {
+ op_state.resource_table.add("stdout", Box::new(stream));
+ }
+ if let Some(stream) = stderr {
+ op_state.resource_table.add("stderr", Box::new(stream));
+ }
}
}
+ // Instead of using name for log we use `worker-${id}` because
+ // WebWorkers can have empty string as name.
+ let script = format!(
+ "bootstrap.workerRuntime(\"{}\", {}, \"worker-{}\")",
+ name, worker.has_deno_namespace, worker_id
+ );
+ worker
+ .execute(&script)
+ .expect("Failed to execute worker bootstrap script");
+
worker
}
@@ -250,13 +275,15 @@ impl WebWorker {
self.handle.clone()
}
+ pub fn has_been_terminated(&self) -> bool {
+ self.handle.terminated.load(Ordering::SeqCst)
+ }
+
pub fn poll_event_loop(
&mut self,
cx: &mut Context,
) -> Poll<Result<(), AnyError>> {
- let terminated = self.handle.terminated.load(Ordering::Relaxed);
-
- if terminated {
+ if self.has_been_terminated() {
return Poll::Ready(Ok(()));
}
@@ -267,28 +294,20 @@ impl WebWorker {
self.waker.register(cx.waker());
self.js_runtime.poll_event_loop(cx)
};
- match poll_result {
- Poll::Ready(r) => {
- let terminated = self.handle.terminated.load(Ordering::Relaxed);
- if terminated {
- return Poll::Ready(Ok(()));
- }
- if let Err(e) = r {
- eprintln!(
- "{}: Uncaught (in worker \"{}\") {}",
- colors::red_bold("error"),
- self.name.to_string(),
- e.to_string().trim_start_matches("Uncaught "),
- );
- let mut sender = self.internal_channels.sender.clone();
- sender
- .try_send(WorkerEvent::Error(e))
- .expect("Failed to post message to host");
- }
- self.event_loop_idle = true;
+ if let Poll::Ready(r) = poll_result {
+ if self.has_been_terminated() {
+ return Poll::Ready(Ok(()));
}
- Poll::Pending => {}
+
+ if let Err(e) = r {
+ print_worker_error(e.to_string(), &self.name);
+ let mut sender = self.internal_channels.sender.clone();
+ sender
+ .try_send(WorkerEvent::Error(e))
+ .expect("Failed to post message to host");
+ }
+ self.event_loop_idle = true;
}
}
@@ -298,33 +317,32 @@ impl WebWorker {
return Poll::Ready(Ok(()));
}
- if let Poll::Ready(r) = self.internal_channels.receiver.poll_next_unpin(cx)
- {
- match r {
- Some(msg) => {
- let msg = String::from_utf8(msg.to_vec()).unwrap();
- let script = format!("workerMessageRecvCallback({})", msg);
-
- if let Err(e) = self.execute(&script) {
- // If execution was terminated during message callback then
- // just ignore it
- if self.handle.terminated.load(Ordering::Relaxed) {
- return Poll::Ready(Ok(()));
- }
+ let maybe_msg_poll_result =
+ self.internal_channels.receiver.poll_next_unpin(cx);
- // Otherwise forward error to host
- let mut sender = self.internal_channels.sender.clone();
- sender
- .try_send(WorkerEvent::Error(e))
- .expect("Failed to post message to host");
- }
+ if let Poll::Ready(maybe_msg) = maybe_msg_poll_result {
+ let msg =
+ maybe_msg.expect("Received `None` instead of message in worker");
+ let msg = String::from_utf8(msg.to_vec()).unwrap();
+ let script = format!("workerMessageRecvCallback({})", msg);
- // Let event loop be polled again
- self.event_loop_idle = false;
- self.waker.wake();
+ if let Err(e) = self.execute(&script) {
+ // If execution was terminated during message callback then
+ // just ignore it
+ if self.has_been_terminated() {
+ return Poll::Ready(Ok(()));
}
- None => unreachable!(),
+
+ // Otherwise forward error to host
+ let mut sender = self.internal_channels.sender.clone();
+ sender
+ .try_send(WorkerEvent::Error(e))
+ .expect("Failed to post message to host");
}
+
+ // Let event loop be polled again
+ self.event_loop_idle = false;
+ self.waker.wake();
}
Poll::Pending
@@ -343,6 +361,63 @@ impl Drop for WebWorker {
}
}
+fn print_worker_error(error_str: String, name: &str) {
+ eprintln!(
+ "{}: Uncaught (in worker \"{}\") {}",
+ colors::red_bold("error"),
+ name,
+ error_str.trim_start_matches("Uncaught "),
+ );
+}
+
+/// This function should be called from a thread dedicated to this worker.
+// TODO(bartlomieju): check if order of actions is aligned to Worker spec
+pub fn run_web_worker(
+ mut worker: WebWorker,
+ specifier: ModuleSpecifier,
+ maybe_source_code: Option<String>,
+) -> Result<(), AnyError> {
+ let name = worker.name.to_string();
+
+ let mut rt = create_basic_runtime();
+
+ // TODO(bartlomieju): run following block using "select!"
+ // with terminate
+
+ // Execute provided source code immediately
+ let result = if let Some(source_code) = maybe_source_code {
+ worker.execute(&source_code)
+ } else {
+ // TODO(bartlomieju): add "type": "classic", ie. ability to load
+ // script instead of module
+ let load_future = worker.execute_module(&specifier).boxed_local();
+
+ rt.block_on(load_future)
+ };
+
+ let mut sender = worker.internal_channels.sender.clone();
+
+ // If sender is closed it means that worker has already been closed from
+ // within using "globalThis.close()"
+ if sender.is_closed() {
+ return Ok(());
+ }
+
+ if let Err(e) = result {
+ print_worker_error(e.to_string(), &name);
+ sender
+ .try_send(WorkerEvent::TerminalError(e))
+ .expect("Failed to post message to host");
+
+ // Failure to execute script is a terminal error, bye, bye.
+ return Ok(());
+ }
+
+ let result = rt.block_on(worker.run_event_loop());
+ debug!("Worker thread shuts down {}", &name);
+ result
+}
+
#[cfg(test)]
mod tests {
use super::*;
@@ -354,17 +429,14 @@ mod tests {
let main_module =
ModuleSpecifier::resolve_url_or_path("./hello.js").unwrap();
let program_state = ProgramState::mock(vec!["deno".to_string()], None);
- let mut worker = WebWorker::new(
+ WebWorker::new(
"TEST".to_string(),
Permissions::allow_all(),
main_module,
program_state,
false,
- );
- worker
- .execute("bootstrap.workerRuntime(\"TEST\", false)")
- .unwrap();
- worker
+ 1,
+ )
}
#[tokio::test]