summaryrefslogtreecommitdiff
path: root/cli/worker.rs
diff options
context:
space:
mode:
Diffstat (limited to 'cli/worker.rs')
-rw-r--r--cli/worker.rs162
1 files changed, 124 insertions, 38 deletions
diff --git a/cli/worker.rs b/cli/worker.rs
index 4bb68fb95..90fb95af7 100644
--- a/cli/worker.rs
+++ b/cli/worker.rs
@@ -1,33 +1,57 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
+use crate::deno_error::bad_resource;
use crate::fmt_errors::JSError;
use crate::ops;
+use crate::resources;
+use crate::resources::CoreResource;
+use crate::resources::ResourceId;
use crate::state::ThreadSafeState;
use deno;
+use deno::Buf;
use deno::ErrBox;
use deno::ModuleSpecifier;
use deno::RecursiveLoad;
use deno::StartupData;
use futures::Async;
use futures::Future;
+use futures::Poll;
+use futures::Sink;
+use futures::Stream;
use std::env;
use std::sync::Arc;
use std::sync::Mutex;
+use tokio::sync::mpsc;
use url::Url;
+/// Wraps mpsc channels into a generic resource so they can be referenced
+/// from ops and used to facilitate parent-child communication
+/// for workers.
+pub struct WorkerChannels {
+ pub sender: mpsc::Sender<Buf>,
+ pub receiver: mpsc::Receiver<Buf>,
+}
+
+impl CoreResource for WorkerChannels {
+ fn inspect_repr(&self) -> &str {
+ "worker"
+ }
+}
+
/// Wraps deno::Isolate to provide source maps, ops for the CLI, and
-/// high-level module loading
+/// high-level module loading.
#[derive(Clone)]
pub struct Worker {
+ pub name: String,
isolate: Arc<Mutex<deno::Isolate>>,
pub state: ThreadSafeState,
}
impl Worker {
pub fn new(
- _name: String,
+ name: String,
startup_data: StartupData,
state: ThreadSafeState,
- ) -> Worker {
+ ) -> Self {
let isolate = Arc::new(Mutex::new(deno::Isolate::new(startup_data, false)));
{
let mut i = isolate.lock().unwrap();
@@ -61,12 +85,16 @@ impl Worker {
Box::new(load_stream)
});
- let state_ = state.clone();
+ let global_state_ = state.global_state.clone();
i.set_js_error_create(move |v8_exception| {
- JSError::from_v8_exception(v8_exception, &state_.ts_compiler)
+ JSError::from_v8_exception(v8_exception, &global_state_.ts_compiler)
})
}
- Self { isolate, state }
+ Self {
+ name,
+ isolate,
+ state,
+ }
}
/// Same as execute2() but the filename defaults to "$CWD/__anonymous__".
@@ -106,7 +134,7 @@ impl Worker {
)
.get_future(isolate);
recursive_load.and_then(move |id| -> Result<(), ErrBox> {
- worker.state.progress.done();
+ worker.state.global_state.progress.done();
if is_prefetch {
Ok(())
} else {
@@ -115,6 +143,37 @@ impl Worker {
}
})
}
+
+ /// Post message to worker as a host or privileged overlord
+ pub fn post_message(self: &Self, buf: Buf) -> Result<Async<()>, ErrBox> {
+ Worker::post_message_to_resource(self.state.rid, buf)
+ }
+
+ pub fn post_message_to_resource(
+ rid: resources::ResourceId,
+ buf: Buf,
+ ) -> Result<Async<()>, ErrBox> {
+ debug!("post message to resource {}", rid);
+ let mut table = resources::lock_resource_table();
+ let worker = table
+ .get_mut::<WorkerChannels>(rid)
+ .ok_or_else(bad_resource)?;
+ let sender = &mut worker.sender;
+ sender
+ .send(buf)
+ .poll()
+ .map(|_| Async::Ready(()))
+ .map_err(ErrBox::from)
+ }
+
+ pub fn get_message(self: &Self) -> WorkerReceiver {
+ Worker::get_message_from_resource(self.state.rid)
+ }
+
+ pub fn get_message_from_resource(rid: ResourceId) -> WorkerReceiver {
+ debug!("get message from resource {}", rid);
+ WorkerReceiver { rid }
+ }
}
impl Future for Worker {
@@ -127,16 +186,39 @@ impl Future for Worker {
}
}
+/// This structure wraps worker's resource id to implement future
+/// that will return message received from worker or None
+/// if worker's channel has been closed.
+pub struct WorkerReceiver {
+ rid: ResourceId,
+}
+
+impl Future for WorkerReceiver {
+ type Item = Option<Buf>;
+ type Error = ErrBox;
+
+ fn poll(&mut self) -> Poll<Option<Buf>, ErrBox> {
+ let mut table = resources::lock_resource_table();
+ let worker = table
+ .get_mut::<WorkerChannels>(self.rid)
+ .ok_or_else(bad_resource)?;
+ let receiver = &mut worker.receiver;
+ receiver.poll().map_err(ErrBox::from)
+ }
+}
+
#[cfg(test)]
mod tests {
use super::*;
use crate::flags;
+ use crate::flags::DenoFlags;
+ use crate::global_state::ThreadSafeGlobalState;
use crate::progress::Progress;
- use crate::resources;
use crate::startup_data;
use crate::state::ThreadSafeState;
use crate::tokio_util;
use futures::future::lazy;
+ use futures::IntoFuture;
use std::sync::atomic::Ordering;
#[test]
@@ -149,13 +231,15 @@ mod tests {
let module_specifier =
ModuleSpecifier::resolve_url_or_path(&p.to_string_lossy()).unwrap();
let argv = vec![String::from("./deno"), module_specifier.to_string()];
- let state = ThreadSafeState::new(
+ let global_state = ThreadSafeGlobalState::new(
flags::DenoFlags::default(),
argv,
Progress::new(),
- true,
)
.unwrap();
+ let state =
+ ThreadSafeState::new(global_state, Some(module_specifier.clone()), true)
+ .unwrap();
let state_ = state.clone();
tokio_util::run(lazy(move || {
let mut worker =
@@ -186,13 +270,12 @@ mod tests {
let module_specifier =
ModuleSpecifier::resolve_url_or_path(&p.to_string_lossy()).unwrap();
let argv = vec![String::from("deno"), module_specifier.to_string()];
- let state = ThreadSafeState::new(
- flags::DenoFlags::default(),
- argv,
- Progress::new(),
- true,
- )
- .unwrap();
+ let global_state =
+ ThreadSafeGlobalState::new(DenoFlags::default(), argv, Progress::new())
+ .unwrap();
+ let state =
+ ThreadSafeState::new(global_state, Some(module_specifier.clone()), true)
+ .unwrap();
let state_ = state.clone();
tokio_util::run(lazy(move || {
let mut worker =
@@ -227,8 +310,15 @@ mod tests {
let argv = vec![String::from("deno"), module_specifier.to_string()];
let mut flags = flags::DenoFlags::default();
flags.reload = true;
- let state =
- ThreadSafeState::new(flags, argv, Progress::new(), true).unwrap();
+ let global_state =
+ ThreadSafeGlobalState::new(flags, argv, Progress::new()).unwrap();
+ let state = ThreadSafeState::new(
+ global_state.clone(),
+ Some(module_specifier.clone()),
+ true,
+ )
+ .unwrap();
+ let global_state_ = global_state.clone();
let state_ = state.clone();
tokio_util::run(lazy(move || {
let mut worker = Worker::new(
@@ -247,10 +337,12 @@ mod tests {
})
}));
- let metrics = &state_.metrics;
- assert_eq!(metrics.resolve_count.load(Ordering::SeqCst), 3);
+ assert_eq!(state_.metrics.resolve_count.load(Ordering::SeqCst), 3);
// Check that we've only invoked the compiler once.
- assert_eq!(metrics.compiler_starts.load(Ordering::SeqCst), 1);
+ assert_eq!(
+ global_state_.metrics.compiler_starts.load(Ordering::SeqCst),
+ 1
+ );
drop(http_server_guard);
}
@@ -285,8 +377,9 @@ mod tests {
"#;
worker.execute(source).unwrap();
- let resource = worker.state.resource.clone();
- let resource_ = resource.clone();
+ let worker_ = worker.clone();
+ let rid = worker.state.rid;
+ let resource_ = resources::Resource { rid };
tokio::spawn(lazy(move || {
worker.then(move |r| -> Result<(), ()> {
@@ -298,14 +391,10 @@ mod tests {
let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
- let r = resources::post_message_to_worker(resource.rid, msg)
- .expect("Bad resource")
- .wait();
+ let r = worker_.post_message(msg).into_future().wait();
assert!(r.is_ok());
- let maybe_msg = resources::get_message_from_worker(resource.rid)
- .wait()
- .unwrap();
+ let maybe_msg = worker_.get_message().wait().unwrap();
assert!(maybe_msg.is_some());
// Check if message received is [1, 2, 3] in json
assert_eq!(*maybe_msg.unwrap(), *b"[1,2,3]");
@@ -314,9 +403,7 @@ mod tests {
.to_string()
.into_boxed_str()
.into_boxed_bytes();
- let r = resources::post_message_to_worker(resource.rid, msg)
- .expect("Bad resource")
- .wait();
+ let r = worker_.post_message(msg).into_future().wait();
assert!(r.is_ok());
})
}
@@ -329,8 +416,9 @@ mod tests {
.execute("onmessage = () => { delete window.onmessage; }")
.unwrap();
- let resource = worker.state.resource.clone();
- let rid = resource.rid;
+ let rid = worker.state.rid;
+ let resource = resources::Resource { rid };
+ let worker_ = worker.clone();
let worker_future = worker
.then(move |r| -> Result<(), ()> {
@@ -345,9 +433,7 @@ mod tests {
tokio::spawn(lazy(move || worker_future_.then(|_| Ok(()))));
let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
- let r = resources::post_message_to_worker(rid, msg)
- .expect("Bad resource")
- .wait();
+ let r = worker_.post_message(msg).into_future().wait();
assert!(r.is_ok());
debug!("rid {:?}", rid);