summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cli/main.rs79
-rw-r--r--cli/ops/web_worker.rs4
-rw-r--r--cli/ops/worker_host.rs6
-rw-r--r--cli/tools/repl.rs19
-rw-r--r--cli/web_worker.rs454
-rw-r--r--cli/worker.rs601
6 files changed, 574 insertions, 589 deletions
diff --git a/cli/main.rs b/cli/main.rs
index 47dd4087d..fdbfdad77 100644
--- a/cli/main.rs
+++ b/cli/main.rs
@@ -45,6 +45,7 @@ mod tools;
mod tsc;
mod tsc_config;
mod version;
+mod web_worker;
mod worker;
use crate::file_fetcher::File;
@@ -792,37 +793,31 @@ async fn test_command(
Ok(())
}
-pub fn main() {
- #[cfg(windows)]
- colors::enable_ansi(); // For Windows 10
-
- let args: Vec<String> = env::args().collect();
- let flags = flags::flags_from_vec(args);
-
- if let Some(ref v8_flags) = flags.v8_flags {
- let v8_flags_includes_help = v8_flags
- .iter()
- .any(|flag| flag == "-help" || flag == "--help");
- let v8_flags = once("UNUSED_BUT_NECESSARY_ARG0".to_owned())
- .chain(v8_flags.iter().cloned())
- .collect::<Vec<_>>();
- let unrecognized_v8_flags = v8_set_flags(v8_flags)
- .into_iter()
- .skip(1)
- .collect::<Vec<_>>();
- if !unrecognized_v8_flags.is_empty() {
- for f in unrecognized_v8_flags {
- eprintln!("error: V8 did not recognize flag '{}'", f);
- }
- eprintln!("\nFor a list of V8 flags, use '--v8-flags=--help'");
- std::process::exit(1);
- }
- if v8_flags_includes_help {
- std::process::exit(0);
+fn init_v8_flags(v8_flags: &[String]) {
+ let v8_flags_includes_help = v8_flags
+ .iter()
+ .any(|flag| flag == "-help" || flag == "--help");
+ let v8_flags = once("UNUSED_BUT_NECESSARY_ARG0".to_owned())
+ .chain(v8_flags.iter().cloned())
+ .collect::<Vec<_>>();
+ let unrecognized_v8_flags = v8_set_flags(v8_flags)
+ .into_iter()
+ .skip(1)
+ .collect::<Vec<_>>();
+ if !unrecognized_v8_flags.is_empty() {
+ for f in unrecognized_v8_flags {
+ eprintln!("error: V8 did not recognize flag '{}'", f);
}
+ eprintln!("\nFor a list of V8 flags, use '--v8-flags=--help'");
+ std::process::exit(1);
+ }
+ if v8_flags_includes_help {
+ std::process::exit(0);
}
+}
- let log_level = match flags.log_level {
+fn init_logger(maybe_level: Option<Level>) {
+ let log_level = match maybe_level {
Some(level) => level,
None => Level::Info, // Default log level
};
@@ -853,8 +848,12 @@ pub fn main() {
}
})
.init();
+}
- let fut = match flags.clone().subcommand {
+fn get_subcommand(
+ flags: Flags,
+) -> Pin<Box<dyn Future<Output = Result<(), AnyError>>>> {
+ match flags.clone().subcommand {
DenoSubcommand::Bundle {
source_file,
out_file,
@@ -914,7 +913,7 @@ pub fn main() {
eprintln!("{}", e);
std::process::exit(1);
}
- return;
+ std::process::exit(0);
}
DenoSubcommand::Types => {
let types = get_types(flags.unstable);
@@ -922,7 +921,7 @@ pub fn main() {
eprintln!("{}", e);
std::process::exit(1);
}
- return;
+ std::process::exit(0);
}
DenoSubcommand::Upgrade {
force,
@@ -934,9 +933,23 @@ pub fn main() {
tools::upgrade::upgrade_command(dry_run, force, version, output, ca_file)
.boxed_local()
}
- };
+ }
+}
+
+pub fn main() {
+ #[cfg(windows)]
+ colors::enable_ansi(); // For Windows 10
+
+ let args: Vec<String> = env::args().collect();
+ let flags = flags::flags_from_vec(args);
+
+ if let Some(ref v8_flags) = flags.v8_flags {
+ init_v8_flags(v8_flags);
+ }
+ init_logger(flags.log_level);
- let result = tokio_util::run_basic(fut);
+ let subcommand_future = get_subcommand(flags);
+ let result = tokio_util::run_basic(subcommand_future);
if let Err(err) = result {
eprintln!("{}: {}", colors::red_bold("error"), err.to_string());
std::process::exit(1);
diff --git a/cli/ops/web_worker.rs b/cli/ops/web_worker.rs
index 42b6a56ce..d88330a04 100644
--- a/cli/ops/web_worker.rs
+++ b/cli/ops/web_worker.rs
@@ -1,7 +1,7 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
-use crate::worker::WebWorkerHandle;
-use crate::worker::WorkerEvent;
+use crate::web_worker::WebWorkerHandle;
+use crate::web_worker::WorkerEvent;
use deno_core::futures::channel::mpsc;
use deno_core::serde_json::json;
diff --git a/cli/ops/worker_host.rs b/cli/ops/worker_host.rs
index f2e936ef2..c464e6df2 100644
--- a/cli/ops/worker_host.rs
+++ b/cli/ops/worker_host.rs
@@ -5,9 +5,9 @@ use crate::ops::io::get_stdio;
use crate::permissions::Permissions;
use crate::program_state::ProgramState;
use crate::tokio_util::create_basic_runtime;
-use crate::worker::WebWorker;
-use crate::worker::WebWorkerHandle;
-use crate::worker::WorkerEvent;
+use crate::web_worker::WebWorker;
+use crate::web_worker::WebWorkerHandle;
+use crate::web_worker::WorkerEvent;
use deno_core::error::generic_error;
use deno_core::error::AnyError;
use deno_core::error::JsError;
diff --git a/cli/tools/repl.rs b/cli/tools/repl.rs
index e6a2fd709..8786f57f3 100644
--- a/cli/tools/repl.rs
+++ b/cli/tools/repl.rs
@@ -4,7 +4,6 @@ use crate::colors;
use crate::inspector::InspectorSession;
use crate::program_state::ProgramState;
use crate::worker::MainWorker;
-use crate::worker::Worker;
use deno_core::error::AnyError;
use deno_core::serde_json::json;
use deno_core::serde_json::Value;
@@ -280,7 +279,7 @@ impl Highlighter for LineHighlighter {
}
async fn post_message_and_poll(
- worker: &mut Worker,
+ worker: &mut MainWorker,
session: &mut InspectorSession,
method: &str,
params: Option<Value>,
@@ -305,7 +304,7 @@ async fn post_message_and_poll(
}
async fn read_line_and_poll(
- worker: &mut Worker,
+ worker: &mut MainWorker,
session: &mut InspectorSession,
message_rx: &Receiver<(String, Option<Value>)>,
response_tx: &Sender<Result<Value, AnyError>>,
@@ -425,7 +424,7 @@ pub async fn run(
let history_file = program_state.dir.root.join("deno_history.txt");
- post_message_and_poll(&mut *worker, &mut session, "Runtime.enable", None)
+ post_message_and_poll(&mut worker, &mut session, "Runtime.enable", None)
.await?;
// Enabling the runtime domain will always send trigger one executionContextCreated for each
@@ -474,7 +473,7 @@ pub async fn run(
while !is_closing(&mut worker, &mut session, context_id).await? {
let line = read_line_and_poll(
- &mut *worker,
+ &mut worker,
&mut session,
&message_rx,
&response_tx,
@@ -495,7 +494,7 @@ pub async fn run(
};
let evaluate_response = post_message_and_poll(
- &mut *worker,
+ &mut worker,
&mut session,
"Runtime.evaluate",
Some(json!({
@@ -513,7 +512,7 @@ pub async fn run(
&& wrapped_line != line
{
post_message_and_poll(
- &mut *worker,
+ &mut worker,
&mut session,
"Runtime.evaluate",
Some(json!({
@@ -533,7 +532,7 @@ pub async fn run(
if evaluate_exception_details.is_some() {
post_message_and_poll(
- &mut *worker,
+ &mut worker,
&mut session,
"Runtime.callFunctionOn",
Some(json!({
@@ -546,7 +545,7 @@ pub async fn run(
).await?;
} else {
post_message_and_poll(
- &mut *worker,
+ &mut worker,
&mut session,
"Runtime.callFunctionOn",
Some(json!({
@@ -564,7 +563,7 @@ pub async fn run(
// Deno.inspectArgs.
let inspect_response =
post_message_and_poll(
- &mut *worker,
+ &mut worker,
&mut session,
"Runtime.callFunctionOn",
Some(json!({
diff --git a/cli/web_worker.rs b/cli/web_worker.rs
new file mode 100644
index 000000000..97db42279
--- /dev/null
+++ b/cli/web_worker.rs
@@ -0,0 +1,454 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+
+use crate::colors;
+use crate::fmt_errors::PrettyJsError;
+use crate::inspector::DenoInspector;
+use crate::js;
+use crate::metrics::Metrics;
+use crate::module_loader::CliModuleLoader;
+use crate::ops;
+use crate::permissions::Permissions;
+use crate::program_state::ProgramState;
+use crate::source_maps::apply_source_map;
+use deno_core::error::AnyError;
+use deno_core::futures::channel::mpsc;
+use deno_core::futures::future::poll_fn;
+use deno_core::futures::future::FutureExt;
+use deno_core::futures::stream::StreamExt;
+use deno_core::futures::task::AtomicWaker;
+use deno_core::url::Url;
+use deno_core::v8;
+use deno_core::JsRuntime;
+use deno_core::ModuleSpecifier;
+use deno_core::RuntimeOptions;
+use std::env;
+use std::sync::atomic::AtomicBool;
+use std::sync::atomic::Ordering;
+use std::sync::Arc;
+use std::task::Context;
+use std::task::Poll;
+use tokio::sync::Mutex as AsyncMutex;
+
+/// Events that are sent to host from child
+/// worker.
+pub enum WorkerEvent {
+ Message(Box<[u8]>),
+ Error(AnyError),
+ TerminalError(AnyError),
+}
+
+pub struct WorkerChannelsInternal {
+ pub sender: mpsc::Sender<WorkerEvent>,
+ pub receiver: mpsc::Receiver<Box<[u8]>>,
+}
+
+/// Wrapper for `WorkerHandle` that adds functionality
+/// for terminating workers.
+///
+/// This struct is used by host as well as worker itself.
+///
+/// Host uses it to communicate with worker and terminate it,
+/// while worker uses it only to finish execution on `self.close()`.
+#[derive(Clone)]
+pub struct WebWorkerHandle {
+ pub sender: mpsc::Sender<Box<[u8]>>,
+ pub receiver: Arc<AsyncMutex<mpsc::Receiver<WorkerEvent>>>,
+ terminate_tx: mpsc::Sender<()>,
+ terminated: Arc<AtomicBool>,
+ isolate_handle: v8::IsolateHandle,
+}
+
+impl WebWorkerHandle {
+ /// Post message to worker as a host.
+ pub fn post_message(&self, buf: Box<[u8]>) -> Result<(), AnyError> {
+ let mut sender = self.sender.clone();
+ sender.try_send(buf)?;
+ Ok(())
+ }
+
+ /// Get the event with lock.
+ /// Return error if more than one listener tries to get event
+ pub async fn get_event(&self) -> Result<Option<WorkerEvent>, AnyError> {
+ let mut receiver = self.receiver.try_lock()?;
+ Ok(receiver.next().await)
+ }
+
+ pub fn terminate(&self) {
+ // This function can be called multiple times by whomever holds
+ // the handle. However only a single "termination" should occur so
+ // we need a guard here.
+ let already_terminated = self.terminated.swap(true, Ordering::Relaxed);
+
+ if !already_terminated {
+ self.isolate_handle.terminate_execution();
+ let mut sender = self.terminate_tx.clone();
+ // This call should be infallible hence the `expect`.
+ // This might change in the future.
+ sender.try_send(()).expect("Failed to terminate");
+ }
+ }
+}
+
+fn create_channels(
+ isolate_handle: v8::IsolateHandle,
+ terminate_tx: mpsc::Sender<()>,
+) -> (WorkerChannelsInternal, WebWorkerHandle) {
+ let (in_tx, in_rx) = mpsc::channel::<Box<[u8]>>(1);
+ let (out_tx, out_rx) = mpsc::channel::<WorkerEvent>(1);
+ let internal_channels = WorkerChannelsInternal {
+ sender: out_tx,
+ receiver: in_rx,
+ };
+ let external_channels = WebWorkerHandle {
+ sender: in_tx,
+ receiver: Arc::new(AsyncMutex::new(out_rx)),
+ terminated: Arc::new(AtomicBool::new(false)),
+ terminate_tx,
+ isolate_handle,
+ };
+ (internal_channels, external_channels)
+}
+
+/// This struct is an implementation of `Worker` Web API
+///
+/// Each `WebWorker` is either a child of `MainWorker` or other
+/// `WebWorker`.
+pub struct WebWorker {
+ inspector: Option<Box<DenoInspector>>,
+ // Following fields are pub because they are accessed
+ // when creating a new WebWorker instance.
+ pub(crate) internal_channels: WorkerChannelsInternal,
+ pub(crate) js_runtime: JsRuntime,
+ pub(crate) name: String,
+ waker: AtomicWaker,
+ event_loop_idle: bool,
+ terminate_rx: mpsc::Receiver<()>,
+ handle: WebWorkerHandle,
+ pub has_deno_namespace: bool,
+}
+
+impl WebWorker {
+ pub fn new(
+ name: String,
+ permissions: Permissions,
+ main_module: ModuleSpecifier,
+ program_state: Arc<ProgramState>,
+ has_deno_namespace: bool,
+ ) -> Self {
+ let module_loader = CliModuleLoader::new_for_worker();
+ let global_state_ = program_state.clone();
+
+ let js_error_create_fn = Box::new(move |core_js_error| {
+ let source_mapped_error =
+ apply_source_map(&core_js_error, global_state_.clone());
+ PrettyJsError::create(source_mapped_error)
+ });
+
+ let mut js_runtime = JsRuntime::new(RuntimeOptions {
+ module_loader: Some(module_loader),
+ startup_snapshot: Some(js::deno_isolate_init()),
+ js_error_create_fn: Some(js_error_create_fn),
+ get_error_class_fn: Some(&crate::errors::get_error_class_name),
+ ..Default::default()
+ });
+
+ let inspector =
+ if let Some(inspector_server) = &program_state.maybe_inspector_server {
+ Some(DenoInspector::new(
+ &mut js_runtime,
+ Some(inspector_server.clone()),
+ ))
+ } else if program_state.flags.coverage || program_state.flags.repl {
+ Some(DenoInspector::new(&mut js_runtime, None))
+ } else {
+ None
+ };
+
+ let (terminate_tx, terminate_rx) = mpsc::channel::<()>(1);
+ let isolate_handle = js_runtime.v8_isolate().thread_safe_handle();
+ let (internal_channels, handle) =
+ create_channels(isolate_handle, terminate_tx);
+
+ let mut worker = Self {
+ inspector,
+ internal_channels,
+ js_runtime,
+ name,
+ waker: AtomicWaker::new(),
+ event_loop_idle: false,
+ terminate_rx,
+ handle,
+ has_deno_namespace,
+ };
+
+ {
+ let handle = worker.thread_safe_handle();
+ let sender = worker.internal_channels.sender.clone();
+ let js_runtime = &mut worker.js_runtime;
+ // All ops registered in this function depend on these
+ {
+ let op_state = js_runtime.op_state();
+ let mut op_state = op_state.borrow_mut();
+ op_state.put::<Metrics>(Default::default());
+ op_state.put::<Arc<ProgramState>>(program_state.clone());
+ op_state.put::<Permissions>(permissions);
+ }
+
+ ops::web_worker::init(js_runtime, sender.clone(), handle);
+ ops::runtime::init(js_runtime, main_module);
+ ops::fetch::init(js_runtime, program_state.flags.ca_file.as_deref());
+ ops::timers::init(js_runtime);
+ ops::worker_host::init(js_runtime, Some(sender));
+ ops::reg_json_sync(js_runtime, "op_close", deno_core::op_close);
+ ops::reg_json_sync(js_runtime, "op_resources", deno_core::op_resources);
+ ops::reg_json_sync(
+ js_runtime,
+ "op_domain_to_ascii",
+ deno_web::op_domain_to_ascii,
+ );
+ ops::errors::init(js_runtime);
+ ops::io::init(js_runtime);
+ ops::websocket::init(js_runtime);
+
+ if has_deno_namespace {
+ ops::fs_events::init(js_runtime);
+ ops::fs::init(js_runtime);
+ ops::net::init(js_runtime);
+ ops::os::init(js_runtime);
+ ops::permissions::init(js_runtime);
+ ops::plugin::init(js_runtime);
+ ops::process::init(js_runtime);
+ ops::crypto::init(js_runtime, program_state.flags.seed);
+ ops::runtime_compiler::init(js_runtime);
+ ops::signal::init(js_runtime);
+ ops::tls::init(js_runtime);
+ ops::tty::init(js_runtime);
+ }
+ }
+
+ worker
+ }
+
+ /// Same as execute2() but the filename defaults to "$CWD/__anonymous__".
+ pub fn execute(&mut self, js_source: &str) -> Result<(), AnyError> {
+ let path = env::current_dir().unwrap().join("__anonymous__");
+ let url = Url::from_file_path(path).unwrap();
+ self.js_runtime.execute(url.as_str(), js_source)
+ }
+
+ /// Loads, instantiates and executes specified JavaScript module.
+ pub async fn execute_module(
+ &mut self,
+ module_specifier: &ModuleSpecifier,
+ ) -> Result<(), AnyError> {
+ let id = self.js_runtime.load_module(module_specifier, None).await?;
+ self.js_runtime.mod_evaluate(id).await
+ }
+
+ /// Returns a way to communicate with the Worker from other threads.
+ pub fn thread_safe_handle(&self) -> WebWorkerHandle {
+ self.handle.clone()
+ }
+
+ pub fn poll_event_loop(
+ &mut self,
+ cx: &mut Context,
+ ) -> Poll<Result<(), AnyError>> {
+ let terminated = self.handle.terminated.load(Ordering::Relaxed);
+
+ if terminated {
+ return Poll::Ready(Ok(()));
+ }
+
+ if !self.event_loop_idle {
+ let poll_result = {
+ // We always poll the inspector if it exists.
+ let _ = self.inspector.as_mut().map(|i| i.poll_unpin(cx));
+ self.waker.register(cx.waker());
+ self.js_runtime.poll_event_loop(cx)
+ };
+ match poll_result {
+ Poll::Ready(r) => {
+ let terminated = self.handle.terminated.load(Ordering::Relaxed);
+ if terminated {
+ return Poll::Ready(Ok(()));
+ }
+
+ if let Err(e) = r {
+ eprintln!(
+ "{}: Uncaught (in worker \"{}\") {}",
+ colors::red_bold("error"),
+ self.name.to_string(),
+ e.to_string().trim_start_matches("Uncaught "),
+ );
+ let mut sender = self.internal_channels.sender.clone();
+ sender
+ .try_send(WorkerEvent::Error(e))
+ .expect("Failed to post message to host");
+ }
+ self.event_loop_idle = true;
+ }
+ Poll::Pending => {}
+ }
+ }
+
+ if let Poll::Ready(r) = self.terminate_rx.poll_next_unpin(cx) {
+ // terminate_rx should never be closed
+ assert!(r.is_some());
+ return Poll::Ready(Ok(()));
+ }
+
+ if let Poll::Ready(r) = self.internal_channels.receiver.poll_next_unpin(cx)
+ {
+ match r {
+ Some(msg) => {
+ let msg = String::from_utf8(msg.to_vec()).unwrap();
+ let script = format!("workerMessageRecvCallback({})", msg);
+
+ if let Err(e) = self.execute(&script) {
+ // If execution was terminated during message callback then
+ // just ignore it
+ if self.handle.terminated.load(Ordering::Relaxed) {
+ return Poll::Ready(Ok(()));
+ }
+
+ // Otherwise forward error to host
+ let mut sender = self.internal_channels.sender.clone();
+ sender
+ .try_send(WorkerEvent::Error(e))
+ .expect("Failed to post message to host");
+ }
+
+ // Let event loop be polled again
+ self.event_loop_idle = false;
+ self.waker.wake();
+ }
+ None => unreachable!(),
+ }
+ }
+
+ Poll::Pending
+ }
+
+ pub async fn run_event_loop(&mut self) -> Result<(), AnyError> {
+ poll_fn(|cx| self.poll_event_loop(cx)).await
+ }
+}
+
+impl Drop for WebWorker {
+ fn drop(&mut self) {
+ // The Isolate object must outlive the Inspector object, but this is
+ // currently not enforced by the type system.
+ self.inspector.take();
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::program_state::ProgramState;
+ use crate::tokio_util;
+ use deno_core::serde_json::json;
+
+ fn create_test_web_worker() -> WebWorker {
+ let main_module =
+ ModuleSpecifier::resolve_url_or_path("./hello.js").unwrap();
+ let program_state = ProgramState::mock(vec!["deno".to_string()], None);
+ let mut worker = WebWorker::new(
+ "TEST".to_string(),
+ Permissions::allow_all(),
+ main_module,
+ program_state,
+ false,
+ );
+ worker
+ .execute("bootstrap.workerRuntime(\"TEST\", false)")
+ .unwrap();
+ worker
+ }
+
+ #[tokio::test]
+ async fn test_worker_messages() {
+ let (handle_sender, handle_receiver) =
+ std::sync::mpsc::sync_channel::<WebWorkerHandle>(1);
+
+ let join_handle = std::thread::spawn(move || {
+ let mut worker = create_test_web_worker();
+ let source = r#"
+ onmessage = function(e) {
+ console.log("msg from main script", e.data);
+ if (e.data == "exit") {
+ return close();
+ } else {
+ console.assert(e.data === "hi");
+ }
+ postMessage([1, 2, 3]);
+ console.log("after postMessage");
+ }
+ "#;
+ worker.execute(source).unwrap();
+ let handle = worker.thread_safe_handle();
+ handle_sender.send(handle).unwrap();
+ let r = tokio_util::run_basic(worker.run_event_loop());
+ assert!(r.is_ok())
+ });
+
+ let mut handle = handle_receiver.recv().unwrap();
+
+ let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
+ let r = handle.post_message(msg.clone());
+ assert!(r.is_ok());
+
+ let maybe_msg = handle.get_event().await.unwrap();
+ assert!(maybe_msg.is_some());
+
+ let r = handle.post_message(msg.clone());
+ assert!(r.is_ok());
+
+ let maybe_msg = handle.get_event().await.unwrap();
+ assert!(maybe_msg.is_some());
+ match maybe_msg {
+ Some(WorkerEvent::Message(buf)) => {
+ assert_eq!(*buf, *b"[1,2,3]");
+ }
+ _ => unreachable!(),
+ }
+
+ let msg = json!("exit")
+ .to_string()
+ .into_boxed_str()
+ .into_boxed_bytes();
+ let r = handle.post_message(msg);
+ assert!(r.is_ok());
+ let event = handle.get_event().await.unwrap();
+ assert!(event.is_none());
+ handle.sender.close_channel();
+ join_handle.join().expect("Failed to join worker thread");
+ }
+
+ #[tokio::test]
+ async fn removed_from_resource_table_on_close() {
+ let (handle_sender, handle_receiver) =
+ std::sync::mpsc::sync_channel::<WebWorkerHandle>(1);
+
+ let join_handle = std::thread::spawn(move || {
+ let mut worker = create_test_web_worker();
+ worker.execute("onmessage = () => { close(); }").unwrap();
+ let handle = worker.thread_safe_handle();
+ handle_sender.send(handle).unwrap();
+ let r = tokio_util::run_basic(worker.run_event_loop());
+ assert!(r.is_ok())
+ });
+
+ let mut handle = handle_receiver.recv().unwrap();
+
+ let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
+ let r = handle.post_message(msg.clone());
+ assert!(r.is_ok());
+ let event = handle.get_event().await.unwrap();
+ assert!(event.is_none());
+ handle.sender.close_channel();
+
+ join_handle.join().expect("Failed to join worker thread");
+ }
+}
diff --git a/cli/worker.rs b/cli/worker.rs
index 74bee1d03..f4a919df6 100644
--- a/cli/worker.rs
+++ b/cli/worker.rs
@@ -1,6 +1,5 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
-use crate::colors;
use crate::fmt_errors::PrettyJsError;
use crate::inspector::DenoInspector;
use crate::inspector::InspectorSession;
@@ -13,109 +12,39 @@ use crate::permissions::Permissions;
use crate::program_state::ProgramState;
use crate::source_maps::apply_source_map;
use deno_core::error::AnyError;
-use deno_core::futures::channel::mpsc;
use deno_core::futures::future::poll_fn;
use deno_core::futures::future::FutureExt;
-use deno_core::futures::stream::StreamExt;
-use deno_core::futures::task::AtomicWaker;
use deno_core::url::Url;
-use deno_core::v8;
use deno_core::JsRuntime;
use deno_core::ModuleId;
use deno_core::ModuleSpecifier;
use deno_core::RuntimeOptions;
-use deno_core::Snapshot;
use std::env;
-use std::ops::Deref;
-use std::ops::DerefMut;
-use std::rc::Rc;
-use std::sync::atomic::AtomicBool;
-use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
-use tokio::sync::Mutex as AsyncMutex;
-/// Events that are sent to host from child
-/// worker.
-pub enum WorkerEvent {
- Message(Box<[u8]>),
- Error(AnyError),
- TerminalError(AnyError),
-}
-
-pub struct WorkerChannelsInternal {
- pub sender: mpsc::Sender<WorkerEvent>,
- pub receiver: mpsc::Receiver<Box<[u8]>>,
-}
-
-#[derive(Clone)]
-pub struct WorkerHandle {
- pub sender: mpsc::Sender<Box<[u8]>>,
- pub receiver: Arc<AsyncMutex<mpsc::Receiver<WorkerEvent>>>,
-}
-
-impl WorkerHandle {
- /// Post message to worker as a host.
- pub fn post_message(&self, buf: Box<[u8]>) -> Result<(), AnyError> {
- let mut sender = self.sender.clone();
- sender.try_send(buf)?;
- Ok(())
- }
-
- /// Get the event with lock.
- /// Return error if more than one listener tries to get event
- pub async fn get_event(&self) -> Result<Option<WorkerEvent>, AnyError> {
- let mut receiver = self.receiver.try_lock()?;
- Ok(receiver.next().await)
- }
-}
-
-fn create_channels() -> (WorkerChannelsInternal, WorkerHandle) {
- let (in_tx, in_rx) = mpsc::channel::<Box<[u8]>>(1);
- let (out_tx, out_rx) = mpsc::channel::<WorkerEvent>(1);
- let internal_channels = WorkerChannelsInternal {
- sender: out_tx,
- receiver: in_rx,
- };
- let external_channels = WorkerHandle {
- sender: in_tx,
- receiver: Arc::new(AsyncMutex::new(out_rx)),
- };
- (internal_channels, external_channels)
-}
-
-/// Worker is a CLI wrapper for `deno_core::Isolate`.
-///
-/// It provides infrastructure to communicate with a worker and
-/// consequently between workers.
+/// This worker is created and used by almost all
+/// subcommands in Deno executable.
///
-/// This struct is meant to be used as a base struct for concrete
-/// type of worker that registers set of ops.
+/// It provides ops available in the `Deno` namespace.
///
-/// Currently there are two types of workers:
-/// - `MainWorker`
-/// - `WebWorker`
-pub struct Worker {
- external_channels: WorkerHandle,
+/// All `WebWorker`s created during program execution
+/// are descendants of this worker.
+pub struct MainWorker {
inspector: Option<Box<DenoInspector>>,
- // Following fields are pub because they are accessed
- // when creating a new WebWorker instance.
- pub(crate) internal_channels: WorkerChannelsInternal,
- pub(crate) js_runtime: JsRuntime,
- pub(crate) name: String,
+ js_runtime: JsRuntime,
should_break_on_first_statement: bool,
- waker: AtomicWaker,
}
-impl Worker {
+impl MainWorker {
pub fn new(
- name: String,
- startup_snapshot: Snapshot,
- program_state: Arc<ProgramState>,
- module_loader: Rc<CliModuleLoader>,
- is_main: bool,
+ program_state: &Arc<ProgramState>,
+ main_module: ModuleSpecifier,
+ permissions: Permissions,
) -> Self {
+ let module_loader =
+ CliModuleLoader::new(program_state.maybe_import_map.clone());
let global_state_ = program_state.clone();
let js_error_create_fn = Box::new(move |core_js_error| {
@@ -126,7 +55,7 @@ impl Worker {
let mut js_runtime = JsRuntime::new(RuntimeOptions {
module_loader: Some(module_loader),
- startup_snapshot: Some(startup_snapshot),
+ startup_snapshot: Some(js::deno_isolate_init()),
js_error_create_fn: Some(js_error_create_fn),
get_error_class_fn: Some(&crate::errors::get_error_class_name),
..Default::default()
@@ -144,126 +73,15 @@ impl Worker {
None
};
- let should_break_on_first_statement = inspector.is_some()
- && is_main
- && program_state.flags.inspect_brk.is_some();
-
- let (internal_channels, external_channels) = create_channels();
+ let should_break_on_first_statement =
+ inspector.is_some() && program_state.flags.inspect_brk.is_some();
- Self {
- external_channels,
+ let mut worker = Self {
inspector,
- internal_channels,
js_runtime,
- name,
should_break_on_first_statement,
- waker: AtomicWaker::new(),
- }
- }
-
- /// Same as execute2() but the filename defaults to "$CWD/__anonymous__".
- pub fn execute(&mut self, js_source: &str) -> Result<(), AnyError> {
- let path = env::current_dir().unwrap().join("__anonymous__");
- let url = Url::from_file_path(path).unwrap();
- self.execute2(url.as_str(), js_source)
- }
-
- /// Executes the provided JavaScript source code. The js_filename argument is
- /// provided only for debugging purposes.
- pub fn execute2(
- &mut self,
- js_filename: &str,
- js_source: &str,
- ) -> Result<(), AnyError> {
- self.js_runtime.execute(js_filename, js_source)
- }
-
- /// Loads and instantiates specified JavaScript module.
- pub async fn preload_module(
- &mut self,
- module_specifier: &ModuleSpecifier,
- ) -> Result<ModuleId, AnyError> {
- self.js_runtime.load_module(module_specifier, None).await
- }
-
- /// Loads, instantiates and executes specified JavaScript module.
- pub async fn execute_module(
- &mut self,
- module_specifier: &ModuleSpecifier,
- ) -> Result<(), AnyError> {
- let id = self.preload_module(module_specifier).await?;
- self.wait_for_inspector_session();
- self.js_runtime.mod_evaluate(id).await
- }
-
- /// Returns a way to communicate with the Worker from other threads.
- pub fn thread_safe_handle(&self) -> WorkerHandle {
- self.external_channels.clone()
- }
-
- fn wait_for_inspector_session(&mut self) {
- if self.should_break_on_first_statement {
- self
- .inspector
- .as_mut()
- .unwrap()
- .wait_for_session_and_break_on_next_statement()
- }
- }
-
- /// Create new inspector session. This function panics if Worker
- /// was not configured to create inspector.
- pub fn create_inspector_session(&mut self) -> Box<InspectorSession> {
- let inspector = self.inspector.as_mut().unwrap();
-
- InspectorSession::new(&mut **inspector)
- }
-
- pub fn poll_event_loop(
- &mut self,
- cx: &mut Context,
- ) -> Poll<Result<(), AnyError>> {
- // We always poll the inspector if it exists.
- let _ = self.inspector.as_mut().map(|i| i.poll_unpin(cx));
- self.waker.register(cx.waker());
- self.js_runtime.poll_event_loop(cx)
- }
-
- pub async fn run_event_loop(&mut self) -> Result<(), AnyError> {
- poll_fn(|cx| self.poll_event_loop(cx)).await
- }
-}
-
-impl Drop for Worker {
- fn drop(&mut self) {
- // The Isolate object must outlive the Inspector object, but this is
- // currently not enforced by the type system.
- self.inspector.take();
- }
-}
-
-/// This worker is created and used by Deno executable.
-///
-/// It provides ops available in the `Deno` namespace.
-///
-/// All WebWorkers created during program execution are descendants of
-/// this worker.
-pub struct MainWorker(Worker);
+ };
-impl MainWorker {
- pub fn new(
- program_state: &Arc<ProgramState>,
- main_module: ModuleSpecifier,
- permissions: Permissions,
- ) -> Self {
- let loader = CliModuleLoader::new(program_state.maybe_import_map.clone());
- let mut worker = Worker::new(
- "main".to_string(),
- js::deno_isolate_init(),
- program_state.clone(),
- loader,
- true,
- );
let js_runtime = &mut worker.js_runtime;
{
// All ops registered in this function depend on these
@@ -320,266 +138,71 @@ impl MainWorker {
worker
.execute("bootstrap.mainRuntime()")
.expect("Failed to execute bootstrap script");
- Self(worker)
- }
-}
-
-impl Deref for MainWorker {
- type Target = Worker;
- fn deref(&self) -> &Self::Target {
- &self.0
- }
-}
-
-impl DerefMut for MainWorker {
- fn deref_mut(&mut self) -> &mut Self::Target {
- &mut self.0
+ worker
}
-}
-
-/// Wrapper for `WorkerHandle` that adds functionality
-/// for terminating workers.
-///
-/// This struct is used by host as well as worker itself.
-///
-/// Host uses it to communicate with worker and terminate it,
-/// while worker uses it only to finish execution on `self.close()`.
-#[derive(Clone)]
-pub struct WebWorkerHandle {
- worker_handle: WorkerHandle,
- terminate_tx: mpsc::Sender<()>,
- terminated: Arc<AtomicBool>,
- isolate_handle: v8::IsolateHandle,
-}
-impl Deref for WebWorkerHandle {
- type Target = WorkerHandle;
- fn deref(&self) -> &Self::Target {
- &self.worker_handle
+ /// Same as execute2() but the filename defaults to "$CWD/__anonymous__".
+ pub fn execute(&mut self, js_source: &str) -> Result<(), AnyError> {
+ let path = env::current_dir().unwrap().join("__anonymous__");
+ let url = Url::from_file_path(path).unwrap();
+ self.js_runtime.execute(url.as_str(), js_source)
}
-}
-impl DerefMut for WebWorkerHandle {
- fn deref_mut(&mut self) -> &mut Self::Target {
- &mut self.worker_handle
+ /// Loads and instantiates specified JavaScript module.
+ pub async fn preload_module(
+ &mut self,
+ module_specifier: &ModuleSpecifier,
+ ) -> Result<ModuleId, AnyError> {
+ self.js_runtime.load_module(module_specifier, None).await
}
-}
-
-impl WebWorkerHandle {
- pub fn terminate(&self) {
- // This function can be called multiple times by whomever holds
- // the handle. However only a single "termination" should occur so
- // we need a guard here.
- let already_terminated = self.terminated.swap(true, Ordering::Relaxed);
- if !already_terminated {
- self.isolate_handle.terminate_execution();
- let mut sender = self.terminate_tx.clone();
- // This call should be infallible hence the `expect`.
- // This might change in the future.
- sender.try_send(()).expect("Failed to terminate");
- }
+ /// Loads, instantiates and executes specified JavaScript module.
+ pub async fn execute_module(
+ &mut self,
+ module_specifier: &ModuleSpecifier,
+ ) -> Result<(), AnyError> {
+ let id = self.preload_module(module_specifier).await?;
+ self.wait_for_inspector_session();
+ self.js_runtime.mod_evaluate(id).await
}
-}
-
-/// This worker is implementation of `Worker` Web API
-///
-/// At the moment this type of worker supports only
-/// communication with parent and creating new workers.
-///
-/// Each `WebWorker` is either a child of `MainWorker` or other
-/// `WebWorker`.
-pub struct WebWorker {
- worker: Worker,
- event_loop_idle: bool,
- terminate_rx: mpsc::Receiver<()>,
- handle: WebWorkerHandle,
- pub has_deno_namespace: bool,
-}
-
-impl WebWorker {
- pub fn new(
- name: String,
- permissions: Permissions,
- main_module: ModuleSpecifier,
- program_state: Arc<ProgramState>,
- has_deno_namespace: bool,
- ) -> Self {
- let loader = CliModuleLoader::new_for_worker();
- let mut worker = Worker::new(
- name,
- js::deno_isolate_init(),
- program_state.clone(),
- loader,
- false,
- );
-
- let terminated = Arc::new(AtomicBool::new(false));
- let isolate_handle = worker.js_runtime.v8_isolate().thread_safe_handle();
- let (terminate_tx, terminate_rx) = mpsc::channel::<()>(1);
-
- let handle = WebWorkerHandle {
- worker_handle: worker.thread_safe_handle(),
- terminated,
- isolate_handle,
- terminate_tx,
- };
-
- let mut web_worker = Self {
- worker,
- event_loop_idle: false,
- terminate_rx,
- handle,
- has_deno_namespace,
- };
-
- {
- let handle = web_worker.thread_safe_handle();
- let sender = web_worker.worker.internal_channels.sender.clone();
- let js_runtime = &mut web_worker.js_runtime;
- // All ops registered in this function depend on these
- {
- let op_state = js_runtime.op_state();
- let mut op_state = op_state.borrow_mut();
- op_state.put::<Metrics>(Default::default());
- op_state.put::<Arc<ProgramState>>(program_state.clone());
- op_state.put::<Permissions>(permissions);
- }
- ops::web_worker::init(js_runtime, sender.clone(), handle);
- ops::runtime::init(js_runtime, main_module);
- ops::fetch::init(js_runtime, program_state.flags.ca_file.as_deref());
- ops::timers::init(js_runtime);
- ops::worker_host::init(js_runtime, Some(sender));
- ops::reg_json_sync(js_runtime, "op_close", deno_core::op_close);
- ops::reg_json_sync(js_runtime, "op_resources", deno_core::op_resources);
- ops::reg_json_sync(
- js_runtime,
- "op_domain_to_ascii",
- deno_web::op_domain_to_ascii,
- );
- ops::errors::init(js_runtime);
- ops::io::init(js_runtime);
- ops::websocket::init(js_runtime);
-
- if has_deno_namespace {
- ops::fs_events::init(js_runtime);
- ops::fs::init(js_runtime);
- ops::net::init(js_runtime);
- ops::os::init(js_runtime);
- ops::permissions::init(js_runtime);
- ops::plugin::init(js_runtime);
- ops::process::init(js_runtime);
- ops::crypto::init(js_runtime, program_state.flags.seed);
- ops::runtime_compiler::init(js_runtime);
- ops::signal::init(js_runtime);
- ops::tls::init(js_runtime);
- ops::tty::init(js_runtime);
- }
+ fn wait_for_inspector_session(&mut self) {
+ if self.should_break_on_first_statement {
+ self
+ .inspector
+ .as_mut()
+ .unwrap()
+ .wait_for_session_and_break_on_next_statement()
}
-
- web_worker
}
-}
-impl WebWorker {
- /// Returns a way to communicate with the Worker from other threads.
- pub fn thread_safe_handle(&self) -> WebWorkerHandle {
- self.handle.clone()
- }
+ /// Create new inspector session. This function panics if Worker
+ /// was not configured to create inspector.
+ pub fn create_inspector_session(&mut self) -> Box<InspectorSession> {
+ let inspector = self.inspector.as_mut().unwrap();
- pub async fn run_event_loop(&mut self) -> Result<(), AnyError> {
- poll_fn(|cx| self.poll_event_loop(cx)).await
+ InspectorSession::new(&mut **inspector)
}
pub fn poll_event_loop(
&mut self,
cx: &mut Context,
) -> Poll<Result<(), AnyError>> {
- let worker = &mut self.worker;
-
- let terminated = self.handle.terminated.load(Ordering::Relaxed);
-
- if terminated {
- return Poll::Ready(Ok(()));
- }
-
- if !self.event_loop_idle {
- match worker.poll_event_loop(cx) {
- Poll::Ready(r) => {
- let terminated = self.handle.terminated.load(Ordering::Relaxed);
- if terminated {
- return Poll::Ready(Ok(()));
- }
-
- if let Err(e) = r {
- eprintln!(
- "{}: Uncaught (in worker \"{}\") {}",
- colors::red_bold("error"),
- worker.name.to_string(),
- e.to_string().trim_start_matches("Uncaught "),
- );
- let mut sender = worker.internal_channels.sender.clone();
- sender
- .try_send(WorkerEvent::Error(e))
- .expect("Failed to post message to host");
- }
- self.event_loop_idle = true;
- }
- Poll::Pending => {}
- }
- }
-
- if let Poll::Ready(r) = self.terminate_rx.poll_next_unpin(cx) {
- // terminate_rx should never be closed
- assert!(r.is_some());
- return Poll::Ready(Ok(()));
- }
-
- if let Poll::Ready(r) =
- worker.internal_channels.receiver.poll_next_unpin(cx)
- {
- match r {
- Some(msg) => {
- let msg = String::from_utf8(msg.to_vec()).unwrap();
- let script = format!("workerMessageRecvCallback({})", msg);
-
- if let Err(e) = worker.execute(&script) {
- // If execution was terminated during message callback then
- // just ignore it
- if self.handle.terminated.load(Ordering::Relaxed) {
- return Poll::Ready(Ok(()));
- }
-
- // Otherwise forward error to host
- let mut sender = worker.internal_channels.sender.clone();
- sender
- .try_send(WorkerEvent::Error(e))
- .expect("Failed to post message to host");
- }
-
- // Let event loop be polled again
- self.event_loop_idle = false;
- worker.waker.wake();
- }
- None => unreachable!(),
- }
- }
-
- Poll::Pending
+ // We always poll the inspector if it exists.
+ let _ = self.inspector.as_mut().map(|i| i.poll_unpin(cx));
+ self.js_runtime.poll_event_loop(cx)
}
-}
-impl Deref for WebWorker {
- type Target = Worker;
- fn deref(&self) -> &Self::Target {
- &self.worker
+ pub async fn run_event_loop(&mut self) -> Result<(), AnyError> {
+ poll_fn(|cx| self.poll_event_loop(cx)).await
}
}
-impl DerefMut for WebWorker {
- fn deref_mut(&mut self) -> &mut Self::Target {
- &mut self.worker
+impl Drop for MainWorker {
+ fn drop(&mut self) {
+ // The Isolate object must outlive the Inspector object, but this is
+ // currently not enforced by the type system.
+ self.inspector.take();
}
}
@@ -589,9 +212,6 @@ mod tests {
use crate::flags::DenoSubcommand;
use crate::flags::Flags;
use crate::program_state::ProgramState;
- use crate::tokio_util;
- use crate::worker::WorkerEvent;
- use deno_core::serde_json::json;
fn create_test_worker() -> MainWorker {
let main_module =
@@ -687,105 +307,4 @@ mod tests {
let result = worker.execute_module(&module_specifier).await;
assert!(result.is_ok());
}
-
- fn create_test_web_worker() -> WebWorker {
- let main_module =
- ModuleSpecifier::resolve_url_or_path("./hello.js").unwrap();
- let program_state = ProgramState::mock(vec!["deno".to_string()], None);
- let mut worker = WebWorker::new(
- "TEST".to_string(),
- Permissions::allow_all(),
- main_module,
- program_state,
- false,
- );
- worker
- .execute("bootstrap.workerRuntime(\"TEST\", false)")
- .unwrap();
- worker
- }
- #[tokio::test]
- async fn test_worker_messages() {
- let (handle_sender, handle_receiver) =
- std::sync::mpsc::sync_channel::<WebWorkerHandle>(1);
-
- let join_handle = std::thread::spawn(move || {
- let mut worker = create_test_web_worker();
- let source = r#"
- onmessage = function(e) {
- console.log("msg from main script", e.data);
- if (e.data == "exit") {
- return close();
- } else {
- console.assert(e.data === "hi");
- }
- postMessage([1, 2, 3]);
- console.log("after postMessage");
- }
- "#;
- worker.execute(source).unwrap();
- let handle = worker.thread_safe_handle();
- handle_sender.send(handle).unwrap();
- let r = tokio_util::run_basic(worker.run_event_loop());
- assert!(r.is_ok())
- });
-
- let mut handle = handle_receiver.recv().unwrap();
-
- let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
- let r = handle.post_message(msg.clone());
- assert!(r.is_ok());
-
- let maybe_msg = handle.get_event().await.unwrap();
- assert!(maybe_msg.is_some());
-
- let r = handle.post_message(msg.clone());
- assert!(r.is_ok());
-
- let maybe_msg = handle.get_event().await.unwrap();
- assert!(maybe_msg.is_some());
- match maybe_msg {
- Some(WorkerEvent::Message(buf)) => {
- assert_eq!(*buf, *b"[1,2,3]");
- }
- _ => unreachable!(),
- }
-
- let msg = json!("exit")
- .to_string()
- .into_boxed_str()
- .into_boxed_bytes();
- let r = handle.post_message(msg);
- assert!(r.is_ok());
- let event = handle.get_event().await.unwrap();
- assert!(event.is_none());
- handle.sender.close_channel();
- join_handle.join().expect("Failed to join worker thread");
- }
-
- #[tokio::test]
- async fn removed_from_resource_table_on_close() {
- let (handle_sender, handle_receiver) =
- std::sync::mpsc::sync_channel::<WebWorkerHandle>(1);
-
- let join_handle = std::thread::spawn(move || {
- let mut worker = create_test_web_worker();
- worker.execute("onmessage = () => { close(); }").unwrap();
- let handle = worker.thread_safe_handle();
- handle_sender.send(handle).unwrap();
- let r = tokio_util::run_basic(worker.run_event_loop());
- assert!(r.is_ok())
- });
-
- let mut handle = handle_receiver.recv().unwrap();
-
- let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
- let r = handle.post_message(msg.clone());
- assert!(r.is_ok());
- let event = handle.get_event().await.unwrap();
- assert!(event.is_none());
- handle.sender.close_channel();
-
- join_handle.join().expect("Failed to join worker thread");
- }
}