summaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorValentin Anger <syrupthinker@gryphno.de>2020-06-01 20:20:47 +0200
committerGitHub <noreply@github.com>2020-06-01 14:20:47 -0400
commitbecbb56b19e96e4dd72b861217a855fba953d290 (patch)
treed9e99771c537ef87a4a945f0120775c337ef90aa /core
parent12d741c2fe453625d510313beaa2e1c282784c00 (diff)
feat(core): Ops can take several zero copy buffers (#4788)
Diffstat (limited to 'core')
-rw-r--r--core/bindings.rs45
-rw-r--r--core/core.js8
-rw-r--r--core/core_isolate.rs65
-rw-r--r--core/es_isolate.rs2
-rw-r--r--core/examples/http_bench.js8
-rw-r--r--core/examples/http_bench.rs30
-rw-r--r--core/ops.rs10
-rw-r--r--core/plugin_api.rs3
-rw-r--r--core/zero_copy_buf.rs1
9 files changed, 130 insertions, 42 deletions
diff --git a/core/bindings.rs b/core/bindings.rs
index 7bcb99a38..b6390073a 100644
--- a/core/bindings.rs
+++ b/core/bindings.rs
@@ -452,17 +452,50 @@ fn send(
Err(_) => &[],
};
- let zero_copy: Option<ZeroCopyBuf> =
- v8::Local::<v8::ArrayBufferView>::try_from(args.get(2))
- .map(ZeroCopyBuf::new)
- .ok();
-
let state_rc = CoreIsolate::state(scope.isolate());
let mut state = state_rc.borrow_mut();
assert!(!state.global_context.is_empty());
+ let mut buf_iter = (2..args.length()).map(|idx| {
+ v8::Local::<v8::ArrayBufferView>::try_from(args.get(idx))
+ .map(ZeroCopyBuf::new)
+ .map_err(|err| {
+ let msg = format!("Invalid argument at position {}: {}", idx, err);
+ let msg = v8::String::new(scope, &msg).unwrap();
+ v8::Exception::type_error(scope, msg)
+ })
+ });
+
+ let mut buf_one: ZeroCopyBuf;
+ let mut buf_vec: Vec<ZeroCopyBuf>;
+
+ // Collect all ArrayBufferView's
+ let buf_iter_result = match buf_iter.len() {
+ 0 => Ok(&mut [][..]),
+ 1 => match buf_iter.next().unwrap() {
+ Ok(buf) => {
+ buf_one = buf;
+ Ok(std::slice::from_mut(&mut buf_one))
+ }
+ Err(err) => Err(err),
+ },
+ _ => match buf_iter.collect::<Result<Vec<_>, _>>() {
+ Ok(v) => {
+ buf_vec = v;
+ Ok(&mut buf_vec[..])
+ }
+ Err(err) => Err(err),
+ },
+ };
+
// If response is empty then it's either async op or exception was thrown
- let maybe_response = state.dispatch_op(scope, op_id, control, zero_copy);
+ let maybe_response = match buf_iter_result {
+ Ok(bufs) => state.dispatch_op(scope, op_id, control, bufs),
+ Err(exc) => {
+ scope.isolate().throw_exception(exc);
+ return;
+ }
+ };
if let Some(response) = maybe_response {
// Synchronous response.
diff --git a/core/core.js b/core/core.js
index 4c6f708bb..23cc325ab 100644
--- a/core/core.js
+++ b/core/core.js
@@ -59,7 +59,7 @@ SharedQueue Binary Layout
function ops() {
// op id 0 is a special value to retrieve the map of registered ops.
- const opsMapBytes = send(0, new Uint8Array([]), null);
+ const opsMapBytes = send(0, new Uint8Array([]));
const opsMapJson = String.fromCharCode.apply(null, opsMapBytes);
return JSON.parse(opsMapJson);
}
@@ -181,13 +181,9 @@ SharedQueue Binary Layout
}
}
- function dispatch(opId, control, zeroCopy = null) {
- return send(opId, control, zeroCopy);
- }
-
Object.assign(window.Deno.core, {
setAsyncHandler,
- dispatch,
+ dispatch: send,
ops,
// sharedQueue is private but exposed for testing.
sharedQueue: {
diff --git a/core/core_isolate.rs b/core/core_isolate.rs
index dff887ab3..5ddc6571b 100644
--- a/core/core_isolate.rs
+++ b/core/core_isolate.rs
@@ -366,7 +366,7 @@ impl CoreIsolate {
/// Requires runtime to explicitly ask for op ids before using any of the ops.
pub fn register_op<F>(&mut self, name: &str, op: F) -> OpId
where
- F: Fn(&mut CoreIsolateState, &[u8], Option<ZeroCopyBuf>) -> Op + 'static,
+ F: Fn(&mut CoreIsolateState, &[u8], &mut [ZeroCopyBuf]) -> Op + 'static,
{
let state_rc = Self::state(self);
let mut state = state_rc.borrow_mut();
@@ -484,7 +484,7 @@ impl CoreIsolateState {
/// Requires runtime to explicitly ask for op ids before using any of the ops.
pub fn register_op<F>(&mut self, name: &str, op: F) -> OpId
where
- F: Fn(&mut CoreIsolateState, &[u8], Option<ZeroCopyBuf>) -> Op + 'static,
+ F: Fn(&mut CoreIsolateState, &[u8], &mut [ZeroCopyBuf]) -> Op + 'static,
{
self.op_registry.register(name, op)
}
@@ -504,10 +504,10 @@ impl CoreIsolateState {
scope: &mut impl v8::ToLocal<'s>,
op_id: OpId,
control_buf: &[u8],
- zero_copy_buf: Option<ZeroCopyBuf>,
+ zero_copy_bufs: &mut [ZeroCopyBuf],
) -> Option<(OpId, Box<[u8]>)> {
let op = if let Some(dispatcher) = self.op_registry.get(op_id) {
- dispatcher(self, control_buf, zero_copy_buf)
+ dispatcher(self, control_buf, zero_copy_bufs)
} else {
let message =
v8::String::new(scope, &format!("Unknown op id: {}", op_id)).unwrap();
@@ -718,6 +718,7 @@ pub mod tests {
pub enum Mode {
Async,
AsyncUnref,
+ AsyncZeroCopy(u8),
OverflowReqSync,
OverflowResSync,
OverflowReqAsync,
@@ -732,7 +733,7 @@ pub mod tests {
let dispatcher = move |_state: &mut CoreIsolateState,
control: &[u8],
- _zero_copy: Option<ZeroCopyBuf>|
+ zero_copy: &mut [ZeroCopyBuf]|
-> Op {
dispatch_count_.fetch_add(1, Ordering::Relaxed);
match mode {
@@ -752,6 +753,18 @@ pub mod tests {
};
Op::AsyncUnref(fut.boxed())
}
+ Mode::AsyncZeroCopy(count) => {
+ assert_eq!(control.len(), 1);
+ assert_eq!(control[0], 24);
+ assert_eq!(zero_copy.len(), count as usize);
+ zero_copy.iter().enumerate().for_each(|(idx, buf)| {
+ assert_eq!(buf.len(), 1);
+ assert_eq!(idx, buf[0] as usize);
+ });
+
+ let buf = vec![43u8].into_boxed_slice();
+ Op::Async(futures::future::ready(buf).boxed())
+ }
Mode::OverflowReqSync => {
assert_eq!(control.len(), 100 * 1024 * 1024);
let buf = vec![43u8].into_boxed_slice();
@@ -817,6 +830,48 @@ pub mod tests {
}
#[test]
+ fn test_dispatch_no_zero_copy_buf() {
+ let (mut isolate, dispatch_count) = setup(Mode::AsyncZeroCopy(0));
+ js_check(isolate.execute(
+ "filename.js",
+ r#"
+ let control = new Uint8Array([24]);
+ Deno.core.send(1, control);
+ "#,
+ ));
+ assert_eq!(dispatch_count.load(Ordering::Relaxed), 1);
+ }
+
+ #[test]
+ fn test_dispatch_one_zero_copy_buf() {
+ let (mut isolate, dispatch_count) = setup(Mode::AsyncZeroCopy(1));
+ js_check(isolate.execute(
+ "filename.js",
+ r#"
+ let control = new Uint8Array([24]);
+ let zero_copy = new Uint8Array([0]);
+ Deno.core.send(1, control, zero_copy);
+ "#,
+ ));
+ assert_eq!(dispatch_count.load(Ordering::Relaxed), 1);
+ }
+
+ #[test]
+ fn test_dispatch_two_zero_copy_bufs() {
+ let (mut isolate, dispatch_count) = setup(Mode::AsyncZeroCopy(2));
+ js_check(isolate.execute(
+ "filename.js",
+ r#"
+ let control = new Uint8Array([24]);
+ let zero_copy_a = new Uint8Array([0]);
+ let zero_copy_b = new Uint8Array([1]);
+ Deno.core.send(1, control, zero_copy_a, zero_copy_b);
+ "#,
+ ));
+ assert_eq!(dispatch_count.load(Ordering::Relaxed), 1);
+ }
+
+ #[test]
fn test_poll_async_delayed_ops() {
run_in_task(|cx| {
let (mut isolate, dispatch_count) = setup(Mode::Async);
diff --git a/core/es_isolate.rs b/core/es_isolate.rs
index a23af43d7..35cf177f8 100644
--- a/core/es_isolate.rs
+++ b/core/es_isolate.rs
@@ -712,7 +712,7 @@ pub mod tests {
let dispatcher = move |_state: &mut CoreIsolateState,
control: &[u8],
- _zero_copy: Option<ZeroCopyBuf>|
+ _zero_copy: &mut [ZeroCopyBuf]|
-> Op {
dispatch_count_.fetch_add(1, Ordering::Relaxed);
assert_eq!(control.len(), 1);
diff --git a/core/examples/http_bench.js b/core/examples/http_bench.js
index d9878cbe7..a893dab40 100644
--- a/core/examples/http_bench.js
+++ b/core/examples/http_bench.js
@@ -36,18 +36,18 @@ const scratchBytes = new Uint8Array(
);
assert(scratchBytes.byteLength === 3 * 4);
-function send(promiseId, opId, rid, zeroCopy = null) {
+function send(promiseId, opId, rid, ...zeroCopy) {
scratch32[0] = promiseId;
scratch32[1] = rid;
scratch32[2] = -1;
- return Deno.core.dispatch(opId, scratchBytes, zeroCopy);
+ return Deno.core.dispatch(opId, scratchBytes, ...zeroCopy);
}
/** Returns Promise<number> */
-function sendAsync(opId, rid, zeroCopy = null) {
+function sendAsync(opId, rid, ...zeroCopy) {
const promiseId = nextPromiseId++;
const p = createResolvable();
- const buf = send(promiseId, opId, rid, zeroCopy);
+ const buf = send(promiseId, opId, rid, ...zeroCopy);
if (buf) {
const record = recordFromBuf(buf);
// Sync result.
diff --git a/core/examples/http_bench.rs b/core/examples/http_bench.rs
index a52f69fcb..233864fac 100644
--- a/core/examples/http_bench.rs
+++ b/core/examples/http_bench.rs
@@ -113,19 +113,19 @@ impl Isolate {
fn register_sync_op<F>(&mut self, name: &'static str, handler: F)
where
- F: 'static + Fn(State, u32, Option<ZeroCopyBuf>) -> Result<u32, Error>,
+ F: 'static + Fn(State, u32, &mut [ZeroCopyBuf]) -> Result<u32, Error>,
{
let state = self.state.clone();
let core_handler = move |_isolate_state: &mut CoreIsolateState,
control_buf: &[u8],
- zero_copy_buf: Option<ZeroCopyBuf>|
+ zero_copy_bufs: &mut [ZeroCopyBuf]|
-> Op {
let state = state.clone();
let record = Record::from(control_buf);
let is_sync = record.promise_id == 0;
assert!(is_sync);
- let result: i32 = match handler(state, record.rid, zero_copy_buf) {
+ let result: i32 = match handler(state, record.rid, zero_copy_bufs) {
Ok(r) => r as i32,
Err(_) => -1,
};
@@ -139,7 +139,7 @@ impl Isolate {
fn register_op<F>(
&mut self,
name: &'static str,
- handler: impl Fn(State, u32, Option<ZeroCopyBuf>) -> F + Copy + 'static,
+ handler: impl Fn(State, u32, &mut [ZeroCopyBuf]) -> F + Copy + 'static,
) where
F: TryFuture,
F::Ok: TryInto<i32>,
@@ -148,15 +148,16 @@ impl Isolate {
let state = self.state.clone();
let core_handler = move |_isolate_state: &mut CoreIsolateState,
control_buf: &[u8],
- zero_copy_buf: Option<ZeroCopyBuf>|
+ zero_copy_bufs: &mut [ZeroCopyBuf]|
-> Op {
let state = state.clone();
let record = Record::from(control_buf);
let is_sync = record.promise_id == 0;
assert!(!is_sync);
+ let mut zero_copy = zero_copy_bufs.to_vec();
let fut = async move {
- let op = handler(state, record.rid, zero_copy_buf);
+ let op = handler(state, record.rid, &mut zero_copy);
let result = op
.map_ok(|r| r.try_into().expect("op result does not fit in i32"))
.unwrap_or_else(|_| -1)
@@ -182,7 +183,7 @@ impl Future for Isolate {
fn op_close(
state: State,
rid: u32,
- _buf: Option<ZeroCopyBuf>,
+ _buf: &mut [ZeroCopyBuf],
) -> Result<u32, Error> {
debug!("close rid={}", rid);
let resource_table = &mut state.borrow_mut().resource_table;
@@ -195,7 +196,7 @@ fn op_close(
fn op_listen(
state: State,
_rid: u32,
- _buf: Option<ZeroCopyBuf>,
+ _buf: &mut [ZeroCopyBuf],
) -> Result<u32, Error> {
debug!("listen");
let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap();
@@ -209,7 +210,7 @@ fn op_listen(
fn op_accept(
state: State,
rid: u32,
- _buf: Option<ZeroCopyBuf>,
+ _buf: &mut [ZeroCopyBuf],
) -> impl TryFuture<Ok = u32, Error = Error> {
debug!("accept rid={}", rid);
@@ -227,9 +228,11 @@ fn op_accept(
fn op_read(
state: State,
rid: u32,
- buf: Option<ZeroCopyBuf>,
+ bufs: &mut [ZeroCopyBuf],
) -> impl TryFuture<Ok = usize, Error = Error> {
- let mut buf = buf.unwrap();
+ assert_eq!(bufs.len(), 1, "Invalid number of arguments");
+ let mut buf = bufs[0].clone();
+
debug!("read rid={}", rid);
poll_fn(move |cx| {
@@ -244,9 +247,10 @@ fn op_read(
fn op_write(
state: State,
rid: u32,
- buf: Option<ZeroCopyBuf>,
+ bufs: &mut [ZeroCopyBuf],
) -> impl TryFuture<Ok = usize, Error = Error> {
- let buf = buf.unwrap();
+ assert_eq!(bufs.len(), 1, "Invalid number of arguments");
+ let buf = bufs[0].clone();
debug!("write rid={}", rid);
poll_fn(move |cx| {
diff --git a/core/ops.rs b/core/ops.rs
index ecece7355..bd9d58283 100644
--- a/core/ops.rs
+++ b/core/ops.rs
@@ -22,7 +22,7 @@ pub enum Op {
/// Main type describing op
pub type OpDispatcher =
- dyn Fn(&mut CoreIsolateState, &[u8], Option<ZeroCopyBuf>) -> Op + 'static;
+ dyn Fn(&mut CoreIsolateState, &[u8], &mut [ZeroCopyBuf]) -> Op + 'static;
#[derive(Default)]
pub struct OpRegistry {
@@ -43,7 +43,7 @@ impl OpRegistry {
pub fn register<F>(&mut self, name: &str, op: F) -> OpId
where
- F: Fn(&mut CoreIsolateState, &[u8], Option<ZeroCopyBuf>) -> Op + 'static,
+ F: Fn(&mut CoreIsolateState, &[u8], &mut [ZeroCopyBuf]) -> Op + 'static,
{
let op_id = self.dispatchers.len() as u32;
@@ -92,7 +92,7 @@ fn test_op_registry() {
let dispatch = op_registry.get(test_id).unwrap();
let state_rc = CoreIsolate::state(&isolate);
let mut state = state_rc.borrow_mut();
- let res = dispatch(&mut state, &[], None);
+ let res = dispatch(&mut state, &[], &mut []);
if let Op::Sync(buf) = res {
assert_eq!(buf.len(), 0);
} else {
@@ -139,7 +139,7 @@ fn register_op_during_call() {
{
let state_rc = CoreIsolate::state(&isolate);
let mut state = state_rc.borrow_mut();
- dispatcher1(&mut state, &[], None);
+ dispatcher1(&mut state, &[], &mut []);
}
let mut expected = HashMap::new();
@@ -157,7 +157,7 @@ fn register_op_during_call() {
};
let state_rc = CoreIsolate::state(&isolate);
let mut state = state_rc.borrow_mut();
- let res = dispatcher2(&mut state, &[], None);
+ let res = dispatcher2(&mut state, &[], &mut []);
if let Op::Sync(buf) = res {
assert_eq!(buf.len(), 0);
} else {
diff --git a/core/plugin_api.rs b/core/plugin_api.rs
index 2e93fdb77..16f5d4a36 100644
--- a/core/plugin_api.rs
+++ b/core/plugin_api.rs
@@ -15,8 +15,7 @@ pub use crate::ZeroCopyBuf;
pub type InitFn = fn(&mut dyn Interface);
-pub type DispatchOpFn =
- fn(&mut dyn Interface, &[u8], Option<ZeroCopyBuf>) -> Op;
+pub type DispatchOpFn = fn(&mut dyn Interface, &[u8], &mut [ZeroCopyBuf]) -> Op;
pub trait Interface {
fn register_op(&mut self, name: &str, dispatcher: DispatchOpFn) -> OpId;
diff --git a/core/zero_copy_buf.rs b/core/zero_copy_buf.rs
index b10c14045..25c468ffe 100644
--- a/core/zero_copy_buf.rs
+++ b/core/zero_copy_buf.rs
@@ -8,6 +8,7 @@ use std::ops::DerefMut;
/// but the existence of a ZeroCopyBuf inhibits this until it is dropped. It
/// behaves much like an Arc<[u8]>, although a ZeroCopyBuf currently can't be
/// cloned.
+#[derive(Clone)]
pub struct ZeroCopyBuf {
backing_store: v8::SharedRef<v8::BackingStore>,
byte_offset: usize,