summaryrefslogtreecommitdiff
path: root/core/isolate.rs
diff options
context:
space:
mode:
Diffstat (limited to 'core/isolate.rs')
-rw-r--r--core/isolate.rs286
1 files changed, 138 insertions, 148 deletions
diff --git a/core/isolate.rs b/core/isolate.rs
index f7aa431aa..33b52da75 100644
--- a/core/isolate.rs
+++ b/core/isolate.rs
@@ -3,6 +3,8 @@ use crate::js_errors::JSError;
use crate::libdeno;
use crate::libdeno::deno_buf;
use crate::libdeno::deno_mod;
+use crate::shared_queue::SharedQueue;
+use crate::shared_queue::RECOMMENDED_SIZE;
use futures::Async;
use futures::Future;
use futures::Poll;
@@ -11,19 +13,20 @@ use std::ffi::CStr;
use std::ffi::CString;
use std::sync::{Once, ONCE_INIT};
-pub type Op<R> = dyn Future<Item = R, Error = ()> + Send;
+pub type Buf = Box<[u8]>;
+pub type Op = dyn Future<Item = Buf, Error = ()> + Send;
-struct PendingOp<R> {
- op: Box<Op<R>>,
+struct PendingOp {
+ op: Box<Op>,
polled_recently: bool,
zero_copy_id: usize, // non-zero if associated zero-copy buffer.
}
-impl<R> Future for PendingOp<R> {
- type Item = R;
+impl Future for PendingOp {
+ type Item = Buf;
type Error = ();
- fn poll(&mut self) -> Poll<R, ()> {
+ fn poll(&mut self) -> Poll<Buf, ()> {
// Do not call poll on ops we've already polled this turn.
if self.polled_recently {
Ok(Async::NotReady)
@@ -32,7 +35,7 @@ impl<R> Future for PendingOp<R> {
let op = &mut self.op;
op.poll().map_err(|()| {
// Ops should not error. If an op experiences an error it needs to
- // encode that error into the record R, so it can be returned to JS.
+ // encode that error into a buf, so it can be returned to JS.
panic!("ops should not error")
})
}
@@ -40,35 +43,21 @@ impl<R> Future for PendingOp<R> {
}
/// Defines the behavior of an Isolate.
-pub trait Behavior<R> {
+pub trait Behavior {
/// Called exactly once when an Isolate is created to retrieve the startup
/// snapshot.
fn startup_snapshot(&mut self) -> Option<deno_buf>;
- /// Called exactly once when an Isolate is created to provide the
- /// backing memory for the libdeno.shared SharedArrayBuffer.
- fn startup_shared(&mut self) -> Option<deno_buf>;
-
/// Called during mod_instantiate() to resolve imports.
fn resolve(&mut self, specifier: &str, referrer: deno_mod) -> deno_mod;
/// Called whenever libdeno.send() is called in JavaScript. zero_copy_buf
/// corresponds to the second argument of libdeno.send().
- fn recv(&mut self, record: R, zero_copy_buf: deno_buf) -> (bool, Box<Op<R>>);
-
- // TODO(ry) Remove records_reset().
- // TODO(ry) Abstract records_* and startup_shared() methods into standalone
- // trait called Shared. It should, however, wait until integration with
- // existing Deno codebase is complete.
-
- /// Clears the shared buffer.
- fn records_reset(&mut self);
-
- /// Returns false if not enough room.
- fn records_push(&mut self, record: R) -> bool;
-
- /// Returns none if empty.
- fn records_shift(&mut self) -> Option<R>;
+ fn dispatch(
+ &mut self,
+ control: &[u8],
+ zero_copy_buf: deno_buf,
+ ) -> (bool, Box<Op>);
}
/// A single execution context of JavaScript. Corresponds roughly to the "Web
@@ -77,18 +66,19 @@ pub trait Behavior<R> {
/// pending ops have completed.
///
/// Ops are created in JavaScript by calling libdeno.send(), and in Rust by
-/// implementing Behavior::recv. An Op corresponds exactly to a Promise in
+/// implementing Behavior::dispatch. An Op corresponds exactly to a Promise in
/// JavaScript.
-pub struct Isolate<R, B: Behavior<R>> {
+pub struct Isolate<B: Behavior> {
libdeno_isolate: *const libdeno::isolate,
behavior: B,
- pending_ops: Vec<PendingOp<R>>,
+ shared: SharedQueue,
+ pending_ops: Vec<PendingOp>,
polled_recently: bool,
}
-unsafe impl<R, B: Behavior<R>> Send for Isolate<R, B> {}
+unsafe impl<B: Behavior> Send for Isolate<B> {}
-impl<R, B: Behavior<R>> Drop for Isolate<R, B> {
+impl<B: Behavior> Drop for Isolate<B> {
fn drop(&mut self) {
unsafe { libdeno::deno_delete(self.libdeno_isolate) }
}
@@ -96,22 +86,21 @@ impl<R, B: Behavior<R>> Drop for Isolate<R, B> {
static DENO_INIT: Once = ONCE_INIT;
-impl<R, B: Behavior<R>> Isolate<R, B> {
+impl<B: Behavior> Isolate<B> {
pub fn new(mut behavior: B) -> Self {
DENO_INIT.call_once(|| {
unsafe { libdeno::deno_init() };
});
+ let shared = SharedQueue::new(RECOMMENDED_SIZE);
+
let config = libdeno::deno_config {
will_snapshot: 0,
load_snapshot: match behavior.startup_snapshot() {
Some(s) => s,
None => libdeno::deno_buf::empty(),
},
- shared: match behavior.startup_shared() {
- Some(s) => s,
- None => libdeno::deno_buf::empty(),
- },
+ shared: shared.as_deno_buf(),
recv_cb: Self::pre_dispatch,
};
let libdeno_isolate = unsafe { libdeno::deno_new(config) };
@@ -119,28 +108,50 @@ impl<R, B: Behavior<R>> Isolate<R, B> {
Self {
libdeno_isolate,
behavior,
+ shared,
pending_ops: Vec::new(),
polled_recently: false,
}
}
+ /// Executes a bit of built-in JavaScript to provide Deno._sharedQueue.
+ pub fn shared_init(&self) {
+ js_check(self.execute("shared_queue.js", include_str!("shared_queue.js")));
+ }
+
extern "C" fn pre_dispatch(
user_data: *mut c_void,
- control_buf: deno_buf,
+ control_argv0: deno_buf,
zero_copy_buf: deno_buf,
) {
- let isolate = unsafe { Isolate::<R, B>::from_raw_ptr(user_data) };
- assert_eq!(control_buf.len(), 0);
+ let isolate = unsafe { Isolate::<B>::from_raw_ptr(user_data) };
let zero_copy_id = zero_copy_buf.zero_copy_id;
- let req_record = isolate.behavior.records_shift().unwrap();
+ let control_shared = isolate.shared.shift();
- isolate.behavior.records_reset();
+ let (is_sync, op) = if control_argv0.len() > 0 {
+ // The user called libdeno.send(control)
+ isolate
+ .behavior
+ .dispatch(control_argv0.as_ref(), zero_copy_buf)
+ } else if let Some(c) = control_shared {
+ // The user called Deno._sharedQueue.push(control)
+ isolate.behavior.dispatch(&c, zero_copy_buf)
+ } else {
+ // The sharedQueue is empty. The shouldn't happen usually, but it's also
+ // not technically a failure.
+ #[cfg(test)]
+ unreachable!();
+ #[cfg(not(test))]
+ return;
+ };
+
+ // At this point the SharedQueue should be empty.
+ assert_eq!(isolate.shared.size(), 0);
- let (is_sync, op) = isolate.behavior.recv(req_record, zero_copy_buf);
if is_sync {
let res_record = op.wait().unwrap();
- let push_success = isolate.behavior.records_push(res_record);
+ let push_success = isolate.shared.push(res_record);
assert!(push_success);
// TODO check that if JSError thrown during respond(), that it will be
// picked up.
@@ -295,7 +306,7 @@ impl<R, B: Behavior<R>> Isolate<R, B> {
specifier_ptr: *const libc::c_char,
referrer: deno_mod,
) -> deno_mod {
- let isolate = unsafe { Isolate::<R, B>::from_raw_ptr(user_data) };
+ let isolate = unsafe { Isolate::<B>::from_raw_ptr(user_data) };
let specifier_c: &CStr = unsafe { CStr::from_ptr(specifier_ptr) };
let specifier: &str = specifier_c.to_str().unwrap();
isolate.behavior.resolve(specifier, referrer)
@@ -319,7 +330,7 @@ impl Drop for LockerScope {
}
}
-impl<R, B: Behavior<R>> Future for Isolate<R, B> {
+impl<B: Behavior> Future for Isolate<B> {
type Item = ();
type Error = JSError;
@@ -336,22 +347,18 @@ impl<R, B: Behavior<R>> Future for Isolate<R, B> {
while !self.polled_recently {
let mut completed_count = 0;
-
- debug!("poll loop");
-
self.polled_recently = true;
-
- self.behavior.records_reset();
+ assert_eq!(self.shared.size(), 0);
let mut i = 0;
- while i != self.pending_ops.len() {
+ while i < self.pending_ops.len() {
let pending = &mut self.pending_ops[i];
match pending.poll() {
- Err(()) => panic!("unexpectd error"),
+ Err(()) => panic!("unexpected error"),
Ok(Async::NotReady) => {
i += 1;
}
- Ok(Async::Ready(record)) => {
+ Ok(Async::Ready(buf)) => {
let completed = self.pending_ops.remove(i);
completed_count += 1;
@@ -359,15 +366,16 @@ impl<R, B: Behavior<R>> Future for Isolate<R, B> {
self.zero_copy_release(completed.zero_copy_id);
}
- self.behavior.records_push(record);
+ self.shared.push(buf);
}
}
}
if completed_count > 0 {
- debug!("respond");
self.respond()?;
- debug!("after respond");
+ // The other side should have shifted off all the messages.
+ assert_eq!(self.shared.size(), 0);
+ self.shared.reset();
}
}
@@ -385,99 +393,33 @@ impl<R, B: Behavior<R>> Future for Isolate<R, B> {
}
}
+pub fn js_check(r: Result<(), JSError>) {
+ if let Err(e) = r {
+ panic!(e.to_string());
+ }
+}
+
#[cfg(test)]
mod tests {
use super::*;
- use std::collections::HashMap;
-
- fn js_check(r: Result<(), JSError>) {
- if let Err(e) = r {
- panic!(e.to_string());
- }
- }
-
- struct TestBehavior {
- recv_count: usize,
- resolve_count: usize,
- push_count: usize,
- shift_count: usize,
- reset_count: usize,
- mod_map: HashMap<String, deno_mod>,
- }
-
- impl TestBehavior {
- fn new() -> Self {
- Self {
- recv_count: 0,
- resolve_count: 0,
- push_count: 0,
- shift_count: 0,
- reset_count: 0,
- mod_map: HashMap::new(),
- }
- }
-
- fn register(&mut self, name: &str, id: deno_mod) {
- self.mod_map.insert(name.to_string(), id);
- }
- }
-
- impl Behavior<()> for TestBehavior {
- fn startup_snapshot(&mut self) -> Option<deno_buf> {
- None
- }
-
- fn startup_shared(&mut self) -> Option<deno_buf> {
- None
- }
-
- fn recv(
- &mut self,
- _record: (),
- _zero_copy_buf: deno_buf,
- ) -> (bool, Box<Op<()>>) {
- self.recv_count += 1;
- (false, Box::new(futures::future::ok(())))
- }
-
- fn resolve(&mut self, specifier: &str, _referrer: deno_mod) -> deno_mod {
- self.resolve_count += 1;
- match self.mod_map.get(specifier) {
- Some(id) => *id,
- None => 0,
- }
- }
-
- fn records_reset(&mut self) {
- self.reset_count += 1;
- }
-
- fn records_push(&mut self, _record: ()) -> bool {
- self.push_count += 1;
- true
- }
-
- fn records_shift(&mut self) -> Option<()> {
- self.shift_count += 1;
- Some(())
- }
- }
+ use crate::test_util::*;
#[test]
- fn test_recv() {
+ fn test_dispatch() {
let behavior = TestBehavior::new();
let isolate = Isolate::new(behavior);
js_check(isolate.execute(
"filename.js",
r#"
- libdeno.send();
+ let control = new Uint8Array([42]);
+ libdeno.send(control);
async function main() {
- libdeno.send();
+ libdeno.send(control);
}
main();
"#,
));
- assert_eq!(isolate.behavior.recv_count, 2);
+ assert_eq!(isolate.behavior.dispatch_count, 2);
}
#[test]
@@ -491,10 +433,11 @@ mod tests {
r#"
import { b } from 'b.js'
if (b() != 'b') throw Error();
- libdeno.send();
+ let control = new Uint8Array([42]);
+ libdeno.send(control);
"#,
).unwrap();
- assert_eq!(isolate.behavior.recv_count, 0);
+ assert_eq!(isolate.behavior.dispatch_count, 0);
assert_eq!(isolate.behavior.resolve_count, 0);
let imports = isolate.mod_get_imports(mod_a);
@@ -507,16 +450,16 @@ mod tests {
assert_eq!(imports.len(), 0);
js_check(isolate.mod_instantiate(mod_b));
- assert_eq!(isolate.behavior.recv_count, 0);
+ assert_eq!(isolate.behavior.dispatch_count, 0);
assert_eq!(isolate.behavior.resolve_count, 0);
isolate.behavior.register("b.js", mod_b);
js_check(isolate.mod_instantiate(mod_a));
- assert_eq!(isolate.behavior.recv_count, 0);
+ assert_eq!(isolate.behavior.dispatch_count, 0);
assert_eq!(isolate.behavior.resolve_count, 1);
js_check(isolate.mod_evaluate(mod_a));
- assert_eq!(isolate.behavior.recv_count, 1);
+ assert_eq!(isolate.behavior.dispatch_count, 1);
assert_eq!(isolate.behavior.resolve_count, 1);
}
@@ -525,11 +468,13 @@ mod tests {
let behavior = TestBehavior::new();
let mut isolate = Isolate::new(behavior);
+ isolate.shared_init();
+
js_check(isolate.execute(
"setup.js",
r#"
let nrecv = 0;
- libdeno.recv(() => {
+ Deno._setAsyncHandler((buf) => {
nrecv++;
});
function assertEq(actual, expected) {
@@ -539,32 +484,77 @@ mod tests {
}
"#,
));
- assert_eq!(isolate.behavior.recv_count, 0);
+ assert_eq!(isolate.behavior.dispatch_count, 0);
js_check(isolate.execute(
"check1.js",
r#"
assertEq(nrecv, 0);
- libdeno.send();
+ let control = new Uint8Array([42]);
+ libdeno.send(control);
assertEq(nrecv, 0);
"#,
));
- assert_eq!(isolate.behavior.recv_count, 1);
+ assert_eq!(isolate.behavior.dispatch_count, 1);
assert_eq!(Ok(Async::Ready(())), isolate.poll());
- assert_eq!(isolate.behavior.recv_count, 1);
+ assert_eq!(isolate.behavior.dispatch_count, 1);
js_check(isolate.execute(
"check2.js",
r#"
assertEq(nrecv, 1);
- libdeno.send();
+ libdeno.send(control);
assertEq(nrecv, 1);
"#,
));
- assert_eq!(isolate.behavior.recv_count, 2);
+ assert_eq!(isolate.behavior.dispatch_count, 2);
assert_eq!(Ok(Async::Ready(())), isolate.poll());
js_check(isolate.execute("check3.js", "assertEq(nrecv, 2)"));
- assert_eq!(isolate.behavior.recv_count, 2);
+ assert_eq!(isolate.behavior.dispatch_count, 2);
// We are idle, so the next poll should be the last.
assert_eq!(Ok(Async::Ready(())), isolate.poll());
}
+ #[test]
+ fn test_shared() {
+ let behavior = TestBehavior::new();
+ let mut isolate = Isolate::new(behavior);
+
+ isolate.shared_init();
+
+ js_check(isolate.execute(
+ "setup.js",
+ r#"
+ let nrecv = 0;
+ Deno._setAsyncHandler((buf) => {
+ assert(buf.byteLength === 1);
+ assert(buf[0] === 43);
+ nrecv++;
+ });
+ function assert(cond) {
+ if (!cond) {
+ throw Error("assert");
+ }
+ }
+ "#,
+ ));
+ assert_eq!(isolate.behavior.dispatch_count, 0);
+
+ js_check(isolate.execute(
+ "send1.js",
+ r#"
+ let control = new Uint8Array([42]);
+ Deno._sharedQueue.push(control);
+ libdeno.send();
+ assert(nrecv === 0);
+
+ Deno._sharedQueue.push(control);
+ libdeno.send();
+ assert(nrecv === 0);
+ "#,
+ ));
+ assert_eq!(isolate.behavior.dispatch_count, 2);
+ assert_eq!(Ok(Async::Ready(())), isolate.poll());
+
+ js_check(isolate.execute("send1.js", "assert(nrecv === 2);"));
+ }
+
}