summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/handlers.rs84
-rw-r--r--src/isolate.rs67
-rw-r--r--src/msg.fbs2
3 files changed, 101 insertions, 52 deletions
diff --git a/src/handlers.rs b/src/handlers.rs
index c27543d33..bb2d79e69 100644
--- a/src/handlers.rs
+++ b/src/handlers.rs
@@ -5,6 +5,7 @@ use errors::DenoError;
use errors::DenoResult;
use fs as deno_fs;
use isolate::Buf;
+use isolate::Isolate;
use isolate::IsolateState;
use isolate::Op;
use msg;
@@ -47,7 +48,7 @@ fn empty_buf() -> Buf {
}
pub fn msg_from_js(
- state: Arc<IsolateState>,
+ isolate: &mut Isolate,
control: &[u8],
data: &'static mut [u8],
) -> (bool, Box<Op>) {
@@ -55,38 +56,47 @@ pub fn msg_from_js(
let is_sync = base.sync();
let msg_type = base.msg_type();
let cmd_id = base.cmd_id();
- let handler: Handler = match msg_type {
- msg::Any::Start => handle_start,
- msg::Any::CodeFetch => handle_code_fetch,
- msg::Any::CodeCache => handle_code_cache,
- msg::Any::SetTimeout => handle_set_timeout,
- msg::Any::Environ => handle_env,
- msg::Any::FetchReq => handle_fetch_req,
- msg::Any::TimerStart => handle_timer_start,
- msg::Any::TimerClear => handle_timer_clear,
- msg::Any::MakeTempDir => handle_make_temp_dir,
- msg::Any::Mkdir => handle_mkdir,
- msg::Any::Open => handle_open,
- msg::Any::Read => handle_read,
- msg::Any::Write => handle_write,
- msg::Any::Remove => handle_remove,
- msg::Any::ReadFile => handle_read_file,
- msg::Any::Rename => handle_rename,
- msg::Any::Readlink => handle_read_link,
- msg::Any::Symlink => handle_symlink,
- msg::Any::SetEnv => handle_set_env,
- msg::Any::Stat => handle_stat,
- msg::Any::Truncate => handle_truncate,
- msg::Any::WriteFile => handle_write_file,
- msg::Any::Exit => handle_exit,
- msg::Any::CopyFile => handle_copy_file,
- _ => panic!(format!(
- "Unhandled message {}",
- msg::enum_name_any(msg_type)
- )),
+
+ let op: Box<Op> = if msg_type == msg::Any::SetTimeout {
+ // SetTimeout is an exceptional op: the global timeout field is part of the
+ // Isolate state (not the IsolateState state) and it must be updated on the
+ // main thread.
+ assert_eq!(is_sync, true);
+ handle_set_timeout(isolate, &base, data)
+ } else {
+ // Handle regular ops.
+ let handler: Handler = match msg_type {
+ msg::Any::Start => handle_start,
+ msg::Any::CodeFetch => handle_code_fetch,
+ msg::Any::CodeCache => handle_code_cache,
+ msg::Any::Environ => handle_env,
+ msg::Any::FetchReq => handle_fetch_req,
+ msg::Any::TimerStart => handle_timer_start,
+ msg::Any::TimerClear => handle_timer_clear,
+ msg::Any::MakeTempDir => handle_make_temp_dir,
+ msg::Any::Mkdir => handle_mkdir,
+ msg::Any::Open => handle_open,
+ msg::Any::Read => handle_read,
+ msg::Any::Write => handle_write,
+ msg::Any::Remove => handle_remove,
+ msg::Any::ReadFile => handle_read_file,
+ msg::Any::Rename => handle_rename,
+ msg::Any::Readlink => handle_read_link,
+ msg::Any::Symlink => handle_symlink,
+ msg::Any::SetEnv => handle_set_env,
+ msg::Any::Stat => handle_stat,
+ msg::Any::Truncate => handle_truncate,
+ msg::Any::WriteFile => handle_write_file,
+ msg::Any::Exit => handle_exit,
+ msg::Any::CopyFile => handle_copy_file,
+ _ => panic!(format!(
+ "Unhandled message {}",
+ msg::enum_name_any(msg_type)
+ )),
+ };
+ handler(isolate.state.clone(), &base, data)
};
- let op: Box<Op> = handler(state.clone(), &base, data);
let boxed_op = Box::new(
op.or_else(move |err: DenoError| -> DenoResult<Buf> {
debug!("op err {}", err);
@@ -274,16 +284,18 @@ fn handle_code_cache(
}
fn handle_set_timeout(
- state: Arc<IsolateState>,
+ isolate: &mut Isolate,
base: &msg::Base,
data: &'static mut [u8],
) -> Box<Op> {
assert_eq!(data.len(), 0);
let msg = base.msg_as_set_timeout().unwrap();
- let val = msg.timeout() as isize;
- state
- .timeout
- .swap(val, std::sync::atomic::Ordering::Relaxed);
+ let val = msg.timeout() as i64;
+ isolate.timeout_due = if val >= 0 {
+ Some(Instant::now() + Duration::from_millis(val as u64))
+ } else {
+ None
+ };
ok_future(empty_buf())
}
diff --git a/src/isolate.rs b/src/isolate.rs
index 8dfd4204a..bdd1ca956 100644
--- a/src/isolate.rs
+++ b/src/isolate.rs
@@ -16,10 +16,11 @@ use std;
use std::collections::HashMap;
use std::ffi::CStr;
use std::ffi::CString;
-use std::sync::atomic::AtomicIsize;
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
+use std::time::Duration;
+use std::time::Instant;
use tokio;
use tokio_util;
@@ -37,7 +38,7 @@ pub type Op = Future<Item = Buf, Error = DenoError> + Send;
// Returns (is_sync, op)
pub type Dispatch =
- fn(state: Arc<IsolateState>, buf: &[u8], data_buf: &'static mut [u8])
+ fn(isolate: &mut Isolate, buf: &[u8], data_buf: &'static mut [u8])
-> (bool, Box<Op>);
pub struct Isolate {
@@ -45,13 +46,13 @@ pub struct Isolate {
dispatch: Dispatch,
rx: mpsc::Receiver<(i32, Buf)>,
ntasks: i32,
+ pub timeout_due: Option<Instant>,
pub state: Arc<IsolateState>,
}
// Isolate cannot be passed between threads but IsolateState can. So any state that
// needs to be accessed outside the main V8 thread should be inside IsolateState.
pub struct IsolateState {
- pub timeout: AtomicIsize,
pub dir: deno_dir::DenoDir,
pub timers: Mutex<HashMap<u32, futures::sync::oneshot::Sender<()>>>,
pub argv: Vec<String>,
@@ -88,8 +89,8 @@ impl Isolate {
dispatch,
rx,
ntasks: 0,
+ timeout_due: None,
state: Arc::new(IsolateState {
- timeout: AtomicIsize::new(-1),
dir: deno_dir::DenoDir::new(flags.reload, None).unwrap(),
timers: Mutex::new(HashMap::new()),
argv: argv_rest,
@@ -139,17 +140,54 @@ impl Isolate {
unsafe { libdeno::deno_respond(self.ptr, req_id, buf.into()) }
}
+ fn complete_op(&mut self, req_id: i32, buf: Buf) {
+ // Receiving a message on rx exactly corresponds to an async task
+ // completing.
+ self.ntasks_decrement();
+ // Call into JS with the buf.
+ self.respond(req_id, buf);
+ }
+
+ fn timeout(&self) {
+ let dummy_buf = libdeno::deno_buf {
+ alloc_ptr: 0 as *mut u8,
+ alloc_len: 0,
+ data_ptr: 0 as *mut u8,
+ data_len: 0,
+ };
+ unsafe { libdeno::deno_respond(self.ptr, -1, dummy_buf) }
+ }
+
// TODO Use Park abstraction? Note at time of writing Tokio default runtime
// does not have new_with_park().
pub fn event_loop(&mut self) {
// Main thread event loop.
while !self.is_idle() {
- let (req_id, buf) = self.rx.recv().unwrap();
- // Receiving a message on rx exactly corresponds to an async task
- // completing.
- self.ntasks_decrement();
- // Call into JS with the buf.
- self.respond(req_id, buf);
+ // Ideally, mpsc::Receiver would have a receive method that takes a optional
+ // timeout. But it doesn't so we need all this duplicate code.
+ match self.timeout_due {
+ Some(due) => {
+ // Subtracting two Instants causes a panic if the resulting duration
+ // would become negative. Avoid this.
+ let now = Instant::now();
+ let timeout = if due > now {
+ due - now
+ } else {
+ Duration::new(0, 0)
+ };
+ // TODO: use recv_deadline() instead of recv_timeout() when this
+ // feature becomes stable/available.
+ match self.rx.recv_timeout(timeout) {
+ Ok((req_id, buf)) => self.complete_op(req_id, buf),
+ Err(mpsc::RecvTimeoutError::Timeout) => self.timeout(),
+ Err(e) => panic!("mpsc::Receiver::recv_timeout() failed: {:?}", e),
+ }
+ }
+ None => match self.rx.recv() {
+ Ok((req_id, buf)) => self.complete_op(req_id, buf),
+ Err(e) => panic!("mpsc::Receiver::recv() failed: {:?}", e),
+ },
+ };
}
}
@@ -164,7 +202,7 @@ impl Isolate {
}
fn is_idle(&self) -> bool {
- self.ntasks == 0
+ self.ntasks == 0 && self.timeout_due.is_none()
}
}
@@ -212,8 +250,7 @@ extern "C" fn pre_dispatch(
let isolate = Isolate::from_c(d);
let dispatch = isolate.dispatch;
- let (is_sync, op) =
- dispatch(isolate.state.clone(), control_slice, data_slice);
+ let (is_sync, op) = dispatch(isolate, control_slice, data_slice);
if is_sync {
// Execute op synchronously.
@@ -258,7 +295,7 @@ mod tests {
}
fn unreachable_dispatch(
- _state: Arc<IsolateState>,
+ _isolate: &mut Isolate,
_control: &[u8],
_data: &'static mut [u8],
) -> (bool, Box<Op>) {
@@ -289,7 +326,7 @@ mod tests {
}
fn dispatch_sync(
- _state: Arc<IsolateState>,
+ _isolate: &mut Isolate,
control: &[u8],
data: &'static mut [u8],
) -> (bool, Box<Op>) {
diff --git a/src/msg.fbs b/src/msg.fbs
index b9989c330..68de166ce 100644
--- a/src/msg.fbs
+++ b/src/msg.fbs
@@ -128,7 +128,7 @@ table CodeCache {
}
table SetTimeout {
- timeout: int;
+ timeout: double;
}
table Exit {