summaryrefslogtreecommitdiff
path: root/cli/workers.rs
diff options
context:
space:
mode:
authorandy finch <andyfinch7@gmail.com>2019-04-01 15:09:59 -0400
committerRyan Dahl <ry@tinyclouds.org>2019-04-01 15:09:59 -0400
commitb0a23beb8fae964be3cdd8c23c38af66257d34c7 (patch)
tree8f7875c8ca059dfb0a3ade4da7bfb94e57d6e1aa /cli/workers.rs
parent659acadf77fdbeef8579a37839a464feb408437a (diff)
Add web worker JS API (#1993)
* Refactored the way worker polling is scheduled and errors are handled. * Share the worker future as a Shared
Diffstat (limited to 'cli/workers.rs')
-rw-r--r--cli/workers.rs303
1 files changed, 212 insertions, 91 deletions
diff --git a/cli/workers.rs b/cli/workers.rs
index 0c8d49fa7..cb919e8ed 100644
--- a/cli/workers.rs
+++ b/cli/workers.rs
@@ -1,17 +1,72 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
+use crate::errors::*;
+use crate::flags::DenoFlags;
use crate::isolate::{DenoBehavior, Isolate};
+use crate::isolate_state::IsolateState;
+use crate::isolate_state::IsolateStateContainer;
use crate::isolate_state::WorkerChannels;
-use crate::js_errors::JSErrorColor;
+use crate::ops;
use crate::resources;
-use crate::tokio_util;
+use crate::startup_data;
+use deno::deno_buf;
+use deno::Behavior;
use deno::Buf;
use deno::JSError;
-use futures::future::lazy;
+use deno::Op;
+use deno::StartupData;
use futures::sync::mpsc;
-use futures::sync::oneshot;
use futures::Future;
use futures::Poll;
-use std::thread;
+use std::sync::Arc;
+
+pub struct UserWorkerBehavior {
+ pub state: Arc<IsolateState>,
+}
+
+impl UserWorkerBehavior {
+ pub fn new(flags: DenoFlags, argv_rest: Vec<String>) -> Self {
+ Self {
+ state: Arc::new(IsolateState::new(flags, argv_rest, None, true)),
+ }
+ }
+}
+
+impl IsolateStateContainer for UserWorkerBehavior {
+ fn state(&self) -> Arc<IsolateState> {
+ self.state.clone()
+ }
+}
+
+impl IsolateStateContainer for &UserWorkerBehavior {
+ fn state(&self) -> Arc<IsolateState> {
+ self.state.clone()
+ }
+}
+
+impl Behavior for UserWorkerBehavior {
+ fn startup_data(&mut self) -> Option<StartupData> {
+ Some(startup_data::deno_isolate_init())
+ }
+
+ fn dispatch(
+ &mut self,
+ control: &[u8],
+ zero_copy: deno_buf,
+ ) -> (bool, Box<Op>) {
+ ops::dispatch_all(self, control, zero_copy, ops::op_selector_worker)
+ }
+}
+
+impl WorkerBehavior for UserWorkerBehavior {
+ fn set_internal_channels(&mut self, worker_channels: WorkerChannels) {
+ self.state = Arc::new(IsolateState::new(
+ self.state.flags.clone(),
+ self.state.argv.clone(),
+ Some(worker_channels),
+ true,
+ ));
+ }
+}
/// Behavior trait specific to workers
pub trait WorkerBehavior: DenoBehavior {
@@ -24,10 +79,11 @@ pub trait WorkerBehavior: DenoBehavior {
/// Rust interface for WebWorkers.
pub struct Worker<B: WorkerBehavior> {
isolate: Isolate<B>,
+ pub resource: resources::Resource,
}
impl<B: WorkerBehavior> Worker<B> {
- pub fn new(mut behavior: B) -> (Self, WorkerChannels) {
+ pub fn new(mut behavior: B) -> Self {
let (worker_in_tx, worker_in_rx) = mpsc::channel::<Buf>(1);
let (worker_out_tx, worker_out_rx) = mpsc::channel::<Buf>(1);
@@ -38,13 +94,23 @@ impl<B: WorkerBehavior> Worker<B> {
let isolate = Isolate::new(behavior);
- let worker = Worker { isolate };
- (worker, external_channels)
+ Worker {
+ isolate,
+ resource: resources::add_worker(external_channels),
+ }
}
pub fn execute(&mut self, js_source: &str) -> Result<(), JSError> {
self.isolate.execute(js_source)
}
+
+ pub fn execute_mod(
+ &mut self,
+ js_filename: &str,
+ is_prefetch: bool,
+ ) -> Result<(), RustOrJsError> {
+ self.isolate.execute_mod(js_filename, is_prefetch)
+ }
}
impl<B: WorkerBehavior> Future for Worker<B> {
@@ -56,47 +122,48 @@ impl<B: WorkerBehavior> Future for Worker<B> {
}
}
+/// Method and data used to initalize a worker
+pub enum WorkerInit {
+ Script(String),
+ Module(String),
+}
+
pub fn spawn<B: WorkerBehavior + 'static>(
behavior: B,
- js_source: String,
-) -> resources::Resource {
- // TODO This function should return a Future, so that the caller can retrieve
- // the JSError if one is thrown. Currently it just prints to stderr and calls
- // exit(1).
- // let (js_error_tx, js_error_rx) = oneshot::channel::<JSError>();
- let (p, c) = oneshot::channel::<resources::Resource>();
- let builder = thread::Builder::new().name("worker".to_string());
-
- let _tid = builder
- .spawn(move || {
- tokio_util::run(lazy(move || {
- let (mut worker, external_channels) = Worker::new(behavior);
- let resource = resources::add_worker(external_channels);
- p.send(resource.clone()).unwrap();
-
- worker
- .execute("denoMain()")
- .expect("worker denoMain failed");
- worker
- .execute("workerMain()")
- .expect("worker workerMain failed");
- worker.execute(&js_source).expect("worker js_source failed");
+ worker_debug_name: &str,
+ init: WorkerInit,
+) -> Result<Worker<B>, RustOrJsError> {
+ let state = behavior.state().clone();
+ let mut worker = Worker::new(behavior);
- worker.then(move |r| -> Result<(), ()> {
- resource.close();
- debug!("workers.rs after resource close");
- if let Err(err) = r {
- eprintln!("{}", JSErrorColor(&err).to_string());
- std::process::exit(1);
- }
- Ok(())
- })
- }));
+ worker
+ .execute(&format!("denoMain('{}')", worker_debug_name))
+ .expect("worker workerInit failed");
+
+ worker
+ .execute("workerMain()")
+ .expect("worker workerMain failed");
- debug!("workers.rs after spawn");
- }).unwrap();
+ let init_result = match init {
+ WorkerInit::Script(script) => match worker.execute(&script) {
+ Ok(v) => Ok(v),
+ Err(e) => Err(RustOrJsError::Js(e)),
+ },
+ WorkerInit::Module(specifier) => {
+ let should_prefetch = state.flags.prefetch || state.flags.info;
+ match state.dir.resolve_module_url(&specifier, ".") {
+ Err(err) => Err(RustOrJsError::Rust(DenoError::from(err))),
+ Ok(module_url) => {
+ worker.execute_mod(&module_url.to_string(), should_prefetch)
+ }
+ }
+ }
+ };
- c.wait().unwrap()
+ match init_result {
+ Ok(_) => Ok(worker),
+ Err(err) => Err(err),
+ }
}
#[cfg(test)]
@@ -104,63 +171,117 @@ mod tests {
use super::*;
use crate::compiler::CompilerBehavior;
use crate::isolate_state::IsolateState;
- use std::sync::Arc;
+ use crate::js_errors::JSErrorColor;
+ use crate::tokio_util;
+ use futures::future::lazy;
+ use std::thread;
#[test]
fn test_spawn() {
- let resource = spawn(
- CompilerBehavior::new(Arc::new(IsolateState::mock())),
- r#"
- onmessage = function(e) {
- let s = new TextDecoder().decode(e.data);;
- console.log("msg from main script", s);
- if (s == "exit") {
- close();
- return;
- } else {
- console.assert(s === "hi");
+ tokio_util::init(|| {
+ let worker_result = spawn(
+ CompilerBehavior::new(
+ IsolateState::mock().flags.clone(),
+ IsolateState::mock().argv.clone(),
+ ),
+ "TEST",
+ WorkerInit::Script(
+ r#"
+ onmessage = function(e) {
+ console.log("msg from main script", e.data);
+ if (e.data == "exit") {
+ close();
+ return;
+ } else {
+ console.assert(e.data === "hi");
+ }
+ postMessage([1, 2, 3]);
+ console.log("after postMessage");
}
- postMessage(new Uint8Array([1, 2, 3]));
- console.log("after postMessage");
- }
- "#.into(),
- );
- let msg = String::from("hi").into_boxed_str().into_boxed_bytes();
+ "#.into(),
+ ),
+ );
+ assert!(worker_result.is_ok());
+ let worker = worker_result.unwrap();
+ let resource = worker.resource.clone();
+ let resource_ = resource.clone();
- let r = resources::worker_post_message(resource.rid, msg).wait();
- assert!(r.is_ok());
+ tokio::spawn(lazy(move || {
+ worker.then(move |r| -> Result<(), ()> {
+ resource_.close();
+ debug!("workers.rs after resource close");
+ if let Err(err) = r {
+ eprintln!("{}", JSErrorColor(&err).to_string());
+ assert!(false)
+ }
+ Ok(())
+ })
+ }));
- let maybe_msg =
- resources::worker_recv_message(resource.rid).wait().unwrap();
- assert!(maybe_msg.is_some());
- assert_eq!(*maybe_msg.unwrap(), [1, 2, 3]);
+ let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
- let msg = String::from("exit").into_boxed_str().into_boxed_bytes();
- let r = resources::worker_post_message(resource.rid, msg).wait();
- assert!(r.is_ok());
+ let r = resources::post_message_to_worker(resource.rid, msg).wait();
+ assert!(r.is_ok());
+
+ let maybe_msg = resources::get_message_from_worker(resource.rid)
+ .wait()
+ .unwrap();
+ assert!(maybe_msg.is_some());
+ // Check if message received is [1, 2, 3] in json
+ assert_eq!(*maybe_msg.unwrap(), *b"[1,2,3]");
+
+ let msg = json!("exit")
+ .to_string()
+ .into_boxed_str()
+ .into_boxed_bytes();
+ let r = resources::post_message_to_worker(resource.rid, msg).wait();
+ assert!(r.is_ok());
+ })
}
#[test]
fn removed_from_resource_table_on_close() {
- let resource = spawn(
- CompilerBehavior::new(Arc::new(IsolateState::mock())),
- "onmessage = () => close();".into(),
- );
-
- assert_eq!(
- resources::get_type(resource.rid),
- Some("worker".to_string())
- );
-
- let msg = String::from("hi").into_boxed_str().into_boxed_bytes();
- let r = resources::worker_post_message(resource.rid, msg).wait();
- assert!(r.is_ok());
- println!("rid {:?}", resource.rid);
-
- // TODO Need a way to get a future for when a resource closes.
- // For now, just sleep for a bit.
- // resource.close();
- thread::sleep(std::time::Duration::from_millis(1000));
- assert_eq!(resources::get_type(resource.rid), None);
+ tokio_util::init(|| {
+ let worker_result = spawn(
+ CompilerBehavior::new(
+ IsolateState::mock().flags.clone(),
+ IsolateState::mock().argv.clone(),
+ ),
+ "TEST",
+ WorkerInit::Script("onmessage = () => close();".into()),
+ );
+ assert!(worker_result.is_ok());
+ let worker = worker_result.unwrap();
+ let resource = worker.resource.clone();
+ let resource_ = resource.clone();
+
+ tokio::spawn(lazy(move || {
+ worker.then(move |r| -> Result<(), ()> {
+ resource_.close();
+ debug!("workers.rs after resource close");
+ if let Err(err) = r {
+ eprintln!("{}", JSErrorColor(&err).to_string());
+ assert!(false)
+ }
+ Ok(())
+ })
+ }));
+
+ assert_eq!(
+ resources::get_type(resource.rid),
+ Some("worker".to_string())
+ );
+
+ let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
+ let r = resources::post_message_to_worker(resource.rid, msg).wait();
+ assert!(r.is_ok());
+ println!("rid {:?}", resource.rid);
+
+ // TODO Need a way to get a future for when a resource closes.
+ // For now, just sleep for a bit.
+ // resource.close();
+ thread::sleep(std::time::Duration::from_millis(1000));
+ assert_eq!(resources::get_type(resource.rid), None);
+ })
}
}