summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cli/compilers/compiler_worker.rs78
-rw-r--r--cli/compilers/mod.rs1
-rw-r--r--cli/compilers/ts.rs14
-rw-r--r--cli/compilers/wasm.rs9
-rw-r--r--cli/lib.rs13
-rw-r--r--cli/ops/compiler.rs48
-rw-r--r--cli/ops/mod.rs1
-rw-r--r--cli/ops/runtime_compiler.rs56
-rw-r--r--cli/ops/web_worker.rs30
-rw-r--r--cli/ops/worker_host.rs28
-rw-r--r--cli/state.rs9
-rw-r--r--cli/web_worker.rs135
-rw-r--r--cli/worker.rs157
13 files changed, 299 insertions, 280 deletions
diff --git a/cli/compilers/compiler_worker.rs b/cli/compilers/compiler_worker.rs
new file mode 100644
index 000000000..461194c37
--- /dev/null
+++ b/cli/compilers/compiler_worker.rs
@@ -0,0 +1,78 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+use crate::ops;
+use crate::state::ThreadSafeState;
+use crate::worker::Worker;
+use crate::worker::WorkerChannels;
+use deno_core;
+use deno_core::ErrBox;
+use deno_core::StartupData;
+use futures::future::FutureExt;
+use std::future::Future;
+use std::ops::Deref;
+use std::ops::DerefMut;
+use std::pin::Pin;
+use std::task::Context;
+use std::task::Poll;
+
+/// This worker is used to host TypeScript and WASM compilers.
+///
+/// It provides minimal set of ops that are necessary to facilitate
+/// compilation.
+///
+/// NOTE: This worker is considered priveleged, because it may
+/// access file system without permission check.
+///
+/// At the moment this worker is meant to be single-use - after
+/// performing single compilation/bundling it should be destroyed.
+///
+/// TODO(bartlomieju): add support to reuse the worker - or in other
+/// words support stateful TS compiler
+#[derive(Clone)]
+pub struct CompilerWorker(Worker);
+
+impl CompilerWorker {
+ pub fn new(
+ name: String,
+ startup_data: StartupData,
+ state: ThreadSafeState,
+ external_channels: WorkerChannels,
+ ) -> Self {
+ let state_ = state.clone();
+ let worker = Worker::new(name, startup_data, state_, external_channels);
+ {
+ let mut isolate = worker.isolate.try_lock().unwrap();
+ ops::compiler::init(&mut isolate, &state);
+ ops::web_worker::init(&mut isolate, &state);
+ // TODO(bartlomieju): CompilerWorker should not
+ // depend on those ops
+ ops::os::init(&mut isolate, &state);
+ ops::files::init(&mut isolate, &state);
+ ops::fs::init(&mut isolate, &state);
+ ops::io::init(&mut isolate, &state);
+ }
+
+ Self(worker)
+ }
+}
+
+impl Deref for CompilerWorker {
+ type Target = Worker;
+ fn deref(&self) -> &Self::Target {
+ &self.0
+ }
+}
+
+impl DerefMut for CompilerWorker {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ &mut self.0
+ }
+}
+
+impl Future for CompilerWorker {
+ type Output = Result<(), ErrBox>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
+ let inner = self.get_mut();
+ inner.0.poll_unpin(cx)
+ }
+}
diff --git a/cli/compilers/mod.rs b/cli/compilers/mod.rs
index 87e34ac5f..f4aac3681 100644
--- a/cli/compilers/mod.rs
+++ b/cli/compilers/mod.rs
@@ -3,6 +3,7 @@ use deno_core::ErrBox;
use futures::Future;
use serde_json::Value;
+mod compiler_worker;
mod js;
mod json;
mod ts;
diff --git a/cli/compilers/ts.rs b/cli/compilers/ts.rs
index f3147334f..3adf92495 100644
--- a/cli/compilers/ts.rs
+++ b/cli/compilers/ts.rs
@@ -1,4 +1,5 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+use super::compiler_worker::CompilerWorker;
use crate::compilers::CompilationResultFuture;
use crate::compilers::CompiledModule;
use crate::compilers::CompiledModuleFuture;
@@ -13,7 +14,6 @@ use crate::source_maps::SourceMapGetter;
use crate::startup_data;
use crate::state::*;
use crate::version;
-use crate::worker::Worker;
use deno_core::Buf;
use deno_core::ErrBox;
use deno_core::ModuleSpecifier;
@@ -228,7 +228,7 @@ impl TsCompiler {
}
/// Create a new V8 worker with snapshot of TS compiler and setup compiler's runtime.
- fn setup_worker(global_state: ThreadSafeGlobalState) -> Worker {
+ fn setup_worker(global_state: ThreadSafeGlobalState) -> CompilerWorker {
let (int, ext) = ThreadSafeState::create_channels();
let worker_state =
ThreadSafeState::new(global_state.clone(), None, None, int)
@@ -240,7 +240,7 @@ impl TsCompiler {
.compiler_starts
.fetch_add(1, Ordering::SeqCst);
- let mut worker = Worker::new(
+ let mut worker = CompilerWorker::new(
"TS".to_string(),
startup_data::compiler_isolate_init(),
worker_state,
@@ -279,7 +279,7 @@ impl TsCompiler {
worker.post_message(req_msg).await?;
worker.await?;
debug!("Sent message to worker");
- let maybe_msg = worker_.get_message().await?;
+ let maybe_msg = worker_.get_message().await;
debug!("Received message from worker");
if let Some(msg) = maybe_msg {
let json_str = std::str::from_utf8(&msg).unwrap();
@@ -378,7 +378,7 @@ impl TsCompiler {
worker.post_message(req_msg).await?;
worker.await?;
debug!("Sent message to worker");
- let maybe_msg = worker_.get_message().await?;
+ let maybe_msg = worker_.get_message().await;
if let Some(msg) = maybe_msg {
let json_str = std::str::from_utf8(&msg).unwrap();
debug!("Message: {}", json_str);
@@ -633,7 +633,7 @@ pub fn runtime_compile_async<S: BuildHasher>(
worker.post_message(req_msg).await?;
worker.await?;
debug!("Sent message to worker");
- let msg = (worker_.get_message().await?).unwrap();
+ let msg = (worker_.get_message().await).unwrap();
let json_str = std::str::from_utf8(&msg).unwrap();
Ok(json!(json_str))
}
@@ -661,7 +661,7 @@ pub fn runtime_transpile_async<S: BuildHasher>(
worker.post_message(req_msg).await?;
worker.await?;
debug!("Sent message to worker");
- let msg = (worker_.get_message().await?).unwrap();
+ let msg = (worker_.get_message().await).unwrap();
let json_str = std::str::from_utf8(&msg).unwrap();
Ok(json!(json_str))
}
diff --git a/cli/compilers/wasm.rs b/cli/compilers/wasm.rs
index e2a293f18..637fc7687 100644
--- a/cli/compilers/wasm.rs
+++ b/cli/compilers/wasm.rs
@@ -1,11 +1,11 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+use super::compiler_worker::CompilerWorker;
use crate::compilers::CompiledModule;
use crate::compilers::CompiledModuleFuture;
use crate::file_fetcher::SourceFile;
use crate::global_state::ThreadSafeGlobalState;
use crate::startup_data;
use crate::state::*;
-use crate::worker::Worker;
use futures::FutureExt;
use serde_derive::Deserialize;
use serde_json;
@@ -42,7 +42,7 @@ pub struct WasmCompiler {
impl WasmCompiler {
/// Create a new V8 worker with snapshot of WASM compiler and setup compiler's runtime.
- fn setup_worker(global_state: ThreadSafeGlobalState) -> Worker {
+ fn setup_worker(global_state: ThreadSafeGlobalState) -> CompilerWorker {
let (int, ext) = ThreadSafeState::create_channels();
let worker_state =
ThreadSafeState::new(global_state.clone(), None, None, int)
@@ -54,7 +54,7 @@ impl WasmCompiler {
.compiler_starts
.fetch_add(1, Ordering::SeqCst);
- let mut worker = Worker::new(
+ let mut worker = CompilerWorker::new(
"WASM".to_string(),
startup_data::compiler_isolate_init(),
worker_state,
@@ -100,10 +100,9 @@ impl WasmCompiler {
std::process::exit(1);
}
debug!("Sent message to worker");
- let maybe_msg = worker_.get_message().await.expect("not handled");
+ let json_msg = worker_.get_message().await.expect("not handled");
debug!("Received message from worker");
- let json_msg = maybe_msg.unwrap();
let module_info: WasmModuleInfo =
serde_json::from_slice(&json_msg).unwrap();
debug!("WASM module info: {:#?}", &module_info);
diff --git a/cli/lib.rs b/cli/lib.rs
index c0ad84c85..53dac1ea9 100644
--- a/cli/lib.rs
+++ b/cli/lib.rs
@@ -60,7 +60,7 @@ use crate::global_state::ThreadSafeGlobalState;
use crate::ops::io::get_stdio;
use crate::progress::Progress;
use crate::state::ThreadSafeState;
-use crate::worker::Worker;
+use crate::worker::MainWorker;
use deno_core::v8_set_flags;
use deno_core::ErrBox;
use deno_core::ModuleSpecifier;
@@ -97,7 +97,7 @@ impl log::Log for Logger {
fn create_worker_and_state(
flags: DenoFlags,
-) -> (Worker, ThreadSafeGlobalState) {
+) -> (MainWorker, ThreadSafeGlobalState) {
use crate::shell::Shell;
use std::sync::Arc;
use std::sync::Mutex;
@@ -135,7 +135,7 @@ fn create_worker_and_state(
resource_table.add("stderr", Box::new(stderr));
}
- let worker = Worker::new(
+ let worker = MainWorker::new(
"main".to_string(),
startup_data::deno_isolate_init(),
state,
@@ -150,7 +150,7 @@ fn types_command() {
println!("{}", content);
}
-fn print_cache_info(worker: Worker) {
+fn print_cache_info(worker: MainWorker) {
let state = &worker.state.global_state;
println!(
@@ -170,7 +170,10 @@ fn print_cache_info(worker: Worker) {
);
}
-async fn print_file_info(worker: Worker, module_specifier: ModuleSpecifier) {
+async fn print_file_info(
+ worker: MainWorker,
+ module_specifier: ModuleSpecifier,
+) {
let global_state_ = &worker.state.global_state;
let maybe_source_file = global_state_
diff --git a/cli/ops/compiler.rs b/cli/ops/compiler.rs
index 5d6875fb0..e515081df 100644
--- a/cli/ops/compiler.rs
+++ b/cli/ops/compiler.rs
@@ -1,14 +1,11 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
-use crate::compilers::runtime_compile_async;
-use crate::compilers::runtime_transpile_async;
use crate::futures::future::try_join_all;
use crate::msg;
use crate::ops::json_op;
use crate::state::ThreadSafeState;
use deno_core::Loader;
use deno_core::*;
-use std::collections::HashMap;
pub fn init(i: &mut Isolate, s: &ThreadSafeState) {
i.register_op("cache", s.core_op(json_op(s.stateful_op(op_cache))));
@@ -20,8 +17,6 @@ pub fn init(i: &mut Isolate, s: &ThreadSafeState) {
"fetch_source_files",
s.core_op(json_op(s.stateful_op(op_fetch_source_files))),
);
- i.register_op("compile", s.core_op(json_op(s.stateful_op(op_compile))));
- i.register_op("transpile", s.core_op(json_op(s.stateful_op(op_transpile))));
}
#[derive(Deserialize)]
@@ -150,46 +145,3 @@ fn op_fetch_source_files(
Ok(JsonOp::Async(future))
}
-
-#[derive(Deserialize, Debug)]
-#[serde(rename_all = "camelCase")]
-struct CompileArgs {
- root_name: String,
- sources: Option<HashMap<String, String>>,
- bundle: bool,
- options: Option<String>,
-}
-
-fn op_compile(
- state: &ThreadSafeState,
- args: Value,
- _zero_copy: Option<PinnedBuf>,
-) -> Result<JsonOp, ErrBox> {
- let args: CompileArgs = serde_json::from_value(args)?;
- Ok(JsonOp::Async(runtime_compile_async(
- state.global_state.clone(),
- &args.root_name,
- &args.sources,
- args.bundle,
- &args.options,
- )))
-}
-
-#[derive(Deserialize, Debug)]
-struct TranspileArgs {
- sources: HashMap<String, String>,
- options: Option<String>,
-}
-
-fn op_transpile(
- state: &ThreadSafeState,
- args: Value,
- _zero_copy: Option<PinnedBuf>,
-) -> Result<JsonOp, ErrBox> {
- let args: TranspileArgs = serde_json::from_value(args)?;
- Ok(JsonOp::Async(runtime_transpile_async(
- state.global_state.clone(),
- &args.sources,
- &args.options,
- )))
-}
diff --git a/cli/ops/mod.rs b/cli/ops/mod.rs
index 203d1e17e..81f95ffb9 100644
--- a/cli/ops/mod.rs
+++ b/cli/ops/mod.rs
@@ -21,6 +21,7 @@ pub mod process;
pub mod random;
pub mod repl;
pub mod resources;
+pub mod runtime_compiler;
pub mod timers;
pub mod tls;
pub mod web_worker;
diff --git a/cli/ops/runtime_compiler.rs b/cli/ops/runtime_compiler.rs
new file mode 100644
index 000000000..4202f6b3c
--- /dev/null
+++ b/cli/ops/runtime_compiler.rs
@@ -0,0 +1,56 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+use super::dispatch_json::{Deserialize, JsonOp, Value};
+use crate::compilers::runtime_compile_async;
+use crate::compilers::runtime_transpile_async;
+use crate::ops::json_op;
+use crate::state::ThreadSafeState;
+use deno_core::*;
+use std::collections::HashMap;
+
+pub fn init(i: &mut Isolate, s: &ThreadSafeState) {
+ i.register_op("compile", s.core_op(json_op(s.stateful_op(op_compile))));
+ i.register_op("transpile", s.core_op(json_op(s.stateful_op(op_transpile))));
+}
+
+#[derive(Deserialize, Debug)]
+#[serde(rename_all = "camelCase")]
+struct CompileArgs {
+ root_name: String,
+ sources: Option<HashMap<String, String>>,
+ bundle: bool,
+ options: Option<String>,
+}
+
+fn op_compile(
+ state: &ThreadSafeState,
+ args: Value,
+ _zero_copy: Option<PinnedBuf>,
+) -> Result<JsonOp, ErrBox> {
+ let args: CompileArgs = serde_json::from_value(args)?;
+ Ok(JsonOp::Async(runtime_compile_async(
+ state.global_state.clone(),
+ &args.root_name,
+ &args.sources,
+ args.bundle,
+ &args.options,
+ )))
+}
+
+#[derive(Deserialize, Debug)]
+struct TranspileArgs {
+ sources: HashMap<String, String>,
+ options: Option<String>,
+}
+
+fn op_transpile(
+ state: &ThreadSafeState,
+ args: Value,
+ _zero_copy: Option<PinnedBuf>,
+) -> Result<JsonOp, ErrBox> {
+ let args: TranspileArgs = serde_json::from_value(args)?;
+ Ok(JsonOp::Async(runtime_transpile_async(
+ state.global_state.clone(),
+ &args.sources,
+ &args.options,
+ )))
+}
diff --git a/cli/ops/web_worker.rs b/cli/ops/web_worker.rs
index 300a0dfd1..be987c09f 100644
--- a/cli/ops/web_worker.rs
+++ b/cli/ops/web_worker.rs
@@ -11,10 +11,6 @@ use futures::sink::SinkExt;
use futures::stream::StreamExt;
use std;
use std::convert::From;
-use std::future::Future;
-use std::pin::Pin;
-use std::task::Context;
-use std::task::Poll;
pub fn init(i: &mut Isolate, s: &ThreadSafeState) {
i.register_op(
@@ -27,33 +23,16 @@ pub fn init(i: &mut Isolate, s: &ThreadSafeState) {
);
}
-struct GetMessageFuture {
- state: ThreadSafeState,
-}
-
-impl Future for GetMessageFuture {
- type Output = Option<Buf>;
-
- fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
- let inner = self.get_mut();
- let mut channels = inner.state.worker_channels.lock().unwrap();
- let receiver = &mut channels.receiver;
- receiver.poll_next_unpin(cx)
- }
-}
-
/// Get message from host as guest worker
fn op_worker_get_message(
state: &ThreadSafeState,
_args: Value,
_data: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
- let op = GetMessageFuture {
- state: state.clone(),
- };
-
+ let state_ = state.clone();
let op = async move {
- let maybe_buf = op.await;
+ let mut receiver = state_.worker_channels.receiver.lock().await;
+ let maybe_buf = receiver.next().await;
debug!("op_worker_get_message");
Ok(json!({ "data": maybe_buf }))
};
@@ -68,8 +47,7 @@ fn op_worker_post_message(
data: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice();
- let mut channels = state.worker_channels.lock().unwrap();
- let sender = &mut channels.sender;
+ let mut sender = state.worker_channels.sender.clone();
futures::executor::block_on(sender.send(d))
.map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?;
diff --git a/cli/ops/worker_host.rs b/cli/ops/worker_host.rs
index 6ac48228d..c64e86c1c 100644
--- a/cli/ops/worker_host.rs
+++ b/cli/ops/worker_host.rs
@@ -57,21 +57,6 @@ pub fn init(i: &mut Isolate, s: &ThreadSafeState) {
i.register_op("metrics", s.core_op(json_op(s.stateful_op(op_metrics))));
}
-struct GetMessageFuture {
- state: ThreadSafeState,
-}
-
-impl Future for GetMessageFuture {
- type Output = Option<Buf>;
-
- fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
- let inner = self.get_mut();
- let mut channels = inner.state.worker_channels.lock().unwrap();
- let receiver = &mut channels.receiver;
- receiver.poll_next_unpin(cx)
- }
-}
-
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct CreateWorkerArgs {
@@ -250,9 +235,12 @@ fn op_host_close_worker(
let mut workers_table = state_.workers.lock().unwrap();
let maybe_worker = workers_table.remove(&id);
if let Some(worker) = maybe_worker {
- let mut channels = worker.state.worker_channels.lock().unwrap();
- channels.sender.close_channel();
- channels.receiver.close();
+ let channels = worker.state.worker_channels.clone();
+ let mut sender = channels.sender.clone();
+ sender.close_channel();
+
+ let mut receiver = futures::executor::block_on(channels.receiver.lock());
+ receiver.close();
};
Ok(JsonOp::Sync(json!({})))
@@ -285,9 +273,9 @@ fn op_host_get_message(
_data: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
let args: HostGetMessageArgs = serde_json::from_value(args)?;
-
+ let state_ = state.clone();
let id = args.id as u32;
- let mut table = state.workers.lock().unwrap();
+ let mut table = state_.workers.lock().unwrap();
// TODO: don't return bad resource anymore
let worker = table.get_mut(&id).ok_or_else(bad_resource)?;
let fut = worker.get_message();
diff --git a/cli/state.rs b/cli/state.rs
index 4ad8241be..02c258280 100644
--- a/cli/state.rs
+++ b/cli/state.rs
@@ -35,6 +35,7 @@ use std::sync::Arc;
use std::sync::Mutex;
use std::sync::MutexGuard;
use std::time::Instant;
+use tokio::sync::Mutex as AsyncMutex;
/// Isolate cannot be passed between threads but ThreadSafeState can.
/// ThreadSafeState satisfies Send and Sync. So any state that needs to be
@@ -46,7 +47,7 @@ pub struct State {
pub global_state: ThreadSafeGlobalState,
pub permissions: Arc<Mutex<DenoPermissions>>,
pub main_module: Option<ModuleSpecifier>,
- pub worker_channels: Mutex<WorkerChannels>,
+ pub worker_channels: WorkerChannels,
/// When flags contains a `.import_map_path` option, the content of the
/// import map file will be resolved and set.
pub import_map: Option<ImportMap>,
@@ -203,11 +204,11 @@ impl ThreadSafeState {
let (out_tx, out_rx) = mpsc::channel::<Buf>(1);
let internal_channels = WorkerChannels {
sender: out_tx,
- receiver: in_rx,
+ receiver: Arc::new(AsyncMutex::new(in_rx)),
};
let external_channels = WorkerChannels {
sender: in_tx,
- receiver: out_rx,
+ receiver: Arc::new(AsyncMutex::new(out_rx)),
};
(internal_channels, external_channels)
}
@@ -241,7 +242,7 @@ impl ThreadSafeState {
main_module,
permissions,
import_map,
- worker_channels: Mutex::new(internal_channels),
+ worker_channels: internal_channels,
metrics: Metrics::default(),
global_timer: Mutex::new(GlobalTimer::new()),
workers: Mutex::new(HashMap::new()),
diff --git a/cli/web_worker.rs b/cli/web_worker.rs
index f933cbdc4..7b21d4937 100644
--- a/cli/web_worker.rs
+++ b/cli/web_worker.rs
@@ -1,35 +1,28 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
-use crate::fmt_errors::JSError;
use crate::ops;
use crate::state::ThreadSafeState;
+use crate::worker::Worker;
use crate::worker::WorkerChannels;
-use crate::worker::WorkerReceiver;
use deno_core;
-use deno_core::Buf;
use deno_core::ErrBox;
-use deno_core::ModuleSpecifier;
use deno_core::StartupData;
use futures::future::FutureExt;
-use futures::future::TryFutureExt;
-use futures::sink::SinkExt;
-use futures::task::AtomicWaker;
-use std::env;
use std::future::Future;
+use std::ops::Deref;
+use std::ops::DerefMut;
use std::pin::Pin;
-use std::sync::Arc;
-use std::sync::Mutex;
use std::task::Context;
use std::task::Poll;
-use tokio::sync::Mutex as AsyncMutex;
-use url::Url;
+/// This worker is implementation of `Worker` Web API
+///
+/// At the moment this type of worker supports only
+/// communication with parent and creating new workers.
+///
+/// Each `WebWorker` is either a child of `MainWorker` or other
+/// `WebWorker`.
#[derive(Clone)]
-pub struct WebWorker {
- pub name: String,
- pub isolate: Arc<AsyncMutex<Box<deno_core::EsIsolate>>>,
- pub state: ThreadSafeState,
- external_channels: Arc<Mutex<WorkerChannels>>,
-}
+pub struct WebWorker(Worker);
impl WebWorker {
pub fn new(
@@ -38,92 +31,28 @@ impl WebWorker {
state: ThreadSafeState,
external_channels: WorkerChannels,
) -> Self {
- let mut isolate =
- deno_core::EsIsolate::new(Box::new(state.clone()), startup_data, false);
-
- ops::web_worker::init(&mut isolate, &state);
- ops::worker_host::init(&mut isolate, &state);
-
- let global_state_ = state.global_state.clone();
- isolate.set_js_error_create(move |v8_exception| {
- JSError::from_v8_exception(v8_exception, &global_state_.ts_compiler)
- });
-
- Self {
- name,
- isolate: Arc::new(AsyncMutex::new(isolate)),
- state,
- external_channels: Arc::new(Mutex::new(external_channels)),
- }
- }
-
- /// Same as execute2() but the filename defaults to "$CWD/__anonymous__".
- pub fn execute(&mut self, js_source: &str) -> Result<(), ErrBox> {
- let path = env::current_dir().unwrap().join("__anonymous__");
- let url = Url::from_file_path(path).unwrap();
- self.execute2(url.as_str(), js_source)
- }
-
- /// Executes the provided JavaScript source code. The js_filename argument is
- /// provided only for debugging purposes.
- fn execute2(
- &mut self,
- js_filename: &str,
- js_source: &str,
- ) -> Result<(), ErrBox> {
- let mut isolate = self.isolate.try_lock().unwrap();
- isolate.execute(js_filename, js_source)
- }
-
- /// Executes the provided JavaScript module.
- ///
- /// Takes ownership of the isolate behind mutex.
- pub async fn execute_mod_async(
- &mut self,
- module_specifier: &ModuleSpecifier,
- maybe_code: Option<String>,
- is_prefetch: bool,
- ) -> Result<(), ErrBox> {
- let specifier = module_specifier.to_string();
- let worker = self.clone();
-
- let mut isolate = self.isolate.lock().await;
- let id = isolate.load_module(&specifier, maybe_code).await?;
- worker.state.global_state.progress.done();
-
- if !is_prefetch {
- return isolate.mod_evaluate(id);
+ let state_ = state.clone();
+ let worker = Worker::new(name, startup_data, state_, external_channels);
+ {
+ let mut isolate = worker.isolate.try_lock().unwrap();
+ ops::web_worker::init(&mut isolate, &state);
+ ops::worker_host::init(&mut isolate, &state);
}
- Ok(())
- }
-
- /// Post message to worker as a host.
- ///
- /// This method blocks current thread.
- pub fn post_message(
- &self,
- buf: Buf,
- ) -> impl Future<Output = Result<(), ErrBox>> {
- let channels = self.external_channels.lock().unwrap();
- let mut sender = channels.sender.clone();
- async move {
- let result = sender.send(buf).map_err(ErrBox::from).await;
- drop(sender);
- result
- }
+ Self(worker)
}
+}
- /// Get message from worker as a host.
- pub fn get_message(&self) -> WorkerReceiver {
- WorkerReceiver {
- channels: self.external_channels.clone(),
- }
+impl Deref for WebWorker {
+ type Target = Worker;
+ fn deref(&self) -> &Self::Target {
+ &self.0
}
+}
- pub fn clear_exception(&mut self) {
- let mut isolate = self.isolate.try_lock().unwrap();
- isolate.clear_exception();
+impl DerefMut for WebWorker {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ &mut self.0
}
}
@@ -132,14 +61,6 @@ impl Future for WebWorker {
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let inner = self.get_mut();
- let waker = AtomicWaker::new();
- waker.register(cx.waker());
- match inner.isolate.try_lock() {
- Ok(mut isolate) => isolate.poll_unpin(cx),
- Err(_) => {
- waker.wake();
- Poll::Pending
- }
- }
+ inner.0.poll_unpin(cx)
}
}
diff --git a/cli/worker.rs b/cli/worker.rs
index 4ad79a09b..ef72602d4 100644
--- a/cli/worker.rs
+++ b/cli/worker.rs
@@ -15,9 +15,10 @@ use futures::stream::StreamExt;
use futures::task::AtomicWaker;
use std::env;
use std::future::Future;
+use std::ops::Deref;
+use std::ops::DerefMut;
use std::pin::Pin;
use std::sync::Arc;
-use std::sync::Mutex;
use std::task::Context;
use std::task::Poll;
use tokio::sync::Mutex as AsyncMutex;
@@ -26,19 +27,30 @@ use url::Url;
/// Wraps mpsc channels so they can be referenced
/// from ops and used to facilitate parent-child communication
/// for workers.
+#[derive(Clone)]
pub struct WorkerChannels {
pub sender: mpsc::Sender<Buf>,
- pub receiver: mpsc::Receiver<Buf>,
+ pub receiver: Arc<AsyncMutex<mpsc::Receiver<Buf>>>,
}
-/// Wraps deno_core::Isolate to provide source maps, ops for the CLI, and
-/// high-level module loading.
+/// Worker is a CLI wrapper for `deno_core::Isolate`.
+///
+/// It provides infrastructure to communicate with a worker and
+/// consequently between workers.
+///
+/// This struct is meant to be used as a base struct for concrete
+/// type of worker that registers set of ops.
+///
+/// Currently there are three types of workers:
+/// - `MainWorker`
+/// - `CompilerWorker`
+/// - `WebWorker`
#[derive(Clone)]
pub struct Worker {
pub name: String,
pub isolate: Arc<AsyncMutex<Box<deno_core::EsIsolate>>>,
pub state: ThreadSafeState,
- external_channels: Arc<Mutex<WorkerChannels>>,
+ external_channels: WorkerChannels,
}
impl Worker {
@@ -50,26 +62,6 @@ impl Worker {
) -> Self {
let mut isolate =
deno_core::EsIsolate::new(Box::new(state.clone()), startup_data, false);
- let op_registry = isolate.op_registry.clone();
-
- ops::compiler::init(&mut isolate, &state);
- ops::errors::init(&mut isolate, &state);
- ops::fetch::init(&mut isolate, &state);
- ops::files::init(&mut isolate, &state);
- ops::fs::init(&mut isolate, &state);
- ops::io::init(&mut isolate, &state);
- ops::plugins::init(&mut isolate, &state, op_registry);
- ops::net::init(&mut isolate, &state);
- ops::tls::init(&mut isolate, &state);
- ops::os::init(&mut isolate, &state);
- ops::permissions::init(&mut isolate, &state);
- ops::process::init(&mut isolate, &state);
- ops::random::init(&mut isolate, &state);
- ops::repl::init(&mut isolate, &state);
- ops::resources::init(&mut isolate, &state);
- ops::timers::init(&mut isolate, &state);
- ops::worker_host::init(&mut isolate, &state);
- ops::web_worker::init(&mut isolate, &state);
let global_state_ = state.global_state.clone();
isolate.set_js_error_create(move |v8_exception| {
@@ -80,7 +72,7 @@ impl Worker {
name,
isolate: Arc::new(AsyncMutex::new(isolate)),
state,
- external_channels: Arc::new(Mutex::new(external_channels)),
+ external_channels,
}
}
@@ -128,24 +120,24 @@ impl Worker {
/// Post message to worker as a host.
///
/// This method blocks current thread.
- pub fn post_message(
- &self,
- buf: Buf,
- ) -> impl Future<Output = Result<(), ErrBox>> {
- let channels = self.external_channels.lock().unwrap();
- let mut sender = channels.sender.clone();
- async move {
- let result = sender.send(buf).map_err(ErrBox::from).await;
- drop(sender);
- result
- }
+ pub async fn post_message(&self, buf: Buf) -> Result<(), ErrBox> {
+ let mut sender = self.external_channels.sender.clone();
+ let result = sender.send(buf).map_err(ErrBox::from).await;
+ drop(sender);
+ result
}
/// Get message from worker as a host.
- pub fn get_message(&self) -> WorkerReceiver {
- WorkerReceiver {
- channels: self.external_channels.clone(),
+ pub fn get_message(
+ &self,
+ ) -> Pin<Box<dyn Future<Output = Option<Buf>> + Send>> {
+ let receiver_mutex = self.external_channels.receiver.clone();
+
+ async move {
+ let mut receiver = receiver_mutex.lock().await;
+ receiver.next().await
}
+ .boxed()
}
pub fn clear_exception(&mut self) {
@@ -171,22 +163,71 @@ impl Future for Worker {
}
}
-/// This structure wraps worker's resource id to implement future
-/// that will return message received from worker or None
-/// if worker's channel has been closed.
-pub struct WorkerReceiver {
- pub channels: Arc<Mutex<WorkerChannels>>,
+/// This worker is created and used by Deno executable.
+///
+/// It provides ops available in the `Deno` namespace.
+///
+/// All WebWorkers created during program execution are decendants of
+/// this worker.
+#[derive(Clone)]
+pub struct MainWorker(Worker);
+
+impl MainWorker {
+ pub fn new(
+ name: String,
+ startup_data: StartupData,
+ state: ThreadSafeState,
+ external_channels: WorkerChannels,
+ ) -> Self {
+ let state_ = state.clone();
+ let worker = Worker::new(name, startup_data, state_, external_channels);
+ {
+ let mut isolate = worker.isolate.try_lock().unwrap();
+ let op_registry = isolate.op_registry.clone();
+
+ ops::runtime_compiler::init(&mut isolate, &state);
+ ops::errors::init(&mut isolate, &state);
+ ops::fetch::init(&mut isolate, &state);
+ ops::files::init(&mut isolate, &state);
+ ops::fs::init(&mut isolate, &state);
+ ops::io::init(&mut isolate, &state);
+ ops::plugins::init(&mut isolate, &state, op_registry);
+ ops::net::init(&mut isolate, &state);
+ ops::tls::init(&mut isolate, &state);
+ ops::os::init(&mut isolate, &state);
+ ops::permissions::init(&mut isolate, &state);
+ ops::process::init(&mut isolate, &state);
+ ops::random::init(&mut isolate, &state);
+ ops::repl::init(&mut isolate, &state);
+ ops::resources::init(&mut isolate, &state);
+ ops::timers::init(&mut isolate, &state);
+ ops::worker_host::init(&mut isolate, &state);
+ ops::web_worker::init(&mut isolate, &state);
+ }
+
+ Self(worker)
+ }
+}
+
+impl Deref for MainWorker {
+ type Target = Worker;
+ fn deref(&self) -> &Self::Target {
+ &self.0
+ }
}
-impl Future for WorkerReceiver {
- type Output = Result<Option<Buf>, ErrBox>;
+impl DerefMut for MainWorker {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ &mut self.0
+ }
+}
+
+impl Future for MainWorker {
+ type Output = Result<(), ErrBox>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
- let mut channels = self.channels.lock().unwrap();
- match channels.receiver.poll_next_unpin(cx) {
- Poll::Ready(v) => Poll::Ready(Ok(v)),
- Poll::Pending => Poll::Pending,
- }
+ let inner = self.get_mut();
+ inner.0.poll_unpin(cx)
}
}
@@ -248,7 +289,7 @@ mod tests {
let state_ = state.clone();
tokio_util::run(async move {
let mut worker =
- Worker::new("TEST".to_string(), StartupData::None, state, ext);
+ MainWorker::new("TEST".to_string(), StartupData::None, state, ext);
let result = worker
.execute_mod_async(&module_specifier, None, false)
.await;
@@ -291,7 +332,7 @@ mod tests {
let state_ = state.clone();
tokio_util::run(async move {
let mut worker =
- Worker::new("TEST".to_string(), StartupData::None, state, ext);
+ MainWorker::new("TEST".to_string(), StartupData::None, state, ext);
let result = worker
.execute_mod_async(&module_specifier, None, false)
.await;
@@ -333,7 +374,7 @@ mod tests {
let global_state_ = global_state;
let state_ = state.clone();
tokio_util::run(async move {
- let mut worker = Worker::new(
+ let mut worker = MainWorker::new(
"TEST".to_string(),
startup_data::deno_isolate_init(),
state,
@@ -359,13 +400,13 @@ mod tests {
drop(http_server_guard);
}
- fn create_test_worker() -> Worker {
+ fn create_test_worker() -> MainWorker {
let (int, ext) = ThreadSafeState::create_channels();
let state = ThreadSafeState::mock(
vec![String::from("./deno"), String::from("hello.js")],
int,
);
- let mut worker = Worker::new(
+ let mut worker = MainWorker::new(
"TEST".to_string(),
startup_data::deno_isolate_init(),
state,
@@ -409,7 +450,7 @@ mod tests {
let r = block_on(worker_.post_message(msg));
assert!(r.is_ok());
- let maybe_msg = block_on(worker_.get_message()).unwrap();
+ let maybe_msg = block_on(worker_.get_message());
assert!(maybe_msg.is_some());
// Check if message received is [1, 2, 3] in json
assert_eq!(*maybe_msg.unwrap(), *b"[1,2,3]");