summaryrefslogtreecommitdiff
path: root/src/workers.rs
diff options
context:
space:
mode:
authorRyan Dahl <ry@tinyclouds.org>2019-01-08 14:44:06 -0500
committerGitHub <noreply@github.com>2019-01-08 14:44:06 -0500
commit6f79ad721a9f8c9d66d79f21ea479286f3ca5374 (patch)
tree3f8bad38a5bc109586d86846cb299717b425cdba /src/workers.rs
parent9ff6bca86388dab2cfa26e1712822f105985425e (diff)
Minimal Worker support (#1476)
This adds the ability to spawn additional Isolates from Rust and send and receive messages from them. This is preliminary work to support running the typescript compiler in a separate isolate and thus support native ES modules. Ref #975.
Diffstat (limited to 'src/workers.rs')
-rw-r--r--src/workers.rs148
1 files changed, 148 insertions, 0 deletions
diff --git a/src/workers.rs b/src/workers.rs
new file mode 100644
index 000000000..319f4018d
--- /dev/null
+++ b/src/workers.rs
@@ -0,0 +1,148 @@
+// Copyright 2018 the Deno authors. All rights reserved. MIT license.
+
+#![allow(dead_code)]
+
+use isolate::Buf;
+use isolate::Isolate;
+use isolate::IsolateState;
+use isolate::WorkerChannels;
+use js_errors::JSError;
+use ops;
+use resources;
+use snapshot;
+use tokio_util;
+
+use futures::sync::mpsc;
+use futures::sync::oneshot;
+use futures::Future;
+use std::sync::Arc;
+use std::thread;
+
+/// Rust interface for WebWorkers.
+pub struct Worker {
+ isolate: Isolate,
+}
+
+impl Worker {
+ pub fn new(parent_state: &Arc<IsolateState>) -> (Self, WorkerChannels) {
+ let (worker_in_tx, worker_in_rx) = mpsc::channel::<Buf>(1);
+ let (worker_out_tx, worker_out_rx) = mpsc::channel::<Buf>(1);
+
+ let internal_channels = (worker_out_tx, worker_in_rx);
+ let external_channels = (worker_in_tx, worker_out_rx);
+
+ let state = Arc::new(IsolateState::new(
+ parent_state.flags.clone(),
+ parent_state.argv.clone(),
+ Some(internal_channels),
+ ));
+
+ let snapshot = snapshot::deno_snapshot();
+ let isolate = Isolate::new(snapshot, state, ops::dispatch);
+
+ let worker = Worker { isolate };
+ (worker, external_channels)
+ }
+
+ pub fn execute(&self, js_source: &str) -> Result<(), JSError> {
+ self.isolate.execute(js_source)
+ }
+
+ pub fn event_loop(&self) -> Result<(), JSError> {
+ self.isolate.event_loop()
+ }
+}
+
+fn spawn(state: Arc<IsolateState>, js_source: String) -> resources::Resource {
+ // TODO This function should return a Future, so that the caller can retrieve
+ // the JSError if one is thrown. Currently it just prints to stderr and calls
+ // exit(1).
+ // let (js_error_tx, js_error_rx) = oneshot::channel::<JSError>();
+ let (p, c) = oneshot::channel::<resources::Resource>();
+ let builder = thread::Builder::new().name("worker".to_string());
+ let _tid = builder
+ .spawn(move || {
+ let (worker, external_channels) = Worker::new(&state);
+
+ let mut resource = resources::add_worker(external_channels);
+ p.send(resource.clone()).unwrap();
+
+ tokio_util::init(|| {
+ (|| -> Result<(), JSError> {
+ worker.execute("workerMain()")?;
+ worker.execute(&js_source)?;
+ worker.event_loop()?;
+ Ok(())
+ })().or_else(|err: JSError| -> Result<(), JSError> {
+ eprintln!("{}", err.to_string());
+ std::process::exit(1)
+ }).unwrap();
+ });
+
+ resource.close();
+ }).unwrap();
+
+ let resource = c.wait().unwrap();
+
+ resource
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_spawn() {
+ let resource = spawn(
+ IsolateState::mock(),
+ r#"
+ onmessage = function(e) {
+ let s = new TextDecoder().decode(e.data);;
+ console.log("msg from main script", s);
+ if (s == "exit") {
+ close();
+ return;
+ } else {
+ console.assert(s === "hi");
+ }
+ postMessage(new Uint8Array([1, 2, 3]));
+ console.log("after postMessage");
+ }
+ "#.into(),
+ );
+ let msg = String::from("hi").into_boxed_str().into_boxed_bytes();
+
+ let r = resources::worker_post_message(resource.rid, msg).wait();
+ assert!(r.is_ok());
+
+ let maybe_msg =
+ resources::worker_recv_message(resource.rid).wait().unwrap();
+ assert!(maybe_msg.is_some());
+ assert_eq!(*maybe_msg.unwrap(), [1, 2, 3]);
+
+ let msg = String::from("exit").into_boxed_str().into_boxed_bytes();
+ let r = resources::worker_post_message(resource.rid, msg).wait();
+ assert!(r.is_ok());
+ }
+
+ #[test]
+ fn removed_from_resource_table_on_close() {
+ let resource =
+ spawn(IsolateState::mock(), "onmessage = () => close();".into());
+
+ assert_eq!(
+ resources::get_type(resource.rid),
+ Some("worker".to_string())
+ );
+
+ let msg = String::from("hi").into_boxed_str().into_boxed_bytes();
+ let r = resources::worker_post_message(resource.rid, msg).wait();
+ assert!(r.is_ok());
+ println!("rid {:?}", resource.rid);
+
+ // TODO Need a way to get a future for when a resource closes.
+ // For now, just sleep for a bit.
+ thread::sleep(std::time::Duration::from_millis(100));
+ assert_eq!(resources::get_type(resource.rid), None);
+ }
+}