diff options
Diffstat (limited to 'cli/worker.rs')
-rw-r--r-- | cli/worker.rs | 162 |
1 files changed, 124 insertions, 38 deletions
diff --git a/cli/worker.rs b/cli/worker.rs index 4bb68fb95..90fb95af7 100644 --- a/cli/worker.rs +++ b/cli/worker.rs @@ -1,33 +1,57 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. +use crate::deno_error::bad_resource; use crate::fmt_errors::JSError; use crate::ops; +use crate::resources; +use crate::resources::CoreResource; +use crate::resources::ResourceId; use crate::state::ThreadSafeState; use deno; +use deno::Buf; use deno::ErrBox; use deno::ModuleSpecifier; use deno::RecursiveLoad; use deno::StartupData; use futures::Async; use futures::Future; +use futures::Poll; +use futures::Sink; +use futures::Stream; use std::env; use std::sync::Arc; use std::sync::Mutex; +use tokio::sync::mpsc; use url::Url; +/// Wraps mpsc channels into a generic resource so they can be referenced +/// from ops and used to facilitate parent-child communication +/// for workers. +pub struct WorkerChannels { + pub sender: mpsc::Sender<Buf>, + pub receiver: mpsc::Receiver<Buf>, +} + +impl CoreResource for WorkerChannels { + fn inspect_repr(&self) -> &str { + "worker" + } +} + /// Wraps deno::Isolate to provide source maps, ops for the CLI, and -/// high-level module loading +/// high-level module loading. #[derive(Clone)] pub struct Worker { + pub name: String, isolate: Arc<Mutex<deno::Isolate>>, pub state: ThreadSafeState, } impl Worker { pub fn new( - _name: String, + name: String, startup_data: StartupData, state: ThreadSafeState, - ) -> Worker { + ) -> Self { let isolate = Arc::new(Mutex::new(deno::Isolate::new(startup_data, false))); { let mut i = isolate.lock().unwrap(); @@ -61,12 +85,16 @@ impl Worker { Box::new(load_stream) }); - let state_ = state.clone(); + let global_state_ = state.global_state.clone(); i.set_js_error_create(move |v8_exception| { - JSError::from_v8_exception(v8_exception, &state_.ts_compiler) + JSError::from_v8_exception(v8_exception, &global_state_.ts_compiler) }) } - Self { isolate, state } + Self { + name, + isolate, + state, + } } /// Same as execute2() but the filename defaults to "$CWD/__anonymous__". @@ -106,7 +134,7 @@ impl Worker { ) .get_future(isolate); recursive_load.and_then(move |id| -> Result<(), ErrBox> { - worker.state.progress.done(); + worker.state.global_state.progress.done(); if is_prefetch { Ok(()) } else { @@ -115,6 +143,37 @@ impl Worker { } }) } + + /// Post message to worker as a host or privileged overlord + pub fn post_message(self: &Self, buf: Buf) -> Result<Async<()>, ErrBox> { + Worker::post_message_to_resource(self.state.rid, buf) + } + + pub fn post_message_to_resource( + rid: resources::ResourceId, + buf: Buf, + ) -> Result<Async<()>, ErrBox> { + debug!("post message to resource {}", rid); + let mut table = resources::lock_resource_table(); + let worker = table + .get_mut::<WorkerChannels>(rid) + .ok_or_else(bad_resource)?; + let sender = &mut worker.sender; + sender + .send(buf) + .poll() + .map(|_| Async::Ready(())) + .map_err(ErrBox::from) + } + + pub fn get_message(self: &Self) -> WorkerReceiver { + Worker::get_message_from_resource(self.state.rid) + } + + pub fn get_message_from_resource(rid: ResourceId) -> WorkerReceiver { + debug!("get message from resource {}", rid); + WorkerReceiver { rid } + } } impl Future for Worker { @@ -127,16 +186,39 @@ impl Future for Worker { } } +/// This structure wraps worker's resource id to implement future +/// that will return message received from worker or None +/// if worker's channel has been closed. +pub struct WorkerReceiver { + rid: ResourceId, +} + +impl Future for WorkerReceiver { + type Item = Option<Buf>; + type Error = ErrBox; + + fn poll(&mut self) -> Poll<Option<Buf>, ErrBox> { + let mut table = resources::lock_resource_table(); + let worker = table + .get_mut::<WorkerChannels>(self.rid) + .ok_or_else(bad_resource)?; + let receiver = &mut worker.receiver; + receiver.poll().map_err(ErrBox::from) + } +} + #[cfg(test)] mod tests { use super::*; use crate::flags; + use crate::flags::DenoFlags; + use crate::global_state::ThreadSafeGlobalState; use crate::progress::Progress; - use crate::resources; use crate::startup_data; use crate::state::ThreadSafeState; use crate::tokio_util; use futures::future::lazy; + use futures::IntoFuture; use std::sync::atomic::Ordering; #[test] @@ -149,13 +231,15 @@ mod tests { let module_specifier = ModuleSpecifier::resolve_url_or_path(&p.to_string_lossy()).unwrap(); let argv = vec![String::from("./deno"), module_specifier.to_string()]; - let state = ThreadSafeState::new( + let global_state = ThreadSafeGlobalState::new( flags::DenoFlags::default(), argv, Progress::new(), - true, ) .unwrap(); + let state = + ThreadSafeState::new(global_state, Some(module_specifier.clone()), true) + .unwrap(); let state_ = state.clone(); tokio_util::run(lazy(move || { let mut worker = @@ -186,13 +270,12 @@ mod tests { let module_specifier = ModuleSpecifier::resolve_url_or_path(&p.to_string_lossy()).unwrap(); let argv = vec![String::from("deno"), module_specifier.to_string()]; - let state = ThreadSafeState::new( - flags::DenoFlags::default(), - argv, - Progress::new(), - true, - ) - .unwrap(); + let global_state = + ThreadSafeGlobalState::new(DenoFlags::default(), argv, Progress::new()) + .unwrap(); + let state = + ThreadSafeState::new(global_state, Some(module_specifier.clone()), true) + .unwrap(); let state_ = state.clone(); tokio_util::run(lazy(move || { let mut worker = @@ -227,8 +310,15 @@ mod tests { let argv = vec![String::from("deno"), module_specifier.to_string()]; let mut flags = flags::DenoFlags::default(); flags.reload = true; - let state = - ThreadSafeState::new(flags, argv, Progress::new(), true).unwrap(); + let global_state = + ThreadSafeGlobalState::new(flags, argv, Progress::new()).unwrap(); + let state = ThreadSafeState::new( + global_state.clone(), + Some(module_specifier.clone()), + true, + ) + .unwrap(); + let global_state_ = global_state.clone(); let state_ = state.clone(); tokio_util::run(lazy(move || { let mut worker = Worker::new( @@ -247,10 +337,12 @@ mod tests { }) })); - let metrics = &state_.metrics; - assert_eq!(metrics.resolve_count.load(Ordering::SeqCst), 3); + assert_eq!(state_.metrics.resolve_count.load(Ordering::SeqCst), 3); // Check that we've only invoked the compiler once. - assert_eq!(metrics.compiler_starts.load(Ordering::SeqCst), 1); + assert_eq!( + global_state_.metrics.compiler_starts.load(Ordering::SeqCst), + 1 + ); drop(http_server_guard); } @@ -285,8 +377,9 @@ mod tests { "#; worker.execute(source).unwrap(); - let resource = worker.state.resource.clone(); - let resource_ = resource.clone(); + let worker_ = worker.clone(); + let rid = worker.state.rid; + let resource_ = resources::Resource { rid }; tokio::spawn(lazy(move || { worker.then(move |r| -> Result<(), ()> { @@ -298,14 +391,10 @@ mod tests { let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); - let r = resources::post_message_to_worker(resource.rid, msg) - .expect("Bad resource") - .wait(); + let r = worker_.post_message(msg).into_future().wait(); assert!(r.is_ok()); - let maybe_msg = resources::get_message_from_worker(resource.rid) - .wait() - .unwrap(); + let maybe_msg = worker_.get_message().wait().unwrap(); assert!(maybe_msg.is_some()); // Check if message received is [1, 2, 3] in json assert_eq!(*maybe_msg.unwrap(), *b"[1,2,3]"); @@ -314,9 +403,7 @@ mod tests { .to_string() .into_boxed_str() .into_boxed_bytes(); - let r = resources::post_message_to_worker(resource.rid, msg) - .expect("Bad resource") - .wait(); + let r = worker_.post_message(msg).into_future().wait(); assert!(r.is_ok()); }) } @@ -329,8 +416,9 @@ mod tests { .execute("onmessage = () => { delete window.onmessage; }") .unwrap(); - let resource = worker.state.resource.clone(); - let rid = resource.rid; + let rid = worker.state.rid; + let resource = resources::Resource { rid }; + let worker_ = worker.clone(); let worker_future = worker .then(move |r| -> Result<(), ()> { @@ -345,9 +433,7 @@ mod tests { tokio::spawn(lazy(move || worker_future_.then(|_| Ok(())))); let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); - let r = resources::post_message_to_worker(rid, msg) - .expect("Bad resource") - .wait(); + let r = worker_.post_message(msg).into_future().wait(); assert!(r.is_ok()); debug!("rid {:?}", rid); |