diff options
Diffstat (limited to 'core/lib.rs')
-rw-r--r-- | core/lib.rs | 364 |
1 files changed, 364 insertions, 0 deletions
diff --git a/core/lib.rs b/core/lib.rs new file mode 100644 index 000000000..d13339ee5 --- /dev/null +++ b/core/lib.rs @@ -0,0 +1,364 @@ +#[macro_use] +extern crate log; +extern crate futures; +extern crate libc; + +mod js_errors; +mod libdeno; +mod shared; + +pub use crate::js_errors::JSError; +pub use crate::libdeno::deno_buf; +pub use crate::shared::*; +use futures::Async; +use futures::Future; +use futures::Poll; +use libc::c_void; +use std::collections::HashMap; +use std::ffi::CStr; +use std::ffi::CString; +use std::sync::{Once, ONCE_INIT}; + +pub struct Isolate { + libdeno_isolate: *const libdeno::isolate, + pending_ops: HashMap<i32, PendingOp>, // promise_id -> op + polled_recently: bool, + recv_cb: RecvCallback, + + pub shared: Shared, + pub test_send_counter: u32, // TODO only used for testing- REMOVE. +} + +pub type RecvCallback = fn(isolate: &mut Isolate, zero_copy_buf: deno_buf); + +pub const NUM_RECORDS: usize = 100; + +// TODO rename to AsyncResult +pub struct AsyncResult { + pub result: i32, +} + +pub type Op = dyn Future<Item = AsyncResult, Error = std::io::Error> + Send; + +struct PendingOp { + op: Box<Op>, + polled_recently: bool, + zero_copy_id: usize, // non-zero if associated zero-copy buffer. +} + +static DENO_INIT: Once = ONCE_INIT; + +unsafe impl Send for Isolate {} + +impl Isolate { + pub fn new(recv_cb: RecvCallback) -> Self { + DENO_INIT.call_once(|| { + unsafe { libdeno::deno_init() }; + }); + + // Allocate unmanaged memory for the shared buffer by creating a Vec<u8>, + // grabbing the raw pointer, and then leaking the Vec so it is never freed. + let mut shared = Shared::new(); + let shared_deno_buf = shared.as_deno_buf(); + + let config = libdeno::deno_config { + will_snapshot: 0, + load_snapshot: deno_buf::empty(), // TODO + shared: shared_deno_buf, + recv_cb: pre_dispatch, + }; + let libdeno_isolate = unsafe { libdeno::deno_new(config) }; + + Self { + pending_ops: HashMap::new(), + polled_recently: false, + libdeno_isolate, + test_send_counter: 0, + recv_cb, + shared, + } + } + + fn zero_copy_release(&self, zero_copy_id: usize) { + unsafe { + libdeno::deno_zero_copy_release(self.libdeno_isolate, zero_copy_id) + } + } + + pub fn add_op( + self: &mut Self, + promise_id: i32, + op: Box<Op>, + zero_copy_id: usize, + ) { + debug!("add_op {}", zero_copy_id); + self.pending_ops.insert( + promise_id, + PendingOp { + op, + polled_recently: false, + zero_copy_id, + }, + ); + self.polled_recently = false; + } + + #[inline] + pub unsafe fn from_raw_ptr<'a>(ptr: *const c_void) -> &'a mut Self { + let ptr = ptr as *mut _; + &mut *ptr + } + + #[inline] + pub fn as_raw_ptr(&self) -> *const c_void { + self as *const _ as *const c_void + } + + pub fn execute( + &self, + js_filename: &str, + js_source: &str, + ) -> Result<(), JSError> { + let filename = CString::new(js_filename).unwrap(); + let source = CString::new(js_source).unwrap(); + unsafe { + libdeno::deno_execute( + self.libdeno_isolate, + self.as_raw_ptr(), + filename.as_ptr(), + source.as_ptr(), + ) + }; + if let Some(err) = self.last_exception() { + return Err(err); + } + Ok(()) + } + + pub fn last_exception(&self) -> Option<JSError> { + let ptr = unsafe { libdeno::deno_last_exception(self.libdeno_isolate) }; + if ptr.is_null() { + None + } else { + let cstr = unsafe { CStr::from_ptr(ptr) }; + let v8_exception = cstr.to_str().unwrap(); + debug!("v8_exception\n{}\n", v8_exception); + let js_error = JSError::from_v8_exception(v8_exception).unwrap(); + Some(js_error) + } + } + + fn check_promise_errors(&self) { + unsafe { + libdeno::deno_check_promise_errors(self.libdeno_isolate); + } + } + + fn respond(&mut self) -> Result<(), JSError> { + let buf = deno_buf::empty(); + unsafe { + libdeno::deno_respond(self.libdeno_isolate, self.as_raw_ptr(), buf) + } + if let Some(err) = self.last_exception() { + Err(err) + } else { + Ok(()) + } + } +} + +struct LockerScope { + libdeno_isolate: *const libdeno::isolate, +} + +impl LockerScope { + fn new(isolate: &Isolate) -> LockerScope { + let libdeno_isolate = isolate.libdeno_isolate; + unsafe { libdeno::deno_lock(libdeno_isolate) } + LockerScope { libdeno_isolate } + } +} + +impl Drop for LockerScope { + fn drop(&mut self) { + unsafe { libdeno::deno_unlock(self.libdeno_isolate) } + } +} + +impl Future for Isolate { + type Item = (); + type Error = JSError; + + fn poll(&mut self) -> Poll<(), JSError> { + // Lock the current thread for V8. + let _locker = LockerScope::new(self); + + // Clear + self.polled_recently = false; + for (_, pending) in self.pending_ops.iter_mut() { + pending.polled_recently = false; + } + + while !self.polled_recently { + let mut complete = HashMap::<i32, AsyncResult>::new(); + + self.polled_recently = true; + for (promise_id, pending) in self.pending_ops.iter_mut() { + // Do not call poll on futures we've already polled this turn. + if pending.polled_recently { + continue; + } + pending.polled_recently = true; + + let promise_id = *promise_id; + let op = &mut pending.op; + match op.poll() { + Err(op_err) => { + eprintln!("op err {:?}", op_err); + complete.insert(promise_id, AsyncResult { result: -1 }); + debug!("pending op {} complete err", promise_id); + } + Ok(Async::Ready(async_result)) => { + complete.insert(promise_id, async_result); + debug!("pending op {} complete ready", promise_id); + } + Ok(Async::NotReady) => { + debug!("pending op {} not ready", promise_id); + continue; + } + } + } + + self.shared.set_num_records(complete.len() as i32); + if complete.len() > 0 { + // self.zero_copy_release() and self.respond() need Locker. + let mut i = 0; + for (promise_id, async_result) in complete.iter_mut() { + let pending = self.pending_ops.remove(promise_id).unwrap(); + + if pending.zero_copy_id > 0 { + self.zero_copy_release(pending.zero_copy_id); + } + + self + .shared + .set_record(i, RECORD_OFFSET_PROMISE_ID, *promise_id); + self + .shared + .set_record(i, RECORD_OFFSET_RESULT, async_result.result); + i += 1; + } + self.respond()?; + } + } + + self.check_promise_errors(); + if let Some(err) = self.last_exception() { + return Err(err); + } + + // We're idle if pending_ops is empty. + if self.pending_ops.is_empty() { + Ok(futures::Async::Ready(())) + } else { + Ok(futures::Async::NotReady) + } + } +} + +extern "C" fn pre_dispatch( + user_data: *mut c_void, + control_buf: deno_buf, + zero_copy_buf: deno_buf, +) { + let isolate = unsafe { Isolate::from_raw_ptr(user_data) }; + assert_eq!(control_buf.len(), 0); + (isolate.recv_cb)(isolate, zero_copy_buf); +} + +#[cfg(test)] +mod tests { + use super::*; + + fn inc_counter(isolate: &mut Isolate, zero_copy_buf: deno_buf) { + assert_eq!(zero_copy_buf.len(), 0); + isolate.test_send_counter += 1; // TODO ideally store this in isolate.state? + } + + fn js_check(r: Result<(), JSError>) { + if let Err(e) = r { + panic!(e.to_string()); + } + } + + #[test] + fn test_execute() { + let isolate = Isolate::new(inc_counter); + js_check(isolate.execute( + "filename.js", + r#" + libdeno.send(); + async function main() { + libdeno.send(); + } + main(); + "#, + )); + // We expect that main is executed even tho we didn't poll. + assert_eq!(isolate.test_send_counter, 2); + } + + fn async_immediate(isolate: &mut Isolate, zero_copy_buf: deno_buf) { + assert_eq!(zero_copy_buf.len(), 0); + isolate.test_send_counter += 1; // TODO ideally store this in isolate.state? + + let promise_id = 0; + let op = Box::new(futures::future::ok(AsyncResult { result: 0 })); + isolate.add_op(promise_id, op, zero_copy_buf.zero_copy_id); + } + + #[test] + fn test_poll_async_immediate_ops() { + let mut isolate = Isolate::new(async_immediate); + js_check(isolate.execute( + "setup.js", + r#" + let nrecv = 0; + libdeno.recv(() => { + nrecv++; + }); + function assertEq(actual, expected) { + if (expected != actual) { + throw Error(`actual ${actual} expected ${expected} `); + } + } + "#, + )); + assert_eq!(isolate.test_send_counter, 0); + js_check(isolate.execute( + "check1.js", + r#" + assertEq(nrecv, 0); + libdeno.send(); + assertEq(nrecv, 0); + "#, + )); + assert_eq!(isolate.test_send_counter, 1); + assert_eq!(Ok(Async::Ready(())), isolate.poll()); + assert_eq!(isolate.test_send_counter, 1); + js_check(isolate.execute( + "check2.js", + r#" + assertEq(nrecv, 1); + libdeno.send(); + assertEq(nrecv, 1); + "#, + )); + assert_eq!(isolate.test_send_counter, 2); + assert_eq!(Ok(Async::Ready(())), isolate.poll()); + js_check(isolate.execute("check3.js", "assertEq(nrecv, 2)")); + assert_eq!(isolate.test_send_counter, 2); + // We are idle, so the next poll should be the last. + assert_eq!(Ok(Async::Ready(())), isolate.poll()); + } +} |