summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRyan Dahl <ry@tinyclouds.org>2019-03-10 15:37:05 -0400
committerRyan Dahl <ry@tinyclouds.org>2019-03-12 19:25:57 -0400
commit58cc69f672f91841984fc4e1e9bcfb1a75362677 (patch)
treea3e50ff11dfe8348c6574eeedcd2577e3690b440
parent9691d7b53bea5a2656ec47e8437fac1f527b3cce (diff)
Make timers act like normal ops
This is in preperation for core integration.
-rw-r--r--js/dispatch.ts35
-rw-r--r--js/timers.ts56
-rw-r--r--src/global_timer.rs49
-rw-r--r--src/isolate.rs46
-rw-r--r--src/main.rs1
-rw-r--r--src/msg.fbs10
-rw-r--r--src/ops.rs55
-rw-r--r--src/tokio_util.rs8
8 files changed, 155 insertions, 105 deletions
diff --git a/js/dispatch.ts b/js/dispatch.ts
index 537291877..9dcd2f420 100644
--- a/js/dispatch.ts
+++ b/js/dispatch.ts
@@ -8,31 +8,20 @@ import * as util from "./util";
let nextCmdId = 0;
const promiseTable = new Map<number, util.Resolvable<msg.Base>>();
-let fireTimers: () => void;
-
-export function setFireTimersCallback(fn: () => void): void {
- fireTimers = fn;
-}
-
export function handleAsyncMsgFromRust(ui8: Uint8Array): void {
- // If a the buffer is empty, recv() on the native side timed out and we
- // did not receive a message.
- if (ui8 && ui8.length) {
- const bb = new flatbuffers.ByteBuffer(ui8);
- const base = msg.Base.getRootAsBase(bb);
- const cmdId = base.cmdId();
- const promise = promiseTable.get(cmdId);
- util.assert(promise != null, `Expecting promise in table. ${cmdId}`);
- promiseTable.delete(cmdId);
- const err = errors.maybeError(base);
- if (err != null) {
- promise!.reject(err);
- } else {
- promise!.resolve(base);
- }
+ util.assert(ui8 != null && ui8.length > 0);
+ const bb = new flatbuffers.ByteBuffer(ui8);
+ const base = msg.Base.getRootAsBase(bb);
+ const cmdId = base.cmdId();
+ const promise = promiseTable.get(cmdId);
+ util.assert(promise != null, `Expecting promise in table. ${cmdId}`);
+ promiseTable.delete(cmdId);
+ const err = errors.maybeError(base);
+ if (err != null) {
+ promise!.reject(err);
+ } else {
+ promise!.resolve(base);
}
- // Fire timers that have become runnable.
- fireTimers();
}
function sendInternal(
diff --git a/js/timers.ts b/js/timers.ts
index d06056cf2..4089d5f8b 100644
--- a/js/timers.ts
+++ b/js/timers.ts
@@ -2,7 +2,7 @@
import { assert } from "./util";
import * as msg from "gen/msg_generated";
import * as flatbuffers from "./flatbuffers";
-import { sendSync, setFireTimersCallback } from "./dispatch";
+import { sendAsync, sendSync } from "./dispatch";
interface Timer {
id: number;
@@ -37,28 +37,39 @@ function getTime(): number {
return now;
}
-function setGlobalTimeout(due: number | null, now: number): void {
+function clearGlobalTimeout(): void {
+ const builder = flatbuffers.createBuilder();
+ msg.GlobalTimerStop.startGlobalTimerStop(builder);
+ const inner = msg.GlobalTimerStop.endGlobalTimerStop(builder);
+ globalTimeoutDue = null;
+ let res = sendSync(builder, msg.Any.GlobalTimerStop, inner);
+ assert(res == null);
+}
+
+async function setGlobalTimeout(due: number, now: number): Promise<void> {
// Since JS and Rust don't use the same clock, pass the time to rust as a
// relative time value. On the Rust side we'll turn that into an absolute
// value again.
- // Note that a negative time-out value stops the global timer.
- let timeout;
- if (due === null) {
- timeout = -1;
- } else {
- timeout = due - now;
- assert(timeout >= 0);
- }
+ let timeout = due - now;
+ assert(timeout >= 0);
// Send message to the backend.
const builder = flatbuffers.createBuilder();
- msg.SetTimeout.startSetTimeout(builder);
- msg.SetTimeout.addTimeout(builder, timeout);
- const inner = msg.SetTimeout.endSetTimeout(builder);
- const res = sendSync(builder, msg.Any.SetTimeout, inner);
- assert(res == null);
- // Remember when when the global timer will fire.
+ msg.GlobalTimer.startGlobalTimer(builder);
+ msg.GlobalTimer.addTimeout(builder, timeout);
+ const inner = msg.GlobalTimer.endGlobalTimer(builder);
globalTimeoutDue = due;
+ await sendAsync(builder, msg.Any.GlobalTimer, inner);
+ // eslint-disable-next-line @typescript-eslint/no-use-before-define
+ fireTimers();
+}
+
+function setOrClearGlobalTimeout(due: number | null, now: number): void {
+ if (due == null) {
+ clearGlobalTimeout();
+ } else {
+ setGlobalTimeout(due, now);
+ }
}
function schedule(timer: Timer, now: number): void {
@@ -75,7 +86,7 @@ function schedule(timer: Timer, now: number): void {
// If the new timer is scheduled to fire before any timer that existed before,
// update the global timeout to reflect this.
if (globalTimeoutDue === null || globalTimeoutDue > timer.due) {
- setGlobalTimeout(timer.due, now);
+ setOrClearGlobalTimeout(timer.due, now);
}
}
@@ -97,7 +108,7 @@ function unschedule(timer: Timer): void {
nextTimerDue = Number(key);
break;
}
- setGlobalTimeout(nextTimerDue, getTime());
+ setOrClearGlobalTimeout(nextTimerDue, getTime());
}
} else {
// Multiple timers that are due at the same point in time.
@@ -162,9 +173,10 @@ function fireTimers(): void {
Promise.resolve(timer).then(fire);
}
}
+
// Update the global alarm to go off when the first-up timer that hasn't fired
// yet is due.
- setGlobalTimeout(nextTimerDue, now);
+ setOrClearGlobalTimeout(nextTimerDue, now);
}
export type Args = unknown[];
@@ -226,7 +238,7 @@ export function setInterval(
return setTimer(cb, delay, args, true);
}
-/** Clears a previously set timer by id. */
+/** Clears a previously set timer by id. AKA clearTimeout and clearInterval. */
export function clearTimer(id: number): void {
const timer = idMap.get(id);
if (timer === undefined) {
@@ -237,7 +249,3 @@ export function clearTimer(id: number): void {
unschedule(timer);
idMap.delete(timer.id);
}
-
-// Tell the dispatcher which function it should call to fire timers that are
-// due. This is done using a callback because circular imports are disallowed.
-setFireTimersCallback(fireTimers);
diff --git a/src/global_timer.rs b/src/global_timer.rs
new file mode 100644
index 000000000..eef70ddc2
--- /dev/null
+++ b/src/global_timer.rs
@@ -0,0 +1,49 @@
+// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
+
+//! This module helps deno implement timers.
+//!
+//! As an optimization, we want to avoid an expensive calls into rust for every
+//! setTimeout in JavaScript. Thus in //js/timers.ts a data structure is
+//! implemented that calls into Rust for only the smallest timeout. Thus we
+//! only need to be able to start and cancel a single timer (or Delay, as Tokio
+//! calls it) for an entire Isolate. This is what is implemented here.
+
+use crate::tokio_util::panic_on_error;
+use futures::Future;
+use std::time::Instant;
+use tokio::sync::oneshot;
+use tokio::timer::Delay;
+
+pub struct GlobalTimer {
+ tx: Option<oneshot::Sender<()>>,
+}
+
+impl GlobalTimer {
+ pub fn new() -> Self {
+ Self { tx: None }
+ }
+
+ pub fn cancel(&mut self) {
+ if let Some(tx) = self.tx.take() {
+ tx.send(()).ok();
+ }
+ }
+
+ pub fn new_timeout(
+ &mut self,
+ deadline: Instant,
+ ) -> impl Future<Item = (), Error = ()> {
+ if self.tx.is_some() {
+ self.cancel();
+ }
+ assert!(self.tx.is_none());
+
+ let (tx, rx) = oneshot::channel();
+ self.tx = Some(tx);
+
+ let delay = panic_on_error(Delay::new(deadline));
+ let rx = panic_on_error(rx);
+
+ delay.select(rx).then(|_| Ok(()))
+ }
+}
diff --git a/src/isolate.rs b/src/isolate.rs
index d4f0f2539..8a77777d2 100644
--- a/src/isolate.rs
+++ b/src/isolate.rs
@@ -12,6 +12,7 @@ use crate::errors::DenoError;
use crate::errors::DenoResult;
use crate::errors::RustOrJsError;
use crate::flags;
+use crate::global_timer::GlobalTimer;
use crate::isolate_init::IsolateInit;
use crate::js_errors::apply_source_map;
use crate::libdeno;
@@ -35,8 +36,6 @@ use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::{Once, ONCE_INIT};
-use std::time::Duration;
-use std::time::Instant;
use tokio;
// Buf represents a byte array returned from a "Op".
@@ -62,7 +61,6 @@ pub struct Isolate {
rx: mpsc::Receiver<(usize, Buf)>,
tx: mpsc::Sender<(usize, Buf)>,
ntasks: Cell<i32>,
- timeout_due: Cell<Option<Instant>>,
pub modules: RefCell<Modules>,
pub state: Arc<IsolateState>,
pub permissions: Arc<DenoPermissions>,
@@ -83,6 +81,7 @@ pub struct IsolateState {
pub flags: flags::DenoFlags,
pub metrics: Metrics,
pub worker_channels: Option<Mutex<WorkerChannels>>,
+ pub global_timer: Mutex<GlobalTimer>,
}
impl IsolateState {
@@ -100,6 +99,7 @@ impl IsolateState {
flags,
metrics: Metrics::default(),
worker_channels: worker_channels.map(Mutex::new),
+ global_timer: Mutex::new(GlobalTimer::new()),
}
}
@@ -194,7 +194,6 @@ impl Isolate {
rx,
tx,
ntasks: Cell::new(0),
- timeout_due: Cell::new(None),
modules: RefCell::new(Modules::new()),
state,
permissions: Arc::new(permissions),
@@ -223,16 +222,6 @@ impl Isolate {
}
#[inline]
- pub fn get_timeout_due(&self) -> Option<Instant> {
- self.timeout_due.clone().into_inner()
- }
-
- #[inline]
- pub fn set_timeout_due(&self, inst: Option<Instant>) {
- self.timeout_due.set(inst);
- }
-
- #[inline]
pub fn check_read(&self, filename: &str) -> DenoResult<()> {
self.permissions.check_read(filename)
}
@@ -463,10 +452,9 @@ impl Isolate {
pub fn event_loop(&self) -> Result<(), JSError> {
// Main thread event loop.
while !self.is_idle() {
- match recv_deadline(&self.rx, self.get_timeout_due()) {
+ match self.rx.recv() {
Ok((zero_copy_id, buf)) => self.complete_op(zero_copy_id, buf),
- Err(mpsc::RecvTimeoutError::Timeout) => self.timeout(),
- Err(e) => panic!("recv_deadline() failed: {:?}", e),
+ Err(e) => panic!("Isolate.rx.recv() failed: {:?}", e),
}
self.check_promise_errors();
if let Some(err) = self.last_exception() {
@@ -495,7 +483,7 @@ impl Isolate {
#[inline]
fn is_idle(&self) -> bool {
- self.ntasks.get() == 0 && self.get_timeout_due().is_none()
+ self.ntasks.get() == 0
}
}
@@ -596,28 +584,6 @@ extern "C" fn pre_dispatch(
}
}
-fn recv_deadline<T>(
- rx: &mpsc::Receiver<T>,
- maybe_due: Option<Instant>,
-) -> Result<T, mpsc::RecvTimeoutError> {
- match maybe_due {
- None => rx.recv().map_err(|e| e.into()),
- Some(due) => {
- // Subtracting two Instants causes a panic if the resulting duration
- // would become negative. Avoid this.
- let now = Instant::now();
- let timeout = if due > now {
- due - now
- } else {
- Duration::new(0, 0)
- };
- // TODO: use recv_deadline() instead of recv_timeout() when this
- // feature becomes stable/available.
- rx.recv_timeout(timeout)
- }
- }
-}
-
#[cfg(test)]
mod tests {
use super::*;
diff --git a/src/main.rs b/src/main.rs
index 48d96b04a..7bafe5c3c 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -14,6 +14,7 @@ pub mod deno_dir;
pub mod errors;
pub mod flags;
mod fs;
+mod global_timer;
mod http_body;
mod http_util;
pub mod isolate;
diff --git a/src/msg.fbs b/src/msg.fbs
index 4d54b185e..279264a45 100644
--- a/src/msg.fbs
+++ b/src/msg.fbs
@@ -16,6 +16,9 @@ union Any {
FetchRes,
FormatError,
FormatErrorRes,
+ GlobalTimer,
+ GlobalTimerRes,
+ GlobalTimerStop,
IsTTY,
IsTTYRes,
Listen,
@@ -55,7 +58,6 @@ union Any {
RunStatusRes,
Seek,
SetEnv,
- SetTimeout,
Shutdown,
Start,
StartRes,
@@ -210,10 +212,14 @@ table Chdir {
directory: string;
}
-table SetTimeout {
+table GlobalTimer {
timeout: int;
}
+table GlobalTimerRes { }
+
+table GlobalTimerStop { }
+
table Exit {
code: int;
}
diff --git a/src/ops.rs b/src/ops.rs
index f4ee2434e..17cb008db 100644
--- a/src/ops.rs
+++ b/src/ops.rs
@@ -80,13 +80,7 @@ pub fn dispatch(
let inner_type = base.inner_type();
let cmd_id = base.cmd_id();
- let op: Box<Op> = if inner_type == msg::Any::SetTimeout {
- // SetTimeout is an exceptional op: the global timeout field is part of the
- // Isolate state (not the IsolateState state) and it must be updated on the
- // main thread.
- assert_eq!(is_sync, true);
- op_set_timeout(isolate, &base, data)
- } else {
+ let op: Box<Op> = {
// Handle regular ops.
let op_creator: OpCreator = match inner_type {
msg::Any::Accept => op_accept,
@@ -101,6 +95,8 @@ pub fn dispatch(
msg::Any::Fetch => op_fetch,
msg::Any::FetchModuleMetaData => op_fetch_module_meta_data,
msg::Any::FormatError => op_format_error,
+ msg::Any::GlobalTimer => op_global_timer,
+ msg::Any::GlobalTimerStop => op_global_timer_stop,
msg::Any::IsTTY => op_is_tty,
msg::Any::Listen => op_listen,
msg::Any::MakeTempDir => op_make_temp_dir,
@@ -440,23 +436,50 @@ fn op_chdir(
}()))
}
-fn op_set_timeout(
+fn op_global_timer_stop(
isolate: &Isolate,
base: &msg::Base<'_>,
data: libdeno::deno_buf,
) -> Box<Op> {
+ assert!(base.sync());
assert_eq!(data.len(), 0);
- let inner = base.inner_as_set_timeout().unwrap();
- let val = inner.timeout();
- let timeout_due = if val >= 0 {
- Some(Instant::now() + Duration::from_millis(val as u64))
- } else {
- None
- };
- isolate.set_timeout_due(timeout_due);
+ let mut t = isolate.state.global_timer.lock().unwrap();
+ t.cancel();
ok_future(empty_buf())
}
+fn op_global_timer(
+ isolate: &Isolate,
+ base: &msg::Base<'_>,
+ data: libdeno::deno_buf,
+) -> Box<Op> {
+ assert!(!base.sync());
+ assert_eq!(data.len(), 0);
+ let cmd_id = base.cmd_id();
+ let inner = base.inner_as_global_timer().unwrap();
+ let val = inner.timeout();
+ assert!(val >= 0);
+
+ let mut t = isolate.state.global_timer.lock().unwrap();
+ let deadline = Instant::now() + Duration::from_millis(val as u64);
+ let f = t.new_timeout(deadline);
+
+ Box::new(f.then(move |_| {
+ let builder = &mut FlatBufferBuilder::new();
+ let inner =
+ msg::GlobalTimerRes::create(builder, &msg::GlobalTimerResArgs {});
+ Ok(serialize_response(
+ cmd_id,
+ builder,
+ msg::BaseArgs {
+ inner: Some(inner.as_union_value()),
+ inner_type: msg::Any::GlobalTimerRes,
+ ..Default::default()
+ },
+ ))
+ }))
+}
+
fn op_set_env(
isolate: &Isolate,
base: &msg::Base<'_>,
diff --git a/src/tokio_util.rs b/src/tokio_util.rs
index 3dddff9c2..ef66f4610 100644
--- a/src/tokio_util.rs
+++ b/src/tokio_util.rs
@@ -122,3 +122,11 @@ where
f()
}
}
+
+pub fn panic_on_error<I, E, F>(f: F) -> impl Future<Item = I, Error = ()>
+where
+ F: Future<Item = I, Error = E>,
+ E: std::fmt::Debug,
+{
+ f.map_err(|err| panic!("Future got unexpected error: {:?}", err))
+}