summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBert Belder <bertbelder@gmail.com>2018-10-02 17:47:40 -0700
committerBert Belder <bertbelder@gmail.com>2018-10-03 13:27:55 -0700
commitaa691ea26c5dc8fd15b3b60b95a2c23b4888c45d (patch)
treef2e168a022e597ee1ceaf8cf50c1f1fbf5efe4cb
parent6b77acf39ddcb68b26e877f8c4a9dc289cd3691e (diff)
timers: implement timers in javascript
-rw-r--r--js/deno.ts1
-rw-r--r--js/dispatch.ts34
-rw-r--r--js/timers.ts268
-rw-r--r--js/timers_test.ts5
-rw-r--r--src/handlers.rs84
-rw-r--r--src/isolate.rs67
-rw-r--r--src/msg.fbs2
7 files changed, 317 insertions, 144 deletions
diff --git a/js/deno.ts b/js/deno.ts
index 03f3d1a89..91f7ae1af 100644
--- a/js/deno.ts
+++ b/js/deno.ts
@@ -19,5 +19,4 @@ export { libdeno } from "./libdeno";
export { arch, platform } from "./platform";
export { trace } from "./trace";
export { truncateSync, truncate } from "./truncate";
-export { setGlobalTimeout } from "./timers";
export const args: string[] = [];
diff --git a/js/dispatch.ts b/js/dispatch.ts
index 9257be767..56d280ff9 100644
--- a/js/dispatch.ts
+++ b/js/dispatch.ts
@@ -9,19 +9,31 @@ import { maybePushTrace } from "./trace";
let nextCmdId = 0;
const promiseTable = new Map<number, util.Resolvable<fbs.Base>>();
+let fireTimers: () => void;
+
+export function setFireTimersCallback(fn: () => void) {
+ fireTimers = fn;
+}
+
export function handleAsyncMsgFromRust(ui8: Uint8Array) {
- const bb = new flatbuffers.ByteBuffer(ui8);
- const base = fbs.Base.getRootAsBase(bb);
- const cmdId = base.cmdId();
- const promise = promiseTable.get(cmdId);
- util.assert(promise != null, `Expecting promise in table. ${cmdId}`);
- promiseTable.delete(cmdId);
- const err = errors.maybeError(base);
- if (err != null) {
- promise!.reject(err);
- } else {
- promise!.resolve(base);
+ // If a the buffer is empty, recv() on the native side timed out and we
+ // did not receive a message.
+ if (ui8.length) {
+ const bb = new flatbuffers.ByteBuffer(ui8);
+ const base = fbs.Base.getRootAsBase(bb);
+ const cmdId = base.cmdId();
+ const promise = promiseTable.get(cmdId);
+ util.assert(promise != null, `Expecting promise in table. ${cmdId}`);
+ promiseTable.delete(cmdId);
+ const err = errors.maybeError(base);
+ if (err != null) {
+ promise!.reject(err);
+ } else {
+ promise!.resolve(base);
+ }
}
+ // Fire timers that have become runnable.
+ fireTimers();
}
// @internal
diff --git a/js/timers.ts b/js/timers.ts
index 03badfd46..79aefb645 100644
--- a/js/timers.ts
+++ b/js/timers.ts
@@ -1,107 +1,225 @@
// Copyright 2018 the Deno authors. All rights reserved. MIT license.
import { assert } from "./util";
-import * as util from "./util";
import * as fbs from "gen/msg_generated";
import { flatbuffers } from "flatbuffers";
-import { sendSync, sendAsync } from "./dispatch";
+import { sendSync, setFireTimersCallback } from "./dispatch";
-let nextTimerId = 1;
-
-// tslint:disable-next-line:no-any
-export type TimerCallback = (...args: any[]) => void;
+// Tell the dispatcher which function it should call to fire timers that are
+// due. This is done using a callback because circular imports are disallowed.
+setFireTimersCallback(fireTimers);
interface Timer {
id: number;
- cb: TimerCallback;
- interval: boolean;
- // tslint:disable-next-line:no-any
- args: any[];
- delay: number; // milliseconds
+ callback: () => void;
+ delay: number;
+ due: number;
+ repeat: boolean;
+ scheduled: boolean;
+}
+
+// We'll subtract EPOCH every time we retrieve the time with Date.now(). This
+// ensures that absolute time values stay below UINT32_MAX - 2, which is the
+// maximum object key that EcmaScript considers "numerical". After running for
+// about a month, this is no longer true, and Deno explodes.
+// TODO(piscisaureus): fix that ^.
+const EPOCH = Date.now();
+const APOCALYPS = 2 ** 32 - 2;
+
+let globalTimeoutDue: number | null = null;
+
+let nextTimerId = 1;
+const idMap = new Map<number, Timer>();
+const dueMap: { [due: number]: Timer[] } = Object.create(null);
+
+function getTime() {
+ // TODO: use a monotonic clock.
+ const now = Date.now() - EPOCH;
+ assert(now >= 0 && now < APOCALYPS);
+ return now;
}
-export function setGlobalTimeout(timeout: number) {
+function setGlobalTimeout(due: number | null, now: number) {
+ // Since JS and Rust don't use the same clock, pass the time to rust as a
+ // relative time value. On the Rust side we'll turn that into an absolute
+ // value again.
+ // Note that a negative time-out value stops the global timer.
+ let timeout;
+ if (due === null) {
+ timeout = -1;
+ } else {
+ timeout = due - now;
+ assert(timeout >= 0);
+ }
+ // Send message to the backend.
const builder = new flatbuffers.Builder();
fbs.SetTimeout.startSetTimeout(builder);
fbs.SetTimeout.addTimeout(builder, timeout);
const msg = fbs.SetTimeout.endSetTimeout(builder);
const res = sendSync(builder, fbs.Any.SetTimeout, msg);
assert(res == null);
+ // Remember when when the global timer will fire.
+ globalTimeoutDue = due;
}
-function startTimer(
- id: number,
- cb: TimerCallback,
- delay: number,
- interval: boolean,
- // tslint:disable-next-line:no-any
- args: any[]
-): void {
- const timer: Timer = {
- id,
- interval,
- delay,
- args,
- cb
- };
- util.log("timers.ts startTimer");
-
- // Send TimerStart message
- const builder = new flatbuffers.Builder();
- fbs.TimerStart.startTimerStart(builder);
- fbs.TimerStart.addId(builder, timer.id);
- fbs.TimerStart.addDelay(builder, timer.delay);
- const msg = fbs.TimerStart.endTimerStart(builder);
+function schedule(timer: Timer, now: number) {
+ assert(!timer.scheduled);
+ assert(now <= timer.due);
+ // Find or create the list of timers that will fire at point-in-time `due`.
+ let list = dueMap[timer.due];
+ if (list === undefined) {
+ list = dueMap[timer.due] = [];
+ }
+ // Append the newly scheduled timer to the list and mark it as scheduled.
+ list.push(timer);
+ timer.scheduled = true;
+ // If the new timer is scheduled to fire before any timer that existed before,
+ // update the global timeout to reflect this.
+ if (globalTimeoutDue === null || globalTimeoutDue > timer.due) {
+ setGlobalTimeout(timer.due, now);
+ }
+}
- sendAsync(builder, fbs.Any.TimerStart, msg).then(
- baseRes => {
- assert(fbs.Any.TimerReady === baseRes!.msgType());
- const msg = new fbs.TimerReady();
- assert(baseRes!.msg(msg) != null);
- assert(msg.id() === timer.id);
- if (msg.canceled()) {
- util.log("timer canceled message");
- } else {
- cb(...args);
- if (interval) {
- // TODO Faking setInterval with setTimeout.
- // We need a new timer implementation, this is just a stopgap.
- startTimer(id, cb, delay, true, args);
- }
+function unschedule(timer: Timer) {
+ if (!timer.scheduled) {
+ return;
+ }
+ // Find the list of timers that will fire at point-in-time `due`.
+ const list = dueMap[timer.due];
+ if (list.length === 1) {
+ // Time timer is the only one in the list. Remove the entire list.
+ assert(list[0] === timer);
+ delete dueMap[timer.due];
+ // If the unscheduled timer was 'next up', find when the next timer that
+ // still exists is due, and update the global alarm accordingly.
+ if (timer.due === globalTimeoutDue) {
+ let nextTimerDue: number | null = null;
+ for (const key in dueMap) {
+ nextTimerDue = Number(key);
+ break;
}
- },
- error => {
- throw error;
+ setGlobalTimeout(nextTimerDue, getTime());
}
- );
+ } else {
+ // Multiple timers that are due at the same point in time.
+ // Remove this timer from the list.
+ const index = list.indexOf(timer);
+ assert(index > 0);
+ list.splice(index, 1);
+ }
}
-export function setTimeout(
- cb: TimerCallback,
+function fire(timer: Timer) {
+ // If the timer isn't found in the ID map, that means it has been cancelled
+ // between the timer firing and the promise callback (this function).
+ if (!idMap.has(timer.id)) {
+ return;
+ }
+ // Reschedule the timer if it is a repeating one, otherwise drop it.
+ if (!timer.repeat) {
+ // One-shot timer: remove the timer from this id-to-timer map.
+ idMap.delete(timer.id);
+ } else {
+ // Interval timer: compute when timer was supposed to fire next.
+ // However make sure to never schedule the next interval in the past.
+ const now = getTime();
+ timer.due = Math.max(now, timer.due + timer.delay);
+ schedule(timer, now);
+ }
+ // Call the user callback. Intermediate assignment is to avoid leaking `this`
+ // to it, while also keeping the stack trace neat when it shows up in there.
+ const callback = timer.callback;
+ callback();
+}
+
+function fireTimers() {
+ const now = getTime();
+ // Bail out if we're not expecting the global timer to fire (yet).
+ if (globalTimeoutDue === null || now < globalTimeoutDue) {
+ return;
+ }
+ // After firing the timers that are due now, this will hold the due time of
+ // the first timer that hasn't fired yet.
+ let nextTimerDue: number | null = null;
+ // Walk over the keys of the 'due' map. Since dueMap is actually a regular
+ // object and its keys are numerical and smaller than UINT32_MAX - 2,
+ // keys are iterated in ascending order.
+ for (const key in dueMap) {
+ // Convert the object key (a string) to a number.
+ const due = Number(key);
+ // Break out of the loop if the next timer isn't due to fire yet.
+ if (Number(due) > now) {
+ nextTimerDue = due;
+ break;
+ }
+ // Get the list of timers that have this due time, then drop it.
+ const list = dueMap[key];
+ delete dueMap[key];
+ // Fire all the timers in the list.
+ for (const timer of list) {
+ // With the list dropped, the timer is no longer scheduled.
+ timer.scheduled = false;
+ // Place the callback on the microtask queue.
+ Promise.resolve(timer).then(fire);
+ }
+ }
+ // Update the global alarm to go off when the first-up timer that hasn't fired
+ // yet is due.
+ setGlobalTimeout(nextTimerDue, now);
+}
+
+function setTimer<Args extends Array<unknown>>(
+ cb: (...args: Args) => void,
delay: number,
- // tslint:disable-next-line:no-any
- ...args: any[]
+ args: Args,
+ repeat: boolean
): number {
- const id = nextTimerId++;
- startTimer(id, cb, delay, false, args);
- return id;
+ // If any `args` were provided (which is uncommon), bind them to the callback.
+ const callback: () => void = args.length === 0 ? cb : cb.bind(null, ...args);
+ // In the browser, the delay value must be coercable to an integer between 0
+ // and INT32_MAX. Any other value will cause the timer to fire immediately.
+ // We emulate this behavior.
+ const now = getTime();
+ delay = Math.max(0, delay | 0);
+ // Create a new, unscheduled timer object.
+ const timer = {
+ id: nextTimerId++,
+ callback,
+ args,
+ delay,
+ due: now + delay,
+ repeat,
+ scheduled: false
+ };
+ // Register the timer's existence in the id-to-timer map.
+ idMap.set(timer.id, timer);
+ // Schedule the timer in the due table.
+ schedule(timer, now);
+ return timer.id;
}
-export function setInterval(
- cb: TimerCallback,
+export function setTimeout<Args extends Array<unknown>>(
+ cb: (...args: Args) => void,
delay: number,
- // tslint:disable-next-line:no-any
- ...args: any[]
+ ...args: Args
): number {
- const id = nextTimerId++;
- startTimer(id, cb, delay, true, args);
- return id;
+ return setTimer(cb, delay, args, false);
}
-export function clearTimer(id: number) {
- const builder = new flatbuffers.Builder();
- fbs.TimerClear.startTimerClear(builder);
- fbs.TimerClear.addId(builder, id);
- const msg = fbs.TimerClear.endTimerClear(builder);
- const res = sendSync(builder, fbs.Any.TimerClear, msg);
- assert(res == null);
+export function setInterval<Args extends Array<unknown>>(
+ cb: (...args: Args) => void,
+ delay: number,
+ ...args: Args
+): number {
+ return setTimer(cb, delay, args, true);
+}
+
+export function clearTimer(id: number): void {
+ const timer = idMap.get(id);
+ if (timer === undefined) {
+ // Timer doesn't exist any more or never existed. This is not an error.
+ return;
+ }
+ // Unschedule the timer if it is currently scheduled, and forget about it.
+ unschedule(timer);
+ idMap.delete(timer.id);
}
diff --git a/js/timers_test.ts b/js/timers_test.ts
index e5fe2d478..af172e976 100644
--- a/js/timers_test.ts
+++ b/js/timers_test.ts
@@ -1,5 +1,4 @@
import { test, assertEqual } from "./test_util.ts";
-import { setGlobalTimeout } from "deno";
function deferred() {
let resolve;
@@ -96,7 +95,3 @@ test(async function intervalCancelInvalidSilentFail() {
// Should silently fail (no panic)
clearInterval(2147483647);
});
-
-test(async function SetGlobalTimeoutSmoke() {
- setGlobalTimeout(50);
-});
diff --git a/src/handlers.rs b/src/handlers.rs
index c27543d33..bb2d79e69 100644
--- a/src/handlers.rs
+++ b/src/handlers.rs
@@ -5,6 +5,7 @@ use errors::DenoError;
use errors::DenoResult;
use fs as deno_fs;
use isolate::Buf;
+use isolate::Isolate;
use isolate::IsolateState;
use isolate::Op;
use msg;
@@ -47,7 +48,7 @@ fn empty_buf() -> Buf {
}
pub fn msg_from_js(
- state: Arc<IsolateState>,
+ isolate: &mut Isolate,
control: &[u8],
data: &'static mut [u8],
) -> (bool, Box<Op>) {
@@ -55,38 +56,47 @@ pub fn msg_from_js(
let is_sync = base.sync();
let msg_type = base.msg_type();
let cmd_id = base.cmd_id();
- let handler: Handler = match msg_type {
- msg::Any::Start => handle_start,
- msg::Any::CodeFetch => handle_code_fetch,
- msg::Any::CodeCache => handle_code_cache,
- msg::Any::SetTimeout => handle_set_timeout,
- msg::Any::Environ => handle_env,
- msg::Any::FetchReq => handle_fetch_req,
- msg::Any::TimerStart => handle_timer_start,
- msg::Any::TimerClear => handle_timer_clear,
- msg::Any::MakeTempDir => handle_make_temp_dir,
- msg::Any::Mkdir => handle_mkdir,
- msg::Any::Open => handle_open,
- msg::Any::Read => handle_read,
- msg::Any::Write => handle_write,
- msg::Any::Remove => handle_remove,
- msg::Any::ReadFile => handle_read_file,
- msg::Any::Rename => handle_rename,
- msg::Any::Readlink => handle_read_link,
- msg::Any::Symlink => handle_symlink,
- msg::Any::SetEnv => handle_set_env,
- msg::Any::Stat => handle_stat,
- msg::Any::Truncate => handle_truncate,
- msg::Any::WriteFile => handle_write_file,
- msg::Any::Exit => handle_exit,
- msg::Any::CopyFile => handle_copy_file,
- _ => panic!(format!(
- "Unhandled message {}",
- msg::enum_name_any(msg_type)
- )),
+
+ let op: Box<Op> = if msg_type == msg::Any::SetTimeout {
+ // SetTimeout is an exceptional op: the global timeout field is part of the
+ // Isolate state (not the IsolateState state) and it must be updated on the
+ // main thread.
+ assert_eq!(is_sync, true);
+ handle_set_timeout(isolate, &base, data)
+ } else {
+ // Handle regular ops.
+ let handler: Handler = match msg_type {
+ msg::Any::Start => handle_start,
+ msg::Any::CodeFetch => handle_code_fetch,
+ msg::Any::CodeCache => handle_code_cache,
+ msg::Any::Environ => handle_env,
+ msg::Any::FetchReq => handle_fetch_req,
+ msg::Any::TimerStart => handle_timer_start,
+ msg::Any::TimerClear => handle_timer_clear,
+ msg::Any::MakeTempDir => handle_make_temp_dir,
+ msg::Any::Mkdir => handle_mkdir,
+ msg::Any::Open => handle_open,
+ msg::Any::Read => handle_read,
+ msg::Any::Write => handle_write,
+ msg::Any::Remove => handle_remove,
+ msg::Any::ReadFile => handle_read_file,
+ msg::Any::Rename => handle_rename,
+ msg::Any::Readlink => handle_read_link,
+ msg::Any::Symlink => handle_symlink,
+ msg::Any::SetEnv => handle_set_env,
+ msg::Any::Stat => handle_stat,
+ msg::Any::Truncate => handle_truncate,
+ msg::Any::WriteFile => handle_write_file,
+ msg::Any::Exit => handle_exit,
+ msg::Any::CopyFile => handle_copy_file,
+ _ => panic!(format!(
+ "Unhandled message {}",
+ msg::enum_name_any(msg_type)
+ )),
+ };
+ handler(isolate.state.clone(), &base, data)
};
- let op: Box<Op> = handler(state.clone(), &base, data);
let boxed_op = Box::new(
op.or_else(move |err: DenoError| -> DenoResult<Buf> {
debug!("op err {}", err);
@@ -274,16 +284,18 @@ fn handle_code_cache(
}
fn handle_set_timeout(
- state: Arc<IsolateState>,
+ isolate: &mut Isolate,
base: &msg::Base,
data: &'static mut [u8],
) -> Box<Op> {
assert_eq!(data.len(), 0);
let msg = base.msg_as_set_timeout().unwrap();
- let val = msg.timeout() as isize;
- state
- .timeout
- .swap(val, std::sync::atomic::Ordering::Relaxed);
+ let val = msg.timeout() as i64;
+ isolate.timeout_due = if val >= 0 {
+ Some(Instant::now() + Duration::from_millis(val as u64))
+ } else {
+ None
+ };
ok_future(empty_buf())
}
diff --git a/src/isolate.rs b/src/isolate.rs
index 8dfd4204a..bdd1ca956 100644
--- a/src/isolate.rs
+++ b/src/isolate.rs
@@ -16,10 +16,11 @@ use std;
use std::collections::HashMap;
use std::ffi::CStr;
use std::ffi::CString;
-use std::sync::atomic::AtomicIsize;
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
+use std::time::Duration;
+use std::time::Instant;
use tokio;
use tokio_util;
@@ -37,7 +38,7 @@ pub type Op = Future<Item = Buf, Error = DenoError> + Send;
// Returns (is_sync, op)
pub type Dispatch =
- fn(state: Arc<IsolateState>, buf: &[u8], data_buf: &'static mut [u8])
+ fn(isolate: &mut Isolate, buf: &[u8], data_buf: &'static mut [u8])
-> (bool, Box<Op>);
pub struct Isolate {
@@ -45,13 +46,13 @@ pub struct Isolate {
dispatch: Dispatch,
rx: mpsc::Receiver<(i32, Buf)>,
ntasks: i32,
+ pub timeout_due: Option<Instant>,
pub state: Arc<IsolateState>,
}
// Isolate cannot be passed between threads but IsolateState can. So any state that
// needs to be accessed outside the main V8 thread should be inside IsolateState.
pub struct IsolateState {
- pub timeout: AtomicIsize,
pub dir: deno_dir::DenoDir,
pub timers: Mutex<HashMap<u32, futures::sync::oneshot::Sender<()>>>,
pub argv: Vec<String>,
@@ -88,8 +89,8 @@ impl Isolate {
dispatch,
rx,
ntasks: 0,
+ timeout_due: None,
state: Arc::new(IsolateState {
- timeout: AtomicIsize::new(-1),
dir: deno_dir::DenoDir::new(flags.reload, None).unwrap(),
timers: Mutex::new(HashMap::new()),
argv: argv_rest,
@@ -139,17 +140,54 @@ impl Isolate {
unsafe { libdeno::deno_respond(self.ptr, req_id, buf.into()) }
}
+ fn complete_op(&mut self, req_id: i32, buf: Buf) {
+ // Receiving a message on rx exactly corresponds to an async task
+ // completing.
+ self.ntasks_decrement();
+ // Call into JS with the buf.
+ self.respond(req_id, buf);
+ }
+
+ fn timeout(&self) {
+ let dummy_buf = libdeno::deno_buf {
+ alloc_ptr: 0 as *mut u8,
+ alloc_len: 0,
+ data_ptr: 0 as *mut u8,
+ data_len: 0,
+ };
+ unsafe { libdeno::deno_respond(self.ptr, -1, dummy_buf) }
+ }
+
// TODO Use Park abstraction? Note at time of writing Tokio default runtime
// does not have new_with_park().
pub fn event_loop(&mut self) {
// Main thread event loop.
while !self.is_idle() {
- let (req_id, buf) = self.rx.recv().unwrap();
- // Receiving a message on rx exactly corresponds to an async task
- // completing.
- self.ntasks_decrement();
- // Call into JS with the buf.
- self.respond(req_id, buf);
+ // Ideally, mpsc::Receiver would have a receive method that takes a optional
+ // timeout. But it doesn't so we need all this duplicate code.
+ match self.timeout_due {
+ Some(due) => {
+ // Subtracting two Instants causes a panic if the resulting duration
+ // would become negative. Avoid this.
+ let now = Instant::now();
+ let timeout = if due > now {
+ due - now
+ } else {
+ Duration::new(0, 0)
+ };
+ // TODO: use recv_deadline() instead of recv_timeout() when this
+ // feature becomes stable/available.
+ match self.rx.recv_timeout(timeout) {
+ Ok((req_id, buf)) => self.complete_op(req_id, buf),
+ Err(mpsc::RecvTimeoutError::Timeout) => self.timeout(),
+ Err(e) => panic!("mpsc::Receiver::recv_timeout() failed: {:?}", e),
+ }
+ }
+ None => match self.rx.recv() {
+ Ok((req_id, buf)) => self.complete_op(req_id, buf),
+ Err(e) => panic!("mpsc::Receiver::recv() failed: {:?}", e),
+ },
+ };
}
}
@@ -164,7 +202,7 @@ impl Isolate {
}
fn is_idle(&self) -> bool {
- self.ntasks == 0
+ self.ntasks == 0 && self.timeout_due.is_none()
}
}
@@ -212,8 +250,7 @@ extern "C" fn pre_dispatch(
let isolate = Isolate::from_c(d);
let dispatch = isolate.dispatch;
- let (is_sync, op) =
- dispatch(isolate.state.clone(), control_slice, data_slice);
+ let (is_sync, op) = dispatch(isolate, control_slice, data_slice);
if is_sync {
// Execute op synchronously.
@@ -258,7 +295,7 @@ mod tests {
}
fn unreachable_dispatch(
- _state: Arc<IsolateState>,
+ _isolate: &mut Isolate,
_control: &[u8],
_data: &'static mut [u8],
) -> (bool, Box<Op>) {
@@ -289,7 +326,7 @@ mod tests {
}
fn dispatch_sync(
- _state: Arc<IsolateState>,
+ _isolate: &mut Isolate,
control: &[u8],
data: &'static mut [u8],
) -> (bool, Box<Op>) {
diff --git a/src/msg.fbs b/src/msg.fbs
index b9989c330..68de166ce 100644
--- a/src/msg.fbs
+++ b/src/msg.fbs
@@ -128,7 +128,7 @@ table CodeCache {
}
table SetTimeout {
- timeout: int;
+ timeout: double;
}
table Exit {