summaryrefslogtreecommitdiff
path: root/core/core_isolate.rs
diff options
context:
space:
mode:
authorBert Belder <bertbelder@gmail.com>2020-09-06 02:34:02 +0200
committerGitHub <noreply@github.com>2020-09-06 02:34:02 +0200
commitc821e8f2f1fb8ad5e9eb00854277cafc8c80b2f5 (patch)
treec429a3c2707a4047fb512443a8468b7e15e5730d /core/core_isolate.rs
parent849431eb1d112d1f79f4a327830dc1a5bf22dd47 (diff)
Move JSON ops to deno_core (#7336)
Diffstat (limited to 'core/core_isolate.rs')
-rw-r--r--core/core_isolate.rs300
1 files changed, 101 insertions, 199 deletions
diff --git a/core/core_isolate.rs b/core/core_isolate.rs
index a60ce6a82..52e856174 100644
--- a/core/core_isolate.rs
+++ b/core/core_isolate.rs
@@ -13,16 +13,12 @@ use crate::shared_queue::SharedQueue;
use crate::shared_queue::RECOMMENDED_SIZE;
use crate::ErrBox;
use crate::JSError;
-use crate::ResourceTable;
-use crate::ZeroCopyBuf;
-use futures::future::FutureExt;
use futures::stream::FuturesUnordered;
use futures::stream::StreamExt;
use futures::task::AtomicWaker;
use futures::Future;
-use serde_json::json;
-use serde_json::Value;
use std::any::Any;
+use std::cell::Cell;
use std::cell::RefCell;
use std::collections::HashMap;
use std::convert::From;
@@ -37,7 +33,7 @@ use std::sync::Once;
use std::task::Context;
use std::task::Poll;
-type PendingOpFuture = Pin<Box<dyn Future<Output = (OpId, Buf)>>>;
+type PendingOpFuture = Pin<Box<dyn Future<Output = (OpId, Box<[u8]>)>>>;
/// Stores a script used to initialize a Isolate
pub struct Script<'a> {
@@ -87,7 +83,7 @@ impl StartupData<'_> {
type JSErrorCreateFn = dyn Fn(JSError) -> ErrBox;
-pub type GetErrorClassFn = &'static dyn for<'e> Fn(&'e ErrBox) -> &'static str;
+pub type GetErrorClassFn = dyn for<'e> Fn(&'e ErrBox) -> &'static str;
/// Objects that need to live as long as the isolate
#[derive(Default)]
@@ -118,19 +114,17 @@ pub struct CoreIsolate {
/// Internal state for CoreIsolate which is stored in one of v8::Isolate's
/// embedder slots.
pub struct CoreIsolateState {
- pub resource_table: Rc<RefCell<ResourceTable>>,
pub global_context: Option<v8::Global<v8::Context>>,
pub(crate) shared_ab: Option<v8::Global<v8::SharedArrayBuffer>>,
pub(crate) js_recv_cb: Option<v8::Global<v8::Function>>,
pub(crate) js_macrotask_cb: Option<v8::Global<v8::Function>>,
pub(crate) pending_promise_exceptions: HashMap<i32, v8::Global<v8::Value>>,
pub(crate) js_error_create_fn: Box<JSErrorCreateFn>,
- pub get_error_class_fn: GetErrorClassFn,
pub(crate) shared: SharedQueue,
- pending_ops: FuturesUnordered<PendingOpFuture>,
- pending_unref_ops: FuturesUnordered<PendingOpFuture>,
- have_unpolled_ops: bool,
- pub op_registry: OpRegistry,
+ pub(crate) pending_ops: FuturesUnordered<PendingOpFuture>,
+ pub(crate) pending_unref_ops: FuturesUnordered<PendingOpFuture>,
+ pub(crate) have_unpolled_ops: Cell<bool>,
+ pub(crate) op_router: Rc<dyn OpRouter>,
waker: AtomicWaker,
}
@@ -205,21 +199,27 @@ pub struct HeapLimits {
}
pub(crate) struct IsolateOptions {
- will_snapshot: bool,
+ op_router: Rc<dyn OpRouter>,
startup_script: Option<OwnedScript>,
startup_snapshot: Option<Snapshot>,
+ will_snapshot: bool,
heap_limits: Option<HeapLimits>,
}
impl CoreIsolate {
/// startup_data defines the snapshot or script used at startup to initialize
/// the isolate.
- pub fn new(startup_data: StartupData, will_snapshot: bool) -> Self {
+ pub fn new(
+ op_router: Rc<dyn OpRouter>,
+ startup_data: StartupData,
+ will_snapshot: bool,
+ ) -> Self {
let (startup_script, startup_snapshot) = startup_data.into_options();
let options = IsolateOptions {
- will_snapshot,
+ op_router,
startup_script,
startup_snapshot,
+ will_snapshot,
heap_limits: None,
};
@@ -233,14 +233,16 @@ impl CoreIsolate {
/// Make sure to use [`add_near_heap_limit_callback`](#method.add_near_heap_limit_callback)
/// to prevent v8 from crashing when reaching the upper limit.
pub fn with_heap_limits(
+ op_router: Rc<dyn OpRouter>,
startup_data: StartupData,
heap_limits: HeapLimits,
) -> Self {
let (startup_script, startup_snapshot) = startup_data.into_options();
let options = IsolateOptions {
- will_snapshot: false,
+ op_router,
startup_script,
startup_snapshot,
+ will_snapshot: false,
heap_limits: Some(heap_limits),
};
@@ -304,18 +306,16 @@ impl CoreIsolate {
isolate.set_slot(Rc::new(RefCell::new(CoreIsolateState {
global_context: Some(global_context),
- resource_table: Rc::new(RefCell::new(ResourceTable::default())),
pending_promise_exceptions: HashMap::new(),
shared_ab: None,
js_recv_cb: None,
js_macrotask_cb: None,
js_error_create_fn: Box::new(JSError::create),
- get_error_class_fn: &|_| "Error",
shared: SharedQueue::new(RECOMMENDED_SIZE),
pending_ops: FuturesUnordered::new(),
pending_unref_ops: FuturesUnordered::new(),
- have_unpolled_ops: false,
- op_registry: OpRegistry::new(),
+ have_unpolled_ops: Cell::new(false),
+ op_router: options.op_router,
waker: AtomicWaker::new(),
})));
@@ -421,66 +421,6 @@ impl CoreIsolate {
snapshot
}
- /// Defines the how Deno.core.dispatch() acts.
- /// Called whenever Deno.core.dispatch() is called in JavaScript. zero_copy_buf
- /// corresponds to the second argument of Deno.core.dispatch().
- ///
- /// Requires runtime to explicitly ask for op ids before using any of the ops.
- pub fn register_op<F>(&mut self, name: &str, op: F) -> OpId
- where
- F: Fn(&mut CoreIsolateState, &mut [ZeroCopyBuf]) -> Op + 'static,
- {
- let state_rc = Self::state(self);
- let mut state = state_rc.borrow_mut();
- state.op_registry.register(name, op)
- }
-
- pub fn register_op_json_sync<F>(&mut self, name: &str, op: F) -> OpId
- where
- F: 'static
- + Fn(
- &mut CoreIsolateState,
- serde_json::Value,
- &mut [ZeroCopyBuf],
- ) -> Result<serde_json::Value, ErrBox>,
- {
- let core_op =
- move |state: &mut CoreIsolateState, bufs: &mut [ZeroCopyBuf]| -> Op {
- let value = serde_json::from_slice(&bufs[0]).unwrap();
- let result = op(state, value, &mut bufs[1..]);
- let buf = serialize_result(None, result, state.get_error_class_fn);
- Op::Sync(buf)
- };
-
- let state_rc = Self::state(self);
- let mut state = state_rc.borrow_mut();
- state.op_registry.register(name, core_op)
- }
-
- pub fn register_op_json_async<F, Fut>(&mut self, name: &str, op: F) -> OpId
- where
- Fut: 'static + Future<Output = Result<serde_json::Value, ErrBox>>,
- F: 'static
- + Fn(&mut CoreIsolateState, serde_json::Value, &mut [ZeroCopyBuf]) -> Fut,
- {
- let core_op = move |state: &mut CoreIsolateState,
- bufs: &mut [ZeroCopyBuf]|
- -> Op {
- let get_error_class_fn = state.get_error_class_fn;
- let value: serde_json::Value = serde_json::from_slice(&bufs[0]).unwrap();
- let promise_id = value.get("promiseId").unwrap().as_u64().unwrap();
- let fut = op(state, value, &mut bufs[1..]);
- let fut2 = fut.map(move |result| {
- serialize_result(Some(promise_id), result, get_error_class_fn)
- });
- Op::Async(Box::pin(fut2))
- };
-
- let state_rc = Self::state(self);
- let mut state = state_rc.borrow_mut();
- state.op_registry.register(name, core_op)
- }
-
/// Registers a callback on the isolate when the memory limits are approached.
/// Use this to prevent V8 from crashing the process when reaching the limit.
///
@@ -536,24 +476,6 @@ where
callback(current_heap_limit, initial_heap_limit)
}
-fn serialize_result(
- promise_id: Option<u64>,
- result: Result<Value, ErrBox>,
- get_error_class_fn: GetErrorClassFn,
-) -> Buf {
- let value = match result {
- Ok(v) => json!({ "ok": v, "promiseId": promise_id }),
- Err(err) => json!({
- "promiseId": promise_id ,
- "err": {
- "className": (get_error_class_fn)(&err),
- "message": err.to_string(),
- }
- }),
- };
- serde_json::to_vec(&value).unwrap().into_boxed_slice()
-}
-
impl Future for CoreIsolate {
type Output = Result<(), ErrBox>;
@@ -574,12 +496,12 @@ impl Future for CoreIsolate {
check_promise_exceptions(scope)?;
- let mut overflow_response: Option<(OpId, Buf)> = None;
+ let mut overflow_response: Option<(OpId, Box<[u8]>)> = None;
loop {
let mut state = state_rc.borrow_mut();
// Now handle actual ops.
- state.have_unpolled_ops = false;
+ state.have_unpolled_ops.set(false);
let pending_r = state.pending_ops.poll_next_unpin(cx);
match pending_r {
@@ -644,7 +566,7 @@ impl Future for CoreIsolate {
if state.pending_ops.is_empty() {
Poll::Ready(Ok(()))
} else {
- if state.have_unpolled_ops {
+ if state.have_unpolled_ops.get() {
state.waker.wake();
}
Poll::Pending
@@ -653,18 +575,6 @@ impl Future for CoreIsolate {
}
impl CoreIsolateState {
- /// Defines the how Deno.core.dispatch() acts.
- /// Called whenever Deno.core.dispatch() is called in JavaScript. zero_copy_buf
- /// corresponds to the second argument of Deno.core.dispatch().
- ///
- /// Requires runtime to explicitly ask for op ids before using any of the ops.
- pub fn register_op<F>(&mut self, name: &str, op: F) -> OpId
- where
- F: Fn(&mut CoreIsolateState, &mut [ZeroCopyBuf]) -> Op + 'static,
- {
- self.op_registry.register(name, op)
- }
-
/// Allows a callback to be set whenever a V8 exception is made. This allows
/// the caller to wrap the JSError into an error. By default this callback
/// is set to JSError::create.
@@ -674,49 +584,6 @@ impl CoreIsolateState {
) {
self.js_error_create_fn = Box::new(f);
}
-
- pub fn set_get_error_class_fn(&mut self, f: GetErrorClassFn) {
- self.get_error_class_fn = f;
- }
-
- pub fn dispatch_op<'s>(
- &mut self,
- scope: &mut v8::HandleScope<'s>,
- op_id: OpId,
- zero_copy_bufs: &mut [ZeroCopyBuf],
- ) -> Option<(OpId, Box<[u8]>)> {
- let op = if let Some(dispatcher) = self.op_registry.get(op_id) {
- dispatcher(self, zero_copy_bufs)
- } else {
- let message =
- v8::String::new(scope, &format!("Unknown op id: {}", op_id)).unwrap();
- let exception = v8::Exception::type_error(scope, message);
- scope.throw_exception(exception);
- return None;
- };
-
- debug_assert_eq!(self.shared.size(), 0);
- match op {
- Op::Sync(buf) => {
- // For sync messages, we always return the response via Deno.core.send's
- // return value. Sync messages ignore the op_id.
- let op_id = 0;
- Some((op_id, buf))
- }
- Op::Async(fut) => {
- let fut2 = fut.map(move |buf| (op_id, buf));
- self.pending_ops.push(fut2.boxed_local());
- self.have_unpolled_ops = true;
- None
- }
- Op::AsyncUnref(fut) => {
- let fut2 = fut.map(move |buf| (op_id, buf));
- self.pending_unref_ops.push(fut2.boxed_local());
- self.have_unpolled_ops = true;
- None
- }
- }
- }
}
fn async_op_response<'s>(
@@ -739,7 +606,7 @@ fn async_op_response<'s>(
let op_id: v8::Local<v8::Value> =
v8::Integer::new(tc_scope, op_id as i32).into();
let ui8: v8::Local<v8::Value> =
- bindings::boxed_slice_to_uint8array(tc_scope, buf).into();
+ boxed_slice_to_uint8array(tc_scope, buf).into();
js_recv_cb.call(tc_scope, global, &[op_id, ui8])
}
None => js_recv_cb.call(tc_scope, global, &[]),
@@ -848,11 +715,28 @@ pub fn js_check<T>(r: Result<T, ErrBox>) -> T {
r.unwrap()
}
+fn boxed_slice_to_uint8array<'sc>(
+ scope: &mut v8::HandleScope<'sc>,
+ buf: Box<[u8]>,
+) -> v8::Local<'sc, v8::Uint8Array> {
+ assert!(!buf.is_empty());
+ let buf_len = buf.len();
+ let backing_store = v8::ArrayBuffer::new_backing_store_from_boxed_slice(buf);
+ let backing_store_shared = backing_store.make_shared();
+ let ab = v8::ArrayBuffer::with_backing_store(scope, &backing_store_shared);
+ v8::Uint8Array::new(scope, ab, 0, buf_len)
+ .expect("Failed to create UintArray8")
+}
+
#[cfg(test)]
pub mod tests {
use super::*;
+ use crate::BasicState;
+ use crate::BufVec;
use futures::future::lazy;
+ use futures::FutureExt;
use std::ops::FnOnce;
+ use std::rc::Rc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
@@ -880,7 +764,7 @@ pub mod tests {
)
}
- pub enum Mode {
+ enum Mode {
Async,
AsyncUnref,
AsyncZeroCopy(u8),
@@ -890,28 +774,29 @@ pub mod tests {
OverflowResAsync,
}
- pub fn setup(mode: Mode) -> (CoreIsolate, Arc<AtomicUsize>) {
- let dispatch_count = Arc::new(AtomicUsize::new(0));
- let dispatch_count_ = dispatch_count.clone();
-
- let mut isolate = CoreIsolate::new(StartupData::None, false);
+ struct TestOpRouter {
+ mode: Mode,
+ dispatch_count: Arc<AtomicUsize>,
+ }
- let dispatcher = move |_state: &mut CoreIsolateState,
- zero_copy: &mut [ZeroCopyBuf]|
- -> Op {
- dispatch_count_.fetch_add(1, Ordering::Relaxed);
- match mode {
+ impl OpRouter for TestOpRouter {
+ fn route_op(self: Rc<Self>, op_id: OpId, bufs: BufVec) -> Op {
+ if op_id != 1 {
+ return Op::NotFound;
+ }
+ self.dispatch_count.fetch_add(1, Ordering::Relaxed);
+ match self.mode {
Mode::Async => {
- assert_eq!(zero_copy.len(), 1);
- assert_eq!(zero_copy[0].len(), 1);
- assert_eq!(zero_copy[0][0], 42);
+ assert_eq!(bufs.len(), 1);
+ assert_eq!(bufs[0].len(), 1);
+ assert_eq!(bufs[0][0], 42);
let buf = vec![43u8].into_boxed_slice();
Op::Async(futures::future::ready(buf).boxed())
}
Mode::AsyncUnref => {
- assert_eq!(zero_copy.len(), 1);
- assert_eq!(zero_copy[0].len(), 1);
- assert_eq!(zero_copy[0][0], 42);
+ assert_eq!(bufs.len(), 1);
+ assert_eq!(bufs[0].len(), 1);
+ assert_eq!(bufs[0][0], 42);
let fut = async {
// This future never finish.
futures::future::pending::<()>().await;
@@ -920,8 +805,8 @@ pub mod tests {
Op::AsyncUnref(fut.boxed())
}
Mode::AsyncZeroCopy(count) => {
- assert_eq!(zero_copy.len(), count as usize);
- zero_copy.iter().enumerate().for_each(|(idx, buf)| {
+ assert_eq!(bufs.len(), count as usize);
+ bufs.iter().enumerate().for_each(|(idx, buf)| {
assert_eq!(buf.len(), 1);
assert_eq!(idx, buf[0] as usize);
});
@@ -930,15 +815,15 @@ pub mod tests {
Op::Async(futures::future::ready(buf).boxed())
}
Mode::OverflowReqSync => {
- assert_eq!(zero_copy.len(), 1);
- assert_eq!(zero_copy[0].len(), 100 * 1024 * 1024);
+ assert_eq!(bufs.len(), 1);
+ assert_eq!(bufs[0].len(), 100 * 1024 * 1024);
let buf = vec![43u8].into_boxed_slice();
Op::Sync(buf)
}
Mode::OverflowResSync => {
- assert_eq!(zero_copy.len(), 1);
- assert_eq!(zero_copy[0].len(), 1);
- assert_eq!(zero_copy[0][0], 42);
+ assert_eq!(bufs.len(), 1);
+ assert_eq!(bufs[0].len(), 1);
+ assert_eq!(bufs[0][0], 42);
let mut vec = Vec::<u8>::new();
vec.resize(100 * 1024 * 1024, 0);
vec[0] = 99;
@@ -946,15 +831,15 @@ pub mod tests {
Op::Sync(buf)
}
Mode::OverflowReqAsync => {
- assert_eq!(zero_copy.len(), 1);
- assert_eq!(zero_copy[0].len(), 100 * 1024 * 1024);
+ assert_eq!(bufs.len(), 1);
+ assert_eq!(bufs[0].len(), 100 * 1024 * 1024);
let buf = vec![43u8].into_boxed_slice();
Op::Async(futures::future::ready(buf).boxed())
}
Mode::OverflowResAsync => {
- assert_eq!(zero_copy.len(), 1);
- assert_eq!(zero_copy[0].len(), 1);
- assert_eq!(zero_copy[0][0], 42);
+ assert_eq!(bufs.len(), 1);
+ assert_eq!(bufs[0].len(), 1);
+ assert_eq!(bufs[0][0], 42);
let mut vec = Vec::<u8>::new();
vec.resize(100 * 1024 * 1024, 0);
vec[0] = 4;
@@ -962,9 +847,16 @@ pub mod tests {
Op::Async(futures::future::ready(buf).boxed())
}
}
- };
+ }
+ }
- isolate.register_op("test", dispatcher);
+ fn setup(mode: Mode) -> (CoreIsolate, Arc<AtomicUsize>) {
+ let dispatch_count = Arc::new(AtomicUsize::new(0));
+ let test_state = Rc::new(TestOpRouter {
+ mode,
+ dispatch_count: dispatch_count.clone(),
+ });
+ let mut isolate = CoreIsolate::new(test_state, StartupData::None, false);
js_check(isolate.execute(
"setup.js",
@@ -1328,7 +1220,8 @@ pub mod tests {
#[test]
fn syntax_error() {
- let mut isolate = CoreIsolate::new(StartupData::None, false);
+ let mut isolate =
+ CoreIsolate::new(BasicState::new(), StartupData::None, false);
let src = "hocuspocus(";
let r = isolate.execute("i.js", src);
let e = r.unwrap_err();
@@ -1353,27 +1246,29 @@ pub mod tests {
#[test]
fn will_snapshot() {
let snapshot = {
- let mut isolate = CoreIsolate::new(StartupData::None, true);
+ let mut isolate =
+ CoreIsolate::new(BasicState::new(), StartupData::None, true);
js_check(isolate.execute("a.js", "a = 1 + 2"));
isolate.snapshot()
};
let startup_data = StartupData::Snapshot(Snapshot::JustCreated(snapshot));
- let mut isolate2 = CoreIsolate::new(startup_data, false);
+ let mut isolate2 = CoreIsolate::new(BasicState::new(), startup_data, false);
js_check(isolate2.execute("check.js", "if (a != 3) throw Error('x')"));
}
#[test]
fn test_from_boxed_snapshot() {
let snapshot = {
- let mut isolate = CoreIsolate::new(StartupData::None, true);
+ let mut isolate =
+ CoreIsolate::new(BasicState::new(), StartupData::None, true);
js_check(isolate.execute("a.js", "a = 1 + 2"));
let snap: &[u8] = &*isolate.snapshot();
Vec::from(snap).into_boxed_slice()
};
let startup_data = StartupData::Snapshot(Snapshot::Boxed(snapshot));
- let mut isolate2 = CoreIsolate::new(startup_data, false);
+ let mut isolate2 = CoreIsolate::new(BasicState::new(), startup_data, false);
js_check(isolate2.execute("check.js", "if (a != 3) throw Error('x')"));
}
@@ -1383,8 +1278,11 @@ pub mod tests {
initial: 0,
max: 20 * 1024, // 20 kB
};
- let mut isolate =
- CoreIsolate::with_heap_limits(StartupData::None, heap_limits);
+ let mut isolate = CoreIsolate::with_heap_limits(
+ BasicState::new(),
+ StartupData::None,
+ heap_limits,
+ );
let cb_handle = isolate.thread_safe_handle();
let callback_invoke_count = Rc::new(AtomicUsize::default());
@@ -1412,7 +1310,8 @@ pub mod tests {
#[test]
fn test_heap_limit_cb_remove() {
- let mut isolate = CoreIsolate::new(StartupData::None, false);
+ let mut isolate =
+ CoreIsolate::new(BasicState::new(), StartupData::None, false);
isolate.add_near_heap_limit_callback(|current_limit, _initial_limit| {
current_limit * 2
@@ -1427,8 +1326,11 @@ pub mod tests {
initial: 0,
max: 20 * 1024, // 20 kB
};
- let mut isolate =
- CoreIsolate::with_heap_limits(StartupData::None, heap_limits);
+ let mut isolate = CoreIsolate::with_heap_limits(
+ BasicState::new(),
+ StartupData::None,
+ heap_limits,
+ );
let cb_handle = isolate.thread_safe_handle();
let callback_invoke_count_first = Rc::new(AtomicUsize::default());