summaryrefslogtreecommitdiff
path: root/src/isolate.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/isolate.rs')
-rw-r--r--src/isolate.rs244
1 files changed, 212 insertions, 32 deletions
diff --git a/src/isolate.rs b/src/isolate.rs
index 5daf45701..64bec5ddf 100644
--- a/src/isolate.rs
+++ b/src/isolate.rs
@@ -1,54 +1,113 @@
// Copyright 2018 the Deno authors. All rights reserved. MIT license.
+
+// Do not use FlatBuffers in this module.
+// TODO Currently this module uses Tokio, but it would be nice if they were
+// decoupled.
+
use deno_dir;
+use errors::DenoError;
use flags;
+use libdeno;
+
use futures;
-use handlers;
+use futures::Future;
use libc::c_void;
-use libdeno;
use std;
use std::collections::HashMap;
use std::ffi::CStr;
use std::ffi::CString;
+use std::sync::mpsc;
+use std::sync::Arc;
+use std::sync::Mutex;
use tokio;
+use tokio_util;
type DenoException<'a> = &'a str;
+// Buf represents a byte array returned from a "Op".
+// The message might be empty (which will be translated into a null object on
+// the javascript side) or it is a heap allocated opaque sequence of bytes.
+// Usually a flatbuffer message.
+pub type Buf = Box<[u8]>;
+
+// JS promises in Deno map onto a specific Future
+// which yields either a DenoError or a byte array.
+pub type Op = Future<Item = Buf, Error = DenoError> + Send;
+
+// Returns (is_sync, op)
+pub type Dispatch = fn(state: Arc<IsolateState>, buf: &[u8]) -> (bool, Box<Op>);
+
pub struct Isolate {
- pub ptr: *const libdeno::isolate,
+ ptr: *const libdeno::isolate,
+ dispatch: Dispatch,
+ rx: mpsc::Receiver<Buf>,
+ ntasks: i32,
+ pub state: Arc<IsolateState>,
+}
+
+// Isolate cannot be passed between threads but IsolateState can. So any state that
+// needs to be accessed outside the main V8 thread should be inside IsolateState.
+pub struct IsolateState {
pub dir: deno_dir::DenoDir,
- pub rt: tokio::runtime::current_thread::Runtime,
- pub timers: HashMap<u32, futures::sync::oneshot::Sender<()>>,
+ pub timers: Mutex<HashMap<u32, futures::sync::oneshot::Sender<()>>>,
pub argv: Vec<String>,
pub flags: flags::DenoFlags,
+ tx: Mutex<Option<mpsc::Sender<Buf>>>,
+}
+
+impl IsolateState {
+ // Thread safe.
+ fn send_to_js(&self, buf: Buf) {
+ let mut g = self.tx.lock().unwrap();
+ let maybe_tx = g.as_mut();
+ assert!(maybe_tx.is_some(), "Expected tx to not be deleted.");
+ let tx = maybe_tx.unwrap();
+ tx.send(buf).expect("tx.send error");
+ }
}
static DENO_INIT: std::sync::Once = std::sync::ONCE_INIT;
impl Isolate {
- pub fn new(argv: Vec<String>) -> Box<Isolate> {
+ pub fn new(argv: Vec<String>, dispatch: Dispatch) -> Box<Isolate> {
DENO_INIT.call_once(|| {
unsafe { libdeno::deno_init() };
});
let (flags, argv_rest) = flags::set_flags(argv);
- let mut deno_box = Box::new(Isolate {
+ // This channel handles sending async messages back to the runtime.
+ let (tx, rx) = mpsc::channel::<Buf>();
+
+ let mut isolate = Box::new(Isolate {
ptr: 0 as *const libdeno::isolate,
- dir: deno_dir::DenoDir::new(flags.reload, None).unwrap(),
- rt: tokio::runtime::current_thread::Runtime::new().unwrap(),
- timers: HashMap::new(),
- argv: argv_rest,
- flags,
+ dispatch,
+ rx,
+ ntasks: 0,
+ state: Arc::new(IsolateState {
+ dir: deno_dir::DenoDir::new(flags.reload, None).unwrap(),
+ timers: Mutex::new(HashMap::new()),
+ argv: argv_rest,
+ flags,
+ tx: Mutex::new(Some(tx)),
+ }),
});
- (*deno_box).ptr = unsafe {
+ (*isolate).ptr = unsafe {
libdeno::deno_new(
- deno_box.as_ref() as *const _ as *const c_void,
- handlers::msg_from_js,
+ isolate.as_ref() as *const _ as *const c_void,
+ pre_dispatch,
)
};
- deno_box
+ isolate
+ }
+
+ pub fn from_c<'a>(d: *const libdeno::isolate) -> &'a mut Isolate {
+ let ptr = unsafe { libdeno::deno_get_data(d) };
+ let ptr = ptr as *mut Isolate;
+ let isolate_box = unsafe { Box::from_raw(ptr) };
+ Box::leak(isolate_box)
}
pub fn execute(
@@ -68,6 +127,42 @@ impl Isolate {
}
Ok(())
}
+
+ pub fn set_response(&self, buf: Buf) {
+ unsafe { libdeno::deno_set_response(self.ptr, buf.into()) }
+ }
+
+ pub fn send(&self, buf: Buf) {
+ unsafe { libdeno::deno_send(self.ptr, buf.into()) };
+ }
+
+ // TODO Use Park abstraction? Note at time of writing Tokio default runtime
+ // does not have new_with_park().
+ pub fn event_loop(&mut self) {
+ // Main thread event loop.
+ while !self.is_idle() {
+ let buf = self.rx.recv().unwrap();
+ // Receiving a message on rx exactly corresponds to an async task
+ // completing.
+ self.ntasks_decrement();
+ // Call into JS with the buf.
+ self.send(buf);
+ }
+ }
+
+ fn ntasks_increment(&mut self) {
+ assert!(self.ntasks >= 0);
+ self.ntasks = self.ntasks + 1;
+ }
+
+ fn ntasks_decrement(&mut self) {
+ self.ntasks = self.ntasks - 1;
+ assert!(self.ntasks >= 0);
+ }
+
+ fn is_idle(&self) -> bool {
+ self.ntasks == 0
+ }
}
impl Drop for Isolate {
@@ -76,22 +171,107 @@ impl Drop for Isolate {
}
}
-pub fn from_c<'a>(i: *const libdeno::isolate) -> &'a mut Isolate {
- let ptr = unsafe { libdeno::deno_get_data(i) };
- let ptr = ptr as *mut Isolate;
- let isolate_box = unsafe { Box::from_raw(ptr) };
- Box::leak(isolate_box)
+/// Converts Rust Buf to libdeno deno_buf.
+impl From<Buf> for libdeno::deno_buf {
+ fn from(x: Buf) -> libdeno::deno_buf {
+ let len = x.len();
+ let ptr = Box::into_raw(x);
+ libdeno::deno_buf {
+ alloc_ptr: 0 as *mut u8,
+ alloc_len: 0,
+ data_ptr: ptr as *mut u8,
+ data_len: len,
+ }
+ }
}
-#[test]
-fn test_c_to_rust() {
- let argv = vec![String::from("./deno"), String::from("hello.js")];
- let isolate = Isolate::new(argv);
- let isolate2 = from_c(isolate.ptr);
- assert_eq!(isolate.ptr, isolate2.ptr);
- assert_eq!(
- isolate.dir.root.join("gen"),
- isolate.dir.gen,
- "Sanity check"
- );
+// Dereferences the C pointer into the Rust Isolate object.
+extern "C" fn pre_dispatch(d: *const libdeno::isolate, buf: libdeno::deno_buf) {
+ let bytes = unsafe { std::slice::from_raw_parts(buf.data_ptr, buf.data_len) };
+ let isolate = Isolate::from_c(d);
+ let dispatch = isolate.dispatch;
+ let (is_sync, op) = dispatch(isolate.state.clone(), bytes);
+
+ if is_sync {
+ // Execute op synchronously.
+ let buf = tokio_util::block_on(op).unwrap();
+ if buf.len() != 0 {
+ // Set the synchronous response, the value returned from isolate.send().
+ isolate.set_response(buf);
+ }
+ } else {
+ // Execute op asynchronously.
+ let state = isolate.state.clone();
+
+ // TODO Ideally Tokio would could tell us how many tasks are executing, but
+ // it cannot currently. Therefore we track top-level promises/tasks
+ // manually.
+ isolate.ntasks_increment();
+
+ let task = op
+ .and_then(move |buf| {
+ state.send_to_js(buf);
+ Ok(())
+ }).map_err(|_| ());
+ tokio::spawn(task);
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_c_to_rust() {
+ let argv = vec![String::from("./deno"), String::from("hello.js")];
+ let isolate = Isolate::new(argv, unreachable_dispatch);
+ let isolate2 = Isolate::from_c(isolate.ptr);
+ assert_eq!(isolate.ptr, isolate2.ptr);
+ assert_eq!(
+ isolate.state.dir.root.join("gen"),
+ isolate.state.dir.gen,
+ "Sanity check"
+ );
+ }
+
+ fn unreachable_dispatch(
+ _state: Arc<IsolateState>,
+ _buf: &[u8],
+ ) -> (bool, Box<Op>) {
+ unreachable!();
+ }
+
+ #[test]
+ fn test_dispatch_sync() {
+ let argv = vec![String::from("./deno"), String::from("hello.js")];
+ let mut isolate = Isolate::new(argv, dispatch_sync);
+ tokio_util::init(|| {
+ isolate
+ .execute(
+ "y.js",
+ r#"
+ const m = new Uint8Array([4, 5, 6]);
+ let n = libdeno.send(m);
+ if (!(n.byteLength === 3 &&
+ n[0] === 1 &&
+ n[1] === 2 &&
+ n[2] === 3)) {
+ throw Error("assert error");
+ }
+ "#,
+ ).expect("execute error");
+ isolate.event_loop();
+ });
+ }
+
+ fn dispatch_sync(_state: Arc<IsolateState>, buf: &[u8]) -> (bool, Box<Op>) {
+ assert_eq!(buf[0], 4);
+ assert_eq!(buf[1], 5);
+ assert_eq!(buf[2], 6);
+ // Send back some sync response.
+ let vec: Vec<u8> = vec![1, 2, 3];
+ let buf = vec.into_boxed_slice();
+ let op = Box::new(futures::future::ok(buf));
+ (true, op)
+ }
}