summaryrefslogtreecommitdiff
path: root/src/isolate.rs
diff options
context:
space:
mode:
authorBert Belder <bertbelder@gmail.com>2018-10-02 17:47:40 -0700
committerBert Belder <bertbelder@gmail.com>2018-10-03 13:27:55 -0700
commitaa691ea26c5dc8fd15b3b60b95a2c23b4888c45d (patch)
treef2e168a022e597ee1ceaf8cf50c1f1fbf5efe4cb /src/isolate.rs
parent6b77acf39ddcb68b26e877f8c4a9dc289cd3691e (diff)
timers: implement timers in javascript
Diffstat (limited to 'src/isolate.rs')
-rw-r--r--src/isolate.rs67
1 files changed, 52 insertions, 15 deletions
diff --git a/src/isolate.rs b/src/isolate.rs
index 8dfd4204a..bdd1ca956 100644
--- a/src/isolate.rs
+++ b/src/isolate.rs
@@ -16,10 +16,11 @@ use std;
use std::collections::HashMap;
use std::ffi::CStr;
use std::ffi::CString;
-use std::sync::atomic::AtomicIsize;
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
+use std::time::Duration;
+use std::time::Instant;
use tokio;
use tokio_util;
@@ -37,7 +38,7 @@ pub type Op = Future<Item = Buf, Error = DenoError> + Send;
// Returns (is_sync, op)
pub type Dispatch =
- fn(state: Arc<IsolateState>, buf: &[u8], data_buf: &'static mut [u8])
+ fn(isolate: &mut Isolate, buf: &[u8], data_buf: &'static mut [u8])
-> (bool, Box<Op>);
pub struct Isolate {
@@ -45,13 +46,13 @@ pub struct Isolate {
dispatch: Dispatch,
rx: mpsc::Receiver<(i32, Buf)>,
ntasks: i32,
+ pub timeout_due: Option<Instant>,
pub state: Arc<IsolateState>,
}
// Isolate cannot be passed between threads but IsolateState can. So any state that
// needs to be accessed outside the main V8 thread should be inside IsolateState.
pub struct IsolateState {
- pub timeout: AtomicIsize,
pub dir: deno_dir::DenoDir,
pub timers: Mutex<HashMap<u32, futures::sync::oneshot::Sender<()>>>,
pub argv: Vec<String>,
@@ -88,8 +89,8 @@ impl Isolate {
dispatch,
rx,
ntasks: 0,
+ timeout_due: None,
state: Arc::new(IsolateState {
- timeout: AtomicIsize::new(-1),
dir: deno_dir::DenoDir::new(flags.reload, None).unwrap(),
timers: Mutex::new(HashMap::new()),
argv: argv_rest,
@@ -139,17 +140,54 @@ impl Isolate {
unsafe { libdeno::deno_respond(self.ptr, req_id, buf.into()) }
}
+ fn complete_op(&mut self, req_id: i32, buf: Buf) {
+ // Receiving a message on rx exactly corresponds to an async task
+ // completing.
+ self.ntasks_decrement();
+ // Call into JS with the buf.
+ self.respond(req_id, buf);
+ }
+
+ fn timeout(&self) {
+ let dummy_buf = libdeno::deno_buf {
+ alloc_ptr: 0 as *mut u8,
+ alloc_len: 0,
+ data_ptr: 0 as *mut u8,
+ data_len: 0,
+ };
+ unsafe { libdeno::deno_respond(self.ptr, -1, dummy_buf) }
+ }
+
// TODO Use Park abstraction? Note at time of writing Tokio default runtime
// does not have new_with_park().
pub fn event_loop(&mut self) {
// Main thread event loop.
while !self.is_idle() {
- let (req_id, buf) = self.rx.recv().unwrap();
- // Receiving a message on rx exactly corresponds to an async task
- // completing.
- self.ntasks_decrement();
- // Call into JS with the buf.
- self.respond(req_id, buf);
+ // Ideally, mpsc::Receiver would have a receive method that takes a optional
+ // timeout. But it doesn't so we need all this duplicate code.
+ match self.timeout_due {
+ Some(due) => {
+ // Subtracting two Instants causes a panic if the resulting duration
+ // would become negative. Avoid this.
+ let now = Instant::now();
+ let timeout = if due > now {
+ due - now
+ } else {
+ Duration::new(0, 0)
+ };
+ // TODO: use recv_deadline() instead of recv_timeout() when this
+ // feature becomes stable/available.
+ match self.rx.recv_timeout(timeout) {
+ Ok((req_id, buf)) => self.complete_op(req_id, buf),
+ Err(mpsc::RecvTimeoutError::Timeout) => self.timeout(),
+ Err(e) => panic!("mpsc::Receiver::recv_timeout() failed: {:?}", e),
+ }
+ }
+ None => match self.rx.recv() {
+ Ok((req_id, buf)) => self.complete_op(req_id, buf),
+ Err(e) => panic!("mpsc::Receiver::recv() failed: {:?}", e),
+ },
+ };
}
}
@@ -164,7 +202,7 @@ impl Isolate {
}
fn is_idle(&self) -> bool {
- self.ntasks == 0
+ self.ntasks == 0 && self.timeout_due.is_none()
}
}
@@ -212,8 +250,7 @@ extern "C" fn pre_dispatch(
let isolate = Isolate::from_c(d);
let dispatch = isolate.dispatch;
- let (is_sync, op) =
- dispatch(isolate.state.clone(), control_slice, data_slice);
+ let (is_sync, op) = dispatch(isolate, control_slice, data_slice);
if is_sync {
// Execute op synchronously.
@@ -258,7 +295,7 @@ mod tests {
}
fn unreachable_dispatch(
- _state: Arc<IsolateState>,
+ _isolate: &mut Isolate,
_control: &[u8],
_data: &'static mut [u8],
) -> (bool, Box<Op>) {
@@ -289,7 +326,7 @@ mod tests {
}
fn dispatch_sync(
- _state: Arc<IsolateState>,
+ _isolate: &mut Isolate,
control: &[u8],
data: &'static mut [u8],
) -> (bool, Box<Op>) {