summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cli/ops/websocket.rs4
-rw-r--r--cli/ops/worker_host.rs284
-rw-r--r--cli/web_worker.rs180
-rw-r--r--cli/worker.rs3
4 files changed, 218 insertions, 253 deletions
diff --git a/cli/ops/websocket.rs b/cli/ops/websocket.rs
index 40f5fd53b..c04c3b476 100644
--- a/cli/ops/websocket.rs
+++ b/cli/ops/websocket.rs
@@ -93,8 +93,8 @@ pub async fn op_ws_create(
}
let ca_file = {
- let cli_state = super::global_state2(&state);
- cli_state.flags.ca_file.clone()
+ let program_state = super::global_state2(&state);
+ program_state.flags.ca_file.clone()
};
let uri: Uri = args.url.parse()?;
let mut request = Request::builder().method(Method::GET).uri(&uri);
diff --git a/cli/ops/worker_host.rs b/cli/ops/worker_host.rs
index c464e6df2..6a2d79968 100644
--- a/cli/ops/worker_host.rs
+++ b/cli/ops/worker_host.rs
@@ -1,10 +1,7 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
-use crate::colors;
-use crate::ops::io::get_stdio;
use crate::permissions::Permissions;
-use crate::program_state::ProgramState;
-use crate::tokio_util::create_basic_runtime;
+use crate::web_worker::run_web_worker;
use crate::web_worker::WebWorker;
use crate::web_worker::WebWorkerHandle;
use crate::web_worker::WorkerEvent;
@@ -12,7 +9,6 @@ use deno_core::error::generic_error;
use deno_core::error::AnyError;
use deno_core::error::JsError;
use deno_core::futures::channel::mpsc;
-use deno_core::futures::future::FutureExt;
use deno_core::serde_json;
use deno_core::serde_json::json;
use deno_core::serde_json::Value;
@@ -25,7 +21,6 @@ use std::cell::RefCell;
use std::collections::HashMap;
use std::convert::From;
use std::rc::Rc;
-use std::sync::Arc;
use std::thread::JoinHandle;
#[derive(Deserialize)]
@@ -68,151 +63,13 @@ pub fn init(
);
}
-pub type WorkersTable = HashMap<u32, (JoinHandle<()>, WebWorkerHandle)>;
-pub type WorkerId = u32;
-
-fn create_web_worker(
- worker_id: u32,
- name: String,
- program_state: &Arc<ProgramState>,
- permissions: Permissions,
- specifier: ModuleSpecifier,
- has_deno_namespace: bool,
-) -> Result<WebWorker, AnyError> {
- let mut worker = WebWorker::new(
- name.clone(),
- permissions,
- specifier,
- program_state.clone(),
- has_deno_namespace,
- );
-
- if has_deno_namespace {
- let state = worker.js_runtime.op_state();
- let mut state = state.borrow_mut();
- let (stdin, stdout, stderr) = get_stdio();
- if let Some(stream) = stdin {
- state.resource_table.add("stdin", Box::new(stream));
- }
- if let Some(stream) = stdout {
- state.resource_table.add("stdout", Box::new(stream));
- }
- if let Some(stream) = stderr {
- state.resource_table.add("stderr", Box::new(stream));
- }
- }
-
- // Instead of using name for log we use `worker-${id}` because
- // WebWorkers can have empty string as name.
- let script = format!(
- "bootstrap.workerRuntime(\"{}\", {}, \"worker-{}\")",
- name, worker.has_deno_namespace, worker_id
- );
- worker.execute(&script)?;
-
- Ok(worker)
+pub struct WorkerThread {
+ join_handle: JoinHandle<Result<(), AnyError>>,
+ worker_handle: WebWorkerHandle,
}
-// TODO(bartlomieju): check if order of actions is aligned to Worker spec
-fn run_worker_thread(
- worker_id: u32,
- name: String,
- program_state: &Arc<ProgramState>,
- permissions: Permissions,
- specifier: ModuleSpecifier,
- has_deno_namespace: bool,
- maybe_source_code: Option<String>,
-) -> Result<(JoinHandle<()>, WebWorkerHandle), AnyError> {
- let program_state = program_state.clone();
- let (handle_sender, handle_receiver) =
- std::sync::mpsc::sync_channel::<Result<WebWorkerHandle, AnyError>>(1);
-
- let builder =
- std::thread::Builder::new().name(format!("deno-worker-{}", worker_id));
- let join_handle = builder.spawn(move || {
- // Any error inside this block is terminal:
- // - JS worker is useless - meaning it throws an exception and can't do anything else,
- // all action done upon it should be noops
- // - newly spawned thread exits
- let result = create_web_worker(
- worker_id,
- name,
- &program_state,
- permissions,
- specifier.clone(),
- has_deno_namespace,
- );
-
- if let Err(err) = result {
- handle_sender.send(Err(err)).unwrap();
- return;
- }
-
- let mut worker = result.unwrap();
- let name = worker.name.to_string();
- // Send thread safe handle to newly created worker to host thread
- handle_sender.send(Ok(worker.thread_safe_handle())).unwrap();
- drop(handle_sender);
-
- // At this point the only method of communication with host
- // is using `worker.internal_channels`.
- //
- // Host can already push messages and interact with worker.
- //
- // Next steps:
- // - create tokio runtime
- // - load provided module or code
- // - start driving worker's event loop
-
- let mut rt = create_basic_runtime();
-
- // TODO: run with using select with terminate
-
- // Execute provided source code immediately
- let result = if let Some(source_code) = maybe_source_code {
- worker.execute(&source_code)
- } else {
- // TODO(bartlomieju): add "type": "classic", ie. ability to load
- // script instead of module
- let load_future = worker.execute_module(&specifier).boxed_local();
-
- rt.block_on(load_future)
- };
-
- let mut sender = worker.internal_channels.sender.clone();
-
- // If sender is closed it means that worker has already been closed from
- // within using "globalThis.close()"
- if sender.is_closed() {
- return;
- }
-
- if let Err(e) = result {
- eprintln!(
- "{}: Uncaught (in worker \"{}\") {}",
- colors::red_bold("error"),
- name,
- e.to_string().trim_start_matches("Uncaught "),
- );
- sender
- .try_send(WorkerEvent::TerminalError(e))
- .expect("Failed to post message to host");
-
- // Failure to execute script is a terminal error, bye, bye.
- return;
- }
-
- // TODO(bartlomieju): this thread should return result of event loop
- // that means that we should store JoinHandle to thread to ensure
- // that it actually terminates.
- rt.block_on(worker.run_event_loop())
- .expect("Panic in event loop");
- debug!("Worker thread shuts down {}", &name);
- })?;
-
- let worker_handle = handle_receiver.recv().unwrap()?;
- Ok((join_handle, worker_handle))
-}
+pub type WorkersTable = HashMap<u32, WorkerThread>;
+pub type WorkerId = u32;
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
@@ -249,22 +106,53 @@ fn op_create_worker(
let module_specifier = ModuleSpecifier::resolve_url(&specifier)?;
let worker_name = args_name.unwrap_or_else(|| "".to_string());
- let cli_state = super::program_state(state);
+ let program_state = super::program_state(state);
+
+ let (handle_sender, handle_receiver) =
+ std::sync::mpsc::sync_channel::<Result<WebWorkerHandle, AnyError>>(1);
+
+ // Setup new thread
+ let thread_builder =
+ std::thread::Builder::new().name(format!("deno-worker-{}", worker_id));
+
+ // Spawn it
+ let join_handle = thread_builder.spawn(move || {
+ // Any error inside this block is terminal:
+ // - JS worker is useless - meaning it throws an exception and can't do anything else,
+ // all action done upon it should be noops
+ // - newly spawned thread exits
+ let worker = WebWorker::new(
+ worker_name,
+ permissions,
+ module_specifier.clone(),
+ program_state,
+ use_deno_namespace,
+ worker_id,
+ );
+
+ // Send thread safe handle to newly created worker to host thread
+ handle_sender.send(Ok(worker.thread_safe_handle())).unwrap();
+ drop(handle_sender);
+
+ // At this point the only method of communication with host
+ // is using `worker.internal_channels`.
+ //
+ // Host can already push messages and interact with worker.
+ run_web_worker(worker, module_specifier, maybe_source_code)
+ })?;
+
+ let worker_handle = handle_receiver.recv().unwrap()?;
+
+ let worker_thread = WorkerThread {
+ join_handle,
+ worker_handle,
+ };
- let (join_handle, worker_handle) = run_worker_thread(
- worker_id,
- worker_name,
- &cli_state,
- permissions,
- module_specifier,
- use_deno_namespace,
- maybe_source_code,
- )?;
// At this point all interactions with worker happen using thread
- // safe handler returned from previous function call
+ // safe handler returned from previous function calls
state
.borrow_mut::<WorkersTable>()
- .insert(worker_id, (join_handle, worker_handle));
+ .insert(worker_id, worker_thread);
Ok(json!({ "id": worker_id }))
}
@@ -281,12 +169,16 @@ fn op_host_terminate_worker(
) -> Result<Value, AnyError> {
let args: WorkerArgs = serde_json::from_value(args)?;
let id = args.id as u32;
- let (join_handle, worker_handle) = state
+ let worker_thread = state
.borrow_mut::<WorkersTable>()
.remove(&id)
.expect("No worker handle found");
- worker_handle.terminate();
- join_handle.join().expect("Panic in worker thread");
+ worker_thread.worker_handle.terminate();
+ worker_thread
+ .join_handle
+ .join()
+ .expect("Panic in worker thread")
+ .expect("Panic in worker event loop");
Ok(json!({}))
}
@@ -330,6 +222,22 @@ fn serialize_worker_event(event: WorkerEvent) -> Value {
}
}
+/// Try to remove worker from workers table - NOTE: `Worker.terminate()`
+/// might have been called already meaning that we won't find worker in
+/// table - in that case ignore.
+fn try_remove_and_close(state: Rc<RefCell<OpState>>, id: u32) {
+ let mut s = state.borrow_mut();
+ let workers = s.borrow_mut::<WorkersTable>();
+ if let Some(mut worker_thread) = workers.remove(&id) {
+ worker_thread.worker_handle.sender.close_channel();
+ worker_thread
+ .join_handle
+ .join()
+ .expect("Worker thread panicked")
+ .expect("Panic in worker event loop");
+ }
+}
+
/// Get message from guest worker as host
async fn op_host_get_message(
state: Rc<RefCell<OpState>>,
@@ -344,41 +252,25 @@ async fn op_host_get_message(
let workers_table = s.borrow::<WorkersTable>();
let maybe_handle = workers_table.get(&id);
if let Some(handle) = maybe_handle {
- handle.1.clone()
+ handle.worker_handle.clone()
} else {
// If handle was not found it means worker has already shutdown
return Ok(json!({ "type": "close" }));
}
};
- let response = match worker_handle.get_event().await? {
- Some(event) => {
- // Terminal error means that worker should be removed from worker table.
- if let WorkerEvent::TerminalError(_) = &event {
- let mut s = state.borrow_mut();
- if let Some((join_handle, mut worker_handle)) =
- s.borrow_mut::<WorkersTable>().remove(&id)
- {
- worker_handle.sender.close_channel();
- join_handle.join().expect("Worker thread panicked");
- };
- }
- serialize_worker_event(event)
+ let maybe_event = worker_handle.get_event().await?;
+ if let Some(event) = maybe_event {
+ // Terminal error means that worker should be removed from worker table.
+ if let WorkerEvent::TerminalError(_) = &event {
+ try_remove_and_close(state, id);
}
- None => {
- // Worker shuts down
- let mut s = state.borrow_mut();
- let workers = s.borrow_mut::<WorkersTable>();
- // Try to remove worker from workers table - NOTE: `Worker.terminate()` might have been called
- // already meaning that we won't find worker in table - in that case ignore.
- if let Some((join_handle, mut worker_handle)) = workers.remove(&id) {
- worker_handle.sender.close_channel();
- join_handle.join().expect("Worker thread panicked");
- }
- json!({ "type": "close" })
- }
- };
- Ok(response)
+ return Ok(serialize_worker_event(event));
+ }
+
+ // If there was no event from worker it means it has already been closed.
+ try_remove_and_close(state, id);
+ Ok(json!({ "type": "close" }))
}
/// Post message to guest worker as host
@@ -393,8 +285,10 @@ fn op_host_post_message(
let msg = Vec::from(&*data[0]).into_boxed_slice();
debug!("post message to worker {}", id);
- let workers = state.borrow::<WorkersTable>();
- let worker_handle = workers[&id].1.clone();
- worker_handle.post_message(msg)?;
+ let worker_thread = state
+ .borrow::<WorkersTable>()
+ .get(&id)
+ .expect("No worker handle found");
+ worker_thread.worker_handle.post_message(msg)?;
Ok(json!({}))
}
diff --git a/cli/web_worker.rs b/cli/web_worker.rs
index 12b79cb2d..ddce8666e 100644
--- a/cli/web_worker.rs
+++ b/cli/web_worker.rs
@@ -10,6 +10,7 @@ use crate::ops;
use crate::permissions::Permissions;
use crate::program_state::ProgramState;
use crate::source_maps::apply_source_map;
+use crate::tokio_util::create_basic_runtime;
use deno_core::error::AnyError;
use deno_core::futures::channel::mpsc;
use deno_core::futures::future::poll_fn;
@@ -77,7 +78,7 @@ impl WebWorkerHandle {
// This function can be called multiple times by whomever holds
// the handle. However only a single "termination" should occur so
// we need a guard here.
- let already_terminated = self.terminated.swap(true, Ordering::Relaxed);
+ let already_terminated = self.terminated.swap(true, Ordering::SeqCst);
if !already_terminated {
self.isolate_handle.terminate_execution();
@@ -134,6 +135,7 @@ impl WebWorker {
main_module: ModuleSpecifier,
program_state: Arc<ProgramState>,
has_deno_namespace: bool,
+ worker_id: u32,
) -> Self {
let module_loader = CliModuleLoader::new_for_worker();
let global_state_ = program_state.clone();
@@ -173,7 +175,7 @@ impl WebWorker {
inspector,
internal_channels,
js_runtime,
- name,
+ name: name.clone(),
waker: AtomicWaker::new(),
event_loop_idle: false,
terminate_rx,
@@ -223,9 +225,32 @@ impl WebWorker {
ops::signal::init(js_runtime);
ops::tls::init(js_runtime);
ops::tty::init(js_runtime);
+
+ let op_state = js_runtime.op_state();
+ let mut op_state = op_state.borrow_mut();
+ let (stdin, stdout, stderr) = ops::io::get_stdio();
+ if let Some(stream) = stdin {
+ op_state.resource_table.add("stdin", Box::new(stream));
+ }
+ if let Some(stream) = stdout {
+ op_state.resource_table.add("stdout", Box::new(stream));
+ }
+ if let Some(stream) = stderr {
+ op_state.resource_table.add("stderr", Box::new(stream));
+ }
}
}
+ // Instead of using name for log we use `worker-${id}` because
+ // WebWorkers can have empty string as name.
+ let script = format!(
+ "bootstrap.workerRuntime(\"{}\", {}, \"worker-{}\")",
+ name, worker.has_deno_namespace, worker_id
+ );
+ worker
+ .execute(&script)
+ .expect("Failed to execute worker bootstrap script");
+
worker
}
@@ -250,13 +275,15 @@ impl WebWorker {
self.handle.clone()
}
+ pub fn has_been_terminated(&self) -> bool {
+ self.handle.terminated.load(Ordering::SeqCst)
+ }
+
pub fn poll_event_loop(
&mut self,
cx: &mut Context,
) -> Poll<Result<(), AnyError>> {
- let terminated = self.handle.terminated.load(Ordering::Relaxed);
-
- if terminated {
+ if self.has_been_terminated() {
return Poll::Ready(Ok(()));
}
@@ -267,28 +294,20 @@ impl WebWorker {
self.waker.register(cx.waker());
self.js_runtime.poll_event_loop(cx)
};
- match poll_result {
- Poll::Ready(r) => {
- let terminated = self.handle.terminated.load(Ordering::Relaxed);
- if terminated {
- return Poll::Ready(Ok(()));
- }
- if let Err(e) = r {
- eprintln!(
- "{}: Uncaught (in worker \"{}\") {}",
- colors::red_bold("error"),
- self.name.to_string(),
- e.to_string().trim_start_matches("Uncaught "),
- );
- let mut sender = self.internal_channels.sender.clone();
- sender
- .try_send(WorkerEvent::Error(e))
- .expect("Failed to post message to host");
- }
- self.event_loop_idle = true;
+ if let Poll::Ready(r) = poll_result {
+ if self.has_been_terminated() {
+ return Poll::Ready(Ok(()));
}
- Poll::Pending => {}
+
+ if let Err(e) = r {
+ print_worker_error(e.to_string(), &self.name);
+ let mut sender = self.internal_channels.sender.clone();
+ sender
+ .try_send(WorkerEvent::Error(e))
+ .expect("Failed to post message to host");
+ }
+ self.event_loop_idle = true;
}
}
@@ -298,33 +317,32 @@ impl WebWorker {
return Poll::Ready(Ok(()));
}
- if let Poll::Ready(r) = self.internal_channels.receiver.poll_next_unpin(cx)
- {
- match r {
- Some(msg) => {
- let msg = String::from_utf8(msg.to_vec()).unwrap();
- let script = format!("workerMessageRecvCallback({})", msg);
-
- if let Err(e) = self.execute(&script) {
- // If execution was terminated during message callback then
- // just ignore it
- if self.handle.terminated.load(Ordering::Relaxed) {
- return Poll::Ready(Ok(()));
- }
+ let maybe_msg_poll_result =
+ self.internal_channels.receiver.poll_next_unpin(cx);
- // Otherwise forward error to host
- let mut sender = self.internal_channels.sender.clone();
- sender
- .try_send(WorkerEvent::Error(e))
- .expect("Failed to post message to host");
- }
+ if let Poll::Ready(maybe_msg) = maybe_msg_poll_result {
+ let msg =
+ maybe_msg.expect("Received `None` instead of message in worker");
+ let msg = String::from_utf8(msg.to_vec()).unwrap();
+ let script = format!("workerMessageRecvCallback({})", msg);
- // Let event loop be polled again
- self.event_loop_idle = false;
- self.waker.wake();
+ if let Err(e) = self.execute(&script) {
+ // If execution was terminated during message callback then
+ // just ignore it
+ if self.has_been_terminated() {
+ return Poll::Ready(Ok(()));
}
- None => unreachable!(),
+
+ // Otherwise forward error to host
+ let mut sender = self.internal_channels.sender.clone();
+ sender
+ .try_send(WorkerEvent::Error(e))
+ .expect("Failed to post message to host");
}
+
+ // Let event loop be polled again
+ self.event_loop_idle = false;
+ self.waker.wake();
}
Poll::Pending
@@ -343,6 +361,63 @@ impl Drop for WebWorker {
}
}
+fn print_worker_error(error_str: String, name: &str) {
+ eprintln!(
+ "{}: Uncaught (in worker \"{}\") {}",
+ colors::red_bold("error"),
+ name,
+ error_str.trim_start_matches("Uncaught "),
+ );
+}
+
+/// This function should be called from a thread dedicated to this worker.
+// TODO(bartlomieju): check if order of actions is aligned to Worker spec
+pub fn run_web_worker(
+ mut worker: WebWorker,
+ specifier: ModuleSpecifier,
+ maybe_source_code: Option<String>,
+) -> Result<(), AnyError> {
+ let name = worker.name.to_string();
+
+ let mut rt = create_basic_runtime();
+
+ // TODO(bartlomieju): run following block using "select!"
+ // with terminate
+
+ // Execute provided source code immediately
+ let result = if let Some(source_code) = maybe_source_code {
+ worker.execute(&source_code)
+ } else {
+ // TODO(bartlomieju): add "type": "classic", ie. ability to load
+ // script instead of module
+ let load_future = worker.execute_module(&specifier).boxed_local();
+
+ rt.block_on(load_future)
+ };
+
+ let mut sender = worker.internal_channels.sender.clone();
+
+ // If sender is closed it means that worker has already been closed from
+ // within using "globalThis.close()"
+ if sender.is_closed() {
+ return Ok(());
+ }
+
+ if let Err(e) = result {
+ print_worker_error(e.to_string(), &name);
+ sender
+ .try_send(WorkerEvent::TerminalError(e))
+ .expect("Failed to post message to host");
+
+ // Failure to execute script is a terminal error, bye, bye.
+ return Ok(());
+ }
+
+ let result = rt.block_on(worker.run_event_loop());
+ debug!("Worker thread shuts down {}", &name);
+ result
+}
+
#[cfg(test)]
mod tests {
use super::*;
@@ -354,17 +429,14 @@ mod tests {
let main_module =
ModuleSpecifier::resolve_url_or_path("./hello.js").unwrap();
let program_state = ProgramState::mock(vec!["deno".to_string()], None);
- let mut worker = WebWorker::new(
+ WebWorker::new(
"TEST".to_string(),
Permissions::allow_all(),
main_module,
program_state,
false,
- );
- worker
- .execute("bootstrap.workerRuntime(\"TEST\", false)")
- .unwrap();
- worker
+ 1,
+ )
}
#[tokio::test]
diff --git a/cli/worker.rs b/cli/worker.rs
index 3068ab1f7..c2ed8871b 100644
--- a/cli/worker.rs
+++ b/cli/worker.rs
@@ -7,7 +7,6 @@ use crate::js;
use crate::metrics::Metrics;
use crate::module_loader::CliModuleLoader;
use crate::ops;
-use crate::ops::io::get_stdio;
use crate::permissions::Permissions;
use crate::program_state::ProgramState;
use crate::source_maps::apply_source_map;
@@ -148,7 +147,7 @@ impl MainWorker {
let op_state = js_runtime.op_state();
let mut op_state = op_state.borrow_mut();
let t = &mut op_state.resource_table;
- let (stdin, stdout, stderr) = get_stdio();
+ let (stdin, stdout, stderr) = ops::io::get_stdio();
if let Some(stream) = stdin {
t.add("stdin", Box::new(stream));
}