summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--BUILD.gn1
-rw-r--r--js/globals.ts6
-rw-r--r--js/workers.ts75
-rw-r--r--src/flags.rs2
-rw-r--r--src/isolate.rs38
-rw-r--r--src/main.rs3
-rw-r--r--src/msg.fbs15
-rw-r--r--src/ops.rs173
-rw-r--r--src/resources.rs64
-rw-r--r--src/workers.rs148
10 files changed, 470 insertions, 55 deletions
diff --git a/BUILD.gn b/BUILD.gn
index c183fe3e9..8e013a0e8 100644
--- a/BUILD.gn
+++ b/BUILD.gn
@@ -99,6 +99,7 @@ ts_sources = [
"js/url.ts",
"js/url_search_params.ts",
"js/util.ts",
+ "js/workers.ts",
"js/write_file.ts",
"tsconfig.json",
diff --git a/js/globals.ts b/js/globals.ts
index 663215341..849d2bb3f 100644
--- a/js/globals.ts
+++ b/js/globals.ts
@@ -19,6 +19,7 @@ import * as textEncoding from "./text_encoding";
import * as timers from "./timers";
import * as url from "./url";
import * as urlSearchParams from "./url_search_params";
+import * as workers from "./workers";
// These imports are not exposed and therefore are fine to just import the
// symbols required.
@@ -86,3 +87,8 @@ window.TextEncoder = textEncoding.TextEncoder;
export type TextEncoder = textEncoding.TextEncoder;
window.TextDecoder = textEncoding.TextDecoder;
export type TextDecoder = textEncoding.TextDecoder;
+
+window.workerMain = workers.workerMain;
+// TODO These shouldn't be available in main isolate.
+window.postMessage = workers.postMessage;
+window.close = workers.workerClose;
diff --git a/js/workers.ts b/js/workers.ts
new file mode 100644
index 000000000..f7aa857fc
--- /dev/null
+++ b/js/workers.ts
@@ -0,0 +1,75 @@
+// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
+import * as dispatch from "./dispatch";
+import { libdeno } from "./libdeno";
+import * as msg from "gen/msg_generated";
+import * as flatbuffers from "./flatbuffers";
+import { assert, log } from "./util";
+import { globalEval } from "./global_eval";
+
+export async function postMessage(data: Uint8Array): Promise<void> {
+ const builder = flatbuffers.createBuilder();
+ msg.WorkerPostMessage.startWorkerPostMessage(builder);
+ const inner = msg.WorkerPostMessage.endWorkerPostMessage(builder);
+ const baseRes = await dispatch.sendAsync(
+ builder,
+ msg.Any.WorkerPostMessage,
+ inner,
+ data
+ );
+ assert(baseRes != null);
+}
+
+export async function getMessage(): Promise<null | Uint8Array> {
+ log("getMessage");
+ const builder = flatbuffers.createBuilder();
+ msg.WorkerGetMessage.startWorkerGetMessage(builder);
+ const inner = msg.WorkerGetMessage.endWorkerGetMessage(builder);
+ const baseRes = await dispatch.sendAsync(
+ builder,
+ msg.Any.WorkerGetMessage,
+ inner
+ );
+ assert(baseRes != null);
+ assert(
+ msg.Any.WorkerGetMessageRes === baseRes!.innerType(),
+ `base.innerType() unexpectedly is ${baseRes!.innerType()}`
+ );
+ const res = new msg.WorkerGetMessageRes();
+ assert(baseRes!.inner(res) != null);
+
+ const dataArray = res.dataArray();
+ if (dataArray == null) {
+ return null;
+ } else {
+ return new Uint8Array(dataArray!);
+ }
+}
+
+let isClosing = false;
+
+export function workerClose(): void {
+ isClosing = true;
+}
+
+export async function workerMain() {
+ log("workerMain");
+ libdeno.recv(dispatch.handleAsyncMsgFromRust);
+
+ // TODO avoid using globalEval to get Window. But circular imports if getting
+ // it from globals.ts
+ const window = globalEval("this");
+
+ while (!isClosing) {
+ const data = await getMessage();
+ if (data == null) {
+ log("workerMain got null message. quitting.");
+ break;
+ }
+ if (window["onmessage"]) {
+ const event = { data };
+ window.onmessage(event);
+ } else {
+ break;
+ }
+ }
+}
diff --git a/src/flags.rs b/src/flags.rs
index befb15ab8..5e6855a3d 100644
--- a/src/flags.rs
+++ b/src/flags.rs
@@ -15,7 +15,7 @@ macro_rules! svec {
}
#[cfg_attr(feature = "cargo-clippy", allow(stutter))]
-#[derive(Debug, PartialEq, Default)]
+#[derive(Clone, Debug, PartialEq, Default)]
pub struct DenoFlags {
pub help: bool,
pub log_debug: bool,
diff --git a/src/isolate.rs b/src/isolate.rs
index a2e5ae275..c4174de3f 100644
--- a/src/isolate.rs
+++ b/src/isolate.rs
@@ -12,6 +12,7 @@ use js_errors::JSError;
use libdeno;
use permissions::DenoPermissions;
+use futures::sync::mpsc as async_mpsc;
use futures::Future;
use libc::c_char;
use libc::c_void;
@@ -23,6 +24,7 @@ use std::ffi::CString;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc;
use std::sync::Arc;
+use std::sync::Mutex;
use std::time::Duration;
use std::time::Instant;
use tokio;
@@ -53,6 +55,10 @@ pub struct Isolate {
pub state: Arc<IsolateState>,
}
+pub type WorkerSender = async_mpsc::Sender<Buf>;
+pub type WorkerReceiver = async_mpsc::Receiver<Buf>;
+pub type WorkerChannels = (WorkerSender, WorkerReceiver);
+
// Isolate cannot be passed between threads but IsolateState can.
// IsolateState satisfies Send and Sync.
// So any state that needs to be accessed outside the main V8 thread should be
@@ -64,20 +70,35 @@ pub struct IsolateState {
pub permissions: DenoPermissions,
pub flags: flags::DenoFlags,
pub metrics: Metrics,
+ pub worker_channels: Option<Mutex<WorkerChannels>>,
}
impl IsolateState {
- pub fn new(flags: flags::DenoFlags, argv_rest: Vec<String>) -> Self {
+ pub fn new(
+ flags: flags::DenoFlags,
+ argv_rest: Vec<String>,
+ worker_channels: Option<WorkerChannels>,
+ ) -> Self {
let custom_root = env::var("DENO_DIR").map(|s| s.into()).ok();
+
Self {
dir: deno_dir::DenoDir::new(flags.reload, custom_root).unwrap(),
argv: argv_rest,
permissions: DenoPermissions::new(&flags),
flags,
metrics: Metrics::default(),
+ worker_channels: worker_channels.map(|wc| Mutex::new(wc)),
}
}
+ #[cfg(test)]
+ pub fn mock() -> Arc<IsolateState> {
+ let argv = vec![String::from("./deno"), String::from("hello.js")];
+ // For debugging: argv.push_back(String::from("-D"));
+ let (flags, rest_argv, _) = flags::set_flags(argv).unwrap();
+ Arc::new(IsolateState::new(flags, rest_argv, None))
+ }
+
#[inline]
pub fn check_write(&self, filename: &str) -> DenoResult<()> {
self.permissions.check_write(filename)
@@ -451,10 +472,7 @@ mod tests {
#[test]
fn test_dispatch_sync() {
- let argv = vec![String::from("./deno"), String::from("hello.js")];
- let (flags, rest_argv, _) = flags::set_flags(argv).unwrap();
-
- let state = Arc::new(IsolateState::new(flags, rest_argv));
+ let state = IsolateState::mock();
let snapshot = libdeno::deno_buf::empty();
let isolate = Isolate::new(snapshot, state, dispatch_sync);
tokio_util::init(|| {
@@ -493,9 +511,7 @@ mod tests {
#[test]
fn test_metrics_sync() {
- let argv = vec![String::from("./deno"), String::from("hello.js")];
- let (flags, rest_argv, _) = flags::set_flags(argv).unwrap();
- let state = Arc::new(IsolateState::new(flags, rest_argv));
+ let state = IsolateState::mock();
let snapshot = libdeno::deno_buf::empty();
let isolate = Isolate::new(snapshot, state, metrics_dispatch_sync);
tokio_util::init(|| {
@@ -529,9 +545,7 @@ mod tests {
#[test]
fn test_metrics_async() {
- let argv = vec![String::from("./deno"), String::from("hello.js")];
- let (flags, rest_argv, _) = flags::set_flags(argv).unwrap();
- let state = Arc::new(IsolateState::new(flags, rest_argv));
+ let state = IsolateState::mock();
let snapshot = libdeno::deno_buf::empty();
let isolate = Isolate::new(snapshot, state, metrics_dispatch_async);
tokio_util::init(|| {
@@ -619,7 +633,7 @@ mod tests {
let argv = vec![String::from("./deno"), String::from(filename)];
let (flags, rest_argv, _) = flags::set_flags(argv).unwrap();
- let state = Arc::new(IsolateState::new(flags, rest_argv));
+ let state = Arc::new(IsolateState::new(flags, rest_argv, None));
let snapshot = libdeno::deno_buf::empty();
let isolate = Isolate::new(snapshot, state, dispatch_sync);
tokio_util::init(|| {
diff --git a/src/main.rs b/src/main.rs
index 364a9cf7e..75cc61b58 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -47,6 +47,7 @@ pub mod snapshot;
mod tokio_util;
mod tokio_write;
pub mod version;
+mod workers;
#[cfg(unix)]
mod eager_unix;
@@ -96,7 +97,7 @@ fn main() {
log::LevelFilter::Warn
});
- let state = Arc::new(isolate::IsolateState::new(flags, rest_argv));
+ let state = Arc::new(isolate::IsolateState::new(flags, rest_argv, None));
let snapshot = snapshot::deno_snapshot();
let isolate = isolate::Isolate::new(snapshot, state, ops::dispatch);
tokio_util::init(|| {
diff --git a/src/msg.fbs b/src/msg.fbs
index 989fafd0b..a9afb195f 100644
--- a/src/msg.fbs
+++ b/src/msg.fbs
@@ -1,6 +1,9 @@
union Any {
Start,
StartRes,
+ WorkerGetMessage,
+ WorkerGetMessageRes,
+ WorkerPostMessage,
CodeFetch,
CodeFetchRes,
CodeCache,
@@ -149,6 +152,18 @@ table StartRes {
v8_version: string;
}
+table WorkerGetMessage {
+ unused: int8;
+}
+
+table WorkerGetMessageRes {
+ data: [ubyte];
+}
+
+table WorkerPostMessage {
+ // data passed thru the zero-copy data parameter.
+}
+
table CodeFetch {
specifier: string;
referrer: string;
diff --git a/src/ops.rs b/src/ops.rs
index d678e9727..cf25f29e0 100644
--- a/src/ops.rs
+++ b/src/ops.rs
@@ -16,7 +16,10 @@ use version;
use flatbuffers::FlatBufferBuilder;
use futures;
+use futures::Async;
use futures::Poll;
+use futures::Sink;
+use futures::Stream;
use hyper;
use hyper::rt::Future;
use remove_dir_all::remove_dir_all;
@@ -34,6 +37,7 @@ use std::path::Path;
use std::path::PathBuf;
use std::process::Command;
use std::str::FromStr;
+use std::sync::Arc;
use std::time::UNIX_EPOCH;
use std::time::{Duration, Instant};
use tokio;
@@ -48,7 +52,7 @@ type OpResult = DenoResult<Buf>;
// TODO Ideally we wouldn't have to box the Op being returned.
// The box is just to make it easier to get a prototype refactor working.
type OpCreator =
- fn(state: &IsolateState, base: &msg::Base, data: libdeno::deno_buf)
+ fn(state: &Arc<IsolateState>, base: &msg::Base, data: libdeno::deno_buf)
-> Box<Op>;
#[inline]
@@ -113,8 +117,10 @@ pub fn dispatch(
msg::Any::Stat => op_stat,
msg::Any::Symlink => op_symlink,
msg::Any::Truncate => op_truncate,
- msg::Any::WriteFile => op_write_file,
+ msg::Any::WorkerGetMessage => op_worker_get_message,
+ msg::Any::WorkerPostMessage => op_worker_post_message,
msg::Any::Write => op_write,
+ msg::Any::WriteFile => op_write_file,
_ => panic!(format!(
"Unhandled message {}",
msg::enum_name_any(inner_type)
@@ -168,7 +174,7 @@ pub fn dispatch(
}
fn op_exit(
- _config: &IsolateState,
+ _config: &Arc<IsolateState>,
base: &msg::Base,
_data: libdeno::deno_buf,
) -> Box<Op> {
@@ -177,7 +183,7 @@ fn op_exit(
}
fn op_start(
- state: &IsolateState,
+ state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -238,19 +244,19 @@ fn serialize_response(
}
#[inline]
-fn ok_future(buf: Buf) -> Box<Op> {
+pub fn ok_future(buf: Buf) -> Box<Op> {
Box::new(futures::future::ok(buf))
}
// Shout out to Earl Sweatshirt.
#[inline]
-fn odd_future(err: DenoError) -> Box<Op> {
+pub fn odd_future(err: DenoError) -> Box<Op> {
Box::new(futures::future::err(err))
}
// https://github.com/denoland/deno/blob/golang/os.go#L100-L154
fn op_code_fetch(
- state: &IsolateState,
+ state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -293,7 +299,7 @@ fn op_code_fetch(
// https://github.com/denoland/deno/blob/golang/os.go#L156-L169
fn op_code_cache(
- state: &IsolateState,
+ state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -312,7 +318,7 @@ fn op_code_cache(
}
fn op_chdir(
- _state: &IsolateState,
+ _state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -344,7 +350,7 @@ fn op_set_timeout(
}
fn op_set_env(
- state: &IsolateState,
+ state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -360,7 +366,7 @@ fn op_set_env(
}
fn op_env(
- state: &IsolateState,
+ state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -392,7 +398,7 @@ fn op_env(
}
fn op_fetch(
- state: &IsolateState,
+ state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -479,7 +485,7 @@ where
}
fn op_make_temp_dir(
- state: &IsolateState,
+ state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -528,7 +534,7 @@ fn op_make_temp_dir(
}
fn op_mkdir(
- state: &IsolateState,
+ state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -548,7 +554,7 @@ fn op_mkdir(
}
fn op_chmod(
- state: &IsolateState,
+ state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -581,7 +587,7 @@ fn op_chmod(
}
fn op_open(
- state: &IsolateState,
+ state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -657,7 +663,7 @@ fn op_open(
}
fn op_close(
- _state: &IsolateState,
+ _state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -674,7 +680,7 @@ fn op_close(
}
fn op_shutdown(
- _state: &IsolateState,
+ _state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -700,7 +706,7 @@ fn op_shutdown(
}
fn op_read(
- _state: &IsolateState,
+ _state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -738,7 +744,7 @@ fn op_read(
}
fn op_write(
- _state: &IsolateState,
+ _state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -775,7 +781,7 @@ fn op_write(
}
fn op_remove(
- state: &IsolateState,
+ state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -805,7 +811,7 @@ fn op_remove(
// Prototype https://github.com/denoland/deno/blob/golang/os.go#L171-L184
fn op_read_file(
- _config: &IsolateState,
+ _state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -839,7 +845,7 @@ fn op_read_file(
}
fn op_copy_file(
- state: &IsolateState,
+ state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -891,7 +897,7 @@ fn get_mode(_perm: &fs::Permissions) -> u32 {
}
fn op_cwd(
- _state: &IsolateState,
+ _state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -917,7 +923,7 @@ fn op_cwd(
}
fn op_stat(
- _config: &IsolateState,
+ _state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -964,7 +970,7 @@ fn op_stat(
}
fn op_read_dir(
- _state: &IsolateState,
+ _state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -1022,7 +1028,7 @@ fn op_read_dir(
}
fn op_write_file(
- state: &IsolateState,
+ state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -1042,7 +1048,7 @@ fn op_write_file(
}
fn op_rename(
- state: &IsolateState,
+ state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -1062,7 +1068,7 @@ fn op_rename(
}
fn op_symlink(
- state: &IsolateState,
+ state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -1091,7 +1097,7 @@ fn op_symlink(
}
fn op_read_link(
- _state: &IsolateState,
+ _state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -1124,7 +1130,7 @@ fn op_read_link(
}
fn op_repl_start(
- state: &IsolateState,
+ state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -1155,7 +1161,7 @@ fn op_repl_start(
}
fn op_repl_readline(
- _state: &IsolateState,
+ _state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -1193,7 +1199,7 @@ fn op_repl_readline(
}
fn op_truncate(
- state: &IsolateState,
+ state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -1216,7 +1222,7 @@ fn op_truncate(
}
fn op_listen(
- state: &IsolateState,
+ state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -1282,7 +1288,7 @@ fn new_conn(cmd_id: u32, tcp_stream: TcpStream) -> OpResult {
}
fn op_accept(
- state: &IsolateState,
+ state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -1308,7 +1314,7 @@ fn op_accept(
}
fn op_dial(
- state: &IsolateState,
+ state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -1332,7 +1338,7 @@ fn op_dial(
}
fn op_metrics(
- state: &IsolateState,
+ state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -1356,7 +1362,7 @@ fn op_metrics(
}
fn op_resources(
- _state: &IsolateState,
+ _state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -1408,7 +1414,7 @@ fn subprocess_stdio_map(v: msg::ProcessStdio) -> std::process::Stdio {
}
fn op_run(
- state: &IsolateState,
+ state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -1476,7 +1482,7 @@ fn op_run(
}
fn op_run_status(
- state: &IsolateState,
+ state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -1530,3 +1536,90 @@ fn op_run_status(
});
Box::new(future)
}
+
+struct GetMessageFuture {
+ pub state: Arc<IsolateState>,
+}
+
+impl Future for GetMessageFuture {
+ type Item = Option<Buf>;
+ type Error = ();
+
+ fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
+ assert!(self.state.worker_channels.is_some());
+ match self.state.worker_channels {
+ None => panic!("expected worker_channels"),
+ Some(ref wc) => {
+ let mut wc = wc.lock().unwrap();
+ wc.1.poll()
+ }
+ }
+ }
+}
+
+fn op_worker_get_message(
+ state: &Arc<IsolateState>,
+ base: &msg::Base,
+ data: libdeno::deno_buf,
+) -> Box<Op> {
+ assert_eq!(data.len(), 0);
+ let cmd_id = base.cmd_id();
+
+ let op = GetMessageFuture {
+ state: state.clone(),
+ };
+ let op = op.map_err(move |_| -> DenoError { unimplemented!() });
+ let op = op.and_then(move |maybe_buf| -> DenoResult<Buf> {
+ debug!("op_worker_get_message");
+ let builder = &mut FlatBufferBuilder::new();
+
+ let data = maybe_buf.as_ref().map(|buf| builder.create_vector(buf));
+ let inner = msg::WorkerGetMessageRes::create(
+ builder,
+ &msg::WorkerGetMessageResArgs { data },
+ );
+ Ok(serialize_response(
+ cmd_id,
+ builder,
+ msg::BaseArgs {
+ inner: Some(inner.as_union_value()),
+ inner_type: msg::Any::WorkerGetMessageRes,
+ ..Default::default()
+ },
+ ))
+ });
+ Box::new(op)
+}
+
+fn op_worker_post_message(
+ state: &Arc<IsolateState>,
+ base: &msg::Base,
+ data: libdeno::deno_buf,
+) -> Box<Op> {
+ let cmd_id = base.cmd_id();
+
+ let d = Vec::from(data.as_ref()).into_boxed_slice();
+
+ assert!(state.worker_channels.is_some());
+ let tx = match state.worker_channels {
+ None => panic!("expected worker_channels"),
+ Some(ref wc) => {
+ let mut wc = wc.lock().unwrap();
+ wc.0.clone()
+ }
+ };
+ let op = tx.send(d);
+ let op = op.map_err(|e| errors::new(ErrorKind::Other, e.to_string()));
+ let op = op.and_then(move |_| -> DenoResult<Buf> {
+ let builder = &mut FlatBufferBuilder::new();
+
+ Ok(serialize_response(
+ cmd_id,
+ builder,
+ msg::BaseArgs {
+ ..Default::default()
+ },
+ ))
+ });
+ Box::new(op)
+}
diff --git a/src/resources.rs b/src/resources.rs
index f1497f214..69173fe85 100644
--- a/src/resources.rs
+++ b/src/resources.rs
@@ -10,10 +10,12 @@
#[cfg(unix)]
use eager_unix as eager;
+use errors;
use errors::bad_resource;
use errors::DenoError;
use errors::DenoResult;
use http_body::HttpBody;
+use isolate::WorkerChannels;
use repl::Repl;
use tokio_util;
use tokio_write;
@@ -22,7 +24,10 @@ use futures;
use futures::future::{Either, FutureResult};
use futures::Future;
use futures::Poll;
+use futures::Sink;
+use futures::Stream;
use hyper;
+use isolate::Buf;
use std;
use std::collections::HashMap;
use std::io::{Error, Read, Write};
@@ -96,6 +101,14 @@ enum Repr {
ChildStdin(tokio_process::ChildStdin),
ChildStdout(tokio_process::ChildStdout),
ChildStderr(tokio_process::ChildStderr),
+ Worker(WorkerChannels),
+}
+
+/// If the given rid is open, this returns the type of resource, E.G. "worker".
+/// If the rid is closed or was never open, it returns None.
+pub fn get_type(rid: ResourceId) -> Option<String> {
+ let table = RESOURCE_TABLE.lock().unwrap();
+ table.get(&rid).map(inspect_repr)
}
pub fn table_entries() -> Vec<(u32, String)> {
@@ -131,6 +144,7 @@ fn inspect_repr(repr: &Repr) -> String {
Repr::ChildStdin(_) => "childStdin",
Repr::ChildStdout(_) => "childStdout",
Repr::ChildStderr(_) => "childStderr",
+ Repr::Worker(_) => "worker",
};
String::from(h_repr)
@@ -138,7 +152,7 @@ fn inspect_repr(repr: &Repr) -> String {
// Abstract async file interface.
// Ideally in unix, if Resource represents an OS rid, it will be the same.
-#[derive(Debug)]
+#[derive(Clone, Debug)]
pub struct Resource {
pub rid: ResourceId,
}
@@ -284,6 +298,54 @@ pub fn add_repl(repl: Repl) -> Resource {
Resource { rid }
}
+pub fn add_worker(wc: WorkerChannels) -> Resource {
+ let rid = new_rid();
+ let mut tg = RESOURCE_TABLE.lock().unwrap();
+ let r = tg.insert(rid, Repr::Worker(wc));
+ assert!(r.is_none());
+ Resource { rid }
+}
+
+pub fn worker_post_message(
+ rid: ResourceId,
+ buf: Buf,
+) -> futures::sink::Send<futures::sync::mpsc::Sender<Buf>> {
+ let mut table = RESOURCE_TABLE.lock().unwrap();
+ let maybe_repr = table.get_mut(&rid);
+ match maybe_repr {
+ Some(Repr::Worker(ref mut wc)) => {
+ // unwrap here is incorrect, but doing it anyway
+ wc.0.clone().send(buf)
+ }
+ _ => panic!("bad resource"), // futures::future::err(bad_resource()).into(),
+ }
+}
+
+pub struct WorkerReceiver {
+ rid: ResourceId,
+}
+
+// Invert the dumbness that tokio_process causes by making Child itself a future.
+impl Future for WorkerReceiver {
+ type Item = Option<Buf>;
+ type Error = DenoError;
+
+ fn poll(&mut self) -> Poll<Option<Buf>, DenoError> {
+ let mut table = RESOURCE_TABLE.lock().unwrap();
+ let maybe_repr = table.get_mut(&self.rid);
+ match maybe_repr {
+ Some(Repr::Worker(ref mut wc)) => wc.1.poll().map_err(|()| {
+ errors::new(errors::ErrorKind::Other, "recv msg error".to_string())
+ }),
+ _ => Err(bad_resource()),
+ }
+ }
+}
+
+pub fn worker_recv_message(rid: ResourceId) -> WorkerReceiver {
+ WorkerReceiver { rid }
+}
+
#[cfg_attr(feature = "cargo-clippy", allow(stutter))]
pub struct ChildResources {
pub child_rid: ResourceId,
diff --git a/src/workers.rs b/src/workers.rs
new file mode 100644
index 000000000..319f4018d
--- /dev/null
+++ b/src/workers.rs
@@ -0,0 +1,148 @@
+// Copyright 2018 the Deno authors. All rights reserved. MIT license.
+
+#![allow(dead_code)]
+
+use isolate::Buf;
+use isolate::Isolate;
+use isolate::IsolateState;
+use isolate::WorkerChannels;
+use js_errors::JSError;
+use ops;
+use resources;
+use snapshot;
+use tokio_util;
+
+use futures::sync::mpsc;
+use futures::sync::oneshot;
+use futures::Future;
+use std::sync::Arc;
+use std::thread;
+
+/// Rust interface for WebWorkers.
+pub struct Worker {
+ isolate: Isolate,
+}
+
+impl Worker {
+ pub fn new(parent_state: &Arc<IsolateState>) -> (Self, WorkerChannels) {
+ let (worker_in_tx, worker_in_rx) = mpsc::channel::<Buf>(1);
+ let (worker_out_tx, worker_out_rx) = mpsc::channel::<Buf>(1);
+
+ let internal_channels = (worker_out_tx, worker_in_rx);
+ let external_channels = (worker_in_tx, worker_out_rx);
+
+ let state = Arc::new(IsolateState::new(
+ parent_state.flags.clone(),
+ parent_state.argv.clone(),
+ Some(internal_channels),
+ ));
+
+ let snapshot = snapshot::deno_snapshot();
+ let isolate = Isolate::new(snapshot, state, ops::dispatch);
+
+ let worker = Worker { isolate };
+ (worker, external_channels)
+ }
+
+ pub fn execute(&self, js_source: &str) -> Result<(), JSError> {
+ self.isolate.execute(js_source)
+ }
+
+ pub fn event_loop(&self) -> Result<(), JSError> {
+ self.isolate.event_loop()
+ }
+}
+
+fn spawn(state: Arc<IsolateState>, js_source: String) -> resources::Resource {
+ // TODO This function should return a Future, so that the caller can retrieve
+ // the JSError if one is thrown. Currently it just prints to stderr and calls
+ // exit(1).
+ // let (js_error_tx, js_error_rx) = oneshot::channel::<JSError>();
+ let (p, c) = oneshot::channel::<resources::Resource>();
+ let builder = thread::Builder::new().name("worker".to_string());
+ let _tid = builder
+ .spawn(move || {
+ let (worker, external_channels) = Worker::new(&state);
+
+ let mut resource = resources::add_worker(external_channels);
+ p.send(resource.clone()).unwrap();
+
+ tokio_util::init(|| {
+ (|| -> Result<(), JSError> {
+ worker.execute("workerMain()")?;
+ worker.execute(&js_source)?;
+ worker.event_loop()?;
+ Ok(())
+ })().or_else(|err: JSError| -> Result<(), JSError> {
+ eprintln!("{}", err.to_string());
+ std::process::exit(1)
+ }).unwrap();
+ });
+
+ resource.close();
+ }).unwrap();
+
+ let resource = c.wait().unwrap();
+
+ resource
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_spawn() {
+ let resource = spawn(
+ IsolateState::mock(),
+ r#"
+ onmessage = function(e) {
+ let s = new TextDecoder().decode(e.data);;
+ console.log("msg from main script", s);
+ if (s == "exit") {
+ close();
+ return;
+ } else {
+ console.assert(s === "hi");
+ }
+ postMessage(new Uint8Array([1, 2, 3]));
+ console.log("after postMessage");
+ }
+ "#.into(),
+ );
+ let msg = String::from("hi").into_boxed_str().into_boxed_bytes();
+
+ let r = resources::worker_post_message(resource.rid, msg).wait();
+ assert!(r.is_ok());
+
+ let maybe_msg =
+ resources::worker_recv_message(resource.rid).wait().unwrap();
+ assert!(maybe_msg.is_some());
+ assert_eq!(*maybe_msg.unwrap(), [1, 2, 3]);
+
+ let msg = String::from("exit").into_boxed_str().into_boxed_bytes();
+ let r = resources::worker_post_message(resource.rid, msg).wait();
+ assert!(r.is_ok());
+ }
+
+ #[test]
+ fn removed_from_resource_table_on_close() {
+ let resource =
+ spawn(IsolateState::mock(), "onmessage = () => close();".into());
+
+ assert_eq!(
+ resources::get_type(resource.rid),
+ Some("worker".to_string())
+ );
+
+ let msg = String::from("hi").into_boxed_str().into_boxed_bytes();
+ let r = resources::worker_post_message(resource.rid, msg).wait();
+ assert!(r.is_ok());
+ println!("rid {:?}", resource.rid);
+
+ // TODO Need a way to get a future for when a resource closes.
+ // For now, just sleep for a bit.
+ thread::sleep(std::time::Duration::from_millis(100));
+ assert_eq!(resources::get_type(resource.rid), None);
+ }
+}