diff options
author | Bartek IwaĆczuk <biwanczuk@gmail.com> | 2020-01-21 17:50:06 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-01-21 17:50:06 +0100 |
commit | ecd1d3abb0cae9c7cbc1330cbaa035a5786e94d7 (patch) | |
tree | 73d4b12a06f5f6a6bc0e7168e0021efc3a2eda7c /cli/worker.rs | |
parent | 229eb292f83dedbc32dc24f912841caf79a53e9c (diff) |
refactor: split cli::Worker (#3735)
* cli::Worker is base struct to create specialized workers
* add MainWorker
* add CompilerWorker
* refactor WebWorker to use Worker
Diffstat (limited to 'cli/worker.rs')
-rw-r--r-- | cli/worker.rs | 157 |
1 files changed, 99 insertions, 58 deletions
diff --git a/cli/worker.rs b/cli/worker.rs index 4ad79a09b..ef72602d4 100644 --- a/cli/worker.rs +++ b/cli/worker.rs @@ -15,9 +15,10 @@ use futures::stream::StreamExt; use futures::task::AtomicWaker; use std::env; use std::future::Future; +use std::ops::Deref; +use std::ops::DerefMut; use std::pin::Pin; use std::sync::Arc; -use std::sync::Mutex; use std::task::Context; use std::task::Poll; use tokio::sync::Mutex as AsyncMutex; @@ -26,19 +27,30 @@ use url::Url; /// Wraps mpsc channels so they can be referenced /// from ops and used to facilitate parent-child communication /// for workers. +#[derive(Clone)] pub struct WorkerChannels { pub sender: mpsc::Sender<Buf>, - pub receiver: mpsc::Receiver<Buf>, + pub receiver: Arc<AsyncMutex<mpsc::Receiver<Buf>>>, } -/// Wraps deno_core::Isolate to provide source maps, ops for the CLI, and -/// high-level module loading. +/// Worker is a CLI wrapper for `deno_core::Isolate`. +/// +/// It provides infrastructure to communicate with a worker and +/// consequently between workers. +/// +/// This struct is meant to be used as a base struct for concrete +/// type of worker that registers set of ops. +/// +/// Currently there are three types of workers: +/// - `MainWorker` +/// - `CompilerWorker` +/// - `WebWorker` #[derive(Clone)] pub struct Worker { pub name: String, pub isolate: Arc<AsyncMutex<Box<deno_core::EsIsolate>>>, pub state: ThreadSafeState, - external_channels: Arc<Mutex<WorkerChannels>>, + external_channels: WorkerChannels, } impl Worker { @@ -50,26 +62,6 @@ impl Worker { ) -> Self { let mut isolate = deno_core::EsIsolate::new(Box::new(state.clone()), startup_data, false); - let op_registry = isolate.op_registry.clone(); - - ops::compiler::init(&mut isolate, &state); - ops::errors::init(&mut isolate, &state); - ops::fetch::init(&mut isolate, &state); - ops::files::init(&mut isolate, &state); - ops::fs::init(&mut isolate, &state); - ops::io::init(&mut isolate, &state); - ops::plugins::init(&mut isolate, &state, op_registry); - ops::net::init(&mut isolate, &state); - ops::tls::init(&mut isolate, &state); - ops::os::init(&mut isolate, &state); - ops::permissions::init(&mut isolate, &state); - ops::process::init(&mut isolate, &state); - ops::random::init(&mut isolate, &state); - ops::repl::init(&mut isolate, &state); - ops::resources::init(&mut isolate, &state); - ops::timers::init(&mut isolate, &state); - ops::worker_host::init(&mut isolate, &state); - ops::web_worker::init(&mut isolate, &state); let global_state_ = state.global_state.clone(); isolate.set_js_error_create(move |v8_exception| { @@ -80,7 +72,7 @@ impl Worker { name, isolate: Arc::new(AsyncMutex::new(isolate)), state, - external_channels: Arc::new(Mutex::new(external_channels)), + external_channels, } } @@ -128,24 +120,24 @@ impl Worker { /// Post message to worker as a host. /// /// This method blocks current thread. - pub fn post_message( - &self, - buf: Buf, - ) -> impl Future<Output = Result<(), ErrBox>> { - let channels = self.external_channels.lock().unwrap(); - let mut sender = channels.sender.clone(); - async move { - let result = sender.send(buf).map_err(ErrBox::from).await; - drop(sender); - result - } + pub async fn post_message(&self, buf: Buf) -> Result<(), ErrBox> { + let mut sender = self.external_channels.sender.clone(); + let result = sender.send(buf).map_err(ErrBox::from).await; + drop(sender); + result } /// Get message from worker as a host. - pub fn get_message(&self) -> WorkerReceiver { - WorkerReceiver { - channels: self.external_channels.clone(), + pub fn get_message( + &self, + ) -> Pin<Box<dyn Future<Output = Option<Buf>> + Send>> { + let receiver_mutex = self.external_channels.receiver.clone(); + + async move { + let mut receiver = receiver_mutex.lock().await; + receiver.next().await } + .boxed() } pub fn clear_exception(&mut self) { @@ -171,22 +163,71 @@ impl Future for Worker { } } -/// This structure wraps worker's resource id to implement future -/// that will return message received from worker or None -/// if worker's channel has been closed. -pub struct WorkerReceiver { - pub channels: Arc<Mutex<WorkerChannels>>, +/// This worker is created and used by Deno executable. +/// +/// It provides ops available in the `Deno` namespace. +/// +/// All WebWorkers created during program execution are decendants of +/// this worker. +#[derive(Clone)] +pub struct MainWorker(Worker); + +impl MainWorker { + pub fn new( + name: String, + startup_data: StartupData, + state: ThreadSafeState, + external_channels: WorkerChannels, + ) -> Self { + let state_ = state.clone(); + let worker = Worker::new(name, startup_data, state_, external_channels); + { + let mut isolate = worker.isolate.try_lock().unwrap(); + let op_registry = isolate.op_registry.clone(); + + ops::runtime_compiler::init(&mut isolate, &state); + ops::errors::init(&mut isolate, &state); + ops::fetch::init(&mut isolate, &state); + ops::files::init(&mut isolate, &state); + ops::fs::init(&mut isolate, &state); + ops::io::init(&mut isolate, &state); + ops::plugins::init(&mut isolate, &state, op_registry); + ops::net::init(&mut isolate, &state); + ops::tls::init(&mut isolate, &state); + ops::os::init(&mut isolate, &state); + ops::permissions::init(&mut isolate, &state); + ops::process::init(&mut isolate, &state); + ops::random::init(&mut isolate, &state); + ops::repl::init(&mut isolate, &state); + ops::resources::init(&mut isolate, &state); + ops::timers::init(&mut isolate, &state); + ops::worker_host::init(&mut isolate, &state); + ops::web_worker::init(&mut isolate, &state); + } + + Self(worker) + } +} + +impl Deref for MainWorker { + type Target = Worker; + fn deref(&self) -> &Self::Target { + &self.0 + } } -impl Future for WorkerReceiver { - type Output = Result<Option<Buf>, ErrBox>; +impl DerefMut for MainWorker { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +impl Future for MainWorker { + type Output = Result<(), ErrBox>; fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { - let mut channels = self.channels.lock().unwrap(); - match channels.receiver.poll_next_unpin(cx) { - Poll::Ready(v) => Poll::Ready(Ok(v)), - Poll::Pending => Poll::Pending, - } + let inner = self.get_mut(); + inner.0.poll_unpin(cx) } } @@ -248,7 +289,7 @@ mod tests { let state_ = state.clone(); tokio_util::run(async move { let mut worker = - Worker::new("TEST".to_string(), StartupData::None, state, ext); + MainWorker::new("TEST".to_string(), StartupData::None, state, ext); let result = worker .execute_mod_async(&module_specifier, None, false) .await; @@ -291,7 +332,7 @@ mod tests { let state_ = state.clone(); tokio_util::run(async move { let mut worker = - Worker::new("TEST".to_string(), StartupData::None, state, ext); + MainWorker::new("TEST".to_string(), StartupData::None, state, ext); let result = worker .execute_mod_async(&module_specifier, None, false) .await; @@ -333,7 +374,7 @@ mod tests { let global_state_ = global_state; let state_ = state.clone(); tokio_util::run(async move { - let mut worker = Worker::new( + let mut worker = MainWorker::new( "TEST".to_string(), startup_data::deno_isolate_init(), state, @@ -359,13 +400,13 @@ mod tests { drop(http_server_guard); } - fn create_test_worker() -> Worker { + fn create_test_worker() -> MainWorker { let (int, ext) = ThreadSafeState::create_channels(); let state = ThreadSafeState::mock( vec![String::from("./deno"), String::from("hello.js")], int, ); - let mut worker = Worker::new( + let mut worker = MainWorker::new( "TEST".to_string(), startup_data::deno_isolate_init(), state, @@ -409,7 +450,7 @@ mod tests { let r = block_on(worker_.post_message(msg)); assert!(r.is_ok()); - let maybe_msg = block_on(worker_.get_message()).unwrap(); + let maybe_msg = block_on(worker_.get_message()); assert!(maybe_msg.is_some()); // Check if message received is [1, 2, 3] in json assert_eq!(*maybe_msg.unwrap(), *b"[1,2,3]"); |