diff options
-rw-r--r-- | js/deno.ts | 1 | ||||
-rw-r--r-- | js/dispatch.ts | 34 | ||||
-rw-r--r-- | js/timers.ts | 268 | ||||
-rw-r--r-- | js/timers_test.ts | 5 | ||||
-rw-r--r-- | src/handlers.rs | 84 | ||||
-rw-r--r-- | src/isolate.rs | 67 | ||||
-rw-r--r-- | src/msg.fbs | 2 |
7 files changed, 317 insertions, 144 deletions
diff --git a/js/deno.ts b/js/deno.ts index 03f3d1a89..91f7ae1af 100644 --- a/js/deno.ts +++ b/js/deno.ts @@ -19,5 +19,4 @@ export { libdeno } from "./libdeno"; export { arch, platform } from "./platform"; export { trace } from "./trace"; export { truncateSync, truncate } from "./truncate"; -export { setGlobalTimeout } from "./timers"; export const args: string[] = []; diff --git a/js/dispatch.ts b/js/dispatch.ts index 9257be767..56d280ff9 100644 --- a/js/dispatch.ts +++ b/js/dispatch.ts @@ -9,19 +9,31 @@ import { maybePushTrace } from "./trace"; let nextCmdId = 0; const promiseTable = new Map<number, util.Resolvable<fbs.Base>>(); +let fireTimers: () => void; + +export function setFireTimersCallback(fn: () => void) { + fireTimers = fn; +} + export function handleAsyncMsgFromRust(ui8: Uint8Array) { - const bb = new flatbuffers.ByteBuffer(ui8); - const base = fbs.Base.getRootAsBase(bb); - const cmdId = base.cmdId(); - const promise = promiseTable.get(cmdId); - util.assert(promise != null, `Expecting promise in table. ${cmdId}`); - promiseTable.delete(cmdId); - const err = errors.maybeError(base); - if (err != null) { - promise!.reject(err); - } else { - promise!.resolve(base); + // If a the buffer is empty, recv() on the native side timed out and we + // did not receive a message. + if (ui8.length) { + const bb = new flatbuffers.ByteBuffer(ui8); + const base = fbs.Base.getRootAsBase(bb); + const cmdId = base.cmdId(); + const promise = promiseTable.get(cmdId); + util.assert(promise != null, `Expecting promise in table. ${cmdId}`); + promiseTable.delete(cmdId); + const err = errors.maybeError(base); + if (err != null) { + promise!.reject(err); + } else { + promise!.resolve(base); + } } + // Fire timers that have become runnable. + fireTimers(); } // @internal diff --git a/js/timers.ts b/js/timers.ts index 03badfd46..79aefb645 100644 --- a/js/timers.ts +++ b/js/timers.ts @@ -1,107 +1,225 @@ // Copyright 2018 the Deno authors. All rights reserved. MIT license. import { assert } from "./util"; -import * as util from "./util"; import * as fbs from "gen/msg_generated"; import { flatbuffers } from "flatbuffers"; -import { sendSync, sendAsync } from "./dispatch"; +import { sendSync, setFireTimersCallback } from "./dispatch"; -let nextTimerId = 1; - -// tslint:disable-next-line:no-any -export type TimerCallback = (...args: any[]) => void; +// Tell the dispatcher which function it should call to fire timers that are +// due. This is done using a callback because circular imports are disallowed. +setFireTimersCallback(fireTimers); interface Timer { id: number; - cb: TimerCallback; - interval: boolean; - // tslint:disable-next-line:no-any - args: any[]; - delay: number; // milliseconds + callback: () => void; + delay: number; + due: number; + repeat: boolean; + scheduled: boolean; +} + +// We'll subtract EPOCH every time we retrieve the time with Date.now(). This +// ensures that absolute time values stay below UINT32_MAX - 2, which is the +// maximum object key that EcmaScript considers "numerical". After running for +// about a month, this is no longer true, and Deno explodes. +// TODO(piscisaureus): fix that ^. +const EPOCH = Date.now(); +const APOCALYPS = 2 ** 32 - 2; + +let globalTimeoutDue: number | null = null; + +let nextTimerId = 1; +const idMap = new Map<number, Timer>(); +const dueMap: { [due: number]: Timer[] } = Object.create(null); + +function getTime() { + // TODO: use a monotonic clock. + const now = Date.now() - EPOCH; + assert(now >= 0 && now < APOCALYPS); + return now; } -export function setGlobalTimeout(timeout: number) { +function setGlobalTimeout(due: number | null, now: number) { + // Since JS and Rust don't use the same clock, pass the time to rust as a + // relative time value. On the Rust side we'll turn that into an absolute + // value again. + // Note that a negative time-out value stops the global timer. + let timeout; + if (due === null) { + timeout = -1; + } else { + timeout = due - now; + assert(timeout >= 0); + } + // Send message to the backend. const builder = new flatbuffers.Builder(); fbs.SetTimeout.startSetTimeout(builder); fbs.SetTimeout.addTimeout(builder, timeout); const msg = fbs.SetTimeout.endSetTimeout(builder); const res = sendSync(builder, fbs.Any.SetTimeout, msg); assert(res == null); + // Remember when when the global timer will fire. + globalTimeoutDue = due; } -function startTimer( - id: number, - cb: TimerCallback, - delay: number, - interval: boolean, - // tslint:disable-next-line:no-any - args: any[] -): void { - const timer: Timer = { - id, - interval, - delay, - args, - cb - }; - util.log("timers.ts startTimer"); - - // Send TimerStart message - const builder = new flatbuffers.Builder(); - fbs.TimerStart.startTimerStart(builder); - fbs.TimerStart.addId(builder, timer.id); - fbs.TimerStart.addDelay(builder, timer.delay); - const msg = fbs.TimerStart.endTimerStart(builder); +function schedule(timer: Timer, now: number) { + assert(!timer.scheduled); + assert(now <= timer.due); + // Find or create the list of timers that will fire at point-in-time `due`. + let list = dueMap[timer.due]; + if (list === undefined) { + list = dueMap[timer.due] = []; + } + // Append the newly scheduled timer to the list and mark it as scheduled. + list.push(timer); + timer.scheduled = true; + // If the new timer is scheduled to fire before any timer that existed before, + // update the global timeout to reflect this. + if (globalTimeoutDue === null || globalTimeoutDue > timer.due) { + setGlobalTimeout(timer.due, now); + } +} - sendAsync(builder, fbs.Any.TimerStart, msg).then( - baseRes => { - assert(fbs.Any.TimerReady === baseRes!.msgType()); - const msg = new fbs.TimerReady(); - assert(baseRes!.msg(msg) != null); - assert(msg.id() === timer.id); - if (msg.canceled()) { - util.log("timer canceled message"); - } else { - cb(...args); - if (interval) { - // TODO Faking setInterval with setTimeout. - // We need a new timer implementation, this is just a stopgap. - startTimer(id, cb, delay, true, args); - } +function unschedule(timer: Timer) { + if (!timer.scheduled) { + return; + } + // Find the list of timers that will fire at point-in-time `due`. + const list = dueMap[timer.due]; + if (list.length === 1) { + // Time timer is the only one in the list. Remove the entire list. + assert(list[0] === timer); + delete dueMap[timer.due]; + // If the unscheduled timer was 'next up', find when the next timer that + // still exists is due, and update the global alarm accordingly. + if (timer.due === globalTimeoutDue) { + let nextTimerDue: number | null = null; + for (const key in dueMap) { + nextTimerDue = Number(key); + break; } - }, - error => { - throw error; + setGlobalTimeout(nextTimerDue, getTime()); } - ); + } else { + // Multiple timers that are due at the same point in time. + // Remove this timer from the list. + const index = list.indexOf(timer); + assert(index > 0); + list.splice(index, 1); + } } -export function setTimeout( - cb: TimerCallback, +function fire(timer: Timer) { + // If the timer isn't found in the ID map, that means it has been cancelled + // between the timer firing and the promise callback (this function). + if (!idMap.has(timer.id)) { + return; + } + // Reschedule the timer if it is a repeating one, otherwise drop it. + if (!timer.repeat) { + // One-shot timer: remove the timer from this id-to-timer map. + idMap.delete(timer.id); + } else { + // Interval timer: compute when timer was supposed to fire next. + // However make sure to never schedule the next interval in the past. + const now = getTime(); + timer.due = Math.max(now, timer.due + timer.delay); + schedule(timer, now); + } + // Call the user callback. Intermediate assignment is to avoid leaking `this` + // to it, while also keeping the stack trace neat when it shows up in there. + const callback = timer.callback; + callback(); +} + +function fireTimers() { + const now = getTime(); + // Bail out if we're not expecting the global timer to fire (yet). + if (globalTimeoutDue === null || now < globalTimeoutDue) { + return; + } + // After firing the timers that are due now, this will hold the due time of + // the first timer that hasn't fired yet. + let nextTimerDue: number | null = null; + // Walk over the keys of the 'due' map. Since dueMap is actually a regular + // object and its keys are numerical and smaller than UINT32_MAX - 2, + // keys are iterated in ascending order. + for (const key in dueMap) { + // Convert the object key (a string) to a number. + const due = Number(key); + // Break out of the loop if the next timer isn't due to fire yet. + if (Number(due) > now) { + nextTimerDue = due; + break; + } + // Get the list of timers that have this due time, then drop it. + const list = dueMap[key]; + delete dueMap[key]; + // Fire all the timers in the list. + for (const timer of list) { + // With the list dropped, the timer is no longer scheduled. + timer.scheduled = false; + // Place the callback on the microtask queue. + Promise.resolve(timer).then(fire); + } + } + // Update the global alarm to go off when the first-up timer that hasn't fired + // yet is due. + setGlobalTimeout(nextTimerDue, now); +} + +function setTimer<Args extends Array<unknown>>( + cb: (...args: Args) => void, delay: number, - // tslint:disable-next-line:no-any - ...args: any[] + args: Args, + repeat: boolean ): number { - const id = nextTimerId++; - startTimer(id, cb, delay, false, args); - return id; + // If any `args` were provided (which is uncommon), bind them to the callback. + const callback: () => void = args.length === 0 ? cb : cb.bind(null, ...args); + // In the browser, the delay value must be coercable to an integer between 0 + // and INT32_MAX. Any other value will cause the timer to fire immediately. + // We emulate this behavior. + const now = getTime(); + delay = Math.max(0, delay | 0); + // Create a new, unscheduled timer object. + const timer = { + id: nextTimerId++, + callback, + args, + delay, + due: now + delay, + repeat, + scheduled: false + }; + // Register the timer's existence in the id-to-timer map. + idMap.set(timer.id, timer); + // Schedule the timer in the due table. + schedule(timer, now); + return timer.id; } -export function setInterval( - cb: TimerCallback, +export function setTimeout<Args extends Array<unknown>>( + cb: (...args: Args) => void, delay: number, - // tslint:disable-next-line:no-any - ...args: any[] + ...args: Args ): number { - const id = nextTimerId++; - startTimer(id, cb, delay, true, args); - return id; + return setTimer(cb, delay, args, false); } -export function clearTimer(id: number) { - const builder = new flatbuffers.Builder(); - fbs.TimerClear.startTimerClear(builder); - fbs.TimerClear.addId(builder, id); - const msg = fbs.TimerClear.endTimerClear(builder); - const res = sendSync(builder, fbs.Any.TimerClear, msg); - assert(res == null); +export function setInterval<Args extends Array<unknown>>( + cb: (...args: Args) => void, + delay: number, + ...args: Args +): number { + return setTimer(cb, delay, args, true); +} + +export function clearTimer(id: number): void { + const timer = idMap.get(id); + if (timer === undefined) { + // Timer doesn't exist any more or never existed. This is not an error. + return; + } + // Unschedule the timer if it is currently scheduled, and forget about it. + unschedule(timer); + idMap.delete(timer.id); } diff --git a/js/timers_test.ts b/js/timers_test.ts index e5fe2d478..af172e976 100644 --- a/js/timers_test.ts +++ b/js/timers_test.ts @@ -1,5 +1,4 @@ import { test, assertEqual } from "./test_util.ts"; -import { setGlobalTimeout } from "deno"; function deferred() { let resolve; @@ -96,7 +95,3 @@ test(async function intervalCancelInvalidSilentFail() { // Should silently fail (no panic) clearInterval(2147483647); }); - -test(async function SetGlobalTimeoutSmoke() { - setGlobalTimeout(50); -}); diff --git a/src/handlers.rs b/src/handlers.rs index c27543d33..bb2d79e69 100644 --- a/src/handlers.rs +++ b/src/handlers.rs @@ -5,6 +5,7 @@ use errors::DenoError; use errors::DenoResult; use fs as deno_fs; use isolate::Buf; +use isolate::Isolate; use isolate::IsolateState; use isolate::Op; use msg; @@ -47,7 +48,7 @@ fn empty_buf() -> Buf { } pub fn msg_from_js( - state: Arc<IsolateState>, + isolate: &mut Isolate, control: &[u8], data: &'static mut [u8], ) -> (bool, Box<Op>) { @@ -55,38 +56,47 @@ pub fn msg_from_js( let is_sync = base.sync(); let msg_type = base.msg_type(); let cmd_id = base.cmd_id(); - let handler: Handler = match msg_type { - msg::Any::Start => handle_start, - msg::Any::CodeFetch => handle_code_fetch, - msg::Any::CodeCache => handle_code_cache, - msg::Any::SetTimeout => handle_set_timeout, - msg::Any::Environ => handle_env, - msg::Any::FetchReq => handle_fetch_req, - msg::Any::TimerStart => handle_timer_start, - msg::Any::TimerClear => handle_timer_clear, - msg::Any::MakeTempDir => handle_make_temp_dir, - msg::Any::Mkdir => handle_mkdir, - msg::Any::Open => handle_open, - msg::Any::Read => handle_read, - msg::Any::Write => handle_write, - msg::Any::Remove => handle_remove, - msg::Any::ReadFile => handle_read_file, - msg::Any::Rename => handle_rename, - msg::Any::Readlink => handle_read_link, - msg::Any::Symlink => handle_symlink, - msg::Any::SetEnv => handle_set_env, - msg::Any::Stat => handle_stat, - msg::Any::Truncate => handle_truncate, - msg::Any::WriteFile => handle_write_file, - msg::Any::Exit => handle_exit, - msg::Any::CopyFile => handle_copy_file, - _ => panic!(format!( - "Unhandled message {}", - msg::enum_name_any(msg_type) - )), + + let op: Box<Op> = if msg_type == msg::Any::SetTimeout { + // SetTimeout is an exceptional op: the global timeout field is part of the + // Isolate state (not the IsolateState state) and it must be updated on the + // main thread. + assert_eq!(is_sync, true); + handle_set_timeout(isolate, &base, data) + } else { + // Handle regular ops. + let handler: Handler = match msg_type { + msg::Any::Start => handle_start, + msg::Any::CodeFetch => handle_code_fetch, + msg::Any::CodeCache => handle_code_cache, + msg::Any::Environ => handle_env, + msg::Any::FetchReq => handle_fetch_req, + msg::Any::TimerStart => handle_timer_start, + msg::Any::TimerClear => handle_timer_clear, + msg::Any::MakeTempDir => handle_make_temp_dir, + msg::Any::Mkdir => handle_mkdir, + msg::Any::Open => handle_open, + msg::Any::Read => handle_read, + msg::Any::Write => handle_write, + msg::Any::Remove => handle_remove, + msg::Any::ReadFile => handle_read_file, + msg::Any::Rename => handle_rename, + msg::Any::Readlink => handle_read_link, + msg::Any::Symlink => handle_symlink, + msg::Any::SetEnv => handle_set_env, + msg::Any::Stat => handle_stat, + msg::Any::Truncate => handle_truncate, + msg::Any::WriteFile => handle_write_file, + msg::Any::Exit => handle_exit, + msg::Any::CopyFile => handle_copy_file, + _ => panic!(format!( + "Unhandled message {}", + msg::enum_name_any(msg_type) + )), + }; + handler(isolate.state.clone(), &base, data) }; - let op: Box<Op> = handler(state.clone(), &base, data); let boxed_op = Box::new( op.or_else(move |err: DenoError| -> DenoResult<Buf> { debug!("op err {}", err); @@ -274,16 +284,18 @@ fn handle_code_cache( } fn handle_set_timeout( - state: Arc<IsolateState>, + isolate: &mut Isolate, base: &msg::Base, data: &'static mut [u8], ) -> Box<Op> { assert_eq!(data.len(), 0); let msg = base.msg_as_set_timeout().unwrap(); - let val = msg.timeout() as isize; - state - .timeout - .swap(val, std::sync::atomic::Ordering::Relaxed); + let val = msg.timeout() as i64; + isolate.timeout_due = if val >= 0 { + Some(Instant::now() + Duration::from_millis(val as u64)) + } else { + None + }; ok_future(empty_buf()) } diff --git a/src/isolate.rs b/src/isolate.rs index 8dfd4204a..bdd1ca956 100644 --- a/src/isolate.rs +++ b/src/isolate.rs @@ -16,10 +16,11 @@ use std; use std::collections::HashMap; use std::ffi::CStr; use std::ffi::CString; -use std::sync::atomic::AtomicIsize; use std::sync::mpsc; use std::sync::Arc; use std::sync::Mutex; +use std::time::Duration; +use std::time::Instant; use tokio; use tokio_util; @@ -37,7 +38,7 @@ pub type Op = Future<Item = Buf, Error = DenoError> + Send; // Returns (is_sync, op) pub type Dispatch = - fn(state: Arc<IsolateState>, buf: &[u8], data_buf: &'static mut [u8]) + fn(isolate: &mut Isolate, buf: &[u8], data_buf: &'static mut [u8]) -> (bool, Box<Op>); pub struct Isolate { @@ -45,13 +46,13 @@ pub struct Isolate { dispatch: Dispatch, rx: mpsc::Receiver<(i32, Buf)>, ntasks: i32, + pub timeout_due: Option<Instant>, pub state: Arc<IsolateState>, } // Isolate cannot be passed between threads but IsolateState can. So any state that // needs to be accessed outside the main V8 thread should be inside IsolateState. pub struct IsolateState { - pub timeout: AtomicIsize, pub dir: deno_dir::DenoDir, pub timers: Mutex<HashMap<u32, futures::sync::oneshot::Sender<()>>>, pub argv: Vec<String>, @@ -88,8 +89,8 @@ impl Isolate { dispatch, rx, ntasks: 0, + timeout_due: None, state: Arc::new(IsolateState { - timeout: AtomicIsize::new(-1), dir: deno_dir::DenoDir::new(flags.reload, None).unwrap(), timers: Mutex::new(HashMap::new()), argv: argv_rest, @@ -139,17 +140,54 @@ impl Isolate { unsafe { libdeno::deno_respond(self.ptr, req_id, buf.into()) } } + fn complete_op(&mut self, req_id: i32, buf: Buf) { + // Receiving a message on rx exactly corresponds to an async task + // completing. + self.ntasks_decrement(); + // Call into JS with the buf. + self.respond(req_id, buf); + } + + fn timeout(&self) { + let dummy_buf = libdeno::deno_buf { + alloc_ptr: 0 as *mut u8, + alloc_len: 0, + data_ptr: 0 as *mut u8, + data_len: 0, + }; + unsafe { libdeno::deno_respond(self.ptr, -1, dummy_buf) } + } + // TODO Use Park abstraction? Note at time of writing Tokio default runtime // does not have new_with_park(). pub fn event_loop(&mut self) { // Main thread event loop. while !self.is_idle() { - let (req_id, buf) = self.rx.recv().unwrap(); - // Receiving a message on rx exactly corresponds to an async task - // completing. - self.ntasks_decrement(); - // Call into JS with the buf. - self.respond(req_id, buf); + // Ideally, mpsc::Receiver would have a receive method that takes a optional + // timeout. But it doesn't so we need all this duplicate code. + match self.timeout_due { + Some(due) => { + // Subtracting two Instants causes a panic if the resulting duration + // would become negative. Avoid this. + let now = Instant::now(); + let timeout = if due > now { + due - now + } else { + Duration::new(0, 0) + }; + // TODO: use recv_deadline() instead of recv_timeout() when this + // feature becomes stable/available. + match self.rx.recv_timeout(timeout) { + Ok((req_id, buf)) => self.complete_op(req_id, buf), + Err(mpsc::RecvTimeoutError::Timeout) => self.timeout(), + Err(e) => panic!("mpsc::Receiver::recv_timeout() failed: {:?}", e), + } + } + None => match self.rx.recv() { + Ok((req_id, buf)) => self.complete_op(req_id, buf), + Err(e) => panic!("mpsc::Receiver::recv() failed: {:?}", e), + }, + }; } } @@ -164,7 +202,7 @@ impl Isolate { } fn is_idle(&self) -> bool { - self.ntasks == 0 + self.ntasks == 0 && self.timeout_due.is_none() } } @@ -212,8 +250,7 @@ extern "C" fn pre_dispatch( let isolate = Isolate::from_c(d); let dispatch = isolate.dispatch; - let (is_sync, op) = - dispatch(isolate.state.clone(), control_slice, data_slice); + let (is_sync, op) = dispatch(isolate, control_slice, data_slice); if is_sync { // Execute op synchronously. @@ -258,7 +295,7 @@ mod tests { } fn unreachable_dispatch( - _state: Arc<IsolateState>, + _isolate: &mut Isolate, _control: &[u8], _data: &'static mut [u8], ) -> (bool, Box<Op>) { @@ -289,7 +326,7 @@ mod tests { } fn dispatch_sync( - _state: Arc<IsolateState>, + _isolate: &mut Isolate, control: &[u8], data: &'static mut [u8], ) -> (bool, Box<Op>) { diff --git a/src/msg.fbs b/src/msg.fbs index b9989c330..68de166ce 100644 --- a/src/msg.fbs +++ b/src/msg.fbs @@ -128,7 +128,7 @@ table CodeCache { } table SetTimeout { - timeout: int; + timeout: double; } table Exit { |