summaryrefslogtreecommitdiff
path: root/cli/workers.rs
diff options
context:
space:
mode:
Diffstat (limited to 'cli/workers.rs')
-rw-r--r--cli/workers.rs181
1 files changed, 181 insertions, 0 deletions
diff --git a/cli/workers.rs b/cli/workers.rs
new file mode 100644
index 000000000..edded7756
--- /dev/null
+++ b/cli/workers.rs
@@ -0,0 +1,181 @@
+// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
+use crate::cli::Buf;
+use crate::cli::Cli;
+use crate::flags::DenoFlags;
+use crate::isolate::Isolate;
+use crate::isolate_state::IsolateState;
+use crate::isolate_state::WorkerChannels;
+use crate::js_errors::JSErrorColor;
+use crate::permissions::DenoPermissions;
+use crate::resources;
+use crate::tokio_util;
+use deno_core::JSError;
+use deno_core::StartupData;
+use futures::future::lazy;
+use futures::sync::mpsc;
+use futures::sync::oneshot;
+use futures::Future;
+use futures::Poll;
+use std::sync::Arc;
+use std::thread;
+
+/// Rust interface for WebWorkers.
+pub struct Worker {
+ isolate: Isolate,
+}
+
+impl Worker {
+ pub fn new(
+ startup_data: Option<StartupData>,
+ flags: DenoFlags,
+ argv: Vec<String>,
+ permissions: DenoPermissions,
+ ) -> (Self, WorkerChannels) {
+ let (worker_in_tx, worker_in_rx) = mpsc::channel::<Buf>(1);
+ let (worker_out_tx, worker_out_rx) = mpsc::channel::<Buf>(1);
+
+ let internal_channels = (worker_out_tx, worker_in_rx);
+ let external_channels = (worker_in_tx, worker_out_rx);
+
+ let state =
+ Arc::new(IsolateState::new(flags, argv, Some(internal_channels)));
+
+ let cli = Cli::new(startup_data, state, permissions);
+ let isolate = Isolate::new(cli);
+
+ let worker = Worker { isolate };
+ (worker, external_channels)
+ }
+
+ pub fn execute(&mut self, js_source: &str) -> Result<(), JSError> {
+ self.isolate.execute(js_source)
+ }
+}
+
+impl Future for Worker {
+ type Item = ();
+ type Error = JSError;
+
+ fn poll(&mut self) -> Poll<(), JSError> {
+ self.isolate.poll()
+ }
+}
+
+pub fn spawn(
+ startup_data: Option<StartupData>,
+ state: &IsolateState,
+ js_source: String,
+ permissions: DenoPermissions,
+) -> resources::Resource {
+ // TODO This function should return a Future, so that the caller can retrieve
+ // the JSError if one is thrown. Currently it just prints to stderr and calls
+ // exit(1).
+ // let (js_error_tx, js_error_rx) = oneshot::channel::<JSError>();
+ let (p, c) = oneshot::channel::<resources::Resource>();
+ let builder = thread::Builder::new().name("worker".to_string());
+
+ let flags = state.flags.clone();
+ let argv = state.argv.clone();
+
+ let _tid = builder
+ .spawn(move || {
+ tokio_util::run(lazy(move || {
+ let (mut worker, external_channels) =
+ Worker::new(startup_data, flags, argv, permissions);
+ let resource = resources::add_worker(external_channels);
+ p.send(resource.clone()).unwrap();
+
+ worker
+ .execute("denoMain()")
+ .expect("worker denoMain failed");
+ worker
+ .execute("workerMain()")
+ .expect("worker workerMain failed");
+ worker.execute(&js_source).expect("worker js_source failed");
+
+ worker.then(move |r| -> Result<(), ()> {
+ resource.close();
+ debug!("workers.rs after resource close");
+ if let Err(err) = r {
+ eprintln!("{}", JSErrorColor(&err).to_string());
+ std::process::exit(1);
+ }
+ Ok(())
+ })
+ }));
+
+ debug!("workers.rs after spawn");
+ }).unwrap();
+
+ c.wait().unwrap()
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::startup_data;
+
+ #[test]
+ fn test_spawn() {
+ let startup_data = startup_data::compiler_isolate_init();
+ let resource = spawn(
+ Some(startup_data),
+ &IsolateState::mock(),
+ r#"
+ onmessage = function(e) {
+ let s = new TextDecoder().decode(e.data);;
+ console.log("msg from main script", s);
+ if (s == "exit") {
+ close();
+ return;
+ } else {
+ console.assert(s === "hi");
+ }
+ postMessage(new Uint8Array([1, 2, 3]));
+ console.log("after postMessage");
+ }
+ "#.into(),
+ DenoPermissions::default(),
+ );
+ let msg = String::from("hi").into_boxed_str().into_boxed_bytes();
+
+ let r = resources::worker_post_message(resource.rid, msg).wait();
+ assert!(r.is_ok());
+
+ let maybe_msg =
+ resources::worker_recv_message(resource.rid).wait().unwrap();
+ assert!(maybe_msg.is_some());
+ assert_eq!(*maybe_msg.unwrap(), [1, 2, 3]);
+
+ let msg = String::from("exit").into_boxed_str().into_boxed_bytes();
+ let r = resources::worker_post_message(resource.rid, msg).wait();
+ assert!(r.is_ok());
+ }
+
+ #[test]
+ fn removed_from_resource_table_on_close() {
+ let startup_data = startup_data::compiler_isolate_init();
+ let resource = spawn(
+ Some(startup_data),
+ &IsolateState::mock(),
+ "onmessage = () => close();".into(),
+ DenoPermissions::default(),
+ );
+
+ assert_eq!(
+ resources::get_type(resource.rid),
+ Some("worker".to_string())
+ );
+
+ let msg = String::from("hi").into_boxed_str().into_boxed_bytes();
+ let r = resources::worker_post_message(resource.rid, msg).wait();
+ assert!(r.is_ok());
+ println!("rid {:?}", resource.rid);
+
+ // TODO Need a way to get a future for when a resource closes.
+ // For now, just sleep for a bit.
+ // resource.close();
+ thread::sleep(std::time::Duration::from_millis(1000));
+ assert_eq!(resources::get_type(resource.rid), None);
+ }
+}