summaryrefslogtreecommitdiff
path: root/core/isolate.rs
diff options
context:
space:
mode:
Diffstat (limited to 'core/isolate.rs')
-rw-r--r--core/isolate.rs108
1 files changed, 62 insertions, 46 deletions
diff --git a/core/isolate.rs b/core/isolate.rs
index 0f693ff92..d3ac4457e 100644
--- a/core/isolate.rs
+++ b/core/isolate.rs
@@ -12,6 +12,7 @@ use crate::libdeno::deno_buf;
use crate::libdeno::deno_dyn_import_id;
use crate::libdeno::deno_mod;
use crate::libdeno::deno_pinned_buf;
+use crate::libdeno::OpId;
use crate::libdeno::PinnedBuf;
use crate::libdeno::Snapshot1;
use crate::libdeno::Snapshot2;
@@ -33,6 +34,9 @@ pub type Buf = Box<[u8]>;
pub type OpAsyncFuture<E> = Box<dyn Future<Item = Buf, Error = E> + Send>;
+type PendingOpFuture =
+ Box<dyn Future<Item = (OpId, Buf), Error = CoreError> + Send>;
+
pub enum Op<E> {
Sync(Buf),
Async(OpAsyncFuture<E>),
@@ -40,10 +44,13 @@ pub enum Op<E> {
pub type CoreError = ();
-type CoreOpAsyncFuture = OpAsyncFuture<CoreError>;
-
pub type CoreOp = Op<CoreError>;
+pub type OpResult<E> = Result<Op<E>, E>;
+
+/// Args: op_id, control_buf, zero_copy_buf
+type CoreDispatchFn = dyn Fn(OpId, &[u8], Option<PinnedBuf>) -> CoreOp;
+
/// Stores a script used to initalize a Isolate
pub struct Script<'a> {
pub source: &'a str,
@@ -76,10 +83,6 @@ pub enum StartupData<'a> {
None,
}
-pub type OpResult<E> = Result<Op<E>, E>;
-
-type CoreDispatchFn = dyn Fn(&[u8], Option<PinnedBuf>) -> CoreOp;
-
pub type DynImportFuture =
Box<dyn Future<Item = deno_mod, Error = ErrBox> + Send>;
type DynImportFn = dyn Fn(&str, &str) -> DynImportFuture;
@@ -121,7 +124,7 @@ pub struct Isolate {
js_error_create: Arc<JSErrorCreateFn>,
needs_init: bool,
shared: SharedQueue,
- pending_ops: FuturesUnordered<CoreOpAsyncFuture>,
+ pending_ops: FuturesUnordered<PendingOpFuture>,
pending_dyn_imports: FuturesUnordered<DynImport>,
have_unpolled_ops: bool,
startup_script: Option<OwnedScript>,
@@ -198,7 +201,7 @@ impl Isolate {
/// corresponds to the second argument of Deno.core.dispatch().
pub fn set_dispatch<F>(&mut self, f: F)
where
- F: Fn(&[u8], Option<PinnedBuf>) -> CoreOp + Send + Sync + 'static,
+ F: Fn(OpId, &[u8], Option<PinnedBuf>) -> CoreOp + Send + Sync + 'static,
{
self.dispatch = Some(Arc::new(f));
}
@@ -265,13 +268,14 @@ impl Isolate {
extern "C" fn pre_dispatch(
user_data: *mut c_void,
- control_argv0: deno_buf,
+ op_id: OpId,
+ control_buf: deno_buf,
zero_copy_buf: deno_pinned_buf,
) {
let isolate = unsafe { Isolate::from_raw_ptr(user_data) };
let op = if let Some(ref f) = isolate.dispatch {
- f(control_argv0.as_ref(), PinnedBuf::new(zero_copy_buf))
+ f(op_id, control_buf.as_ref(), PinnedBuf::new(zero_copy_buf))
} else {
panic!("isolate.dispatch not set")
};
@@ -280,13 +284,17 @@ impl Isolate {
match op {
Op::Sync(buf) => {
// For sync messages, we always return the response via Deno.core.send's
- // return value.
- // TODO(ry) check that if JSError thrown during respond(), that it will be
- // picked up.
- let _ = isolate.respond(Some(&buf));
+ // return value. Sync messages ignore the op_id.
+ let op_id = 0;
+ isolate
+ .respond(Some((op_id, &buf)))
+ // Because this is a sync op, deno_respond() does not actually call
+ // into JavaScript. We should not get an error here.
+ .expect("unexpected error");
}
Op::Async(fut) => {
- isolate.pending_ops.push(fut);
+ let fut2 = fut.map(move |buf| (op_id, buf));
+ isolate.pending_ops.push(Box::new(fut2));
isolate.have_unpolled_ops = true;
}
}
@@ -347,13 +355,16 @@ impl Isolate {
}
}
- fn respond(&mut self, maybe_buf: Option<&[u8]>) -> Result<(), ErrBox> {
- let buf = match maybe_buf {
- None => deno_buf::empty(),
- Some(r) => deno_buf::from(r),
+ fn respond(
+ &mut self,
+ maybe_buf: Option<(OpId, &[u8])>,
+ ) -> Result<(), ErrBox> {
+ let (op_id, buf) = match maybe_buf {
+ None => (0, deno_buf::empty()),
+ Some((op_id, r)) => (op_id, deno_buf::from(r)),
};
unsafe {
- libdeno::deno_respond(self.libdeno_isolate, self.as_raw_ptr(), buf)
+ libdeno::deno_respond(self.libdeno_isolate, self.as_raw_ptr(), op_id, buf)
}
self.check_last_exception()
}
@@ -541,7 +552,7 @@ impl Future for Isolate {
fn poll(&mut self) -> Poll<(), ErrBox> {
self.shared_init();
- let mut overflow_response: Option<Buf> = None;
+ let mut overflow_response: Option<(OpId, Buf)> = None;
loop {
// If there are any pending dyn_import futures, do those first.
@@ -567,13 +578,13 @@ impl Future for Isolate {
Err(_) => panic!("unexpected op error"),
Ok(Ready(None)) => break,
Ok(NotReady) => break,
- Ok(Ready(Some(buf))) => {
- let successful_push = self.shared.push(&buf);
+ Ok(Ready(Some((op_id, buf)))) => {
+ let successful_push = self.shared.push(op_id, &buf);
if !successful_push {
// If we couldn't push the response to the shared queue, because
// there wasn't enough size, we will return the buffer via the
// legacy route, using the argument of deno_respond.
- overflow_response = Some(buf);
+ overflow_response = Some((op_id, buf));
break;
}
}
@@ -592,8 +603,8 @@ impl Future for Isolate {
if overflow_response.is_some() {
// Lock the current thread for V8.
let locker = LockerScope::new(self.libdeno_isolate);
- let buf = overflow_response.take().unwrap();
- self.respond(Some(&buf))?;
+ let (op_id, buf) = overflow_response.take().unwrap();
+ self.respond(Some((op_id, &buf)))?;
drop(locker);
}
@@ -633,10 +644,11 @@ impl IsolateHandle {
}
}
-pub fn js_check(r: Result<(), ErrBox>) {
+pub fn js_check<T>(r: Result<T, ErrBox>) -> T {
if let Err(e) = r {
panic!(e.to_string());
}
+ r.unwrap()
}
#[cfg(test)]
@@ -689,7 +701,8 @@ pub mod tests {
let dispatch_count_ = dispatch_count.clone();
let mut isolate = Isolate::new(StartupData::None, false);
- isolate.set_dispatch(move |control, _| -> CoreOp {
+ isolate.set_dispatch(move |op_id, control, _| -> CoreOp {
+ println!("op_id {}", op_id);
dispatch_count_.fetch_add(1, Ordering::Relaxed);
match mode {
Mode::AsyncImmediate => {
@@ -749,9 +762,9 @@ pub mod tests {
"filename.js",
r#"
let control = new Uint8Array([42]);
- Deno.core.send(control);
+ Deno.core.send(42, control);
async function main() {
- Deno.core.send(control);
+ Deno.core.send(42, control);
}
main();
"#,
@@ -770,7 +783,7 @@ pub mod tests {
import { b } from 'b.js'
if (b() != 'b') throw Error();
let control = new Uint8Array([42]);
- Deno.core.send(control);
+ Deno.core.send(42, control);
"#,
)
.unwrap();
@@ -816,7 +829,7 @@ pub mod tests {
"setup2.js",
r#"
let nrecv = 0;
- Deno.core.setAsyncHandler((buf) => {
+ Deno.core.setAsyncHandler((opId, buf) => {
nrecv++;
});
"#,
@@ -827,7 +840,7 @@ pub mod tests {
r#"
assert(nrecv == 0);
let control = new Uint8Array([42]);
- Deno.core.send(control);
+ Deno.core.send(42, control);
assert(nrecv == 0);
"#,
));
@@ -838,7 +851,7 @@ pub mod tests {
"check2.js",
r#"
assert(nrecv == 1);
- Deno.core.send(control);
+ Deno.core.send(42, control);
assert(nrecv == 1);
"#,
));
@@ -1016,10 +1029,10 @@ pub mod tests {
"overflow_req_sync.js",
r#"
let asyncRecv = 0;
- Deno.core.setAsyncHandler((buf) => { asyncRecv++ });
+ Deno.core.setAsyncHandler((opId, buf) => { asyncRecv++ });
// Large message that will overflow the shared space.
let control = new Uint8Array(100 * 1024 * 1024);
- let response = Deno.core.dispatch(control);
+ let response = Deno.core.dispatch(99, control);
assert(response instanceof Uint8Array);
assert(response.length == 1);
assert(response[0] == 43);
@@ -1038,10 +1051,10 @@ pub mod tests {
"overflow_res_sync.js",
r#"
let asyncRecv = 0;
- Deno.core.setAsyncHandler((buf) => { asyncRecv++ });
+ Deno.core.setAsyncHandler((opId, buf) => { asyncRecv++ });
// Large message that will overflow the shared space.
let control = new Uint8Array([42]);
- let response = Deno.core.dispatch(control);
+ let response = Deno.core.dispatch(99, control);
assert(response instanceof Uint8Array);
assert(response.length == 100 * 1024 * 1024);
assert(response[0] == 99);
@@ -1059,21 +1072,22 @@ pub mod tests {
"overflow_req_async.js",
r#"
let asyncRecv = 0;
- Deno.core.setAsyncHandler((buf) => {
+ Deno.core.setAsyncHandler((opId, buf) => {
+ assert(opId == 99);
assert(buf.byteLength === 1);
assert(buf[0] === 43);
asyncRecv++;
});
// Large message that will overflow the shared space.
let control = new Uint8Array(100 * 1024 * 1024);
- let response = Deno.core.dispatch(control);
+ let response = Deno.core.dispatch(99, control);
// Async messages always have null response.
assert(response == null);
assert(asyncRecv == 0);
"#,
));
assert_eq!(dispatch_count.load(Ordering::Relaxed), 1);
- assert_eq!(Async::Ready(()), isolate.poll().unwrap());
+ assert_eq!(Async::Ready(()), js_check(isolate.poll()));
js_check(isolate.execute("check.js", "assert(asyncRecv == 1);"));
});
}
@@ -1088,14 +1102,15 @@ pub mod tests {
"overflow_res_async.js",
r#"
let asyncRecv = 0;
- Deno.core.setAsyncHandler((buf) => {
+ Deno.core.setAsyncHandler((opId, buf) => {
+ assert(opId == 99);
assert(buf.byteLength === 100 * 1024 * 1024);
assert(buf[0] === 4);
asyncRecv++;
});
// Large message that will overflow the shared space.
let control = new Uint8Array([42]);
- let response = Deno.core.dispatch(control);
+ let response = Deno.core.dispatch(99, control);
assert(response == null);
assert(asyncRecv == 0);
"#,
@@ -1116,19 +1131,20 @@ pub mod tests {
"overflow_res_multiple_dispatch_async.js",
r#"
let asyncRecv = 0;
- Deno.core.setAsyncHandler((buf) => {
+ Deno.core.setAsyncHandler((opId, buf) => {
+ assert(opId === 99);
assert(buf.byteLength === 100 * 1024 * 1024);
assert(buf[0] === 4);
asyncRecv++;
});
// Large message that will overflow the shared space.
let control = new Uint8Array([42]);
- let response = Deno.core.dispatch(control);
+ let response = Deno.core.dispatch(99, control);
assert(response == null);
assert(asyncRecv == 0);
// Dispatch another message to verify that pending ops
// are done even if shared space overflows
- Deno.core.dispatch(control);
+ Deno.core.dispatch(99, control);
"#,
));
assert_eq!(dispatch_count.load(Ordering::Relaxed), 2);