diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/global_timer.rs | 49 | ||||
-rw-r--r-- | src/isolate.rs | 46 | ||||
-rw-r--r-- | src/main.rs | 1 | ||||
-rw-r--r-- | src/msg.fbs | 10 | ||||
-rw-r--r-- | src/ops.rs | 55 | ||||
-rw-r--r-- | src/tokio_util.rs | 8 |
6 files changed, 111 insertions, 58 deletions
diff --git a/src/global_timer.rs b/src/global_timer.rs new file mode 100644 index 000000000..eef70ddc2 --- /dev/null +++ b/src/global_timer.rs @@ -0,0 +1,49 @@ +// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. + +//! This module helps deno implement timers. +//! +//! As an optimization, we want to avoid an expensive calls into rust for every +//! setTimeout in JavaScript. Thus in //js/timers.ts a data structure is +//! implemented that calls into Rust for only the smallest timeout. Thus we +//! only need to be able to start and cancel a single timer (or Delay, as Tokio +//! calls it) for an entire Isolate. This is what is implemented here. + +use crate::tokio_util::panic_on_error; +use futures::Future; +use std::time::Instant; +use tokio::sync::oneshot; +use tokio::timer::Delay; + +pub struct GlobalTimer { + tx: Option<oneshot::Sender<()>>, +} + +impl GlobalTimer { + pub fn new() -> Self { + Self { tx: None } + } + + pub fn cancel(&mut self) { + if let Some(tx) = self.tx.take() { + tx.send(()).ok(); + } + } + + pub fn new_timeout( + &mut self, + deadline: Instant, + ) -> impl Future<Item = (), Error = ()> { + if self.tx.is_some() { + self.cancel(); + } + assert!(self.tx.is_none()); + + let (tx, rx) = oneshot::channel(); + self.tx = Some(tx); + + let delay = panic_on_error(Delay::new(deadline)); + let rx = panic_on_error(rx); + + delay.select(rx).then(|_| Ok(())) + } +} diff --git a/src/isolate.rs b/src/isolate.rs index d4f0f2539..8a77777d2 100644 --- a/src/isolate.rs +++ b/src/isolate.rs @@ -12,6 +12,7 @@ use crate::errors::DenoError; use crate::errors::DenoResult; use crate::errors::RustOrJsError; use crate::flags; +use crate::global_timer::GlobalTimer; use crate::isolate_init::IsolateInit; use crate::js_errors::apply_source_map; use crate::libdeno; @@ -35,8 +36,6 @@ use std::sync::mpsc; use std::sync::Arc; use std::sync::Mutex; use std::sync::{Once, ONCE_INIT}; -use std::time::Duration; -use std::time::Instant; use tokio; // Buf represents a byte array returned from a "Op". @@ -62,7 +61,6 @@ pub struct Isolate { rx: mpsc::Receiver<(usize, Buf)>, tx: mpsc::Sender<(usize, Buf)>, ntasks: Cell<i32>, - timeout_due: Cell<Option<Instant>>, pub modules: RefCell<Modules>, pub state: Arc<IsolateState>, pub permissions: Arc<DenoPermissions>, @@ -83,6 +81,7 @@ pub struct IsolateState { pub flags: flags::DenoFlags, pub metrics: Metrics, pub worker_channels: Option<Mutex<WorkerChannels>>, + pub global_timer: Mutex<GlobalTimer>, } impl IsolateState { @@ -100,6 +99,7 @@ impl IsolateState { flags, metrics: Metrics::default(), worker_channels: worker_channels.map(Mutex::new), + global_timer: Mutex::new(GlobalTimer::new()), } } @@ -194,7 +194,6 @@ impl Isolate { rx, tx, ntasks: Cell::new(0), - timeout_due: Cell::new(None), modules: RefCell::new(Modules::new()), state, permissions: Arc::new(permissions), @@ -223,16 +222,6 @@ impl Isolate { } #[inline] - pub fn get_timeout_due(&self) -> Option<Instant> { - self.timeout_due.clone().into_inner() - } - - #[inline] - pub fn set_timeout_due(&self, inst: Option<Instant>) { - self.timeout_due.set(inst); - } - - #[inline] pub fn check_read(&self, filename: &str) -> DenoResult<()> { self.permissions.check_read(filename) } @@ -463,10 +452,9 @@ impl Isolate { pub fn event_loop(&self) -> Result<(), JSError> { // Main thread event loop. while !self.is_idle() { - match recv_deadline(&self.rx, self.get_timeout_due()) { + match self.rx.recv() { Ok((zero_copy_id, buf)) => self.complete_op(zero_copy_id, buf), - Err(mpsc::RecvTimeoutError::Timeout) => self.timeout(), - Err(e) => panic!("recv_deadline() failed: {:?}", e), + Err(e) => panic!("Isolate.rx.recv() failed: {:?}", e), } self.check_promise_errors(); if let Some(err) = self.last_exception() { @@ -495,7 +483,7 @@ impl Isolate { #[inline] fn is_idle(&self) -> bool { - self.ntasks.get() == 0 && self.get_timeout_due().is_none() + self.ntasks.get() == 0 } } @@ -596,28 +584,6 @@ extern "C" fn pre_dispatch( } } -fn recv_deadline<T>( - rx: &mpsc::Receiver<T>, - maybe_due: Option<Instant>, -) -> Result<T, mpsc::RecvTimeoutError> { - match maybe_due { - None => rx.recv().map_err(|e| e.into()), - Some(due) => { - // Subtracting two Instants causes a panic if the resulting duration - // would become negative. Avoid this. - let now = Instant::now(); - let timeout = if due > now { - due - now - } else { - Duration::new(0, 0) - }; - // TODO: use recv_deadline() instead of recv_timeout() when this - // feature becomes stable/available. - rx.recv_timeout(timeout) - } - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/src/main.rs b/src/main.rs index 48d96b04a..7bafe5c3c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -14,6 +14,7 @@ pub mod deno_dir; pub mod errors; pub mod flags; mod fs; +mod global_timer; mod http_body; mod http_util; pub mod isolate; diff --git a/src/msg.fbs b/src/msg.fbs index 4d54b185e..279264a45 100644 --- a/src/msg.fbs +++ b/src/msg.fbs @@ -16,6 +16,9 @@ union Any { FetchRes, FormatError, FormatErrorRes, + GlobalTimer, + GlobalTimerRes, + GlobalTimerStop, IsTTY, IsTTYRes, Listen, @@ -55,7 +58,6 @@ union Any { RunStatusRes, Seek, SetEnv, - SetTimeout, Shutdown, Start, StartRes, @@ -210,10 +212,14 @@ table Chdir { directory: string; } -table SetTimeout { +table GlobalTimer { timeout: int; } +table GlobalTimerRes { } + +table GlobalTimerStop { } + table Exit { code: int; } diff --git a/src/ops.rs b/src/ops.rs index f4ee2434e..17cb008db 100644 --- a/src/ops.rs +++ b/src/ops.rs @@ -80,13 +80,7 @@ pub fn dispatch( let inner_type = base.inner_type(); let cmd_id = base.cmd_id(); - let op: Box<Op> = if inner_type == msg::Any::SetTimeout { - // SetTimeout is an exceptional op: the global timeout field is part of the - // Isolate state (not the IsolateState state) and it must be updated on the - // main thread. - assert_eq!(is_sync, true); - op_set_timeout(isolate, &base, data) - } else { + let op: Box<Op> = { // Handle regular ops. let op_creator: OpCreator = match inner_type { msg::Any::Accept => op_accept, @@ -101,6 +95,8 @@ pub fn dispatch( msg::Any::Fetch => op_fetch, msg::Any::FetchModuleMetaData => op_fetch_module_meta_data, msg::Any::FormatError => op_format_error, + msg::Any::GlobalTimer => op_global_timer, + msg::Any::GlobalTimerStop => op_global_timer_stop, msg::Any::IsTTY => op_is_tty, msg::Any::Listen => op_listen, msg::Any::MakeTempDir => op_make_temp_dir, @@ -440,23 +436,50 @@ fn op_chdir( }())) } -fn op_set_timeout( +fn op_global_timer_stop( isolate: &Isolate, base: &msg::Base<'_>, data: libdeno::deno_buf, ) -> Box<Op> { + assert!(base.sync()); assert_eq!(data.len(), 0); - let inner = base.inner_as_set_timeout().unwrap(); - let val = inner.timeout(); - let timeout_due = if val >= 0 { - Some(Instant::now() + Duration::from_millis(val as u64)) - } else { - None - }; - isolate.set_timeout_due(timeout_due); + let mut t = isolate.state.global_timer.lock().unwrap(); + t.cancel(); ok_future(empty_buf()) } +fn op_global_timer( + isolate: &Isolate, + base: &msg::Base<'_>, + data: libdeno::deno_buf, +) -> Box<Op> { + assert!(!base.sync()); + assert_eq!(data.len(), 0); + let cmd_id = base.cmd_id(); + let inner = base.inner_as_global_timer().unwrap(); + let val = inner.timeout(); + assert!(val >= 0); + + let mut t = isolate.state.global_timer.lock().unwrap(); + let deadline = Instant::now() + Duration::from_millis(val as u64); + let f = t.new_timeout(deadline); + + Box::new(f.then(move |_| { + let builder = &mut FlatBufferBuilder::new(); + let inner = + msg::GlobalTimerRes::create(builder, &msg::GlobalTimerResArgs {}); + Ok(serialize_response( + cmd_id, + builder, + msg::BaseArgs { + inner: Some(inner.as_union_value()), + inner_type: msg::Any::GlobalTimerRes, + ..Default::default() + }, + )) + })) +} + fn op_set_env( isolate: &Isolate, base: &msg::Base<'_>, diff --git a/src/tokio_util.rs b/src/tokio_util.rs index 3dddff9c2..ef66f4610 100644 --- a/src/tokio_util.rs +++ b/src/tokio_util.rs @@ -122,3 +122,11 @@ where f() } } + +pub fn panic_on_error<I, E, F>(f: F) -> impl Future<Item = I, Error = ()> +where + F: Future<Item = I, Error = E>, + E: std::fmt::Debug, +{ + f.map_err(|err| panic!("Future got unexpected error: {:?}", err)) +} |