diff options
Diffstat (limited to 'core/isolate.rs')
-rw-r--r-- | core/isolate.rs | 118 |
1 files changed, 85 insertions, 33 deletions
diff --git a/core/isolate.rs b/core/isolate.rs index 14e1b88aa..ae14c0040 100644 --- a/core/isolate.rs +++ b/core/isolate.rs @@ -21,21 +21,29 @@ use futures::Async::*; use futures::Future; use futures::Poll; use libc::c_char; +use libc::c_int; use libc::c_void; use std::ffi::CStr; use std::ffi::CString; use std::ptr::null; +use std::sync::atomic::{AtomicI32, Ordering}; use std::sync::{Arc, Mutex, Once, ONCE_INIT}; pub type Buf = Box<[u8]>; -pub type OpAsyncFuture = Box<dyn Future<Item = Buf, Error = ()> + Send>; +pub type OpAsyncFuture<I, E> = Box<dyn Future<Item = I, Error = E> + Send>; -pub enum Op { +pub enum Op<E> { Sync(Buf), - Async(OpAsyncFuture), + Async(OpAsyncFuture<Buf, E>), } +pub type CoreError = (); + +type CoreOpAsyncFuture = OpAsyncFuture<(c_int, Buf), CoreError>; + +pub type CoreOp = Op<CoreError>; + /// Stores a script used to initalize a Isolate pub struct Script<'a> { pub source: &'a str, @@ -68,7 +76,9 @@ pub enum StartupData<'a> { None, } -type DispatchFn = Fn(&[u8], Option<PinnedBuf>) -> Op; +pub type OpResult<E> = Result<Op<E>, E>; + +type CoreDispatchFn = Fn(&[u8], Option<PinnedBuf>) -> CoreOp; pub type DynImportFuture = Box<dyn Future<Item = deno_mod, Error = ()> + Send>; type DynImportFn = Fn(&str, &str) -> DynImportFuture; @@ -93,6 +103,12 @@ impl Future for DynImport { } } +enum ResponseData { + None, + Buffer(deno_buf), + PromiseId(c_int), +} + /// A single execution context of JavaScript. Corresponds roughly to the "Web /// Worker" concept in the DOM. An Isolate is a Future that can be used with /// Tokio. The Isolate future complete when there is an error or when all @@ -104,14 +120,15 @@ impl Future for DynImport { pub struct Isolate { libdeno_isolate: *const libdeno::isolate, shared_libdeno_isolate: Arc<Mutex<Option<*const libdeno::isolate>>>, - dispatch: Option<Arc<DispatchFn>>, + dispatch: Option<Arc<CoreDispatchFn>>, dyn_import: Option<Arc<DynImportFn>>, needs_init: bool, shared: SharedQueue, - pending_ops: FuturesUnordered<OpAsyncFuture>, + pending_ops: FuturesUnordered<CoreOpAsyncFuture>, pending_dyn_imports: FuturesUnordered<DynImport>, have_unpolled_ops: bool, startup_script: Option<OwnedScript>, + next_promise_id: AtomicI32, } unsafe impl Send for Isolate {} @@ -176,6 +193,7 @@ impl Isolate { have_unpolled_ops: false, pending_dyn_imports: FuturesUnordered::new(), startup_script, + next_promise_id: AtomicI32::new(1), } } @@ -184,7 +202,7 @@ impl Isolate { /// corresponds to the second argument of Deno.core.dispatch(). pub fn set_dispatch<F>(&mut self, f: F) where - F: Fn(&[u8], Option<PinnedBuf>) -> Op + Send + Sync + 'static, + F: Fn(&[u8], Option<PinnedBuf>) -> CoreOp + Send + Sync + 'static, { self.dispatch = Some(Arc::new(f)); } @@ -239,6 +257,10 @@ impl Isolate { } } + pub fn get_next_promise_id(&self) -> i32 { + self.next_promise_id.fetch_add(1, Ordering::SeqCst) + } + extern "C" fn pre_dispatch( user_data: *mut c_void, control_argv0: deno_buf, @@ -279,9 +301,17 @@ impl Isolate { // return value. // TODO(ry) check that if JSError thrown during respond(), that it will be // picked up. - let _ = isolate.respond(Some(&buf)); + let _ = + isolate.respond(ResponseData::Buffer(deno_buf::from(buf.as_ref()))); } Op::Async(fut) => { + let promise_id = isolate.get_next_promise_id(); + let _ = isolate.respond(ResponseData::PromiseId(promise_id)); + let fut = Box::new(fut.and_then( + move |buf| -> Result<(c_int, Buf), CoreError> { + Ok((promise_id, buf)) + }, + )); isolate.pending_ops.push(fut); isolate.have_unpolled_ops = true; } @@ -340,14 +370,34 @@ impl Isolate { } } - fn respond(&mut self, maybe_buf: Option<&[u8]>) -> Result<(), JSError> { - let buf = match maybe_buf { - None => deno_buf::empty(), - Some(r) => deno_buf::from(r), + // the result type is a placeholder for a more specific enum type + fn respond(&mut self, data: ResponseData) -> Result<(), JSError> { + match data { + ResponseData::PromiseId(pid) => unsafe { + libdeno::deno_respond( + self.libdeno_isolate, + self.as_raw_ptr(), + deno_buf::empty(), + &pid, + ) + }, + ResponseData::Buffer(r) => unsafe { + libdeno::deno_respond( + self.libdeno_isolate, + self.as_raw_ptr(), + r, + null(), + ) + }, + ResponseData::None => unsafe { + libdeno::deno_respond( + self.libdeno_isolate, + self.as_raw_ptr(), + deno_buf::empty(), + null(), + ) + }, }; - unsafe { - libdeno::deno_respond(self.libdeno_isolate, self.as_raw_ptr(), buf) - } if let Some(err) = self.last_exception() { Err(err) } else { @@ -525,7 +575,7 @@ impl Future for Isolate { self.shared_init(); - let mut overflow_response: Option<Buf> = None; + let mut overflow_response: Option<(c_int, Buf)> = None; loop { // If there are any pending dyn_import futures, do those first. @@ -546,13 +596,13 @@ impl Future for Isolate { Err(_) => panic!("unexpected op error"), Ok(Ready(None)) => break, Ok(NotReady) => break, - Ok(Ready(Some(buf))) => { - let successful_push = self.shared.push(&buf); + Ok(Ready(Some(op))) => { + let successful_push = self.shared.push(op.0, &op.1); if !successful_push { // If we couldn't push the response to the shared queue, because // there wasn't enough size, we will return the buffer via the // legacy route, using the argument of deno_respond. - overflow_response = Some(buf); + overflow_response = Some(op); break; } } @@ -560,14 +610,16 @@ impl Future for Isolate { } if self.shared.size() > 0 { - self.respond(None)?; + self.respond(ResponseData::None)?; // The other side should have shifted off all the messages. assert_eq!(self.shared.size(), 0); } if overflow_response.is_some() { - let buf = overflow_response.take().unwrap(); - self.respond(Some(&buf))?; + let op = overflow_response.take().unwrap(); + let promise_id_bytes = op.0.to_be_bytes(); + let buf: Buf = [&promise_id_bytes, &op.1[..]].concat().into(); + self.respond(ResponseData::Buffer(deno_buf::from(buf.as_ref())))?; } self.check_promise_errors(); @@ -664,7 +716,7 @@ pub mod tests { let dispatch_count_ = dispatch_count.clone(); let mut isolate = Isolate::new(StartupData::None, false); - isolate.set_dispatch(move |control, _| -> Op { + isolate.set_dispatch(move |control, _| -> CoreOp { dispatch_count_.fetch_add(1, Ordering::Relaxed); match mode { Mode::AsyncImmediate => { @@ -834,7 +886,7 @@ pub mod tests { "setup2.js", r#" let nrecv = 0; - Deno.core.setAsyncHandler((buf) => { + Deno.core.setAsyncHandler((promiseId, buf) => { assert(buf.byteLength === 1); assert(buf[0] === 43); nrecv++; @@ -1025,7 +1077,7 @@ pub mod tests { "overflow_req_sync.js", r#" let asyncRecv = 0; - Deno.core.setAsyncHandler((buf) => { asyncRecv++ }); + Deno.core.setAsyncHandler((promiseId, buf) => { asyncRecv++ }); // Large message that will overflow the shared space. let control = new Uint8Array(100 * 1024 * 1024); let response = Deno.core.dispatch(control); @@ -1047,7 +1099,7 @@ pub mod tests { "overflow_res_sync.js", r#" let asyncRecv = 0; - Deno.core.setAsyncHandler((buf) => { asyncRecv++ }); + Deno.core.setAsyncHandler((promiseId, buf) => { asyncRecv++ }); // Large message that will overflow the shared space. let control = new Uint8Array([42]); let response = Deno.core.dispatch(control); @@ -1068,7 +1120,7 @@ pub mod tests { "overflow_req_async.js", r#" let asyncRecv = 0; - Deno.core.setAsyncHandler((buf) => { + Deno.core.setAsyncHandler((cmdId, buf) => { assert(buf.byteLength === 1); assert(buf[0] === 43); asyncRecv++; @@ -1076,8 +1128,8 @@ pub mod tests { // Large message that will overflow the shared space. let control = new Uint8Array(100 * 1024 * 1024); let response = Deno.core.dispatch(control); - // Async messages always have null response. - assert(response == null); + // Async messages always have number type response. + assert(typeof response == "number"); assert(asyncRecv == 0); "#, )); @@ -1097,7 +1149,7 @@ pub mod tests { "overflow_res_async.js", r#" let asyncRecv = 0; - Deno.core.setAsyncHandler((buf) => { + Deno.core.setAsyncHandler((cmdId, buf) => { assert(buf.byteLength === 100 * 1024 * 1024); assert(buf[0] === 4); asyncRecv++; @@ -1105,7 +1157,7 @@ pub mod tests { // Large message that will overflow the shared space. let control = new Uint8Array([42]); let response = Deno.core.dispatch(control); - assert(response == null); + assert(typeof response == "number"); assert(asyncRecv == 0); "#, )); @@ -1125,7 +1177,7 @@ pub mod tests { "overflow_res_multiple_dispatch_async.js", r#" let asyncRecv = 0; - Deno.core.setAsyncHandler((buf) => { + Deno.core.setAsyncHandler((cmdId, buf) => { assert(buf.byteLength === 100 * 1024 * 1024); assert(buf[0] === 4); asyncRecv++; @@ -1133,7 +1185,7 @@ pub mod tests { // Large message that will overflow the shared space. let control = new Uint8Array([42]); let response = Deno.core.dispatch(control); - assert(response == null); + assert(typeof response == "number"); assert(asyncRecv == 0); // Dispatch another message to verify that pending ops // are done even if shared space overflows |