diff options
author | Bert Belder <bertbelder@gmail.com> | 2020-09-06 02:34:02 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-06 02:34:02 +0200 |
commit | c821e8f2f1fb8ad5e9eb00854277cafc8c80b2f5 (patch) | |
tree | c429a3c2707a4047fb512443a8468b7e15e5730d /core/core_isolate.rs | |
parent | 849431eb1d112d1f79f4a327830dc1a5bf22dd47 (diff) |
Move JSON ops to deno_core (#7336)
Diffstat (limited to 'core/core_isolate.rs')
-rw-r--r-- | core/core_isolate.rs | 300 |
1 files changed, 101 insertions, 199 deletions
diff --git a/core/core_isolate.rs b/core/core_isolate.rs index a60ce6a82..52e856174 100644 --- a/core/core_isolate.rs +++ b/core/core_isolate.rs @@ -13,16 +13,12 @@ use crate::shared_queue::SharedQueue; use crate::shared_queue::RECOMMENDED_SIZE; use crate::ErrBox; use crate::JSError; -use crate::ResourceTable; -use crate::ZeroCopyBuf; -use futures::future::FutureExt; use futures::stream::FuturesUnordered; use futures::stream::StreamExt; use futures::task::AtomicWaker; use futures::Future; -use serde_json::json; -use serde_json::Value; use std::any::Any; +use std::cell::Cell; use std::cell::RefCell; use std::collections::HashMap; use std::convert::From; @@ -37,7 +33,7 @@ use std::sync::Once; use std::task::Context; use std::task::Poll; -type PendingOpFuture = Pin<Box<dyn Future<Output = (OpId, Buf)>>>; +type PendingOpFuture = Pin<Box<dyn Future<Output = (OpId, Box<[u8]>)>>>; /// Stores a script used to initialize a Isolate pub struct Script<'a> { @@ -87,7 +83,7 @@ impl StartupData<'_> { type JSErrorCreateFn = dyn Fn(JSError) -> ErrBox; -pub type GetErrorClassFn = &'static dyn for<'e> Fn(&'e ErrBox) -> &'static str; +pub type GetErrorClassFn = dyn for<'e> Fn(&'e ErrBox) -> &'static str; /// Objects that need to live as long as the isolate #[derive(Default)] @@ -118,19 +114,17 @@ pub struct CoreIsolate { /// Internal state for CoreIsolate which is stored in one of v8::Isolate's /// embedder slots. pub struct CoreIsolateState { - pub resource_table: Rc<RefCell<ResourceTable>>, pub global_context: Option<v8::Global<v8::Context>>, pub(crate) shared_ab: Option<v8::Global<v8::SharedArrayBuffer>>, pub(crate) js_recv_cb: Option<v8::Global<v8::Function>>, pub(crate) js_macrotask_cb: Option<v8::Global<v8::Function>>, pub(crate) pending_promise_exceptions: HashMap<i32, v8::Global<v8::Value>>, pub(crate) js_error_create_fn: Box<JSErrorCreateFn>, - pub get_error_class_fn: GetErrorClassFn, pub(crate) shared: SharedQueue, - pending_ops: FuturesUnordered<PendingOpFuture>, - pending_unref_ops: FuturesUnordered<PendingOpFuture>, - have_unpolled_ops: bool, - pub op_registry: OpRegistry, + pub(crate) pending_ops: FuturesUnordered<PendingOpFuture>, + pub(crate) pending_unref_ops: FuturesUnordered<PendingOpFuture>, + pub(crate) have_unpolled_ops: Cell<bool>, + pub(crate) op_router: Rc<dyn OpRouter>, waker: AtomicWaker, } @@ -205,21 +199,27 @@ pub struct HeapLimits { } pub(crate) struct IsolateOptions { - will_snapshot: bool, + op_router: Rc<dyn OpRouter>, startup_script: Option<OwnedScript>, startup_snapshot: Option<Snapshot>, + will_snapshot: bool, heap_limits: Option<HeapLimits>, } impl CoreIsolate { /// startup_data defines the snapshot or script used at startup to initialize /// the isolate. - pub fn new(startup_data: StartupData, will_snapshot: bool) -> Self { + pub fn new( + op_router: Rc<dyn OpRouter>, + startup_data: StartupData, + will_snapshot: bool, + ) -> Self { let (startup_script, startup_snapshot) = startup_data.into_options(); let options = IsolateOptions { - will_snapshot, + op_router, startup_script, startup_snapshot, + will_snapshot, heap_limits: None, }; @@ -233,14 +233,16 @@ impl CoreIsolate { /// Make sure to use [`add_near_heap_limit_callback`](#method.add_near_heap_limit_callback) /// to prevent v8 from crashing when reaching the upper limit. pub fn with_heap_limits( + op_router: Rc<dyn OpRouter>, startup_data: StartupData, heap_limits: HeapLimits, ) -> Self { let (startup_script, startup_snapshot) = startup_data.into_options(); let options = IsolateOptions { - will_snapshot: false, + op_router, startup_script, startup_snapshot, + will_snapshot: false, heap_limits: Some(heap_limits), }; @@ -304,18 +306,16 @@ impl CoreIsolate { isolate.set_slot(Rc::new(RefCell::new(CoreIsolateState { global_context: Some(global_context), - resource_table: Rc::new(RefCell::new(ResourceTable::default())), pending_promise_exceptions: HashMap::new(), shared_ab: None, js_recv_cb: None, js_macrotask_cb: None, js_error_create_fn: Box::new(JSError::create), - get_error_class_fn: &|_| "Error", shared: SharedQueue::new(RECOMMENDED_SIZE), pending_ops: FuturesUnordered::new(), pending_unref_ops: FuturesUnordered::new(), - have_unpolled_ops: false, - op_registry: OpRegistry::new(), + have_unpolled_ops: Cell::new(false), + op_router: options.op_router, waker: AtomicWaker::new(), }))); @@ -421,66 +421,6 @@ impl CoreIsolate { snapshot } - /// Defines the how Deno.core.dispatch() acts. - /// Called whenever Deno.core.dispatch() is called in JavaScript. zero_copy_buf - /// corresponds to the second argument of Deno.core.dispatch(). - /// - /// Requires runtime to explicitly ask for op ids before using any of the ops. - pub fn register_op<F>(&mut self, name: &str, op: F) -> OpId - where - F: Fn(&mut CoreIsolateState, &mut [ZeroCopyBuf]) -> Op + 'static, - { - let state_rc = Self::state(self); - let mut state = state_rc.borrow_mut(); - state.op_registry.register(name, op) - } - - pub fn register_op_json_sync<F>(&mut self, name: &str, op: F) -> OpId - where - F: 'static - + Fn( - &mut CoreIsolateState, - serde_json::Value, - &mut [ZeroCopyBuf], - ) -> Result<serde_json::Value, ErrBox>, - { - let core_op = - move |state: &mut CoreIsolateState, bufs: &mut [ZeroCopyBuf]| -> Op { - let value = serde_json::from_slice(&bufs[0]).unwrap(); - let result = op(state, value, &mut bufs[1..]); - let buf = serialize_result(None, result, state.get_error_class_fn); - Op::Sync(buf) - }; - - let state_rc = Self::state(self); - let mut state = state_rc.borrow_mut(); - state.op_registry.register(name, core_op) - } - - pub fn register_op_json_async<F, Fut>(&mut self, name: &str, op: F) -> OpId - where - Fut: 'static + Future<Output = Result<serde_json::Value, ErrBox>>, - F: 'static - + Fn(&mut CoreIsolateState, serde_json::Value, &mut [ZeroCopyBuf]) -> Fut, - { - let core_op = move |state: &mut CoreIsolateState, - bufs: &mut [ZeroCopyBuf]| - -> Op { - let get_error_class_fn = state.get_error_class_fn; - let value: serde_json::Value = serde_json::from_slice(&bufs[0]).unwrap(); - let promise_id = value.get("promiseId").unwrap().as_u64().unwrap(); - let fut = op(state, value, &mut bufs[1..]); - let fut2 = fut.map(move |result| { - serialize_result(Some(promise_id), result, get_error_class_fn) - }); - Op::Async(Box::pin(fut2)) - }; - - let state_rc = Self::state(self); - let mut state = state_rc.borrow_mut(); - state.op_registry.register(name, core_op) - } - /// Registers a callback on the isolate when the memory limits are approached. /// Use this to prevent V8 from crashing the process when reaching the limit. /// @@ -536,24 +476,6 @@ where callback(current_heap_limit, initial_heap_limit) } -fn serialize_result( - promise_id: Option<u64>, - result: Result<Value, ErrBox>, - get_error_class_fn: GetErrorClassFn, -) -> Buf { - let value = match result { - Ok(v) => json!({ "ok": v, "promiseId": promise_id }), - Err(err) => json!({ - "promiseId": promise_id , - "err": { - "className": (get_error_class_fn)(&err), - "message": err.to_string(), - } - }), - }; - serde_json::to_vec(&value).unwrap().into_boxed_slice() -} - impl Future for CoreIsolate { type Output = Result<(), ErrBox>; @@ -574,12 +496,12 @@ impl Future for CoreIsolate { check_promise_exceptions(scope)?; - let mut overflow_response: Option<(OpId, Buf)> = None; + let mut overflow_response: Option<(OpId, Box<[u8]>)> = None; loop { let mut state = state_rc.borrow_mut(); // Now handle actual ops. - state.have_unpolled_ops = false; + state.have_unpolled_ops.set(false); let pending_r = state.pending_ops.poll_next_unpin(cx); match pending_r { @@ -644,7 +566,7 @@ impl Future for CoreIsolate { if state.pending_ops.is_empty() { Poll::Ready(Ok(())) } else { - if state.have_unpolled_ops { + if state.have_unpolled_ops.get() { state.waker.wake(); } Poll::Pending @@ -653,18 +575,6 @@ impl Future for CoreIsolate { } impl CoreIsolateState { - /// Defines the how Deno.core.dispatch() acts. - /// Called whenever Deno.core.dispatch() is called in JavaScript. zero_copy_buf - /// corresponds to the second argument of Deno.core.dispatch(). - /// - /// Requires runtime to explicitly ask for op ids before using any of the ops. - pub fn register_op<F>(&mut self, name: &str, op: F) -> OpId - where - F: Fn(&mut CoreIsolateState, &mut [ZeroCopyBuf]) -> Op + 'static, - { - self.op_registry.register(name, op) - } - /// Allows a callback to be set whenever a V8 exception is made. This allows /// the caller to wrap the JSError into an error. By default this callback /// is set to JSError::create. @@ -674,49 +584,6 @@ impl CoreIsolateState { ) { self.js_error_create_fn = Box::new(f); } - - pub fn set_get_error_class_fn(&mut self, f: GetErrorClassFn) { - self.get_error_class_fn = f; - } - - pub fn dispatch_op<'s>( - &mut self, - scope: &mut v8::HandleScope<'s>, - op_id: OpId, - zero_copy_bufs: &mut [ZeroCopyBuf], - ) -> Option<(OpId, Box<[u8]>)> { - let op = if let Some(dispatcher) = self.op_registry.get(op_id) { - dispatcher(self, zero_copy_bufs) - } else { - let message = - v8::String::new(scope, &format!("Unknown op id: {}", op_id)).unwrap(); - let exception = v8::Exception::type_error(scope, message); - scope.throw_exception(exception); - return None; - }; - - debug_assert_eq!(self.shared.size(), 0); - match op { - Op::Sync(buf) => { - // For sync messages, we always return the response via Deno.core.send's - // return value. Sync messages ignore the op_id. - let op_id = 0; - Some((op_id, buf)) - } - Op::Async(fut) => { - let fut2 = fut.map(move |buf| (op_id, buf)); - self.pending_ops.push(fut2.boxed_local()); - self.have_unpolled_ops = true; - None - } - Op::AsyncUnref(fut) => { - let fut2 = fut.map(move |buf| (op_id, buf)); - self.pending_unref_ops.push(fut2.boxed_local()); - self.have_unpolled_ops = true; - None - } - } - } } fn async_op_response<'s>( @@ -739,7 +606,7 @@ fn async_op_response<'s>( let op_id: v8::Local<v8::Value> = v8::Integer::new(tc_scope, op_id as i32).into(); let ui8: v8::Local<v8::Value> = - bindings::boxed_slice_to_uint8array(tc_scope, buf).into(); + boxed_slice_to_uint8array(tc_scope, buf).into(); js_recv_cb.call(tc_scope, global, &[op_id, ui8]) } None => js_recv_cb.call(tc_scope, global, &[]), @@ -848,11 +715,28 @@ pub fn js_check<T>(r: Result<T, ErrBox>) -> T { r.unwrap() } +fn boxed_slice_to_uint8array<'sc>( + scope: &mut v8::HandleScope<'sc>, + buf: Box<[u8]>, +) -> v8::Local<'sc, v8::Uint8Array> { + assert!(!buf.is_empty()); + let buf_len = buf.len(); + let backing_store = v8::ArrayBuffer::new_backing_store_from_boxed_slice(buf); + let backing_store_shared = backing_store.make_shared(); + let ab = v8::ArrayBuffer::with_backing_store(scope, &backing_store_shared); + v8::Uint8Array::new(scope, ab, 0, buf_len) + .expect("Failed to create UintArray8") +} + #[cfg(test)] pub mod tests { use super::*; + use crate::BasicState; + use crate::BufVec; use futures::future::lazy; + use futures::FutureExt; use std::ops::FnOnce; + use std::rc::Rc; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; @@ -880,7 +764,7 @@ pub mod tests { ) } - pub enum Mode { + enum Mode { Async, AsyncUnref, AsyncZeroCopy(u8), @@ -890,28 +774,29 @@ pub mod tests { OverflowResAsync, } - pub fn setup(mode: Mode) -> (CoreIsolate, Arc<AtomicUsize>) { - let dispatch_count = Arc::new(AtomicUsize::new(0)); - let dispatch_count_ = dispatch_count.clone(); - - let mut isolate = CoreIsolate::new(StartupData::None, false); + struct TestOpRouter { + mode: Mode, + dispatch_count: Arc<AtomicUsize>, + } - let dispatcher = move |_state: &mut CoreIsolateState, - zero_copy: &mut [ZeroCopyBuf]| - -> Op { - dispatch_count_.fetch_add(1, Ordering::Relaxed); - match mode { + impl OpRouter for TestOpRouter { + fn route_op(self: Rc<Self>, op_id: OpId, bufs: BufVec) -> Op { + if op_id != 1 { + return Op::NotFound; + } + self.dispatch_count.fetch_add(1, Ordering::Relaxed); + match self.mode { Mode::Async => { - assert_eq!(zero_copy.len(), 1); - assert_eq!(zero_copy[0].len(), 1); - assert_eq!(zero_copy[0][0], 42); + assert_eq!(bufs.len(), 1); + assert_eq!(bufs[0].len(), 1); + assert_eq!(bufs[0][0], 42); let buf = vec![43u8].into_boxed_slice(); Op::Async(futures::future::ready(buf).boxed()) } Mode::AsyncUnref => { - assert_eq!(zero_copy.len(), 1); - assert_eq!(zero_copy[0].len(), 1); - assert_eq!(zero_copy[0][0], 42); + assert_eq!(bufs.len(), 1); + assert_eq!(bufs[0].len(), 1); + assert_eq!(bufs[0][0], 42); let fut = async { // This future never finish. futures::future::pending::<()>().await; @@ -920,8 +805,8 @@ pub mod tests { Op::AsyncUnref(fut.boxed()) } Mode::AsyncZeroCopy(count) => { - assert_eq!(zero_copy.len(), count as usize); - zero_copy.iter().enumerate().for_each(|(idx, buf)| { + assert_eq!(bufs.len(), count as usize); + bufs.iter().enumerate().for_each(|(idx, buf)| { assert_eq!(buf.len(), 1); assert_eq!(idx, buf[0] as usize); }); @@ -930,15 +815,15 @@ pub mod tests { Op::Async(futures::future::ready(buf).boxed()) } Mode::OverflowReqSync => { - assert_eq!(zero_copy.len(), 1); - assert_eq!(zero_copy[0].len(), 100 * 1024 * 1024); + assert_eq!(bufs.len(), 1); + assert_eq!(bufs[0].len(), 100 * 1024 * 1024); let buf = vec![43u8].into_boxed_slice(); Op::Sync(buf) } Mode::OverflowResSync => { - assert_eq!(zero_copy.len(), 1); - assert_eq!(zero_copy[0].len(), 1); - assert_eq!(zero_copy[0][0], 42); + assert_eq!(bufs.len(), 1); + assert_eq!(bufs[0].len(), 1); + assert_eq!(bufs[0][0], 42); let mut vec = Vec::<u8>::new(); vec.resize(100 * 1024 * 1024, 0); vec[0] = 99; @@ -946,15 +831,15 @@ pub mod tests { Op::Sync(buf) } Mode::OverflowReqAsync => { - assert_eq!(zero_copy.len(), 1); - assert_eq!(zero_copy[0].len(), 100 * 1024 * 1024); + assert_eq!(bufs.len(), 1); + assert_eq!(bufs[0].len(), 100 * 1024 * 1024); let buf = vec![43u8].into_boxed_slice(); Op::Async(futures::future::ready(buf).boxed()) } Mode::OverflowResAsync => { - assert_eq!(zero_copy.len(), 1); - assert_eq!(zero_copy[0].len(), 1); - assert_eq!(zero_copy[0][0], 42); + assert_eq!(bufs.len(), 1); + assert_eq!(bufs[0].len(), 1); + assert_eq!(bufs[0][0], 42); let mut vec = Vec::<u8>::new(); vec.resize(100 * 1024 * 1024, 0); vec[0] = 4; @@ -962,9 +847,16 @@ pub mod tests { Op::Async(futures::future::ready(buf).boxed()) } } - }; + } + } - isolate.register_op("test", dispatcher); + fn setup(mode: Mode) -> (CoreIsolate, Arc<AtomicUsize>) { + let dispatch_count = Arc::new(AtomicUsize::new(0)); + let test_state = Rc::new(TestOpRouter { + mode, + dispatch_count: dispatch_count.clone(), + }); + let mut isolate = CoreIsolate::new(test_state, StartupData::None, false); js_check(isolate.execute( "setup.js", @@ -1328,7 +1220,8 @@ pub mod tests { #[test] fn syntax_error() { - let mut isolate = CoreIsolate::new(StartupData::None, false); + let mut isolate = + CoreIsolate::new(BasicState::new(), StartupData::None, false); let src = "hocuspocus("; let r = isolate.execute("i.js", src); let e = r.unwrap_err(); @@ -1353,27 +1246,29 @@ pub mod tests { #[test] fn will_snapshot() { let snapshot = { - let mut isolate = CoreIsolate::new(StartupData::None, true); + let mut isolate = + CoreIsolate::new(BasicState::new(), StartupData::None, true); js_check(isolate.execute("a.js", "a = 1 + 2")); isolate.snapshot() }; let startup_data = StartupData::Snapshot(Snapshot::JustCreated(snapshot)); - let mut isolate2 = CoreIsolate::new(startup_data, false); + let mut isolate2 = CoreIsolate::new(BasicState::new(), startup_data, false); js_check(isolate2.execute("check.js", "if (a != 3) throw Error('x')")); } #[test] fn test_from_boxed_snapshot() { let snapshot = { - let mut isolate = CoreIsolate::new(StartupData::None, true); + let mut isolate = + CoreIsolate::new(BasicState::new(), StartupData::None, true); js_check(isolate.execute("a.js", "a = 1 + 2")); let snap: &[u8] = &*isolate.snapshot(); Vec::from(snap).into_boxed_slice() }; let startup_data = StartupData::Snapshot(Snapshot::Boxed(snapshot)); - let mut isolate2 = CoreIsolate::new(startup_data, false); + let mut isolate2 = CoreIsolate::new(BasicState::new(), startup_data, false); js_check(isolate2.execute("check.js", "if (a != 3) throw Error('x')")); } @@ -1383,8 +1278,11 @@ pub mod tests { initial: 0, max: 20 * 1024, // 20 kB }; - let mut isolate = - CoreIsolate::with_heap_limits(StartupData::None, heap_limits); + let mut isolate = CoreIsolate::with_heap_limits( + BasicState::new(), + StartupData::None, + heap_limits, + ); let cb_handle = isolate.thread_safe_handle(); let callback_invoke_count = Rc::new(AtomicUsize::default()); @@ -1412,7 +1310,8 @@ pub mod tests { #[test] fn test_heap_limit_cb_remove() { - let mut isolate = CoreIsolate::new(StartupData::None, false); + let mut isolate = + CoreIsolate::new(BasicState::new(), StartupData::None, false); isolate.add_near_heap_limit_callback(|current_limit, _initial_limit| { current_limit * 2 @@ -1427,8 +1326,11 @@ pub mod tests { initial: 0, max: 20 * 1024, // 20 kB }; - let mut isolate = - CoreIsolate::with_heap_limits(StartupData::None, heap_limits); + let mut isolate = CoreIsolate::with_heap_limits( + BasicState::new(), + StartupData::None, + heap_limits, + ); let cb_handle = isolate.thread_safe_handle(); let callback_invoke_count_first = Rc::new(AtomicUsize::default()); |