diff options
author | Bert Belder <bertbelder@gmail.com> | 2020-09-06 02:34:02 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-06 02:34:02 +0200 |
commit | c821e8f2f1fb8ad5e9eb00854277cafc8c80b2f5 (patch) | |
tree | c429a3c2707a4047fb512443a8468b7e15e5730d /core | |
parent | 849431eb1d112d1f79f4a327830dc1a5bf22dd47 (diff) |
Move JSON ops to deno_core (#7336)
Diffstat (limited to 'core')
-rw-r--r-- | core/Cargo.toml | 1 | ||||
-rw-r--r-- | core/basic_state.rs | 91 | ||||
-rw-r--r-- | core/bindings.rs | 75 | ||||
-rw-r--r-- | core/core_isolate.rs | 300 | ||||
-rw-r--r-- | core/errors.rs | 8 | ||||
-rw-r--r-- | core/es_isolate.rs | 40 | ||||
-rw-r--r-- | core/examples/http_bench_bin_ops.rs | 223 | ||||
-rw-r--r-- | core/examples/http_bench_json_ops.rs | 163 | ||||
-rw-r--r-- | core/flags.rs | 1 | ||||
-rw-r--r-- | core/lib.rs | 8 | ||||
-rw-r--r-- | core/modules.rs | 27 | ||||
-rw-r--r-- | core/ops.rs | 245 | ||||
-rw-r--r-- | core/plugin_api.rs | 1 | ||||
-rw-r--r-- | core/shared_queue.rs | 8 |
14 files changed, 597 insertions, 594 deletions
diff --git a/core/Cargo.toml b/core/Cargo.toml index 37d2f3268..94681cf7d 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -22,6 +22,7 @@ rusty_v8 = "0.9.1" serde_json = { version = "1.0.57", features = ["preserve_order"] } smallvec = "1.4.2" url = "2.1.1" +indexmap = "1.5.2" [[example]] name = "http_bench_bin_ops" diff --git a/core/basic_state.rs b/core/basic_state.rs new file mode 100644 index 000000000..54b9ee132 --- /dev/null +++ b/core/basic_state.rs @@ -0,0 +1,91 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. + +use crate::BufVec; +use crate::Op; +use crate::OpId; +use crate::OpRegistry; +use crate::OpRouter; +use crate::OpTable; +use crate::ResourceTable; +use std::cell::RefCell; +use std::collections::HashMap; +use std::rc::Rc; + +/// A minimal state struct for use by tests, examples etc. It contains +/// an OpTable and ResourceTable, and implements the relevant traits +/// for working with ops in the most straightforward way possible. +#[derive(Default)] +pub struct BasicState { + pub op_table: RefCell<OpTable<Self>>, + pub resource_table: RefCell<ResourceTable>, +} + +impl BasicState { + pub fn new() -> Rc<Self> { + Default::default() + } +} + +impl OpRegistry for BasicState { + fn get_op_catalog(self: Rc<Self>) -> HashMap<String, OpId> { + self.op_table.borrow().get_op_catalog() + } + + fn register_op<F>(&self, name: &str, op_fn: F) -> OpId + where + F: Fn(Rc<Self>, BufVec) -> Op + 'static, + { + let mut op_table = self.op_table.borrow_mut(); + let (op_id, prev) = op_table.insert_full(name.to_owned(), Rc::new(op_fn)); + assert!(prev.is_none()); + op_id + } +} + +impl OpRouter for BasicState { + fn route_op(self: Rc<Self>, op_id: OpId, bufs: BufVec) -> Op { + let op_fn = self + .op_table + .borrow() + .get_index(op_id) + .map(|(_, op_fn)| op_fn.clone()) + .unwrap(); + (op_fn)(self, bufs) + } +} + +#[test] +fn test_basic_state_ops() { + let state = BasicState::new(); + + let foo_id = state.register_op("foo", |_, _| Op::Sync(b"oof!"[..].into())); + assert_eq!(foo_id, 1); + + let bar_id = state.register_op("bar", |_, _| Op::Sync(b"rab!"[..].into())); + assert_eq!(bar_id, 2); + + let state_ = state.clone(); + let foo_res = state_.route_op(foo_id, Default::default()); + assert!(matches!(foo_res, Op::Sync(buf) if &*buf == b"oof!")); + + let state_ = state.clone(); + let bar_res = state_.route_op(bar_id, Default::default()); + assert!(matches!(bar_res, Op::Sync(buf) if &*buf == b"rab!")); + + let catalog_res = state.route_op(0, Default::default()); + let mut catalog_entries = match catalog_res { + Op::Sync(buf) => serde_json::from_slice::<HashMap<String, OpId>>(&buf) + .map(|map| map.into_iter().collect::<Vec<_>>()) + .unwrap(), + _ => panic!("unexpected `Op` variant"), + }; + catalog_entries.sort_by(|(_, id1), (_, id2)| id1.partial_cmp(id2).unwrap()); + assert_eq!( + catalog_entries, + vec![ + ("ops".to_owned(), 0), + ("foo".to_owned(), 1), + ("bar".to_owned(), 2) + ] + ) +} diff --git a/core/bindings.rs b/core/bindings.rs index f0181a227..166c0ee6e 100644 --- a/core/bindings.rs +++ b/core/bindings.rs @@ -2,18 +2,19 @@ use crate::CoreIsolate; use crate::CoreIsolateState; +use crate::ErrBox; use crate::EsIsolate; use crate::JSError; +use crate::Op; +use crate::OpId; use crate::ZeroCopyBuf; - +use futures::future::FutureExt; use rusty_v8 as v8; -use v8::MapFnTo; - -use smallvec::SmallVec; use std::cell::Cell; use std::convert::TryFrom; use std::option::Option; use url::Url; +use v8::MapFnTo; lazy_static! { pub static ref EXTERNAL_REFERENCES: v8::ExternalReferences = @@ -49,7 +50,7 @@ lazy_static! { function: decode.map_fn_to() }, v8::ExternalReference { - function: get_promise_details.map_fn_to(), + function: get_promise_details.map_fn_to() } ]); } @@ -371,13 +372,19 @@ fn recv( slot.replace(v8::Global::new(scope, cb)); } -fn send( - scope: &mut v8::HandleScope, +fn send<'s>( + scope: &mut v8::HandleScope<'s>, args: v8::FunctionCallbackArguments, mut rv: v8::ReturnValue, ) { - let op_id = match v8::Local::<v8::Uint32>::try_from(args.get(0)) { - Ok(op_id) => op_id.value() as u32, + let state_rc = CoreIsolate::state(scope); + let state = state_rc.borrow_mut(); + + let op_id = match v8::Local::<v8::Integer>::try_from(args.get(0)) + .map_err(ErrBox::from) + .and_then(|l| OpId::try_from(l.value()).map_err(ErrBox::from)) + { + Ok(op_id) => op_id, Err(err) => { let msg = format!("invalid op id: {}", err); let msg = v8::String::new(scope, &msg).unwrap(); @@ -387,9 +394,6 @@ fn send( } }; - let state_rc = CoreIsolate::state(scope); - let mut state = state_rc.borrow_mut(); - let buf_iter = (1..args.length()).map(|idx| { v8::Local::<v8::ArrayBufferView>::try_from(args.get(idx)) .map(|view| ZeroCopyBuf::new(scope, view)) @@ -400,24 +404,37 @@ fn send( }) }); - // If response is empty then it's either async op or exception was thrown. - let maybe_response = - match buf_iter.collect::<Result<SmallVec<[ZeroCopyBuf; 2]>, _>>() { - Ok(mut bufs) => state.dispatch_op(scope, op_id, &mut bufs), - Err(exc) => { - scope.throw_exception(exc); - return; - } - }; - - if let Some(response) = maybe_response { - // Synchronous response. - // Note op_id is not passed back in the case of synchronous response. - let (_op_id, buf) = response; + let bufs = match buf_iter.collect::<Result<_, _>>() { + Ok(bufs) => bufs, + Err(exc) => { + scope.throw_exception(exc); + return; + } + }; - if !buf.is_empty() { - let ui8 = boxed_slice_to_uint8array(scope, buf); - rv.set(ui8.into()); + let op_router = state.op_router.clone(); + let op = op_router.route_op(op_id, bufs); + assert_eq!(state.shared.size(), 0); + match op { + Op::Sync(buf) if !buf.is_empty() => { + rv.set(boxed_slice_to_uint8array(scope, buf).into()); + } + Op::Sync(_) => {} + Op::Async(fut) => { + let fut2 = fut.map(move |buf| (op_id, buf)); + state.pending_ops.push(fut2.boxed_local()); + state.have_unpolled_ops.set(true); + } + Op::AsyncUnref(fut) => { + let fut2 = fut.map(move |buf| (op_id, buf)); + state.pending_unref_ops.push(fut2.boxed_local()); + state.have_unpolled_ops.set(true); + } + Op::NotFound => { + let msg = format!("Unknown op id: {}", op_id); + let msg = v8::String::new(scope, &msg).unwrap(); + let exc = v8::Exception::type_error(scope, msg); + scope.throw_exception(exc); } } } diff --git a/core/core_isolate.rs b/core/core_isolate.rs index a60ce6a82..52e856174 100644 --- a/core/core_isolate.rs +++ b/core/core_isolate.rs @@ -13,16 +13,12 @@ use crate::shared_queue::SharedQueue; use crate::shared_queue::RECOMMENDED_SIZE; use crate::ErrBox; use crate::JSError; -use crate::ResourceTable; -use crate::ZeroCopyBuf; -use futures::future::FutureExt; use futures::stream::FuturesUnordered; use futures::stream::StreamExt; use futures::task::AtomicWaker; use futures::Future; -use serde_json::json; -use serde_json::Value; use std::any::Any; +use std::cell::Cell; use std::cell::RefCell; use std::collections::HashMap; use std::convert::From; @@ -37,7 +33,7 @@ use std::sync::Once; use std::task::Context; use std::task::Poll; -type PendingOpFuture = Pin<Box<dyn Future<Output = (OpId, Buf)>>>; +type PendingOpFuture = Pin<Box<dyn Future<Output = (OpId, Box<[u8]>)>>>; /// Stores a script used to initialize a Isolate pub struct Script<'a> { @@ -87,7 +83,7 @@ impl StartupData<'_> { type JSErrorCreateFn = dyn Fn(JSError) -> ErrBox; -pub type GetErrorClassFn = &'static dyn for<'e> Fn(&'e ErrBox) -> &'static str; +pub type GetErrorClassFn = dyn for<'e> Fn(&'e ErrBox) -> &'static str; /// Objects that need to live as long as the isolate #[derive(Default)] @@ -118,19 +114,17 @@ pub struct CoreIsolate { /// Internal state for CoreIsolate which is stored in one of v8::Isolate's /// embedder slots. pub struct CoreIsolateState { - pub resource_table: Rc<RefCell<ResourceTable>>, pub global_context: Option<v8::Global<v8::Context>>, pub(crate) shared_ab: Option<v8::Global<v8::SharedArrayBuffer>>, pub(crate) js_recv_cb: Option<v8::Global<v8::Function>>, pub(crate) js_macrotask_cb: Option<v8::Global<v8::Function>>, pub(crate) pending_promise_exceptions: HashMap<i32, v8::Global<v8::Value>>, pub(crate) js_error_create_fn: Box<JSErrorCreateFn>, - pub get_error_class_fn: GetErrorClassFn, pub(crate) shared: SharedQueue, - pending_ops: FuturesUnordered<PendingOpFuture>, - pending_unref_ops: FuturesUnordered<PendingOpFuture>, - have_unpolled_ops: bool, - pub op_registry: OpRegistry, + pub(crate) pending_ops: FuturesUnordered<PendingOpFuture>, + pub(crate) pending_unref_ops: FuturesUnordered<PendingOpFuture>, + pub(crate) have_unpolled_ops: Cell<bool>, + pub(crate) op_router: Rc<dyn OpRouter>, waker: AtomicWaker, } @@ -205,21 +199,27 @@ pub struct HeapLimits { } pub(crate) struct IsolateOptions { - will_snapshot: bool, + op_router: Rc<dyn OpRouter>, startup_script: Option<OwnedScript>, startup_snapshot: Option<Snapshot>, + will_snapshot: bool, heap_limits: Option<HeapLimits>, } impl CoreIsolate { /// startup_data defines the snapshot or script used at startup to initialize /// the isolate. - pub fn new(startup_data: StartupData, will_snapshot: bool) -> Self { + pub fn new( + op_router: Rc<dyn OpRouter>, + startup_data: StartupData, + will_snapshot: bool, + ) -> Self { let (startup_script, startup_snapshot) = startup_data.into_options(); let options = IsolateOptions { - will_snapshot, + op_router, startup_script, startup_snapshot, + will_snapshot, heap_limits: None, }; @@ -233,14 +233,16 @@ impl CoreIsolate { /// Make sure to use [`add_near_heap_limit_callback`](#method.add_near_heap_limit_callback) /// to prevent v8 from crashing when reaching the upper limit. pub fn with_heap_limits( + op_router: Rc<dyn OpRouter>, startup_data: StartupData, heap_limits: HeapLimits, ) -> Self { let (startup_script, startup_snapshot) = startup_data.into_options(); let options = IsolateOptions { - will_snapshot: false, + op_router, startup_script, startup_snapshot, + will_snapshot: false, heap_limits: Some(heap_limits), }; @@ -304,18 +306,16 @@ impl CoreIsolate { isolate.set_slot(Rc::new(RefCell::new(CoreIsolateState { global_context: Some(global_context), - resource_table: Rc::new(RefCell::new(ResourceTable::default())), pending_promise_exceptions: HashMap::new(), shared_ab: None, js_recv_cb: None, js_macrotask_cb: None, js_error_create_fn: Box::new(JSError::create), - get_error_class_fn: &|_| "Error", shared: SharedQueue::new(RECOMMENDED_SIZE), pending_ops: FuturesUnordered::new(), pending_unref_ops: FuturesUnordered::new(), - have_unpolled_ops: false, - op_registry: OpRegistry::new(), + have_unpolled_ops: Cell::new(false), + op_router: options.op_router, waker: AtomicWaker::new(), }))); @@ -421,66 +421,6 @@ impl CoreIsolate { snapshot } - /// Defines the how Deno.core.dispatch() acts. - /// Called whenever Deno.core.dispatch() is called in JavaScript. zero_copy_buf - /// corresponds to the second argument of Deno.core.dispatch(). - /// - /// Requires runtime to explicitly ask for op ids before using any of the ops. - pub fn register_op<F>(&mut self, name: &str, op: F) -> OpId - where - F: Fn(&mut CoreIsolateState, &mut [ZeroCopyBuf]) -> Op + 'static, - { - let state_rc = Self::state(self); - let mut state = state_rc.borrow_mut(); - state.op_registry.register(name, op) - } - - pub fn register_op_json_sync<F>(&mut self, name: &str, op: F) -> OpId - where - F: 'static - + Fn( - &mut CoreIsolateState, - serde_json::Value, - &mut [ZeroCopyBuf], - ) -> Result<serde_json::Value, ErrBox>, - { - let core_op = - move |state: &mut CoreIsolateState, bufs: &mut [ZeroCopyBuf]| -> Op { - let value = serde_json::from_slice(&bufs[0]).unwrap(); - let result = op(state, value, &mut bufs[1..]); - let buf = serialize_result(None, result, state.get_error_class_fn); - Op::Sync(buf) - }; - - let state_rc = Self::state(self); - let mut state = state_rc.borrow_mut(); - state.op_registry.register(name, core_op) - } - - pub fn register_op_json_async<F, Fut>(&mut self, name: &str, op: F) -> OpId - where - Fut: 'static + Future<Output = Result<serde_json::Value, ErrBox>>, - F: 'static - + Fn(&mut CoreIsolateState, serde_json::Value, &mut [ZeroCopyBuf]) -> Fut, - { - let core_op = move |state: &mut CoreIsolateState, - bufs: &mut [ZeroCopyBuf]| - -> Op { - let get_error_class_fn = state.get_error_class_fn; - let value: serde_json::Value = serde_json::from_slice(&bufs[0]).unwrap(); - let promise_id = value.get("promiseId").unwrap().as_u64().unwrap(); - let fut = op(state, value, &mut bufs[1..]); - let fut2 = fut.map(move |result| { - serialize_result(Some(promise_id), result, get_error_class_fn) - }); - Op::Async(Box::pin(fut2)) - }; - - let state_rc = Self::state(self); - let mut state = state_rc.borrow_mut(); - state.op_registry.register(name, core_op) - } - /// Registers a callback on the isolate when the memory limits are approached. /// Use this to prevent V8 from crashing the process when reaching the limit. /// @@ -536,24 +476,6 @@ where callback(current_heap_limit, initial_heap_limit) } -fn serialize_result( - promise_id: Option<u64>, - result: Result<Value, ErrBox>, - get_error_class_fn: GetErrorClassFn, -) -> Buf { - let value = match result { - Ok(v) => json!({ "ok": v, "promiseId": promise_id }), - Err(err) => json!({ - "promiseId": promise_id , - "err": { - "className": (get_error_class_fn)(&err), - "message": err.to_string(), - } - }), - }; - serde_json::to_vec(&value).unwrap().into_boxed_slice() -} - impl Future for CoreIsolate { type Output = Result<(), ErrBox>; @@ -574,12 +496,12 @@ impl Future for CoreIsolate { check_promise_exceptions(scope)?; - let mut overflow_response: Option<(OpId, Buf)> = None; + let mut overflow_response: Option<(OpId, Box<[u8]>)> = None; loop { let mut state = state_rc.borrow_mut(); // Now handle actual ops. - state.have_unpolled_ops = false; + state.have_unpolled_ops.set(false); let pending_r = state.pending_ops.poll_next_unpin(cx); match pending_r { @@ -644,7 +566,7 @@ impl Future for CoreIsolate { if state.pending_ops.is_empty() { Poll::Ready(Ok(())) } else { - if state.have_unpolled_ops { + if state.have_unpolled_ops.get() { state.waker.wake(); } Poll::Pending @@ -653,18 +575,6 @@ impl Future for CoreIsolate { } impl CoreIsolateState { - /// Defines the how Deno.core.dispatch() acts. - /// Called whenever Deno.core.dispatch() is called in JavaScript. zero_copy_buf - /// corresponds to the second argument of Deno.core.dispatch(). - /// - /// Requires runtime to explicitly ask for op ids before using any of the ops. - pub fn register_op<F>(&mut self, name: &str, op: F) -> OpId - where - F: Fn(&mut CoreIsolateState, &mut [ZeroCopyBuf]) -> Op + 'static, - { - self.op_registry.register(name, op) - } - /// Allows a callback to be set whenever a V8 exception is made. This allows /// the caller to wrap the JSError into an error. By default this callback /// is set to JSError::create. @@ -674,49 +584,6 @@ impl CoreIsolateState { ) { self.js_error_create_fn = Box::new(f); } - - pub fn set_get_error_class_fn(&mut self, f: GetErrorClassFn) { - self.get_error_class_fn = f; - } - - pub fn dispatch_op<'s>( - &mut self, - scope: &mut v8::HandleScope<'s>, - op_id: OpId, - zero_copy_bufs: &mut [ZeroCopyBuf], - ) -> Option<(OpId, Box<[u8]>)> { - let op = if let Some(dispatcher) = self.op_registry.get(op_id) { - dispatcher(self, zero_copy_bufs) - } else { - let message = - v8::String::new(scope, &format!("Unknown op id: {}", op_id)).unwrap(); - let exception = v8::Exception::type_error(scope, message); - scope.throw_exception(exception); - return None; - }; - - debug_assert_eq!(self.shared.size(), 0); - match op { - Op::Sync(buf) => { - // For sync messages, we always return the response via Deno.core.send's - // return value. Sync messages ignore the op_id. - let op_id = 0; - Some((op_id, buf)) - } - Op::Async(fut) => { - let fut2 = fut.map(move |buf| (op_id, buf)); - self.pending_ops.push(fut2.boxed_local()); - self.have_unpolled_ops = true; - None - } - Op::AsyncUnref(fut) => { - let fut2 = fut.map(move |buf| (op_id, buf)); - self.pending_unref_ops.push(fut2.boxed_local()); - self.have_unpolled_ops = true; - None - } - } - } } fn async_op_response<'s>( @@ -739,7 +606,7 @@ fn async_op_response<'s>( let op_id: v8::Local<v8::Value> = v8::Integer::new(tc_scope, op_id as i32).into(); let ui8: v8::Local<v8::Value> = - bindings::boxed_slice_to_uint8array(tc_scope, buf).into(); + boxed_slice_to_uint8array(tc_scope, buf).into(); js_recv_cb.call(tc_scope, global, &[op_id, ui8]) } None => js_recv_cb.call(tc_scope, global, &[]), @@ -848,11 +715,28 @@ pub fn js_check<T>(r: Result<T, ErrBox>) -> T { r.unwrap() } +fn boxed_slice_to_uint8array<'sc>( + scope: &mut v8::HandleScope<'sc>, + buf: Box<[u8]>, +) -> v8::Local<'sc, v8::Uint8Array> { + assert!(!buf.is_empty()); + let buf_len = buf.len(); + let backing_store = v8::ArrayBuffer::new_backing_store_from_boxed_slice(buf); + let backing_store_shared = backing_store.make_shared(); + let ab = v8::ArrayBuffer::with_backing_store(scope, &backing_store_shared); + v8::Uint8Array::new(scope, ab, 0, buf_len) + .expect("Failed to create UintArray8") +} + #[cfg(test)] pub mod tests { use super::*; + use crate::BasicState; + use crate::BufVec; use futures::future::lazy; + use futures::FutureExt; use std::ops::FnOnce; + use std::rc::Rc; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; @@ -880,7 +764,7 @@ pub mod tests { ) } - pub enum Mode { + enum Mode { Async, AsyncUnref, AsyncZeroCopy(u8), @@ -890,28 +774,29 @@ pub mod tests { OverflowResAsync, } - pub fn setup(mode: Mode) -> (CoreIsolate, Arc<AtomicUsize>) { - let dispatch_count = Arc::new(AtomicUsize::new(0)); - let dispatch_count_ = dispatch_count.clone(); - - let mut isolate = CoreIsolate::new(StartupData::None, false); + struct TestOpRouter { + mode: Mode, + dispatch_count: Arc<AtomicUsize>, + } - let dispatcher = move |_state: &mut CoreIsolateState, - zero_copy: &mut [ZeroCopyBuf]| - -> Op { - dispatch_count_.fetch_add(1, Ordering::Relaxed); - match mode { + impl OpRouter for TestOpRouter { + fn route_op(self: Rc<Self>, op_id: OpId, bufs: BufVec) -> Op { + if op_id != 1 { + return Op::NotFound; + } + self.dispatch_count.fetch_add(1, Ordering::Relaxed); + match self.mode { Mode::Async => { - assert_eq!(zero_copy.len(), 1); - assert_eq!(zero_copy[0].len(), 1); - assert_eq!(zero_copy[0][0], 42); + assert_eq!(bufs.len(), 1); + assert_eq!(bufs[0].len(), 1); + assert_eq!(bufs[0][0], 42); let buf = vec![43u8].into_boxed_slice(); Op::Async(futures::future::ready(buf).boxed()) } Mode::AsyncUnref => { - assert_eq!(zero_copy.len(), 1); - assert_eq!(zero_copy[0].len(), 1); - assert_eq!(zero_copy[0][0], 42); + assert_eq!(bufs.len(), 1); + assert_eq!(bufs[0].len(), 1); + assert_eq!(bufs[0][0], 42); let fut = async { // This future never finish. futures::future::pending::<()>().await; @@ -920,8 +805,8 @@ pub mod tests { Op::AsyncUnref(fut.boxed()) } Mode::AsyncZeroCopy(count) => { - assert_eq!(zero_copy.len(), count as usize); - zero_copy.iter().enumerate().for_each(|(idx, buf)| { + assert_eq!(bufs.len(), count as usize); + bufs.iter().enumerate().for_each(|(idx, buf)| { assert_eq!(buf.len(), 1); assert_eq!(idx, buf[0] as usize); }); @@ -930,15 +815,15 @@ pub mod tests { Op::Async(futures::future::ready(buf).boxed()) } Mode::OverflowReqSync => { - assert_eq!(zero_copy.len(), 1); - assert_eq!(zero_copy[0].len(), 100 * 1024 * 1024); + assert_eq!(bufs.len(), 1); + assert_eq!(bufs[0].len(), 100 * 1024 * 1024); let buf = vec![43u8].into_boxed_slice(); Op::Sync(buf) } Mode::OverflowResSync => { - assert_eq!(zero_copy.len(), 1); - assert_eq!(zero_copy[0].len(), 1); - assert_eq!(zero_copy[0][0], 42); + assert_eq!(bufs.len(), 1); + assert_eq!(bufs[0].len(), 1); + assert_eq!(bufs[0][0], 42); let mut vec = Vec::<u8>::new(); vec.resize(100 * 1024 * 1024, 0); vec[0] = 99; @@ -946,15 +831,15 @@ pub mod tests { Op::Sync(buf) } Mode::OverflowReqAsync => { - assert_eq!(zero_copy.len(), 1); - assert_eq!(zero_copy[0].len(), 100 * 1024 * 1024); + assert_eq!(bufs.len(), 1); + assert_eq!(bufs[0].len(), 100 * 1024 * 1024); let buf = vec![43u8].into_boxed_slice(); Op::Async(futures::future::ready(buf).boxed()) } Mode::OverflowResAsync => { - assert_eq!(zero_copy.len(), 1); - assert_eq!(zero_copy[0].len(), 1); - assert_eq!(zero_copy[0][0], 42); + assert_eq!(bufs.len(), 1); + assert_eq!(bufs[0].len(), 1); + assert_eq!(bufs[0][0], 42); let mut vec = Vec::<u8>::new(); vec.resize(100 * 1024 * 1024, 0); vec[0] = 4; @@ -962,9 +847,16 @@ pub mod tests { Op::Async(futures::future::ready(buf).boxed()) } } - }; + } + } - isolate.register_op("test", dispatcher); + fn setup(mode: Mode) -> (CoreIsolate, Arc<AtomicUsize>) { + let dispatch_count = Arc::new(AtomicUsize::new(0)); + let test_state = Rc::new(TestOpRouter { + mode, + dispatch_count: dispatch_count.clone(), + }); + let mut isolate = CoreIsolate::new(test_state, StartupData::None, false); js_check(isolate.execute( "setup.js", @@ -1328,7 +1220,8 @@ pub mod tests { #[test] fn syntax_error() { - let mut isolate = CoreIsolate::new(StartupData::None, false); + let mut isolate = + CoreIsolate::new(BasicState::new(), StartupData::None, false); let src = "hocuspocus("; let r = isolate.execute("i.js", src); let e = r.unwrap_err(); @@ -1353,27 +1246,29 @@ pub mod tests { #[test] fn will_snapshot() { let snapshot = { - let mut isolate = CoreIsolate::new(StartupData::None, true); + let mut isolate = + CoreIsolate::new(BasicState::new(), StartupData::None, true); js_check(isolate.execute("a.js", "a = 1 + 2")); isolate.snapshot() }; let startup_data = StartupData::Snapshot(Snapshot::JustCreated(snapshot)); - let mut isolate2 = CoreIsolate::new(startup_data, false); + let mut isolate2 = CoreIsolate::new(BasicState::new(), startup_data, false); js_check(isolate2.execute("check.js", "if (a != 3) throw Error('x')")); } #[test] fn test_from_boxed_snapshot() { let snapshot = { - let mut isolate = CoreIsolate::new(StartupData::None, true); + let mut isolate = + CoreIsolate::new(BasicState::new(), StartupData::None, true); js_check(isolate.execute("a.js", "a = 1 + 2")); let snap: &[u8] = &*isolate.snapshot(); Vec::from(snap).into_boxed_slice() }; let startup_data = StartupData::Snapshot(Snapshot::Boxed(snapshot)); - let mut isolate2 = CoreIsolate::new(startup_data, false); + let mut isolate2 = CoreIsolate::new(BasicState::new(), startup_data, false); js_check(isolate2.execute("check.js", "if (a != 3) throw Error('x')")); } @@ -1383,8 +1278,11 @@ pub mod tests { initial: 0, max: 20 * 1024, // 20 kB }; - let mut isolate = - CoreIsolate::with_heap_limits(StartupData::None, heap_limits); + let mut isolate = CoreIsolate::with_heap_limits( + BasicState::new(), + StartupData::None, + heap_limits, + ); let cb_handle = isolate.thread_safe_handle(); let callback_invoke_count = Rc::new(AtomicUsize::default()); @@ -1412,7 +1310,8 @@ pub mod tests { #[test] fn test_heap_limit_cb_remove() { - let mut isolate = CoreIsolate::new(StartupData::None, false); + let mut isolate = + CoreIsolate::new(BasicState::new(), StartupData::None, false); isolate.add_near_heap_limit_callback(|current_limit, _initial_limit| { current_limit * 2 @@ -1427,8 +1326,11 @@ pub mod tests { initial: 0, max: 20 * 1024, // 20 kB }; - let mut isolate = - CoreIsolate::with_heap_limits(StartupData::None, heap_limits); + let mut isolate = CoreIsolate::with_heap_limits( + BasicState::new(), + StartupData::None, + heap_limits, + ); let cb_handle = isolate.thread_safe_handle(); let callback_invoke_count_first = Rc::new(AtomicUsize::default()); diff --git a/core/errors.rs b/core/errors.rs index 8fdcca059..2ce12689a 100644 --- a/core/errors.rs +++ b/core/errors.rs @@ -446,19 +446,21 @@ impl fmt::Debug for ErrWithV8Handle { } } -#[cfg(tests)] +#[cfg(test)] mod tests { + use super::*; + #[test] fn test_bad_resource() { let err = ErrBox::bad_resource("Resource has been closed"); - assert_eq!(err.1, "BadResource"); + assert!(matches!(err, ErrBox::Simple { class: "BadResource", .. })); assert_eq!(err.to_string(), "Resource has been closed"); } #[test] fn test_bad_resource_id() { let err = ErrBox::bad_resource_id(); - assert_eq!(err.1, "BadResource"); + assert!(matches!(err, ErrBox::Simple { class: "BadResource", .. })); assert_eq!(err.to_string(), "Bad resource ID"); } } diff --git a/core/es_isolate.rs b/core/es_isolate.rs index f2e2460ba..194f1adfa 100644 --- a/core/es_isolate.rs +++ b/core/es_isolate.rs @@ -10,6 +10,7 @@ use crate::bindings; use crate::errors::ErrBox; use crate::errors::ErrWithV8Handle; use crate::futures::FutureExt; +use crate::OpRouter; use futures::ready; use futures::stream::FuturesUnordered; use futures::stream::StreamExt; @@ -76,10 +77,12 @@ impl DerefMut for EsIsolate { impl EsIsolate { pub fn new( loader: Rc<dyn ModuleLoader>, + op_router: Rc<dyn OpRouter>, startup_data: StartupData, will_snapshot: bool, ) -> Self { - let mut core_isolate = CoreIsolate::new(startup_data, will_snapshot); + let mut core_isolate = + CoreIsolate::new(op_router, startup_data, will_snapshot); { core_isolate.set_host_initialize_import_meta_object_callback( bindings::host_initialize_import_meta_object_callback, @@ -640,11 +643,11 @@ impl EsIsolateState { pub mod tests { use super::*; use crate::core_isolate::tests::run_in_task; - use crate::core_isolate::CoreIsolateState; use crate::js_check; use crate::modules::ModuleSourceFuture; use crate::ops::*; - use crate::ZeroCopyBuf; + use crate::BasicState; + use crate::BufVec; use std::io; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; @@ -681,24 +684,23 @@ pub mod tests { } let loader = Rc::new(ModsLoader::default()); + let state = BasicState::new(); + let resolve_count = loader.count.clone(); let dispatch_count = Arc::new(AtomicUsize::new(0)); let dispatch_count_ = dispatch_count.clone(); - let mut isolate = EsIsolate::new(loader, StartupData::None, false); - - let dispatcher = move |_state: &mut CoreIsolateState, - zero_copy: &mut [ZeroCopyBuf]| - -> Op { + let dispatcher = move |_state: Rc<BasicState>, bufs: BufVec| -> Op { dispatch_count_.fetch_add(1, Ordering::Relaxed); - assert_eq!(zero_copy.len(), 1); - assert_eq!(zero_copy[0].len(), 1); - assert_eq!(zero_copy[0][0], 42); - let buf = vec![43u8, 0, 0, 0].into_boxed_slice(); + assert_eq!(bufs.len(), 1); + assert_eq!(bufs[0].len(), 1); + assert_eq!(bufs[0][0], 42); + let buf = [43u8, 0, 0, 0][..].into(); Op::Async(futures::future::ready(buf).boxed()) }; + state.register_op("test", dispatcher); - isolate.register_op("test", dispatcher); + let mut isolate = EsIsolate::new(loader, state, StartupData::None, false); js_check(isolate.execute( "setup.js", @@ -792,7 +794,8 @@ pub mod tests { run_in_task(|cx| { let loader = Rc::new(DynImportErrLoader::default()); let count = loader.count.clone(); - let mut isolate = EsIsolate::new(loader, StartupData::None, false); + let mut isolate = + EsIsolate::new(loader, BasicState::new(), StartupData::None, false); js_check(isolate.execute( "file:///dyn_import2.js", @@ -869,7 +872,8 @@ pub mod tests { let prepare_load_count = loader.prepare_load_count.clone(); let resolve_count = loader.resolve_count.clone(); let load_count = loader.load_count.clone(); - let mut isolate = EsIsolate::new(loader, StartupData::None, false); + let mut isolate = + EsIsolate::new(loader, BasicState::new(), StartupData::None, false); // Dynamically import mod_b js_check(isolate.execute( @@ -909,7 +913,8 @@ pub mod tests { run_in_task(|cx| { let loader = Rc::new(DynImportOkLoader::default()); let prepare_load_count = loader.prepare_load_count.clone(); - let mut isolate = EsIsolate::new(loader, StartupData::None, false); + let mut isolate = + EsIsolate::new(loader, BasicState::new(), StartupData::None, false); js_check(isolate.execute( "file:///dyn_import3.js", r#" @@ -960,7 +965,8 @@ pub mod tests { } let loader = std::rc::Rc::new(ModsLoader::default()); - let mut runtime_isolate = EsIsolate::new(loader, StartupData::None, true); + let mut runtime_isolate = + EsIsolate::new(loader, BasicState::new(), StartupData::None, true); let specifier = ModuleSpecifier::resolve_url("file:///main.js").unwrap(); let source_code = "Deno.core.print('hello\\n')".to_string(); diff --git a/core/examples/http_bench_bin_ops.rs b/core/examples/http_bench_bin_ops.rs index 366779e8c..f93b8b079 100644 --- a/core/examples/http_bench_bin_ops.rs +++ b/core/examples/http_bench_bin_ops.rs @@ -1,10 +1,12 @@ #[macro_use] extern crate log; +use deno_core::js_check; +use deno_core::BasicState; +use deno_core::BufVec; use deno_core::CoreIsolate; -use deno_core::CoreIsolateState; use deno_core::Op; -use deno_core::ResourceTable; +use deno_core::OpRegistry; use deno_core::Script; use deno_core::StartupData; use deno_core::ZeroCopyBuf; @@ -12,7 +14,6 @@ use futures::future::poll_fn; use futures::future::FutureExt; use futures::future::TryFuture; use futures::future::TryFutureExt; -use std::cell::RefCell; use std::convert::TryInto; use std::env; use std::fmt::Debug; @@ -27,6 +28,7 @@ use tokio::io::AsyncRead; use tokio::io::AsyncWrite; use tokio::net::TcpListener; use tokio::net::TcpStream; +use tokio::runtime; struct Logger; @@ -46,9 +48,9 @@ impl log::Log for Logger { #[derive(Copy, Clone, Debug, PartialEq)] struct Record { - pub promise_id: u32, - pub rid: u32, - pub result: i32, + promise_id: u32, + rid: u32, + result: i32, } type RecordBuf = [u8; size_of::<Record>()]; @@ -75,131 +77,64 @@ impl From<Record> for RecordBuf { } } -pub fn isolate_new() -> CoreIsolate { +fn create_isolate() -> CoreIsolate { + let state = BasicState::new(); + register_op_bin_sync(&state, "listen", op_listen); + register_op_bin_sync(&state, "close", op_close); + register_op_bin_async(&state, "accept", op_accept); + register_op_bin_async(&state, "read", op_read); + register_op_bin_async(&state, "write", op_write); + let startup_data = StartupData::Script(Script { source: include_str!("http_bench_bin_ops.js"), filename: "http_bench_bin_ops.js", }); - let mut isolate = CoreIsolate::new(startup_data, false); - - fn register_sync_op<F>( - isolate: &mut CoreIsolate, - name: &'static str, - handler: F, - ) where - F: 'static - + Fn( - Rc<RefCell<ResourceTable>>, - u32, - &mut [ZeroCopyBuf], - ) -> Result<u32, Error>, - { - let core_handler = move |state: &mut CoreIsolateState, - zero_copy_bufs: &mut [ZeroCopyBuf]| - -> Op { - assert!(!zero_copy_bufs.is_empty()); - let record = Record::from(zero_copy_bufs[0].as_ref()); - let is_sync = record.promise_id == 0; - assert!(is_sync); - - let resource_table = state.resource_table.clone(); - let result: i32 = - match handler(resource_table, record.rid, &mut zero_copy_bufs[1..]) { - Ok(r) => r as i32, - Err(_) => -1, - }; - let buf = RecordBuf::from(Record { result, ..record })[..].into(); - Op::Sync(buf) - }; - - isolate.register_op(name, core_handler); - } - - fn register_async_op<F>( - isolate: &mut CoreIsolate, - name: &'static str, - handler: impl Fn(Rc<RefCell<ResourceTable>>, u32, &mut [ZeroCopyBuf]) -> F - + Copy - + 'static, - ) where - F: TryFuture, - F::Ok: TryInto<i32>, - <F::Ok as TryInto<i32>>::Error: Debug, - { - let core_handler = move |state: &mut CoreIsolateState, - zero_copy_bufs: &mut [ZeroCopyBuf]| - -> Op { - assert!(!zero_copy_bufs.is_empty()); - let record = Record::from(zero_copy_bufs[0].as_ref()); - let is_sync = record.promise_id == 0; - assert!(!is_sync); - - let mut zero_copy = zero_copy_bufs[1..].to_vec(); - let resource_table = state.resource_table.clone(); - let fut = async move { - let op = handler(resource_table, record.rid, &mut zero_copy); - let result = op - .map_ok(|r| r.try_into().expect("op result does not fit in i32")) - .unwrap_or_else(|_| -1) - .await; - RecordBuf::from(Record { result, ..record })[..].into() - }; - - Op::Async(fut.boxed_local()) - }; - - isolate.register_op(name, core_handler); - } - - register_sync_op(&mut isolate, "listen", op_listen); - register_async_op(&mut isolate, "accept", op_accept); - register_async_op(&mut isolate, "read", op_read); - register_async_op(&mut isolate, "write", op_write); - register_sync_op(&mut isolate, "close", op_close); - - isolate -} - -fn op_close( - resource_table: Rc<RefCell<ResourceTable>>, - rid: u32, - _buf: &mut [ZeroCopyBuf], -) -> Result<u32, Error> { - debug!("close rid={}", rid); - let resource_table = &mut resource_table.borrow_mut(); - resource_table - .close(rid) - .map(|_| 0) - .ok_or_else(bad_resource) + CoreIsolate::new(state, startup_data, false) } fn op_listen( - resource_table: Rc<RefCell<ResourceTable>>, + state: &BasicState, _rid: u32, - _buf: &mut [ZeroCopyBuf], + _bufs: &mut [ZeroCopyBuf], ) -> Result<u32, Error> { debug!("listen"); let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap(); let std_listener = std::net::TcpListener::bind(&addr)?; let listener = TcpListener::from_std(std_listener)?; - let resource_table = &mut resource_table.borrow_mut(); - let rid = resource_table.add("tcpListener", Box::new(listener)); + let rid = state + .resource_table + .borrow_mut() + .add("tcpListener", Box::new(listener)); Ok(rid) } +fn op_close( + state: &BasicState, + rid: u32, + _bufs: &mut [ZeroCopyBuf], +) -> Result<u32, Error> { + debug!("close rid={}", rid); + state + .resource_table + .borrow_mut() + .close(rid) + .map(|_| 0) + .ok_or_else(bad_resource_id) +} + fn op_accept( - resource_table: Rc<RefCell<ResourceTable>>, + state: Rc<BasicState>, rid: u32, - _buf: &mut [ZeroCopyBuf], + _bufs: BufVec, ) -> impl TryFuture<Ok = u32, Error = Error> { debug!("accept rid={}", rid); poll_fn(move |cx| { - let resource_table = &mut resource_table.borrow_mut(); + let resource_table = &mut state.resource_table.borrow_mut(); let listener = resource_table .get_mut::<TcpListener>(rid) - .ok_or_else(bad_resource)?; + .ok_or_else(bad_resource_id)?; listener.poll_accept(cx).map_ok(|(stream, _addr)| { resource_table.add("tcpStream", Box::new(stream)) }) @@ -207,9 +142,9 @@ fn op_accept( } fn op_read( - resource_table: Rc<RefCell<ResourceTable>>, + state: Rc<BasicState>, rid: u32, - bufs: &mut [ZeroCopyBuf], + bufs: BufVec, ) -> impl TryFuture<Ok = usize, Error = Error> { assert_eq!(bufs.len(), 1, "Invalid number of arguments"); let mut buf = bufs[0].clone(); @@ -217,33 +152,85 @@ fn op_read( debug!("read rid={}", rid); poll_fn(move |cx| { - let resource_table = &mut resource_table.borrow_mut(); + let resource_table = &mut state.resource_table.borrow_mut(); let stream = resource_table .get_mut::<TcpStream>(rid) - .ok_or_else(bad_resource)?; + .ok_or_else(bad_resource_id)?; Pin::new(stream).poll_read(cx, &mut buf) }) } fn op_write( - resource_table: Rc<RefCell<ResourceTable>>, + state: Rc<BasicState>, rid: u32, - bufs: &mut [ZeroCopyBuf], + bufs: BufVec, ) -> impl TryFuture<Ok = usize, Error = Error> { assert_eq!(bufs.len(), 1, "Invalid number of arguments"); let buf = bufs[0].clone(); debug!("write rid={}", rid); poll_fn(move |cx| { - let resource_table = &mut resource_table.borrow_mut(); + let resource_table = &mut state.resource_table.borrow_mut(); let stream = resource_table .get_mut::<TcpStream>(rid) - .ok_or_else(bad_resource)?; + .ok_or_else(bad_resource_id)?; Pin::new(stream).poll_write(cx, &buf) }) } -fn bad_resource() -> Error { +fn register_op_bin_sync<F>(state: &BasicState, name: &'static str, op_fn: F) +where + F: Fn(&BasicState, u32, &mut [ZeroCopyBuf]) -> Result<u32, Error> + 'static, +{ + let base_op_fn = move |state: Rc<BasicState>, mut bufs: BufVec| -> Op { + let record = Record::from(bufs[0].as_ref()); + let is_sync = record.promise_id == 0; + assert!(is_sync); + + let zero_copy_bufs = &mut bufs[1..]; + let result: i32 = match op_fn(&state, record.rid, zero_copy_bufs) { + Ok(r) => r as i32, + Err(_) => -1, + }; + let buf = RecordBuf::from(Record { result, ..record })[..].into(); + Op::Sync(buf) + }; + + state.register_op(name, base_op_fn); +} + +fn register_op_bin_async<F, R>(state: &BasicState, name: &'static str, op_fn: F) +where + F: Fn(Rc<BasicState>, u32, BufVec) -> R + Copy + 'static, + R: TryFuture, + R::Ok: TryInto<i32>, + <R::Ok as TryInto<i32>>::Error: Debug, +{ + let base_op_fn = move |state: Rc<BasicState>, bufs: BufVec| -> Op { + let mut bufs_iter = bufs.into_iter(); + let record_buf = bufs_iter.next().unwrap(); + let zero_copy_bufs = bufs_iter.collect::<BufVec>(); + + let record = Record::from(record_buf.as_ref()); + let is_sync = record.promise_id == 0; + assert!(!is_sync); + + let fut = async move { + let op = op_fn(state, record.rid, zero_copy_bufs); + let result = op + .map_ok(|r| r.try_into().expect("op result does not fit in i32")) + .unwrap_or_else(|_| -1) + .await; + RecordBuf::from(Record { result, ..record })[..].into() + }; + + Op::Async(fut.boxed_local()) + }; + + state.register_op(name, base_op_fn); +} + +fn bad_resource_id() -> Error { Error::new(ErrorKind::NotFound, "bad resource id") } @@ -259,13 +246,13 @@ fn main() { // NOTE: `--help` arg will display V8 help and exit deno_core::v8_set_flags(env::args().collect()); - let isolate = isolate_new(); - let mut runtime = tokio::runtime::Builder::new() + let isolate = create_isolate(); + let mut runtime = runtime::Builder::new() .basic_scheduler() .enable_all() .build() .unwrap(); - runtime.block_on(isolate).expect("unexpected isolate error"); + js_check(runtime.block_on(isolate)); } #[test] diff --git a/core/examples/http_bench_json_ops.rs b/core/examples/http_bench_json_ops.rs index f0fc5f94c..6e3063a0f 100644 --- a/core/examples/http_bench_json_ops.rs +++ b/core/examples/http_bench_json_ops.rs @@ -1,25 +1,29 @@ #[macro_use] extern crate log; -use deno_core::serde_json; +use deno_core::js_check; +use deno_core::BasicState; +use deno_core::BufVec; use deno_core::CoreIsolate; -use deno_core::CoreIsolateState; use deno_core::ErrBox; +use deno_core::OpRegistry; use deno_core::Script; use deno_core::StartupData; use deno_core::ZeroCopyBuf; use futures::future::poll_fn; use futures::future::Future; +use serde_json::Value; +use std::convert::TryInto; use std::env; -use std::io::Error; -use std::io::ErrorKind; use std::net::SocketAddr; use std::pin::Pin; +use std::rc::Rc; use std::task::Poll; use tokio::io::AsyncRead; use tokio::io::AsyncWrite; use tokio::net::TcpListener; use tokio::net::TcpStream; +use tokio::runtime; struct Logger; @@ -37,66 +41,79 @@ impl log::Log for Logger { fn flush(&self) {} } -pub fn isolate_new() -> CoreIsolate { +fn create_isolate() -> CoreIsolate { + let state = BasicState::new(); + state.register_op_json_sync("listen", op_listen); + state.register_op_json_sync("close", op_close); + state.register_op_json_async("accept", op_accept); + state.register_op_json_async("read", op_read); + state.register_op_json_async("write", op_write); + let startup_data = StartupData::Script(Script { source: include_str!("http_bench_json_ops.js"), filename: "http_bench_json_ops.js", }); - let mut isolate = CoreIsolate::new(startup_data, false); - - isolate.register_op_json_sync("listen", op_listen); - isolate.register_op_json_async("accept", op_accept); - isolate.register_op_json_async("read", op_read); - isolate.register_op_json_async("write", op_write); - isolate.register_op_json_sync("close", op_close); + CoreIsolate::new(state, startup_data, false) +} - isolate +fn op_listen( + state: &BasicState, + _args: Value, + _bufs: &mut [ZeroCopyBuf], +) -> Result<Value, ErrBox> { + debug!("listen"); + let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap(); + let std_listener = std::net::TcpListener::bind(&addr)?; + let listener = TcpListener::from_std(std_listener)?; + let rid = state + .resource_table + .borrow_mut() + .add("tcpListener", Box::new(listener)); + Ok(serde_json::json!({ "rid": rid })) } fn op_close( - state: &mut CoreIsolateState, - args: serde_json::Value, + state: &BasicState, + args: Value, _buf: &mut [ZeroCopyBuf], -) -> Result<serde_json::Value, ErrBox> { - let rid = args.get("rid").unwrap().as_u64().unwrap() as u32; +) -> Result<Value, ErrBox> { + let rid: u32 = args + .get("rid") + .unwrap() + .as_u64() + .unwrap() + .try_into() + .unwrap(); debug!("close rid={}", rid); - let resource_table = &mut state.resource_table.borrow_mut(); - resource_table + state + .resource_table + .borrow_mut() .close(rid) .map(|_| serde_json::json!(())) - .ok_or_else(bad_resource) -} - -fn op_listen( - state: &mut CoreIsolateState, - _args: serde_json::Value, - _buf: &mut [ZeroCopyBuf], -) -> Result<serde_json::Value, ErrBox> { - debug!("listen"); - let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap(); - let std_listener = std::net::TcpListener::bind(&addr)?; - let listener = TcpListener::from_std(std_listener)?; - let resource_table = &mut state.resource_table.borrow_mut(); - let rid = resource_table.add("tcpListener", Box::new(listener)); - Ok(serde_json::json!({ "rid": rid })) + .ok_or_else(ErrBox::bad_resource_id) } fn op_accept( - state: &mut CoreIsolateState, - args: serde_json::Value, - _buf: &mut [ZeroCopyBuf], -) -> impl Future<Output = Result<serde_json::Value, ErrBox>> { - let rid = args.get("rid").unwrap().as_u64().unwrap() as u32; + state: Rc<BasicState>, + args: Value, + _bufs: BufVec, +) -> impl Future<Output = Result<Value, ErrBox>> { + let rid: u32 = args + .get("rid") + .unwrap() + .as_u64() + .unwrap() + .try_into() + .unwrap(); debug!("accept rid={}", rid); - let resource_table = state.resource_table.clone(); poll_fn(move |cx| { - let resource_table = &mut resource_table.borrow_mut(); + let resource_table = &mut state.resource_table.borrow_mut(); let listener = resource_table .get_mut::<TcpListener>(rid) - .ok_or_else(bad_resource)?; + .ok_or_else(ErrBox::bad_resource_id)?; listener.poll_accept(cx)?.map(|(stream, _addr)| { let rid = resource_table.add("tcpStream", Box::new(stream)); Ok(serde_json::json!({ "rid": rid })) @@ -105,57 +122,59 @@ fn op_accept( } fn op_read( - state: &mut CoreIsolateState, - args: serde_json::Value, - bufs: &mut [ZeroCopyBuf], -) -> impl Future<Output = Result<serde_json::Value, ErrBox>> { + state: Rc<BasicState>, + args: Value, + mut bufs: BufVec, +) -> impl Future<Output = Result<Value, ErrBox>> { assert_eq!(bufs.len(), 1, "Invalid number of arguments"); - let rid = args.get("rid").unwrap().as_u64().unwrap() as u32; + let rid: u32 = args + .get("rid") + .unwrap() + .as_u64() + .unwrap() + .try_into() + .unwrap(); debug!("read rid={}", rid); - let mut buf = bufs[0].clone(); - let resource_table = state.resource_table.clone(); - - poll_fn(move |cx| -> Poll<Result<serde_json::Value, ErrBox>> { - let resource_table = &mut resource_table.borrow_mut(); + poll_fn(move |cx| -> Poll<Result<Value, ErrBox>> { + let resource_table = &mut state.resource_table.borrow_mut(); let stream = resource_table .get_mut::<TcpStream>(rid) - .ok_or_else(bad_resource)?; + .ok_or_else(ErrBox::bad_resource_id)?; Pin::new(stream) - .poll_read(cx, &mut buf)? + .poll_read(cx, &mut bufs[0])? .map(|nread| Ok(serde_json::json!({ "nread": nread }))) }) } fn op_write( - state: &mut CoreIsolateState, - args: serde_json::Value, - bufs: &mut [ZeroCopyBuf], -) -> impl Future<Output = Result<serde_json::Value, ErrBox>> { + state: Rc<BasicState>, + args: Value, + bufs: BufVec, +) -> impl Future<Output = Result<Value, ErrBox>> { assert_eq!(bufs.len(), 1, "Invalid number of arguments"); - let rid = args.get("rid").unwrap().as_u64().unwrap() as u32; + let rid: u32 = args + .get("rid") + .unwrap() + .as_u64() + .unwrap() + .try_into() + .unwrap(); debug!("write rid={}", rid); - let buf = bufs[0].clone(); - let resource_table = state.resource_table.clone(); - poll_fn(move |cx| { - let resource_table = &mut resource_table.borrow_mut(); + let resource_table = &mut state.resource_table.borrow_mut(); let stream = resource_table .get_mut::<TcpStream>(rid) - .ok_or_else(bad_resource)?; + .ok_or_else(ErrBox::bad_resource_id)?; Pin::new(stream) - .poll_write(cx, &buf)? + .poll_write(cx, &bufs[0])? .map(|nwritten| Ok(serde_json::json!({ "nwritten": nwritten }))) }) } -fn bad_resource() -> ErrBox { - Error::new(ErrorKind::NotFound, "bad resource id").into() -} - fn main() { log::set_logger(&Logger).unwrap(); log::set_max_level( @@ -168,11 +187,11 @@ fn main() { // NOTE: `--help` arg will display V8 help and exit deno_core::v8_set_flags(env::args().collect()); - let isolate = isolate_new(); - let mut runtime = tokio::runtime::Builder::new() + let isolate = create_isolate(); + let mut runtime = runtime::Builder::new() .basic_scheduler() .enable_all() .build() .unwrap(); - deno_core::js_check(runtime.block_on(isolate)); + js_check(runtime.block_on(isolate)); } diff --git a/core/flags.rs b/core/flags.rs index 7c1f5c449..b69abab6f 100644 --- a/core/flags.rs +++ b/core/flags.rs @@ -1,4 +1,5 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. + use rusty_v8 as v8; /// Pass the command line arguments to v8. /// Returns a vector of command line arguments that V8 did not understand. diff --git a/core/lib.rs b/core/lib.rs index d4a348f63..647c91aa4 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -8,6 +8,7 @@ extern crate lazy_static; #[macro_use] extern crate log; +mod basic_state; mod bindings; mod core_isolate; mod errors; @@ -24,6 +25,7 @@ mod zero_copy_buf; pub use rusty_v8 as v8; +pub use crate::basic_state::BasicState; pub use crate::core_isolate::js_check; pub use crate::core_isolate::CoreIsolate; pub use crate::core_isolate::CoreIsolateState; @@ -32,6 +34,7 @@ pub use crate::core_isolate::HeapLimits; pub use crate::core_isolate::Script; pub use crate::core_isolate::Snapshot; pub use crate::core_isolate::StartupData; +pub use crate::errors::AnyError; pub use crate::errors::ErrBox; pub use crate::errors::JSError; pub use crate::es_isolate::EsIsolate; @@ -47,10 +50,13 @@ pub use crate::modules::ModuleSource; pub use crate::modules::ModuleSourceFuture; pub use crate::modules::RecursiveModuleLoad; pub use crate::normalize_path::normalize_path; -pub use crate::ops::Buf; pub use crate::ops::Op; pub use crate::ops::OpAsyncFuture; +pub use crate::ops::OpFn; pub use crate::ops::OpId; +pub use crate::ops::OpRegistry; +pub use crate::ops::OpRouter; +pub use crate::ops::OpTable; pub use crate::resources::ResourceTable; pub use crate::zero_copy_buf::BufVec; pub use crate::zero_copy_buf::ZeroCopyBuf; diff --git a/core/modules.rs b/core/modules.rs index 516440bc0..817e1f25e 100644 --- a/core/modules.rs +++ b/core/modules.rs @@ -8,6 +8,7 @@ use futures::future::FutureExt; use futures::stream::FuturesUnordered; use futures::stream::Stream; use futures::stream::TryStreamExt; +use serde_json::Value; use std::collections::HashMap; use std::collections::HashSet; use std::fmt; @@ -481,7 +482,7 @@ impl Deps { } } - pub fn to_json(&self) -> serde_json::Value { + pub fn to_json(&self) -> Value { let children; if let Some(deps) = &self.deps { children = deps.iter().map(|c| c.to_json()).collect(); @@ -521,6 +522,7 @@ mod tests { use super::*; use crate::es_isolate::EsIsolate; use crate::js_check; + use crate::BasicState; use crate::StartupData; use futures::future::FutureExt; use std::error::Error; @@ -535,15 +537,14 @@ mod tests { // removed in the future. use crate::core_isolate::tests::run_in_task; + #[derive(Default)] struct MockLoader { pub loads: Arc<Mutex<Vec<String>>>, } impl MockLoader { - fn new() -> Self { - Self { - loads: Arc::new(Mutex::new(Vec::new())), - } + fn new() -> Rc<Self> { + Default::default() } } @@ -699,7 +700,8 @@ mod tests { fn test_recursive_load() { let loader = MockLoader::new(); let loads = loader.loads.clone(); - let mut isolate = EsIsolate::new(Rc::new(loader), StartupData::None, false); + let mut isolate = + EsIsolate::new(loader, BasicState::new(), StartupData::None, false); let spec = ModuleSpecifier::resolve_url("file:///a.js").unwrap(); let a_id_fut = isolate.load_module(&spec, None); let a_id = futures::executor::block_on(a_id_fut).expect("Failed to load"); @@ -761,7 +763,8 @@ mod tests { fn test_circular_load() { let loader = MockLoader::new(); let loads = loader.loads.clone(); - let mut isolate = EsIsolate::new(Rc::new(loader), StartupData::None, false); + let mut isolate = + EsIsolate::new(loader, BasicState::new(), StartupData::None, false); let fut = async move { let spec = ModuleSpecifier::resolve_url("file:///circular1.js").unwrap(); @@ -834,7 +837,8 @@ mod tests { fn test_redirect_load() { let loader = MockLoader::new(); let loads = loader.loads.clone(); - let mut isolate = EsIsolate::new(Rc::new(loader), StartupData::None, false); + let mut isolate = + EsIsolate::new(loader, BasicState::new(), StartupData::None, false); let fut = async move { let spec = ModuleSpecifier::resolve_url("file:///redirect1.js").unwrap(); @@ -899,7 +903,7 @@ mod tests { let loader = MockLoader::new(); let loads = loader.loads.clone(); let mut isolate = - EsIsolate::new(Rc::new(loader), StartupData::None, false); + EsIsolate::new(loader, BasicState::new(), StartupData::None, false); let spec = ModuleSpecifier::resolve_url("file:///main.js").unwrap(); let mut recursive_load = isolate.load_module(&spec, None).boxed_local(); @@ -945,7 +949,7 @@ mod tests { run_in_task(|mut cx| { let loader = MockLoader::new(); let mut isolate = - EsIsolate::new(Rc::new(loader), StartupData::None, false); + EsIsolate::new(loader, BasicState::new(), StartupData::None, false); let spec = ModuleSpecifier::resolve_url("file:///bad_import.js").unwrap(); let mut load_fut = isolate.load_module(&spec, None).boxed_local(); let result = load_fut.poll_unpin(&mut cx); @@ -973,7 +977,8 @@ mod tests { fn recursive_load_main_with_code() { let loader = MockLoader::new(); let loads = loader.loads.clone(); - let mut isolate = EsIsolate::new(Rc::new(loader), StartupData::None, false); + let mut isolate = + EsIsolate::new(loader, BasicState::new(), StartupData::None, false); // In default resolution code should be empty. // Instead we explicitly pass in our own code. // The behavior should be very similar to /a.js. diff --git a/core/ops.rs b/core/ops.rs index 65a0f325b..838596dc0 100644 --- a/core/ops.rs +++ b/core/ops.rs @@ -1,179 +1,146 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. -use crate::core_isolate::CoreIsolateState; + +use crate::BufVec; +use crate::ErrBox; use crate::ZeroCopyBuf; use futures::Future; +use futures::FutureExt; +use indexmap::IndexMap; +use serde_json::json; +use serde_json::Value; use std::collections::HashMap; +use std::iter::once; +use std::ops::Deref; +use std::ops::DerefMut; use std::pin::Pin; use std::rc::Rc; -pub type OpId = u32; - -pub type Buf = Box<[u8]>; - -pub type OpAsyncFuture = Pin<Box<dyn Future<Output = Buf>>>; +pub type OpAsyncFuture = Pin<Box<dyn Future<Output = Box<[u8]>>>>; +pub type OpFn<S> = dyn Fn(Rc<S>, BufVec) -> Op + 'static; +pub type OpId = usize; pub enum Op { - Sync(Buf), + Sync(Box<[u8]>), Async(OpAsyncFuture), /// AsyncUnref is the variation of Async, which doesn't block the program /// exiting. AsyncUnref(OpAsyncFuture), + NotFound, } -/// Main type describing op -pub type OpDispatcher = - dyn Fn(&mut CoreIsolateState, &mut [ZeroCopyBuf]) -> Op + 'static; - -#[derive(Default)] -pub struct OpRegistry { - dispatchers: Vec<Rc<OpDispatcher>>, - name_to_id: HashMap<String, OpId>, +pub trait OpRouter { + fn route_op(self: Rc<Self>, op_id: OpId, bufs: BufVec) -> Op; } -impl OpRegistry { - pub fn new() -> Self { - let mut registry = Self::default(); - let op_id = registry.register("ops", |state, _| { - let buf = state.op_registry.json_map(); +pub trait OpRegistry: OpRouter + 'static { + fn get_op_catalog(self: Rc<Self>) -> HashMap<String, OpId>; + + fn register_op<F>(&self, name: &str, op_fn: F) -> OpId + where + F: Fn(Rc<Self>, BufVec) -> Op + 'static; + + fn register_op_json_sync<F>(self: &Rc<Self>, name: &str, op_fn: F) -> OpId + where + F: Fn(&Self, Value, &mut [ZeroCopyBuf]) -> Result<Value, ErrBox> + 'static, + { + let base_op_fn = move |state: Rc<Self>, mut bufs: BufVec| -> Op { + let result = serde_json::from_slice(&bufs[0]) + .map_err(ErrBox::from) + .and_then(|args| op_fn(&state, args, &mut bufs[1..])); + let buf = state.json_serialize_op_result(None, result); Op::Sync(buf) - }); - assert_eq!(op_id, 0); - registry + }; + + self.register_op(name, base_op_fn) } - pub fn register<F>(&mut self, name: &str, op: F) -> OpId + fn register_op_json_async<F, R>(self: &Rc<Self>, name: &str, op_fn: F) -> OpId where - F: Fn(&mut CoreIsolateState, &mut [ZeroCopyBuf]) -> Op + 'static, + F: Fn(Rc<Self>, Value, BufVec) -> R + 'static, + R: Future<Output = Result<Value, ErrBox>> + 'static, { - let op_id = self.dispatchers.len() as u32; - - let existing = self.name_to_id.insert(name.to_string(), op_id); - assert!( - existing.is_none(), - format!("Op already registered: {}", name) - ); - self.dispatchers.push(Rc::new(op)); - op_id - } + let try_dispatch_op = move |state: Rc<Self>, + bufs: BufVec| + -> Result<Op, ErrBox> { + let args: Value = serde_json::from_slice(&bufs[0])?; + let promise_id = args + .get("promiseId") + .and_then(Value::as_u64) + .ok_or_else(|| ErrBox::type_error("missing or invalid `promiseId`"))?; + let bufs = bufs[1..].into(); + let fut = op_fn(state.clone(), args, bufs).map(move |result| { + state.json_serialize_op_result(Some(promise_id), result) + }); + Ok(Op::Async(Box::pin(fut))) + }; - fn json_map(&self) -> Buf { - let op_map_json = serde_json::to_string(&self.name_to_id).unwrap(); - op_map_json.as_bytes().to_owned().into_boxed_slice() - } + let base_op_fn = move |state: Rc<Self>, bufs: BufVec| -> Op { + match try_dispatch_op(state.clone(), bufs) { + Ok(op) => op, + Err(err) => Op::Sync(state.json_serialize_op_result(None, Err(err))), + } + }; - pub fn get(&self, op_id: OpId) -> Option<Rc<OpDispatcher>> { - self.dispatchers.get(op_id as usize).map(Rc::clone) + self.register_op(name, base_op_fn) } - pub fn unregister_op(&mut self, name: &str) { - let id = self.name_to_id.remove(name).unwrap(); - drop(self.dispatchers.remove(id as usize)); + fn json_serialize_op_result( + &self, + promise_id: Option<u64>, + result: Result<Value, ErrBox>, + ) -> Box<[u8]> { + let value = match result { + Ok(v) => json!({ "ok": v, "promiseId": promise_id }), + Err(err) => json!({ + "promiseId": promise_id , + "err": { + "className": self.get_error_class_name(&err), + "message": err.to_string(), + } + }), + }; + serde_json::to_vec(&value).unwrap().into_boxed_slice() } -} -#[test] -fn test_op_registry() { - use crate::CoreIsolate; - use std::sync::atomic; - use std::sync::Arc; - let mut op_registry = OpRegistry::new(); - - let c = Arc::new(atomic::AtomicUsize::new(0)); - let c_ = c.clone(); - - let test_id = op_registry.register("test", move |_, _| { - c_.fetch_add(1, atomic::Ordering::SeqCst); - Op::Sync(Box::new([])) - }); - assert!(test_id != 0); - - let mut expected = HashMap::new(); - expected.insert("ops".to_string(), 0); - expected.insert("test".to_string(), 1); - assert_eq!(op_registry.name_to_id, expected); - - let isolate = CoreIsolate::new(crate::StartupData::None, false); - - let dispatch = op_registry.get(test_id).unwrap(); - let state_rc = CoreIsolate::state(&isolate); - let mut state = state_rc.borrow_mut(); - let res = dispatch(&mut state, &mut []); - if let Op::Sync(buf) = res { - assert_eq!(buf.len(), 0); - } else { - unreachable!(); + fn get_error_class_name(&self, _err: &ErrBox) -> &'static str { + "Error" } - assert_eq!(c.load(atomic::Ordering::SeqCst), 1); - - assert!(op_registry.get(100).is_none()); - op_registry.unregister_op("test"); - expected.remove("test"); - assert_eq!(op_registry.name_to_id, expected); - assert!(op_registry.get(1).is_none()); } -#[test] -fn register_op_during_call() { - use crate::CoreIsolate; - use std::sync::atomic; - use std::sync::Arc; - use std::sync::Mutex; - let op_registry = Arc::new(Mutex::new(OpRegistry::new())); - - let c = Arc::new(atomic::AtomicUsize::new(0)); - let c_ = c.clone(); - - let op_registry_ = op_registry.clone(); - - let test_id = { - let mut g = op_registry.lock().unwrap(); - g.register("dynamic_register_op", move |_, _| { - let c__ = c_.clone(); - let mut g = op_registry_.lock().unwrap(); - g.register("test", move |_, _| { - c__.fetch_add(1, atomic::Ordering::SeqCst); - Op::Sync(Box::new([])) - }); - Op::Sync(Box::new([])) - }) - }; - assert!(test_id != 0); +/// Collection for storing registered ops. The special 'get_op_catalog' +/// op with OpId `0` is automatically added when the OpTable is created. +pub struct OpTable<S>(IndexMap<String, Rc<OpFn<S>>>); - let isolate = CoreIsolate::new(crate::StartupData::None, false); +impl<S: OpRegistry> OpTable<S> { + pub fn get_op_catalog(&self) -> HashMap<String, OpId> { + self.keys().cloned().zip(0..).collect() + } - let dispatcher1 = { - let g = op_registry.lock().unwrap(); - g.get(test_id).unwrap() - }; - { - let state_rc = CoreIsolate::state(&isolate); - let mut state = state_rc.borrow_mut(); - dispatcher1(&mut state, &mut []); + fn op_get_op_catalog(state: Rc<S>, _bufs: BufVec) -> Op { + let ops = state.get_op_catalog(); + let buf = serde_json::to_vec(&ops).map(Into::into).unwrap(); + Op::Sync(buf) } +} - let mut expected = HashMap::new(); - expected.insert("ops".to_string(), 0); - expected.insert("dynamic_register_op".to_string(), 1); - expected.insert("test".to_string(), 2); - { - let g = op_registry.lock().unwrap(); - assert_eq!(g.name_to_id, expected); +impl<S: OpRegistry> Default for OpTable<S> { + fn default() -> Self { + Self( + once(("ops".to_owned(), Rc::new(Self::op_get_op_catalog) as _)).collect(), + ) } +} - let dispatcher2 = { - let g = op_registry.lock().unwrap(); - g.get(2).unwrap() - }; - let state_rc = CoreIsolate::state(&isolate); - let mut state = state_rc.borrow_mut(); - let res = dispatcher2(&mut state, &mut []); - if let Op::Sync(buf) = res { - assert_eq!(buf.len(), 0); - } else { - unreachable!(); +impl<S> Deref for OpTable<S> { + type Target = IndexMap<String, Rc<OpFn<S>>>; + + fn deref(&self) -> &Self::Target { + &self.0 } - assert_eq!(c.load(atomic::Ordering::SeqCst), 1); +} - let g = op_registry.lock().unwrap(); - assert!(g.get(100).is_none()); +impl<S> DerefMut for OpTable<S> { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } } diff --git a/core/plugin_api.rs b/core/plugin_api.rs index 0cb9acaeb..d57a5b3b5 100644 --- a/core/plugin_api.rs +++ b/core/plugin_api.rs @@ -8,7 +8,6 @@ // shared library itself, which would cause segfaults when the plugin is // unloaded and all functions in the plugin library are unmapped from memory. -pub use crate::Buf; pub use crate::Op; pub use crate::OpId; pub use crate::ZeroCopyBuf; diff --git a/core/shared_queue.rs b/core/shared_queue.rs index f35fff012..e8ac30ebc 100644 --- a/core/shared_queue.rs +++ b/core/shared_queue.rs @@ -19,6 +19,7 @@ SharedQueue Binary Layout use crate::bindings; use crate::ops::OpId; use rusty_v8 as v8; +use std::convert::TryInto; const MAX_RECORDS: usize = 100; /// Total number of records added. @@ -121,7 +122,7 @@ impl SharedQueue { fn set_meta(&mut self, index: usize, end: usize, op_id: OpId) { let s = self.as_u32_slice_mut(); s[INDEX_OFFSETS + 2 * index] = end as u32; - s[INDEX_OFFSETS + 2 * index + 1] = op_id; + s[INDEX_OFFSETS + 2 * index + 1] = op_id.try_into().unwrap(); } #[cfg(test)] @@ -129,7 +130,7 @@ impl SharedQueue { if index < self.num_records() { let s = self.as_u32_slice(); let end = s[INDEX_OFFSETS + 2 * index] as usize; - let op_id = s[INDEX_OFFSETS + 2 * index + 1]; + let op_id = s[INDEX_OFFSETS + 2 * index + 1] as OpId; Some((op_id, end)) } else { None @@ -218,7 +219,6 @@ impl SharedQueue { #[cfg(test)] mod tests { use super::*; - use crate::ops::Buf; #[test] fn basic() { @@ -262,7 +262,7 @@ mod tests { assert_eq!(q.size(), 0); } - fn alloc_buf(byte_length: usize) -> Buf { + fn alloc_buf(byte_length: usize) -> Box<[u8]> { let mut v = Vec::new(); v.resize(byte_length, 0); v.into_boxed_slice() |