summaryrefslogtreecommitdiff
path: root/cli/worker.rs
diff options
context:
space:
mode:
authorBartek IwaƄczuk <biwanczuk@gmail.com>2020-01-21 17:50:06 +0100
committerGitHub <noreply@github.com>2020-01-21 17:50:06 +0100
commitecd1d3abb0cae9c7cbc1330cbaa035a5786e94d7 (patch)
tree73d4b12a06f5f6a6bc0e7168e0021efc3a2eda7c /cli/worker.rs
parent229eb292f83dedbc32dc24f912841caf79a53e9c (diff)
refactor: split cli::Worker (#3735)
* cli::Worker is base struct to create specialized workers * add MainWorker * add CompilerWorker * refactor WebWorker to use Worker
Diffstat (limited to 'cli/worker.rs')
-rw-r--r--cli/worker.rs157
1 files changed, 99 insertions, 58 deletions
diff --git a/cli/worker.rs b/cli/worker.rs
index 4ad79a09b..ef72602d4 100644
--- a/cli/worker.rs
+++ b/cli/worker.rs
@@ -15,9 +15,10 @@ use futures::stream::StreamExt;
use futures::task::AtomicWaker;
use std::env;
use std::future::Future;
+use std::ops::Deref;
+use std::ops::DerefMut;
use std::pin::Pin;
use std::sync::Arc;
-use std::sync::Mutex;
use std::task::Context;
use std::task::Poll;
use tokio::sync::Mutex as AsyncMutex;
@@ -26,19 +27,30 @@ use url::Url;
/// Wraps mpsc channels so they can be referenced
/// from ops and used to facilitate parent-child communication
/// for workers.
+#[derive(Clone)]
pub struct WorkerChannels {
pub sender: mpsc::Sender<Buf>,
- pub receiver: mpsc::Receiver<Buf>,
+ pub receiver: Arc<AsyncMutex<mpsc::Receiver<Buf>>>,
}
-/// Wraps deno_core::Isolate to provide source maps, ops for the CLI, and
-/// high-level module loading.
+/// Worker is a CLI wrapper for `deno_core::Isolate`.
+///
+/// It provides infrastructure to communicate with a worker and
+/// consequently between workers.
+///
+/// This struct is meant to be used as a base struct for concrete
+/// type of worker that registers set of ops.
+///
+/// Currently there are three types of workers:
+/// - `MainWorker`
+/// - `CompilerWorker`
+/// - `WebWorker`
#[derive(Clone)]
pub struct Worker {
pub name: String,
pub isolate: Arc<AsyncMutex<Box<deno_core::EsIsolate>>>,
pub state: ThreadSafeState,
- external_channels: Arc<Mutex<WorkerChannels>>,
+ external_channels: WorkerChannels,
}
impl Worker {
@@ -50,26 +62,6 @@ impl Worker {
) -> Self {
let mut isolate =
deno_core::EsIsolate::new(Box::new(state.clone()), startup_data, false);
- let op_registry = isolate.op_registry.clone();
-
- ops::compiler::init(&mut isolate, &state);
- ops::errors::init(&mut isolate, &state);
- ops::fetch::init(&mut isolate, &state);
- ops::files::init(&mut isolate, &state);
- ops::fs::init(&mut isolate, &state);
- ops::io::init(&mut isolate, &state);
- ops::plugins::init(&mut isolate, &state, op_registry);
- ops::net::init(&mut isolate, &state);
- ops::tls::init(&mut isolate, &state);
- ops::os::init(&mut isolate, &state);
- ops::permissions::init(&mut isolate, &state);
- ops::process::init(&mut isolate, &state);
- ops::random::init(&mut isolate, &state);
- ops::repl::init(&mut isolate, &state);
- ops::resources::init(&mut isolate, &state);
- ops::timers::init(&mut isolate, &state);
- ops::worker_host::init(&mut isolate, &state);
- ops::web_worker::init(&mut isolate, &state);
let global_state_ = state.global_state.clone();
isolate.set_js_error_create(move |v8_exception| {
@@ -80,7 +72,7 @@ impl Worker {
name,
isolate: Arc::new(AsyncMutex::new(isolate)),
state,
- external_channels: Arc::new(Mutex::new(external_channels)),
+ external_channels,
}
}
@@ -128,24 +120,24 @@ impl Worker {
/// Post message to worker as a host.
///
/// This method blocks current thread.
- pub fn post_message(
- &self,
- buf: Buf,
- ) -> impl Future<Output = Result<(), ErrBox>> {
- let channels = self.external_channels.lock().unwrap();
- let mut sender = channels.sender.clone();
- async move {
- let result = sender.send(buf).map_err(ErrBox::from).await;
- drop(sender);
- result
- }
+ pub async fn post_message(&self, buf: Buf) -> Result<(), ErrBox> {
+ let mut sender = self.external_channels.sender.clone();
+ let result = sender.send(buf).map_err(ErrBox::from).await;
+ drop(sender);
+ result
}
/// Get message from worker as a host.
- pub fn get_message(&self) -> WorkerReceiver {
- WorkerReceiver {
- channels: self.external_channels.clone(),
+ pub fn get_message(
+ &self,
+ ) -> Pin<Box<dyn Future<Output = Option<Buf>> + Send>> {
+ let receiver_mutex = self.external_channels.receiver.clone();
+
+ async move {
+ let mut receiver = receiver_mutex.lock().await;
+ receiver.next().await
}
+ .boxed()
}
pub fn clear_exception(&mut self) {
@@ -171,22 +163,71 @@ impl Future for Worker {
}
}
-/// This structure wraps worker's resource id to implement future
-/// that will return message received from worker or None
-/// if worker's channel has been closed.
-pub struct WorkerReceiver {
- pub channels: Arc<Mutex<WorkerChannels>>,
+/// This worker is created and used by Deno executable.
+///
+/// It provides ops available in the `Deno` namespace.
+///
+/// All WebWorkers created during program execution are decendants of
+/// this worker.
+#[derive(Clone)]
+pub struct MainWorker(Worker);
+
+impl MainWorker {
+ pub fn new(
+ name: String,
+ startup_data: StartupData,
+ state: ThreadSafeState,
+ external_channels: WorkerChannels,
+ ) -> Self {
+ let state_ = state.clone();
+ let worker = Worker::new(name, startup_data, state_, external_channels);
+ {
+ let mut isolate = worker.isolate.try_lock().unwrap();
+ let op_registry = isolate.op_registry.clone();
+
+ ops::runtime_compiler::init(&mut isolate, &state);
+ ops::errors::init(&mut isolate, &state);
+ ops::fetch::init(&mut isolate, &state);
+ ops::files::init(&mut isolate, &state);
+ ops::fs::init(&mut isolate, &state);
+ ops::io::init(&mut isolate, &state);
+ ops::plugins::init(&mut isolate, &state, op_registry);
+ ops::net::init(&mut isolate, &state);
+ ops::tls::init(&mut isolate, &state);
+ ops::os::init(&mut isolate, &state);
+ ops::permissions::init(&mut isolate, &state);
+ ops::process::init(&mut isolate, &state);
+ ops::random::init(&mut isolate, &state);
+ ops::repl::init(&mut isolate, &state);
+ ops::resources::init(&mut isolate, &state);
+ ops::timers::init(&mut isolate, &state);
+ ops::worker_host::init(&mut isolate, &state);
+ ops::web_worker::init(&mut isolate, &state);
+ }
+
+ Self(worker)
+ }
+}
+
+impl Deref for MainWorker {
+ type Target = Worker;
+ fn deref(&self) -> &Self::Target {
+ &self.0
+ }
}
-impl Future for WorkerReceiver {
- type Output = Result<Option<Buf>, ErrBox>;
+impl DerefMut for MainWorker {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ &mut self.0
+ }
+}
+
+impl Future for MainWorker {
+ type Output = Result<(), ErrBox>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
- let mut channels = self.channels.lock().unwrap();
- match channels.receiver.poll_next_unpin(cx) {
- Poll::Ready(v) => Poll::Ready(Ok(v)),
- Poll::Pending => Poll::Pending,
- }
+ let inner = self.get_mut();
+ inner.0.poll_unpin(cx)
}
}
@@ -248,7 +289,7 @@ mod tests {
let state_ = state.clone();
tokio_util::run(async move {
let mut worker =
- Worker::new("TEST".to_string(), StartupData::None, state, ext);
+ MainWorker::new("TEST".to_string(), StartupData::None, state, ext);
let result = worker
.execute_mod_async(&module_specifier, None, false)
.await;
@@ -291,7 +332,7 @@ mod tests {
let state_ = state.clone();
tokio_util::run(async move {
let mut worker =
- Worker::new("TEST".to_string(), StartupData::None, state, ext);
+ MainWorker::new("TEST".to_string(), StartupData::None, state, ext);
let result = worker
.execute_mod_async(&module_specifier, None, false)
.await;
@@ -333,7 +374,7 @@ mod tests {
let global_state_ = global_state;
let state_ = state.clone();
tokio_util::run(async move {
- let mut worker = Worker::new(
+ let mut worker = MainWorker::new(
"TEST".to_string(),
startup_data::deno_isolate_init(),
state,
@@ -359,13 +400,13 @@ mod tests {
drop(http_server_guard);
}
- fn create_test_worker() -> Worker {
+ fn create_test_worker() -> MainWorker {
let (int, ext) = ThreadSafeState::create_channels();
let state = ThreadSafeState::mock(
vec![String::from("./deno"), String::from("hello.js")],
int,
);
- let mut worker = Worker::new(
+ let mut worker = MainWorker::new(
"TEST".to_string(),
startup_data::deno_isolate_init(),
state,
@@ -409,7 +450,7 @@ mod tests {
let r = block_on(worker_.post_message(msg));
assert!(r.is_ok());
- let maybe_msg = block_on(worker_.get_message()).unwrap();
+ let maybe_msg = block_on(worker_.get_message());
assert!(maybe_msg.is_some());
// Check if message received is [1, 2, 3] in json
assert_eq!(*maybe_msg.unwrap(), *b"[1,2,3]");