diff options
author | Bert Belder <bertbelder@gmail.com> | 2018-10-02 17:47:40 -0700 |
---|---|---|
committer | Bert Belder <bertbelder@gmail.com> | 2018-10-03 13:27:55 -0700 |
commit | aa691ea26c5dc8fd15b3b60b95a2c23b4888c45d (patch) | |
tree | f2e168a022e597ee1ceaf8cf50c1f1fbf5efe4cb /src | |
parent | 6b77acf39ddcb68b26e877f8c4a9dc289cd3691e (diff) |
timers: implement timers in javascript
Diffstat (limited to 'src')
-rw-r--r-- | src/handlers.rs | 84 | ||||
-rw-r--r-- | src/isolate.rs | 67 | ||||
-rw-r--r-- | src/msg.fbs | 2 |
3 files changed, 101 insertions, 52 deletions
diff --git a/src/handlers.rs b/src/handlers.rs index c27543d33..bb2d79e69 100644 --- a/src/handlers.rs +++ b/src/handlers.rs @@ -5,6 +5,7 @@ use errors::DenoError; use errors::DenoResult; use fs as deno_fs; use isolate::Buf; +use isolate::Isolate; use isolate::IsolateState; use isolate::Op; use msg; @@ -47,7 +48,7 @@ fn empty_buf() -> Buf { } pub fn msg_from_js( - state: Arc<IsolateState>, + isolate: &mut Isolate, control: &[u8], data: &'static mut [u8], ) -> (bool, Box<Op>) { @@ -55,38 +56,47 @@ pub fn msg_from_js( let is_sync = base.sync(); let msg_type = base.msg_type(); let cmd_id = base.cmd_id(); - let handler: Handler = match msg_type { - msg::Any::Start => handle_start, - msg::Any::CodeFetch => handle_code_fetch, - msg::Any::CodeCache => handle_code_cache, - msg::Any::SetTimeout => handle_set_timeout, - msg::Any::Environ => handle_env, - msg::Any::FetchReq => handle_fetch_req, - msg::Any::TimerStart => handle_timer_start, - msg::Any::TimerClear => handle_timer_clear, - msg::Any::MakeTempDir => handle_make_temp_dir, - msg::Any::Mkdir => handle_mkdir, - msg::Any::Open => handle_open, - msg::Any::Read => handle_read, - msg::Any::Write => handle_write, - msg::Any::Remove => handle_remove, - msg::Any::ReadFile => handle_read_file, - msg::Any::Rename => handle_rename, - msg::Any::Readlink => handle_read_link, - msg::Any::Symlink => handle_symlink, - msg::Any::SetEnv => handle_set_env, - msg::Any::Stat => handle_stat, - msg::Any::Truncate => handle_truncate, - msg::Any::WriteFile => handle_write_file, - msg::Any::Exit => handle_exit, - msg::Any::CopyFile => handle_copy_file, - _ => panic!(format!( - "Unhandled message {}", - msg::enum_name_any(msg_type) - )), + + let op: Box<Op> = if msg_type == msg::Any::SetTimeout { + // SetTimeout is an exceptional op: the global timeout field is part of the + // Isolate state (not the IsolateState state) and it must be updated on the + // main thread. + assert_eq!(is_sync, true); + handle_set_timeout(isolate, &base, data) + } else { + // Handle regular ops. + let handler: Handler = match msg_type { + msg::Any::Start => handle_start, + msg::Any::CodeFetch => handle_code_fetch, + msg::Any::CodeCache => handle_code_cache, + msg::Any::Environ => handle_env, + msg::Any::FetchReq => handle_fetch_req, + msg::Any::TimerStart => handle_timer_start, + msg::Any::TimerClear => handle_timer_clear, + msg::Any::MakeTempDir => handle_make_temp_dir, + msg::Any::Mkdir => handle_mkdir, + msg::Any::Open => handle_open, + msg::Any::Read => handle_read, + msg::Any::Write => handle_write, + msg::Any::Remove => handle_remove, + msg::Any::ReadFile => handle_read_file, + msg::Any::Rename => handle_rename, + msg::Any::Readlink => handle_read_link, + msg::Any::Symlink => handle_symlink, + msg::Any::SetEnv => handle_set_env, + msg::Any::Stat => handle_stat, + msg::Any::Truncate => handle_truncate, + msg::Any::WriteFile => handle_write_file, + msg::Any::Exit => handle_exit, + msg::Any::CopyFile => handle_copy_file, + _ => panic!(format!( + "Unhandled message {}", + msg::enum_name_any(msg_type) + )), + }; + handler(isolate.state.clone(), &base, data) }; - let op: Box<Op> = handler(state.clone(), &base, data); let boxed_op = Box::new( op.or_else(move |err: DenoError| -> DenoResult<Buf> { debug!("op err {}", err); @@ -274,16 +284,18 @@ fn handle_code_cache( } fn handle_set_timeout( - state: Arc<IsolateState>, + isolate: &mut Isolate, base: &msg::Base, data: &'static mut [u8], ) -> Box<Op> { assert_eq!(data.len(), 0); let msg = base.msg_as_set_timeout().unwrap(); - let val = msg.timeout() as isize; - state - .timeout - .swap(val, std::sync::atomic::Ordering::Relaxed); + let val = msg.timeout() as i64; + isolate.timeout_due = if val >= 0 { + Some(Instant::now() + Duration::from_millis(val as u64)) + } else { + None + }; ok_future(empty_buf()) } diff --git a/src/isolate.rs b/src/isolate.rs index 8dfd4204a..bdd1ca956 100644 --- a/src/isolate.rs +++ b/src/isolate.rs @@ -16,10 +16,11 @@ use std; use std::collections::HashMap; use std::ffi::CStr; use std::ffi::CString; -use std::sync::atomic::AtomicIsize; use std::sync::mpsc; use std::sync::Arc; use std::sync::Mutex; +use std::time::Duration; +use std::time::Instant; use tokio; use tokio_util; @@ -37,7 +38,7 @@ pub type Op = Future<Item = Buf, Error = DenoError> + Send; // Returns (is_sync, op) pub type Dispatch = - fn(state: Arc<IsolateState>, buf: &[u8], data_buf: &'static mut [u8]) + fn(isolate: &mut Isolate, buf: &[u8], data_buf: &'static mut [u8]) -> (bool, Box<Op>); pub struct Isolate { @@ -45,13 +46,13 @@ pub struct Isolate { dispatch: Dispatch, rx: mpsc::Receiver<(i32, Buf)>, ntasks: i32, + pub timeout_due: Option<Instant>, pub state: Arc<IsolateState>, } // Isolate cannot be passed between threads but IsolateState can. So any state that // needs to be accessed outside the main V8 thread should be inside IsolateState. pub struct IsolateState { - pub timeout: AtomicIsize, pub dir: deno_dir::DenoDir, pub timers: Mutex<HashMap<u32, futures::sync::oneshot::Sender<()>>>, pub argv: Vec<String>, @@ -88,8 +89,8 @@ impl Isolate { dispatch, rx, ntasks: 0, + timeout_due: None, state: Arc::new(IsolateState { - timeout: AtomicIsize::new(-1), dir: deno_dir::DenoDir::new(flags.reload, None).unwrap(), timers: Mutex::new(HashMap::new()), argv: argv_rest, @@ -139,17 +140,54 @@ impl Isolate { unsafe { libdeno::deno_respond(self.ptr, req_id, buf.into()) } } + fn complete_op(&mut self, req_id: i32, buf: Buf) { + // Receiving a message on rx exactly corresponds to an async task + // completing. + self.ntasks_decrement(); + // Call into JS with the buf. + self.respond(req_id, buf); + } + + fn timeout(&self) { + let dummy_buf = libdeno::deno_buf { + alloc_ptr: 0 as *mut u8, + alloc_len: 0, + data_ptr: 0 as *mut u8, + data_len: 0, + }; + unsafe { libdeno::deno_respond(self.ptr, -1, dummy_buf) } + } + // TODO Use Park abstraction? Note at time of writing Tokio default runtime // does not have new_with_park(). pub fn event_loop(&mut self) { // Main thread event loop. while !self.is_idle() { - let (req_id, buf) = self.rx.recv().unwrap(); - // Receiving a message on rx exactly corresponds to an async task - // completing. - self.ntasks_decrement(); - // Call into JS with the buf. - self.respond(req_id, buf); + // Ideally, mpsc::Receiver would have a receive method that takes a optional + // timeout. But it doesn't so we need all this duplicate code. + match self.timeout_due { + Some(due) => { + // Subtracting two Instants causes a panic if the resulting duration + // would become negative. Avoid this. + let now = Instant::now(); + let timeout = if due > now { + due - now + } else { + Duration::new(0, 0) + }; + // TODO: use recv_deadline() instead of recv_timeout() when this + // feature becomes stable/available. + match self.rx.recv_timeout(timeout) { + Ok((req_id, buf)) => self.complete_op(req_id, buf), + Err(mpsc::RecvTimeoutError::Timeout) => self.timeout(), + Err(e) => panic!("mpsc::Receiver::recv_timeout() failed: {:?}", e), + } + } + None => match self.rx.recv() { + Ok((req_id, buf)) => self.complete_op(req_id, buf), + Err(e) => panic!("mpsc::Receiver::recv() failed: {:?}", e), + }, + }; } } @@ -164,7 +202,7 @@ impl Isolate { } fn is_idle(&self) -> bool { - self.ntasks == 0 + self.ntasks == 0 && self.timeout_due.is_none() } } @@ -212,8 +250,7 @@ extern "C" fn pre_dispatch( let isolate = Isolate::from_c(d); let dispatch = isolate.dispatch; - let (is_sync, op) = - dispatch(isolate.state.clone(), control_slice, data_slice); + let (is_sync, op) = dispatch(isolate, control_slice, data_slice); if is_sync { // Execute op synchronously. @@ -258,7 +295,7 @@ mod tests { } fn unreachable_dispatch( - _state: Arc<IsolateState>, + _isolate: &mut Isolate, _control: &[u8], _data: &'static mut [u8], ) -> (bool, Box<Op>) { @@ -289,7 +326,7 @@ mod tests { } fn dispatch_sync( - _state: Arc<IsolateState>, + _isolate: &mut Isolate, control: &[u8], data: &'static mut [u8], ) -> (bool, Box<Op>) { diff --git a/src/msg.fbs b/src/msg.fbs index b9989c330..68de166ce 100644 --- a/src/msg.fbs +++ b/src/msg.fbs @@ -128,7 +128,7 @@ table CodeCache { } table SetTimeout { - timeout: int; + timeout: double; } table Exit { |