diff options
Diffstat (limited to 'src/isolate.rs')
-rw-r--r-- | src/isolate.rs | 67 |
1 files changed, 52 insertions, 15 deletions
diff --git a/src/isolate.rs b/src/isolate.rs index 8dfd4204a..bdd1ca956 100644 --- a/src/isolate.rs +++ b/src/isolate.rs @@ -16,10 +16,11 @@ use std; use std::collections::HashMap; use std::ffi::CStr; use std::ffi::CString; -use std::sync::atomic::AtomicIsize; use std::sync::mpsc; use std::sync::Arc; use std::sync::Mutex; +use std::time::Duration; +use std::time::Instant; use tokio; use tokio_util; @@ -37,7 +38,7 @@ pub type Op = Future<Item = Buf, Error = DenoError> + Send; // Returns (is_sync, op) pub type Dispatch = - fn(state: Arc<IsolateState>, buf: &[u8], data_buf: &'static mut [u8]) + fn(isolate: &mut Isolate, buf: &[u8], data_buf: &'static mut [u8]) -> (bool, Box<Op>); pub struct Isolate { @@ -45,13 +46,13 @@ pub struct Isolate { dispatch: Dispatch, rx: mpsc::Receiver<(i32, Buf)>, ntasks: i32, + pub timeout_due: Option<Instant>, pub state: Arc<IsolateState>, } // Isolate cannot be passed between threads but IsolateState can. So any state that // needs to be accessed outside the main V8 thread should be inside IsolateState. pub struct IsolateState { - pub timeout: AtomicIsize, pub dir: deno_dir::DenoDir, pub timers: Mutex<HashMap<u32, futures::sync::oneshot::Sender<()>>>, pub argv: Vec<String>, @@ -88,8 +89,8 @@ impl Isolate { dispatch, rx, ntasks: 0, + timeout_due: None, state: Arc::new(IsolateState { - timeout: AtomicIsize::new(-1), dir: deno_dir::DenoDir::new(flags.reload, None).unwrap(), timers: Mutex::new(HashMap::new()), argv: argv_rest, @@ -139,17 +140,54 @@ impl Isolate { unsafe { libdeno::deno_respond(self.ptr, req_id, buf.into()) } } + fn complete_op(&mut self, req_id: i32, buf: Buf) { + // Receiving a message on rx exactly corresponds to an async task + // completing. + self.ntasks_decrement(); + // Call into JS with the buf. + self.respond(req_id, buf); + } + + fn timeout(&self) { + let dummy_buf = libdeno::deno_buf { + alloc_ptr: 0 as *mut u8, + alloc_len: 0, + data_ptr: 0 as *mut u8, + data_len: 0, + }; + unsafe { libdeno::deno_respond(self.ptr, -1, dummy_buf) } + } + // TODO Use Park abstraction? Note at time of writing Tokio default runtime // does not have new_with_park(). pub fn event_loop(&mut self) { // Main thread event loop. while !self.is_idle() { - let (req_id, buf) = self.rx.recv().unwrap(); - // Receiving a message on rx exactly corresponds to an async task - // completing. - self.ntasks_decrement(); - // Call into JS with the buf. - self.respond(req_id, buf); + // Ideally, mpsc::Receiver would have a receive method that takes a optional + // timeout. But it doesn't so we need all this duplicate code. + match self.timeout_due { + Some(due) => { + // Subtracting two Instants causes a panic if the resulting duration + // would become negative. Avoid this. + let now = Instant::now(); + let timeout = if due > now { + due - now + } else { + Duration::new(0, 0) + }; + // TODO: use recv_deadline() instead of recv_timeout() when this + // feature becomes stable/available. + match self.rx.recv_timeout(timeout) { + Ok((req_id, buf)) => self.complete_op(req_id, buf), + Err(mpsc::RecvTimeoutError::Timeout) => self.timeout(), + Err(e) => panic!("mpsc::Receiver::recv_timeout() failed: {:?}", e), + } + } + None => match self.rx.recv() { + Ok((req_id, buf)) => self.complete_op(req_id, buf), + Err(e) => panic!("mpsc::Receiver::recv() failed: {:?}", e), + }, + }; } } @@ -164,7 +202,7 @@ impl Isolate { } fn is_idle(&self) -> bool { - self.ntasks == 0 + self.ntasks == 0 && self.timeout_due.is_none() } } @@ -212,8 +250,7 @@ extern "C" fn pre_dispatch( let isolate = Isolate::from_c(d); let dispatch = isolate.dispatch; - let (is_sync, op) = - dispatch(isolate.state.clone(), control_slice, data_slice); + let (is_sync, op) = dispatch(isolate, control_slice, data_slice); if is_sync { // Execute op synchronously. @@ -258,7 +295,7 @@ mod tests { } fn unreachable_dispatch( - _state: Arc<IsolateState>, + _isolate: &mut Isolate, _control: &[u8], _data: &'static mut [u8], ) -> (bool, Box<Op>) { @@ -289,7 +326,7 @@ mod tests { } fn dispatch_sync( - _state: Arc<IsolateState>, + _isolate: &mut Isolate, control: &[u8], data: &'static mut [u8], ) -> (bool, Box<Op>) { |