diff options
author | andy finch <andyfinch7@gmail.com> | 2019-04-01 15:09:59 -0400 |
---|---|---|
committer | Ryan Dahl <ry@tinyclouds.org> | 2019-04-01 15:09:59 -0400 |
commit | b0a23beb8fae964be3cdd8c23c38af66257d34c7 (patch) | |
tree | 8f7875c8ca059dfb0a3ade4da7bfb94e57d6e1aa /cli/workers.rs | |
parent | 659acadf77fdbeef8579a37839a464feb408437a (diff) |
Add web worker JS API (#1993)
* Refactored the way worker polling is scheduled and errors are handled.
* Share the worker future as a Shared
Diffstat (limited to 'cli/workers.rs')
-rw-r--r-- | cli/workers.rs | 303 |
1 files changed, 212 insertions, 91 deletions
diff --git a/cli/workers.rs b/cli/workers.rs index 0c8d49fa7..cb919e8ed 100644 --- a/cli/workers.rs +++ b/cli/workers.rs @@ -1,17 +1,72 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. +use crate::errors::*; +use crate::flags::DenoFlags; use crate::isolate::{DenoBehavior, Isolate}; +use crate::isolate_state::IsolateState; +use crate::isolate_state::IsolateStateContainer; use crate::isolate_state::WorkerChannels; -use crate::js_errors::JSErrorColor; +use crate::ops; use crate::resources; -use crate::tokio_util; +use crate::startup_data; +use deno::deno_buf; +use deno::Behavior; use deno::Buf; use deno::JSError; -use futures::future::lazy; +use deno::Op; +use deno::StartupData; use futures::sync::mpsc; -use futures::sync::oneshot; use futures::Future; use futures::Poll; -use std::thread; +use std::sync::Arc; + +pub struct UserWorkerBehavior { + pub state: Arc<IsolateState>, +} + +impl UserWorkerBehavior { + pub fn new(flags: DenoFlags, argv_rest: Vec<String>) -> Self { + Self { + state: Arc::new(IsolateState::new(flags, argv_rest, None, true)), + } + } +} + +impl IsolateStateContainer for UserWorkerBehavior { + fn state(&self) -> Arc<IsolateState> { + self.state.clone() + } +} + +impl IsolateStateContainer for &UserWorkerBehavior { + fn state(&self) -> Arc<IsolateState> { + self.state.clone() + } +} + +impl Behavior for UserWorkerBehavior { + fn startup_data(&mut self) -> Option<StartupData> { + Some(startup_data::deno_isolate_init()) + } + + fn dispatch( + &mut self, + control: &[u8], + zero_copy: deno_buf, + ) -> (bool, Box<Op>) { + ops::dispatch_all(self, control, zero_copy, ops::op_selector_worker) + } +} + +impl WorkerBehavior for UserWorkerBehavior { + fn set_internal_channels(&mut self, worker_channels: WorkerChannels) { + self.state = Arc::new(IsolateState::new( + self.state.flags.clone(), + self.state.argv.clone(), + Some(worker_channels), + true, + )); + } +} /// Behavior trait specific to workers pub trait WorkerBehavior: DenoBehavior { @@ -24,10 +79,11 @@ pub trait WorkerBehavior: DenoBehavior { /// Rust interface for WebWorkers. pub struct Worker<B: WorkerBehavior> { isolate: Isolate<B>, + pub resource: resources::Resource, } impl<B: WorkerBehavior> Worker<B> { - pub fn new(mut behavior: B) -> (Self, WorkerChannels) { + pub fn new(mut behavior: B) -> Self { let (worker_in_tx, worker_in_rx) = mpsc::channel::<Buf>(1); let (worker_out_tx, worker_out_rx) = mpsc::channel::<Buf>(1); @@ -38,13 +94,23 @@ impl<B: WorkerBehavior> Worker<B> { let isolate = Isolate::new(behavior); - let worker = Worker { isolate }; - (worker, external_channels) + Worker { + isolate, + resource: resources::add_worker(external_channels), + } } pub fn execute(&mut self, js_source: &str) -> Result<(), JSError> { self.isolate.execute(js_source) } + + pub fn execute_mod( + &mut self, + js_filename: &str, + is_prefetch: bool, + ) -> Result<(), RustOrJsError> { + self.isolate.execute_mod(js_filename, is_prefetch) + } } impl<B: WorkerBehavior> Future for Worker<B> { @@ -56,47 +122,48 @@ impl<B: WorkerBehavior> Future for Worker<B> { } } +/// Method and data used to initalize a worker +pub enum WorkerInit { + Script(String), + Module(String), +} + pub fn spawn<B: WorkerBehavior + 'static>( behavior: B, - js_source: String, -) -> resources::Resource { - // TODO This function should return a Future, so that the caller can retrieve - // the JSError if one is thrown. Currently it just prints to stderr and calls - // exit(1). - // let (js_error_tx, js_error_rx) = oneshot::channel::<JSError>(); - let (p, c) = oneshot::channel::<resources::Resource>(); - let builder = thread::Builder::new().name("worker".to_string()); - - let _tid = builder - .spawn(move || { - tokio_util::run(lazy(move || { - let (mut worker, external_channels) = Worker::new(behavior); - let resource = resources::add_worker(external_channels); - p.send(resource.clone()).unwrap(); - - worker - .execute("denoMain()") - .expect("worker denoMain failed"); - worker - .execute("workerMain()") - .expect("worker workerMain failed"); - worker.execute(&js_source).expect("worker js_source failed"); + worker_debug_name: &str, + init: WorkerInit, +) -> Result<Worker<B>, RustOrJsError> { + let state = behavior.state().clone(); + let mut worker = Worker::new(behavior); - worker.then(move |r| -> Result<(), ()> { - resource.close(); - debug!("workers.rs after resource close"); - if let Err(err) = r { - eprintln!("{}", JSErrorColor(&err).to_string()); - std::process::exit(1); - } - Ok(()) - }) - })); + worker + .execute(&format!("denoMain('{}')", worker_debug_name)) + .expect("worker workerInit failed"); + + worker + .execute("workerMain()") + .expect("worker workerMain failed"); - debug!("workers.rs after spawn"); - }).unwrap(); + let init_result = match init { + WorkerInit::Script(script) => match worker.execute(&script) { + Ok(v) => Ok(v), + Err(e) => Err(RustOrJsError::Js(e)), + }, + WorkerInit::Module(specifier) => { + let should_prefetch = state.flags.prefetch || state.flags.info; + match state.dir.resolve_module_url(&specifier, ".") { + Err(err) => Err(RustOrJsError::Rust(DenoError::from(err))), + Ok(module_url) => { + worker.execute_mod(&module_url.to_string(), should_prefetch) + } + } + } + }; - c.wait().unwrap() + match init_result { + Ok(_) => Ok(worker), + Err(err) => Err(err), + } } #[cfg(test)] @@ -104,63 +171,117 @@ mod tests { use super::*; use crate::compiler::CompilerBehavior; use crate::isolate_state::IsolateState; - use std::sync::Arc; + use crate::js_errors::JSErrorColor; + use crate::tokio_util; + use futures::future::lazy; + use std::thread; #[test] fn test_spawn() { - let resource = spawn( - CompilerBehavior::new(Arc::new(IsolateState::mock())), - r#" - onmessage = function(e) { - let s = new TextDecoder().decode(e.data);; - console.log("msg from main script", s); - if (s == "exit") { - close(); - return; - } else { - console.assert(s === "hi"); + tokio_util::init(|| { + let worker_result = spawn( + CompilerBehavior::new( + IsolateState::mock().flags.clone(), + IsolateState::mock().argv.clone(), + ), + "TEST", + WorkerInit::Script( + r#" + onmessage = function(e) { + console.log("msg from main script", e.data); + if (e.data == "exit") { + close(); + return; + } else { + console.assert(e.data === "hi"); + } + postMessage([1, 2, 3]); + console.log("after postMessage"); } - postMessage(new Uint8Array([1, 2, 3])); - console.log("after postMessage"); - } - "#.into(), - ); - let msg = String::from("hi").into_boxed_str().into_boxed_bytes(); + "#.into(), + ), + ); + assert!(worker_result.is_ok()); + let worker = worker_result.unwrap(); + let resource = worker.resource.clone(); + let resource_ = resource.clone(); - let r = resources::worker_post_message(resource.rid, msg).wait(); - assert!(r.is_ok()); + tokio::spawn(lazy(move || { + worker.then(move |r| -> Result<(), ()> { + resource_.close(); + debug!("workers.rs after resource close"); + if let Err(err) = r { + eprintln!("{}", JSErrorColor(&err).to_string()); + assert!(false) + } + Ok(()) + }) + })); - let maybe_msg = - resources::worker_recv_message(resource.rid).wait().unwrap(); - assert!(maybe_msg.is_some()); - assert_eq!(*maybe_msg.unwrap(), [1, 2, 3]); + let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); - let msg = String::from("exit").into_boxed_str().into_boxed_bytes(); - let r = resources::worker_post_message(resource.rid, msg).wait(); - assert!(r.is_ok()); + let r = resources::post_message_to_worker(resource.rid, msg).wait(); + assert!(r.is_ok()); + + let maybe_msg = resources::get_message_from_worker(resource.rid) + .wait() + .unwrap(); + assert!(maybe_msg.is_some()); + // Check if message received is [1, 2, 3] in json + assert_eq!(*maybe_msg.unwrap(), *b"[1,2,3]"); + + let msg = json!("exit") + .to_string() + .into_boxed_str() + .into_boxed_bytes(); + let r = resources::post_message_to_worker(resource.rid, msg).wait(); + assert!(r.is_ok()); + }) } #[test] fn removed_from_resource_table_on_close() { - let resource = spawn( - CompilerBehavior::new(Arc::new(IsolateState::mock())), - "onmessage = () => close();".into(), - ); - - assert_eq!( - resources::get_type(resource.rid), - Some("worker".to_string()) - ); - - let msg = String::from("hi").into_boxed_str().into_boxed_bytes(); - let r = resources::worker_post_message(resource.rid, msg).wait(); - assert!(r.is_ok()); - println!("rid {:?}", resource.rid); - - // TODO Need a way to get a future for when a resource closes. - // For now, just sleep for a bit. - // resource.close(); - thread::sleep(std::time::Duration::from_millis(1000)); - assert_eq!(resources::get_type(resource.rid), None); + tokio_util::init(|| { + let worker_result = spawn( + CompilerBehavior::new( + IsolateState::mock().flags.clone(), + IsolateState::mock().argv.clone(), + ), + "TEST", + WorkerInit::Script("onmessage = () => close();".into()), + ); + assert!(worker_result.is_ok()); + let worker = worker_result.unwrap(); + let resource = worker.resource.clone(); + let resource_ = resource.clone(); + + tokio::spawn(lazy(move || { + worker.then(move |r| -> Result<(), ()> { + resource_.close(); + debug!("workers.rs after resource close"); + if let Err(err) = r { + eprintln!("{}", JSErrorColor(&err).to_string()); + assert!(false) + } + Ok(()) + }) + })); + + assert_eq!( + resources::get_type(resource.rid), + Some("worker".to_string()) + ); + + let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); + let r = resources::post_message_to_worker(resource.rid, msg).wait(); + assert!(r.is_ok()); + println!("rid {:?}", resource.rid); + + // TODO Need a way to get a future for when a resource closes. + // For now, just sleep for a bit. + // resource.close(); + thread::sleep(std::time::Duration::from_millis(1000)); + assert_eq!(resources::get_type(resource.rid), None); + }) } } |