summaryrefslogtreecommitdiff
path: root/core/isolate.rs
diff options
context:
space:
mode:
authorandy finch <andyfinch7@gmail.com>2019-06-13 23:43:54 -0400
committerRyan Dahl <ry@tinyclouds.org>2019-06-13 20:43:54 -0700
commitdc60fe9f300043f191286ef804a365e16e455f87 (patch)
treec6b74e9faa6f26745b8770a18d0ae46ee34f3774 /core/isolate.rs
parentfdd2eb538327ee3f50fe2869320411191830c985 (diff)
Refactor dispatch handling (#2452)
Promise id is now created in core and passed back to JS.
Diffstat (limited to 'core/isolate.rs')
-rw-r--r--core/isolate.rs118
1 files changed, 85 insertions, 33 deletions
diff --git a/core/isolate.rs b/core/isolate.rs
index 14e1b88aa..ae14c0040 100644
--- a/core/isolate.rs
+++ b/core/isolate.rs
@@ -21,21 +21,29 @@ use futures::Async::*;
use futures::Future;
use futures::Poll;
use libc::c_char;
+use libc::c_int;
use libc::c_void;
use std::ffi::CStr;
use std::ffi::CString;
use std::ptr::null;
+use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::{Arc, Mutex, Once, ONCE_INIT};
pub type Buf = Box<[u8]>;
-pub type OpAsyncFuture = Box<dyn Future<Item = Buf, Error = ()> + Send>;
+pub type OpAsyncFuture<I, E> = Box<dyn Future<Item = I, Error = E> + Send>;
-pub enum Op {
+pub enum Op<E> {
Sync(Buf),
- Async(OpAsyncFuture),
+ Async(OpAsyncFuture<Buf, E>),
}
+pub type CoreError = ();
+
+type CoreOpAsyncFuture = OpAsyncFuture<(c_int, Buf), CoreError>;
+
+pub type CoreOp = Op<CoreError>;
+
/// Stores a script used to initalize a Isolate
pub struct Script<'a> {
pub source: &'a str,
@@ -68,7 +76,9 @@ pub enum StartupData<'a> {
None,
}
-type DispatchFn = Fn(&[u8], Option<PinnedBuf>) -> Op;
+pub type OpResult<E> = Result<Op<E>, E>;
+
+type CoreDispatchFn = Fn(&[u8], Option<PinnedBuf>) -> CoreOp;
pub type DynImportFuture = Box<dyn Future<Item = deno_mod, Error = ()> + Send>;
type DynImportFn = Fn(&str, &str) -> DynImportFuture;
@@ -93,6 +103,12 @@ impl Future for DynImport {
}
}
+enum ResponseData {
+ None,
+ Buffer(deno_buf),
+ PromiseId(c_int),
+}
+
/// A single execution context of JavaScript. Corresponds roughly to the "Web
/// Worker" concept in the DOM. An Isolate is a Future that can be used with
/// Tokio. The Isolate future complete when there is an error or when all
@@ -104,14 +120,15 @@ impl Future for DynImport {
pub struct Isolate {
libdeno_isolate: *const libdeno::isolate,
shared_libdeno_isolate: Arc<Mutex<Option<*const libdeno::isolate>>>,
- dispatch: Option<Arc<DispatchFn>>,
+ dispatch: Option<Arc<CoreDispatchFn>>,
dyn_import: Option<Arc<DynImportFn>>,
needs_init: bool,
shared: SharedQueue,
- pending_ops: FuturesUnordered<OpAsyncFuture>,
+ pending_ops: FuturesUnordered<CoreOpAsyncFuture>,
pending_dyn_imports: FuturesUnordered<DynImport>,
have_unpolled_ops: bool,
startup_script: Option<OwnedScript>,
+ next_promise_id: AtomicI32,
}
unsafe impl Send for Isolate {}
@@ -176,6 +193,7 @@ impl Isolate {
have_unpolled_ops: false,
pending_dyn_imports: FuturesUnordered::new(),
startup_script,
+ next_promise_id: AtomicI32::new(1),
}
}
@@ -184,7 +202,7 @@ impl Isolate {
/// corresponds to the second argument of Deno.core.dispatch().
pub fn set_dispatch<F>(&mut self, f: F)
where
- F: Fn(&[u8], Option<PinnedBuf>) -> Op + Send + Sync + 'static,
+ F: Fn(&[u8], Option<PinnedBuf>) -> CoreOp + Send + Sync + 'static,
{
self.dispatch = Some(Arc::new(f));
}
@@ -239,6 +257,10 @@ impl Isolate {
}
}
+ pub fn get_next_promise_id(&self) -> i32 {
+ self.next_promise_id.fetch_add(1, Ordering::SeqCst)
+ }
+
extern "C" fn pre_dispatch(
user_data: *mut c_void,
control_argv0: deno_buf,
@@ -279,9 +301,17 @@ impl Isolate {
// return value.
// TODO(ry) check that if JSError thrown during respond(), that it will be
// picked up.
- let _ = isolate.respond(Some(&buf));
+ let _ =
+ isolate.respond(ResponseData::Buffer(deno_buf::from(buf.as_ref())));
}
Op::Async(fut) => {
+ let promise_id = isolate.get_next_promise_id();
+ let _ = isolate.respond(ResponseData::PromiseId(promise_id));
+ let fut = Box::new(fut.and_then(
+ move |buf| -> Result<(c_int, Buf), CoreError> {
+ Ok((promise_id, buf))
+ },
+ ));
isolate.pending_ops.push(fut);
isolate.have_unpolled_ops = true;
}
@@ -340,14 +370,34 @@ impl Isolate {
}
}
- fn respond(&mut self, maybe_buf: Option<&[u8]>) -> Result<(), JSError> {
- let buf = match maybe_buf {
- None => deno_buf::empty(),
- Some(r) => deno_buf::from(r),
+ // the result type is a placeholder for a more specific enum type
+ fn respond(&mut self, data: ResponseData) -> Result<(), JSError> {
+ match data {
+ ResponseData::PromiseId(pid) => unsafe {
+ libdeno::deno_respond(
+ self.libdeno_isolate,
+ self.as_raw_ptr(),
+ deno_buf::empty(),
+ &pid,
+ )
+ },
+ ResponseData::Buffer(r) => unsafe {
+ libdeno::deno_respond(
+ self.libdeno_isolate,
+ self.as_raw_ptr(),
+ r,
+ null(),
+ )
+ },
+ ResponseData::None => unsafe {
+ libdeno::deno_respond(
+ self.libdeno_isolate,
+ self.as_raw_ptr(),
+ deno_buf::empty(),
+ null(),
+ )
+ },
};
- unsafe {
- libdeno::deno_respond(self.libdeno_isolate, self.as_raw_ptr(), buf)
- }
if let Some(err) = self.last_exception() {
Err(err)
} else {
@@ -525,7 +575,7 @@ impl Future for Isolate {
self.shared_init();
- let mut overflow_response: Option<Buf> = None;
+ let mut overflow_response: Option<(c_int, Buf)> = None;
loop {
// If there are any pending dyn_import futures, do those first.
@@ -546,13 +596,13 @@ impl Future for Isolate {
Err(_) => panic!("unexpected op error"),
Ok(Ready(None)) => break,
Ok(NotReady) => break,
- Ok(Ready(Some(buf))) => {
- let successful_push = self.shared.push(&buf);
+ Ok(Ready(Some(op))) => {
+ let successful_push = self.shared.push(op.0, &op.1);
if !successful_push {
// If we couldn't push the response to the shared queue, because
// there wasn't enough size, we will return the buffer via the
// legacy route, using the argument of deno_respond.
- overflow_response = Some(buf);
+ overflow_response = Some(op);
break;
}
}
@@ -560,14 +610,16 @@ impl Future for Isolate {
}
if self.shared.size() > 0 {
- self.respond(None)?;
+ self.respond(ResponseData::None)?;
// The other side should have shifted off all the messages.
assert_eq!(self.shared.size(), 0);
}
if overflow_response.is_some() {
- let buf = overflow_response.take().unwrap();
- self.respond(Some(&buf))?;
+ let op = overflow_response.take().unwrap();
+ let promise_id_bytes = op.0.to_be_bytes();
+ let buf: Buf = [&promise_id_bytes, &op.1[..]].concat().into();
+ self.respond(ResponseData::Buffer(deno_buf::from(buf.as_ref())))?;
}
self.check_promise_errors();
@@ -664,7 +716,7 @@ pub mod tests {
let dispatch_count_ = dispatch_count.clone();
let mut isolate = Isolate::new(StartupData::None, false);
- isolate.set_dispatch(move |control, _| -> Op {
+ isolate.set_dispatch(move |control, _| -> CoreOp {
dispatch_count_.fetch_add(1, Ordering::Relaxed);
match mode {
Mode::AsyncImmediate => {
@@ -834,7 +886,7 @@ pub mod tests {
"setup2.js",
r#"
let nrecv = 0;
- Deno.core.setAsyncHandler((buf) => {
+ Deno.core.setAsyncHandler((promiseId, buf) => {
assert(buf.byteLength === 1);
assert(buf[0] === 43);
nrecv++;
@@ -1025,7 +1077,7 @@ pub mod tests {
"overflow_req_sync.js",
r#"
let asyncRecv = 0;
- Deno.core.setAsyncHandler((buf) => { asyncRecv++ });
+ Deno.core.setAsyncHandler((promiseId, buf) => { asyncRecv++ });
// Large message that will overflow the shared space.
let control = new Uint8Array(100 * 1024 * 1024);
let response = Deno.core.dispatch(control);
@@ -1047,7 +1099,7 @@ pub mod tests {
"overflow_res_sync.js",
r#"
let asyncRecv = 0;
- Deno.core.setAsyncHandler((buf) => { asyncRecv++ });
+ Deno.core.setAsyncHandler((promiseId, buf) => { asyncRecv++ });
// Large message that will overflow the shared space.
let control = new Uint8Array([42]);
let response = Deno.core.dispatch(control);
@@ -1068,7 +1120,7 @@ pub mod tests {
"overflow_req_async.js",
r#"
let asyncRecv = 0;
- Deno.core.setAsyncHandler((buf) => {
+ Deno.core.setAsyncHandler((cmdId, buf) => {
assert(buf.byteLength === 1);
assert(buf[0] === 43);
asyncRecv++;
@@ -1076,8 +1128,8 @@ pub mod tests {
// Large message that will overflow the shared space.
let control = new Uint8Array(100 * 1024 * 1024);
let response = Deno.core.dispatch(control);
- // Async messages always have null response.
- assert(response == null);
+ // Async messages always have number type response.
+ assert(typeof response == "number");
assert(asyncRecv == 0);
"#,
));
@@ -1097,7 +1149,7 @@ pub mod tests {
"overflow_res_async.js",
r#"
let asyncRecv = 0;
- Deno.core.setAsyncHandler((buf) => {
+ Deno.core.setAsyncHandler((cmdId, buf) => {
assert(buf.byteLength === 100 * 1024 * 1024);
assert(buf[0] === 4);
asyncRecv++;
@@ -1105,7 +1157,7 @@ pub mod tests {
// Large message that will overflow the shared space.
let control = new Uint8Array([42]);
let response = Deno.core.dispatch(control);
- assert(response == null);
+ assert(typeof response == "number");
assert(asyncRecv == 0);
"#,
));
@@ -1125,7 +1177,7 @@ pub mod tests {
"overflow_res_multiple_dispatch_async.js",
r#"
let asyncRecv = 0;
- Deno.core.setAsyncHandler((buf) => {
+ Deno.core.setAsyncHandler((cmdId, buf) => {
assert(buf.byteLength === 100 * 1024 * 1024);
assert(buf[0] === 4);
asyncRecv++;
@@ -1133,7 +1185,7 @@ pub mod tests {
// Large message that will overflow the shared space.
let control = new Uint8Array([42]);
let response = Deno.core.dispatch(control);
- assert(response == null);
+ assert(typeof response == "number");
assert(asyncRecv == 0);
// Dispatch another message to verify that pending ops
// are done even if shared space overflows