summaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorBert Belder <bertbelder@gmail.com>2020-09-06 02:34:02 +0200
committerGitHub <noreply@github.com>2020-09-06 02:34:02 +0200
commitc821e8f2f1fb8ad5e9eb00854277cafc8c80b2f5 (patch)
treec429a3c2707a4047fb512443a8468b7e15e5730d /core
parent849431eb1d112d1f79f4a327830dc1a5bf22dd47 (diff)
Move JSON ops to deno_core (#7336)
Diffstat (limited to 'core')
-rw-r--r--core/Cargo.toml1
-rw-r--r--core/basic_state.rs91
-rw-r--r--core/bindings.rs75
-rw-r--r--core/core_isolate.rs300
-rw-r--r--core/errors.rs8
-rw-r--r--core/es_isolate.rs40
-rw-r--r--core/examples/http_bench_bin_ops.rs223
-rw-r--r--core/examples/http_bench_json_ops.rs163
-rw-r--r--core/flags.rs1
-rw-r--r--core/lib.rs8
-rw-r--r--core/modules.rs27
-rw-r--r--core/ops.rs245
-rw-r--r--core/plugin_api.rs1
-rw-r--r--core/shared_queue.rs8
14 files changed, 597 insertions, 594 deletions
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 37d2f3268..94681cf7d 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -22,6 +22,7 @@ rusty_v8 = "0.9.1"
serde_json = { version = "1.0.57", features = ["preserve_order"] }
smallvec = "1.4.2"
url = "2.1.1"
+indexmap = "1.5.2"
[[example]]
name = "http_bench_bin_ops"
diff --git a/core/basic_state.rs b/core/basic_state.rs
new file mode 100644
index 000000000..54b9ee132
--- /dev/null
+++ b/core/basic_state.rs
@@ -0,0 +1,91 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+
+use crate::BufVec;
+use crate::Op;
+use crate::OpId;
+use crate::OpRegistry;
+use crate::OpRouter;
+use crate::OpTable;
+use crate::ResourceTable;
+use std::cell::RefCell;
+use std::collections::HashMap;
+use std::rc::Rc;
+
+/// A minimal state struct for use by tests, examples etc. It contains
+/// an OpTable and ResourceTable, and implements the relevant traits
+/// for working with ops in the most straightforward way possible.
+#[derive(Default)]
+pub struct BasicState {
+ pub op_table: RefCell<OpTable<Self>>,
+ pub resource_table: RefCell<ResourceTable>,
+}
+
+impl BasicState {
+ pub fn new() -> Rc<Self> {
+ Default::default()
+ }
+}
+
+impl OpRegistry for BasicState {
+ fn get_op_catalog(self: Rc<Self>) -> HashMap<String, OpId> {
+ self.op_table.borrow().get_op_catalog()
+ }
+
+ fn register_op<F>(&self, name: &str, op_fn: F) -> OpId
+ where
+ F: Fn(Rc<Self>, BufVec) -> Op + 'static,
+ {
+ let mut op_table = self.op_table.borrow_mut();
+ let (op_id, prev) = op_table.insert_full(name.to_owned(), Rc::new(op_fn));
+ assert!(prev.is_none());
+ op_id
+ }
+}
+
+impl OpRouter for BasicState {
+ fn route_op(self: Rc<Self>, op_id: OpId, bufs: BufVec) -> Op {
+ let op_fn = self
+ .op_table
+ .borrow()
+ .get_index(op_id)
+ .map(|(_, op_fn)| op_fn.clone())
+ .unwrap();
+ (op_fn)(self, bufs)
+ }
+}
+
+#[test]
+fn test_basic_state_ops() {
+ let state = BasicState::new();
+
+ let foo_id = state.register_op("foo", |_, _| Op::Sync(b"oof!"[..].into()));
+ assert_eq!(foo_id, 1);
+
+ let bar_id = state.register_op("bar", |_, _| Op::Sync(b"rab!"[..].into()));
+ assert_eq!(bar_id, 2);
+
+ let state_ = state.clone();
+ let foo_res = state_.route_op(foo_id, Default::default());
+ assert!(matches!(foo_res, Op::Sync(buf) if &*buf == b"oof!"));
+
+ let state_ = state.clone();
+ let bar_res = state_.route_op(bar_id, Default::default());
+ assert!(matches!(bar_res, Op::Sync(buf) if &*buf == b"rab!"));
+
+ let catalog_res = state.route_op(0, Default::default());
+ let mut catalog_entries = match catalog_res {
+ Op::Sync(buf) => serde_json::from_slice::<HashMap<String, OpId>>(&buf)
+ .map(|map| map.into_iter().collect::<Vec<_>>())
+ .unwrap(),
+ _ => panic!("unexpected `Op` variant"),
+ };
+ catalog_entries.sort_by(|(_, id1), (_, id2)| id1.partial_cmp(id2).unwrap());
+ assert_eq!(
+ catalog_entries,
+ vec![
+ ("ops".to_owned(), 0),
+ ("foo".to_owned(), 1),
+ ("bar".to_owned(), 2)
+ ]
+ )
+}
diff --git a/core/bindings.rs b/core/bindings.rs
index f0181a227..166c0ee6e 100644
--- a/core/bindings.rs
+++ b/core/bindings.rs
@@ -2,18 +2,19 @@
use crate::CoreIsolate;
use crate::CoreIsolateState;
+use crate::ErrBox;
use crate::EsIsolate;
use crate::JSError;
+use crate::Op;
+use crate::OpId;
use crate::ZeroCopyBuf;
-
+use futures::future::FutureExt;
use rusty_v8 as v8;
-use v8::MapFnTo;
-
-use smallvec::SmallVec;
use std::cell::Cell;
use std::convert::TryFrom;
use std::option::Option;
use url::Url;
+use v8::MapFnTo;
lazy_static! {
pub static ref EXTERNAL_REFERENCES: v8::ExternalReferences =
@@ -49,7 +50,7 @@ lazy_static! {
function: decode.map_fn_to()
},
v8::ExternalReference {
- function: get_promise_details.map_fn_to(),
+ function: get_promise_details.map_fn_to()
}
]);
}
@@ -371,13 +372,19 @@ fn recv(
slot.replace(v8::Global::new(scope, cb));
}
-fn send(
- scope: &mut v8::HandleScope,
+fn send<'s>(
+ scope: &mut v8::HandleScope<'s>,
args: v8::FunctionCallbackArguments,
mut rv: v8::ReturnValue,
) {
- let op_id = match v8::Local::<v8::Uint32>::try_from(args.get(0)) {
- Ok(op_id) => op_id.value() as u32,
+ let state_rc = CoreIsolate::state(scope);
+ let state = state_rc.borrow_mut();
+
+ let op_id = match v8::Local::<v8::Integer>::try_from(args.get(0))
+ .map_err(ErrBox::from)
+ .and_then(|l| OpId::try_from(l.value()).map_err(ErrBox::from))
+ {
+ Ok(op_id) => op_id,
Err(err) => {
let msg = format!("invalid op id: {}", err);
let msg = v8::String::new(scope, &msg).unwrap();
@@ -387,9 +394,6 @@ fn send(
}
};
- let state_rc = CoreIsolate::state(scope);
- let mut state = state_rc.borrow_mut();
-
let buf_iter = (1..args.length()).map(|idx| {
v8::Local::<v8::ArrayBufferView>::try_from(args.get(idx))
.map(|view| ZeroCopyBuf::new(scope, view))
@@ -400,24 +404,37 @@ fn send(
})
});
- // If response is empty then it's either async op or exception was thrown.
- let maybe_response =
- match buf_iter.collect::<Result<SmallVec<[ZeroCopyBuf; 2]>, _>>() {
- Ok(mut bufs) => state.dispatch_op(scope, op_id, &mut bufs),
- Err(exc) => {
- scope.throw_exception(exc);
- return;
- }
- };
-
- if let Some(response) = maybe_response {
- // Synchronous response.
- // Note op_id is not passed back in the case of synchronous response.
- let (_op_id, buf) = response;
+ let bufs = match buf_iter.collect::<Result<_, _>>() {
+ Ok(bufs) => bufs,
+ Err(exc) => {
+ scope.throw_exception(exc);
+ return;
+ }
+ };
- if !buf.is_empty() {
- let ui8 = boxed_slice_to_uint8array(scope, buf);
- rv.set(ui8.into());
+ let op_router = state.op_router.clone();
+ let op = op_router.route_op(op_id, bufs);
+ assert_eq!(state.shared.size(), 0);
+ match op {
+ Op::Sync(buf) if !buf.is_empty() => {
+ rv.set(boxed_slice_to_uint8array(scope, buf).into());
+ }
+ Op::Sync(_) => {}
+ Op::Async(fut) => {
+ let fut2 = fut.map(move |buf| (op_id, buf));
+ state.pending_ops.push(fut2.boxed_local());
+ state.have_unpolled_ops.set(true);
+ }
+ Op::AsyncUnref(fut) => {
+ let fut2 = fut.map(move |buf| (op_id, buf));
+ state.pending_unref_ops.push(fut2.boxed_local());
+ state.have_unpolled_ops.set(true);
+ }
+ Op::NotFound => {
+ let msg = format!("Unknown op id: {}", op_id);
+ let msg = v8::String::new(scope, &msg).unwrap();
+ let exc = v8::Exception::type_error(scope, msg);
+ scope.throw_exception(exc);
}
}
}
diff --git a/core/core_isolate.rs b/core/core_isolate.rs
index a60ce6a82..52e856174 100644
--- a/core/core_isolate.rs
+++ b/core/core_isolate.rs
@@ -13,16 +13,12 @@ use crate::shared_queue::SharedQueue;
use crate::shared_queue::RECOMMENDED_SIZE;
use crate::ErrBox;
use crate::JSError;
-use crate::ResourceTable;
-use crate::ZeroCopyBuf;
-use futures::future::FutureExt;
use futures::stream::FuturesUnordered;
use futures::stream::StreamExt;
use futures::task::AtomicWaker;
use futures::Future;
-use serde_json::json;
-use serde_json::Value;
use std::any::Any;
+use std::cell::Cell;
use std::cell::RefCell;
use std::collections::HashMap;
use std::convert::From;
@@ -37,7 +33,7 @@ use std::sync::Once;
use std::task::Context;
use std::task::Poll;
-type PendingOpFuture = Pin<Box<dyn Future<Output = (OpId, Buf)>>>;
+type PendingOpFuture = Pin<Box<dyn Future<Output = (OpId, Box<[u8]>)>>>;
/// Stores a script used to initialize a Isolate
pub struct Script<'a> {
@@ -87,7 +83,7 @@ impl StartupData<'_> {
type JSErrorCreateFn = dyn Fn(JSError) -> ErrBox;
-pub type GetErrorClassFn = &'static dyn for<'e> Fn(&'e ErrBox) -> &'static str;
+pub type GetErrorClassFn = dyn for<'e> Fn(&'e ErrBox) -> &'static str;
/// Objects that need to live as long as the isolate
#[derive(Default)]
@@ -118,19 +114,17 @@ pub struct CoreIsolate {
/// Internal state for CoreIsolate which is stored in one of v8::Isolate's
/// embedder slots.
pub struct CoreIsolateState {
- pub resource_table: Rc<RefCell<ResourceTable>>,
pub global_context: Option<v8::Global<v8::Context>>,
pub(crate) shared_ab: Option<v8::Global<v8::SharedArrayBuffer>>,
pub(crate) js_recv_cb: Option<v8::Global<v8::Function>>,
pub(crate) js_macrotask_cb: Option<v8::Global<v8::Function>>,
pub(crate) pending_promise_exceptions: HashMap<i32, v8::Global<v8::Value>>,
pub(crate) js_error_create_fn: Box<JSErrorCreateFn>,
- pub get_error_class_fn: GetErrorClassFn,
pub(crate) shared: SharedQueue,
- pending_ops: FuturesUnordered<PendingOpFuture>,
- pending_unref_ops: FuturesUnordered<PendingOpFuture>,
- have_unpolled_ops: bool,
- pub op_registry: OpRegistry,
+ pub(crate) pending_ops: FuturesUnordered<PendingOpFuture>,
+ pub(crate) pending_unref_ops: FuturesUnordered<PendingOpFuture>,
+ pub(crate) have_unpolled_ops: Cell<bool>,
+ pub(crate) op_router: Rc<dyn OpRouter>,
waker: AtomicWaker,
}
@@ -205,21 +199,27 @@ pub struct HeapLimits {
}
pub(crate) struct IsolateOptions {
- will_snapshot: bool,
+ op_router: Rc<dyn OpRouter>,
startup_script: Option<OwnedScript>,
startup_snapshot: Option<Snapshot>,
+ will_snapshot: bool,
heap_limits: Option<HeapLimits>,
}
impl CoreIsolate {
/// startup_data defines the snapshot or script used at startup to initialize
/// the isolate.
- pub fn new(startup_data: StartupData, will_snapshot: bool) -> Self {
+ pub fn new(
+ op_router: Rc<dyn OpRouter>,
+ startup_data: StartupData,
+ will_snapshot: bool,
+ ) -> Self {
let (startup_script, startup_snapshot) = startup_data.into_options();
let options = IsolateOptions {
- will_snapshot,
+ op_router,
startup_script,
startup_snapshot,
+ will_snapshot,
heap_limits: None,
};
@@ -233,14 +233,16 @@ impl CoreIsolate {
/// Make sure to use [`add_near_heap_limit_callback`](#method.add_near_heap_limit_callback)
/// to prevent v8 from crashing when reaching the upper limit.
pub fn with_heap_limits(
+ op_router: Rc<dyn OpRouter>,
startup_data: StartupData,
heap_limits: HeapLimits,
) -> Self {
let (startup_script, startup_snapshot) = startup_data.into_options();
let options = IsolateOptions {
- will_snapshot: false,
+ op_router,
startup_script,
startup_snapshot,
+ will_snapshot: false,
heap_limits: Some(heap_limits),
};
@@ -304,18 +306,16 @@ impl CoreIsolate {
isolate.set_slot(Rc::new(RefCell::new(CoreIsolateState {
global_context: Some(global_context),
- resource_table: Rc::new(RefCell::new(ResourceTable::default())),
pending_promise_exceptions: HashMap::new(),
shared_ab: None,
js_recv_cb: None,
js_macrotask_cb: None,
js_error_create_fn: Box::new(JSError::create),
- get_error_class_fn: &|_| "Error",
shared: SharedQueue::new(RECOMMENDED_SIZE),
pending_ops: FuturesUnordered::new(),
pending_unref_ops: FuturesUnordered::new(),
- have_unpolled_ops: false,
- op_registry: OpRegistry::new(),
+ have_unpolled_ops: Cell::new(false),
+ op_router: options.op_router,
waker: AtomicWaker::new(),
})));
@@ -421,66 +421,6 @@ impl CoreIsolate {
snapshot
}
- /// Defines the how Deno.core.dispatch() acts.
- /// Called whenever Deno.core.dispatch() is called in JavaScript. zero_copy_buf
- /// corresponds to the second argument of Deno.core.dispatch().
- ///
- /// Requires runtime to explicitly ask for op ids before using any of the ops.
- pub fn register_op<F>(&mut self, name: &str, op: F) -> OpId
- where
- F: Fn(&mut CoreIsolateState, &mut [ZeroCopyBuf]) -> Op + 'static,
- {
- let state_rc = Self::state(self);
- let mut state = state_rc.borrow_mut();
- state.op_registry.register(name, op)
- }
-
- pub fn register_op_json_sync<F>(&mut self, name: &str, op: F) -> OpId
- where
- F: 'static
- + Fn(
- &mut CoreIsolateState,
- serde_json::Value,
- &mut [ZeroCopyBuf],
- ) -> Result<serde_json::Value, ErrBox>,
- {
- let core_op =
- move |state: &mut CoreIsolateState, bufs: &mut [ZeroCopyBuf]| -> Op {
- let value = serde_json::from_slice(&bufs[0]).unwrap();
- let result = op(state, value, &mut bufs[1..]);
- let buf = serialize_result(None, result, state.get_error_class_fn);
- Op::Sync(buf)
- };
-
- let state_rc = Self::state(self);
- let mut state = state_rc.borrow_mut();
- state.op_registry.register(name, core_op)
- }
-
- pub fn register_op_json_async<F, Fut>(&mut self, name: &str, op: F) -> OpId
- where
- Fut: 'static + Future<Output = Result<serde_json::Value, ErrBox>>,
- F: 'static
- + Fn(&mut CoreIsolateState, serde_json::Value, &mut [ZeroCopyBuf]) -> Fut,
- {
- let core_op = move |state: &mut CoreIsolateState,
- bufs: &mut [ZeroCopyBuf]|
- -> Op {
- let get_error_class_fn = state.get_error_class_fn;
- let value: serde_json::Value = serde_json::from_slice(&bufs[0]).unwrap();
- let promise_id = value.get("promiseId").unwrap().as_u64().unwrap();
- let fut = op(state, value, &mut bufs[1..]);
- let fut2 = fut.map(move |result| {
- serialize_result(Some(promise_id), result, get_error_class_fn)
- });
- Op::Async(Box::pin(fut2))
- };
-
- let state_rc = Self::state(self);
- let mut state = state_rc.borrow_mut();
- state.op_registry.register(name, core_op)
- }
-
/// Registers a callback on the isolate when the memory limits are approached.
/// Use this to prevent V8 from crashing the process when reaching the limit.
///
@@ -536,24 +476,6 @@ where
callback(current_heap_limit, initial_heap_limit)
}
-fn serialize_result(
- promise_id: Option<u64>,
- result: Result<Value, ErrBox>,
- get_error_class_fn: GetErrorClassFn,
-) -> Buf {
- let value = match result {
- Ok(v) => json!({ "ok": v, "promiseId": promise_id }),
- Err(err) => json!({
- "promiseId": promise_id ,
- "err": {
- "className": (get_error_class_fn)(&err),
- "message": err.to_string(),
- }
- }),
- };
- serde_json::to_vec(&value).unwrap().into_boxed_slice()
-}
-
impl Future for CoreIsolate {
type Output = Result<(), ErrBox>;
@@ -574,12 +496,12 @@ impl Future for CoreIsolate {
check_promise_exceptions(scope)?;
- let mut overflow_response: Option<(OpId, Buf)> = None;
+ let mut overflow_response: Option<(OpId, Box<[u8]>)> = None;
loop {
let mut state = state_rc.borrow_mut();
// Now handle actual ops.
- state.have_unpolled_ops = false;
+ state.have_unpolled_ops.set(false);
let pending_r = state.pending_ops.poll_next_unpin(cx);
match pending_r {
@@ -644,7 +566,7 @@ impl Future for CoreIsolate {
if state.pending_ops.is_empty() {
Poll::Ready(Ok(()))
} else {
- if state.have_unpolled_ops {
+ if state.have_unpolled_ops.get() {
state.waker.wake();
}
Poll::Pending
@@ -653,18 +575,6 @@ impl Future for CoreIsolate {
}
impl CoreIsolateState {
- /// Defines the how Deno.core.dispatch() acts.
- /// Called whenever Deno.core.dispatch() is called in JavaScript. zero_copy_buf
- /// corresponds to the second argument of Deno.core.dispatch().
- ///
- /// Requires runtime to explicitly ask for op ids before using any of the ops.
- pub fn register_op<F>(&mut self, name: &str, op: F) -> OpId
- where
- F: Fn(&mut CoreIsolateState, &mut [ZeroCopyBuf]) -> Op + 'static,
- {
- self.op_registry.register(name, op)
- }
-
/// Allows a callback to be set whenever a V8 exception is made. This allows
/// the caller to wrap the JSError into an error. By default this callback
/// is set to JSError::create.
@@ -674,49 +584,6 @@ impl CoreIsolateState {
) {
self.js_error_create_fn = Box::new(f);
}
-
- pub fn set_get_error_class_fn(&mut self, f: GetErrorClassFn) {
- self.get_error_class_fn = f;
- }
-
- pub fn dispatch_op<'s>(
- &mut self,
- scope: &mut v8::HandleScope<'s>,
- op_id: OpId,
- zero_copy_bufs: &mut [ZeroCopyBuf],
- ) -> Option<(OpId, Box<[u8]>)> {
- let op = if let Some(dispatcher) = self.op_registry.get(op_id) {
- dispatcher(self, zero_copy_bufs)
- } else {
- let message =
- v8::String::new(scope, &format!("Unknown op id: {}", op_id)).unwrap();
- let exception = v8::Exception::type_error(scope, message);
- scope.throw_exception(exception);
- return None;
- };
-
- debug_assert_eq!(self.shared.size(), 0);
- match op {
- Op::Sync(buf) => {
- // For sync messages, we always return the response via Deno.core.send's
- // return value. Sync messages ignore the op_id.
- let op_id = 0;
- Some((op_id, buf))
- }
- Op::Async(fut) => {
- let fut2 = fut.map(move |buf| (op_id, buf));
- self.pending_ops.push(fut2.boxed_local());
- self.have_unpolled_ops = true;
- None
- }
- Op::AsyncUnref(fut) => {
- let fut2 = fut.map(move |buf| (op_id, buf));
- self.pending_unref_ops.push(fut2.boxed_local());
- self.have_unpolled_ops = true;
- None
- }
- }
- }
}
fn async_op_response<'s>(
@@ -739,7 +606,7 @@ fn async_op_response<'s>(
let op_id: v8::Local<v8::Value> =
v8::Integer::new(tc_scope, op_id as i32).into();
let ui8: v8::Local<v8::Value> =
- bindings::boxed_slice_to_uint8array(tc_scope, buf).into();
+ boxed_slice_to_uint8array(tc_scope, buf).into();
js_recv_cb.call(tc_scope, global, &[op_id, ui8])
}
None => js_recv_cb.call(tc_scope, global, &[]),
@@ -848,11 +715,28 @@ pub fn js_check<T>(r: Result<T, ErrBox>) -> T {
r.unwrap()
}
+fn boxed_slice_to_uint8array<'sc>(
+ scope: &mut v8::HandleScope<'sc>,
+ buf: Box<[u8]>,
+) -> v8::Local<'sc, v8::Uint8Array> {
+ assert!(!buf.is_empty());
+ let buf_len = buf.len();
+ let backing_store = v8::ArrayBuffer::new_backing_store_from_boxed_slice(buf);
+ let backing_store_shared = backing_store.make_shared();
+ let ab = v8::ArrayBuffer::with_backing_store(scope, &backing_store_shared);
+ v8::Uint8Array::new(scope, ab, 0, buf_len)
+ .expect("Failed to create UintArray8")
+}
+
#[cfg(test)]
pub mod tests {
use super::*;
+ use crate::BasicState;
+ use crate::BufVec;
use futures::future::lazy;
+ use futures::FutureExt;
use std::ops::FnOnce;
+ use std::rc::Rc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
@@ -880,7 +764,7 @@ pub mod tests {
)
}
- pub enum Mode {
+ enum Mode {
Async,
AsyncUnref,
AsyncZeroCopy(u8),
@@ -890,28 +774,29 @@ pub mod tests {
OverflowResAsync,
}
- pub fn setup(mode: Mode) -> (CoreIsolate, Arc<AtomicUsize>) {
- let dispatch_count = Arc::new(AtomicUsize::new(0));
- let dispatch_count_ = dispatch_count.clone();
-
- let mut isolate = CoreIsolate::new(StartupData::None, false);
+ struct TestOpRouter {
+ mode: Mode,
+ dispatch_count: Arc<AtomicUsize>,
+ }
- let dispatcher = move |_state: &mut CoreIsolateState,
- zero_copy: &mut [ZeroCopyBuf]|
- -> Op {
- dispatch_count_.fetch_add(1, Ordering::Relaxed);
- match mode {
+ impl OpRouter for TestOpRouter {
+ fn route_op(self: Rc<Self>, op_id: OpId, bufs: BufVec) -> Op {
+ if op_id != 1 {
+ return Op::NotFound;
+ }
+ self.dispatch_count.fetch_add(1, Ordering::Relaxed);
+ match self.mode {
Mode::Async => {
- assert_eq!(zero_copy.len(), 1);
- assert_eq!(zero_copy[0].len(), 1);
- assert_eq!(zero_copy[0][0], 42);
+ assert_eq!(bufs.len(), 1);
+ assert_eq!(bufs[0].len(), 1);
+ assert_eq!(bufs[0][0], 42);
let buf = vec![43u8].into_boxed_slice();
Op::Async(futures::future::ready(buf).boxed())
}
Mode::AsyncUnref => {
- assert_eq!(zero_copy.len(), 1);
- assert_eq!(zero_copy[0].len(), 1);
- assert_eq!(zero_copy[0][0], 42);
+ assert_eq!(bufs.len(), 1);
+ assert_eq!(bufs[0].len(), 1);
+ assert_eq!(bufs[0][0], 42);
let fut = async {
// This future never finish.
futures::future::pending::<()>().await;
@@ -920,8 +805,8 @@ pub mod tests {
Op::AsyncUnref(fut.boxed())
}
Mode::AsyncZeroCopy(count) => {
- assert_eq!(zero_copy.len(), count as usize);
- zero_copy.iter().enumerate().for_each(|(idx, buf)| {
+ assert_eq!(bufs.len(), count as usize);
+ bufs.iter().enumerate().for_each(|(idx, buf)| {
assert_eq!(buf.len(), 1);
assert_eq!(idx, buf[0] as usize);
});
@@ -930,15 +815,15 @@ pub mod tests {
Op::Async(futures::future::ready(buf).boxed())
}
Mode::OverflowReqSync => {
- assert_eq!(zero_copy.len(), 1);
- assert_eq!(zero_copy[0].len(), 100 * 1024 * 1024);
+ assert_eq!(bufs.len(), 1);
+ assert_eq!(bufs[0].len(), 100 * 1024 * 1024);
let buf = vec![43u8].into_boxed_slice();
Op::Sync(buf)
}
Mode::OverflowResSync => {
- assert_eq!(zero_copy.len(), 1);
- assert_eq!(zero_copy[0].len(), 1);
- assert_eq!(zero_copy[0][0], 42);
+ assert_eq!(bufs.len(), 1);
+ assert_eq!(bufs[0].len(), 1);
+ assert_eq!(bufs[0][0], 42);
let mut vec = Vec::<u8>::new();
vec.resize(100 * 1024 * 1024, 0);
vec[0] = 99;
@@ -946,15 +831,15 @@ pub mod tests {
Op::Sync(buf)
}
Mode::OverflowReqAsync => {
- assert_eq!(zero_copy.len(), 1);
- assert_eq!(zero_copy[0].len(), 100 * 1024 * 1024);
+ assert_eq!(bufs.len(), 1);
+ assert_eq!(bufs[0].len(), 100 * 1024 * 1024);
let buf = vec![43u8].into_boxed_slice();
Op::Async(futures::future::ready(buf).boxed())
}
Mode::OverflowResAsync => {
- assert_eq!(zero_copy.len(), 1);
- assert_eq!(zero_copy[0].len(), 1);
- assert_eq!(zero_copy[0][0], 42);
+ assert_eq!(bufs.len(), 1);
+ assert_eq!(bufs[0].len(), 1);
+ assert_eq!(bufs[0][0], 42);
let mut vec = Vec::<u8>::new();
vec.resize(100 * 1024 * 1024, 0);
vec[0] = 4;
@@ -962,9 +847,16 @@ pub mod tests {
Op::Async(futures::future::ready(buf).boxed())
}
}
- };
+ }
+ }
- isolate.register_op("test", dispatcher);
+ fn setup(mode: Mode) -> (CoreIsolate, Arc<AtomicUsize>) {
+ let dispatch_count = Arc::new(AtomicUsize::new(0));
+ let test_state = Rc::new(TestOpRouter {
+ mode,
+ dispatch_count: dispatch_count.clone(),
+ });
+ let mut isolate = CoreIsolate::new(test_state, StartupData::None, false);
js_check(isolate.execute(
"setup.js",
@@ -1328,7 +1220,8 @@ pub mod tests {
#[test]
fn syntax_error() {
- let mut isolate = CoreIsolate::new(StartupData::None, false);
+ let mut isolate =
+ CoreIsolate::new(BasicState::new(), StartupData::None, false);
let src = "hocuspocus(";
let r = isolate.execute("i.js", src);
let e = r.unwrap_err();
@@ -1353,27 +1246,29 @@ pub mod tests {
#[test]
fn will_snapshot() {
let snapshot = {
- let mut isolate = CoreIsolate::new(StartupData::None, true);
+ let mut isolate =
+ CoreIsolate::new(BasicState::new(), StartupData::None, true);
js_check(isolate.execute("a.js", "a = 1 + 2"));
isolate.snapshot()
};
let startup_data = StartupData::Snapshot(Snapshot::JustCreated(snapshot));
- let mut isolate2 = CoreIsolate::new(startup_data, false);
+ let mut isolate2 = CoreIsolate::new(BasicState::new(), startup_data, false);
js_check(isolate2.execute("check.js", "if (a != 3) throw Error('x')"));
}
#[test]
fn test_from_boxed_snapshot() {
let snapshot = {
- let mut isolate = CoreIsolate::new(StartupData::None, true);
+ let mut isolate =
+ CoreIsolate::new(BasicState::new(), StartupData::None, true);
js_check(isolate.execute("a.js", "a = 1 + 2"));
let snap: &[u8] = &*isolate.snapshot();
Vec::from(snap).into_boxed_slice()
};
let startup_data = StartupData::Snapshot(Snapshot::Boxed(snapshot));
- let mut isolate2 = CoreIsolate::new(startup_data, false);
+ let mut isolate2 = CoreIsolate::new(BasicState::new(), startup_data, false);
js_check(isolate2.execute("check.js", "if (a != 3) throw Error('x')"));
}
@@ -1383,8 +1278,11 @@ pub mod tests {
initial: 0,
max: 20 * 1024, // 20 kB
};
- let mut isolate =
- CoreIsolate::with_heap_limits(StartupData::None, heap_limits);
+ let mut isolate = CoreIsolate::with_heap_limits(
+ BasicState::new(),
+ StartupData::None,
+ heap_limits,
+ );
let cb_handle = isolate.thread_safe_handle();
let callback_invoke_count = Rc::new(AtomicUsize::default());
@@ -1412,7 +1310,8 @@ pub mod tests {
#[test]
fn test_heap_limit_cb_remove() {
- let mut isolate = CoreIsolate::new(StartupData::None, false);
+ let mut isolate =
+ CoreIsolate::new(BasicState::new(), StartupData::None, false);
isolate.add_near_heap_limit_callback(|current_limit, _initial_limit| {
current_limit * 2
@@ -1427,8 +1326,11 @@ pub mod tests {
initial: 0,
max: 20 * 1024, // 20 kB
};
- let mut isolate =
- CoreIsolate::with_heap_limits(StartupData::None, heap_limits);
+ let mut isolate = CoreIsolate::with_heap_limits(
+ BasicState::new(),
+ StartupData::None,
+ heap_limits,
+ );
let cb_handle = isolate.thread_safe_handle();
let callback_invoke_count_first = Rc::new(AtomicUsize::default());
diff --git a/core/errors.rs b/core/errors.rs
index 8fdcca059..2ce12689a 100644
--- a/core/errors.rs
+++ b/core/errors.rs
@@ -446,19 +446,21 @@ impl fmt::Debug for ErrWithV8Handle {
}
}
-#[cfg(tests)]
+#[cfg(test)]
mod tests {
+ use super::*;
+
#[test]
fn test_bad_resource() {
let err = ErrBox::bad_resource("Resource has been closed");
- assert_eq!(err.1, "BadResource");
+ assert!(matches!(err, ErrBox::Simple { class: "BadResource", .. }));
assert_eq!(err.to_string(), "Resource has been closed");
}
#[test]
fn test_bad_resource_id() {
let err = ErrBox::bad_resource_id();
- assert_eq!(err.1, "BadResource");
+ assert!(matches!(err, ErrBox::Simple { class: "BadResource", .. }));
assert_eq!(err.to_string(), "Bad resource ID");
}
}
diff --git a/core/es_isolate.rs b/core/es_isolate.rs
index f2e2460ba..194f1adfa 100644
--- a/core/es_isolate.rs
+++ b/core/es_isolate.rs
@@ -10,6 +10,7 @@ use crate::bindings;
use crate::errors::ErrBox;
use crate::errors::ErrWithV8Handle;
use crate::futures::FutureExt;
+use crate::OpRouter;
use futures::ready;
use futures::stream::FuturesUnordered;
use futures::stream::StreamExt;
@@ -76,10 +77,12 @@ impl DerefMut for EsIsolate {
impl EsIsolate {
pub fn new(
loader: Rc<dyn ModuleLoader>,
+ op_router: Rc<dyn OpRouter>,
startup_data: StartupData,
will_snapshot: bool,
) -> Self {
- let mut core_isolate = CoreIsolate::new(startup_data, will_snapshot);
+ let mut core_isolate =
+ CoreIsolate::new(op_router, startup_data, will_snapshot);
{
core_isolate.set_host_initialize_import_meta_object_callback(
bindings::host_initialize_import_meta_object_callback,
@@ -640,11 +643,11 @@ impl EsIsolateState {
pub mod tests {
use super::*;
use crate::core_isolate::tests::run_in_task;
- use crate::core_isolate::CoreIsolateState;
use crate::js_check;
use crate::modules::ModuleSourceFuture;
use crate::ops::*;
- use crate::ZeroCopyBuf;
+ use crate::BasicState;
+ use crate::BufVec;
use std::io;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
@@ -681,24 +684,23 @@ pub mod tests {
}
let loader = Rc::new(ModsLoader::default());
+ let state = BasicState::new();
+
let resolve_count = loader.count.clone();
let dispatch_count = Arc::new(AtomicUsize::new(0));
let dispatch_count_ = dispatch_count.clone();
- let mut isolate = EsIsolate::new(loader, StartupData::None, false);
-
- let dispatcher = move |_state: &mut CoreIsolateState,
- zero_copy: &mut [ZeroCopyBuf]|
- -> Op {
+ let dispatcher = move |_state: Rc<BasicState>, bufs: BufVec| -> Op {
dispatch_count_.fetch_add(1, Ordering::Relaxed);
- assert_eq!(zero_copy.len(), 1);
- assert_eq!(zero_copy[0].len(), 1);
- assert_eq!(zero_copy[0][0], 42);
- let buf = vec![43u8, 0, 0, 0].into_boxed_slice();
+ assert_eq!(bufs.len(), 1);
+ assert_eq!(bufs[0].len(), 1);
+ assert_eq!(bufs[0][0], 42);
+ let buf = [43u8, 0, 0, 0][..].into();
Op::Async(futures::future::ready(buf).boxed())
};
+ state.register_op("test", dispatcher);
- isolate.register_op("test", dispatcher);
+ let mut isolate = EsIsolate::new(loader, state, StartupData::None, false);
js_check(isolate.execute(
"setup.js",
@@ -792,7 +794,8 @@ pub mod tests {
run_in_task(|cx| {
let loader = Rc::new(DynImportErrLoader::default());
let count = loader.count.clone();
- let mut isolate = EsIsolate::new(loader, StartupData::None, false);
+ let mut isolate =
+ EsIsolate::new(loader, BasicState::new(), StartupData::None, false);
js_check(isolate.execute(
"file:///dyn_import2.js",
@@ -869,7 +872,8 @@ pub mod tests {
let prepare_load_count = loader.prepare_load_count.clone();
let resolve_count = loader.resolve_count.clone();
let load_count = loader.load_count.clone();
- let mut isolate = EsIsolate::new(loader, StartupData::None, false);
+ let mut isolate =
+ EsIsolate::new(loader, BasicState::new(), StartupData::None, false);
// Dynamically import mod_b
js_check(isolate.execute(
@@ -909,7 +913,8 @@ pub mod tests {
run_in_task(|cx| {
let loader = Rc::new(DynImportOkLoader::default());
let prepare_load_count = loader.prepare_load_count.clone();
- let mut isolate = EsIsolate::new(loader, StartupData::None, false);
+ let mut isolate =
+ EsIsolate::new(loader, BasicState::new(), StartupData::None, false);
js_check(isolate.execute(
"file:///dyn_import3.js",
r#"
@@ -960,7 +965,8 @@ pub mod tests {
}
let loader = std::rc::Rc::new(ModsLoader::default());
- let mut runtime_isolate = EsIsolate::new(loader, StartupData::None, true);
+ let mut runtime_isolate =
+ EsIsolate::new(loader, BasicState::new(), StartupData::None, true);
let specifier = ModuleSpecifier::resolve_url("file:///main.js").unwrap();
let source_code = "Deno.core.print('hello\\n')".to_string();
diff --git a/core/examples/http_bench_bin_ops.rs b/core/examples/http_bench_bin_ops.rs
index 366779e8c..f93b8b079 100644
--- a/core/examples/http_bench_bin_ops.rs
+++ b/core/examples/http_bench_bin_ops.rs
@@ -1,10 +1,12 @@
#[macro_use]
extern crate log;
+use deno_core::js_check;
+use deno_core::BasicState;
+use deno_core::BufVec;
use deno_core::CoreIsolate;
-use deno_core::CoreIsolateState;
use deno_core::Op;
-use deno_core::ResourceTable;
+use deno_core::OpRegistry;
use deno_core::Script;
use deno_core::StartupData;
use deno_core::ZeroCopyBuf;
@@ -12,7 +14,6 @@ use futures::future::poll_fn;
use futures::future::FutureExt;
use futures::future::TryFuture;
use futures::future::TryFutureExt;
-use std::cell::RefCell;
use std::convert::TryInto;
use std::env;
use std::fmt::Debug;
@@ -27,6 +28,7 @@ use tokio::io::AsyncRead;
use tokio::io::AsyncWrite;
use tokio::net::TcpListener;
use tokio::net::TcpStream;
+use tokio::runtime;
struct Logger;
@@ -46,9 +48,9 @@ impl log::Log for Logger {
#[derive(Copy, Clone, Debug, PartialEq)]
struct Record {
- pub promise_id: u32,
- pub rid: u32,
- pub result: i32,
+ promise_id: u32,
+ rid: u32,
+ result: i32,
}
type RecordBuf = [u8; size_of::<Record>()];
@@ -75,131 +77,64 @@ impl From<Record> for RecordBuf {
}
}
-pub fn isolate_new() -> CoreIsolate {
+fn create_isolate() -> CoreIsolate {
+ let state = BasicState::new();
+ register_op_bin_sync(&state, "listen", op_listen);
+ register_op_bin_sync(&state, "close", op_close);
+ register_op_bin_async(&state, "accept", op_accept);
+ register_op_bin_async(&state, "read", op_read);
+ register_op_bin_async(&state, "write", op_write);
+
let startup_data = StartupData::Script(Script {
source: include_str!("http_bench_bin_ops.js"),
filename: "http_bench_bin_ops.js",
});
- let mut isolate = CoreIsolate::new(startup_data, false);
-
- fn register_sync_op<F>(
- isolate: &mut CoreIsolate,
- name: &'static str,
- handler: F,
- ) where
- F: 'static
- + Fn(
- Rc<RefCell<ResourceTable>>,
- u32,
- &mut [ZeroCopyBuf],
- ) -> Result<u32, Error>,
- {
- let core_handler = move |state: &mut CoreIsolateState,
- zero_copy_bufs: &mut [ZeroCopyBuf]|
- -> Op {
- assert!(!zero_copy_bufs.is_empty());
- let record = Record::from(zero_copy_bufs[0].as_ref());
- let is_sync = record.promise_id == 0;
- assert!(is_sync);
-
- let resource_table = state.resource_table.clone();
- let result: i32 =
- match handler(resource_table, record.rid, &mut zero_copy_bufs[1..]) {
- Ok(r) => r as i32,
- Err(_) => -1,
- };
- let buf = RecordBuf::from(Record { result, ..record })[..].into();
- Op::Sync(buf)
- };
-
- isolate.register_op(name, core_handler);
- }
-
- fn register_async_op<F>(
- isolate: &mut CoreIsolate,
- name: &'static str,
- handler: impl Fn(Rc<RefCell<ResourceTable>>, u32, &mut [ZeroCopyBuf]) -> F
- + Copy
- + 'static,
- ) where
- F: TryFuture,
- F::Ok: TryInto<i32>,
- <F::Ok as TryInto<i32>>::Error: Debug,
- {
- let core_handler = move |state: &mut CoreIsolateState,
- zero_copy_bufs: &mut [ZeroCopyBuf]|
- -> Op {
- assert!(!zero_copy_bufs.is_empty());
- let record = Record::from(zero_copy_bufs[0].as_ref());
- let is_sync = record.promise_id == 0;
- assert!(!is_sync);
-
- let mut zero_copy = zero_copy_bufs[1..].to_vec();
- let resource_table = state.resource_table.clone();
- let fut = async move {
- let op = handler(resource_table, record.rid, &mut zero_copy);
- let result = op
- .map_ok(|r| r.try_into().expect("op result does not fit in i32"))
- .unwrap_or_else(|_| -1)
- .await;
- RecordBuf::from(Record { result, ..record })[..].into()
- };
-
- Op::Async(fut.boxed_local())
- };
-
- isolate.register_op(name, core_handler);
- }
-
- register_sync_op(&mut isolate, "listen", op_listen);
- register_async_op(&mut isolate, "accept", op_accept);
- register_async_op(&mut isolate, "read", op_read);
- register_async_op(&mut isolate, "write", op_write);
- register_sync_op(&mut isolate, "close", op_close);
-
- isolate
-}
-
-fn op_close(
- resource_table: Rc<RefCell<ResourceTable>>,
- rid: u32,
- _buf: &mut [ZeroCopyBuf],
-) -> Result<u32, Error> {
- debug!("close rid={}", rid);
- let resource_table = &mut resource_table.borrow_mut();
- resource_table
- .close(rid)
- .map(|_| 0)
- .ok_or_else(bad_resource)
+ CoreIsolate::new(state, startup_data, false)
}
fn op_listen(
- resource_table: Rc<RefCell<ResourceTable>>,
+ state: &BasicState,
_rid: u32,
- _buf: &mut [ZeroCopyBuf],
+ _bufs: &mut [ZeroCopyBuf],
) -> Result<u32, Error> {
debug!("listen");
let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap();
let std_listener = std::net::TcpListener::bind(&addr)?;
let listener = TcpListener::from_std(std_listener)?;
- let resource_table = &mut resource_table.borrow_mut();
- let rid = resource_table.add("tcpListener", Box::new(listener));
+ let rid = state
+ .resource_table
+ .borrow_mut()
+ .add("tcpListener", Box::new(listener));
Ok(rid)
}
+fn op_close(
+ state: &BasicState,
+ rid: u32,
+ _bufs: &mut [ZeroCopyBuf],
+) -> Result<u32, Error> {
+ debug!("close rid={}", rid);
+ state
+ .resource_table
+ .borrow_mut()
+ .close(rid)
+ .map(|_| 0)
+ .ok_or_else(bad_resource_id)
+}
+
fn op_accept(
- resource_table: Rc<RefCell<ResourceTable>>,
+ state: Rc<BasicState>,
rid: u32,
- _buf: &mut [ZeroCopyBuf],
+ _bufs: BufVec,
) -> impl TryFuture<Ok = u32, Error = Error> {
debug!("accept rid={}", rid);
poll_fn(move |cx| {
- let resource_table = &mut resource_table.borrow_mut();
+ let resource_table = &mut state.resource_table.borrow_mut();
let listener = resource_table
.get_mut::<TcpListener>(rid)
- .ok_or_else(bad_resource)?;
+ .ok_or_else(bad_resource_id)?;
listener.poll_accept(cx).map_ok(|(stream, _addr)| {
resource_table.add("tcpStream", Box::new(stream))
})
@@ -207,9 +142,9 @@ fn op_accept(
}
fn op_read(
- resource_table: Rc<RefCell<ResourceTable>>,
+ state: Rc<BasicState>,
rid: u32,
- bufs: &mut [ZeroCopyBuf],
+ bufs: BufVec,
) -> impl TryFuture<Ok = usize, Error = Error> {
assert_eq!(bufs.len(), 1, "Invalid number of arguments");
let mut buf = bufs[0].clone();
@@ -217,33 +152,85 @@ fn op_read(
debug!("read rid={}", rid);
poll_fn(move |cx| {
- let resource_table = &mut resource_table.borrow_mut();
+ let resource_table = &mut state.resource_table.borrow_mut();
let stream = resource_table
.get_mut::<TcpStream>(rid)
- .ok_or_else(bad_resource)?;
+ .ok_or_else(bad_resource_id)?;
Pin::new(stream).poll_read(cx, &mut buf)
})
}
fn op_write(
- resource_table: Rc<RefCell<ResourceTable>>,
+ state: Rc<BasicState>,
rid: u32,
- bufs: &mut [ZeroCopyBuf],
+ bufs: BufVec,
) -> impl TryFuture<Ok = usize, Error = Error> {
assert_eq!(bufs.len(), 1, "Invalid number of arguments");
let buf = bufs[0].clone();
debug!("write rid={}", rid);
poll_fn(move |cx| {
- let resource_table = &mut resource_table.borrow_mut();
+ let resource_table = &mut state.resource_table.borrow_mut();
let stream = resource_table
.get_mut::<TcpStream>(rid)
- .ok_or_else(bad_resource)?;
+ .ok_or_else(bad_resource_id)?;
Pin::new(stream).poll_write(cx, &buf)
})
}
-fn bad_resource() -> Error {
+fn register_op_bin_sync<F>(state: &BasicState, name: &'static str, op_fn: F)
+where
+ F: Fn(&BasicState, u32, &mut [ZeroCopyBuf]) -> Result<u32, Error> + 'static,
+{
+ let base_op_fn = move |state: Rc<BasicState>, mut bufs: BufVec| -> Op {
+ let record = Record::from(bufs[0].as_ref());
+ let is_sync = record.promise_id == 0;
+ assert!(is_sync);
+
+ let zero_copy_bufs = &mut bufs[1..];
+ let result: i32 = match op_fn(&state, record.rid, zero_copy_bufs) {
+ Ok(r) => r as i32,
+ Err(_) => -1,
+ };
+ let buf = RecordBuf::from(Record { result, ..record })[..].into();
+ Op::Sync(buf)
+ };
+
+ state.register_op(name, base_op_fn);
+}
+
+fn register_op_bin_async<F, R>(state: &BasicState, name: &'static str, op_fn: F)
+where
+ F: Fn(Rc<BasicState>, u32, BufVec) -> R + Copy + 'static,
+ R: TryFuture,
+ R::Ok: TryInto<i32>,
+ <R::Ok as TryInto<i32>>::Error: Debug,
+{
+ let base_op_fn = move |state: Rc<BasicState>, bufs: BufVec| -> Op {
+ let mut bufs_iter = bufs.into_iter();
+ let record_buf = bufs_iter.next().unwrap();
+ let zero_copy_bufs = bufs_iter.collect::<BufVec>();
+
+ let record = Record::from(record_buf.as_ref());
+ let is_sync = record.promise_id == 0;
+ assert!(!is_sync);
+
+ let fut = async move {
+ let op = op_fn(state, record.rid, zero_copy_bufs);
+ let result = op
+ .map_ok(|r| r.try_into().expect("op result does not fit in i32"))
+ .unwrap_or_else(|_| -1)
+ .await;
+ RecordBuf::from(Record { result, ..record })[..].into()
+ };
+
+ Op::Async(fut.boxed_local())
+ };
+
+ state.register_op(name, base_op_fn);
+}
+
+fn bad_resource_id() -> Error {
Error::new(ErrorKind::NotFound, "bad resource id")
}
@@ -259,13 +246,13 @@ fn main() {
// NOTE: `--help` arg will display V8 help and exit
deno_core::v8_set_flags(env::args().collect());
- let isolate = isolate_new();
- let mut runtime = tokio::runtime::Builder::new()
+ let isolate = create_isolate();
+ let mut runtime = runtime::Builder::new()
.basic_scheduler()
.enable_all()
.build()
.unwrap();
- runtime.block_on(isolate).expect("unexpected isolate error");
+ js_check(runtime.block_on(isolate));
}
#[test]
diff --git a/core/examples/http_bench_json_ops.rs b/core/examples/http_bench_json_ops.rs
index f0fc5f94c..6e3063a0f 100644
--- a/core/examples/http_bench_json_ops.rs
+++ b/core/examples/http_bench_json_ops.rs
@@ -1,25 +1,29 @@
#[macro_use]
extern crate log;
-use deno_core::serde_json;
+use deno_core::js_check;
+use deno_core::BasicState;
+use deno_core::BufVec;
use deno_core::CoreIsolate;
-use deno_core::CoreIsolateState;
use deno_core::ErrBox;
+use deno_core::OpRegistry;
use deno_core::Script;
use deno_core::StartupData;
use deno_core::ZeroCopyBuf;
use futures::future::poll_fn;
use futures::future::Future;
+use serde_json::Value;
+use std::convert::TryInto;
use std::env;
-use std::io::Error;
-use std::io::ErrorKind;
use std::net::SocketAddr;
use std::pin::Pin;
+use std::rc::Rc;
use std::task::Poll;
use tokio::io::AsyncRead;
use tokio::io::AsyncWrite;
use tokio::net::TcpListener;
use tokio::net::TcpStream;
+use tokio::runtime;
struct Logger;
@@ -37,66 +41,79 @@ impl log::Log for Logger {
fn flush(&self) {}
}
-pub fn isolate_new() -> CoreIsolate {
+fn create_isolate() -> CoreIsolate {
+ let state = BasicState::new();
+ state.register_op_json_sync("listen", op_listen);
+ state.register_op_json_sync("close", op_close);
+ state.register_op_json_async("accept", op_accept);
+ state.register_op_json_async("read", op_read);
+ state.register_op_json_async("write", op_write);
+
let startup_data = StartupData::Script(Script {
source: include_str!("http_bench_json_ops.js"),
filename: "http_bench_json_ops.js",
});
- let mut isolate = CoreIsolate::new(startup_data, false);
-
- isolate.register_op_json_sync("listen", op_listen);
- isolate.register_op_json_async("accept", op_accept);
- isolate.register_op_json_async("read", op_read);
- isolate.register_op_json_async("write", op_write);
- isolate.register_op_json_sync("close", op_close);
+ CoreIsolate::new(state, startup_data, false)
+}
- isolate
+fn op_listen(
+ state: &BasicState,
+ _args: Value,
+ _bufs: &mut [ZeroCopyBuf],
+) -> Result<Value, ErrBox> {
+ debug!("listen");
+ let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap();
+ let std_listener = std::net::TcpListener::bind(&addr)?;
+ let listener = TcpListener::from_std(std_listener)?;
+ let rid = state
+ .resource_table
+ .borrow_mut()
+ .add("tcpListener", Box::new(listener));
+ Ok(serde_json::json!({ "rid": rid }))
}
fn op_close(
- state: &mut CoreIsolateState,
- args: serde_json::Value,
+ state: &BasicState,
+ args: Value,
_buf: &mut [ZeroCopyBuf],
-) -> Result<serde_json::Value, ErrBox> {
- let rid = args.get("rid").unwrap().as_u64().unwrap() as u32;
+) -> Result<Value, ErrBox> {
+ let rid: u32 = args
+ .get("rid")
+ .unwrap()
+ .as_u64()
+ .unwrap()
+ .try_into()
+ .unwrap();
debug!("close rid={}", rid);
- let resource_table = &mut state.resource_table.borrow_mut();
- resource_table
+ state
+ .resource_table
+ .borrow_mut()
.close(rid)
.map(|_| serde_json::json!(()))
- .ok_or_else(bad_resource)
-}
-
-fn op_listen(
- state: &mut CoreIsolateState,
- _args: serde_json::Value,
- _buf: &mut [ZeroCopyBuf],
-) -> Result<serde_json::Value, ErrBox> {
- debug!("listen");
- let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap();
- let std_listener = std::net::TcpListener::bind(&addr)?;
- let listener = TcpListener::from_std(std_listener)?;
- let resource_table = &mut state.resource_table.borrow_mut();
- let rid = resource_table.add("tcpListener", Box::new(listener));
- Ok(serde_json::json!({ "rid": rid }))
+ .ok_or_else(ErrBox::bad_resource_id)
}
fn op_accept(
- state: &mut CoreIsolateState,
- args: serde_json::Value,
- _buf: &mut [ZeroCopyBuf],
-) -> impl Future<Output = Result<serde_json::Value, ErrBox>> {
- let rid = args.get("rid").unwrap().as_u64().unwrap() as u32;
+ state: Rc<BasicState>,
+ args: Value,
+ _bufs: BufVec,
+) -> impl Future<Output = Result<Value, ErrBox>> {
+ let rid: u32 = args
+ .get("rid")
+ .unwrap()
+ .as_u64()
+ .unwrap()
+ .try_into()
+ .unwrap();
debug!("accept rid={}", rid);
- let resource_table = state.resource_table.clone();
poll_fn(move |cx| {
- let resource_table = &mut resource_table.borrow_mut();
+ let resource_table = &mut state.resource_table.borrow_mut();
let listener = resource_table
.get_mut::<TcpListener>(rid)
- .ok_or_else(bad_resource)?;
+ .ok_or_else(ErrBox::bad_resource_id)?;
listener.poll_accept(cx)?.map(|(stream, _addr)| {
let rid = resource_table.add("tcpStream", Box::new(stream));
Ok(serde_json::json!({ "rid": rid }))
@@ -105,57 +122,59 @@ fn op_accept(
}
fn op_read(
- state: &mut CoreIsolateState,
- args: serde_json::Value,
- bufs: &mut [ZeroCopyBuf],
-) -> impl Future<Output = Result<serde_json::Value, ErrBox>> {
+ state: Rc<BasicState>,
+ args: Value,
+ mut bufs: BufVec,
+) -> impl Future<Output = Result<Value, ErrBox>> {
assert_eq!(bufs.len(), 1, "Invalid number of arguments");
- let rid = args.get("rid").unwrap().as_u64().unwrap() as u32;
+ let rid: u32 = args
+ .get("rid")
+ .unwrap()
+ .as_u64()
+ .unwrap()
+ .try_into()
+ .unwrap();
debug!("read rid={}", rid);
- let mut buf = bufs[0].clone();
- let resource_table = state.resource_table.clone();
-
- poll_fn(move |cx| -> Poll<Result<serde_json::Value, ErrBox>> {
- let resource_table = &mut resource_table.borrow_mut();
+ poll_fn(move |cx| -> Poll<Result<Value, ErrBox>> {
+ let resource_table = &mut state.resource_table.borrow_mut();
let stream = resource_table
.get_mut::<TcpStream>(rid)
- .ok_or_else(bad_resource)?;
+ .ok_or_else(ErrBox::bad_resource_id)?;
Pin::new(stream)
- .poll_read(cx, &mut buf)?
+ .poll_read(cx, &mut bufs[0])?
.map(|nread| Ok(serde_json::json!({ "nread": nread })))
})
}
fn op_write(
- state: &mut CoreIsolateState,
- args: serde_json::Value,
- bufs: &mut [ZeroCopyBuf],
-) -> impl Future<Output = Result<serde_json::Value, ErrBox>> {
+ state: Rc<BasicState>,
+ args: Value,
+ bufs: BufVec,
+) -> impl Future<Output = Result<Value, ErrBox>> {
assert_eq!(bufs.len(), 1, "Invalid number of arguments");
- let rid = args.get("rid").unwrap().as_u64().unwrap() as u32;
+ let rid: u32 = args
+ .get("rid")
+ .unwrap()
+ .as_u64()
+ .unwrap()
+ .try_into()
+ .unwrap();
debug!("write rid={}", rid);
- let buf = bufs[0].clone();
- let resource_table = state.resource_table.clone();
-
poll_fn(move |cx| {
- let resource_table = &mut resource_table.borrow_mut();
+ let resource_table = &mut state.resource_table.borrow_mut();
let stream = resource_table
.get_mut::<TcpStream>(rid)
- .ok_or_else(bad_resource)?;
+ .ok_or_else(ErrBox::bad_resource_id)?;
Pin::new(stream)
- .poll_write(cx, &buf)?
+ .poll_write(cx, &bufs[0])?
.map(|nwritten| Ok(serde_json::json!({ "nwritten": nwritten })))
})
}
-fn bad_resource() -> ErrBox {
- Error::new(ErrorKind::NotFound, "bad resource id").into()
-}
-
fn main() {
log::set_logger(&Logger).unwrap();
log::set_max_level(
@@ -168,11 +187,11 @@ fn main() {
// NOTE: `--help` arg will display V8 help and exit
deno_core::v8_set_flags(env::args().collect());
- let isolate = isolate_new();
- let mut runtime = tokio::runtime::Builder::new()
+ let isolate = create_isolate();
+ let mut runtime = runtime::Builder::new()
.basic_scheduler()
.enable_all()
.build()
.unwrap();
- deno_core::js_check(runtime.block_on(isolate));
+ js_check(runtime.block_on(isolate));
}
diff --git a/core/flags.rs b/core/flags.rs
index 7c1f5c449..b69abab6f 100644
--- a/core/flags.rs
+++ b/core/flags.rs
@@ -1,4 +1,5 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+
use rusty_v8 as v8;
/// Pass the command line arguments to v8.
/// Returns a vector of command line arguments that V8 did not understand.
diff --git a/core/lib.rs b/core/lib.rs
index d4a348f63..647c91aa4 100644
--- a/core/lib.rs
+++ b/core/lib.rs
@@ -8,6 +8,7 @@ extern crate lazy_static;
#[macro_use]
extern crate log;
+mod basic_state;
mod bindings;
mod core_isolate;
mod errors;
@@ -24,6 +25,7 @@ mod zero_copy_buf;
pub use rusty_v8 as v8;
+pub use crate::basic_state::BasicState;
pub use crate::core_isolate::js_check;
pub use crate::core_isolate::CoreIsolate;
pub use crate::core_isolate::CoreIsolateState;
@@ -32,6 +34,7 @@ pub use crate::core_isolate::HeapLimits;
pub use crate::core_isolate::Script;
pub use crate::core_isolate::Snapshot;
pub use crate::core_isolate::StartupData;
+pub use crate::errors::AnyError;
pub use crate::errors::ErrBox;
pub use crate::errors::JSError;
pub use crate::es_isolate::EsIsolate;
@@ -47,10 +50,13 @@ pub use crate::modules::ModuleSource;
pub use crate::modules::ModuleSourceFuture;
pub use crate::modules::RecursiveModuleLoad;
pub use crate::normalize_path::normalize_path;
-pub use crate::ops::Buf;
pub use crate::ops::Op;
pub use crate::ops::OpAsyncFuture;
+pub use crate::ops::OpFn;
pub use crate::ops::OpId;
+pub use crate::ops::OpRegistry;
+pub use crate::ops::OpRouter;
+pub use crate::ops::OpTable;
pub use crate::resources::ResourceTable;
pub use crate::zero_copy_buf::BufVec;
pub use crate::zero_copy_buf::ZeroCopyBuf;
diff --git a/core/modules.rs b/core/modules.rs
index 516440bc0..817e1f25e 100644
--- a/core/modules.rs
+++ b/core/modules.rs
@@ -8,6 +8,7 @@ use futures::future::FutureExt;
use futures::stream::FuturesUnordered;
use futures::stream::Stream;
use futures::stream::TryStreamExt;
+use serde_json::Value;
use std::collections::HashMap;
use std::collections::HashSet;
use std::fmt;
@@ -481,7 +482,7 @@ impl Deps {
}
}
- pub fn to_json(&self) -> serde_json::Value {
+ pub fn to_json(&self) -> Value {
let children;
if let Some(deps) = &self.deps {
children = deps.iter().map(|c| c.to_json()).collect();
@@ -521,6 +522,7 @@ mod tests {
use super::*;
use crate::es_isolate::EsIsolate;
use crate::js_check;
+ use crate::BasicState;
use crate::StartupData;
use futures::future::FutureExt;
use std::error::Error;
@@ -535,15 +537,14 @@ mod tests {
// removed in the future.
use crate::core_isolate::tests::run_in_task;
+ #[derive(Default)]
struct MockLoader {
pub loads: Arc<Mutex<Vec<String>>>,
}
impl MockLoader {
- fn new() -> Self {
- Self {
- loads: Arc::new(Mutex::new(Vec::new())),
- }
+ fn new() -> Rc<Self> {
+ Default::default()
}
}
@@ -699,7 +700,8 @@ mod tests {
fn test_recursive_load() {
let loader = MockLoader::new();
let loads = loader.loads.clone();
- let mut isolate = EsIsolate::new(Rc::new(loader), StartupData::None, false);
+ let mut isolate =
+ EsIsolate::new(loader, BasicState::new(), StartupData::None, false);
let spec = ModuleSpecifier::resolve_url("file:///a.js").unwrap();
let a_id_fut = isolate.load_module(&spec, None);
let a_id = futures::executor::block_on(a_id_fut).expect("Failed to load");
@@ -761,7 +763,8 @@ mod tests {
fn test_circular_load() {
let loader = MockLoader::new();
let loads = loader.loads.clone();
- let mut isolate = EsIsolate::new(Rc::new(loader), StartupData::None, false);
+ let mut isolate =
+ EsIsolate::new(loader, BasicState::new(), StartupData::None, false);
let fut = async move {
let spec = ModuleSpecifier::resolve_url("file:///circular1.js").unwrap();
@@ -834,7 +837,8 @@ mod tests {
fn test_redirect_load() {
let loader = MockLoader::new();
let loads = loader.loads.clone();
- let mut isolate = EsIsolate::new(Rc::new(loader), StartupData::None, false);
+ let mut isolate =
+ EsIsolate::new(loader, BasicState::new(), StartupData::None, false);
let fut = async move {
let spec = ModuleSpecifier::resolve_url("file:///redirect1.js").unwrap();
@@ -899,7 +903,7 @@ mod tests {
let loader = MockLoader::new();
let loads = loader.loads.clone();
let mut isolate =
- EsIsolate::new(Rc::new(loader), StartupData::None, false);
+ EsIsolate::new(loader, BasicState::new(), StartupData::None, false);
let spec = ModuleSpecifier::resolve_url("file:///main.js").unwrap();
let mut recursive_load = isolate.load_module(&spec, None).boxed_local();
@@ -945,7 +949,7 @@ mod tests {
run_in_task(|mut cx| {
let loader = MockLoader::new();
let mut isolate =
- EsIsolate::new(Rc::new(loader), StartupData::None, false);
+ EsIsolate::new(loader, BasicState::new(), StartupData::None, false);
let spec = ModuleSpecifier::resolve_url("file:///bad_import.js").unwrap();
let mut load_fut = isolate.load_module(&spec, None).boxed_local();
let result = load_fut.poll_unpin(&mut cx);
@@ -973,7 +977,8 @@ mod tests {
fn recursive_load_main_with_code() {
let loader = MockLoader::new();
let loads = loader.loads.clone();
- let mut isolate = EsIsolate::new(Rc::new(loader), StartupData::None, false);
+ let mut isolate =
+ EsIsolate::new(loader, BasicState::new(), StartupData::None, false);
// In default resolution code should be empty.
// Instead we explicitly pass in our own code.
// The behavior should be very similar to /a.js.
diff --git a/core/ops.rs b/core/ops.rs
index 65a0f325b..838596dc0 100644
--- a/core/ops.rs
+++ b/core/ops.rs
@@ -1,179 +1,146 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
-use crate::core_isolate::CoreIsolateState;
+
+use crate::BufVec;
+use crate::ErrBox;
use crate::ZeroCopyBuf;
use futures::Future;
+use futures::FutureExt;
+use indexmap::IndexMap;
+use serde_json::json;
+use serde_json::Value;
use std::collections::HashMap;
+use std::iter::once;
+use std::ops::Deref;
+use std::ops::DerefMut;
use std::pin::Pin;
use std::rc::Rc;
-pub type OpId = u32;
-
-pub type Buf = Box<[u8]>;
-
-pub type OpAsyncFuture = Pin<Box<dyn Future<Output = Buf>>>;
+pub type OpAsyncFuture = Pin<Box<dyn Future<Output = Box<[u8]>>>>;
+pub type OpFn<S> = dyn Fn(Rc<S>, BufVec) -> Op + 'static;
+pub type OpId = usize;
pub enum Op {
- Sync(Buf),
+ Sync(Box<[u8]>),
Async(OpAsyncFuture),
/// AsyncUnref is the variation of Async, which doesn't block the program
/// exiting.
AsyncUnref(OpAsyncFuture),
+ NotFound,
}
-/// Main type describing op
-pub type OpDispatcher =
- dyn Fn(&mut CoreIsolateState, &mut [ZeroCopyBuf]) -> Op + 'static;
-
-#[derive(Default)]
-pub struct OpRegistry {
- dispatchers: Vec<Rc<OpDispatcher>>,
- name_to_id: HashMap<String, OpId>,
+pub trait OpRouter {
+ fn route_op(self: Rc<Self>, op_id: OpId, bufs: BufVec) -> Op;
}
-impl OpRegistry {
- pub fn new() -> Self {
- let mut registry = Self::default();
- let op_id = registry.register("ops", |state, _| {
- let buf = state.op_registry.json_map();
+pub trait OpRegistry: OpRouter + 'static {
+ fn get_op_catalog(self: Rc<Self>) -> HashMap<String, OpId>;
+
+ fn register_op<F>(&self, name: &str, op_fn: F) -> OpId
+ where
+ F: Fn(Rc<Self>, BufVec) -> Op + 'static;
+
+ fn register_op_json_sync<F>(self: &Rc<Self>, name: &str, op_fn: F) -> OpId
+ where
+ F: Fn(&Self, Value, &mut [ZeroCopyBuf]) -> Result<Value, ErrBox> + 'static,
+ {
+ let base_op_fn = move |state: Rc<Self>, mut bufs: BufVec| -> Op {
+ let result = serde_json::from_slice(&bufs[0])
+ .map_err(ErrBox::from)
+ .and_then(|args| op_fn(&state, args, &mut bufs[1..]));
+ let buf = state.json_serialize_op_result(None, result);
Op::Sync(buf)
- });
- assert_eq!(op_id, 0);
- registry
+ };
+
+ self.register_op(name, base_op_fn)
}
- pub fn register<F>(&mut self, name: &str, op: F) -> OpId
+ fn register_op_json_async<F, R>(self: &Rc<Self>, name: &str, op_fn: F) -> OpId
where
- F: Fn(&mut CoreIsolateState, &mut [ZeroCopyBuf]) -> Op + 'static,
+ F: Fn(Rc<Self>, Value, BufVec) -> R + 'static,
+ R: Future<Output = Result<Value, ErrBox>> + 'static,
{
- let op_id = self.dispatchers.len() as u32;
-
- let existing = self.name_to_id.insert(name.to_string(), op_id);
- assert!(
- existing.is_none(),
- format!("Op already registered: {}", name)
- );
- self.dispatchers.push(Rc::new(op));
- op_id
- }
+ let try_dispatch_op = move |state: Rc<Self>,
+ bufs: BufVec|
+ -> Result<Op, ErrBox> {
+ let args: Value = serde_json::from_slice(&bufs[0])?;
+ let promise_id = args
+ .get("promiseId")
+ .and_then(Value::as_u64)
+ .ok_or_else(|| ErrBox::type_error("missing or invalid `promiseId`"))?;
+ let bufs = bufs[1..].into();
+ let fut = op_fn(state.clone(), args, bufs).map(move |result| {
+ state.json_serialize_op_result(Some(promise_id), result)
+ });
+ Ok(Op::Async(Box::pin(fut)))
+ };
- fn json_map(&self) -> Buf {
- let op_map_json = serde_json::to_string(&self.name_to_id).unwrap();
- op_map_json.as_bytes().to_owned().into_boxed_slice()
- }
+ let base_op_fn = move |state: Rc<Self>, bufs: BufVec| -> Op {
+ match try_dispatch_op(state.clone(), bufs) {
+ Ok(op) => op,
+ Err(err) => Op::Sync(state.json_serialize_op_result(None, Err(err))),
+ }
+ };
- pub fn get(&self, op_id: OpId) -> Option<Rc<OpDispatcher>> {
- self.dispatchers.get(op_id as usize).map(Rc::clone)
+ self.register_op(name, base_op_fn)
}
- pub fn unregister_op(&mut self, name: &str) {
- let id = self.name_to_id.remove(name).unwrap();
- drop(self.dispatchers.remove(id as usize));
+ fn json_serialize_op_result(
+ &self,
+ promise_id: Option<u64>,
+ result: Result<Value, ErrBox>,
+ ) -> Box<[u8]> {
+ let value = match result {
+ Ok(v) => json!({ "ok": v, "promiseId": promise_id }),
+ Err(err) => json!({
+ "promiseId": promise_id ,
+ "err": {
+ "className": self.get_error_class_name(&err),
+ "message": err.to_string(),
+ }
+ }),
+ };
+ serde_json::to_vec(&value).unwrap().into_boxed_slice()
}
-}
-#[test]
-fn test_op_registry() {
- use crate::CoreIsolate;
- use std::sync::atomic;
- use std::sync::Arc;
- let mut op_registry = OpRegistry::new();
-
- let c = Arc::new(atomic::AtomicUsize::new(0));
- let c_ = c.clone();
-
- let test_id = op_registry.register("test", move |_, _| {
- c_.fetch_add(1, atomic::Ordering::SeqCst);
- Op::Sync(Box::new([]))
- });
- assert!(test_id != 0);
-
- let mut expected = HashMap::new();
- expected.insert("ops".to_string(), 0);
- expected.insert("test".to_string(), 1);
- assert_eq!(op_registry.name_to_id, expected);
-
- let isolate = CoreIsolate::new(crate::StartupData::None, false);
-
- let dispatch = op_registry.get(test_id).unwrap();
- let state_rc = CoreIsolate::state(&isolate);
- let mut state = state_rc.borrow_mut();
- let res = dispatch(&mut state, &mut []);
- if let Op::Sync(buf) = res {
- assert_eq!(buf.len(), 0);
- } else {
- unreachable!();
+ fn get_error_class_name(&self, _err: &ErrBox) -> &'static str {
+ "Error"
}
- assert_eq!(c.load(atomic::Ordering::SeqCst), 1);
-
- assert!(op_registry.get(100).is_none());
- op_registry.unregister_op("test");
- expected.remove("test");
- assert_eq!(op_registry.name_to_id, expected);
- assert!(op_registry.get(1).is_none());
}
-#[test]
-fn register_op_during_call() {
- use crate::CoreIsolate;
- use std::sync::atomic;
- use std::sync::Arc;
- use std::sync::Mutex;
- let op_registry = Arc::new(Mutex::new(OpRegistry::new()));
-
- let c = Arc::new(atomic::AtomicUsize::new(0));
- let c_ = c.clone();
-
- let op_registry_ = op_registry.clone();
-
- let test_id = {
- let mut g = op_registry.lock().unwrap();
- g.register("dynamic_register_op", move |_, _| {
- let c__ = c_.clone();
- let mut g = op_registry_.lock().unwrap();
- g.register("test", move |_, _| {
- c__.fetch_add(1, atomic::Ordering::SeqCst);
- Op::Sync(Box::new([]))
- });
- Op::Sync(Box::new([]))
- })
- };
- assert!(test_id != 0);
+/// Collection for storing registered ops. The special 'get_op_catalog'
+/// op with OpId `0` is automatically added when the OpTable is created.
+pub struct OpTable<S>(IndexMap<String, Rc<OpFn<S>>>);
- let isolate = CoreIsolate::new(crate::StartupData::None, false);
+impl<S: OpRegistry> OpTable<S> {
+ pub fn get_op_catalog(&self) -> HashMap<String, OpId> {
+ self.keys().cloned().zip(0..).collect()
+ }
- let dispatcher1 = {
- let g = op_registry.lock().unwrap();
- g.get(test_id).unwrap()
- };
- {
- let state_rc = CoreIsolate::state(&isolate);
- let mut state = state_rc.borrow_mut();
- dispatcher1(&mut state, &mut []);
+ fn op_get_op_catalog(state: Rc<S>, _bufs: BufVec) -> Op {
+ let ops = state.get_op_catalog();
+ let buf = serde_json::to_vec(&ops).map(Into::into).unwrap();
+ Op::Sync(buf)
}
+}
- let mut expected = HashMap::new();
- expected.insert("ops".to_string(), 0);
- expected.insert("dynamic_register_op".to_string(), 1);
- expected.insert("test".to_string(), 2);
- {
- let g = op_registry.lock().unwrap();
- assert_eq!(g.name_to_id, expected);
+impl<S: OpRegistry> Default for OpTable<S> {
+ fn default() -> Self {
+ Self(
+ once(("ops".to_owned(), Rc::new(Self::op_get_op_catalog) as _)).collect(),
+ )
}
+}
- let dispatcher2 = {
- let g = op_registry.lock().unwrap();
- g.get(2).unwrap()
- };
- let state_rc = CoreIsolate::state(&isolate);
- let mut state = state_rc.borrow_mut();
- let res = dispatcher2(&mut state, &mut []);
- if let Op::Sync(buf) = res {
- assert_eq!(buf.len(), 0);
- } else {
- unreachable!();
+impl<S> Deref for OpTable<S> {
+ type Target = IndexMap<String, Rc<OpFn<S>>>;
+
+ fn deref(&self) -> &Self::Target {
+ &self.0
}
- assert_eq!(c.load(atomic::Ordering::SeqCst), 1);
+}
- let g = op_registry.lock().unwrap();
- assert!(g.get(100).is_none());
+impl<S> DerefMut for OpTable<S> {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ &mut self.0
+ }
}
diff --git a/core/plugin_api.rs b/core/plugin_api.rs
index 0cb9acaeb..d57a5b3b5 100644
--- a/core/plugin_api.rs
+++ b/core/plugin_api.rs
@@ -8,7 +8,6 @@
// shared library itself, which would cause segfaults when the plugin is
// unloaded and all functions in the plugin library are unmapped from memory.
-pub use crate::Buf;
pub use crate::Op;
pub use crate::OpId;
pub use crate::ZeroCopyBuf;
diff --git a/core/shared_queue.rs b/core/shared_queue.rs
index f35fff012..e8ac30ebc 100644
--- a/core/shared_queue.rs
+++ b/core/shared_queue.rs
@@ -19,6 +19,7 @@ SharedQueue Binary Layout
use crate::bindings;
use crate::ops::OpId;
use rusty_v8 as v8;
+use std::convert::TryInto;
const MAX_RECORDS: usize = 100;
/// Total number of records added.
@@ -121,7 +122,7 @@ impl SharedQueue {
fn set_meta(&mut self, index: usize, end: usize, op_id: OpId) {
let s = self.as_u32_slice_mut();
s[INDEX_OFFSETS + 2 * index] = end as u32;
- s[INDEX_OFFSETS + 2 * index + 1] = op_id;
+ s[INDEX_OFFSETS + 2 * index + 1] = op_id.try_into().unwrap();
}
#[cfg(test)]
@@ -129,7 +130,7 @@ impl SharedQueue {
if index < self.num_records() {
let s = self.as_u32_slice();
let end = s[INDEX_OFFSETS + 2 * index] as usize;
- let op_id = s[INDEX_OFFSETS + 2 * index + 1];
+ let op_id = s[INDEX_OFFSETS + 2 * index + 1] as OpId;
Some((op_id, end))
} else {
None
@@ -218,7 +219,6 @@ impl SharedQueue {
#[cfg(test)]
mod tests {
use super::*;
- use crate::ops::Buf;
#[test]
fn basic() {
@@ -262,7 +262,7 @@ mod tests {
assert_eq!(q.size(), 0);
}
- fn alloc_buf(byte_length: usize) -> Buf {
+ fn alloc_buf(byte_length: usize) -> Box<[u8]> {
let mut v = Vec::new();
v.resize(byte_length, 0);
v.into_boxed_slice()