diff options
Diffstat (limited to 'core/isolate.rs')
-rw-r--r-- | core/isolate.rs | 108 |
1 files changed, 62 insertions, 46 deletions
diff --git a/core/isolate.rs b/core/isolate.rs index 0f693ff92..d3ac4457e 100644 --- a/core/isolate.rs +++ b/core/isolate.rs @@ -12,6 +12,7 @@ use crate::libdeno::deno_buf; use crate::libdeno::deno_dyn_import_id; use crate::libdeno::deno_mod; use crate::libdeno::deno_pinned_buf; +use crate::libdeno::OpId; use crate::libdeno::PinnedBuf; use crate::libdeno::Snapshot1; use crate::libdeno::Snapshot2; @@ -33,6 +34,9 @@ pub type Buf = Box<[u8]>; pub type OpAsyncFuture<E> = Box<dyn Future<Item = Buf, Error = E> + Send>; +type PendingOpFuture = + Box<dyn Future<Item = (OpId, Buf), Error = CoreError> + Send>; + pub enum Op<E> { Sync(Buf), Async(OpAsyncFuture<E>), @@ -40,10 +44,13 @@ pub enum Op<E> { pub type CoreError = (); -type CoreOpAsyncFuture = OpAsyncFuture<CoreError>; - pub type CoreOp = Op<CoreError>; +pub type OpResult<E> = Result<Op<E>, E>; + +/// Args: op_id, control_buf, zero_copy_buf +type CoreDispatchFn = dyn Fn(OpId, &[u8], Option<PinnedBuf>) -> CoreOp; + /// Stores a script used to initalize a Isolate pub struct Script<'a> { pub source: &'a str, @@ -76,10 +83,6 @@ pub enum StartupData<'a> { None, } -pub type OpResult<E> = Result<Op<E>, E>; - -type CoreDispatchFn = dyn Fn(&[u8], Option<PinnedBuf>) -> CoreOp; - pub type DynImportFuture = Box<dyn Future<Item = deno_mod, Error = ErrBox> + Send>; type DynImportFn = dyn Fn(&str, &str) -> DynImportFuture; @@ -121,7 +124,7 @@ pub struct Isolate { js_error_create: Arc<JSErrorCreateFn>, needs_init: bool, shared: SharedQueue, - pending_ops: FuturesUnordered<CoreOpAsyncFuture>, + pending_ops: FuturesUnordered<PendingOpFuture>, pending_dyn_imports: FuturesUnordered<DynImport>, have_unpolled_ops: bool, startup_script: Option<OwnedScript>, @@ -198,7 +201,7 @@ impl Isolate { /// corresponds to the second argument of Deno.core.dispatch(). pub fn set_dispatch<F>(&mut self, f: F) where - F: Fn(&[u8], Option<PinnedBuf>) -> CoreOp + Send + Sync + 'static, + F: Fn(OpId, &[u8], Option<PinnedBuf>) -> CoreOp + Send + Sync + 'static, { self.dispatch = Some(Arc::new(f)); } @@ -265,13 +268,14 @@ impl Isolate { extern "C" fn pre_dispatch( user_data: *mut c_void, - control_argv0: deno_buf, + op_id: OpId, + control_buf: deno_buf, zero_copy_buf: deno_pinned_buf, ) { let isolate = unsafe { Isolate::from_raw_ptr(user_data) }; let op = if let Some(ref f) = isolate.dispatch { - f(control_argv0.as_ref(), PinnedBuf::new(zero_copy_buf)) + f(op_id, control_buf.as_ref(), PinnedBuf::new(zero_copy_buf)) } else { panic!("isolate.dispatch not set") }; @@ -280,13 +284,17 @@ impl Isolate { match op { Op::Sync(buf) => { // For sync messages, we always return the response via Deno.core.send's - // return value. - // TODO(ry) check that if JSError thrown during respond(), that it will be - // picked up. - let _ = isolate.respond(Some(&buf)); + // return value. Sync messages ignore the op_id. + let op_id = 0; + isolate + .respond(Some((op_id, &buf))) + // Because this is a sync op, deno_respond() does not actually call + // into JavaScript. We should not get an error here. + .expect("unexpected error"); } Op::Async(fut) => { - isolate.pending_ops.push(fut); + let fut2 = fut.map(move |buf| (op_id, buf)); + isolate.pending_ops.push(Box::new(fut2)); isolate.have_unpolled_ops = true; } } @@ -347,13 +355,16 @@ impl Isolate { } } - fn respond(&mut self, maybe_buf: Option<&[u8]>) -> Result<(), ErrBox> { - let buf = match maybe_buf { - None => deno_buf::empty(), - Some(r) => deno_buf::from(r), + fn respond( + &mut self, + maybe_buf: Option<(OpId, &[u8])>, + ) -> Result<(), ErrBox> { + let (op_id, buf) = match maybe_buf { + None => (0, deno_buf::empty()), + Some((op_id, r)) => (op_id, deno_buf::from(r)), }; unsafe { - libdeno::deno_respond(self.libdeno_isolate, self.as_raw_ptr(), buf) + libdeno::deno_respond(self.libdeno_isolate, self.as_raw_ptr(), op_id, buf) } self.check_last_exception() } @@ -541,7 +552,7 @@ impl Future for Isolate { fn poll(&mut self) -> Poll<(), ErrBox> { self.shared_init(); - let mut overflow_response: Option<Buf> = None; + let mut overflow_response: Option<(OpId, Buf)> = None; loop { // If there are any pending dyn_import futures, do those first. @@ -567,13 +578,13 @@ impl Future for Isolate { Err(_) => panic!("unexpected op error"), Ok(Ready(None)) => break, Ok(NotReady) => break, - Ok(Ready(Some(buf))) => { - let successful_push = self.shared.push(&buf); + Ok(Ready(Some((op_id, buf)))) => { + let successful_push = self.shared.push(op_id, &buf); if !successful_push { // If we couldn't push the response to the shared queue, because // there wasn't enough size, we will return the buffer via the // legacy route, using the argument of deno_respond. - overflow_response = Some(buf); + overflow_response = Some((op_id, buf)); break; } } @@ -592,8 +603,8 @@ impl Future for Isolate { if overflow_response.is_some() { // Lock the current thread for V8. let locker = LockerScope::new(self.libdeno_isolate); - let buf = overflow_response.take().unwrap(); - self.respond(Some(&buf))?; + let (op_id, buf) = overflow_response.take().unwrap(); + self.respond(Some((op_id, &buf)))?; drop(locker); } @@ -633,10 +644,11 @@ impl IsolateHandle { } } -pub fn js_check(r: Result<(), ErrBox>) { +pub fn js_check<T>(r: Result<T, ErrBox>) -> T { if let Err(e) = r { panic!(e.to_string()); } + r.unwrap() } #[cfg(test)] @@ -689,7 +701,8 @@ pub mod tests { let dispatch_count_ = dispatch_count.clone(); let mut isolate = Isolate::new(StartupData::None, false); - isolate.set_dispatch(move |control, _| -> CoreOp { + isolate.set_dispatch(move |op_id, control, _| -> CoreOp { + println!("op_id {}", op_id); dispatch_count_.fetch_add(1, Ordering::Relaxed); match mode { Mode::AsyncImmediate => { @@ -749,9 +762,9 @@ pub mod tests { "filename.js", r#" let control = new Uint8Array([42]); - Deno.core.send(control); + Deno.core.send(42, control); async function main() { - Deno.core.send(control); + Deno.core.send(42, control); } main(); "#, @@ -770,7 +783,7 @@ pub mod tests { import { b } from 'b.js' if (b() != 'b') throw Error(); let control = new Uint8Array([42]); - Deno.core.send(control); + Deno.core.send(42, control); "#, ) .unwrap(); @@ -816,7 +829,7 @@ pub mod tests { "setup2.js", r#" let nrecv = 0; - Deno.core.setAsyncHandler((buf) => { + Deno.core.setAsyncHandler((opId, buf) => { nrecv++; }); "#, @@ -827,7 +840,7 @@ pub mod tests { r#" assert(nrecv == 0); let control = new Uint8Array([42]); - Deno.core.send(control); + Deno.core.send(42, control); assert(nrecv == 0); "#, )); @@ -838,7 +851,7 @@ pub mod tests { "check2.js", r#" assert(nrecv == 1); - Deno.core.send(control); + Deno.core.send(42, control); assert(nrecv == 1); "#, )); @@ -1016,10 +1029,10 @@ pub mod tests { "overflow_req_sync.js", r#" let asyncRecv = 0; - Deno.core.setAsyncHandler((buf) => { asyncRecv++ }); + Deno.core.setAsyncHandler((opId, buf) => { asyncRecv++ }); // Large message that will overflow the shared space. let control = new Uint8Array(100 * 1024 * 1024); - let response = Deno.core.dispatch(control); + let response = Deno.core.dispatch(99, control); assert(response instanceof Uint8Array); assert(response.length == 1); assert(response[0] == 43); @@ -1038,10 +1051,10 @@ pub mod tests { "overflow_res_sync.js", r#" let asyncRecv = 0; - Deno.core.setAsyncHandler((buf) => { asyncRecv++ }); + Deno.core.setAsyncHandler((opId, buf) => { asyncRecv++ }); // Large message that will overflow the shared space. let control = new Uint8Array([42]); - let response = Deno.core.dispatch(control); + let response = Deno.core.dispatch(99, control); assert(response instanceof Uint8Array); assert(response.length == 100 * 1024 * 1024); assert(response[0] == 99); @@ -1059,21 +1072,22 @@ pub mod tests { "overflow_req_async.js", r#" let asyncRecv = 0; - Deno.core.setAsyncHandler((buf) => { + Deno.core.setAsyncHandler((opId, buf) => { + assert(opId == 99); assert(buf.byteLength === 1); assert(buf[0] === 43); asyncRecv++; }); // Large message that will overflow the shared space. let control = new Uint8Array(100 * 1024 * 1024); - let response = Deno.core.dispatch(control); + let response = Deno.core.dispatch(99, control); // Async messages always have null response. assert(response == null); assert(asyncRecv == 0); "#, )); assert_eq!(dispatch_count.load(Ordering::Relaxed), 1); - assert_eq!(Async::Ready(()), isolate.poll().unwrap()); + assert_eq!(Async::Ready(()), js_check(isolate.poll())); js_check(isolate.execute("check.js", "assert(asyncRecv == 1);")); }); } @@ -1088,14 +1102,15 @@ pub mod tests { "overflow_res_async.js", r#" let asyncRecv = 0; - Deno.core.setAsyncHandler((buf) => { + Deno.core.setAsyncHandler((opId, buf) => { + assert(opId == 99); assert(buf.byteLength === 100 * 1024 * 1024); assert(buf[0] === 4); asyncRecv++; }); // Large message that will overflow the shared space. let control = new Uint8Array([42]); - let response = Deno.core.dispatch(control); + let response = Deno.core.dispatch(99, control); assert(response == null); assert(asyncRecv == 0); "#, @@ -1116,19 +1131,20 @@ pub mod tests { "overflow_res_multiple_dispatch_async.js", r#" let asyncRecv = 0; - Deno.core.setAsyncHandler((buf) => { + Deno.core.setAsyncHandler((opId, buf) => { + assert(opId === 99); assert(buf.byteLength === 100 * 1024 * 1024); assert(buf[0] === 4); asyncRecv++; }); // Large message that will overflow the shared space. let control = new Uint8Array([42]); - let response = Deno.core.dispatch(control); + let response = Deno.core.dispatch(99, control); assert(response == null); assert(asyncRecv == 0); // Dispatch another message to verify that pending ops // are done even if shared space overflows - Deno.core.dispatch(control); + Deno.core.dispatch(99, control); "#, )); assert_eq!(dispatch_count.load(Ordering::Relaxed), 2); |