summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/handlers.rs269
-rw-r--r--src/http.rs27
-rw-r--r--src/isolate.rs244
-rw-r--r--src/main.rs27
-rw-r--r--src/tokio_util.rs30
5 files changed, 390 insertions, 207 deletions
diff --git a/src/handlers.rs b/src/handlers.rs
index 623c64110..c914df36d 100644
--- a/src/handlers.rs
+++ b/src/handlers.rs
@@ -1,46 +1,44 @@
// Copyright 2018 the Deno authors. All rights reserved. MIT license.
+
use errors::DenoError;
use errors::DenoResult;
-use flatbuffers::FlatBufferBuilder;
use fs as deno_fs;
+use isolate::Buf;
+use isolate::IsolateState;
+use isolate::Op;
+use msg;
+
+use flatbuffers::FlatBufferBuilder;
use futures;
use futures::sync::oneshot;
use hyper;
use hyper::rt::{Future, Stream};
use hyper::Client;
-use isolate::from_c;
-use libdeno;
-use libdeno::{deno_buf, isolate};
-use msg;
use remove_dir_all::remove_dir_all;
use std;
use std::fs;
#[cfg(any(unix))]
use std::os::unix::fs::PermissionsExt;
use std::path::Path;
+use std::sync::Arc;
use std::time::UNIX_EPOCH;
use std::time::{Duration, Instant};
use tokio::timer::Delay;
-// Buf represents a byte array returned from a "Op".
-// The message might be empty (which will be translated into a null object on
-// the javascript side) or it is a heap allocated opaque sequence of bytes.
-// Usually a flatbuffer message.
-type Buf = Option<Box<[u8]>>;
-
-// JS promises in Deno map onto a specific Future
-// which yields either a DenoError or a byte array.
-type Op = Future<Item = Buf, Error = DenoError>;
-
type OpResult = DenoResult<Buf>;
// TODO Ideally we wouldn't have to box the Op being returned.
// The box is just to make it easier to get a prototype refactor working.
-type Handler = fn(i: *const isolate, base: &msg::Base) -> Box<Op>;
+type Handler = fn(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op>;
-pub extern "C" fn msg_from_js(i: *const isolate, buf: deno_buf) {
- let bytes = unsafe { std::slice::from_raw_parts(buf.data_ptr, buf.data_len) };
+// Hopefully Rust optimizes this away.
+fn empty_buf() -> Buf {
+ Box::new([])
+}
+
+pub fn msg_from_js(state: Arc<IsolateState>, bytes: &[u8]) -> (bool, Box<Op>) {
let base = msg::get_root_as_base(bytes);
+ let is_sync = base.sync();
let msg_type = base.msg_type();
let cmd_id = base.cmd_id();
let handler: Handler = match msg_type {
@@ -68,73 +66,51 @@ pub extern "C" fn msg_from_js(i: *const isolate, buf: deno_buf) {
)),
};
- let future = handler(i, &base);
- let future = future.or_else(move |err| {
- // No matter whether we got an Err or Ok, we want a serialized message to
- // send back. So transform the DenoError into a deno_buf.
- let builder = &mut FlatBufferBuilder::new();
- let errmsg_offset = builder.create_string(&format!("{}", err));
- Ok(serialize_response(
- cmd_id,
- builder,
- msg::BaseArgs {
- error: Some(errmsg_offset),
- error_kind: err.kind(),
- ..Default::default()
- },
- ))
- });
-
- let isolate = from_c(i);
- if base.sync() {
- // Execute future synchronously.
- // println!("sync handler {}", msg::enum_name_any(msg_type));
- let maybe_box_u8 = future.wait().unwrap();
- match maybe_box_u8 {
- None => {}
- Some(box_u8) => {
- let buf = deno_buf_from(box_u8);
- // Set the synchronous response, the value returned from isolate.send().
- unsafe { libdeno::deno_set_response(i, buf) }
- }
- }
- } else {
- // Execute future asynchornously.
- let future = future.and_then(move |maybe_box_u8| {
- let buf = match maybe_box_u8 {
- Some(box_u8) => deno_buf_from(box_u8),
- None => {
- // async RPCs that return None still need to
- // send a message back to signal completion.
- let builder = &mut FlatBufferBuilder::new();
- deno_buf_from(
- serialize_response(
- cmd_id,
- builder,
- msg::BaseArgs {
- ..Default::default()
- },
- ).unwrap(),
- )
- }
+ let op: Box<Op> = handler(state.clone(), &base);
+ let boxed_op = Box::new(
+ op.or_else(move |err: DenoError| -> DenoResult<Buf> {
+ debug!("op err {}", err);
+ // No matter whether we got an Err or Ok, we want a serialized message to
+ // send back. So transform the DenoError into a deno_buf.
+ let builder = &mut FlatBufferBuilder::new();
+ let errmsg_offset = builder.create_string(&format!("{}", err));
+ Ok(serialize_response(
+ cmd_id,
+ builder,
+ msg::BaseArgs {
+ error: Some(errmsg_offset),
+ error_kind: err.kind(),
+ ..Default::default()
+ },
+ ))
+ }).and_then(move |buf: Buf| -> DenoResult<Buf> {
+ // Handle empty responses. For sync responses we just want
+ // to send null. For async we want to send a small message
+ // with the cmd_id.
+ let buf = if is_sync || buf.len() > 0 {
+ buf
+ } else {
+ // async RPCs that return empty still need to
+ // send a message back to signal completion.
+ let builder = &mut FlatBufferBuilder::new();
+ serialize_response(
+ cmd_id,
+ builder,
+ msg::BaseArgs {
+ ..Default::default()
+ },
+ )
};
- // TODO(ry) make this thread safe.
- unsafe { libdeno::deno_send(i, buf) };
- Ok(())
- });
- isolate.rt.spawn(future);
- }
-}
+ Ok(buf)
+ }),
+ );
-fn deno_buf_from(x: Box<[u8]>) -> deno_buf {
- let len = x.len();
- let ptr = Box::into_raw(x);
- deno_buf {
- alloc_ptr: 0 as *mut u8,
- alloc_len: 0,
- data_ptr: ptr as *mut u8,
- data_len: len,
- }
+ debug!(
+ "msg_from_js {} sync {}",
+ msg::enum_name_any(msg_type),
+ base.sync()
+ );
+ return (base.sync(), boxed_op);
}
fn permission_denied() -> DenoError {
@@ -151,16 +127,15 @@ fn not_implemented() -> DenoError {
))
}
-fn handle_exit(_i: *const isolate, base: &msg::Base) -> Box<Op> {
+fn handle_exit(_config: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
let msg = base.msg_as_exit().unwrap();
std::process::exit(msg.code())
}
-fn handle_start(i: *const isolate, base: &msg::Base) -> Box<Op> {
- let isolate = from_c(i);
+fn handle_start(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
let mut builder = FlatBufferBuilder::new();
- let argv = isolate.argv.iter().map(|s| s.as_str()).collect::<Vec<_>>();
+ let argv = state.argv.iter().map(|s| s.as_str()).collect::<Vec<_>>();
let argv_off = builder.create_vector_of_strings(argv.as_slice());
let cwd_path = std::env::current_dir().unwrap();
@@ -172,8 +147,8 @@ fn handle_start(i: *const isolate, base: &msg::Base) -> Box<Op> {
&msg::StartResArgs {
cwd: Some(cwd_off),
argv: Some(argv_off),
- debug_flag: isolate.flags.log_debug,
- recompile_flag: isolate.flags.recompile,
+ debug_flag: state.flags.log_debug,
+ recompile_flag: state.flags.recompile,
..Default::default()
},
);
@@ -200,7 +175,7 @@ fn serialize_response(
let data = builder.finished_data();
// println!("serialize_response {:x?}", data);
let vec = data.to_vec();
- Some(vec.into_boxed_slice())
+ vec.into_boxed_slice()
}
fn ok_future(buf: Buf) -> Box<Op> {
@@ -213,22 +188,17 @@ fn odd_future(err: DenoError) -> Box<Op> {
}
// https://github.com/denoland/isolate/blob/golang/os.go#L100-L154
-fn handle_code_fetch(i: *const isolate, base: &msg::Base) -> Box<Op> {
+fn handle_code_fetch(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
let msg = base.msg_as_code_fetch().unwrap();
let cmd_id = base.cmd_id();
let module_specifier = msg.module_specifier().unwrap();
let containing_file = msg.containing_file().unwrap();
- let isolate = from_c(i);
- assert_eq!(
- isolate.dir.root.join("gen"),
- isolate.dir.gen,
- "Sanity check"
- );
+ assert_eq!(state.dir.root.join("gen"), state.dir.gen, "Sanity check");
Box::new(futures::future::result(|| -> OpResult {
let builder = &mut FlatBufferBuilder::new();
- let out = isolate.dir.code_fetch(module_specifier, containing_file)?;
+ let out = state.dir.code_fetch(module_specifier, containing_file)?;
let mut msg_args = msg::CodeFetchResArgs {
module_name: Some(builder.create_string(&out.module_name)),
filename: Some(builder.create_string(&out.filename)),
@@ -255,36 +225,34 @@ fn handle_code_fetch(i: *const isolate, base: &msg::Base) -> Box<Op> {
}
// https://github.com/denoland/isolate/blob/golang/os.go#L156-L169
-fn handle_code_cache(i: *const isolate, base: &msg::Base) -> Box<Op> {
+fn handle_code_cache(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
let msg = base.msg_as_code_cache().unwrap();
let filename = msg.filename().unwrap();
let source_code = msg.source_code().unwrap();
let output_code = msg.output_code().unwrap();
Box::new(futures::future::result(|| -> OpResult {
- let isolate = from_c(i);
- isolate.dir.code_cache(filename, source_code, output_code)?;
- Ok(None)
+ state.dir.code_cache(filename, source_code, output_code)?;
+ Ok(empty_buf())
}()))
}
-fn handle_set_env(i: *const isolate, base: &msg::Base) -> Box<Op> {
+fn handle_set_env(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
let msg = base.msg_as_set_env().unwrap();
let key = msg.key().unwrap();
let value = msg.value().unwrap();
- let isolate = from_c(i);
- if !isolate.flags.allow_env {
+ if !state.flags.allow_env {
return odd_future(permission_denied());
}
std::env::set_var(key, value);
- ok_future(None)
+ ok_future(empty_buf())
}
-fn handle_env(i: *const isolate, base: &msg::Base) -> Box<Op> {
- let isolate = from_c(i);
+fn handle_env(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
let cmd_id = base.cmd_id();
- if !isolate.flags.allow_env {
+
+ if !state.flags.allow_env {
return odd_future(permission_denied());
}
@@ -322,22 +290,23 @@ fn handle_env(i: *const isolate, base: &msg::Base) -> Box<Op> {
))
}
-fn handle_fetch_req(i: *const isolate, base: &msg::Base) -> Box<Op> {
+fn handle_fetch_req(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
let msg = base.msg_as_fetch_req().unwrap();
let cmd_id = base.cmd_id();
let id = msg.id();
let url = msg.url().unwrap();
- let isolate = from_c(i);
- if !isolate.flags.allow_net {
+ if !state.flags.allow_net {
return odd_future(permission_denied());
}
let url = url.parse::<hyper::Uri>().unwrap();
let client = Client::new();
+ debug!("Before fetch {}", url);
let future = client.get(url).and_then(move |res| {
let status = res.status().as_u16() as i32;
+ debug!("fetch {}", status);
let headers = {
let map = res.headers();
@@ -361,6 +330,7 @@ fn handle_fetch_req(i: *const isolate, base: &msg::Base) -> Box<Op> {
let future = future.map_err(|err| -> DenoError { err.into() }).and_then(
move |(status, body, headers)| {
+ debug!("fetch body ");
let builder = &mut FlatBufferBuilder::new();
// Send the first message without a body. This is just to indicate
// what status code.
@@ -422,7 +392,7 @@ where
(delay_task, cancel_tx)
}
-fn handle_make_temp_dir(i: *const isolate, base: &msg::Base) -> Box<Op> {
+fn handle_make_temp_dir(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
let base = Box::new(*base);
let msg = base.msg_as_make_temp_dir().unwrap();
let cmd_id = base.cmd_id();
@@ -430,8 +400,7 @@ fn handle_make_temp_dir(i: *const isolate, base: &msg::Base) -> Box<Op> {
let prefix = msg.prefix();
let suffix = msg.suffix();
- let isolate = from_c(i);
- if !isolate.flags.allow_write {
+ if !state.flags.allow_write {
return odd_future(permission_denied());
}
// TODO Use blocking() here.
@@ -461,28 +430,28 @@ fn handle_make_temp_dir(i: *const isolate, base: &msg::Base) -> Box<Op> {
}()))
}
-fn handle_mkdir(i: *const isolate, base: &msg::Base) -> Box<Op> {
+fn handle_mkdir(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
let msg = base.msg_as_mkdir().unwrap();
let mode = msg.mode();
let path = msg.path().unwrap();
- let isolate = from_c(i);
- if !isolate.flags.allow_write {
+
+ if !state.flags.allow_write {
return odd_future(permission_denied());
}
// TODO Use tokio_threadpool.
Box::new(futures::future::result(|| -> OpResult {
debug!("handle_mkdir {}", path);
deno_fs::mkdir(Path::new(path), mode)?;
- Ok(None)
+ Ok(empty_buf())
}()))
}
-fn handle_remove(i: *const isolate, base: &msg::Base) -> Box<Op> {
+fn handle_remove(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
let msg = base.msg_as_remove().unwrap();
let path = msg.path().unwrap();
let recursive = msg.recursive();
- let isolate = from_c(i);
- if !isolate.flags.allow_write {
+
+ if !state.flags.allow_write {
return odd_future(permission_denied());
}
// TODO Use tokio_threadpool.
@@ -499,12 +468,12 @@ fn handle_remove(i: *const isolate, base: &msg::Base) -> Box<Op> {
fs::remove_dir(&path_)?;
}
}
- Ok(None)
+ Ok(empty_buf())
}()))
}
// Prototype https://github.com/denoland/isolate/blob/golang/os.go#L171-L184
-fn handle_read_file(_i: *const isolate, base: &msg::Base) -> Box<Op> {
+fn handle_read_file(_config: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
let msg = base.msg_as_read_file().unwrap();
let cmd_id = base.cmd_id();
let filename = String::from(msg.filename().unwrap());
@@ -554,7 +523,7 @@ fn get_mode(_perm: fs::Permissions) -> u32 {
0
}
-fn handle_stat(_i: *const isolate, base: &msg::Base) -> Box<Op> {
+fn handle_stat(_config: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
let msg = base.msg_as_stat().unwrap();
let cmd_id = base.cmd_id();
let filename = String::from(msg.filename().unwrap());
@@ -597,48 +566,49 @@ fn handle_stat(_i: *const isolate, base: &msg::Base) -> Box<Op> {
}()))
}
-fn handle_write_file(i: *const isolate, base: &msg::Base) -> Box<Op> {
+fn handle_write_file(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
let msg = base.msg_as_write_file().unwrap();
let filename = String::from(msg.filename().unwrap());
let data = msg.data().unwrap();
let perm = msg.perm();
- let isolate = from_c(i);
- if !isolate.flags.allow_write {
+ if !state.flags.allow_write {
return odd_future(permission_denied());
}
+
Box::new(futures::future::result(|| -> OpResult {
debug!("handle_write_file {}", filename);
deno_fs::write_file(Path::new(&filename), data, perm)?;
- Ok(None)
+ Ok(empty_buf())
}()))
}
-fn remove_timer(i: *const isolate, timer_id: u32) {
- let isolate = from_c(i);
- isolate.timers.remove(&timer_id);
+fn remove_timer(state: Arc<IsolateState>, timer_id: u32) {
+ let mut timers = state.timers.lock().unwrap();
+ timers.remove(&timer_id);
}
// Prototype: https://github.com/ry/isolate/blob/golang/timers.go#L25-L39
-fn handle_timer_start(i: *const isolate, base: &msg::Base) -> Box<Op> {
+fn handle_timer_start(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
debug!("handle_timer_start");
let msg = base.msg_as_timer_start().unwrap();
let cmd_id = base.cmd_id();
let timer_id = msg.id();
let delay = msg.delay();
- let isolate = from_c(i);
+ let config2 = state.clone();
let future = {
let (delay_task, cancel_delay) = set_timeout(
move || {
- remove_timer(i, timer_id);
+ remove_timer(config2, timer_id);
},
delay,
);
- isolate.timers.insert(timer_id, cancel_delay);
+ let mut timers = state.timers.lock().unwrap();
+ timers.insert(timer_id, cancel_delay);
delay_task
};
- Box::new(future.then(move |result| {
+ let r = Box::new(future.then(move |result| {
let builder = &mut FlatBufferBuilder::new();
let msg = msg::TimerReady::create(
builder,
@@ -657,20 +627,20 @@ fn handle_timer_start(i: *const isolate, base: &msg::Base) -> Box<Op> {
..Default::default()
},
))
- }))
+ }));
+ r
}
// Prototype: https://github.com/ry/isolate/blob/golang/timers.go#L40-L43
-fn handle_timer_clear(i: *const isolate, base: &msg::Base) -> Box<Op> {
+fn handle_timer_clear(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
let msg = base.msg_as_timer_clear().unwrap();
debug!("handle_timer_clear");
- remove_timer(i, msg.id());
- ok_future(None)
+ remove_timer(state, msg.id());
+ ok_future(empty_buf())
}
-fn handle_rename(i: *const isolate, base: &msg::Base) -> Box<Op> {
- let isolate = from_c(i);
- if !isolate.flags.allow_write {
+fn handle_rename(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
+ if !state.flags.allow_write {
return odd_future(permission_denied());
}
let msg = base.msg_as_rename().unwrap();
@@ -679,13 +649,12 @@ fn handle_rename(i: *const isolate, base: &msg::Base) -> Box<Op> {
Box::new(futures::future::result(|| -> OpResult {
debug!("handle_rename {} {}", oldpath, newpath);
fs::rename(Path::new(&oldpath), Path::new(&newpath))?;
- Ok(None)
+ Ok(empty_buf())
}()))
}
-fn handle_symlink(i: *const isolate, base: &msg::Base) -> Box<Op> {
- let deno = from_c(i);
- if !deno.flags.allow_write {
+fn handle_symlink(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
+ if !state.flags.allow_write {
return odd_future(permission_denied());
}
// TODO Use type for Windows.
@@ -699,12 +668,12 @@ fn handle_symlink(i: *const isolate, base: &msg::Base) -> Box<Op> {
debug!("handle_symlink {} {}", oldname, newname);
#[cfg(any(unix))]
std::os::unix::fs::symlink(Path::new(&oldname), Path::new(&newname))?;
- Ok(None)
+ Ok(empty_buf())
}()))
}
}
-fn handle_read_link(_i: *const isolate, base: &msg::Base) -> Box<Op> {
+fn handle_read_link(_state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
let msg = base.msg_as_readlink().unwrap();
let cmd_id = base.cmd_id();
let name = String::from(msg.name().unwrap());
diff --git a/src/http.rs b/src/http.rs
index 5907b35ed..3b5ede10e 100644
--- a/src/http.rs
+++ b/src/http.rs
@@ -1,10 +1,12 @@
// Copyright 2018 the Deno authors. All rights reserved. MIT license.
use errors::DenoResult;
+use tokio_util;
use futures::Future;
use futures::Stream;
use hyper;
+use hyper::client::Client;
use hyper::client::HttpConnector;
use hyper::Uri;
use hyper_rustls;
@@ -29,21 +31,24 @@ pub fn get_client() -> Client<Connector, hyper::Body> {
pub fn fetch_sync_string(module_name: &str) -> DenoResult<String> {
let url = module_name.parse::<Uri>().unwrap();
let client = get_client();
-
- // TODO Use Deno's RT
- let mut rt = Runtime::new().unwrap();
- let body = rt.block_on(
- client
- .get(url)
- .and_then(|response| response.into_body().concat2()),
- )?;
+ let future = client
+ .get(url)
+ .and_then(|response| response.into_body().concat2());
+ let body = tokio_util::block_on(future)?;
Ok(String::from_utf8(body.to_vec()).unwrap())
}
#[test]
fn test_fetch_sync_string() {
// Relies on external http server. See tools/http_server.py
- let p = fetch_sync_string("http://localhost:4545/package.json").unwrap();
- println!("package.json len {}", p.len());
- assert!(p.len() > 1);
+ use futures;
+
+ tokio_util::init(|| {
+ tokio_util::block_on(futures::future::lazy(|| -> DenoResult<()> {
+ let p = fetch_sync_string("http://127.0.0.1:4545/package.json")?;
+ println!("package.json len {}", p.len());
+ assert!(p.len() > 1);
+ Ok(())
+ })).unwrap();
+ });
}
diff --git a/src/isolate.rs b/src/isolate.rs
index 5daf45701..64bec5ddf 100644
--- a/src/isolate.rs
+++ b/src/isolate.rs
@@ -1,54 +1,113 @@
// Copyright 2018 the Deno authors. All rights reserved. MIT license.
+
+// Do not use FlatBuffers in this module.
+// TODO Currently this module uses Tokio, but it would be nice if they were
+// decoupled.
+
use deno_dir;
+use errors::DenoError;
use flags;
+use libdeno;
+
use futures;
-use handlers;
+use futures::Future;
use libc::c_void;
-use libdeno;
use std;
use std::collections::HashMap;
use std::ffi::CStr;
use std::ffi::CString;
+use std::sync::mpsc;
+use std::sync::Arc;
+use std::sync::Mutex;
use tokio;
+use tokio_util;
type DenoException<'a> = &'a str;
+// Buf represents a byte array returned from a "Op".
+// The message might be empty (which will be translated into a null object on
+// the javascript side) or it is a heap allocated opaque sequence of bytes.
+// Usually a flatbuffer message.
+pub type Buf = Box<[u8]>;
+
+// JS promises in Deno map onto a specific Future
+// which yields either a DenoError or a byte array.
+pub type Op = Future<Item = Buf, Error = DenoError> + Send;
+
+// Returns (is_sync, op)
+pub type Dispatch = fn(state: Arc<IsolateState>, buf: &[u8]) -> (bool, Box<Op>);
+
pub struct Isolate {
- pub ptr: *const libdeno::isolate,
+ ptr: *const libdeno::isolate,
+ dispatch: Dispatch,
+ rx: mpsc::Receiver<Buf>,
+ ntasks: i32,
+ pub state: Arc<IsolateState>,
+}
+
+// Isolate cannot be passed between threads but IsolateState can. So any state that
+// needs to be accessed outside the main V8 thread should be inside IsolateState.
+pub struct IsolateState {
pub dir: deno_dir::DenoDir,
- pub rt: tokio::runtime::current_thread::Runtime,
- pub timers: HashMap<u32, futures::sync::oneshot::Sender<()>>,
+ pub timers: Mutex<HashMap<u32, futures::sync::oneshot::Sender<()>>>,
pub argv: Vec<String>,
pub flags: flags::DenoFlags,
+ tx: Mutex<Option<mpsc::Sender<Buf>>>,
+}
+
+impl IsolateState {
+ // Thread safe.
+ fn send_to_js(&self, buf: Buf) {
+ let mut g = self.tx.lock().unwrap();
+ let maybe_tx = g.as_mut();
+ assert!(maybe_tx.is_some(), "Expected tx to not be deleted.");
+ let tx = maybe_tx.unwrap();
+ tx.send(buf).expect("tx.send error");
+ }
}
static DENO_INIT: std::sync::Once = std::sync::ONCE_INIT;
impl Isolate {
- pub fn new(argv: Vec<String>) -> Box<Isolate> {
+ pub fn new(argv: Vec<String>, dispatch: Dispatch) -> Box<Isolate> {
DENO_INIT.call_once(|| {
unsafe { libdeno::deno_init() };
});
let (flags, argv_rest) = flags::set_flags(argv);
- let mut deno_box = Box::new(Isolate {
+ // This channel handles sending async messages back to the runtime.
+ let (tx, rx) = mpsc::channel::<Buf>();
+
+ let mut isolate = Box::new(Isolate {
ptr: 0 as *const libdeno::isolate,
- dir: deno_dir::DenoDir::new(flags.reload, None).unwrap(),
- rt: tokio::runtime::current_thread::Runtime::new().unwrap(),
- timers: HashMap::new(),
- argv: argv_rest,
- flags,
+ dispatch,
+ rx,
+ ntasks: 0,
+ state: Arc::new(IsolateState {
+ dir: deno_dir::DenoDir::new(flags.reload, None).unwrap(),
+ timers: Mutex::new(HashMap::new()),
+ argv: argv_rest,
+ flags,
+ tx: Mutex::new(Some(tx)),
+ }),
});
- (*deno_box).ptr = unsafe {
+ (*isolate).ptr = unsafe {
libdeno::deno_new(
- deno_box.as_ref() as *const _ as *const c_void,
- handlers::msg_from_js,
+ isolate.as_ref() as *const _ as *const c_void,
+ pre_dispatch,
)
};
- deno_box
+ isolate
+ }
+
+ pub fn from_c<'a>(d: *const libdeno::isolate) -> &'a mut Isolate {
+ let ptr = unsafe { libdeno::deno_get_data(d) };
+ let ptr = ptr as *mut Isolate;
+ let isolate_box = unsafe { Box::from_raw(ptr) };
+ Box::leak(isolate_box)
}
pub fn execute(
@@ -68,6 +127,42 @@ impl Isolate {
}
Ok(())
}
+
+ pub fn set_response(&self, buf: Buf) {
+ unsafe { libdeno::deno_set_response(self.ptr, buf.into()) }
+ }
+
+ pub fn send(&self, buf: Buf) {
+ unsafe { libdeno::deno_send(self.ptr, buf.into()) };
+ }
+
+ // TODO Use Park abstraction? Note at time of writing Tokio default runtime
+ // does not have new_with_park().
+ pub fn event_loop(&mut self) {
+ // Main thread event loop.
+ while !self.is_idle() {
+ let buf = self.rx.recv().unwrap();
+ // Receiving a message on rx exactly corresponds to an async task
+ // completing.
+ self.ntasks_decrement();
+ // Call into JS with the buf.
+ self.send(buf);
+ }
+ }
+
+ fn ntasks_increment(&mut self) {
+ assert!(self.ntasks >= 0);
+ self.ntasks = self.ntasks + 1;
+ }
+
+ fn ntasks_decrement(&mut self) {
+ self.ntasks = self.ntasks - 1;
+ assert!(self.ntasks >= 0);
+ }
+
+ fn is_idle(&self) -> bool {
+ self.ntasks == 0
+ }
}
impl Drop for Isolate {
@@ -76,22 +171,107 @@ impl Drop for Isolate {
}
}
-pub fn from_c<'a>(i: *const libdeno::isolate) -> &'a mut Isolate {
- let ptr = unsafe { libdeno::deno_get_data(i) };
- let ptr = ptr as *mut Isolate;
- let isolate_box = unsafe { Box::from_raw(ptr) };
- Box::leak(isolate_box)
+/// Converts Rust Buf to libdeno deno_buf.
+impl From<Buf> for libdeno::deno_buf {
+ fn from(x: Buf) -> libdeno::deno_buf {
+ let len = x.len();
+ let ptr = Box::into_raw(x);
+ libdeno::deno_buf {
+ alloc_ptr: 0 as *mut u8,
+ alloc_len: 0,
+ data_ptr: ptr as *mut u8,
+ data_len: len,
+ }
+ }
}
-#[test]
-fn test_c_to_rust() {
- let argv = vec![String::from("./deno"), String::from("hello.js")];
- let isolate = Isolate::new(argv);
- let isolate2 = from_c(isolate.ptr);
- assert_eq!(isolate.ptr, isolate2.ptr);
- assert_eq!(
- isolate.dir.root.join("gen"),
- isolate.dir.gen,
- "Sanity check"
- );
+// Dereferences the C pointer into the Rust Isolate object.
+extern "C" fn pre_dispatch(d: *const libdeno::isolate, buf: libdeno::deno_buf) {
+ let bytes = unsafe { std::slice::from_raw_parts(buf.data_ptr, buf.data_len) };
+ let isolate = Isolate::from_c(d);
+ let dispatch = isolate.dispatch;
+ let (is_sync, op) = dispatch(isolate.state.clone(), bytes);
+
+ if is_sync {
+ // Execute op synchronously.
+ let buf = tokio_util::block_on(op).unwrap();
+ if buf.len() != 0 {
+ // Set the synchronous response, the value returned from isolate.send().
+ isolate.set_response(buf);
+ }
+ } else {
+ // Execute op asynchronously.
+ let state = isolate.state.clone();
+
+ // TODO Ideally Tokio would could tell us how many tasks are executing, but
+ // it cannot currently. Therefore we track top-level promises/tasks
+ // manually.
+ isolate.ntasks_increment();
+
+ let task = op
+ .and_then(move |buf| {
+ state.send_to_js(buf);
+ Ok(())
+ }).map_err(|_| ());
+ tokio::spawn(task);
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_c_to_rust() {
+ let argv = vec![String::from("./deno"), String::from("hello.js")];
+ let isolate = Isolate::new(argv, unreachable_dispatch);
+ let isolate2 = Isolate::from_c(isolate.ptr);
+ assert_eq!(isolate.ptr, isolate2.ptr);
+ assert_eq!(
+ isolate.state.dir.root.join("gen"),
+ isolate.state.dir.gen,
+ "Sanity check"
+ );
+ }
+
+ fn unreachable_dispatch(
+ _state: Arc<IsolateState>,
+ _buf: &[u8],
+ ) -> (bool, Box<Op>) {
+ unreachable!();
+ }
+
+ #[test]
+ fn test_dispatch_sync() {
+ let argv = vec![String::from("./deno"), String::from("hello.js")];
+ let mut isolate = Isolate::new(argv, dispatch_sync);
+ tokio_util::init(|| {
+ isolate
+ .execute(
+ "y.js",
+ r#"
+ const m = new Uint8Array([4, 5, 6]);
+ let n = libdeno.send(m);
+ if (!(n.byteLength === 3 &&
+ n[0] === 1 &&
+ n[1] === 2 &&
+ n[2] === 3)) {
+ throw Error("assert error");
+ }
+ "#,
+ ).expect("execute error");
+ isolate.event_loop();
+ });
+ }
+
+ fn dispatch_sync(_state: Arc<IsolateState>, buf: &[u8]) -> (bool, Box<Op>) {
+ assert_eq!(buf[0], 4);
+ assert_eq!(buf[1], 5);
+ assert_eq!(buf[2], 6);
+ // Send back some sync response.
+ let vec: Vec<u8> = vec![1, 2, 3];
+ let buf = vec.into_boxed_slice();
+ let op = Box::new(futures::future::ok(buf));
+ (true, op)
+ }
}
diff --git a/src/main.rs b/src/main.rs
index 1f9ead019..b60f80513 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -7,6 +7,7 @@ extern crate msg_rs as msg;
extern crate rand;
extern crate tempfile;
extern crate tokio;
+extern crate tokio_executor;
extern crate url;
#[macro_use]
extern crate lazy_static;
@@ -25,9 +26,9 @@ pub mod handlers;
mod http;
mod isolate;
mod libdeno;
+mod tokio_util;
mod version;
-use isolate::Isolate;
use std::env;
static LOGGER: Logger = Logger;
@@ -49,18 +50,16 @@ impl log::Log for Logger {
fn main() {
log::set_logger(&LOGGER).unwrap();
-
let args = env::args().collect();
- let mut isolate = Isolate::new(args);
- flags::process(&isolate.flags);
-
- isolate
- .execute("deno_main.js", "denoMain();")
- .unwrap_or_else(|err| {
- error!("{}", err);
- std::process::exit(1);
- });
-
- // Start the Tokio event loop
- isolate.rt.run().expect("err");
+ let mut isolate = isolate::Isolate::new(args, handlers::msg_from_js);
+ flags::process(&isolate.state.flags);
+ tokio_util::init(|| {
+ isolate
+ .execute("deno_main.js", "denoMain();")
+ .unwrap_or_else(|err| {
+ error!("{}", err);
+ std::process::exit(1);
+ });
+ isolate.event_loop();
+ });
}
diff --git a/src/tokio_util.rs b/src/tokio_util.rs
new file mode 100644
index 000000000..de81620ef
--- /dev/null
+++ b/src/tokio_util.rs
@@ -0,0 +1,30 @@
+// Copyright 2018 the Deno authors. All rights reserved. MIT license.
+
+use futures;
+use futures::Future;
+use tokio;
+use tokio_executor;
+
+pub fn block_on<F, R, E>(future: F) -> Result<R, E>
+where
+ F: Send + 'static + Future<Item = R, Error = E>,
+ R: Send + 'static,
+ E: Send + 'static,
+{
+ let (tx, rx) = futures::sync::oneshot::channel();
+ tokio::spawn(future.then(move |r| tx.send(r).map_err(|_| unreachable!())));
+ rx.wait().unwrap()
+}
+
+// Set the default executor so we can use tokio::spawn(). It's difficult to
+// pass around mut references to the runtime, so using with_default is
+// preferable. Ideally Tokio would provide this function.
+pub fn init<F>(f: F)
+where
+ F: FnOnce(),
+{
+ let rt = tokio::runtime::Runtime::new().unwrap();
+ let mut executor = rt.executor();
+ let mut enter = tokio_executor::enter().expect("Multiple executors at once");
+ tokio_executor::with_default(&mut executor, &mut enter, move |_enter| f());
+}